首頁 > 軟體

HBase的協處理器編碼實戰

2020-06-16 17:25:20

1 協處理器簡介

        如果要統計Hbase中的資料,比如統計某個欄位的最大值、統計滿足某種條件的記錄數、統計各種記錄的特點並按照記錄特點分類等等,常規的做法是把HBase中整個表的資料Scan出來,或者加一個Filter,進行一些初步的過濾,然後在用戶端進行統計處理。但是這麼做會有很大的副作用,比如占用大量的網路頻寬(巨量資料量尤為明顯),RPC的壓力也是不容小覷的。

        HBase作為列式資料庫最經常被人詬病的特性包括:無法輕易建立“二級索引”,難以執行求和、計數、排序等操作。比如,在舊版本的(<0.92)HBase中,統計資料表的總行數,需要使用Counter方法,執行一次MapReduce Job才能得到。雖然HBase在資料儲存層中整合了MapReduce,能夠有效進行資料表的分散式計算,然而在很多情況下,做一些簡單的相加或者聚合計算的時候,如果直接將計算過程放置在server端,能夠減少網路開銷,從而獲得很好的效能提升。於是,HBase在0.92之後引入了協處理器(coprocessors),實現了一些激動人心的新特性:能夠輕易建立二次索引、複雜過濾器以及存取控制等。

        簡單理解來說,協處理器是HBase讓使用者的部分邏輯在資料存放端即HBase伺服器端進行計算的機制,它允許使用者在HBase伺服器端執行自己的程式碼。

2 協處理器的分類

        協處理器分為兩種型別:系統協處理器可以全域性匯入Region Server上的所有資料表,表協處理器是使用者可以指定一張表使用的協處理器。協處理器框架為了更好支援其行為的靈活性,提供了兩個不同方面的外掛。一個是觀察者(Observer),類似於關聯式資料庫的觸發器。另一個是終端(Endpoint),動態的終端有點像儲存過程。

        Observer的設計意圖是允許使用者通過插入程式碼來過載協處理器框架的upcall方法,而具體的事件觸發的callback方法由HBase的核心程式碼來執行。協處理器框架處理所有的callback呼叫細節,協處理器自身只需要插入新增或者改變的功能。

        Endpoint是動態RPC外掛的介面,它的實現程式碼被安裝在伺服器端,從而能夠通過HBase RPC喚醒。用戶端類庫提供了非常方便的方法來呼叫這些動態介面,它們可以在任意時候呼叫一個終端,它們的實現程式碼會被目標Region遠端執行,結果會返回到終端。使用者可以結合使用這些強大的外掛介面,為HBase新增全新的特性。

3 Protocol Buffer的使用

        由於下面的Endpoint編碼範例使用了Google公司的混合語言資料標準Protocol Buffer,所以首先了解一下這個常用於RPC系統的工具。

3.1 ProtocolBuffer介紹

        Protocol Buffer是一種輕便高效的結構化資料儲存格式,可以用於結構化資料序列化,很適合做資料儲存或RPC資料交換格式。它可用於通訊協定、資料儲存等領域的語言無關、平台無關、可延伸的序列化結構資料格式。目前提供了C++、Java、Python三種語言的 API。

       為什麼要使用Protocol Buffer呢?先看一個在實際開發中經常會遇到的系統場景:我們的用戶端程式是使用Java開發的,可能執行自不同的平台,如Linux、Windows或者是Android,而我們的伺服器程式通常是基於Linux平台並使用C++開發完成的。在這兩種程式之間進行資料通訊時存在多種方式用於設計訊息格式,如:

       1、直接傳遞C/C++語言中位元組對齊的結構體資料,只要結構體的宣告為定長格式,那麼該方式對於C/C++程式而言就非常方便了,僅需將接收到的資料按照結構體型別強行轉換即可。事實上對於變長結構體也不會非常麻煩。在傳送資料時,也只需定義一個結構體變數並設定各個成員變數的值之後,再以char*的方式將該二進位制資料傳送到遠端。反之,該方式對於Java開發者而言就會非常繁瑣,首先需要將接收到的資料存於ByteBuffer之中,再根據約定的位元組序逐個讀取每個欄位,並將讀取後的值再賦值給另外一個值物件中的域變數,以便於程式中其他程式碼邏輯的編寫。對於該型別程式而言,聯調的基準是必須用戶端和伺服器雙方均完成了訊息報文構建程式的編寫後才能展開,而該設計方式將會直接導致Java程式開發的進度過慢。即便是Debug階段,也會經常遇到Java程式中出現各種域欄位拼接的小錯誤。

       2、使用SOAP協定(WebService)作為訊息報文的格式載體,由該方式生成的報文是基於文字格式的,同時還存在大量的XML描述資訊,因此將會大大增加網路IO的負擔。又由於XML解析的複雜性,這也會大幅降低報文解析的效能。總之,使用該設計方式將會使系統的整體執行效能明顯下降。

       對於以上兩種方式所產生的問題,Protocol Buffer均可以很好的解決,不僅如此,Protocol Buffer還有一個非常重要的優點就是可以保證同一訊息報文新舊版本之間的相容性。

