首頁 > 軟體

高可用架構etcd選主故障主備秒級切換實現

2022-02-24 22:01:40

什麼是Etcd?

etcd是一個強大的一致性的分散式鍵值儲存,它提供了一種可靠的方式來儲存需要由分散式系統或機器群存取的資料。它優雅地處理網路分割區期間的領導者選舉,並且可以容忍機器故障,即使在領導者節點中也是如此。從簡單的Web應用程式到Kubernetes,任何複雜的應用程式都可以讀取資料並將資料寫入etcd。這是官方對Etcd的描述,基於這些特性,Etcd常用於分散式設定、分散式鎖、分散式服務協調者以及分散式註冊。從功能上來說和zookeeper是一類專案,但是相比而言etcd更現代,etcd使用go語言開發,編譯後生成了系統可執行的二進位制產物,跨平臺性更好,更易維護。etcd直接提供http的介面,非常方便各大語言封裝自己的client sdk,在易用性方面也更好一點。下面也主要使用java的使用者端jetcd,解決主備服務的協調問題。

etcd官網:https://etcd.io

主備服務場景描述

很多時候為了服務的高可用,除了有個在工作的主服務外,還需要多啟用幾個備用服務,這樣,在主服務出現故障時,備用服務能夠馬上頂上。這個場景有個很明顯的特徵就是同一時間只能有一個主服務。常見的如mysql主從切換等,同一時間只能有一個msyql負責寫資料。在我們這邊的場景是,有一個binlog解析服務,實時解析mysql 的binlog,將解析到的資料傳遞到kafka中,kafka消費端有一個Flink job去消費解析的資料。最終這些資料會下層到資料中臺中,提供給中臺系統做基礎的業務資料。很多線上的服務查詢的資料就是來源binlog解析的資料,所以binlog解析的服務不能存在單點故障,在架構上只能是一主多備的模式,主服務故障時,備用服務實時頂上。同時binlog服務也不能同時多個解析。所以,這個場景使用etcd來做主備架構再好不過了。

jetcd具體實現

首先引入jetcd依賴

<dependency>
            <groupId>io.etcd</groupId>
            <artifactId>jetcd-core</artifactId>
            <version>0.3.0</version>
</dependency>

初始化使用者端

Client client = Client.builder().endpoints(
                "http://127.0.0.1:2379",
                "http://127.0.0.1:3379",
                "http://127.0.0.1:4379"
        ).build();

關鍵api介紹

Lock lock = client.getLockClient();
        Lease lease = client.getLeaseClient();
  • Lease提供授予,復原和保持租約的方法,其中有兩個關鍵方法grant(long ttl)和keepAlive()。grant用於授予租約,入參為租約的時間,即如果建立帶租約的key值,ttl秒後即自動刪除,返回租約的id。keepAlive()方法用於保持租約有效,即如果租約即將到期時,keepAlive能夠自動續租ttl時間。
  • Lock有兩個方法,lock(ByteSequence name, long leaseId)和unlock(ByteSequence lockKey)。來實現分散式鎖的功能,其中加鎖時,入參leaseid為續約物件的id,即定義了持有鎖的時間

通過這Lease和Lock的功能,很容易實現主備服務的切換。關鍵程式碼如下:

ByteSequence lockKey = ByteSequence.from("/root/lock", StandardCharsets.UTF_8);
        Lock lock = client.getLockClient();
        Lease lease = client.getLeaseClient();
        long leaseId = lease.grant(lockTTl).get().getID();
        lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                System.err.println("LeaseKeepAliveResponse value:" + value.getTTL());
            }
            @Override
            public void onError(Throwable t) { t.printStackTrace(); }
            @Override
            public void onCompleted() { }
        });
        lock.lock(lockKey, leaseId).get().getKey();
  • 首先申請授予續約獲取到leaseId,其中lockttl為1,單位秒,etcd的租約是秒級的。在這裡ttl的設定是有講究的,取決於當主服務故障時,你想多快讓從服務感知並頂上。當然,受限於etcd本身租約秒級限制,最快也只能是1秒。
  • 然後呼叫keepAlive方法,使授予到的leaseid保活,這樣,只要應用還存活就會自動續約
  • 接著呼叫lock方法,傳入leaseid。只有首次啟動的服務會獲取到鎖,而且在執行期間,會不斷的續約。當從服務執行到此處時,會阻塞住。這樣就能保證多個服務同時執行,只有一個服務真正工作的目的。當獲取到鎖的主服務出現問題時,原先的只有鎖的續約在1秒內就會到期,從服務會馬上獲取到鎖執行工作程式碼

