How to Use Zeek ThreatHunting?

前言

春节期间,我利用课余时间将之前的一些想法付诸实践。我们都知道,网络威胁分析(NTA)除了具备网络入侵检测系统(NIDS)功能外,还有一个重要特性就是强大的协议解析能力,这对于安全事件的溯源提供了极大的帮助。然而,并非所有的企业都能充分利用这个能力,主要原因有两个方面:

  1. 存储成本:如果需要进行全面的网络元数据捕获,它的存储成本将会是一个“挑战”。因为,你的网络流量越大也就意味着你的存储成本越高。
  2. 隐私合规:如果你所在公司的数据安全成熟度不高且公司业务又面临强合规监管。那么,合规法案将会在一定程度上与数据捕获的需求“冲突”。

所以,如果你和我一样在工作中即需要满足安全需求又需要考虑这么做是否合规,那么我相信本篇文章会对你有一些帮助,既在满足合规的前提下为威胁狩猎提供更多的网络元数据。

需求概述

当触发告警时,Zeek可以从Kafka获取IoC并根据指定时间窗口进行数据捕获。

实现步骤

  1. 首先,需要搞定如何让Zeek连接Kafka。好消息是,Zeek v6.0已经支持通过ZeekJS插件加载并执行JavaScript代码。有了JavaScript的加持,一切皆有可能!

  2. 其次,需要尝试为IoC设置过期时间。Zeek本身对部分类型是支持过期时间这个属性的,借助create_expire这个属性再加上Intelligence Framework就可以为每个IoC设置不同的过期时间。这里给大家推荐J-Grasintel-expire项目,我们就不用“重复造轮子”了。

  3. 最后,使用Intel::LOG作为触发器捕获元数据。这里需要使用Zeek的Intelligence Framework来对IoC进行实时的匹配,你可以将Intel::LOG视为一个触发器,一旦匹配到之后将会自动捕获与之对应的uid事件。这里需要使用Zeek v6.2,目前还没release,尝鲜的话可以先用 zeek version 6.2.0-dev 。

这里需要说明一点,Zeek中负责嗅探网络流量并解析协议的是Worker角色,所以我们的需求落实到代码层面也必须让Worker角色来执行。由于当前Intel::match()方法作用域是在Manager上而并非在Worker上。因此,它并不能满足我们当前的需求,好消息是 Zeek v6.2 版本将会支持,届时我们可以使用Intel::seen_policy()来实现。这里有个例外,如果你的环境中Zeek是非集群架构的话,因为Manager和Worker都在一台机器上,所以,就不存在这个问题了,直接使用Intel::match()就行了。若想了解更多关于Zeek架构方面的知识,请参考:Cluster-architecture

工作原理

解决了上述三个问题之后,它的工作流程应该是这样的:

Workflow

目录结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ tree threat-hunting
threat-hunting
├── config.dat
├── config.zeek
├── __load__.zeek
├── main.zeek
├── plugins
│   ├── conn
│   │   ├── investigation.zeek
│   ├── dns
│   │   ├── investigation.zeek
│   ├── http
│   │   ├── investigation.zeek
│   │   └── normalized.zeek
│   └── __load__.zeek
└── threathunting.js
脚本说明
Script Description
main.zeek 主程序,通过ZeekJS调用 threathunting.js
config.dat 配置文件,无需重启Zeek控制指定plugin的启用与禁用。你会经常用到它
plugins 插件目录,用户自定义的插件目录。你会经常用到它

如何使用

让我们动手,编写一个自己的插件!

通常你只需要创建一个investigation.zeek脚本,并编辑Intel::seen_policyHTTP::log_policy中的内容即可。如果你对日志标准化有需求也可以创建一个normalized.zeek来实现标准化。下面以创建./plugins/http/investigation.zeek为示例:

  • 首先,为你所需要的日志增加一个类型为boolthreathunting 字段,这里是 HTTP::Info

    1
    2
    3
    redef record HTTP::Info += {
    threathunting: bool &log &optional;
    };
  • 然后,使用 Intel::seen_policy 在匹配到情报时将threathunting字段设置为True。对了,这里HTTP记得添加config.datThreatHunting::enable_module HTTP,它将用来控制插件的热启停。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # Hook for filtering Intel log entries based on predefined criteria.
    hook Intel::seen_policy(s: Intel::Seen, found: bool)
    {
    # Break if there is no match.
    if ( ! found )
    break;

    # Check if the current log entry matches the set investigation criteria.
    if ( ("HTTP" in enable_module) && (s$conn?$http) )
    s$conn$http$threathunting = T;
    }
  • 最后,使用 HTTP::log_policythreathunting 字段为True的日志进行捕获,搞定!是不是很简单?

    1
    2
    3
    4
    5
    6
    7
    8
    hook HTTP::log_policy(rec: HTTP::Info, id: Log::ID, filter: Log::Filter)
    {
    if ( filter$name == "http_investigation" ) {
    if (! rec?$threathunting) {
    break;
    }
    }
    }

功能演示

threathunting

  1. 这里通过推送IoC(httpbin.org)到Kafka,设置过期时间为60秒;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    {
    "ioc": "httpbin.org",
    "type": "domain",
    "meta": {
    "expire": 60,
    "source": "SOAR",
    "desc": "bad domain"
    }
    }
  2. 模拟访问httpbin.org时将会触发Intel::LOG以及HTTP::LOG

  3. 60秒之后再次尝试访问httpbin.org,未生成Intel::LOG以及HTTP::LOG,因为IoC已过期。

让ChatGPT给上个“价值”

总的来说,使用Zeek进行威胁狩猎的过程在解决存储成本和隐私合规两大问题上展现了显著的优势。Zeek能够从Kafka获取IoC并在指定的时间窗口内进行数据捕获,有效地降低了存储成本。同时,利用Zeek的Intelligence Framework和intel-expire项目设置IoC的过期时间,避免了无限捕获带来的性能损耗,同时也确保了隐私合规。此外,编写自定义插件的功能为用户提供了极大的便利性和灵活性。值得一提的是,这个方案不仅在安全领域有着广泛的应用,也可以有效地应用于其他非安全的场景,如故障排错,从而发挥出更大的价值。