首頁 > 軟體

SpringBoot整合MaxCompute的範例程式碼

2022-08-12 14:00:44

1、SDK方式整合

使用odps-sdk-core整合, 官方檔案地址MaxCompute Java SDK介紹

1.1、依賴引入odps-sdk-core

<properties>
    <java.version>1.8</java.version>
    <!--maxCompute sdk 版本號-->
    <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
</properties>

<dependencies>
  <!--max compute sdk-->
  <dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>${max-compute-sdk.version}</version>
</dependency>
</dependencies>

1.2、編寫連線工具類

編寫MaxComputeSdkUtil以SDK方式連線MaxCompute

1.2.1、重要類和方法說明

1、連線引數類:

@Data
public class MaxComputeSdkConnParam {
    /**阿里雲accessId 相當於使用者名稱 */
    private String aliyunAccessId;
    /**阿里雲accessKey 相當於密碼 */
    private String aliyunAccessKey;
    /**阿里雲maxCompute服務介面地址 預設是http://service.odps.aliyun.com/api*/
    private String maxComputeEndpoint;
    /**專案名稱*/
    private String projectName;
}

2、查詢表後設資料資訊實體

主要是欄位:tableName, comment。還可以自己新增其他欄位

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableMetaInfo {
    /**表名稱*/
    private String tableName;
    /**表註釋*/
    private String comment;
}

3、公共方法(初始化)

/**預設的odps介面地址 在Odps中也可以看到該變數*/
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
/**開啟全表掃描的設定*/
private static final String FULL_SCAN_CONFIG = "odps.sql.allow.fullscan";
/**分頁查詢sql模板*/
private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";
/**分頁查詢統計數量模板SQL*/
private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";
/**sdk的odps使用者端*/
private final Odps odps;

/**odps連線引數*/
private final MaxComputeSdkConnParam connParam;

public MaxComputeSdkUtil(MaxComputeSdkConnParam param){
    this.connParam = param;
    // 構建odps使用者端
    this.odps = buildOdps();
}

/**
 * 構建odps使用者端 用於執行sql等操作
 * @return odps使用者端
 */
private Odps buildOdps() {
    // 阿里雲賬號密碼  AccessId 和 AccessKey
    final String aliyunAccessId = connParam.getAliyunAccessId();
    final String aliyunAccessKey = connParam.getAliyunAccessKey();
    // 建立阿里雲賬戶
    final AliyunAccount aliyunAccount = new AliyunAccount(aliyunAccessId, aliyunAccessKey);

    // 使用阿里雲賬戶建立odps使用者端
    final Odps odps = new Odps(aliyunAccount);

    // 傳入了的話就是用傳入的 沒有傳入使用預設的
    final String endpoint = connParam.getMaxComputeEndpoint();
    try {
        odps.setEndpoint(ObjectUtils.isEmpty(endpoint) ? defaultEndpoint : endpoint);
    } catch (Exception e) {
        // 端點格式不正確
        throw new BizException(ResultCode.MAX_COMPUTE_ENDPOINT_ERR);
    }

    // 設定專案
    odps.setDefaultProject(connParam.getProjectName());
    return odps;
}

4、查詢表資訊

/**
 * 獲取表資訊
 */
public List<TableMetaInfo> getTableInfos(){
    final Tables tables = odps.tables();
    List<TableMetaInfo> resultTables = new ArrayList<>();
    try {
        for (Table table : tables) {
            // tableName
            final String name = table.getName();
            // 描述
            final String comment = table.getComment();
            final TableMetaInfo info = new TableMetaInfo(name, comment);
            resultTables.add(info);
        }
    } catch (Exception e) {
        e.printStackTrace();
        final String errMsg = ObjectUtils.isEmpty(e.getMessage()) ? "" : e.getMessage();
        if (errMsg.contains("ODPS-0410051:Invalid credentials")){
            throw new BizException(ResultCode.MAX_COMPUTE_UNAME_ERR);
        }
        if (errMsg.contains("ODPS-0410042:Invalid signature value")){
            throw new BizException(ResultCode.MAX_COMPUTE_PWD_ERR);
        }
        if (errMsg.contains("ODPS-0420095: Access Denied")){
            throw new BizException(ResultCode.MAX_COMPUTE_PROJECT_ERR);
        }
    }
    return resultTables;
}

5、執行SQL封裝

