AgentSkillsCN

crowdstrike-parser

解析并转换 CrowdStrike Falcon 的数据导出,包括检测记录、事件、Spotlight 漏洞、EASM 资产,以及 LogScale 查询结果。适用于在处理 CrowdStrike JSON/CSV 导出数据、基于 Falcon 数据生成报告、规范检测数据,或将 CrowdStrike 输出集成到安全工作流中时使用。

SKILL.md
--- frontmatter
name: crowdstrike-parser
description: "Parse and transform CrowdStrike Falcon data exports including detections, incidents, Spotlight vulnerabilities, EASM assets, and LogScale query results. Use when processing CrowdStrike JSON/CSV exports, building reports from Falcon data, normalizing detection data, or integrating CrowdStrike outputs into security workflows."

CrowdStrike Data Parser

Overview

Utility scripts and patterns for parsing CrowdStrike Falcon platform data exports across all modules.

Supported Data Types

| Data Type | Source | Script |
| --- | --- | --- |
| Detections | Falcon Prevent/Insight XDR | parse_detections.py |
| Incidents | Incident Workbench | parse_incidents.py |
| Vulnerabilities | Spotlight | parse_spotlight.py |
| External Assets | EASM | parse_easm.py |
| Host Inventory | Discover | parse_discover.py |
| Query Results | LogScale | parse_logscale.py |

Quick Start

bash
# Parse detection export
python scripts/parse_detections.py detections.json --output detections_normalized.json

# Parse Spotlight vulnerabilities with KEV enrichment
python scripts/parse_spotlight.py spotlight.json --enrich-kev --output vulns.json

# Convert to CSV for reporting
python scripts/parse_detections.py detections.json --format csv --output detections.csv

Detection Parsing

python
# scripts/parse_detections.py
"""
Parse CrowdStrike detection exports into normalized format.

Usage:
    python parse_detections.py <input_file> [--format json|csv] [--output <file>]
"""

import json
import csv
import sys
from typing import Generator
from datetime import datetime

def parse_detection_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized detections from a CrowdStrike detection JSON export.

    Three top-level shapes are accepted: a bare JSON array of detections, an
    envelope object carrying the detections under ``resources``, or a single
    detection object (treated as a one-element list).

    Args:
        file_path: Path to the JSON export file.

    Yields:
        One dict per detection, as returned by ``normalize_detection``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly instead of relying on the locale default; "utf-8-sig"
    # also accepts files that start with a UTF-8 byte-order mark.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    if isinstance(data, list):
        detections = data
    else:
        # An envelope with "resources": null carries no records.
        detections = data.get("resources", [data]) or []

    for det in detections:
        yield normalize_detection(det)

def normalize_detection(detection: dict) -> dict:
    """Normalize a raw detection record to the flat reporting schema.

    Device attributes are lifted from the nested ``device`` object, and the
    per-behavior fields (tactics, techniques, technique IDs, file names,
    command lines, hashes and IOC values) are aggregated across ``behaviors``.

    Args:
        detection: One detection object from the export.

    Returns:
        A dict with the normalized fields. Aggregated lists of distinct values
        keep first-seen order so repeated runs produce identical output. The
        original behavior list is kept under ``raw_behaviors``.
    """
    # Exports may contain explicit nulls; treat them like missing keys.
    device = detection.get("device") or {}
    behaviors = detection.get("behaviors") or []

    def unique(field: str) -> list:
        """Return the distinct truthy values of *field* in first-seen order."""
        # dict.fromkeys de-duplicates while preserving insertion order,
        # unlike set(), whose iteration order for strings varies per process.
        return list(dict.fromkeys(b[field] for b in behaviors if b.get(field)))

    # Extract IOCs: file hashes plus any explicit ioc_type/ioc_value pair.
    iocs = []
    for b in behaviors:
        if b.get("sha256"):
            iocs.append({"type": "sha256", "value": b["sha256"]})
        if b.get("md5"):
            iocs.append({"type": "md5", "value": b["md5"]})
        if b.get("ioc_value"):
            iocs.append({"type": b.get("ioc_type"), "value": b["ioc_value"]})

    return {
        "detection_id": detection.get("detection_id"),
        "severity": detection.get("max_severity_displayname"),
        "severity_score": detection.get("max_severity"),
        "status": detection.get("status"),
        "first_behavior": detection.get("first_behavior"),
        "last_behavior": detection.get("last_behavior"),
        "hostname": device.get("hostname"),
        "local_ip": device.get("local_ip"),
        "external_ip": device.get("external_ip"),
        "platform": device.get("platform_name"),
        "os_version": device.get("os_version"),
        "device_id": device.get("device_id"),
        "tactics": unique("tactic"),
        "techniques": unique("technique"),
        "mitre_ids": unique("technique_id"),
        "iocs": iocs,
        "filenames": unique("filename"),
        # Command lines are intentionally not de-duplicated.
        "cmdlines": [b.get("cmdline") for b in behaviors if b.get("cmdline")],
        "assigned_to": detection.get("assigned_to_name"),
        "raw_behaviors": behaviors,
    }

