CrowdStrike Data Parser
Overview
Utility scripts and patterns for parsing CrowdStrike Falcon platform data exports across all modules.
Supported Data Types
| Data Type | Source | Script |
|---|---|---|
| Detections | Falcon Prevent/Insight XDR | parse_detections.py |
| Incidents | Incident Workbench | parse_incidents.py |
| Vulnerabilities | Spotlight | parse_spotlight.py |
| External Assets | EASM | parse_easm.py |
| Host Inventory | Discover | parse_discover.py |
| Query Results | LogScale | parse_logscale.py |
Quick Start
bash
# Parse detection export
python scripts/parse_detections.py detections.json --output detections_normalized.json
# Parse Spotlight vulnerabilities with KEV enrichment
python scripts/parse_spotlight.py spotlight.json --enrich-kev --output vulns.json
# Convert to CSV for reporting
python scripts/parse_detections.py detections.json --format csv --output detections.csv
Detection Parsing
python
# scripts/parse_detections.py
"""
Parse CrowdStrike detection exports into normalized format.
Usage:
python parse_detections.py <input_file> [--format json|csv] [--output <file>]
"""
import json
import csv
import sys
from typing import Generator
from datetime import datetime
def parse_detection_json(file_path: str) -> Generator[dict, None, None]:
    """Yield one normalized detection per record found in a JSON export file.

    The file may contain a bare JSON array, an object that wraps the records
    under ``"resources"``, or a single object (treated as a one-item list).
    """
    with open(file_path, 'r') as handle:
        payload = json.load(handle)

    if isinstance(payload, list):
        records = payload
    else:
        # Wrapped export, or a lone detection object when "resources" is absent.
        records = payload.get("resources", [payload])

    yield from map(normalize_detection, records)
def normalize_detection(detection: dict) -> dict:
    """Normalize a raw detection record into a flat, exporter-friendly dict.

    Args:
        detection: One detection object as loaded from the JSON export. Its
            ``"device"`` and ``"behaviors"`` entries may be missing or null.

    Returns:
        A dict with detection metadata, selected device fields, de-duplicated
        tactics/techniques/MITRE IDs/filenames (in first-seen order), the
        extracted IOCs, every command line, and the untouched behavior list
        under ``"raw_behaviors"``.
    """
    # `or` fallbacks cover keys that are present but explicitly null.
    device = detection.get("device") or {}
    behaviors = detection.get("behaviors") or []

    def unique(field: str) -> list:
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # output is stable between runs (a set would reorder strings whenever
        # hash randomization changes).
        return list(dict.fromkeys(b[field] for b in behaviors if b.get(field)))

    # Collect indicators of compromise: file hashes plus any explicit IOC value.
    iocs = []
    for behavior in behaviors:
        for hash_kind in ("sha256", "md5"):
            if behavior.get(hash_kind):
                iocs.append({"type": hash_kind, "value": behavior[hash_kind]})
        if behavior.get("ioc_value"):
            iocs.append({"type": behavior.get("ioc_type"), "value": behavior["ioc_value"]})

    return {
        "detection_id": detection.get("detection_id"),
        "severity": detection.get("max_severity_displayname"),
        "severity_score": detection.get("max_severity"),
        "status": detection.get("status"),
        "first_behavior": detection.get("first_behavior"),
        "last_behavior": detection.get("last_behavior"),
        "hostname": device.get("hostname"),
        "local_ip": device.get("local_ip"),
        "external_ip": device.get("external_ip"),
        "platform": device.get("platform_name"),
        "os_version": device.get("os_version"),
        "device_id": device.get("device_id"),
        "tactics": unique("tactic"),
        "techniques": unique("technique"),
        "mitre_ids": unique("technique_id"),
        "iocs": iocs,
        "filenames": unique("filename"),
        # Command lines are intentionally NOT de-duplicated.
        "cmdlines": [b.get("cmdline") for b in behaviors if b.get("cmdline")],
        "assigned_to": detection.get("assigned_to_name"),
        "raw_behaviors": behaviors,
    }
def severity_to_score(severity: str) -> int:
    """Map a severity display name to its numeric score; unknown names give 0."""
    score_by_name = {
        "Critical": 100,
        "High": 75,
        "Medium": 50,
        "Low": 25,
        "Informational": 10,
    }
    if severity in score_by_name:
        return score_by_name[severity]
    return 0
