<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前一段有幸參與到一個智慧家居專案的開發,由於之前都沒有過這方面的開發經驗,所以對智慧硬體的開發模式和技術棧都頗為好奇。
智慧可燃氣體報警器
產品是一款可燃氣體報警器,如果家中燃氣洩露濃度到達一定閾值,報警器檢測到並上傳氣體濃度值給後臺,後臺以電話、簡訊、微信等方式,提醒使用者家中可能有氣體洩漏。
使用者還可能向報警器發一些關閉報警、調整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:
但當我真正的參與其中開發時,其實有一點小小的失望,因為在整個研發過程中,並沒用到什麼新的技術,還是常規的幾種中介軟體,只不過換個用法而已。
技術選型用rabbitmq
來做核心的元件,主要考慮到運維成本低,組內成員使用的熟練度比較高。
下面和小夥伴分享一下如何用 springboot
+ rabbitmq
搭建物聯網(IOT
)平臺,其實智慧硬體也沒想象的那麼高不可攀!
很多小夥伴可能有點懵?rabbitmq
不是訊息佇列嗎?怎麼又能做智慧硬體了?
其實rabbitmq
有兩種協定,我們平時接觸的訊息佇列是用的AMQP
協定,而用在智慧硬體中的是MQTT
協定。
MQTT
全稱(Message Queue Telemetry Transport):一種基於釋出/訂閱(publish
/subscribe
)模式的輕量級
通訊協定,通過訂閱相應的主題來獲取訊息,是物聯網(Internet of Thing
)中的一個標準傳輸協定。
該協定將訊息的釋出者(publisher
)與訂閱者(subscriber
)進行分離,因此可以在不可靠的網路環境中,為遠端連線的裝置提供可靠的訊息服務,使用方式與傳統的MQ有點類似。
TCP
協定位於傳輸層,MQTT
協定位於應用層,MQTT
協定構建於TCP/IP
協定上,也就是說只要支援TCP/IP
協定棧的地方,都可以使用MQTT
協定。
MQTT
協定為什麼在物聯網(IOT)中如此受偏愛?而不是其它協定,比如我們更為熟悉的 HTTP
協定呢?
HTTP
協定它是一種同步協定,使用者端請求後需要等待伺服器的響應。而在物聯網(IOT)環境中,裝置會很受制於環境的影響,比如頻寬低、網路延遲高、網路通訊不穩定等,顯然非同步訊息協定更為適合IOT
應用程式。HTTP
是單向的,如果要獲取訊息使用者端必須發起連線,而在物聯網(IOT)應用程式中,裝置或感測器往往都是使用者端,這意味著它們無法被動地接收來自網路的命令。HTTP
要實現這樣的功能不但很困難,而且成本極高。前邊說過MQTT
是一種輕量級的協定,它只專注於發訊息, 所以此協定的結構也非常簡單。
在MQTT
協定中,一個MQTT
封包由:固定頭
(Fixed header)、 可變頭
(Variable header)、 訊息體
(payload)三部分構成。
在這裡插入圖片描述
固定頭部,使用兩個位元組,共16位元:
(4-7)位表示訊息型別,使用4位元二進位制表示,可代表如下的16種訊息型別,不過 0 和 15位置屬於保留待用,所以共14種訊息事件型別。
DUP Flag(重試標識)
DUP Flag:保證訊息可靠傳輸,訊息是否已送達的標識。預設為0,只佔用一個位元組,表示第一次傳送,當值為1時,表示當前訊息先前已經被傳送過。
QoS Level(訊息質量等級)
QoS Level:訊息的質量等級,後邊會詳細介紹
RETAIN(持久化)
1
:表示傳送的訊息需要一直持久儲存,而且不受伺服器重啟影響,不但要傳送給當前的訂閱者,且以後新加入的使用者端訂閱了此Topic
,訂閱者也會馬上得到推播。注意:新加入的訂閱者,只會取出最新的一個RETAIN flag = 1
的訊息推播。0
:僅為當前訂閱者推播此訊息。Remaining Length(剩餘長度)
在當前訊息中剩餘的byte
(位元組)數,包含可變頭部和訊息體payload。
固定頭部僅定義了訊息型別和一些標誌位,一些訊息的後設資料需要放入可變頭部中。可變頭部內容位元組長度 + 訊息體payload = 剩餘長度。
可變頭部居於固定頭部和payload中間,包含了協定名稱,版本號,連線標誌,使用者授權,心跳時間等內容。
可變頭存在於這些型別的訊息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
訊息體payload只存在於CONNECT
、PUBLISH
、SUBSCRIBE
、SUBACK
、UNSUBSCRIBE
這幾種型別的訊息:
CONNECT
:包含使用者端的ClientId
、訂閱的Topic
、Message
以及使用者名稱
和密碼
。PUBLISH
:向對應主題傳送訊息。SUBSCRIBE
:要訂閱的主題以及QoS
。SUBACK
:伺服器對於SUBSCRIBE
所申請的主題及QoS
進行確認和回覆。UNSUBSCRIBE
:取消要訂閱的主題。訊息質量
(Quality of Service),即訊息的傳送質量,釋出者(publisher
)和訂閱者(subscriber
)都可以指定qos
等級,有QoS 0
、QoS 1
、QoS 2
三個等級。
下邊分別說明一下這三個等級的區別。
Qos 0:At most once
(至多一次)只傳送一次訊息,不保證訊息是否成功送達,沒有確認機制,訊息可能會丟失或重複。
圖片源於網路,如有侵權聯絡刪除
Qos 1:At least once
(至少一次),相對於QoS 0
而言Qos 1
增加了ack
確認機制,傳送者(publisher
)推播訊息到MQTT代理(broker
)時,兩者自身都會先持久化訊息,只有當publisher
或者 Broker
分別收到 PUBACK
確認時,才會刪除自身持久化的訊息,否則就會重發。
但有個問題,儘管我們可以通過確認來保證一定收到使用者端 或 伺服器的message
,可我們卻不能保證僅收到一次message
,也就是當用戶端publisher
沒收到Broker
的puback
或者 Broker
沒有收到subscriber
的puback
,那麼就會一直重發。
publisher -> broker 大致流程:
publisher store msg -> publish ->broker (傳遞message)
broker -> puback -> publisher delete msg (確認傳遞成功)
圖片源於網路,如有侵權聯絡刪除
Qos 2:Exactly once
(只有一次),相對於QoS 1
,QoS 2
升級實現了僅接受一次message
,publisher
和 broker
同樣對訊息進行持久化,其中 publisher
快取了message
和 對應的msgID
,而 broker
快取了 msgID
,可以保證訊息不重複,由於又增加了一個confirm
機制,整個流程變得複雜很多。
publisher -> broker 大致流程:
publisher store msg -> publish ->broker -> broker store
msgID(傳遞message) broker -> puberc (確認傳遞成功)
publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)
LWT
全稱為 Last Will and Testament
,其實遺囑是一個由使用者端預先定義好的主題和對應訊息,附加在CONNECT
的封包中,包括遺願主題
、遺願 QoS
、遺願訊息
等。
當MQTT代理 Broker
檢測到有使用者端client
非正常斷開連線時,再由伺服器主動釋出此訊息,然後相關的訂閱者會收到訊息。
舉個栗子:聊天室中所有人都訂閱一個叫talk
的主題 ,但小富由於網路抖動突然斷開了連結,這時聊天室中所有訂閱主題 talk
的使用者端都會收到一個 “小富離開聊天室
” 的遺願訊息。
遺囑的相關引數:
Will Flag
:是否使用 LWT,1 開啟
Will Topic
:遺願主題名,不可使用萬用字元
Will Qos
:釋出遺願訊息時使用的 QoS
Will Retain
:遺願訊息的 Retain 標識
Will Message
:遺願訊息內容
那使用者端Client
有哪些場景是非正常斷開連線呢?
Broker
檢測到底層的 I/O 異常;Keep Alive
的間隔內和 Broker
進行訊息互動;TCP
連線前沒有傳送 DISCONNECT
封包;Broker
,導致關閉和使用者端的連線等。注意:當用戶端通過釋出 DISCONNECT
封包斷開連線時,屬於正常斷開連線,並不會觸發 LWT
的機制,與此同時Broker
還會丟棄掉當前使用者端在連線時指定的相關 LWT
引數。
MQTT
協定廣泛應用於物聯網、行動網際網路、智慧硬體、車聯網、電力能源等領域。使用的場景也是非常非常多,下邊列舉一些:
具體 rabbitmq
的環境搭建就不贅述了,網上教學比較多,有條件的用伺服器,沒條件的像我搞個Windows
版的也很快樂嘛。
在這裡插入圖片描述
我們先開啟 rabbitmq
的 mqtt
協定,因為預設安裝下是關閉的,命令如下:
rabbitmq-plugins enable rabbitmq_mqtt
上一步中安裝rabbitmq
環境並開啟 mqtt
協定後,實際上mqtt
訊息代理服務就搭建好了,接下來要做的就是實現使用者端訊息的推播和訂閱。
這裡使用spring-integration-mqtt
、org.eclipse.paho.client.mqttv3
兩個工具包實現。
<!--mqtt依賴包--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
訊息的傳送比較簡單,主要是應用到@ServiceActivator
註解,需要注意messageHandler.setAsync
屬性,如果設定成false
,關閉非同步模式傳送訊息時可能會阻塞。
@Configuration public class IotMqttProducerConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(mqttConfig.getServers()); return factory; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "iotMqttInputChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory()); messageHandler.setAsync(false); messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic()); return messageHandler; } }
MQTT
對外提供傳送訊息的API
時,需要使用@MessagingGateway
註解,去提供一個訊息閘道器代理,引數defaultRequestChannel
指定傳送訊息繫結的channel
。
可以實現三種API
介面,payload
為傳送的訊息,topic
傳送訊息的主題,qos
訊息質量。
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel") public interface IotMqttGateway { // 向預設的 topic 傳送訊息 void sendMessage2Mqtt(String payload); // 向指定的 topic 傳送訊息 void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic); // 向指定的 topic 傳送訊息,並指定服務質量引數 void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
訊息訂閱和我們平時用的MQ訊息監聽實現思路基本相似,@ServiceActivator
註解表明當前方法用於處理MQTT
訊息,inputChannel
引數指定了用於接收訊息的channel
。
/** * @Author: xiaofu * @Description: 訊息訂閱設定 * @date 2020/6/8 18:24 */ @Configuration public class IotMqttSubscriberConfig { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setServerURIs(mqttConfig.getServers()); return factory; } @Bean public MessageChannel iotMqttInputChannel() { return new DirectChannel(); } @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic()); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(iotMqttInputChannel()); return adapter; } /** * @author xiaofu * @description 訊息訂閱 * @date 2020/6/8 18:20 */ @Bean @ServiceActivator(inputChannel = "iotMqttInputChannel") public MessageHandler handlerTest() { return message -> { try { String string = message.getPayload().toString(); System.out.println("接收到訊息:" + string); } catch (MessagingException ex) { //logger.info(ex.getMessage()); } }; } }
額~ 由於本渣渣對硬體一竅不通,為了模擬硬體的傳送訊息,只能藉助一下工具,其實硬體端實現MQTT
協定,跟我們前邊的基本沒什麼區別,只不過換種語言嵌入到硬體中而已。
這裡選的測試工具為mqttbox
,下載地址:http://workswithweb.com/mqttbox.html
我們用先用mqttbox
模擬向主題mqtt_test_topic
傳送訊息,看後臺是否能成功接收到。
看到後臺成功拿到了向主題mqtt_test_topic
傳送的訊息。
用mqttbox
模擬訂閱主題mqtt_test_topic
,在後臺向主題mqtt_test_topic
傳送一條訊息,這裡我簡單的寫了個controller
呼叫API傳送訊息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是後臺向主題 mqtt_test_topic 傳送的訊息
我們看mqttbox
的訂閱訊息,已經成功的接收到了後臺的訊息,到此我們的MQTT
通訊環境就算搭建成功了。如果把mqttbox
工具換成具體硬體裝置,整個流程就是我們常說的智慧家居了,其實真的沒那麼難。
在我們實際的生產環境中遇到過的問題,這裡分享一下讓大家少踩坑。
在使用者端connect
連線的時,會有一個clientId
引數,需要每個使用者端都保持唯一的。但我們在開發測試階段clientId
直接在程式碼中寫死了,而且服務都是單範例部署,並沒有暴露出什麼問題。
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
然而在生產環境內側的時候,由於服務是多範例叢集部署,結果出現了下邊的奇怪問題。同一時間內只能有一個使用者端能拿到訊息,其他使用者端不但不能消費訊息,而且還在不斷的掉線重連:Lost connection: 已斷開連線; retrying...
。
這就是由於clientId
相同導致使用者端間相互競爭消費,最後將clientId
獲取方式換成從發號器中拿,問題就好了,所以這個地方是需要特別注意的。
平時程式在開發環境沒問題,可偏偏到了生產環境就一大堆問題,很多都是因為服務部署方式不同導致的。所以多學習分散式還是很有必要的。
MQTT
它只是一種協定,支援MQTT
協定的訊息中介軟體產品非常多,下邊的也只是其中的一部分
我也是第一次做和硬體相關的專案,之前聽到智慧家居都會覺得好高大上,但實際上手開發後發現,技術嘛萬變不離其宗,也只是換種用法而已。
以上就是springboot+rabbitmq實現智慧家居範例詳解的詳細內容,更多關於springboot rabbitmq智慧家居的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45