
Integrating Elasticsearch with SpringBoot to Search Massive Data Sets

2022-07-15 10:00:44

Today we'll look at how to integrate Elasticsearch into the SpringBoot framework to search massive amounts of data.

1. Introduction

In the previous ElasticSearch article, we covered ElasticSearch's various APIs in detail.

In real-world projects, development is usually built on a mainstream framework such as SpringBoot. In this article we use SpringBoot as the example and walk through how to use ElasticSearch from it in detail.

There are four mainstream ways for SpringBoot to connect to ElasticSearch:

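  • Option 1: connect to the es server through the Elastic Transport Client. It communicates with the remote ES server over TCP via the transport module. However, it has been deprecated since V7.0 and was removed in V8.0.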

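  • Option 2: connect through the Elastic Java Low Level Rest Client. It communicates with the remote ES server over HTTP using the RESTful API and offers only the simplest, most basic API, similar to the raw API operations introduced in the previous article.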

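  • Option 3: connect through the Elastic Java High Level Rest Client. It is a wrapper around the Elastic Java Low Level Rest Client that provides higher-level APIs, accepting the same request objects and returning the same response objects as the Elastic Transport Client. It is the officially recommended client for the 6.x and 7.x lines (it was itself deprecated in 7.15 in favor of the Elasticsearch Java API Client).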
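  • Option 4: connect through JestClient, an open-source, community-built ES client based on HTTP. Its authors claim its interfaces and code design are simpler, more sensible and easier to use than the official Rest clients, and it offers some compatibility across ES server versions. However, it is updated slowly: at the time of writing ES had reached V7.9, while JestClient only supports ES V1.0 through V6.x.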

One more thing to watch out for: version compatibility!

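During development, pay particular attention to the client and server version numbers and keep them as close as possible. For example, if the es server is 6.8.2, the client connecting to it should ideally also be 6.8.2. If the project cannot use exactly the same version, the client must still fall between 6.0.0 and 6.8.2, that is, the same major version and no newer than the server; only then will the client work properly. Otherwise all kinds of unexpected problems appear: with a 7.0.4 client, for example, the program throws errors everywhere and may not work at all.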

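Why? Mainly because the es server is not compatible across major versions: the request parameter structure of some APIs differs significantly between es6 and es7 (for example, es7 deprecates mapping types such as the `_doc` type used throughout this article), so the client and server versions should match as closely as possible.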
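The rule of thumb above (same major version, client no newer than the server) can be expressed as a small check. This is a minimal sketch: `EsVersionCheck` and `isCompatible` are hypothetical names, not part of any Elasticsearch API, and the check only handles plain numeric `major.minor.patch` strings.

```java
/**
 * Hypothetical helper illustrating the client/server version rule of thumb:
 * same major version, and the client must not be newer than the server.
 * Only numeric "major.minor.patch" strings are supported.
 */
final class EsVersionCheck {

    // Parses "6.8.2" into {6, 8, 2}; missing parts default to 0.
    static int[] parse(String version) {
        String[] parts = version.trim().split("\\.");
        int[] result = new int[3];
        for (int i = 0; i < Math.min(parts.length, 3); i++) {
            result[i] = Integer.parseInt(parts[i]);
        }
        return result;
    }

    // True when both share a major version and the client is not newer.
    static boolean isCompatible(String clientVersion, String serverVersion) {
        int[] client = parse(clientVersion);
        int[] server = parse(serverVersion);
        if (client[0] != server[0]) {
            return false; // different major versions, e.g. a 7.x client on a 6.x server
        }
        for (int i = 1; i < 3; i++) {
            if (client[i] != server[i]) {
                return client[i] < server[i]; // first differing part decides
            }
        }
        return true; // identical versions
    }
}
```

With the article's numbers, a 6.4.2 client passes against a 6.8.2 server, while the 7.0.4 client from the failure case is rejected because its major version differs.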

Enough talk; let's get to the code!

2. Code Practice

This article uses SpringBoot 2.1.0.RELEASE and an es server at version 6.8.2. The client is the officially recommended Elastic Java High Level Rest Client at version 6.4.2, chosen for compatibility with this SpringBoot version.

2.1 Import dependencies

<!--elasticsearch-->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.4.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.4.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.2</version>
</dependency>

2.2 Configure the properties

Define the custom elasticsearch properties in the global application.properties configuration file:

elasticsearch.scheme=http
elasticsearch.address=127.0.0.1:9200
elasticsearch.userName=
elasticsearch.userPwd=
elasticsearch.socketTimeout=5000
elasticsearch.connectTimeout=5000
elasticsearch.connectionRequestTimeout=5000
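The configuration class reads these keys with `@Value("${key:default}")` expressions, whose fallback applies only when a key is missing entirely. The sketch below imitates that lookup with plain `java.util.Properties`, assuming the same key names and the same 5000 ms fallbacks; `EsSettings` is a hypothetical class, and in the real project Spring performs this binding.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/**
 * Hypothetical, dependency-free imitation of how the elasticsearch.* keys
 * are resolved with fallbacks. Spring does the real binding.
 */
final class EsSettings {
    final String scheme;
    final String address;
    final int socketTimeout;
    final int connectTimeout;
    final int connectionRequestTimeout;

    private EsSettings(Properties p) {
        // Fallback "http" is used only when the key is absent; like Spring,
        // a key present with an empty value yields "" instead of the fallback.
        scheme = p.getProperty("elasticsearch.scheme", "http");
        // No fallback: a missing address is treated as a configuration error.
        address = p.getProperty("elasticsearch.address");
        if (address == null) {
            throw new IllegalStateException("elasticsearch.address is required");
        }
        socketTimeout = intProperty(p, "elasticsearch.socketTimeout", 5000);
        connectTimeout = intProperty(p, "elasticsearch.connectTimeout", 5000);
        connectionRequestTimeout = intProperty(p, "elasticsearch.connectionRequestTimeout", 5000);
    }

    // Missing key -> fallback; otherwise parse the value as milliseconds.
    private static int intProperty(Properties p, String key, int fallback) {
        String value = p.getProperty(key);
        return value == null ? fallback : Integer.parseInt(value.trim());
    }

    // Loads settings from properties-file text.
    static EsSettings load(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new EsSettings(p);
    }
}
```

The empty-value behavior matters for lines such as `elasticsearch.userName=`: the property resolves to an empty string, so code that consumes it has to treat "" as "not configured".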

2.3 Create the elasticsearch configuration class

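import java.util.Arrays;
import java.util.Objects;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfiguration {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchConfiguration.class);
    private static final int ADDRESS_LENGTH = 2;
    @Value("${elasticsearch.scheme:http}")
    private String scheme;
    /** Comma-separated list of host:port pairs */
    @Value("${elasticsearch.address}")
    private String address;
    @Value("${elasticsearch.userName:}")
    private String userName;
    @Value("${elasticsearch.userPwd:}")
    private String userPwd;
    @Value("${elasticsearch.socketTimeout:5000}")
    private Integer socketTimeout;
    @Value("${elasticsearch.connectTimeout:5000}")
    private Integer connectTimeout;
    @Value("${elasticsearch.connectionRequestTimeout:5000}")
    private Integer connectionRequestTimeout;
    /**
     * Initialize the client. Spring infers close() as the destroy method,
     * so the client is closed when the application context shuts down.
     * @return the high-level REST client
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient restClientBuilder() {
        HttpHost[] hosts = Arrays.stream(address.split(","))
                .map(this::buildHttpHost)
                .filter(Objects::nonNull)
                .toArray(HttpHost[]::new);
        RestClientBuilder restClientBuilder = RestClient.builder(hosts);
        // HTTP client settings: attach credentials only when a user name is configured
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            if (userName != null && !userName.isEmpty()) {
                httpClientBuilder.setDefaultCredentialsProvider(buildCredentialsProvider());
            }
            return httpClientBuilder;
        });
        // Request timeouts, in milliseconds
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeout);
            requestConfigBuilder.setSocketTimeout(socketTimeout);
            requestConfigBuilder.setConnectTimeout(connectTimeout);
            return requestConfigBuilder;
        });
        return new RestHighLevelClient(restClientBuilder);
    }
    /**
     * Build an HttpHost from one "host:port" entry of the address setting
     * @param s a single address entry
     * @return the host, or null if the entry is malformed
     */
    private HttpHost buildHttpHost(String s) {
        String[] parts = s.trim().split(":");
        if (parts.length == ADDRESS_LENGTH) {
            String ip = parts[0];
            int port = Integer.parseInt(parts[1]);
            return new HttpHost(ip, port, scheme);
        }
        log.warn("Ignoring malformed elasticsearch.address entry: {}", s);
        return null;
    }
    /**
     * Build the basic-authentication credentials provider
     * @return the credentials provider
     */
    private CredentialsProvider buildCredentialsProvider() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName,
                userPwd));
        return credentialsProvider;
    }
}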
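To see what `buildHttpHost` does with a multi-node address value, here is a dependency-free sketch of the same parsing. `AddressParser` and `HostPort` are hypothetical stand-ins (the latter for `org.apache.http.HttpHost`); this sketch also trims whitespace around each entry and skips any entry that is not exactly `host:port`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of parsing a comma-separated elasticsearch.address
 * value such as "10.0.0.1:9200,10.0.0.2:9201" into host/port pairs.
 */
final class AddressParser {

    // Minimal stand-in for org.apache.http.HttpHost
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    // Entries that do not split into exactly two parts are dropped,
    // mirroring buildHttpHost returning null and the stream filtering it out.
    static List<HostPort> parse(String address) {
        List<HostPort> hosts = new ArrayList<>();
        for (String entry : address.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                continue; // malformed entry: skipped
            }
            hosts.add(new HostPort(parts[0], Integer.parseInt(parts[1])));
        }
        return hosts;
    }
}
```

Because malformed entries are skipped rather than reported, a value containing no valid entry produces an empty host array, which `RestClient.builder` rejects, so the application fails at startup.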

At this point the client configuration is complete; when the project starts, the client is automatically registered in Spring's IoC container.

2.4 Index management

The most important thing in es is the index. How does the client create one? Read on!

  • Create an index

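import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Create an index (simple mode)
     * @throws IOException
     */
    @Test
    public void createIndex() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest("cs_index");
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println(response.isAcknowledged());
    }
    /**
     * Create an index (full mode)
     * Initializes the document mapping together with the index.
     * Fails if cs_index already exists, e.g. after running createIndex().
     * @throws IOException
     */
    @Test
    public void createIndexComplete() throws IOException {
        CreateIndexRequest request = new CreateIndexRequest();
        // Index name
        request.index("cs_index");
        // Index settings
        Settings settings = Settings.builder()
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 1)
                .build();
        request.settings(settings);
        // Mapping: field name -> field type
        Map<String, Object> properties = new HashMap<>();
        properties.put("id", Collections.singletonMap("type", "text"));
        properties.put("name", Collections.singletonMap("type", "text"));
        properties.put("sex", Collections.singletonMap("type", "text"));
        properties.put("age", Collections.singletonMap("type", "long"));
        properties.put("city", Collections.singletonMap("type", "text"));
        properties.put("createTime", Collections.singletonMap("type", "long"));
        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", properties);
        // Register the mapping under the 6.x default type "_doc"
        request.mapping("_doc", mapping);
        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
        System.out.println(response.isAcknowledged());
    }
}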
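For reference, `createIndexComplete()` corresponds roughly to the JSON body of a `PUT /cs_index` request on a 6.x cluster. The sketch below builds that body for flat field-to-type mappings; `CreateIndexBody` is a hypothetical helper, and real code would normally let a JSON library or the client produce it.

```java
import java.util.Map;

/**
 * Hypothetical sketch: builds the create-index JSON body (settings plus a
 * "_doc" mapping) for flat field -> type pairs. Keys are not escaped, so
 * it is only meant for simple field names.
 */
final class CreateIndexBody {

    static String build(int shards, int replicas, Map<String, String> fieldTypes) {
        StringBuilder props = new StringBuilder();
        for (Map.Entry<String, String> e : fieldTypes.entrySet()) {
            if (props.length() > 0) {
                props.append(',');
            }
            // "field":{"type":"..."}
            props.append('"').append(e.getKey()).append("\":{\"type\":\"")
                 .append(e.getValue()).append("\"}");
        }
        return "{\"settings\":{\"index.number_of_shards\":" + shards
                + ",\"index.number_of_replicas\":" + replicas + "},"
                + "\"mappings\":{\"_doc\":{\"properties\":{" + props + "}}}}";
    }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap`) keeps the fields in the same order as they were added, which makes the output easy to compare with the Java listing.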
  • Delete an index

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Delete an index; an exception is thrown if the index does not exist
     * @throws IOException
     */
    @Test
    public void deleteIndex() throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest("cs_index1");
        AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
        System.out.println(response.isAcknowledged());
    }
}
  • Get an index

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Get index information (settings, mappings and aliases)
     * @throws IOException
     */
    @Test
    public void getIndex() throws IOException {
        // Build the request
        GetIndexRequest request = new GetIndexRequest();
        request.indices("cs_index");
        // Execute the request and get the response
        GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
    }
}
  • Check whether an index exists

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Check whether an index exists
     * @throws IOException
     */
    @Test
    public void exists() throws IOException {
        // Build the request
        GetIndexRequest request = new GetIndexRequest();
        request.indices("cs_index");
        // Execute the request; true means the index exists
        boolean response = client.indices().exists(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
  • List all index names

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * List all index names
     * @throws IOException
     */
    @Test
    public void getAllIndices() throws IOException {
        GetAliasesRequest request = new GetAliasesRequest();
        GetAliasesResponse response =  client.indices().getAlias(request,RequestOptions.DEFAULT);
        Map<String, Set<AliasMetaData>> map = response.getAliases();
        Set<String> indices = map.keySet();
        for (String key : indices) {
            System.out.println(key);
        }
    }
}
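The key set returned by `getAliases()` may include internal indices whose names start with a dot, such as `.kibana` on clusters that run Kibana. A minimal sketch of filtering them out and sorting the rest, assuming the dot-prefix convention is enough for your cluster; `IndexNames` is a hypothetical helper.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

/** Hypothetical helper for post-processing the index names listed above. */
final class IndexNames {

    // Drops dot-prefixed (by convention internal) indices and sorts the rest.
    static SortedSet<String> userIndices(Set<String> allIndices) {
        return allIndices.stream()
                .filter(name -> !name.startsWith("."))
                .collect(Collectors.toCollection(TreeSet::new));
    }
}
```

In the test above it would be applied as `IndexNames.userIndices(map.keySet())` before printing.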
  • Get the index mapping

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Get the index mapping
     * @throws IOException
     */
    @Test
    public void getMapping() throws IOException {
        GetMappingsRequest request = new GetMappingsRequest();
        request.indices("cs_index");
        request.types("_doc");
        GetMappingsResponse response = client.indices().getMapping(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
    }
}
  • Add a field to the index mapping

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class IndexJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Add a field to the index mapping (existing fields cannot change type)
     * @throws IOException
     */
    @Test
    public void addMapping() throws IOException {
        PutMappingRequest request = new PutMappingRequest();
        request.indices("cs_index");
        request.type("_doc");
        // New field definition
        Map<String, Object> properties = new HashMap<>();
        properties.put("accountName", Collections.singletonMap("type", "keyword"));
        Map<String, Object> mapping = new HashMap<>();
        mapping.put("properties", properties);
        request.source(mapping);
        PutMappingResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
        System.out.println(response.isAcknowledged());
    }
}
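The put-mapping API can add new fields such as `accountName`, but Elasticsearch rejects attempts to change the type of a field that already exists; that requires creating a new index and reindexing. The sketch below checks a planned change against the current mapping before sending it. `MappingUpdateCheck` is hypothetical and only compares top-level `type` values of flat fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical pre-flight check for a put-mapping request: reports fields
 * whose type would change, which Elasticsearch does not allow in place.
 */
final class MappingUpdateCheck {

    static List<String> conflicts(Map<String, String> existing, Map<String, String> additions) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : additions.entrySet()) {
            String current = existing.get(e.getKey());
            // New fields are fine; an existing field must keep its type.
            if (current != null && !current.equals(e.getValue())) {
                result.add(e.getKey() + ": " + current + " -> " + e.getValue());
            }
        }
        return result;
    }
}
```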

2.5 Document management

A document is a piece of data added to an index so that it can be queried. The operations are as follows:

  • Add a document

public class UserDocument {
    private String id;
    private String name;
    private String sex;
    private Integer age;
    private String city;
    private Date createTime;

    // Constructors, getters and setters omitted...
}
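The bulk example later calls `new UserDocument("張三", "男", 30, "武漢")`, so the class needs a constructor taking `(name, sex, age, city)` in addition to the no-arg constructor used by the other tests. One way the omitted parts could look, assuming exactly that argument order; the getter and setter names follow the calls made in the tests.

```java
import java.util.Date;

/** Sketch of UserDocument with the omitted constructors and accessors. */
class UserDocument {
    private String id;
    private String name;
    private String sex;
    private Integer age;
    private String city;
    private Date createTime;

    // No-arg constructor: used by the add/update tests; JSON libraries
    // generally need one when deserializing.
    public UserDocument() {
    }

    // Constructor assumed by the bulk example: (name, sex, age, city)
    public UserDocument(String name, String sex, Integer age, String city) {
        this.name = name;
        this.sex = sex;
        this.age = age;
        this.city = city;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public Date getCreateTime() { return createTime; }
    public void setCreateTime(Date createTime) { this.createTime = createTime; }
}
```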
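import java.io.IOException;
import java.util.Date;

import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class DocJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Add a document
     * @throws IOException
     */
    @Test
    public void addDocument() throws IOException {
        // Build the document object
        UserDocument user = new UserDocument();
        user.setId("1");
        user.setName("里斯");
        user.setCity("武漢");
        user.setSex("男");
        user.setAge(20);
        user.setCreateTime(new Date());
        // Build the index request
        IndexRequest request = new IndexRequest();
        // Request-level parameters: id, index, type and timeout
        request.id("1");
        request.index("cs_index");
        request.type("_doc");
        request.timeout(TimeValue.timeValueSeconds(1));
        // Document source as JSON; fastjson writes Date as epoch milliseconds,
        // which fits the long mapping of createTime
        request.source(JSON.toJSONString(user), XContentType.JSON);
        // Send the request
        System.out.println(request.toString());
        IndexResponse response = client.index(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
    }
}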
  • Update a document

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class DocJunit {
    @Autowired
    private RestHighLevelClient client;
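    /**
     * Update a document (partial update: only the fields that are set change)
     * @throws IOException
     */
    @Test
    public void updateDocument() throws IOException {
        // Object holding only the fields to change; fastjson omits null
        // fields, so the other stored fields keep their values
        UserDocument user = new UserDocument();
        user.setId("2");
        user.setName("程咬金");
        user.setCreateTime(new Date());
        // Build the update request
        UpdateRequest request = new UpdateRequest();
        // Request-level parameters; the document with id 2 must already exist
        request.id("2");
        request.index("cs_index");
        request.type("_doc");
        request.timeout(TimeValue.timeValueSeconds(1));
        // Set the partial document
        request.doc(JSON.toJSONString(user), XContentType.JSON);
        // Send the request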
        System.out.println(request.toString());
        UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
    }
}
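With `request.doc(...)`, Elasticsearch merges the supplied fields into the stored document instead of replacing it, and fastjson's `JSON.toJSONString` leaves `null` fields out by default, so unset fields such as `city` keep their stored values. The sketch below imitates that merge for flat documents; `PartialUpdate` is a hypothetical helper, and the real update API also merges nested objects recursively, which this sketch does not.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical imitation of a partial ("doc") update on a flat document. */
final class PartialUpdate {

    // Returns a new map: stored fields overwritten by non-null partial fields.
    // Null values are skipped, mirroring fastjson omitting null fields.
    static Map<String, Object> apply(Map<String, Object> stored, Map<String, Object> partial) {
        Map<String, Object> merged = new LinkedHashMap<>(stored);
        for (Map.Entry<String, Object> e : partial.entrySet()) {
            if (e.getValue() != null) {
                merged.put(e.getKey(), e.getValue());
            }
        }
        return merged;
    }
}
```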
  • Delete a document

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class DocJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Delete a document
     * @throws IOException
     */
    @Test
    public void deleteDocument() throws IOException {
        // Build the delete request
        DeleteRequest request = new DeleteRequest();
        // Request-level parameters
        request.id("1");
        request.index("cs_index");
        request.type("_doc");
        request.timeout(TimeValue.timeValueSeconds(1));
        // Send the request
        System.out.println(request.toString());
        DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
    }
}
  • Check whether a document exists

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class DocJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Check whether a document exists
     * @throws IOException
     */
    @Test
    public void exists() throws IOException {
        // Build the get request
        GetRequest request = new GetRequest();
        // Request-level parameters
        request.id("3");
        request.index("cs_index");
        request.type("_doc");
        // Send the request; true means the document exists
        System.out.println(request.toString());
        boolean response = client.exists(request, RequestOptions.DEFAULT);
        System.out.println(response);
    }
}
  • Get a document by ID

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class DocJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Get a specific document by ID
     * @throws IOException
     */
    @Test
    public void getById() throws IOException {
        // Build the get request
        GetRequest request = new GetRequest();
        // Request-level parameters
        request.id("1");
        request.index("cs_index");
        request.type("_doc");
        // Send the request
        System.out.println(request.toString());
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        System.out.println(response.toString());
    }
}
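Each single-document call above corresponds roughly to one HTTP request against the 6.x typed endpoints. This sketch only builds the request line (query parameters such as `timeout` are left out); `DocOperation` is a hypothetical enum, and the high-level client builds these requests internally. Note that in 7.x the update endpoint moved to `/{index}/_update/{id}`.

```java
/**
 * Hypothetical mapping of the single-document operations to the
 * 6.x REST request lines they roughly correspond to.
 */
enum DocOperation {
    INDEX("PUT", ""),
    GET("GET", ""),
    EXISTS("HEAD", ""),
    DELETE("DELETE", ""),
    UPDATE("POST", "/_update");

    private final String method;
    private final String suffix;

    DocOperation(String method, String suffix) {
        this.method = method;
        this.suffix = suffix;
    }

    // Builds e.g. "GET /cs_index/_doc/1"; query parameters are omitted.
    String requestLine(String index, String type, String id) {
        return method + " /" + index + "/" + type + "/" + id + suffix;
    }
}
```

This is also a convenient way to reproduce a failing call with curl or Kibana Dev Tools when debugging the Java code.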
  • Bulk-add documents

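import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ElasticSearchApplication.class)
public class DocJunit {
    @Autowired
    private RestHighLevelClient client;
    /**
     * Bulk-add documents
     * @throws IOException
     */
    @Test
    public void batchAddDocument() throws IOException {
        // Bulk request with an overall timeout
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueSeconds(10));
        // Build the documents
        List<UserDocument> userArrayList = new ArrayList<>();
        userArrayList.add(new UserDocument("張三", "男", 30, "武漢"));
        userArrayList.add(new UserDocument("里斯", "女", 31, "北京"));
        userArrayList.add(new UserDocument("王五", "男", 32, "武漢"));
        userArrayList.add(new UserDocument("趙六", "女", 33, "長沙"));
        userArrayList.add(new UserDocument("七七", "男", 34, "武漢"));
        // Add one index request per document, using the list position as the id
        for (int i = 0; i < userArrayList.size(); i++) {
            userArrayList.get(i).setId(String.valueOf(i));
            IndexRequest indexRequest = new IndexRequest();
            // Request-level parameters
            indexRequest.id(String.valueOf(i));
            indexRequest.index("cs_index");
            indexRequest.type("_doc");
            indexRequest.timeout(TimeValue.timeValueSeconds(1));
            indexRequest.source(JSON.toJSONString(userArrayList.get(i)), XContentType.JSON);
            bulkRequest.add(indexRequest);
        }
        // Execute the request; an OK status does not mean every item succeeded
        BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        System.out.println(response.status());
        if (response.hasFailures()) {
            System.out.println(response.buildFailureMessage());
        }
    }
}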
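Under the hood, a bulk request is sent as newline-delimited JSON to `/_bulk`: an action line followed by a source line for each document, with a trailing newline at the end of the body. A minimal sketch that builds such a body for index operations, assuming the sources are already serialized JSON and using the list position as the `_id` as the test above does; `BulkBody` is a hypothetical helper.

```java
import java.util.List;

/**
 * Hypothetical sketch of the 6.x bulk body for index operations:
 * one action line and one source line per document, newline-terminated.
 */
final class BulkBody {

    static String build(String index, String type, List<String> jsonSources) {
        StringBuilder body = new StringBuilder();
        for (int i = 0; i < jsonSources.size(); i++) {
            // Action/metadata line
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(i).append("\"}}\n");
            // Source line; the final newline is required by the bulk API
            body.append(jsonSources.get(i)).append('\n');
        }
        return body.toString();
    }
}
```

Seeing the body this way also explains why per-item results must be checked: each pair of lines succeeds or fails independently of the others.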

3. Summary

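This article walked through integrating ElasticSearch with SpringBoot, covering index management as well as inserting and retrieving documents. In practice the version number is especially important: different versions of es expose different APIs.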


