Logstash Multiple Pipelines

背景

当Logstash要处理的多个input类型时, 最常见的两种解决方案(不推荐):

  1. 通过条件判断解决问题

  2. 通过多实例解决问题

负面影响:

  1. 条件判断

    1. 条件地狱。已知的在一个管道中实现多个独立流的方法是使用条件判断。主要方式是在输入部分通过标签标记事件,然后在过滤器中和输出阶段创建条件分支,对贴有不同标签的事件,应用不同的插件集。这种方式虽然可以解决问题,但在实际的使用中却非常的痛苦!下面是一个简单的 demo 片段:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    input {
    beats {
    port => 3444
    tag => apache
    }
    tcp {
    port => 4222
    tag => firewall
    }
    }

    filter {
    if "apache" in [tags] {
    dissect { ... }
    } else if "firewall" in [tags] {
    grok { ... }
    }
    }

    output {
    if "apache" in [tags] {
    elasticsearch { ... }
    } else if "firewall" in [tags] {
    tcp { ... }
    }
    }
    1. 缺乏拥塞管理。Logstash在所有事件和完成所有输出之前不会移动到下一批事件。这意味着,对于上面的管道,如果 TCP 套接字目标不可达,Logstash将不会处理其他批次的事件,这也就意味着 Elasticsearch 将不会接收事件,并且会对 TCP 输入和 Beats 输入施加反压力。
    2. 不同的数据流需要以不同的方式处理。如果 TCP - > Grok - > TCP 数据流处理大量的小数据,而 Beats -> Dissect -> ES 数据流中的单个数据体积大但是数量少。那么前一个数据流希望有多个 worker 并行并其每一批次处理更多事件,第二个数据流则期望使用少量的 worker 和每批次处理少量的事件。使用单个管道,无法为单个数据流指定独立的管道配置。
  2. 多实例

    1. 需要管理多个实例(通过 init 系统管理多个后台服务)
    2. 每个 Logstash 的实例也意味着一个独立的 JVM
    3. 需要监视每个 Logstash 实例

解决方案

​ 通过配置多管道(Multiple Pipelines), 解决以上的所有问题。

配置

/usr/share/logstash/config/pipelines.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: dsiem
path.config: "/etc/logstash/conf.d_siem/*.conf"
pipeline.workers: 16
queue.type: persisted
queue.max_bytes: 300gb

- pipeline.id: cloudflare
path.config: "/etc/logstash/conf.d_cf/*.conf"
pipeline.workers: 8
queue.type: persisted
queue.max_bytes: 100gb

- pipeline.id: ti
path.config: "/etc/logstash/conf.d_ti/*.conf"
pipeline.workers: 8
queue.type: persisted
queue.max_bytes: 50gb

/etc/supervisord.d/logstash.ini

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[program:logstash]
command=/usr/share/logstash/bin/logstash --path.data /lingtian/data/logstash/
#user=logstash
numprocs=1
autostart=true
autorestart=true
startsecs=1
startretries=3
exitcodes=0,2
stopsignal=INT
redirect_stderr=true
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=5
stdout_capture_maxbytes=1MB
stdout_logfile=/lingtian/logs/logstash/logstash.log
加载配置文件

可通过使用参数, --config.reload.automatic or -r 在配置文件发生更改后自动检测并重新加载配置。

1
$ bin/logstash -f apache.config --config.reload.automatic

默认3秒检查配置文件, 可以通过使用参数, --config.reload.interval 修改秒数。

如果Logstash已经在没有启用自动重载的情况下运行,则可以通过向运行Logstash的进程发送SIGHUP(信号挂起)来强制Logstash重载配置文件并重新启动管道。例如:

1
$ kill -SIGHUP $logstash_pid

参考