ELK Logstash日志采集与分割配置详解
一、Logstash核心配置结构
Logstash配置文件由三个核心部分组成:input(数据输入)、filter(数据处理)、output(数据输出)。这种结构化的配置方式使得Logstash能够灵活地从多种数据源采集数据,进行复杂的处理转换,并输出到不同的目标系统。
二、文件输入配置基础
1. 基本文件输入配置
input {
file {
path => "/var/log/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}关键参数说明:
path:指定要监控的日志文件路径,支持通配符start_position:从文件开头(beginning)或末尾(end)开始读取sincedb_path:记录文件读取位置的数据库文件路径,设为/dev/null表示不记录位置
2. 多文件输入配置
input {
file {
path => ["/var/log/nginx/access.log", "/var/log/nginx/error.log"]
type => "nginx"
start_position => "beginning"
}
file {
path => "/var/log/messages"
type => "system"
start_position => "beginning"
}
}通过type字段区分不同来源的日志,便于后续处理。
三、多行日志合并配置
1. Java堆栈跟踪合并
input {
file {
path => "/var/log/java-app.log"
codec => multiline {
pattern => "^\\s"
negate => false
what => "previous"
}
}
}配置说明:
pattern:匹配以空白字符开头的行negate => false:匹配的行属于多行日志的一部分what => "previous":将匹配的行合并到前一行
2. 时间戳开头的多行日志
input {
file {
path => "/var/log/app.log"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}配置说明:
negate => true:不匹配模式的行属于多行日志的一部分- 将所有不以时间戳开头的行合并到前一行
四、Grok日志解析配置
1. Apache/Nginx访问日志解析
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}内置模式说明:
%{COMBINEDAPACHELOG}:预定义的Apache/Nginx日志模式- 自动解析客户端IP、请求方法、URL、状态码、响应大小等字段
2. 自定义日志格式解析
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{WORD:module}\] %{GREEDYDATA:message}"
}
}
}字段解析结果:
timestamp:日志时间戳level:日志级别(INFO、ERROR等)module:模块名称message:日志内容
3. 多模式匹配配置
filter {
grok {
match => [
"message", "%{COMBINEDAPACHELOG}",
"message", "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:message}"
]
tag_on_failure => ["grok_failure"]
}
}配置说明:
- 支持多个匹配模式,按顺序尝试
tag_on_failure:匹配失败时添加标签,便于后续处理
五、条件判断与路由配置
1. 根据日志类型分流
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
} else if [type] == "system" {
elasticsearch {
hosts => ["localhost:9200"]
index => "system-%{+YYYY.MM.dd}"
}
}
}条件判断语法:
==:等于!=:不等于in:包含!~:不匹配正则
2. 根据日志级别过滤
filter {
if [loglevel] == "ERROR" {
mutate {
add_tag => ["error_log"]
}
}
}
output {
if "error_log" in [tags] {
elasticsearch {
hosts => ["localhost:9200"]
index => "error-logs-%{+YYYY.MM.dd}"
}
}
}配置说明:
- 为ERROR级别的日志添加标签
- 根据标签将错误日志输出到专门的索引
六、多输出配置示例
1. 同时输出到多个目标
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
file {
path => "/var/log/logstash/output.log"
}
}配置说明:
- 数据同时输出到Elasticsearch、控制台和文件
- 便于调试和备份
2. 条件输出配置
output {
if [fields][log_type] == "nginx" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-log-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "app-log-%{+YYYY.MM.dd}"
}
}
}配置说明:
- 根据
log_type字段值动态选择输出索引 - 实现不同来源日志的隔离存储
七、完整配置示例
1. Nginx日志采集完整配置
input {
file {
path => "/var/log/nginx/access.log"
type => "nginx-access"
start_position => "beginning"
}
file {
path => "/var/log/nginx/error.log"
type => "nginx-error"
start_position => "beginning"
}
}
filter {
if [type] == "nginx-access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
mutate {
remove_field => ["timestamp"]
}
}
}
output {
if [type] == "nginx-access" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
}
} else if [type] == "nginx-error" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-error-%{+YYYY.MM.dd}"
}
}
}配置特点:
- 分别采集访问日志和错误日志
- 使用Grok解析访问日志格式
- 按日志类型输出到不同索引
2. 多行日志+条件过滤完整配置
input {
file {
path => "/var/log/java-app.log"
codec => multiline {
pattern => "^\\s"
negate => false
what => "previous"
}
}
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{JAVACLASS:class} - %{GREEDYDATA:message}"
}
}
if [level] == "ERROR" {
mutate {
add_tag => ["error"]
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "java-app-%{+YYYY.MM.dd}"
}
if "error" in [tags] {
file {
path => "/var/log/error.log"
}
}
}配置特点:
- 处理Java应用的多行堆栈跟踪
- 解析日志级别、类名和消息内容
- 错误日志同时输出到文件和Elasticsearch
八、配置优化建议
1. 性能优化
- 合理设置
sincedb_write_interval,减少磁盘I/O - 使用
stat_interval调整文件检查频率 - 避免在output阶段进行复杂条件判断
2. 调试技巧
- 使用
stdout { codec => rubydebug }输出调试信息 - 添加
tag_on_failure标签标记解析失败的日志 - 使用条件判断逐步调试过滤逻辑
3. 维护建议
- 按业务模块拆分配置文件
- 使用
-f参数指定配置文件目录启动 - 定期清理
sincedb文件,避免文件读取位置记录错误
通过以上配置示例,可以灵活应对各种日志采集场景,实现日志数据的结构化解析、条件过滤和多目标输出。
作者:严锋 创建时间:2024-05-06 11:00
最后编辑:严锋 更新时间:2025-12-25 10:39
最后编辑:严锋 更新时间:2025-12-25 10:39