def severity_to_score(severity: str) -> int:
    """Map a severity display name onto a numeric score.

    The lookup is case-sensitive. Any name that is not in the table scores 0.
    """
    scores = {
        "Critical": 100,
        "High": 75,
        "Medium": 50,
        "Low": 25,
        "Informational": 10,
    }
    if severity in scores:
        return scores[severity]
    return 0

def export_csv(detections: list, output_path: str) -> None:
    """Write normalized detections to a CSV file.

    Only the reporting columns listed in ``fieldnames`` are written; any other
    keys on a detection are ignored. List-valued columns are collapsed into a
    single ``"; "``-separated cell.

    Args:
        detections: Normalized detection dicts.
        output_path: Destination CSV path; an existing file is overwritten.
    """
    fieldnames = [
        "detection_id", "severity", "severity_score", "status", "hostname",
        "local_ip", "platform", "first_behavior", "tactics", "techniques",
        "mitre_ids", "filenames"
    ]
    list_columns = ("tactics", "techniques", "mitre_ids", "filenames")

    # Write UTF-8 explicitly so non-ASCII host or file names do not depend on
    # the platform's locale encoding.
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        for det in detections:
            row = det.copy()
            for column in list_columns:
                # Tolerate a missing or null list and non-string members.
                values = det.get(column) or []
                row[column] = "; ".join(str(value) for value in values)
            writer.writerow(row)

if __name__ == "__main__":
    import argparse

    # CLI entry point: parse the export, then write JSON (default) or CSV.
    cli = argparse.ArgumentParser(description="Parse CrowdStrike detections")
    cli.add_argument("input", help="Input JSON file")
    cli.add_argument("--format", choices=["json", "csv"], default="json")
    cli.add_argument("--output", "-o", help="Output file path")
    options = cli.parse_args()

    records = list(parse_detection_json(options.input))

    if options.format != "csv":
        # JSON output falls back to a fixed default file name.
        destination = options.output or "detections_normalized.json"
        with open(destination, 'w') as out_file:
            json.dump(records, out_file, indent=2)
    else:
        export_csv(records, options.output or "detections.csv")

    print(f"Parsed {len(records)} detections")

Spotlight Vulnerability Parsing

python
# scripts/parse_spotlight.py
"""
Parse CrowdStrike Spotlight vulnerability exports.

Usage:
    python parse_spotlight.py <input_file> [--enrich-kev] [--output <file>]
"""

import json
import csv
import requests
from typing import Generator

KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

def parse_spotlight_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized vulnerabilities from a Spotlight JSON export.

    Three top-level shapes are accepted: a bare JSON array, an envelope object
    carrying the records under ``resources``, or a single vulnerability object
    (treated as a one-element list).

    Args:
        file_path: Path to the JSON export file.

    Yields:
        One dict per vulnerability, as returned by ``normalize_vulnerability``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly instead of relying on the locale default; "utf-8-sig"
    # also accepts files that start with a UTF-8 byte-order mark.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    if isinstance(data, list):
        vulns = data
    else:
        # An envelope with "resources": null carries no records.
        vulns = data.get("resources", [data]) or []

    for vuln in vulns:
        yield normalize_vulnerability(vuln)

