首頁 > 軟體

springboot整合mongodb changestream的範例程式碼

2022-02-22 13:02:20

前言

changestream是monggodb的3.6版本之後出現的一種基於collection(資料庫集合)的變更事件流,應用程式通過db.collection.watch()這樣的命令可以獲得被監聽物件的實時變更

想必對mysql主從複製原理比較熟悉的同學應該知道,其根本就是從節點通過監聽binlog紀錄檔,然後解析binlog紀錄檔資料達到資料同步的目的,於是,基於mysql主從複製原理,阿里開源了canal這樣的資料同步中介軟體工具

Change Stream 介紹

Chang Stream(變更記錄流) 是指collection(資料庫集合)的變更事件流,應用程式通過db.collection.watch()這樣的命令可以獲得被監聽物件的實時變更。

關於changestream做如下說明,提供參考

  • 在該特性出現之前,開發者可通過拉取 oplog達到同樣的目的;
  • 但 oplog 的處理及解析相對複雜,而且存在被回滾的風險,如果使用不當的話還會帶來效能問題;
  • Change Stream 可以與aggregate framework結合使用,對變更集進行進一步的過濾或轉換;
  • 由於Change Stream 利用了儲存在 oplog 中的資訊,因此對於單程序部署的MongoDB無法支援Change Stream功能,其只能用於啟用了副本集的獨立叢集或分片叢集

