<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
①需要實現一個定時釋出系統通告的功能,如何實現? ②支付超時,訂單自動取消,如何實現?
推薦指數:★★☆ 優點: JDK原生(JUC包下)支援,無需引入新的依賴; 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體掛起執行緒實現延時,不支援叢集 (3)程式碼耦合性大,不易維護 (4)一個任務就要新建一個執行緒繫結任務的執行,容易造成資源浪費
①設定延遲任務專用執行緒池
/** * 執行緒池設定 */ @Configuration @EnableAsync @EnableConfigurationProperties(ThreadPoolProperties.class) public class ThreadPoolConfig { //ThreadPoolProperties的設定依據需求和伺服器設定自行設定 @Resource private ThreadPoolProperties threadPoolProperties; //延遲任務佇列容量 private final static int DELAY_TASK_QUEUE_CAPACITY = 100; @Bean public ThreadPoolTaskExecutor delayTaskExecutor() { log.info("start delayTaskExecutor"); ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); //設定核心執行緒數 threadPool.setCorePoolSize(threadPoolProperties.getCorePoolSize()); //設定最大執行緒數 threadPool.setMaxPoolSize(threadPoolProperties.getMaxPoolSize()); //設定佇列大小 threadPool.setQueueCapacity(DELAY_TASK_QUEUE_CAPACITY); //執行緒最大存活時間 threadPool.setKeepAliveSeconds (threadPoolProperties.getKeepAliveSeconds()); //設定執行緒池中的執行緒的名稱字首 threadPool.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix()); // rejection-policy:當pool已經達到max size的時候執行的策略 threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //執行初始化 threadPool.initialize(); return threadPool; } }
②建立延時任務
在需要執行的程式碼塊建立延時任務
delayTaskExecutor.execute(() -> { try { //執行緒掛起指定時間 TimeUnit.MINUTES.sleep(time); //執行業務邏輯 doSomething(); } catch (InterruptedException e) { log.error("執行緒被打斷,執行業務邏輯失敗"); } });
推薦指數:★★★ 優點: 程式碼簡潔,JDK原生支援 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體存放任務,不支援叢集 (3)一個任務就要新建一個執行緒繫結任務的執行,容易造成資源浪費
class Task implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getId()+":"+Thread.currentThread().getName()); System.out.println("scheduledExecutorService====>>>延時器"); } } public class ScheduleServiceTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService=new ScheduledThreadPoolExecutor(10); scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS); scheduledExecutorService.schedule(new Task(),2, TimeUnit.SECONDS); scheduledExecutorService.schedule(new Task(),1, TimeUnit.SECONDS); } }
推薦指數:★★★☆ 優點: (1)JDK原生(JUC包下)支援,無需引入新的依賴; (2)可以用一個執行緒對整個延時佇列按序執行; 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體存放佇列,不支援叢集 (3)依據compareTo方法排列佇列,呼叫take阻塞式的取出第一個任務(不呼叫則不取出),比較不靈活,會影響時間的準確性
①新建一個延時任務
public class DelayTask implements Delayed { private Integer taskId; private long executeTime; DelayTask(Integer taskId, long executeTime) { this.taskId = taskId; this.executeTime = executeTime; } /** * 該任務的延時時長 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return executeTime - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayTask t = (DelayTask) o; if (this.executeTime - t.executeTime <= 0) { return -1; } else { return 1; } } @Override public String toString() { return "延時任務{" + "任務編號=" + taskId + ", 執行時間=" + new Date(executeTime) + '}'; } /** * 執行具體業務程式碼 */ public void doTask(){ System.out.println(this+":"); System.out.println("執行緒ID-"+Thread.currentThread().getId()+":執行緒名稱-"+Thread.currentThread().getName()+":do something!"); } }
②執行延時任務
public class TestDelay { public static void main(String[] args) throws InterruptedException { // 新建3個任務,並依次設定超時時間為 30s 10s 60s DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 3000L); DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 1000L); DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 6000L); DelayQueue<DelayTask> queue = new DelayQueue<>(); queue.add(d1); queue.add(d2); queue.add(d3); System.out.println("開啟延時佇列時間:" + new Date()+"n"); // 從延時佇列中獲取元素 while (!queue.isEmpty()) { queue.take().doTask(); } System.out.println("n任務結束"); } }
執行結果:
推薦指數:★★★☆ 優點: 對於有依賴redis的業務且有延時任務的需求,能夠快速對接 缺點: (1)使用者端斷開後重連會導致所有事件丟失 (2)高並行場景下,存在大量的失效key場景會匯出失效時間存在延遲 (3)若有多個監聽器監聽該key,是會重複消費這個過期事件的,需要特定邏輯判斷
① 修改Redis組態檔並重啟Redis
notify-keyspace-events Ex
注意: redis組態檔不能有空格,否則會啟動報錯
②Java中關於Redis的設定類
redisTemplate範例bean需要自定義生成; RedisMessageListenerContainer 是redis-key過期監聽需要的監聽器容器;
@Configuration @Slf4j public class RedisConfiguration { /** * Redis設定 * @param factory * @return */ @Bean(name = "redisTemplate") public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<Object, Object> template = new RedisTemplate<>(); RedisSerializer<String> redisSerializer = new StringRedisSerializer(); template.setConnectionFactory(factory); //key序列化方式 template.setKeySerializer(redisSerializer); //value序列化 template.setValueSerializer(redisSerializer); //value hashmap序列化 template.setHashValueSerializer(redisSerializer); //key hashmap序列化 template.setHashKeySerializer(redisSerializer); return template; } /** * 訊息監聽器容器bean * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
③監聽器程式碼
@Slf4j @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { private static final String TEST_REDIS_KEY = "testExpired"; public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer, RedisTemplate redisTemplate) { super(listenerContainer); /** * 設定一個Redis延遲過期key(key名:testExpired,過期時間:30秒) */ redisTemplate.opsForValue().set(TEST_REDIS_KEY, "1", 20, TimeUnit.SECONDS); log.info("設定redis-key"); } @Override public void onMessage(Message message, byte[] pattern) { try { String expiredKey = message.toString(); if (TEST_REDIS_KEY.equals(expiredKey)) { //業務處理 log.info(expiredKey + "過期,觸發回撥"); } } catch (Exception e) { log.error("key 過期通知處理異常,{}", e); } } }
測試結果:
推薦指數:★★★★ 優點: (1)對於大量定時任務,時間輪可以僅用一個工作執行緒對編排的任務進行順序執行; (2)自動執行,可以自定義時間輪每輪的tick數,tick間隔,靈活且時間精度可控 缺點: (1)基於記憶體,應用重啟(或宕機)會導致任務丟失 (2)基於記憶體存放任務,不支援叢集
public class WheelTimerTest { public static void main(String[] args) { //設定每個格子是 100ms, 總共 256 個格子 HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256); //加入三個任務,依次設定超時時間是 10s 5s 20s System.out.println("加入一個任務,ID = 1, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執行一個任務,ID = 1, time= " + LocalDateTime.now()); }, 10, TimeUnit.SECONDS); System.out.println("加入一個任務,ID = 2, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執行一個任務,ID = 2, time= " + LocalDateTime.now()); }, 5, TimeUnit.SECONDS); System.out.println("加入一個任務,ID = 3, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執行一個任務,ID = 3, time= " + LocalDateTime.now()); }, 20, TimeUnit.SECONDS); System.out.println("加入一個任務,ID = 4, time= " + LocalDateTime.now()); hashedWheelTimer.newTimeout(timeout -> { System.out.println(Thread.currentThread().getName()); System.out.println("執行一個任務,ID = 4, time= " + LocalDateTime.now()); }, 20, TimeUnit.SECONDS); System.out.println("等待任務執行==========="); } }
針對任務丟失的代價過大,高並行的場景 推薦指數:★★★★ 優點: 支援叢集,分散式,高並行場景; 缺點: 引入額外的訊息佇列,增加專案的部署和維護的複雜度。
場景:為一個委託指定期限,委託到期後,委託關係終止,相關業務許可權移交回原擁有者 這裡採用的是RabbitMq的死信佇列加TTL訊息轉化為延遲佇列的方式(RabbitMq沒有延時佇列)
①宣告一個佇列設定其的死信佇列
@Configuration public class MqConfig { public static final String GLOBAL_RABBIT_TEMPLATE = "rabbitTemplateGlobal"; public static final String DLX_EXCHANGE_NAME = "dlxExchange"; public static final String AUTH_EXCHANGE_NAME = "authExchange"; public static final String DLX_QUEUE_NAME = "dlxQueue"; public static final String AUTH_QUEUE_NAME = "authQueue"; public static final String DLX_AUTH_QUEUE_NAME = "dlxAuthQueue"; @Bean @Qualifier(GLOBAL_RABBIT_TEMPLATE) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean @Qualifier(AUTH_EXCHANGE_NAME) public Exchange authExchange() { return ExchangeBuilder.directExchange (AUTH_EXCHANGE_NAME).durable (true).build (); } /** * 死信交換機 * @return */ @Bean @Qualifier(DLX_EXCHANGE_NAME) public Exchange dlxExchange() { return ExchangeBuilder.directExchange (DLX_EXCHANGE_NAME).durable (true).build (); } /** * 記錄紀錄檔的死信佇列 * @return */ @Bean @Qualifier(DLX_QUEUE_NAME) public Queue dlxQueue() { // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) return QueueBuilder.durable (DLX_QUEUE_NAME).build (); } /** * 委託授權專用佇列 * @return */ @Bean @Qualifier(AUTH_QUEUE_NAME) public Queue authQueue() { return QueueBuilder .durable (AUTH_QUEUE_NAME) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", "dlx_auth") .build (); } /** * 委託授權專用死信佇列 * @return */ @Bean @Qualifier(DLX_AUTH_QUEUE_NAME) public Queue dlxAuthQueue() { // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) return QueueBuilder .durable (DLX_AUTH_QUEUE_NAME) .withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", "dlx_key") .build (); } @Bean public Binding bindDlxQueueExchange(@Qualifier(DLX_QUEUE_NAME) Queue dlxQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){ return BindingBuilder.bind (dlxQueue).to (dlxExchange).with ("dlx_key").noargs (); } /** * 委託授權專用死信佇列繫結關係 * @param dlxAuthQueue * @param dlxExchange * @return */ @Bean public Binding bindDlxAuthQueueExchange(@Qualifier(DLX_AUTH_QUEUE_NAME) Queue dlxAuthQueue, @Qualifier(DLX_EXCHANGE_NAME) Exchange dlxExchange){ return BindingBuilder.bind (dlxAuthQueue).to (dlxExchange).with ("dlx_auth").noargs (); } /** * 委託授權專用佇列繫結關係 * @param authQueue * @param authExchange * @return */ @Bean public Binding bindAuthQueueExchange(@Qualifier(AUTH_QUEUE_NAME) Queue authQueue, @Qualifier(AUTH_EXCHANGE_NAME) Exchange authExchange){ return BindingBuilder.bind (authQueue).to (authExchange).with ("auth").noargs (); } }
②傳送含過期時間的訊息
向授權交換機,傳送路由為"auth"的訊息(指定了業務所需的超時時間) =》發向MqConfig.AUTH_QUEUE_NAME 佇列
rabbitTemplate.convertAndSend(MqConfig.AUTH_EXCHANGE_NAME, "auth", "型別:END,資訊:{id:1,fromUserId:111,toUserId:222,beginData:20201204,endData:20211104}", message -> { /** * MessagePostProcessor:訊息後置處理 * 為訊息設定屬性,然後返回訊息,相當於包裝訊息的類 */ //業務邏輯:過期時間=xxxx String ttl = "5000"; //設定訊息的過期時間 message.getMessageProperties ().setExpiration (ttl); return message; });
③超時後佇列MqConfig.AUTH_QUEUE_NAME會將訊息轉發至其設定的死信路由"dlx_auth",監聽該死信佇列即可消費定時的訊息
/** * 授權定時處理 * @param channel * @param message */ @RabbitListener(queues = MqConfig.DLX_AUTH_QUEUE_NAME) public void dlxAuthQ(Channel channel, Message message) throws IOException { System.out.println ("n死信原因:" + message.getMessageProperties ().getHeaders ().get ("x-first-death-reason")); //1.判斷訊息型別:1.BEGIN 2.END try { //2.1 型別為授權到期(END) //2.1.1 修改報件辦理人 //2.1.2 修改授權狀態為0(失效) //2.2 型別為授權開啟(BEGIN) //2.2.1 修改授權狀態為1(開啟) System.out.println (new String(message.getBody (), Charset.forName ("utf8"))); channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false); System.out.println ("已處理,授權相關資訊修改成功"); } catch (Exception e) { //拒籤訊息 channel.basicNack (message.getMessageProperties ().getDeliveryTag (), false, false); System.out.println ("授權相關資訊處理失敗, 進入死信佇列記錄紀錄檔"); } }
以上就是盤點Java中延時任務的多種實現方式的詳細內容,更多關於Java延時任務的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45