AgentSkillsCN

flow-management

在构建数据集、运行多步骤流水线、管理依赖关系,或统筹配方执行顺序时使用。

SKILL.md
--- frontmatter
name: flow-management
description: "Use when building datasets, running multi-step pipelines, managing dependencies, or orchestrating recipe execution order"

Flow Management Patterns

Reference patterns for building and orchestrating Dataiku flows via the Python API.

When to Use This Skill

  • Building datasets that depend on upstream datasets
  • Running multiple recipes in the correct order
  • Creating multi-step pipelines (e.g., aggregate then join then train)
  • Checking job status and handling failures mid-pipeline

Build a Single Dataset

python
recipe = project.get_recipe("my_recipe")
job = recipe.run(no_fail=True)
status = job.get_status()
state = status.get("baseStatus", {}).get("state")  # "DONE" or "FAILED"

recipe.run() already waits for completion. Use no_fail=True to prevent exceptions on failure.

Build Multiple Datasets in Dependency Order

When downstream datasets depend on upstream ones, build them sequentially:

python
def build_recipe(project, recipe_name):
    """Build a recipe and return success status."""
    print(f"Building {recipe_name}...")
    recipe = project.get_recipe(recipe_name)
    job = recipe.run(no_fail=True)
    status = job.get_status()
    state = status.get("baseStatus", {}).get("state")

    if state == "DONE":
        print(f"  {recipe_name}: success")
        return True
    else:
        # Extract error details
        activities = status.get("baseStatus", {}).get("activities", {})
        for name, info in activities.items():
            if info.get("firstFailure"):
                print(f"  {recipe_name} error: {info['firstFailure'].get('message')}")
        return False


# Build in dependency order: upstream first, then downstream
pipeline = [
    "group_LAB_RESULTS_AGG",       # Step 1: aggregate
    "group_CLINICAL_NOTES_AGG",    # Step 2: aggregate (independent of step 1)
    "join_ML_TRAINING_DATA",       # Step 3: join (depends on steps 1 & 2)
]

for recipe_name in pipeline:
    success = build_recipe(project, recipe_name)
    if not success:
        print(f"Pipeline failed at {recipe_name}. Fix and retry.")
        break

Build Independent Recipes in Parallel

For recipes with no dependency between them, you can build the output datasets directly:

python
# These two aggregations are independent — build them before the join
ds1 = project.get_dataset("LAB_RESULTS_AGG")
ds2 = project.get_dataset("CLINICAL_NOTES_AGG")

# Build both (sequentially via API, but could overlap in Dataiku)
job1 = project.get_recipe("group_LAB_RESULTS_AGG").run(no_fail=True)
job2 = project.get_recipe("group_CLINICAL_NOTES_AGG").run(no_fail=True)

# Then build the dependent join
job3 = project.get_recipe("join_ML_TRAINING_DATA").run(no_fail=True)

Check What Exists Before Creating

Before creating recipes or datasets, check if they already exist to make scripts idempotent:

python
existing_datasets = [d.get("name") for d in project.list_datasets()]
existing_recipes = [r.get("name") for r in project.list_recipes()]

if "MY_OUTPUT" not in existing_datasets:
    # Create the dataset...
    pass

if "my_recipe" not in existing_recipes:
    # Create the recipe...
    pass

Verify Pipeline Results

After building a pipeline, verify the final output:

python
ds = project.get_dataset("ML_TRAINING_DATA")
schema = ds.get_settings().get_raw().get("schema", {}).get("columns", [])

print(f"Output has {len(schema)} columns:")
for col in schema:
    print(f"  - {col['name']} ({col.get('type', 'unknown')})")

Common Pipeline Patterns

PatternStepsKey Concern
Aggregate + JoinGroup inputs → Join aggregated outputsBuild aggregations before the join
Clean + TransformPrepare recipe → Group/JoinSchema updates after each step
ETL to WarehousePrepare → Sync to SQL connectionSet SQL schema before sync
ML PipelinePrep → Aggregate → Join → TrainFull dependency chain, verify schema at each step

Handling Failures Mid-Pipeline

python
for recipe_name in pipeline:
    success = build_recipe(project, recipe_name)
    if not success:
        # Get detailed error info
        jobs = project.list_jobs()
        job = project.get_job(jobs[0]['def']['id'])
        print(job.get_log()[-2000:])  # Last 2000 chars of log
        break

See skills/troubleshooting/ for detailed error diagnosis patterns.

Detailed References

Related Skills