def export_csv(detections: list, output_path: str):
    """Write normalized detections to ``output_path`` as a CSV file.

    Only the columns listed below are written; any other keys on a detection
    are ignored. List-valued columns are collapsed into "; "-separated text.
    """
    columns = [
        "detection_id", "severity", "severity_score", "status", "hostname",
        "local_ip", "platform", "first_behavior", "tactics", "techniques",
        "mitre_ids", "filenames"
    ]
    # Columns whose values are lists and must be joined into one cell.
    list_columns = ("tactics", "techniques", "mitre_ids", "filenames")

    with open(output_path, 'w', newline='') as handle:
        writer = csv.DictWriter(handle, fieldnames=columns, extrasaction='ignore')
        writer.writeheader()
        for det in detections:
            # Work on a copy so the caller's detection dict is not modified.
            row = det.copy()
            for column in list_columns:
                row[column] = "; ".join(det.get(column, []))
            writer.writerow(row)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse an export, then write JSON or CSV.
    cli = argparse.ArgumentParser(description="Parse CrowdStrike detections")
    cli.add_argument("input", help="Input JSON file")
    cli.add_argument("--format", choices=["json", "csv"], default="json")
    cli.add_argument("--output", "-o", help="Output file path")
    opts = cli.parse_args()

    parsed = list(parse_detection_json(opts.input))

    if opts.format != "csv":
        # JSON is the default format; fall back to a fixed file name.
        destination = opts.output or "detections_normalized.json"
        with open(destination, 'w') as out_file:
            json.dump(parsed, out_file, indent=2)
    else:
        export_csv(parsed, opts.output or "detections.csv")

    print(f"Parsed {len(parsed)} detections")