def normalize_vulnerability(vuln: dict) -> dict:
    """Normalize a Spotlight vulnerability record to the flat schema.

    CVE details are read from ``cve`` (including its nested ``cisa_info``),
    host details from ``host_info``, and application details from ``app``.

    Args:
        vuln: One vulnerability object from the export.

    Returns:
        A dict with the normalized fields. Scalar fields absent from the input
        are ``None``; ``is_kev`` defaults to ``False``; ``groups`` and ``tags``
        default to empty lists.
    """
    # Nested objects may be missing or explicitly null; treat both the same
    # way so the attribute lookups below never run against None.
    cve = vuln.get("cve") or {}
    host = vuln.get("host_info") or {}
    app = vuln.get("app") or {}
    cisa = cve.get("cisa_info") or {}
    remediation = vuln.get("remediation") or {}

    return {
        "id": vuln.get("id"),
        "cve_id": cve.get("id"),
        "severity": cve.get("severity"),
        "cvss_score": cve.get("base_score"),
        "cvss_vector": cve.get("vector"),
        "exploit_status": cve.get("exploit_status"),
        "is_kev": cisa.get("is_cisa_kev", False),
        "kev_due_date": cisa.get("due_date"),
        "hostname": host.get("hostname"),
        "local_ip": host.get("local_ip"),
        "os_version": host.get("os_version"),
        "groups": host.get("groups") or [],
        "tags": host.get("tags") or [],
        "agent_version": host.get("agent_version"),
        "product": app.get("product_name_version"),
        "vendor": app.get("vendor"),
        "version": app.get("version"),
        "status": vuln.get("status"),
        "created": vuln.get("created_timestamp"),
        "updated": vuln.get("updated_timestamp"),
        "remediation": remediation.get("action"),
        "suppression": vuln.get("suppression_info"),
    }

def enrich_with_kev(vulns: list, timeout: float = 30.0) -> list:
    """Enrich vulnerabilities with the catalog downloaded from ``KEV_URL``.

    Every vulnerability whose ``cve_id`` appears in the downloaded catalog is
    updated in place with ``is_kev``, ``kev_due_date``, ``kev_action`` and
    ``ransomware_related``. Other entries are left untouched.

    Args:
        vulns: Normalized vulnerability dicts; they are modified in place.
        timeout: Seconds to wait for the catalog download before giving up.

    Returns:
        The same ``vulns`` list that was passed in.

    Raises:
        requests.RequestException: If the download times out, fails, or the
            server answers with an HTTP error status.
    """
    # Without a timeout, requests waits indefinitely on a stalled connection.
    resp = requests.get(KEV_URL, timeout=timeout)
    # Fail loudly on an HTTP error instead of trying to parse an error page.
    resp.raise_for_status()
    kev_data = resp.json()
    kev_lookup = {v["cveID"]: v for v in kev_data.get("vulnerabilities", [])}

    for vuln in vulns:
        cve_id = vuln.get("cve_id")
        if cve_id and cve_id in kev_lookup:
            kev = kev_lookup[cve_id]
            vuln["is_kev"] = True
            vuln["kev_due_date"] = kev.get("dueDate")
            vuln["kev_action"] = kev.get("requiredAction")
            vuln["ransomware_related"] = kev.get("knownRansomwareCampaignUse") == "Known"

    return vulns

def calculate_risk_score(vuln: dict) -> float:
    """Calculate a composite 0-100 risk score for a normalized vulnerability.

    The score adds up:
      * CVSS base score scaled to a maximum of 40 points,
      * 25 points when ``is_kev`` is truthy,
      * 20 points when ``exploit_status`` equals ``"available"``,
      * a severity bonus (CRITICAL 15, HIGH 10, MEDIUM 5, case-insensitive).

    Args:
        vuln: Normalized vulnerability dict. Missing or ``None`` values count
            as zero contribution.

    Returns:
        The score, capped at 100.
    """
    cvss = vuln.get("cvss_score") or 0
    score = (cvss / 10) * 40  # CVSS: 40%

    if vuln.get("is_kev"):
        score += 25  # KEV: 25%

    # NOTE(review): some Spotlight payloads may report exploit_status as a
    # number rather than the string "available" - confirm against the export
    # format in use.
    if vuln.get("exploit_status") == "available":
        score += 20  # Exploit available: 20%

    # The severity key is normally present but may hold None, so a .get()
    # default alone is not enough to guard the .upper() call.
    severity = (vuln.get("severity") or "").upper()
    score += {"CRITICAL": 15, "HIGH": 10, "MEDIUM": 5}.get(severity, 0)

    return min(score, 100)

