logstash简介
官方介绍:Logstash is an open source data collection engine with real-time pipelining capabilities。简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可让使用者根据自己的需求在中间加上过滤网,Logstash提供了很多功能强大的过滤网以满足你的各种应用场景
logstash是一个接收、处理、转发日志的工具。支持系统日志、webserver日志、错误日志、应用日志,总之包括所有可以抛出来的日志类型。在一个典型的使用场景下(ELK):用ElasticSearch作为后台数据的存储,Kibana用来前端报表展示。logstash在其过程中担任搬运工的角色,它为数据存储、报表查询和日志解析创建了一个功能强大的管道链。
logstash提供了多种多样的input、filters、codecs和output组件,可以轻松是实现强大的功能
系统结构
Logstash的时间(logstashh将数据流中的每一条数据称之为event)处理流水线有三个主要角色完成:inputs-> filters -> outputs
- inputs:必须,负责产生时间(Inputs generate events),常用:file、、syslog、redis、beats(如Filebeats)
- filters:可选,负责数据处理与转换(filters modify them),常用:grok、mutate、drop、clone、geoip
- outputs:必须,负责数据输出(outputs ship them elsewhere),常用:ElasticSearch、file、graphite、statsd
其中inputs和outputs支持codecs(coder & decoder)在1.3.0版本之前,logstasg只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以输入处理不同类型的数据,所以完整的数据流程应该是:input | decode | filter | encode | output;
codec的引入,使得logstash可以更好更方便的与其他有自定义数据格式的运维产品共存,比如:graphite、fluent、netfiow、collectd,以及msgpack、json、edn等通用数据格式的其它产品
安装logstash
logstash可以直接和数据库关联,并且自动更加数据库的数据更新索引
安装的logstash的版本要和ElasticSearch的版本一致
1、下载安装文件,并上传到服务器之中,解压缩,重名名
tar xzvf /srv/ftp/logstash-7.6.0.tar.gz -C /usr/local/
mv /usr/local/logstash-7.6.0/ /usr/local/logstash
2、执行命令:
/usr/local/logstash/bin/logstash -e 'input { stdin {} } output { stdout{} ]'
3、连接mysql:停掉logstash,logstash没有mysq类驱动,所有需要将下载好的驱动上传到服务器中,将上传好的驱动放在logstash目录下
mv /srv/ftp/mysql-connector-java-5.1.46.jar /usr/local/logstash/lib/
4、创建logstash-mysql.conf文件
vim /usr/local/logstash/config/logstash-mysql.conf
input {
stdin {
}
jdbc {
# 数据库 数据库名称为drp,导入数据的表名为goods
jdbc_connection_string => "jdbc:mysql://192.168.137.1:3306/drp"
# 用户名密码
jdbc_user => "root"
jdbc_password => "mysqladmin"
# mysql驱动jar包的位置
jdbc_driver_library => "/usr/local/logstash/lib/mysql-connector-java-5.1.46.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
# 查询sql
statement => "select * from goods"
schedule => "* * * * *"
#索引的类型
type => "book"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => "127.0.0.1:9200"
# index名
# 索引的名称
index => "drp"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{gid}"
}
stdout {
codec => json_lines
}
}
5、启动logstash
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/logstash-mysql.conf