Classroom Glossary Public page

Lab 10: Automated CVE Triage and ATLAS Enrichment Pipeline

295 words

Module: 10 — LLM-Powered Threat Intelligence Automation
Points: 20
Time estimate: 3 hr lab + 5 hr independent
Deliverable: lab-10-report.md + lab10/ directory


Objectives

  1. Build a pipeline that fetches CVE data from NVD and extracts structured fields.
  2. Use an LLM to generate ATLAS technique mappings for each CVE.
  3. Add a verification step that checks the LLM's mappings against the ATLAS database.
  4. Generate a weekly threat briefing for a fictional organization's technology stack.

Setup

pip install anthropic requests
mkdir lab10 && cd lab10

# ATLAS technique database (subset for this lab)
cat > atlas_techniques.json << 'EOF'
{
  "AML.T0020": "Poison Training Data",
  "AML.T0043": "Craft Adversarial Data",
  "AML.T0044": "Full ML Model Access",
  "AML.T0045": "Upload ML Model",
  "AML.T0048": "Backdoor ML Model",
  "AML.T0058": "Erode ML Model Integrity",
  "AML.T0065": "LLM-Mediated Command and Control",
  "AML.T0015": "Evade ML Model",
  "AML.T0005": "Develop Capabilities",
  "AML.T0031": "Obtain Capabilities",
  "AML.T0051": "LLM Prompt Injection",
  "AML.T0054": "LLM Jailbreak"
}
EOF

Part A: Fetch and Parse NVD CVE Data (45 min)

Write lab10/nvd_fetch.py:

import requests, json, time, datetime
from pathlib import Path

NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def fetch_recent_cves(days_back: int = 7, max_results: int = 20) -> list[dict]:
    """Fetch CVEs from NVD published in the last N days."""
    end   = datetime.datetime.utcnow()
    start = end - datetime.timedelta(days=days_back)
    params = {
        "pubStartDate":   start.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "pubEndDate":     end.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "resultsPerPage": max_results,
    }
    time.sleep(0.6)   # respect NVD rate limit (5 req/30s without API key)
    resp = requests.get(NVD_BASE, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("vulnerabilities", [])

def extract_fields(vuln: dict) -> dict:
    cve  = vuln["cve"]
    desc = next(
        (d["value"] for d in cve.get("descriptions", []) if d["lang"] == "en"), ""
    )
    metrics  = cve.get("metrics", {})
    cvss3    = metrics.get("cvssMetricV31", [{}])[0].get("cvssData", {})
    cwes     = [
        w["description"][0]["value"]
        for w in cve.get("weaknesses", [])
        if w.get("description")
    ]
    products = [
        f"{c.get('criteria', '')}".split(":")[3] if ":" in c.get("criteria", "") else ""
        for node in vuln.get("cve", {}).get("configurations", [])
        for match in node.get("nodes", []) or [node]
        for c in match.get("cpeMatch", [])
    ]
    return {
        "id":          cve["id"],
        "description": desc[:400],
        "cvss_score":  cvss3.get("baseScore"),
        "severity":    cvss3.get("baseSeverity"),
        "cwes":        cwes[:3],
        "products":    list(set(p for p in products if p))[:5],
        "published":   cve.get("published", "")[:10],
    }

if __name__ == "__main__":
    cves = fetch_recent_cves(days_back=14, max_results=15)
    parsed = [extract_fields(v) for v in cves]
    Path("cves_raw.json").write_text(json.dumps(parsed, indent=2))
    print(f"Fetched {len(parsed)} CVEs")
    for c in parsed[:3]:
        print(f"  {c['id']} [{c['severity']}] {c['description'][:80]}...")

Run it and confirm cves_raw.json is populated with at least 5 CVEs.

If NVD is rate-limited or unavailable: use the provided fixture:

cp /opt/virtus-academy/lab-fixtures/ai-201/lab10-cves.json lab10/cves_raw.json

Part B: LLM ATLAS Enrichment (45 min)

Write lab10/enrich.py:

import anthropic, json, re
from pathlib import Path

client = anthropic.Anthropic()
atlas = json.loads(Path("atlas_techniques.json").read_text())
cves  = json.loads(Path("cves_raw.json").read_text())

PROMPT = """You are a security analyst specializing in AI/ML system attacks.
Given the CVE description below, produce a JSON object with exactly these fields:

- "attack_vector": one of ["network", "adjacent", "local", "physical", "unknown"]
- "affected_component": short string naming the vulnerable component (e.g. "nginx web server", "Python pickle loader")
- "atlas_technique": one of these ATLAS technique IDs: {techniques}
  Use null if none applies.
- "atlas_rationale": one sentence explaining the mapping (or "N/A -- not an ML-system vulnerability")
- "exploitability": one of ["public_exploit", "poc_available", "theoretical", "unknown"]
- "stack_keywords": list of 2-5 technology keywords (e.g. ["python", "flask", "langchain"])

CVE: {cve_id}
Description: {description}

Return ONLY the JSON object."""

def enrich(cve: dict) -> dict:
    prompt = PROMPT.format(
        techniques=list(atlas.keys()),
        cve_id=cve["id"],
        description=cve["description"],
    )
    resp = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=400,
        temperature=0.0,
        messages=[{"role": "user", "content": prompt}],
    )
    text = resp.content[0].text
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if not match:
        return {"error": "no JSON", "raw": text[:200]}
    try:
        return {**cve, **json.loads(match.group())}
    except json.JSONDecodeError as e:
        return {"error": str(e), "raw": match.group()[:200]}

