
add-datalake-consumer

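Adds an event consumer that writes to Azure Data Lake (Parquet) following the BI_SALES_RISK plan. Creates events/consumers/[Name]DataLakeCollector.ts, which subscribes to RabbitMQ, builds Parquet rows, and writes to /path_prefix/year=YYYY/month=MM/day=DD/. Use when adding a DataLakeCollector in logging or similar “event to Data Lake” pipelines.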

SKILL.md
--- frontmatter
name: add-datalake-consumer
description: Adds an event consumer that writes to Azure Data Lake (Parquet) following BI_SALES_RISK plan. Creates events/consumers/[Name]DataLakeCollector.ts subscribing to RabbitMQ, building Parquet rows, writing to /path_prefix/year=YYYY/month=MM/day=DD/. Use when adding DataLakeCollector in logging or similar “event to Data Lake” pipelines.

Add Data Lake Consumer

An event consumer that subscribes to RabbitMQ and writes to Azure Data Lake (Parquet). Pattern: logging’s DataLakeCollector for risk.evaluated (BI_SALES_RISK_IMPLEMENTATION_PLAN §3.5, §9.1). For BI Sales Risk, paths and Parquet columns MUST match documentation/requirements/BI_SALES_RISK_DATA_LAKE_LAYOUT.md (§2.1 risk.evaluated, §2.2 ml_outcomes, §4 config).

1. Consumer

Path: src/events/consumers/[Name]DataLakeCollector.ts

  • EventConsumer with queue, exchange: coder_events, bindings: e.g. ['risk.evaluated','ml.prediction.completed','opportunity.updated','forecast.generated'].
  • Handler: map event to row. For risk.evaluated use columns in Data Lake Layout §2.1. Build path: {path_prefix}/year={YYYY}/month={MM}/day={DD}/... (Layout §1).
  • Write: upload with @azure/storage-blob (BlockBlob); encode rows as Parquet with parquetjs (or Arrow) before upload. Buffer/batch by time or count if needed.
  • Config: data_lake.connection_string, data_lake.container, data_lake.path_prefix (e.g. /risk_evaluations).
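The path rule and the "batch by time or count" note above can be sketched as follows. This is a minimal illustration, not the project's implementation: `buildPartitionPath` and `RowBatcher` are hypothetical names, the partition date is assumed to be taken in UTC, and the flush callback stands in for whatever Parquet encoding and blob upload the collector actually performs.

```typescript
// Hypothetical helper: builds {path_prefix}/year=YYYY/month=MM/day=DD/.
// Assumption: partitions are keyed on the UTC date of the event.
const pad2 = (n: number): string => (n < 10 ? "0" + n : String(n));

function buildPartitionPath(pathPrefix: string, when: Date): string {
  const prefix = pathPrefix.replace(/\/+$/, ""); // drop trailing slashes
  const yyyy = when.getUTCFullYear();
  const mm = pad2(when.getUTCMonth() + 1); // getUTCMonth() is 0-based
  const dd = pad2(when.getUTCDate());
  return `${prefix}/year=${yyyy}/month=${mm}/day=${dd}/`;
}

// Hypothetical buffer: flushes when maxRows is reached or when the oldest
// buffered row is at least maxAgeMs old. Age is only checked inside add(),
// so a quiet queue also needs a periodic or shutdown call to flush().
class RowBatcher<T> {
  private rows: T[] = [];
  private firstAddedAt = 0;

  constructor(
    private readonly maxRows: number,
    private readonly maxAgeMs: number,
    // Stand-in for "encode to Parquet and upload the blob".
    private readonly flushFn: (rows: T[]) => Promise<void>,
    private readonly now: () => number = Date.now,
  ) {}

  async add(row: T): Promise<void> {
    if (this.rows.length === 0) this.firstAddedAt = this.now();
    this.rows.push(row);
    const full = this.rows.length >= this.maxRows;
    const stale = this.now() - this.firstAddedAt >= this.maxAgeMs;
    if (full || stale) await this.flush();
  }

  async flush(): Promise<void> {
    if (this.rows.length === 0) return;
    const batch = this.rows;
    this.rows = []; // swap first so rows added during the upload start a new batch
    await this.flushFn(batch);
  }
}
```

In a handler, each mapped row would go through `batcher.add(row)`, and the flush callback would write one file under `buildPartitionPath(path_prefix, eventDate)`. The columns of the row itself are not shown here because they are defined by Data Lake Layout §2.1.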

2. Config

config/default.yaml:

```yaml
data_lake:
  connection_string: ${DATA_LAKE_CONNECTION_STRING}
  container: ${DATA_LAKE_CONTAINER:-risk}
  path_prefix: ${DATA_LAKE_PATH_PREFIX:-/risk_evaluations}

rabbitmq:
  url: ${RABBITMQ_URL}
  exchange: coder_events
  queue: [module]_data_lake
  bindings:
    - risk.evaluated
    - ml.prediction.completed
    # ...
```

config/schema.json: add data_lake with connection_string, container, path_prefix.
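For reference, the shape that the data_lake section is expected to have can be expressed as a TypeScript type with a small runtime check. This is only a sketch of the three fields named above: `DataLakeConfig` and `parseDataLakeConfig` are hypothetical names, and the check does not replace the entry in config/schema.json.

```typescript
// The three data_lake keys from config/default.yaml, after env substitution.
interface DataLakeConfig {
  connection_string: string;
  container: string;
  path_prefix: string;
}

// Hypothetical guard: rejects a data_lake section with missing or empty keys.
function parseDataLakeConfig(raw: unknown): DataLakeConfig {
  if (typeof raw !== "object" || raw === null) {
    throw new Error("data_lake must be an object");
  }
  const section = raw as Record<string, unknown>;
  const readString = (key: keyof DataLakeConfig): string => {
    const value = section[key];
    if (typeof value !== "string" || value.length === 0) {
      throw new Error(`data_lake.${key} must be a non-empty string`);
    }
    return value;
  };
  return {
    connection_string: readString("connection_string"),
    container: readString("container"),
    path_prefix: readString("path_prefix"),
  };
}
```

Failing fast on an empty connection_string at startup is usually easier to diagnose than a failed upload on the first consumed event.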

3. Server

In server.ts: await dataLakeCollector.start() after RabbitMQ connect.
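The ordering can be sketched as below. The `RabbitConnection` and `Collector` interfaces and the `startServer` function are assumptions made for illustration; only the `dataLakeCollector.start()` call and its position after the RabbitMQ connect come from this skill.

```typescript
// Hypothetical minimal interfaces; the real server.ts will use the project's own types.
interface RabbitConnection {
  connect(): Promise<void>;
}
interface Collector {
  start(): Promise<void>;
}

async function startServer(
  rabbit: RabbitConnection,
  dataLakeCollector: Collector,
): Promise<void> {
  // The collector subscribes to a queue, so the connection must exist first.
  await rabbit.connect();
  await dataLakeCollector.start();
}
```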

4. Checklist

  • Consumer in events/consumers/, subscribe to RabbitMQ (no Azure Service Bus)
  • Path: {path_prefix}/year=.../month=.../day=.../; format Parquet
  • Config: data_lake.* and schema; rabbitmq queue and bindings
  • Start collector in server