/**
 * 執行sql查詢
 * @param querySql 查詢sql
 * @param fullScan 是否開啟全表掃描 如果查詢多個分割區資料,需要開啟全表掃描
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql, boolean fullScan){
    try {
        // 設定全表掃描嗎
        configFullScan(fullScan);
        // 使用任務執行SQL
        final Instance instance = SQLTask.run(odps, querySql);
        // 等待執行成功
        instance.waitForSuccess();
        // 封裝返回結果
        List<Record> records = SQLTask.getResult(instance);
        // 結果轉換為Map
        return buildMapByRecords(records);
    } catch (OdpsException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    }
}

/**
 * 開啟和移除全表掃描設定
 * @param fullScan 是否全表掃描
 */
private void configFullScan(boolean fullScan) {
        if (fullScan){
        // 開啟全表掃描設定
        Map<String, String> config = new HashMap<>();
        log.info("===>>開啟全表掃描, 查詢多個分割區資料");
        config.put(FULL_SCAN_CONFIG, "true");
        odps.setGlobalSettings(config);
        }else {
        // 移除全表掃描設定
        odps.getGlobalSettings().remove(FULL_SCAN_CONFIG);
        }
    }

/**
 * 將List<Record>準換為List<Map></>
 * @param records sql查詢結果
 * @return 返回結果
 */
private List<Map<String, Object>> buildMapByRecords(List<Record> records) {
        List<Map<String, Object>> listMap = new ArrayList<>();
        for (Record record : records) {
        Column[] columns = record.getColumns();
        Map<String, Object> map = new LinkedHashMap<>();
        for (Column column : columns) {
        String name = column.getName();
        Object value = record.get(name);
        // maxCompute裡面的空返回的是使用n
        if ("\N".equalsIgnoreCase(String.valueOf(value))) {
        map.put(name, "");
        } else {
        map.put(name, value);
        }
        }
        listMap.add(map);
        }
        return listMap;
    }

6、分頁查詢分裝

/**
 * 執行sql查詢【分頁查詢】
 * @param querySql 查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @param fullScan 是否開啟全表掃描 如果查詢多個分割區資料,需要開啟全表掃描
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size, boolean fullScan){
    // 重寫SQl,新增limit offset, limit
    // 1、替換分號
    querySql = querySql.replaceAll(";", "");
    // 2、格式化SQL
    Integer offset = (page - 1 ) * size;
    // 得到執行sql
    final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);
    log.info("=======>>>執行分頁sql為:{}", execSql);

    // 呼叫執行SQL資料
    return queryData(execSql, fullScan);
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){
    // 1、替換分號
    querySql = querySql.replaceAll(";", "");
    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);
    log.info("=======>>>執行分頁統計總數sql為:{}", countSql);
    // 查詢總數
    final List<Map<String, Object>> countMap = queryData(countSql, false);
    if (CollectionUtils.isEmpty(countMap)){
        return new PageResult<>(0L, new ArrayList<>());
    }

    long count = 0L;
    for (Object value : countMap.get(0).values()) {
        count = Long.parseLong(String.valueOf(value));
    }

    if (count == 0){
        return new PageResult<>(0L, new ArrayList<>());
    }

    // 執行分頁查詢 開啟全表掃描
    final List<Map<String, Object>> resultList = queryData(querySql, page, size, true);

    return new PageResult<>(count, resultList);
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){
    final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size);
    List<T> rows = new ArrayList<>();
    for (Map<String, Object> row : result.getRows()) {
        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
        rows.add(t);
    }
    return new PageResult<>(result.getTotal(), rows);
}

1.2.2 工具類測試

使用測試資料測試工具類

public static void main(String[] args) {
    // 構建連線引數
    final MaxComputeSdkConnParam connParam = new MaxComputeSdkConnParam();
    connParam.setAliyunAccessId("您的阿里雲賬號accessId");
    connParam.setAliyunAccessKey("您的阿里雲賬號accessKey");
    connParam.setProjectName("專案名");

    // 範例化工具類
    final MaxComputeSdkUtil sdkUtil = new MaxComputeSdkUtil(connParam);

    // 查詢所有表
    final List<TableMetaInfo> tableInfos = sdkUtil.getTableInfos();
    for (TableMetaInfo tableInfo : tableInfos) {
        System.out.println(tableInfo.getTableName());
    }

    // 分頁查詢資料
    final PageResult<Map<String, Object>> page = sdkUtil.pageQueryMap("select * from ods_cust;", 2, 10);
    System.out.println(page.getTotal());
    for (Map<String, Object> map : page.getRows()) {
        System.out.println(JSONObject.toJSONString(map));
    }
}

1.2.3 為什麼要開啟全表掃描

maxCompute存在使用限制如下:

當使用select語句時,屏顯最多隻能顯示10000行結果。當select語句作為子句時則無此限制,select子句會將全部結果返回給上層查詢。
select語句查詢分割區表時預設禁止全表掃描。
自2018年1月10日20:00:00後,在新建立的專案上執行SQL語句時,預設情況下,針對該專案裡的分割區表不允許執行全表掃描操作。在查詢分割區表資料時必須指定分割區,由此減少SQL的不必要I/O,從而減少計算資源的浪費以及按量計費模式下不必要的計算費用。

如果您需要對分割區表進行全表掃描,可以在全表掃描的SQL語句前加上命令set odps.sql.allow.fullscan=true;,並和SQL語句一起提交執行。假設sale_detail表為分割區表,需要同時執行如下語句進行全表查詢:

2、JDBC方式整合

使用odps-jdbc整合, 官方檔案地址MaxCompute Java JDBC介紹

<properties>
    <java.version>1.8</java.version>
    <!--maxCompute-jdbc-版本號-->
    <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
</properties>

<dependencies>
  <!--max compute jdbc-->
  <dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-jdbc</artifactId>
    <version>${max-compute-jdbc.version}</version>
    <classifier>jar-with-dependencies</classifier>
  </dependency>
</dependencies>

2.2、編寫連線工具類

編寫MaxComputeSdkUtil以JDBC方式連線MaxCompute

2.2.1、重要類和方法說明

1、連線引數類:

@Data
public class MaxComputeJdbcConnParam {
  /**阿里雲accessId 相當於使用者名稱 */
  private String aliyunAccessId;
  /**阿里雲accessKey 相當於密碼 */
  private String aliyunAccessKey;
  /** maxcompute_endpoint */
  private String endpoint;
  /**專案名稱*/
  private String projectName;
}

