首頁 > 軟體

利用 SpringBoot 在 ES 中實現類似連表查詢功能

2022-07-12 18:02:43

一、摘要

在上篇文章中,我們詳細的介紹瞭如何在 ES 中精準的實現巢狀json物件查詢?

那麼問題來了,我們如何在後端通過技術方式快速的實現 es 中內嵌物件的資料查詢呢?

為了方便更容易掌握技術,本文主要以上篇文章中介紹的通過商品找訂單為案例,利用 SpringBoot 整合 ES 實現這個業務需求,向大家介紹具體的技術實踐方案,存入es中的json資料結構如下:

{
    "orderId":"1",
    "orderNo":"123456",
    "orderUserName":"張三",
    "orderItems":[
        {
            "orderItemId":"12234",
            "orderId":"1",
            "productName":"火腿腸",
            "brandName":"雙匯",
            "sellPrice":"28"
        },
        {
            "orderItemId":"12235",
            "orderId":"1",
            "productName":"果凍",
            "brandName":"匯源",
            "sellPrice":"12"
        }
    ]
}

廢話也不多說了,直接上程式碼!

二、專案實踐

2.1新增依賴

在SpringBoot專案中,新增rest-high-level-client使用者端,方便與 ES 伺服器連線通訊,在這裡需要注意一下,推薦使用者端的版本與 ES 伺服器的版本號一致,不然會出現介面請求錯誤等異常!

小編本次安裝的ES伺服器端版本號為6.8.2,因此使用者端也保持6.8.2,與之一致!

<!--elasticsearch-->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.8.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.8.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.8.2</version>
</dependency>

2.2設定 es 使用者端

為了更佳方便的使用 es,我們可以將其各個設定類進行封裝,方便後續進行維護。

  • 在application.properties組態檔中,定義 es 設定連線地址;
# 設定es引數
elasticsearch.scheme=http
elasticsearch.address=127.0.0.1:9200
elasticsearch.userName=
elasticsearch.userPwd=
elasticsearch.socketTimeout=5000
elasticsearch.connectTimeout=5000
elasticsearch.connectionRequestTimeout=5000
  • 建立ElasticSearch設定類,方便SpringBoot啟動時注入;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
import java.util.Objects;


@Configuration
public class ElasticSearchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchConfiguration.class);


    private static final int ADDRESS_LENGTH = 2;

    @Value("${elasticsearch.scheme:http}")
    private String scheme;

    @Value("${elasticsearch.address}")
    private String address;

    @Value("${elasticsearch.userName}")
    private String userName;

    @Value("${elasticsearch.userPwd}")
    private String userPwd;

    @Value("${elasticsearch.socketTimeout:5000}")
    private Integer socketTimeout;

    @Value("${elasticsearch.connectTimeout:5000}")
    private Integer connectTimeout;

    @Value("${elasticsearch.connectionRequestTimeout:5000}")
    private Integer connectionRequestTimeout;

    /**
     * 初始化使用者端
     * @return
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restClientBuilder() {
        HttpHost[] hosts = Arrays.stream(address.split(","))
                .map(this::buildHttpHost)
                .filter(Objects::nonNull)
                .toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(hosts);
        // 非同步引數設定
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setDefaultCredentialsProvider(buildCredentialsProvider());
            return httpClientBuilder;
        });

        // 非同步連線延時設定
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);
            requestConfigBuilder.setSocketTimeout(socketTimeout);
            requestConfigBuilder.setConnectTimeout(connectTimeout);
            return requestConfigBuilder;
        });

        return new RestHighLevelClient(restClientBuilder);
    }


    /**
     * 根據設定建立HttpHost
     * @param s
     * @return
     */
    private HttpHost buildHttpHost(String s) {
        String[] address = s.split(":");
        if (address.length == ADDRESS_LENGTH) {
            String ip = address[0];
            int port = Integer.parseInt(address[1]);
            return new HttpHost(ip, port, scheme);
        } else {
            return null;
        }
    }

    /**
     * 構建認證服務
     * @return
     */
    private CredentialsProvider buildCredentialsProvider(){
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName,
                userPwd));
        return credentialsProvider;
    }
}
  • 封裝ElasticSearch使用者端服務類,方便公共呼叫處理