if __name__ == "__main__":
    import argparse

    # CLI entry point: parse, optionally enrich, score, rank and write JSON.
    cli = argparse.ArgumentParser(description="Parse CrowdStrike Spotlight vulnerabilities")
    cli.add_argument("input", help="Input JSON file")
    cli.add_argument("--enrich-kev", action="store_true", help="Enrich with CISA KEV data")
    cli.add_argument("--output", "-o", help="Output file path")
    options = cli.parse_args()

    findings = list(parse_spotlight_json(options.input))
    if options.enrich_kev:
        findings = enrich_with_kev(findings)

    # Score after enrichment so the KEV flag contributes to the result.
    for finding in findings:
        finding["risk_score"] = calculate_risk_score(finding)

    # Highest risk first.
    findings.sort(key=lambda item: item["risk_score"], reverse=True)

    destination = options.output or "spotlight_normalized.json"
    with open(destination, 'w') as out_file:
        json.dump(findings, out_file, indent=2)

    print(f"Parsed {len(findings)} vulnerabilities")
    kev_total = len([finding for finding in findings if finding.get("is_kev")])
    print(f"KEV vulnerabilities: {kev_total}")

Incident Parsing

python
# scripts/parse_incidents.py
"""Parse CrowdStrike incident exports."""

import json
from typing import Generator

def parse_incident_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized incidents from a CrowdStrike incident JSON export.

    Three top-level shapes are accepted: a bare JSON array, an envelope object
    carrying the records under ``resources``, or a single incident object
    (treated as a one-element list).

    Args:
        file_path: Path to the JSON export file.

    Yields:
        One dict per incident, as returned by ``normalize_incident``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly instead of relying on the locale default; "utf-8-sig"
    # also accepts files that start with a UTF-8 byte-order mark.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    if isinstance(data, list):
        incidents = data
    else:
        # An envelope with "resources": null carries no records.
        incidents = data.get("resources", [data]) or []

    for inc in incidents:
        yield normalize_incident(inc)

def normalize_incident(incident: dict) -> dict:
    """Normalize a raw incident record to the flat schema.

    Scalar fields are copied as-is (``fine_score`` is exposed as ``score``).
    Each entry in ``hosts`` is reduced to its ``device_id``, ``hostname`` and
    ``local_ip``.

    Args:
        incident: One incident object from the export.

    Returns:
        A dict with the normalized fields. List-valued fields are always
        lists: a missing or null value becomes ``[]``.
    """
    def as_list(key: str) -> list:
        """Return the list stored under *key*, or [] when missing or null."""
        return incident.get(key) or []

    return {
        "incident_id": incident.get("incident_id"),
        "incident_type": incident.get("incident_type"),
        "state": incident.get("state"),
        "status": incident.get("status"),
        "score": incident.get("fine_score"),
        "name": incident.get("name"),
        "description": incident.get("description"),
        "created": incident.get("created"),
        "start": incident.get("start"),
        "end": incident.get("end"),
        "tags": as_list("tags"),
        "assigned_to": incident.get("assigned_to"),
        "hosts": [
            {
                "device_id": h.get("device_id"),
                "hostname": h.get("hostname"),
                "local_ip": h.get("local_ip"),
            }
            for h in as_list("hosts")
        ],
        "users": as_list("users"),
        "objectives": as_list("objectives"),
        "tactics": as_list("tactics"),
        "techniques": as_list("techniques"),
    }

EASM Asset Parsing

python
# scripts/parse_easm.py
"""Parse CrowdStrike EASM (External Attack Surface) exports."""

import json
from typing import Generator

