# Installing from GIT
$ git clone https://github.com/ntop/PF_RING.git

# Kernel Module Installation
$ cd /opt/PF_RING/kernel
$ make
$ sudo make install

# Running PF_RING
$ cd PF_RING/kernel
$ sudo insmod pf_ring.ko min_num_slots=65536 enable_tx_capture=0
# sudo insmod ./pf_ring.ko [min_num_slots=N] [enable_tx_capture=1|0] [enable_ip_defrag=1|0]

# Drivers
$ ethtool -i eth1 | grep driver
driver: ixgbe

# Libpfring and Libpcap Installation
$ cd PF_RING/userland/lib
$ ./configure && make
$ sudo make install
$ cd ../libpcap
$ ./configure && make
$ sudo make install

# Application Examples
$ cd PF_RING/userland/examples
$ make
$ sudo ./pfcount -i zc:eth1
$ sudo ./pfsend -f 64byte_packets.pcap -n 0 -i zc:eth1 -r 5
$ git clone --recursive https://github.com/zeek/zeek
$ ./configure --with-pcap=/opt/PF_RING --enable-jemalloc
$ make -j4
$ sudo make install

Background

As enterprise security programs mature, information security work moves into the happy (read: grueling) operations phase. When it comes to security operations, incident response is an unavoidable topic. Responding to security incidents often requires cross-department collaboration, and some incidents even call for sustained tracking and investigation. Recording every step of the response is therefore especially important: once the incident is resolved, those records let us fold the lessons learned back into our overall security capability. From an automation perspective, we should also think about how to turn the response process into reusable Playbooks, so that attacks can be handled quickly and the time from compromise to containment is shortened.

Here are my pain points, or rather the problems we need to solve in day-to-day operations:

  • How do we record the time spent on each response step during an incident? The handling time of these tasks directly affects how we later calculate MTTD/MTTR.
  • How do we distill Playbooks from security incidents? For repeatable, process-like work, automation is king.
  • Facing all kinds of "creative" attack techniques, how do we give security analysts more customizable plugins to improve analysis efficiency?
  • How do we quickly coordinate with existing security devices to stop the bleeding in time?
  • Security incidents usually involve cross-department collaboration; how do we quickly start analyzing an incident and keep the collaborating departments up to date on its progress?

Security Incident Response Platform - TheHive

I eventually chose the *TheHive* security incident response platform to support my day-to-day security operations. Unlike SIEM-class products, TheHive deals with events that actually need to be responded to. A rough summary of its strengths:

  • Collaboration: TheHive treats a security incident as a Case and encourages multi-person, cross-department collaboration. Through its sharing mechanism, progress on an incident can be quickly synchronized with the collaborating departments.
  • Cost measurement: TheHive records the time spent on each Case and Task, which helps us measure our current MTTD/MTTR and gives us a solid basis for improving those metrics later.
  • Rapid response: during incident response you need to analyze the data at hand and quickly apply remediation to stop the attack. TheHive's Cortex component supports fast analysis of that data and can automatically push confirmed IoCs to existing security devices, integrating with the SIEM, WAF, FW, and EDR.
  • Efficiency: response processes that can be proceduralized should be automated, which requires an accumulation of Playbooks. Where do Playbooks come from? We use TheHive to record every incident response, breaking the items that need collaboration and the response steps into Tasks; this is how we build up our Playbook library.

TheHive Cluster Deployment

Due to space constraints, this section mainly covers the configuration changes needed when running TheHive as a cluster. For how to install TheHive itself, see the Step-by-Step guide. If you just want to test, you can simply use the Docker or VM images provided on the official site.

According to the official documentation, a TheHive cluster involves four parts. The following describes the adjustments required for TheHive, Cortex, Cassandra, and MinIO when running in cluster mode.

TheHive

We treat node 1 as the primary node and configure the akka component by editing /etc/thehive/application.conf, as shown below:

## Akka server
akka {
  cluster.enable = on
  actor {
    provider = cluster
  }
  remote.artery {
    canonical {
      hostname = "<My IP address>"
      port = 2551
    }
  }
  # seed node list contains at least one active node
  cluster.seed-nodes = [
    "akka://application@<Node 1 IP address>:2551",
    "akka://application@<Node 2 IP address>:2551",
    "akka://application@<Node 3 IP address>:2551"
  ]
}

Cassandra

  • Cluster configuration

    • Update the configuration file /etc/cassandra/cassandra.yaml with the following parameters:
    cluster_name: 'thp'
    num_tokens: 256
    authenticator: PasswordAuthenticator
    authorizer: CassandraAuthorizer
    role_manager: CassandraRoleManager
    data_file_directories:
      - /var/lib/cassandra/data
    commitlog_directory: /var/lib/cassandra/commitlog
    saved_caches_directory: /var/lib/cassandra/saved_caches
    seed_provider:
      - class_name: org.apache.cassandra.locator.SimpleSeedProvider
        parameters:
          - seeds: "<ip node 1>, <ip node 2>, <ip node 3>"
    listen_interface: ens160   # interface to listen on
    rpc_interface: ens160      # interface to listen on
    endpoint_snitch: SimpleSnitch
    • Delete the file /etc/cassandra/cassandra-topology.properties
    $ rm -rf /etc/cassandra/cassandra-topology.properties
  • Start the service

    • Start the service on every node
    $ service cassandra start
    • Check the cluster status
    $ nodetool status
    Datacenter: datacenter1
    =======================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving
    -- Address Load Tokens Owns (effective) Host ID Rack
    UN 192.168.199.35 449.33 KiB 256 100.0% 72e95db1-9c37-4a53-9312-76bd0b2e6ca7 rack1
    UN 192.168.199.36 631.65 KiB 256 100.0% 4051f9d4-91de-43e5-9a4a-c3da46417830 rack1
    UN 192.168.199.37 437.13 KiB 256 100.0% 8844626f-04c0-4dd3-855e-088935b8dc65 rack1
  • Initialize the database

    • Change the default database password (default account/password: cassandra/cassandra)
    $ cqlsh th01 -u cassandra
    cassandra@cqlsh> ALTER USER cassandra WITH PASSWORD 'HelloWorld';
    cassandra@cqlsh> quit;
    • Make sure the user accounts are consistent on all nodes
    $ cqlsh <ip node X> -u cassandra
    cassandra@cqlsh> ALTER KEYSPACE system_auth WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3 };
    • Create a KEYSPACE named thehive
    cassandra@cqlsh> CREATE KEYSPACE thehive WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3' } AND durable_writes = 'true';
    • Create the role thehive and grant it permissions on the thehive keyspace (choose your own password)
    cassandra@cqlsh> CREATE ROLE thehive WITH LOGIN = true AND PASSWORD = 'HelloWorld';
    cassandra@cqlsh> GRANT ALL PERMISSIONS ON KEYSPACE thehive TO 'thehive';
  • TheHive-related configuration

    Since the latest TheHive cluster setup relies on Elasticsearch for indexing, the following configuration needs to be updated as well:

    • Update the /etc/thehive/application.conf configuration
    db.janusgraph {
      storage {
        ## Cassandra configuration
        backend: cql
        hostname: ["<ip node 1>", "<ip node 2>", "<ip node 3>"]
        username: "cassandra"
        password: "HelloWorld"
        cql {
          cluster-name: thp
          keyspace: thehive
        }
      }

      ## Index configuration
      index.search {
        backend: elasticsearch
        hostname: ["<es node 1>", "<es node 2>", "<es node 3>"]
        index-name: thehive
        # auth
        elasticsearch.http.auth.type=basic
        elasticsearch.http.auth.basic.username=elastic
        elasticsearch.http.auth.basic.password=HelloWorld
        # ssl
        elasticsearch.ssl.enabled=true
        elasticsearch.ssl.truststore.location=/etc/thehive/truststore.jks
        elasticsearch.ssl.truststore.password=HelloWorld
      }
    }

Minio

Since my file storage uses MinIO, it needs to be configured here. A simpler option would be to use S3 instead.

  • Create the directory
$ mkdir /opt/minio
  • Create the user
$ adduser minio
  • Create data volumes

    Create at least two data volumes on each server

$ mkdir -p /srv/minio/{1,2}
$ chown -R minio:minio /srv/minio
  • Add hostname mappings
$ vim /etc/hosts
192.168.199.35 minio1
192.168.199.36 minio2
192.168.199.37 minio3
  • Install
$ cd /opt/minio
$ mkdir /opt/minio/{bin,etc}
$ wget -O /opt/minio/bin/minio https://dl.minio.io/server/minio/release/linux-amd64/minio
$ chown -R minio:minio /opt/minio
  • Configure

    • Create the configuration file /opt/minio/etc/minio.conf
    MINIO_OPTS="server --address :9100 http://minio{1...3}/srv/minio/{1...2}"
    MINIO_ACCESS_KEY="admin"
    MINIO_SECRET_KEY="HelloWorld"
    • Create the systemd unit file /usr/lib/systemd/system/minio.service
    [Unit]
    Description=minio
    Documentation=https://docs.min.io
    Wants=network-online.target
    After=network-online.target
    AssertFileIsExecutable=/opt/minio/bin/minio

    [Service]
    WorkingDirectory=/opt/minio
    User=minio
    Group=minio
    EnvironmentFile=/opt/minio/etc/minio.conf
    ExecStart=/opt/minio/bin/minio $MINIO_OPTS
    Restart=always
    LimitNOFILE=65536
    TimeoutStopSec=0
    SendSIGKILL=no

    [Install]
    WantedBy=multi-user.target
  • Start

$ systemctl daemon-reload
$ systemctl enable minio
$ systemctl start minio.service

Note: double-check file permissions here; if they are wrong, the process will fail to start.

  • Create the bucket

    Minio-1

    • Create the bucket

    Minio-2

  • Update the TheHive configuration file /etc/thehive/application.conf

## Attachment storage configuration
storage {
  provider: s3
  s3 {
    bucket = "thehive"
    readTimeout = 1 minute
    writeTimeout = 1 minute
    chunkSize = 1 MB
    endpoint = "http://minio1:9100"
    accessKey = "admin"
    secretKey = "HelloWorld"
    region = "us-east-1"
  }
}
alpakka.s3.path-style-access = force

Cortex

  • Edit the Cortex configuration file /etc/cortex/application.conf

    Note that the official default configuration file has a small gotcha: when Elastic authentication is used, the username key must be renamed to user, otherwise an error is raised.

play.http.secret.key="QZUm2UgZYXF6axC"
search {
index = cortex
uri = "https://elasticsearch01:9200,elasticsearch02:9200,elasticsearch03:9200"
user = "elastic" # 修改username为user
password = "HelloWorld"
keyStore {
path = "/etc/cortex/truststore.jks"
password = "HelloWorld"
}
trustStore {
path = "/etc/cortex/truststore.jks"
password = "HelloWorld"
}
}

Analyzers and Responders

Since Cortex 3 added support for dockerized analyzers, installation has been greatly simplified: we no longer have to wrestle with Python or other library dependencies when installing plugins.

  • Install Docker
# Ubuntu 18.04
$ wget -O- https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
$ add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu bionic stable"
$ sudo apt-get update
$ sudo apt-get install docker-ce
  • Give the cortex account permission to run Docker
$ usermod -a -G docker cortex
  • Update the configuration file /etc/cortex/application.conf to enable analyzers.json
## ANALYZERS
#
analyzer {
urls = [
"https://download.thehive-project.org/analyzers.json" # 本次新增
"/etc/cortex/Cortex-Analyzers/analyzers"
]
}

# RESPONDERS
#
responder {
urls = [
"https://download.thehive-project.org/responders.json" # 本次新增
"/etc/cortex/Cortex-Analyzers/responders"
]
}

How to Create Plugins

As mentioned earlier, Cortex already ships with a rich set of Analyzers and Responders plugins, which lets operations staff quickly analyze and respond to security incidents. In practice, depending on the scenario, we still need to build some custom plugins. The official documentation covers plugin development in detail: see How to Write and Submit an Analyzer. Some of the plugins we added are included below:

Alright, enough talk — show me the code!

Analyzers - Plugins

ThreatBook (微步在线)

Since we already purchased commercial threat intelligence from ThreatBook (微步在线), we integrated it with TheHive as well.

  • threatbook.py
#!/usr/bin/env python3
# encoding: utf-8

import requests


class ThreatBookError(Exception):
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message


class ThreatBook():
    """
    Threat intelligence: Threat Book
    https://x.threatbook.cn/nodev4/vb4/API
    """

    def __init__(self, key):
        self.key = key
        self.ua = "HappyHunting"
        self.session = requests.Session()
        self.urls = {
            'compromise': 'https://api.threatbook.cn/v3/scene/dns',
            'reputation': 'https://api.threatbook.cn/v3/scene/ip_reputation'
        }

    def _request(self, url, params={}):
        """
        Request an url
        """
        headers = {'User-Agent': self.ua}
        r = self.session.get(
            url=url,
            params=params,
            headers=headers
        )

        '''
        {
            "response_code": -1,
            "verbose_msg": "Invalid Access IP"
        }
        '''
        if r.status_code != 200:
            raise ThreatBookError(
                'Invalid HTTP status code %i' % r.status_code)
        if r.json()['response_code'] != 0:
            raise ThreatBookError(r.json())
        return r.json()

    def parser_results(self, results):
        for k, v in results.items():
            intel = {
                'ioc': k,
                'malicious': v['is_malicious'],
                'confidence': v['confidence_level'],
                'tags': v['judgments']
            }
            return intel

    def get_reputation(self, ioc):
        """Getting reputation IP"""
        url = self.urls['reputation']
        params = {
            'apikey': self.key,
            'resource': ioc
        }
        results = self._request(url=url, params=params)
        return self.parser_results(results['data'])

    def get_compromise(self, ioc):
        """Getting compromise IoC"""
        url = self.urls['compromise']
        params = {
            'apikey': self.key,
            'resource': ioc
        }
        results = self._request(url=url, params=params)
        return self.parser_results(list(results['data'].values())[0])


if __name__ == '__main__':
    key = '<api_key>'
    threat = ThreatBook(key)
    # reputation
    ioc = '8.8.8.8'
    r = threat.get_reputation(ioc)
    # compromise
    ioc = 'zzv.no-ip.info'
    r = threat.get_compromise(ioc)
    print(r)
  • threatbook_analyzer.py
#!/usr/bin/env python3
# encoding: utf-8

from threatbook import ThreatBook
from cortexutils.analyzer import Analyzer


class ThreatBookAnalyzer(Analyzer):

    def __init__(self):
        Analyzer.__init__(self)
        self.service = self.get_param(
            'config.service', None, 'Service parameter is missing')
        self.key = self.get_param(
            'config.key', None, 'Missing ThreatBook API key')
        self.polling_interval = self.get_param('config.polling_interval', 1)
        self.threatbook = ThreatBook(self.key)

    def summary(self, raw):
        taxonomies = []
        level = "info"
        namespace = "ThreatBook"
        value = "False"

        if self.service == 'reputation':
            predicate = 'Reputation'
        elif self.service == 'compromise':
            predicate = 'Compromise'

        if raw:
            if raw['malicious'] == True:
                level = "malicious"
                value = "True"

        taxonomies.append(self.build_taxonomy(
            level, namespace, predicate, value))
        return {"taxonomies": taxonomies}

    def run(self):
        if self.service == 'reputation':
            data = self.get_param('data', None, 'Data is missing')
            results = self.threatbook.get_reputation(data)
            self.report(results)
        elif self.service == 'compromise':
            data = self.get_param('data', None, 'Data is missing')
            results = self.threatbook.get_compromise(data)
            self.report(results)
        else:
            self.error('Invalid data type')


if __name__ == '__main__':
    ThreatBookAnalyzer().run()
  • ThreatBook_Compromise.json
{
"name": "ThreatBook_Compromise",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the compromise information of IP、Domain from ThreatBook.",
"dataTypeList": [
"ip",
"domain"
],
"command": "ThreatBook/threatbook_analyzer.py",
"baseConfig": "ThreatBook",
"config": {
"service": "compromise"
},
"configurationItems": [
{
"name": "key",
"description": "API key for ThreatBook",
"type": "string",
"multi": false,
"required": true
},
{
"name": "polling_interval",
"description": "Define time interval between two requests attempts for the report",
"type": "number",
"multi": false,
"required": false,
"defaultValue": 60
}
]
}
  • ThreatBook_Reputation.json
{
"name": "ThreatBook_Reputation",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the reputation information of IP from ThreatBook.",
"dataTypeList": [
"ip"
],
"command": "ThreatBook/threatbook_analyzer.py",
"baseConfig": "ThreatBook",
"config": {
"service": "reputation"
},
"configurationItems": [
{
"name": "key",
"description": "API key for ThreatBook",
"type": "string",
"multi": false,
"required": true
},
{
"name": "polling_interval",
"description": "Define time interval between two requests attempts for the report",
"type": "number",
"multi": false,
"required": false,
"defaultValue": 60
}
]
}

ProxyCheck

  • proxycheck.py
#!/usr/bin/env python3
# encoding: utf-8

import requests


class ProxyCheckError(Exception):
    def __init__(self, message):
        Exception.__init__(self, message)
        self.message = message


class ProxyCheck():
    """
    Threat intelligence: ProxyCheck
    http://proxycheck.io/v2/
    """

    def __init__(self, key):
        self.key = key
        self.ua = "HappyHunting"
        self.session = requests.Session()
        self.url = 'http://proxycheck.io/v2/'
        self.params = {
            'vpn': 1, 'asn': 1, 'time': 1, 'info': 0, 'risk': 1,
            'port': 1, 'seen': 1, 'days': 7, 'tag': 'siem'
        }

    def _request(self, url, params={}):
        """
        Request ProxyCheck API
        """
        headers = {'User-Agent': self.ua}
        r = self.session.get(
            url=url,
            params=params,
            headers=headers
        )

        if r.status_code != 200:
            raise ProxyCheckError(
                'Invalid HTTP status code %i' % r.status_code)
        return r.json()

    def check_proxy(self, data):
        """
        Checking proxy information from proxycheck.io
        """
        url = self.url + data
        self.params['key'] = self.key
        results = self._request(url=url, params=self.params)
        return self.parser_results(results, data)

    def parser_results(self, r, ioc):
        """
        Parsing results
        """
        intel = {}
        if r['status'] == 'ok':
            intel = {
                'ip': ioc,
                'country': r[ioc]['country'],
                'city': r[ioc]['city'],
                'proxy': r[ioc]['proxy'],
                'type': r[ioc]['type'],
                'provider': r[ioc]['provider']
            }
        return intel


if __name__ == '__main__':
    key = '<api_key>'
    proxycheck = ProxyCheck(key)

    ioc = '8.8.8.8'
    r = proxycheck.check_proxy(ioc)
    print(r)
  • proxycheck_analyzer.py
#!/usr/bin/env python3
# encoding: utf-8

from proxycheck import ProxyCheck
from cortexutils.analyzer import Analyzer


class ProxyCheckAnalyzer(Analyzer):

    def __init__(self):
        Analyzer.__init__(self)
        self.service = self.get_param(
            'config.service', None, 'Service parameter is missing')
        self.key = self.get_param(
            'config.key', None, 'Missing ProxyCheck API key')
        self.polling_interval = self.get_param('config.polling_interval', 1)
        self.proxycheck = ProxyCheck(self.key)

    def summary(self, raw):
        taxonomies = []
        level = "info"
        namespace = "ProxyCheck"
        predicate = "Proxy"
        value = "False"

        if raw.get("proxy") == "yes":
            level = "suspicious"
            value = "True"

        taxonomies.append(self.build_taxonomy(
            level, namespace, predicate, value))
        return {"taxonomies": taxonomies}

    def run(self):
        if self.service == 'proxycheck':
            data = self.get_param('data', None, 'Data is missing')
            results = self.proxycheck.check_proxy(data)
            self.report(results)
        else:
            self.error('Invalid data type')


if __name__ == '__main__':
    ProxyCheckAnalyzer().run()
  • ProxyCheck.json
{
"name": "ProxyCheck",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Get the compromise information of IP from ProxyCheck.",
"dataTypeList": ["ip"],
"command": "ProxyCheck/proxycheck_analyzer.py",
"baseConfig": "ProxyCheck",
"config": {
"service": "proxycheck"
},
"configurationItems": [
{
"name": "key",
"description": "API key for ProxyCheck",
"type": "string",
"multi": false,
"required": true
},
{
"name": "polling_interval",
"description": "Define time interval between two requests attempts for the report",
"type": "number",
"multi": false,
"required": false,
"defaultValue": 60
}
]
}

Responders - Plugins

Mail

Cortex ships with a plugin (Mailer) for sending email. After trying it I found it rather painful: it doesn't support multiple recipients, and when sending mail from Observables the recipient has to be an IoC of type mail... WTF! Don't ask how I know — that's literally what the source code does. So, roll your own!

Main features:

  1. Adds bulk sending on top of the original functionality;
  2. Adds support for sending the task logs data type;
  3. The mail carries the URL of the current case or task, so the recipient can jump straight to the issue;
  • mail.py
#!/usr/bin/env python3
# encoding: utf-8

import ssl
import smtplib
import mistune
from cortexutils.responder import Responder
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


class Mail(Responder):
    def __init__(self):
        Responder.__init__(self)
        self.smtp_host = self.get_param("config.smtp_host", "localhost")
        self.smtp_port = self.get_param("config.smtp_port", "25")
        self.mail_from = self.get_param(
            "config.from", None, "Missing sender email address"
        )
        self.smtp_user = self.get_param("config.smtp_user", "user", None)
        self.smtp_pwd = self.get_param("config.smtp_pwd", "pwd", None)
        self.thehive_url = self.get_param("config.thehive_url", None, None)

    def create_links(self):
        """
        Create TheHive links
        :rtype: String
        :return: URL
        """
        if self.data_type == "thehive:case":
            case_id = self.get_param(
                "data.id", None, "case id is missing"
            )
            url = self.thehive_url + "/index.html#!/case/{}/details".format(case_id)
        elif self.data_type == "thehive:case_task":
            case_id = self.get_param(
                "data.case.id", None, "case id is missing"
            )
            task_id = self.get_param(
                "data.id", None, "task id is missing"
            )
            url = self.thehive_url + "/index.html#!/case/{}/tasks/{}".format(case_id, task_id)
        elif self.data_type == "thehive:case_task_log":
            case_id = self.get_param(
                "data.case_task.case.id", None, "case id is missing"
            )
            task_id = self.get_param(
                "data.case_task.id", None, "task id is missing"
            )
            url = self.thehive_url + "/index.html#!/case/{}/tasks/{}".format(case_id, task_id)
        return url

    def run(self):
        Responder.run(self)
        if self.data_type == "thehive:case_task_log":
            title = self.get_param(
                "data.case_task.title", None, "title is missing")
        else:
            title = self.get_param("data.title", None, "title is missing")

        if self.data_type in ["thehive:case", "thehive:case_task"]:
            description = self.get_param(
                "data.description", None, "case description is missing"
            )
        elif self.data_type == "thehive:case_task_log":
            description = self.get_param(
                "data.message", None, "task logs description is missing"
            )
        elif self.data_type == "thehive:alert":
            description = self.get_param(
                "data.case.description", None, "alert description is missing"
            )
        else:
            self.error("Invalid dataType")

        mail_to = []
        if self.data_type == "thehive:case":
            # Search recipient address in case tags
            tags = self.get_param(
                "data.tags", None, "recipient address not found in tags"
            )
            mail_tags = [t[5:] for t in tags if t.startswith("mail:")]
            if mail_tags:
                mail_to = mail_tags
            else:
                self.error("recipient address not found in tags")

        elif self.data_type in ["thehive:case_task", "thehive:case_task_log"]:
            # Search recipient address in tasks description
            descr_array = description.splitlines()
            if "mailto:" in descr_array[0]:
                mail_str = descr_array[0].replace("mailto:", "").strip()
                mail_to = [i.strip() for i in mail_str.split(',')]
            else:
                self.error("recipient address not found in description")
            # Set rest of description as body
            description = "\n".join(descr_array[1:])

        elif self.data_type == "thehive:alert":
            # Search recipient address in artifacts
            artifacts = self.get_param(
                "data.artifacts", None, "recipient address not found in observables"
            )
            mail_artifacts = [
                a["data"]
                for a in artifacts
                if a.get("dataType") == "mail" and "data" in a
            ]
            mail_tags = [
                t[5:]
                for t in mail_artifacts
                if t.startswith("mail:")
            ]
            if mail_tags:
                mail_to = mail_tags
            else:
                self.error("recipient address not found in observables")

        msg = MIMEMultipart()
        msg["Subject"] = title
        msg["From"] = self.mail_from
        msg["To"] = ','.join(mail_to)
        # Markdown to HTML
        content = mistune.markdown(description, escape=True, hard_wrap=True)
        # add TheHive Links
        links = self.create_links()
        content += '\n<p><a href="{}">Click me to TheHive</a></p>\n'.format(links)
        msg.attach(MIMEText(content, "html", "utf-8"))

        if self.smtp_user and self.smtp_pwd:
            try:
                context = ssl.create_default_context()
                with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                    server.ehlo()
                    server.starttls(context=context)
                    server.ehlo()
                    server.login(self.smtp_user, self.smtp_pwd)
                    server.send_message(msg, self.mail_from, mail_to)
            except smtplib.SMTPNotSupportedError:
                with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                    server.ehlo()
                    server.login(self.smtp_user, self.smtp_pwd)
                    server.send_message(msg, self.mail_from, mail_to)
        else:
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                server.send_message(msg, self.mail_from, mail_to)

        self.report({"message": "message sent"})

    def operations(self, raw):
        return [self.build_operation("AddTagToCase", tag="mail sent")]


if __name__ == "__main__":
    Mail().run()
  • Mail.json
{
"name": "Mail",
"version": "1.0",
"author": "Canon",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-V3",
"description": "Send an email with information from a TheHive case or alert",
"dataTypeList": ["thehive:case", "thehive:alert", "thehive:case_task", "thehive:case_task_log"],
"command": "Mail/mail.py",
"baseConfig": "Mail",
"configurationItems": [
{
"name": "from",
"description": "email address from which the mail is send",
"type": "string",
"multi": false,
"required": true
},
{
"name": "smtp_host",
"description": "SMTP server used to send mail",
"type": "string",
"multi": false,
"required": true,
"defaultValue": "localhost"
},
{
"name": "smtp_port",
"description": "SMTP server port",
"type": "number",
"multi": false,
"required": true,
"defaultValue": 25
},
{
"name": "smtp_user",
"description": "SMTP server user",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "user"
},
{
"name": "smtp_pwd",
"description": "SMTP server password",
"type": "string",
"multi": false,
"required": false,
"defaultValue": "pwd"
},
{
"name": "thehive_url",
"description": "TheHive server address",
"type": "string",
"multi": false,
"required": true,
"defaultValue": "http://localhost:9000"
}
]
}

Threat Intelligence

By default TheHive recommends integrating with MISP for intelligence feeds. Since we built our own threat intelligence repository, we wrote a Responders plugin that submits IoC intelligence during analysis. I won't paste that code here, but an example of the submitted payload follows, and a minimal sketch of such a responder is included after it.

{
"threat": {
"ioc": "193.142.146.143",
"type": "ip",
"tags": [
"burp scan"
],
"description": "该IP在短时间内对用户登录接口发起大量访问,且包含着大量登录失败的情况",
"created_by": "canon@loveyou.com",
"producer": "Canon",
"provider": "TheHive",
"creation_time": "2021-05-14T09:48:23.664Z",
"modification_time": "2021-05-14T09:48:23.664Z",
"expiration_time": "2021-05-29T09:48:23.664Z",
"meta": {
"case": [
{
"title": "安全分析 - 周报(05.10-05.14)",
"created_by": "canon@loveyou.com",
"owner": "canon@loveyou.com",
"link": "https://127.0.0.1:9000/index.html#!/case/~43769904/observables/~463080"
}
]
}
},
"timestamp": "2021-05-14T09:48:23.664Z"
}
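
For reference, here is a minimal sketch of what such a responder could look like. It is not the plugin we actually run: the endpoint URL (https://ti.example.local/api/v1/threats), the config keys (config.api_url, config.api_key), and the exact field mapping are assumptions for illustration; only the cortexutils Responder interface and the payload shape shown above come from the original.

```python
#!/usr/bin/env python3
# encoding: utf-8
# Illustrative sketch only -- endpoint URL, config keys and field names are assumptions.

import datetime
import requests
from cortexutils.responder import Responder


class SubmitThreatIntel(Responder):
    def __init__(self):
        Responder.__init__(self)
        # Hypothetical config items for the internal TI repository
        self.api_url = self.get_param("config.api_url", None, "Missing TI API url")
        self.api_key = self.get_param("config.api_key", None, "Missing TI API key")

    def run(self):
        Responder.run(self)
        # Expect to be run on a case observable (thehive:case_artifact)
        ioc = self.get_param("data.data", None, "observable data is missing")
        ioc_type = self.get_param("data.dataType", None, "observable dataType is missing")
        tags = self.get_param("data.tags", [])

        now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
        payload = {
            "threat": {
                "ioc": ioc,
                "type": ioc_type,
                "tags": tags,
                "provider": "TheHive",
                "creation_time": now,
            },
            "timestamp": now,
        }

        # Push the IoC to the internal threat-intelligence API
        r = requests.post(
            self.api_url,
            json=payload,
            headers={"Authorization": "Bearer {}".format(self.api_key)},
            timeout=30,
        )
        if r.status_code != 200:
            self.error("TI API returned HTTP {}".format(r.status_code))
        self.report({"message": "IoC submitted"})

    def operations(self, raw):
        # Tag the case so analysts can see the IoC was pushed
        return [self.build_operation("AddTagToCase", tag="ioc submitted")]


if __name__ == "__main__":
    SubmitThreatIntel().run()
```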

How to Enable Plugins

Loading plugins

  • Plugin paths
    • /etc/cortex/Cortex-Analyzers/analyzers
    • /etc/cortex/Cortex-Analyzers/responders
$ ll /etc/cortex/Cortex-Analyzers/analyzers
drwxr-xr-x 10 root root 4096 May 5 01:48 ./
drwxr-xr-x 10 root root 4096 May 5 01:49 ../
drwxr-xr-x 2 root root 4096 May 5 01:48 ProxyCheck/
drwxr-xr-x 2 root root 4096 May 5 01:48 ThreatBook/

$ ll /etc/cortex/Cortex-Analyzers/responders
drwxr-xr-x 6 root root 4096 May 5 01:49 ./
drwxr-xr-x 10 root root 4096 May 5 01:49 ../
drwxr-xr-x 2 root root 4096 May 5 01:49 Mail/
  • Edit the configuration file /etc/cortex/application.conf

    It's a good idea to keep the custom plugins separate from the official ones; it makes maintenance easier later on.

## ANALYZERS
#
analyzer {
urls = [
"https://download.thehive-project.org/analyzers.json"
"/etc/cortex/Cortex-Analyzers/analyzers" # 新增自定义插件
]
}

# RESPONDERS
#
responder {
urls = [
"https://download.thehive-project.org/responders.json"
"/etc/cortex/Cortex-Analyzers/responders" # 新增自定义插件
]
}

Enabling plugins

  • Analyzers

    • ThreatBook - Analyzers Config

    ![Analyzers Config](/Analyzers Config.png)

    • ThreatBook - Analyzers

    Analyzers

  • Responders

    • Mail - Responders Config

    ![Responders Config](/Responders Config.png)

    • Mail - Responders

    Responders

Use Cases

Here is what we actually use TheHive for so far. There aren't many scenarios yet; more will come with further exploration.

workflow

Create templates ahead of time, e.g. in the form of Playbooks, so they can be reused quickly later.

  • Weekly analysis report template

    Create a Case per week and a Task per day.

    Weekly report template

  • Incident response template

    This can follow the phases of incident response.

    Incident response

  • Referencing a template

    Referencing a template

  • Event operations: SIEM (Alarm) -> TheHive (Alert)

    TheHive is integrated with the SIEM; two kinds of alarms are automatically pushed to TheHive.

    • The first kind: security events that need human judgment. For example, NetFlow alerts on internal-to-external traffic (abnormal port access, periodic beaconing, etc.) and data-leak alerts (hacker forum monitoring, GitHub monitoring). These events usually need a second confirmation, so TheHive is used to record the whole handling process.

      Manual triage event - 1

    • The second kind: security events that need close attention. For example, EDR alerts and intelligence alerts matching C2 indicators; these need to be responded to immediately.

      • During the response we can lean on Cortex Analyzers for data analysis, e.g. querying several intelligence vendors at once, enriching the data (PDNS, Whois, CMDB, etc.), and querying the SIEM for recent related security events.
      • For "confirmed" indicators, Cortex Responders can drive the security devices and push blocking policies in bulk to stop the bleeding in time.
      • For cross-department collaboration, TheHive keeps everyone in sync on the response progress, including discussing the issue within the same Case.
      • Recording the response process helps us refine the incident response workflow and accumulate Playbooks, paving the way for later automation.
  • Rule operations: SIEM (Alarm/Alert) -> TheHive (Case)

    False positives and missed detections discovered during analysis are submitted to TheHive as Cases manually. For example, when an alert on the SIEM turns out to be a false positive, the analyst submits the alert to the designated owner through the SIEM, and the system automatically routes the email and the Case to that person.

    • Pushed from the SIEM to TheHive, notifying the analyst to tune the rule.

    Rule operations - 4

    • Submit the Case and notify by email

    Rule operations - 5

    Rule operations - 6

    • TheHive

    Rule operations - 7

    Rule operations - 8

  • Routine work:

    • Security analysis weekly report

      • Create a Case per week

      Weekly report 01

      • Create a Task per day

      Weekly report 03

      • Associate alerts with the Case

      Weekly report 05

      • Analyze IoCs in bulk

      Weekly report 02

      • Share it with the teams that need to follow up

      Weekly report 04


Closing thoughts:

If you follow open-source solutions, you have probably seen setups that integrate TheHive with workflow engines (**Shuffle**, **n8n**). It's easy to see that TheHive's strength is incident response and analysis, which is a semi-automated way of working. Wire it up to a workflow engine and what you get is essentially a "DIY" SOAR. Compared with open-source SOAR, commercial SOAR adds the concept of a "war room", which is quite similar to what TheHive offers: in a war room you can analyze the intelligence for an IP, or drive your existing security devices to respond to an IoC. Those capabilities map directly to TheHive's *Analyzers* and *Responders*.

Personally I think TheHive's "semi-automated" style complements SOAR nicely, and integrating the two will surface even more value. For example, an analysis task could selectively invoke SOAR Playbooks depending on the scenario and feed the results back into TheHive. There is plenty more to say about TheHive that won't fit into one post; the rest has to be dug out against real-world scenarios — the "approach" is what matters.

Installation

Online installation

$ echo 'deb http://download.opensuse.org/repositories/security:/zeek/Debian_10/ /' | sudo tee /etc/apt/sources.list.d/security:zeek.list
$ curl -fsSL https://download.opensuse.org/repositories/security:zeek/Debian_10/Release.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/security_zeek.gpg > /dev/null
$ sudo apt update
$ sudo apt install zeek

Architecture diagram

_images / deployment.png

Manager -> Worker

  1. When setting up a cluster, a Zeek user must be created on all hosts, and that user must be able to SSH from the manager to every machine in the cluster without being prompted for a password/passphrase (e.g. using SSH public-key authentication). On the worker nodes the user must also be able to open the target network interfaces in promiscuous mode.
  2. The storage must be available on all hosts under the same path.
Manager
# Installing Zeek itself is skipped here

# Generate an SSH key
$ ssh-keygen

# Remember to create the .ssh directory on the worker nodes

# Copy the SSH public key to the Zeek workers
$ scp /root/.ssh/id_rsa.pub root@Zeek-Worker1:~/.ssh/authorized_keys2

# Configure node.cfg on the manager
$ vim /opt/zeek/etc/node.cfg
[logger-1]
type=logger
host=Zeek-Manager
#
[manager]
type=manager
host=Zeek-Manager
#
[proxy-1]
type=proxy
host=Zeek-Manager
#
[worker-1]
type=worker
host=Zeek-Worker1
interface=ens224
#
[worker-2]
type=worker
host=Zeek-Worker2
interface=ens224

# Check the Zeek configuration
$ zeekctl
[ZeekControl] > check
logger-1 scripts are ok.
manager scripts are ok.
proxy-1 scripts are ok.
worker-1 scripts are ok.
worker-2 scripts are ok.

# Start Zeek
$ zeekctl
[ZeekControl] > start
starting logger ...
starting manager ...
starting proxy ...
starting workers ...

Whether the cluster actually performs better than a single node remains to be tested.

Background

Work has been busy and I haven't posted for a while; with the end of the year approaching, consider this a summary. This year my effort has gone mostly into security operations, and as the work progressed I found the SIEM I had "assembled" wasn't that comfortable to use. I previously wrote "Wazuh - correlating and alerting on heterogeneous data", and the data-flow workflow planned back then is shown below.

image-20200303215020561

The "DIY" SIEM is built on top of the ELK stack, with Wazuh and Elastalert as the alerting modules. Early on, to let Wazuh consume heterogeneous data (WAF, NTA, EDR) and correlate across it, I used Logstash to normalize the data before ingestion, and Wazuh correlated the normalized events. For example: Suricata ships its alert events via Filebeat, Logstash normalizes them and outputs to both Elastic and Wazuh — Elastic keeps the alert metadata, while Wazuh receives the normalized security events.


Shortcomings of "DIY" SIEM v0.1 and how they were addressed

1. Data normalization

Because the early normalization didn't follow **ECS** (Elastic Common Schema), staying within the Elastic ecosystem later became awkward. As you know, ES introduced the SIEM feature in 7.x, and ECS is the natural choice if you want to use Elastic SIEM for analysis. To integrate better with the open-source ecosystem, the original normalization needs to be converted to the ECS format.

2. Alert enrichment

To raise alert quality and make security analysis more efficient, both the ingested security events and the generated alarms need to be enriched.

  • Use CMDB data to enrich internal assets, e.g. adding department, business, application type, and owner fields.

  • Hook up threat intelligence: the SIEM enriches the attacking IP of an alarm with intelligence data;

    • local threat intelligence (IP addresses that have attacked us before)
    • third-party threat intelligence
      • open source
      • commercial
  • Monitor sensitive endpoints (login, payment, wallet). Use third-party data to tag IPs as proxies, giving later risk-control decisions some data to lean on;

  • Add direction and zone fields to security events, so analysts can immediately tell internal-to-internal from internal-to-external alerts, and alert severity can be weighted by direction;

3. Improving detection capability

The detection quality of the underlying security devices and the credibility of their events directly determine the quality of SIEM alarms.

  • Ingest Imperva WAF alert data and correlate it automatically with Suricata; rules that were bypassed are periodically "ported" to the Imperva WAF at the edge to harden the perimeter;
  • "Consume" AWS VPC Flow data to detect anomalous outbound connections. Being layer-4 data, there aren't many dimensions to work with; the following alerts were implemented (a small sketch of the port-scan logic follows this list):
    • periodic (beaconing) connections
    • threat-intelligence matches
    • port-scan alerts
      • within a short window, an internal host hits many ports of the same destination IP
      • within a short window, an internal host hits the same destination port on many IPs
    • requests to sensitive ports
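
A minimal sketch of the port-scan idea over flow records, in Python and purely for illustration — the field names (ts, src_ip, dst_ip, dst_port), the 60-second window, and the threshold of 20 distinct ports are assumptions, not the production detection rules:

```python
from collections import defaultdict

# Toy detector: flag a source IP that touches many distinct ports of one
# destination IP within a short time window.
WINDOW_SECONDS = 60            # assumed window size
DISTINCT_PORT_THRESHOLD = 20   # assumed threshold


def detect_port_scans(flows):
    """flows: iterable of dicts with ts (epoch seconds), src_ip, dst_ip, dst_port."""
    buckets = defaultdict(set)  # (window, src_ip, dst_ip) -> {dst_ports}
    alerts = []
    for flow in flows:
        window = int(flow["ts"]) // WINDOW_SECONDS
        key = (window, flow["src_ip"], flow["dst_ip"])
        buckets[key].add(flow["dst_port"])
        # Fire exactly once, when the distinct-port count reaches the threshold
        if len(buckets[key]) == DISTINCT_PORT_THRESHOLD:
            alerts.append({
                "rule": "port_scan_same_dst",
                "src_ip": flow["src_ip"],
                "dst_ip": flow["dst_ip"],
                "distinct_ports": len(buckets[key]),
            })
    return alerts


if __name__ == "__main__":
    sample = [
        {"ts": 1000 + i, "src_ip": "10.0.0.5", "dst_ip": "10.0.0.9", "dst_port": p}
        for i, p in enumerate(range(1, 31))
    ]
    print(detect_port_scans(sample))
```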

4. Tracing and hunting

Currently the alarms produced by the SIEM don't carry the original security events, which isn't friendly for the analysts, especially when the alarm volume is high.

  • Every alarm now carries a **"Hunting"** field that links straight back to the underlying security events;
  • Added dashboards better suited to security analysts;
  • Wrote a tool I (immodestly) think fits the analysts' workflow well: HappyHunting.

5. Other improvements

  • Fixed the problem of the CDN WAF API taking far too long (15-20 minutes 😅) when triggered by the SIEM. Blocking via the Imperva WAF API is now near real time.

  • Optimized the NTA login_audit code to improve NTA performance. I previously wrote "**Suricata + Lua for local intelligence integration**", which used a Lua script to audit logins on "sensitive" endpoints and detect high-risk accounts against intelligence. That functionality has been ported to Logstash + Ruby;

  • Previously the automated response rules were managed by manually editing scripts and adjusting rule.id values, which barely worked at the beginning but proved inadequate later: hard to manage and costly to maintain. The SIEM rules are now managed centrally in Redis — to block something you just push the rule.id into Redis (see the small sketch after this list) — and the event.action field in the emitted alarm marks how that alarm should be responded to;
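
As an illustration of that workflow (not the actual operational script), here is a minimal Python sketch that pushes a Suricata rule.id into the Redis database the response module reads; the database number 7 mirrors the siem_action_db used in section 4.2.1 below, while the value and TTL are assumptions:

```python
import redis

# Connection details mirror the Logstash script_params used later in this post;
# treat them as placeholders for your own environment.
r = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=7)


def mark_rule_for_blocking(rule_id, ttl_days=7):
    """Flag a rule.id so matching alarms get event.action = 'block'.

    The value and TTL are arbitrary here; the consumer in section 4.2.1
    only checks whether the key exists.
    """
    r.setex(str(rule_id), ttl_days * 86400, "block")


if __name__ == "__main__":
    mark_rule_for_blocking(2028933)
```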


"Evolution" - "DIY" SIEM v0.2

1. Workflow

Here is the reworked workflow of "DIY" SIEM v0.2 😁, shown below. I won't expand on the collection side of the data sources — those are off-the-shelf tools, just ship the data. The interesting part is how the data is processed in Logstash:

SIEM-v0.2


2. Data processing

1. Normalized

image-20201208110506481


1.1 normalized-alert_to_siem

Normalization for alert events, i.e. the data emitted by the security devices. See the blue lines in the figure above.

The official Filebeat Suricata module already maps Suricata data to ECS, so we can simply enable it. One caveat: by default, part of that module's normalization is performed on the Elastic side. Since we need Logstash for the subsequent ETL, the data flow is now Filebeat -> Logstash -> Elastic, so that part of the normalization has to be reimplemented in Logstash. The configuration involved is as follows:

1.1.1 normalized-suricata

Normalization applied to all Suricata events.

  • Remove some Filebeat-specific fields.
  • Add provider, product, and sensor fields so we can later distinguish data-source types (e.g. NTA, WAF, EDR) and different NTA products (e.g. Suricata, Zeek).
Logstash Config
filter {
mutate {
remove_field => [ "application", "type", "agent", "@version", "[event][original]" ]
add_field => {
"provider" => "Suricata"
"product" => "IDS"
"sensor" => "%{[host][name]}"
}
lowercase => [ "[network][transport]" ]
}
uuid {
target => "[event][id]"
overwrite => true
}
}

1.1.2 normalized-alert_for_suricata
Logstash Config
filter {
if [suricata][eve][alert] {
mutate {
rename => {
"[suricata][eve][alert][category]" => "[rule][category]"
"[suricata][eve][alert][signature_id]" => "[rule][id]"
"[suricata][eve][alert][signature]" => "[rule][name]"
"[suricata][eve][alert][rule]" => "[rule][description]"
}
}

mutate {
convert => {
"[rule][id]" => "integer"
}
copy => {
"[rule][category]" => "message"
}
}

if [suricata][eve][alert][action] == "blocked" {
mutate {
update => {
"[suricata][eve][alert][action]" => "denied"
}
}
}

if [suricata][eve][alert][action] {
ruby {
code => "
action = event.get('[suricata][eve][alert][action]')

event_type = event.get('[event][type]')
if event_type then
event_type = event.get('[event][type]').push(action)
else
event_type = [action]
end
event.set('[event][type]', event_type)
event.remove('[suricata][eve][alert][action]')

event.set('[event][action]', action)
"
}
}

mutate {
rename => {
"[suricata][eve][alert][severity]" => "[event][severity]"
"[suricata][eve][payload_printable]" => "[rule][payload]"
"[suricata][eve][http][http_request_body_printable]" => "[http][request][body][content]"
"[suricata][eve][http][http_response_body_printable]" => "[http][response][body][content]"
}
}

ruby {
code => "
rule_id = event.get('[rule][id]')
rule_name = event.get('[rule][name]')
event_id = event.get('[event][id]')

event.set('[related][rule][id]', [rule_id])
event.set('[related][rule][name]', [rule_name])
event.set('[related][event][id]', [event_id])
"
}
}
}

1.1.3 normalized-fileinfo_for_suricata
Logstash Config
filter {
if [suricata][eve][fileinfo] {
mutate {
rename => {
"[suricata][eve][fileinfo][filename]" => "[file][path]"
"[suricata][eve][fileinfo][size]" => "[file][size]"
}
}
}
}

1.1.4 normalized-flow_for_suricata
Logstash Config
filter {
if [suricata][eve][flow] {
mutate {
rename => {
"[suricata][eve][flow][pkts_toclient]" => "[destination][packets]"
"[suricata][eve][flow][pkts_toserver]" => "[source][packets]"
"[suricata][eve][flow][bytes_toclient]" => "[destination][bytes]"
"[suricata][eve][flow][bytes_toserver]" => "[source][bytes]"
}
}

ruby {
init => "
@sb = 0
@sp = 0
@db = 0
@dp = 0
"

code => "
events = event.to_hash

if events.has_key?('source') then
@sb = events['source'].fetch('bytes', 0)
@sp = events['source'].fetch('packets', 0)
end

if events.has_key?('destination') then
@db = events['destination'].fetch('bytes', 0)
@dp = events['destination'].fetch('packets', 0)
end

if (@sb+@db+@sp+@dp > 0) then
if (@sb+@db > 0) then
event.set('[network][bytes]', @sb+@db)
end
if (@sp+@dp > 0) then
event.set('[network][packets]', @sp+@dp)
end
end
"
}

date {
match => [ "[suricata][eve][flow][start]", "ISO8601" ]
target => "[event][start]"
}

date {
match => [ "[suricata][eve][flow][end]", "ISO8601" ]
target => "[event][end]"
}

mutate {
rename => {
"[suricata][eve][flow][age]" => "[event][duration]"
}
}

mutate {
remove_field => [
"[suricata][eve][flow][start]",
"[suricata][eve][flow][end]"
]
}
}
}

1.1.5 normalized-http_for_suricata
Logstash Config
filter {
if [suricata][eve][http] {
mutate {
rename => {
"[suricata][eve][http][http_method]" => "[http][request][method]"
"[suricata][eve][http][status]" => "[http][response][status_code]"
"[suricata][eve][http][hostname]" => "[destination][domain]"
}
}

if [destination][domain] and [network][protocol] == "http" {
mutate {
copy => { "[destination][domain]" => "[url][domain]" }
}
}

ruby {
init => "
@pattern = /(?<path>[^?#]*)(?:\?(?<query>[^#]*))?(?:#(?<fragment>.*))?/
"
code => "
url = event.get('[suricata][eve][http][url]')
res = @pattern.match(url)

if res['path'] then
event.set('[url][path]', res['path'])
end
if res['query'] then
event.set('[url][query]', res['query'])
end
if res['fragment'] then
event.set('[url][fragment]', res['fragment'])
end
"
}

mutate {
rename => {
"[suricata][eve][http][url]" => "[url][original]"
"[suricata][eve][http][http_refer]" => "[http][request][referrer]"
"[suricata][eve][http][length]" => "[http][response][body][bytes]"
"[suricata][eve][http][http_user_agent]" => "[user_agent][original]"
}
}
}
}

1.1.6 normalized_http_headers_for_suricata

When Suricata is configured with dump-all-headers: both, it outputs all HTTP headers. That's great for the http_audit use case, but the output format is a bit awkward 😂😂😂. To make filtering in Kibana easier, this part of the data is normalized as well. 😁

Logstash Config
filter {
if [suricata][eve][http] {
ruby {
path => "/etc/logstash/scripts/normalized_http_headers.rb"
}
}
}
Ruby Code
def filter(event)
request = {}
response = {}

request_headers = event.get("[suricata][eve][http][request_headers]")
response_headers = event.get("[suricata][eve][http][response_headers]")

if request_headers then
request_headers.each do |headers|
name = headers['name'].downcase
value = headers['value']
request[name] = value
end
end

if response_headers then
response_headers.each do |headers|
name = headers['name'].downcase
value = headers['value']
response[name] = value
end
end

event.remove("[suricata][eve][http][request_headers]")
event.remove("[suricata][eve][http][response_headers]")
event.set("[suricata][eve][http][request]", request)
event.set("[suricata][eve][http][response]", response)
return [event]
end
Example
  • Data before normalization:
{
"request_headers": [
{
"name": "Connection",
"value": "Keep-Alive"
}
],
"response_headers": [
{
"name": "Server",
"value": "NWS_TCloud_S11"
}
]
}
  • Data after normalization:
{
"http": {
"request": {
"host": "192.168.199.1:25782",
"connection": "Close",
"cache-control": "no-cache",
"pragma": "no-cache",
"user-agent": "Microsoft-Windows/10.0 UPnP/1.0",
"accept": "text/xml, application/xml"
},
"response": {
"ext": "",
"content-length": "2645",
"server": "RT-N56U/3.4.3.9 UPnP/1.1 MiniUPnPd/2.0",
"content-type": "text/xml; charset=\"utf-8\"",
"connection": "close"
}
}
}

1.1.7 normalized-tls_for_suricata
Logstash Config
filter {
if [suricata][eve][tls] {
mutate {
uppercase => [
"[tls][server][hash][sha1]"
]
split => {
"[tls][server][hash][sha1]" => ":"
}
join => {
"[tls][server][hash][sha1]" => ""
}
copy => {
"[tls][server][hash][sha1]" => "[related][hash]"
}
}
}
}

1.2 normalized-alarm_from_siem

Normalization for alarm events, i.e. the data emitted by the SIEM. See the red lines in the figure above.

1.2.1 normalized-alarm

General normalization applied to alarm events.

Logstash Config
filter {
date {
match => ["timestamp", "ISO8601"]
target => "timestamp"
}

mutate {
rename => {
"[data][source]" => "source"
"[data][destination]" => "destination"
"[data][network]" => "network"

"[data][event]" => "event"
"[data][fileset]" => "fileset"

"[data][http]" => "http"
"[data][url]" => "url"
"[data][user_agent]" => "user_agent"

"[data][related]" => "related"
"[data][threat]" => "threat"

"[rule][groups]" => "[rule][ruleset]"
}

convert => {
#"[agent][id]" => "integer"

"[event][severity]" => "integer"

"[rule][id]" => "integer"

"[related][rule][id]" => "integer"

"[network][bytes]" => "integer"
"[network][packets]" => "integer"

"[source][port]" => "integer"
"[source][bytes]" => "integer"
"[source][packets]" => "integer"

"[destination][port]" => "integer"
"[destination][bytes]" => "integer"
"[destination][packets]" => "integer"

"[http][response][status_code]" => "integer"
"[http][response][body][bytes]" => "integer"
}

remove_field => [
"beat", "input_type", "tags", "count", "@version",
"ecs", "log", "offset", "type", "host", "predecoder",
"decoder", "[data][rule]"
]

copy => {
"[rule][description]" => "[rule][name]"
}
}

if [event][kind] == "alarm" {
mutate {
rename => {
"previous_output" => "[related][event][log]"
}
}

ruby {
code => "
src_ip = event.get('[source][ip]')
dst_ip = event.get('[destination][ip]')
src_port = event.get('[source][port]').to_s
dst_port = event.get('[destination][port]').to_s
rule_name = event.get('[related][rule][name]')[0].to_s

rule_description = src_ip + ':' + src_port + ' -> ' + dst_ip + ':' + dst_port + ' -> ' + rule_name
event.set('[rule][description]', rule_description)

if event.get('[related][rule][id]') then
sid = event.get('[related][rule][id]')[0]
event.set('[rule][uuid]', sid)
end

event.set('[rule][category]', 'Frequency')
"
}
}
}

2. Enrichment

image-20201208110506481


2.1 enrichment_alert_to_siem

Enrichment for alert events, i.e. the raw security events emitted by the security devices. See the blue lines in the figure above.

2.1.1 enrichment-alert_direction_for_suricata

For some special reasons I had to set EXTERNAL_NET = any in suricata.yaml, which caused some Suricata false positives since the direction constraint of the rules was loosened. So Logstash applies a filtering layer before the data is ingested into the SIEM.

Logstash Config
filter {
if [rule][description] {
ruby {
path => "/etc/logstash/scripts/add_direction.rb"
}
}
}
Ruby Code
  • enrichment-alert_direction_for_suricata.rb
    • Drop security events where the triggering IPs don't match the rule's direction
    • Add direction and zone fields so analysts can immediately tell internal-to-internal from internal-to-external alerts.
require "ipaddr"


def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")
if not src_ip or not dst_ip then
event.cancel
return []
end
ipaddr_src = IPAddr.new src_ip
ipaddr_dst = IPAddr.new dst_ip

# Sample: alert http $EXTERNAL_NET any -> $HOME_NET any
rule = event.get("[rule][description]")
src_direction = rule.split(" ")[2]
dst_direction = rule.split(" ")[5]

src_private = ipaddr_src.private?()
dst_private = ipaddr_dst.private?()

if event.get("provider") == "Suricata" then
if ( src_private ) and ( src_direction == "$EXTERNAL_NET" ) then
event.cancel
return []
end

if ( dst_private ) and ( dst_direction == "$EXTERNAL_NET" ) then
event.cancel
return []
end
end

if src_private and dst_private then
direction = "outbound"
zone = "internal"
elsif src_private and not dst_private then
direction = "outbound"
zone = "internal"
elsif not src_private and dst_private then
direction = "inbound"
zone = "external"
else
direction = "inbound"
zone = "external"
end

event.set("[network][direction]", direction)
event.set("[network][zone]", zone)
return [event]
end

To make correlation analysis easier later, a field relating the attacker to the domain is added.

Logstash Config
filter {
if [url][domain] {
ruby {
code => "
source_ip = event.get('[source][ip]')
url_domain = event.get('[url][domain]')
event.set('[related][domain]', [source_ip, url_domain])
"
}
}
}

2.1.3 add_geo-private_ip

Add geographic coordinates to internal IPs, mainly so assets show up on the map in dashboards. Map artillery? BIUBIUBIU? 😂 Since these aren't public IPs, GeoIP lookups won't work, so the Translate plugin is used instead.

Logstash Config
filter {
translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_geo.yml"
field => "[source][ip]"
destination => "translation_geo"
}

json {
source => "translation_geo"
target => "[source][geo]"
skip_on_invalid_json => true
}

translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_asn.yml"
field => "[source][ip]"
destination => "translation_as"
}

json {
source => "translation_as"
target => "[source][as]"
skip_on_invalid_json => true
}

mutate {
remove_field => [ "translation_geo", "translation_as" ]
}
}

filter {
translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_geo.yml"
field => "[destination][ip]"
destination => "translation_geo"
}

json {
source => "translation_geo"
target => "[destination][geo]"
skip_on_invalid_json => true
}

translate {
regex => true
exact => true
dictionary_path => "/etc/logstash/scripts/private_ip_asn.yml"
field => "[destination][ip]"
destination => "translation_as"
}

json {
source => "translation_as"
target => "[destination][as]"
skip_on_invalid_json => true
}

mutate {
remove_field => [ "translation_geo", "translation_as" ]
}
}
Yaml
'192.168.199.\d+': '{"location":{"lat":45.8491,"lon":-119.7143},"country_name":"China","country_iso_code":"CN","region_name":"Jiangsu","region_iso_code":"JS","city_name":"Nanjing"}'
'192.168.199.\d+': '{"number":4134,"organization.name":"CHINANET-BACKBONE"}'
Example

image-20201209110617996

image-20201208153642680


2.1.4 add_geo-public_ip

Public IPs are much easier to handle — just load the GeoIP databases directly.

Logstash Config
filter {
if ! [source][geo] {
geoip {
source => "[source][ip]"
target => "[source][geo]"
fields => ["city_name", "country_name", "country_code2", "region_name", "region_code", "location"]
database => "/etc/logstash/GeoLite2-City.mmdb"
}

geoip {
source => "[source][ip]"
target => "[source][as]"
fields => ["autonomous_system_organization", "autonomous_system_number"]
database => "/etc/logstash/GeoLite2-ASN.mmdb"
default_database_type => "ASN"
}
}
}

filter {
if ! [destination][geo] {
geoip {
source => "[destination][ip]"
target => "[destination][geo]"
fields => ["city_name", "country_name", "country_code2", "region_name", "region_code", "location"]
database => "/etc/logstash/GeoLite2-City.mmdb"
}

geoip {
source => "[destination][ip]"
target => "[destination][as]"
fields => ["autonomous_system_organization", "autonomous_system_number"]
database => "/etc/logstash/GeoLite2-ASN.mmdb"
default_database_type => "ASN"
}
}
}

filter {
mutate {
rename => ["[source][geo][country_code2]", "[source][geo][country_iso_code]"]
rename => ["[source][geo][region_code]", "[source][geo][region_iso_code]"]
rename => ["[source][as][asn]", "[source][as][number]"]
rename => ["[source][as][as_org]", "[source][as][organization.name]"]
rename => ["[destination][geo][country_code2]", "[destination][geo][country_iso_code]"]
rename => ["[destination][geo][region_code]", "[destination][geo][region_iso_code]"]
rename => ["[destination][as][asn]", "[destination][as][number]"]
rename => ["[destination][as][as_org]", "[destination][as][organization.name]"]

remove_tag => [ "_geoip_lookup_failure" ]
}
}

2.2 enrichment-alarm_from_siem

Enrichment for alarm events, i.e. the data emitted by the SIEM. See the red lines in the figure above.

A Hunting capability is added to the SIEM alarms; through it you can trace back to every alert event that triggered the alarm.

Logstash Config
filter {
if [event][kind] == "alarm" and [related][event][log] {
ruby {
path => "/etc/logstash/scripts/add_related_event_id.rb"
}
}
}
Ruby Code
require "json"


def register(params)
@pattern = /(?:\\n)?\w+ \d+ \d+:\d+:\d+ logstash NORMALIZED\[-\]: /
end

def filter(event)
event_id = []
rule_id = []
rule_name = []
event_log = event.get('[related][event][log]')

atomic_rules = event_log.split(@pattern)[1 .. -1]
for atomic in atomic_rules do
e_id = JSON.parse(atomic)['event']['id']
r_id = JSON.parse(atomic)['rule']['id']
r_name = JSON.parse(atomic)['rule']['name']

event_id.push(e_id)
rule_id.push(r_id)
rule_name.push(r_name)
end
event.set('[related][event][id]', event_id)
event.set('[related][rule][id]', rule_id)
event.set('[related][rule][name]', rule_name)
event.remove('[related][event][log]')

return [event]
end
Example

Below is an aggregated Wazuh alarm; by clicking the threat.hunting.event.id field the analyst can trace back to the underlying alert events that triggered the aggregation rule.

image-20201204175051910

image-20201204175244182


3. Threat Intelligence

threat-intelligence

3.1 threatIntel_alert_to_siem

A Ruby script loaded by Logstash pushes IoCs to Redis. To avoid duplicate pushes, every IoC is given a TTL (7 days by default). See the blue lines in the figure above.

3.1.1 add-ti_shodan

Every IP that attacks us is probed back with Shodan to collect a round of asset information about the attacker (often a bot), for analysts to use later. Since IoCs carry a TTL, an IoC is not pushed again before it expires. You'll still want to keep the per-key API call rate under control — although nothing stops you from rotating several keys, you know what I mean 😈😈😈. A sketch of the worker that consumes this queue follows the Ruby script below.

Logstash Config
filter {
if [suricata][eve][alert] {
clone {
clones => [ "siem_events" ]
}
}
}

filter {
if [type] == "siem_events" {
ruby {
path => "/etc/logstash/scripts/siem-ti_shodan.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"

"ti_db" => 1
"alert_prefix" => "alert:"
"expire" => 86400

"spider_db" => 5
"spider_key" => "spider:shodan:ioc"
}
}
}
}
Ruby Code
require "json"
require "redis"
require "ipaddr"

def register(params)
@expire = params["expire"]
@alert = params["alert_prefix"]
@alarm = params["alarm_prefix"]
@spider_key = params["spider_key"]

# connect to redis
@ti_redis = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["ti_db"])
@spider_redis = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["spider_db"])
end

def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")

begin
ipaddr_src = IPAddr.new src_ip
ipaddr_dst = IPAddr.new dst_ip
rescue Exception => e
event.cancel
return []
end

if not ipaddr_src.private?() then
ioc = src_ip
elsif not ipaddr_dst.private?() then
ioc = dst_ip
else
return [event]
end

if event.get("[event][kind]") == "alert" then
alert_ioc = @alert + ioc
if not @ti_redis.exists?(alert_ioc) then
@ti_redis.setex(alert_ioc, @expire, true)
@spider_redis.lpush(@spider_key, ioc)
end
end

return [event]
end
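
The worker that actually drains the spider:shodan:ioc queue and performs the Shodan lookups isn't shown in the original. Purely as an illustration — the key names and database numbers are copied from the script_params above, the value shape loosely mirrors what section 3.2.1 reads back, and everything else (loop, rate limiting, error handling) is an assumption — a minimal sketch could look like this:

```python
import json
import time

import redis
import shodan

SPIDER_DB = 5          # spider_db in the Logstash script_params above
TI_DB = 1              # ti_db used by the enrichment script in 3.2.1
SPIDER_KEY = "spider:shodan:ioc"
ALARM_PREFIX = "alarm:"
EXPIRE = 7 * 86400     # assumed TTL for the collected asset data

api = shodan.Shodan("<api_key>")
spider_redis = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=SPIDER_DB)
ti_redis = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=TI_DB)

while True:
    ioc = spider_redis.rpop(SPIDER_KEY)   # Logstash lpush-es, so pop from the other end
    if ioc is None:
        time.sleep(5)
        continue
    ioc = ioc.decode()
    try:
        host = api.host(ioc)
    except shodan.APIError:
        continue
    # Shape loosely mirrors the fields 3.2.1 copies into threat.hunting.*
    data = {
        "ports": host.get("ports", []),
        "vulns": host.get("vulns", []),
        "hostnames": host.get("hostnames", []),
        "domains": host.get("domains", []),
        "services": sorted({banner["_shodan"]["module"]
                            for banner in host.get("data", [])
                            if banner.get("_shodan", {}).get("module")}),
    }
    ti_redis.setex(ALARM_PREFIX + ioc, EXPIRE, json.dumps(data))
    time.sleep(1)   # crude rate limiting for a single API key
```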

3.2 threatIntel_alarm_from_siem

Enrich alarms with threat-intelligence data via Logstash. See the red lines in the figure above.

3.2.1 add-ti_tags_from_shodan

Enrich alarms with the Shodan IoC intelligence data.

Logstash Config
filter {
if [event][kind] == "alarm" {
ruby {
path => "/etc/logstash/scripts/ti_shodan.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"ti_db" => 1
"alarm_prefix" => "alarm:"
}
}
}
}
Ruby Code
require "json"
require "redis"
require "ipaddr"

def register(params)
@alarm = params["alarm_prefix"]
# connect to redis
@ti_redis = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["ti_db"])
end

def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")

begin
ipaddr_src = IPAddr.new src_ip
ipaddr_dst = IPAddr.new dst_ip
rescue Exception => e
event.cancel
return []
end

if not ipaddr_src.private?() then
ioc = src_ip
elsif not ipaddr_dst.private?() then
ioc = dst_ip
else
return [event]
end

raw_data = @ti_redis.get(@alarm + ioc)
if raw_data then
data = JSON.parse(raw_data)
if data then
event.set("[threat][hunting][services]", data["services"])
event.set("[threat][hunting][vulns]", data["vulns"])
event.set("[threat][hunting][ports]", data["ports"])
event.set("[threat][hunting][hostnames]", data["hostnames"])
event.set("[threat][hunting][domains]", data["domains"])
if data["details"] then
details = data["details"].to_json
event.set("[threat][hunting][details]", details)
end
end
end

return [event]
end
Example
{
"threat": {
"hunting": {
"vulns": [
"CVE-2019-0220",
"CVE-2019-0197",
"CVE-2019-0196",
"CVE-2018-1302",
"CVE-2019-0211",
"CVE-2017-15710",
"CVE-2018-1301",
"CVE-2018-1283",
"CVE-2018-1303",
"CVE-2017-15715",
"CVE-2018-1333",
"CVE-2018-17199",
"CVE-2018-11763",
"CVE-2018-1312"
],
"domains": [],
"ports": [
8888,
80,
8080,
8090,
22
],
"details": "{\"tcp\":[{\"http-simple-new\":8888,\"Apache httpd\":\"2.4.29\"},{\"http\":80,\"Apache httpd\":\"2.4.29\"},{\"http\":8080,\"Apache httpd\":\"2.4.29\"},{\"http-simple-new\":8090},{\"ssh\":22,\"OpenSSH\":\"7.6p1 Ubuntu-4ubuntu0.3\"}],\"udp\":[]}",
"hostnames": [],
"services": [
"http-simple-new",
"ssh",
"http"
]
}
}
}

image-20201205151011258

image-20201205150744102

3.2.2 add-ti_tags

This part is usually wired to our own intelligence (we collect the IPs that have attacked us and build an internal feed that fits us) as well as open-source intelligence. In principle the SIEM doesn't produce many alarm events, so the intelligence data is fetched directly from Elastic to enrich the alarms. If you have a large alarm volume, consider Redis or some other approach instead. We are about to purchase commercial intelligence, so a commercial feed will be integrated later on.

Logstash Config
filter {
if [event][kind] == "alarm" {
ruby {
path => "/etc/logstash/scripts/siem-ti_tags.rb"
script_params => {
"index" => "ecs-ti-*"
"urls" => "https://elastic:HelloWorld@127.0.0.1:9200"
"ca" => "ca.crt"
}
}
}
}
Ruby Code
require 'json'
require 'elasticsearch'

def register(params)
@urls = params["urls"]
@index = params["index"]
@ca = params["ca"]
@client = Elasticsearch::Client.new urls: @urls, transport_options: { ssl: { ca_file: @ca } }
end

def filter(event)
ioc = event.get('[source][ip]')
query = {
"_source": {
"includes": [
"threat.tags",
"threat.provider"
]
},
"query": {
"bool": {
"must": [
{
"terms": {
"threat.type": [
"ipv4",
"ip"
]
}
},
{
"term": {
"threat.ioc": ioc
}
}
],
"filter": [
{
"range": {
"threat.creation_time": {
"gte": "now-7d"
}
}
}
]
}
},
"size": 10
}
response = @client.search index: @index, body: query.to_json

tags = []
providers = []
if not response['hits']['hits'].empty? then
response['hits']['hits'].each do |result|
if not providers.include?(result["_source"]["threat"]["provider"])
providers.push(result["_source"]["threat"]["provider"])
end
tags = tags - result["_source"]["threat"]["tags"]
tags = tags + result["_source"]["threat"]["tags"]
end
end

event.set('[threat][intelligence][tags]', tags)
event.set('[threat][intelligence][providers]', providers)
return [event]
end
Example

image-20201208144650226

{
"threat": {
"intelligence": {
"providers": [
"NTA"
],
"tags": [
"WebAttack"
]
}
}
}

4. Filter

image-20201207153152332

4.1 alert_to_siem

4.1.1 filter_ip_from_alert

In practice, some whitelisted IPs and specific signature rules need to be filtered out, both to protect SIEM performance and to keep the alarms reliable. This data is still sent to Elastic for historical retention, but it is not consumed by the SIEM and does not generate alarms.

Logstash Config

The Clone plugin is used so that the copy of the data destined for the SIEM is passed through the filter script.

filter {
if [suricata][eve][alert] {
clone {
clones => [ "siem_events" ]
}
}
}

filter {
if [type] == "siem_events" {
ruby {
path => "/etc/logstash/scripts/siem-filter_ip.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"cdn_db" => 3
"scan_db" => 4
}
}
}
}
Ruby Code
require "redis"


def register(params)
begin
@cdn_db = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["cdn_db"])
@scan_db = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["scan_db"])
rescue
return
end
end

def filter(event)
src_ip = event.get("[source][ip]")
dst_ip = event.get("[destination][ip]")

if @cdn_db.exists?(src_ip) || @cdn_db.exists?(dst_ip) || @scan_db.exists?(src_ip) || @scan_db.exists?(dst_ip) then
event.cancel
return []
end

return [event]
end
4.1.2 filter_sid_from_alert
Logstash Config
filter {
if [suricata][eve][alert] {
clone {
clones => [ "siem_events" ]
}
}
}

filter {
if [type] == "siem_events" {
ruby {
path => "/etc/logstash/scripts/siem-filter_sid.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"sid_db" => 2
}
}
}
}
Ruby Code
require "redis"


def register(params)
@signature_id = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["sid_db"])
end

def filter(event)
sid = event.get("[rule][id]")
if @signature_id.exists?(sid) then
event.cancel
return []
end
return [event]
end

4.2 alarm_from_siem

4.2.1 update_action_from_alarm

For alarm events whose rule.id matches, event.action is updated to block so that the alarm can later be "consumed" by the SIEM response module (the part marked with the red line in the figure above). The event.action field is what distinguishes automated actions taken by the SIEM.

Logstash Config

For the specified rule.id events, the default allowed value is replaced with block, so that the response module can pick them up later.

filter {
mutate {
update => {
"[event][action]" => "allowed"
}
}

ruby {
path => "/etc/logstash/scripts/siem-update_action.rb"
script_params => {
"host" => "127.0.0.1"
"port" => 6379
"password" => "HelloWorld"
"siem_action_db" => 7
}
}
}
Ruby Code
require "redis"


def register(params)
begin
@siem_action_db = Redis.new(host:params["host"], port:params["port"], password:params["password"], db:params["siem_action_db"])
rescue
return
end
end


def filter(event)
rule_id = event.get("[rule][id]")

if @siem_action_db.exists?(rule_id) then
event.set("[event][action]", "block")
end

return [event]
end
Example:

image-20201207111416997
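
For the filter above to do anything, the rule IDs that should be auto-blocked have to exist in Redis DB 7 (the `siem_action_db` parameter in the Logstash config above). A minimal seeding sketch; the rule IDs shown are the composite Wazuh rules defined later in this post and are used purely as examples:

```python
import redis  # pip install redis

# DB 7 is the siem_action_db read by siem-update_action.rb above.
siem_action_db = redis.Redis(host="127.0.0.1", port=6379, password="HelloWorld", db=7)

# Keys are rule IDs whose alarms should get event.action = block;
# the value is only documentation, since the filter just calls EXISTS.
for rule_id, note in {
    "200000": "WebShell upload confirmed by HIDS correlation",
    "200001": "Bypass WAF correlation rule",
}.items():
    siem_action_db.set(rule_id, note)
```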


3. Dashboards

SIEM alarm dashboard

SIEM-Alarm

Security event dashboard (incident tracing)

SIEM Alert

Sensitive API monitoring

image-20201214112856076

Background

XXX


Shortcomings of SIEM v0.2

XXX


Improvements in SIEM v0.3

1. Workflow

XXX


2. Normalized

Workflow

image-20220218183243830

Configure

| No | File | Script | Log | Note |
| --- | --- | --- | --- | --- |
| 1 | 60_normalized-general.conf | | | |
| 2 | 61_normalized-alert.conf | | | |
| 3 | 62_normalized-flow.conf | | | |
| 4 | 63_normalized-fileinfo.conf | | | |
| 5 | 64_normalized-http.conf | 64_normalized-http.rb | 64_normalized-http.log | |
| 6 | 65_normalized-tls.conf | | | |

3. Enrichment

Workflow

image-20220221134544143

Configure

| No | File | Script | Log | Note |
| --- | --- | --- | --- | --- |
| 1 | 70_enrichment-general-geo-1-private_ip.conf | | | |
| 2 | 70_enrichment-general-geo-2-public_ip.conf | | | |
| 3 | 71_enrichment-alert-1-direction.conf | 71_enrichment-alert-1-direction.rb | | |
| 4 | 71_enrichment-alert-2-killChain.conf | 71_enrichment-alert-2-killChain.rb | 71_enrichment-alert-2-killChain.log | |
| 5 | 71_enrichment-alert-3-cve.conf | 71_enrichment-alert-3-cve.rb | 71_enrichment-alert-3-cve.log | |
| 6 | 71_enrichment-alert-4-whitelist_ip.conf | 71_enrichment-alert-4-whitelist_ip.rb | 71_enrichment-alert-4-whitelist_ip.log | |

KillChain

Suricata

Input

  • Redis template
# key: str killchain:{provider}:{rule id}
# value: json str {"steps"=>{KillChain steps}, "description"=>{KillChain description}, "class"=>{rule class}}

localhost:6379> set killchain:suricata:2028933 '{"steps": 4, "description": "Exploitation", "class"=>exploit}'

Output

{
"threat": {
"killchain": {
"steps": 1,
"description": "侦查跟踪",
"class": "scan"
}
}
}
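
The enrichment script itself (71_enrichment-alert-2-killChain.rb) is not reproduced here. As an illustration only, a Python sketch of the lookup it performs, using the key layout from the Redis template and the field names from the output above; treat it as pseudocode for the Ruby filter rather than the actual implementation:

```python
import json

import redis  # pip install redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

def killchain_lookup(provider: str, rule_id: str) -> dict:
    """Return the kill-chain enrichment for a rule, or {} if the rule is unknown."""
    raw = r.get(f"killchain:{provider}:{rule_id}")
    if raw is None:
        return {}
    entry = json.loads(raw)  # {"steps": ..., "description": ..., "class": ...}
    return {"threat": {"killchain": entry}}

# killchain_lookup("suricata", "2028933")
# -> {"threat": {"killchain": {"steps": 4, "description": "Exploitation", "class": "exploit"}}}
```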
Imperva

Vulnerability

Input

  • Redis template
# key: str enrichment:{class}:{cve}
# value: str {create time create user}

localhost:6379> set enrichment:cve:CVE-2016-8618 '2022-02-16 Canon'

Output

{
"rule": {
"cve": "CVE-2015-9381" // none
},
"vulnerability": {
"id": "CVE-2015-9381",
"enumeration": "CVE"
}
}

4. ThreatIntel

Workflow

image-20220221105601713

Configure

Shodan

| No | File | Script | Log | Note |
| --- | --- | --- | --- | --- |
| 1 | 85_threatintel-siem-event-1-shodan.conf | 85_threatintel-siem-event-1-shodan.rb | | |

Input

  • Redis template
# key: list spider:{provider}:ioc
# value: str ioc

localhost:6379> LPUSH spider:shodan:ioc 8.8.8.8 # Spider Queue
localhost:6379> SETEX alert:8.8.8.8 86400 true # IoC Cache
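
The worker that drains this queue (the spider behind 85_threatintel-siem-event-1-shodan.rb) is not shown. A minimal sketch of what such a consumer might look like, using the official `shodan` Python library; the `hunting:{ioc}` cache key and the selection of fields are assumptions based on the output sample below, not the actual implementation:

```python
import json

import redis   # pip install redis
import shodan  # pip install shodan

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
api = shodan.Shodan("YOUR_SHODAN_API_KEY")   # placeholder key

while True:
    # Block until an IoC is pushed onto the spider queue (see LPUSH above).
    _, ioc = r.brpop("spider:shodan:ioc")
    try:
        host = api.host(ioc)
    except shodan.APIError:
        continue   # unknown host, rate limit, etc.

    enrichment = {
        "ports": host.get("ports", []),
        "hostnames": host.get("hostnames", []),
        "domains": host.get("domains", []),
    }
    # Cache the result for 24 h so a later pipeline stage can enrich events with it.
    r.setex(f"hunting:{ioc}", 86400, json.dumps(enrichment))
```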

Output

{
"source": {
"ip": "84.17.52.20"
},
"threat": {
"hunting": {
"details": {
"tcp": [
{
"http-simple-new": 81
},
{
"https": 443
},
{
"http-simple-new": 8080
},
{
"https-simple-new": 8081
},
{
"https": 8443
},
{
"https-simple-new": 9002
}
],
"udp": []
},
"domains": "cdn77.com",
"hostnames": "unn-84-17-52-20.cdn77.com",
"ports": [
8443,
8081,
9002,
8080,
81,
443
],
"services": [
"https",
"https-simple-new",
"http-simple-new"
]
}
}
}

3. Filter

3.1 Whitelist

3.1.1 IP

  • Redis template

Input

# key: str whitelist:{class}:{ip}
# value: json str {"type": "cdn", "action": "pass|drop"}

localhost:6379> set whitelist:ip:8.8.8.8 '{"type": "cdn", "action": "pass"}'

Output

{
"source": {
"ip": "x.x.x.x"
},
"whitelist": {
"type": "cdn", // 描述白名单类型,如:CDN、红队测试IP、办公网出口IP
"action": "drop", // drop|pass 事件是否需要进入SIEM消费
"origin": "source" // source|destination 描述实际匹配到白名单来源是"source"还是"destination"
},
"isWhitelist": true // 仅作为筛选条件
}
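
The Logstash script performing this lookup (71_enrichment-alert-4-whitelist_ip.rb) is not shown. As an illustration of how the origin field can be derived, a Python sketch of the decision, using the key layout from the Input and the field names from the Output above; it is a sketch, not the actual filter:

```python
import json

import redis  # pip install redis

r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)

def whitelist_lookup(src_ip: str, dst_ip: str) -> dict:
    """Return the whitelist block for an event, or {} if neither side matches."""
    for origin, ip in (("source", src_ip), ("destination", dst_ip)):
        raw = r.get(f"whitelist:ip:{ip}")
        if raw:
            entry = json.loads(raw)     # {"type": ..., "action": ...}
            entry["origin"] = origin    # which side matched the whitelist
            return {"whitelist": entry, "isWhitelist": True}
    return {}

# whitelist_lookup("8.8.8.8", "10.0.0.5")
# -> {"whitelist": {"type": "cdn", "action": "pass", "origin": "source"}, "isWhitelist": True}
```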

Differences

  • v0.2
{
"source": {
"ip": "x.x.x.x",
"isWhitelist": true,
"whitelistType": "exit_whitelist"
},
"destination.": {
"ip": "y.y.y.y",
"isWhitelist": true,
"whitelistType": "exit_whitelist"
}
}
  • v0.3
{
"source": {
"ip": "x.x.x.x"
},
"whitelist": {
"type": "cdn",
"action": "drop",
"origin": "source"
},
"isWhitelist": true
}

3.1.2 Rule

3.1.2.1 SID
  • Redis template

Input

# key: str whitelist:{class}:{provider}:{rule id}
# value: str {rule name}

localhost:6379> set whitelist:sid:suricata:2101201 "GPLWEB_SERVER_403_Forbidden"

Workflow

image-20220224143316558

Input

  • Redis template
# key: str whitelist:{class}:{provider}:{rule name}
# value: str add by {user} {date} {sid} # default

# Suricata
localhost:6379> set "whitelist:rule:suricata:GPLWEB_SERVER_403_Forbidden" "add by Canon 2022.02.24 2101201"
# Imperva
localhost:6379> set "whitelist:rule:imperva:Suspicious Response Code" "add by Canon 2022.02.24"

Workflow

image-20220224150916372

Background

Due to the particularities of AWS traffic mirroring, only HTTP and DNS traffic is currently tapped in the production network; Zeek and Suricata analyse and alert on this traffic, with Suricata doing signature-based detection and Zeek doing custom, script-based detection, so each has its own role. A few days ago a business API triggered a Pingdom alert, and analysis showed that several IPs were enumerating one of its parameters. The values supplied for that parameter were all very large, and since the backend did not enforce any deep limit on it, the server kept computing until the API stopped responding.

Requirements

  • Detect parameter-enumeration behaviour;
  • Determine whether the access pattern is periodic;
  • Count unique user_agent values;
  • Check against threat intelligence;

Implementation

The requirements above are met by extending the alerting models of the **ElastAlert** framework with a custom rule type.

Parameter enumeration
  • New rule - Spider.py
import sys
import json
import redis
import html
import datetime
from multiprocessing import Process, JoinableQueue, Lock, Manager

from elastalert.ruletypes import RuleType
from elastalert.util import elastalert_logger

try:
import pandas as pd
except:
print("Please make sure you have pandas installed. pip install pandas")
sys.exit(0)

try:
from tqdm import tqdm
except:
print("Please make sure you have tqdm module installed. pip install tqdm")
sys.exit(0)


def conn(host='localhost', port=6379, password=None, db=0):
pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
conn = redis.Redis(connection_pool=pool)
return conn


def put_data(conn, q, data):
with conn.pipeline() as pipe:
for i in data:
pipe.lpush(q, i)
pipe.execute()


class SpiderRule(RuleType):
def __init__(self, rules, args=None):
super(SpiderRule, self).__init__(rules, args=None)
self.MAX_ARGS_LENGTH = int(self.rules['beacon']['max_args_length'])
self.MIN_HITS = int(self.rules['beacon']['min_hits'])
self.MAX_UNIQUE_ARGS = int(self.rules['beacon']['max_unique_args'])
self.THRESHOLD_PERCENT = int(self.rules['beacon']['threshold_percent'])
self.NUM_PROCESSES = int(self.rules['beacon']['threads'])
self.UA_PROCESSES = int(self.rules['beacon']['user_agent'])

self.TIMESTAMP = '@timestamp'
self.FORMAT_TIMESTAMP = self.rules['timestamp'].get('format', None)

self.beacon_module = self.rules['beacon']['beacon_module']
self.WINDOW = int(self.rules['beacon']['window'])
self.MIN_INTERVAL = int(self.rules['beacon']['min_interval'])
buffer_time = str(self.rules['buffer_time'])
self.PERIOD = ':'.join(buffer_time.split(':')[:2])

self.fields = self.normalized_field(self.rules['field'])
self.src_ip = self.fields['aliases']['src_ip']
self.url = self.fields['aliases']['url']
self.url_path = self.fields['aliases']['url_path']
self.http_host = self.fields['aliases']['http_host']
self.user_agent = self.fields['aliases']['user_agent']

self.json = self.rules['output']['json'].get('enable', None)
self.redis = self.rules['output']['redis'].get('enable', None)

self.q_job = JoinableQueue()
self.l_df = Lock()
self.l_list = Lock()

def normalized_field(self, d):
fields = {'hash': [], 'output': [], 'aliases': {}}
for field, info in d.items():
alias = info['alias']
fields['aliases'][alias] = field
for i in info.get('type', []):
fields[i].append(field)
return fields

def add_data(self, data):
# Detection of spider crawlers
self.df = pd.json_normalize(data)
results = self.find_spiders()

d = results.to_dict(orient="records")

# Output to local files
if self.json:
json_path = self.rules['output']['json']['path']
with open(json_path, 'a') as out_file:
for i in d:
out_file.write(json.dumps(i) + '\n')

# Output to Redis Server
if self.redis:
try:
host = self.rules['output']['redis']['host']
port = self.rules['output']['redis']['port']
password = self.rules['output']['redis']['password']
db = self.rules['output']['redis']['db']
key = self.rules['output']['redis']['key']
ioc = self.rules['output']['redis']['field']

redis_conn = conn(host=host, port=port,
password=password, db=db)
IoC = results[ioc].unique().tolist()
put_data(redis_conn, key, IoC)
except:
elastalert_logger.error("Output Redis configuration errors.")
self.add_match(d)

# The results of get_match_str will appear in the alert text
def get_match_str(self, match):
return json.dumps(match)

def add_match(self, results):
for result in results:
super(SpiderRule, self).add_match(result)

def get_args_hash(self, args, session_id):
return hash(tuple(args + [session_id]))

def get_query_str(self, request):
query = request.split('?')[-1]
query_str = dict([i.split("=", 1) for i in query.split(
"&") if i if len(i.split("=", 1)) == 2])
query_str['args_list'] = list(query_str.keys())
query_str['max_length'] = len(query_str)
query_str['url_sample'] = request
return query_str

def percent_grouping(self, d, total):
interval = 0
# Finding the key with the largest value (interval with most events)
mx_key = int(max(iter(list(d.keys())), key=(lambda key: d[key])))
mx_percent = 0.0

for i in range(mx_key - self.WINDOW, mx_key + 1):
current = 0
# Finding center of current window
curr_interval = i + int(self.WINDOW / 2)

for j in range(i, i + self.WINDOW):
if j in d:
current += d[j]

percent = float(current) / total * 100
if percent > mx_percent:
mx_percent = percent
interval = curr_interval

return interval, mx_percent

def find_beacon(self, session_data):
beacon = {}

if not self.FORMAT_TIMESTAMP:
session_data[self.TIMESTAMP] = pd.to_datetime(
session_data[self.TIMESTAMP])
else:
session_data[self.TIMESTAMP] = pd.to_datetime(
session_data[self.TIMESTAMP], format=self.FORMAT_TIMESTAMP)
session_data[self.TIMESTAMP] = (
session_data[self.TIMESTAMP].astype(int) / 1000000000).astype(int)

session_data = session_data.sort_values([self.TIMESTAMP])
session_data['delta'] = (
session_data[self.TIMESTAMP] - session_data[self.TIMESTAMP].shift()).fillna(0)
session_data = session_data[1:]
d = dict(session_data.delta.value_counts())

for key in list(d.keys()):
if key < self.MIN_INTERVAL:
del d[key]

# Finding the total number of events
total = sum(d.values())
if d and total > self.MIN_HITS:
window, percent = self.percent_grouping(d, total)
if percent > self.THRESHOLD_PERCENT and total > self.MIN_HITS:
beacon = {
'percent': int(percent),
'interval': int(window),
}

return beacon

def find_spider(self, q_job, spider_list):
while not q_job.empty():
session_id = q_job.get()
self.l_df.acquire()
session_data = self.df[self.df['session_id']
== session_id]
self.l_df.release()

query_str = session_data[self.url].apply(
lambda req: self.get_query_str(req)).tolist()
query_data = pd.DataFrame(query_str)

# get args_hash
query_data['args_hash'] = query_data['args_list'].apply(
lambda args: self.get_args_hash(args, session_id))

for i in query_data['args_hash'].unique():
result = {
"detail": {
'percent': {},
'unique': {}
},
"tags": [],
"src_ip": session_data[self.src_ip].tolist()[0],
"url_path": session_data[self.url_path].tolist()[0],
"http_host": session_data[self.http_host].tolist()[0],
"unique_ua": session_data[self.user_agent].unique().shape[0],
"alert": False,
}

df = query_data[query_data['args_hash'] == i]
count_args_length = df['max_length'].iloc[0]
if count_args_length > self.MAX_ARGS_LENGTH:
continue

total_hits = df.shape[0]
if total_hits < self.MIN_HITS:
continue

args_list = df['args_list'].iloc[0]
for i in args_list:
unique_args = len(df[i].unique())
if unique_args == 1:
continue

# Calculate the percentage based on the number of changes in the parameters
current_percent = int((unique_args / total_hits) * 100)
if current_percent < self.THRESHOLD_PERCENT:
continue

result['detail']['percent'][i] = current_percent
result['detail']['unique'][i] = unique_args

# Number of parameters with changes
count_unique_args = len(result['detail']['unique'])
if count_unique_args <= self.MAX_UNIQUE_ARGS:
result['alert'] = True

if not result['detail']['unique']:
continue

# Beacon analysis
if self.beacon_module:
result['beacon'] = self.find_beacon(
session_data.reset_index(drop=True))

result['args_list'] = args_list
result['total_hits'] = total_hits
result['url_sample'] = df['url_sample'].iloc[0]
result['period'] = self.PERIOD

if result['alert']:
result['tags'].append('enumerate')

if result['beacon']:
result['tags'].append('beacon')

if result['unique_ua'] >= self.UA_PROCESSES:
result['tags'].append('suspicious-ua')

self.l_list.acquire()
spider_list.append(result)
self.l_list.release()
q_job.task_done()

def find_spiders(self):
if self.df.empty:
raise Exception(
"Elasticsearch did not retrieve any data. Please ensure your settings are correct inside the config file.")

tqdm.pandas(desc="Detection of Spider Crawlers.")

# get url_path
self.df[self.url_path] = self.df[self.url].str.split('?').str.get(0)

# add session_id from hash fields
self.df['session_id'] = self.df[self.fields['hash']
].progress_apply(lambda row: hash(tuple(row)), axis=1)
# split url
self.df = self.df[self.df[self.url].apply(lambda request: True if len(
request.split('?')) == 2 else False)].reset_index(drop=True)
# normalized url
self.df[self.url] = self.df[self.url].apply(
lambda request: html.unescape(request))
# unique session_id
unique_session = self.df['session_id'].unique()

for session in unique_session:
self.q_job.put(session)

mgr = Manager()
spider_list = mgr.list()
processes = [Process(target=self.find_spider, args=(
self.q_job, spider_list,)) for thread in range(self.NUM_PROCESSES)]

# Run processes
for p in processes:
p.start()

# Exit the completed processes
for p in processes:
p.join()

results = pd.DataFrame(list(spider_list))

# add timestamp
now = datetime.datetime.now().isoformat()
results['timestamp'] = now

if not results.empty:
results = results[results['alert'] == True]

match_log = "Queried rule %s matches %s crawl events" % (
self.rules['name'],
results.shape[0]
)
elastalert_logger.info(match_log)

return results

Configuration file
  • Web.yaml
name: "Detection of Spider Crawlers"
es_host: "es_server"
es_port: 9200
type: "elastalert_modules.spider.my_rules.SpiderRule"

index: "zeek-other-%Y.%m.%d"
use_strftime_index: true

filter:
- term:
host: "canon88.github.io"
- term:
method.keyword: "GET"

include: ["true_client_ip", "host", "uri", "uri_path", "user_agent"]

timestamp:
format: false
timestamp_field: "@timestamp"

buffer_time:
hours: 12

run_every:
minutes: 10

max_query_size: 10000
scroll: true

beacon:
max_args_length: 10 # maximum number of parameters to inspect
min_hits: 120 # minimum number of matching events
max_unique_args: 2 # maximum number of parameters allowed to vary
threshold_percent: 70 # request threshold percentage
threads: 16 # number of worker processes
beacon_module: true # enable periodicity (beacon) detection
min_interval: 1 # minimum interval in seconds
window: 2 # jitter window
user_agent: 20 # unique user_agent threshold

field:
true_client_ip:
alias: src_ip
type: [hash]
host:
alias: http_host
type: [hash]
uri_path:
alias: url_path
type: [hash]
uri:
alias: url
user_agent:
alias: user_agent

output:
json:
enable: yes # write results to a local file
path: /var/log/spider/spider_detect.json
redis:
enable: no # push results to Redis so they can be checked against threat intelligence
host: redis_server
port: 6379
db: 0
password: redis_password
key: spider:feeds
field: src_ip

alert:
- debug

Alert output
{
"detail": {
"percent": {
"cookieid": 81
},
"unique": {
"cookieid": 133
}
},
"tags": [
"enumerate", // 存在参数遍历行为
"suspicious-ua" // user_agent 超过阈值
],
"src_ip": "54.160.169.250",
"url_path": "/image/cookieId.html",
"http_host": "canon88.github.io",
"unique_ua": 47,
"alert": true,
"beacon": {},
"args_list": [
"cookieid"
],
"total_hits": 164,
"url_sample": "/image/cookieId.html?cookieid=E99A3E54-5A81-2907-1372-339FFB70A464",
"period": "1:00",
"timestamp": "2020-06-02T11:07:59.276581"
}
Notes

find_spider: detects parameter-enumeration behaviour; find_beacon is added here to contribute an extra, periodicity-based dimension. Of course, many crawlers come with built-in timing jitter or run from crawler pools, so its effect here is not particularly strong.

find_beacon: better suited to detecting C2 connections, for example periodic DNS requests for a domain. Below is an alert for a periodic domain request detected this way:

{
"src_ip": "x.x.x.228",
"hostname": "entitlement.service.imperva.com",
"answers": [
"joxkwsf.x.incapdns.net",
"45.60.73.51"
],
"percent": "100",
"interval": 1800,
"occurrences": 23,
"timestamp": "2020-06-01T08:03:38.164363",
"period": 12,
"event_type": "beaconing",
"num_hits": 806379,
"num_matches": 3,
"kibana_url": "https://canon88.github.io/goto/5f089bcc411426b854da71b9062fdc8c"
}
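
To make the interval grouping behind these beacon alerts concrete, here is a small standalone sketch of the same idea used by percent_grouping above: count the deltas between consecutive requests, then find the jitter window that covers the largest share of them. The delta values are invented:

```python
from collections import Counter

# Invented deltas (seconds) between consecutive requests of one session;
# most requests arrive roughly every 1800 s, as in the alert above.
deltas = [1799, 1800, 1800, 1801, 1800, 1800, 1802, 1800]

WINDOW = 2                                   # jitter window, as in the rule config
counts = Counter(deltas)
total = sum(counts.values())

best_interval, best_percent = 0, 0.0
mx_key = max(counts, key=counts.get)         # the most common delta
for start in range(mx_key - WINDOW, mx_key + 1):
    covered = sum(counts.get(d, 0) for d in range(start, start + WINDOW))
    percent = covered / total * 100
    if percent > best_percent:
        best_percent, best_interval = percent, start + WINDOW // 2

print(best_interval, round(best_percent))    # -> 1800 75: a ~1800 s beat covers 75 % of the gaps
```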

image-20200603113849810

Summary

Within one hour, IP 54.160.169.250 accessed this API 164 times, changed the cookieid parameter 133 times (81% of all requests), and rotated through 47 different user_agent values.

image-20200602114610615

References

Flare

ElastAlert

Background

  1. Two NTA (Suricata) sensors are deployed locally to receive the traffic of 12 internal DNS servers and surface security issues in DNS requests. Recently both sensors started restarting the Suricata process after roughly 10 hours of running; the logs essentially said memory was exhausted and a forced restart was needed to free it. Pinpointing this took some time. DNS packets are small and the average traffic here is only about 25 Mbps, so excessive NIC load was unlikely to be the cause. Digging further: our application servers call each other through internal domain names, so the DNS request volume is huge. Kibana showed about 320,000,000 to 350,000,000 dns_type: query events per day (and that is only the query events; the dns_type: answer volume is also enormous). Suricata cannot filter specific domains before writing the data out, which frankly deserves a complaint. The workaround at the time was to keep only dns_type: query events, which kept Suricata alive and temporarily met the requirement.

  2. Recently, jpg and png files containing malicious payloads were uploaded through one of the site's upload APIs. Suricata did detect this, but it raised new requirements: determine whether the upload succeeded, and carve the file and extract its hash. Suricata can do both, but with a drawback worth mentioning: once file_info is enabled, Suricata extracts a hash for every protocol that supports file extraction. As an e-commerce site we serve a very large volume of external traffic, and since Suricata does not support filtering here by default, even the HTML pages served to users get hashed, which is a terrifying amount of data.

Summary: given the two problems above, what I need is a more flexible NTA framework, which brings in the star of this post - Zeek.


Requirements

  1. Filter out internal DNS domains and keep only the requests and responses for external domains;
  2. More flexible file carving and hash extraction;

Implementation

1. Filtering local DNS requests
DNS script

dns-filter_external.zeek

redef Site::local_zones = {"canon88.github.io", "baidu.com", "google.com"};

function dns_filter(rec: DNS::Info): bool
{
return ! Site::is_local_name(rec$query);
}

redef Analyzer::disable_all = T;

event zeek_init()
{
Analyzer::enable_analyzer(Analyzer::ANALYZER_VXLAN);
Analyzer::enable_analyzer(Analyzer::ANALYZER_DNS);
Log::remove_default_filter(DNS::LOG);
local filter: Log::Filter = [$name="dns_split", $path="/data/logs/zeek/dns_remotezone", $pred=dns_filter];
Log::add_filter(DNS::LOG, filter);
}
Script notes:
  1. Site::local_zones defines the internal domains that should be filtered out by default; in my case these are mostly intranet domains;
  2. Performance optimisation: only DNS parsing is enabled. These two NTA sensors only analyse DNS traffic, and to keep signature-based detection of domains I run Suricata and Zeek side by side (Zeek could do signature detection too, I was just lazy...). Analyzer::enable_analyzer(Analyzer::ANALYZER_DNS); restricts the analysis to DNS traffic;
  3. Filter the log and write it out;
Sample log:
{
"ts": 1589175829.994196,
"uid": "CPRxOZ2RtkPYhjz8R9",
"id.orig_h": "1.1.1.1",
"id.orig_p": 40411,
"id.resp_h": "2.2.2.2",
"id.resp_p": 53,
"proto": "udp",
"trans_id": 696,
"rtt": 1.3113021850585938e-05,
"query": "graph.facebook.com",
"qclass": 1,
"qclass_name": "C_INTERNET",
"qtype": 1,
"qtype_name": "A",
"rcode": 0,
"rcode_name": "NOERROR",
"AA": false,
"TC": false,
"RD": true,
"RA": true,
"Z": 0,
"answers": [
"api.facebook.com",
"star.c10r.facebook.com",
"157.240.22.19"
],
"TTLs": [
540,
770,
54
],
"rejected": false,
"event_type": "dns"
}
Summary:

With Zeek filtering the DNS requests, the daily DNS volume is now about 6,300,000 to 6,800,000 events per day (query + answer), versus 320,000,000 to 350,000,000 per day (query only) before, a reduction of roughly 98% that also relieves the storage pressure on the Elasticsearch backend.

  • Zeek DNS (query + answer)

image-20200511135508651

  • Suricata DNS (query)

image-20200511140015791


2. More flexible file carving and hash extraction
File extraction script

file_extraction.zeek

This is a demo script and the style is not particularly elegant, so please don't dwell on it.

@load base/frameworks/files/main
@load base/protocols/http/main

module Files;

export {
redef record Info += {
hostname: string &log &optional;
proxied: set[string] &log &optional;
url: string &log &optional;
method: string &log &optional;
true_client_ip: string &log &optional;
logs: bool &log &optional;
};

option http_info = T;
}

redef FileExtract::prefix = "/data/logs/zeek/extracted_files/";

global mime_to_ext: table[string] of string = {
["text/plain"] = "txt",
["application/x-executable"] = "",
["application/x-dosexec"] = "exe",
["image/jpeg"] = "jpg",
["image/png"] = "png",
["application/pdf"] = "pdf",
["application/java-archive"] = "jar",
["application/x-java-applet"] = "jar",
["application/x-java-jnlp-file"] = "jnlp",
["application/msword"] = "doc",
["application/vnd.openxmlformats-officedocument.wordprocessingml.document"] = "docs",
["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"] = "xlsx",
["application/vnd.openxmlformats-officedocument.presentationml.presentation"] = "pptx",
};

global file_analyzer: table[string] of bool = {
["Extraction"] = T,
};

global http_method: set[string] = {
"GET",
"POST",
};

global http_hostname: set[string] = {
"canon88.github.io",
};

global http_uri: set[string] = {
"/index.php",
"/account.php",
};

function files_filter(rec: Files::Info): bool
{
return rec?$logs;
}

event zeek_init()
{
Log::remove_default_filter(Files::LOG);
local filter: Log::Filter = [$name="file_extraction", $path="/data/logs/zeek/file_extraction", $pred=files_filter];
Log::add_filter(Files::LOG, filter);
}

event file_sniff(f: fa_file, meta: fa_metadata) &priority=3
{
if ( f$source != "HTTP" )
return;

if ( ! f$http?$method )
return;

if ( f$http$method !in http_method )
return;

if ( ! f$http?$host )
return;

if ( f$http$host !in http_hostname )
return;

if ( ! meta?$mime_type )
return;

if ( meta$mime_type !in mime_to_ext )
return;

f$info$logs = T;

if ( file_analyzer["Extraction"] )
{
local fname = fmt("%s-%s.%s", f$source, f$id, mime_to_ext[meta$mime_type]);
Files::add_analyzer(f, Files::ANALYZER_EXTRACT, [$extract_filename=fname]);
}

Files::add_analyzer(f, Files::ANALYZER_MD5);

if ( http_info )
{
if ( f$http?$host )
f$info$hostname = f$http$host;
if ( f$http?$proxied )
f$info$proxied = f$http$proxied;
if ( f$http?$method )
f$info$method = f$http$method;
if ( f$http?$uri )
f$info$url = f$http$uri;
if ( f$http?$true_client_ip )
f$info$true_client_ip = f$http$true_client_ip;
}
}


event file_state_remove(f: fa_file)
{
if ( !f$info?$extracted || !f$info?$md5 || FileExtract::prefix == "" || !f$info?$logs )
return;

local orig = f$info$extracted;
local split_orig = split_string(f$info$extracted, /\./);
local extension = split_orig[|split_orig|-1];
local ntime = fmt("%D", network_time());
local ndate = sub_bytes(ntime, 1, 10);
local dest_dir = fmt("%s%s", FileExtract::prefix, ndate);
mkdir(dest_dir);
local dest = fmt("%s/%s.%s", dest_dir, f$info$md5, extension);
local cmd = fmt("mv %s/%s %s", FileExtract::prefix, orig, dest);
when ( local result = Exec::run([$cmd=cmd]) )
{

}
f$info$extracted = dest;
}
Script notes:
 1. Hashes are extracted and files carved only for the specified hostname, method, url and file type;
 2. By default, carved files are stored in per-day (year-month-day) directories and named after their MD5;
 3. The file log is enriched with the relevant HTTP fields;
Sample log
{
"ts": 1588891497.173108,
"fuid": "FhOGNc2zDdlF3AP5c",
"tx_hosts": [
"1.1.1.1"
],
"rx_hosts": [
"2.2.2.2"
],
"conn_uids": [
"CItQs61wvvtPqSB0Ub"
],
"source": "HTTP",
"depth": 0,
"analyzers": [
"MD5",
"SHA1",
"EXTRACT"
],
"mime_type": "image/png",
"duration": 0,
"local_orig": true,
"is_orig": false,
"seen_bytes": 353,
"total_bytes": 353,
"missing_bytes": 0,
"overflow_bytes": 0,
"timedout": false,
"md5": "fd0229d400049449084b3864359c445a",
"sha1": "d836d3f06c0fc075cf0f5d95f50b79cac1dac97d",
"extracted": "/data/logs/zeek/extracted_files/2020-05-07/fd0229d400049449084b3864359c445a.png",
"extracted_cutoff": false,
"hostname": "canon88.github.io",
"proxied": [
"TRUE-CLIENT-IP -> 3.3.3.3",
"X-FORWARDED-FOR -> 4.4.4.4"
],
"url": "/image/close.png",
"method": "GET",
"true_client_ip": "3.3.3.3",
"logs": true
}
File carving

Below is one of the carved images that contained a malicious payload.

image-20200511150738194

$ ll /data/logs/zeek/extracted_files/
total 89916
drwxr-xr-x 10 root root 150 May 11 06:14 ./
drwxr-xr-x 4 root root 67 May 11 06:00 ../
drwxr-xr-x 2 root root 50 May 5 07:54 2020-05-04/
drwxr-xr-x 2 root root 4096 May 5 23:51 2020-05-05/
drwxr-xr-x 2 root root 671744 May 6 23:41 2020-05-06/
drwxr-xr-x 2 root root 4096 May 7 22:44 2020-05-07/
drwxr-xr-x 2 root root 741376 May 8 23:59 2020-05-08/
drwxr-xr-x 2 root root 23425024 May 9 23:59 2020-05-09/
drwxr-xr-x 2 root root 25047040 May 10 23:59 2020-05-10/
drwxr-xr-x 2 root root 24846336 May 11 06:14 2020-05-11/

$ xxd /data/logs/zeek/extracted_files/2020-05-07/884d9474180e5b49f851643cb2442bce.jpg
00000000: 8950 4e47 0d0a 1a0a 0000 000d 4948 4452 .PNG........IHDR
00000010: 0000 003c 0000 003c 0806 0000 003a fcd9 ...<...<.....:..
00000020: 7200 0000 1974 4558 7453 6f66 7477 6172 r....tEXtSoftwar
00000030: 6500 4164 6f62 6520 496d 6167 6552 6561 e.Adobe ImageRea
00000040: 6479 71c9 653c 0000 0320 6954 5874 584d dyq.e<... iTXtXM
00000050: 4c3a 636f 6d2e 6164 6f62 652e 786d 7000 L:com.adobe.xmp.
..........
00001030: 8916 ce5f 7480 2f38 c073 69f1 5c14 83fb ..._t./8.si.\...
00001040: aa9d 42a3 8f4b ff05 e012 e04b 802f 01be ..B..K.....K./..
00001050: 04b8 91c7 ff04 1800 bcae 819f d1da 1896 ................
00001060: 0000 0000 4945 4e44 ae42 6082 3c3f 7068 ....IEND.B`.<?ph
00001070: 7020 7068 7069 6e66 6f28 293b 3f3e 1a p phpinfo();?>.
Summary:

With this Zeek script extension we can obtain hashes for exactly the file types we care about and carve files in a fully customised way.

Brainstorming

Once we have the hash of the uploaded file, two further security events can be derived:

  1. Determine whether the upload succeeded.

    Usually the first thing to establish is whether the file actually landed on the server; if so, a corresponding event should be produced. This can be done by correlating with file-creation events from a HIDS.

  2. Correlate with an anti-virus engine / threat intelligence.

    Take the hash from the correlated event in step 1 and look it up, most commonly by submitting it to VT or a threat-intelligence platform.

Below, Wazuh events are used as an example: Zeek's file-extraction events are correlated with Wazuh's new-file (syscheck) events, with the hash as the correlation key.

a. Zeek event

{
"ts": 1589158812.471443,
"fuid": "FBkqzM2AFg0jrioji6",
"tx_hosts": [
"1.1.1.1"
],
"rx_hosts": [
"2.2.2.2"
],
"conn_uids": [
"CcOyQo2ziEuoLBNIb9"
],
"source": "HTTP",
"depth": 0,
"analyzers": [
"SHA1",
"EXTRACT",
"MD5",
"DATA_EVENT"
],
"mime_type": "text/plain",
"duration": 0,
"local_orig": true,
"is_orig": true,
"seen_bytes": 31,
"total_bytes": 31,
"missing_bytes": 0,
"overflow_bytes": 0,
"timedout": false,
"md5": "37a74f452f1c49854a2951fd605687c5",
"extracted": "/data/logs/zeek/extracted_files/2020-05-11/37a74f452f1c49854a2951fd605687c5.txt",
"extracted_cutoff": false,
"hostname": "canon88.github.io",
"proxied": [
"X-FORWARDED-FOR -> 3.3.3.3",
"TRUE-CLIENT-IP -> 4.4.4.4"
],
"url": "/index.php",
"method": "POST",
"true_client_ip": "4.4.4.4",
"logs": true
}

b. Wazuh event

image-20200511144805354
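
As a sketch of step 2 above (hash reputation), the MD5 from the Zeek file-extraction event could be checked against VirusTotal's v3 files endpoint. A minimal example; the API key and the final alerting action are placeholders:

```python
import requests  # pip install requests

VT_API_KEY = "YOUR_VT_API_KEY"   # placeholder

def vt_malicious_count(file_hash: str) -> int:
    """Return how many engines flag the hash as malicious (0 if VT has never seen it)."""
    resp = requests.get(
        "https://www.virustotal.com/api/v3/files/" + file_hash,
        headers={"x-apikey": VT_API_KEY},
        timeout=10,
    )
    if resp.status_code == 404:   # hash unknown to VT
        return 0
    resp.raise_for_status()
    stats = resp.json()["data"]["attributes"]["last_analysis_stats"]
    return stats.get("malicious", 0)

# MD5 taken from the Zeek event above
if vt_malicious_count("37a74f452f1c49854a2951fd605687c5") > 0:
    print("known-bad hash, raise the correlated alert")
```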

References

reindex

from elasticsearch import Elasticsearch
from elasticsearch import helpers

host = ['es_host1', 'es_host2', 'es_host3']
port = 9200
timeout = 600
auth_user = 'elastic'
auth_password = 'hello world'
use_ssl = True
ca_certs = '/opt/certs/ca/ca.crt'

es = Elasticsearch(host, port=port, timeout=timeout, http_auth=(auth_user, auth_password), verify_certs=True, use_ssl=use_ssl, ca_certs=ca_certs)
Reindex for a given date range
import datetime
import time

begin_date = (datetime.datetime.now() - datetime.timedelta(days = 10)).strftime("%Y.%m.%d")
begin_date = datetime.datetime.strptime(begin_date, "%Y.%m.%d")
end_date = (datetime.datetime.now() - datetime.timedelta(days = 1)).strftime("%Y.%m.%d")
end_date = datetime.datetime.strptime(end_date, "%Y.%m.%d")

date_list = []
while begin_date <= end_date:
    date_str = begin_date.strftime("%Y.%m.%d")
    date_list.append(date_str)
    begin_date += datetime.timedelta(days=1)
date_list
['2020.03.19',
'2020.03.20',
'2020.03.21',
'2020.03.22',
'2020.03.23',
'2020.03.24',
'2020.03.25',
'2020.03.26',
'2020.03.27',
'2020.03.28']
chunk_size = 10000
for day in date_list:
    source_index = 'wazuh-alerts-3.x-' + day
    target_index = 'siem-alerts-' + day
    helpers.reindex(
        client=es, source_index=source_index, target_index=target_index,
        target_client=es, chunk_size=chunk_size
    )
    print(source_index + ' -> ' + target_index + ' fin.')

Foreword

This is not some cutting-edge architecture or technology; it is simply my own integration of the resources currently at hand. There are of course many ways to meet these requirements: with budget you can look at Splunk, and without budget but with a development team you can build on Flink or Esper.

Requirements

As attack and defence escalate, a single data source is rarely enough to conclude whether an attack succeeded. We therefore need to correlate security events across multiple data sources and distil higher-confidence alerts for manual triage. For example, WebShell uploads can be correlated between network traffic and the endpoint, and web attacks can be correlated between WAF and NIDS events to produce "Bypass WAF" alerts.

Approach

Although Wazuh itself can correlate security events, in the traditional deployment the Wazuh Agent sends events to the Wazuh Manager, which performs the correlation. Without an ETL step the Manager struggles to correlate heterogeneous data. We therefore use Logstash to normalise the data and send the normalised events to the Wazuh Manager over syslog, so that heterogeneous sources can be correlated.

Pitfalls
  1. This redesign sends data to the Wazuh Manager over syslog. Syslog uses UDP by default, and oversized messages are truncated and raise errors; switching to TCP avoids the problem.
  2. Some Wazuh Manager alerts lack fields that are "necessary" for correlation. For example, syscheck alerts do not carry a srcip field by default; this can be solved with a preprocessing script on the Manager.

Redesign

  1. Before:

    Suricata (Wazuh agent) —(Agent: UDP 1514)—> Wazuh Manager

  2. After:

    All normalisation is done by Logstash; Filebeat only needs to do "dumb" forwarding.

workflow:

image-20200303215020561


Filebeat configuration
  • filebeat.yaml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
paths:
- "/var/log/suricata/alert-*.json"
fields_under_root: true
fields: { application: suricata }
json.keys_under_root: true
json.overwrite_keys: true
json.message_key: log
tail_files: false

scan_frequency: 1s
harvester_buffer_size: 104857600
backoff: 1s
max_backoff: 10s
close_timeout: 30m
close_inactive: 10m
clean_inactive: 72h
ignore_older: 70h
registry_file: /etc/filebeat/registry/wazuh/

#================================ Processors ==================================
processors:
- drop_fields:
fields: ["ecs.version", "agent.ephemeral_id", "agent.version", "agent.type", "agent.id", "agent.ephemeral_id", "input.type"]

#================================ Outputs =====================================
output.logstash:
hosts: ["logstash:5010"]
loadbalance: true
worker: 4
compression_level: 3
bulk_max_size: 4096

Logstash configuration
  • 00_input.conf
input {
beats {
port => 5010
codec => "json_lines"
tags => ["beats"]
}
}
  • 50_suricata.conf
filter {
if [application] == "suricata" {
date {
match => [ "timestamp", "ISO8601" ]
target => "timestamp"
}
}
}
  • mapping.json
{
"common_mapping": {
"src_ip": "srcip",
"dest_ip": "dstip",
"src_port": "srcport",
"dest_port": "dstport"
}
}
  • 70_normalized-suricata.conf
filter {
clone {
clones => [ "siem_events" ]
}
}

filter {
if [type] == "siem_events" {
mutate {
remove_field => [ "application", "type", "agent", "@version", "@timestamp"]
add_field => {
"provider" => "Suricata"
"product" => "Intrusion Detection System"
}
}

ruby {
init => "
require 'json'

mapping_json = File.read('/etc/logstash/mappings/wazuh/mapping.json')
mapping = JSON.parse(mapping_json)
@common_mapping = mapping['common_mapping']
"

code => "
keys = event.to_hash.keys
keys.each do |key|
if @common_mapping.include? key then
value = event.get(key)
event.remove(key)
new_key = @common_mapping[key]
event.set(new_key, value)
end
end

sensor = event.get('[host][name]')
event.set('sensor', sensor)
"
}
}
}
  • 99_output-elasticsearch.conf
output {
if [event_type] == "alert" {
elasticsearch {
cacert => "/etc/logstash/certs/ca/ca.crt"
user => "elastic"
password => "Hello World!"
hosts => ["https://elasticsearch:9200"]
index => "suricata-%{+YYYY.MM.dd}"
template => "/etc/logstash/index-template.d/suricata-template.json"
template_name => "suricata"
template_overwrite => true
}
}
}
  • 99_output-wazuh.conf
output {
if [provider] == "Suricata" {
syslog {
host => "wazuh"
protocol => "tcp"
port => 514
codec => "json"
sourcehost => "logstash"
appname => "NORMALIZED"
}
#stdout {
#codec => rubydebug
#}
}
}

Wazuh configuration
  • custom-syscheck.py

    A preprocessing script added on the Wazuh Manager to preprocess specific security events, e.g. adding a srcip field to syscheck events.

import json
import sys
import time
import os
from datetime import datetime, timedelta, timezone

# ossec.conf configuration:
# <integration>
# <name>custom-syscheck</name>
# <rule_id>554</rule_id>
# <group>syscheck</group>
# <alert_format>json</alert_format>
# </integration>

# Global vars
debug_enabled = False
pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
json_alert = {}
now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
wazuh_server = "192.168.199.97"

# Set paths
log_file = '{0}/logs/integrations.log'.format(pwd)
syscheck_file = '{0}/logs/syscheck.json'.format(pwd)

def iso8601(hours=8):
td = timedelta(hours=hours)
tz = timezone(td)
return datetime.now(tz=tz).isoformat()

def main(args):
debug("# Starting")

# Read args
alert_file_location = args[1]

debug("# File location")
debug(alert_file_location)

# Load alert. Parse JSON object.
with open(alert_file_location) as alert_file:
json_alert = json.load(alert_file)
debug("# Processing alert")
debug(json_alert)

alert = normalized_data(json_alert)
with open(syscheck_file, 'a') as f:
msg = json.dumps(alert)
f.write(msg + '\n')

def debug(msg):
if debug_enabled:
msg = "{0}: {1}\n".format(now, msg)
with open(log_file, "a") as f:
f.write(msg)

def normalized_data(alert):
if alert['agent']['id'] == '000':
alert['srcip'] = wazuh_server
elif alert['agent'].get('ip'):
alert['srcip'] = alert['agent']['ip']
alert['dstip'] = alert['agent']['ip']
alert['integration'] = 'custom-syscheck'
alert['create_timestamp'] = iso8601()
debug(alert)
return(alert)

if __name__ == "__main__":
try:
# Read arguments
bad_arguments = False
if len(sys.argv) >= 4:
msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
else:
msg = '{0} Wrong arguments'.format(now)
bad_arguments = True

# Logging the call
with open(log_file, 'a') as f:
f.write(msg + '\n')

if bad_arguments:
debug("# Exiting: Bad arguments.")
sys.exit(1)

# Main function
main(sys.argv)

except Exception as e:
debug(str(e))
raise
  • ossec.conf
    • Configure syslog to use TCP;
    • Load the preprocessing (integration) script;
    • Ingest the log file produced by the script;
<ossec_config>
<remote>
<connection>syslog</connection>
<port>514</port>
<protocol>tcp</protocol> <!-- udp(default)/tcp -->
<allowed-ips>192.168.199.0/24</allowed-ips>
</remote>

<!-- Custom external Integration -->
<integration>
<name>custom-syscheck</name>
<rule_id>554</rule_id>
<group>syscheck</group>
<alert_format>json</alert_format>
</integration>

<!-- Custom syscheck.json -->
<localfile>
<log_format>json</log_format>
<location>/var/ossec/logs/syscheck.json</location>
</localfile>
</ossec_config>
  • local_decoder_normalized.xml

Sample

<!--
2020 Mar 03 02:53:39 logstash NORMALIZED[-]: {"timestamp":"2020-02-21T19:47:04.382300+0800","flow_id":1133835979634527,"in_iface":"wlp3s0","event_type":"alert","src_ip":"192.168.199.97","src_port":60022,"dest_ip":"192.168.199.162","dest_port":59143,"proto":"TCP","alert":{"action":"allowed","gid":1,"signature_id":123456,"rev":1,"signature":"LOCAL RULES XXX","severity":3}}
-->

<decoder name="nta_json">
<prematch>NORMALIZED[-]: </prematch>
<plugin_decoder offset="after_prematch">JSON_Decoder</plugin_decoder>
</decoder>
  • 0901-local_raw.xml
    • The rule originally references the json decoder; change it to the newly added nta_json decoder;
    • Override the original rule with overwrite="yes";
<group name="suricata,">

<!-- /var/ossec/ruleset/rules/0475-suricata_rules.xml -->
<!-- Defind Suricata Rules -->
<!-- ID: 86600 - 86699 -->

<rule id="86600" level="0" overwrite="yes">
<decoded_as>nta_json</decoded_as>
<field name="timestamp">\.+</field>
<field name="event_type">\.+</field>
<description>Suricata messages.</description>
<options>no_full_log</options>
</rule>

</group>
  • 0905-local_syscheck.xml

    Parses the log generated by the preprocessing script.

<group name="syscheck,">
<rule id="187100" level="7">
<decoded_as>json</decoded_as>
<field name="integration">custom-syscheck</field>
<description>syscheck integration messages.</description>
<options>no_full_log</options>
</rule>
</group>
  • 9999-local_composite.xml
<group name="local,composite,">

<!-- Local Composite Rules Range ID: 200000 - 205000 -->

<rule id="200000" level="15" frequency="2" timeframe="600">
<if_matched_sid>101000</if_matched_sid> <!-- file-upload rule or WebShell-upload detection rule -->
<if_sid>187100</if_sid>
<same_source_ip /> <!-- correlate on source IP -->
<description>Phase 3: WebShell upload detected on server $(srcip).</description>
<options>no_full_log</options>
</rule>

<rule id="200001" level="12" frequency="2" timeframe="600">
<if_matched_sid>88801</if_matched_sid> <!-- WAF安全事件 -->
<if_group>ids</if_group>
<same_source_ip />
<description>Phase 3: Alarm - Same ip Bypass WAF of within 600 seconds. $(srcip) -> $(http.hostname) -> $(alert.signature) -> $(alert.signature_id).</description>
<options>no_full_log</options>
</rule>

</group>

Summary

  1. For WebShell detection the correlation currently uses the same source IP plus time ordering; the most reliable approach would be hash comparison. Here I have to complain about Suricata's default fileinfo: its output cannot be customised, and once a protocol that supports file extraction is enabled, fileinfo events are emitted for everything. As a result, at high volume the Wazuh engine comes under heavy pressure. I tried writing a custom file-audit event in Lua, but it likewise cannot distinguish protocols, let alone filter on specific HTTP conditions.

  2. Correlation rules gain their reliability from combining multiple underlying security events, so inaccurate low-level events will flood the higher-level rules with false positives; tuning the underlying events is a continuous effort.

  3. When Wazuh v3.11.4 receives large logs over syslog, a memory violation causes the ossec-remoted process to restart; this has been reported to the community and will be fixed in the next release.

References

Requirements

The platform currently ingests Suricata alerts. Because of the mirror source, some rules produce **'false'** alarms, so those rules need to be filtered for specific IP addresses.

Solutions

  1. Modify the Suricata rules. If the volume of **'false'** alarms is large and performance matters, editing the Suricata rules directly is recommended.
  2. Since my Suricata alarms are all 'consumed' by Wazuh, I simply used the **Wazuh CDB list** feature to filter the specified IP addresses.

Steps

1. Create the CDB list

Each key must be unique and is terminated with a colon :.

For IP addresses the dot notation is used for subnet matches:

| key | CIDR | Possible matches |
| --- | --- | --- |
| 192.168.: | 192.168.0.0/16 | 192.168.0.0 - 192.168.255.255 |
| 172.16.19.: | 172.16.19.0/24 | 172.16.19.0 - 172.16.19.255 |
| 10.1.1.1: | 10.1.1.1/32 | 10.1.1.1 |
$ vim /var/ossec/etc/lists/private_ip

10.168.:PrivateNet

Since Wazuh v3.11.3, CDB lists are built and loaded automatically when the analysis engine is started. Therefore, when adding or modifying CDB lists, it is no longer needed to run ossec-makelists, just restart the manager.

Versions prior to 3.11.3 need to run:

$ /var/ossec/bin/ossec-makelists
2. Add the list to ossec.conf
$ vim /var/ossec/etc/ossec.conf

<ossec_config>
<ruleset>
<!-- User-defined CDB -->
<list>etc/lists/private_ip</list>
</ruleset>
</ossec_config>
3. Restart the manager
$ systemctl restart wazuh-manager
4. Configure the rule
<var name="SAME_IP_TIME">120</var>
<var name="SAME_IP_IGORE">300</var>

<group name="local,suricata,ids,">

<rule id="102018" level="8" frequency="5" timeframe="$SAME_IP_TIME" ignore="$SAME_IP_IGORE">
<if_matched_sid>86601</if_matched_sid>
<field name="alert.signature_id">2013057</field>
<list field="src_ip" lookup="not_address_match_key">etc/lists/private_ip</list>
<description>Wazuh Rules - Same ip of attack occurred 5 times within $SAME_IP_TIME seconds. $(src_ip) -> $(alert.signature) -> $(alert.signature_id).</description>
<options>no_full_log</options>
</rule>

</group>
5. Test the rule
$ /var/ossec/bin/ossec-logtest

References