Example
Configure Filebeat
filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log
output.logstash:
  hosts: ["localhost:5044"]
Run Filebeat
sudo ./filebeat -e -c filebeat.yml -d "publish"
Configure Logstash: vim first-pipeline.conf
input {
  beats {
    port => "5044"
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  geoip {
    source => "clientip"
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}
Test the configuration file
bin/logstash -f first-pipeline.conf --config.test_and_exit
Run Logstash
bin/logstash -f first-pipeline.conf --config.reload.automatic
After modifying first-pipeline.conf, delete the Filebeat registry file and re-run Filebeat so it re-reads the log file from the beginning:
sudo rm data/registry
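For completeness, the restart sequence looks like this when run from the Filebeat directory; note that newer Filebeat releases keep a data/registry/ directory rather than a single file, hence the -rf:
sudo rm -rf data/registry
sudo ./filebeat -e -c filebeat.yml -d "publish"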
Commonly used Logstash plugins (a combined example follows the list)
Inputs:
file, syslog, redis, beats
Filters:
grok, mutate, drop, clone, geoip
Outputs:
elasticsearch, file, graphite, statsd
Codecs:
json, msgpack, and plain
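As a rough sketch of how several of these plugins compose in a single pipeline (the log path and secondary output file below are illustrative assumptions, not part of the tutorial above):
input {
  file {
    path => "/var/log/apache2/access.log"    # assumed location of an Apache access log
    start_position => "beginning"
  }
}
filter {
  grok   { match => { "message" => "%{COMBINEDAPACHELOG}" } }
  geoip  { source => "clientip" }
  mutate { remove_field => [ "message" ] }   # drop the raw line once it has been parsed
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
  file {
    path  => "/tmp/apache-events.log"        # illustrative secondary output
    codec => json
  }
}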
Logstash directory layout
{extract.path}          home directory (root of the extracted archive)
{extract.path}/bin      binary scripts
{extract.path}/config   settings files (path.settings)
{extract.path}/logs     log files (path.logs)
{extract.path}/plugins  local plugins (path.plugins)
{extract.path}/data     data files (path.data)
Logstash configuration is split into pipeline configuration files and settings files. For a package install the pipeline configs live under /etc/logstash/conf.d, and Logstash only loads files ending in .conf.
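A minimal settings file ({extract.path}/config/logstash.yml) might look like the sketch below; the values are illustrative only, and the path keys correspond to the directories listed above:
# config/logstash.yml -- settings file, not a pipeline file
node.name: logstash-demo
path.data: /var/lib/logstash
path.logs: /var/log/logstash
config.reload.automatic: true   # same effect as the -r flag described below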
Command-line flags
The most commonly used flags (an example follows the list):
-h, --help
-f, --path.config CONFIG_PATH
-t, --config.test_and_exit
-r, --config.reload.automatic
-e, --config.string CONFIG_STRING
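For example, -e lets you smoke-test a pipeline without writing a config file at all (stdin in, pretty-printed events out):
bin/logstash -e 'input { stdin { } } output { stdout { codec => rubydebug } }'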
Field references
Field references can only be used in filter and output plugins, not in inputs.
# use the +FORMAT syntax where FORMAT is a time format.
# [top-level field][nested field]; the brackets around a top-level field can be omitted
output {
  statsd {
    increment => "apache.%{[response][status]}"
  }
}
output {
  file {
    path => "/var/log/%{type}.%{+yyyy.MM.dd.HH}"
  }
}
The @metadata field
input { stdin { } }
filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}
output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
}
Result of running the above:
$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:42:51.496Z,
      "@version" => "1",
          "host" => "example.com",
          "show" => "This data will be in the output",
       "message" => "asdf"
}
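If you do want to inspect @metadata, the rubydebug codec takes a metadata option; swapping the output block above for the following prints the metadata fields as well:
output {
  stdout { codec => rubydebug { metadata => true } }
}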
Using environment variables
${var}; a default value can be supplied with ${var:default value}
input {
  tcp {
    port => "${TCP_PORT}"        # errors out if the environment variable is not set
  }
}
input {
  tcp {
    port => "${TCP_PORT:54321}"  # falls back to the default value 54321
  }
}
filter {
  mutate {
    add_field => {
      "my_path" => "${HOME}/file.log"
    }
  }
}
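To try the TCP_PORT example, export the variable before starting Logstash (the pipeline file name here is just a placeholder):
export TCP_PORT=12345
bin/logstash -f env-pipeline.conf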
Conditionals (if)
Syntax
if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}
The following operators can be used:
==, !=, <, >, <=, >=
=~, !~ # regex match
in, not in
and, or, nand, xor
!
()
Examples:
filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}
output {
  # Send production errors to pagerduty
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
      ...
    }
  }
}
filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
}
output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
}
Testing a field
if [foo] evaluates to false in the following cases (a usage sketch follows the list):
[foo] doesn't exist in the event
[foo] exists in the event, but is false
[foo] exists in the event, but is null
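A quick sketch of how this is typically used, tagging events according to whether the field is present and truthy (the field and tag names are illustrative):
filter {
  if [foo] {
    mutate { add_tag => "has_foo" }
  } else {
    mutate { add_tag => "no_foo" }
  }
}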
Regular expressions
output {
  if [type] == "apache" {
    if [status] =~ /^5\d\d/ {
      nagios { ... }
    } else if [status] =~ /^4\d\d/ {
      elasticsearch { ... }
    }
    statsd { increment => "apache.%{status}" }
  }
}
Syslog
input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}