首頁 > 軟體

解決spring-integration-mqtt頻繁報Lost connection錯誤問題

2023-03-13 06:00:32

問題描述

在之前的部落格介紹瞭如何在 Spring Boot 整合 MQTT,後面使用中沒有發現問題,最近發現一直報錯:

Lost connection: Connection lost; retrying...
Lost connection: 已斷開連線; retrying...

解決過程

網上說是因為 client ID 重複,最開始是不相信的,因為我測試只啟動了一個使用者端。但是卻怎麼都定位不到異常原因,用重新回到 client ID 重複的這個思路上來:

因為程式裡同時作為訂閱者和釋出者,就懷疑訂閱和釋出服務是不是單獨建立的連線,抱著試試看的想法試了一下,結果果然是這個原因

原始碼:

    /* 釋出者 */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler getMqttProducer() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, getMqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultRetained(defaultRetained);
        messageHandler.setDefaultQos(defaultProducerQos);

        return messageHandler;
    }

    /* 訂閱者 */
    @Bean
    public MessageProducer getMqttConsumer() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, getMqttClientFactory(), consumerTopics);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(defaultConsumerQos);
        adapter.setOutputChannel(inboundChannel());

        return adapter;
    }

訂閱者和釋出者使用的是相同的 client ID,修改後程式碼:

    /* 釋出者 */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler getMqttProducer() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", getMqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultRetained(defaultRetained);
        messageHandler.setDefaultQos(defaultProducerQos);

        return messageHandler;
    }

    /* 訂閱者 */
    @Bean
    public MessageProducer getMqttConsumer() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", getMqttClientFactory(), consumerTopics);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(defaultConsumerQos);
        adapter.setOutputChannel(inboundChannel());

        return adapter;
    }

總結

雖然目前解決了這個問題,但是為什麼會單獨建立兩個連線的原因還未找到;另外,一個程式兩個連線還是感覺怪怪的,不知道還有沒有更優的處理方案。

希望能給大家一個參考,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com