完整的測試用例

/**
 * @author: kl @kailing.pub
 * @date: 2019/7/22
 */
public class JEtcdTest {
    private Client client;
    private Lock lock;
    private Lease lease;
    //單位:秒
    private long lockTTl = 1;
    private ByteSequence lockKey = ByteSequence.from("/root/lock", StandardCharsets.UTF_8);
    private ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
    @Before
    public void setUp() {
         client = Client.builder().endpoints(
                "http://127.0.0.1:2379",
                "http://127.0.0.1:3379",
                "http://127.0.0.1:4379"
        ).build();
         lock = client.getLockClient();
         lease = client.getLeaseClient();
    }
    @Test
    public void lockTest1toMaster() throws InterruptedException, ExecutionException {
        long leaseId = lease.grant(lockTTl).get().getID();
         lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
             @Override
             public void onNext(LeaseKeepAliveResponse value) {
                 System.err.println("LeaseKeepAliveResponse value:"+ value.getTTL());
             }
             @Override
             public void onError(Throwable t) {
                 scheduledThreadPool.shutdownNow();
                 t.printStackTrace();
             }
             @Override
             public void onCompleted() {
                 scheduledThreadPool.shutdownNow();
             }
         });
        lock.lock(lockKey, leaseId).get().getKey();
        scheduledThreadPool.submit(() -> {
            while (true) {
                System.err.println("我是主服務開始工作了");
                TimeUnit.SECONDS.sleep(1);
            }
        });
        TimeUnit.DAYS.sleep(1);
    }
    @Test
    public void lockTest2toStandby() throws InterruptedException, ExecutionException {
        long leaseId = lease.grant(lockTTl).get().getID();
        lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                System.err.println("LeaseKeepAliveResponse value:"+ value.getTTL());
            }
            @Override
            public void onError(Throwable t) {
                scheduledThreadPool.shutdownNow();
                t.printStackTrace();
            }
            @Override
            public void onCompleted() {
                 scheduledThreadPool.shutdownNow();
            }
        });
        lock.lock(lockKey, leaseId).get().getKey();
        scheduledThreadPool.submit(() -> {
            while (true) {
                System.err.println("我是備用服務,我開始工作了,估計主服務已經掛了");
                TimeUnit.SECONDS.sleep(1);
            }
        });
        TimeUnit.DAYS.sleep(1);
    }
    @Test
    public void lockTest3toStandby() throws InterruptedException, ExecutionException {
        long leaseId = lease.grant(lockTTl).get().getID();
        lease.keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
            @Override
            public void onNext(LeaseKeepAliveResponse value) {
                System.err.println("LeaseKeepAliveResponse value:"+ value.getTTL());
            }
            @Override
            public void onError(Throwable t) {
                scheduledThreadPool.shutdownNow();
                t.printStackTrace();
            }
            @Override
            public void onCompleted() {
                scheduledThreadPool.shutdownNow();
            }
        });
        lock.lock(lockKey, leaseId).get().getKey();
        scheduledThreadPool.submit(() -> {
            while (true) {
                System.err.println("我是備用服務,我開始工作了,估計主服務已經掛了");
                TimeUnit.SECONDS.sleep(1);
            }
        });
        TimeUnit.DAYS.sleep(1);
    }
}

上面測試用例模擬了一主兩備的高可用架構。分別執行lockTest1toMaster()、lockTest2toStandby()、lockTest3toStandby()服務,會發現只有一個服務會列印。然後手動關閉這個服務,從服務馬上會接著列印。在關閉這個從服務,另外一個從服務就會接著列印。很好的模擬了主備故障切換的效果。

以上就是高可用架構etcd選主故障主備秒級切換實現的詳細內容,更多關於etcd主備故障秒級切換的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com