前言
前面写过几篇使用 Logstash 把数据从 MySQL 同步到 ElasticSearch 的文章,在那几篇文章中,同步的数据量较小时没有问题,但在同步大量数据时会出现深分页问题。
本文我们即是要解决深分页问题。
需要注意的是,深分页问题并不是 Logstash 或者 ElasticSearch 引起的,而是从 MySQL 中读取数据时,当数据量很大时会出现深分页问题。
实现
Logstash的pipeline文件优化
本文的解决方案是优化 Logstash 的 pipeline 文件。我们以这个文件为例来进行优化。
- 首先我们注释掉这段,不再使用 Logstash 的分页功能:
- jdbc_paging_enabled => "true"
- jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
- jdbc_page_size => "2000"
- 增加如下代码段,生成根据主键 ID 来进行分页的动态 SQL:
- use_column_value => true # 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
- tracking_column => "user_id" # 指定跟踪的列
- tracking_column_type => "numeric" # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
- last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml" # 存储sql_last_value的值
- 修改 SQL 语句,增加
user_id > :sql_last_value
动态 SQL 部分,以及增加LIMIT 2000
,确保每次获取到数据(注意 SQL 语句中一定需要有上一步中跟踪的数据列,否则无法获取到正确的值进行跟踪):
- SELECT
- user_id,
- user_index,
- user_name,
- PASSWORD,
- age,
- geo_lon,
- geo_lat,
- CREATE_time,
- update_time
- FROM
- user202405
- WHERE
- user_id > :sql_last_value
- ORDER BY user_id
- LIMIT 2000
完整配置文件如下:
- input {
- jdbc {
- jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
- jdbc_user => "root"
- jdbc_password => "123456"
- jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
- jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
- # 注释掉Logstash的分页设置
- # jdbc_paging_enabled => "true"
- # jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
- # jdbc_page_size => "2000"
- use_column_value => true # 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
- tracking_column => "user_id" # 指定跟踪的列
- tracking_column_type => "numeric" # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
- last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml" # 存储sql_last_value的值
- # 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
- schedule => "*/20 * * * * *" # 每20秒运行一次,每次最多同步2000条数据(SQL中配置的LIMIT为2000,user_id连续有值时可同步最大值)
- # mysql文件, 也可以直接写SQL语句在此处,如下:
- # statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
- statement => "SELECT
- user_id,
- user_index,
- user_name,
- PASSWORD,
- age,
- geo_lon,
- geo_lat,
- CREATE_time,
- update_time
- FROM
- user202405
- WHERE
- user_id > :sql_last_value
- ORDER BY user_id
- LIMIT 2000"
- # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
- lowercase_column_names => false
- # 处理中文乱码问题
- codec => plain { charset => "UTF-8" }
- }
- }
- filter {
- mutate {
- add_field => {
- "[geo_point][lat]" => "%{geo_lat}"
- "[geo_point][lon]" => "%{geo_lon}"
- }
- convert => {
- "[geo_point][lat]" => "float"
- "[geo_point][lon]" => "float"
- }
- remove_field => ["geo_lat", "geo_lon"]
- }
- }
- output {
- elasticsearch {
- hosts => ["http://172.17.0.2:9200"]
- user => "elastic"
- password => "123456"
- index => "%{user_index}"
- document_id => "%{user_id}"
- }
- }
注:
- 使用
last_run_metadata_path
跟踪记录上次执行的位置,需要注意在设置定时时间时,每到一次时间点仅能执行一次。例如:本案例中的时间设置为schedule => "*/20 * * * * *"
,每20秒执行一次数据同步,也就是说每20秒同步2000条数据(如果指定某一时间执行,例如schedule => "0 30 9 3 12 *"
,则仅能在12月3日09:30:00时同步2000条数据)。 - 本方案适用于单次同步大数据量。因为配置了
last_run_metadata_path
,在每次执行时跟踪的列ID都会加2000,后续的执行跟踪列ID只增不减,同步过的数据无法被再次同步。
关于深分页问题
以上方案的核心在于使用主键索引,避免二级索引产生的大量回表次数引起的性能问题。
关于深分页问题,这里有一篇文章写的非常好,可供大家学习参考。
总结
Logstash 同步大数据量时出现的深分页问题解决。