首頁 > 軟體

springboot 整合hbase的範例程式碼

2022-04-11 13:02:20

前言

作為巨量資料家族中的重要一員,在巨量資料以及海量資料儲存方面,hbase具有重要的地方,本篇將從java對hbase的操作上,進行詳細的說明;

HBase 定義

HBase 是一種分散式、可延伸、支援海量資料儲存的 NoSQL 資料庫;

HBase 資料模型

從邏輯上來說,HBase 資料模型同關係型資料庫很類似,資料儲存在一張表中,有行有列。但從 HBase 的底層物理儲存結構(K-V)來看,HBase 更像是一個 multi-dimensional map;

物理儲存結構

資料模型

hbase中有幾個重要的與資料模型相關的術語,有必要做深入的瞭解;

1、Name Space

  • 名稱空間,類似於關係型資料庫的 DatabBase 概念,每個名稱空間下有多個表;
  • 自帶兩個預設的名稱空間,分別是 hbase 和 default,hbase 中存放的是 HBase 內建的表,default 表是使用者預設使用的名稱空間;

2、Region

  • Region 類似於關係型資料庫的表概念;
  • HBase 定義表時只需要宣告列族即可,不需要宣告具體的列;
  • 往 HBase 寫入資料時,欄位可以動態、按需指定;

因此,和關係型資料庫相比,HBase 能夠輕鬆應對欄位變更的場景

3、Row

HBase 表中的每行資料都由一個 RowKey 和多個 Column(列)組成,資料是按照 RowKey的字典序儲存,且查詢資料時只能根據 RowKey 進行檢索,所以RowKey 的設計十分重要;

4、Column

  • HBase 中的每個列都由 Column Family(列族)和 Column Qualifier(列限定符)進行限定,例如 info:name,info:age;
  • 建表時,只需指明列族,而列限定符無需預先定義;

5、Time Stamp

  • 用於標識資料的不同版本(version);
  • 每條資料寫入時,如果不指定時間戳,系統會自動為其加上該欄位,其值為寫入HBase 的時間;

6、Cell

  • 由{rowkey, column Family:column Qualifier, time Stamp} 唯一確定的單元;
  • cell 中的資料是沒有型別的,全部是位元組碼形式儲存;

window環境下快速搭建 hbase執行環境

在小編之前的某一篇中,分享了基於centos7環境搭建hbase的單機執行環境,本篇為方便演示,在windows下快速搭建一個hbase的執行環境;

搭建步驟

1、官網下載安裝包;

hadoop 3.1.0 以及 hbase 1.3.1

2、設定hadoop環境變數

並加入到系統path中,

3、修改 hbase-env.cmd組態檔

進入hbase解壓後的onfig目錄下,在 hbase-env.cmd 新增如下的設定,即設定hbase依賴的Java環境以及自身的設定目錄;

set HBASE_MANAGES_ZK=false
set JAVA_HOME=C:Program FilesJavajdk1.8.0_171
set HBASE_CLASSPATH=E:bigData-toolhbase-1.3.1conf

4、修改hbase-site.xml 檔案

進入hbase解壓後的onfig目錄下,將下面的組態檔新增到hbase-site.xml 設定中

<configuration>
    <property>  
        <name>hbase.rootdir</name>  
        <value>file:///E:/bigData-tool/hbase-1.3.1/root</value>  
    </property>  
    <property>  
        <name>hbase.tmp.dir</name>  
        <value>E:/bigData-tool/hbase-1.3.1/tem</value>  
    </property>  
    <property>  
        <name>hbase.zookeeper.quorum</name>  
        <value>127.0.0.1</value>  
    </property>  
    <property>  
        <name>hbase.zookeeper.property.dataDir</name>  
        <value>E:/bigData-tool/hbase-1.3.1/zoo</value>  
    </property>  
    <property>  
        <name>hbase.cluster.distributed</name>  
        <value>false</value>  
     </property>  
</configuration>

5、啟動hbase服務

進入bin目錄下,在cmd視窗中執行下面的啟動指令碼啟動

啟動成功後,可以通過瀏覽器控制檯檢視hbase服務資訊

6、hbase使用者端測試

服務啟動之後,在bin目錄下,通過hbase提供的shell使用者端操作命令測試下服務,進入bin目錄下,直接cmd輸入 hbase shell 即可

輸入 list命令,檢視下當前所有的表

到此為主,所有的準備工作就完成了,下面讓我們通過hbase提供的Java使用者端SDK來看看如何操作habse資料庫吧;

Java API詳細使用

1、匯入使用者端依賴

		<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.3.1</version>
        </dependency>

2、DDL相關操作

和ddl相關的包括,判斷表是否存在,建立表,建立名稱空間,刪除表,刪除名稱空間;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;

import java.io.IOException;

public class DDlTest {

    public static Connection connection = null;
    public static Admin admin = null;

