写在前面
这并不是什么高精尖的架构与技术。这只是我个人在工作中结合目前手头的资源进行了一些整合 。当然实现这些需求的方法有很多, 有钱的可以考虑Splunk, 没钱的有研发的团队的可以上Flink、Esper 。
需求
由于攻防对抗的升级, 通过单一的数据源很难直接断言攻击是否成功。因此, 我们需要结合多个数据源进行安全事件的关联, 从中提炼出可靠性较高的安全告警进行人工排查。例如: 针对WebShell上传类的, 可以通过网络流量 + 终端进行关联; 针对Web攻击类, 可以通过WAF + NIDS的事件关联, 得到Bypass WAF 的安全告警。
解决思路
虽然Wazuh本身具备安全事件的关联能力, 但在传统的部署架构中, 通常是由Wazuh Agent将安全事件发送到Wazuh Manager,通过Manager进行安全事件的关联告警。由于缺少了对数据进行ETL, 使得Wazuh Manager很难对异构数据进行关联。因此, 我们需要通过Logstash实现对数据的标准化, 并将标准化后的数据通过Syslog的形式发送到Wazuh Manager, 从而进行异构数据的关联。
坑点
- 本次改造采用了Syslog的形式将数据发送到Wazuh Manager端进行数据关联。由于Syslog 默认采用了UDP协议进行数据传输, 当数据发送过大时将会导致数据截断的报错。针对此问题, 需改用TCP的形式进行数据发送规避此问题。
- Wazuh Manager 部分告警缺少”必要“关联字段的现象。如: 本次场景中
syscheck
告警事件, 默认不会携带srcip
的字段, 对于此问题可以通过在Manager上编写一个预处理脚本来解决。
改造
改造之前:
Suricata (Wazuh agent) —(Agent: UDP 1514)—> Wazuh Manager
改造之后:
所有的标准化都由Logstash来进行, Filebeat只需要做’无脑‘转发即可。
workflow:
Filebeat配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| filebeat.inputs: - type: log paths: - "/var/log/suricata/alert-*.json" fields_under_root: true fields: { application: suricata } json.keys_under_root: true json.overwrite_keys: true json.message_key: log tail_files: false
scan_frequency: 1s harvester_buffer_size: 104857600 backoff: 1s max_backoff: 10s close_timeout: 30m close_inactive: 10m clean_inactive: 72h ignore_older: 70h registry_file: /etc/filebeat/registry/wazuh/
processors: - drop_fields: fields: ["ecs.version", "agent.ephemeral_id", "agent.version", "agent.type", "agent.id", "agent.ephemeral_id", "input.type"]
output.logstash: hosts: ["logstash:5010"] loadbalance: true worker: 4 compression_level: 3 bulk_max_size: 4096
|
Logstash配置
1 2 3 4 5 6 7
| input { beats { port => 5010 codec => "json_lines" tags => ["beats"] } }
|
1 2 3 4 5 6 7 8
| filter { if [application] == "suricata" { date { match => [ "timestamp", "ISO8601" ] target => "timestamp" } } }
|
1 2 3 4 5 6 7 8
| { "common_mapping": { "src_ip": "srcip", "dest_ip": "dstip", "src_port": "srcport", "dest_port": "dstport" } }
|
70_normalized-suricata.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| filter { clone { clones => [ "siem_events" ] } }
filter { if [type] == "siem_events" { mutate { remove_field => [ "application", "type", "agent", "@version", "@timestamp"] add_field => { "provider" => "Suricata" "product" => "Intrusion Detection System" } }
ruby { init => " require 'json'
mapping_json = File.read('/etc/logstash/mappings/wazuh/mapping.json') mapping = JSON.parse(mapping_json) @common_mapping = mapping['common_mapping'] "
code => " keys = event.to_hash.keys keys.each do |key| if @common_mapping.include? key then value = event.get(key) event.remove(key) new_key = @common_mapping[key] event.set(new_key, value) end end
sensor = event.get('[host][name]') event.set('sensor', sensor) " } } }
|
99_output-elasticsearch.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| output { if [event_type] == "alert" { elasticsearch { cacert => "/etc/logstash/certs/ca/ca.crt" user => "elastic" password => "Hello World!" hosts => ["https://elasticsearch:9200"] index => "suricata-%{+YYYY.MM.dd}" template => "/etc/logstash/index-template.d/suricata-template.json" template_name => "suricata" template_overwrite => true } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| output { if [provider] == "Suricata" { syslog { host => "wazuh" protocol => "tcp" port => 514 codec => "json" sourcehost => "logstash" appname => "NORMALIZED" } } }
|
Wazuh配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| import json import sys import time import os from datetime import datetime, timedelta, timezone
debug_enabled = False pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) json_alert = {} now = time.strftime("%a %b %d %H:%M:%S %Z %Y") wazuh_server = "192.168.199.97"
log_file = '{0}/logs/integrations.log'.format(pwd) syscheck_file = '{0}/logs/syscheck.json'.format(pwd)
def iso8601(hours=8): td = timedelta(hours=hours) tz = timezone(td) return datetime.now(tz=tz).isoformat()
def main(args): debug("# Starting")
alert_file_location = args[1]
debug("# File location") debug(alert_file_location)
with open(alert_file_location) as alert_file: json_alert = json.load(alert_file) debug("# Processing alert") debug(json_alert)
alert = normalized_data(json_alert) with open(syscheck_file, 'a') as f: msg = json.dumps(alert) f.write(msg + '\n')
def debug(msg): if debug_enabled: msg = "{0}: {1}\n".format(now, msg) with open(log_file, "a") as f: f.write(msg)
def normalized_data(alert): if alert['agent']['id'] == '000': alert['srcip'] = wazuh_server elif alert['agent'].get('ip'): alert['srcip'] = alert['agent']['ip'] alert['dstip'] = alert['agent']['ip'] alert['integration'] = 'custom-syscheck' alert['create_timestamp'] = iso8601() debug(alert) return(alert)
if __name__ == "__main__": try: bad_arguments = False if len(sys.argv) >= 4: msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '') else: msg = '{0} Wrong arguments'.format(now) bad_arguments = True
with open(log_file, 'a') as f: f.write(msg + '\n')
if bad_arguments: debug("# Exiting: Bad arguments.") sys.exit(1)
main(sys.argv)
except Exception as e: debug(str(e)) raise
|
ossec.conf
- 配置syslog选用TCP协议;
- 加载预处理脚本;
- 加载脚本输出日志;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <ossec_config> <remote> <connection>syslog</connection> <port>514</port> <protocol>tcp</protocol> <allowed-ips>192.168.199.0/24</allowed-ips> </remote> <integration> <name>custom-syscheck</name> <rule_id>554</rule_id> <group>syscheck</group> <alert_format>json</alert_format> </integration> <localfile> <log_format>json</log_format> <location>/var/ossec/logs/syscheck.json</location> </localfile> </ossec_config>
|
local_decoder_normalized.xml
Sample
1 2 3 4 5 6 7 8
|
<decoder name="nta_json"> <prematch>NORMALIZED[-]: </prematch> <plugin_decoder offset="after_prematch">JSON_Decoder</plugin_decoder> </decoder>
|
0901-local_raw.xml
- 默认引用的解码器为
json
, 这里需要修改为刚才新增的nta_json
;
- 通过
overwrite=yes
覆盖原始规则;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <group name="suricata,">
<rule id="86600" level="0" overwrite="yes"> <decoded_as>nta_json</decoded_as> <field name="timestamp">\.+</field> <field name="event_type">\.+</field> <description>Suricata messages.</description> <options>no_full_log</options> </rule>
</group>
|
0905-local_syscheck.xml
为预处理脚本生成的日志进行解析
1 2 3 4 5 6 7 8
| <group name="syscheck,"> <rule id="187100" level="7"> <decoded_as>json</decoded_as> <field name="integration">custom-syscheck</field> <description>syscheck integration messages.</description> <options>no_full_log</options> </rule> </group>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| <group name="local,composite,">
<rule id="200000" level="15" frequency="2" timeframe="600"> <if_matched_sid>101000</if_matched_sid> <if_sid>187100</if_sid> <same_source_ip /> <description>Phase 3: 检测到服务器:$(srcip), 被上传WebShell.</description> <options>no_full_log</options> </rule>
<rule id="200001" level="12" frequency="2" timeframe="600"> <if_matched_sid>88801</if_matched_sid> <if_group>ids</if_group> <same_source_ip /> <description>Phase 3: Alarm - Same ip Bypass WAF of within 600 seconds. $(srcip) -> $(http.hostname) -> $(alert.signature) -> $(alert.signature_id).</description> <options>no_full_log</options> </rule> </group>
|
总结
对于WebShell关联检测,目前采用的是同源IP以及时序的关联, 最为靠谱的应该是通过Hash的比对。这里要吐槽一下Suricata默认的fileinfo, 没办法自定义输出, 只要开启可被还原的协议都会输出fileinfo的事件。正因如此, 数据量一大Wazuh的引擎压力会很大。我尝试通过Lua来自定义一个文件审计类的事件, 貌似也同样没办法区分协议更别说针对http过滤条件进行自定义的过滤输出了。
由于关联规则本身是通过底层多个安全事件进行维度的关联提升告警的可靠性。因此, 底层安全事件不够准确同样会让上层的关联规则带来大量误报。对于底层安全事件的优化也是需要持续进行的。
Wazuh v3.11.4 采用syslog接收大日志时, 会触发memory violation
导致ossec-remoted
进程重启, 该问题已向社区反馈下个版本中会解决。
参考