<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
之前研究了RocketMQ的原始碼,在這裡將各種訊息傳送與消費的demo進行舉例,方便以後使用的時候CV。
相關的設定,安裝和啟動在這篇文章有相關講解 https://www.jb51.net/article/260237.htm
同步訊息是指傳送出訊息後,同步等待,直到接收到Broker傳送成功的響應才會繼續傳送下一個訊息。這個方式可以確保訊息傳送到Broker成功,一些重要的訊息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception { //範例化訊息生產者物件 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設定NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer範例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條訊息。").getBytes(StandardCharsets.UTF_8)); //同步傳送方式 SendResult send = producer.send(msg); //確認返回 System.out.println(send); } //關閉producer producer.shutdown(); }
非同步訊息傳送方在傳送了一條訊息後,不等接收方發回響應,接著進行第二條訊息傳送。傳送方通過回撥介面的方式接收伺服器響應,並對響應結果進行處理。
public static void main(String[] args) throws Exception { //範例化訊息生產者物件 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設定NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer範例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條訊息。").getBytes(StandardCharsets.UTF_8)); //SendCallback會接收非同步返回結果的回撥 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } //若是過早關閉producer,會丟擲The producer service state not OK, SHUTDOWN_ALREADY的錯 Thread.sleep(10000); //關閉producer producer.shutdown(); }
單項傳送不關心傳送的結果,只傳送請求不等待應答。傳送訊息耗時極短。
public static void main(String[] args) throws Exception { //範例化訊息生產者物件 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設定NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動Producer範例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條訊息。").getBytes(StandardCharsets.UTF_8)); //同步傳送方式 producer.sendOneway(msg); } //關閉producer producer.shutdown(); }
消費者採用負載均衡的方式消費訊息,同一個Group下的多個Consumer共同消費Queue裡的Message,每個Consumer處理的訊息不同。
一個Consumer Group中的各個Consumer範例分共同消費訊息,即一條訊息只會投遞到一個Group下面的一個範例,並且只消費一遍。
例如某個Topic有3個佇列,其中一個Consumer Group 有 3 個範例,那麼每個範例只消費其中的1個佇列。叢集消費模式是消費者預設的消費方式。
public static void main(String[] args) throws Exception { //範例化訊息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.CLUSTERING); // 註冊回撥實現類來處理從broker拉取回來的訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標記該訊息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者範例 consumer.start(); System.out.printf("Consumer Started.%n"); }
廣播消費模式中把訊息對一個Group下的各個Consumer範例都投遞一遍。也就是說訊息也會被 Group 中的每個Consumer都消費一次。
實際上,是一個消費組下的每個消費者範例都獲取到了topic下面的每個Message Queue去拉取消費。所以訊息會投遞到每個消費者範例。
public static void main(String[] args) throws Exception { //範例化訊息消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.BROADCASTING); // 註冊回撥實現類來處理從broker拉取回來的訊息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標記該訊息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動消費者範例 consumer.start(); System.out.printf("Consumer Started.%n"); }
以上就是RocketMQ普通訊息實戰演練詳解的詳細內容,更多關於RocketMQ普通訊息的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45