2、公共方法(初始化)

/**JDBC 驅動名稱*/
private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";

private static final String SELECT_ALL_TABLE_SQL = "select table_name, table_comment from Information_Schema.TABLES";

private static final String SELECT_FIELD_BY_TABLE_SQL = "select column_name, column_comment from Information_Schema.COLUMNS where table_name = '%s'";
/**分頁查詢sql模板*/
private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";
/**分頁查詢統計數量模板SQL*/
private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";
/**連線*/
private final Connection conn;

/**
 * 連線引數
 */
private final MaxComputeJdbcConnParam connParam;

public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {
    this.connParam = connParam;
    this.conn = buildConn();
}

/**
 * 建立連線
 * @return 資料庫連線
 */
private Connection buildConn() {
    try {
        Class.forName(DRIVER_NAME);
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);
    }

    try {
        // JDBCURL連線模板
        String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";
        // 使用驅動管理器連線獲取連線
        return DriverManager.getConnection(
                String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()),
                connParam.getAliyunAccessId(), connParam.getAliyunAccessKey());
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);
    }
}

3、查詢表資訊

/**
 * 獲取表資訊
 * @return 表資訊列表
 */
public List<TableMetaInfo> getTableInfos(){
    List<TableMetaInfo> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 建立statement 使用SQL直接查詢
        statement = conn.createStatement();
        // 執行查詢語句
        resultSet = statement.executeQuery(SELECT_ALL_TABLE_SQL);
        while (resultSet.next()){
            final String tableName = resultSet.getString("table_name");
            final String tableComment = resultSet.getString("table_comment");
            final TableMetaInfo info = new TableMetaInfo(tableName, tableComment);
            resultList.add(info);
        }

        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 關閉resultSet
        closeResultSet(resultSet);
        // 關閉statement
        closeStatement(statement);
    }
}

4、執行SQL封裝

/**
 * 執行sql查詢
 * @param querySql 查詢sql
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql){
    List<Map<String, Object>> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 建立statement
        statement = conn.createStatement();

        // 執行查詢語句
        resultSet = statement.executeQuery(querySql);

        // 構建結果返回
        buildMapByRs(resultList, resultSet);

        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 關閉resultSet
        closeResultSet(resultSet);
        // 關閉statement
        closeStatement(statement);
    }
}

/**
 * 將ResultSet轉換為List<Map<String, Object>>
 * @param resultList 轉換的集合
 * @param resultSet ResultSet
 * @throws SQLException e
 */
private void buildMapByRs(List<Map<String, Object>> resultList, ResultSet resultSet) throws SQLException {
    // 獲取後設資料
    ResultSetMetaData metaData = resultSet.getMetaData();
    while (resultSet.next()) {
        // 獲取列數
        int columnCount = metaData.getColumnCount();
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i < columnCount; i++) {
            String columnName = metaData.getColumnName(i + 1);
            Object object = resultSet.getObject(columnName);
            // maxCompute裡面的空返回的是使用n
            if ("\N".equalsIgnoreCase(String.valueOf(object))) {
                map.put(columnName, "");
            } else {
                map.put(columnName, object);
            }
        }
        resultList.add(map);
    }
}


