<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
工作佇列(任務佇列)主要思想是避免立即執行資源密集型任務,而不得不等待它完成,相反我們安排任務在之後執行。我們把任務封裝為訊息並將其傳送到佇列。在後臺執行的工作程序將彈出任務並最終執行作業。當有多個工作執行緒時,這些工作執行緒將一起處理這些任務。
其實就是生產者傳送大量的訊息,傳送到佇列之後,由多個消費者(工作執行緒)來處理訊息,並且每個訊息只能被處理一次。
多個工作執行緒按照次序每來一個訊息執行一次。
直接通過資訊獲取通道
/** * @Description RabbitMQ工具類 * @date 2022/3/5 10:02 */ public class RabbitMQUtils { public static Channel getChannel() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("1"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection(); return connection.createChannel(); } }
Work2和Work1程式碼沒有區別,只需要對它做出區分即可。
public class Worker1 { // 指定佇列名稱 private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { // 獲取通道 Channel channel = RabbitMQUtils.getChannel(); // 宣告:接收訊息回撥 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("工作執行緒01:"+ new String(message.getBody())); }; // 宣告:取消消費回撥 CancelCallback cancelCallback = consumerTag -> { System.out.println("工作執行緒01取消接收:"+consumerTag); }; System.out.println("工作執行緒01啟動完成......"); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
public class Producer { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 產生佇列 channel.queueDeclare(QUEUE_NAME,false,false,true,null); // 訊息體 Scanner scanner = new Scanner(System.in); int i = 1; while (scanner.hasNext()){ String msg = scanner.next(); msg = msg + i; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("傳送成功:" + msg); } System.out.println("----------==========傳送完畢==========----------"); } }
先啟動兩個工作執行緒,再啟動生產者。
出現404異常請參考下方1.6
生產者傳送情況:
輪詢狀態下兩個工作佇列接收狀態:
在先啟動兩個消費者執行緒時,會提示404找不到佇列。
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost '/', class-id=60, method-id=20)
發生這個情況的原因很顯然是因為先啟動了消費者,但是在RabbitMQ中沒有建立相對應的佇列名稱,解決方法可以:
1.先啟動生產者建立佇列(也可以在RabbitMQ中建立佇列);
2.再啟動消費者就不會產生這個錯誤;
3.再在生產者中使用Scanner
類去傳送訊息測試。
消費者在接收到訊息並且處理該訊息之後,告訴RabbitMQ它已經處理了,RabbitMQ可以刪除訊息。其目的就是為了保護訊息在被處理之前不會消失。
這種方式傳送後就被認定為已經傳送成功,所以在訊息接收到之前消費者的連線或者channel關閉,那麼這個訊息就會丟失。其特點是消費者可以傳遞過載的訊息,對傳遞的訊息沒有限制,但如果因記憶體耗盡消費者執行緒被系統殺死,就會使得多條訊息丟失。所以這個模式需要在資料安全性和吞吐量之間選擇,適合使用在消費者可以高效並以某種速率能夠處理這些訊息的情況下使用。
所以自動應答的方式侷限性很高。
優點:可以批次應答和減少網路擁擠。
1.channel.basicAck(long deliveryTag, boolean multiple);
:肯應應答,處理完訊息之後提醒RabbitMQ可以刪除當前佇列,deliveryTag:當前佇列中選中的訊息;multiple:是否批次應答。
2.channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:否定應答,
3.channel.basicReject(long deliveryTag, boolean requeue)
:否定並且拒絕應答。
如果消費者因為一些原因失去了對RabbitMQ的連線,導致沒有傳送ACK確認,RabbitMQ就會對該訊息進行重新排隊,並且分發給可以處理該訊息的消費者,所以即使某個消費者死亡,也可以保證訊息不會丟失。
測試目的:在手動應答狀態下不會發生訊息丟失的情況。
測試方法:
1.建立兩個消費者;
2.使用工具類使執行緒睡眠一定時間;
3.在睡眠時關閉執行緒,看能否自動重新入隊。
/** * @Description 手動應答生產者 * @date 2022/3/5 19:03 */ public class Producer1 { // 指定佇列名 private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); channel.queueDeclare(TASK_QUEUE_RES,false,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; channel.basicPublish("",TASK_QUEUE_RES,null,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("傳送訊息:'" + msg + "'成功"); } } }
/** * @Description 手動應答消費者1 * @date 2022/3/5 19:17 */ public class Worker1 { private static final String TASK_QUEUE_RES = "queue_res"; public static void main(String[] args) throws Exception{ Channel channel = RabbitMQUtils.getChannel(); System.out.println("執行緒A等待接收......"); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模擬並行沉睡一秒 try { Thread.sleep(1000); System.out.println("執行緒A接收訊息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); /** * basicAck: * 1. 訊息標記 * 2. 是否批次 */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; channel.basicConsume(TASK_QUEUE_RES,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消費者取消消費"); }); } }
Worker2類和1區別不大,將名稱改成B再將睡眠事件改成30即可。
測試方法:
1.先啟動生產者建立佇列;
2.啟動兩個消費者接收訊息;
3.因為是輪詢方式,所以A執行緒接收之後肯定是B執行緒接收,在睡眠時關閉B執行緒,如果A執行緒接收到說明測試成功。
傳送訊息:
執行緒A接收:
再傳送訊息:
關閉執行緒B執行緒A接收到訊息:
測試成功!
本篇文章就到這裡了,希望能夠給你帶來幫助,也希望您能夠多多關注it145.com的更多內容!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45