前言
写一下 ElasticSearch 在 SpringBoot 中的整合,本文实现在 ES 中的批量数据的增删改查功能。
实现
本文代码是在上一篇的基础上进行的,因此不再赘述 SpringBoot 创建 ElasticSearch 连接部分的代码。
package com.langjialing.helloworld.controlle1;
import cn.hutool.json.JSONObject;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.List;
/**
* @author 郎家岭伯爵
* @time 2024年5月7日13:41:00
*/
@RestController
@RequestMapping("/es1")
public class EsController1 {
/**
* Elasticsearch客户端
*/
@Autowired
private RestHighLevelClient client;
public void bulkIndexDocuments(String indexName, List<JSONObject> documents) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (JSONObject document : documents) {
IndexRequest indexRequest = new IndexRequest(indexName)
.id(document.getStr("id"))
.source(document);
bulkRequest.add(indexRequest);
}
// 处理批量索引响应
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
// 判断批量写入的数据中是否有错误。如果有错误,则遍历每个响应,找到错误的响应并输出信息
if (bulkResponse.hasFailures()) {
// 检查是否有失败的写入操作
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
// 获取失败的文档ID、失败的原因等信息
String documentId = failure.getId();
String failureMessage = failure.getMessage();
System.out.println("Document with ID " + documentId + " failed: " + failureMessage);
}
}
} else {
System.out.println("All documents were successfully indexed.");
}
}
public void bulkUpdateDocuments(String indexName, List<JSONObject> documents) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (JSONObject document : documents) {
UpdateRequest updateRequest = new UpdateRequest(indexName, document.getStr("id"))
// doc方法,更新指定的字段而不是整个文档
// 例如更新user的name字段,那么应该传入 {"user":{"name":"郎家岭"}}
// 如果需要更新多个字段,则传入Map对象,在map对象中存入多个值
.doc(document)
// upsert方法:不调用此方法的情况下,如果指定的文档ID不存在,es会返回404错误;
// 如果调用此方法,如果指定的文档ID不存在,则会新增一个文档,不会返回错误
.upsert(document);
bulkRequest.add(updateRequest);
}
// 处理批量更新响应
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
public void bulkDeleteDocuments(String indexName, List<String> documentIds) throws IOException {
BulkRequest bulkRequest = new BulkRequest();
for (String documentId : documentIds) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, documentId);
bulkRequest.add(deleteRequest);
}
// 处理批量删除响应
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
}
/*
<!-- MyMapper.xml -->
<update id="batchUpdateData" parameterType="java.util.List">
<foreach collection="paramsList" item="params" separator=";">
UPDATE my_table
<set>
<foreach collection="params.keySet()" item="key" separator=",">
<if test="key != 'id' and key != 'otherField'"> <!-- 排除不需要更新的字段 -->
${key} = #{params.${key}}
</if>
</foreach>
</set>
WHERE id = #{params.id}
AND otherField = #{params.otherField} <!-- 构建 WHERE 条件 -->
</foreach>
</update>
*/
public void simpleQuery(String id) throws IOException {
// 构建查询请求
SearchRequest searchRequest = new SearchRequest("your_index");
searchRequest.source(buildQuery(id));
// 执行查询请求
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
// 处理查询结果
// ...
}
/**
* 根据_id或_source下的第一层数据构建查询条件
* @param id id
* @return SearchSourceBuilder
*/
private SearchSourceBuilder buildQuery(String id) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 根据_id查询
searchSourceBuilder.query(QueryBuilders.idsQuery().addIds(id));
// 或者根据_source下的第一层数据查询。如果查询name的下一层,即改为:name.firstname.keyword
searchSourceBuilder.query(QueryBuilders.termQuery("name.keyword", "value"));
return searchSourceBuilder;
}
}
注:
-
本文中的代码没有写供测试使用的方法,大家可自行创建 Controller 或测试类进行功能测试。
-
其中有一段 mapper.xml 代码,使用得十分巧妙,贴一下供参考。
总结
实现下 ElasticSearch 在 SpringBoot 中的批量数据的增删改查功能。