首頁 > 軟體

elasticsearch資料資訊索引操作action support範例分析

2022-04-22 10:00:58

抽象類分析

Action這一部分主要是資料(索引)的操作和部分叢集資訊操作。 所有的請求通過client轉發到對應的action上然後再由對應的TransportAction來執行相關請求。如果請求能在本機上執行則在本機上執行,否則使用Transport進行轉發到對應的節點。action support部分是對action的抽象,所有的具體action都繼承了support action中的某個類。這裡將對這些抽象類進行分析。

這一部分總共分為broadcast(廣播),master,nodes,replication及single幾個部分。broadcast主要針對一些無具體目標主機的操作,如查詢index是否存在,所有繼承這個類的action都具有這種類似的性質;nodes主要是對節點的操作,如熱點執行緒查詢(hotThread)查詢節點上的繁忙執行緒;replication的子類主要是需要或可以在副本上進行的操作,如索引操作,資料不僅要傳送到主shard還要傳送到各個副本。single則主要是目標明確的單shard操作,如get操作,根據doc的id取doc,doc 的id能夠確定它在哪個shard上,因此操作也在此shard上執行。

doExecute方法

這些support action的實現可以分為兩類,第一類就是實現一個內部類作為非同步操作器,子類執行doExecute時,初始化該操作器並啟動。另外一種就是直接實現一個方法,子類doExecute方法呼叫該方法進行。TransportBroadcastOperationAction就屬於前者,它實現了內部操作器AsyncBroadcastAction。TransportCountAction繼承於它,它doExecute方法如下所示:

@Override
    protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
        request.nowInMillis = System.currentTimeMillis();
        super.doExecute(request, listener);
    }

呼叫父類別的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的實現如下所示:

@Override
    protected void doExecute(Request request, ActionListener&lt;Response&gt; listener) {
        new AsyncBroadcastAction(request, listener).start();
    }

可以看到它初始化了AsyncBroadcastAction並啟動。AsyncBroadcastAction只是確定了操作的流程,及操作完成如何返回response,並未涉及到具體的操作邏輯。因為這些邏輯都在每個子action中實現,不同的action需要進行不同的操作。如count需要count每個shard並且返回最後的總數值,而IndexExistAction則需要對比所有索引檢視查詢的索引是否存在。start方法的程式碼如下所示:

public void start() {
      //沒有shards
            if (shardsIts.size() == 0) {
                // no shards
                try {
                    listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
                } catch (Throwable e) {
                    listener.onFailure(e);
                }
                return;
            }
            request.beforeStart();
            // count the local operations, and perform the non local ones
            int shardIndex = -1;
       //遍歷對每個shards進行操作
            for (final ShardIterator shardIt : shardsIts) {
                shardIndex++;
                final ShardRouting shard = shardIt.nextOrNull();
                if (shard != null) {
                    performOperation(shardIt, shard, shardIndex);
                } else {
                    // really, no shards active in this group
                    onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                }
            }
        }

start方法就是遍歷所有shards,如果shard存在則執行performOperation方法,在這個方法中會區分該請求能否在本機上進行,能執行則呼叫shardOperation方法得到結果。這個方法在這是抽象的,每個子類都有實現。否則傳送到對應的主機上。,如果shard為null則進行onOperation操作,遍歷該shard的其它副本看能否找到可以操作的shard。

performOperation程式碼

如下所示:

protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
            if (shard == null) {//shard 為null丟擲異常
                // no more active shards... (we should not really get here, just safety)
                onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
            } else {
                try {
                    final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
                    if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地執行shardOperation方法,並通過onOperation方法封裝結果
                        threadPool.executor(executor).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    onOperation(shard, shardIndex, shardOperation(shardRequest));
                                } catch (Throwable e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            }
                        });
                    } else {//不是本地shard,傳送到對應節點。
                        DiscoveryNode node = nodes.get(shard.currentNodeId());
                        if (node == null) {
                            // no node connected, act as failure
                            onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                        } else {
                            transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler&lt;ShardResponse&gt;() {
                                @Override
                                public ShardResponse newInstance() {
                                    return newShardResponse();
                                }
                                @Override
                                public String executor() {
                                    return ThreadPool.Names.SAME;
                                }
                                @Override
                                public void handleResponse(ShardResponse response) {
                                    onOperation(shard, shardIndex, response);
                                }
                                @Override
                                public void handleException(TransportException e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            });
                        }
                    }
                } catch (Throwable e) {
                    onOperation(shard, shardIt, shardIndex, e);
                }
            }
        }

方法shardOperation在countTransportAction的實現如下所示:

@Override
    protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());
    //構造查詢context
        SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
        SearchContext context = new DefaultSearchContext(0,
                new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
                shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
                scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
        SearchContext.setCurrent(context);
        try {
            // TODO: min score should move to be "null" as a value that is not initialized...
            if (request.minScore() != -1) {
                context.minimumScore(request.minScore());
            }
            BytesReference source = request.querySource();
            if (source != null &amp;&amp; source.length() &gt; 0) {
                try {
                    QueryParseContext.setTypes(request.types());
                    context.parsedQuery(indexService.queryParserService().parseQuery(source));
                } finally {
                    QueryParseContext.removeTypes();
                }
            }
            final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
            boolean terminatedEarly = false;
            context.preProcess();
            try {
                long count;
                if (hasTerminateAfterCount) {//呼叫lucene的封裝介面執行查詢並返回結果
                    final Lucene.EarlyTerminatingCollector countCollector =
                            Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
                    terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
                    count = countCollector.count();
                } else {
                    count = Lucene.count(context.searcher(), context.query());
                }
                return new ShardCountResponse(request.shardId(), count, terminatedEarly);
            } catch (Exception e) {
                throw new QueryPhaseExecutionException(context, "failed to execute count", e);
            }
        } finally {
            // this will also release the index searcher
            context.close();
            SearchContext.removeCurrent();
        }
    }

可以看到這裡是每個action真正的邏輯實現。因為這裡涉及到index部分的內容,這裡就不詳細分析。後面關於index的分析會有涉及。這就是support action中的第一種實現。

master的相關操作

第二種就master的相關操作,因此沒有實現對應的操作類,而只是實現了一個方法。該方法的作用跟操作器作用相同,唯一的不同是它沒有操作器這麼多的變數, 而且它不是非同步的。master的操作需要實時進行,執行過程中需要阻塞某些操作,保證叢集狀態一致性。這裡就不再說明,請參考TransportMasterNodeOperationAction原碼。

總結

本篇概括說了support action,並以countTransportAction為例說明了support Action中的非同步操作器實現,最後簡單的分析了master的同步操作。因為這裡涉及到很多action不可能一一分析,有興趣可以參考對應的程式碼。而且這裡有以下index部分的內容,所以沒有更深入的分析。在後面分析完index的相關功能後,會挑出幾個重要的action做詳細分析。

以上就是elasticsearch資料資訊索引操作action support範例分析的詳細內容,更多關於elasticsearch資料資訊索引操作action support的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com