def parse_easm_json(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized assets from a CrowdStrike EASM JSON export.

    Three top-level shapes are accepted: a bare JSON array, an envelope object
    carrying the records under ``resources``, or a single asset object
    (treated as a one-element list).

    Args:
        file_path: Path to the JSON export file.

    Yields:
        One dict per asset, as returned by ``normalize_easm_asset``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly instead of relying on the locale default; "utf-8-sig"
    # also accepts files that start with a UTF-8 byte-order mark.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    if isinstance(data, list):
        assets = data
    else:
        # An envelope with "resources": null carries no records.
        assets = data.get("resources", [data]) or []

    for asset in assets:
        yield normalize_easm_asset(asset)

def normalize_easm_asset(asset: dict) -> dict:
    """Normalize a raw EASM asset record to the flat schema.

    Each entry in ``services`` is reduced to port, protocol, service name and
    version; each entry in ``vulnerabilities`` to its CVE ID and severity.

    Args:
        asset: One asset object from the export.

    Returns:
        A dict with the normalized fields. List-valued fields are always
        lists: a missing or null value becomes ``[]``.
    """
    def as_list(key: str) -> list:
        """Return the list stored under *key*, or [] when missing or null."""
        return asset.get(key) or []

    return {
        "id": asset.get("id"),
        "asset_type": asset.get("asset_type"),
        "asset": asset.get("asset"),
        "confidence": asset.get("confidence"),
        "discovery_date": asset.get("discovery_date"),
        "first_seen": asset.get("first_seen"),
        "last_seen": asset.get("last_seen"),
        "sources": as_list("sources"),
        "subsidiaries": as_list("subsidiaries"),
        "services": [
            {
                "port": s.get("port"),
                "protocol": s.get("protocol"),
                # The source field is "service_name"; it is exposed as "service".
                "service": s.get("service_name"),
                "version": s.get("version"),
            }
            for s in as_list("services")
        ],
        "vulnerabilities": [
            {
                "cve_id": v.get("cve_id"),
                "severity": v.get("severity"),
            }
            for v in as_list("vulnerabilities")
        ],
        "exposures": as_list("exposures"),
    }

CSV Export Utility

python
# scripts/export_csv.py
"""Generic CSV export for CrowdStrike data."""

import csv
import json
import sys

def flatten_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict:
    """Collapse a nested dictionary into a single level.

    Nested dict keys are joined to their parent key with ``sep``. A list whose
    first element is a dict is stored as a JSON string; any other list
    (including an empty one) is stored as its items joined by ``"; "``.
    """
    flat = {}
    for key, value in d.items():
        path = f"{parent_key}{sep}{key}" if parent_key else key

        if isinstance(value, dict):
            # Recurse, carrying the accumulated key path as the new prefix.
            flat.update(flatten_dict(value, path, sep))
            continue

        if not isinstance(value, list):
            flat[path] = value
            continue

        # Only the first element decides how the whole list is rendered.
        holds_dicts = bool(value) and isinstance(value[0], dict)
        if holds_dicts:
            flat[path] = json.dumps(value)
        else:
            flat[path] = "; ".join(str(item) for item in value)
    return flat

def json_to_csv(input_file: str, output_file: str) -> None:
    """Convert a JSON export to CSV, one row per record.

    The input may be a JSON array, an envelope object carrying the records
    under ``resources``, or a single object. Every record is flattened with
    ``flatten_dict``. The CSV columns are the sorted union of all flattened
    keys, so records with differing fields still line up.

    When there are no records, a message is printed and no file is written.

    Args:
        input_file: Path to the JSON file to read.
        output_file: Destination CSV path; an existing file is overwritten.
    """
    # Decode explicitly instead of relying on the locale default; "utf-8-sig"
    # also accepts files that start with a UTF-8 byte-order mark.
    with open(input_file, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    if not isinstance(data, list):
        data = data.get("resources", [data])

    if not data:
        print("No data to export")
        return

    # Flatten all records
    flattened = [flatten_dict(d) for d in data]

    # Union of keys across all records, sorted for a stable column order.
    fieldnames = sorted({key for record in flattened for key in record})

    # Write UTF-8 explicitly so non-ASCII values do not depend on the locale.
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(flattened)

    print(f"Exported {len(flattened)} records to {output_file}")

if __name__ == "__main__":
    # Exactly two positional arguments are required: source JSON, target CSV.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("Usage: export_csv.py <input.json> <output.csv>")
        sys.exit(1)
    source_path, target_path = cli_args
    json_to_csv(source_path, target_path)

LogScale Query Result Parsing

python
# scripts/parse_logscale.py
"""Parse CrowdStrike LogScale (Humio) query results."""

import json
from typing import Generator

def parse_logscale_results(file_path: str) -> Generator[dict, None, None]:
    """Yield normalized events from a LogScale query-result JSON export.

    Three top-level shapes are accepted: a bare JSON array of events, an
    object carrying the events under ``events``, or a single event object
    (treated as a one-element list).

    Args:
        file_path: Path to the JSON export file.

    Yields:
        One dict per event, as returned by ``normalize_logscale_event``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Decode explicitly instead of relying on the locale default; "utf-8-sig"
    # also accepts files that start with a UTF-8 byte-order mark.
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        data = json.load(f)

    if isinstance(data, dict):
        # A dict without "events" is a single event. Iterating the dict itself
        # would hand its keys (strings) to the normalizer. "events": null
        # carries no records.
        events = data.get("events", [data]) or []
    else:
        events = data

    for event in events:
        yield normalize_logscale_event(event)

def normalize_logscale_event(event: dict) -> dict:
    """Map a raw LogScale event onto the flat event schema.

    The timestamp is taken from ``@timestamp``, falling back to ``timestamp``
    when the former is missing or falsy. Other fields are renamed from their
    source names. The untouched input event is kept under ``raw``.
    """
    # (normalized name, source field) pairs, in output order.
    renames = (
        ("event_type", "event_simpleName"),
        ("aid", "aid"),
        ("hostname", "ComputerName"),
        ("username", "UserName"),
        ("process_name", "ImageFileName"),
        ("command_line", "CommandLine"),
        ("sha256", "SHA256HashData"),
        ("parent_process", "ParentImageFileName"),
        ("remote_ip", "RemoteIP"),
        ("remote_port", "RemotePort"),
        ("local_ip", "LocalIP"),
    )

    normalized = {"timestamp": event.get("@timestamp") or event.get("timestamp")}
    for target, source in renames:
        normalized[target] = event.get(source)
    normalized["raw"] = event
    return normalized

Batch Processing

bash
#!/bin/bash
# scripts/batch_process.sh - Process multiple CrowdStrike exports
#
# Usage: batch_process.sh [INPUT_DIR] [OUTPUT_DIR]
#   INPUT_DIR   directory scanned for detections*.json and spotlight*.json
#               (default: current directory)
#   OUTPUT_DIR  directory receiving the *_normalized.json files
#               (default: ./processed)
#
# Exits non-zero when the output directory cannot be created or when any
# parser run fails; the remaining files are still processed.

INPUT_DIR="${1:-.}"
OUTPUT_DIR="${2:-./processed}"

# Resolve the parser scripts relative to this file so the batch works from
# any working directory, not only from inside scripts/.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

failures=0

mkdir -p "$OUTPUT_DIR" || exit 1

# Process all detection files
for f in "$INPUT_DIR"/detections*.json; do
    # An unmatched glob stays literal; skip anything that is not a real file.
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    if ! python "$SCRIPT_DIR/parse_detections.py" "$f" -o "$OUTPUT_DIR/${base}_normalized.json"; then
        echo "Failed to process $f" >&2
        failures=$((failures + 1))
    fi
done

# Process all Spotlight files
for f in "$INPUT_DIR"/spotlight*.json; do
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    if ! python "$SCRIPT_DIR/parse_spotlight.py" "$f" --enrich-kev -o "$OUTPUT_DIR/${base}_normalized.json"; then
        echo "Failed to process $f" >&2
        failures=$((failures + 1))
    fi
done

echo "Processing complete. Output in $OUTPUT_DIR"

# Report failures through the exit status so callers can detect them.
if [ "$failures" -gt 0 ]; then
    echo "$failures file(s) failed to process" >&2
    exit 1
fi