Spotlight Vulnerability Parsing
python
# scripts/parse_spotlight.py
"""
Parse CrowdStrike Spotlight vulnerability exports.
Usage:
python parse_spotlight.py <input_file> [--enrich-kev] [--output <file>]
"""
import json
import csv
import requests
from typing import Generator
KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
def parse_spotlight_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized vulnerabilities read from a Spotlight JSON export.

    Accepts a top-level array, an object carrying the records under
    ``"resources"``, or a single object (handled as a one-item list).
    """
    with open(file_path, 'r') as export_file:
        payload = json.load(export_file)

    records = payload
    if not isinstance(payload, list):
        records = payload.get("resources", [payload])

    for record in records:
        yield normalize_vulnerability(record)
def normalize_vulnerability(vuln: dict) -> dict:
    """Normalize one Spotlight vulnerability record into a flat dict.

    Args:
        vuln: A vulnerability object as loaded from the export. The nested
            ``cve``, ``host_info``, ``app``, ``cve.cisa_info`` and
            ``remediation`` objects may each be missing or null.

    Returns:
        A flat dict with CVE, host, application, status and remediation
        fields. Fields absent from the input come back as ``None``, except
        ``is_kev`` (``False``) and ``groups``/``tags`` (empty list).
    """
    # `x.get(key) or {}` rather than `x.get(key, {})`: the default only applies
    # to a missing key, so an explicit null would otherwise reach `.get` as
    # None and raise AttributeError.
    cve = vuln.get("cve") or {}
    host = vuln.get("host_info") or {}
    app = vuln.get("app") or {}
    cisa = cve.get("cisa_info") or {}
    remediation = vuln.get("remediation") or {}

    return {
        "id": vuln.get("id"),
        "cve_id": cve.get("id"),
        "severity": cve.get("severity"),
        "cvss_score": cve.get("base_score"),
        "cvss_vector": cve.get("vector"),
        "exploit_status": cve.get("exploit_status"),
        "is_kev": cisa.get("is_cisa_kev", False),
        "kev_due_date": cisa.get("due_date"),
        "hostname": host.get("hostname"),
        "local_ip": host.get("local_ip"),
        "os_version": host.get("os_version"),
        "groups": host.get("groups", []),
        "tags": host.get("tags", []),
        "agent_version": host.get("agent_version"),
        "product": app.get("product_name_version"),
        "vendor": app.get("vendor"),
        "version": app.get("version"),
        "status": vuln.get("status"),
        "created": vuln.get("created_timestamp"),
        "updated": vuln.get("updated_timestamp"),
        "remediation": remediation.get("action"),
        "suppression": vuln.get("suppression_info"),
    }
def enrich_with_kev(vulns: list, timeout: float = 30.0) -> list:
    """Enrich vulnerabilities in place with data from the KEV catalog at ``KEV_URL``.

    Args:
        vulns: Normalized vulnerability dicts; each is matched on ``"cve_id"``.
        timeout: Seconds to wait for the catalog download before giving up.

    Returns:
        The same ``vulns`` list. Matching entries gain/overwrite ``is_kev``,
        ``kev_due_date``, ``kev_action`` and ``ransomware_related``.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server answers with an HTTP error status.
    """
    # Without a timeout requests can block indefinitely on a stalled
    # connection, which would hang unattended batch runs.
    resp = requests.get(KEV_URL, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of trying to parse an error page as JSON.
    resp.raise_for_status()
    kev_data = resp.json()

    # Index the catalog by CVE ID; skip malformed entries lacking an ID.
    kev_lookup = {
        entry["cveID"]: entry
        for entry in kev_data.get("vulnerabilities", [])
        if entry.get("cveID")
    }

    for vuln in vulns:
        cve_id = vuln.get("cve_id")
        if not cve_id or cve_id not in kev_lookup:
            continue
        kev = kev_lookup[cve_id]
        vuln["is_kev"] = True
        vuln["kev_due_date"] = kev.get("dueDate")
        vuln["kev_action"] = kev.get("requiredAction")
        vuln["ransomware_related"] = kev.get("knownRansomwareCampaignUse") == "Known"

    return vulns
def calculate_risk_score(vuln: dict) -> float:
    """Calculate a composite 0-100 risk score for a normalized vulnerability.

    Components:
        * CVSS base score, scaled so 10.0 contributes 40 points
        * 25 points when ``is_kev`` is truthy
        * 20 points when ``exploit_status`` equals ``"available"``
        * severity bonus: CRITICAL 15, HIGH 10, MEDIUM 5 (case-insensitive)

    Args:
        vuln: Normalized vulnerability dict. ``cvss_score`` and ``severity``
            may be missing or ``None``.

    Returns:
        The summed score, capped at 100.
    """
    cvss = vuln.get("cvss_score") or 0
    score = (cvss / 10) * 40  # CVSS: 40%

    if vuln.get("is_kev"):
        score += 25  # KEV: 25%

    # NOTE(review): compares against the literal string "available"; confirm
    # the export really encodes exploit status this way.
    if vuln.get("exploit_status") == "available":
        score += 20  # Exploit available: 20%

    # The normalized record always carries a "severity" key, so a .get()
    # default never applies; `or ""` guards the None case that would
    # otherwise crash on .upper().
    severity = str(vuln.get("severity") or "").upper()
    score += {"CRITICAL": 15, "HIGH": 10, "MEDIUM": 5}.get(severity, 0)

    return min(score, 100)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse, optionally enrich, score, sort, write.
    cli = argparse.ArgumentParser(description="Parse CrowdStrike Spotlight vulnerabilities")
    cli.add_argument("input", help="Input JSON file")
    cli.add_argument("--enrich-kev", action="store_true", help="Enrich with CISA KEV data")
    cli.add_argument("--output", "-o", help="Output file path")
    opts = cli.parse_args()

    records = list(parse_spotlight_json(opts.input))
    if opts.enrich_kev:
        records = enrich_with_kev(records)

    # Score after enrichment so the KEV flag feeds into the risk score.
    for record in records:
        record["risk_score"] = calculate_risk_score(record)

    # Highest risk first.
    records.sort(key=lambda rec: rec["risk_score"], reverse=True)

    destination = opts.output or "spotlight_normalized.json"
    with open(destination, 'w') as out_file:
        json.dump(records, out_file, indent=2)

    print(f"Parsed {len(records)} vulnerabilities")
    kev_total = len([rec for rec in records if rec.get("is_kev")])
    print(f"KEV vulnerabilities: {kev_total}")
Incident Parsing
python
# scripts/parse_incidents.py
"""Parse CrowdStrike incident exports."""
import json
from typing import Generator
def parse_incident_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized incidents read from a JSON export file.

    Handles a top-level array, an object with the records under
    ``"resources"``, or a single incident object.
    """
    with open(file_path, 'r') as source:
        loaded = json.load(source)

    if isinstance(loaded, list):
        raw_incidents = loaded
    else:
        raw_incidents = loaded.get("resources", [loaded])

    yield from (normalize_incident(raw) for raw in raw_incidents)