private void closeStatement(Statement statement){
    if (statement != null){
        try {
            statement.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

private void closeResultSet(ResultSet resultSet){
    if (resultSet != null){
        try {
            resultSet.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

5、分頁查詢分裝

/**
 * 執行sql查詢
 * @param querySql 查詢sql
 * @return List<Map<String, Object>>
 */
public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size){
    List<Map<String, Object>> resultList = new ArrayList<>();
    Statement statement = null;
    ResultSet resultSet = null;
    try {
        // 1、替換分號
        querySql = querySql.replaceAll(";", "");
        // 建立statement
        statement = conn.createStatement();
        // 2、格式化SQL
        int offset = (page - 1 ) * size;
        final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);
        log.info("=======>>>執行分頁sql為:{}", execSql);
        // 執行查詢語句
        resultSet = statement.executeQuery(execSql);

        // 構建結果返回
        buildMapByRs(resultList, resultSet);
        return resultList;
    } catch (SQLException e) {
        e.printStackTrace();
        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);
    } finally {
        // 關閉resultSet
        closeResultSet(resultSet);
        // 關閉statement
        closeStatement(statement);
    }
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){
    // 1、替換分號
    querySql = querySql.replaceAll(";", "");
    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);
    log.info("=======>>>執行分頁統計總數sql為:{}", countSql);
    // 查詢總數
    final List<Map<String, Object>> countMap = queryData(countSql);
    if (CollectionUtils.isEmpty(countMap)){
        return new PageResult<>(0L, new ArrayList<>());
    }

    long count = 0L;
    for (Object value : countMap.get(0).values()) {
        count = Long.parseLong(String.valueOf(value));
    }

    if (count == 0){
        return new PageResult<>(0L, new ArrayList<>());
    }

    // 執行分頁查詢 開啟全表掃描
    final List<Map<String, Object>> resultList = queryData(querySql, page, size);

    return new PageResult<>(count, resultList);
}


/**
 * 執行分頁查詢
 * @param querySql 分頁查詢sql
 * @param page 頁碼 從1開始 第n頁傳n
 * @param size 每頁記錄數
 * @return 分頁查詢結果
 */
public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){
    final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size);
    List<T> rows = new ArrayList<>();
    for (Map<String, Object> row : result.getRows()) {
        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);
        rows.add(t);
    }
    return new PageResult<>(result.getTotal(), rows);
}

2.2.2 工具類測試

使用測試資料測試工具類

public static void main(String[] args) {
    final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();
    connParam.setAliyunAccessId("您的阿里雲賬號accessId");
    connParam.setAliyunAccessKey("您的阿里雲賬號accessKey");
    connParam.setProjectName("專案名");
    connParam.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api");
    final MaxComputeJdbcUtil jdbcUtil = new MaxComputeJdbcUtil(connParam);

    // 獲取表資訊
    final List<TableMetaInfo> tableInfos = jdbcUtil.getTableInfos();
    for (TableMetaInfo tableInfo : tableInfos) {
        System.out.println(tableInfo);
    }

    // 獲取欄位資訊
    final String tableName = tableInfos.get(new Random().nextInt(tableInfos.size())).getTableName();
    final List<TableColumnMetaInfo> fields = jdbcUtil.getFieldByTableName(tableName);
    for (TableColumnMetaInfo field : fields) {
        System.out.println(field.getFieldName() + "-" + field.getComment());
    }

    // 執行查詢
    final List<Map<String, Object>> list = jdbcUtil.queryData("select * from ods_cust;");
    for (Map<String, Object> map : list) {
        System.out.println(JSONObject.toJSONString(map));
    }

    // 執行分頁查詢
    final List<Map<String, Object>> list2 = jdbcUtil.queryData("select * from ods_cust;", 2, 10);
    for (Map<String, Object> map : list2) {
        System.out.println(JSONObject.toJSONString(map));
    }

    // 執行分頁查詢 並返回count
    final PageResult<Map<String, Object>> list3 = jdbcUtil.pageQueryMap("select * from ods_cust;", 2, 10);
    System.out.println(list3.getTotal());
    for (Map<String, Object> map : list3.getRows()) {
        System.out.println(JSONObject.toJSONString(map));
    }

    jdbcUtil.close();
}

專案地址

springboot整合maxCompute

到此這篇關於SpringBoot整合MaxCompute的文章就介紹到這了,更多相關SpringBoot整合MaxCompute內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com