原理通过 Logstash 将 MySQL 数据同步到 Elasticsearch其核心原理可以概括为“主动轮询”。它不像 Canal 那样监听 MySQL 的 binlog 变更日志而是由 Logstash 按照设定好的时间间隔主动向 MySQL 发起 SQL 查询然后将查询到的结果写入 Elasticsearch实现起来很简单 下面是一个典型的ELKElasticsearch、Logstash、Kibanaversion: 3.8 services: es: image: docker.elastic.co/elasticsearch/elasticsearch:7.17.16 container_name: es environment: - discovery.typesingle-node - xpack.security.enabledfalse - ES_JAVA_OPTS-Xms512m -Xmx512m ports: - 9200:9200 volumes: - es_data:/usr/share/elasticsearch/data networks: - es-net kibana: image: docker.elastic.co/kibana/kibana:7.17.16 container_name: kibana ports: - 5601:5601 environment: - ELASTICSEARCH_HOSTShttp://es:9200 depends_on: - es networks: - es-net logstash: image: docker.elastic.co/logstash/logstash:7.17.16 container_name: logstash depends_on: - es ports: - 5044:5044 - 9600:9600 volumes: - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf - ./mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar:/usr/share/logstash/mysql-connector-j-8.3.0.jar networks: - es-net volumes: es_data: networks: es-net: driver: bridge通这个docker-compoe你就可以运行起来这三个服务了同时你还需要把logstash.conf 和mysql-connector-j-8.3.0.jar 挂载进去mysql-connector-j-8.3.0 因为我这是mysql.8.2 所以使用这个logstash.conf 是主力配置input { jdbc { jdbc_connection_string jdbc:mysql://47.116.153.12:31218/go?useSSLfalseserverTimezoneAsia/Shanghai # MySQL 连接地址Go 数据库 jdbc_user canal # MySQL 用户名 jdbc_password canal # MySQL 密码 jdbc_driver_library /usr/share/logstash/mysql-connector-j-8.3.0.jar # MySQL JDBC 驱动 jar 包路径Logstash 需要 jdbc_driver_class com.mysql.cj.jdbc.Driver # MySQL 8.x 驱动类 statement SELECT id, name, description, max_users, background, is_private, password, created_by, created_at, updated_at, invitation_code FROM chat_rooms WHERE updated_at :sql_last_value AND updated_at IS NOT NULL ORDER BY updated_at ASC # 查询 SQL # 作用增量同步 chat_rooms 表 # 条件 # - 只取 updated_at 上次同步时间的数据增量 # - updated_at 不为空 # - 按更新时间升序排序 use_column_value true # 使用字段值做增量标记而不是时间戳文件 tracking_column updated_at # 用 updated_at 作为增量同步字段 tracking_column_type timestamp # tracking_column 类型是 timestamp时间类型 schedule * * * * * # 定时任务每分钟执行一次cron 表达式 clean_run false # false 不清空历史 offset继续上次同步位置 last_run_metadata_path /usr/share/logstash/data/last_run # 保存上次同步时间的位置文件用于断点续传 } } output { elasticsearch { hosts [http://es:9200] # ES 地址Docker 内部服务名 index chat_rooms # 写入 ES 的索引名称 document_id %{id} # 用 MySQL 的 id 作为 ES document_id保证幂等防重复 } stdout { codec json } # 同时输出到控制台调试用JSON 格式 }运行起来你就会发现他会一直轮询查询初次表大的话可以带上limit限制因为Logstash JDBC 的确会“记住上一次的时间”如果你是根据ID他会记住上一次的ID调整sql 语句发生的事情这种我建议最好有一个自动更新的update键并且带上索引根据这个好建立查询语句只调整最近更新的 但是这种方式对真删不太友好可以说是不支持因为监听不到delete事件所以使用这种方法做一下软删可以 但是这种方法上手快简单上手快只需要 MySQL ES Logstash不依赖 binlog不用改 MySQL 配置稳定适合小中型项目SQL 可控想查什么就查什么