Initialize Challenge Scenario
This skill sets up the initial data pipeline for the Valentine Data Challenge. It creates a basic ingestion and transformation workflow without advanced safety mechanisms, demonstrating a scenario that many data teams encounter.
Overview
This skill performs three main steps:
- •Step A: Ingest raw telemetry data from S3 into `telemetry.signal_bronze` on a staging branch
- •Step B: Create a transformation pipeline that materializes `telemetry.signal` from the bronze table
- •Step C: Merge the staging branch into main
CRITICAL: This is a Naive Workflow
This workflow intentionally lacks:
- •Comprehensive data quality checks
- •WAP (Write-Audit-Publish) safety patterns
- •Validation gates before merging to production
These will be added in later iterations to demonstrate how Bauplan's transactional branches enable safe data operations.
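For contrast, here is a minimal, library-free sketch of the Write-Audit-Publish idea that this workflow leaves out. Branches are modeled as plain dictionaries. The names `Lake`, `audit_non_empty`, and `write_audit_publish` are hypothetical and exist only for this illustration; none of them are Bauplan APIs.

```python
from typing import Callable

# Hypothetical in-memory stand-in for a lakehouse: branch -> {table -> rows}.
Lake = dict[str, dict[str, list[dict]]]


def audit_non_empty(rows: list[dict]) -> bool:
    """Toy audit: the staged table must contain at least one row."""
    return len(rows) > 0


def write_audit_publish(
    lake: Lake,
    table: str,
    rows: list[dict],
    audit: Callable[[list[dict]], bool],
    staging: str = "staging",
    target: str = "main",
) -> bool:
    """Write to staging, audit there, and publish only if the audit passes."""
    # Write: new data lands on the staging branch only.
    lake.setdefault(staging, {})[table] = list(rows)

    # Audit: inspect the staged data before the target branch can see it.
    if not audit(lake[staging][table]):
        return False  # the target branch is left untouched

    # Publish: copy the audited table to the target branch.
    lake.setdefault(target, {})[table] = list(lake[staging][table])
    return True


lake: Lake = {"main": {}}

# An empty batch fails the audit, so nothing reaches main.
published_empty = write_audit_publish(lake, "telemetry.signal", [], audit_non_empty)
main_after_reject = dict(lake["main"])

# A non-empty batch passes the audit and is published.
good_rows = [{"signal": "temp_sensor", "value": 23.5}]
published_ok = write_audit_publish(lake, "telemetry.signal", good_rows, audit_non_empty)
```

The naive workflow in this skill corresponds to calling the publish step unconditionally: whatever lands on staging is merged.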
Prerequisites
Before running this skill:
- •Verify you have access to Bauplan and are authenticated
- •Confirm you're starting from the `main` branch
- •Ensure the namespace `telemetry` doesn't already exist (fresh start)
Command Execution Priority:
- •Prefer using available Bauplan tools for all operations
- •CLI commands (`bauplan`) shown in this skill are secondary
- •If CLI commands fail, use `uvx bauplan` as fallback (runs without the CLI globally installed)
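When scripting the CLI path, the fallback rule can be sketched as a small helper. This is an illustration under one simplifying assumption: "the CLI fails" is approximated as "`bauplan` is not on `PATH`". The helper name `bauplan_command` and its `which` parameter are hypothetical.

```python
import shutil
from typing import Callable, Optional


def bauplan_command(
    args: list[str],
    which: Callable[[str], Optional[str]] = shutil.which,
) -> list[str]:
    """Return an argv list using `bauplan` if it is on PATH, else `uvx bauplan`."""
    if which("bauplan"):
        return ["bauplan", *args]
    # Fallback: run the CLI through uvx, which needs no global install.
    return ["uvx", "bauplan", *args]


# Injected lookups keep the example independent of the local machine.
with_cli = bauplan_command(["run", "--dry-run"], which=lambda name: "/usr/bin/bauplan")
without_cli = bauplan_command(["run", "--dry-run"], which=lambda name: None)
```

The returned list can be passed to `subprocess.run(cmd, check=True)`.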
Step A: Ingest Data from S3
Create a Python script that ingests data from S3 into a bronze table on a staging branch.
Data Source Details
- •S3 Bucket: `bauplan-alpha-user-import-uploads`
- •S3 Path: `user-uploads/case-study-intella/telemetry/raw/`
- •Target Table: `telemetry.signal_bronze`
- •Target Namespace: `telemetry`
Script Template
Create `lakehouse_workflow/ingest_bronze.py`:

    """
    Ingest raw telemetry data from S3 into telemetry.signal_bronze.
    This is a simple ingestion script without comprehensive quality checks.
    """
    import bauplan
    from datetime import datetime


    def ingest_bronze_telemetry():
        """Ingest telemetry data from S3 into bronze table on staging branch."""
        client = bauplan.Client()

        # Get username for branch naming
        info = client.info()
        username = info.user.username
        branch_name = f"{username}.telemetry_staging"

        # S3 configuration
        s3_bucket = "bauplan-alpha-user-import-uploads"
        s3_path = f"s3://{s3_bucket}/user-uploads/case-study-intella/telemetry/raw/"
        table_name = "signal_bronze"
        namespace = "telemetry"

        try:
            # Create staging branch from main
            if client.has_branch(branch_name):
                print(f"Branch '{branch_name}' already exists, using existing branch")
            else:
                client.create_branch(branch_name, from_ref="main")
                print(f"Created staging branch: {branch_name}")

            # Create namespace if it doesn't exist
            if not client.has_namespace(namespace=namespace, branch=branch_name):
                client.create_namespace(namespace=namespace, branch=branch_name)
                print(f"Created namespace: {namespace}")

            # Create table and import data
            if client.has_table(table=table_name, ref=branch_name, namespace=namespace):
                print(f"Table {namespace}.{table_name} already exists, importing additional data")
                client.import_data(
                    table=table_name,
                    search_uri=s3_path,
                    namespace=namespace,
                    branch=branch_name
                )
            else:
                # Create table (schema inferred from S3)
                client.create_table(
                    table=table_name,
                    search_uri=s3_path,
                    namespace=namespace,
                    branch=branch_name
                )
                print(f"Created table: {namespace}.{table_name}")

                # Import data
                client.import_data(
                    table=table_name,
                    search_uri=s3_path,
                    namespace=namespace,
                    branch=branch_name
                )

            # Basic check: verify data exists
            result = client.query(
                query=f"SELECT COUNT(*) as row_count FROM {namespace}.{table_name}",
                ref=branch_name
            )
            row_count = result.column("row_count")[0].as_py()
            print(f"Imported {row_count} rows into {namespace}.{table_name}")
            print(f"\nStaging branch ready: {branch_name}")
            return branch_name

        except Exception as e:
            print(f"Ingestion failed: {e}")
            raise


    if __name__ == "__main__":
        branch = ingest_bronze_telemetry()
        print(f"Next: Create transformation pipeline and run on branch '{branch}'")
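To see which calls the ingestion script issues on a fresh start versus a re-run, without touching a lakehouse, the control flow can be traced against a recording stand-in. `RecordingClient` and `ingest_steps` are hypothetical. The method names mirror the calls used in the script, but their behavior here is invented purely for the trace and does not reflect the real SDK.

```python
class RecordingClient:
    """Hypothetical stand-in that records mutating calls instead of running them."""

    def __init__(self):
        self.branches = set()
        self.namespaces = set()  # (branch, namespace) pairs
        self.tables = set()      # (branch, namespace, table) triples
        self.calls = []

    def has_branch(self, branch):
        return branch in self.branches

    def create_branch(self, branch, from_ref):
        self.branches.add(branch)
        self.calls.append("create_branch")

    def has_namespace(self, namespace, branch):
        return (branch, namespace) in self.namespaces

    def create_namespace(self, namespace, branch):
        self.namespaces.add((branch, namespace))
        self.calls.append("create_namespace")

    def has_table(self, table, ref, namespace):
        return (ref, namespace, table) in self.tables

    def create_table(self, table, search_uri, namespace, branch):
        self.tables.add((branch, namespace, table))
        self.calls.append("create_table")

    def import_data(self, table, search_uri, namespace, branch):
        self.calls.append("import_data")


def ingest_steps(client, branch, namespace, table, uri):
    """Same branching as the ingestion script, minus the row-count query."""
    start = len(client.calls)
    if not client.has_branch(branch):
        client.create_branch(branch, from_ref="main")
    if not client.has_namespace(namespace=namespace, branch=branch):
        client.create_namespace(namespace=namespace, branch=branch)
    if not client.has_table(table=table, ref=branch, namespace=namespace):
        client.create_table(table=table, search_uri=uri, namespace=namespace, branch=branch)
    # import_data runs in both the "table exists" and "table created" paths.
    client.import_data(table=table, search_uri=uri, namespace=namespace, branch=branch)
    return client.calls[start:]


uri = "s3://bauplan-alpha-user-import-uploads/user-uploads/case-study-intella/telemetry/raw/"
branch = "alice.telemetry_staging"  # "alice" is a placeholder username

client = RecordingClient()
first_run = ingest_steps(client, branch, "telemetry", "signal_bronze", uri)
second_run = ingest_steps(client, branch, "telemetry", "signal_bronze", uri)
print(first_run)
print(second_run)
```

The trace shows that a re-run skips the create calls but still issues `import_data` against the same S3 prefix. Whether that duplicates rows depends on how `import_data` treats files it has already seen, which this sketch does not model.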
Step B: Create Transformation Pipeline
Create a Bauplan pipeline project that transforms the bronze data into the production table.
Pipeline Details
- •Project Name: `ingest-bronze-telemetry`
- •Source Table: `telemetry.signal_bronze`
- •Output Table: `telemetry.signal`
- •Transformation: Simple SELECT (pass-through for now)
Directory Structure
    challenged_pipeline/
        bauplan_project.yml
        models.py
bauplan_project.yml
    project:
      id: <generate-unique-uuid>
      name: ingest-bronze-telemetry
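One way to fill in the project id is to generate a random UUID with Python's standard library and render the file contents from it. This is a sketch: the helper name `render_project_yaml` is hypothetical, and it assumes any version 4 UUID is acceptable as a project id.

```python
import uuid


def render_project_yaml(name: str) -> str:
    """Return bauplan_project.yml text with a freshly generated project id."""
    project_id = uuid.uuid4()  # random (version 4) UUID
    return f"project:\n  id: {project_id}\n  name: {name}\n"


yaml_text = render_project_yaml("ingest-bronze-telemetry")
print(yaml_text)
```

Writing `yaml_text` to `challenged_pipeline/bauplan_project.yml` completes the step. Generate the id once and keep it, rather than regenerating it on every run.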
models.py
    import bauplan


    @bauplan.model(
        materialization_strategy='REPLACE',
        columns=['signal', 'dateTime', 'value']
    )
    @bauplan.python('3.11', pip={'polars': '1.15.0'})
    def signal(bronze_data=bauplan.Model('telemetry.signal_bronze')):
        """
        Transform bronze telemetry data into production signal table.
        Currently a pass-through transformation.

        | signal      | dateTime            | value  |
        |-------------|---------------------|--------|
        | temp_sensor | 2024-01-01 00:00:00 | 23.5   |
        | gyro_x      | 2024-01-01 00:00:01 | 0.012  |
        """
        import polars as pl

        # Convert from Arrow to Polars
        df = pl.from_arrow(bronze_data)

        # Simple pass-through for now
        # (Later iterations will add proper transformations and validations)
        return df.to_arrow()
Run the Pipeline
After creating the pipeline files:
- •Navigate to the pipeline directory: `cd challenged_pipeline`
- •Switch to the staging branch: `bauplan branch checkout <username>.telemetry_staging`
- •Dry run the pipeline: `bauplan run --dry-run`
- •Run the pipeline: `bauplan run`
- •Verify the output: `bauplan query "SELECT COUNT(*) FROM telemetry.signal"`
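If you prefer to drive these steps from Python rather than typing them, the sequence can be sketched as below. The commands are the ones listed above. The helpers `pipeline_commands` and `run_all`, and the `runner` parameter, are hypothetical names introduced for illustration. The `cd` step is replaced by passing `cwd` to the runner.

```python
import subprocess


def pipeline_commands(username: str) -> list[list[str]]:
    """Return the run-step commands in order, as argv lists."""
    branch = f"{username}.telemetry_staging"
    return [
        ["bauplan", "branch", "checkout", branch],
        ["bauplan", "run", "--dry-run"],
        ["bauplan", "run"],
        ["bauplan", "query", "SELECT COUNT(*) FROM telemetry.signal"],
    ]


def run_all(commands, cwd, runner=subprocess.run):
    """Run each command inside `cwd`; check=True stops at the first failure."""
    for argv in commands:
        runner(argv, cwd=cwd, check=True)


# Demonstration with a recording runner, so nothing is actually executed.
executed = []
commands = pipeline_commands("alice")  # "alice" is a placeholder username
run_all(
    commands,
    cwd="challenged_pipeline",
    runner=lambda argv, cwd, check: executed.append((argv, cwd)),
)
```

Calling `run_all(commands, cwd="challenged_pipeline")` without the `runner` argument would execute the commands for real through `subprocess.run`.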
Step C: Merge to Main
After the pipeline runs successfully on the staging branch, merge to main:
    # Checkout main branch
    bauplan branch checkout main

    # Merge staging branch
    bauplan branch merge <username>.telemetry_staging

    # Verify tables exist on main
    bauplan table get telemetry.signal_bronze
    bauplan table get telemetry.signal
Workflow Checklist
Track your progress through the initialization:
- • Step A.1: Create `lakehouse_workflow/ingest_bronze.py`
- • Step A.2: Run ingestion script: `python lakehouse_workflow/ingest_bronze.py`
- • Step A.3: Verify staging branch created
- • Step B.1: Create `challenged_pipeline/` directory
- • Step B.2: Generate UUID and create `bauplan_project.yml`
- • Step B.3: Create `models.py` with signal transformation
- • Step B.4: Checkout staging branch
- • Step B.5: Run pipeline dry-run
- • Step B.6: Run pipeline
- • Step B.7: Verify output table exists
- • Step C.1: Checkout main branch
- • Step C.2: Merge staging branch to main
- • Step C.3: Verify tables on main
Expected Outcome
After completing all steps, you will have:
- •A staging branch with ingested telemetry data
- •Two tables in the `telemetry` namespace:
  - •`telemetry.signal_bronze` - raw data from S3
  - •`telemetry.signal` - transformed data
- •Both tables merged into the `main` branch
- •A working pipeline in `challenged_pipeline/`
What's Missing (By Design)
This naive workflow intentionally omits:
- •Data quality validation before materialization
- •Schema enforcement
- •Null checks
- •Uniqueness constraints
- •Proper error handling and rollback
These gaps will be addressed in subsequent iterations, demonstrating how Bauplan's transactional branches enable safe iteration on data quality improvements.
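To give a sense of what two of the omitted checks look like, here is a small sketch of a null check and a uniqueness check over plain Python rows. The column names come from the `signal` model. Treating `(signal, dateTime)` as the uniqueness key is an assumption made for this example, and the helper names and sample rows are hypothetical.

```python
from collections import Counter


def find_null_rows(rows, required):
    """Return indexes of rows where any required column is missing or None."""
    return [
        i for i, row in enumerate(rows)
        if any(row.get(col) is None for col in required)
    ]


def find_duplicate_keys(rows, key_columns):
    """Return key tuples that appear in more than one row."""
    counts = Counter(tuple(row.get(col) for col in key_columns) for row in rows)
    return [key for key, n in counts.items() if n > 1]


# Illustrative rows only; the last two are deliberately bad.
rows = [
    {"signal": "temp_sensor", "dateTime": "2024-01-01 00:00:00", "value": 23.5},
    {"signal": "gyro_x", "dateTime": "2024-01-01 00:00:01", "value": 0.012},
    {"signal": "gyro_x", "dateTime": "2024-01-01 00:00:01", "value": 0.013},  # duplicate key
    {"signal": "temp_sensor", "dateTime": "2024-01-01 00:00:02", "value": None},  # null value
]

null_rows = find_null_rows(rows, ["signal", "dateTime", "value"])
duplicate_keys = find_duplicate_keys(rows, ["signal", "dateTime"])
```

In a gated workflow, a non-empty result from either helper would block the merge into main. In this naive workflow nothing inspects the data before it is merged.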