AgentSkillsCN

sling-hooks

为 Sling 的复制、管道与 API 规范提供数据转换函数。当您需要转换数据、应用字符串/日期/数值函数、进行类型转换、清理数据、计算字段,或使用 trim、upper、lower、date_format、cast、coalesce、hash 等表达式时,可使用此技能。

SKILL.md
--- frontmatter
name: sling-hooks
description: >
  Pre/post actions and hooks for Sling replications and pipelines.
  Use when adding actions before/after replications, executing SQL queries, sending HTTP webhooks, validating data with checks, running shell commands, copying/deleting files, or implementing notifications.

Hooks and Steps Reference

Hooks are custom actions that execute at specific points in replications. Steps are the same concept used in pipelines.

Hook Locations (Replications)

LocationWhenUse Case
startBefore any stream runsSetup, notifications
endAfter all streams completeCleanup, notifications
preBefore each streamValidation, setup
postAfter each streamLogging, notifications
pre_mergeBefore merge (in transaction)Session settings
post_mergeAfter merge (in transaction)Additional SQL

Configuration

Replication-Level Hooks

yaml
hooks:
  start:
    - type: log
      message: "Starting replication"
  end:
    - type: check
      check: execution.status.error == 0

defaults:
  ...
streams:
  ...

Stream-Level Hooks

yaml
streams:
  my_table:
    hooks:
      pre:
        - type: query
          connection: source
          query: "SELECT 1"
      post:
        - type: http
          url: "https://webhook.example.com"

Common Properties

PropertyDescription
typeHook type (required)
idIdentifier for referencing output
ifConditional execution
on_failureabort, warn, quiet, skip, break

Hook Types

log

yaml
- type: log
  level: info  # debug, info, warn, error
  message: "Processed {run.total_rows} rows"

query

yaml
- type: query
  connection: MY_DB
  query: |
    UPDATE status SET last_run = NOW()
    WHERE table_name = '{stream.table}'
  into: result  # Optional: store result
  id: update_status

http

yaml
- type: http
  url: "https://api.example.com/webhook"
  method: POST
  headers:
    Authorization: "Bearer {env.API_TOKEN}"
    Content-Type: "application/json"
  payload: |
    {
      "table": "{stream.table}",
      "rows": {run.total_rows},
      "status": "{run.status}"
    }
  into: response

check

yaml
- type: check
  check: "run.total_rows > 0"
  failure_message: "No rows processed"
  on_failure: abort
  vars:
    threshold: 100

command

yaml
- type: command
  command: "python validate.py {object.table}"
  working_dir: "/scripts"
  print: true
  into: output

copy

yaml
- type: copy
  from: "file://data/source.csv"
  to: "s3://bucket/archive/{YYYY}/{MM}/"
  recursive: true

delete

yaml
- type: delete
  location: "s3://bucket/temp/"
  recursive: true

write

yaml
- type: write
  to: "file://logs/run_{timestamp.file_name}.txt"
  content: |
    Run completed at {timestamp.datetime}
    Rows: {run.total_rows}
    Status: {run.status}

read

yaml
- type: read
  from: "file://config/settings.json"
  into: settings

list

yaml
- type: list
  location: "s3://bucket/data/"
  recursive: false
  only: files  # files or folders
  id: file_list

store

yaml
- type: store
  key: my_value
  value: "{run.total_rows}"

replication

yaml
- type: replication
  path: /path/to/other_replication.yaml
  select_streams: ["table1"]
  mode: incremental

group

yaml
- type: group
  loop: ["users", "orders", "products"]
  steps:
    - type: log
      message: "Processing {loop.value}"
    - type: query
      connection: DB
      query: "ANALYZE {loop.value}"

Variables

Variables use {variable.path} syntax within hook properties.

Debug: Print All Variables

yaml
- type: log
  message: '{runtime_state}'  # Prints all available variables as JSON

Variable Scopes

ScopeDescriptionExample
env.*Environment variables{env.API_TOKEN}
store.*Values from store hooks{store.my_key}
state.*Hook outputs by ID{state.my_query.result[0].id}
timestamp.*Date/time parts{timestamp.YYYY}-{timestamp.MM}
execution.*Replication-level info{execution.total_rows}
source.*Source connection{source.name}, {source.type}
target.*Target connection{target.database}
stream.*Current stream{stream.table}, {stream.file_name}
object.*Target object{object.full_name}
run.*Current run{run.total_rows}, {run.status}
runs.*All runs by ID{runs.my_stream.status}

Commonly Used Variables

yaml
# Timestamps
timestamp.date              # "2025-01-19"
timestamp.datetime          # "2025-01-19 08:27:31"
timestamp.YYYY / MM / DD    # Year, month, day parts
timestamp.file_name         # "2025_01_19_082731" (safe for filenames)
timestamp.unix              # 1737286051

# Run metrics
run.total_rows              # Rows processed
run.total_bytes             # Bytes processed
run.status                  # "success" or "error"
run.duration                # Seconds

# Execution totals
execution.total_rows        # All rows across streams
execution.status.error      # Count of failed streams
execution.status.success    # Count of successful streams

# Stream/Object names
stream.table / stream.schema      # Source table info
object.table / object.full_name   # Target table info

Error Handling

on_failure Options

OptionBehavior
abortStop immediately (default)
warnLog warning, continue
quietSilent, continue
skipSkip this hook, continue
breakExit current group/hook sequence

Check Pattern

yaml
hooks:
  end:
    # Stop if any errors
    - type: check
      check: execution.status.error == 0
      on_failure: break

    # Only run if check passed
    - type: http
      url: "https://webhook.example.com"
      payload: '{"status": "success"}'

Complete Example

yaml
hooks:
  start:
    - type: log
      message: "Starting replication at {timestamp.datetime}"

    - type: query
      connection: '{source.name}'
      query: "SELECT MAX(updated_at) as max_date FROM audit"
      into: audit_check
      id: pre_check

  end:
    - type: check
      check: execution.status.error == 0
      on_failure: break

    - type: http
      url: "{env.SLACK_WEBHOOK}"
      method: POST
      payload: |
        {
          "text": "Replicated {execution.total_rows} rows"
        }

defaults:
  hooks:
    pre:
      - type: log
        message: "Starting {stream.table}"

    post:
      - type: log
        message: "{stream.table}: {run.total_rows} rows in {run.duration}s"

streams:
  public.users:
    hooks:
      post:
        - type: query
          connection: '{target.name}'
          query: "ANALYZE {object.full_name}"

Full Documentation

See https://docs.slingdata.io/concepts/hooks.md for complete reference.