日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

ElasticSearch寫入流程實例解析_相關技巧

作者:IT巔峰技術 ? 更新時間: 2022-11-01 編程語言

一、前言

介紹我們在前面已經知道ElasticSearch底層的寫入是基于lucence依進行doc寫入的。ElasticSearch作為一款分布式系統,在寫入數據時還需要考慮很多重要的事項,比如:可靠性、原子性、一致性、實時性、隔離性、性能等多個指標。

ElasticSearch是如何做到的呢?下面我們針對ElasticSearch的寫入進行分析。

二、lucence寫

2.1 增刪改

ElasticSearch拿到一個doc后調用lucence的api進行寫入的。

 public long addDocument();
 public long updateDocuments();
 public long deleteDocuments();

如上面的代碼所示,我們使用lucence的上面的接口就可以完成文檔的增刪改操作。在lucence中有一個核心的類IndexWriter負責數據寫入和索引相關的工作。

//1. 初始化indexwriter對象
IndexWriter writer = new IndexWriter(new Directory(Paths.get("/index")), new IndexWriterConfig());
//2. 創建文檔
Document doc = new Document();
doc.add(new StringField("empName", "王某某", Field.Store.YES));
doc.add(new TextField("content", "操作了某菜單", Field.Store.YES));
//3. 添加文檔
writer.addDocument(doc);
//4. 提交
writer.commit();

以上代碼演示了最基礎的lucence的寫入操作,主要涉及到幾個關鍵點: 初始化: Directory是負責持久化的,他的具體實現有很多,有本地文件系統、數據庫、分布式文件系統等待,ElasticSearch默認的實現是本地文件系統。 Document: Document就是es中的文檔,FiledType定義了很多索引類型。這里列舉幾個常見的類型:

  • stored: 字段原始內容存儲?
  • indexOptions:(NONE/DOCS/DOCS_AND_FREQS/DOCS_AND_FREQS_AND_POSITIONS/DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS),倒排索引的選項,存儲詞頻、位置信息等。
  • docValuesType: 正排索引,建立一個docid到field的的一個列存儲。
  • 一些其它的類型

IndexWriter:IndexWriter在doc進行commit后,才會被持久化并且是可搜索的。IndexWriterConfig:IndexWriterConfig負責了一些整體的配置參數,并提供了方便使用者進行功能定制的參數:?

  • Similarity: 這個是搜索的核心參數,實現了這個接口就能夠進行自定義算分。lucence默認實現了前面文章提到的TF-IDF、BM25算法。?
  • MergePolicy: 合并的策略。我們知道ElasticSearch會進行合并,從而減少段的數量。?
  • IndexerThreadPool: 線程池的管理。
  • FlushPolicy: flush的策略。
  • Analyzer: 定制分詞器。
  • IndexDeletionPolicy: 提交管理。

PS:在ElasticSearch中,為了支持分布式的功能,新增了一些系統默認字段:

  • _uid,主鍵,在寫入的時候,可以指定該Doc的ID值,如果不指定,則系統自動生成一個唯一的UUID值。
  • _version,版本字段,version來保證對文檔的變更正確的執行,更新文檔時有用。?
  • _source,原始信息,如果后面維護不需要reindex索引可以關閉該字段,從而節省空間?
  • _routiong,路由字段。?
  • 其它的字段

2.2. 并發模型

上面我們知道indexwriter負責了ElasticSearch索引增刪改查。那它具體是如何管理的呢?

2.2.1. 基本操作

關鍵點:??

  • DocumentsWriter處理寫請求,并分配具體的線程DocumentsWriterPerThread
  • DocumentsWriterPerThread具有獨立內存空間,對文檔進行處理DocumentsWriter觸發一些flush的操作。
  • DocumentsWriterPerThread中的內存In-memory buffer會被flush成獨立的segement文件。?
  • 對于這種設計,多線程的寫入,針對純新增文檔的場景,所有數據都不會有沖突,非常適合隔離的數據寫入方式

2.2.2 更新

Lucene的update和數據庫的update不太一樣,Lucene的更新是查詢后刪除再新增。??

  • 分配一個操作線程?
  • 在線程里執行刪除?
  • 在線程里執行新增

2.2.3 刪除

上面已經說了,在update中會刪除,普通的也會刪除,lucence維護了一個全局的刪除表,每個線程也會維護一個刪除表,他們雙向同步數據

  • update的刪除會先在內部記錄刪除的數據,然后同步到全局表中。
  • delete的刪除會作用在Global級別,后異步同步到線程中。
  • Lucene Segment內部,數據實際上其實并不會被真正刪除,Segment內部會維持一個文件記錄,哪些是docid是刪除的,在merge時,相應的doc文檔會被真正的刪除。

