ElasticSearch:SpringBoot整合(批量数据的增删改查)

郎家岭伯爵 2024年05月07日 374次浏览

前言

写一下 ElasticSearch 在 SpringBoot 中的整合,本文实现在 ES 中的批量数据的增删改查功能。

实现

本文代码是在上一篇的基础上进行的,因此不再赘述 SpringBoot 创建 ElasticSearch 连接部分的代码。

package com.langjialing.helloworld.controlle1;

import cn.hutool.json.JSONObject;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.List;

/**
 * @author 郎家岭伯爵
 * @time 2024年5月7日13:41:00
 */
@RestController
@RequestMapping("/es1")
public class EsController1 {

    /**
     * Elasticsearch客户端
     */
    @Autowired
    private RestHighLevelClient client;

    public void bulkIndexDocuments(String indexName, List<JSONObject> documents) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();

        for (JSONObject document : documents) {
            IndexRequest indexRequest = new IndexRequest(indexName)
                    .id(document.getStr("id"))
                    .source(document);

            bulkRequest.add(indexRequest);
        }

        // 处理批量索引响应
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        
        // 判断批量写入的数据中是否有错误。如果有错误,则遍历每个响应,找到错误的响应并输出信息
        if (bulkResponse.hasFailures()) {
            // 检查是否有失败的写入操作
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                    // 获取失败的文档ID、失败的原因等信息
                    String documentId = failure.getId();
                    String failureMessage = failure.getMessage();
                    System.out.println("Document with ID " + documentId + " failed: " + failureMessage);
                }
            }
        } else {
            System.out.println("All documents were successfully indexed.");
        }
    }

    public void bulkUpdateDocuments(String indexName, List<JSONObject> documents) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();

        for (JSONObject document : documents) {
            UpdateRequest updateRequest = new UpdateRequest(indexName, document.getStr("id"))
                    // doc方法,更新指定的字段而不是整个文档
                    // 例如更新user的name字段,那么应该传入 {"user":{"name":"郎家岭"}}
                    // 如果需要更新多个字段,则传入Map对象,在map对象中存入多个值
                    .doc(document)
                    // upsert方法:不调用此方法的情况下,如果指定的文档ID不存在,es会返回404错误;
                    // 如果调用此方法,如果指定的文档ID不存在,则会新增一个文档,不会返回错误
                    .upsert(document);

            bulkRequest.add(updateRequest);
        }

        // 处理批量更新响应
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    public void bulkDeleteDocuments(String indexName, List<String> documentIds) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();

        for (String documentId : documentIds) {
            DeleteRequest deleteRequest = new DeleteRequest(indexName, documentId);
            bulkRequest.add(deleteRequest);
        }

        // 处理批量删除响应
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

    /*

        <!-- MyMapper.xml -->
        <update id="batchUpdateData" parameterType="java.util.List">
            <foreach collection="paramsList" item="params" separator=";">
                UPDATE my_table
                <set>
                    <foreach collection="params.keySet()" item="key" separator=",">
                        <if test="key != 'id' and key != 'otherField'"> <!-- 排除不需要更新的字段 -->
                            ${key} = #{params.${key}}
                        </if>
                    </foreach>
                </set>
                WHERE id = #{params.id}
                AND otherField = #{params.otherField} <!-- 构建 WHERE 条件 -->
            </foreach>
        </update>

     */

    public void simpleQuery(String id) throws IOException {
        // 构建查询请求
        SearchRequest searchRequest = new SearchRequest("your_index");
        searchRequest.source(buildQuery(id));

        // 执行查询请求
        SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

        // 处理查询结果
        // ...
    }

    /**
     * 根据_id或_source下的第一层数据构建查询条件
     * @param id id
     * @return SearchSourceBuilder
     */
    private SearchSourceBuilder buildQuery(String id) {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

        // 根据_id查询
        searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(id));

        // 或者根据_source下的第一层数据查询。如果查询name的下一层,即改为:name.firstname.keyword
         searchSourceBuilder.query(QueryBuilders.termQuery("name.keyword", "value"));

        return searchSourceBuilder;
    }
}

注:

  • 本文中的代码没有写供测试使用的方法,大家可自行创建 Controller 或测试类进行功能测试

  • 其中有一段 mapper.xml 代码,使用得十分巧妙,贴一下供参考。

总结

实现下 ElasticSearch 在 SpringBoot 中的批量数据的增删改查功能。