前言
前面我们搭建了 ElasticSearch 和 kibana 服务,完整的 ELK 还有 Logstash 服务,本文我们将搭建 Logstash 服务,用于把 MySQL 数据同步到 ElasticSearch。
实现
下载镜像
# 下载 Logstash 镜像
docker pull logstash:7.6.2
注:
- Logstash 版本与 Elasticsearch 和 Kibana 保持一致,这里我们使用
7.6.2
版本。在前文中使用的7.14.0
版本(由于本人更换了运行环境,导致没有跟前面的版本保持一致,大家使用时注意)。
下载JDBC驱动
在 Oracle 官方下载 JDBC 驱动,后续需要使用。
这里我下载的是 mysql-connector-java-8.0.27.jar
版本的驱动。
Logstash数据处理管道配置文件(pipeline)
这里我把 Logstash 的配置文件放在宿主机,创建 Docker 容器时把文件夹挂载在容器里(直接写在容器里当然也是可以的)。
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 8.x版本的驱动类名
# jdbc_driver_class => "com.mysql.jdbc.Driver" # 5.x 版本的驱动类名
jdbc_paging_enabled => "true"
jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM
jdbc_page_size => "2000"
# 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
schedule => "0 */10 * * * *" # 每10分钟运行一次。注意频率不要太高
# mysql文件, 也可以直接写SQL语句在此处,如下:
# statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
statement => "SELECT
*
FROM
user202405"
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
lowercase_column_names => false
# 处理中文乱码问题
codec => plain { charset => "UTF-8" }
}
}
output {
elasticsearch {
hosts => ["http://172.17.0.2:9200"]
index => "%{user_index}"
document_id => "%{user_id}"
user => "elastic"
password => "123456"
}
}
input
部分具体参数说明:
jdbc_connection_string
:JDBC 连接字符串,用于连接 MySQL 数据库。这里指定了 IP 地址 172.17.0.5(注意这里是 Docker 内部 IP,可通过docker inspect container_id
命令查看 Docker 内部 IP)。jdbc_user
:连接数据库的用户名。jdbc_password
:连接数据库的密码。jdbc_driver_library
:指定 JDBC 驱动的 jar 文件路径。jdbc_driver_class
:JDBC 驱动类,这里使用的是com.mysql.cj.jdbc.Driver
。jdbc_paging_enabled
:启用分页查询,这里设置为true
。jdbc_fetch_size
:每次查询返回的数据量,这里设置为 2000,以避免内存溢出(OOM)。jdbc_page_size
:每页查询的数据量,这里设置为 2000。schedule
:定时任务的调度设置,类似于 Crontab 表达式。statement
:SQL查询语句(也可以使用statement_filepath
指定 SQL 文件的地址)。lowercase_column_names
:是否将字段名转换为小写,这里设置为 false。codec
:指定字符编码,这里设置为UTF-8
,以处理中文乱码问题。
output
部分具体参数说明:
hosts
:Elasticsearch 集群的地址。index
:指定 Elasticsearch 索引的名称,这里使用了动态索引名%{user_index}
,表示根据 MySQL 数据表中的user_index
字段来决定索引名称。document_id
:指定 Elasticsearch 文档的 ID,这里使用了动态ID%{user_id}
,表示根据 MySQL 数据表中的user_id
字段来决定文档 ID。user
:连接 Elasticsearch 的用户名(如果 Elasticsearch 没有设置,这个配置项可为空)。password
:连接 Elasticsearch 的密码(如果 Elasticsearch 没有设置,这个配置项可为空)。
可能踩的坑
坑一:配置项空格问题
在配置文件中,务必注意不要多写空格,这里仅有一个空格!否则 Logstash 会启动报错。
踩坑后的报错日志:
Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"if\", [A-Za-z0-9_-], '\"', \"'\", \"}\" at line 60, column 1 (byte 1371) after output {\n elasticsearch {\n hosts => [\"http://172.17.0.2:9200\"]\n user => \"elastic\"\n password => \"123456\"\n index => \"%{user_index}\"\n document_id => \"%{user_id}\"\n}\n", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:41:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:49:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `block in compile_sources'", "org/jruby/RubyArray.java:2580:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:10:in `compile_sources'", "org/logstash/execution/AbstractPipelineExt.java:161:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:27:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:36:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:326:in `block in converge_state'"]}
坑二:MySQL版本问题
在 MySQL 的驱动设置中,MySQL8.x
和 MySQL5.x
版本的驱动类名是不同的。
5.x 版本驱动类名:
jdbc_driver_class => "com.mysql.jdbc.Driver" # 5.x 版本的驱动类名
8.x 版本驱动类名:
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
踩坑后的报错日志:
Error: com.mysql.jdbc.Driver not loaded. Are you sure you’ve included the correct jdbc driver in :jdbc_driver_library?
创建容器并启动
# 创建Logstash容器并启动
docker run -d \
-v /home/langjialing/Documents/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
-v /home/langjialing/Documents/logstash/mysql-connector-java-8.0.27.jar:/usr/share/logstash/mysql-connector-java-8.0.27.jar \
--name logstash_langjialing logstash:7.6.2
注:
- 这里注意有两个挂载文件夹,把 Logstash 数据处理管道配置文件和 JDBC 的驱动包映射到了容器内部。挂载路径根据自己的实际路径来修改。
Logstash配置文件
在 Logstash 的安装目录下找到 config/logstash.yml
文件,添加如下配置:
http.host: "0.0.0.0"
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://172.17.0.2:9200"] #es地址
xpack.monitoring.elasticsearch.username: "elastic" #es xpack账号密码
xpack.monitoring.elasticsearch.password: "123456" #es xpack账号密码
path.config: /usr/share/logstash/pipeline/*.conf
path.logs: /usr/share/logstash/logs
具体参数说明:
http.host
:指定 Logstash HTTP 服务监听的主机地址。设置为 “0.0.0.0” 意味着 Logstash 将监听所有可用的网络接口,可以从任何 IP 地址访问。xpack.monitoring.enabled
:启用 X-Pack 监控功能,设置为 true 表示开启。xpack.monitoring.elasticsearch.hosts
:指定 Elasticsearch 集群的地址,用于存储和查看监控数据。xpack.monitoring.elasticsearch.username
:用于连接 Elasticsearch 的用户名(如果没有设置,这里可为空)。xpack.monitoring.elasticsearch.password
:用于连接 Elasticsearch 的密码(如果没有设置,这里可为空)。path.config
:指定 Logstash 管道配置文件的路径。这里是/usr/share/logstash/pipeline/*.conf
,表示加载该目录下所有以.conf
结尾的配置文件。path.logs
:指定 Logstash 日志文件的存储路径。
可通过如下命令查看容器实时日志:
# 查看容器日志。类似于 tail -f 命令
docker logs -f logstash_langjialing
遗留问题点
日志文件问题
这里 path.logs
的配置项,我个人搭建的服务并没有生成相应的日志文件(可能是文件操作权限的问题)。
报错日志
输出的日志中存在如下报错,在网上查询了一些资料,但没有解决掉。
[2024-05-22T06:20:52,565][ERROR][logstash.licensechecker.licensereader] Unable to retrieve license information from license server {:message=>"No Available connections"}
[2024-05-22T06:21:00,130][INFO ][logstash.inputs.jdbc ][main] (0.002218s) SELECT version()
[2024-05-22T06:21:00,137][INFO ][logstash.inputs.jdbc ][main] (0.002458s) SELECT version()
[2024-05-22T06:21:00,170][INFO ][logstash.inputs.jdbc ][main] (0.015133s) SELECT count(*) AS `count` FROM (SELECT
*
FROM
user202405) AS `t1` LIMIT 1
[2024-05-22T06:21:00,177][INFO ][logstash.inputs.jdbc ][main] (0.001708s) SELECT * FROM (SELECT
*
FROM
user202405) AS `t1` LIMIT 2000 OFFSET 0
[2024-05-22T06:21:03,831][WARN ][logstash.licensechecker.licensereader] Attempted to resurrect connection to dead ES instance, but got an error. {:url=>"http://elasticsearch:9200/", :error_type=>LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError, :error=>"Elasticsearch Unreachable: [http://elasticsearch:9200/][Manticore::ResolutionFailure] elasticsearch: Name or service not known"}
遗留问题点解决
日志文件问题解决
本人测试在 Linux 中是可以创建出日志文件的。因此这里就是 Docker 中文件操作权限的问题,上面的配置项没有问题。
报错日志解决
问题点:
在以上示例中,我设置的同步周期为每10秒同步一次(schedule => "*/10 * * * * *" # 每10S运行一次
),这个同步频率太大,导致 ElasticSearch 无法及时响应 Logstash 的请求。
问题解决:
找到问题解决就很容易了。这里我把同步周期改为每10分钟同步一次(schedule => "* */10 * * * *" # 每10分钟运行一次
),报错日志自然而然就没有了。
结果验证
在 Kibana 中验证数据:http://192.168.177.128:5601/
。
总结
搭建 Logstash,实现使用 MySQL 把数据同步到 ElasticSearch。