<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
現在市面上有很多的訊息中介軟體,每一個公司使用的都有所不同,為了減少學習的成本,springcloud Stream可以讓我們不再關注訊息中介軟體MQ的具體細節,我們只需要通過適配繫結的方式即可實現不同MQ之間的切換,但是遺憾的是springcloud Stream目前只支援RabbitMQ和Kafka。
SpringCloud Stream是一個構建訊息驅動微服務的框架,應用程式通過inputs或者 outputs來與SpringCloud Stream中的binder進行互動,我們可以通過設定來binding ,而 SpringCloud Stream 的binder負責與中介軟體互動,所以我們只需要搞清楚如何與Stream互動就可以很方便的使用訊息驅動了!
Binder是SpringCloud Stream的一個抽象概念,是應用與訊息中介軟體之間的粘合劑,通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離,可以動態的改變訊息的destinations(對應於 Kafka的topic,RabbitMQ的exchanges),這些都可以通過外部設定項來做到,甚至可以任意的改變中介軟體的型別但是不需要修改一行程式碼
比方說我們用到了RabbitMQ和Kafka,由於這兩個訊息中介軟體的架構上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分割區,這些中介軟體的差異性導致我們實際專案開發給我們造成了一定的困擾,我們如果用了兩個訊息佇列的其中一種,後面的業務需求,我想往另外一種訊息佇列進行遷移;這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這襯候springcloud Stream給我們提供了一種解耦合的方式。
Source、Sink: 簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入。Channel: 通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介。Binder: 訊息的生產者和消費者中間層,實現了應用程式與訊息中介軟體細節之間的隔離
通過以上兩張圖片可知,訊息的處理流向是:訊息生產者處理完業務邏輯之後訊息到達source中,接著前往Channel通道進行排隊,然後通過binder繫結器將訊息資料傳送到底層mq,然後又通過binder繫結器接收到底層mq傳送來的訊息資料,接著前往Channel通道進行排隊,由Sink接收到訊息資料,訊息消費者拿到訊息資料執行相應的業務邏輯
第一步: 建立一個maven模組,引入相關依賴,最主要的就是stream整合rabbitmq的依賴
<!--stream的rabbitmq依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
第二步: 組態檔的編寫
server:
port: 8801spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處設定要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境設定
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定eureka:
client: # 使用者端進行Eureka註冊的設定
service-url:
defaultZone: http://localhost:7001/eureka
第三步: 主程式類
@SpringBootApplication public class CloudStreamRabbitmqProvider8801Application { public static void main(String[] args) { SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args); System.out.println("啟動成功"); } }
第四步: 業務層service程式碼編寫,注意:這裡實現類注入的物件由之前的dao層物件換成了channel通道物件,詳細的傳送由實現類的第12完成
public interface IMessageProviderService { /** * 定義訊息的推播管道 * * @return */ String send(); }
@EnableBinding(Source.class) public class MessageProviderServiceImpl implements IMessageProviderService { /** * 訊息傳送管道/通道 */ @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: " + serial); return serial; } }
第五步: controller介面
@RestController public class SendMessageController { @Resource private IMessageProviderService messageProviderService; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProviderService.send(); } }
第一步: 建立一個maven模組,引入相關依賴,最主要的就是stream整合rabbitmq的依賴
<!--stream的rabbitmq依賴--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
第二步: 組態檔的編寫,與生產者的區別就在於bindings下的是input而不是output
server:
port: 8802spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處設定要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境設定
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
eureka:
client: # 使用者端進行Eureka註冊的設定
service-url:
defaultZone: http://localhost:7001/eureka
第三步: 主程式類
@SpringBootApplication public class CloudStreamRabbitmqConsumer8802Application { public static void main(String[] args) { SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args); System.out.println("啟動成功"); } }
第四步: controller介面,使用url請求生產者8801,即可在消費者8802端接收到8801傳送的訊息
@Component @EnableBinding(Sink.class) public class ReceiveMessageListener { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) { System.out.println("消費者1號 ----> port:" + serverPort + "t從8801接受到的訊息是:" + message.getPayload()); } }
兩個模組搭建完成進行測試,首先啟動註冊中心7001,然後分別啟動訊息生產者8801和訊息消費者8802,通過url請求存取8001的傳送訊息請求,會向指定管道中傳送一條訊息,如果此時這個管道中有消費者即可接收到這條訊息。而如何指定訊息的管道歸屬呢,就是通過組態檔中的indings.input.destination來指定,命名相同的服務就會處在同一條管道中
按照之前的使用,會帶來重複消費問題: 也就是說一個通道上有不止一個訊息消費者,stream上預設每一個消費者都屬於不同的組,這樣的話就會導致這個訊息被多個組的消費者重複消費
知道了問題出現的原因就很容易解決了,只要我們自定義設定分組,將這些消費者都分配到同一個組中就能避免重複消費的問題出現了(同一個組間的消費者是競爭關係,不管組間有多少的消費者都只會消費一次)
只需要在組態檔修改一處設定即可實現自定義組名並且自定義分組,組名相同的服務會被分配到同一組,通道內的訊息資料會被該組中的所有消費者輪詢消費
上面自定義分組使用的group設定除了可以自定義分組和分組名之外,還可以實現訊息的持久化,也就是說使用group設定自定義分組和分組名的訊息消費者,就算在訊息生產者傳送訊息的時候掛掉了,等這個消費者重啟之後依然是能夠消費之前傳送的訊息
這裡一個生產者和兩個消費者存在以下十三種情況(生產者傳送四次訊息):
1、都使用group分組的兩個不同組成員,在生產者生產的時候
2、都使用group分組的兩個同組成員,在生產者生產的時候
3、其中一個使用group分組的兩個成員,在生產者生產的時候
4、都不使用group分組的兩個成員,在生產者生產的時候
總之一句話,通道里的訊息會持久化給使用group設定的訊息消費者(每一組都有一份),就算傳送訊息的時候這些消費者掛了,如果同組的消費者有沒掛的就會把這些訊息競爭消費完;如果同組沒有消費者,等他重啟之後還是會消費這些訊息
到此這篇關於Springcloud Stream訊息驅動工具使用介紹的文章就介紹到這了,更多相關Springcloud Stream內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45