Adding Parameter Enumeration and Periodicity Detection Rules to ElastAlert

Background

Owing to the particularities of AWS traffic mirroring, the current production architecture only ingests HTTP and DNS traffic, which is analyzed and alerted on with Zeek and Suricata. Suricata handles signature-based detection, while Zeek runs scripts for custom event detection, so each tool plays to its strengths. A few days ago a Pingdom alert fired for one of our business endpoints. Analysis showed that several IPs were enumerating one of the endpoint's parameters. Because the values supplied for that parameter were all fairly large, and the backend placed no real limits on it, the server kept grinding through the computation until the endpoint eventually stopped responding.

Requirements

  • Detect parameter-enumeration behavior;
  • Determine whether the access pattern is periodic;
  • Count unique user_agent values;
  • Cross-check against threat intelligence.

Implementation

These requirements are implemented by extending the **ElastAlert** alerting framework with a custom rule type (alert model).

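Before the full rule, here is a minimal, hypothetical sketch of the ElastAlert extension points it relies on. `ExampleRule` and its matching condition are made up purely for illustration: ElastAlert hands each batch of query results to `add_data()`, the rule registers hits with `add_match()`, and `get_match_str()` supplies the text that appears in the alert body.

# A minimal, hypothetical ElastAlert custom rule type showing the hooks that
# Spider.py (below) builds on.
from elastalert.ruletypes import RuleType


class ExampleRule(RuleType):
    def add_data(self, data):
        # `data` is a list of documents returned by the rule's Elasticsearch query
        for event in data:
            if event.get('uri', '').startswith('/admin'):   # made-up condition
                self.add_match(event)

    def get_match_str(self, match):
        # This string is appended to the alert body
        return "Suspicious request: %s" % match.get('uri', '')
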
Parameter Enumeration
  • New rule - Spider.py
import sys
import json
import redis
import html
import datetime
from multiprocessing import Process, JoinableQueue, Lock, Manager

from elastalert.ruletypes import RuleType
from elastalert.util import elastalert_logger

try:
    import pandas as pd
except ImportError:
    print("Please make sure you have pandas installed. pip install pandas")
    sys.exit(0)

try:
    from tqdm import tqdm
except ImportError:
    print("Please make sure you have tqdm module installed. pip install tqdm")
    sys.exit(0)


def conn(host='localhost', port=6379, password=None, db=0):
    pool = redis.ConnectionPool(host=host, port=port, password=password, db=db)
    conn = redis.Redis(connection_pool=pool)
    return conn


def put_data(conn, q, data):
    with conn.pipeline() as pipe:
        for i in data:
            pipe.lpush(q, i)
        pipe.execute()


class SpiderRule(RuleType):
    def __init__(self, rules, args=None):
        super(SpiderRule, self).__init__(rules, args=None)
        self.MAX_ARGS_LENGTH = int(self.rules['beacon']['max_args_length'])
        self.MIN_HITS = int(self.rules['beacon']['min_hits'])
        self.MAX_UNIQUE_ARGS = int(self.rules['beacon']['max_unique_args'])
        self.THRESHOLD_PERCENT = int(self.rules['beacon']['threshold_percent'])
        self.NUM_PROCESSES = int(self.rules['beacon']['threads'])
        self.UA_PROCESSES = int(self.rules['beacon']['user_agent'])

        self.TIMESTAMP = '@timestamp'
        self.FORMAT_TIMESTAMP = self.rules['timestamp'].get('format', None)

        self.beacon_module = self.rules['beacon']['beacon_module']
        self.WINDOW = int(self.rules['beacon']['window'])
        self.MIN_INTERVAL = int(self.rules['beacon']['min_interval'])
        buffer_time = str(self.rules['buffer_time'])
        self.PERIOD = ':'.join(buffer_time.split(':')[:2])

        self.fields = self.normalized_field(self.rules['field'])
        self.src_ip = self.fields['aliases']['src_ip']
        self.url = self.fields['aliases']['url']
        self.url_path = self.fields['aliases']['url_path']
        self.http_host = self.fields['aliases']['http_host']
        self.user_agent = self.fields['aliases']['user_agent']

        self.json = self.rules['output']['json'].get('enable', None)
        self.redis = self.rules['output']['redis'].get('enable', None)

        self.q_job = JoinableQueue()
        self.l_df = Lock()
        self.l_list = Lock()

    def normalized_field(self, d):
        fields = {'hash': [], 'output': [], 'aliases': {}}
        for field, info in d.items():
            alias = info['alias']
            fields['aliases'][alias] = field
            for i in info.get('type', []):
                fields[i].append(field)
        return fields

    def add_data(self, data):
        # Detection of spider crawlers
        self.df = pd.json_normalize(data)
        results = self.find_spiders()

        d = results.to_dict(orient="records")

        # Output to local files
        if self.json:
            json_path = self.rules['output']['json']['path']
            with open(json_path, 'a') as out_file:
                for i in d:
                    out_file.write(json.dumps(i) + '\n')

        # Output to Redis Server
        if self.redis:
            try:
                host = self.rules['output']['redis']['host']
                port = self.rules['output']['redis']['port']
                password = self.rules['output']['redis']['password']
                db = self.rules['output']['redis']['db']
                key = self.rules['output']['redis']['key']
                ioc = self.rules['output']['redis']['field']

                redis_conn = conn(host=host, port=port,
                                  password=password, db=db)
                IoC = results[ioc].unique().tolist()
                put_data(redis_conn, key, IoC)
            except Exception:
                elastalert_logger.error("Output Redis configuration errors.")
        self.add_match(d)

    # The results of get_match_str will appear in the alert text
    def get_match_str(self, match):
        return json.dumps(match)

    def add_match(self, results):
        for result in results:
            super(SpiderRule, self).add_match(result)

    def get_args_hash(self, args, session_id):
        return hash(tuple(args + [session_id]))

    def get_query_str(self, request):
        query = request.split('?')[-1]
        query_str = dict([i.split("=", 1) for i in query.split("&")
                          if i and len(i.split("=", 1)) == 2])
        query_str['args_list'] = list(query_str.keys())
        query_str['max_length'] = len(query_str)
        query_str['url_sample'] = request
        return query_str

    def percent_grouping(self, d, total):
        interval = 0
        # Finding the key with the largest value (interval with most events)
        mx_key = int(max(iter(list(d.keys())), key=(lambda key: d[key])))
        mx_percent = 0.0

        for i in range(mx_key - self.WINDOW, mx_key + 1):
            current = 0
            # Finding center of current window
            curr_interval = i + int(self.WINDOW / 2)

            for j in range(i, i + self.WINDOW):
                if j in d:
                    current += d[j]

            percent = float(current) / total * 100
            if percent > mx_percent:
                mx_percent = percent
                interval = curr_interval

        return interval, mx_percent

    def find_beacon(self, session_data):
        beacon = {}

        if not self.FORMAT_TIMESTAMP:
            session_data[self.TIMESTAMP] = pd.to_datetime(
                session_data[self.TIMESTAMP])
        else:
            session_data[self.TIMESTAMP] = pd.to_datetime(
                session_data[self.TIMESTAMP], format=self.FORMAT_TIMESTAMP)
        session_data[self.TIMESTAMP] = (
            session_data[self.TIMESTAMP].astype(int) / 1000000000).astype(int)

        session_data = session_data.sort_values([self.TIMESTAMP])
        session_data['delta'] = (
            session_data[self.TIMESTAMP] - session_data[self.TIMESTAMP].shift()).fillna(0)
        session_data = session_data[1:]
        d = dict(session_data.delta.value_counts())

        for key in list(d.keys()):
            if key < self.MIN_INTERVAL:
                del d[key]

        # Finding the total number of events
        total = sum(d.values())
        if d and total > self.MIN_HITS:
            window, percent = self.percent_grouping(d, total)
            if percent > self.THRESHOLD_PERCENT and total > self.MIN_HITS:
                beacon = {
                    'percent': int(percent),
                    'interval': int(window),
                }

        return beacon

    def find_spider(self, q_job, spider_list):
        while not q_job.empty():
            session_id = q_job.get()
            self.l_df.acquire()
            session_data = self.df[self.df['session_id'] == session_id]
            self.l_df.release()

            query_str = session_data[self.url].apply(
                lambda req: self.get_query_str(req)).tolist()
            query_data = pd.DataFrame(query_str)

            # get args_hash
            query_data['args_hash'] = query_data['args_list'].apply(
                lambda args: self.get_args_hash(args, session_id))

            for args_hash in query_data['args_hash'].unique():
                result = {
                    "detail": {
                        'percent': {},
                        'unique': {}
                    },
                    "tags": [],
                    "src_ip": session_data[self.src_ip].tolist()[0],
                    "url_path": session_data[self.url_path].tolist()[0],
                    "http_host": session_data[self.http_host].tolist()[0],
                    "unique_ua": session_data[self.user_agent].unique().shape[0],
                    "alert": False,
                }

                df = query_data[query_data['args_hash'] == args_hash]
                count_args_length = df['max_length'].iloc[0]
                if count_args_length > self.MAX_ARGS_LENGTH:
                    continue

                total_hits = df.shape[0]
                if total_hits < self.MIN_HITS:
                    continue

                args_list = df['args_list'].iloc[0]
                for arg in args_list:
                    unique_args = len(df[arg].unique())
                    if unique_args == 1:
                        continue

                    # Calculate the percentage based on the number of changes in the parameters
                    current_percent = int((unique_args / total_hits) * 100)
                    if current_percent < self.THRESHOLD_PERCENT:
                        continue

                    result['detail']['percent'][arg] = current_percent
                    result['detail']['unique'][arg] = unique_args

                # Number of parameters with changes
                count_unique_args = len(result['detail']['unique'])
                if count_unique_args <= self.MAX_UNIQUE_ARGS:
                    result['alert'] = True

                if not result['detail']['unique']:
                    continue

                # Beacon analysis
                if self.beacon_module:
                    result['beacon'] = self.find_beacon(
                        session_data.reset_index(drop=True))

                result['args_list'] = args_list
                result['total_hits'] = total_hits
                result['url_sample'] = df['url_sample'].iloc[0]
                result['period'] = self.PERIOD

                if result['alert']:
                    result['tags'].append('enumerate')

                if result['beacon']:
                    result['tags'].append('beacon')

                if result['unique_ua'] >= self.UA_PROCESSES:
                    result['tags'].append('suspicious-ua')

                self.l_list.acquire()
                spider_list.append(result)
                self.l_list.release()
            q_job.task_done()

    def find_spiders(self):
        if self.df.empty:
            raise Exception(
                "Elasticsearch did not retrieve any data. Please ensure your settings are correct inside the config file.")

        tqdm.pandas(desc="Detection of Spider Crawlers.")

        # get url_path
        self.df[self.url_path] = self.df[self.url].str.split('?').str.get(0)

        # add session_id from hash fields
        self.df['session_id'] = self.df[self.fields['hash']].progress_apply(
            lambda row: hash(tuple(row)), axis=1)
        # keep only requests that carry a query string
        self.df = self.df[self.df[self.url].apply(
            lambda request: len(request.split('?')) == 2)].reset_index(drop=True)
        # normalized url
        self.df[self.url] = self.df[self.url].apply(
            lambda request: html.unescape(request))
        # unique session_id
        unique_session = self.df['session_id'].unique()

        for session in unique_session:
            self.q_job.put(session)

        mgr = Manager()
        spider_list = mgr.list()
        processes = [Process(target=self.find_spider, args=(
            self.q_job, spider_list,)) for thread in range(self.NUM_PROCESSES)]

        # Run processes
        for p in processes:
            p.start()

        # Exit the completed processes
        for p in processes:
            p.join()

        results = pd.DataFrame(list(spider_list))

        # add timestamp
        now = datetime.datetime.now().isoformat()
        results['timestamp'] = now

        if not results.empty:
            results = results[results['alert'] == True]

        match_log = "Queried rule %s matches %s crawl events" % (
            self.rules['name'],
            results.shape[0]
        )
        elastalert_logger.info(match_log)

        return results
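
To make the core heuristic of find_spider easier to follow, here is a standalone sketch with made-up URLs: requests from one session are parsed into query parameters, and for each parameter the ratio of distinct values to total hits is compared against threshold_percent. The real rule additionally groups requests by args_hash and enforces min_hits and max_args_length.

# A trimmed-down, standalone illustration of the change-ratio heuristic.
import pandas as pd

def get_query_str(request):
    # same parsing idea as SpiderRule.get_query_str, reduced for the example
    query = request.split('?')[-1]
    pairs = [kv.split('=', 1) for kv in query.split('&') if kv]
    d = dict(kv for kv in pairs if len(kv) == 2)
    d['args_list'] = list(d.keys())
    return d

# four made-up requests from one "session"
urls = [
    '/image/cookieId.html?cookieid=AAA',
    '/image/cookieId.html?cookieid=BBB',
    '/image/cookieId.html?cookieid=CCC',
    '/image/cookieId.html?cookieid=CCC',
]
df = pd.DataFrame([get_query_str(u) for u in urls])

total_hits = len(df)
unique_args = df['cookieid'].nunique()         # 3 distinct values
percent = int(unique_args / total_hits * 100)  # 75 -> above a 70% threshold
print(total_hits, unique_args, percent)        # 4 3 75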

Configuration
  • Web.yaml
name: "Detection of Spider Crawlers"
es_host: "es_server"
es_port: 9200
type: "elastalert_modules.spider.my_rules.SpiderRule"

index: "zeek-other-%Y.%m.%d"
use_strftime_index: true

filter:
- term:
    host: "canon88.github.io"
- term:
    method.keyword: "GET"

include: ["true_client_ip", "host", "uri", "uri_path", "user_agent"]

timestamp:
  format: false
  timestamp_field: "@timestamp"

buffer_time:
  hours: 12

run_every:
  minutes: 10

max_query_size: 10000
scroll: true

beacon:
  max_args_length: 10      # maximum number of query parameters to inspect
  min_hits: 120            # minimum number of hits
  max_unique_args: 2       # maximum number of changing parameters
  threshold_percent: 70    # request threshold percentage
  threads: 16              # number of worker processes
  beacon_module: true      # enable periodicity (beacon) detection
  min_interval: 1          # minimum interval
  window: 2                # jitter window
  user_agent: 20           # unique User-Agent threshold

field:
  true_client_ip:
    alias: src_ip
    type: [hash]
  host:
    alias: http_host
    type: [hash]
  uri_path:
    alias: url_path
    type: [hash]
  uri:
    alias: url
  user_agent:
    alias: user_agent

output:
  json:
    enable: yes            # write matches to a local file
    path: /var/log/spider/spider_detect.json
  redis:
    enable: no             # push to Redis for correlation with threat intelligence
    host: redis_server
    port: 6379
    db: 0
    password: redis_password
    key: spider:feeds
    field: src_ip

alert:
- debug

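As a note on the `field` section above: SpiderRule.normalized_field turns it into an alias map plus the list of fields that are hashed into a session_id. The helper is reproduced below in isolation, with the same config values, to show the structure it produces.

# normalized_field applied to the `field` section of Web.yaml.
field_config = {
    'true_client_ip': {'alias': 'src_ip', 'type': ['hash']},
    'host':           {'alias': 'http_host', 'type': ['hash']},
    'uri_path':       {'alias': 'url_path', 'type': ['hash']},
    'uri':            {'alias': 'url'},
    'user_agent':     {'alias': 'user_agent'},
}

def normalized_field(d):
    fields = {'hash': [], 'output': [], 'aliases': {}}
    for field, info in d.items():
        fields['aliases'][info['alias']] = field
        for t in info.get('type', []):
            fields[t].append(field)
    return fields

print(normalized_field(field_config))
# {'hash': ['true_client_ip', 'host', 'uri_path'], 'output': [],
#  'aliases': {'src_ip': 'true_client_ip', 'http_host': 'host',
#              'url_path': 'uri_path', 'url': 'uri', 'user_agent': 'user_agent'}}
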
Alert Output
{
  "detail": {
    "percent": {
      "cookieid": 81
    },
    "unique": {
      "cookieid": 133
    }
  },
  "tags": [
    "enumerate",      // parameter enumeration detected
    "suspicious-ua"   // unique user_agent count exceeded the threshold
  ],
  "src_ip": "54.160.169.250",
  "url_path": "/image/cookieId.html",
  "http_host": "canon88.github.io",
  "unique_ua": 47,
  "alert": true,
  "beacon": {},
  "args_list": [
    "cookieid"
  ],
  "total_hits": 164,
  "url_sample": "/image/cookieId.html?cookieid=E99A3E54-5A81-2907-1372-339FFB70A464",
  "period": "1:00",
  "timestamp": "2020-06-02T11:07:59.276581"
}
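
Because `output.json.enable` is set, every match is also appended to the configured file as one JSON object per line, which makes ad-hoc triage straightforward. A minimal sketch, assuming the file exists at the path from Web.yaml:

# Read the local JSON output and surface the noisiest offenders.
import json

suspects = []
with open('/var/log/spider/spider_detect.json') as f:
    for line in f:
        alert = json.loads(line)
        # keep enumeration hits that also rotated an unusual number of User-Agents
        if 'suspicious-ua' in alert.get('tags', []):
            suspects.append((alert['src_ip'], alert['url_path'], alert['total_hits']))

# highest-volume offenders first
for src_ip, url_path, hits in sorted(suspects, key=lambda s: -s[2]):
    print(src_ip, url_path, hits)
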
Notes

find_spider: detects parameter-enumeration behavior; find_beacon is called from it to add a periodicity dimension to the detection. That said, many crawlers deliberately add timing jitter and rotate through crawler pools, so the periodicity signal on its own is not especially strong here.

find_beacon: better suited to detecting C2 connections, for example periodic DNS requests for a domain. Here is an alert from one such detected case of periodic domain requests:

{
  "src_ip": "x.x.x.228",
  "hostname": "entitlement.service.imperva.com",
  "answers": [
    "joxkwsf.x.incapdns.net",
    "45.60.73.51"
  ],
  "percent": "100",
  "interval": 1800,
  "occurrences": 23,
  "timestamp": "2020-06-01T08:03:38.164363",
  "period": 12,
  "event_type": "beaconing",
  "num_hits": 806379,
  "num_matches": 3,
  "kibana_url": "https://canon88.github.io/goto/5f089bcc411426b854da71b9062fdc8c"
}
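
The `percent` and `interval` fields above reflect the same delta logic that find_beacon implements: timestamps are reduced to epoch seconds, deltas between consecutive requests are counted, and if one interval (allowing for the jitter window) accounts for most of the traffic, it is reported as the beacon interval. A toy illustration with made-up timestamps spaced exactly 30 minutes apart:

# Toy illustration of interval extraction from request timestamps.
import pandas as pd

# 24 requests exactly 30 minutes apart (made-up timestamps)
ts = pd.date_range('2020-06-01 00:00:00', periods=24, freq='1800S')
delta = ts.to_series().diff().dropna().dt.total_seconds().astype(int)

counts = delta.value_counts()              # {1800: 23}
total = counts.sum()                       # 23 deltas in total
percent = counts.max() / total * 100       # 100.0 -> a strong periodic signal
print(int(counts.idxmax()), int(percent))  # 1800 100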


Summary

Within one hour, IP 54.160.169.250 hit the endpoint 164 times, changing the cookieid parameter 133 times, which accounts for 81% of all requests, and rotated through 47 different user_agent values.

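The 81% figure is exactly the per-parameter change ratio the rule computes:

# Quick check of the numbers reported above: unique cookieid values / total hits.
unique_args, total_hits = 133, 164
print(int(unique_args / total_hits * 100))   # 81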

References

Flare

ElastAlert