import com.fasterxml.jackson.databind.ObjectMapper;
import org.example.es.exception.CommonException;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;

@Component
public class ElasticSearchClient {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchClient.class);

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    private RestHighLevelClient client;


    /**
     * 查詢全部索引
     * @return
     */
    public Set<String> getAlias(){
        try {
            GetAliasesRequest request = new GetAliasesRequest();
            GetAliasesResponse response =  client.indices().getAlias(request, RequestOptions.DEFAULT);
            return response.getAliases().keySet();
        } catch (IOException e) {
            log.error("向es發起查詢全部索引資訊請求失敗", e);
        }
        return Collections.emptySet();
    }

    /**
     * 檢查索引是否存在
     * @param indexName
     * @return
     */
    public boolean existsIndex(String indexName){
        try {
            // 建立請求
            GetIndexRequest request = new GetIndexRequest().indices(indexName);
            // 執行請求,獲取響應
            boolean response = client.indices().exists(request, RequestOptions.DEFAULT);
            return response;
        } catch (Exception e) {
            log.error("向es發起查詢索引是否存在請求失敗,請求引數:" + indexName, e);
        }
        return false;
    }


    /**
     * 查詢索引
     * @param indexName
     * @return
     */
    public String getIndex(String indexName){
        try {
            // 建立請求
            GetIndexRequest request = new GetIndexRequest().indices(indexName);
            // 執行請求,獲取響應
            GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
            return response.toString();
        } catch (Exception e) {
            log.error("向es發起查詢索引請求失敗,請求引數:" + indexName, e);
        }
        return StringUtils.EMPTY;
    }

    /**
     * 建立索引
     * @param indexName
     * @param mapping
     * @return
     */
    public void createIndex(String indexName, Map<String, Object> mapping){
        try {
            CreateIndexRequest request = new CreateIndexRequest();
            //索引名稱
            request.index(indexName);
            //索引設定
            Settings settings = Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 1)
                    .put("index.max_inner_result_window", 5000)
                    .build();
            request.settings(settings);
            //索引結構
            request.mapping("_doc",mapping);
            //執行請求,獲取響應
            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
            if(!response.isAcknowledged()){
                throw new CommonException("向es發起建立索引請求失敗");
            }
            log.info("向es發起建立索引請求成功,返回引數:{}", response.index());
        } catch (Exception e) {
            log.error("向es發起建立索引請求失敗,請求引數:" + indexName, e);
            throw new CommonException("向es發起建立索引請求失敗");
        }
    }


    /**
     * 刪除索引
     * @param indexName
     * @return
     */
    public void deleteIndex(String indexName){
        try {
            DeleteIndexRequest request = new DeleteIndexRequest(indexName);
            AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
            if(!response.isAcknowledged()){
                throw new CommonException("向es發起刪除索引請求失敗");
            }
            log.info("向es發起刪除索引請求成功,請求引數:{}", indexName);
        } catch (Exception e) {
            log.error("向es發起刪除索引請求失敗,請求引數:" + indexName, e);
            throw new CommonException("向es發起刪除索引請求失敗");
        }
    }


    /**
     * 查詢索引對映欄位
     * @param indexName
     * @return
     */
    public String getMapping(String indexName){
        try {
            GetMappingsRequest request = new GetMappingsRequest().indices(indexName).types("_doc");
            GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
            return response.toString();
        } catch (Exception e) {
            log.error("向es發起查詢索引對映欄位請求失敗,請求引數:" + indexName, e);
        }
        return StringUtils.EMPTY;
    }


    /**
     * 新增索引對映欄位
     * @param indexName
     * @return
     */
    public void addMapping(String indexName, Map<String, Object> mapping){
        try {
            PutMappingRequest request = new PutMappingRequest();
            request.indices(indexName);
            request.type("_doc");
            //新增欄位
            request.source(mapping);
            AcknowledgedResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
            if(!response.isAcknowledged()){
                throw new CommonException("向es發起新增索引對映欄位請求失敗");
            }
            log.info("向es發起新增索引對映欄位請求成功,請求引數:{}", toJson(request));
        } catch (Exception e) {
            log.error("向es發起新增索引對映欄位請求失敗,請求引數:" + indexName, e);
            throw new CommonException("向es發起新增索引對映欄位請求失敗");
        }
    }


    /**
     * 向索引中新增檔案
     * @param indexName
     * @param id
     * @param obj
     */
    public void addDocument(String indexName, String id, Object obj){
        try {
            //向索引中新增檔案
            IndexRequest request = new IndexRequest();
            // 外層引數
            request.id(id);
            request.index(indexName);
            request.type("_doc");

            // 存入物件
            request.source(toJson(obj), XContentType.JSON);
            // 傳送請求
            IndexResponse response = client.index(request, RequestOptions.DEFAULT);
            if(response.status().getStatus() >= 400){
                log.warn("向es發起新增檔案資料請求失敗,請求引數:{},返回引數:{}", request.toString(), response.toString());
                throw new CommonException("向es發起新增檔案資料請求失敗");
            }
        } catch (Exception e) {
            log.error("向es發起新增檔案資料請求失敗,請求引數:" + indexName, e);
            throw new CommonException("向es發起新增檔案資料請求失敗");
        }
    }

    /**
     * 修改索引中的檔案資料
     * @param indexName
     * @param id
     * @param obj
     */
    public void updateDocument(String indexName, String id, Map<String,Object> obj){
        try {
            //修改索引中的檔案資料
            UpdateRequest request = new UpdateRequest();
            // 外層引數
            request.id(id);
            request.index(indexName);
            request.type("_doc");
            // 存入物件
            request.doc(obj);
            request.doc(toJson(obj), XContentType.JSON);
            // 傳送請求
            UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
            if(response.status().getStatus() >= 400){
                log.warn("向es發起修改檔案資料請求失敗,請求引數:{},返回引數:{}", request.toString(), response.toString());
                throw new CommonException("向es發起修改檔案資料請求失敗");
            }
        } catch (Exception e) {
            log.error("向es發起修改檔案資料請求失敗,請求引數:" + indexName, e);
            throw new CommonException("向es發起修改檔案資料請求失敗");
        }
    }



    /**
     * 刪除索引中的檔案資料
     * @param indexName
     * @param id
     */
    public void deleteDocument(String indexName, String id){
        try {
            //刪除索引中的檔案資料
            DeleteRequest request = new DeleteRequest();
            // 外層引數
            request.id(id);
            request.index(indexName);
            request.type("_doc");
            // 傳送請求
            DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
            if(response.status().getStatus() >= 400){
                log.warn("向es發起刪除檔案資料請求失敗,請求引數:{},返回引數:{}", request.toString(), response.toString());
                throw new CommonException("向es發起刪除檔案資料請求失敗");
            }
        } catch (Exception e) {
            log.error("向es發起刪除檔案資料請求失敗,請求引數:" + indexName, e);
            throw new CommonException("向es發起刪除檔案資料請求失敗");
        }
    }


    /**
     * 查詢索引中的檔案資料
     * @param indexName
     * @param id
     */
    public String getDocumentById(String indexName, String id){
        try {
            GetRequest request = new GetRequest();
            // 外層引數
            request.id(id);
            request.index(indexName);
            request.type("_doc");
            // 傳送請求
            GetResponse response = client.get(request, RequestOptions.DEFAULT);
            response.getSourceAsString();
        } catch (Exception e) {
            log.error("向es發起查詢檔案資料請求失敗,請求引數:" + indexName, e);
        }
        return StringUtils.EMPTY;
    }


    /**
     * 索引高階查詢
     * @param indexName
     * @param source
     * @return
     */
    public SearchResponse searchDocument(String indexName, SearchSourceBuilder source){
        //搜尋
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(indexName);
        searchRequest.source(source);
        try {
            // 執行請求
            SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
            return response;
        } catch (Exception e) {
            log.warn("向es發起查詢檔案資料請求失敗,請求引數:" + searchRequest.toString(), e);
        }
        return null;
    }


    /**
     * 將物件格式化成json,並保持原欄位型別輸出
     * @param object
     * @return
     */
    private String toJson(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (Exception e) {
            throw new CommonException(e);
        }
    }

}

