<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
MQTT一種物聯網資料傳輸協定,構建在TCP之上,採用釋出與訂閱的模式進行資料互動,釋出與訂閱是兩個獨立的連線通道,這裡採用spring-integration-mqt來實現釋出與訂閱MQTT,與直接採用MQTT的SDK相對要簡單許多,伺服器端採用ActiveMQ來支援MQTT的訊息服務並實現訊息轉發。
這裡只需要引入這一個包即可。
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.3.1.RELEASE</version> </dependency>
和spring-integration整合一樣,需要設定相對應的入站、出站就可以了
具體設定如下:
package org.noka.serialservice.config; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.noka.serialservice.service.MsgSendService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.support.MessageBuilder; /**-------------------------------------------------------------- * MQTT 資料轉發服務 * mqtt.services MQTT服務地址不設定時,不會啟用該服務 * 檢測mqtt.services這個引數是否設定,以確定是否啟用MQTT服務 * @author xiefangjian@163.com * @version 1.0.0 **------------------------------------------------------------*/ @EnableIntegration @Configuration @ConditionalOnProperty("mqtt.services") public class MQTTConfig implements ApplicationListener<ApplicationEvent> { private static Logger logger = LoggerFactory.getLogger(MQTTConfig.class); private final MsgSendService msgSendService;//釋出訊息到訊息中介軟體介面 @Value("${mqtt.appid:mqtt_id}") private String appid;//使用者端ID @Value("${mqtt.input.topic:mqtt_input_topic}") private String[] inputTopic;//訂閱主題,可以是多個主題 @Value("${mqtt.out.topic:mqtt_out_topic}") private String[] outTopic;//釋出主題,可以是多個主題 @Value("${mqtt.services:#{null}}") private String[] mqttServices;//伺服器地址以及埠 @Value("${mqtt.user:#{null}}") private String user;//使用者名稱 @Value("${mqtt.password:#{null}}") private String password;//密碼 @Value("${mqtt.KeepAliveInterval:300}") private Integer KeepAliveInterval;//心跳時間,預設為5分鐘 @Value("${mqtt.CleanSession:false}") private Boolean CleanSession;//是否不保持session,預設為session保持 @Value("${mqtt.AutomaticReconnect:true}") private Boolean AutomaticReconnect;//是否自動重聯,預設為開啟自動重聯 @Value("${mqtt.CompletionTimeout:30000}") private Long CompletionTimeout;//連線超時,預設為30秒 @Value("${mqtt.Qos:1}") private Integer Qos;//通訊質量,詳見MQTT協定 public MQTTConfig(MsgSendService msgSendService) { this.msgSendService = msgSendService; } /** * MQTT連線設定 * @return 連線工廠 */ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();//連線工廠類 MqttConnectOptions options = new MqttConnectOptions();//連線引數 options.setServerURIs(mqttServices);//連線地址 if(null!=user) { options.setUserName(user);//使用者名稱 } if(null!=password) { options.setPassword(password.toCharArray());//密碼 } options.setKeepAliveInterval(KeepAliveInterval);//心跳時間 options.setAutomaticReconnect(AutomaticReconnect);//斷開是否自動重聯 options.setCleanSession(CleanSession);//保持session factory.setConnectionOptions(options); return factory; } /** * 入站管道 * @param mqttPahoClientFactory * @return */ @Bean public MessageProducerSupport mqttInput(MqttPahoClientFactory mqttPahoClientFactory){ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(appid, mqttPahoClientFactory, inputTopic);//建立訂閱連線 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true);//bytes型別接收 adapter.setCompletionTimeout(CompletionTimeout);//連線超時的時間 adapter.setConverter(converter); adapter.setQos(Qos);//訊息質量 adapter.setOutputChannelName(ChannelName.INPUT_DATA);//輸入管道名稱 return adapter; } /** * 向伺服器傳送資料管道繫結 * @param connectionFactory tcp連線工廠類 * @return 訊息管道物件 */ @Bean @ServiceActivator(inputChannel = ChannelName.OUTPUT_DATA_MQTT) public AbstractMqttMessageHandler MQTTOutAdapter(MqttPahoClientFactory connectionFactory) { //建立一個新的出站管道,由於MQTT的釋出與訂閱是兩個獨立的連線,因此使用者端的ID(即APPID)不能與訂閱時所使用的ID一樣,否則在伺服器端會認為是同一個使用者端,而造成連線失敗 MqttPahoMessageHandler outGate = new MqttPahoMessageHandler(appid + "_put", connectionFactory); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true);//bytes型別接收 outGate.setAsync(true); outGate.setCompletionTimeout(CompletionTimeout);//設定連線超時時時 outGate.setDefaultQos(Qos);//設定通訊質量 outGate.setConverter(converter); return outGate; } /** * MQTT連線時呼叫的方法 * @param event */ @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof MqttSubscribedEvent) { String msg = "OK"; /**------------------連線時需要傳送起始訊息,寫在這裡-------------**/ msgSendService.send(MessageBuilder.withPayload(msg.getBytes()).build()); } } }
來標識入站、出站管道的名稱,以便在其它需要的地方使用,實現方法如下:
/** ----------------------------------------- * 管道名稱常數類 * @author xiefangjian@163.com * @version 1.0.0 ** ---------------------------------------**/ public class ChannelName { public final static String INPUT_DATA="input_data";//入站管道 public final static String OUTPUT_DATA_TCP="output_data_TCP";//TCP出站管道 public final static String OUTPUT_DATA_MQTT="output_data_MQTT";//mqtt出站管道名稱 }
此時所有設定完成,接下來需要做的就是處理接收到的資料和釋出資料,以上設定完成以後,接收和傳送資料都是通過資料管道來完成,設定的是資料管道名稱。
用於向指定的資料管道里面傳送資料,實現如下:
package org.noka.serialservice.service; import org.noka.serialservice.config.ChannelName; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /**---------------------------------------------------------------- * 傳送訊息閘道器,其它需要發向伺服器傳送訊息時,呼叫該介面 * @author xiefangjian@163.com * @version 1.0.0 **--------------------------------------------------------------**/ @MessagingGateway @Component public interface MsgGateway { /** * MQTT 傳送閘道器 * @param a 主題,可以指定不同的資料釋出主題,在訊息中介軟體裡面體現為不同的訊息佇列 * @param out 訊息內容 */ @Gateway(requestChannel = ChannelName.OUTPUT_DATA_MQTT) void send(@Header(MqttHeaders.TOPIC) String a, Message<byte[]> out); }
在需要的地方,可以向下面這樣呼叫這個介面,向MQTT伺服器傳送訊息
//topic為主題名稱,out為訊息內容 msgGateway.send(topic, out);
會自動調將資料放入設定的入站資料管道中,在需要接收資料的地方,向下面這樣設定即可
/** * 伺服器有資料下發 * 用ServiceActivator設定需要接收的資料管道名稱,當該管道里面的資料時,會自動呼叫該方法 * @param in 伺服器有資料下發時,序列化後的物件,這裡使用byte陣列 */ @ServiceActivator(inputChannel = ChannelName.INPUT_DATA) public void upCase(Message<byte[]> in) { logger.info("[net service data]========================================"); logger.info("[net dow data]"+new String(in.getPayload()));//字串方式列印伺服器下發的資料 logger.info("[net dow hex]"+ Hex.encodeHexString(in.getPayload(),false));//16進位制方式列印伺服器下發的資料 serialService.send(in.getPayload());//將伺服器下發的資料轉發給串列埠 }
#--------MQTT--------------------------- #裝置ID,唯一標識 mqtt.appid=mqtt_id #訂閱主題,多個主題用逗號分隔 mqtt.input.topic=mqtt_input_topic #釋出主題 mqtt.out.topic=mqtt_out_topic,aac #MQTT伺服器地址,可以是多個地址 mqtt.services=tcp://47.244.191.41:1883 #mqtt使用者名稱,預設無 #mqtt.user=guest #mqtt密碼,預設無 #mqtt.password=guest #心跳間隔時間,預設3000 #mqtt.KeepAliveInterval=3000 #是否不保持session,預設false #mqtt.CleanSession=false #是否自動連線,預設true #mqtt.AutomaticReconnect=true #連線超時,預設30000 #mqtt.CompletionTimeout=30000 #傳輸質量,預設1 #mqtt.Qos=1
以上為個人經驗,希望能給大家一個參考,也希望大家多多支援it145.com。
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45