ElasticSearch:Logstash之同步地理图形(geo_shape)数据

郎家岭伯爵 2024年06月19日 417次浏览

前言

前面实现了使用 Logstash 把 geo_point 坐标点数据从 MySQL 同步到了 ElasticSearch,本文我们继续介绍使用 Logstash 把 geo_shape 地理图形数据从 MySQL 同步到 ElasticSearch。

实现

我们在前文的表结构上增加一个 geo_json 字段,用来保存地理图形的 geo_shape 数据。

注:

  • MySQL 中的 geo_json 保存的是字符串格式的 geo_shape。也就是说,geo_json 中的数据完全符合 ElasticSearch 中的 geo_shape 形式,只是类型不同。

创建索引并指定字段映射

-- 创建索引并指定字段映射
PUT langjialing_index
{
  "settings": {
    "index": {
      "number_of_shards": 2,  // 将分片数设置为2,一般为 节点数 × 2
      "number_of_replicas": 1 // 将副本数设置为1,一般为 节点数 × 1
    }
  },
  "mappings": {
    "properties": {
      "geo_point": {
        "type": "geo_point"
      },
      "geo_json": {
        "type": "geo_shape"
      }
    }
  }
}

Logstash数据处理管道配置文件(pipeline)

这里我们贴出如下配置文件中的核心部分:

filter {
  json {
    source => "geo_json"
    target => "geo_json"
  }
}

说明

  • 使用 Logstash 的 json过滤器 来解析 geo_json 字段中的 JSON 字符串,并将其转化为 JSON 对象。
  • source: 指定要解析的字段,这个字段包含 JSON 字符串。
  • target: 指定解析后的 JSON 对象应该放置到哪个字段中。如果 target 参数和 source 参数相同,表示将解析后的 JSON 对象直接覆盖原有的 JSON 字符串字段

完整配置代码

input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://172.17.0.5:3306/test"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"

    jdbc_paging_enabled => "true"
    jdbc_fetch_size => "2000" # 每次查2000后即插入ES,避免OOM。下边的SQL部分注意添加 ORDER BY,避免遗漏数据或者重复数据。
    jdbc_page_size => "2000"

    # 这里类似crontab(但不完全是,还是有区别的),可以定制定时操作,比如每10分钟执行一次同步
    schedule => "0 */10 * * * *"  # 每10分钟运行一次。注意频率不要太高

    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement_filepath => "/usr/share/logstash/mysql/mysql-to-es.sql"
    statement => "SELECT
					user_id,
					user_index,
					user_name,
					PASSWORD,
					age,
					geo_lon,
					geo_lat,
					CREATE_time,
					update_time 
				FROM
					user202405
				ORDER BY user_id"

    # 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false)
    lowercase_column_names => false
    # 处理中文乱码问题
    codec => plain { charset => "UTF-8" }
  }
}

filter {
  json {
    source => "geo_json"
    target => "geo_json"
  }
}

output {
  elasticsearch {
    hosts => ["http://172.17.0.2:9200"]
    user => "elastic"
    password => "123456"
    index => "%{user_index}"
    document_id  => "%{user_id}"
  }
}

这里我就偷个懒,不写验证部分的内容了。

总结

记录下 Logstash 从 MySQL 向 ElasticSearch 中同步 geo_shape 类型的数据的操作。