AgentSkillsCN

cisa-kev-nvd

处理并整合 CISA 已知漏洞利用目录(KEV)与 NVD(国家漏洞数据库)数据。适用于在丰富漏洞数据时,添加 KEV 状态信息、获取 CVE 详情、追踪修复期限、构建漏洞情报源,或将威胁情报集成到安全工作流中时使用。

SKILL.md
--- frontmatter
name: cisa-kev-nvd
description: "Process and integrate CISA Known Exploited Vulnerabilities (KEV) catalog and NVD (National Vulnerability Database) data. Use when enriching vulnerability data with KEV status, fetching CVE details, tracking remediation deadlines, building vulnerability feeds, or integrating threat intelligence into security workflows."

CISA KEV & NVD Data Processor

Overview

Fetch, parse, and integrate authoritative vulnerability data from CISA KEV and NIST NVD for enrichment and compliance tracking.

Data Sources

CISA KEV Integration

python
import requests
from datetime import datetime

KEV_JSON_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
KEV_CSV_URL = "https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv"

def fetch_kev_catalog() -> dict:
    """Fetch current CISA KEV catalog."""
    response = requests.get(KEV_JSON_URL)
    response.raise_for_status()
    return response.json()

def parse_kev_catalog(kev_data: dict) -> list:
    """Parse KEV catalog into structured format."""
    vulns = []
    for v in kev_data.get("vulnerabilities", []):
        vulns.append({
            "cve_id": v.get("cveID"),
            "vendor": v.get("vendorProject"),
            "product": v.get("product"),
            "vulnerability_name": v.get("vulnerabilityName"),
            "date_added": v.get("dateAdded"),
            "due_date": v.get("dueDate"),
            "short_description": v.get("shortDescription"),
            "required_action": v.get("requiredAction"),
            "notes": v.get("notes"),
            "known_ransomware": v.get("knownRansomwareCampaignUse", "Unknown")
        })
    return vulns

def build_kev_lookup(kev_vulns: list) -> dict:
    """Build CVE -> KEV info lookup dictionary."""
    return {v["cve_id"]: v for v in kev_vulns}

def check_kev_status(cve_id: str, kev_lookup: dict) -> dict:
    """Check if CVE is in KEV catalog."""
    kev_info = kev_lookup.get(cve_id)
    if kev_info:
        due = datetime.strptime(kev_info["due_date"], "%Y-%m-%d")
        overdue = datetime.now() > due
        return {
            "is_kev": True,
            "due_date": kev_info["due_date"],
            "overdue": overdue,
            "days_until_due": (due - datetime.now()).days if not overdue else 0,
            "required_action": kev_info["required_action"],
            "ransomware_related": kev_info["known_ransomware"] == "Known"
        }
    return {"is_kev": False}

NVD API Integration

python
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def fetch_cve(cve_id: str, api_key: str = None) -> dict:
    """Fetch CVE details from NVD."""
    headers = {"apiKey": api_key} if api_key else {}
    params = {"cveId": cve_id}
    
    response = requests.get(NVD_API_URL, headers=headers, params=params)
    response.raise_for_status()
    
    data = response.json()
    vulns = data.get("vulnerabilities", [])
    return vulns[0]["cve"] if vulns else None

def search_cves(keyword: str = None, cpe_name: str = None, 
                cvss_severity: str = None, pub_start: str = None,
                pub_end: str = None, api_key: str = None,
                results_per_page: int = 100) -> list:
    """Search NVD for CVEs."""
    headers = {"apiKey": api_key} if api_key else {}
    params = {"resultsPerPage": results_per_page}
    
    if keyword:
        params["keywordSearch"] = keyword
    if cpe_name:
        params["cpeName"] = cpe_name
    if cvss_severity:
        params["cvssV3Severity"] = cvss_severity  # LOW, MEDIUM, HIGH, CRITICAL
    if pub_start:
        params["pubStartDate"] = pub_start  # ISO format
    if pub_end:
        params["pubEndDate"] = pub_end
    
    response = requests.get(NVD_API_URL, headers=headers, params=params)
    response.raise_for_status()
    
    return [v["cve"] for v in response.json().get("vulnerabilities", [])]

