线程助手 - ThreadUtil 曾几何时,多线程是JAVA里面很少触碰的技术,而随着技术的发展,对于多线程使用越来越多,在合适的时候合理的使用多线程,对于性能有着质的提高, 但是如果自己从头写,会比较繁琐,那么我们就封装一个吧(这也是feilong的精髓思想之所在) 我们来看看 线程助手 - `ThreadUtil` ## 1.执行. 方法 | Description :---- | :--------- void **execute**(List<T> list,int eachSize,Map<String, ?> paramsMap,PartitionRunnableBuilder<T> partitionRunnableBuilder) | 给定一个待解析的 list,设定每个线程执行多少条 eachSize,传入一些额外的参数 paramsMap,使用自定义的 partitionRunnableBuilder,自动构造多条线程并运行.. ### 1.1 void **execute**(List<T> list,int eachSize,Map<String, ?> paramsMap,PartitionRunnableBuilder<T> partitionRunnableBuilder) 给定一个待解析的 list,设定每个线程执行多少条 eachSize,传入一些额外的参数 paramsMap,使用自定义的 partitionRunnableBuilder,自动构造多条线程并运行. **适用场景:** 比如同步库存,一次从MQ或者其他接口中得到了5000条数据,如果使用单线程做5000次循环,势必会比较慢,并且影响性能; 如果调用这个方法,传入eachSize=100, 那么自动会开启5000/100=50 个线程来跑功能,大大提高同步库存的速度 其他的适用场景还有诸如同步商品主档数据,同步订单等等这类每个独立对象之间没有相关联关系的数据,能提高执行速度和效率 **重构:** 对于以下代码:模拟10个对象/数字,循环执行任务(可能是操作数据库等) ```JAVA public void testExecuteTest() throws InterruptedException{ Date beginDate = new Date(); List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); for (Integer integer : list){ //------ //模拟 do something //--------- Thread.sleep(1 * MILLISECOND_PER_SECONDS); } LOGGER.info("use time:{}", formatDuration(beginDate)); } ``` 统计总耗时时间 需要 use time:`10秒28毫秒` 此时你可以调用此方法,改成多线程执行: ```JAVA public void testExecuteTestUsePartitionRunnableBuilder() throws InterruptedException{ Date beginDate = new Date(); List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); //每个线程执行2条数据, 没有自定义 paramsMap //将会自动创建 list.size()/2 =5个 线程执行 //每个线程执行的,将会是 PartitionRunnableBuilder build 返回的 Runnable ThreadUtil.execute(list, 1, null, new PartitionRunnableBuilder<Integer>(){ @Override public Runnable build(final List<Integer> perBatchList,PartitionThreadEntity partitionThreadEntity,Map<String, ?> paramsMap){ return new Runnable(){ @Override public void run(){ for (Integer integer : perBatchList){ //------ //模拟 do something //--------- try{ Thread.sleep(1 * MILLISECOND_PER_SECONDS); }catch (InterruptedException e){ LOGGER.error("", e); } } } }; } }); LOGGER.info("use time:{}", formatDuration(beginDate)); } ``` 统计总耗时时间 需要 use time:`2秒36毫秒` 对于上述的case,如果将 eachSize 参数由2 改成1, 统计总耗时时间 需要 use time:`1秒36毫秒` 可见 调用该方法,使用多线程能节省执行时间,提高效率; 但是也需要酌情考虑eachSize大小,合理的开启线程数量 **说明:** - 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性; - 需要注意合理的评估list 的大小和eachSize 比率; 不建议list size很大,比如 20W,而eachSize值很小,比如2 ,那么会开启20W/2=10W个线程;此时建议考虑 线程池的实现方案 - 对于参数 paramsMap 的使用: 比如你需要拿到最终每条数据执行的结果,以便后续进行处理(比如对失败的操作再次执行或者发送汇报邮件等) ```JAVA public void testExecuteTestUsePartitionRunnableBuilderParamsMap() throws InterruptedException{ Date beginDate = new Date(); List<Integer> list = toList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); final Map<Integer, Boolean> indexAndResultMap = Collections.synchronizedSortedMap(new TreeMap()); ThreadUtil.execute(list, 2, null, new PartitionRunnableBuilder<Integer>(){ @Override public Runnable build( final List<Integer> perBatchList, final PartitionThreadEntity partitionThreadEntity, Map<String, ?> paramsMap){ return new Runnable(){ @Override public void run(){ int i = 0; for (Integer integer : perBatchList){ //------ //模拟 do something //--------- try{ Thread.sleep(1 * MILLISECOND_PER_SECONDS); }catch (InterruptedException e){ LOGGER.error("", e); } int indexInTotalList = getIndex(partitionThreadEntity, i); //模拟 当值是 5 或者8 的时候 操作结果是false boolean result = (integer == 5 || integer == 8) ? false : true; indexAndResultMap.put(indexInTotalList, result); i++; } } private Integer getIndex(PartitionThreadEntity partitionThreadEntity,int i){ int batchNumber = partitionThreadEntity.getBatchNumber(); return batchNumber * partitionThreadEntity.getEachSize() + i; } }; } }); LOGGER.debug(JsonUtil.format(indexAndResultMap)); LOGGER.info("use time:{}", formatDuration(beginDate)); } ``` 输出结果: ```JAVA 29:21 DEBUG (ThreadUtilExample.java:161) [testExecuteTestUsePartitionRunnableBuilderParamsMap()] { "0": true, "1": true, "2": true, "3": true, "4": true, "5": false, "6": true, "7": true, "8": false, "9": true } 29:21 INFO (ThreadUtilExample.java:164) [testExecuteTestUsePartitionRunnableBuilderParamsMap()] use time:2秒181毫秒 ``` ## 2.创建指定数量 **threadCount** 的线程,并执行 方法 | Description :---- | :--------- void **execute**(Runnable runnable,int threadCount) | 创建指定数量 threadCount 的线程,并执行. ### 2.1 void **execute**(Runnable runnable,int threadCount) 创建指定数量 **threadCount** 的线程,并执行.