首頁 > 軟體

分散式利器redis及redisson的延遲佇列實踐

2022-03-01 19:01:52

前言碎語

首先說明下需求,一個使用者中心產品,使用者在試用產品有三天的期限,三天到期後準時準點通知使用者,試用產品到期了。這個需求如果不是準時通知,而是每天定點通知就簡單了。如果需要準時通知就只能上延遲佇列了。使用場景除了如上,典型的業務場景還有電商中的延時未支付訂單失效等等。

延遲佇列多種實現方式

  • 1.如基於RabbitMQ的佇列ttl+死信路由策略:通過設定一個佇列的超時未消費時間,配合死信路由策略,到達時間未消費後,回會將此訊息路由到指定佇列
  • 2.基於RabbitMQ延遲佇列外掛(rabbitmq-delayed-message-exchange):傳送訊息時通過在請求頭新增延時引數(headers.put("x-delay", 5000))即可達到延遲佇列的效果
  • 3.使用redis的zset有序性,輪詢zset中的每個元素,到點後將內容遷移至待消費的佇列,(redisson已有實現)
  • 4.使用redis的key的過期通知策略,設定一個key的過期時間為延遲時間,過期後通知使用者端

redisson中的延遲佇列實現

怎麼封裝便於業務使用。

1.首先定義一個延遲job,裡面包含一個map引數,和佇列執行器的具體實現class,觸發任務執行時,map引數會被傳遞到具體的業務執行器實現內

/**
 * Created by kl on 2018/7/20.
 * Content :延時job
 */
public class DelayJob {
    private Map jobParams;//job執行引數
    private Class aClass;//具體執行範例實現
}

2.定義一個延遲job執行器介面,業務需要實現這個介面,然後在execute方法內寫自己的業務邏輯

/**
 * Created by kl on 2018/7/20.
 * Content :延時job執行器介面
 */
public interface ExecuteJob {
     void execute(DelayJob job);
}

3.消費已經到點的延時job服務,通過job引數呼叫業務執行器實現

@Component
public class JobTimer {
    static final String jobsTag = "customer_jobtimer_jobs";
    @Autowired
    private RedissonClient client;
    @Autowired
    private ApplicationContext context;
    ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    @PostConstruct
    public void startJobTimer() {
        RBlockingQueueblockingQueue = client.getBlockingQueue(jobsTag);
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        DelayJob job = blockingQueue.take();
                        executorService.execute(new ExecutorTask(context, job));
                    } catch (Exception e) {
                        e.printStackTrace();
                        try {
                            TimeUnit.SECONDS.sleep(60);
                        } catch (Exception ex) {
                        }
                    }
                }
            }
        }.start();
    }
    class ExecutorTask implements Runnable {
        private ApplicationContext context;
        private DelayJob delayJob;
        public ExecutorTask(ApplicationContext context, DelayJob delayJob) {
            this.context = context;
            this.delayJob = delayJob;
        }
        @Override
        public void run() {
            ExecuteJob service = (ExecuteJob) context.getBean(delayJob.getaClass());
            service.execute(delayJob);
        }
    }
}

4.封裝延時job服務

/**
 * Created by kl on 2018/7/20.
 * Content :延時job服務
 */
@Component
public class DelayJobService {
    @Autowired
    private RedissonClient client;
    public void submitJob(DelayJob job, Long delay, TimeUnit timeUnit){
        RBlockingQueueblockingQueue = client.getBlockingQueue(JobTimer.jobsTag);
        RDelayedQueue delayedQueue = client.getDelayedQueue(blockingQueue);
        delayedQueue.offer(job,delay,timeUnit);
    }
}

文末結語

redisson作為一個分散式利器,這麼好用的工具沒人用有點可惜,還有一個原因是有個想法,想將延遲佇列這個功能封裝成一個spring boot的start依賴,然後開源出來,造福四方,希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com