<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
當RabbitMQ服務停掉以後訊息生產者傳送過的訊息不丟失。預設情況下RabbitMQ退出或者崩潰時,會忽視掉佇列和訊息。為了保證訊息不丟失需要將佇列和訊息都標記為持久化。
1.佇列持久化:在建立佇列時將channel.queueDeclare();
第二個引數改為true。
2.訊息持久化:在使用通道傳送訊息時channel.basicPublish();
將第三個引數改為:MessageProperties.PERSISTENT_TEXT_PLAIN
表示持久化訊息。
/** * @Description 持久化MQ * @date 2022/3/7 9:14 */ public class Producer3 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 持久化佇列 channel.queueDeclare(LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; // 持久化訊息 channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("傳送訊息:'" + msg + "'成功"); } } }
但是儲存訊息還有存在一個快取的間隔點,沒有真正的寫入磁碟,永續性保證不夠強,但是對於簡單佇列而言也綽綽有餘。
輪詢分發的方式在消費者處理效率不同的情況下並不適用。所以真正的公平應該是遵循能者多勞的前提。
在消費者處修改channel.basicQos(1);
表示開啟不公平分發
/** * @Description 不公平分發消費者 * @date 2022/3/7 9:27 */ public class Consumer2 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模擬並行沉睡三十秒 try { Thread.sleep(30000); System.out.println("執行緒B接收訊息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; // 設定不公平分發 channel.basicQos(1); channel.basicConsume(LONG_QUEUE,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消費者取消消費"); }); } }
測試目的:是否能實現能者多勞。
測試方法:兩個消費者睡眠不同的事件來模擬處理事件不同,如果處理時間(睡眠時間)短的能夠處理多個訊息就代表目的達成。
先啟動生產者建立佇列,再分別啟動兩個消費者。
生產者按照順序發四條訊息:
睡眠時間短的執行緒A接收到了三條訊息
而睡眠時間長的執行緒B只接收到的第二條訊息:
因為執行緒B在處理訊息時消耗的時間較長,所以就將其他訊息分配給了執行緒A。
實驗成功!
訊息的傳送和手動確認都是非同步完成的,因此就存在一個未確認訊息的緩衝區,開發人員希望能夠限制緩衝區的大小,用來避免緩衝區裡面無限制的未確認訊息問題。
這裡的預期值就值得是上述方法channel.basicQos();
裡面的引數,如果在當前通道上存在等於引數的訊息就不會在安排當前通道進行消費訊息。
測試方法:
1.新建兩個不同的消費者分別給定預期值5個2。
2.給睡眠時間長的指定為5,時間短的指定為2。
3.假如按照指定的預期值獲取訊息則表示測試成功,但並不是代表一定會按照5和2分配,這個類似於權重的判別。
程式碼根據上述程式碼修改預期值即可。
釋出確認就是生產者釋出訊息到佇列之後,佇列確認進行持久化完畢再通知給生產者的過程。這樣才能保證訊息不會丟失。
需要注意的是需要開啟佇列持久化才能使用確認釋出。
開啟方法:channel.confirmSelect();
是一種同步釋出的方式,即傳送完一個訊息之後只有確認它確認釋出後,後續的訊息才會繼續釋出,在指定的時間內沒有確認就會丟擲異常。缺點就是特別慢。
/** * @Description 確認釋出——單個確認 * @date 2022/3/7 14:49 */ public class SoloProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生佇列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認釋出 channel.confirmSelect(); // 記錄開始時間 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 單個釋出確認 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("傳送訊息:" + i); } } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒"); } }
一批一批的確認釋出可以提高系統的吞吐量。但是缺點是發生故障導致釋出出現問題時,需要將整個批次處理儲存在記憶體中,後面再重新發布。
/** * @Description 確認釋出——批次確認 * @date 2022/3/7 14:49 */ public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生佇列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認釋出 channel.confirmSelect(); // 設定一個多少一批確認一次。 int batchSize = MESSAGE_COUNT / 10; // 記錄開始時間 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批次釋出確認 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("傳送訊息:" + i); } } } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒"); } }
顯然效率要比單個確認釋出的高很多。
在程式設計上比上述兩個要複雜,但是價效比很高,無論是可靠性還行效率的都好很多,利用回撥函數來達到訊息可靠性傳遞的。
/** * @Description 確認釋出——非同步確認 * @date 2022/3/7 14:49 */ public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生佇列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認釋出 channel.confirmSelect(); // 記錄開始時間 long beginTime = System.currentTimeMillis(); // 確認成功回撥 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{ System.out.println("確認成功訊息:" + deliveryTab); }; // 確認失敗回撥 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ System.out.println("未確認的訊息:" + deliveryTab); }; // 訊息監聽器 /** * addConfirmListener: * 1. 確認成功的訊息; * 2. 確認失敗的訊息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒"); } }
最好的處理方式把未確認的訊息放到一個基於記憶體的能被髮布執行緒存取的佇列。
例如:ConcurrentLinkedQueue
可以在確認佇列confirm callbacks
與釋出執行緒之間進行訊息的傳遞。
處理方式:
1.記錄要傳送的全部訊息;
2.在釋出成功確認處刪除;
3.列印未確認的訊息。
使用一個雜湊表儲存訊息,它的優點:
可以將需要和訊息進行關聯;輕鬆批次刪除條目;支援高並行。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/** * @Description 非同步釋出確認,處理未釋出成功的訊息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生佇列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 開啟確認釋出 channel.confirmSelect(); // 執行緒安全有序的一個hash表,適用與高並行 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 記錄開始時間 long beginTime = System.currentTimeMillis(); // 確認成功回撥 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在釋出成功確認處刪除; // 批次刪除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 單獨刪除 map.remove(deliveryTab); } System.out.println("確認成功訊息:" + deliveryTab); }; // 確認失敗回撥 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 列印未確認的訊息。 System.out.println("未確認的訊息:" + map.get(deliveryTab) + ",標記:" + deliveryTab); }; // 訊息監聽器 /** * addConfirmListener: * 1. 確認成功的訊息; * 2. 確認失敗的訊息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 記錄要傳送的全部訊息; map.put(channel.getNextPublishSeqNo(),msg); } // 記錄結束時間 long endTime = System.currentTimeMillis(); System.out.println("傳送" + MESSAGE_COUNT + "條訊息消耗:"+(endTime - beginTime) + "毫秒"); } }
顯然來說,非同步處理除了在編碼處有些麻煩,在處理時間效率和可用性上都是比單處理和批次處理好很多。
本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45