首頁 > 軟體

ElasticSearch寫入流程範例解析

2022-09-08 18:04:40

一、前言

介紹我們在前面已經知道ElasticSearch底層的寫入是基於lucence依進行doc寫入的。ElasticSearch作為一款分散式系統,在寫入資料時還需要考慮很多重要的事項,比如:可靠性、原子性、一致性、實時性、隔離性、效能等多個指標。

ElasticSearch是如何做到的呢?下面我們針對ElasticSearch的寫入進行分析。

二、lucence寫

2.1 增刪改

ElasticSearch拿到一個doc後呼叫lucence的api進行寫入的。

 public long addDocument();
 public long updateDocuments();
 public long deleteDocuments();

如上面的程式碼所示,我們使用lucence的上面的介面就可以完成檔案的增刪改操作。在lucence中有一個核心的類IndexWriter負責資料寫入和索引相關的工作。

//1. 初始化indexwriter物件
IndexWriter writer = new IndexWriter(new Directory(Paths.get("/index")), new IndexWriterConfig());
//2. 建立檔案
Document doc = new Document();
doc.add(new StringField("empName", "王某某", Field.Store.YES));
doc.add(new TextField("content", "操作了某選單", Field.Store.YES));
//3. 新增檔案
writer.addDocument(doc);
//4. 提交
writer.commit();

以上程式碼演示了最基礎的lucence的寫入操作,主要涉及到幾個關鍵點: 初始化: Directory是負責持久化的,他的具體實現有很多,有本地檔案系統、資料庫、分散式檔案系統等待,ElasticSearch預設的實現是本地檔案系統。 Document: Document就是es中的檔案,FiledType定義了很多索引型別。這裡列舉幾個常見的型別:

  • stored: 欄位原始內容儲存 
  • indexOptions:(NONE/DOCS/DOCS_AND_FREQS/DOCS_AND_FREQS_AND_POSITIONS/DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS),倒排索引的選項,儲存詞頻、位置資訊等。
  • docValuesType: 正排索引,建立一個docid到field的的一個列儲存。
  • 一些其它的型別

IndexWriter:IndexWriter在doc進行commit後,才會被持久化並且是可搜尋的。IndexWriterConfig:IndexWriterConfig負責了一些整體的設定引數,並提供了方便使用者進行功能客製化的引數: 

  • Similarity: 這個是搜尋的核心引數,實現了這個介面就能夠進行自定義算分。lucence預設實現了前面文章提到的TF-IDF、BM25演演算法。 
  • MergePolicy: 合併的策略。我們知道ElasticSearch會進行合併,從而減少段的數量。 
  • IndexerThreadPool: 執行緒池的管理。
  • FlushPolicy: flush的策略。
  • Analyzer: 客製化分詞器。
  • IndexDeletionPolicy: 提交管理。

PS:在ElasticSearch中,為了支援分散式的功能,新增了一些系統預設欄位:

  • _uid,主鍵,在寫入的時候,可以指定該Doc的ID值,如果不指定,則系統自動生成一個唯一的UUID值。
  • _version,版本欄位,version來保證對檔案的變更正確的執行,更新檔案時有用。 
  • _source,原始資訊,如果後面維護不需要reindex索引可以關閉該欄位,從而節省空間 
  • _routiong,路由欄位。 
  • 其它的欄位

2.2. 並行模型

上面我們知道indexwriter負責了ElasticSearch索引增刪改查。那它具體是如何管理的呢?

2.2.1. 基本操作

關鍵點:  

  • DocumentsWriter處理寫請求,並分配具體的執行緒DocumentsWriterPerThread
  • DocumentsWriterPerThread具有獨立記憶體空間,對檔案進行處理DocumentsWriter觸發一些flush的操作。
  • DocumentsWriterPerThread中的記憶體In-memory buffer會被flush成獨立的segement檔案。 
  • 對於這種設計,多執行緒的寫入,針對純新增檔案的場景,所有資料都不會有衝突,非常適合隔離的資料寫入方式

2.2.2 更新

Lucene的update和資料庫的update不太一樣,Lucene的更新是查詢後刪除再新增。  

  • 分配一個操作執行緒 
  • 線上程裡執行刪除 
  • 線上程裡執行新增

2.2.3 刪除

上面已經說了,在update中會刪除,普通的也會刪除,lucence維護了一個全域性的刪除表,每個執行緒也會維護一個刪除表,他們雙向同步資料

  • update的刪除會先在內部記錄刪除的資料,然後同步到全域性表中。
  • delete的刪除會作用在Global級別,後非同步同步到執行緒中。
  • Lucene Segment內部,資料實際上其實並不會被真正刪除,Segment內部會維持一個檔案記錄,哪些是docid是刪除的,在merge時,相應的doc檔案會被真正的刪除。

2.2.4 flush和commit

