網站首頁 編程語言 正文
一、前言
介紹我們在前面已經知道ElasticSearch底層的寫入是基于lucence依進行doc寫入的。ElasticSearch作為一款分布式系統,在寫入數據時還需要考慮很多重要的事項,比如:可靠性、原子性、一致性、實時性、隔離性、性能等多個指標。
ElasticSearch是如何做到的呢?下面我們針對ElasticSearch的寫入進行分析。
二、lucence寫
2.1 增刪改
ElasticSearch拿到一個doc后調用lucence的api進行寫入的。
public long addDocument();
public long updateDocuments();
public long deleteDocuments();
如上面的代碼所示,我們使用lucence的上面的接口就可以完成文檔的增刪改操作。在lucence中有一個核心的類IndexWriter負責數據寫入和索引相關的工作。
//1. 初始化indexwriter對象
IndexWriter writer = new IndexWriter(new Directory(Paths.get("/index")), new IndexWriterConfig());
//2. 創建文檔
Document doc = new Document();
doc.add(new StringField("empName", "王某某", Field.Store.YES));
doc.add(new TextField("content", "操作了某菜單", Field.Store.YES));
//3. 添加文檔
writer.addDocument(doc);
//4. 提交
writer.commit();
以上代碼演示了最基礎的lucence的寫入操作,主要涉及到幾個關鍵點: 初始化: Directory是負責持久化的,他的具體實現有很多,有本地文件系統、數據庫、分布式文件系統等待,ElasticSearch默認的實現是本地文件系統。 Document: Document就是es中的文檔,FiledType定義了很多索引類型。這里列舉幾個常見的類型:
- stored: 字段原始內容存儲?
- indexOptions:(NONE/DOCS/DOCS_AND_FREQS/DOCS_AND_FREQS_AND_POSITIONS/DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS),倒排索引的選項,存儲詞頻、位置信息等。
- docValuesType: 正排索引,建立一個docid到field的的一個列存儲。
- 一些其它的類型
IndexWriter:IndexWriter在doc進行commit后,才會被持久化并且是可搜索的。IndexWriterConfig:IndexWriterConfig負責了一些整體的配置參數,并提供了方便使用者進行功能定制的參數:?
- Similarity: 這個是搜索的核心參數,實現了這個接口就能夠進行自定義算分。lucence默認實現了前面文章提到的TF-IDF、BM25算法。?
- MergePolicy: 合并的策略。我們知道ElasticSearch會進行合并,從而減少段的數量。?
- IndexerThreadPool: 線程池的管理。
- FlushPolicy: flush的策略。
- Analyzer: 定制分詞器。
- IndexDeletionPolicy: 提交管理。
PS:在ElasticSearch中,為了支持分布式的功能,新增了一些系統默認字段:
- _uid,主鍵,在寫入的時候,可以指定該Doc的ID值,如果不指定,則系統自動生成一個唯一的UUID值。
- _version,版本字段,version來保證對文檔的變更正確的執行,更新文檔時有用。?
- _source,原始信息,如果后面維護不需要reindex索引可以關閉該字段,從而節省空間?
- _routiong,路由字段。?
- 其它的字段
2.2. 并發模型
上面我們知道indexwriter負責了ElasticSearch索引增刪改查。那它具體是如何管理的呢?
2.2.1. 基本操作
關鍵點:??
- DocumentsWriter處理寫請求,并分配具體的線程DocumentsWriterPerThread
- DocumentsWriterPerThread具有獨立內存空間,對文檔進行處理DocumentsWriter觸發一些flush的操作。
- DocumentsWriterPerThread中的內存In-memory buffer會被flush成獨立的segement文件。?
- 對于這種設計,多線程的寫入,針對純新增文檔的場景,所有數據都不會有沖突,非常適合隔離的數據寫入方式
2.2.2 更新
Lucene的update和數據庫的update不太一樣,Lucene的更新是查詢后刪除再新增。??
- 分配一個操作線程?
- 在線程里執行刪除?
- 在線程里執行新增
2.2.3 刪除
上面已經說了,在update中會刪除,普通的也會刪除,lucence維護了一個全局的刪除表,每個線程也會維護一個刪除表,他們雙向同步數據
- update的刪除會先在內部記錄刪除的數據,然后同步到全局表中。
- delete的刪除會作用在Global級別,后異步同步到線程中。
- Lucene Segment內部,數據實際上其實并不會被真正刪除,Segment內部會維持一個文件記錄,哪些是docid是刪除的,在merge時,相應的doc文檔會被真正的刪除。
2.2.4 flush和commit
每一個WriterPerThread線程會根據flush策略將文檔形成segment文件,此時segment的文件還是不可見的,需要indexWriter進行commit后才能被搜索。?這里需要注意:ElasticSearch的refresh對應于lucene的flush,ElasticSearch的flush對應于lucene的commit,ElasticSearch在refresh時通過其它方式使得segment變得可讀。
2.2.5?merge
merge是對segment文件合并的動作,這樣可以提升查詢的效率并且可以真正的刪除的文檔。
小結
在這里我們稍微總結一下,一個ElasticSearch索引的一個分片對應一個完整的lucene索引, 而一個lucene索引對應多個segment。我們在構建同一個lucene索引的時候, 可能有多個線程在并發構建同一個lucene索引, 這個時候每個線程會對應一個DocumentsWriterPerThread, 而每個 DocumentsWriterPerThread會對應一個index buffer.?在執行了flush以后, 一個 DocumentsWriterPerThread會生成一個segment。
三、?ElasticSearch的寫
3.1. 宏觀看ElasticSearch請求
在前面的文章已經討論了寫入的流程ElasticSearch
圖片來自官網?當寫入文檔的時候,根據routing規則,會將文檔發送至特定的Shard中建立lucence。
- 介紹在Primary Shard上執行成功后,再從Primary Shard上將請求同時發送給多個Replica Shardgit?
- 請求在多個Replica Shard上執行成功并返回給Primary Shard后,寫入請求執行成功,返回結果給客戶端
注意上面的寫入延時=主分片延時+max(Replicas Write),即寫入性能如果有副本分片在,就至少是寫入兩個分片的延時延時之和。
3.2. 詳細流程
3.2.1 協調節點內部流程
如上圖所示:
- 協調節點會對請求檢查放在第一位,如果如果有問題就直接拒絕。主要有長度校驗、必傳參數、類型、版本、id等等。
- pipeline,用戶可以自定義設置處理器,比如可以對字段切割或者新增字段,還支持一些腳本語言,可以查看官方文檔編寫。
- 如果允許自動創建索引(默認是允許的),會先創建索引,創建索引會發送到主節點上,必須等待master成功響應后,才會進入下一流程。
- 請求預處理,比如是否會自動生成id、路由,獲取到整個集群的信息了,并檢查集群狀態,比如集群master不存在,都會被拒絕。
- 構建sharding請求,比如這一批有5個文檔, 如果都是屬于同一個分片的,那么就會合并到一個請求里,會根據路由算法將文檔分類放到一個map里 Map> requestsByShard = new HashMap<>();路由算法默認是文檔id%分片數。
- 轉發請求,有了分片會根據前面的集群狀態來確定具體的ElasticSearch節點ip,然后并行去請求它們。
3.2.2 主分片節點流程*
?寫入(index)
該部分是elasticsarch的核心寫入流程,在前面的文章也介紹了,請求到該節點會最終調用lucence的方法,建立lucence索引。其中主要的關鍵點:
- ElasticSearch節點接收index請求,存入index buffer,同步存入磁盤translog后返回索引結果
- Refresh定時將lucence數據生成segment,存入到操作系統緩存,此時沒有fsync,清空lucence,此時就可以被ElasticSearch查詢了,如果index buffer占滿時,也會觸發refresh,默認為jvm的10%。
- Flush定時將緩存中的segments寫入到磁盤,刪除translog。如果translog滿時(512m),也會觸發flush。
- 如果數據很多,segment的也很多,同時也可能由刪除的文檔,ElasticSearch會定期將它們合并。
update
- 讀取同id的完整Doc, 記錄版本為version1。
- 將version1的doc和update請求的Doc合并成一個Doc,更新內存中的VersionMap。獲取到完整Doc后。進入后續的操作。
- 后面的操作會加鎖。
- 第二次從versionMap中讀取該doc的的最大版本號version2,這里基本都會從versionMap中獲取到。
- 檢查版本是否沖突,判斷版本是否一致(沖突),如果發生沖突,則回到第一步,重新執行查詢doc合并操作。如果不沖突,則執行最新的添加doc請求。
- 介紹在add Doc時,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然后再增加新Doc。寫入Lucene成功后,將當前V3更新到versionMap中。
- 釋放鎖,更新流程就結束了。
介紹其實就是樂觀鎖的機制,每次更新一次版本號加 1 ,不像關系式數據庫有事物,你在更新數據,可能別人也在更新的話,就把你的給覆蓋了。你要更新的時候,先查詢出來,記住版本號,在更新的時候最新的版本號和你查詢的時候不一樣,說明別人先更新了。你應該讀取最新的數據之后再更新。寫成功后,會轉發寫副本分片,等待響應,并最后返回數據給協調節點。具體的流程:
- 校驗,校驗寫的分片是否存在、索引的狀態是否正常等等。
- 是否需要延遲執行,如果是則會放入到隊列里等待。
- 校驗活躍的分片數是否存在,不足則拒絕寫入。
public boolean enoughShardsActive(final int activeShardCount) {
if (this.value < 0) {
throw new IllegalStateException("not enough information to resolve to shard count");
}
if (activeShardCount < 0) {
throw new IllegalArgumentException("activeShardCount cannot be negative");
}
return this.value <= activeShardCount;
}
為什么會要校驗這個活躍的分片數呢?
- ElasticSearch的索引層有個一waitforactiveshards參數代表寫入的時候必須的分片數,默認是1。如果一個索引是每個分片3個副本的話,那么一共有4個分片,請求時至少需要校驗存活的分片數至少為1,相當于提前校驗了。如果對數據的可靠性要求很高,就可以調高這個值,必須要達到這個數量才會寫入。
- 調用lucence寫入doc.
- 寫入translog日志。
- 寫入副本分片,循環處理副本請求,會傳遞一些信息。在這里需要注意,它們是異步發送到副本分片上的,并且需要全部等待響應結果,直至超時。
- 接著上一步,如果有副本分片失敗的情況,會把這個失敗的分片發送給master,master會更新集群狀態,這個副本分片會從可分配列表中移除。?
發送請求至副本
@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}
等待結果
privatevoid decPendingAndFinishIfNeeded() {
assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
if (pendingActions.decrementAndGet() == 0) {
finish();
}
}
在以前的版本中,其實是異步請求副本分片的,后來覺得丟失數據的風險很大,就改成同步發送了,即Primary等Replica返回后再返回給客戶端。如果副本有寫入失敗的,ElasticSearch會進行一些重試,但最終并不強求一定要在多少個節點寫入成功。在返回的結果中,會包含數據在多少個shard中寫入成功了,多少個失敗了,如果有副本上傳失敗,會將失敗的副本上報至Master。
PS:ElasticSearch的數據副本模型和kafka副本很相似,都是采用的是ISR機制。即:ES里面有一個:in-sync copies概念,主分片會在索引的時候會同步數據至in-sync copies里面所有的節點,然后再返回ACK給client。而in-sync copies里面的節點是動態變化的,如果出現極端情況,在in-sync copies列表中只有主分片一個的話,這里很容易出現SPOF問題,這個是在ElasticSearch中是如何解決的呢?
就是依靠上面我們分析的wait_for_active_shards參數來防止SPOF,如果配置index的wait_for_active_shards=3就會提前校驗必須要有三個活躍的分片才會進行同步,否則拒絕請求。對于可靠性要求高的索引可以提升這個值。
PS:為什么是先寫lucence再寫入translog呢,這是因為寫入lucence寫入時會有數據檢查,有可能會寫入失敗,這個是發生在內存之中的,如果先寫入磁盤的translog的話,還需要回退日志,比較麻煩
3.2.3 副本分片節點流程8
這個過程和主分片節點的流程基本一樣,有些校驗可能略微不同,最終都會寫入lucence索引。
四、總結
本文介紹了ElasticSearch的寫入流程和一些比較詳細的機制,最后我們總結下開頭我們提出的問題,一個分布式系統需要滿足很多特性,大部分特性都能夠在ElasticSearch中得到滿足。
- 可靠性:lucence只是個工具,ElasticSearch中通過自己設計的副本來保證了節點的容錯,通過translog日志保證宕機后能夠恢復。通過這兩套機制提供了可靠性保障。
- 一致性:ElasticSearch實現的是最終一致性,副本和主分片在同一時刻讀取的數據可能不一致。比如副本的refresh頻率和主分片的頻率可能不一樣。
- 高性能:ElasticSearch通過多種手段來提升性能,具體包括:
- lucence自身獨立線程維護各自的Segment,多線程需要競爭的資源更少,性能更好。?
- update等操作使用versionMap緩存,減少io.
- refresh至操作系統緩存。
- 原子性、隔離性:使用版本的樂觀鎖機制保證的。
- 實時性:ElasticSearch設計的是近實時的,如果同步進行refresh、flush將大幅降低性能,所以是”攢一部分數據“再刷入磁盤,不過實時寫入的tranlog日志還是可以實時通過id查到的。
原文鏈接:https://juejin.cn/post/7030961140167344159
相關推薦
- 2022-10-01 iOS簡單實現輪播圖效果_IOS
- 2023-12-10 記錄一次多數據源配置失效的情況
- 2022-03-17 Android自定View實現滑動驗證效果的代碼_Android
- 2022-06-10 Docker部署springboot項目到騰訊云的實現步驟_docker
- 2022-02-27 Error in render: “TypeError: Cannot read propertie
- 2022-11-24 Python?Django中間件詳細介紹_python
- 2023-01-08 Flutter開發技巧ListView去除水波紋方法示例_Android
- 2022-09-20 Python處理時間戳和時間計算等的腳本分享_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支