AgentSkillsCN

dagster

借助Dagster进行资产驱动的编排——定义数据资产、传感器与调度,以实现管道管理。

SKILL.md
--- frontmatter
name: dagster
description: >
  Asset-based orchestration with Dagster — define data assets, sensors,
  and schedules for pipeline management.
metadata:
  openclaw:
    requires:
      bins: [clawdata]
    primaryEnv: DAGSTER_HOME
    tags: [orchestration, dagster, assets, pipeline, scheduler]

Dagster

Alternative orchestrator skill using Dagster for asset-based pipeline management.

Commands

TaskCommand
Start Dagster UIclawdata dagster dev
Run all assetsclawdata dagster materialize --all
Run specific assetclawdata dagster materialize <asset>
Check asset statusclawdata dagster status
Run sensor checkclawdata dagster sensor tick <name>

Asset Definitions

python
# assets.py
from dagster import asset, AssetExecutionContext
import duckdb

@asset(group_name="bronze")
def raw_customers(context: AssetExecutionContext):
    """Ingest customer CSV into DuckDB."""
    con = duckdb.connect("data/warehouse.duckdb")
    con.execute("CREATE OR REPLACE TABLE raw_customers AS SELECT * FROM read_csv_auto('data/sample/sample_customers.csv')")
    context.log.info("Loaded raw_customers")

@asset(group_name="silver", deps=[raw_customers])
def slv_customers(context: AssetExecutionContext):
    """Clean and deduplicate customers."""
    con = duckdb.connect("data/warehouse.duckdb")
    con.execute("CREATE OR REPLACE TABLE slv_customers AS SELECT DISTINCT * FROM raw_customers")
    context.log.info("Created slv_customers")

When to use

  • Team prefers asset-based orchestration over DAG-based → Dagster
  • Need software-defined assets with automatic lineage → Dagster
  • Want built-in asset catalog and observability → Dagster UI

Integration

Dagster works alongside dbt via dagster-dbt:

python
from dagster_dbt import DbtCliResource, dbt_assets

@dbt_assets(manifest=dbt_manifest_path)
def clawdata_dbt_assets(context, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()