def parse_nvd_cve(cve_data: dict) -> dict:
    """Parse NVD CVE response into structured format."""
    metrics = cve_data.get("metrics", {})
    
    # Get CVSS v3.1 or v3.0
    cvss_v3 = metrics.get("cvssMetricV31", metrics.get("cvssMetricV30", [{}]))[0] if metrics else {}
    cvss_data = cvss_v3.get("cvssData", {})
    
    descriptions = cve_data.get("descriptions", [])
    en_desc = next((d["value"] for d in descriptions if d["lang"] == "en"), "")
    
    references = [ref["url"] for ref in cve_data.get("references", [])]
    
    return {
        "cve_id": cve_data.get("id"),
        "description": en_desc,
        "published": cve_data.get("published"),
        "modified": cve_data.get("lastModified"),
        "cvss_score": cvss_data.get("baseScore"),
        "cvss_severity": cvss_data.get("baseSeverity"),
        "cvss_vector": cvss_data.get("vectorString"),
        "attack_vector": cvss_data.get("attackVector"),
        "attack_complexity": cvss_data.get("attackComplexity"),
        "privileges_required": cvss_data.get("privilegesRequired"),
        "user_interaction": cvss_data.get("userInteraction"),
        "scope": cvss_data.get("scope"),
        "exploitability_score": cvss_v3.get("exploitabilityScore"),
        "impact_score": cvss_v3.get("impactScore"),
        "references": references,
        "cwe_ids": [w.get("value") for w in cve_data.get("weaknesses", [{}])[0].get("description", []) if w.get("lang") == "en"]
    }

EPSS Integration

python
EPSS_API_URL = "https://api.first.org/data/v1/epss"

def fetch_epss(cve_ids: list) -> dict:
    """Fetch EPSS scores for CVEs."""
    # EPSS API accepts comma-separated CVE IDs
    params = {"cve": ",".join(cve_ids)}
    response = requests.get(EPSS_API_URL, params=params)
    response.raise_for_status()
    
    data = response.json()
    return {
        item["cve"]: {
            "epss_score": float(item["epss"]),
            "percentile": float(item["percentile"]),
            "date": item["date"]
        }
        for item in data.get("data", [])
    }

def get_epss_score(cve_id: str) -> dict:
    """Get EPSS score for single CVE."""
    result = fetch_epss([cve_id])
    return result.get(cve_id, {"epss_score": None, "percentile": None})

Enrichment Pipeline

python
def enrich_vulnerability(vuln: dict, kev_lookup: dict, 
                         nvd_api_key: str = None) -> dict:
    """Enrich vulnerability with KEV, NVD, and EPSS data."""
    cve_id = vuln.get("cve_id")
    if not cve_id:
        return vuln
    
    enriched = vuln.copy()
    
    # KEV status
    kev_status = check_kev_status(cve_id, kev_lookup)
    enriched.update(kev_status)
    
    # NVD details (if not already present)
    if not enriched.get("cvss_score"):
        nvd_data = fetch_cve(cve_id, nvd_api_key)
        if nvd_data:
            parsed = parse_nvd_cve(nvd_data)
            enriched["cvss_score"] = parsed.get("cvss_score")
            enriched["cvss_vector"] = parsed.get("cvss_vector")
            enriched["description"] = parsed.get("description")
    
    # EPSS score
    epss = get_epss_score(cve_id)
    enriched["epss_score"] = epss.get("epss_score")
    enriched["epss_percentile"] = epss.get("percentile")
    
    return enriched

def batch_enrich(vulns: list, kev_lookup: dict = None, 
                 nvd_api_key: str = None) -> list:
    """Batch enrich vulnerabilities."""
    if kev_lookup is None:
        kev_data = fetch_kev_catalog()
        kev_vulns = parse_kev_catalog(kev_data)
        kev_lookup = build_kev_lookup(kev_vulns)
    
    # Batch EPSS lookup
    cve_ids = [v.get("cve_id") for v in vulns if v.get("cve_id")]
    epss_data = fetch_epss(cve_ids) if cve_ids else {}
    
    enriched = []
    for v in vulns:
        ev = v.copy()
        cve_id = v.get("cve_id")
        
        if cve_id:
            ev.update(check_kev_status(cve_id, kev_lookup))
            if cve_id in epss_data:
                ev["epss_score"] = epss_data[cve_id]["epss_score"]
                ev["epss_percentile"] = epss_data[cve_id]["percentile"]
        
        enriched.append(ev)
    
    return enriched

KEV Compliance Tracking

python
def get_overdue_kev(vulns: list) -> list:
    """Get KEV vulnerabilities past their due date."""
    today = datetime.now()
    overdue = []
    for v in vulns:
        if v.get("is_kev") and v.get("due_date"):
            due = datetime.strptime(v["due_date"], "%Y-%m-%d")
            if today > due:
                v["days_overdue"] = (today - due).days
                overdue.append(v)
    return sorted(overdue, key=lambda x: x["days_overdue"], reverse=True)

def get_upcoming_kev(vulns: list, days: int = 14) -> list:
    """Get KEV vulnerabilities due within N days."""
    today = datetime.now()
    upcoming = []
    for v in vulns:
        if v.get("is_kev") and v.get("due_date"):
            due = datetime.strptime(v["due_date"], "%Y-%m-%d")
            days_until = (due - today).days
            if 0 <= days_until <= days:
                v["days_until_due"] = days_until
                upcoming.append(v)
    return sorted(upcoming, key=lambda x: x["days_until_due"])

Rate Limits

APILimitWith API Key
NVD5 req/30s50 req/30s
EPSSNo limitN/A
KEVNo limitN/A

Implement exponential backoff for NVD requests.