每一個WriterPerThread執行緒會根據flush策略將檔案形成segment檔案,此時segment的檔案還是不可見的,需要indexWriter進行commit後才能被搜尋。 這裡需要注意:ElasticSearch的refresh對應於lucene的flush,ElasticSearch的flush對應於lucene的commit,ElasticSearch在refresh時通過其它方式使得segment變得可讀。

2.2.5 merge

merge是對segment檔案合併的動作,這樣可以提升查詢的效率並且可以真正的刪除的檔案。

小結

在這裡我們稍微總結一下,一個ElasticSearch索引的一個分片對應一個完整的lucene索引, 而一個lucene索引對應多個segment。我們在構建同一個lucene索引的時候, 可能有多個執行緒在並行構建同一個lucene索引, 這個時候每個執行緒會對應一個DocumentsWriterPerThread, 而每個 DocumentsWriterPerThread會對應一個index buffer. 在執行了flush以後, 一個 DocumentsWriterPerThread會生成一個segment。

三、 ElasticSearch的寫

3.1. 宏觀看ElasticSearch請求

在前面的文章已經討論了寫入的流程ElasticSearch

圖片來自官網 當寫入檔案的時候,根據routing規則,會將檔案傳送至特定的Shard中建立lucence。

  • 介紹在Primary Shard上執行成功後,再從Primary Shard上將請求同時傳送給多個Replica Shardgit 
  • 請求在多個Replica Shard上執行成功並返回給Primary Shard後,寫入請求執行成功,返回結果給使用者端

注意上面的寫入延時=主分片延時+max(Replicas Write),即寫入效能如果有副本分片在,就至少是寫入兩個分片的延時延時之和。

3.2. 詳細流程

3.2.1 協調節點內部流程

如上圖所示:

  • 協調節點會對請求檢查放在第一位,如果如果有問題就直接拒絕。主要有長度校驗、必傳引數、型別、版本、id等等。
  • pipeline,使用者可以自定義設定處理器,比如可以對欄位切割或者新增欄位,還支援一些指令碼語言,可以檢視官方檔案編寫。
  • 如果允許自動建立索引(預設是允許的),會先建立索引,建立索引會傳送到主節點上,必須等待master成功響應後,才會進入下一流程。
  • 請求預處理,比如是否會自動生成id、路由,獲取到整個叢集的資訊了,並檢查叢集狀態,比如叢集master不存在,都會被拒絕。
  • 構建sharding請求,比如這一批有5個檔案, 如果都是屬於同一個分片的,那麼就會合併到一個請求裡,會根據路由演演算法將檔案分類放到一個map裡 Map> requestsByShard = new HashMap<>();路由演演算法預設是檔案id%分片數。
  • 轉發請求,有了分片會根據前面的叢集狀態來確定具體的ElasticSearch節點ip,然後並行去請求它們。

3.2.2 主分片節點流程*

 寫入(index)

該部分是elasticsarch的核心寫入流程,在前面的文章也介紹了,請求到該節點會最終呼叫lucence的方法,建立lucence索引。其中主要的關鍵點:

  • ElasticSearch節點接收index請求,存入index buffer,同步存入磁碟translog後返回索引結果
  • Refresh定時將lucence資料生成segment,存入到作業系統快取,此時沒有fsync,清空lucence,此時就可以被ElasticSearch查詢了,如果index buffer佔滿時,也會觸發refresh,預設為jvm的10%。
  • Flush定時將快取中的segments寫入到磁碟,刪除translog。如果translog滿時(512m),也會觸發flush。
  • 如果資料很多,segment的也很多,同時也可能由刪除的檔案,ElasticSearch會定期將它們合併。

update

  • 讀取同id的完整Doc, 記錄版本為version1。
  • 將version1的doc和update請求的Doc合併成一個Doc,更新記憶體中的VersionMap。獲取到完整Doc後。進入後續的操作。
  • 後面的操作會加鎖。
  • 第二次從versionMap中讀取該doc的的最大版本號version2,這裡基本都會從versionMap中獲取到。
  • 檢查版本是否衝突,判斷版本是否一致(衝突),如果發生衝突,則回到第一步,重新執行查詢doc合併操作。如果不衝突,則執行最新的新增doc請求。
  • 介紹在add Doc時,首先將Version + 1得到V3,再將Doc加入到Lucene中去,Lucene中會先刪同id下的已存在doc id,然後再增加新Doc。寫入Lucene成功後,將當前V3更新到versionMap中。
  • 釋放鎖,更新流程就結束了。

