<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
上一篇從結構上分析了action的,本篇將以index action為例仔分析一下action的實現方式。
再概括一下action的作用:對於每種功能(如index)action都會包括兩個基本的類*action(IndexAction)和Transport*action(TransportIndexAction),前者類中會有一個範例(IndexAction INSTANCE = new IndexAction())這個範例用於client繫結對應的TransportAction(registerAction(IndexAction.INSTANCE, TransportIndexAction.class)),繫結過程傳送在ActionModuel中。
另外在Action類中還會定義一個action的名字(String NAME = "indices:data/write/index")這個名字用於TransportService繫結對於的handle,用於處理NettyTransport接收到的資訊。TransportAction的是最終的邏輯處理者,當接收到請求時,會首先判斷本節點能否處理,如果能夠處理則呼叫相關的方法處理得到結果返回,否則將通過NettyTransport轉發該請求到對應的node進行處理。所有的Transport的結構都是這種型別。
首先看一下TransportAction的類圖,所的Transport*action都繼承自於它。
它主要由兩個方法execute和doExecute,execute方法有兩種實現,第一種實現需要自行新增actionListener。最終的邏輯都在doExecute方法中,這個方法在各個功能模組中實現。以下是TransportIndexAction的繼承關係:
實現上由於功能劃分的原因,TransportIndexAction直接繼承自TranspShardReplicationOperationAction,這個抽象類中的方法是所有需要操作shard副本的功能action的父,因此它的實現還包括delete,bulk等功能action。它實現了多個內部類,這些內部類用來輔助完成相關的功能。這裡主要說一下OperationTransportHandler,ReplicaOperationTransportHandler及AsyncShardOperationAction三個子類。
如下所示:
class OperationTransportHandler extends BaseTransportRequestHandler<Request> { //繼承自BaseTransportRequestHanlder ……………… @Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception { // no need to have a threaded listener since we just send back a response request.listenerThreaded(false); // if we have a local operation, execute it on a thread since we don't spawn request.operationThreaded(true); //呼叫Transport的execute方法,通過channel返回結果 execute(request, new ActionListener<Response>() { @Override public void onResponse(Response result) { try { channel.sendResponse(result); } catch (Throwable e) { onFailure(e); } } @Override public void onFailure(Throwable e) { try { channel.sendResponse(e); } catch (Throwable e1) { logger.warn("Failed to send response for " + actionName, e1); } } }); }
看過NettyTransport請求傳送和處理的同學一定對這個程式碼不陌生,這就是elasticsearch節點間處理資訊的典型模式。當請求通過NettyTransport傳送到本節點時會根據請求的action名稱找到對應的handler,使用對應的handler來處理該請求。這個handler就對應著“indices:data/write/index”,可以看到它呼叫execute方法來處理。它的註冊時在TransportShardReplicationOperationAction建構函式中完成的。
知道了OperationTransportHandler,ReplicaOperationTransportHandler就好理解了它的實現方式跟前者完全一樣,對應的action名稱加了一個“[r]”,它的作用是處理需要在副本上進行的操作,程式碼如下所示:
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> { …………………… @Override public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception { try { shardOperationOnReplica(request); } catch (Throwable t) { failReplicaIfNeeded(request.shardId.getIndex(), request.shardId.id(), t); throw t; } channel.sendResponse(TransportResponse.Empty.INSTANCE); } }
可以看到程式碼結構非常像,只是呼叫了副本操作的方法shardOperationOnReplica,這個方法在這TransportShardReplicationOperationAction中是抽象的,它的實現在各個子類中,例如deleteaction中實現了對於delete請求如何在副本上處理。
分析完這兩個handle是不是對於action的處理過程有了一定的眉目了呢?但是這才是冰山一角,這兩個Handler是用來接收來自其它節點的請求,如果請求的正好是本節點該如何處理呢?這些邏輯都在AsyncShardOperationAction類中。首先看一下它的內部結構:
因為TransportShardReplicationOperationAction的所有子類都是對索引的修改,會引起資料不一致,因此它的操作流程都是現在primaryShard上操作然後是Replicashard上操作。程式碼如下所示:
protected void doStart() throws ElasticsearchException { try { //檢查是否有阻塞 ClusterBlockException blockException = checkGlobalBlock(observer.observedState()); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); retry(blockException); return; } else { throw blockException; } } //檢測是否是建立索引 if (resolveIndex()) { internalRequest.concreteIndex(observer.observedState().metaData().concreteSingleIndex(internalRequest.request().index(), internalRequest.request().indicesOptions())); } else { internalRequest.concreteIndex(internalRequest.request().index()); } // check if we need to execute, and if not, return if (!resolveRequest(observer.observedState(), internalRequest, listener)) { return; } //再次檢測是否有阻塞 blockException = checkRequestBlock(observer.observedState(), internalRequest); if (blockException != null) { if (blockException.retryable()) { logger.trace("cluster is blocked ({}), scheduling a retry", blockException.getMessage()); retry(blockException); return; } else { throw blockException; } } shardIt = shards(observer.observedState(), internalRequest); } catch (Throwable e) { listener.onFailure(e); return; } //查詢primaryShard boolean foundPrimary = false; ShardRouting shardX; while ((shardX = shardIt.nextOrNull()) != null) { final ShardRouting shard = shardX; // we only deal with primary shardIt here... if (!shard.primary()) { continue; } if (!shard.active() || !observer.observedState().nodes().nodeExists(shard.currentNodeId())) { logger.trace("primary shard [{}] is not yet active or we do not know the node it is assigned to [{}], scheduling a retry.", shard.shardId(), shard.currentNodeId()); retryBecauseUnavailable(shardIt.shardId(), "Primary shard is not active or isn't assigned to a known node."); return; } if (!primaryOperationStarted.compareAndSet(false, true)) { return; } foundPrimary = true; //primaryShard就在本地,直接進行相關操作 if (shard.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { try { if (internalRequest.request().operationThreaded()) { internalRequest.request().beforeLocalFork(); threadPool.executor(executor).execute(new Runnable() { @Override public void run() { try { performOnPrimary(shard.id(), shard); } catch (Throwable t) { listener.onFailure(t); } } }); } else { performOnPrimary(shard.id(), shard); } } catch (Throwable t) { listener.onFailure(t); } } else {//primaryShard在其它節點上,將請求通過truansport傳送到對應的節點。 DiscoveryNode node = observer.observedState().nodes().get(shard.currentNodeId()); transportService.sendRequest(node, actionName, internalRequest.request(), transportOptions, new BaseTransportResponseHandler<Response>() { @Override public Response newInstance() { return newResponseInstance(); } @Override public String executor() { return ThreadPool.Names.SAME; } @Override public void handleResponse(Response response) { listener.onResponse(response); } @Override public void handleException(TransportException exp) { // if we got disconnected from the node, or the node / shard is not in the right state (being closed) if (exp.unwrapCause() instanceof ConnectTransportException || exp.unwrapCause() instanceof NodeClosedException || retryPrimaryException(exp)) { primaryOperationStarted.set(false); internalRequest.request().setCanHaveDuplicates(); // we already marked it as started when we executed it (removed the listener) so pass false // to re-add to the cluster listener logger.trace("received an error from node the primary was assigned to ({}), scheduling a retry", exp.getMessage()); retry(exp); } else { listener.onFailure(exp); } } }); } break; } ……………… }
這就是對應請求的處理過程。
void performOnPrimary(int primaryShardId, final ShardRouting shard) { …… PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, internalRequest.concreteIndex(), internalRequest.request())); performReplicas(response); ………… }
以上就是performOnPrimary方法的部分程式碼,首先呼叫外部類的shardOperationOnPrimary方法,該方法實現在各個子類中,在TransportIndexAction中的實現如下所示:
@Override protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable { final IndexRequest request = shardRequest.request; // 檢視是否需要routing IndexMetaData indexMetaData = clusterState.metaData().index(shardRequest.shardId.getIndex()); MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type()); if (mappingMd != null && mappingMd.routing().required()) { if (request.routing() == null) { throw new RoutingMissingException(shardRequest.shardId.getIndex(), request.type(), request.id()); } } //呼叫indexserice執行對應的index操作 IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()); IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id()); SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl()); long version; boolean created; try { Engine.IndexingOperation op; if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates()); if (index.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), index.docMapper(), indexService.indexUUID()); } indexShard.index(index); version = index.version(); op = index; created = index.created(); } else { Engine.Create create = indexShard.prepareCreate(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId()); if (create.parsedDoc().mappingsModified()) { mappingUpdatedAction.updateMappingOnMaster(shardRequest.shardId.getIndex(), create.docMapper(), indexService.indexUUID()); } indexShard.create(create); version = create.version(); op = create; created = true; } if (request.refresh()) { try { indexShard.refresh("refresh_flag_index"); } catch (Throwable e) { // ignore } } // update the version on the request, so it will be used for the replicas request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); assert request.versionType().validateVersionForWrites(request.version()); IndexResponse response = new IndexResponse(shardRequest.shardId.getIndex(), request.type(), request.id(), version, created); return new PrimaryResponse<>(shardRequest.request, response, op); } catch (WriteFailureException e) { if (e.getMappingTypeToUpdate() != null) { DocumentMapper docMapper = indexService.mapperService().documentMapper(e.getMappingTypeToUpdate()); if (docMapper != null) { mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), docMapper, indexService.indexUUID()); } } throw e.getCause(); } }
上面的程式碼就是index的執行過程,這一過程涉及到index的底層操作,這裡就不展開,只是說明它在action中是如何實現的,後面會有詳細說明。接下來看在副本上的操作。副本可能有多個,因此首先呼叫了performReplicas方法,在這個方法中首先開始監聽叢集的狀態,然後便利所有的副本進行處理,如果是非同步則加入一個listener,否則同步執行返回結果。最後呼叫performReplica,在該方法中呼叫外部類的抽象方法shardOperationOnReplica。 這一過程比較簡單,這裡就不再貼程式碼,有興趣可以參考相關原始碼。
這裡以TransportIndexAction為例分析了tansportaction的結構層次。它在TransportAction直接還有一層那就是TransportShardReplicationOperationAction,這個類是actionsupport包中的一個,這個包把所有的子操作方法做了進一步的抽象,抽象出幾個大類放到了這裡,所有其它子功能很多都繼承自這。這個包會在後面有詳細分析。
以上就是elasticsearch原始碼分析index action實現方式的詳細內容,更多關於elasticsearch原始碼分析index action的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45