首頁 > 軟體

elasticsearch叢集發現zendiscovery的Ping機制分析

2022-04-21 19:00:31

 zenDiscovery實現機制

ping是叢集發現的基本手段,通過在網路上廣播或者指定ping某些節點獲取叢集資訊,從而可以找到叢集的master加入叢集。zenDiscovery實現了兩種ping機制:廣播與單播。本篇將詳細分析一些這MulticastZenPing機制的實現為後面的叢集發現和master選舉做好鋪墊。

廣播的過程

首先看一下廣播(MulticastZenPing),廣播的原理很簡單,節點啟動後向網路傳送廣播資訊,任何收到的節點只要叢集名字相同都應該對此廣播資訊作出迴應。這樣該節點就獲取了叢集的相關資訊。它定義了一個action:"internal:discovery/zen/multicast"和廣播的資訊頭:INTERNAL_HEADER 。之前說過NettyTransport是cluster通訊的基礎,但是廣播卻沒有使它。它使用了java的MulticastSocket。這裡簡單的介紹一下MulticastSocket的使用。它是一個UDP 機制的socket,用來進行多個封包的廣播。它可以幫到一個ip形成一個group,任何MulticastSocket都可以join進來,組內的socket傳送的資訊會被訂閱了改組的所有機器接收到。elasticsearch對其進行了封裝形成了MulticastChannel,有興趣可以參考相關原始碼。 

首先看一下MulticastZenPing的幾個輔助內部類:

它總共定義了4個內部類,這些內部類和它一起完成廣播功能。FinalizingPingCollection是一pingresponse的容器,所有的響應都用它來儲存。MulticastPingResponseRequestHandler它是response處理類,類似於之前所說的nettytransportHandler,它雖然使用的不是netty,但是它也定義了一個messageReceived的方法,當收到請求時直接返回一個response。

MulticastPingResponse就不用細說了,它就是一個響應類。最後要著重說一下Receiver類,因為廣播並不是使用NettyTransport,因此對於訊息處理邏輯都在Receiver中。在初始化MulticastZenPing時會將receiver註冊進去。

protected void doStart() throws ElasticsearchException {
        try {
            ....
            multicastChannel = MulticastChannel.getChannel(nodeName(), shared,
                    new MulticastChannel.Config(port, group, bufferSize, ttl, networkService.resolvePublishHostAddress(address)),
                    new Receiver());//將receiver註冊到channel中
        } catch (Throwable t) {
          ....
        }
    }

Receiver類基礎了Listener,實現了3個方法,訊息經過onMessage方法區分,如果是內部ping則使用handleNodePingRequest方法處理,否則使用handleExternalPingRequest處理,區分方法很簡單,就是讀取資訊都看它是否符合所定義的INTERNAL_HEADER 資訊頭。

nodeping處理程式碼

private void handleNodePingRequest(int id, DiscoveryNode requestingNodeX, ClusterName requestClusterName) {
           ....
            final DiscoveryNodes discoveryNodes = contextProvider.nodes();
            final DiscoveryNode requestingNode = requestingNodeX;
            if (requestingNode.id().equals(discoveryNodes.localNodeId())) {
                // 自身發出的ping,忽略
                return;
            }
        //只接受本叢集ping
            if (!requestClusterName.equals(clusterName)) {
            ...return;
            }
            // 兩個client間不需要ping
            if (!discoveryNodes.localNode().shouldConnectTo(requestingNode)) {return;
            }
        //新建一個response
            final MulticastPingResponse multicastPingResponse = new MulticastPingResponse();
            multicastPingResponse.id = id;
            multicastPingResponse.pingResponse = new PingResponse(discoveryNodes.localNode(), discoveryNodes.masterNode(), clusterName, contextProvider.nodeHasJoinedClusterOnce());
        //無法連線的情況
            if (!transportService.nodeConnected(requestingNode)) {
                // do the connect and send on a thread pool
                threadPool.generic().execute(new Runnable() {
                    @Override
                    public void run() {
                        // connect to the node if possible
                        try {
                            transportService.connectToNode(requestingNode);
                            transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                                @Override
                                public void handleException(TransportException exp) {
                                    logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                                }
                            });
                        } catch (Exception e) {
                            if (lifecycle.started()) {
                                logger.warn("failed to connect to requesting node {}", e, requestingNode);
                            }
                        }
                    }
                });
            } else {
                transportService.sendRequest(requestingNode, ACTION_NAME, multicastPingResponse, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
                    @Override
                    public void handleException(TransportException exp) {
                        if (lifecycle.started()) {
                            logger.warn("failed to receive confirmation on sent ping response to [{}]", exp, requestingNode);
                        }
                    }
                });
            }
        }
    }