2.3初始化索引結構

在使用 es 對訂單進行查詢搜尋時,我們需要先定義好對應的訂單索引結構,內容如下:

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderIndexServiceJunit {

    @Autowired
    private ElasticSearchClient elasticSearchClient;
    
    /**
     * 初始化索引結構
     *
     * @return
     */
    @Test
    public void initIndex(){
        String indexName = "orderIndex-2022-07";
        // 建立請求
        boolean existsIndex = elasticSearchClient.existsIndex(indexName);
        if (!existsIndex) {
            Map<String, Object> properties = buildMapping();
            elasticSearchClient.createIndex(indexName, properties);
        }
    }

    /**
     * 構建索引結構
     *
     * @return
     */
    private Map<String, Object> buildMapping() {
        Map<String, Object> properties = new HashMap();
        //訂單id  唯一鍵ID
        properties.put("orderId", ImmutableBiMap.of("type", "keyword"));
        //訂單號
        properties.put("orderNo", ImmutableBiMap.of("type", "keyword"));
        //客戶姓名
        properties.put("orderUserName", ImmutableBiMap.of("type", "text"));
        
        //訂單項
        Map<String, Object> orderItems = new HashMap();
        //訂單項ID
        orderItems.put("orderItemId", ImmutableBiMap.of("type", "keyword"));
        //產品名稱
        orderItems.put("productName", ImmutableBiMap.of("type", "text"));
        //品牌名稱
        orderItems.put("brandName", ImmutableBiMap.of("type", "text"));
        //銷售金額,單位分*100
        orderItems.put("sellPrice", ImmutableBiMap.of("type", "integer"));
        properties.put("orderItems", ImmutableBiMap.of("type", "nested", "properties", orderItems));

        //檔案結構對映
        Map<String, Object> mapping = new HashMap();
        mapping.put("properties", properties);
        return mapping;
    }
}