介紹其實就是樂觀鎖的機制,每次更新一次版本號加 1 ,不像關係式資料庫有事物,你在更新資料,可能別人也在更新的話,就把你的給覆蓋了。你要更新的時候,先查詢出來,記住版本號,在更新的時候最新的版本號和你查詢的時候不一樣,說明別人先更新了。你應該讀取最新的資料之後再更新。寫成功後,會轉發寫副本分片,等待響應,並最後返回資料給協調節點。具體的流程:

  • 校驗,校驗寫的分片是否存在、索引的狀態是否正常等等。
  • 是否需要延遲執行,如果是則會放入到佇列裡等待。
  • 校驗活躍的分片數是否存在,不足則拒絕寫入。
public boolean enoughShardsActive(final int activeShardCount) {
  if (this.value < 0) {
    throw new IllegalStateException("not enough information to resolve to shard count");
  }
  if (activeShardCount < 0) {
    throw new IllegalArgumentException("activeShardCount cannot be negative");
  }
  return this.value <= activeShardCount;
}

為什麼會要校驗這個活躍的分片數呢?

  • ElasticSearch的索引層有個一waitforactiveshards引數代表寫入的時候必須的分片數,預設是1。如果一個索引是每個分片3個副本的話,那麼一共有4個分片,請求時至少需要校驗存活的分片數至少為1,相當於提前校驗了。如果對資料的可靠性要求很高,就可以調高這個值,必須要達到這個數量才會寫入。
  • 呼叫lucence寫入doc.
  • 寫入translog紀錄檔。
  • 寫入副本分片,迴圈處理副本請求,會傳遞一些資訊。在這裡需要注意,它們是非同步傳送到副本分片上的,並且需要全部等待響應結果,直至超時。
  • 接著上一步,如果有副本分片失敗的情況,會把這個失敗的分片傳送給master,master會更新叢集狀態,這個副本分片會從可分配列表中移除。 

傳送請求至副本

@Override
public void tryAction(ActionListener<ReplicaResponse> listener) {
  replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener);
}

等待結果

privatevoid decPendingAndFinishIfNeeded() {
  assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
  if (pendingActions.decrementAndGet() == 0) {
    finish();
  }
}

在以前的版本中,其實是非同步請求副本分片的,後來覺得丟失資料的風險很大,就改成同步傳送了,即Primary等Replica返回後再返回給使用者端。如果副本有寫入失敗的,ElasticSearch會進行一些重試,但最終並不強求一定要在多少個節點寫入成功。在返回的結果中,會包含資料在多少個shard中寫入成功了,多少個失敗了,如果有副本上傳失敗,會將失敗的副本上報至Master。

PS:ElasticSearch的資料副本模型和kafka副本很相似,都是採用的是ISR機制。即:ES裡面有一個:in-sync copies概念,主分片會在索引的時候會同步資料至in-sync copies裡面所有的節點,然後再返回ACK給client。而in-sync copies裡面的節點是動態變化的,如果出現極端情況,在in-sync copies列表中只有主分片一個的話,這裡很容易出現SPOF問題,這個是在ElasticSearch中是如何解決的呢?

就是依靠上面我們分析的wait_for_active_shards引數來防止SPOF,如果設定index的wait_for_active_shards=3就會提前校驗必須要有三個活躍的分片才會進行同步,否則拒絕請求。對於可靠性要求高的索引可以提升這個值。

PS:為什麼是先寫lucence再寫入translog呢,這是因為寫入lucence寫入時會有資料檢查,有可能會寫入失敗,這個是發生在記憶體之中的,如果先寫入磁碟的translog的話,還需要回退紀錄檔,比較麻煩

3.2.3 副本分片節點流程8

這個過程和主分片節點的流程基本一樣,有些校驗可能略微不同,最終都會寫入lucence索引。

四、總結

本文介紹了ElasticSearch的寫入流程和一些比較詳細的機制,最後我們總結下開頭我們提出的問題,一個分散式系統需要滿足很多特性,大部分特性都能夠在ElasticSearch中得到滿足。

  • 可靠性:lucence只是個工具,ElasticSearch中通過自己設計的副本來保證了節點的容錯,通過translog紀錄檔保證宕機後能夠恢復。通過這兩套機制提供了可靠性保障。
  • 一致性:ElasticSearch實現的是最終一致性,副本和主分片在同一時刻讀取的資料可能不一致。比如副本的refresh頻率和主分片的頻率可能不一樣。
  • 高效能:ElasticSearch通過多種手段來提升效能,具體包括:
  • lucence自身獨立執行緒維護各自的Segment,多執行緒需要競爭的資源更少,效能更好。 
  • update等操作使用versionMap快取,減少io.
  • refresh至作業系統快取。
  • 原子性、隔離性:使用版本的樂觀鎖機制保證的。
  • 實時性:ElasticSearch設計的是近實時的,如果同步進行refresh、flush將大幅降低效能,所以是”攢一部分資料“再刷入磁碟,不過實時寫入的tranlog紀錄檔還是可以實時通過id查到的。

以上就是ElasticSearch寫入流程範例解析的詳細內容,更多關於ElasticSearch寫入流程的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com