ElasticSearch:Logstash同步MySQL和ElasticSearch数据

郎家岭伯爵 2024年05月22日 558次浏览

前言

前面我们搭建了 ElasticSearch 和 kibana 服务,完整的 ELK 还有 Logstash 服务,本文我们将搭建 Logstash 服务,用于把 MySQL 数据同步到 ElasticSearch。

实现

下载镜像

# 下载 Logstash 镜像
docker pull logstash:7.6.2

注:

  • Logstash 版本与 Elasticsearch 和 Kibana 保持一致,这里我们使用 7.6.2 版本。在前文中使用的 7.14.0 版本(由于本人更换了运行环境,导致没有跟前面的版本保持一致,大家使用时注意)。

下载JDBC驱动

在 Oracle 官方下载 JDBC 驱动,后续需要使用。

这里我下载的是 mysql-connector-java-8.0.27.jar 版本的驱动。

Logstash数据处理管道配置文件(pipeline)

这里我把 Logstash 的配置文件放在宿主机,创建 Docker 容器时把文件夹挂载在容器里(直接写在容器里当然也是可以的)。

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 8.x版本的驱动类名
    # jdbc_driver_class => "com.mysql.jdbc.Driver"  # 5.x 版本的驱动类名
	
    jdbc_paging_enabled => "true"
    jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM
    jdbc_page_size => "2000"
	
    # 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
    schedule => "0 */10 * * * *"  # 每10分钟运行一次。注意频率不要太高
	
    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
    statement => "SELECT
			* 
		FROM
			user202405"
	
    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
    lowercase_column_names => false
    # 处理中文乱码问题
    codec => plain { charset => "UTF-8" }
  }
}

output {
  elasticsearch {
    hosts => ["http://172.17.0.2:9200"]
    index => "%{user_index}"
    document_id => "%{user_id}"
    user => "elastic"
    password => "123456"
  }
}

input 部分具体参数说明:

  • jdbc_connection_string:JDBC 连接字符串,用于连接 MySQL 数据库。这里指定了 IP 地址 172.17.0.5(注意这里是 Docker 内部 IP,可通过 docker inspect container_id 命令查看 Docker 内部 IP)。
  • jdbc_user:连接数据库的用户名。
  • jdbc_password:连接数据库的密码。
  • jdbc_driver_library:指定 JDBC 驱动的 jar 文件路径。
  • jdbc_driver_class:JDBC 驱动类,这里使用的是 com.mysql.cj.jdbc.Driver
  • jdbc_paging_enabled:启用分页查询,这里设置为 true
  • jdbc_fetch_size:每次查询返回的数据量,这里设置为 2000,以避免内存溢出(OOM)。
  • jdbc_page_size:每页查询的数据量,这里设置为 2000。
  • schedule:定时任务的调度设置,类似于 Crontab 表达式。
  • statement:SQL查询语句(也可以使用 statement_filepath 指定 SQL 文件的地址)。
  • lowercase_column_names:是否将字段名转换为小写,这里设置为 false。
  • codec:指定字符编码,这里设置为 UTF-8,以处理中文乱码问题。

output 部分具体参数说明:

  • hosts:Elasticsearch 集群的地址。
  • index:指定 Elasticsearch 索引的名称,这里使用了动态索引名 %{user_index},表示根据 MySQL 数据表中的 user_index 字段来决定索引名称。
  • document_id:指定 Elasticsearch 文档的 ID,这里使用了动态ID %{user_id},表示根据 MySQL 数据表中的 user_id 字段来决定文档 ID。
  • user:连接 Elasticsearch 的用户名(如果 Elasticsearch 没有设置,这个配置项可为空)。
  • password:连接 Elasticsearch 的密码(如果 Elasticsearch 没有设置,这个配置项可为空)。

可能踩的坑

坑一:配置项空格问题

在配置文件中,务必注意不要多写空格,这里仅有一个空格!否则 Logstash 会启动报错。