2.4向 es 中同步檔案資料

索引結構建立好之後,我們需要將支援 es 搜尋的訂單資料同步進去。

將指定的訂單 ID 從資料庫查詢出來,並封裝成 es 訂單資料結構,儲存到 es 中!

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderIndexServiceJunit {

    @Autowired
    private ElasticSearchClient elasticSearchClient;


    /**
     * 儲存訂單到ES中
     * @param request
     */
    @Test
    public void saveDocument(){
        String indexName =  "orderIndex-2022-07";
        //從資料庫查詢最新訂單資料,並封裝成對應的es訂單結構
        String orderId = "202202020202";
        OrderIndexDocDTO indexDocDTO = buildOrderIndexDocDTO(orderId);
        //儲存資料到ES中
        elasticSearchClient.addDocument(indexName, indexDocDTO.getOrderId(), indexDocDTO);
    }

}

2.5內嵌物件查詢

內嵌物件查詢分兩種形式,比如,第一種通過商品、品牌、價格等條件,分頁查詢訂單資料;第二種是通過訂單ID、商品、品牌、價格等,分頁查詢訂單項資料。具體的實踐,請看下文。

通過商品、品牌、價格等條件,分頁查詢訂單資料;

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderIndexServiceJunit {

    @Autowired
    private ElasticSearchClient elasticSearchClient;


    /**
     * 通過商品、品牌、價格等條件,分頁查詢訂單資料
     * @param request
     */
    @Test
    public void search1(){
        //查詢索引,支援萬用字元
        String indexName = "orderIndex-*";
        String orderUserName = "張三";
        String productName = "薯條";
        // 條件搜尋
        SearchSourceBuilder builder = new SearchSourceBuilder();

        //組合搜尋
        BoolQueryBuilder mainBoolQuery = new BoolQueryBuilder();
        mainBoolQuery.must(QueryBuilders.matchQuery("orderUserName", orderUserName));

        //訂單項相關資訊搜尋
        BoolQueryBuilder nestedBoolQuery = new BoolQueryBuilder(); nestedBoolQuery.must(QueryBuilders.matchQuery("orderItems.productName", productName));
        //內嵌物件搜尋,需要指定path
        NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("orderItems",nestedBoolQuery, ScoreMode.None);
        //子表查詢
        mainBoolQuery.must(nestedQueryBuilder);

        //封裝查詢引數
        builder.query(mainBoolQuery);

        //返回引數
        builder.fetchSource(new String[]{}, new String[]{});

        //結果集合分頁,從第一頁開始,返回最多四條資料
        builder.from(0).size(4);

        //排序
        builder.sort("orderId", SortOrder.DESC);
        log.info("dsl:{}", builder.toString());
        // 執行請求
        SearchResponse response = elasticSearchClient.searchDocument(indexName, builder);
        // 當前返回的總行數
        long count = response.getHits().getTotalHits();
        // 返回的具體行數
        SearchHit[] searchHits = response.getHits().getHits();
  log.info("response:{}", response.toString());
    }

}