2.2.4 flush和commit

每一個WriterPerThread線程會根據flush策略將文檔形成segment文件,此時segment的文件還是不可見的,需要indexWriter進行commit后才能被搜索。?這里需要注意:ElasticSearch的refresh對應于lucene的flush,ElasticSearch的flush對應于lucene的commit,ElasticSearch在refresh時通過其它方式使得segment變得可讀。

2.2.5?merge

merge是對segment文件合并的動作,這樣可以提升查詢的效率并且可以真正的刪除的文檔。

小結

在這里我們稍微總結一下,一個ElasticSearch索引的一個分片對應一個完整的lucene索引, 而一個lucene索引對應多個segment。我們在構建同一個lucene索引的時候, 可能有多個線程在并發構建同一個lucene索引, 這個時候每個線程會對應一個DocumentsWriterPerThread, 而每個 DocumentsWriterPerThread會對應一個index buffer.?在執行了flush以后, 一個 DocumentsWriterPerThread會生成一個segment。

三、?ElasticSearch的寫

3.1. 宏觀看ElasticSearch請求

在前面的文章已經討論了寫入的流程ElasticSearch

圖片來自官網?當寫入文檔的時候,根據routing規則,會將文檔發送至特定的Shard中建立lucence。

  • 介紹在Primary Shard上執行成功后,再從Primary Shard上將請求同時發送給多個Replica Shardgit?
  • 請求在多個Replica Shard上執行成功并返回給Primary Shard后,寫入請求執行成功,返回結果給客戶端

注意上面的寫入延時=主分片延時+max(Replicas Write),即寫入性能如果有副本分片在,就至少是寫入兩個分片的延時延時之和。

3.2. 詳細流程

3.2.1 協調節點內部流程

如上圖所示:

  • 協調節點會對請求檢查放在第一位,如果如果有問題就直接拒絕。主要有長度校驗、必傳參數、類型、版本、id等等。
  • pipeline,用戶可以自定義設置處理器,比如可以對字段切割或者新增字段,還支持一些腳本語言,可以查看官方文檔編寫。
  • 如果允許自動創建索引(默認是允許的),會先創建索引,創建索引會發送到主節點上,必須等待master成功響應后,才會進入下一流程。
  • 請求預處理,比如是否會自動生成id、路由,獲取到整個集群的信息了,并檢查集群狀態,比如集群master不存在,都會被拒絕。
  • 構建sharding請求,比如這一批有5個文檔, 如果都是屬于同一個分片的,那么就會合并到一個請求里,會根據路由算法將文檔分類放到一個map里 Map> requestsByShard = new HashMap<>();路由算法默認是文檔id%分片數。
  • 轉發請求,有了分片會根據前面的集群狀態來確定具體的ElasticSearch節點ip,然后并行去請求它們。

3.2.2 主分片節點流程*

?寫入(index)

該部分是elasticsarch的核心寫入流程,在前面的文章也介紹了,請求到該節點會最終調用lucence的方法,建立lucence索引。其中主要的關鍵點:

  • ElasticSearch節點接收index請求,存入index buffer,同步存入磁盤translog后返回索引結果
  • Refresh定時將lucence數據生成segment,存入到操作系統緩存,此時沒有fsync,清空lucence,此時就可以被ElasticSearch查詢了,如果index buffer占滿時,也會觸發refresh,默認為jvm的10%。
  • Flush定時將緩存中的segments寫入到磁盤,刪除translog。如果translog滿時(512m),也會觸發flush。
  • 如果數據很多,segment的也很多,同時也可能由刪除的文檔,ElasticSearch會定期將它們合并。

update

  • 讀取同id的完整Doc, 記錄版本為version1。
  • 將version1的doc和update請求的Doc合并成一個Doc,更新內存中的VersionMap。獲取到完整Doc后。進入后續的操作。
  • 后面的操作會加鎖。
  • 第二次從versionMap中讀取該doc的的最大版本號version2,這里基本都會從versionMap中獲取到。
  • 檢查版本是否沖突,判斷版本是否一致(沖突),如果發生沖突,則回到第一步,重新執行查詢doc合并操作。如果不沖突,則執行最新的添加doc請求。
  • 介紹在add Doc時,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然后再增加新Doc。寫入Lucene成功后,將當前V3更新到versionMap中。
  • 釋放鎖,更新流程就結束了。

