首頁 > 軟體

spring-integration連線MQTT全過程

2023-03-13 06:00:33

MQTT一種物聯網資料傳輸協定,構建在TCP之上,採用釋出與訂閱的模式進行資料互動,釋出與訂閱是兩個獨立的連線通道,這裡採用spring-integration-mqt來實現釋出與訂閱MQTT,與直接採用MQTT的SDK相對要簡單許多,伺服器端採用ActiveMQ來支援MQTT的訊息服務並實現訊息轉發。

首先需要引入spring-integration-mqt的包

這裡只需要引入這一個包即可。

<dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
     <version>5.3.1.RELEASE</version>
</dependency>

MQTT的設定比較簡單

和spring-integration整合一樣,需要設定相對應的入站、出站就可以了

具體設定如下:

package org.noka.serialservice.config;
 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.noka.serialservice.service.MsgSendService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.support.MessageBuilder;
 
/**--------------------------------------------------------------
 * MQTT 資料轉發服務
 * mqtt.services MQTT服務地址不設定時,不會啟用該服務
 * 檢測mqtt.services這個引數是否設定,以確定是否啟用MQTT服務
 * @author  xiefangjian@163.com
 * @version 1.0.0
 **------------------------------------------------------------*/
@EnableIntegration
@Configuration
@ConditionalOnProperty("mqtt.services")
public class MQTTConfig implements ApplicationListener<ApplicationEvent> {
    private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class);
 
    private final MsgSendService msgSendService;//釋出訊息到訊息中介軟體介面
 
    @Value("${mqtt.appid:mqtt_id}")
    private String appid;//使用者端ID
 
    @Value("${mqtt.input.topic:mqtt_input_topic}")
    private String[] inputTopic;//訂閱主題,可以是多個主題
 
    @Value("${mqtt.out.topic:mqtt_out_topic}")
    private String[] outTopic;//釋出主題,可以是多個主題
 
    @Value("${mqtt.services:#{null}}")
    private String[] mqttServices;//伺服器地址以及埠
 
    @Value("${mqtt.user:#{null}}")
    private String user;//使用者名稱
 
    @Value("${mqtt.password:#{null}}")
    private String password;//密碼
 
    @Value("${mqtt.KeepAliveInterval:300}")
    private Integer KeepAliveInterval;//心跳時間,預設為5分鐘
 
    @Value("${mqtt.CleanSession:false}")
    private Boolean CleanSession;//是否不保持session,預設為session保持
 
    @Value("${mqtt.AutomaticReconnect:true}")
    private Boolean AutomaticReconnect;//是否自動重聯,預設為開啟自動重聯
 
    @Value("${mqtt.CompletionTimeout:30000}")
    private Long CompletionTimeout;//連線超時,預設為30秒
 
    @Value("${mqtt.Qos:1}")
    private Integer Qos;//通訊質量,詳見MQTT協定
 
 
    public MQTTConfig(MsgSendService msgSendService) {
        this.msgSendService = msgSendService;
    }
 
    /**
     * MQTT連線設定
     * @return 連線工廠
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//連線工廠類
        MqttConnectOptions options = new MqttConnectOptions();//連線引數
        options.setServerURIs(mqttServices);//連線地址
        if(null!=user) {
            options.setUserName(user);//使用者名稱
        }
        if(null!=password) {
            options.setPassword(password.toCharArray());//密碼
        }
        options.setKeepAliveInterval(KeepAliveInterval);//心跳時間
        options.setAutomaticReconnect(AutomaticReconnect);//斷開是否自動重聯
        options.setCleanSession(CleanSession);//保持session
        factory.setConnectionOptions(options);
        return factory;
    }
 
    /**
     * 入站管道
     * @param mqttPahoClientFactory
     * @return
     */
    @Bean
    public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立訂閱連線
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes型別接收
        adapter.setCompletionTimeout(CompletionTimeout);//連線超時的時間
        adapter.setConverter(converter);
        adapter.setQos(Qos);//訊息質量
        adapter.setOutputChannelName(ChannelName.INPUT_DATA);//輸入管道名稱
        return adapter;
    }
    /**
     * 向伺服器傳送資料管道繫結
     * @param connectionFactory tcp連線工廠類
     * @return 訊息管道物件
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT)
    public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) {
        //建立一個新的出站管道,由於MQTT的釋出與訂閱是兩個獨立的連線,因此使用者端的ID(即APPID)不能與訂閱時所使用的ID一樣,否則在伺服器端會認為是同一個使用者端,而造成連線失敗
        MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);//bytes型別接收
        outGate.setAsync(true);
        outGate.setCompletionTimeout(CompletionTimeout);//設定連線超時時時
        outGate.setDefaultQos(Qos);//設定通訊質量
        outGate.setConverter(converter);
        return outGate;
    }
 
    /**
     * MQTT連線時呼叫的方法
     * @param event
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof MqttSubscribedEvent) {
            String msg = "OK";
            /**------------------連線時需要傳送起始訊息,寫在這裡-------------**/
            msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build());
        }
    }
}