changestream可用於監聽的mongodb目標型別

  • 單個集合,除系統庫(admin/local/config)之外的集合,3.6版本支援
  • 單個資料庫,除系統庫(admin/local/config)之外的資料庫集合,4.0版本支援
  • 整個叢集,整個叢集內除去系統庫( (admin/local/config)之外的集合 ,4.0版本支援

一個Change Stream Event的基本結構如下所示:

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

關於上面的資料結構,做簡單的解釋說明,

  • _id,變更事件的Token物件
  • operationType,變更型別(見下面介紹)
  • fullDocument,檔案內容
  • ns,監聽的目標
  • ns.db,變更的資料庫
  • ns.coll,變更的集合
  • documentKey,變更檔案的鍵值,含_id欄位
  • updateDescription,變更描述
  • updateDescription.updatedFields,變更中更新欄位
  • updateDescription.removedFields,變更中刪除欄位
  • clusterTime,對應oplog的時間戳
  • txnNumber,事務編號,僅在多檔案事務中出現,4.0版本支援
  • lsid,事務關聯的對談編號,僅在多檔案事務中出現,4.0版本支援

Change Steram支援的變更型別,對於上面的operationType 這個引數,主要包括有以下幾個:

  • insert,插入檔案
  • delete,刪除檔案
  • replace,替換檔案,當執行replace操作指定upsert時,可能是insert事件
  • update,更新檔案,當執行update操作指定upsert時,可能是insert事件
  • invalidate,失效事件,比如執行了collection.drop或collection.rename

以上的幾種型別,可以簡單理解為,監聽的mongo使用者操作的事件型別,比如新增資料,刪除資料,修改資料等

以上為changestream的必備理論知識,想要深入學習的話無比要了解,下面通過實操來展示下changestream的使用

環境準備

mongdb複製叢集,本例的複製叢集對應的mongodb版本為 4.0.X

登入primary節點,建立一個資料庫

友情提醒:資料庫需要提前建立

1、啟動兩個Mongo shell,一個運算元據庫,一個watch

在其中一個視窗執行如下命令,開啟監聽

cursor = db.comment.watch()

2、在另一個視窗下,給上面的articledb插入一條資料

資料寫入成功後,在第一個視窗下,執行下面的命令:

cursor.next()

說明已經成功監聽到新增的資料,修改、刪除事件可以做類似的操作即可

以上先通過shell視窗展示了一下changestream的使用效果,接下來,將通過程式演示下如何在使用者端整合並使用changestream

Java使用者端操作changestream

1、引入maven依賴

<dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.12.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

2、測試類核心程式碼

import com.mongodb.*;
import com.mongodb.client.MongoDatabase;
import org.bson.conversions.Bson;

import java.util.List;
import static java.util.Collections.singletonList;
import com.alibaba.fastjson.JSONObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.Arrays.asList;
public class MongoTest {
    private static Logger logger = LoggerFactory.getLogger(MongoTest.class);
    public static void main(String[] args) {
        showmogodbdata();
    }
    private static void showmogodbdata() {
		
        String sURI = "mongodb://IP:27017";
        MongoClient mongoClient = new MongoClient(new MongoClientURI(sURI));
        MongoDatabase database = mongoClient.getDatabase("articledb");
        MongoCollection<Document> collec = database.getCollection("comment");
        List<Bson> pipeline = singletonList(Aggregates.match(Filters.or(
                Document.parse("{'fullDocument.articleid': '100007'}"),
                Filters.in("operationType", asList("insert", "update", "delete")))));
        MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator();
        while (cursor.hasNext()) {
            ChangeStreamDocument<Document> next = cursor.next();
            logger.info("輸出mogodb的next的對應的值" + next.toString());
            String Operation = next.getOperationType().getValue();
            String tableNames = next.getNamespace().getCollectionName();
            System.out.println(tableNames);
            //獲取主鍵id的值
            String pk_id = next.getDocumentKey().toString();
            //同步修改資料的操作
            if (next.getUpdateDescription() != null) {
                JSONObject jsonObject = JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson());
                System.out.println(jsonObject);
            }
            //同步插入資料的操作
            if (next.getFullDocument() != null) {
                JSONObject jsonObject = JSONObject.parseObject(next.getFullDocument().toJson());
            //同步刪除資料的操作
            if (next.getUpdateDescription() == null && Operation.matches("delete")) {
                JSONObject jsonObject = JSONObject.parseObject(pk_id);
        }
}

這段程式主要分為幾個核心部分,做如下解釋說明,

  • 連線mogodb伺服器端及相關設定
  • 通過pipline開啟watch監聽
  • 監聽到特定資料庫下集合的資料變化,然後列印出變化的資料

啟動這段程式,觀察控制檯紀錄檔資料

在未對articledb資料庫下的comment集合做任何操作之前,由於watch為檢測到任何資料變化,所以無法進入到while迴圈中,接下來,從shell端給comment集合新增一條資料,然後再次觀察控制檯資料變化

可以看到,控制檯很快就檢測到變化的資料

以下為完整的紀錄檔資料

{ operationType=OperationType{value='insert'}, resumeToken={"_data": "8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id": {"$oid": "6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}

至於在業務中的具體使用,可以結合自身的情況,舉例來說,應用程式只想監聽修改資料的事件,那麼就可以在修改資料事件的監聽邏輯中,解析變化後的資料做後續的操作

springboot整合changestream

在實際開發中,更通用的場景是整合到springboot工程中使用,有過一定的開發經驗的同學應該很容易想到核心的邏輯長什麼樣了,和canal的使用者端操作類似,需要在一個設定類去監聽即可

下面來看看具體的整合步驟

1、引入核心依賴

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <artifactId>spring-boot-starter</artifactId>

2、核心組態檔

本例演示的是基於上文搭建的mongodb複製叢集

server.port=8081

#mongodb設定
spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512

3、編寫實體類,對映comment集合中的欄位

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Document(collection="comment")
public class Comment {
    @Id
    private String articleid;
    private String content;
    private String userid;
    private String nickname;
    private Date createdatetime;
    public String getArticleid() {
        return articleid;
    }
    public void setArticleid(String articleid) {
        this.articleid = articleid;
    public String getContent() {
        return content;
    public void setContent(String content) {
        this.content = content;
    public String getUserid() {
        return userid;
    public void setUserid(String userid) {
        this.userid = userid;
    public String getNickname() {
        return nickname;
    public void setNickname(String nickname) {
        this.nickname = nickname;
    public Date getCreatedatetime() {
        return createdatetime;
    public void setCreatedatetime(Date createdatetime) {
        this.createdatetime = createdatetime;
}

4、編寫一個服務類

簡單的新增2個用介面測試的方法

import com.congge.entity.Comment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;

import java.util.List;
@Service
public class MongoDbService {
    private static final Logger logger = LoggerFactory.getLogger(MongoDbService.class);
    @Autowired
    private MongoTemplate mongoTemplate;
    /**
     * 查詢所有
     * @return
     */
    public List<Comment> findAll() {
        return mongoTemplate.findAll(Comment.class);
    }
    /***
     * 根據id查詢
     * @param id
    public Comment getBookById(String id) {
        Query query = new Query(Criteria.where("articleid").is(id));
        return mongoTemplate.findOne(query, Comment.class);
}

5、編寫一個介面

@RestController
public class CommentController {

    @Autowired
    private MongoDbService mongoDbService;
    @GetMapping("/listAll")
    public Object listAll(){
        return mongoDbService.findAll();
    }
    @GetMapping("/findById")
    public Object findById(String id){
        return mongoDbService.getBookById(id);
}

啟動本工程,然後瀏覽器呼叫下查詢所有資料的介面,資料能正常返回,說明工程的基礎結構就完成了

6、接下來,只需要依次新增下面3個設定類即可

MongoMessageListener 類 ,顧名思義,該類用於監聽特定資料庫下的集合資料變化使用的,在實際開發中,該類的作用也是非常重要的,類似於許多中介軟體的使用者端監聽程式,當監聽到資料變化後,做出後續的業務響應,比如,資料入庫、推播訊息到kafka、傳送相關的事件等等

import com.congge.entity.Comment;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class MongoMessageListener implements MessageListener<ChangeStreamDocument<Document>,Comment> {
    private static Logger logger = LoggerFactory.getLogger(MongoMessageListener.class);
    @Override
    public void onMessage(Message<ChangeStreamDocument<Document>, Comment> message) {
        OperationType operationType = message.getRaw().getOperationType();
        System.out.println("操作型別為 :" + operationType);
        System.out.println("變更資料主體 :" + message.getBody().getArticleid());
        System.out.println("變更資料主體 :" + message.getBody().getContent());
        System.out.println("變更資料主體 :" + message.getBody().getNickname());
        System.out.println("變更資料主體 :" + message.getBody().getUserid());
        System.out.println();
        /*logger.info("Received Message in collection: {},message raw: {}, message body:{}",
                message.getProperties().getCollectionName(), message.getRaw(), message.getBody());*/
    }
}

ChangeStream 類 ,事件註冊類,即開篇中提到的那幾種事件型別的操作等

import com.congge.entity.Comment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;

@Configuration
public class ChangeStream implements CommandLineRunner {
    @Autowired
    private MongoMessageListener mongoMessageListener;
    private MessageListenerContainer messageListenerContainer;
    @Override
    public void run(String... args) throws Exception{
        ChangeStreamRequest<Comment> request = ChangeStreamRequest.builder(mongoMessageListener)
                .collection("comment")
                .filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert","update","replace"))))
                .build();
        messageListenerContainer.register(request,Comment.class);
    }
}

MongoConfig 設定MessageListenerContainer 容器的相關引數

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Configuration
public class MongoConfig {
    @Bean
    MessageListenerContainer messageListenerContainer(MongoTemplate mongoTemplate){
        Executor executor = Executors.newFixedThreadPool(5);
        return new DefaultMessageListenerContainer(mongoTemplate,executor){
            @Override
            public boolean isAutoStartup(){
                return true;
            }
        };
    }
}

3個類新增完成後,再次啟動程式,並觀察控制檯資料紀錄檔

測試1:通過shell視窗登入primary節點,並給comment集合新增一條資料

幾乎是實時的監聽到事件操作的資料變化,下面是完整的輸出紀錄檔

測試2:通過shell視窗刪除上面新增的這條資料

典型應用場景

資料遷移

如果一個系統的資料需要遷移到另一個系統,可以考慮使用mongodb changestream這種方式,試想,如果老系統資料非常雜亂,並且檔案中存在一些髒資料時,為了確保遷移後的資料能較快的投產,通過應用程式的方式,能夠原始的資料做類似ETL的處理,這樣更加方便

應用監控

如果您的系統對資料監管較為嚴格,可以考慮使用changestream這種方式,訂閱特定事件的資料操作,比如修改和刪除資料的事件,然後及時的傳送告警通知

對接巨量資料應用

我們知道,mongodb作為一款效能優秀的分散式檔案型資料庫,其實是可以儲存海量資料的,在一些巨量資料場景下,比如下游其他的應用採用巨量資料技術,需要對mongo中的資料做軌跡行為分析,changestream就是一種不錯的選擇,當監聽到特定事件的資料變化時,向訊息佇列,比如kafka推播相應的訊息,下游相關的巨量資料應用就可以做後續的業務處理了

到此這篇關於springboot整合mongodb changestream的範例程式碼的文章就介紹到這了,更多相關springboot整合mongodb changestream內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com