mysql数据库与elasticsearch数据同步功能实现(logstash-input-jdbc插件)

252 阅读1分钟

前提

安装elasticsearch,logstash这个自行百度安装。

logstash5.x之后,集成了logstash-input-jdbc插件。只需简单地命令就可安装, 到logstash的bin目录下执行

./logstash-plugin install logstash-input-jdbc

配置

在logstash的conf中创建jdbc.conf,粘贴以下内容,修改对应的配置即可

input {
  stdin {}
  jdbc {
    # mysql数据库驱动,https://mvnrepository.com/  这里有
    jdbc_driver_library => "/usr/local/logstash/config/mysql-connector-java-5.1.31.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 数据库连接地址
    jdbc_connection_string => "jdbc:mysql://localhost:3306/octopus"
    # 用户名,密码
    jdbc_user => "root"
    jdbc_password => "1424464835"
    # 更新时间  corn表达式   http://cron.qqe2.com/在线cron表达式
    schedule => "0 0/5 * * *"
    # 是否分页
    jdbc_paging_enabled => "true"
    jdbc_page_size => "10000"
    # sql语句执行语句 ,也可写在配置文件中,然后填入配置文件地址便可
    statement_filepath => "select * from store"
    # elasticsearch索引类型名
    type => "doc"
  }
}

# 过滤已被删除的数据
filter {
     if [is_delete] == "1" {
         drop {}
     }
 }


# 输出部分
output {
    elasticsearch {
        # elasticsearch索引名
        index => "store"
        # 使用input中的type作为elasticsearch索引下的类型名
        document_type => "%{type}"  
        # elasticsearch的ip和端口号
        hosts => "localhost:9200"
        # 同步mysql中数据id作为elasticsearch中文档id
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

运行

bin/logstash -f config/jdbc.conf --config.reload.automatic