    static {
        Configuration conf = HBaseConfiguration.create();
        //使用 HBaseConfiguration 的單例方法範例化
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

    }

    /**
     * 判斷表是否存在
     * @param tableName
     * @return
     * @throws Exception
     */
    public static boolean isTableExistV1(String tableName) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        HBaseAdmin admin = new HBaseAdmin(conf);
        boolean tableExists = admin.tableExists(tableName);
        admin.close();
        return tableExists;
    }

    /**
     * 判斷表是否存在
     * @param tableName
     * @return
     * @throws Exception
     */
    public static boolean isTableExistV2(String tableName) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean result = admin.tableExists(TableName.valueOf(tableName));
        admin.close();
        return result;
    }

    public static boolean isTableExistV3(String tableName) throws Exception {
        boolean result = admin.tableExists(TableName.valueOf(tableName));
        return result;
    }

    /**
     * 建立表
     * @param tableName 表名
     * @param columnFamily 列簇名
     * @throws Exception
     */
    public static void createTable(String tableName, String... columnFamily) throws Exception {

        if (columnFamily.length <= 0) {
            System.out.println("請傳入列簇資訊");
        }
        //判斷表是否存在
        if (isTableExistV3(tableName)) {
            System.out.println("表" + tableName + "已存在");
            close();
            return;
        }
        //建立表屬性物件,表名需要轉位元組
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));

        //建立多個列族
        for (String cf : columnFamily) {
            descriptor.addFamily(new HColumnDescriptor(cf));
        }
        //根據對錶的設定,建立表
        admin.createTable(descriptor);
        System.out.println("表" + tableName + "建立成功!");
        close();
    }

    /**
     * 刪除表
     * @param tableName
     */
    public static void dropTable(String tableName) throws Exception{
        if (!isTableExistV3(tableName)) {
            System.out.println(tableName + ": 不存在 !" );
            return;
        }
        //1、下線表
        admin.disableTable(TableName.valueOf(tableName));
        //2、刪除表
        admin.deleteTable(TableName.valueOf(tableName));
        System.out.println("刪除表成功");
        close();
    }

    /**
     * 建立名稱空間
     * @param nameSpace
     */
    public static void createNameSpace(String nameSpace){
        if(nameSpace == null){
            System.out.println(nameSpace + ": 不存在 !" );
            return;
        }
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
        try {
            admin.createNamespace(namespaceDescriptor);
        } catch (NamespaceExistException e){
            System.out.println("名稱空間已存在");
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(nameSpace + ": 名稱空間建立成功");
    }

    public static void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

我們選取其中一個判斷表是否存在的方法做一下測試,觀察控制檯輸出結果,其他的方法有興趣的同學可以依次做測試即可;

3、DML相關操作

和DML操作相關的主要包括表資料的增刪改查,相對來說,在實際開發中,DML的操作,尤其是資料查詢,可能使用的更加頻繁,因此關於DML的操作務必要掌握;

package com.congge.test;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class DMLTest {

    public static Connection connection = null;
    public static Admin admin = null;

    static {
        Configuration conf = HBaseConfiguration.create();
        //使用 HBaseConfiguration 的單例方法範例化
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        //System.out.println(isTableExistV1("user"));
        //System.out.println(isTableExistV3("user"));
        //createTable("stu","info1","info2");
        //給表put資料
        //putData("stu","1005","info1","name","wangwu");
        //putData("stu","1003","info1","name","q7");
        //獲取表資料
        //System.out.println("----------");
        //getData("stu","1005","","");
        //獲取資料【scan的方式】
        //getDataFromScan("stu");
        //dropTable("stu");
        //createNameSpace("0409");

        deleteData("stu","1005","","");
        close();
    }

    public static boolean isTableExistV3(String tableName) throws Exception {
        boolean result = admin.tableExists(TableName.valueOf(tableName));
        return result;
    }

    /**
     *
     * @param tableName 表名
     * @param rowKey    rowKey
     * @param cf        columnFamily
     * @param cn        columnName
     * @param value     columnValue
     */
    public static void putData(String tableName,String rowKey,String cf,String cn,String value) throws Exception{

        //1、獲取表物件
        Table table = connection.getTable(TableName.valueOf(tableName));

        //2、拼接 put物件
        Put put = new Put(Bytes.toBytes(rowKey));

        //3、新增 欄位資訊 column
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn),Bytes.toBytes(value));

        //4、執行資料插入
        table.put(put);

        System.out.println("資料插入成功");
    }

    /**
     * 獲取資料
     * @param tableName
     * @param rowKey
     * @param cf
     * @param cn
     */
    public static void getData(String tableName,String rowKey,String cf,String cn) throws Exception{
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        //新增 cf【也可以不新增】
        //get.addFamily(Bytes.toBytes(cf));
        // 同時傳入 cf 和 cn
        if(StringUtils.isNotEmpty(cf) && StringUtils.isNotEmpty(cn)){
            get.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn));
        }
        Result result = table.get(get);
        //解析結果
        Cell[] cells = result.rawCells();
        for(Cell cell : cells){
            System.out.println("cf : " + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("cn : " + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value : " + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }

    /**
     * 通過掃描的方式獲取資料
     * @param tableName
     */
    public static void getDataFromScan(String tableName) throws Exception{
        Table table = connection.getTable(TableName.valueOf(tableName));

        //拿到掃描器物件
        //Scan scan = new Scan();
        //可以根據 rowkey繼續獲取【非必須】
        Scan scan = new Scan(Bytes.toBytes("1001"),Bytes.toBytes("1003"));
        ResultScanner resultScanner = table.getScanner(scan);

        //結果解析
        for(Result result : resultScanner){
            Cell[] cells = result.rawCells();
            for(Cell cell : cells){
                System.out.println("rowkey : " + Bytes.toString(CellUtil.cloneRow(cell)));
                System.out.println("cf : " + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("cn : " + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("value : " + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    /**
     * 刪除資料
     * @param tableName
     * @param rowKey
     * @param cf
     * @param cn
     * @throws Exception
     */
    public static void deleteData(String tableName,String rowKey,String cf,String cn) throws Exception{
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));
        //還可以傳入列簇,以及欄位名【非必須】
        if(StringUtils.isNotEmpty(cf) && StringUtils.isNotEmpty(cn)){
            delete.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn));
        }
        table.delete(delete);
        System.out.println("資料刪除成功");
    }

    public static void close() {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

下面選擇幾個方法做一下測試,觀察下效果如何,

插入資料與查詢資料

其中關於查詢資料,其API很靈活,可以只傳入 rowKey,也可以進一步傳入 列簇以及指定欄位名稱查詢;

刪除資料測試

更多的方法有興趣的同學可以一一測試,限於篇幅,這裡就不繼續展開了;

Hbase與springboot整合

下面演示下在web應用中,與springboot的整合過程

1、匯入springboot依賴

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

2、新增一個工具類

使用該工具類,完成對hbase的一系列的增刪查改

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class HBaseService {

    private Logger log = LoggerFactory.getLogger(HBaseService.class);

    private Admin admin = null;

    private Connection connection = null;

    public HBaseService(Configuration conf) {
        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();
        } catch (IOException e) {
            log.error("獲取HBase連線失敗!");
        }
    }

    public Map<String,String> getData(String tableName,String rowKey,String cf,String cn) throws Exception{

        Map<String,String> resultMap = new HashMap<>();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        //新增 cf【也可以不新增】
        //get.addFamily(Bytes.toBytes(cf));

        // 同時傳入 cf 和 cn
        //get.addColumn(Bytes.toBytes(cf),Bytes.toBytes(cn));

        Result result = table.get(get);
        //解析結果
        Cell[] cells = result.rawCells();
        for(Cell cell : cells){
            String columnFamilyName = Bytes.toString(CellUtil.cloneFamily(cell));
            System.out.println("columnFamilyName : " + columnFamilyName);
            String colName = Bytes.toString(CellUtil.cloneQualifier(cell));
            System.out.println("colName : " + colName);
            String colValue = Bytes.toString(CellUtil.cloneValue(cell));
            System.out.println("colValue : " + colValue);
            resultMap.put(colName,colValue);
        }
        return resultMap;
    }

}

將該類新增到spring容器中,方便後續其他類注入

import com.congge.service.HBaseService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HBaseConfig {

    @Bean
    public HBaseService getHbaseService() {
        //設定臨時的hadoop環境變數,之後程式會去這個目錄下的bin目錄下找winutils.exe工具,windows連線hadoop時會用到
        //System.setProperty("hadoop.home.dir", "D:\Program Files\Hadoop");
        //執行此步時,會去resources目錄下找相應的組態檔,例如hbase-site.xml
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "127.0.0.1");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        return new HBaseService(conf);
    }

}

注意,在實際開發中,連線zk的資訊可以通過外部組態檔讀取進來;

3、編寫一個測試使用的controller類

import com.congge.service.HBaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
public class HbaseController {

    @Autowired
    private HBaseService hBaseService;

    @GetMapping("/getData")
    public Map<String,String> getData(String tableName, String rowKey, String cf, String cn) throws Exception{
        return hBaseService.getData(tableName,rowKey,cf,cn);
    }

}

在該類中,有一個獲取單行資料的方法,啟動工程,瀏覽器存取介面:

http://localhost:8087/getData?tableName=stu&rowKey=1002

本篇詳細總結了hbase的Java使用者端的使用,在實際開發過程中,還需要結合自身的情況做更加細緻的整合與優化,本篇到此結束,感謝觀看!

到此這篇關於springboot 整合hbase的範例程式碼的文章就介紹到這了,更多相關springboot 整合hbase內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com