<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
本文範例為大家分享了Java實現多工執行助手的具體程式碼,供大家參考,具體內容如下
1.多執行緒執行任務類
package com.visy.threadpool; import com.visy.executor.ExecutorFactory; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.context.annotation.Configuration; @Configuration public class ThreadPoolConfig { private TheadPoolProperties theadPoolProperties; private ThreadPoolExecutor executor; private ThreadPoolExecutor executorChild; public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) { this.theadPoolProperties = theadPoolProperties; this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize()); this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service-child", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize()); } public <V> List<V> doConcurrentTask(List<Callable<V>> taskList, ThreadPoolExecutor... executorChilds) { if (taskList != null && !taskList.isEmpty()) { List<V> resultList = new ArrayList(); List futureList = null; try { if (this.executor.getQueue().size() >= this.theadPoolProperties.getQueueSize()) { throw new RuntimeException("queue size bigger than 100, now size is " + this.executor.getQueue().size()); } if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) { futureList = executorChilds[0].invokeAll(taskList); } else { futureList = this.executor.invokeAll(taskList, (long)this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS); } } catch (InterruptedException var6) { var6.printStackTrace(); } this.doFutureList(resultList, futureList); return resultList; } else { return null; } } <V> void doFutureList(List<V> resultList, List<Future<V>> futureList) { if (futureList != null) { Iterator var3 = futureList.iterator(); while(var3.hasNext()) { Future future = (Future)var3.next(); try { resultList.add(future.get()); } catch (ExecutionException | InterruptedException var6) { var6.printStackTrace(); } } } } public <V> void doVoidConcurrentTask(List<Callable<V>> taskList) { if (taskList != null && !taskList.isEmpty()) { Iterator var2 = taskList.iterator(); while(var2.hasNext()) { Callable<V> call = (Callable)var2.next(); this.executor.submit(call); } } } public TheadPoolProperties getTheadPoolProperties() { return this.theadPoolProperties; } public ThreadPoolExecutor getExecutor() { return this.executor; } public ThreadPoolExecutor getExecutorChild() { return this.executorChild; } public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) { this.theadPoolProperties = theadPoolProperties; } public void setExecutor(ThreadPoolExecutor executor) { this.executor = executor; } public void setExecutorChild(ThreadPoolExecutor executorChild) { this.executorChild = executorChild; } public boolean equals(Object o) { if (o == this) { return true; } else if (!(o instanceof ThreadPoolConfig)) { return false; } else { ThreadPoolConfig other = (ThreadPoolConfig)o; if (!other.canEqual(this)) { return false; } else { label47: { Object this$theadPoolProperties = this.getTheadPoolProperties(); Object other$theadPoolProperties = other.getTheadPoolProperties(); if (this$theadPoolProperties == null) { if (other$theadPoolProperties == null) { break label47; } } else if (this$theadPoolProperties.equals(other$theadPoolProperties)) { break label47; } return false; } Object this$executor = this.getExecutor(); Object other$executor = other.getExecutor(); if (this$executor == null) { if (other$executor != null) { return false; } } else if (!this$executor.equals(other$executor)) { return false; } Object this$executorChild = this.getExecutorChild(); Object other$executorChild = other.getExecutorChild(); if (this$executorChild == null) { if (other$executorChild != null) { return false; } } else if (!this$executorChild.equals(other$executorChild)) { return false; } return true; } } } protected boolean canEqual(Object other) { return other instanceof ThreadPoolConfig; } public int hashCode() { int PRIME = true; int result = 1; Object $theadPoolProperties = this.getTheadPoolProperties(); int result = result * 59 + ($theadPoolProperties == null ? 43 : $theadPoolProperties.hashCode()); Object $executor = this.getExecutor(); result = result * 59 + ($executor == null ? 43 : $executor.hashCode()); Object $executorChild = this.getExecutorChild(); result = result * 59 + ($executorChild == null ? 43 : $executorChild.hashCode()); return result; } public String toString() { return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties() + ", executor=" + this.getExecutor() + ", executorChild=" + this.getExecutorChild() + ")"; } }
2.執行器工廠類
package com.visy.executor; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ExecutorFactory { private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class); private static final Map<String, ThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap(); private static final int DEFAULT_QUEUE_SIZE = 1000; private static final String DEFAULT_EXECUTOR_NAME = "default-executor"; private static final int MAX_THREAD_NUM = 100; private static final int CORE_THREAD_NUM = 1; private static volatile ExecutorFactory instance; private ExecutorFactory() { } public static ExecutorFactory getInstance() { if (instance == null) { Class var0 = ExecutorFactory.class; synchronized(ExecutorFactory.class) { if (instance == null) { instance = new ExecutorFactory(); } } } return instance; } public ThreadPoolExecutor getThreadPoolExecutorByName(String name) { return (ThreadPoolExecutor)threadPoolExecutorMap.get(name); } public static Map<String, ThreadPoolExecutor> getThreadPoolExecutorMap() { return threadPoolExecutorMap; } public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) { if (StringUtils.isBlank(threadPoolExecutorName)) { throw new IllegalArgumentException("thread name empty"); } else { if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { Class var5 = ExecutorFactory.class; synchronized(ExecutorFactory.class) { if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) { ThreadPoolExecutor executor = (new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName)).getExecutor(); threadPoolExecutorMap.put(threadPoolExecutorName, executor); logger.info("thread name: {} executor created", threadPoolExecutorName); } } } return (ThreadPoolExecutor)threadPoolExecutorMap.get(threadPoolExecutorName); } } public <T extends Runnable> void submit(T t) { ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); defaultExecutor.submit(t); } public <T extends Runnable> void submit(String poolName, T t) { ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); if (executor == null) { logger.error("thread name: {} executor not exist.", poolName); throw new IllegalArgumentException("thread name:" + poolName + " executor not exist."); } else { executor.submit(t); } } public <T extends Callable<Object>> Future<Object> submit(T t) { ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor(); return defaultExecutor.submit(t); } public <T extends Callable<Object>> Future<Object> submit(String poolName, T t) { ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName); if (executor == null) { logger.error("thread poolName: {} executor not exist.", poolName); throw new IllegalArgumentException("thread poolName:" + poolName + " executor not exist."); } else { return executor.submit(t); } } public ThreadPoolExecutor getThreadPoolExecutor() { return this.getThreadPoolExecutor("default-executor", 1000, 1, 100); } }
3.多執行緒設定類
package com.visy.threadpool; import javax.validation.constraints.NotNull; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.validation.annotation.Validated; @Validated @Configuration @ConfigurationProperties(prefix = "visy.threadpool") public class TheadPoolProperties { // 執行並行任務時,等待多久時間超時(單位:秒) @NotNull private Integer timeOut; // 佇列大小 @NotNull private Integer queueSize; // 核心執行緒數量 @NotNull private Integer coreThreadNum; // 執行緒池最大執行緒數量 @NotNull private Integer maxPoolSize; // 並行執行每組大小 private Integer groupSize = 20; public TheadPoolProperties() { } public Integer getTimeOut() { return this.timeOut; } public Integer getQueueSize() { return this.queueSize; } public Integer getCoreThreadNum() { return this.coreThreadNum; } public Integer getMaxPoolSize() { return this.maxPoolSize; } public Integer getGroupSize() { return this.groupSize; } public void setTimeOut(Integer timeOut) { this.timeOut = timeOut; } public void setQueueSize(Integer queueSize) { this.queueSize = queueSize; } public void setCoreThreadNum(Integer coreThreadNum) { this.coreThreadNum = coreThreadNum; } public void setMaxPoolSize(Integer maxPoolSize) { this.maxPoolSize = maxPoolSize; } public void setGroupSize(Integer groupSize) { this.groupSize = groupSize; } public boolean equals(Object o) { if (o == this) { return true; } else if (!(o instanceof TheadPoolProperties)) { return false; } else { TheadPoolProperties other = (TheadPoolProperties)o; if (!other.canEqual(this)) { return false; } else { label71: { Object this$timeOut = this.getTimeOut(); Object other$timeOut = other.getTimeOut(); if (this$timeOut == null) { if (other$timeOut == null) { break label71; } } else if (this$timeOut.equals(other$timeOut)) { break label71; } return false; } Object this$queueSize = this.getQueueSize(); Object other$queueSize = other.getQueueSize(); if (this$queueSize == null) { if (other$queueSize != null) { return false; } } else if (!this$queueSize.equals(other$queueSize)) { return false; } label57: { Object this$coreThreadNum = this.getCoreThreadNum(); Object other$coreThreadNum = other.getCoreThreadNum(); if (this$coreThreadNum == null) { if (other$coreThreadNum == null) { break label57; } } else if (this$coreThreadNum.equals(other$coreThreadNum)) { break label57; } return false; } Object this$maxPoolSize = this.getMaxPoolSize(); Object other$maxPoolSize = other.getMaxPoolSize(); if (this$maxPoolSize == null) { if (other$maxPoolSize != null) { return false; } } else if (!this$maxPoolSize.equals(other$maxPoolSize)) { return false; } Object this$groupSize = this.getGroupSize(); Object other$groupSize = other.getGroupSize(); if (this$groupSize == null) { if (other$groupSize == null) { return true; } } else if (this$groupSize.equals(other$groupSize)) { return true; } return false; } } } protected boolean canEqual(Object other) { return other instanceof TheadPoolProperties; } public int hashCode() { int PRIME = true; int result = 1; Object $timeOut = this.getTimeOut(); int result = result * 59 + ($timeOut == null ? 43 : $timeOut.hashCode()); Object $queueSize = this.getQueueSize(); result = result * 59 + ($queueSize == null ? 43 : $queueSize.hashCode()); Object $coreThreadNum = this.getCoreThreadNum(); result = result * 59 + ($coreThreadNum == null ? 43 : $coreThreadNum.hashCode()); Object $maxPoolSize = this.getMaxPoolSize(); result = result * 59 + ($maxPoolSize == null ? 43 : $maxPoolSize.hashCode()); Object $groupSize = this.getGroupSize(); result = result * 59 + ($groupSize == null ? 43 : $groupSize.hashCode()); return result; } public String toString() { return "TheadPoolProperties(timeOut=" + this.getTimeOut() + ", queueSize=" + this.getQueueSize() + ", coreThreadNum=" + this.getCoreThreadNum() + ", maxPoolSize=" + this.getMaxPoolSize() + ", groupSize=" + this.getGroupSize() + ")"; } }
4.列表拆分工具類
package com.visy.utils; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.jar.Attributes; /** * 列表或陣列按指定大小分組,用於批次取一部分資料迴圈處理 * */ public class ArraySplitUtil<T> { /** * 按指定大小對列表分組 * @param list * @param splitSize * @return */ public List<List<T>> splistList(List<T> list, int splitSize) { if (null == list || list.size() == 0) { return null; } int listSize = list.size(); List<List<T>> newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(list); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(list.subList(start, end)); } return newList; } /** * 按指定大小對陣列分組 * @param array * @param splitSize * @return */ public List<T[]> splistArray(T[] array, int splitSize) { if (null == array) { return null; } int listSize = array.length; List<T[]> newList = new ArrayList<>(); if (listSize < splitSize) { newList.add(array); return newList; } int addLength = splitSize; int times = listSize / splitSize; if (listSize % splitSize != 0) { times += 1; } int start = 0; int end = 0; int last = times - 1; for (int i = 0; i < times; i++) { start = i * splitSize; if (i < last) { end = start + addLength; } else { end = listSize; } newList.add(Arrays.copyOfRange(array, start, end)); } return newList; } public static <E> ArraySplitUtil<E> build(){ return new ArraySplitUtil<>(); } }
5.多工執行助手類
package com.visy.helper; import com.baomidou.mybatisplus.toolkit.CollectionUtils; import com.google.common.collect.Lists; import com.visy.utils.ArraySplitUtil; import com.visy.threadpool.ThreadPoolConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; /** * 多工助手 * @author visy.wang * @date 2022/5/9 14:38 */ @Service public class MultiTaskHelper { @Autowired private ThreadPoolConfig threadPoolConfig; private static final Map<String,ArraySplitUtil<?>> ArraySplitUtilCache = new ConcurrentHashMap<>(); public <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<I,O> handler){ return createAndRunListTask(list, null, handler); } public <I,O> List<List<O>> createAndRunListTaskV2(List<I> list, Function<List<I>, List<O>> handler){ return createAndRunListTask(list, handler, null); } public <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<I> handler){ createAndRunListTaskWithoutReturn(list, null, handler); } public <I> void createAndRunListTaskWithoutReturnV2(List<I> list, Consumer<List<I>> handler){ createAndRunListTaskWithoutReturn(list, handler, null); } /** * 把列表按執行緒數分組 * @param list 列表 * @return 分組後的列表 */ @SuppressWarnings("unchecked") private <T> List<List<T>> listSplit(List<T> list){ String key = list.get(0).getClass().getName(); int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize(); ArraySplitUtil<T> arraySplitUtil = (ArraySplitUtil<T>)ArraySplitUtilCache.get(key); if(Objects.isNull(arraySplitUtil)){ arraySplitUtil = ArraySplitUtil.build(); ArraySplitUtilCache.put(key, arraySplitUtil); } return arraySplitUtil.splistList(list, groupSize); } /** * 建立並執行多工 * @param list 輸入資料列表 * @param handler1 處理器1 (優先順序使用) * @param handler2 處理器2 * @param <I> 輸入資料型別 * @param <O> 輸出資料型別 * @return 執行結果分組列表 */ private <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<List<I>, List<O>> handler1, Function<I,O> handler2){ List<List<I>> listGroup = listSplit(list); //設定每個組的任務 List<Callable<List<O>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { taskList.add(() -> { if(Objects.nonNull(handler1)){ return handler1.apply(subList); }else if(Objects.nonNull(handler2)){ return subList.stream().map(handler2).collect(Collectors.toList()); }else{ return null; } }); }); return threadPoolConfig.doConcurrentTask(taskList); } /** * 建立並執行多工(無返回結果) * @param list 輸入資料列表 * @param handler1 處理器1 (優先順序更高) * @param handler2 處理器2 * @param <I> 輸入資料型別 */ private <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<List<I>> handler1, Consumer<I> handler2){ List<List<I>> listGroup = listSplit(list); //設定每個組的任務 List<Callable<List<?>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size()); listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> { taskList.add(() -> { if(Objects.nonNull(handler1)){ handler1.accept(subList); }else if(Objects.nonNull(handler2)){ subList.forEach(handler2); } return null; }); }); threadPoolConfig.doConcurrentTask(taskList); } }
6.多工助手使用:
@Autowired package com.zoom.fleet.schedule.service; import com.visy.helper.MultiTaskHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; /** * 多工助手使用範例 * @author visy.wang * @date 2022/5/13 14:11 */ @Service public class MultiTaskTest { @Autowired private MultiTaskHelper multiTaskHelper; private void test(){ //待多工執行的資料列表 List<String> idList = new ArrayList<>(); //1.有返回結果的執行方式一, 定義單個資料的處理邏輯,返回多工執行結果和合集 List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id->{ //每一項資料的業務程式碼 return Long.valueOf(id); }); //2.有返回結果的執行方式二, 定義單個數執行緒的處理邏輯,返回多工執行結果和合集 resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList->{ //每一個執行緒下列表操作的業務程式碼 return subIdList.stream().map(id->{ //每一項資料的業務程式碼 return Long.valueOf(id); }).collect(Collectors.toList()); }); //3.無返回結果的執行方式一, 定義單個資料的處理邏輯 multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id->{ //每一項資料的業務程式碼... }); //3.無返回結果的執行方式一, 定義單個資料的處理邏輯 multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList->{ subIdList.forEach(id->{ //每一項資料的業務程式碼... }); //繼續操作subIdList... }); } }
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援it145.com。
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45