首頁 > 軟體

Springcloud Stream訊息驅動工具使用介紹

2022-09-14 22:06:54

springcloud Stream

什麼是springcloud Stream

  現在市面上有很多的訊息中介軟體,每一個公司使用的都有所不同,為了減少學習的成本,springcloud Stream可以讓我們不再關注訊息中介軟體MQ的具體細節,我們只需要通過適配繫結的方式即可實現不同MQ之間的切換,但是遺憾的是springcloud Stream目前只支援RabbitMQ和Kafka。

  SpringCloud Stream是一個構建訊息驅動微服務的框架,應用程式通過inputs或者 outputs來與SpringCloud Stream中的binder進行互動,我們可以通過設定來binding ,而 SpringCloud Stream 的binder負責與中介軟體互動,所以我們只需要搞清楚如何與Stream互動就可以很方便的使用訊息驅動了!

什麼是Binder

  Binder是SpringCloud Stream的一個抽象概念,是應用與訊息中介軟體之間的粘合劑,通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離,可以動態的改變訊息的destinations(對應於 Kafka的topic,RabbitMQ的exchanges),這些都可以通過外部設定項來做到,甚至可以任意的改變中介軟體的型別但是不需要修改一行程式碼

為什麼使用Stream

  比方說我們用到了RabbitMQ和Kafka,由於這兩個訊息中介軟體的架構上的不同像RabbitMQ有exchange,kafka有Topic和Partitions分割區,這些中介軟體的差異性導致我們實際專案開發給我們造成了一定的困擾,我們如果用了兩個訊息佇列的其中一種,後面的業務需求,我想往另外一種訊息佇列進行遷移;這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這襯候springcloud Stream給我們提供了一種解耦合的方式。

Stream使用案例

前置知識

Stream處理訊息的架構

  Source、Sink: 簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入。Channel: 通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介。Binder: 訊息的生產者和消費者中間層,實現了應用程式與訊息中介軟體細節之間的隔離

  通過以上兩張圖片可知,訊息的處理流向是:訊息生產者處理完業務邏輯之後訊息到達source中,接著前往Channel通道進行排隊,然後通過binder繫結器將訊息資料傳送到底層mq,然後又通過binder繫結器接收到底層mq傳送來的訊息資料,接著前往Channel通道進行排隊,由Sink接收到訊息資料,訊息消費者拿到訊息資料執行相應的業務邏輯

Stream常用註解

訊息生產者8801模組搭建

  第一步: 建立一個maven模組,引入相關依賴,最主要的就是stream整合rabbitmq的依賴

<!--stream的rabbitmq依賴-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 組態檔的編寫

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此處設定要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境設定
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        output: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit  # 設定要繫結的訊息服務的具體設定

eureka:
  client: # 使用者端進行Eureka註冊的設定
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程式類

@SpringBootApplication
public class CloudStreamRabbitmqProvider8801Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqProvider8801Application.class, args);
        System.out.println("啟動成功");
    }
}

  第四步: 業務層service程式碼編寫,注意:這裡實現類注入的物件由之前的dao層物件換成了channel通道物件,詳細的傳送由實現類的第12完成

public interface IMessageProviderService {
    /**
     * 定義訊息的推播管道
     *
     * @return
     */
    String send();
}
@EnableBinding(Source.class)
public class MessageProviderServiceImpl implements IMessageProviderService {
    /**
     * 訊息傳送管道/通道
     */
    @Resource
    private MessageChannel output;
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());

        System.out.println("*****serial: " + serial);
        return serial;
    }
}

  第五步: controller介面

@RestController
public class SendMessageController {
    @Resource
    private IMessageProviderService messageProviderService;
    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProviderService.send();
    }
}

訊息消費者8802模組搭建

  第一步: 建立一個maven模組,引入相關依賴,最主要的就是stream整合rabbitmq的依賴