介紹其實就是樂觀鎖的機制,每次更新一次版本號加 1 ,不像關系式數據庫有事物,你在更新數據,可能別人也在更新的話,就把你的給覆蓋了。你要更新的時候,先查詢出來,記住版本號,在更新的時候最新的版本號和你查詢的時候不一樣,說明別人先更新了。你應該讀取最新的數據之后再更新。寫成功后,會轉發寫副本分片,等待響應,并最后返回數據給協調節點。具體的流程:

  • 校驗,校驗寫的分片是否存在、索引的狀態是否正常等等。
  • 是否需要延遲執行,如果是則會放入到隊列里等待。
  • 校驗活躍的分片數是否存在,不足則拒絕寫入。
public boolean enoughShardsActive(final int activeShardCount) {
  if (this.value < 0) {
    throw new IllegalStateException("not enough information to resolve to shard count");
  }
  if (activeShardCount < 0) {
    throw new IllegalArgumentException("activeShardCount cannot be negative");
  }
  return this.value <= activeShardCount;
}

為什么會要校驗這個活躍的分片數呢?

  • ElasticSearch的索引層有個一waitforactiveshards參數代表寫入的時候必須的分片數,默認是1。如果一個索引是每個分片3個副本的話,那么一共有4個分片,請求時至少需要校驗存活的分片數至少為1,相當于提前校驗了。如果對數據的可靠性要求很高,就可以調高這個值,必須要達到這個數量才會寫入。
  • 調用lucence寫入doc.
  • 寫入translog日志。
  • 寫入副本分片,循環處理副本請求,會傳遞一些信息。在這里需要注意,它們是異步發送到副本分片上的,并且需要全部等待響應結果,直至超時。
  • 接著上一步,如果有副本分片失敗的情況,會把這個失敗的分片發送給master,master會更新集群狀態,這個副本分片會從可分配列表中移除。?

發送請求至副本

@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
  replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}

等待結果

privatevoid decPendingAndFinishIfNeeded() {
  assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
  if (pendingActions.decrementAndGet() == 0) {
    finish();
  }
}

在以前的版本中,其實是異步請求副本分片的,后來覺得丟失數據的風險很大,就改成同步發送了,即Primary等Replica返回后再返回給客戶端。如果副本有寫入失敗的,ElasticSearch會進行一些重試,但最終并不強求一定要在多少個節點寫入成功。在返回的結果中,會包含數據在多少個shard中寫入成功了,多少個失敗了,如果有副本上傳失敗,會將失敗的副本上報至Master。

PS:ElasticSearch的數據副本模型和kafka副本很相似,都是采用的是ISR機制。即:ES里面有一個:in-sync copies概念,主分片會在索引的時候會同步數據至in-sync copies里面所有的節點,然后再返回ACK給client。而in-sync copies里面的節點是動態變化的,如果出現極端情況,在in-sync copies列表中只有主分片一個的話,這里很容易出現SPOF問題,這個是在ElasticSearch中是如何解決的呢?

就是依靠上面我們分析的wait_for_active_shards參數來防止SPOF,如果配置index的wait_for_active_shards=3就會提前校驗必須要有三個活躍的分片才會進行同步,否則拒絕請求。對于可靠性要求高的索引可以提升這個值。

PS:為什么是先寫lucence再寫入translog呢,這是因為寫入lucence寫入時會有數據檢查,有可能會寫入失敗,這個是發生在內存之中的,如果先寫入磁盤的translog的話,還需要回退日志,比較麻煩

3.2.3 副本分片節點流程8

這個過程和主分片節點的流程基本一樣,有些校驗可能略微不同,最終都會寫入lucence索引。

四、總結

本文介紹了ElasticSearch的寫入流程和一些比較詳細的機制,最后我們總結下開頭我們提出的問題,一個分布式系統需要滿足很多特性,大部分特性都能夠在ElasticSearch中得到滿足。

  • 可靠性:lucence只是個工具,ElasticSearch中通過自己設計的副本來保證了節點的容錯,通過translog日志保證宕機后能夠恢復。通過這兩套機制提供了可靠性保障。
  • 一致性:ElasticSearch實現的是最終一致性,副本和主分片在同一時刻讀取的數據可能不一致。比如副本的refresh頻率和主分片的頻率可能不一樣。
  • 高性能:ElasticSearch通過多種手段來提升性能,具體包括:
  • lucence自身獨立線程維護各自的Segment,多線程需要競爭的資源更少,性能更好。?
  • update等操作使用versionMap緩存,減少io.
  • refresh至操作系統緩存。
  • 原子性、隔離性:使用版本的樂觀鎖機制保證的。
  • 實時性:ElasticSearch設計的是近實時的,如果同步進行refresh、flush將大幅降低性能,所以是”攢一部分數據“再刷入磁盤,不過實時寫入的tranlog日志還是可以實時通過id查到的。

原文鏈接:https://juejin.cn/post/7030961140167344159

欄目分類
最近更新