本文概览:使用spring线程池来实现多线程处理detail,并且需要每一线程会返回一个结果。
1 spring线程池与JDK线程池区别
spring线程池是通过JDK的线程池ThreadPoolExecutor的实现。但是具有如下两个优点
(1)不需要自己设置阻塞队列
只需要设置阻塞队列大小,不需要指定那种阻塞队列。
1 2 3 4 5 6 7 8 |
protected BlockingQueue<Runnable> createQueue(int queueCapacity) { if (queueCapacity > 0) { return new LinkedBlockingQueue<Runnable>(queueCapacity); } else { return new SynchronousQueue<Runnable>(); } } |
(2)自动关闭线程池
通过实现DisposableBean的destroy方法
1 2 3 |
public void destroy() { shutdown(); } |
2 实例
2.1 场景
我们有一个场景,分批从数据库中读取1000条数据Detail,使用调用外部服务(HTTP的服务接口格式)处理这1000个数据,然后记录成功个数和失败个数。对于上述操作进行优化,我们可以通过多线程方式来实现调用外部服务来处理这个数据。
2.2 代码
1、实现要点
- 需要获取每一个线程执行之后的结果
2、sping的xml中定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
<!--处理Detail的线程池--> <bean id="processExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 线程池维护线程的最少数量 默认为1 --> <property name="corePoolSize" value="50"/> <!-- 线程池维护线程的最大数量 默认为Integer.MAX_VALUE --> <property name="maxPoolSize" value="80"/> <!-- 线程池所使用的缓冲队列最大长度 一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --> <property name="queueCapacity" value="5000"/> <!-- 线程池维护线程所允许的空闲时间 默认为60s --> <property name="keepAliveSeconds" value="300"/> <!-- 线程池对拒绝任务(无线程可用)的处理策略(饱和策略),目前只支持AbortPolicy、CallerRunsPolicy --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException --> <!-- CallerRunsPolicy:调用者的线程会执行该任务,如果执行器已关闭,则丢弃. --> <!-- DiscardPolicy:不能执行的任务将被丢弃. --> <!-- DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程). --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> |
一般情况下我们都设置corePoolSzie和maxPoolSize的大小是一样的,那么此时也就不需要keepAliveSeconds了。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
<!--处理Detail的线程池--> <bean id="processExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 线程池维护线程的最少数量 默认为1 --> <property name="corePoolSize" value="50"/> <!-- 线程池维护线程的最大数量 默认为Integer.MAX_VALUE --> <property name="maxPoolSize" value="50"/> <!-- 线程池所使用的缓冲队列最大长度 一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE --> <property name="queueCapacity" value="5000"/> <!-- 线程池维护线程所允许的空闲时间 默认为60s --> <!-- <property name="keepAliveSeconds" value="300"/> --> <!-- 线程池对拒绝任务(无线程可用)的处理策略(饱和策略),目前只支持AbortPolicy、CallerRunsPolicy --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:默认的策略,处理程序遭到拒绝将抛出运行时 RejectedExecutionException --> <!-- CallerRunsPolicy:调用者的线程会执行该任务,如果执行器已关闭,则丢弃. --> <!-- DiscardPolicy:不能执行的任务将被丢弃. --> <!-- DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程). --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/> </property> </bean> |
对于线程池的大小策略为:CPU个数+1。cpu的个数查看参考:
3、相关代码
(1)待处理的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
/** * 待多线程处理的对象 */ public class Detail { // 金额 int amount; // 状态 boolean status; Detail(int amount, boolean status) { this.amount = amount; this.status = status; } public int getAmount() { return amount; } public void setAmount(int amountCount) { this.amount = amountCount; } public boolean isStatus() { return status; } public void setStatus(boolean status) { this.status = status; } } |
(2)多线程处理detail抽象类
- 定义了一个Callable(ProcessCallable)和Callable的返回类型(ProcessCallableResult)的两个类
- 定义了一个获取线程持仓的抽象方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
/** * 多线程带有返回结果的抽象类 */ public abstract class ProcessWithResult { private static final Logger logger = LoggerFactory.getLogger(ProcessWithResult.class); public void process() { // 1.初始化待处理对象 List<Detail> detailList = Lists.newArrayList(); detailList.add(new Detail(10, true)); detailList.add(new Detail(2, false)); detailList.add(new Detail(3, true)); detailList.add(new Detail(4, true)); detailList.add(new Detail(5, false)); // 2.多线程处理 ThreadPoolTaskExecutor threadPool = getThreadPool(); int totalCount = 0; int totalAmount = 0; int successCount = 0; int successAmount = 0; List<Future<ProcessCallableResult>> callResults = Lists.newArrayList(); boolean isProcessSucess = true; for (Detail detail : detailList) { totalAmount += detail.getAmount(); totalCount++; // 多线程处理 callResults.add(threadPool.submit(new ProcessCallable(detail))); } // 3.处理每一个结果 for (Future<ProcessCallableResult> callableResult : callResults) { try { ProcessCallableResult result = callableResult.get(); if (!result.isResultStatus()) { isProcessSucess = false; continue; } // 记录成功的信息 successAmount += result.getDetail().getAmount(); successCount++; } catch (Exception e) { logger.error("Future.get()失败"); throw new RuntimeException(e.getMessage(), e); } } int failCount = totalCount - successCount; int failAmount = totalAmount - successAmount; if (!isProcessSucess) { logger.info("执行失败"); } logger.info("程序执行完成,处理结果为:totalCount" + totalCount + ";totalAmount=" + totalAmount + ";successCount=" + successCount + ";successAmount=" + successAmount + ";failCount=" + failCount + ";failAmount=" + failAmount); } /** * 处理一个detail的逻辑 * * @return */ protected abstract boolean processOneDetail(Detail detail); /****************************************************************************/ /**---------------------- 多线程beg ----------------------- */ /***************************************************************************/ /** * 获取线程池 * * @return */ protected ThreadPoolTaskExecutor getThreadPool() { return null; } /** * 自定义Callable */ public class ProcessCallable implements Callable<ProcessCallableResult> { private Detail detail; public ProcessCallable(Detail detail) { this.detail = detail; } public ProcessCallableResult call() throws Exception { boolean processResult; try { processResult = processOneDetail(detail); } catch (Exception e) { logger.info("process detail fail-exception"); return new ProcessCallableResult(false, detail); } if (processResult) { logger.info("process detail success"); return new ProcessCallableResult(true, detail); } else { logger.info("process detail fail"); return new ProcessCallableResult(false, detail); } } public Detail getDetail() { return detail; } public void setDetail(Detail detail) { this.detail = detail; } } /** * 自定义Callable返回的结果类型 */ private class ProcessCallableResult { private boolean resultStatus; private Detail detail; ProcessCallableResult(boolean resultStatus, Detail detail) { this.resultStatus = resultStatus; this.detail = detail; } public boolean isResultStatus() { return resultStatus; } public void setResultStatus(boolean resultStatus) { this.resultStatus = resultStatus; } public Detail getDetail() { return detail; } public void setDetail(Detail detail) { this.detail = detail; } } /****************************************************************************/ /**-------------------------------- 多线程bend ------------------------- */ /****************************************************************************/ } |
(3)多线程处理detail的具体类
实现了两个方法
- 处理detail的方法processOneDetail
- 获取线程池方法getThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Service public class DetailProcessWithReturn extends ProcessWithResult { @Resource(name = "processExecutor") private ThreadPoolTaskExecutor processExecutor; @Override protected ThreadPoolTaskExecutor getThreadPool() { return processExecutor; } @Override protected boolean processOneDetail(Detail detail) { // 1.调用外部服务处理detail // TODO // 2.根据处理的结果返回状态 // 这里为了测试,通过detail的status来表示已经处理后的http接口返回的结果。 if (detail.isStatus()) { return true; } else { return false; } } } |
(4)测试类
具体测试类
1 2 3 4 5 6 7 8 |
public class DetailProcessTest extends BaseTest { @Resource private DetailProcessWithReturn detailProcessWithReturn; @Test public void testProcess() { detailProcessWithReturn.process(); } } |
抽象测试类
1 2 3 4 |
@ContextConfiguration("classpath:applicationContext.xml") @RunWith(SpringJUnit4ClassRunner.class) public class BaseTest { } |
执行结果为
14:30:27.017 [processExecutor-4] INFO concurrent.ProcessWithResult – process detail success
14:30:27.017 [processExecutor-1] INFO concurrent.ProcessWithResult – process detail success
14:30:27.017 [processExecutor-5] INFO concurrent.ProcessWithResult – process detail fail
14:30:27.017 [processExecutor-3] INFO concurrent.ProcessWithResult – process detail success
14:30:27.017 [processExecutor-2] INFO concurrent.ProcessWithResult – process detail fail
14:30:27.025 [main] INFO concurrent.ProcessWithResult – 执行失败
14:30:27.025 [main] INFO concurrent.ProcessWithResult – 程序执行完成,处理结果为:totalCount5;totalAmount=24;successCount=3;successAmount=17;failCount=2;failAmount=7
4、 github的代码地址
git@github.com:zhonghuwu/iocTemplate.git
(全文完)