<!--stream的rabbitmq依賴-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

  第二步: 組態檔的編寫,與生產者的區別就在於bindings下的是input而不是output

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此處設定要繫結的rabbitmq的服務資訊;
        defaultRabbit: # 表示定義的名稱,用於於binding整合
          type: rabbit # 訊息元件型別
          environment: # 設定rabbitmq的相關的環境設定
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服務的整合處理
        input: # 這個名字是一個通道的名稱
          destination: studyExchange # 表示要使用的Exchange名稱定義
          content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
          binder: defaultRabbit  # 設定要繫結的訊息服務的具體設定
eureka:
  client: # 使用者端進行Eureka註冊的設定
    service-url:
      defaultZone: http://localhost:7001/eureka

  第三步: 主程式類

@SpringBootApplication
public class CloudStreamRabbitmqConsumer8802Application {
    public static void main(String[] args) {
        SpringApplication.run(CloudStreamRabbitmqConsumer8802Application.class, args);
        System.out.println("啟動成功");
    }
}

  第四步: controller介面,使用url請求生產者8801,即可在消費者8802端接收到8801傳送的訊息

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("消費者1號 ----> port:" + serverPort + "t從8801接受到的訊息是:" + message.getPayload());
    }
}

  兩個模組搭建完成進行測試,首先啟動註冊中心7001,然後分別啟動訊息生產者8801和訊息消費者8802,通過url請求存取8001的傳送訊息請求,會向指定管道中傳送一條訊息,如果此時這個管道中有消費者即可接收到這條訊息。而如何指定訊息的管道歸屬呢,就是通過組態檔中的indings.input.destination來指定,命名相同的服務就會處在同一條管道中

Stream帶來的問題

重複消費問題

  按照之前的使用,會帶來重複消費問題: 也就是說一個通道上有不止一個訊息消費者,stream上預設每一個消費者都屬於不同的組,這樣的話就會導致這個訊息被多個組的消費者重複消費

  知道了問題出現的原因就很容易解決了,只要我們自定義設定分組,將這些消費者都分配到同一個組中就能避免重複消費的問題出現了(同一個組間的消費者是競爭關係,不管組間有多少的消費者都只會消費一次)

自定義分組

  只需要在組態檔修改一處設定即可實現自定義組名並且自定義分組,組名相同的服務會被分配到同一組,通道內的訊息資料會被該組中的所有消費者輪詢消費

持久化問題

  上面自定義分組使用的group設定除了可以自定義分組和分組名之外,還可以實現訊息的持久化,也就是說使用group設定自定義分組和分組名的訊息消費者,就算在訊息生產者傳送訊息的時候掛掉了,等這個消費者重啟之後依然是能夠消費之前傳送的訊息

這裡一個生產者和兩個消費者存在以下十三種情況(生產者傳送四次訊息):

1、都使用group分組的兩個不同組成員,在生產者生產的時候

  • 都沒掛(各消費四次)
  • 掛了其中一個(各消費四次)
  • 都掛了(各消費四次)

2、都使用group分組的兩個同組成員,在生產者生產的時候

  • 都沒掛(各消費兩次)
  • 掛了其中一個(沒掛的把四次消費完)
  • 都掛了(各消費兩次)

3、其中一個使用group分組的兩個成員,在生產者生產的時候

  • 都掛了(都不消費)
  • group的掛了(各消費四次)
  • 沒group的掛了(沒掛的消費四次,掛的由於沒有持久化所以不消費)
  • 都沒掛(各消費四次)

4、都不使用group分組的兩個成員,在生產者生產的時候

  • 都掛了(都不消費)
  • 掛了其中一個(沒掛的消費四次,掛的由於沒有持久化所以不消費)
  • 都沒掛(各消費四次)

  總之一句話,通道里的訊息會持久化給使用group設定的訊息消費者(每一組都有一份),就算傳送訊息的時候這些消費者掛了,如果同組的消費者有沒掛的就會把這些訊息競爭消費完;如果同組沒有消費者,等他重啟之後還是會消費這些訊息

到此這篇關於Springcloud Stream訊息驅動工具使用介紹的文章就介紹到這了,更多相關Springcloud Stream內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com