def normalize_incident(incident: dict) -> dict:
    """Normalize one incident record into the standard schema.

    Args:
        incident: An incident object as loaded from the export. ``"hosts"``
            may be missing or null.

    Returns:
        A dict of incident metadata. ``hosts`` is reduced to ``device_id``,
        ``hostname`` and ``local_ip`` per host; ``score`` is taken from the
        input's ``fine_score``; missing list fields default to ``[]``.
    """
    # `or []` also covers an explicit null, which would otherwise make the
    # comprehension below raise TypeError.
    hosts = incident.get("hosts") or []

    return {
        "incident_id": incident.get("incident_id"),
        "incident_type": incident.get("incident_type"),
        "state": incident.get("state"),
        "status": incident.get("status"),
        "score": incident.get("fine_score"),
        "name": incident.get("name"),
        "description": incident.get("description"),
        "created": incident.get("created"),
        "start": incident.get("start"),
        "end": incident.get("end"),
        "tags": incident.get("tags", []),
        "assigned_to": incident.get("assigned_to"),
        "hosts": [
            {
                "device_id": host.get("device_id"),
                "hostname": host.get("hostname"),
                "local_ip": host.get("local_ip"),
            }
            for host in hosts
        ],
        "users": incident.get("users", []),
        "objectives": incident.get("objectives", []),
        "tactics": incident.get("tactics", []),
        "techniques": incident.get("techniques", []),
    }
