首頁 > 軟體

RocketMQ普通訊息實戰演練詳解

2022-08-22 18:02:15

引言

之前研究了RocketMQ的原始碼,在這裡將各種訊息傳送與消費的demo進行舉例,方便以後使用的時候CV。

相關的設定,安裝和啟動在這篇文章有相關講解  https://www.jb51.net/article/260237.htm

普通訊息同步傳送

同步訊息是指傳送出訊息後,同步等待,直到接收到Broker傳送成功的響應才會繼續傳送下一個訊息。這個方式可以確保訊息傳送到Broker成功,一些重要的訊息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //範例化訊息生產者物件
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設定NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer範例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條訊息。").getBytes(StandardCharsets.UTF_8));
        //同步傳送方式
        SendResult send = producer.send(msg);
        //確認返回
        System.out.println(send);
    }
    //關閉producer
    producer.shutdown();
}

普通訊息非同步傳送

非同步訊息傳送方在傳送了一條訊息後,不等接收方發回響應,接著進行第二條訊息傳送。傳送方通過回撥介面的方式接收伺服器響應,並對響應結果進行處理。

public static void main(String[] args) throws Exception {
    //範例化訊息生產者物件
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設定NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer範例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條訊息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback會接收非同步返回結果的回撥
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是過早關閉producer,會丟擲The producer service state not OK, SHUTDOWN_ALREADY的錯
    Thread.sleep(10000);
    //關閉producer
    producer.shutdown();
}

普通訊息單向傳送

單項傳送不關心傳送的結果,只傳送請求不等待應答。傳送訊息耗時極短。

public static void main(String[] args) throws Exception {
    //範例化訊息生產者物件
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設定NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動Producer範例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條訊息。").getBytes(StandardCharsets.UTF_8));
        //同步傳送方式
        producer.sendOneway(msg);
    }
    //關閉producer
    producer.shutdown();
}

叢集消費模式

消費者採用負載均衡的方式消費訊息,同一個Group下的多個Consumer共同消費Queue裡的Message,每個Consumer處理的訊息不同。

一個Consumer Group中的各個Consumer範例分共同消費訊息,即一條訊息只會投遞到一個Group下面的一個範例,並且只消費一遍。

例如某個Topic有3個佇列,其中一個Consumer Group 有 3 個範例,那麼每個範例只消費其中的1個佇列。叢集消費模式是消費者預設的消費方式。

public static void main(String[] args) throws Exception {
    //範例化訊息消費者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 註冊回撥實現類來處理從broker拉取回來的訊息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標記該訊息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動消費者範例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

廣播消費模式

廣播消費模式中把訊息對一個Group下的各個Consumer範例都投遞一遍。也就是說訊息也會被 Group 中的每個Consumer都消費一次。

實際上,是一個消費組下的每個消費者範例都獲取到了topic下面的每個Message Queue去拉取消費。所以訊息會投遞到每個消費者範例。

public static void main(String[] args) throws Exception {
    //範例化訊息消費者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 註冊回撥實現類來處理從broker拉取回來的訊息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標記該訊息已經被成功消費
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動消費者範例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

以上就是RocketMQ普通訊息實戰演練詳解的詳細內容,更多關於RocketMQ普通訊息的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com