sling-pipelines

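Debug and fix Sling connection, replication, and API issues. Use this skill when you encounter errors, troubleshoot connection failures, fix authentication problems, resolve type-conversion issues, handle memory or performance problems, or debug API rate limits.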

SKILL.md
---
name: sling-pipelines
description: >
  Create multi-step data workflows with Sling pipelines.
  Use when orchestrating multiple operations, running tasks before/after replications, or building complex data workflows with conditionals and loops.
---

Pipelines

Pipelines are multi-step workflows that run a sequence of tasks (steps) in order, with conditionals and loops for control flow.

When to Use Pipelines

  • Run tasks before/after replications
  • Chain multiple replications
  • Implement data validation
  • Send notifications
  • Manage file operations
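
The chaining and before/after cases can be sketched with the step types shown in this skill. This is a minimal illustration, not a verified configuration: the replication paths and step IDs are hypothetical, and it assumes `on_failure` and `if` apply to `replication` steps the same way they apply to the other steps in this document.

```yaml
steps:
  # Task before the replications
  - type: log
    message: "Starting nightly load"

  # First replication; the id lets later steps inspect its state.
  # on_failure: warn (assumed to be valid here) keeps the pipeline running on error.
  - type: replication
    path: replications/extract.yaml    # hypothetical path
    id: extract
    on_failure: warn

  # Second replication runs only if the first one succeeded
  - type: replication
    path: replications/load.yaml       # hypothetical path
    id: load
    if: state.extract.status == "success"

  # Task after the replications
  - type: log
    message: "Nightly load finished"
```

Without `on_failure: warn` on the first step, the default `abort` behavior would stop the pipeline before the `if` condition is ever evaluated.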

Basic Structure

yaml
env:
  MY_VAR: "value"

steps:
  - type: log
    message: "Starting pipeline"

  - type: replication
    path: /path/to/replication.yaml
    id: main_repl

  - type: query
    connection: MY_DB
    query: "SELECT COUNT(*) FROM table"
    if: state.main_repl.status == "success"

MCP Operations

Parse

json
{"action": "parse", "input": {"file_path": "/path/to/pipeline.yaml"}}

Run

json
{
  "action": "run",
  "input": {
    "file_path": "/path/to/pipeline.yaml",
    "env": {"MY_VAR": "override"}
  }
}

Step Types

| Type | Description |
| --- | --- |
| log | Output messages |
| run | Simple data transfer |
| replication | Run replication file |
| query | Execute SQL |
| http | HTTP requests |
| command | Shell commands |
| check | Validate conditions |
| copy | Copy files |
| delete | Delete files |
| write | Write to files |
| read | Read file contents |
| list | List files |
| store | Store values |
| group | Group steps, enable looping |

Common Steps

log

yaml
- type: log
  level: info  # debug, info, warn, error
  message: "Processing {env.MY_VAR}"

run (simple transfer)

yaml
- type: run
  source: "MY_POSTGRES.public.users"
  target: "file://users.csv"
  mode: "full-refresh"

replication

yaml
- type: replication
  path: /path/to/replication.yaml
  select_streams: ["users", "orders"]
  mode: "incremental"
  id: my_repl

query

yaml
- type: query
  connection: MY_POSTGRES
  query: |
    SELECT COUNT(*) as cnt FROM users
    WHERE created_at > NOW() - INTERVAL '1 day'
  into: result
  id: count_query

http

yaml
- type: http
  url: "https://api.example.com/webhook"
  method: POST
  headers:
    Authorization: "Bearer {env.API_TOKEN}"
  payload: |
    {"status": "{state.my_repl.status}"}
  into: response

command

yaml
- type: command
  command: "python validate.py {store.file_path}"
  working_dir: "/scripts"
  into: output

check

yaml
- type: check
  check: "state.count_query.result[0].cnt > 0"
  failure_message: "No records found"
  on_failure: abort

copy

yaml
- type: copy
  from: "file://data/today/"
  to: "s3://bucket/archive/{YYYY}/{MM}/{DD}/"
  recursive: true
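
A `copy` step pairs naturally with a `delete` step for an archive-then-clean-up flow. The sketch below reuses only keys that appear in this skill (`from`, `to`, `recursive`, `location`, `if`, `on_failure`). It assumes that a `copy` step accepts an `id` and exposes `state.<id>.status` like the replication steps do; the `archive` ID is a hypothetical name.

```yaml
# Archive today's files to S3
- type: copy
  from: "file://data/today/"
  to: "s3://bucket/archive/{YYYY}/{MM}/{DD}/"
  recursive: true
  id: archive            # hypothetical id, assumed to be allowed on copy steps

# Remove the local copies only if the archive step succeeded
- type: delete
  location: "file://data/today/"
  if: state.archive.status == "success"
  on_failure: warn       # log a warning instead of aborting if deletion fails
```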

store

yaml
- type: store
  key: my_value
  value: "something"
# Later: {store.my_value}
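
As a short illustration of how a stored value reaches a later step, the sketch below sets the `file_path` key that the `command` example reads through `{store.file_path}`. The file path itself is a hypothetical placeholder.

```yaml
# Store a value once...
- type: store
  key: file_path
  value: "/data/exports/users.csv"   # hypothetical path

# ...then reference it in any later step
- type: command
  command: "python validate.py {store.file_path}"
  working_dir: "/scripts"
  into: output
```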

group (with loop)

yaml
- type: group
  loop: ["users", "products", "orders"]
  steps:
    - type: log
      message: "Processing: {loop.value}"
    - type: run
      source: "POSTGRES.public.{loop.value}"
      target: "SNOWFLAKE.staging.{loop.value}"

Control Flow

Conditional Execution

yaml
- type: replication
  path: main.yaml
  id: main_job

- type: http
  url: "https://slack.com/webhook"
  method: POST
  payload: '{"text": "Success!"}'
  if: state.main_job.status == "success"

- type: http
  url: "https://slack.com/webhook"
  method: POST
  payload: '{"text": "Failed!"}'
  if: state.main_job.status == "error"

Looping

yaml
- type: list
  location: "s3://bucket/data/"
  id: files

- type: group
  loop: state.files.result
  steps:
    - type: log
      message: "Processing: {loop.value.name}"
    - type: run
      source: "s3://bucket/{loop.value.path}"
      target: "POSTGRES.staging.imported"

Error Handling

| Option | Behavior |
| --- | --- |
| abort | Stop pipeline (default) |
| warn | Log warning, continue |
| quiet | Silent, continue |
| skip | Skip step, continue |
| break | Exit current group only |

yaml
- type: delete
  location: "file:///tmp/old/"
  on_failure: warn  # Don't fail if files don't exist
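
The `break` option is easiest to see inside a `group`. The sketch below is built from the step examples in this skill and assumes a `group` can be used without a `loop` and that steps inside it can read the state of earlier steps. When the check fails, the rest of the group is skipped and the pipeline continues with the step after the group.

```yaml
- type: query
  connection: MY_POSTGRES
  query: "SELECT COUNT(*) as cnt FROM users"
  into: result
  id: count_query

- type: group
  steps:
    - type: check
      check: "state.count_query.result[0].cnt > 0"
      failure_message: "No records found"
      on_failure: break        # exit this group only, not the whole pipeline

    # Skipped when the check above fails
    - type: run
      source: "MY_POSTGRES.public.users"
      target: "file://users.csv"
      mode: "full-refresh"

# Runs either way, because break does not abort the pipeline
- type: log
  message: "Continuing after the group"
```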

Variables

| Variable | Description |
| --- | --- |
| {env.VAR} | Environment variables |
| {store.key} | Stored values |
| {state.id.*} | Step results by ID |
| {timestamp.date} | Current date |
| {loop.value} | Current loop item |
| {loop.index} | Loop iteration index |
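
Several of these variables can be combined in a single message. This is a small sketch, assuming `MY_VAR` is defined under `env:` as in the basic structure; the `run_label` key is a hypothetical name introduced for illustration.

```yaml
- type: store
  key: run_label               # hypothetical key
  value: "nightly"

- type: group
  loop: ["users", "orders"]
  steps:
    - type: log
      # Interpolates the date, a stored value, the loop position and item, and an env var
      message: "[{timestamp.date}] {store.run_label} #{loop.index}: {loop.value} (MY_VAR={env.MY_VAR})"
```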

Complete Example

yaml
env:
  SLACK_WEBHOOK: "${SLACK_WEBHOOK_URL}"

steps:
  - type: log
    message: "Starting data pipeline"

  - type: replication
    path: replications/main.yaml
    id: main_sync

  - type: check
    check: state.main_sync.status == "success"
    on_failure: break

  - type: query
    connection: TARGET_DB
    query: "CALL refresh_materialized_views()"

  - type: http
    url: "{env.SLACK_WEBHOOK}"
    method: POST
    payload: |
      {"text": "Pipeline completed: {state.main_sync.total_rows} rows"}

  - type: log
    message: "Pipeline finished"

Full Documentation

See https://docs.slingdata.io/concepts/pipeline.md for complete reference.