ElasticSearch结合Logstash

ElasticSearch结合Logstash

小龙 624 2020-03-04

logstash简介

官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可让使用者根据自己的需求在中间加上过滤网,Logstash提供了很多功能强大的过滤网以满足你的各种应用场景
logstash是一个接收、处理、转发日志的工具。支持系统日志、webserver日志、错误日志、应用日志,总之包括所有可以抛出来的日志类型。在一个典型的使用场景下(ELK):用ElasticSearch作为后台数据的存储,Kibana用来前端报表展示。logstash在其过程中担任搬运工的角色,它为数据存储、报表查询和日志解析创建了一个功能强大的管道链。
logstash提供了多种多样的input、filters、codecs和output组件,可以轻松是实现强大的功能

系统结构

logstash1.png
Logstash的时间(logstashh将数据流中的每一条数据称之为event)处理流水线有三个主要角色完成:inputs-> filters -> outputs

  • inputs:必须,负责产生时间(Inputs generate events),常用:file、、syslog、redis、beats(如Filebeats)
  • filters:可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip
  • outputs:必须,负责数据输出(outputs ship them elsewhere),常用:ElasticSearch、file、graphite、statsd
    其中inputs和outputs支持codecs(coder & decoder)在1.3.0版本之前,logstasg只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以输入处理不同类型的数据,所以完整的数据流程应该是:input | decode | filter | encode | output;
    codec的引入,使得logstash可以更好更方便的与其他有自定义数据格式的运维产品共存,比如:graphite、fluent、netfiow、collectd,以及msgpack、json、edn等通用数据格式的其它产品

安装logstash

logstash可以直接和数据库关联,并且自动更加数据库的数据更新索引
安装的logstash的版本要和ElasticSearch的版本一致
1、下载安装文件,并上传到服务器之中,解压缩,重名名

tar xzvf /srv/ftp/logstash-7.6.0.tar.gz -C /usr/local/
mv /usr/local/logstash-7.6.0/ /usr/local/logstash

2、执行命令:

/usr/local/logstash/bin/logstash -e 'input { stdin {} } output { stdout{} ]'

3、连接mysql:停掉logstash,logstash没有mysq类驱动,所有需要将下载好的驱动上传到服务器中,将上传好的驱动放在logstash目录下

mv /srv/ftp/mysql-connector-java-5.1.46.jar /usr/local/logstash/lib/

4、创建logstash-mysql.conf文件

vim /usr/local/logstash/config/logstash-mysql.conf
input {
    stdin {
    }
    jdbc {
      # 数据库  数据库名称为drp,导入数据的表名为goods
      jdbc_connection_string => "jdbc:mysql://192.168.137.1:3306/drp"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "mysqladmin"
      # mysql驱动jar包的位置
      jdbc_driver_library => "/usr/local/logstash/lib/mysql-connector-java-5.1.46.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # 查询sql
      statement => "select * from goods"
      schedule => "* * * * *"
          #索引的类型
      type => "book"
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
                # 索引的名称
                index => "drp"
                # 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{gid}"
    }
    stdout {
        codec => json_lines
    }
}

5、启动logstash

/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash-mysql.conf