data-engineering-observability

Observability and monitoring for data pipelines using OpenTelemetry (tracing) and Prometheus (metrics). Covers instrumentation, dashboards, and alerting.

SKILL.md
--- frontmatter
name: data-engineering-observability
description: "Observability and monitoring for data pipelines using OpenTelemetry (traces) and Prometheus (metrics). Covers instrumentation, dashboards, and alerting."
dependsOn: ["@data-engineering-core"]

Pipeline Observability

Tracing and metrics for data pipelines using OpenTelemetry and Prometheus. Instrument code for visibility into performance, errors, and data lineage.

Quick Reference

| Tool | Purpose | What it Measures |
| --- | --- | --- |
| OpenTelemetry | Distributed tracing | Pipeline stages, latency, dependencies |
| Prometheus | Metrics | Throughput, error rates, resource utilization |
| Grafana | Visualization | Dashboards combining traces + metrics |

Why Observability?

  • Debugging: Trace failed records through pipeline stages
  • Performance: Identify bottlenecks, optimize slow transformations
  • Reliability: Set alerts on error rates, SLA breaches
  • Cost: Track resource usage, optimize expensive operations
  • Compliance: Audit trail of data transformations

Skill Dependencies

  • @data-engineering-core - Pipeline structure to instrument
  • @data-engineering-orchestration - Prefect/Dagster have built-in observability
  • @data-engineering-streaming - Stream processing patterns need tracing

OpenTelemetry Integration

OpenTelemetry (OTel) provides a vendor-neutral standard for distributed tracing, metrics, and logs.

Installation

bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp

Basic Tracing

python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
import duckdb
import polars as pl

# Setup tracer provider; spans are batched and sent to an OTLP collector over gRPC
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("data_pipeline")

# Assumption: results land in a local DuckDB file; adjust the path as needed
conn = duckdb.connect("pipeline.duckdb")

def run_pipeline():
    with tracer.start_as_current_span("extract") as span:
        span.set_attribute("source", "sales.parquet")
        span.set_attribute("format", "parquet")
        df = pl.scan_parquet("data/sales.parquet").collect()
        span.set_attribute("rows_read", len(df))

    with tracer.start_as_current_span("transform") as span:
        span.set_attribute("operation", "aggregation")
        result = df.group_by("category").agg(pl.col("value").sum())

    with tracer.start_as_current_span("load") as span:
        span.set_attribute("target", "duckdb.summary")
        # Hand the result to DuckDB as an Arrow table (requires pyarrow), then persist it
        conn.register("result_view", result.to_arrow())
        conn.execute("CREATE OR REPLACE TABLE summary AS SELECT * FROM result_view")
        span.set_attribute("rows_written", len(result))

if __name__ == "__main__":
    run_pipeline()

Trace Context Propagation

For multi-service pipelines, pass trace context:

python
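import json

from opentelemetry import trace
from opentelemetry.propagators.b3 import B3MultiFormat  # pip install opentelemetry-propagator-b3

tracer = trace.get_tracer("data_pipeline")
propagator = B3MultiFormat()

# Producer side: inject the active trace context into a carrier dict
# (`producer`, `key`, `data`, `msg`, and `process` are assumed to exist elsewhere)
carrier = {}
propagator.inject(carrier)  # uses the current context when none is passed

# Send the carrier with the message (e.g., as Kafka headers)
producer.produce(
    topic="events",
    key=key,
    value=json.dumps(data),
    headers=list(carrier.items())
)

# Consumer side: rebuild the carrier from the received headers (Kafka delivers values as bytes)
received = {k: v.decode("utf-8") for k, v in (msg.headers() or [])}
context = propagator.extract(carrier=received)
with tracer.start_as_current_span("process_message", context=context):
    process(data)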

Prometheus Metrics

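Prometheus collects numeric time series data. It normally pulls (scrapes) metrics from an HTTP endpoint that your application exposes; short-lived batch jobs can push to a Pushgateway instead.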

Installation

bash
pip install prometheus-client

Basic Instrumentation

python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import psutil

# Define metrics
ROWS_PROCESSED = Counter(
    'etl_rows_processed_total',
    'Total rows processed by ETL',
    ['source', 'stage']
)

PROCESSING_TIME = Histogram(
    'etl_processing_seconds',
    'Time spent processing',
    ['operation'],
    buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0]
)

PIPELINE_ERRORS = Counter(
    'etl_errors_total',
    'Total pipeline errors',
    ['stage', 'error_type']
)

MEMORY_USAGE = Gauge(
    'etl_memory_bytes',
    'Process memory usage in bytes'
)

# Start metrics server (Prometheus scrapes this endpoint)
start_http_server(8000)

def process_batch(stage: str, batch_id: int):
    # Label by stage, not batch ID: per-batch label values would create unbounded series
    with PROCESSING_TIME.labels(operation=stage).time():
        try:
            # extract_and_process is assumed to be defined elsewhere and to return a row count
            rows = extract_and_process(batch_id)
            ROWS_PROCESSED.labels(source="kafka", stage=stage).inc(rows)
            return rows
        except Exception as e:
            PIPELINE_ERRORS.labels(stage=stage, error_type=type(e).__name__).inc()
            raise

# Periodic gauge update (call from a timer or at the end of each batch)
def update_memory():
    process = psutil.Process()
    MEMORY_USAGE.set(process.memory_info().rss)
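To see what these metrics look like when scraped, the following sketch registers a counter and a histogram on a private registry and renders the text exposition format in-process. The `demo_` metric names and the `run_stage` helper are hypothetical, chosen so the example does not collide with the metrics defined above.

```python
from prometheus_client import CollectorRegistry, Counter, Histogram, generate_latest

# A private registry keeps the demo isolated from the process-wide default
registry = CollectorRegistry()

ROWS = Counter(
    "demo_rows_processed_total", "Rows processed", ["source", "stage"],
    registry=registry,
)
SECONDS = Histogram(
    "demo_processing_seconds", "Time spent per stage", ["operation"],
    buckets=[0.1, 1.0, 10.0], registry=registry,
)

def run_stage(stage, rows):
    # Select the label values first, then time the block
    with SECONDS.labels(operation=stage).time():
        processed = [r for r in rows if r is not None]
    ROWS.labels(source="kafka", stage=stage).inc(len(processed))
    return len(processed)

run_stage("transform", [1, None, 2, 3])

# The same text a Prometheus scrape of this registry would receive
exposition = generate_latest(registry).decode()
print(exposition)
```

Reading the exposition output is a quick way to confirm metric names, label sets, and bucket boundaries before pointing Prometheus at the endpoint.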

Custom Collector

python
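from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()

# Gauge whose value is computed on demand, each time the registry is collected
queue_size = Gauge(
    'kafka_queue_size',
    'Number of messages in queue',
    registry=registry
)

def collect_queue_size():
    # Assumes `kafka_consumer` exists; metric group and key names depend on the client library
    return kafka_consumer.metrics()['fetch-metrics']['records-lag-max']

# Call collect_queue_size at scrape time instead of setting the value manually
queue_size.set_function(collect_queue_size)

# Expose `registry` for scraping, or push it to a Pushgateway for batch jobs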
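When one reading has to fan out into several labelled series, a collector object with a `collect()` method is an alternative to a single gauge. The sketch below is one possible shape, assuming a hypothetical `read_lag` callable that returns lag per partition; a plain dict stands in for a real Kafka client.

```python
from prometheus_client import CollectorRegistry
from prometheus_client.core import GaugeMetricFamily

class QueueLagCollector:
    """Builds fresh samples on every scrape from a caller-supplied reader."""

    def __init__(self, read_lag):
        self._read_lag = read_lag  # callable returning {partition: lag}

    def collect(self):
        family = GaugeMetricFamily(
            "demo_consumer_lag",
            "Messages not yet consumed, per partition",
            labels=["partition"],
        )
        for partition, lag in self._read_lag().items():
            family.add_metric([str(partition)], lag)
        yield family

# Stand-in for values a Kafka client would report (hypothetical data)
fake_lag = {0: 12, 1: 0}

registry = CollectorRegistry()
registry.register(QueueLagCollector(lambda: fake_lag))

before = registry.get_sample_value("demo_consumer_lag", {"partition": "0"})
fake_lag[0] = 5  # the next collection reflects the new reading
after = registry.get_sample_value("demo_consumer_lag", {"partition": "0"})
print(before, after)
```

Nothing is cached between scrapes, so the exported values always match what the reader returns at collection time.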

Integration with Orchestration

Prefect Built-in Observability

Prefect automatically records:

  • Task run status (success/failure)
  • Duration
  • Retry counts
  • Parameters

Enable Prefect Cloud/Server for UI:

bash
prefect cloud login  # or prefect server start
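# Prefect 2.x agents; newer releases replace agents with workers (prefect worker start --pool <pool-name>)
prefect agent start -q 'default'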

Dagster Observability

The Dagster UI (formerly Dagit) shows:

  • Asset materialization history
  • Run duration and status
  • Asset lineage graph
  • Resource usage

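Record custom metadata on an asset:

python
from dagster import MaterializeResult, asset

@asset
def monitored_asset():
    # Dagster records run status and duration automatically;
    # metadata returned here is shown with each materialization
    rows_written = 0  # placeholder: replace with the real row count
    return MaterializeResult(metadata={"rows_written": rows_written})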

Dashboards & Alerting

Grafana Dashboard Example

Create a dashboard with these panels:

  • Throughput: rate(etl_rows_processed_total[5m])
  • Latency (p95): histogram_quantile(0.95, sum by (le) (rate(etl_processing_seconds_bucket[5m])))
  • Error Rate: rate(etl_errors_total[5m])
  • Memory (MiB): etl_memory_bytes / 1024 / 1024
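The latency panel estimates a percentile from bucket counts rather than from raw observations. The sketch below approximates that estimate in plain Python, assuming linear interpolation inside the matching bucket; it is an illustration of the idea, not Prometheus's implementation, and the bucket counts are made up.

```python
import math

def histogram_quantile(q, buckets):
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    `buckets` must be sorted by upper bound and end with (math.inf, total).
    """
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank and count > prev_count:
            if math.isinf(bound):
                # Beyond the last finite bucket: report that bucket's bound
                return prev_bound
            # Assume observations are spread evenly inside the bucket
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (bound - prev_bound) * fraction
        prev_bound, prev_count = bound, count
    return prev_bound

# Hypothetical cumulative counts for etl_processing_seconds
buckets = [(0.1, 10), (0.5, 60), (1.0, 90), (5.0, 100), (math.inf, 100)]
p95 = histogram_quantile(0.95, buckets)
p50 = histogram_quantile(0.50, buckets)
print(p50, p95)
```

The estimate can only be as precise as the bucket layout: here the 95th percentile falls in the wide 1.0 to 5.0 bucket, so adding boundaries near the latency target tightens the result.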

Alert Rules (Prometheus, routed via Alertmanager)

yaml
groups:
  - name: etl-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(etl_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "ETL error rate elevated"
          description: "{{ $labels.stage }} stage error rate: {{ $value }} errors/sec"
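The alert expression compares a per-second rate against a threshold. As a rough sketch of what `rate()` computes over a window, the function below averages a counter's increase over time and treats any drop as a process restart. It ignores the extrapolation Prometheus applies at window boundaries, and the sample values are hypothetical.

```python
def per_second_rate(samples):
    """Average per-second increase of a counter.

    `samples` is a list of (timestamp_seconds, counter_value), oldest first.
    Returns 0.0 for fewer than two samples (Prometheus would return no data).
    """
    if len(samples) < 2:
        return 0.0
    increase = 0.0
    for (_, prev), (_, curr) in zip(samples, samples[1:]):
        if curr >= prev:
            increase += curr - prev
        else:
            # A drop means the counter restarted from zero
            increase += curr
    elapsed = samples[-1][0] - samples[0][0]
    return increase / elapsed if elapsed > 0 else 0.0

# Error counter scraped every 60 s, with a restart between the 2nd and 3rd scrape
samples = [(0, 100), (60, 106), (120, 3), (180, 9)]
rate = per_second_rate(samples)
fires = rate > 0.1  # same threshold as the HighErrorRate rule
print(rate, fires)
```

With these samples the rate stays below 0.1 errors per second, so the rule would not fire even though the raw counter value dropped sharply at the restart.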

Best Practices

Instrumentation

  1. Span every pipeline stage - extract, transform, load, validate
  2. Add attributes - dataset names, row counts, file paths
  3. Propagate context across async boundaries (threads, processes, network)
  4. Record errors in spans with span.record_exception()
  5. Sample judiciously - 100% in dev, lower in prod (sampling policy)

Metrics

  1. Use counters for events (rows processed, errors)
  2. Use histograms for durations (processing time, latency)
  3. Use gauges for state (queue size, memory usage)
  4. Label dimensions (stage, source, status) but avoid cardinality explosion
  5. Expose the metrics endpoint on a dedicated port (e.g., 8000), separate from the application port
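Cardinality explosion (item 4) is easy to quantify: the number of series for one metric is bounded by the product of the distinct values of each label. The sketch below uses hypothetical label values to show how a single unbounded label changes the total.

```python
from math import prod

def series_count(label_values):
    """Upper bound on series for one metric: product of distinct values per label."""
    return prod(len(values) for values in label_values.values())

# Bounded labels: a handful of known values each (hypothetical)
bounded = {
    "stage": ["extract", "transform", "validate", "load"],
    "source": ["kafka", "s3", "postgres"],
    "status": ["ok", "error"],
}

# Adding a per-batch identifier multiplies every existing combination
unbounded = dict(bounded, batch_id=range(10_000))

print(series_count(bounded))
print(series_count(unbounded))
```

Identifiers such as batch IDs, file paths, or user IDs fit better as span attributes or log fields, where high cardinality is expected.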

Production

  1. Centralized logs - send structured logs to ELK/Datadog
  2. Correlation IDs - Include trace IDs in log entries
  3. Alert on SLA breaches - latency > threshold, error rate > X%
  4. Test observability - Simulate failures, verify traces/metrics
  5. Document schema - Define metric names and label values in README
