ELK Logstash日志采集与分割配置详解

一、Logstash核心配置结构

Logstash配置文件由三个核心部分组成:input(数据输入)、filter(数据处理)、output(数据输出)。这种结构化的配置方式使得Logstash能够灵活地从多种数据源采集数据,进行复杂的处理转换,并输出到不同的目标系统。

二、文件输入配置基础

1. 基本文件输入配置

input {
  file {
    path => "/var/log/*.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

关键参数说明

  • path:指定要监控的日志文件路径,支持通配符
  • start_position:从文件开头(beginning)或末尾(end)开始读取
  • sincedb_path:记录文件读取位置的数据库文件路径,设为/dev/null表示不记录位置

2. 多文件输入配置

input {
  file {
    path => ["/var/log/nginx/access.log", "/var/log/nginx/error.log"]
    type => "nginx"
    start_position => "beginning"
  }
  file {
    path => "/var/log/messages"
    type => "system"
    start_position => "beginning"
  }
}

通过type字段区分不同来源的日志,便于后续处理。

三、多行日志合并配置

1. Java堆栈跟踪合并

input {
  file {
    path => "/var/log/java-app.log"
    codec => multiline {
      pattern => "^\\s"
      negate => false
      what => "previous"
    }
  }
}

配置说明

  • pattern:匹配以空白字符开头的行
  • negate => false:匹配的行属于多行日志的一部分
  • what => "previous":将匹配的行合并到前一行

2. 时间戳开头的多行日志

input {
  file {
    path => "/var/log/app.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
  }
}

配置说明

  • negate => true:不匹配模式的行属于多行日志的一部分
  • 将所有不以时间戳开头的行合并到前一行

四、Grok日志解析配置

1. Apache/Nginx访问日志解析

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

内置模式说明

  • %{COMBINEDAPACHELOG}:预定义的Apache/Nginx日志模式
  • 自动解析客户端IP、请求方法、URL、状态码、响应大小等字段

2. 自定义日志格式解析

filter {
  grok {
    match => { 
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{WORD:module}\] %{GREEDYDATA:message}" 
    }
  }
}

字段解析结果

  • timestamp:日志时间戳
  • level:日志级别(INFO、ERROR等)
  • module:模块名称
  • message:日志内容

3. 多模式匹配配置

filter {
  grok {
    match => [
      "message", "%{COMBINEDAPACHELOG}",
      "message", "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:message}"
    ]
    tag_on_failure => ["grok_failure"]
  }
}

配置说明

  • 支持多个匹配模式,按顺序尝试
  • tag_on_failure:匹配失败时添加标签,便于后续处理

五、条件判断与路由配置

1. 根据日志类型分流

output {
  if [type] == "nginx" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "nginx-%{+YYYY.MM.dd}"
    }
  } else if [type] == "system" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "system-%{+YYYY.MM.dd}"
    }
  }
}

条件判断语法

  • ==:等于
  • !=:不等于
  • in:包含
  • !~:不匹配正则

2. 根据日志级别过滤

filter {
  if [loglevel] == "ERROR" {
    mutate {
      add_tag => ["error_log"]
    }
  }
}

output {
  if "error_log" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "error-logs-%{+YYYY.MM.dd}"
    }
  }
}

配置说明

  • 为ERROR级别的日志添加标签
  • 根据标签将错误日志输出到专门的索引

六、多输出配置示例

1. 同时输出到多个目标

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug
  }
  file {
    path => "/var/log/logstash/output.log"
  }
}

配置说明

  • 数据同时输出到Elasticsearch、控制台和文件
  • 便于调试和备份

2. 条件输出配置

output {
  if [fields][log_type] == "nginx" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "nginx-log-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "app-log-%{+YYYY.MM.dd}"
    }
  }
}

配置说明

  • 根据log_type字段值动态选择输出索引
  • 实现不同来源日志的隔离存储

七、完整配置示例

1. Nginx日志采集完整配置

input {
  file {
    path => "/var/log/nginx/access.log"
    type => "nginx-access"
    start_position => "beginning"
  }
  file {
    path => "/var/log/nginx/error.log"
    type => "nginx-error"
    start_position => "beginning"
  }
}

filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    mutate {
      remove_field => ["timestamp"]
    }
  }
}

output {
  if [type] == "nginx-access" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  } else if [type] == "nginx-error" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "nginx-error-%{+YYYY.MM.dd}"
    }
  }
}

配置特点

  • 分别采集访问日志和错误日志
  • 使用Grok解析访问日志格式
  • 按日志类型输出到不同索引

2. 多行日志+条件过滤完整配置

input {
  file {
    path => "/var/log/java-app.log"
    codec => multiline {
      pattern => "^\\s"
      negate => false
      what => "previous"
    }
  }
}

filter {
  grok {
    match => { 
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:message}" 
    }
  }
  if [level] == "ERROR" {
    mutate {
      add_tag => ["error"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "java-app-%{+YYYY.MM.dd}"
  }
  if "error" in [tags] {
    file {
      path => "/var/log/error.log"
    }
  }
}

配置特点

  • 处理Java应用的多行堆栈跟踪
  • 解析日志级别、类名和消息内容
  • 错误日志同时输出到文件和Elasticsearch

八、配置优化建议

1. 性能优化

  • 合理设置sincedb_write_interval,减少磁盘I/O
  • 使用stat_interval调整文件检查频率
  • 避免在output阶段进行复杂条件判断

2. 调试技巧

  • 使用stdout { codec => rubydebug }输出调试信息
  • 添加tag_on_failure标签标记解析失败的日志
  • 使用条件判断逐步调试过滤逻辑

3. 维护建议

  • 按业务模块拆分配置文件
  • 使用-f参数指定配置文件目录启动
  • 定期清理sincedb文件,避免文件读取位置记录错误

通过以上配置示例,可以灵活应对各种日志采集场景,实现日志数据的结构化解析、条件过滤和多目标输出。

作者:严锋  创建时间:2024-05-06 11:00
最后编辑:严锋  更新时间:2025-12-25 10:39