3.2 安裝Protocol Buffer

// 在https://developers.google.com/protocol-buffers/docs/downloads下載protobuf-2.6.1.tar.gz後解壓至指定目錄

$ tar -xvf protobuf-2.6.1.tar.gz -C app/

// 刪除壓縮包

$ rm protobuf-2.6.1.tar.gz

// 安裝c++編譯器相關包

$ sudo apt-get install g++

// 編譯安裝protobuf

$ cd app/protobuf-2.6.1/

$ ./configure

$ make

$ make check

$ sudo make install

// 新增到lib

$ vim ~/.bashrc

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib

$ source ~/.bashrc

// 驗證是否安裝成功

$ protoc --version

3.3 編寫proto檔案

       首先需要編寫一個 proto 檔案,定義程式中需要處理的結構化資料。proto 檔案非常類似java或者C語言的資料定義。如下程式碼給出了範例中定義RPC介面的 endpoint.proto檔案內容:

// 定義常用選項
option java_package = "com.hbase.demo.endpoint"; //指定生成Java程式碼的包名
option java_outer_classname = "Sum"; //指定生成Java程式碼的外部類名稱
option java_generic_services = true; //基於服務定義產生抽象服務程式碼
option optimize_for = SPEED; //指定優化級別
// 定義請求包
message SumRequest {
 required string family = 1; //列族
 required string column = 2; //列名
}
// 定義回復包
message SumResponse {
 required int64 sum = 1 [default = 0]; //求和結果
}
// 定義RPC服務
service SumService {
 //獲取求和結果
 rpc getSum(SumRequest)
  returns (SumResponse);
}

3.4 編譯proto檔案

 

// 將proto檔案編譯生成java程式碼

$ protoc endpoint.proto --java_out=./

// 生成的檔案Sum.java如下圖所示:

 

 

4 Endpoint編碼範例

 

       業務邏輯如求和、排序等功能放在伺服器端,在伺服器端完成計算後將結果傳送給用戶端,可以減少資料的傳輸量。下面的範例將在HBase的伺服器端生成一個RPC服務,即在伺服器端對指定錶的指定列值進行求和計算,並將計算結果返回給用戶端。用戶端呼叫該RPC服務,獲取響應結果後輸出。

4.1 伺服器端程式碼

       首先,將通過Protocol Buffer生成的RPC介面檔案Sum.java匯入專案,然後在專案中新建類SumEndPoint編寫伺服器端程式碼:

package com.hbase.demo.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.Hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;
 
/**
 * @author developer
 * 說明:hbase協處理器endpooint的伺服器端程式碼
 * 功能:繼承通過protocol buffer生成的rpc介面,在伺服器端獲取指定列的資料後進行求和操作,最後將結果返回用戶端
 */
public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService {
   
    private RegionCoprocessorEnvironment env;  // 定義環境
   
    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
        // 定義變數
        SumResponse response = null;
        InternalScanner scanner = null;
        // 設定掃描物件
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // 掃描每個region,取值後求和
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            Long sum = 0L;
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
                }
                results.clear();
            } while (hasMore);
            // 設定返回結果
            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // 將rpc結果返回給用戶端
        done.run(response);
    }
   
    // 協處理器初始化時呼叫的方法
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment)env;
        } else {
            throw new CoprocessorException("no load region");
        }
    }
   
    // 協處理器結束時呼叫的方法
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
       
    }

}

更多詳情見請繼續閱讀下一頁的精彩內容http://www.linuxidc.com/Linux/2016-12/138720p2.htm


IT145.com E-mail:sddin#qq.com