首頁 > 軟體

剖析後OpLog訂閱MongoDB的資料變更就沒那麼難了

2022-02-24 13:01:29

前言

我們開源了一個訂閱分發mysql的binlog的專案,一直用的非常好,忽然有天開發說能不能支援MongoDB的資料訂閱呢,MongoDB的使用度也挺廣泛的。安排。經過簡單的瞭解後發現MongoDB也有類似binlog的機制,最終花了兩天時間把功能完成,並統一抽象整合到binlog開源專案中,使用和binlog同一套訂閱分發模型管理MongoDB資料來源。整個過程非常順利,比整mysql的binlog要簡單的多了。

oplog簡介

先來聊聊MongoDB的主備機制,和mysql的binlog類似,在MongoDB中,有一個系統庫“”Local”,庫裡有一個集合“oplog.rs”,這個集合類似於binlog檔案,裡面記錄了MongoDB的所有操作。從節點通過讀取oplog.rs裡的資料做到資料同步。

解析oplog

和訂閱mysql的binlog一樣(模擬一個從節點mysql)。我們的訂閱服務要像從節點那樣讀取解析oplog.rs裡的資料。解析前先看下oplog.rs的Document的資料結構

上圖是一個插入的資料的紀錄檔,可見oplog的doc中共有如下欄位,含義分別如下:

ts:操作的時間戳(非常重要)

t:term最初在主資料庫上生成操作的。(含義不明)

h:本次操作的唯一hashID

v: 版本號

op:操作型別,有六種型別,我們只需要關注其中的i(插入)、u(更新)、d(刪除)即可

ns:庫名和集合名稱,中間使用“.”連線

o:本次操作的document內容

o2:只有op操作型別時u更新時,才會有這個欄位,代表更新的條件語句

$set:o2獲取後的檔案裡的屬性,代表更新的欄位

如上欄位,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以類比為binlog中的Position。同步mysql的資料時,通過記錄消費binlog的位置,也就是Position,可以有效避免訂閱服務停機後,消費記錄丟失的問題。同步MongoDB時,通過記錄ts的值,來記錄消費的位置,可以到達和訂閱binlog一樣的效果。和mysql訂閱不同的是,MongoDB的同步需要同步服務自己查詢,而且oplog在MongoDB4.0之前的版本有大小限制,超過設定的容量後,老的資料就會被丟失,在4.0之後的版本已經解除了這個限制。

程式碼

上面已經分析了oplog的結構以及訂閱步驟,下面我們直接構建查詢即可,需要注意,每次獲取到的ts值,需要儲存記錄下來,已便重新訂閱時,從上次斷開的記錄重新開始。下面直接看程式碼,重點邏輯都以註釋詳盡

private BsonTimestamp queryTs;
    @Test
    public void OpLogTest() {
        MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://admin:admin@127.0.0.1:3717"));
        MongoCollectioncollection = mongoClient.getDatabase("local")
                .getCollection("oplog.rs");

        //如果是首次訂閱,需要使用自然排序查詢,獲取第最後一次操作的操作時間戳。如果是續訂閱直接讀取記錄的值賦值給queryTs即可
        FindIterabletsCursor = collection.find().sort(new BasicDBObject("$natural", -1))
                .limit(1);
        Document tsDoc = tsCursor.first();
        queryTs = (BsonTimestamp) tsDoc.get("ts");
        while (true) try {
            //構建查詢語句,查詢大於當前查詢時間戳queryTs的記錄
            BasicDBObject query = new BasicDBObject("ts", new BasicDBObject("$gt", queryTs));
            MongoCursordocCursor = collection.find(query)
                    .cursorType(CursorType.TailableAwait) //沒有資料時阻塞休眠
                    .noCursorTimeout(true) //防止伺服器在不活動時間(10分鐘)後使空閒的遊標超時。
                    .oplogReplay(true) //結合query條件,獲取增量資料,這個引數比較難懂,見:https://docs.mongodb.com/manual/reference/command/find/index.html
                    .maxAwaitTime(1, TimeUnit.SECONDS) //設定此操作在伺服器上的最大等待執行時間
                    .iterator();
            while (docCursor.hasNext()) {
                Document document = docCursor.next();
                //更新查詢時間戳
                queryTs = (BsonTimestamp) document.get("ts");
                //TODO 在這裡接收到資料後通過訂閱資料路由分發

                String op = document.getString("op");
                String database = document.getString("ns");
                Document context = (Document) document.get("o");
                Document where = null;
                if (op.equals("u")) {
                    where = (Document) document.get("o2");
                    if (context != null) {
                        context = (Document) context.get("$set");
                    }
                }
                System.err.println("操作時間戳:" + queryTs.getTime());
                System.err.println("操作類  型:" + op);
                System.err.println("資料庫.集合:" + database);
                System.err.println("更新條件:" + JSON.toJSONString(where));
                System.err.println("檔案內容:" + JSON.toJSONString(context));
            }
        } catch (Exception e) { e.printStackTrace(); }
    }

結語

上面程式碼只是一個簡單的測試用例,完整的應用還需要考慮ts的記錄更新,事件的抽象,資料的分發等。我們已經開源的binlog訂閱分發專案目前支援資料來源線上管理,訂閱資料(庫、表)線上管理,如果能夠使用同一套管理後臺管理binlog和oplog的訂閱在好不過。要實現和binlog統一管理模型,設定和分發方面基本不需要改動,然後從頂層資料來源方面做區分實現即可。

目前我們整合管理的功能都已經開發好了,關於oplog部分的程式碼還沒提交到github上,後面會和大家相見。

以上就是剖析後OpLog訂閱MongoDB的資料變更就沒那麼難了的詳細內容,更多關於OpLog訂閱MongoDB的資料變更的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com