另外的一個方法是處理外部ping資訊,處理過程是返回cluster的資訊(這種外部ping的具體作用沒有研究不是太清楚)。以上是響應MulticastZenPing的過程,收到其它節點的響應資訊後它會把本節點及叢集的master節點相關資訊返回給廣播節點。這樣廣播節點就獲知了叢集的相關資訊。在MulticastZenPing類中還有一個類 MulticastPingResponseRequestHandler,它的作用是廣播節點對其它節點對廣播資訊響應的迴應,廣播節點的第二次傳送資訊的過程。它跟其它TransportRequestHandler一樣它有messageReceived方法,在啟動時註冊到transportserver中,只處理一類action:"internal:discovery/zen/multicast"。

ping請求的傳送策略

程式碼如下:

public void ping(final PingListener listener, final TimeValue timeout) {
       ....
    //產生一個id
        final int id = pingIdGenerator.incrementAndGet();
        try {
            receivedResponses.put(id, new PingCollection());
            sendPingRequest(id);//第一次傳送ping請求
            // 等待時間的1/2後再次傳送一個請求
            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                @Override
                public void onFailure(Throwable t) {
                    logger.warn("[{}] failed to send second ping request", t, id);
                    finalizePingCycle(id, listener);
                }
                @Override
                public void doRun() {
                    sendPingRequest(id);
            //再過1/2時間再次傳送一個請求
                    threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                        @Override
                        public void onFailure(Throwable t) {
                            logger.warn("[{}] failed to send third ping request", t, id);
                            finalizePingCycle(id, listener);
                        }
                        @Override
                        public void doRun() {
                            // make one last ping, but finalize as soon as all nodes have responded or a timeout has past
                            PingCollection collection = receivedResponses.get(id);
                            FinalizingPingCollection finalizingPingCollection = new FinalizingPingCollection(id, collection, collection.size(), listener);
                            receivedResponses.put(id, finalizingPingCollection);
                            logger.trace("[{}] sending last pings", id);
                            sendPingRequest(id);
                //最後一次傳送請求,超時的1/4後
                            threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 4), ThreadPool.Names.GENERIC, new AbstractRunnable() {
                                @Override
                                public void onFailure(Throwable t) {
                                    logger.warn("[{}] failed to finalize ping", t, id);
                                }
                                @Override
                                protected void doRun() throws Exception {
                                    finalizePingCycle(id, listener);
                                }
                            });
                        }
                    });
                }
            });
        } catch (Exception e) {
            logger.warn("failed to ping", e);
            finalizePingCycle(id, listener);
        }
    }

傳送過程主要是呼叫sendPingRequest(id)方法,在該方法中會將id,資訊頭,版本,本地節點資訊一起寫入到BytesStreamOutput中然後將其進行廣播,這個廣播資訊會被其它機器上的Receiver接收並處理,並且響應該ping請求。另外一個需要關注的是以上加說明的部分,它通過鏈時的定期傳送請求,在等待時間內可能會發出4次請求,這種傳送方式會造成大量的ping請求重複,幸好ping的資源消耗小,但是好處是可以儘可能保證在timeout這個時間段內叢集的新增節點都能收到這個ping資訊。在單播中也採用了該策略。

總結

廣播的過程:廣播使用的是jdk的MulticastSocket,在timeout時間內4次發生ping請求,ping請求包括一個id,資訊頭,本地節點的一些資訊;這些資訊在其它節點中被接收到交給Receiver處理,Receiver會將叢集的master和本機的相關資訊通過transport返回給廣播節點。廣播節點收到這些資訊後會理解使用transport返回一個空的response。至此一個廣播過程完成。

在節點分佈在多個網段時,廣播就失效了,因為廣播資訊不可達。這個時間就需要使用單播去ping指定的節點獲取cluster的相關資訊。這就是單播的用處。單播使用的是NettyTransport,它會使用跟廣播一樣的鏈式請求向指定的節點傳送請求。資訊的處理方式是之前所介紹的NettyTransport標準的資訊處理過程。

以上就是elasticsearch叢集發現zendiscovery的Ping機制分析的詳細內容,更多關於elasticsearch叢集發現zendiscovery Ping的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com