日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學(xué)無(wú)先后,達(dá)者為師

網(wǎng)站首頁(yè) 編程語(yǔ)言 正文

剖析后OpLog訂閱MongoDB的數(shù)據(jù)變更就沒(méi)那么難了_MongoDB

作者:kl ? 更新時(shí)間: 2022-04-27 編程語(yǔ)言

前言

我們開(kāi)源了一個(gè)訂閱分發(fā)mysql的binlog的項(xiàng)目,一直用的非常好,忽然有天開(kāi)發(fā)說(shuō)能不能支持MongoDB的數(shù)據(jù)訂閱呢,MongoDB的使用度也挺廣泛的。安排。經(jīng)過(guò)簡(jiǎn)單的了解后發(fā)現(xiàn)MongoDB也有類(lèi)似binlog的機(jī)制,最終花了兩天時(shí)間把功能完成,并統(tǒng)一抽象集成到binlog開(kāi)源項(xiàng)目中,使用和binlog同一套訂閱分發(fā)模型管理MongoDB數(shù)據(jù)源。整個(gè)過(guò)程非常順利,比整mysql的binlog要簡(jiǎn)單的多了。

oplog簡(jiǎn)介

先來(lái)聊聊MongoDB的主備機(jī)制,和mysql的binlog類(lèi)似,在MongoDB中,有一個(gè)系統(tǒng)庫(kù)“”Local”,庫(kù)里有一個(gè)集合“oplog.rs”,這個(gè)集合類(lèi)似于binlog文件,里面記錄了MongoDB的所有操作。從節(jié)點(diǎn)通過(guò)讀取oplog.rs里的數(shù)據(jù)做到數(shù)據(jù)同步。

解析oplog

和訂閱mysql的binlog一樣(模擬一個(gè)從節(jié)點(diǎn)mysql)。我們的訂閱服務(wù)要像從節(jié)點(diǎn)那樣讀取解析oplog.rs里的數(shù)據(jù)。解析前先看下oplog.rs的Document的數(shù)據(jù)結(jié)構(gòu)

上圖是一個(gè)插入的數(shù)據(jù)的日志,可見(jiàn)oplog的doc中共有如下字段,含義分別如下:

ts:操作的時(shí)間戳(非常重要)

t:term最初在主數(shù)據(jù)庫(kù)上生成操作的。(含義不明)

h:本次操作的唯一hashID

v: 版本號(hào)

op:操作類(lèi)型,有六種類(lèi)型,我們只需要關(guān)注其中的i(插入)、u(更新)、d(刪除)即可

ns:庫(kù)名和集合名稱(chēng),中間使用“.”連接

o:本次操作的document內(nèi)容

o2:只有op操作類(lèi)型時(shí)u更新時(shí),才會(huì)有這個(gè)字段,代表更新的條件語(yǔ)句

$set:o2獲取后的文檔里的屬性,代表更新的字段

如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以類(lèi)比為binlog中的Position。同步mysql的數(shù)據(jù)時(shí),通過(guò)記錄消費(fèi)binlog的位置,也就是Position,可以有效避免訂閱服務(wù)停機(jī)后,消費(fèi)記錄丟失的問(wèn)題。同步MongoDB時(shí),通過(guò)記錄ts的值,來(lái)記錄消費(fèi)的位置,可以到達(dá)和訂閱binlog一樣的效果。和mysql訂閱不同的是,MongoDB的同步需要同步服務(wù)自己查詢(xún),而且oplog在MongoDB4.0之前的版本有大小限制,超過(guò)設(shè)置的容量后,老的數(shù)據(jù)就會(huì)被丟失,在4.0之后的版本已經(jīng)解除了這個(gè)限制。

代碼

上面已經(jīng)分析了oplog的結(jié)構(gòu)以及訂閱步驟,下面我們直接構(gòu)建查詢(xún)即可,需要注意,每次獲取到的ts值,需要存儲(chǔ)記錄下來(lái),已便重新訂閱時(shí),從上次斷開(kāi)的記錄重新開(kāi)始。下面直接看代碼,重點(diǎn)邏輯都以注釋詳盡

