CISA KEV & NVD Data Processor
Overview
Fetch, parse, and integrate authoritative vulnerability data from CISA KEV and NIST NVD for enrichment and compliance tracking.
Data Sources
| Source | URL | Update Frequency |
|---|---|---|
| CISA KEV | https://www.cisa.gov/known-exploited-vulnerabilities-catalog | Daily |
| NVD CVE | https://services.nvd.nist.gov/rest/json/cves/2.0 | Continuous |
| NVD CPE | https://services.nvd.nist.gov/rest/json/cpes/2.0 | Weekly |
| EPSS | https://api.first.org/data/v1/epss | Daily |
CISA KEV Integration
python
import requests
from datetime import datetime
KEV_JSON_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
KEV_CSV_URL = "https://www.cisa.gov/sites/default/files/csv/known_exploited_vulnerabilities.csv"
def fetch_kev_catalog() -> dict:
"""Fetch current CISA KEV catalog."""
response = requests.get(KEV_JSON_URL)
response.raise_for_status()
return response.json()
def parse_kev_catalog(kev_data: dict) -> list:
"""Parse KEV catalog into structured format."""
vulns = []
for v in kev_data.get("vulnerabilities", []):
vulns.append({
"cve_id": v.get("cveID"),
"vendor": v.get("vendorProject"),
"product": v.get("product"),
"vulnerability_name": v.get("vulnerabilityName"),
"date_added": v.get("dateAdded"),
"due_date": v.get("dueDate"),
"short_description": v.get("shortDescription"),
"required_action": v.get("requiredAction"),
"notes": v.get("notes"),
"known_ransomware": v.get("knownRansomwareCampaignUse", "Unknown")
})
return vulns
def build_kev_lookup(kev_vulns: list) -> dict:
"""Build CVE -> KEV info lookup dictionary."""
return {v["cve_id"]: v for v in kev_vulns}
def check_kev_status(cve_id: str, kev_lookup: dict) -> dict:
"""Check if CVE is in KEV catalog."""
kev_info = kev_lookup.get(cve_id)
if kev_info:
due = datetime.strptime(kev_info["due_date"], "%Y-%m-%d")
overdue = datetime.now() > due
return {
"is_kev": True,
"due_date": kev_info["due_date"],
"overdue": overdue,
"days_until_due": (due - datetime.now()).days if not overdue else 0,
"required_action": kev_info["required_action"],
"ransomware_related": kev_info["known_ransomware"] == "Known"
}
return {"is_kev": False}
NVD API Integration
python
NVD_API_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
def fetch_cve(cve_id: str, api_key: str = None) -> dict:
"""Fetch CVE details from NVD."""
headers = {"apiKey": api_key} if api_key else {}
params = {"cveId": cve_id}
response = requests.get(NVD_API_URL, headers=headers, params=params)
response.raise_for_status()
data = response.json()
vulns = data.get("vulnerabilities", [])
return vulns[0]["cve"] if vulns else None
def search_cves(keyword: str = None, cpe_name: str = None,
cvss_severity: str = None, pub_start: str = None,
pub_end: str = None, api_key: str = None,
results_per_page: int = 100) -> list:
"""Search NVD for CVEs."""
headers = {"apiKey": api_key} if api_key else {}
params = {"resultsPerPage": results_per_page}
if keyword:
params["keywordSearch"] = keyword
if cpe_name:
params["cpeName"] = cpe_name
if cvss_severity:
params["cvssV3Severity"] = cvss_severity # LOW, MEDIUM, HIGH, CRITICAL
if pub_start:
params["pubStartDate"] = pub_start # ISO format
if pub_end:
params["pubEndDate"] = pub_end
response = requests.get(NVD_API_URL, headers=headers, params=params)
response.raise_for_status()
return [v["cve"] for v in response.json().get("vulnerabilities", [])]
def parse_nvd_cve(cve_data: dict) -> dict:
"""Parse NVD CVE response into structured format."""
metrics = cve_data.get("metrics", {})
# Get CVSS v3.1 or v3.0
cvss_v3 = metrics.get("cvssMetricV31", metrics.get("cvssMetricV30", [{}]))[0] if metrics else {}
cvss_data = cvss_v3.get("cvssData", {})
descriptions = cve_data.get("descriptions", [])
en_desc = next((d["value"] for d in descriptions if d["lang"] == "en"), "")
references = [ref["url"] for ref in cve_data.get("references", [])]
return {
"cve_id": cve_data.get("id"),
"description": en_desc,
"published": cve_data.get("published"),
"modified": cve_data.get("lastModified"),
"cvss_score": cvss_data.get("baseScore"),
"cvss_severity": cvss_data.get("baseSeverity"),
"cvss_vector": cvss_data.get("vectorString"),
"attack_vector": cvss_data.get("attackVector"),
"attack_complexity": cvss_data.get("attackComplexity"),
"privileges_required": cvss_data.get("privilegesRequired"),
"user_interaction": cvss_data.get("userInteraction"),
"scope": cvss_data.get("scope"),
"exploitability_score": cvss_v3.get("exploitabilityScore"),
"impact_score": cvss_v3.get("impactScore"),
"references": references,
"cwe_ids": [w.get("value") for w in cve_data.get("weaknesses", [{}])[0].get("description", []) if w.get("lang") == "en"]
}
EPSS Integration
python
EPSS_API_URL = "https://api.first.org/data/v1/epss"
def fetch_epss(cve_ids: list) -> dict:
"""Fetch EPSS scores for CVEs."""
# EPSS API accepts comma-separated CVE IDs
params = {"cve": ",".join(cve_ids)}
response = requests.get(EPSS_API_URL, params=params)
response.raise_for_status()
data = response.json()
return {
item["cve"]: {
"epss_score": float(item["epss"]),
"percentile": float(item["percentile"]),
"date": item["date"]
}
for item in data.get("data", [])
}
def get_epss_score(cve_id: str) -> dict:
"""Get EPSS score for single CVE."""
result = fetch_epss([cve_id])
return result.get(cve_id, {"epss_score": None, "percentile": None})
Enrichment Pipeline
python
def enrich_vulnerability(vuln: dict, kev_lookup: dict,
nvd_api_key: str = None) -> dict:
"""Enrich vulnerability with KEV, NVD, and EPSS data."""
cve_id = vuln.get("cve_id")
if not cve_id:
return vuln
enriched = vuln.copy()
# KEV status
kev_status = check_kev_status(cve_id, kev_lookup)
enriched.update(kev_status)
# NVD details (if not already present)
if not enriched.get("cvss_score"):
nvd_data = fetch_cve(cve_id, nvd_api_key)
if nvd_data:
parsed = parse_nvd_cve(nvd_data)
enriched["cvss_score"] = parsed.get("cvss_score")
enriched["cvss_vector"] = parsed.get("cvss_vector")
enriched["description"] = parsed.get("description")
# EPSS score
epss = get_epss_score(cve_id)
enriched["epss_score"] = epss.get("epss_score")
enriched["epss_percentile"] = epss.get("percentile")
return enriched
def batch_enrich(vulns: list, kev_lookup: dict = None,
nvd_api_key: str = None) -> list:
"""Batch enrich vulnerabilities."""
if kev_lookup is None:
kev_data = fetch_kev_catalog()
kev_vulns = parse_kev_catalog(kev_data)
kev_lookup = build_kev_lookup(kev_vulns)
# Batch EPSS lookup
cve_ids = [v.get("cve_id") for v in vulns if v.get("cve_id")]
epss_data = fetch_epss(cve_ids) if cve_ids else {}
enriched = []
for v in vulns:
ev = v.copy()
cve_id = v.get("cve_id")
if cve_id:
ev.update(check_kev_status(cve_id, kev_lookup))
if cve_id in epss_data:
ev["epss_score"] = epss_data[cve_id]["epss_score"]
ev["epss_percentile"] = epss_data[cve_id]["percentile"]
enriched.append(ev)
return enriched
KEV Compliance Tracking
python
def get_overdue_kev(vulns: list) -> list:
"""Get KEV vulnerabilities past their due date."""
today = datetime.now()
overdue = []
for v in vulns:
if v.get("is_kev") and v.get("due_date"):
due = datetime.strptime(v["due_date"], "%Y-%m-%d")
if today > due:
v["days_overdue"] = (today - due).days
overdue.append(v)
return sorted(overdue, key=lambda x: x["days_overdue"], reverse=True)
def get_upcoming_kev(vulns: list, days: int = 14) -> list:
"""Get KEV vulnerabilities due within N days."""
today = datetime.now()
upcoming = []
for v in vulns:
if v.get("is_kev") and v.get("due_date"):
due = datetime.strptime(v["due_date"], "%Y-%m-%d")
days_until = (due - today).days
if 0 <= days_until <= days:
v["days_until_due"] = days_until
upcoming.append(v)
return sorted(upcoming, key=lambda x: x["days_until_due"])
Rate Limits
| API | Limit | With API Key |
|---|---|---|
| NVD | 5 req/30s | 50 req/30s |
| EPSS | No limit | N/A |
| KEV | No limit | N/A |
Implement exponential backoff for NVD requests.