Module: 10 — LLM-Powered Threat Intelligence Automation
Points: 20
Time estimate: 3 hr lab + 5 hr independent
Deliverable: lab-10-report.md + lab10/ directory
Objectives
- Build a pipeline that fetches CVE data from NVD and extracts structured fields.
- Use an LLM to generate ATLAS technique mappings for each CVE.
- Add a verification step that checks the LLM's mappings against the ATLAS database.
- Generate a weekly threat briefing for a fictional organization's technology stack.
Setup
# Install the two third-party packages the lab scripts import.
pip install anthropic requests
# -p keeps this step re-runnable: plain mkdir fails when lab10 already exists,
# and the && chain would then skip the cd.
mkdir -p lab10 && cd lab10
# ATLAS technique database (subset for this lab)
# The keys of this file are the only IDs the enrichment prompt offers and the
# only IDs verify.py accepts as valid.
# NOTE(review): confirm these ID/name pairs against the current MITRE ATLAS
# matrix before reusing the table outside this lab.
cat > atlas_techniques.json << 'EOF'
{
"AML.T0020": "Poison Training Data",
"AML.T0043": "Craft Adversarial Data",
"AML.T0044": "Full ML Model Access",
"AML.T0045": "Upload ML Model",
"AML.T0048": "Backdoor ML Model",
"AML.T0058": "Erode ML Model Integrity",
"AML.T0065": "LLM-Mediated Command and Control",
"AML.T0015": "Evade ML Model",
"AML.T0005": "Develop Capabilities",
"AML.T0031": "Obtain Capabilities",
"AML.T0051": "LLM Prompt Injection",
"AML.T0054": "LLM Jailbreak"
}
EOF
Part A: Fetch and Parse NVD CVE Data (45 min)
Write lab10/nvd_fetch.py:
import requests, json, time, datetime
from pathlib import Path
NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"
def fetch_recent_cves(
    days_back: int = 7,
    max_results: int = 20,
    *,
    pause_seconds: float = 6.0,
) -> list[dict]:
    """Fetch CVEs from NVD published in the last N days.

    Args:
        days_back: Width of the publication window, counted back from the
            current UTC time.
        max_results: Sent as ``resultsPerPage``; only this single page is
            requested, no pagination is performed.
        pause_seconds: Delay applied before the request. The default spaces
            calls one per 6 s, which is what "5 requests per 30 s without an
            API key" works out to.

    Returns:
        The ``vulnerabilities`` list from the JSON response, or ``[]`` when
        the response has no such key.

    Raises:
        requests.HTTPError: If NVD answers with an error status.
        requests.RequestException: On connection failure or timeout (30 s).
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
    # renders to exactly the same timestamp strings below.
    end = datetime.datetime.now(datetime.timezone.utc)
    start = end - datetime.timedelta(days=days_back)

    stamp = "%Y-%m-%dT%H:%M:%S.000"
    params = {
        "pubStartDate": start.strftime(stamp),
        "pubEndDate": end.strftime(stamp),
        "resultsPerPage": max_results,
    }

    # respect NVD rate limit (5 req/30s without API key) -> one request per 6 s
    time.sleep(pause_seconds)
    resp = requests.get(NVD_BASE, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("vulnerabilities", [])
def extract_fields(vuln: dict) -> dict:
    """Flatten one NVD ``vulnerabilities`` entry into the fields the lab uses.

    Args:
        vuln: One element of the NVD response's ``vulnerabilities`` list;
            must contain a ``"cve"`` object with an ``"id"``.

    Returns:
        A dict with keys ``id``, ``description`` (English, at most 400
        characters), ``cvss_score``, ``severity``, ``cwes`` (at most 3),
        ``products`` (at most 5, de-duplicated, first-seen order) and
        ``published`` (``YYYY-MM-DD`` prefix of the timestamp).
        ``cvss_score`` / ``severity`` are ``None`` when no CVSS v3.x/v4.0
        metric is present.

    Raises:
        KeyError: If ``vuln`` has no ``"cve"`` key or the CVE has no ``"id"``.
    """
    cve = vuln["cve"]

    description = next(
        (
            d.get("value", "")
            for d in cve.get("descriptions", [])
            if d.get("lang") == "en"
        ),
        "",
    )

    # Prefer v3.1 as before, then fall back to v3.0 / v4.0 entries, which are
    # read through the same baseScore / baseSeverity keys. `or []` also covers
    # a key that is present but holds an empty list (previously IndexError).
    metrics = cve.get("metrics", {})
    cvss = {}
    for metric_key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV40"):
        entries = metrics.get(metric_key) or []
        if entries:
            cvss = entries[0].get("cvssData", {})
            break

    cwes = [
        w["description"][0]["value"]
        for w in cve.get("weaknesses", [])
        if w.get("description")
    ]

    products = []
    for config in cve.get("configurations", []):
        # Entries normally wrap their matches in "nodes"; fall back to the
        # entry itself when that wrapper is absent.
        for node in config.get("nodes", []) or [config]:
            for cpe_match in node.get("cpeMatch", []):
                parts = cpe_match.get("criteria", "").split(":")
                # cpe:2.3:<part>:<vendor>:<product>:<version>:...
                # index 3 is the vendor; the product name is index 4.
                if len(parts) > 4 and parts[4]:
                    products.append(parts[4])

    return {
        "id": cve["id"],
        "description": description[:400],
        "cvss_score": cvss.get("baseScore"),
        "severity": cvss.get("baseSeverity"),
        "cwes": cwes[:3],
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # output is stable from run to run (set ordering is not).
        "products": list(dict.fromkeys(products))[:5],
        "published": cve.get("published", "")[:10],
    }
if __name__ == "__main__":
    # Pull a two-week window, flatten every entry, and persist the result
    # for the later lab parts to read.
    raw_entries = fetch_recent_cves(days_back=14, max_results=15)
    records = []
    for entry in raw_entries:
        records.append(extract_fields(entry))

    serialized = json.dumps(records, indent=2)
    Path("cves_raw.json").write_text(serialized)

    print(f"Fetched {len(records)} CVEs")
    # Show a short preview so the student can eyeball the data.
    for rec in records[:3]:
        preview = rec['description'][:80]
        print(f" {rec['id']} [{rec['severity']}] {preview}...")
Run it and confirm cves_raw.json is populated with at least 5 CVEs.
If NVD is rate-limited or unavailable, use the provided fixture instead (run this from the directory that contains lab10/):
cp /opt/virtus-academy/lab-fixtures/ai-201/lab10-cves.json lab10/cves_raw.json
Part B: LLM ATLAS Enrichment (45 min)
Write lab10/enrich.py:
import anthropic, json, re
from pathlib import Path
# API client shared by every enrich() call.
# NOTE(review): constructed with no arguments, so credentials presumably come
# from the environment -- confirm before running.
client = anthropic.Anthropic()
# ATLAS technique table (ID -> name), read relative to the current directory.
atlas = json.loads(Path("atlas_techniques.json").read_text())
# CVE records to enrich, read relative to the current directory.
cves = json.loads(Path("cves_raw.json").read_text())
# Prompt template filled in by enrich() through str.format(). The only
# placeholders are {techniques}, {cve_id} and {description}; any literal brace
# added to this text would have to be doubled ({{ / }}) to survive format().
PROMPT = """You are a security analyst specializing in AI/ML system attacks.
Given the CVE description below, produce a JSON object with exactly these fields:
- "attack_vector": one of ["network", "adjacent", "local", "physical", "unknown"]
- "affected_component": short string naming the vulnerable component (e.g. "nginx web server", "Python pickle loader")
- "atlas_technique": one of these ATLAS technique IDs: {techniques}
Use null if none applies.
- "atlas_rationale": one sentence explaining the mapping (or "N/A -- not an ML-system vulnerability")
- "exploitability": one of ["public_exploit", "poc_available", "theoretical", "unknown"]
- "stack_keywords": list of 2-5 technology keywords (e.g. ["python", "flask", "langchain"])
CVE: {cve_id}
Description: {description}
Return ONLY the JSON object."""
# The fields the prompt asks the model to produce. Anything else in the reply
# is dropped so model output can never overwrite NVD-sourced fields (id,
# cvss_score, ...) or inject an "error" key that downstream steps act on.
_LLM_FIELDS = (
    "attack_vector",
    "affected_component",
    "atlas_technique",
    "atlas_rationale",
    "exploitability",
    "stack_keywords",
)


def parse_enrichment(cve: dict, text: str) -> dict:
    """Merge the model's JSON reply into a CVE record.

    Args:
        cve: The CVE record that was sent to the model.
        text: Raw text of the model's reply.

    Returns:
        A copy of ``cve`` extended with the requested model fields found in
        the reply. When the reply contains no ``{...}`` span or that span is
        not valid JSON, a copy of ``cve`` extended with ``"error"`` and
        ``"raw"`` (first 200 characters of the offending text) instead, so a
        failed record still says which CVE it belongs to.
    """
    # Greedy match from the first "{" to the last "}" tolerates prose or code
    # fences around the object.
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if not match:
        return {**cve, "error": "no JSON", "raw": text[:200]}
    try:
        reply = json.loads(match.group())
    except json.JSONDecodeError as e:
        return {**cve, "error": str(e), "raw": match.group()[:200]}
    llm_fields = {key: reply[key] for key in _LLM_FIELDS if key in reply}
    return {**cve, **llm_fields}


def enrich(cve: dict) -> dict:
    """Ask the model for an ATLAS mapping and stack keywords for one CVE.

    Args:
        cve: A CVE record with at least ``"id"`` and ``"description"``.

    Returns:
        The record produced by :func:`parse_enrichment` for the model's reply.
    """
    prompt = PROMPT.format(
        techniques=list(atlas.keys()),
        cve_id=cve["id"],
        description=cve["description"],
    )
    resp = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=400,
        temperature=0.0,
        messages=[{"role": "user", "content": prompt}],
    )
    # Join the text of every content block instead of indexing [0]: an empty
    # content list then becomes a "no JSON" error record rather than an
    # IndexError that aborts the whole run.
    text = "".join(getattr(block, "text", "") or "" for block in resp.content)
    return parse_enrichment(cve, text)
# Enrich every CVE in input order, echoing one progress line per record.
enriched = []
for cve in cves:
    result = enrich(cve)
    enriched.append(result)
    if "error" in result:
        # A failed enrichment is not the same as "no technique applies";
        # report it as such instead of printing "null".
        status = f"ERROR ({result['error']})"
    else:
        status = result.get("atlas_technique", "null")
    print(f" {cve['id']}: {status}")
# Persist all records (including error records) for verify.py and briefing.py.
Path("cves_enriched.json").write_text(json.dumps(enriched, indent=2))
print(f"\nEnriched {len(enriched)} CVEs -> cves_enriched.json")
Part C: Verification Against ATLAS Database (30 min)
Write lab10/verify.py:
import json
from pathlib import Path
# Org technology stack (fictional)
ORG_STACK = {"python", "flask", "langchain", "postgresql", "nginx", "ubuntu", "redis"}


def _stack_keywords(record: dict) -> set:
    """Return the record's stack keywords lower-cased, ignoring malformed values."""
    raw = record.get("stack_keywords")
    if isinstance(raw, str):
        raw = [raw]
    elif not isinstance(raw, (list, tuple, set)):
        # The model may emit null or some other non-list value here.
        raw = []
    return {k.lower() for k in raw if isinstance(k, str)}


