Logstash Summary

Posted by BenderFly on October 4, 2019

Example

Configure Filebeat

filebeat.inputs:
- type: log
  paths:
    - /path/to/file/logstash-tutorial.log 
output.logstash:
  hosts: ["localhost:5044"]

Run Filebeat

sudo ./filebeat -e -c filebeat.yml -d "publish"

Configure Logstash: vim first-pipeline.conf

input {
    beats {
        port => "5044"
    }
}
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
    geoip {
        source => "clientip"
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
    }
}

Test the configuration file

bin/logstash -f first-pipeline.conf --config.test_and_exit

Run Logstash

bin/logstash -f first-pipeline.conf --config.reload.automatic

After modifying first-pipeline.conf, stop Filebeat, delete the Filebeat registry file, and restart Filebeat so that it re-reads the log file from the beginning.

sudo rm data/registry

Commonly used Logstash plugins

Inputs:
file, syslog, redis, beats

Filters:
grok, mutate, drop, clone, geoip

Outputs:
elasticsearch, file, graphite, statsd

Codecs:
json, msgpack, and plain
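
Codecs are not pipeline stages of their own; they are attached to an input or output plugin through its codec option. A minimal sketch, assuming events arrive on stdin as one JSON document per line:

```conf
# Sketch: decode JSON on the way in, pretty-print events on the way out.
input {
  stdin { codec => json }
}
output {
  stdout { codec => rubydebug }
}
```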

Logstash directory layout

{extract.path}         root (home) directory
{extract.path}/bin     location of the executables
{extract.path}/config  path.settings
{extract.path}/logs    path.logs
{extract.path}/plugins path.plugins
{extract.path}/data    path.data

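Logstash has two kinds of configuration files: pipeline configuration files and settings files. For package installs, the pipeline configuration files live in /etc/logstash/conf.d, and Logstash loads only the files in that directory whose names end in .conf.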
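
For orientation, a pipeline configuration file is built from input, filter, and output sections. The sketch below shows the smallest useful shape; the file name example.conf and the tag value are hypothetical, chosen only for illustration:

```conf
# Hypothetical file: /etc/logstash/conf.d/example.conf
input {
  stdin { }
}
filter {
  # The filter section is optional; this one only tags each event.
  mutate { add_tag => [ "example" ] }
}
output {
  stdout { codec => rubydebug }
}
```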

Command-line options

The most commonly used options:

-h, --help
-f, --path.config CONFIG_PATH
-t, --config.test_and_exit
-r, --config.reload.automatic
-e, --config.string CONFIG_STRING

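Field references

Field references can be used only in filter and output plugins.

# sprintf format: %{field} inserts a field value; %{+FORMAT} inserts the event timestamp, where FORMAT is a time format.
# Field reference syntax: [top-level field][nested field]; the brackets may be omitted for a top-level field.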

output {
  statsd {
    increment => "apache.%{[response][status]}"
  }
}

output {
  file {
    path => "/var/log/%{type}.%{+yyyy.MM.dd.HH}"
  }
}
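
The same reference syntax works inside conditionals and filter options. A small sketch, assuming a hypothetical nested field [ua][os] (an "os" key inside a top-level "ua" field) and a hypothetical target field name client_os:

```conf
filter {
  # Only act when the nested field is present and not false or null.
  if [ua][os] {
    mutate {
      # Copy the nested value into a new top-level field via sprintf.
      add_field => { "client_os" => "%{[ua][os]}" }
    }
  }
}
```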

The @metadata field

input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
}

Output of running the configuration above:

$ bin/logstash -f ../test.conf
Pipeline main started
asdf
{
    "@timestamp" => 2016-06-30T02:42:51.496Z,
      "@version" => "1",
          "host" => "example.com",
          "show" => "This data will be in the output",
       "message" => "asdf"
}
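
The two [@metadata] fields are absent from that output because @metadata is not included in events at output time. When debugging, the rubydebug codec can be asked to print it with its metadata option; a sketch of the changed output section:

```conf
output {
  # metadata => true makes rubydebug print the @metadata field as well.
  stdout { codec => rubydebug { metadata => true } }
}
```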

Using environment variables

Reference an environment variable with ${var}. A default value can be supplied with ${var:default value}.

input {
  tcp {
    port => "${TCP_PORT}"  # configuration error if TCP_PORT is not set
  }
}

input {
  tcp {
    port => "${TCP_PORT:54321}"  # falls back to 54321 if TCP_PORT is not set
  }
}

filter {
  mutate {
    add_field => {
      "my_path" => "${HOME}/file.log"
    }
  }
}
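
Environment variables can also appear as elements of a list. A sketch, assuming a hypothetical variable ENV_TAG and a hypothetical default of "untagged". Note that the value is read when Logstash starts, so a changed variable should only take effect after a restart:

```conf
filter {
  mutate {
    # ENV_TAG is hypothetical; "untagged" is used when it is not set.
    add_tag => [ "tag1", "${ENV_TAG:untagged}" ]
  }
}
```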

Conditionals (if)

Syntax

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

The following operators are available:

==,  !=,  <,  >,  <=, >=
=~, !~  # regular expression match / no match
in, not in
and, or, nand, xor
!
()

Examples:

filter {
  if [action] == "login" {
    mutate { remove_field => "secret" }
  }
}

output {
  # Send production errors to pagerduty
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
    ...
    }
  }
}

filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
}

output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
}
# Testing a field
The expression if [foo] evaluates to false in the following cases:
[foo] does not exist in the event
[foo] exists in the event, but is false
[foo] exists in the event, but is null
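
Negating the test gives a simple way to flag events in which a field is unusable. A sketch using the placeholder field [foo] from the list above and a hypothetical tag name; the branch runs in all three cases, so it cannot distinguish a missing field from one that is false or null:

```conf
filter {
  # True when [foo] is missing, false, or null.
  if ![foo] {
    mutate { add_tag => [ "foo_missing_or_falsy" ] }
  }
}
```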

# Regular expressions
output {
  if [type] == "apache" {
    if [status] =~ /^5\d\d/ {
      nagios { ...  }
    } else if [status] =~ /^4\d\d/ {
      elasticsearch { ... }
    }
    statsd { increment => "apache.%{status}" }
  }
}
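
Regular expression matches are also handy in filters, for example to discard noise before it reaches any output. A sketch, assuming a hypothetical log format in which debug lines start with "DEBUG" and a hypothetical tag name:

```conf
filter {
  # Drop events whose message starts with "DEBUG".
  if [message] =~ /^DEBUG/ {
    drop { }
  }
  # !~ is the negated form: tag events whose message does not contain "error".
  if [message] !~ /error/ {
    mutate { add_tag => [ "no_error" ] }
  }
}
```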

Syslog

input {
  tcp {
    port => 5000
    type => syslog
  }
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout { codec => rubydebug }
}