網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
前言
我們開(kāi)源了一個(gè)訂閱分發(fā)mysql的binlog的項(xiàng)目,一直用的非常好,忽然有天開(kāi)發(fā)說(shuō)能不能支持MongoDB的數(shù)據(jù)訂閱呢,MongoDB的使用度也挺廣泛的。安排。經(jīng)過(guò)簡(jiǎn)單的了解后發(fā)現(xiàn)MongoDB也有類(lèi)似binlog的機(jī)制,最終花了兩天時(shí)間把功能完成,并統(tǒng)一抽象集成到binlog開(kāi)源項(xiàng)目中,使用和binlog同一套訂閱分發(fā)模型管理MongoDB數(shù)據(jù)源。整個(gè)過(guò)程非常順利,比整mysql的binlog要簡(jiǎn)單的多了。
oplog簡(jiǎn)介
先來(lái)聊聊MongoDB的主備機(jī)制,和mysql的binlog類(lèi)似,在MongoDB中,有一個(gè)系統(tǒng)庫(kù)“”Local”,庫(kù)里有一個(gè)集合“oplog.rs”,這個(gè)集合類(lèi)似于binlog文件,里面記錄了MongoDB的所有操作。從節(jié)點(diǎn)通過(guò)讀取oplog.rs里的數(shù)據(jù)做到數(shù)據(jù)同步。
解析oplog
和訂閱mysql的binlog一樣(模擬一個(gè)從節(jié)點(diǎn)mysql)。我們的訂閱服務(wù)要像從節(jié)點(diǎn)那樣讀取解析oplog.rs里的數(shù)據(jù)。解析前先看下oplog.rs的Document的數(shù)據(jù)結(jié)構(gòu)
上圖是一個(gè)插入的數(shù)據(jù)的日志,可見(jiàn)oplog的doc中共有如下字段,含義分別如下:
ts
:操作的時(shí)間戳(非常重要)
t
:term最初在主數(shù)據(jù)庫(kù)上生成操作的。(含義不明)
h
:本次操作的唯一hashID
v
: 版本號(hào)
op
:操作類(lèi)型,有六種類(lèi)型,我們只需要關(guān)注其中的i(插入)、u(更新)、d(刪除)即可
ns
:庫(kù)名和集合名稱(chēng),中間使用“.”連接
o
:本次操作的document內(nèi)容
o2
:只有op操作類(lèi)型時(shí)u更新時(shí),才會(huì)有這個(gè)字段,代表更新的條件語(yǔ)句
$set
:o2獲取后的文檔里的屬性,代表更新的字段
如上字段,完成一次oplog的解析,只需要ts、op、ns、o、o2、$set即可,其中ts非常重要,可以類(lèi)比為binlog中的Position。同步mysql的數(shù)據(jù)時(shí),通過(guò)記錄消費(fèi)binlog的位置,也就是Position,可以有效避免訂閱服務(wù)停機(jī)后,消費(fèi)記錄丟失的問(wèn)題。同步MongoDB時(shí),通過(guò)記錄ts的值,來(lái)記錄消費(fèi)的位置,可以到達(dá)和訂閱binlog一樣的效果。和mysql訂閱不同的是,MongoDB的同步需要同步服務(wù)自己查詢(xún),而且oplog在MongoDB4.0之前的版本有大小限制,超過(guò)設(shè)置的容量后,老的數(shù)據(jù)就會(huì)被丟失,在4.0之后的版本已經(jīng)解除了這個(gè)限制。
代碼
上面已經(jīng)分析了oplog的結(jié)構(gòu)以及訂閱步驟,下面我們直接構(gòu)建查詢(xún)即可,需要注意,每次獲取到的ts值,需要存儲(chǔ)記錄下來(lái),已便重新訂閱時(shí),從上次斷開(kāi)的記錄重新開(kāi)始。下面直接看代碼,重點(diǎn)邏輯都以注釋詳盡
private BsonTimestamp queryTs; @Test public void OpLogTest() { MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb://admin:admin@127.0.0.1:3717")); MongoCollectioncollection = mongoClient.getDatabase("local") .getCollection("oplog.rs"); //如果是首次訂閱,需要使用自然排序查詢(xún),獲取第最后一次操作的操作時(shí)間戳。如果是續(xù)訂閱直接讀取記錄的值賦值給queryTs即可 FindIterabletsCursor = collection.find().sort(new BasicDBObject("$natural", -1)) .limit(1); Document tsDoc = tsCursor.first(); queryTs = (BsonTimestamp) tsDoc.get("ts"); while (true) try { //構(gòu)建查詢(xún)語(yǔ)句,查詢(xún)大于當(dāng)前查詢(xún)時(shí)間戳queryTs的記錄 BasicDBObject query = new BasicDBObject("ts", new BasicDBObject("$gt", queryTs)); MongoCursordocCursor = collection.find(query) .cursorType(CursorType.TailableAwait) //沒(méi)有數(shù)據(jù)時(shí)阻塞休眠 .noCursorTimeout(true) //防止服務(wù)器在不活動(dòng)時(shí)間(10分鐘)后使空閑的游標(biāo)超時(shí)。 .oplogReplay(true) //結(jié)合query條件,獲取增量數(shù)據(jù),這個(gè)參數(shù)比較難懂,見(jiàn):https://docs.mongodb.com/manual/reference/command/find/index.html .maxAwaitTime(1, TimeUnit.SECONDS) //設(shè)置此操作在服務(wù)器上的最大等待執(zhí)行時(shí)間 .iterator(); while (docCursor.hasNext()) { Document document = docCursor.next(); //更新查詢(xún)時(shí)間戳 queryTs = (BsonTimestamp) document.get("ts"); //TODO 在這里接收到數(shù)據(jù)后通過(guò)訂閱數(shù)據(jù)路由分發(fā) String op = document.getString("op"); String database = document.getString("ns"); Document context = (Document) document.get("o"); Document where = null; if (op.equals("u")) { where = (Document) document.get("o2"); if (context != null) { context = (Document) context.get("$set"); } } System.err.println("操作時(shí)間戳:" + queryTs.getTime()); System.err.println("操作類(lèi) 型:" + op); System.err.println("數(shù)據(jù)庫(kù).集合:" + database); System.err.println("更新條件:" + JSON.toJSONString(where)); System.err.println("文檔內(nèi)容:" + JSON.toJSONString(context)); } } catch (Exception e) { e.printStackTrace(); } }
結(jié)語(yǔ)
上面代碼只是一個(gè)簡(jiǎn)單的測(cè)試用例,完整的應(yīng)用還需要考慮ts的記錄更新,事件的抽象,數(shù)據(jù)的分發(fā)等。我們已經(jīng)開(kāi)源的binlog訂閱分發(fā)項(xiàng)目目前支持?jǐn)?shù)據(jù)源在線管理,訂閱數(shù)據(jù)(庫(kù)、表)在線管理,如果能夠使用同一套管理后臺(tái)管理binlog和oplog的訂閱在好不過(guò)。要實(shí)現(xiàn)和binlog統(tǒng)一管理模型,配置和分發(fā)方面基本不需要改動(dòng),然后從頂層數(shù)據(jù)源方面做區(qū)分實(shí)現(xiàn)即可。
目前我們整合管理的功能都已經(jīng)開(kāi)發(fā)好了,關(guān)于oplog部分的代碼還沒(méi)提交到github上,后面會(huì)和大家相見(jiàn)。
原文鏈接:http://www.kailing.pub/article/index/arcid/274.html
相關(guān)推薦
- 2024-04-01 使用Vite安裝TailwindCSS
- 2024-03-24 golang log包自定義輸出日志格式與寫(xiě)入到文件
- 2022-07-11 deepstream 問(wèn)題
- 2022-03-03 【elementUI】el-table 展開(kāi)行默認(rèn)關(guān)閉或打開(kāi)
- 2023-01-17 Golang?map實(shí)現(xiàn)原理淺析_Golang
- 2021-12-08 Linux之操作文件的系統(tǒng)調(diào)用_Linux
- 2023-06-17 Python構(gòu)建區(qū)塊鏈的方法詳解_python
- 2022-06-12 C語(yǔ)言sizeof和strlen的指針和數(shù)組面試題詳解_C 語(yǔ)言
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門(mén)
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支