initialize-challenge-scenario

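Launch the Valentine Data Challenge scenario by ingesting telemetry data from S3 and building a basic transformation pipeline. This sets up a preliminary workflow that can later be improved with proper data quality safeguards.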

SKILL.md
--- frontmatter
name: initialize-challenge-scenario
description: "Initialize the Valentine Data Challenge scenario by ingesting telemetry data from S3 and creating a basic transformation pipeline. This sets up a naive workflow that will later be improved with proper data quality safeguards."
allowed-tools:
  - Read
  - Write
  - Glob
  - Grep
  - Bash
  - WebFetch(domain:docs.bauplanlabs.com)

Initialize Challenge Scenario

This skill sets up the initial data pipeline for the Valentine Data Challenge. It creates a basic ingestion and transformation workflow without advanced safety mechanisms, demonstrating a scenario that many data teams encounter.

Overview

This skill performs three main steps:

  1. Step A: Ingest raw telemetry data from S3 into telemetry.signal_bronze on a staging branch
  2. Step B: Create a transformation pipeline that materializes telemetry.signal from the bronze table
  3. Step C: Merge the staging branch into main

CRITICAL: This is a Naive Workflow

This workflow intentionally lacks:

  • Comprehensive data quality checks
  • WAP (Write-Audit-Publish) safety patterns
  • Validation gates before merging to production

These will be added in later iterations to demonstrate how Bauplan's transactional branches enable safe data operations.

Prerequisites

Before running this skill:

  1. Verify you have access to Bauplan and are authenticated
  2. Confirm you're starting from the main branch
  3. Ensure the namespace telemetry doesn't already exist (fresh start)

Command Execution Priority:

  • Prefer using available Bauplan tools for all operations
  • CLI commands (bauplan) shown in this skill are secondary
  • If CLI commands fail, use uvx bauplan as a fallback (it runs without a globally installed CLI)
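
The CLI fallback above can be sketched as a small helper. This is a minimal sketch, not part of the skill itself: the name `bauplan_command` is hypothetical, and it chooses the fallback by checking what is on PATH, which is a narrower rule than "the CLI command failed".

```python
import shutil
from typing import Callable, List, Optional


def bauplan_command(
    args: List[str],
    which: Callable[[str], Optional[str]] = shutil.which,
) -> List[str]:
    """Build an argv list for bauplan, falling back to `uvx bauplan`.

    `which` is injectable so the lookup can be exercised without a real PATH.
    """
    if which("bauplan"):
        # The CLI is installed globally: call it directly.
        return ["bauplan", *args]
    if which("uvx"):
        # No global CLI: let uvx run bauplan in an ephemeral environment.
        return ["uvx", "bauplan", *args]
    raise FileNotFoundError("Neither 'bauplan' nor 'uvx' was found on PATH")
```

The returned list can be passed to `subprocess.run(..., check=True)`, for example `subprocess.run(bauplan_command(["run", "--dry-run"]), check=True)`.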

Step A: Ingest Data from S3

Create a Python script that ingests data from S3 into a bronze table on a staging branch.

Data Source Details

  • S3 Bucket: bauplan-alpha-user-import-uploads
  • S3 Path: user-uploads/case-study-intella/telemetry/raw/
  • Target Table: telemetry.signal_bronze
  • Target Namespace: telemetry

Script Template

Create lakehouse_workflow/ingest_bronze.py:

python
"""
Ingest raw telemetry data from S3 into telemetry.signal_bronze.
This is a simple ingestion script without comprehensive quality checks.
"""
import bauplan

def ingest_bronze_telemetry():
    """Ingest telemetry data from S3 into bronze table on staging branch."""
    client = bauplan.Client()

    # Get username for branch naming
    info = client.info()
    username = info.user.username
    branch_name = f"{username}.telemetry_staging"

    # S3 configuration
    s3_bucket = "bauplan-alpha-user-import-uploads"
    s3_path = f"s3://{s3_bucket}/user-uploads/case-study-intella/telemetry/raw/"
    table_name = "signal_bronze"
    namespace = "telemetry"

    try:
        # Create staging branch from main
        if client.has_branch(branch_name):
            print(f"Branch '{branch_name}' already exists, using existing branch")
        else:
            client.create_branch(branch_name, from_ref="main")
            print(f"Created staging branch: {branch_name}")

        # Create namespace if it doesn't exist
        # (has_namespace reads from a ref; create_namespace writes to a branch)
        if not client.has_namespace(namespace=namespace, ref=branch_name):
            client.create_namespace(namespace=namespace, branch=branch_name)
            print(f"Created namespace: {namespace}")

        # Create the table if needed (schema inferred from S3), then import data
        if client.has_table(table=table_name, ref=branch_name, namespace=namespace):
            print(f"Table {namespace}.{table_name} already exists, importing additional data")
        else:
            client.create_table(
                table=table_name,
                search_uri=s3_path,
                namespace=namespace,
                branch=branch_name
            )
            print(f"Created table: {namespace}.{table_name}")

        # Import data (runs whether the table was just created or already existed)
        client.import_data(
            table=table_name,
            search_uri=s3_path,
            namespace=namespace,
            branch=branch_name
        )

        # Basic check: verify data exists
        result = client.query(
            query=f"SELECT COUNT(*) as row_count FROM {namespace}.{table_name}",
            ref=branch_name
        )
        row_count = result.column("row_count")[0].as_py()
        print(f"Imported {row_count} rows into {namespace}.{table_name}")

        print(f"\nStaging branch ready: {branch_name}")
        return branch_name

    except Exception as e:
        print(f"Ingestion failed: {e}")
        raise

if __name__ == "__main__":
    branch = ingest_bronze_telemetry()
    print(f"Next: Create transformation pipeline and run on branch '{branch}'")

Step B: Create Transformation Pipeline

Create a Bauplan pipeline project that transforms the bronze data into the production table.

Pipeline Details

  • Project Name: ingest-bronze-telemetry
  • Source Table: telemetry.signal_bronze
  • Output Table: telemetry.signal
  • Transformation: Simple SELECT (pass-through for now)

Directory Structure

code
challenged_pipeline/
  bauplan_project.yml
  models.py

bauplan_project.yml

yaml
project:
  id: <generate-unique-uuid>
  name: ingest-bronze-telemetry
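
The `<generate-unique-uuid>` placeholder has to be filled in before the project can run. One way to do that is sketched below, assuming a random UUID4 is an acceptable project id (the skill only asks for a unique UUID). The helper names `render_project_yml` and `write_project_yml` are hypothetical.

```python
import uuid
from pathlib import Path
from typing import Optional


def render_project_yml(name: str, project_id: Optional[str] = None) -> str:
    """Return the contents of bauplan_project.yml for the given project name."""
    # Assumption: any freshly generated UUID4 works as the project id.
    project_id = project_id or str(uuid.uuid4())
    return f"project:\n  id: {project_id}\n  name: {name}\n"


def write_project_yml(directory: str, name: str) -> Path:
    """Create the directory if needed and write bauplan_project.yml into it."""
    target = Path(directory) / "bauplan_project.yml"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(render_project_yml(name), encoding="utf-8")
    return target
```

For this skill the call would be `write_project_yml("challenged_pipeline", "ingest-bronze-telemetry")`.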

models.py

python
import bauplan

@bauplan.model(
    materialization_strategy='REPLACE',
    columns=['signal', 'dateTime', 'value']
)
@bauplan.python('3.11', pip={'polars': '1.15.0'})
def signal(bronze_data=bauplan.Model('telemetry.signal_bronze')):
    """
    Transform bronze telemetry data into production signal table.
    Currently a pass-through transformation.

    | signal      | dateTime            | value  |
    |-------------|---------------------|--------|
    | temp_sensor | 2024-01-01 00:00:00 | 23.5   |
    | gyro_x      | 2024-01-01 00:00:01 | 0.012  |
    """
    import polars as pl

    # Convert from Arrow to Polars
    df = pl.from_arrow(bronze_data)

    # Simple pass-through for now
    # (Later iterations will add proper transformations and validations)

    return df.to_arrow()

Run the Pipeline

After creating the pipeline files:

  1. Navigate to the pipeline directory: cd challenged_pipeline
  2. Switch to the staging branch: bauplan branch checkout <username>.telemetry_staging
  3. Dry run the pipeline: bauplan run --dry-run
  4. Run the pipeline: bauplan run
  5. Verify the output: bauplan query "SELECT COUNT(*) FROM telemetry.signal"
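
The steps above can also be scripted. The following is a minimal sketch that runs the same CLI commands in order from the pipeline directory. The function names and the injectable `runner` parameter are hypothetical. Because `check=True` is passed, a failing dry run raises and the real run is never started.

```python
import subprocess
from typing import Callable, List


def pipeline_commands(branch: str) -> List[List[str]]:
    """Return the CLI commands from the steps above, in execution order."""
    return [
        ["bauplan", "branch", "checkout", branch],
        ["bauplan", "run", "--dry-run"],
        ["bauplan", "run"],
        ["bauplan", "query", "SELECT COUNT(*) FROM telemetry.signal"],
    ]


def run_pipeline(
    branch: str,
    project_dir: str = "challenged_pipeline",
    runner: Callable = subprocess.run,
) -> None:
    """Run each command inside project_dir, stopping at the first failure."""
    for command in pipeline_commands(branch):
        print("$", " ".join(command))
        # With subprocess.run, check=True raises CalledProcessError on a
        # non-zero exit code, so later commands are skipped.
        runner(command, cwd=project_dir, check=True)
```

Call it with the staging branch created in Step A, for example `run_pipeline(f"{username}.telemetry_staging")`.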

Step C: Merge to Main

After the pipeline runs successfully on the staging branch, merge to main:

bash
# Check out the main branch
bauplan branch checkout main

# Merge staging branch
bauplan branch merge <username>.telemetry_staging

# Verify tables exist on main
bauplan table get telemetry.signal_bronze
bauplan table get telemetry.signal
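
If you prefer the Python SDK over the CLI for this step, the merge and verification might look like the sketch below. It assumes the client exposes `merge_branch(source_ref=..., into_branch=...)`, so check that signature against the Bauplan documentation for your SDK version. The `has_table` call mirrors the ingestion script. The function name `merge_staging_into_main` is hypothetical.

```python
from typing import Any, List


def merge_staging_into_main(
    client: Any,
    staging_branch: str,
    namespace: str = "telemetry",
) -> List[str]:
    """Merge the staging branch into main; return expected tables still missing."""
    # Assumption: merge_branch takes these keyword arguments in your SDK version.
    client.merge_branch(source_ref=staging_branch, into_branch="main")

    # Both tables from Steps A and B should now be visible on main.
    expected_tables = ["signal_bronze", "signal"]
    return [
        table
        for table in expected_tables
        if not client.has_table(table=table, ref="main", namespace=namespace)
    ]
```

Pass in a `bauplan.Client()` instance and the staging branch name. An empty list means both tables are present on main.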

Workflow Checklist

Track your progress through the initialization:

  • Step A.1: Create lakehouse_workflow/ingest_bronze.py
  • Step A.2: Run ingestion script: python lakehouse_workflow/ingest_bronze.py
  • Step A.3: Verify staging branch created
  • Step B.1: Create challenged_pipeline/ directory
  • Step B.2: Generate UUID and create bauplan_project.yml
  • Step B.3: Create models.py with signal transformation
  • Step B.4: Check out staging branch
  • Step B.5: Run pipeline dry-run
  • Step B.6: Run pipeline
  • Step B.7: Verify output table exists
  • Step C.1: Check out main branch
  • Step C.2: Merge staging branch to main
  • Step C.3: Verify tables on main

Expected Outcome

After completing all steps, you will have:

  1. A staging branch with ingested telemetry data
  2. Two tables in the telemetry namespace:
    • telemetry.signal_bronze - raw data from S3
    • telemetry.signal - transformed data
  3. Both tables merged into the main branch
  4. A working pipeline in challenged_pipeline/

What's Missing (By Design)

This naive workflow intentionally omits:

  • Data quality validation before materialization
  • Schema enforcement
  • Null checks
  • Uniqueness constraints
  • Proper error handling and rollback

These gaps will be addressed in subsequent iterations, demonstrating how Bauplan's transactional branches enable safe iteration on data quality improvements.
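
To make the omitted checks concrete, here is a rough sketch of what a later audit step could verify, written in plain Python over row dictionaries. It is not the skill's actual validation logic. The column list comes from `models.py`, while treating `(signal, dateTime)` as the uniqueness key is an assumption, as is the function name `audit_rows`.

```python
from typing import Any, Dict, Iterable, List

# Columns declared by the signal model in models.py.
REQUIRED_COLUMNS = ("signal", "dateTime", "value")


def audit_rows(rows: Iterable[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the audit passed."""
    problems: List[str] = []
    seen_keys = set()
    for index, row in enumerate(rows):
        # Schema enforcement: every required column must be present.
        missing = [c for c in REQUIRED_COLUMNS if c not in row]
        if missing:
            problems.append(f"row {index}: missing columns {missing}")
            continue
        # Null check: no required column may be None.
        nulls = [c for c in REQUIRED_COLUMNS if row[c] is None]
        if nulls:
            problems.append(f"row {index}: null in {nulls}")
        # Uniqueness: assumed key is (signal, dateTime).
        key = (row["signal"], row["dateTime"])
        if key in seen_keys:
            problems.append(f"row {index}: duplicate key {key}")
        seen_keys.add(key)
    return problems
```

In a guarded workflow, a non-empty result would block the merge to main and leave the staging branch in place for inspection.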