<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
對於釋出者:
1.訊息通過訊息閘道器傳送出去,由 MessageChannel
的範例 DirectChannel
處理傳送的細節。
2.DirectChannel
收到訊息後,內部通過 MessageHandler
的範例 MqttPahoMessageHandler
傳送到指定的 Topic。
對於訂閱者:
1.通過注入 MessageProducerSupport
的範例 MqttPahoMessageDrivenChannelAdapter
,實現訂閱 Topic 和繫結訊息消費的 MessageChannel
。
2.同樣由 MessageChannel
的範例 DirectChannel
處理消費細節。
Channel 訊息後會傳送給我們自定義的 MqttInboundMessageHandler
範例進行消費。
可以看到整個處理的流程和前面將的基本一致。Spring Integration 就是抽象出了這麼一套訊息通訊的機制,具體的通訊細節由它整合的中介軟體來決定
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> <version>2.5.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>5.5.5</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.5</version> </dependency>
#mqtt設定 mqtt: username: 123 password: 123 #MQTT-伺服器連線地址,如果有多個,用逗號隔開 url: tcp://127.0.0.1:1883 #MQTT-連線伺服器預設使用者端ID client: id: ${random.value} default: #MQTT-預設的訊息推播主題,實際可在呼叫介面時指定 topic: topic,mqtt/test/# #連線超時 completionTimeout: 3000
import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import java.util.Arrays; import java.util.List; /** * mqtt 推播and接收 訊息類 **/ @Configuration @IntegrationComponentScan @Slf4j public class MqttSenderAndReceiveConfig { private static final byte[] WILL_DATA; static { WILL_DATA = "offline".getBytes(); } @Autowired private MqttReceiveHandle mqttReceiveHandle; @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.url}") private String hostUrl; @Value("${mqtt.client.id}") private String clientId; @Value("${mqtt.default.topic}") private String defaultTopic; @Value("${mqtt.completionTimeout}") private int completionTimeout; //連線超時 /** * MQTT聯結器選項 **/ @Bean(value = "getMqttConnectOptions") public MqttConnectOptions getMqttConnectOptions1() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留使用者端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 mqttConnectOptions.setCleanSession(true); // 設定超時時間 單位為秒 mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); // 設定對談心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向用戶端傳送心跳判斷使用者端是否線上,但這個方法並沒有重連的機制 mqttConnectOptions.setKeepAliveInterval(10); // 設定「遺囑」訊息的話題,若使用者端與伺服器之間的連線意外中斷,伺服器將釋出使用者端的「遺囑」訊息。 //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; } /** * MQTT工廠 **/ @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions1()); return factory; } /** * MQTT資訊通道(生產者) **/ @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * MQTT訊息處理器(生產者) **/ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); messageHandler.setAsyncEvents(true); // 訊息傳送和傳輸完成會有非同步的通知回撥 //設定轉換器 傳送bytes資料 DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); return messageHandler; } /** * 設定client,監聽的topic * MQTT訊息訂閱繫結(消費者) **/ @Bean public MessageProducer inbound() { List<String> topicList = Arrays.asList(defaultTopic.trim().split(",")); String[] topics = new String[topicList.size()]; topicList.toArray(topics); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics); adapter.setCompletionTimeout(completionTimeout); DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter(); converter.setPayloadAsBytes(true); adapter.setConverter(converter); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * MQTT資訊通道(消費者) **/ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * MQTT訊息處理器(消費者) **/ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { //處理接收訊息 mqttReceiveHandle.handle(message); } }; } }
/** * mqtt使用者端訊息處理類 **/ @Slf4j @Component public class MqttReceiveHandle { public void handle(Message<?> message) { log.info("收到訂閱訊息: {}", message); String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); log.info("訊息主題:{}", topic); Object payLoad = message.getPayload(); byte[] data = (byte[]) payLoad; Packet packet = Packet.parse(data); log.info("傳送的Packet資料{}", JSON.toJSONString(packet)); } }
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; /** * mqtt傳送訊息 * (defaultRequestChannel = "mqttOutboundChannel" 對應config設定) * **/ @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { /** * 傳送資訊到MQTT伺服器 * * @param */ void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); /** * 傳送資訊到MQTT伺服器 * * @param topic 主題 * @param payload 訊息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 傳送資訊到MQTT伺服器 * * @param topic 主題 * @param qos 對訊息處理的幾種機制。 * 0 表示的是訂閱者沒收到訊息不會再次傳送,訊息會丟失。 * 1 表示的是會嘗試重試,一直到接收到訊息,但這種情況可能導致訂閱者收到多次重複訊息。 * 2 多了一次去重的動作,確保訂閱者收到的訊息有一次。 * @param payload 訊息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 傳送資訊到MQTT伺服器 * * @param topic 主題 * @param payload 訊息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload); /** * 傳送資訊到MQTT伺服器 * * @param topic 主題 * @param payload 訊息主體 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload); }
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; import org.springframework.integration.mqtt.event.MqttMessageSentEvent; import org.springframework.integration.mqtt.event.MqttSubscribedEvent; import org.springframework.stereotype.Component; @Slf4j @Component public class MqttListener { /** * 連線失敗的事件通知 * @param mqttConnectionFailedEvent */ @EventListener(classes = MqttConnectionFailedEvent.class) public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) { log.info("連線失敗的事件通知"); } /** * 已傳送的事件通知 * @param mqttMessageSentEvent */ @EventListener(classes = MqttMessageSentEvent.class) public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) { log.info("已傳送的事件通知"); } /** * 已傳輸完成的事件通知 * 1.QOS == 0,傳送訊息後會即可進行此事件回撥,因為不需要等待回執 * 2.QOS == 1,傳送訊息後會等待ACK回執,ACK回執後會進行此事件通知 * 3.QOS == 2,傳送訊息後會等待PubRECV回執,知道收到PubCOMP後會進行此事件通知 * @param mqttMessageDeliveredEvent */ @EventListener(classes = MqttMessageDeliveredEvent.class) public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) { log.info("已傳輸完成的事件通知"); } /** * 訊息訂閱的事件通知 * @param mqttSubscribedEvent */ @EventListener(classes = MqttSubscribedEvent.class) public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) { log.info("訊息訂閱的事件通知"); } }
@Resource private MqttGateway mqttGateway; /** * sendData 訊息 * topic 訂閱主題 **/ @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST) public String sendMqtt(String sendData, String topic) { MqttMessage mqttMessage = new MqttMessage(); mqttGateway.sendToMqtt(topic, sendData); //mqttGateway.sendToMqttObject(topic, sendData.getBytes()); return "OK"; }
以上為個人經驗,希望能給大家一個參考,也希望大家多多支援it145.com。
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45