Add Data Lake Consumer
An event consumer that subscribes to RabbitMQ and writes Parquet files to Azure Data Lake. Pattern: logging’s DataLakeCollector for risk.evaluated (BI_SALES_RISK_IMPLEMENTATION_PLAN §3.5, §9.1). For BI Sales Risk, paths and Parquet columns MUST match documentation/requirements/BI_SALES_RISK_DATA_LAKE_LAYOUT.md (§2.1 risk.evaluated, §2.2 ml_outcomes, §4 config).
1. Consumer
Path: src/events/consumers/[Name]DataLakeCollector.ts
- EventConsumer with queue, exchange: coder_events, bindings: e.g. ['risk.evaluated', 'ml.prediction.completed', 'opportunity.updated', 'forecast.generated'].
- Handler: map event to row. For risk.evaluated use the columns in Data Lake Layout §2.1. Build path: {path_prefix}/year={YYYY}/month={MM}/day={DD}/... (Layout §1).
- Write via @azure/storage-blob (BlockBlob), or @azure/storage-blob + parquetjs (or Arrow) for Parquet. Buffer/batch by time or count if needed.
- Config: data_lake.connection_string, data_lake.container, data_lake.path_prefix (e.g. /risk_evaluations).
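The path layout and the count/time batching described above could be sketched as follows. This is a minimal illustration, not the existing DataLakeCollector: the names buildPartitionPath and RowBuffer, the UTC partitioning, and the injected clock are assumptions, and the file name that follows day={DD}/ is left to Layout §1. The age threshold is only checked when a row arrives, so a real consumer would likely also drain on a timer and on shutdown.

```typescript
// Hypothetical helpers; only the {path_prefix}/year=/month=/day= layout comes from the guide.

/** Builds the date-partitioned directory: {path_prefix}/year=YYYY/month=MM/day=DD */
function buildPartitionPath(pathPrefix: string, when: Date): string {
  const prefix = pathPrefix.replace(/\/+$/, ""); // drop trailing slashes to avoid "//"
  const yyyy = String(when.getUTCFullYear());
  const mm = String(when.getUTCMonth() + 1).padStart(2, "0"); // getUTCMonth is 0-based
  const dd = String(when.getUTCDate()).padStart(2, "0");
  return `${prefix}/year=${yyyy}/month=${mm}/day=${dd}`;
}

/** Collects rows and hands them to `flush` once a count or age threshold is reached. */
class RowBuffer<Row> {
  private rows: Row[] = [];
  private firstRowAt: number | null = null;
  private readonly maxRows: number;
  private readonly maxAgeMs: number;
  private readonly flush: (rows: Row[]) => Promise<void>;
  private readonly now: () => number;

  constructor(
    maxRows: number,
    maxAgeMs: number,
    flush: (rows: Row[]) => Promise<void>,
    now: () => number = () => Date.now(),
  ) {
    this.maxRows = maxRows;
    this.maxAgeMs = maxAgeMs;
    this.flush = flush;
    this.now = now;
  }

  /** Adds a row; flushes if the batch is full or its oldest row is too old. */
  async add(row: Row): Promise<void> {
    const startedAt = this.firstRowAt ?? this.now();
    this.firstRowAt = startedAt;
    this.rows.push(row);
    const tooMany = this.rows.length >= this.maxRows;
    const tooOld = this.now() - startedAt >= this.maxAgeMs;
    if (tooMany || tooOld) await this.drain();
  }

  /** Flushes whatever is buffered; intended to be called on shutdown as well. */
  async drain(): Promise<void> {
    if (this.rows.length === 0) return;
    const batch = this.rows;
    this.rows = [];
    this.firstRowAt = null;
    await this.flush(batch);
  }
}
```

In a handler, the flush callback would encode the batch as Parquet and upload it under buildPartitionPath(config.data_lake.path_prefix, eventTime); with @azure/storage-blob, BlockBlobClient.uploadData is one call that could perform the upload.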
2. Config
config/default.yaml:
data_lake:
  connection_string: ${DATA_LAKE_CONNECTION_STRING}
  container: ${DATA_LAKE_CONTAINER:-risk}
  path_prefix: ${DATA_LAKE_PATH_PREFIX:-/risk_evaluations}
rabbitmq:
  url: ${RABBITMQ_URL}
  exchange: coder_events
  queue: [module]_data_lake
  bindings:
    - risk.evaluated
    - ml.prediction.completed
    # ...
config/schema.json: add data_lake with connection_string, container, path_prefix.
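The schema entry requires three string keys under data_lake. As an illustration only, the same constraint can be expressed as a runtime check in TypeScript. This sketch does not show the contents of config/schema.json, whose format is not given here; the DataLakeConfig type and the parseDataLakeConfig function are hypothetical names.

```typescript
// Field names mirror the data_lake block in config/default.yaml;
// the type and function names are hypothetical.
interface DataLakeConfig {
  connection_string: string;
  container: string;
  path_prefix: string;
}

/** Returns a typed config, or throws an error listing every missing or empty key. */
function parseDataLakeConfig(raw: unknown): DataLakeConfig {
  const obj = (typeof raw === "object" && raw !== null ? raw : {}) as Record<string, unknown>;
  const keys = ["connection_string", "container", "path_prefix"] as const;
  const missing = keys.filter((k) => typeof obj[k] !== "string" || obj[k] === "");
  if (missing.length > 0) {
    throw new Error(`data_lake config missing: ${missing.join(", ")}`);
  }
  return {
    connection_string: obj["connection_string"] as string,
    container: obj["container"] as string,
    path_prefix: obj["path_prefix"] as string,
  };
}
```

Failing at startup with the full list of missing keys tends to be easier to debug than a failed upload later, for example when DATA_LAKE_CONNECTION_STRING is unset.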
3. Server
In server.ts: await dataLakeCollector.start() after RabbitMQ connect.
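The startup ordering can be sketched as below. Only dataLakeCollector.start() and the "after RabbitMQ connect" ordering come from this guide; the Broker and Startable interfaces and the startEventing function are hypothetical stand-ins for whatever server.ts actually uses.

```typescript
// Hypothetical interfaces standing in for the real RabbitMQ client and collector.
interface Broker {
  connect(): Promise<void>;
}
interface Startable {
  start(): Promise<void>;
}

/** Connects to RabbitMQ first, then starts the collector. */
async function startEventing(broker: Broker, dataLakeCollector: Startable): Promise<void> {
  await broker.connect(); // the collector presumably needs a live connection to subscribe
  await dataLakeCollector.start();
}
```

Awaiting both steps in sequence means a failed broker connection rejects before the collector tries to subscribe.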
4. Checklist
- Consumer in events/consumers/, subscribe to RabbitMQ (no Azure Service Bus)
- Path: {path_prefix}/year=.../month=.../day=.../; format Parquet
- Config: data_lake.* and schema; rabbitmq queue and bindings
- Start collector in server