ElasticSearch:Logstash之同步地理坐标点数据

郎家岭伯爵 2024年05月23日 109次浏览

前言

前面搭建了使用 Logstash 把 MySQL 中数据同步到 ElasticSearch 中,不过同步的是文本类型的数据。

本文我们将在上文的基础上实现 ElasticSearch 中地理坐标点(geo_point) 数据的同步。

实现

本文的运行环境基于上文,因此这里仅记录核心部分。

MySQL数据表结构

以下是待同步的 MySQL 的数据表结构。在这个表中,我们将把 MySQL 中的 geo_longeo_lat 写入 ElasticSearch 中的 geo_point 字段。

CREATE TABLE `user202405` (
  `user_id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
  `user_index` varchar(100) COLLATE utf8mb4_general_ci NOT NULL COMMENT '用户索引',
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '用户名',
  `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '密码',
  `age` tinyint DEFAULT NULL COMMENT '年龄',
  `geo_lon` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '经度',
  `geo_lat` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '纬度',
  `status` tinyint NOT NULL DEFAULT '1' COMMENT '状态:1-正常;99-删除',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
  PRIMARY KEY (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='用户信息表';

手动创建索引并指定字段类型

创建索引的 ES 语句:

PUT langjialing_index
{
  "mappings": {
    "properties": {
      "geo_point": {
        "type": "geo_point"
      }
    }
  }
}

注:

  1. 在本例中,我们把 ElasticSearch 中 geo_point 类型的字段名称也设为 geo_point
  2. 在同步数据之前,需要确保 ElasticSearch 中已经创建了索引,并且索引中存在 geo_point 字段。否则 Logstash 会把 geo_point 识别为普通对象。

Logstash数据处理管道配置文件(pipeline)

我们将在 Logstash 的数据处理管道配置文件中新增部分如下核心代码:

filter {
  mutate {
    add_field => {
      "[geo_point][lat]" => "%{geo_lat}"
      "[geo_point][lon]" => "%{geo_lon}"
    }
    convert => {
      "[geo_point][lat]" => "float"
      "[geo_point][lon]" => "float"
    }
    remove_field => ["geo_lat", "geo_lon"]
  }
}

具体配置说明:

  • add_field: 新增 geo_point 字段,并将 geo_latgeo_lon 的值分别赋给 geo_point.latgeo_point.lon
  • convert: 将 geo_point.latgeo_point.lon 转换为浮点数,以确保其符合 Elasticsearch geo_point 类型的要求。
  • remove_field: 删除 Logstash 事件中的 geo_lat 和 geo_lon 字段,以避免在发送到 Elasticsearch 时存在冗余数据。

完整配置代码:

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    jdbc_paging_enabled => "true"
    jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM
    jdbc_page_size => "2000"

    # 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
    schedule => "0 */10 * * * *"  # 每10分钟运行一次。注意频率不要太高

    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
    statement => "SELECT
					user_id,
					user_index,
					user_name,
					PASSWORD,
					age,
					geo_lon,
					geo_lat,
					CREATE_time,
					update_time 
				FROM
					user202405"

    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
    lowercase_column_names => false
    # 处理中文乱码问题
    codec => plain { charset => "UTF-8" }
  }
}

filter {
  mutate {
    add_field => {
      "[geo_point][lat]" => "%{geo_lat}"
      "[geo_point][lon]" => "%{geo_lon}"
    }
    convert => {
      "[geo_point][lat]" => "float"
      "[geo_point][lon]" => "float"
    }
    remove_field => ["geo_lat", "geo_lon"]
  }
}

output {
  elasticsearch {
    hosts => ["http://172.17.0.2:9200"]
    user => "elastic"
    password => "123456"
    index => "%{user_index}"
    document_id  => "%{user_id}"
  }
}

结果验证

登录 kibana 查看同步过来的数据:

总结

记录下 Logstash 从 MySQL 向 ElasticSearch 中同步 geo_point 类型的数据的操作。

捐赠页面示例