踩坑后的报错日志

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"if\", [A-Za-z0-9_-], '\"', \"'\", \"}\" at line 60, column 1 (byte 1371) after output {\n  elasticsearch {\n    hosts => [\"http://172.17.0.2:9200\"]\n    user => \"elastic\"\n    password => \"123456\"\n    index => \"%{user_index}\"\n    document_id  => \"%{user_id}\"\n}\n", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2580:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:161:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:27:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:326:in `block in converge_state'"]}

坑二:MySQL版本问题

在 MySQL 的驱动设置中,MySQL8.xMySQL5.x 版本的驱动类名是不同的

5.x 版本驱动类名

jdbc_driver_class => "com.mysql.jdbc.Driver"  # 5.x 版本的驱动类名

8.x 版本驱动类名

jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

踩坑后的报错日志

Error: com.mysql.jdbc.Driver not loaded. Are you sure you’ve included the correct jdbc driver in :jdbc_driver_library?

创建容器并启动

# 创建Logstash容器并启动
docker run -d \
  -v /home/langjialing/Documents/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v /home/langjialing/Documents/logstash/mysql-connector-java-8.0.27.jar:/usr/share/logstash/mysql-connector-java-8.0.27.jar \
  --name logstash_langjialing logstash:7.6.2

注:

  • 这里注意有两个挂载文件夹,把 Logstash 数据处理管道配置文件和 JDBC 的驱动包映射到了容器内部。挂载路径根据自己的实际路径来修改。

Logstash配置文件

在 Logstash 的安装目录下找到 config/logstash.yml 文件,添加如下配置:

http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://172.17.0.2:9200"]  #es地址
xpack.monitoring.elasticsearch.username: "elastic"  #es xpack账号密码
xpack.monitoring.elasticsearch.password: "123456"     #es xpack账号密码
path.config: /usr/share/logstash/pipeline/*.conf
path.logs: /usr/share/logstash/logs

具体参数说明:

  • http.host:指定 Logstash HTTP 服务监听的主机地址。设置为 “0.0.0.0” 意味着 Logstash 将监听所有可用的网络接口,可以从任何 IP 地址访问。
  • xpack.monitoring.enabled:启用 X-Pack 监控功能,设置为 true 表示开启。
  • xpack.monitoring.elasticsearch.hosts:指定 Elasticsearch 集群的地址,用于存储和查看监控数据。
  • xpack.monitoring.elasticsearch.username:用于连接 Elasticsearch 的用户名(如果没有设置,这里可为空)。
  • xpack.monitoring.elasticsearch.password:用于连接 Elasticsearch 的密码(如果没有设置,这里可为空)。
  • path.config:指定 Logstash 管道配置文件的路径。这里是 /usr/share/logstash/pipeline/*.conf,表示加载该目录下所有以 .conf 结尾的配置文件
  • path.logs:指定 Logstash 日志文件的存储路径。

可通过如下命令查看容器实时日志

# 查看容器日志。类似于 tail -f 命令
docker logs -f logstash_langjialing

遗留问题点

日志文件问题

这里 path.logs 的配置项,我个人搭建的服务并没有生成相应的日志文件(可能是文件操作权限的问题)。

报错日志

输出的日志中存在如下报错,在网上查询了一些资料,但没有解决掉。

[2024-05-22T06:20:52,565][ERROR][logstash.licensechecker.licensereader] Unable to retrieve license information from license server {:message=>"No Available connections"}
[2024-05-22T06:21:00,130][INFO ][logstash.inputs.jdbc     ][main] (0.002218s) SELECT version()
[2024-05-22T06:21:00,137][INFO ][logstash.inputs.jdbc     ][main] (0.002458s) SELECT version()
[2024-05-22T06:21:00,170][INFO ][logstash.inputs.jdbc     ][main] (0.015133s) SELECT count(*) AS `count` FROM (SELECT
                        *
                FROM
                        user202405) AS `t1` LIMIT 1
[2024-05-22T06:21:00,177][INFO ][logstash.inputs.jdbc     ][main] (0.001708s) SELECT * FROM (SELECT
                        *
                FROM
                        user202405) AS `t1` LIMIT 2000 OFFSET 0
[2024-05-22T06:21:03,831][WARN ][logstash.licensechecker.licensereader] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch: Name or service not known"}

遗留问题点解决

日志文件问题解决

本人测试在 Linux 中是可以创建出日志文件的。因此这里就是 Docker 中文件操作权限的问题,上面的配置项没有问题。

报错日志解决

问题点:

在以上示例中,我设置的同步周期为每10秒同步一次(schedule => "*/10 * * * * *" # 每10S运行一次),这个同步频率太大,导致 ElasticSearch 无法及时响应 Logstash 的请求。

问题解决:

找到问题解决就很容易了。这里我把同步周期改为每10分钟同步一次(schedule => "* */10 * * * *" # 每10分钟运行一次),报错日志自然而然就没有了。

结果验证

在 Kibana 中验证数据:http://192.168.177.128:5601/

总结

搭建 Logstash,实现使用 MySQL 把数据同步到 ElasticSearch。