前言
前面实现了使用 Logstash 把 geo_point
坐标点数据从 MySQL 同步到了 ElasticSearch,本文我们继续介绍使用 Logstash 把 geo_shape
地理图形数据从 MySQL 同步到 ElasticSearch。
实现
我们在前文的表结构上增加一个 geo_json
字段,用来保存地理图形的 geo_shape
数据。
注:
- MySQL 中的
geo_json
保存的是字符串格式的geo_shape
。也就是说,geo_json 中的数据完全符合 ElasticSearch 中的 geo_shape 形式,只是类型不同。
创建索引并指定字段映射
-- 创建索引并指定字段映射
PUT langjialing_index
{
"settings": {
"index": {
"number_of_shards": 2, // 将分片数设置为2,一般为 节点数 × 2
"number_of_replicas": 1 // 将副本数设置为1,一般为 节点数 × 1
}
},
"mappings": {
"properties": {
"geo_point": {
"type": "geo_point"
},
"geo_json": {
"type": "geo_shape"
}
}
}
}
Logstash数据处理管道配置文件(pipeline)
这里我们贴出如下配置文件中的核心部分:
filter {
json {
source => "geo_json"
target => "geo_json"
}
}
说明:
- 使用 Logstash 的
json过滤器
来解析 geo_json 字段中的 JSON 字符串,并将其转化为 JSON 对象。 source
: 指定要解析的字段,这个字段包含 JSON 字符串。target
: 指定解析后的 JSON 对象应该放置到哪个字段中。如果 target 参数和 source 参数相同,表示将解析后的 JSON 对象直接覆盖原有的 JSON 字符串字段。
完整配置代码:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下边的SQL部分注意添加 ORDER BY,避免遗漏数据或者重复数据。
jdbc_page_size => "2000"
# 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
schedule => "0 */10 * * * *" # 每10分钟运行一次。注意频率不要太高
# mysql文件, 也可以直接写SQL语句在此处,如下:
# statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
statement => "SELECT
user_id,
user_index,
user_name,
PASSWORD,
age,
geo_lon,
geo_lat,
CREATE_time,
update_time
FROM
user202405
ORDER BY user_id"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
lowercase_column_names => false
# 处理中文乱码问题
codec => plain { charset => "UTF-8" }
}
}
filter {
json {
source => "geo_json"
target => "geo_json"
}
}
output {
elasticsearch {
hosts => ["http://172.17.0.2:9200"]
user => "elastic"
password => "123456"
index => "%{user_index}"
document_id => "%{user_id}"
}
}
这里我就偷个懒,不写验证部分的内容了。
总结
记录下 Logstash 从 MySQL 向 ElasticSearch 中同步 geo_shape
类型的数据的操作。