通過訂單ID、商品、品牌、價格等,分頁查詢訂單項資料;

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest
public class OrderIndexServiceJunit {

    @Autowired
    private ElasticSearchClient elasticSearchClient;


    /**
     * 通過訂單ID、商品、品牌、價格等,分頁查詢訂單項資料
     * @param request
     */
    @Test
    public void search2(){
        //查詢索引,支援萬用字元
        String indexName = "orderIndex-*";
        String orderId = "202202020202";
        String productName = "薯條";
        // 條件搜尋
        SearchSourceBuilder builder = new SearchSourceBuilder();

        //組合搜尋
        BoolQueryBuilder mainBoolQuery = new BoolQueryBuilder();
        mainBoolQuery.must(QueryBuilders.termQuery("_id", orderId));

        //訂單項相關資訊搜尋
        BoolQueryBuilder nestedBoolQuery = new BoolQueryBuilder(); nestedBoolQuery.must(QueryBuilders.matchQuery("orderItems.productName", productName));
        //內嵌物件搜尋,需要指定path
        NestedQueryBuilder nestedQueryBuilder = QueryBuilders.nestedQuery("orderItems",nestedBoolQuery, ScoreMode.None);

        //內嵌物件分頁查詢
        InnerHitBuilder innerHitBuilder = new InnerHitBuilder();
        //結果集合分頁,從第一頁開始,返回最多四條資料
        innerHitBuilder.setFrom(0).setSize(4);
        //只返回訂單項id
        innerHitBuilder.setFetchSourceContext(new FetchSourceContext(true, new String[]{"orderItems.orderItemId"}, new String[]{}));
        innerHitBuilder.addSort(SortBuilders.fieldSort("orderItems.orderItemId").order(SortOrder.DESC));
        nestedQueryBuilder.innerHit(innerHitBuilder);
        
        //子表查詢
        mainBoolQuery.must(nestedQueryBuilder);

        //封裝查詢引數
        builder.query(mainBoolQuery);

        //返回引數
        builder.fetchSource(new String[]{}, new String[]{});

        //結果集合分頁,從第一頁開始,返回最多四條資料
        builder.from(0).size(4);

        //排序
        builder.sort("orderId", SortOrder.DESC);
        log.info("dsl:{}", builder.toString());
        // 執行請求
        SearchResponse response = elasticSearchClient.searchDocument(indexName, builder);
        // 當前返回的訂單主表總行數
        long count = response.getHits().getTotalHits();
        // 返回的訂單主表資料
        SearchHit[] searchHits = response.getHits().getHits();
        // 返回查詢的的訂單項分頁資料
        Map<String, SearchHits> = searchHit[0].getInnerHits();
        log.info("response:{}", response.toString());
    }

}

三、小結

本文主要以通過商品名稱查詢訂單資料為案例,介紹利用 SpringBoot 整合 es 實現資料的高效搜尋,內容如果難免有些遺漏,歡迎網友指出!


IT145.com E-mail:sddin#qq.com