EASM Asset Parsing
python
# scripts/parse_easm.py
"""Parse CrowdStrike EASM (External Attack Surface) exports."""
import json
from typing import Generator
def parse_easm_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized EASM assets read from a JSON export file.

    The export may be a JSON array, an object holding the assets under
    ``"resources"``, or a single asset object.
    """
    with open(file_path, 'r') as fh:
        document = json.load(fh)

    is_array = isinstance(document, list)
    entries = document if is_array else document.get("resources", [document])

    for entry in entries:
        yield normalize_easm_asset(entry)
def normalize_easm_asset(asset: dict) -> dict:
    """Normalize one EASM asset record into the standard schema.

    Args:
        asset: An asset object as loaded from the export. ``"services"`` and
            ``"vulnerabilities"`` may be missing or null.

    Returns:
        A dict of asset metadata. Each service is reduced to
        ``port``/``protocol``/``service``/``version`` (``service`` comes from
        the input's ``service_name``) and each vulnerability to
        ``cve_id``/``severity``.
    """
    # `or []` also covers explicit nulls, which would otherwise make the
    # comprehensions below raise TypeError.
    services = asset.get("services") or []
    vulnerabilities = asset.get("vulnerabilities") or []

    return {
        "id": asset.get("id"),
        "asset_type": asset.get("asset_type"),
        "asset": asset.get("asset"),
        "confidence": asset.get("confidence"),
        "discovery_date": asset.get("discovery_date"),
        "first_seen": asset.get("first_seen"),
        "last_seen": asset.get("last_seen"),
        "sources": asset.get("sources", []),
        "subsidiaries": asset.get("subsidiaries", []),
        "services": [
            {
                "port": svc.get("port"),
                "protocol": svc.get("protocol"),
                "service": svc.get("service_name"),
                "version": svc.get("version"),
            }
            for svc in services
        ],
        "vulnerabilities": [
            {
                "cve_id": item.get("cve_id"),
                "severity": item.get("severity"),
            }
            for item in vulnerabilities
        ],
        "exposures": asset.get("exposures", []),
    }
CSV Export Utility
python
# scripts/export_csv.py
"""Generic CSV export for CrowdStrike data."""
import csv
import json
import sys
def flatten_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict:
    """Collapse a nested dict into a single level with joined key names.

    Nested dict keys are joined to their parent with ``sep``. A list whose
    first element is a dict is stored as a JSON string; any other list becomes
    "; "-separated text. All other values are copied unchanged.
    """
    flat = {}
    for key, value in d.items():
        path = f"{parent_key}{sep}{key}" if parent_key else key

        if isinstance(value, dict):
            flat.update(flatten_dict(value, path, sep))
            continue

        if isinstance(value, list):
            # Only the first element decides how the whole list is rendered.
            holds_dicts = bool(value) and isinstance(value[0], dict)
            flat[path] = json.dumps(value) if holds_dicts else "; ".join(map(str, value))
            continue

        flat[path] = value
    return flat
def json_to_csv(input_file: str, output_file: str):
    """Flatten the records of a JSON file and write them to a CSV file.

    The input may be a JSON array, an object with the records under
    ``"resources"``, or a single object. The header row is the sorted union of
    all flattened keys. Prints a summary line, or a notice when the input
    holds no records (in which case no file is written).
    """
    with open(input_file, 'r') as src:
        payload = json.load(src)

    records = payload if isinstance(payload, list) else payload.get("resources", [payload])
    if not records:
        print("No data to export")
        return

    # Flatten every record, then derive the header from the keys seen anywhere.
    rows = [flatten_dict(record) for record in records]
    columns = sorted({key for row in rows for key in row.keys()})

    with open(output_file, 'w', newline='') as dst:
        writer = csv.DictWriter(dst, fieldnames=columns, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(rows)

    print(f"Exported {len(rows)} records to {output_file}")
if __name__ == "__main__":
    # Expect exactly two positional arguments after the script name.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("Usage: export_csv.py <input.json> <output.csv>")
        sys.exit(1)

    source_path, target_path = cli_args
    json_to_csv(source_path, target_path)
LogScale Query Result Parsing
python
# scripts/parse_logscale.py
"""Parse CrowdStrike LogScale (Humio) query results."""
import json
from typing import Generator
def parse_logscale_results(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized events read from a LogScale query export file.

    Supported layouts:
        * a top-level JSON array of events
        * an object carrying the events under ``"events"``
        * a single event object (treated as a one-item list)
    """
    with open(file_path, 'r') as f:
        data = json.load(f)

    if isinstance(data, dict):
        # A dict without "events" is one event. Wrap it so the loop below does
        # not iterate over the dict's keys and hand strings to the normalizer.
        # A null "events" value yields nothing instead of crashing.
        events = data.get("events", [data]) or []
    else:
        events = data

    for event in events:
        yield normalize_logscale_event(event)
def normalize_logscale_event(event: dict) -> dict:
    """Map a raw LogScale event onto the standard schema.

    The timestamp is taken from ``"@timestamp"``, falling back to
    ``"timestamp"`` when the former is missing or falsy. The original event is
    kept by reference under ``"raw"``.
    """
    # Output field name -> key read from the raw event.
    source_keys = {
        "event_type": "event_simpleName",
        "aid": "aid",
        "hostname": "ComputerName",
        "username": "UserName",
        "process_name": "ImageFileName",
        "command_line": "CommandLine",
        "sha256": "SHA256HashData",
        "parent_process": "ParentImageFileName",
        "remote_ip": "RemoteIP",
        "remote_port": "RemotePort",
        "local_ip": "LocalIP",
    }

    normalized = {"timestamp": event.get("@timestamp") or event.get("timestamp")}
    for field, raw_key in source_keys.items():
        normalized[field] = event.get(raw_key)
    normalized["raw"] = event
    return normalized
Batch Processing
bash
#!/bin/bash
# scripts/batch_process.sh - Process multiple CrowdStrike exports
#
# Usage: batch_process.sh [INPUT_DIR] [OUTPUT_DIR]
#   INPUT_DIR   directory scanned for detections*.json and spotlight*.json (default: .)
#   OUTPUT_DIR  directory that receives the *_normalized.json files (default: ./processed)

# Resolve the parser scripts relative to this file so the batch run works from
# any working directory, not only from inside scripts/.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

INPUT_DIR="${1:-.}"
OUTPUT_DIR="${2:-./processed}"

# Nothing useful can happen without an output directory.
mkdir -p "$OUTPUT_DIR" || exit 1

# Process all detection files
for f in "$INPUT_DIR"/detections*.json; do
    # An unmatched glob stays literal; skip it instead of passing it to Python.
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    python "$SCRIPT_DIR/parse_detections.py" "$f" -o "$OUTPUT_DIR/${base}_normalized.json"
done

# Process all Spotlight files
for f in "$INPUT_DIR"/spotlight*.json; do
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    python "$SCRIPT_DIR/parse_spotlight.py" "$f" --enrich-kev -o "$OUTPUT_DIR/${base}_normalized.json"
done

echo "Processing complete. Output in $OUTPUT_DIR"