ElasticSearch:Logstash之大数据量同步中的深分页问题

郎家岭伯爵 2024年09月11日 504次浏览

前言

前面写过几篇使用 Logstash 把数据从 MySQL 同步到 ElasticSearch 的文章,在那几篇文章中,同步的数据量较小时没有问题,但在同步大量数据时会出现深分页问题。

本文我们即是要解决深分页问题。

需要注意的是,深分页问题并不是 Logstash 或者 ElasticSearch 引起的,而是从 MySQL 中读取数据时,当数据量很大时会出现深分页问题。

实现

Logstash的pipeline文件优化

本文的解决方案是优化 Logstash 的 pipeline 文件。我们以这个文件为例来进行优化。

  1. 首先我们注释掉这段,不再使用 Logstash 的分页功能:
  1. jdbc_paging_enabled => "true"
  2. jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
  3. jdbc_page_size => "2000"
  1. 增加如下代码段,生成根据主键 ID 来进行分页的动态 SQL:
  1. use_column_value => true # 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
  2. tracking_column => "user_id" # 指定跟踪的列
  3. tracking_column_type => "numeric" # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
  4. last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml" # 存储sql_last_value的值
  1. 修改 SQL 语句,增加 user_id > :sql_last_value 动态 SQL 部分,以及增加 LIMIT 2000,确保每次获取到数据(注意 SQL 语句中一定需要有上一步中跟踪的数据列,否则无法获取到正确的值进行跟踪):
  1. SELECT
  2. user_id,
  3. user_index,
  4. user_name,
  5. PASSWORD,
  6. age,
  7. geo_lon,
  8. geo_lat,
  9. CREATE_time,
  10. update_time
  11. FROM
  12. user202405
  13. WHERE
  14. user_id > :sql_last_value
  15. ORDER BY user_id
  16. LIMIT 2000

完整配置文件如下:

  1. input {
  2. jdbc {
  3. jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
  4. jdbc_user => "root"
  5. jdbc_password => "123456"
  6. jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
  7. jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

  8. # 注释掉Logstash的分页设置
  9. # jdbc_paging_enabled => "true"
  10. # jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
  11. # jdbc_page_size => "2000"

  12. use_column_value => true # 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
  13. tracking_column => "user_id" # 指定跟踪的列
  14. tracking_column_type => "numeric" # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
  15. last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml" # 存储sql_last_value的值

  16. # 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
  17. schedule => "*/20 * * * * *" # 每20秒运行一次,每次最多同步2000条数据(SQL中配置的LIMIT为2000,user_id连续有值时可同步最大值)

  18. # mysql文件, 也可以直接写SQL语句在此处,如下:
  19. # statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
  20. statement => "SELECT
  21. user_id,
  22. user_index,
  23. user_name,
  24. PASSWORD,
  25. age,
  26. geo_lon,
  27. geo_lat,
  28. CREATE_time,
  29. update_time
  30. FROM
  31. user202405
  32. WHERE
  33. user_id > :sql_last_value
  34. ORDER BY user_id
  35. LIMIT 2000"

  36. # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
  37. lowercase_column_names => false
  38. # 处理中文乱码问题
  39. codec => plain { charset => "UTF-8" }
  40. }
  41. }

  42. filter {
  43. mutate {
  44. add_field => {
  45. "[geo_point][lat]" => "%{geo_lat}"
  46. "[geo_point][lon]" => "%{geo_lon}"
  47. }
  48. convert => {
  49. "[geo_point][lat]" => "float"
  50. "[geo_point][lon]" => "float"
  51. }
  52. remove_field => ["geo_lat", "geo_lon"]
  53. }
  54. }

  55. output {
  56. elasticsearch {
  57. hosts => ["http://172.17.0.2:9200"]
  58. user => "elastic"
  59. password => "123456"
  60. index => "%{user_index}"
  61. document_id => "%{user_id}"
  62. }
  63. }

注:

  • 使用 last_run_metadata_path 跟踪记录上次执行的位置,需要注意在设置定时时间时,每到一次时间点仅能执行一次。例如:本案例中的时间设置为 schedule => "*/20 * * * * *",每20秒执行一次数据同步,也就是说每20秒同步2000条数据(如果指定某一时间执行,例如schedule => "0 30 9 3 12 *",则仅能在12月3日09:30:00时同步2000条数据)。
  • 本方案适用于单次同步大数据量。因为配置了 last_run_metadata_path,在每次执行时跟踪的列ID都会加2000,后续的执行跟踪列ID只增不减,同步过的数据无法被再次同步。

关于深分页问题

以上方案的核心在于使用主键索引,避免二级索引产生的大量回表次数引起的性能问题。

关于深分页问题,这里有一篇文章写的非常好,可供大家学习参考。

总结

Logstash 同步大数据量时出现的深分页问题解决。