前言
前面写过几篇使用 Logstash 把数据从 MySQL 同步到 ElasticSearch 的文章,在那几篇文章中,同步的数据量较小时没有问题,但在同步大量数据时会出现深分页问题。
本文我们即是要解决深分页问题。
需要注意的是,深分页问题并不是 Logstash 或者 ElasticSearch 引起的,而是从 MySQL 中读取数据时,当数据量很大时会出现深分页问题。
实现
Logstash的pipeline文件优化
本文的解决方案是优化 Logstash 的 pipeline 文件。我们以这个文件为例来进行优化。
- 首先我们注释掉这段,不再使用 Logstash 的分页功能:
jdbc_paging_enabled => "true"
jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
jdbc_page_size => "2000"
- 增加如下代码段,生成根据主键 ID 来进行分页的动态 SQL:
use_column_value => true # 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
tracking_column => "user_id" # 指定跟踪的列
tracking_column_type => "numeric" # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml" # 存储sql_last_value的值
- 修改 SQL 语句,增加
user_id > :sql_last_value
动态 SQL 部分,以及增加LIMIT 2000
,确保每次获取到数据(注意 SQL 语句中一定需要有上一步中跟踪的数据列,否则无法获取到正确的值进行跟踪):
SELECT
user_id,
user_index,
user_name,
PASSWORD,
age,
geo_lon,
geo_lat,
CREATE_time,
update_time
FROM
user202405
WHERE
user_id > :sql_last_value
ORDER BY user_id
LIMIT 2000
完整配置文件如下:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 注释掉Logstash的分页设置
# jdbc_paging_enabled => "true"
# jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
# jdbc_page_size => "2000"
use_column_value => true # 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
tracking_column => "user_id" # 指定跟踪的列
tracking_column_type => "numeric" # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml" # 存储sql_last_value的值
# 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
schedule => "*/20 * * * * *" # 每20秒运行一次,每次最多同步2000条数据(SQL中配置的LIMIT为2000,user_id连续有值时可同步最大值)
# mysql文件, 也可以直接写SQL语句在此处,如下:
# statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
statement => "SELECT
user_id,
user_index,
user_name,
PASSWORD,
age,
geo_lon,
geo_lat,
CREATE_time,
update_time
FROM
user202405
WHERE
user_id > :sql_last_value
ORDER BY user_id
LIMIT 2000"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
lowercase_column_names => false
# 处理中文乱码问题
codec => plain { charset => "UTF-8" }
}
}
filter {
mutate {
add_field => {
"[geo_point][lat]" => "%{geo_lat}"
"[geo_point][lon]" => "%{geo_lon}"
}
convert => {
"[geo_point][lat]" => "float"
"[geo_point][lon]" => "float"
}
remove_field => ["geo_lat", "geo_lon"]
}
}
output {
elasticsearch {
hosts => ["http://172.17.0.2:9200"]
user => "elastic"
password => "123456"
index => "%{user_index}"
document_id => "%{user_id}"
}
}
注:
- 使用
last_run_metadata_path
跟踪记录上次执行的位置,需要注意在设置定时时间时,每到一次时间点仅能执行一次。例如:本案例中的时间设置为schedule => "*/20 * * * * *"
,每20秒执行一次数据同步,也就是说每20秒同步2000条数据(如果指定某一时间执行,例如schedule => "0 30 9 3 12 *"
,则仅能在12月3日09:30:00时同步2000条数据)。 - 本方案适用于单次同步大数据量。因为配置了
last_run_metadata_path
,在每次执行时跟踪的列ID都会加2000,后续的执行跟踪列ID只增不减,同步过的数据无法被再次同步。
关于深分页问题
以上方案的核心在于使用主键索引,避免二级索引产生的大量回表次数引起的性能问题。
关于深分页问题,这里有一篇文章写的非常好,可供大家学习参考。
总结
Logstash 同步大数据量时出现的深分页问题解决。