Module 10: LLM-Powered Threat Intelligence Automation

Duration: 2 hr lecture + 3 hr lab + 5 hr independent
Lab: Lab 10 (Automated CVE triage and ATLAS enrichment pipeline)
MITRE ATLAS tactics: (defensive use); Reconnaissance AML.TA0002 (adversarial misuse)
Foundational weave: Mitchell Ch 7 (language and context) — LLMs as information processors


10.1 The Dual-Use Frame

Every capability covered in Modules 1-9 has a dual-use structure: the same technique that attacks a system can be used to defend one. Prompt injection can be used to extract data -- or to probe whether a deployed model leaks data in response to injection attempts. HarmBench can be used to jailbreak models -- or to validate that a model meets a safety threshold before deployment.

Module 10 pivots from the attack-focus of Modules 3-9 to a defensive use case: LLM-powered threat intelligence automation. The goal is to build a pipeline that accelerates the defender's workflow -- ingesting vulnerability data, enriching it with semantic analysis, and generating actionable output faster than a human analyst can.

The dual-use connection must be stated explicitly: the same pipeline that helps a defender triage CVEs helps an attacker discover exploitable vulnerabilities in target organizations' technology stacks. The architecture is identical; the deployment intent is different. Understanding both directions of the dual-use frame is the prerequisite for designing pipelines that benefit defenders without handing attackers the same advantage.


10.2 CVE Triage as an LLM Task

A CVE (Common Vulnerabilities and Exposures) entry contains structured metadata (ID, CVSS score, CWE classification, affected products) and unstructured description text (the narrative explaining the vulnerability).

The structured metadata is machine-readable but context-free. The description text is context-rich but not machine-queryable. A human analyst bridges this gap: reading the description, interpreting the technical details, mapping the vulnerability to the organization's specific technology stack, and assessing exploitability in context.

LLMs are well-suited to this bridging task. They can:

  • Extract structured fields from unstructured description text (CWE category, affected component, attack vector)
  • Generate CVSS sub-score justifications from description text
  • Map the vulnerability to an ATLAS technique based on the attack pattern described
  • Filter for relevance to a specific technology stack from a list of affected products

The Mitchell Chapter 7 connection: Chapter 7 covers how language models encode contextual meaning -- the same word carries different meaning in different contexts. A CVE description that says "allows unauthenticated remote code execution" carries high-severity meaning that a model trained on security text is likely to associate with critical CVSS scores. When that association holds, the model's linguistic context matches the analyst's domain context.


10.3 NVD Feed Integration

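The National Vulnerability Database (NVD) provides CVE data as JSON, updated daily, and is the standard intake for vulnerability management tools. Older tooling consumed the legacy 1.1 data feeds (a CVE_Items list, each item with cve, configurations, impact, publishedDate). The code below uses the API v2.0 instead, whose response carries a vulnerabilities list; each entry wraps a cve object with fields such as id, descriptions, metrics, weaknesses, and published.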

import requests, datetime

NVD_BASE = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def fetch_recent_cves(days_back: int = 7, results_per_page: int = 20) -> list[dict]:
    """Fetch CVEs published in the last N days from NVD API v2.0."""
    end   = datetime.datetime.now(datetime.timezone.utc)  # utcnow() is deprecated since Python 3.12
    start = end - datetime.timedelta(days=days_back)
    params = {
        "pubStartDate": start.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "pubEndDate":   end.strftime("%Y-%m-%dT%H:%M:%S.000"),
        "resultsPerPage": results_per_page,
    }
    resp = requests.get(NVD_BASE, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json().get("vulnerabilities", [])

def extract_cve_fields(vuln: dict) -> dict:
    """Extract key fields from NVD API v2.0 vulnerability entry."""
    cve  = vuln["cve"]
    desc = next(
        (d["value"] for d in cve.get("descriptions", []) if d["lang"] == "en"),
        ""
    )
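    metrics = cve.get("metrics", {})
    # Guard against a missing or empty list: not every CVE carries a v3.1 score
    cvss_entries = metrics.get("cvssMetricV31") or [{}]
    cvss_v3 = cvss_entries[0].get("cvssData", {})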
    return {
        "id":           cve["id"],
        "description":  desc,
        "cvss_score":   cvss_v3.get("baseScore"),
        "cvss_vector":  cvss_v3.get("vectorString"),
        "severity":     cvss_v3.get("baseSeverity"),
        "cwes":         [w["description"][0]["value"]
                         for w in cve.get("weaknesses", [])
                         if w.get("description")],
        "published":    cve.get("published"),
    }
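
As written, fetch_recent_cves returns only the first page of results. The API v2.0 response also reports a totalResults count, and requests accept a startIndex offset, so the remaining pages can be walked by computing offsets. The following is a minimal sketch of that arithmetic only; page_offsets is a hypothetical helper name, and it assumes the caller has already read totalResults from the first response.

```python
from typing import Iterator

def page_offsets(total_results: int, results_per_page: int) -> Iterator[int]:
    """Yield the startIndex value for each page needed to cover total_results."""
    if results_per_page <= 0:
        raise ValueError("results_per_page must be positive")
    # range() stops before total_results, so a partial last page is still covered
    yield from range(0, max(total_results, 0), results_per_page)

# Hypothetical numbers: 45 matching CVEs, 20 per page
offsets = list(page_offsets(total_results=45, results_per_page=20))
print(offsets)  # [0, 20, 40]
```

Each offset would be sent as the startIndex parameter alongside the same date range, with requests spaced to respect the rate limits covered in 10.7.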

10.4 LLM Enrichment with JSON Schema Enforcement

The enrichment step sends each CVE description to an LLM and requests structured output. Reliable structured output requires two constraints: a well-formed prompt that specifies the output schema, and output validation that rejects non-conforming responses.

import json, re
from typing import Any

# Illustrative subset; confirm IDs and names against the current ATLAS release before use
ATLAS_TECHNIQUES = {
    "AML.T0020": "Poison Training Data",
    "AML.T0043": "Craft Adversarial Data",
    "AML.T0044": "Full ML Model Access",
    "AML.T0018": "Backdoor ML Model",
    "AML.T0031": "Erode ML Model Integrity",
    "AML.T0065": "LLM-Mediated C2",
    # ... full ATLAS technique list
}

ENRICHMENT_PROMPT = """You are a security analyst. Given the CVE description below, 
produce a JSON object with exactly these fields:
- "attack_vector": one of ["network", "adjacent", "local", "physical"]
- "attack_complexity": one of ["low", "high"]
- "affected_component": short string naming the vulnerable component
- "atlas_technique": the most applicable MITRE ATLAS technique ID from {techniques}
- "atlas_rationale": one sentence explaining the mapping
- "exploitability": one of ["public_exploit", "poc_available", "theoretical", "unknown"]
- "stack_keywords": list of technology stack keywords (e.g. ["nginx", "linux", "python"])

CVE Description:
{description}

Return only the JSON object. No other text."""

def enrich_cve(description: str, llm_client) -> dict[str, Any]:
    prompt = ENRICHMENT_PROMPT.format(
        techniques=list(ATLAS_TECHNIQUES.keys()),
        description=description[:2000],   # truncate; descriptions rarely exceed 500 chars
    )
    response = llm_client.complete(prompt, max_tokens=512, temperature=0.0)
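    # Extract the outermost {...} span even if the model adds prose around it
    match = re.search(r'\{.*\}', response, re.DOTALL)
    if not match:
        raise ValueError(f"No JSON in LLM response: {response[:200]}")
    try:
        return json.loads(match.group())
    except json.JSONDecodeError as exc:
        # Surface malformed JSON as the same error type as missing JSON
        raise ValueError(f"Malformed JSON in LLM response: {exc}") from exc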

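Temperature = 0.0 for structured output. Non-zero temperature introduces randomness into JSON field values; a CVSS vector string or ATLAS ID cannot be "creative." Zero temperature selects the most likely token at each step, which makes output far more repeatable and improves format compliance. It guarantees neither: hosted models can still vary slightly between runs, and a model can still wrap the JSON in prose, so output validation remains necessary.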
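
The second constraint named at the start of this section, output validation, is not shown in enrich_cve. One way it might look is sketched below. The function name validate_enrichment and the field groupings are assumptions for illustration; the allowed values are copied from ENRICHMENT_PROMPT, and the set of valid technique IDs is passed in so the block stands alone.

```python
from typing import Any

# Allowed values mirror the enumerations in ENRICHMENT_PROMPT.
ENUM_FIELDS = {
    "attack_vector": {"network", "adjacent", "local", "physical"},
    "attack_complexity": {"low", "high"},
    "exploitability": {"public_exploit", "poc_available", "theoretical", "unknown"},
}
TEXT_FIELDS = ("affected_component", "atlas_rationale")

def validate_enrichment(obj: Any, valid_techniques: set[str]) -> list[str]:
    """Return a list of schema violations; an empty list means obj conforms."""
    if not isinstance(obj, dict):
        return ["response is not a JSON object"]
    errors: list[str] = []
    for field, allowed in ENUM_FIELDS.items():
        value = obj.get(field)
        if not isinstance(value, str) or value not in allowed:
            errors.append(f"{field}: {value!r} is not one of {sorted(allowed)}")
    for field in TEXT_FIELDS:
        value = obj.get(field)
        if not isinstance(value, str) or not value.strip():
            errors.append(f"{field}: expected a non-empty string")
    if "atlas_technique" not in obj:
        errors.append("atlas_technique: field is missing")
    else:
        technique = obj["atlas_technique"]
        # Assumption: None is tolerated so a prompt may let the model decline to map.
        if technique is not None and (
            not isinstance(technique, str) or technique not in valid_techniques
        ):
            errors.append(f"atlas_technique: {technique!r} is not a known technique ID")
    keywords = obj.get("stack_keywords")
    if not isinstance(keywords, list) or not all(isinstance(k, str) for k in keywords):
        errors.append("stack_keywords: expected a list of strings")
    return errors

# Hypothetical model output with an invented technique ID
sample = {
    "attack_vector": "network",
    "attack_complexity": "low",
    "affected_component": "model upload endpoint",
    "atlas_technique": "AML.T9999",
    "atlas_rationale": "Illustrative only.",
    "exploitability": "unknown",
    "stack_keywords": ["python"],
}
print(validate_enrichment(sample, {"AML.T0020", "AML.T0043"}))
```

Returning a list of violations rather than raising on the first one lets the pipeline log every problem in a response, and the same list could be fed back to the model in a retry prompt.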


10.5 Hallucination Mitigation in Security Contexts

LLM hallucination in a threat intelligence pipeline is not an abstract concern -- it is a concrete failure mode with security consequences. A hallucinated ATLAS technique mapping (e.g., the model assigns AML.T0020 to a network vulnerability that has nothing to do with training data) misleads the analyst. A hallucinated "public_exploit" flag triggers an emergency response for a vulnerability that has no known exploit.

Three mitigations for security-context hallucination:

1. Closed-world enumeration. Enumerate all valid values for categorical fields in the prompt. Listing the valid values explicitly makes an out-of-set answer much less likely, though the model can still emit one, so enumeration should be paired with the verification step below. This works for ATLAS technique IDs, attack vectors, and severity levels -- all of which have finite enumeration.

2. Post-generation verification. After the LLM returns a mapping, verify it against an authoritative source:

def verify_atlas_mapping(technique_id: str) -> bool:
    """Check whether the technique ID exists in the ATLAS database."""
    # ATLAS_TECHNIQUES is the pre-loaded dict of valid technique IDs from 10.4.
    # This confirms the ID exists, not that the mapping itself is appropriate.
    return technique_id in ATLAS_TECHNIQUES

def verify_stack_relevance(stack_keywords: list[str], org_stack: set[str]) -> bool:
    """Check whether at least one keyword matches the org's technology stack."""
    # Compare case-insensitively on both sides
    return bool({k.lower() for k in stack_keywords} & {s.lower() for s in org_stack})

3. Confidence-aware output. Ask the LLM to express uncertainty. A prompt that includes "if you are not confident in the ATLAS mapping, use atlas_technique: null and explain why in atlas_rationale" tends to produce more honest output than a prompt that demands a definitive answer.
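
Mitigations 2 and 3 can be combined into a single routing decision made after enrichment. The sketch below is one possible arrangement, not a prescribed design: route_enrichment and its three return labels are hypothetical, and sending every "public_exploit" claim to an analyst is an assumption motivated by the emergency-response failure mode described at the start of this section.

```python
from typing import Any

def route_enrichment(enriched: dict[str, Any], valid_techniques: set[str]) -> str:
    """Return "accept", "human_review", or "reject" for one enrichment result."""
    technique = enriched.get("atlas_technique")
    if technique is None:
        # The model declined to map (mitigation 3); an analyst reads its rationale.
        return "human_review"
    if not isinstance(technique, str) or technique not in valid_techniques:
        # The ID does not exist in the reference set (mitigation 2).
        return "reject"
    if enriched.get("exploitability") == "public_exploit":
        # Assumption: a high-impact claim is confirmed by a person before escalation.
        return "human_review"
    return "accept"

known = {"AML.T0020", "AML.T0043"}
print(route_enrichment({"atlas_technique": None, "atlas_rationale": "unclear"}, known))
print(route_enrichment({"atlas_technique": "AML.T9999"}, known))
print(route_enrichment({"atlas_technique": "AML.T0043", "exploitability": "theoretical"}, known))
```

Keeping "reject" separate from "human_review" distinguishes output that is structurally wrong from output that is merely uncertain, which matters when measuring how often the model hallucinates.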


10.6 The Adversarial Flip: Attacker Use Cases

The same pipeline capabilities that assist defenders also accelerate attacker reconnaissance:

| Defender use | Adversarial flip |
|---|---|
| Filter CVEs matching org stack | Enumerate target org's vulnerable components from job postings, GitHub, Shodan |
| Generate ATLAS technique mappings | Identify applicable attack techniques for a target's known CVEs |
| Build regression test suite for model safety | Identify gaps in model safety coverage to target |
| Produce weekly threat briefing | Produce targeted spear-phishing content referencing authentic CVE details |

The ATLAS Reconnaissance tactic (AML.TA0002) covers the adversarial use of publicly available information to profile ML-based targets. The same NVD feed that a defender uses for CVE triage is the feed an attacker uses to identify unpatched AI-system dependencies.

The pipeline design implication: a threat intelligence automation system should not expose its technology stack filter configuration externally. An attacker who knows the organization is filtering for "nginx + python + langchain" CVEs has inferred the organization's technology stack.


10.7 Rate Limiting and Cost Management

Production threat intel pipelines hit NVD rate limits and LLM token costs at scale. The NVD API enforces a limit of 5 requests per rolling 30-second window without an API key (50 per 30 seconds with a key). A pipeline that fetches 200 CVEs per day one request at a time, with no throttling, exceeds the unauthenticated limit on its sixth request and will have further requests rejected.

import time, functools
from typing import Callable

def rate_limited(calls_per_period: int, period_seconds: float):
    """Decorator that spaces calls evenly so at most calls_per_period run per period."""
    min_interval = period_seconds / calls_per_period
    last_call_time = None   # monotonic timestamp of the previous call, if any
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            nonlocal last_call_time
            if last_call_time is not None:
                # monotonic() is unaffected by system clock adjustments
                elapsed = time.monotonic() - last_call_time
                if elapsed < min_interval:
                    time.sleep(min_interval - elapsed)
            last_call_time = time.monotonic()
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@rate_limited(calls_per_period=5, period_seconds=30)
def fetch_cve_page(start_index: int) -> dict:
    ...

LLM token costs: enriching 200 CVEs at ~600 tokens per enrichment call (prompt + completion) consumes ~120,000 tokens per day. Through a commercial API that is a recurring per-token charge; on a self-hosted open-weight model it is compute time instead. Either way the cost scales linearly with feed volume. Mitigations: batch multiple CVE descriptions in one prompt (with careful prompt engineering to avoid cross-contamination), and use a smaller model for initial triage and a larger model for high-severity CVEs only.
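
The token arithmetic and the tiered-model mitigation can be sketched together. Everything named here is an assumption for illustration: the model labels are placeholders, the 7.0 cut-off is the lower bound of the CVSS v3 "High" band, and sending unscored CVEs to the larger model is a conservative choice rather than something the module prescribes.

```python
from typing import Optional

def estimate_tokens(cve_count: int, tokens_per_call: int = 600) -> int:
    """Total tokens for one enrichment call per CVE."""
    return cve_count * tokens_per_call

def choose_model(cvss_score: Optional[float], high_threshold: float = 7.0) -> str:
    """Pick a model tier: the larger model only for high-severity or unscored CVEs."""
    if cvss_score is None or cvss_score >= high_threshold:
        return "large-model"   # placeholder name
    return "small-model"       # placeholder name

print(estimate_tokens(200))  # 120000

# Hypothetical day of CVSS scores; None marks a CVE that has not been scored yet
scores = [9.8, 5.3, None, 7.0, 3.1]
print([choose_model(s) for s in scores])
```

Under this split only the entries routed to the larger model incur its higher per-token price, so the saving depends on what fraction of the daily feed is high-severity.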


Reflection prompts

  1. Your CVE enrichment pipeline assigns atlas_technique: AML.T0043 (Craft Adversarial Data) to a CVE that describes a SQL injection in an ML model's data ingestion endpoint. Is this mapping correct? What ATLAS technique would a more careful analyst assign, and how would you fix the prompt to get the correct answer?
  2. An attacker queries your threat briefing endpoint (which is publicly accessible) with org_stack = ["llm-api", "vector-db", "rag"]. What does the attacker learn from the response, and how would you restrict the endpoint to prevent this information disclosure?
  3. Your pipeline uses zero-temperature completion for JSON extraction but finds that 15% of responses still contain non-JSON prose. Propose two prompt engineering changes that would reduce this rate without switching to a larger model.