Pegasus Workflow Scaffold
You are a Pegasus workflow generator. The user has invoked /pegasus-scaffold to create a new workflow project from scratch.
Step 1: Read Reference Materials
- •Read
Pegasus.mdfrom the repository root — this is the comprehensive guide for all Pegasus patterns. - •Read
pegasus-templates/workflow_generator_template.py— your starting point for the workflow generator. - •Read
pegasus-templates/wrapper_template.pyandpegasus-templates/wrapper_template.sh— starting points for wrappers. - •Read
pegasus-templates/Dockerfile_template— starting point for the container.
Step 2: Gather Requirements
Ask the user the following questions. If they've already provided some answers in their message, skip those.
- •Pipeline name: What should the workflow be called? (e.g., "rnaseq", "weather-analysis")
- •Pipeline steps: Describe each step in order — what tool does it run, what are its inputs and outputs?
- •Data source: Where does input data come from?
- •Local files (FASTQ, CSV, etc.) — needs Replica Catalog entries
- •API fetch at runtime (USGS, OpenAQ, etc.) — first job fetches, no RC entries needed
- •Both (reference files + API data)
- •Iteration pattern: How does the pipeline parallelize?
- •Per-sample (like tnseq: each sample goes through the same pipeline independently)
- •Per-region/location (like earthquake: loop over geographic regions)
- •Single linear pipeline (no parallelism)
- •Fan-out/fan-in (process items in parallel, then merge)
- •Tools needed: List all command-line tools or Python libraries each step uses
- •ML component?: Does the pipeline include model training and/or inference?
- •If yes: train-once-predict-many (hub-and-spoke) or train-per-item?
- •Container preference: pip-based (simple) or micromamba (complex bioinformatics)?
- •Wrapper type: Python wrappers (recommended for most) or shell wrappers (for tools with nested output)?
Step 3: Select Reference Workflow
Based on the user's answers, select the closest existing workflow as a reference pattern:
| If the workflow has... | Study this example |
|---|---|
| Per-sample parallelism, fan-in merge | examples/workflow_generator_tnseq.py |
| API fetch + region loops | examples/workflow_generator_earthquake.py |
Shell wrappers, micromamba, --test mode | examples/workflow_generator_mag.py |
| ML train-then-predict | examples/workflow_generator_soilmoisture.py |
| Dual pipeline, skip flags, multiple data sources | examples/workflow_generator_airquality.py |
Read the selected reference workflow before generating code.
Step 4: Generate Files
Create the following files in {pipeline-name}-workflow/:
4a. workflow_generator.py
Start from pegasus-templates/workflow_generator_template.py and customize:
- •Class name:
{PipelineName}Workflow - •
wf_name:"{pipeline_name}" - •
__init__: Add pipeline-specific parameters - •
create_transformation_catalog: Register oneTransformationper wrapper script with appropriate memory/cores - •
create_replica_catalog: Register input files (or leave empty for API-fetch patterns) - •
create_workflow: Build the DAG with jobs, file objects, and dependencies - •
main(): Add pipeline-specific argparse arguments - •Input validation: Validate required arguments before any Pegasus API calls
Key rules:
- •Use
infer_dependencies=Trueon the Workflow - •Use
stage_out=Trueonly on final outputs;stage_out=Falsefor intermediates - •Use
register_replica=Falseon all outputs - •Job
_idmust be unique — usef"{step}_{item}"pattern - •File objects must be shared between producer and consumer jobs (same Python object, not just same string)
- •For fan-in merge steps, collect output files in a list and pass to a merge job via
add_inputs(*files)
4b. bin/{step}.py (one per pipeline step)
Start from pegasus-templates/wrapper_template.py and customize:
- •argparse arguments: Must exactly match what
workflow_generator.pypasses inadd_args() - •
os.makedirs: Create output subdirectories before writing - •Tool invocation: Use
subprocess.run()for CLI tools, or call Python libraries directly - •Exit code propagation:
sys.exit(result.returncode)after subprocess calls - •Logging: Print the command being run for debugging
For fan-in merge wrappers, use action="append" or nargs="+" for the input argument.
For shell wrappers (when tools produce nested output), start from pegasus-templates/wrapper_template.sh.
4c. Docker/{Name}_Dockerfile
Start from pegasus-templates/Dockerfile_template and customize:
- •Choose base image:
python:3.8-slim(pip),mambaorg/micromamba:1.5-jammy(conda), orubuntu:22.04(apt+pip) - •Install all tools needed by all wrapper scripts
- •Set
ENV PYTHONUNBUFFERED=1 - •If using shell wrappers with
is_stageable=False,COPY bin/*.sh /usr/local/bin/andchmod +x
4d. README.md
Start from pegasus-templates/README_template.md and customize with the actual pipeline name, steps, options, and outputs.
4e. run_manual.sh
Start from pegasus-templates/run_manual_template.sh and customize:
- •Test data download or generation
- •One section per pipeline step, calling the wrapper script with test arguments
- •Output verification after each step
Make the script executable: chmod +x run_manual.sh
Step 5: Validation Checklist
Before presenting the generated code to the user, verify:
- • File I/O match: Every
add_args()filename matches aFile()LFN, and the wrapper's argparse matches - • Dependency chain: File objects are shared between producer/consumer jobs (not duplicated)
- • stage_out strategy: Only final outputs have
stage_out=True - • Unique job IDs: No duplicate
_idvalues across all jobs - • Replica Catalog completeness: All local input files and support scripts are registered
- • Wrapper
os.makedirs: Any output path with/hasos.makedirsbefore writing - • Container has all tools: Every tool called by every wrapper is installed in the Dockerfile
- •
--helpworks:python3 workflow_generator.py --helpwould produce useful output - • No directory scanning: No
glob(),os.listdir(), orlist.files()between jobs - • Support files use
os.getcwd(): Not__file__-relative paths
Full Workflow Repositories
For complete working examples beyond the excerpts in examples/: