前言
前面搭建了使用 Logstash 把 MySQL 中数据同步到 ElasticSearch 中,不过同步的是文本类型的数据。
本文我们将在上文的基础上实现 ElasticSearch 中地理坐标点(geo_point
) 数据的同步。
实现
本文的运行环境基于上文,因此这里仅记录核心部分。
MySQL数据表结构
以下是待同步的 MySQL 的数据表结构。在这个表中,我们将把 MySQL 中的 geo_lon
、geo_lat
写入 ElasticSearch 中的 geo_point
字段。
CREATE TABLE `user202405` (
`user_id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
`user_index` varchar(100) COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户索引',
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '用户名',
`password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '密码',
`age` tinyint DEFAULT NULL COMMENT '年龄',
`geo_lon` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '经度',
`geo_lat` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '纬度',
`status` tinyint NOT NULL DEFAULT '1' COMMENT '状态:1-正常;99-删除',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='用户信息表';
手动创建索引并指定字段类型
创建索引的 ES 语句:
PUT langjialing_index
{
"mappings": {
"properties": {
"geo_point": {
"type": "geo_point"
}
}
}
}
注:
- 在本例中,我们把 ElasticSearch 中
geo_point
类型的字段名称也设为geo_point
; - 在同步数据之前,需要确保 ElasticSearch 中已经创建了索引,并且索引中存在
geo_point
字段。否则 Logstash 会把geo_point
识别为普通对象。
Logstash数据处理管道配置文件(pipeline)
我们将在 Logstash 的数据处理管道配置文件中新增部分如下核心代码:
filter {
mutate {
add_field => {
"[geo_point][lat]" => "%{geo_lat}"
"[geo_point][lon]" => "%{geo_lon}"
}
convert => {
"[geo_point][lat]" => "float"
"[geo_point][lon]" => "float"
}
remove_field => ["geo_lat", "geo_lon"]
}
}
具体配置说明:
add_field
: 新增geo_point
字段,并将geo_lat
和geo_lon
的值分别赋给geo_point.lat
和geo_point.lon
。convert
: 将geo_point.lat
和geo_point.lon
转换为浮点数,以确保其符合 Elasticsearchgeo_point
类型的要求。remove_field
: 删除 Logstash 事件中的 geo_lat 和 geo_lon 字段,以避免在发送到 Elasticsearch 时存在冗余数据。
完整配置代码:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
jdbc_page_size => "2000"
# 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
schedule => "0 */10 * * * *" # 每10分钟运行一次。注意频率不要太高
# mysql文件, 也可以直接写SQL语句在此处,如下:
# statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
statement => "SELECT
user_id,
user_index,
user_name,
PASSWORD,
age,
geo_lon,
geo_lat,
CREATE_time,
update_time
FROM
user202405
ORDER BY user_id"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
lowercase_column_names => false
# 处理中文乱码问题
codec => plain { charset => "UTF-8" }
}
}
filter {
mutate {
add_field => {
"[geo_point][lat]" => "%{geo_lat}"
"[geo_point][lon]" => "%{geo_lon}"
}
convert => {
"[geo_point][lat]" => "float"
"[geo_point][lon]" => "float"
}
remove_field => ["geo_lat", "geo_lon"]
}
}
output {
elasticsearch {
hosts => ["http://172.17.0.2:9200"]
user => "elastic"
password => "123456"
index => "%{user_index}"
document_id => "%{user_id}"
}
}
可能踩的坑
Logstash 的 pipeline
文件夹下可以同时配置多个 conf 配置文件,在配置了多个 conf 文件的情况下,仅需要在其中一个 conf 文件配置 filter 功能块即可。否则会出现异常。
在这里我踩的坑就是因为配置了多个 conf 文件,每个 conf 文件里都有 remove_field
语法块,导致出现了 parse_exception
异常。
其实理解了 Logstash 的执行逻辑排查问题也就简单了:在读取配置文件时,第一个文件中移除了 geo_lon 和 geo_lat 字段,第二个文件里还要再次移除;但由于已经移除了,第二次再去移除时必然是会有异常的。
filter {
mutate {
add_field => {
"[geo_point][lat]" => "%{geo_lat}"
"[geo_point][lon]" => "%{geo_lon}"
}
convert => {
"[geo_point][lat]" => "float"
"[geo_point][lon]" => "float"
}
remove_field => ["geo_lat", "geo_lon"]
}
}
结果验证
登录 kibana 查看同步过来的数据:
总结
记录下 Logstash 从 MySQL 向 ElasticSearch 中同步 geo_point
类型的数据的操作。