Week: 6-7 -- NSM at Scale + Threat Hunting
Points: 20
Time estimate: 90 min lab + 4 hr independent
Deliverable: lab-7-report.md + alert screenshots + SIEM correlation rule
Objectives
- Deploy a Suricata cluster and Zeek sensor against the academy NSM corpus.
- Ship EVE JSON and Zeek logs to a SIEM (Wazuh or Elasticsearch).
- Detect three named threat scenarios using the Bejtlich four-data-types framework.
- Write a SIEM correlation rule that fires when a C2 beaconing pattern is confirmed by both Suricata and Zeek data.
Setup
Ensure the academy NSM corpus is present (four pcap files):
ls ~/academy-corpus/
# Expected:
# normal-traffic-24h.pcap
# intrusion-lateral.pcap
# c2-beacon.pcap
# exfiltration-dns.pcap
Start the SIEM:
# Elasticsearch single-node
docker run -d --name elasticsearch \
  -e discovery.type=single-node \
  -e xpack.security.enabled=false \
  -p 9200:9200 \
  docker.elastic.co/elasticsearch/elasticsearch:8.12.0
# Kibana
docker run -d --name kibana \
  -e ELASTICSEARCH_HOSTS=http://elasticsearch:9200 \
  --link elasticsearch \
  -p 5601:5601 \
  docker.elastic.co/kibana/kibana:8.12.0
# Wait about 2 minutes, then verify (expect "yellow" or "green")
curl -s http://localhost:9200/_cluster/health | python3 -m json.tool | grep status
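Instead of waiting a fixed two minutes, the health check can be polled until the node is ready. The following is a minimal sketch using only the standard library; the function names, the retry count, and the delay are illustrative assumptions, not part of the lab tooling.

```python
import json
import time
import urllib.error
import urllib.request


def fetch_health(url="http://localhost:9200/_cluster/health"):
    """Return the parsed cluster-health document, or None if ES is not answering yet."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return json.load(resp)
    except (urllib.error.URLError, OSError, ValueError):
        return None


def wait_for_cluster(fetch=fetch_health, attempts=30, delay=5.0, sleep=time.sleep):
    """Poll until the cluster reports yellow or green; return that status, else None."""
    for _ in range(attempts):
        health = fetch()
        if health and health.get("status") in ("yellow", "green"):
            return health["status"]
        sleep(delay)
    return None
```

Calling `wait_for_cluster()` with the defaults polls the local node for up to roughly 2.5 minutes. The `fetch` and `sleep` parameters exist only so the loop can be exercised without a running container.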
Part A: Suricata Analysis of NSM Corpus (30 min)
Run Suricata offline against each corpus pcap:
# Update rules first: refresh the source index, enable ET Open, then fetch the rules
sudo suricata-update update-sources
sudo suricata-update enable-source et/open
sudo suricata-update
# Suricata does not create the directory passed to -l, so create all three first
mkdir -p /tmp/suricata-c2 /tmp/suricata-intrusion /tmp/suricata-dns
# Run against C2 beaconing corpus (-r already selects offline pcap mode)
sudo suricata -r ~/academy-corpus/c2-beacon.pcap \
  -l /tmp/suricata-c2/
# Run against intrusion lateral movement corpus
sudo suricata -r ~/academy-corpus/intrusion-lateral.pcap \
  -l /tmp/suricata-intrusion/
# Run against DNS exfiltration corpus
sudo suricata -r ~/academy-corpus/exfiltration-dns.pcap \
  -l /tmp/suricata-dns/
Extract alert summaries:
# Alert summary from C2 corpus
cat /tmp/suricata-c2/eve.json | python3 -c "
import sys, json
for line in sys.stdin:
    try:
        e = json.loads(line)
        if e.get('event_type') == 'alert':
            print(e['alert']['signature'], '|', e.get('src_ip'), '->', e.get('dest_ip'))
    except (ValueError, KeyError, AttributeError):
        pass  # skip malformed lines and alerts without a signature
" | sort | uniq -c | sort -rn | head -20
Record: what alert signatures fire on each corpus pcap?
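The report asks for the top five signatures per corpus pcap, so the same extraction can be run over all three output directories at once. This is a rough sketch that assumes the `/tmp/suricata-*` directories from the commands above; `top_signatures` and `summarize` are illustrative helper names.

```python
import json
from collections import Counter
from pathlib import Path


def top_signatures(eve_path, n=5):
    """Return the n most frequent alert signatures in one eve.json file."""
    counts = Counter()
    with open(eve_path, encoding="utf-8") as fh:
        for line in fh:
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip truncated or non-JSON lines
            if isinstance(event, dict) and event.get("event_type") == "alert":
                signature = event.get("alert", {}).get("signature", "<no signature>")
                counts[signature] += 1
    return counts.most_common(n)


def summarize(log_dirs, n=5):
    """Map each log directory to its top-n signature list (missing files give [])."""
    summary = {}
    for log_dir in log_dirs:
        eve = Path(log_dir) / "eve.json"
        summary[str(log_dir)] = top_signatures(eve, n) if eve.exists() else []
    return summary
```

For example, `summarize(["/tmp/suricata-c2", "/tmp/suricata-intrusion", "/tmp/suricata-dns"])` returns one list of `(signature, count)` pairs per corpus, ready to paste into the alert summary table.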
Part B: Zeek Analysis (30 min)
Run Zeek against each corpus file and analyze logs:
# C2 beaconing analysis
zeek -r ~/academy-corpus/c2-beacon.pcap \
LogAscii::use_json=T
# Identify beaconing pattern in conn.log
cat conn.log | python3 -c "
import sys, json, collections

# Group connection start times by (src, dst, port)
groups = collections.defaultdict(list)
for line in sys.stdin:
    try:
        e = json.loads(line)
        key = (e['id.orig_h'], e['id.resp_h'], e['id.resp_p'])
        groups[key].append(float(e['ts']))
    except (ValueError, KeyError, TypeError):
        pass  # skip malformed or incomplete records

# Flag groups with at least 10 connections and low interval jitter
for key, times in groups.items():
    if len(times) < 10:
        continue
    times = sorted(times)
    intervals = [times[i+1] - times[i] for i in range(len(times) - 1)]
    mean_interval = sum(intervals) / len(intervals)
    std_dev = (sum((x - mean_interval) ** 2 for x in intervals) / len(intervals)) ** 0.5
    jitter_pct = (std_dev / mean_interval * 100) if mean_interval > 0 else 0
    if jitter_pct < 10:  # low jitter = periodic = potential beacon
        print(f'{key}: {len(times)} connections, interval={mean_interval:.0f}s, jitter={jitter_pct:.1f}%')
"
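To see why the 10% jitter threshold separates beacons from ordinary traffic, the same metric can be tried on synthetic timestamps. The sketch below uses made-up data (a 60-second check-in with a few seconds of wobble, and an irregular browsing-like series); it is an illustration of the calculation, not output from the corpus.

```python
import statistics


def jitter_pct(timestamps):
    """Return (mean interval in seconds, jitter as a percentage of the mean)."""
    times = sorted(timestamps)
    intervals = [b - a for a, b in zip(times, times[1:])]
    if not intervals:
        return 0.0, 0.0
    mean = statistics.fmean(intervals)
    if mean <= 0:
        return mean, 0.0
    # Population standard deviation, matching the conn.log script
    return mean, statistics.pstdev(intervals) / mean * 100


# Synthetic beacon: check-in every 60 s, each timestamp shifted by +/- 2 s.
beacon = [60 * i + (2 if i % 2 else -2) for i in range(12)]
# Synthetic browsing: irregular gaps between connections.
browsing = [0, 3, 4, 90, 95, 400, 410, 411, 900, 2000, 2001, 2600]

for name, series in (("beacon", beacon), ("browsing", browsing)):
    mean, jitter = jitter_pct(series)
    print(f"{name}: interval={mean:.0f}s, jitter={jitter:.1f}%")
```

The beacon series stays under the 10% threshold while the browsing series lands far above it. Malware that deliberately randomizes its sleep interval can exceed the threshold, so treat the value as a tunable starting point.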
# DNS exfiltration analysis (each zeek -r run overwrites the logs in the current directory)
zeek -r ~/academy-corpus/exfiltration-dns.pcap LogAscii::use_json=T
# Entropy analysis of DNS query names
cat dns.log | python3 -c "
import sys, json, math, collections

def entropy(s):
    # Shannon entropy in bits per character
    if not s:
        return 0
    freq = collections.Counter(s)
    return -sum((f / len(s)) * math.log2(f / len(s)) for f in freq.values())

for line in sys.stdin:
    try:
        q = json.loads(line).get('query', '')
    except (ValueError, AttributeError):
        continue  # skip malformed lines
    h = entropy(q)
    if h > 3.5:
        print(f'High-entropy query: {q} (entropy={h:.2f})')
" | head -20
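A few hand-checkable values help when interpreting the 3.5-bit threshold. Entropy can never exceed log2 of the number of distinct characters, so a query needs at least 12 distinct characters before it can score above 3.5. The sketch below repeats the entropy function and adds a hypothetical `leftmost_label` helper, on the assumption that encoded payload tends to sit in the first DNS label.

```python
import math
from collections import Counter


def entropy(s):
    """Shannon entropy of a string, in bits per character."""
    if not s:
        return 0.0
    n = len(s)
    return -sum((c / n) * math.log2(c / n) for c in Counter(s).values())


def leftmost_label(query):
    """Return the first DNS label of a query name (hypothetical helper)."""
    return query.split(".", 1)[0]


# Hand-checkable cases: two equally likely symbols give 1 bit, four give 2 bits.
print(entropy("aabb"))  # 1.0
print(entropy("abcd"))  # 2.0

# Upper bound for a string with k distinct characters is log2(k).
print(math.log2(11) < 3.5 < math.log2(12))  # True
```

Scoring `leftmost_label(q)` instead of the whole query name is one possible refinement, since the registered domain and TLD shift the score of the full name.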
Part C: SIEM Correlation Rule (30 min)
Write a Wazuh/Elasticsearch correlation rule that fires only when BOTH conditions hold for the same source IP:
- Suricata raises a beacon-related alert for that source IP, AND
- Zeek conn.log shows periodic connections (interval jitter < 10%) from that source IP.
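Before encoding the rule in the SIEM, the two-condition logic can be prototyped offline as a set intersection. This is a sketch under several assumptions: the `MALWARE` keyword stands in for whatever beacon-related signatures fire in Part A, Zeek timestamps are epoch floats (the JSON default), and all function names are illustrative.

```python
import json
import statistics
from collections import defaultdict


def _json_events(lines):
    """Yield parsed JSON objects, skipping lines that do not parse."""
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict):
            yield event


def suricata_alert_sources(eve_lines, keyword="MALWARE"):
    """Source IPs with at least one alert whose signature contains keyword."""
    return {
        e["src_ip"]
        for e in _json_events(eve_lines)
        if e.get("event_type") == "alert"
        and keyword in e.get("alert", {}).get("signature", "")
        and e.get("src_ip")
    }


def zeek_periodic_sources(conn_lines, max_jitter_pct=10.0, min_conns=10):
    """Source IPs with a low-jitter connection series to one (dst, port)."""
    groups = defaultdict(list)
    for e in _json_events(conn_lines):
        if "ts" in e:
            key = (e.get("id.orig_h"), e.get("id.resp_h"), e.get("id.resp_p"))
            groups[key].append(e["ts"])
    sources = set()
    for (src, _dst, _port), times in groups.items():
        if len(times) < min_conns:
            continue
        times.sort()
        intervals = [b - a for a, b in zip(times, times[1:])]
        mean = statistics.fmean(intervals)
        if mean > 0 and statistics.pstdev(intervals) / mean * 100 < max_jitter_pct:
            sources.add(src)
    return sources


def correlated_beacons(eve_lines, conn_lines):
    """Source IPs confirmed by both Suricata alerts and Zeek periodicity."""
    return suricata_alert_sources(eve_lines) & zeek_periodic_sources(conn_lines)
```

With both logs on disk, `correlated_beacons(open("/tmp/suricata-c2/eve.json"), open("conn.log"))` gives the set of hosts the SIEM rule should also report, which is a useful cross-check for the deliverable.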
Elasticsearch Watcher rule example. It covers only the Suricata condition, so extend it with the Zeek condition. Watcher is not part of the free Basic license; activate a trial license first:
{
  "trigger": {"schedule": {"interval": "5m"}},
  "input": {
    "search": {
      "request": {
        "indices": ["suricata-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                {"term": {"event.kind": "alert"}},
                {"match": {"rule.name": "MALWARE"}},
                {"range": {"@timestamp": {"gte": "now-1h"}}}
              ]
            }
          },
          "aggs": {
            "beacon_sources": {
              "terms": {"field": "source.ip", "size": 100}
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {"ctx.payload.hits.total": {"gt": 0}}
  },
  "actions": {
    "log_beacon_alert": {
      "logging": {
        "text": "C2 BEACON DETECTED: {{ctx.payload.aggregations.beacon_sources.buckets}}"
      }
    }
  }
}
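Verify that the rule fires when c2-beacon.pcap is replayed against the live Suricata instance. Events read offline with -r keep the capture's original timestamps, so they can fall outside the rule's now-1h window.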
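If no log shipper is configured yet, alerts can be pushed into a `suricata-*` index by hand through the Elasticsearch `_bulk` API. The sketch below only builds the NDJSON payload. The index name `suricata-lab` and the field mapping (chosen to match the `event.kind`, `rule.name`, and `source.ip` fields queried by the example rule) are assumptions; a real shipper such as Filebeat produces a much richer document. Timestamps are copied through unchanged, and the index is assumed to accept Suricata's timestamp format.

```python
import json


def eve_alert_to_doc(event):
    """Map one EVE alert event to the fields the example rule queries."""
    return {
        "@timestamp": event.get("timestamp"),
        "event": {"kind": "alert"},
        "rule": {"name": event.get("alert", {}).get("signature")},
        "source": {"ip": event.get("src_ip")},
        "destination": {"ip": event.get("dest_ip")},
    }


def bulk_body(eve_lines, index="suricata-lab"):
    """Build an NDJSON payload for the _bulk API (action line, then document)."""
    out = []
    for line in eve_lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not valid JSON
        if isinstance(event, dict) and event.get("event_type") == "alert":
            out.append(json.dumps({"index": {"_index": index}}))
            out.append(json.dumps(eve_alert_to_doc(event)))
    # The bulk API requires the payload to end with a newline.
    return "\n".join(out) + "\n" if out else ""
```

The resulting string can be sent with `POST http://localhost:9200/_bulk` and the header `Content-Type: application/x-ndjson`, for example via `curl --data-binary`.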
Lab Report
Create lab-7-report.md with:
- Alert summary table: top 5 signatures per corpus pcap (Suricata output)
- Beaconing detection output from conn.log analysis (the periodic-connection candidates)
- DNS exfiltration: top 5 high-entropy queries with entropy scores
- Screenshot or JSON output of the SIEM correlation rule firing
- Bejtlich four-data-types mapping: for the C2 beaconing scenario, identify which specific data from each of the four types (full-content, session, statistical, alert) contributed to detection
Grading
| Component | Points |
|---|---|
| Suricata: alert signatures identified for all 3 corpus pcaps | 5 |
| Zeek: beaconing pattern identified with interval + jitter | 5 |
| Zeek: DNS exfiltration high-entropy queries identified | 4 |
| SIEM correlation rule: defined and fires on corpus | 4 |
| Four-data-types mapping: all four types mapped to evidence | 2 |
| Total | 20 |