其中ChanneName是一個常數類

來標識入站、出站管道的名稱,以便在其它需要的地方使用,實現方法如下:

/** -----------------------------------------
 * 管道名稱常數類
 * @author  xiefangjian@163.com
 * @version 1.0.0
 ** ---------------------------------------**/
public class ChannelName {
    public final static String INPUT_DATA="input_data";//入站管道
    public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道
    public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名稱
}

此時所有設定完成,接下來需要做的就是處理接收到的資料和釋出資料,以上設定完成以後,接收和傳送資料都是通過資料管道來完成,設定的是資料管道名稱。

資料傳送閘道器只是一個介面

用於向指定的資料管道里面傳送資料,實現如下:

package org.noka.serialservice.service;
 
import org.noka.serialservice.config.ChannelName;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
 
/**----------------------------------------------------------------
 * 傳送訊息閘道器,其它需要發向伺服器傳送訊息時,呼叫該介面
 * @author  xiefangjian@163.com
 * @version  1.0.0
 **--------------------------------------------------------------**/
@MessagingGateway
@Component
public interface MsgGateway {
    /**
     * MQTT 傳送閘道器
     * @param a 主題,可以指定不同的資料釋出主題,在訊息中介軟體裡面體現為不同的訊息佇列
     * @param out 訊息內容
     */
    @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT)
    void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out);
}

在需要的地方,可以向下面這樣呼叫這個介面,向MQTT伺服器傳送訊息

//topic為主題名稱,out為訊息內容
msgGateway.send(topic, out);

MQTT伺服器有資料下發時

會自動調將資料放入設定的入站資料管道中,在需要接收資料的地方,向下面這樣設定即可

    /**
     * 伺服器有資料下發
     * 用ServiceActivator設定需要接收的資料管道名稱,當該管道里面的資料時,會自動呼叫該方法
     * @param in 伺服器有資料下發時,序列化後的物件,這裡使用byte陣列
     */
    @ServiceActivator(inputChannel = ChannelName.INPUT_DATA)
    public void upCase(Message<byte[]> in) {
        logger.info("[net service data]========================================");
        logger.info("[net dow data]"+new String(in.getPayload()));//字串方式列印伺服器下發的資料
        logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16進位制方式列印伺服器下發的資料
        serialService.send(in.getPayload());//將伺服器下發的資料轉發給串列埠
    }

最後是引陣列態檔

#--------MQTT---------------------------
#裝置ID,唯一標識
mqtt.appid=mqtt_id
#訂閱主題,多個主題用逗號分隔
mqtt.input.topic=mqtt_input_topic
#釋出主題
mqtt.out.topic=mqtt_out_topic,aac
#MQTT伺服器地址,可以是多個地址
mqtt.services=tcp://47.244.191.41:1883
#mqtt使用者名稱,預設無
#mqtt.user=guest
#mqtt密碼,預設無
#mqtt.password=guest
#心跳間隔時間,預設3000
#mqtt.KeepAliveInterval=3000
#是否不保持session,預設false
#mqtt.CleanSession=false
#是否自動連線,預設true
#mqtt.AutomaticReconnect=true
#連線超時,預設30000
#mqtt.CompletionTimeout=30000
#傳輸質量,預設1
#mqtt.Qos=1

總結

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援it145.com。


IT145.com E-mail:sddin#qq.com