
Foreword

This is nothing cutting-edge in terms of architecture or technology; it is simply an integration I put together at work with the resources at hand. There are of course many ways to meet these requirements: with budget, consider Splunk; without budget but with an engineering team, Flink or Esper are options.

Requirements

As attack-and-defense escalates, a single data source is rarely enough to determine whether an attack succeeded. We therefore need to correlate security events across multiple data sources and distill higher-confidence alerts for manual triage. For example: for WebShell uploads, network traffic can be correlated with endpoint data; for Web attacks, WAF and NIDS events can be correlated to produce "WAF bypassed" alerts.

Approach

Although Wazuh itself can correlate security events, in the traditional deployment the Wazuh Agent sends events to the Wazuh Manager, which performs the correlation. Without an ETL step, the Manager struggles to correlate heterogeneous data. We therefore use Logstash to normalize the data and forward the normalized events to the Wazuh Manager over Syslog, where the correlation happens.

Pitfalls
  1. This setup sends data to the Wazuh Manager over Syslog for correlation. Syslog defaults to UDP, and oversized messages get truncated, causing errors. Switch to TCP to avoid this.
  2. Some Wazuh Manager alerts lack "required" correlation fields. For example, in this scenario syscheck alerts do not carry a srcip field by default; this can be solved with a preprocessing script on the Manager.

Changes

  1. Before:

    Suricata (Wazuh agent) —(Agent: UDP 1514)—> Wazuh Manager

  2. After:

    All normalization is done by Logstash; Filebeat only needs to forward blindly.

workflow:

(workflow diagram: image-20200303215020561)


Filebeat configuration
  • filebeat.yaml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
  paths:
    - "/var/log/suricata/alert-*.json"
  fields_under_root: true
  fields: { application: suricata }
  json.keys_under_root: true
  json.overwrite_keys: true
  json.message_key: log
  tail_files: false

  scan_frequency: 1s
  harvester_buffer_size: 104857600
  backoff: 1s
  max_backoff: 10s
  close_timeout: 30m
  close_inactive: 10m
  clean_inactive: 72h
  ignore_older: 70h
  registry_file: /etc/filebeat/registry/wazuh/

#================================ Processors ==================================
processors:
- drop_fields:
    fields: ["ecs.version", "agent.ephemeral_id", "agent.version", "agent.type", "agent.id", "input.type"]

#================================ Outputs =====================================
output.logstash:
  hosts: ["logstash:5010"]
  loadbalance: true
  worker: 4
  compression_level: 3
  bulk_max_size: 4096

Logstash configuration
  • 00_input.conf
input {
  beats {
    port => 5010
    codec => "json_lines"
    tags => ["beats"]
  }
}
  • 50_suricata.conf
filter {
  if [application] == "suricata" {
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "timestamp"
    }
  }
}
  • mapping.json
{
  "common_mapping": {
    "src_ip": "srcip",
    "dest_ip": "dstip",
    "src_port": "srcport",
    "dest_port": "dstport"
  }
}
  • 70_normalized-suricata.conf
filter {
  clone {
    clones => [ "siem_events" ]
  }
}

filter {
  if [type] == "siem_events" {
    mutate {
      remove_field => [ "application", "type", "agent", "@version", "@timestamp" ]
      add_field => {
        "provider" => "Suricata"
        "product" => "Intrusion Detection System"
      }
    }

    ruby {
      init => "
        require 'json'

        mapping_json = File.read('/etc/logstash/mappings/wazuh/mapping.json')
        mapping = JSON.parse(mapping_json)
        @common_mapping = mapping['common_mapping']
      "

      code => "
        keys = event.to_hash.keys
        keys.each do |key|
          if @common_mapping.include? key then
            value = event.get(key)
            event.remove(key)
            new_key = @common_mapping[key]
            event.set(new_key, value)
          end
        end

        sensor = event.get('[host][name]')
        event.set('sensor', sensor)
      "
    }
  }
}
  • 99_output-elasticsearch.conf
output {
  if [event_type] == "alert" {
    elasticsearch {
      cacert => "/etc/logstash/certs/ca/ca.crt"
      user => "elastic"
      password => "Hello World!"
      hosts => ["https://elasticsearch:9200"]
      index => "suricata-%{+YYYY.MM.dd}"
      template => "/etc/logstash/index-template.d/suricata-template.json"
      template_name => "suricata"
      template_overwrite => true
    }
  }
}
  • 99_output-wazuh.conf
output {
  if [provider] == "Suricata" {
    syslog {
      host => "wazuh"
      protocol => "tcp"
      port => 514
      codec => "json"
      sourcehost => "logstash"
      appname => "NORMALIZED"
    }
    #stdout {
    #  codec => rubydebug
    #}
  }
}

Wazuh configuration
  • custom-syscheck.py

    Add a preprocessing script on the Wazuh Manager to preprocess specific security events, e.g. adding a srcip field to syscheck events.

import json
import sys
import time
import os
from datetime import datetime, timedelta, timezone

# ossec.conf configuration:
# <integration>
#   <name>custom-syscheck</name>
#   <rule_id>554</rule_id>
#   <group>syscheck</group>
#   <alert_format>json</alert_format>
# </integration>

# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
wazuh_server = "192.168.199.97"

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
syscheck_file = '{0}/logs/syscheck.json'.format(pwd)

def iso8601(hours=8):
    td = timedelta(hours=hours)
    tz = timezone(td)
    return datetime.now(tz=tz).isoformat()

def main(args):
    debug("# Starting")

    # Read args
    alert_file_location = args[1]

    debug("# File location")
    debug(alert_file_location)

    # Load alert. Parse JSON object.
    with open(alert_file_location) as alert_file:
        json_alert = json.load(alert_file)
    debug("# Processing alert")
    debug(json_alert)

    alert = normalized_data(json_alert)
    with open(syscheck_file, 'a') as f:
        msg = json.dumps(alert)
        f.write(msg + '\n')

def debug(msg):
    if debug_enabled:
        msg = "{0}: {1}\n".format(now, msg)
        with open(log_file, "a") as f:
            f.write(msg)

def normalized_data(alert):
    if alert['agent']['id'] == '000':
        alert['srcip'] = wazuh_server
    elif alert['agent'].get('ip'):
        alert['srcip'] = alert['agent']['ip']
        alert['dstip'] = alert['agent']['ip']
    alert['integration'] = 'custom-syscheck'
    alert['create_timestamp'] = iso8601()
    debug(alert)
    return alert

if __name__ == "__main__":
    try:
        # Read arguments
        bad_arguments = False
        if len(sys.argv) >= 4:
            msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
        else:
            msg = '{0} Wrong arguments'.format(now)
            bad_arguments = True

        # Logging the call
        with open(log_file, 'a') as f:
            f.write(msg + '\n')

        if bad_arguments:
            debug("# Exiting: Bad arguments.")
            sys.exit(1)

        # Main function
        main(sys.argv)

    except Exception as e:
        debug(str(e))
        raise
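The enrichment rule at the heart of the script can be exercised on its own; a minimal sketch of the same logic, assuming the indentation reconstruction above (manager events, agent id 000, get the server IP; agent events get the agent IP):

```python
def add_srcip(alert, wazuh_server="192.168.199.97"):
    # Mirror of normalized_data(): enrich a syscheck alert with the
    # srcip field it lacks by default. Illustration only.
    if alert["agent"]["id"] == "000":
        alert["srcip"] = wazuh_server
    elif alert["agent"].get("ip"):
        alert["srcip"] = alert["agent"]["ip"]
        alert["dstip"] = alert["agent"]["ip"]
    return alert
```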
  • ossec.conf
    • Configure syslog to use TCP;
    • Load the preprocessing script;
    • Ingest the script's output log;
<ossec_config>
  <remote>
    <connection>syslog</connection>
    <port>514</port>
    <protocol>tcp</protocol> <!-- udp(default)/tcp -->
    <allowed-ips>192.168.199.0/24</allowed-ips>
  </remote>

  <!-- Custom external Integration -->
  <integration>
    <name>custom-syscheck</name>
    <rule_id>554</rule_id>
    <group>syscheck</group>
    <alert_format>json</alert_format>
  </integration>

  <!-- Custom syscheck.json -->
  <localfile>
    <log_format>json</log_format>
    <location>/var/ossec/logs/syscheck.json</location>
  </localfile>
</ossec_config>
  • local_decoder_normalized.xml

Sample

<!--
2020 Mar 03 02:53:39 logstash NORMALIZED[-]: {"timestamp":"2020-02-21T19:47:04.382300+0800","flow_id":1133835979634527,"in_iface":"wlp3s0","event_type":"alert","src_ip":"192.168.199.97","src_port":60022,"dest_ip":"192.168.199.162","dest_port":59143,"proto":"TCP","alert":{"action":"allowed","gid":1,"signature_id":123456,"rev":1,"signature":"LOCAL RULES XXX","severity":3}}
-->

<decoder name="nta_json">
  <prematch>NORMALIZED[-]: </prematch>
  <plugin_decoder offset="after_prematch">JSON_Decoder</plugin_decoder>
</decoder>
  • 0901-local_raw.xml
    • The decoder referenced by default is json; change it to the newly added nta_json;
    • Override the original rule with overwrite="yes";
<group name="suricata,">

  <!-- /var/ossec/ruleset/rules/0475-suricata_rules.xml -->
  <!-- Define Suricata Rules -->
  <!-- ID: 86600 - 86699 -->

  <rule id="86600" level="0" overwrite="yes">
    <decoded_as>nta_json</decoded_as>
    <field name="timestamp">\.+</field>
    <field name="event_type">\.+</field>
    <description>Suricata messages.</description>
    <options>no_full_log</options>
  </rule>

</group>
  • 0905-local_syscheck.xml

    Parse the log generated by the preprocessing script.

<group name="syscheck,">
  <rule id="187100" level="7">
    <decoded_as>json</decoded_as>
    <field name="integration">custom-syscheck</field>
    <description>syscheck integration messages.</description>
    <options>no_full_log</options>
  </rule>
</group>
  • 9999-local_composite.xml
<group name="local,composite,">

  <!-- Local Composite Rules Range ID: 200000 - 205000 -->

  <rule id="200000" level="15" frequency="2" timeframe="600">
    <if_matched_sid>101000</if_matched_sid> <!-- file-upload / WebShell-upload detection rules -->
    <if_sid>187100</if_sid>
    <same_source_ip /> <!-- correlate by IP -->
    <description>Phase 3: WebShell uploaded to server $(srcip).</description>
    <options>no_full_log</options>
  </rule>

  <rule id="200001" level="12" frequency="2" timeframe="600">
    <if_matched_sid>88801</if_matched_sid> <!-- WAF security events -->
    <if_group>ids</if_group>
    <same_source_ip />
    <description>Phase 3: Alarm - Same IP bypassed the WAF within 600 seconds. $(srcip) -> $(http.hostname) -> $(alert.signature) -> $(alert.signature_id).</description>
    <options>no_full_log</options>
  </rule>

</group>

Summary

  1. For WebShell correlation, the current approach relies on same-source IP plus time ordering; hash comparison would be far more reliable. A gripe about Suricata's default fileinfo: its output cannot be customized, and every protocol with file extraction enabled emits fileinfo events. As a result, at high volume the Wazuh engine comes under heavy pressure. I tried writing a custom file-audit event in Lua, but it likewise cannot distinguish protocols, let alone apply custom HTTP-specific filtering to the output.

  2. Correlation rules raise alert confidence by combining multiple underlying security events, so inaccurate low-level events will flood the higher-level rules with false positives. Tuning the underlying events is ongoing work.

  3. With Wazuh v3.11.4, receiving large logs over syslog triggers a memory violation that restarts the ossec-remoted process. This has been reported to the community and will be fixed in the next release.

References

Requirements

Our platform ingests Suricata alerts; because of the mirror source, some rules generate false alerts, so specific IP addresses need to be filtered for those rules.

Solutions

  1. Modify the Suricata rules. If the false-alert volume is large and performance matters, editing the Suricata rules directly is recommended.
  2. Since my Suricata alerts are all consumed through Wazuh, I used the Wazuh CDB list feature to filter the given IP addresses.

Steps

1. Create a CDB list

Each key must be unique and is terminated with a colon :.

For IP addresses the dot notation is used for subnet matches:

key          CIDR             Possible matches
192.168.:    192.168.0.0/16   192.168.0.0 - 192.168.255.255
172.16.19.:  172.16.19.0/24   172.16.19.0 - 172.16.19.255
10.1.1.1:    10.1.1.1/32      10.1.1.1
$ vim /var/ossec/etc/lists/private_ip

10.168.:PrivateNet
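The dot-notation semantics in the table can be sketched as a small predicate (the ':' key terminator is omitted here; this illustrates the matching rules, not Wazuh's actual implementation):

```python
def cdb_ip_match(key, ip):
    """A key ending in '.' matches any address under that prefix;
    otherwise the key must equal the full address."""
    if key.endswith("."):
        return ip.startswith(key)
    return ip == key
```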

Since Wazuh v3.11.3, CDB lists are built and loaded automatically when the analysis engine is started. Therefore, when adding or modifying CDB lists, it is no longer needed to run ossec-makelists, just restart the manager.


For versions before 3.11.3, run:

$ /var/ossec/bin/ossec-makelists
2. Add the list in ossec.conf
$ vim /var/ossec/etc/ossec.conf

<ossec_config>
  <ruleset>
    <!-- User-defined CDB -->
    <list>etc/lists/private_ip</list>
  </ruleset>
</ossec_config>
3. Restart the process
$ systemctl restart wazuh-manager
4. Configure the rule
<var name="SAME_IP_TIME">120</var>
<var name="SAME_IP_IGNORE">300</var>

<group name="local,suricata,ids,">

  <rule id="102018" level="8" frequency="5" timeframe="$SAME_IP_TIME" ignore="$SAME_IP_IGNORE">
    <if_matched_sid>86601</if_matched_sid>
    <field name="alert.signature_id">2013057</field>
    <list field="src_ip" lookup="not_address_match_key">etc/lists/private_ip</list>
    <description>Wazuh Rules - Same ip of attack occurred 5 times within $SAME_IP_TIME seconds. $(src_ip) -> $(alert.signature) -> $(alert.signature_id).</description>
    <options>no_full_log</options>
  </rule>

</group>
5. Test the rule
$ /var/ossec/bin/ossec-logtest

References

Background

A project required some Ruby customization of Logstash. I combed through the entire official Logstash documentation, and the Ruby Event API coverage amounts to just two methods, get and set 😂. Thankfully, a quick search turned up more...

Solution

  • Cancel an event: cancel
  • Undo cancellation: uncancel
  • Check whether the event is cancelled: cancelled?
  • Check whether a field exists: include?
  • Remove a field: remove
  • Event to string: to_s
  • Event to hash (without metadata fields): to_hash
  • Event to hash (with metadata fields): to_hash_with_metadata
  • Event to JSON string: to_json
  • Add a tag: tag
  • Get the event timestamp: timestamp

For more, see the source of the official interface: JrubyEventExtLibrary.java

References

Background

When Logstash has to handle multiple input types, the two most common solutions (neither recommended) are:

  1. Conditionals

  2. Multiple instances

Drawbacks:

  1. Conditionals

    1. Conditional hell. The known way to run several independent flows in one pipeline is conditionals: tag events in the input section, then branch in the filter and output stages to apply different plugin sets to differently tagged events. It works, but in practice it is painful! A simple demo snippet:
    input {
      beats {
        port => 3444
        tags => ["apache"]
      }
      tcp {
        port => 4222
        tags => ["firewall"]
      }
    }

    filter {
      if "apache" in [tags] {
        dissect { ... }
      } else if "firewall" in [tags] {
        grok { ... }
      }
    }

    output {
      if "apache" in [tags] {
        elasticsearch { ... }
      } else if "firewall" in [tags] {
        tcp { ... }
      }
    }
    2. No congestion isolation. Logstash does not move on to the next batch until all events in the current batch have completed all outputs. For the pipeline above, if the TCP destination is unreachable, Logstash stops processing other batches too, Elasticsearch receives no events, and back-pressure is applied to both the TCP and Beats inputs.
    3. Different data streams need different handling. If the TCP -> Grok -> TCP stream handles a high volume of small messages while Beats -> Dissect -> ES carries large but infrequent ones, the former wants many workers and larger batches, the latter few workers and smaller batches. With a single pipeline, streams cannot be tuned independently.
  2. Multiple instances

    1. Multiple instances must be managed (multiple daemons via the init system)
    2. Each Logstash instance is a separate JVM
    3. Each Logstash instance must be monitored separately

Solution

Configuring Multiple Pipelines solves all of the problems above.

Configuration

/usr/share/logstash/config/pipelines.yml

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: dsiem
  path.config: "/etc/logstash/conf.d_siem/*.conf"
  pipeline.workers: 16
  queue.type: persisted
  queue.max_bytes: 300gb

- pipeline.id: cloudflare
  path.config: "/etc/logstash/conf.d_cf/*.conf"
  pipeline.workers: 8
  queue.type: persisted
  queue.max_bytes: 100gb

- pipeline.id: ti
  path.config: "/etc/logstash/conf.d_ti/*.conf"
  pipeline.workers: 8
  queue.type: persisted
  queue.max_bytes: 50gb

/etc/supervisord.d/logstash.ini

[program:logstash]
command=/usr/share/logstash/bin/logstash --path.data /lingtian/data/logstash/
#user=logstash
numprocs=1
autostart=true
autorestart=true
startsecs=1
startretries=3
exitcodes=0,2
stopsignal=INT
redirect_stderr=true
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=5
stdout_capture_maxbytes=1MB
stdout_logfile=/lingtian/logs/logstash/logstash.log
Reloading the configuration

With the --config.reload.automatic or -r flag, Logstash detects configuration changes and reloads the configuration automatically.

$ bin/logstash -f apache.config --config.reload.automatic

By default the config file is checked every 3 seconds; the interval can be changed with --config.reload.interval.

If Logstash is already running without auto-reload enabled, you can force it to reload the config file and restart the pipeline by sending SIGHUP to the Logstash process. For example:

$ kill -SIGHUP $logstash_pid

References

Background

When writing Scrapy spiders, a single spider often contains multiple item classes, and the same pipelines.py has to apply different logic depending on the item class.

Requirement: data must be sent to both Elasticsearch and Redis; the only difference is the item class. Redis receives the normalized data, Elasticsearch the raw data.

Solution: distinguish the classes with isinstance.

Implementation

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html


from tb.items import RedisItem
from tb.items import ElasticSearchItem

class RedisPipeline(object):
    def process_item(self, item, spider):
        if isinstance(item, RedisItem):
            # to do something
            pass
        return item

class ElasticPipeline(object):
    def process_item(self, item, spider):
        if isinstance(item, ElasticSearchItem):
            # to do something
            pass
        return item
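A standalone usage sketch of the isinstance dispatch, with stand-in item classes in place of tb.items (the classes and the "sent_to" marker are illustrative assumptions):

```python
# Stand-ins for the Scrapy Item subclasses from tb.items.
class RedisItem(dict):
    pass

class ElasticSearchItem(dict):
    pass

class RedisPipeline:
    def process_item(self, item, spider):
        # Only act on RedisItem instances; everything else passes through.
        if isinstance(item, RedisItem):
            item["sent_to"] = "redis"
        return item

processed = RedisPipeline().process_item(RedisItem(ioc="1.2.3.4"), spider=None)
```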

References

Requirements

While building in-house threat intelligence, we manage expiring keys centrally in Redis, so expired keys must be monitored in real time and handled by a callback.

Approach

Redis 2.8.0 introduced a new feature, Keyspace Notifications, which together with SUBSCRIBE (available since 2.0.0) can implement this kind of scheduled task.

Redis keyspace notifications support two subscription modes: all events for a given key, and a given event across keys.

Keyspace notifications are implemented sending two distinct type of events for every operation affecting the Redis data space. For instance a DEL operation targeting the key named mykey in database 0 will trigger the delivering of two messages, exactly equivalent to the following two PUBLISH commands:

PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey

With Redis keyspace notifications we can write IoC data into Redis with a 10-minute TTL and use the key-expiration callback as a reminder: if the data has not been consumed by then, handle it.
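The two PUBLISH messages shown above can be reconstructed for any operation; a small sketch of the channel-name convention:

```python
def keyspace_channels(db, key, event):
    """Return the (keyspace, keyevent) channel/message pairs Redis
    publishes for one operation, per the PUBLISH example above."""
    return (
        ("__keyspace@{}__:{}".format(db, key), event),
        ("__keyevent@{}__:{}".format(db, event), key),
    )
```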

Implementation

1. Edit redis.conf to enable expired-key notifications

By default keyspace events notifications are disabled because while not very sensible the feature uses some CPU power. Notifications are enabled using the notify-keyspace-events of redis.conf or via the CONFIG SET.

Because keyspace notifications cost some CPU, Redis disables them by default; they take effect only after notify-keyspace-events is enabled manually.

K: keyspace events, published with a __keyspace@<db>__ prefix
E: keyevent events, published with a __keyevent@<db>__ prefix
g: generic, non type-specific commands such as del, expire, rename
$: String commands
l: List commands
s: Set commands
h: Hash commands
z: Sorted set commands
x: expired events, fired when a key expires and is deleted
e: evicted events, fired when a key is removed by the maxmemory policy
A: alias for "g$lshzxe", so "AKE" means all events

notify-keyspace-events Ex enables expired-key event notifications.
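The flag string can be expanded programmatically; a sketch using the classes listed above, with 'A' expanded to g$lshzxe:

```python
# Lookup table for the notify-keyspace-events flag classes above.
FLAG_CLASSES = {
    "K": "keyspace", "E": "keyevent", "g": "generic", "$": "string",
    "l": "list", "s": "set", "h": "hash", "z": "zset",
    "x": "expired", "e": "evicted",
}

def expand_flags(flags):
    """Expand a notify-keyspace-events string into its event classes."""
    return [FLAG_CLASSES[f] for f in flags.replace("A", "g$lshzxe")]
```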

redis.conf

# RDB Config
databases 16
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum no
dir ./
# AOF Config
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
# set Password
requirepass %{mypassword}
# set notify-keyspace-events
notify-keyspace-events Ex

2. RedisHelper

#!/usr/bin/env python3
# Author: Canon
# Date: 2019-12-27
# Version: 0.2

import redis

class RedisHelper():
    def __init__(self):
        # Connect to Redis
        self.__conn = redis.Redis(host='127.0.0.1', port=6379, password='mypassword', db=0)
        # Expired-key event channel; 0 is the db index
        self.keyevent = '__keyevent@0__:expired'

    # Publish: write a key with a TTL
    def publish(self, key, msg):
        ttl = 10
        # redis-py >= 3.0 signature: setex(name, time, value)
        self.__conn.setex(key, ttl, msg)
        return True

    # Subscribe to expired-key events
    def subscribe(self):
        sub = self.__conn.pubsub()
        sub.subscribe(self.keyevent)
        for msg in sub.listen():
            if msg['type'] == 'message':
                ex_key = msg['data'].decode()
                print(ex_key)

References

Requirements

Since the website is frequently attacked, I collect attackers' IP addresses as in-house threat intelligence. Doing threat intelligence inevitably means enriching the data through various websites. The convenient route is buying API accounts, but some APIs are a trap: unless you buy the enterprise edition, personal API keys are rate-limited. So the data has to be collected with crawlers (Scrapy). And distributed crawlers bring the need for centralized management and unified dispatch. Below is how Scrapyd, the official spider management tool from the Scrapy project, meets these needs.

Scrapyd

Scrapyd is the spider management tool provided by the Scrapy project; it makes it easy to upload and control spiders and view their run logs.

Installation

$ pip install scrapyd

Startup

$ scrapyd
2019-12-19T10:56:06+0800 [-] Loading /Users/canon/anaconda3/lib/python3.7/site-packages/scrapyd/txapp.py...
2019-12-19T10:56:06+0800 [-] Scrapyd web console available at http://127.0.0.1:6800/
2019-12-19T10:56:06+0800 [-] Loaded.
2019-12-19T10:56:06+0800 [twisted.scripts._twistd_unix.UnixAppLogger#info] twistd 18.9.0 (/Users/canon/anaconda3/bin/python 3.7.1) starting up.
2019-12-19T10:56:06+0800 [twisted.scripts._twistd_unix.UnixAppLogger#info] reactor class: twisted.internet.selectreactor.SelectReactor.
2019-12-19T10:56:06+0800 [-] Site starting on 6800
2019-12-19T10:56:06+0800 [twisted.web.server.Site#info] Starting factory <twisted.web.server.Site object at 0x109efcf98>
2019-12-19T10:56:06+0800 [Launcher] Scrapyd 1.2.1 started: max_proc=48, runner='scrapyd.runner'

Scrapyd is a server; we need a client (Scrapyd-Client) to push crawler projects to it. First, change the Scrapyd bind address: by default the scrapyd command binds to 127.0.0.1 on port 6800. To make it reachable from other hosts, set the address to 0.0.0.0.

From the startup output above, the default configuration file lives in /Users/canon/anaconda3/lib/python3.7/site-packages/scrapyd/.

$ vim default_scrapyd.conf

[scrapyd]
eggs_dir = eggs
logs_dir = logs
items_dir =
jobs_to_keep = 5
dbs_dir = dbs
max_proc = 0
max_proc_per_cpu = 4
finished_to_keep = 100
poll_interval = 5.0
bind_address = 0.0.0.0
http_port = 6800
debug = off
runner = scrapyd.runner
application = scrapyd.app.application
launcher = scrapyd.launcher.Launcher
webroot = scrapyd.website.Root

[services]
schedule.json = scrapyd.webservice.Schedule
cancel.json = scrapyd.webservice.Cancel
addversion.json = scrapyd.webservice.AddVersion
listprojects.json = scrapyd.webservice.ListProjects
listversions.json = scrapyd.webservice.ListVersions
listspiders.json = scrapyd.webservice.ListSpiders
delproject.json = scrapyd.webservice.DeleteProject
delversion.json = scrapyd.webservice.DeleteVersion
listjobs.json = scrapyd.webservice.ListJobs
daemonstatus.json = scrapyd.webservice.DaemonStatus

Scrapyd-Client

Scrapyd-Client deploys Scrapy projects: it packages a project into an egg file so we don't have to call the addversion.json endpoint to deploy to Scrapyd by hand. Simple to use.

Installation

$ pip install scrapyd-client

Configuration

To deploy a Scrapy project, first edit the project's configuration. In the project root there is a scrapy.cfg file with the following content:

# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html

[settings]
default = spider_ti.settings

[deploy]
#url = http://localhost:6800/
project = spider_ti

Here we configure the deploy section. For example, to deploy the project to the Scrapyd on 10.10.10.1, change it as follows:

[deploy]
url = http://10.10.10.1:6800/
project = spider_ti

Then, from the directory containing scrapy.cfg, run:

$ scrapyd-deploy

Packing version 1576725163
Deploying to project "spider_ti" in http://localhost:6800/addversion.json
Server response (200):
{"node_name": "CanondeMacBook-Pro.local", "status": "ok", "project": "spider_ti", "version": "1576725163", "spiders": 3}

The project version defaults to the current timestamp. We can also set it explicitly with the version parameter. For example:

$ scrapyd-deploy --version 201912191114

Note: with Scrapyd 1.2.0 on Python 3, the version cannot be a string containing letters; it must be purely numeric, or an error occurs.
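A trivial pre-deploy check along these lines can catch the error early (a hypothetical helper, not part of scrapyd-client):

```python
def valid_scrapyd_version(version):
    # Scrapyd 1.2.0 on Python 3 rejects versions containing letters,
    # so only digits-only versions (e.g. timestamps) are safe.
    return version.isdigit()
```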

With several hosts, we can configure an alias for each by changing the config to:

[deploy:vm1]
url = http://10.10.10.1:6800/
project = spider_ti

[deploy:vm2]
url = http://10.10.10.2:6800/
project = spider_ti

This configures multiple hosts in one place, one block per host, with the host alias after deploy. To deploy the project to host vm2 at 10.10.10.2, just run:

scrapyd-deploy vm2

So we only need to configure each host's Scrapyd address in scrapy.cfg and call scrapyd-deploy with the host alias to deploy.

If Scrapyd has access control enabled, we can add a username and password to the config and change the port to the Nginx proxy port. For example, with the 6801 proxy used earlier, change the port to 6801:

[deploy:vm1]
url = http://10.10.10.1:6801/
project = spider_ti
username = admin
password = admin

[deploy:vm2]
url = http://10.10.10.2:6801/
project = spider_ti
username = canon
password = canon

With the username and password fields, deployment authenticates automatically and succeeds.

Run

$ curl http://127.0.0.1:6800/schedule.json -d project=spider_ti -d spider=ti

List jobs

$ curl http://127.0.0.1:6800/listjobs.json?project=spider_ti | python -m json.tool

List projects

$ curl http://127.0.0.1:6800/listprojects.json

Cancel

$ curl http://127.0.0.1:6800/cancel.json -d project=spider_ti -d job=838dec26222311ea8eb6a5eb893a35a5

Delete

  • Version
$ curl http://127.0.0.1:6800/delversion.json -d project=spider_ti -d version=1576735972
  • Project
$ curl http://127.0.0.1:6800/delproject.json -d project=spider_ti

References

Solr RCE CVE-2019-0193

# Solr POST RCE CVE-2019-0193
alert http $EXTERNAL_NET any -> $HOME_NET any (msg:"LOCAL RULES EXPLOIT Solr RCE CVE-2019-0193 POST"; flow:to_server,established; flowbits:set,CVE-2019-0193.post.request; content:"POST"; http_method; fast_pattern; content:"/solr"; http_uri; content:"/config"; http_uri; content:"params.resource.loader.enabled"; http_client_body; classtype:shellcode-detect; sid:3020016; rev:1; metadata:attack_target web_server, signature_severity Critical, direction outside_to_inside, created_at 2019_10_31, updated_at 2019_10_31, author Canon, tag RCE, tag CVE-2019-0193, tag http, tag exploit, tag Solr;)

alert http $EXTERNAL_NET any -> $HOME_NET any (msg:"LOCAL RULES EXPLOIT Solr RCE CVE-2019-0193 POST Successful"; flow:from_server,established; flowbits:isset,CVE-2019-0193.post.request; content:"200"; http_stat_code; classtype:shellcode-detect; sid:3020017; rev:1; metadata:attack_target web_server, signature_severity Critical, direction outside_to_inside, created_at 2019_10_31, updated_at 2019_10_31, author Canon, tag RCE, tag CVE-2019-0193, tag http, tag exploit, tag Solr;)

# Solr GET RCE CVE-2019-0193
alert http $EXTERNAL_NET any -> $HOME_NET any (msg:"LOCAL RULES EXPLOIT Solr RCE CVE-2019-0193 GET"; flow:to_server,established; flowbits:set,CVE-2019-0193.get.request; content:"GET"; http_method; content:"/solr"; http_uri; fast_pattern; content:"/select?"; http_uri; content:"wt=velocity"; http_uri; content:"java.lang.Runtime"; http_uri; content:"getRuntime().exec"; http_uri; classtype:shellcode-detect; sid:3020018; rev:1; metadata:attack_target web_server, signature_severity Critical, direction outside_to_inside, created_at 2019_10_31, updated_at 2019_10_31, author Canon, tag RCE, tag CVE-2019-0193, tag http, tag exploit, tag Solr;)

alert http $EXTERNAL_NET any -> $HOME_NET any (msg:"LOCAL RULES EXPLOIT Solr RCE CVE-2019-0193 GET Successful"; flow:from_server,established; flowbits:isset,CVE-2019-0193.get.request; content:"200"; http_stat_code; classtype:shellcode-detect; sid:3020019; rev:1; metadata:attack_target web_server, signature_severity Critical, direction outside_to_inside, created_at 2019_10_31, updated_at 2019_10_31, author Canon, tag RCE, tag CVE-2019-0193, tag http, tag exploit, tag Solr;)

Background

The website has recently been under attack; by auditing and analyzing the login endpoint, a batch of suspicious accounts has been identified. Since I had already written a login-endpoint audit script, it was natural to extend it to match suspicious accounts. The idea: store the suspicious accounts in Redis, then have a Lua script query the Redis interface to compare accounts.

First, note that Suricata has a built-in blacklist mechanism:

# IP Reputation
#reputation-categories-file: /etc/suricata/iprep/categories.txt
#default-reputation-path: /etc/suricata/iprep
#reputation-files:
# - reputation.list

Suricata 5.0 goes further with the new Datasets feature. At a glance, the dataset and datarep rule keywords can match large amounts of data against sticky buffers. A genuinely great feature!

alert http any any -> any any (http.user_agent; dataset:set, ua-seen, type string, save ua-seen.lst; sid:1;)

alert dns any any -> any any (dns.query; to_sha256; dataset:set, dns-sha256-seen, type sha256, save dns-sha256-seen.lst; sid:2;)

alert http any any -> any any (http.uri; to_md5; dataset:isset, http-uri-md5-seen, type md5, load http-uri-md5-seen.lst; sid:3;)

But... this does not fit my scenario: the user's login request lives in a POST body, and the stock Suricata keywords cannot precisely locate the account we need. That leaves extending with a Lua script. Zeek could also meet these needs, but... Zeek scripts are truly painful to write, no comment~
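The account extraction the Lua script later performs on a url-encoded login body can be sketched in Python (the "email" field name is this site's specific form, an assumption here):

```python
from urllib.parse import parse_qs

def extract_account(body):
    """Pull the login account out of a url-encoded POST body;
    returns None if the field is absent."""
    values = parse_qs(body).get("email")
    return values[0] if values else None
```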

Preparation

Environment

OS: Ubuntu 18.04

Suricata: Suricata 5.0.0 RELEASE

LuaRocks

  1. Ubuntu does not ship with LuaRocks (LuaRocks is the package manager for Lua modules), so it must be installed manually.
# Install via apt: simple and painless.
$ apt-get install luarocks

  2. Install the required Lua modules with luarocks; we need redis-lua and luasocket.
# Install Modules
$ luarocks install luasocket
$ luarocks install redis-lua

$ ll /usr/local/share/lua/5.1/
total 72
drwxr-xr-x 3 root root 4096 Oct 25 03:35 ./
drwxr-xr-x 3 root root 4096 Sep 17 14:14 ../
-rw-r--r-- 1 root root 8331 Oct 25 03:34 ltn12.lua
-rw-r--r-- 1 root root 2487 Oct 25 03:34 mime.lua
-rw-r--r-- 1 root root 35599 Oct 25 03:35 redis.lua
drwxr-xr-x 2 root root 4096 Oct 25 03:34 socket/
-rw-r--r-- 1 root root 4451 Oct 25 03:34 socket.lua

  3. After a successful install, run a quick test.
  • Start a Redis container with Docker
$ docker run -ti -d -p 6379:6379 redis
  • Test script: hello_redis.lua
local redis = require "redis"

local client = redis.connect("127.0.0.1", 6379)

local response = client:ping()
if response == false then
    return 0
end

client:set("hello", "world")

local var = client:get("hello")
print(var)
  • You may see an error caused by incorrect environment variables
$ luajit hello_redis.lua
luajit: /usr/local/share/lua/5.1/redis.lua:793: module 'socket' not found:
no field package.preload['socket']
no file './socket.lua'
no file '/usr/local/share/luajit-2.0.5/socket.lua'
no file '/usr/local/share/lua/5.1/socket.lua'
no file '/usr/local/share/lua/5.1/socket/init.lua'
no file './socket.so'
no file '/usr/local/lib/lua/5.1/socket.so'
no file '/usr/local/lib/lua/5.1/loadall.so'
stack traceback:
[C]: in function 'require'
/usr/local/share/lua/5.1/redis.lua:793: in function 'create_connection'
/usr/local/share/lua/5.1/redis.lua:836: in function 'connect'
a.lua:3: in main chunk
[C]: at 0x56508049e440
$ luajit hello_redis.lua
luajit: /usr/local/share/lua/5.1/redis.lua:793: module 'socket' not found:
no field package.preload['socket']
no file './socket.lua'
no file '/usr/local/share/luajit-2.0.5/socket.lua'
no file '/usr/local/share/lua/5.1/socket.lua'
no file '/usr/local/share/lua/5.1/socket/init.lua'
no file './socket.so'
no file '/usr/local/lib/lua/5.1/socket.so'
no file '/usr/local/lib/lua/5.1/loadall.so'
stack traceback:
[C]: in function 'require'
/usr/local/share/lua/5.1/redis.lua:793: in function 'create_connection'
/usr/local/share/lua/5.1/redis.lua:836: in function 'connect'
a.lua:3: in main chunk
[C]: at 0x56508049e440
  • Run luarocks path --bin and export its output
$ luarocks path --bin
Warning: The directory '/home/canon/.cache/luarocks' or its parent directory is not owned by the current user and the cache has been disabled. Please check the permissions and owner of that directory. If executing /usr/local/bin/luarocks with sudo, you may want sudo's -H flag.
export LUA_PATH='/home/canon/.luarocks/share/lua/5.1/?.lua;/home/canon/.luarocks/share/lua/5.1/?/init.lua;/usr/local/share/lua/5.1/?.lua;/usr/local/share/lua/5.1/?/init.lua;./?.lua;/usr/local/share/luajit-2.0.5/?.lua'
export LUA_CPATH='/home/canon/.luarocks/lib/lua/5.1/?.so;/usr/local/lib/lua/5.1/?.so;./?.so;/usr/local/lib/lua/5.1/loadall.so'
export PATH='/home/canon/.luarocks/bin:/usr/local/bin:/home/canon/anaconda3/bin:/home/canon/anaconda3/condabin:/usr/local/sbin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin'
  • Run the script again; you should see:
$ luajit hello_redis.lua
world

CJson

I strongly recommend the CJson module. For testing I had grabbed a random json module from GitHub; a few days later I noticed that during the site's peak hours Suricata's app_layer.flow value ballooned, causing kernel_drops. Our site serves overseas users across time zones, and after several late nights the root cause turned out to be the json module's heavy performance cost. See these screenshots:

  • Monitoring before enabling the CJson module

(monitoring screenshot: image-20191104103020962)

  • Monitoring after enabling the CJson module

(monitoring screenshot: image-20191104103557884)

  1. Download the CJson module
# Download with wget
$ wget https://www.kyne.com.au/~mark/software/download/lua-cjson-2.1.0.tar.gz

# Or clone via Git
$ git clone git@github.com:mpx/lua-cjson.git

  2. Adjust the Makefile to your Lua environment (my configuration)
##### Build defaults #####
LUA_VERSION = 5.1
TARGET = cjson.so
PREFIX = /usr/local
#CFLAGS = -g -Wall -pedantic -fno-inline
CFLAGS = -O3 -Wall -pedantic -DNDEBUG
CJSON_CFLAGS = -fpic
CJSON_LDFLAGS = -shared
LUA_INCLUDE_DIR = $(PREFIX)/include/luajit-2.0
LUA_CMODULE_DIR = $(PREFIX)/lib/lua/$(LUA_VERSION)
LUA_MODULE_DIR = $(PREFIX)/share/lua/$(LUA_VERSION)
LUA_BIN_DIR = $(PREFIX)/bin
  3. Install
$ make
cc -c -O3 -Wall -pedantic -DNDEBUG -I/usr/local/include/luajit-2.0 -fpic -o lua_cjson.o lua_cjson.c
In file included from lua_cjson.c:47:0:
fpconv.h:15:20: warning: inline function ‘fpconv_init’ declared but never defined
extern inline void fpconv_init();
^~~~~~~~~~~
cc -c -O3 -Wall -pedantic -DNDEBUG -I/usr/local/include/luajit-2.0 -fpic -o strbuf.o strbuf.c
cc -c -O3 -Wall -pedantic -DNDEBUG -I/usr/local/include/luajit-2.0 -fpic -o fpconv.o fpconv.c
cc -shared -o cjson.so lua_cjson.o strbuf.o fpconv.o

$ make install
mkdir -p //usr/local/lib/lua/5.1
cp cjson.so //usr/local/lib/lua/5.1
chmod 755 //usr/local/lib/lua/5.1/cjson.so

MD5

$ luarocks install --server=http://luarocks.org/manifests/kikito md5

Code example

The extended login-audit script: http_audit.lua

json = require "cjson.safe"
md5 = require "md5"
redis = require "redis"

-- 登录接口
login_url = "/login"
-- 登录错误提示
success_code = 0
-- event_name
event_name = "login_audit"
-- event_type
event_type = "lua"
-- logs
name = "login_audit.json"
-- 协议
proto = "TCP"

-- redis_config
host = "127.0.0.1"
port = 6379

-- common_mapping
http_common_mapping = '{"accept":"accept","accept-charset":"accept_charset","accept-encoding":"accept_encoding","accept-language":"accept_language","user-agent":"user_agent"}'
common_mapping_table = json.decode(http_common_mapping)


-- defind functioin
function md5Encode(args)
m = md5.new()
m:update(args)
return md5.tohex(m:finish())
end

function formatStr(args)
t = {}
ios = string.match(args, 'canon')
if ios ~= nil then
mail = 'email"%s+(.-)%s'
t['email'] = string.match(args, mail)
else
data = string.split(args, '&')
for n, v in ipairs(data) do
d = string.split(v, '=')
t[d[1]] = d[2]
end
end
return t
end

function string.split(s, p)
rt = {}
string.gsub(s, '[^'..p..']+', function(w) table.insert(rt, w) end )
return rt
end

-- default function
function init (args)
local needs = {}
needs["protocol"] = "http"
return needs
end

function setup (args)
filename = SCLogPath() .. "/" .. name
file = assert(io.open(filename, "a"))
SCLogInfo("app_login_audit filename: " .. filename)
http = 0

-- Connect Redis Server
SCLogInfo("Connect Redis Server...")
client = redis.connect(host, port)
response = client:ping()
if response then
SCLogInfo("Redis Server connection succeeded.")
end
end

function log(args)
-- init tables
http_table = {}

-- ti tables
ti = {
tags = {}
}

-- init score
score = 50

-- http_hostname & http_url
http_hostname = HttpGetRequestHost()
http_url = HttpGetRequestUriNormalized()

-- http_method
rl = HttpGetRequestLine()
if rl then
http_method = string.match(rl, "%w+")
if http_method then
http_table["method"] = http_method
end
end

-- keep the filter conditions strict so Suricata's performance is not affected
if http_url == login_url and http_method == "POST" then
http_table["hostname"] = http_hostname
http_table["url"] = http_url
http_table["url_path"] = http_url

-- http_status & http_protocol
rsl = HttpGetResponseLine()
if rsl then
status_code = string.match(rsl, "%s(%d+)%s")
http_table["status"] = tonumber(status_code)

http_protocol = string.match(rsl, "(.-)%s")
http_table["protocol"] = http_protocol
end

-- login_results
a, o, e = HttpGetResponseBody()
if a then
for n, v in ipairs(a) do
body = json.decode(v)
results_code = tonumber(body["code"])
if results_code == success_code then
http_table["results"] = "success"
else
http_table["results"] = "failed"
end
end
http_table["results_code"] = results_code
end

--[[
1. Extract the login email and check whether the account is in the Redis blacklist
2. Score and tag the event according to the lookup result
]]
a, o, e = HttpGetRequestBody()
if a then
for n, v in ipairs(a) do
res = formatStr(v)
if res["email"] then
-- check the email against the Redis blacklist
black_ioc = client:get(res["email"])
if black_ioc then
ti["provider"] = "Canon"
ti["producer"] = "NTA"
table.insert(ti["tags"], "account in blacklist")
score = score + 10
end
end
end
end

-- RequestHeaders
rh = HttpGetRequestHeaders()
if rh then
for k, v in pairs(rh) do
key = string.lower(k)
request_var = request_mapping_table[key]
if request_var then
http_table[request_var] = v
end
end
end

-- ResponseHeaders
rsh = HttpGetResponseHeaders()
if rsh then
for k, v in pairs(rsh) do
key = string.lower(k)
response_var = response_mapping_table[key]
if response_var then
http_table[response_var] = v
end
end
end

-- timestring
sec, usec = SCPacketTimestamp()
timestring = os.date("!%Y-%m-%dT%T", sec) .. '.' .. usec .. '+0000'

-- flow_info
ip_version, src_ip, dst_ip, protocol, src_port, dst_port = SCFlowTuple()

-- flow_id
id = SCFlowId()
flow_id = string.format("%.0f", id)
flow_id = tonumber(flow_id)

-- true_ip
true_client_ip = HttpGetRequestHeader("True-Client-IP")
if true_client_ip ~= nil then
src_ip = true_client_ip
end

-- session_id
tetrad = src_ip .. src_port .. dst_ip .. dst_port
session_id = md5Encode(tetrad)

-- table
raw_data = {
timestamp = timestring,
flow_id = flow_id,
session_id = session_id,
src_ip = src_ip,
src_port = src_port,
proto = proto,
dest_ip = dst_ip,
dest_port = dst_port,
event_name = event_name,
event_type = event_type,
app_type = app_type, -- nil unless set elsewhere; json.encode drops nil fields
http = http_table,
ti = ti,
score = score
}

-- json encode
data = json.encode(raw_data)

file:write(data .. "\n")
file:flush()

http = http + 1
end

end

function deinit (args)
SCLogInfo ("app_login_audit transactions logged: " .. http);
file:close()
end

A quick summary of what the script above does:

  1. Audits usernames on the login endpoint (obviously…);
  2. Looks up the current user in the Redis blacklist and applies scoring and tagging accordingly;
  3. Collects custom HTTP headers on demand, which is quite useful for business-side security;
  4. Behind a CDN or Nginx, you can hash the four-tuple built from xff or true_client_ip into a session_id, which makes tracing much easier, since in that scenario the traditional layer-4 flow_id is of little use.
  5. Simple detection checks can be appended later, for example:
    1. whether all expected request headers are present;
    2. whether a given header's length is within expected bounds;
    3. ……
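Point 4 above is just an MD5 over the concatenated four-tuple, mirroring the Lua script's `tetrad = src_ip .. src_port .. dst_ip .. dst_port`. A minimal Python sketch of the same idea (the function name is illustrative):

```python
import hashlib

def session_id(src_ip, src_port, dst_ip, dst_port):
    # Concatenate the four-tuple exactly like the Lua md5Encode(tetrad)
    # call, then MD5 it, so the same client/server pair always maps to
    # the same id even when flow_id only reflects the CDN/proxy hop.
    tetrad = f"{src_ip}{src_port}{dst_ip}{dst_port}"
    return hashlib.md5(tetrad.encode()).hexdigest()
```

Because the hash is built from the true client address (True-Client-IP / XFF) rather than the socket tuple, events from the same client session can be grouped across proxies.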

Configuring Suricata to enable the Lua script

- lua:
enabled: yes
scripts-dir: /etc/suricata/lua-output/
scripts:
- login_audit.lua

Starting Suricata

$ suricata -vvv --pfring -k none -c /etc/suricata/suricata.yaml

Note: the -vvv flag is recommended here. If there is a problem in your Lua script, the verbose output will reveal it.

Sample log

{
"src_port": 62722,
"score": 65,
"session_id": "c863aeb2ef8d1b37f3257f8c210bf440",
"ti": {
"tags": [
"account in blacklist"
],
"provider": "Canon",
"producer": "NTA"
},
"alert": {
"alerted": true,
"rules": {
"request_header_check": "dev-id"
}
},
"proto": "TCP",
"flow_id": "1064295903559076",
"timestamp": "2019-10-25T08:33:55.585519+0000",
"event_type": "lua",
"src_ip": "1.1.1.1",
"dest_port": 80,
"http": {
"response_content_length": "96",
"response_content_type": "application/json; charset=UTF-8",
"accept_encoding": "gzip",
"accept": "application/json",
"results_code": 400504,
"server": "nginx",
"date": "Fri, 25 Oct 2019 08:33:55 GMT",
"app_version": "6.6.0",
"request_content_type": "application/x-www-form-urlencoded",
"user_agent": "okhttp/3.12.0",
"url": "/login",
"email": "canon@gmail.com",
"results": "failed",
"pragma": "no-cache",
"cache_control": "no-cache, max-age=0, no-store",
"connection": "keep-alive",
"status": 200,
"protocol": "HTTP/1.1",
"hostname": "x.x.x.x",
"url_path": "/login",
"method": "POST",
"device": "RMX1920 Android8.0.0",
"device_type": "Android",
"request_content_length": "39"
},
"event_name": "login_audit",
"dest_ip": "2.2.2.2"
}

Requirement:

​ We have a batch of high-risk users whose logins must be watched in real time. Since an audit rule for login accounts is already in place, we only need the **Wazuh CDB list** feature (whose main use case is building white/black lists of users, IPs, or domains) to consume the audit-rule events.


  1. Create the list
$ more blacklist

admin:
root:
administrator:
  2. Add the list file to ossec.conf
$ more ossec.conf

<ossec_config>
<ruleset>
<!-- User-defined CDB list -->
<list>etc/lists/blacklist</list>
</ruleset>
</ossec_config>
  3. Compile the list
$ /var/ossec/bin/ossec-makelists

* File etc/lists/blacklist.cdb needs to be updated
  4. Restart the service
$ sudo systemctl restart wazuh-manager
  5. Configure the rule
<group name="local,blacklist,">

<!-- Define blacklist rules -->
<!-- ID: 100150 - 100199 -->

<rule id="100163" level="12">
<if_sid>100303</if_sid>
<list field="http.email" lookup="match_key">etc/lists/blacklist</list>
<description>Wazuh Rules - High-risk user login detected. $(src_ip) -> $(http.email) -> $(http.hostname) -> $(http.url) = $(http.results).</description>
<options>no_full_log</options>
<group>blacklist,</group>
</rule>

</group>
  6. Test the rule
$ ./ossec-logtest
2019/10/18 15:06:47 ossec-testrule: INFO: Started (pid: 2184).
ossec-testrule: Type one log per line.

**Phase 3: Completed filtering (rules).
Rule id: '100163'
Level: '12'
Description: 'Wazuh Rules - High-risk user login detected. 1.1.1.1 -> admin -> canon88.github.io -> /user/login = success.'
**Alert to be generated.
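Conceptually, a compiled CDB list is a key/value map built from `key:value` lines, and `lookup="match_key"` is a key-membership test on the decoded field. A rough Python sketch under those assumptions (helper names are made up, not Wazuh APIs):

```python
def load_cdb_list(lines):
    # Each source line is "key:value"; in the blacklist above the
    # values after the colon are empty, so only the keys matter.
    table = {}
    for line in lines:
        line = line.strip()
        if line:
            key, _, value = line.partition(":")
            table[key] = value
    return table

def match_key(table, field_value):
    # Equivalent of <list field="http.email" lookup="match_key">:
    # the rule fires when the field value appears as a key in the list.
    return field_value in table
```

This is why the rule above matches `http.email` directly against the list keys: the lookup is a constant-time key check, not a substring search.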