ElasticSearch:Logstash之大数据量同步中的深分页问题

郎家岭伯爵 2024年09月11日 264次浏览

前言

前面写过几篇使用 Logstash 把数据从 MySQL 同步到 ElasticSearch 的文章,在那几篇文章中,同步的数据量较小时没有问题,但在同步大量数据时会出现深分页问题。

本文我们即是要解决深分页问题。

需要注意的是,深分页问题并不是 Logstash 或者 ElasticSearch 引起的,而是从 MySQL 中读取数据时,当数据量很大时会出现深分页问题。

实现

Logstash的pipeline文件优化

本文的解决方案是优化 Logstash 的 pipeline 文件。我们以这个文件为例来进行优化。

  1. 首先我们注释掉这段,不再使用 Logstash 的分页功能:
jdbc_paging_enabled => "true"
jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
jdbc_page_size => "2000"
  1. 增加如下代码段,生成根据主键 ID 来进行分页的动态 SQL:
use_column_value => true	# 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
tracking_column => "user_id"	# 指定跟踪的列
tracking_column_type => "numeric"  # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml"	# 存储sql_last_value的值
  1. 修改 SQL 语句,增加 user_id > :sql_last_value 动态 SQL 部分,以及增加 LIMIT 2000,确保每次获取到数据(注意 SQL 语句中一定需要有上一步中跟踪的数据列,否则无法获取到正确的值进行跟踪):
SELECT
	user_id,
	user_index,
	user_name,
	PASSWORD,
	age,
	geo_lon,
	geo_lat,
	CREATE_time,
	update_time 
FROM
	user202405
WHERE
	user_id > :sql_last_value
ORDER BY user_id
LIMIT 2000

完整配置文件如下:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

	# 注释掉Logstash的分页设置
    # jdbc_paging_enabled => "true"
    # jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下面的SQL部分添加 ORDER BY,避免遗漏数据或者重复刷数据。
    # jdbc_page_size => "2000"
	
    use_column_value => true	# 设置为true,表示使用指定的列(在此例中为 user_id)来跟踪状态
    tracking_column => "user_id"	# 指定跟踪的列
    tracking_column_type => "numeric"  # 明确跟踪的列为数字类型。默认即为数字类型,因此不必要设置
    last_run_metadata_path => "/path/to/logstash_last_run_metadata.yml"	# 存储sql_last_value的值

    # 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
    schedule => "*/20 * * * * *"  # 每20秒运行一次,每次最多同步2000条数据(SQL中配置的LIMIT为2000,user_id连续有值时可同步最大值)

    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
    statement => "SELECT
					user_id,
					user_index,
					user_name,
					PASSWORD,
					age,
					geo_lon,
					geo_lat,
					CREATE_time,
					update_time 
				FROM
					user202405
				WHERE
					user_id > :sql_last_value
				ORDER BY user_id
				LIMIT 2000"

    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
    lowercase_column_names => false
    # 处理中文乱码问题
    codec => plain { charset => "UTF-8" }
  }
}

filter {
  mutate {
    add_field => {
      "[geo_point][lat]" => "%{geo_lat}"
      "[geo_point][lon]" => "%{geo_lon}"
    }
    convert => {
      "[geo_point][lat]" => "float"
      "[geo_point][lon]" => "float"
    }
    remove_field => ["geo_lat", "geo_lon"]
  }
}

output {
  elasticsearch {
    hosts => ["http://172.17.0.2:9200"]
    user => "elastic"
    password => "123456"
    index => "%{user_index}"
    document_id  => "%{user_id}"
  }
}

注:

  • 使用 last_run_metadata_path 跟踪记录上次执行的位置,需要注意在设置定时时间时,每到一次时间点仅能执行一次。例如:本案例中的时间设置为 schedule => "*/20 * * * * *",每20秒执行一次数据同步,也就是说每20秒同步2000条数据(如果指定某一时间执行,例如schedule => "0 30 9 3 12 *",则仅能在12月3日09:30:00时同步2000条数据)。
  • 本方案适用于单次同步大数据量。因为配置了 last_run_metadata_path,在每次执行时跟踪的列ID都会加2000,后续的执行跟踪列ID只增不减,同步过的数据无法被再次同步。

关于深分页问题

以上方案的核心在于使用主键索引,避免二级索引产生的大量回表次数引起的性能问题。

关于深分页问题,这里有一篇文章写的非常好,可供大家学习参考。

总结

Logstash 同步大数据量时出现的深分页问题解决。