官方文档:
https://www.elastic.co/guide/en/logstash/6.3/persistent-queues.html
output kafka文档
https://segmentfault.com/a/1190000016596051
日志采集器Logstash其功能虽然强大,但是它依赖java、在数据量大的时候,Logstash进程会消耗过多的系统资源,这将严重影响业务系统的性能,而filebeat就是一个完美的替代者,它基于Go语言没有任何依赖,配置文件简单,格式明了,同时filebeat比logstash更加轻量级,所以占用系统资源极少,非常适合安装在生产机器上。这就是推荐使用filebeat,也是 ELK Stack 在 Agent 的第一选择。
logstash配置文件说明 logstash
logstash.yml 主配置文件
# cat /etc/logstash/logstash.yml |grep -v ^#
path.data: /data/logstash #数据存储路径
path.config: /etc/logstash/conf.d/*.conf #配置文件目录
path.logs: /var/log/logstash #日志输出路径
# mkdir -p /data/logstash #创建data目录
# chown logstash.logstash /data/logstash #授权
jvm.options 这个配置文件是有关jvm的配置,可以配置运行时内存的最大最小值,垃圾清理机制等
-Xms256m #设置内存大小
-Xmx256m
startup.options logstash运行相关的参数
配置文件是写在/etc/logstash/conf.d/ 下,以.conf结尾。
logstash配置文件
logstash pipeline 包含两个必须的元素:input和output,和一个可选元素:filter。
从input读取事件源,(经过filter解析和处理之后),从output输出到目标存储库(elasticsearch或其他)。
Logstash的数据处理过程主要包括:Inputs, Filters, Outputs 三部分, 另外在Inputs和Outputs中可以使用Codecs对数据格式进行处理。这四个部分均以插件形式存在,在logstash.conf配置文件中设置需要使用的input,filter,output, codec插件,以实现特定的数据采集,数据处理,数据输出等功能
Inputs:用于从数据源获取数据,常见的插件如file, syslog, redis, beats 等[详细参考]
Filters:用于处理数据如格式转换,数据派生等,常见的插件如grok, mutate, drop, clone, geoip等[详细参考]
Outputs:用于数据输出,常见的插件如elastcisearch,file, graphite, statsd等[详细参考]
Codecs:Codecs不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,用于对数据进行编码处理,常见的插件如json,multiline
input
input部分,由于需要提取的是日志文件,一般使用file插件,该插件常用的几个参数是:
path:指定日志文件路径。
type:指定一个名称,设置type后,可以在后面的filter和output中对不同的type做不同的处理,适用于需要消费多个日志文件的场景。
start_position:指定起始读取位置,“beginning”表示从文件头开始,“end”表示从文件尾开始(类似tail -f)。
sincedb_path:与Logstash的一个坑有关。通常Logstash会记录每个文件已经被读取到的位置,保存在sincedb中,如果Logstash重启,那么对于同一个文件,会继续从上次记录的位置开始读取。如果想重新从头读取文件,需要删除sincedb文件,sincedb_path则是指定了该文件的路径。为了方便,我们可以根据需要将其设置为“/dev/null”,即不保存位置信息。
#整个配置文件分为三部分:input,filter,output
#参考这里的介绍 https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
input {
#file可以多次使用,也可以只写一个file而设置它的path属性配置多个文件实现多文件监控
file {
#type是给结果增加了一个属性叫type值为"<xxx>"的条目。这里的type,对应了ES中index中的type,即如果输入ES时,没有指定type,那么这里的type将作为ES中index的type。
type => "apache-access"
path => "/apphome/ptc/Windchill_10.0/Apache/logs/access_log*"
#start_position可以设置为beginning或者end,beginning表示从头开始读取文件,end表示读取最新的,这个也要和ignore_older一起使用。
start_position => beginning
sincedb_path => "/dev/null" #记录文件上次读取位置;输出到null表示每次重启后都从文件首行开始解析
#坑:start_position仅在该文件从未被监听过的时候起作用,如果sincedb文件中已经有这个文件的inode记录了,那么logstash依然会冲记录过的pos开始读取数据。所以重复测试的时候每回需要删除sincedb文件,该文件一般在安装目录下的data/plugins/inputs/file/中
#优化:在file中添加sincedb_path => "/dev/null",可以直接将sincedb写到黑洞中
#ignore_older表示了针对多久的文件进行监控,默认一天,单位为秒,可以自己定制,比如默认只读取一天内被修改的文件。
ignore_older => 604800
#add_field增加属性。这里使用了${HOSTNAME},即本机的环境变量,如果要使用本机的环境变量,那么需要在启动命令上加--alow-env。
add_field => {"log_hostname"=>"${HOSTNAME}"}
#这个值默认是\n 换行符,如果设置为空"",那么后果是每个字符代表一个event
delimiter => ""
#这个表示关闭超过(默认)3600秒后追踪文件。这个对于multiline来说特别有用。... 这个参数和logstash对文件的读取方式有关,两种方式read tail,如果是read
close_older => 3600
coodec => multiline {
pattern => "^\s"
#这个negate是否定的意思,意思跟pattern相反,也就是不满足patter的意思。
# negate => ""
#what有两个值可选 previous和next,举例说明,java的异常从第二行以空格开始,这里就可以pattern匹配空格开始,what设置为previous意思是空格开头这行跟上一行属于同一event。另一个例子,有时候一条命令太长,当以\结尾时表示这行属于跟下一行属于同一event,这时需要使用negate=>true,what=>'next'。
what => "previous"
auto_flush_interval => 60
}
}
file {
type => "methodserver-log"
path => "/apphome/ptc/Windchill_10.0/Windchill/logs/MethodServer-1604221021-32380.log"
start_position => beginning
sincedb_path => "/opt/logstash-2.3.1/sincedb_path/methodserver_process"
# ignore_older => 604800
}
}
filter{
#执行ruby程序,下面例子是将日期转化为字符串赋予daytag
ruby {
code => "event['daytag'] = event.timestamp.time.localtime.strftime('%Y-%m-%d')"
}
# if [path] =~ "access" {} else if [path] =~ "methodserver" {} else if [path] =~ "servermanager" {} else {} 注意语句结构
if [path] =~ "MethodServer" { #z这里的=~是匹配正则表达式
grok {
patterns_dir => ["/opt/logstash-2.3.1/patterns"] #自定义正则匹配
# Tue 4/12/16 14:24:17: TP-Processor2: hirecode---->77LS
match => { "message" => "%{DAY:log_weekday} %{DATE_US:log_date} %{TIME:log_time}: %{GREEDYDATA:log_data}"}
}
#mutage是做转换用的
mutate {
replace => { "type" => "apache" } #替换属性值
convert => { #类型转换
"bytes" => "integer" #例如还有float
"duration" => "integer"
"state" => "integer"
}
#date主要是用来处理文件内容中的日期的。内容中读取的是字符串,通过date将它转换为@timestamp。参考https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
# date {
# match => [ "logTime" , "dd/MMM/yyyy:HH:mm:ss Z" ]
# }
}else if [type] in ['tbg_qas','mbg_pre'] { # if ... else if ... else if ... else结构
}else {
drop{} # 将event丢弃
}
}
output {
stdout{ codec=>rubydebug} # 直接输出,调试用起来方便
# 输出到redis
redis {
host => '10.120.20.208'
data_type => 'list'
key => '10.99.201.34:access_log_2016-04'
}
# 输出到ES
elasticsearch {
hosts =>"192.168.0.15:9200"
index => "%{sysid}_%{type}"
document_type => "%{daytag}"
}
}
codec编码插件
logstash实际上是一个input | codec | filter | codec |
output的数据流。codec是用来decode、encode事件的。
Multiline合并多行数据
Multiline有三个设置比较重要:pattern、negate、what .
pattern:必须设置,String类型,没有默认值。匹配的是正则表达式。上面配置中的含义是:通过SECOND_TIME字段与日志文件匹配,判断文件开始位置。
negate:boolean类型,默认为false .如果没有匹配,否定正则表达式。
what:必须设置,可以设置为previous或next。表示当与pattern匹配成功的时候,匹配行是归属上一个事件还是下一个事件
根据上面的配置,还有两个设置:charset、patterns_dir。
charset :一般都知道是编码,这个就不多说了。
patterns_dir:从配置中看到的是一个文件路径。
output
output部分,将数据输出到消息队列,以redis为例,需要指定redis server和list key名称。另外,在测试阶段,可以使用stdout来查看输出信息。
# 输出到redis
output {
if [type] == "example_nginx_access" {
redis {
host => "127.0.0.1"
port => "6379"
data_type => "list"
key => "logstash:example_nginx_access"
}
# stdout {codec => rubydebug}
}
}
filter
实际使用中根据自身需求从官方文档中查找适合自己业务的插件并使用即可,当然也可以编写自己的插件。
grok:
但是不能是正则表达式的^ 和$进行结束判断
是Logstash最重要的一个插件,用于将非结构化的文本数据转化为结构化的数据。grok内部使用正则语法对文本数据进行匹配,为了降低使用复杂度,其提供了一组pattern,我们可以直接调用pattern而不需要自己写正则表达式,参考源码grok-patterns。grok解析文本的语法格式是%{SYNTAX:SEMANTIC},SYNTAX是pattern名称,SEMANTIC是需要生成的字段名称,使用工具Grok Debugger可以对解析语法进行调试。例如,在下面的配置中,我们先使用grok对输入的原始nginx日志信息(默认以message作为字段名)进行解析,并添加新的字段request_path_with_verb(该字段的值是verb和request_path的组合),然后对request_path字段做进一步解析。
kv: 用于将某个字段的值进行分解,类似于编程语言中的字符串Split。在下面的配置中,我们将request_args字段值按照“&”进行分解,分解后的字段名称以“request_args_”作为前缀,并且丢弃重复的字段。
geoip: 用于根据IP信息生成地理位置信息,默认使用自带的一份GeoLiteCity database,也可以自己更换为最新的数据库,但是需要数据格式需要遵循Maxmind的格式(参考GeoLite),似乎目前只能支持legacy database,数据类型必须是.dat。下载GeoLiteCity.dat.gz后解压, 并将文件路径配置到source中即可。
translate: 用于检测某字段的值是否符合条件,如果符合条件则将其翻译成新的值,写入一个新的字段,匹配pattern可以通过YAML文件来配置。例如,在下面的配置中,我们对request_api字段翻译成更加易懂的文字描述。
filter {
grok {
match => {"message" => "%{IPORHOST:client_ip} \"%{TIMESTAMP_ISO8601:timestamp}\" \"%{WORD:verb} %{NOTSPACE:request_path} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_status:int} %{NUMBER:response_body_bytes:int} \"%{DATA:user_agent}\" \"%{DATA:http_referer}\" \"%{NOTSPACE:http_x_forwarder_for}\" \"%{NUMBER:request_time:float}\" \"%{DATA:upstream_resopnse_time}\" \"%{DATA:http_cookie}\" \"%{DATA:http_authorization}\" \"%{DATA:http_token}\""}
add_field => {"request_path_with_verb" => "%{verb} %{request_path}"}
}
grok {
match => {"request_path" => "%{URIPATH:request_api}(?:\?%{NOTSPACE:request_args}|)"}
add_field => {"request_annotation" => "%{request_api}"}
}
kv {
prefix => "request_args_"
field_split => "&"
source => "request_args"
allow_duplicate_values => false
}
geoip {
source => "client_ip"
database => "/home/elktest/geoip_data/GeoLiteCity.dat"
}
translate {
field => request_path
destination => request_annotation
regex => true
exact => true
dictionary_path => "/home/elktest/api_annotation.yaml"
override => true
}
}
启动
接下来可以将文件保存为testLogs.conf文件,保存到logstash的bin路径下,进入logstash的bin路径下运行:
bin/logstash -f testLogs.conf
//如果放到conf文件夹下
bin/logstash -f conf/testLogs.conf//路径要对应上,这里是conf/testLogs.conf
1
2
3
这篇博客,之前应该看过,也挺不错的 ,可以借鉴下
logstash 后可以加:
-f:指定配置文件,根据配置文件配置logstash
-e:字符串,配置,默认“”stdin输入、stdout输出(在控制台输入、输出),可通过命令行接受设置
-l:输出地址,默认控制台输出
-t:测试配置文件是否正确后退出