def classify_mapping(cve: dict, atlas: dict, org_stack: set) -> tuple[str, str, bool]:
    """Check one enriched CVE's ATLAS mapping against the technique table.

    Args:
        cve: An enriched CVE record (not an error record).
        atlas: Technique table mapping ATLAS ID -> technique name.
        org_stack: Lower-case technology keywords of the organization.

    Returns:
        ``(bucket, status, stack_match)`` where ``bucket`` is the stats key to
        increment (``"null"``, ``"invalid_id"`` or ``"atlas_mapped"``),
        ``status`` is the text to print, and ``stack_match`` says whether any
        of the record's keywords is in ``org_stack``.
    """
    tech_id = cve.get("atlas_technique")
    stack_match = bool(_stack_keywords(cve) & org_stack)

    if tech_id is None:
        return "null", "NOT-ML-SYSTEM", stack_match
    # A non-string value (e.g. a list of IDs) can never be a key of the table
    # and would raise TypeError in the membership test, so reject it first.
    if not isinstance(tech_id, str) or tech_id not in atlas:
        return "invalid_id", f"INVALID-ID ({tech_id})", stack_match
    return "atlas_mapped", f"VALID ({tech_id}: {atlas[tech_id]})", stack_match


def main() -> None:
    """Print a per-CVE verification line and a summary of the counts."""
    atlas = json.loads(Path("atlas_techniques.json").read_text())
    enriched = json.loads(Path("cves_enriched.json").read_text())
    stats = {"atlas_mapped": 0, "invalid_id": 0, "null": 0, "stack_relevant": 0, "errors": 0}

    print("=== ATLAS Mapping Verification ===\n")
    for cve in enriched:
        # Records whose enrichment failed are only counted, never classified.
        if "error" in cve:
            stats["errors"] += 1
            continue
        bucket, status, stack_match = classify_mapping(cve, atlas, ORG_STACK)
        stats[bucket] += 1
        if stack_match:
            stats["stack_relevant"] += 1
        flag = " [STACK-MATCH]" if stack_match else ""
        print(f" {cve['id']}: {status}{flag}")

    print("\nSummary:")
    for k, v in stats.items():
        print(f" {k}: {v}")