enriched = []
for cve in cves:
    result = enrich(cve)
    enriched.append(result)
    status = result.get("atlas_technique", "null")
    print(f"  {cve['id']}: {status}")

Path("cves_enriched.json").write_text(json.dumps(enriched, indent=2))
print(f"\nEnriched {len(enriched)} CVEs -> cves_enriched.json")

Part C: Verification Against ATLAS Database (30 min)

Write lab10/verify.py:

import json
from pathlib import Path

atlas    = json.loads(Path("atlas_techniques.json").read_text())
enriched = json.loads(Path("cves_enriched.json").read_text())

# Org technology stack (fictional)
ORG_STACK = {"python", "flask", "langchain", "postgresql", "nginx", "ubuntu", "redis"}

stats = {"atlas_mapped": 0, "invalid_id": 0, "null": 0, "stack_relevant": 0, "errors": 0}

print("=== ATLAS Mapping Verification ===\n")
for cve in enriched:
    if "error" in cve:
        stats["errors"] += 1
        continue

    tech_id = cve.get("atlas_technique")
    keywords = set(k.lower() for k in cve.get("stack_keywords", []))
    stack_match = bool(keywords & ORG_STACK)

    if tech_id is None:
        stats["null"] += 1
        status = "NOT-ML-SYSTEM"
    elif tech_id not in atlas:
        stats["invalid_id"] += 1
        status = f"INVALID-ID ({tech_id})"
    else:
        stats["atlas_mapped"] += 1
        status = f"VALID ({tech_id}: {atlas[tech_id]})"

    if stack_match:
        stats["stack_relevant"] += 1
        flag = " [STACK-MATCH]"
    else:
        flag = ""

    print(f"  {cve['id']}: {status}{flag}")

print(f"\nSummary:")
for k, v in stats.items():
    print(f"  {k}: {v}")

In lab-10-report.md:

  1. How many CVEs received a valid ATLAS mapping? How many got null?
  2. Were any mapping IDs invalid (hallucinated)? If so, which CVE and what did the model return?
  3. How many CVEs matched your organization's technology stack?
  4. For one CVE with a valid ATLAS mapping, explain whether the mapping is correct given the CVE description.

Part D: Generate a Weekly Threat Briefing (45 min)

Write lab10/briefing.py:

import anthropic, json
from pathlib import Path

client   = anthropic.Anthropic()
enriched = json.loads(Path("cves_enriched.json").read_text())
atlas    = json.loads(Path("atlas_techniques.json").read_text())

# Filter to stack-relevant, valid-mapped CVEs
ORG_STACK = {"python", "flask", "langchain", "postgresql", "nginx", "ubuntu", "redis"}

relevant = [
    c for c in enriched
    if not c.get("error")
    and bool({k.lower() for k in c.get("stack_keywords", [])} & ORG_STACK)
    and (c.get("cvss_score") or 0) >= 6.0
]

if not relevant:
    print("No stack-relevant high-severity CVEs this week.")
else:
    summary_data = [
        f"- {c['id']} (CVSS {c.get('cvss_score','N/A')}): {c['description'][:150]}"
        f" [ATLAS: {c.get('atlas_technique', 'N/A')}]"
        for c in relevant[:5]
    ]

    briefing_prompt = f"""You are a security analyst writing a weekly threat briefing.
The following CVEs from the past 7 days match our organization's technology stack
(Python, Flask, LangChain, PostgreSQL, nginx, Ubuntu, Redis).

CVEs:
{chr(10).join(summary_data)}

Write a 200-300 word threat briefing addressed to a non-technical CISO.
Include: the most critical CVE and why, the recommended immediate action,
and one forward-looking concern about AI/ML pipeline security.
Do not reproduce CVE description text verbatim."""

    resp = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=500,
        temperature=0.3,
        messages=[{"role": "user", "content": briefing_prompt}],
    )
    briefing_text = resp.content[0].text
    Path("weekly_briefing.md").write_text(f"# Weekly Threat Briefing\n\n{briefing_text}")
    print(briefing_text)

In lab-10-report.md:

  1. Paste the first paragraph of your generated briefing.
  2. Evaluate the briefing against two criteria: (a) is the most critical CVE correctly identified, and (b) is the recommended action specific and actionable?
  3. Identify one way an attacker could use the same pipeline against your organization. What information does the pipeline's query parameters reveal about the technology stack?

Grading

Component Points
Part A: NVD fetch successful; cves_raw.json present with 5+ CVEs 4
Part B: enrichment runs; cves_enriched.json present 5
Part C: verification output; hallucination check answered; stack analysis 6
Part D: briefing generated; dual-use attack identified 5