private BsonTimestamp queryTs;
    @Test
    public void OpLogTest() {
        MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://admin:admin@127.0.0.1:3717"));
        MongoCollectioncollection = mongoClient.getDatabase("local")
                .getCollection("oplog.rs");

        //如果是首次訂閱,需要使用自然排序查詢(xún),獲取第最后一次操作的操作時(shí)間戳。如果是續(xù)訂閱直接讀取記錄的值賦值給queryTs即可
        FindIterabletsCursor = collection.find().sort(new BasicDBObject("$natural", -1))
                .limit(1);
        Document tsDoc = tsCursor.first();
        queryTs = (BsonTimestamp) tsDoc.get("ts");
        while (true) try {
            //構(gòu)建查詢(xún)語(yǔ)句,查詢(xún)大于當(dāng)前查詢(xún)時(shí)間戳queryTs的記錄
            BasicDBObject query = new BasicDBObject("ts", new BasicDBObject("$gt", queryTs));
            MongoCursordocCursor = collection.find(query)
                    .cursorType(CursorType.TailableAwait) //沒(méi)有數(shù)據(jù)時(shí)阻塞休眠
                    .noCursorTimeout(true) //防止服務(wù)器在不活動(dòng)時(shí)間(10分鐘)后使空閑的游標(biāo)超時(shí)。
                    .oplogReplay(true) //結(jié)合query條件,獲取增量數(shù)據(jù),這個(gè)參數(shù)比較難懂,見(jiàn):https://docs.mongodb.com/manual/reference/command/find/index.html
                    .maxAwaitTime(1, TimeUnit.SECONDS) //設(shè)置此操作在服務(wù)器上的最大等待執(zhí)行時(shí)間
                    .iterator();
            while (docCursor.hasNext()) {
                Document document = docCursor.next();
                //更新查詢(xún)時(shí)間戳
                queryTs = (BsonTimestamp) document.get("ts");
                //TODO 在這里接收到數(shù)據(jù)后通過(guò)訂閱數(shù)據(jù)路由分發(fā)

                String op = document.getString("op");
                String database = document.getString("ns");
                Document context = (Document) document.get("o");
                Document where = null;
                if (op.equals("u")) {
                    where = (Document) document.get("o2");
                    if (context != null) {
                        context = (Document) context.get("$set");
                    }
                }
                System.err.println("操作時(shí)間戳:" + queryTs.getTime());
                System.err.println("操作類(lèi)  型:" + op);
                System.err.println("數(shù)據(jù)庫(kù).集合:" + database);
                System.err.println("更新條件:" + JSON.toJSONString(where));
                System.err.println("文檔內(nèi)容:" + JSON.toJSONString(context));
            }
        } catch (Exception e) { e.printStackTrace(); }
    }

結(jié)語(yǔ)

上面代碼只是一個(gè)簡(jiǎn)單的測(cè)試用例,完整的應(yīng)用還需要考慮ts的記錄更新,事件的抽象,數(shù)據(jù)的分發(fā)等。我們已經(jīng)開(kāi)源的binlog訂閱分發(fā)項(xiàng)目目前支持?jǐn)?shù)據(jù)源在線管理,訂閱數(shù)據(jù)(庫(kù)、表)在線管理,如果能夠使用同一套管理后臺(tái)管理binlog和oplog的訂閱在好不過(guò)。要實(shí)現(xiàn)和binlog統(tǒng)一管理模型,配置和分發(fā)方面基本不需要改動(dòng),然后從頂層數(shù)據(jù)源方面做區(qū)分實(shí)現(xiàn)即可。

目前我們整合管理的功能都已經(jīng)開(kāi)發(fā)好了,關(guān)于oplog部分的代碼還沒(méi)提交到github上,后面會(huì)和大家相見(jiàn)。

原文鏈接:http://www.kailing.pub/article/index/arcid/274.html

欄目分類(lèi)
最近更新