if __name__ == "__main__":
    main()
In lab-10-report.md:
- How many CVEs received a valid ATLAS mapping? How many got null?
- Were any mapping IDs invalid (hallucinated)? If so, which CVE and what did the model return?
- How many CVEs matched your organization's technology stack?
- For one CVE with a valid ATLAS mapping, explain whether the mapping is correct given the CVE description.
Part D: Generate a Weekly Threat Briefing (45 min)
Write lab10/briefing.py:
import anthropic, json
from pathlib import Path
# Org technology stack (fictional); keywords are compared lower-cased.
ORG_STACK = {"python", "flask", "langchain", "postgresql", "nginx", "ubuntu", "redis"}


def _stack_keywords(record: dict) -> set:
    """Return the record's stack keywords lower-cased, ignoring malformed values."""
    raw = record.get("stack_keywords")
    if isinstance(raw, str):
        raw = [raw]
    elif not isinstance(raw, (list, tuple, set)):
        # The model may emit null or some other non-list value here.
        raw = []
    return {k.lower() for k in raw if isinstance(k, str)}


def _score(record: dict) -> float:
    """Return the record's CVSS score, or 0 when it is missing or not numeric."""
    score = record.get("cvss_score")
    return score if isinstance(score, (int, float)) else 0


def select_relevant(enriched: list, org_stack: set, min_cvss: float = 6.0) -> list:
    """Pick the CVEs worth briefing on, most severe first.

    Args:
        enriched: Records read from ``cves_enriched.json``.
        org_stack: Lower-case technology keywords of the organization.
        min_cvss: Minimum CVSS score (inclusive) for a CVE to be kept.

    Returns:
        Records without an ``"error"``, with at least one stack keyword in
        ``org_stack`` and a score of at least ``min_cvss``, sorted by score
        descending (ties keep input order) so that truncating the list keeps
        the most critical CVEs rather than whichever came first.
    """
    hits = [
        c for c in enriched
        if not c.get("error")
        and _stack_keywords(c) & org_stack
        and _score(c) >= min_cvss
    ]
    return sorted(hits, key=_score, reverse=True)


