<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Action這一部分主要是資料(索引)的操作和部分叢集資訊操作。 所有的請求通過client轉發到對應的action上然後再由對應的TransportAction來執行相關請求。如果請求能在本機上執行則在本機上執行,否則使用Transport進行轉發到對應的節點。action support部分是對action的抽象,所有的具體action都繼承了support action中的某個類。這裡將對這些抽象類進行分析。
這一部分總共分為broadcast(廣播),master,nodes,replication及single幾個部分。broadcast主要針對一些無具體目標主機的操作,如查詢index是否存在,所有繼承這個類的action都具有這種類似的性質;nodes主要是對節點的操作,如熱點執行緒查詢(hotThread)查詢節點上的繁忙執行緒;replication的子類主要是需要或可以在副本上進行的操作,如索引操作,資料不僅要傳送到主shard還要傳送到各個副本。single則主要是目標明確的單shard操作,如get操作,根據doc的id取doc,doc 的id能夠確定它在哪個shard上,因此操作也在此shard上執行。
這些support action的實現可以分為兩類,第一類就是實現一個內部類作為非同步操作器,子類執行doExecute時,初始化該操作器並啟動。另外一種就是直接實現一個方法,子類doExecute方法呼叫該方法進行。TransportBroadcastOperationAction就屬於前者,它實現了內部操作器AsyncBroadcastAction。TransportCountAction繼承於它,它doExecute方法如下所示:
@Override protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) { request.nowInMillis = System.currentTimeMillis(); super.doExecute(request, listener); }
呼叫父類別的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的實現如下所示:
@Override protected void doExecute(Request request, ActionListener<Response> listener) { new AsyncBroadcastAction(request, listener).start(); }
可以看到它初始化了AsyncBroadcastAction並啟動。AsyncBroadcastAction只是確定了操作的流程,及操作完成如何返回response,並未涉及到具體的操作邏輯。因為這些邏輯都在每個子action中實現,不同的action需要進行不同的操作。如count需要count每個shard並且返回最後的總數值,而IndexExistAction則需要對比所有索引檢視查詢的索引是否存在。start方法的程式碼如下所示:
public void start() { //沒有shards if (shardsIts.size() == 0) { // no shards try { listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState)); } catch (Throwable e) { listener.onFailure(e); } return; } request.beforeStart(); // count the local operations, and perform the non local ones int shardIndex = -1; //遍歷對每個shards進行操作 for (final ShardIterator shardIt : shardsIts) { shardIndex++; final ShardRouting shard = shardIt.nextOrNull(); if (shard != null) { performOperation(shardIt, shard, shardIndex); } else { // really, no shards active in this group onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } } }
start方法就是遍歷所有shards,如果shard存在則執行performOperation方法,在這個方法中會區分該請求能否在本機上進行,能執行則呼叫shardOperation方法得到結果。這個方法在這是抽象的,每個子類都有實現。否則傳送到對應的主機上。,如果shard為null則進行onOperation操作,遍歷該shard的其它副本看能否找到可以操作的shard。
如下所示:
protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) { if (shard == null) {//shard 為null丟擲異常 // no more active shards... (we should not really get here, just safety) onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } else { try { final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request); if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地執行shardOperation方法,並通過onOperation方法封裝結果 threadPool.executor(executor).execute(new Runnable() { @Override public void run() { try { onOperation(shard, shardIndex, shardOperation(shardRequest)); } catch (Throwable e) { onOperation(shard, shardIt, shardIndex, e); } } }); } else {//不是本地shard,傳送到對應節點。 DiscoveryNode node = nodes.get(shard.currentNodeId()); if (node == null) { // no node connected, act as failure onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } else { transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() { @Override public ShardResponse newInstance() { return newShardResponse(); } @Override public String executor() { return ThreadPool.Names.SAME; } @Override public void handleResponse(ShardResponse response) { onOperation(shard, shardIndex, response); } @Override public void handleException(TransportException e) { onOperation(shard, shardIt, shardIndex, e); } }); } } } catch (Throwable e) { onOperation(shard, shardIt, shardIndex, e); } } }
方法shardOperation在countTransportAction的實現如下所示:
@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());// IndexShard indexShard = indexService.shardSafe(request.shardId().id()); //構造查詢context SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id()); SearchContext context = new DefaultSearchContext(0, new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()), shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()); SearchContext.setCurrent(context); try { // TODO: min score should move to be "null" as a value that is not initialized... if (request.minScore() != -1) { context.minimumScore(request.minScore()); } BytesReference source = request.querySource(); if (source != null && source.length() > 0) { try { QueryParseContext.setTypes(request.types()); context.parsedQuery(indexService.queryParserService().parseQuery(source)); } finally { QueryParseContext.removeTypes(); } } final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER; boolean terminatedEarly = false; context.preProcess(); try { long count; if (hasTerminateAfterCount) {//呼叫lucene的封裝介面執行查詢並返回結果 final Lucene.EarlyTerminatingCollector countCollector = Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter()); terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector); count = countCollector.count(); } else { count = Lucene.count(context.searcher(), context.query()); } return new ShardCountResponse(request.shardId(), count, terminatedEarly); } catch (Exception e) { throw new QueryPhaseExecutionException(context, "failed to execute count", e); } } finally { // this will also release the index searcher context.close(); SearchContext.removeCurrent(); } }
可以看到這裡是每個action真正的邏輯實現。因為這裡涉及到index部分的內容,這裡就不詳細分析。後面關於index的分析會有涉及。這就是support action中的第一種實現。
第二種就master的相關操作,因此沒有實現對應的操作類,而只是實現了一個方法。該方法的作用跟操作器作用相同,唯一的不同是它沒有操作器這麼多的變數, 而且它不是非同步的。master的操作需要實時進行,執行過程中需要阻塞某些操作,保證叢集狀態一致性。這裡就不再說明,請參考TransportMasterNodeOperationAction原碼。
本篇概括說了support action,並以countTransportAction為例說明了support Action中的非同步操作器實現,最後簡單的分析了master的同步操作。因為這裡涉及到很多action不可能一一分析,有興趣可以參考對應的程式碼。而且這裡有以下index部分的內容,所以沒有更深入的分析。在後面分析完index的相關功能後,會挑出幾個重要的action做詳細分析。
以上就是elasticsearch資料資訊索引操作action support範例分析的詳細內容,更多關於elasticsearch資料資訊索引操作action support的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45