<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
changestream是monggodb的3.6版本之後出現的一種基於collection(資料庫集合)的變更事件流,應用程式通過db.collection.watch()這樣的命令可以獲得被監聽物件的實時變更
想必對mysql主從複製原理比較熟悉的同學應該知道,其根本就是從節點通過監聽binlog紀錄檔,然後解析binlog紀錄檔資料達到資料同步的目的,於是,基於mysql主從複製原理,阿里開源了canal這樣的資料同步中介軟體工具
Chang Stream(變更記錄流) 是指collection(資料庫集合)的變更事件流,應用程式通過db.collection.watch()這樣的命令可以獲得被監聽物件的實時變更。
關於changestream做如下說明,提供參考
changestream可用於監聽的mongodb目標型別
一個Change Stream Event的基本結構如下所示:
{ _id : { <BSON Object> }, "operationType" : "<operation>", "fullDocument" : { <document> }, "ns" : { "db" : "<database>", "coll" : "<collection" }, "documentKey" : { "_id" : <ObjectId> }, "updateDescription" : { "updatedFields" : { <document> }, "removedFields" : [ "<field>", ... ] } "clusterTime" : <Timestamp>, "txnNumber" : <NumberLong>, "lsid" : { "id" : <UUID>, "uid" : <BinData> } }
關於上面的資料結構,做簡單的解釋說明,
Change Steram支援的變更型別,對於上面的operationType 這個引數,主要包括有以下幾個:
以上的幾種型別,可以簡單理解為,監聽的mongo使用者操作的事件型別,比如新增資料,刪除資料,修改資料等
以上為changestream的必備理論知識,想要深入學習的話無比要了解,下面通過實操來展示下changestream的使用
mongdb複製叢集,本例的複製叢集對應的mongodb版本為 4.0.X
登入primary節點,建立一個資料庫
友情提醒:資料庫需要提前建立
1、啟動兩個Mongo shell,一個運算元據庫,一個watch
在其中一個視窗執行如下命令,開啟監聽
cursor = db.comment.watch()
2、在另一個視窗下,給上面的articledb插入一條資料
資料寫入成功後,在第一個視窗下,執行下面的命令:
cursor.next()
說明已經成功監聽到新增的資料,修改、刪除事件可以做類似的操作即可
以上先通過shell視窗展示了一下changestream的使用效果,接下來,將通過程式演示下如何在使用者端整合並使用changestream
<dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> <version>3.12.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
import com.mongodb.*; import com.mongodb.client.MongoDatabase; import org.bson.conversions.Bson; import java.util.List; import static java.util.Collections.singletonList; import com.alibaba.fastjson.JSONObject; import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.ChangeStreamDocument; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.util.Arrays.asList; public class MongoTest { private static Logger logger = LoggerFactory.getLogger(MongoTest.class); public static void main(String[] args) { showmogodbdata(); } private static void showmogodbdata() { String sURI = "mongodb://IP:27017"; MongoClient mongoClient = new MongoClient(new MongoClientURI(sURI)); MongoDatabase database = mongoClient.getDatabase("articledb"); MongoCollection<Document> collec = database.getCollection("comment"); List<Bson> pipeline = singletonList(Aggregates.match(Filters.or( Document.parse("{'fullDocument.articleid': '100007'}"), Filters.in("operationType", asList("insert", "update", "delete"))))); MongoCursor<ChangeStreamDocument<Document>> cursor = collec.watch(pipeline).iterator(); while (cursor.hasNext()) { ChangeStreamDocument<Document> next = cursor.next(); logger.info("輸出mogodb的next的對應的值" + next.toString()); String Operation = next.getOperationType().getValue(); String tableNames = next.getNamespace().getCollectionName(); System.out.println(tableNames); //獲取主鍵id的值 String pk_id = next.getDocumentKey().toString(); //同步修改資料的操作 if (next.getUpdateDescription() != null) { JSONObject jsonObject = JSONObject.parseObject(next.getUpdateDescription().getUpdatedFields().toJson()); System.out.println(jsonObject); } //同步插入資料的操作 if (next.getFullDocument() != null) { JSONObject jsonObject = JSONObject.parseObject(next.getFullDocument().toJson()); //同步刪除資料的操作 if (next.getUpdateDescription() == null && Operation.matches("delete")) { JSONObject jsonObject = JSONObject.parseObject(pk_id); } }
這段程式主要分為幾個核心部分,做如下解釋說明,
啟動這段程式,觀察控制檯紀錄檔資料
在未對articledb資料庫下的comment集合做任何操作之前,由於watch為檢測到任何資料變化,所以無法進入到while迴圈中,接下來,從shell端給comment集合新增一條資料,然後再次觀察控制檯資料變化
可以看到,控制檯很快就檢測到變化的資料
以下為完整的紀錄檔資料
{ operationType=OperationType{value='insert'}, resumeToken={"_data": "8262138891000000022B022C0100296E5A1004B9065629412942F8852D592B9FD441B946645F696400646213889158B116A29C3FD1140004"}, namespace=articledb.comment, destinationNamespace=null, fullDocument=Document{{_id=6213889158b116a29c3fd114, articleid=100010, content=hello kafka, userid=1010, nickname=marry}}, documentKey={"_id": {"$oid": "6213889158b116a29c3fd114"}}, clusterTime=Timestamp{value=7067142396626075650, seconds=1645447313, inc=2}, updateDescription=null, txnNumber=null, lsid=null}
至於在業務中的具體使用,可以結合自身的情況,舉例來說,應用程式只想監聽修改資料的事件,那麼就可以在修改資料事件的監聽邏輯中,解析變化後的資料做後續的操作
springboot整合changestream
在實際開發中,更通用的場景是整合到springboot工程中使用,有過一定的開發經驗的同學應該很容易想到核心的邏輯長什麼樣了,和canal的使用者端操作類似,需要在一個設定類去監聽即可
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <artifactId>spring-boot-starter</artifactId>
本例演示的是基於上文搭建的mongodb複製叢集
server.port=8081 #mongodb設定 spring.data.mongodb.uri=mongodb://IP:27017,IP:27018,IP:27019/articledb?maxPoolSize=512
import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @Document(collection="comment") public class Comment { @Id private String articleid; private String content; private String userid; private String nickname; private Date createdatetime; public String getArticleid() { return articleid; } public void setArticleid(String articleid) { this.articleid = articleid; public String getContent() { return content; public void setContent(String content) { this.content = content; public String getUserid() { return userid; public void setUserid(String userid) { this.userid = userid; public String getNickname() { return nickname; public void setNickname(String nickname) { this.nickname = nickname; public Date getCreatedatetime() { return createdatetime; public void setCreatedatetime(Date createdatetime) { this.createdatetime = createdatetime; }
簡單的新增2個用介面測試的方法
import com.congge.entity.Comment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.stereotype.Service; import java.util.List; @Service public class MongoDbService { private static final Logger logger = LoggerFactory.getLogger(MongoDbService.class); @Autowired private MongoTemplate mongoTemplate; /** * 查詢所有 * @return */ public List<Comment> findAll() { return mongoTemplate.findAll(Comment.class); } /*** * 根據id查詢 * @param id public Comment getBookById(String id) { Query query = new Query(Criteria.where("articleid").is(id)); return mongoTemplate.findOne(query, Comment.class); }
@RestController public class CommentController { @Autowired private MongoDbService mongoDbService; @GetMapping("/listAll") public Object listAll(){ return mongoDbService.findAll(); } @GetMapping("/findById") public Object findById(String id){ return mongoDbService.getBookById(id); }
啟動本工程,然後瀏覽器呼叫下查詢所有資料的介面,資料能正常返回,說明工程的基礎結構就完成了
MongoMessageListener 類 ,顧名思義,該類用於監聽特定資料庫下的集合資料變化使用的,在實際開發中,該類的作用也是非常重要的,類似於許多中介軟體的使用者端監聽程式,當監聽到資料變化後,做出後續的業務響應,比如,資料入庫、推播訊息到kafka、傳送相關的事件等等
import com.congge.entity.Comment; import com.mongodb.client.model.changestream.ChangeStreamDocument; import com.mongodb.client.model.changestream.OperationType; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.mongodb.core.messaging.Message; import org.springframework.data.mongodb.core.messaging.MessageListener; import org.springframework.stereotype.Component; @Component public class MongoMessageListener implements MessageListener<ChangeStreamDocument<Document>,Comment> { private static Logger logger = LoggerFactory.getLogger(MongoMessageListener.class); @Override public void onMessage(Message<ChangeStreamDocument<Document>, Comment> message) { OperationType operationType = message.getRaw().getOperationType(); System.out.println("操作型別為 :" + operationType); System.out.println("變更資料主體 :" + message.getBody().getArticleid()); System.out.println("變更資料主體 :" + message.getBody().getContent()); System.out.println("變更資料主體 :" + message.getBody().getNickname()); System.out.println("變更資料主體 :" + message.getBody().getUserid()); System.out.println(); /*logger.info("Received Message in collection: {},message raw: {}, message body:{}", message.getProperties().getCollectionName(), message.getRaw(), message.getBody());*/ } }
ChangeStream 類 ,事件註冊類,即開篇中提到的那幾種事件型別的操作等
import com.congge.entity.Comment; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import org.springframework.data.mongodb.core.query.Criteria; @Configuration public class ChangeStream implements CommandLineRunner { @Autowired private MongoMessageListener mongoMessageListener; private MessageListenerContainer messageListenerContainer; @Override public void run(String... args) throws Exception{ ChangeStreamRequest<Comment> request = ChangeStreamRequest.builder(mongoMessageListener) .collection("comment") .filter(Aggregation.newAggregation(Aggregation.match(Criteria.where("operationType").in("insert","update","replace")))) .build(); messageListenerContainer.register(request,Comment.class); } }
MongoConfig 設定MessageListenerContainer 容器的相關引數
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer; import org.springframework.data.mongodb.core.messaging.MessageListenerContainer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Configuration public class MongoConfig { @Bean MessageListenerContainer messageListenerContainer(MongoTemplate mongoTemplate){ Executor executor = Executors.newFixedThreadPool(5); return new DefaultMessageListenerContainer(mongoTemplate,executor){ @Override public boolean isAutoStartup(){ return true; } }; } }
3個類新增完成後,再次啟動程式,並觀察控制檯資料紀錄檔
測試1:通過shell視窗登入primary節點,並給comment集合新增一條資料
幾乎是實時的監聽到事件操作的資料變化,下面是完整的輸出紀錄檔
測試2:通過shell視窗刪除上面新增的這條資料
如果一個系統的資料需要遷移到另一個系統,可以考慮使用mongodb changestream這種方式,試想,如果老系統資料非常雜亂,並且檔案中存在一些髒資料時,為了確保遷移後的資料能較快的投產,通過應用程式的方式,能夠原始的資料做類似ETL的處理,這樣更加方便
如果您的系統對資料監管較為嚴格,可以考慮使用changestream這種方式,訂閱特定事件的資料操作,比如修改和刪除資料的事件,然後及時的傳送告警通知
我們知道,mongodb作為一款效能優秀的分散式檔案型資料庫,其實是可以儲存海量資料的,在一些巨量資料場景下,比如下游其他的應用採用巨量資料技術,需要對mongo中的資料做軌跡行為分析,changestream就是一種不錯的選擇,當監聽到特定事件的資料變化時,向訊息佇列,比如kafka推播相應的訊息,下游相關的巨量資料應用就可以做後續的業務處理了
到此這篇關於springboot整合mongodb changestream的範例程式碼的文章就介紹到這了,更多相關springboot整合mongodb changestream內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45