def summary_line(c: dict, atlas: dict) -> str:
    """Format one CVE as a bullet for the briefing prompt.

    The ATLAS ID is included only when it is a key of ``atlas``; an ID the
    table does not contain is shown as ``N/A`` so that an unverified
    (possibly hallucinated) technique never reaches the briefing.
    """
    tech_id = c.get("atlas_technique")
    verified = tech_id if isinstance(tech_id, str) and tech_id in atlas else "N/A"
    return (
        f"- {c['id']} (CVSS {c.get('cvss_score','N/A')}): {c['description'][:150]}"
        f" [ATLAS: {verified}]"
    )


def main() -> None:
    """Generate ``weekly_briefing.md`` from the enriched CVE records."""
    client = anthropic.Anthropic()
    enriched = json.loads(Path("cves_enriched.json").read_text())
    atlas = json.loads(Path("atlas_techniques.json").read_text())

    # Filter to stack-relevant CVEs with CVSS >= 6.0. A valid ATLAS mapping is
    # not required to be included; it only controls whether the technique ID
    # is cited in the summary line.
    relevant = select_relevant(enriched, ORG_STACK)
    if not relevant:
        print("No stack-relevant high-severity CVEs this week.")
        return

    summary_data = [summary_line(c, atlas) for c in relevant[:5]]
    briefing_prompt = f"""You are a security analyst writing a weekly threat briefing.
The following CVEs from the past 7 days match our organization's technology stack
(Python, Flask, LangChain, PostgreSQL, nginx, Ubuntu, Redis).
CVEs:
{chr(10).join(summary_data)}
Write a 200-300 word threat briefing addressed to a non-technical CISO.
Include: the most critical CVE and why, the recommended immediate action,
and one forward-looking concern about AI/ML pipeline security.
Do not reproduce CVE description text verbatim."""
    resp = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=500,
        temperature=0.3,
        messages=[{"role": "user", "content": briefing_prompt}],
    )
    briefing_text = resp.content[0].text
    # Model text routinely contains non-ASCII punctuation; an explicit
    # encoding keeps the write from failing on platforms whose default
    # encoding is not UTF-8.
    Path("weekly_briefing.md").write_text(
        f"# Weekly Threat Briefing\n\n{briefing_text}", encoding="utf-8"
    )
    print(briefing_text)


if __name__ == "__main__":
    main()
In lab-10-report.md:
- Paste the first paragraph of your generated briefing.
- Evaluate the briefing against two criteria: (a) is the most critical CVE correctly identified, and (b) is the recommended action specific and actionable?
- Identify one way an attacker could use the same pipeline against your organization. What information do the pipeline's query parameters reveal about the technology stack?
Grading
| Component | Points |
|---|---|
| Part A: NVD fetch successful; cves_raw.json present with 5+ CVEs | 4 |
| Part B: enrichment runs; cves_enriched.json present | 5 |
| Part C: verification output; hallucination check answered; stack analysis | 6 |
| Part D: briefing generated; dual-use attack identified | 5 |