SDG Run Flow

Configure and execute SDG data generation flows.

SKILL.md
--- frontmatter
description: Configure and execute SDG data generation flows

SDG Run Flow

Configure and execute synthetic data generation flows in sdg_hub.

Basic Execution

python
from datasets import load_dataset
from sdg_hub import FlowRegistry, Flow

# Load your seed dataset
dataset = load_dataset("json", data_files="seed_data.jsonl", split="train")

# Load the flow
flow = Flow.from_yaml(FlowRegistry.get_flow_path("simple-math-qa"))

# Configure model access
flow.set_model_config(
    model="gpt-4",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
)

# Run generation
result = flow.generate(dataset)

# Save results
result.to_json("generated_data.jsonl")
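The example above reads its seed data from seed_data.jsonl, one JSON object per line. As a minimal sketch of producing such a file with the standard library: the field names below (topic, difficulty) are hypothetical, since the columns a flow actually needs depend on the flow you load, and the helper names write_jsonl and read_jsonl are illustrative, not part of sdg_hub.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical seed records; real column names depend on the flow you load.
seed_records = [
    {"topic": "fractions", "difficulty": "easy"},
    {"topic": "linear equations", "difficulty": "medium"},
]

def write_jsonl(records, path):
    """Write one JSON object per line, the layout load_dataset("json") reads."""
    path = Path(path)
    with path.open("w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return path

def read_jsonl(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    with Path(path).open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]

# Written to a temporary directory here; use "seed_data.jsonl" in a real project.
seed_path = write_jsonl(seed_records, Path(tempfile.mkdtemp()) / "seed_data.jsonl")
round_trip = read_jsonl(seed_path)
print(round_trip == seed_records)
```

Reading the file back before starting a run is a cheap way to catch malformed lines before any model calls are made.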

Model Configuration

OpenAI-compatible endpoints

python
flow.set_model_config(
    model="gpt-4",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
)

vLLM local server

python
flow.set_model_config(
    model="meta-llama/Llama-3.1-8B-Instruct",
    api_base="http://localhost:8000/v1",
    api_key="EMPTY",  # Placeholder; vLLM only checks the key if the server was started with --api-key
)

Multiple models (for different blocks)

python
flow.set_model_config(
    model="gpt-4",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
    block_name="generator_block",  # Only applies to this block
)

flow.set_model_config(
    model="gpt-3.5-turbo",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
    block_name="evaluator_block",
)
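The configurations above pass api_key as a string literal. One way to keep keys out of source files is to read them from environment variables and build the keyword arguments separately. This is a sketch under assumptions: model_config_from_env is a hypothetical helper, not an sdg_hub function, and the variable name VLLM_API_KEY is chosen here only for illustration.

```python
import os

def model_config_from_env(model, api_base, key_var="OPENAI_API_KEY", default_key=None):
    """Build keyword arguments for set_model_config without hardcoding the key."""
    api_key = os.environ.get(key_var, default_key)
    if api_key is None:
        raise RuntimeError(f"Set the {key_var} environment variable first")
    return {"model": model, "api_base": api_base, "api_key": api_key}

# Local vLLM server: fall back to the "EMPTY" placeholder when no key is set.
vllm_config = model_config_from_env(
    "meta-llama/Llama-3.1-8B-Instruct",
    "http://localhost:8000/v1",
    key_var="VLLM_API_KEY",  # hypothetical variable name
    default_key="EMPTY",
)
print(sorted(vllm_config))
```

The resulting dict could then be unpacked into the call shown earlier, for example flow.set_model_config(**vllm_config).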

Execution Options

Batched processing

python
result = flow.generate(
    dataset,
    batch_size=32,  # Process 32 examples at a time
    num_workers=4,  # Parallel workers
)

Resume from checkpoint

python
result = flow.generate(
    dataset,
    checkpoint_dir="./checkpoints",
    resume=True,  # Continue from last checkpoint
)

Dry run (preview without execution)

python
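# Slicing a datasets.Dataset returns a dict of columns, so select rows instead
preview = flow.dry_run(dataset.select(range(min(5, len(dataset)))))
print(preview)  # Shows what would be generated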

Flow Customization

Override block parameters

python
flow = Flow.from_yaml(flow_path)

# Modify specific block configs
flow.blocks["llm_block"].temperature = 0.9
flow.blocks["filter_block"].min_score = 0.8

result = flow.generate(dataset)

Skip blocks

python
result = flow.generate(
    dataset,
    skip_blocks=["optional_enhancement_block"],
)

Add custom blocks inline

python
from sdg_hub.core.blocks.base import BaseBlock

class MyPostProcessor(BaseBlock):
    def generate(self, dataset, **kwargs):
        return dataset.map(lambda x: {**x, "processed": True})

flow.add_block(MyPostProcessor(), position="end")
result = flow.generate(dataset)

Output Handling

Save to various formats

python
# JSONL (recommended)
result.to_json("output.jsonl")

# Parquet (efficient for large datasets)
result.to_parquet("output.parquet")

# Push to HuggingFace Hub
result.push_to_hub("username/dataset-name")

Streaming output

python
for batch in flow.generate_streaming(dataset, batch_size=100):
    # Process each batch as it's generated
    batch.to_json(f"output_batch_{batch.batch_id}.jsonl")
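Streaming as shown above leaves one file per batch on disk. If a single output file is wanted afterwards, the batch files can be concatenated in numeric order. The sketch below assumes the output_batch_<id>.jsonl naming from the loop above and that batch ids are integers; merge_batches is a hypothetical helper, not part of sdg_hub.

```python
import re
import tempfile
from pathlib import Path

def merge_batches(directory, out_name="output.jsonl"):
    """Concatenate output_batch_<n>.jsonl files in numeric batch order."""
    directory = Path(directory)
    pattern = re.compile(r"output_batch_(\d+)\.jsonl$")
    numbered = []
    for path in directory.iterdir():
        match = pattern.match(path.name)
        if match:
            numbered.append((int(match.group(1)), path))
    out_path = directory / out_name
    with out_path.open("w", encoding="utf-8") as out:
        # Sort on the integer id so batch 10 comes after batch 2.
        for _, path in sorted(numbered):
            text = path.read_text(encoding="utf-8")
            # Make sure each file ends with a newline before appending the next.
            out.write(text if text.endswith("\n") or not text else text + "\n")
    return out_path

# Demonstration with three small stand-in batch files.
workdir = Path(tempfile.mkdtemp())
for batch_id in (10, 0, 2):
    (workdir / f"output_batch_{batch_id}.jsonl").write_text(
        f'{{"id": {batch_id}}}\n', encoding="utf-8"
    )
merged_lines = merge_batches(workdir).read_text(encoding="utf-8").splitlines()
print(merged_lines)
```

Sorting on the parsed integer rather than the file name avoids the lexicographic ordering in which output_batch_10 would precede output_batch_2.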

Monitoring & Logging

python
import logging
logging.basicConfig(level=logging.INFO)

# Enable detailed flow logging
flow.set_logging(
    level="DEBUG",
    log_file="flow_execution.log",
    log_samples=True,  # Log sample inputs/outputs
)

result = flow.generate(dataset)

Error Handling

python
import time

from sdg_hub.exceptions import BlockExecutionError, RateLimitError

try:
    result = flow.generate(dataset)
except RateLimitError as e:
    print(f"Rate limited, waiting {e.retry_after}s")
    time.sleep(e.retry_after)
    result = flow.generate(dataset, resume=True)
except BlockExecutionError as e:
    print(f"Block {e.block_name} failed: {e.message}")
    # Inspect failed examples
    print(e.failed_examples[:5])
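The handler above retries once after a rate limit. When limits recur, a bounded retry loop with exponential backoff is a common alternative. The following is a generic sketch, not sdg_hub functionality: retry_with_backoff and FakeRateLimit are hypothetical names, and flaky_generate stands in for a call such as flow.generate so the example runs without a model endpoint.

```python
import time

def retry_with_backoff(fn, retryable=(Exception,), max_attempts=4,
                       base_delay=1.0, sleep=time.sleep):
    """Call fn() until it succeeds, doubling the wait after each retryable failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts:
                raise  # Out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))

class FakeRateLimit(Exception):
    """Hypothetical stand-in for a rate-limit exception."""

calls = {"count": 0}

def flaky_generate():
    """Stand-in for a generation call: fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise FakeRateLimit("429")
    return "ok"

delays = []  # Record the waits instead of actually sleeping
outcome = retry_with_backoff(
    flaky_generate, retryable=(FakeRateLimit,), sleep=delays.append
)
print(outcome, delays)
```

Passing the sleep function as a parameter keeps the helper testable; in real use the default time.sleep applies, and the wrapped callable could be something like lambda: flow.generate(dataset, resume=True).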

Common Issues

| Issue | Solution |
| --- | --- |
| Rate limiting | Reduce batch_size, add delays, use checkpoints |
| OOM on large datasets | Use generate_streaming() or smaller batches |
| Missing columns | Check that flow.required_columns matches your data |
| Slow generation | Increase num_workers, use a faster model |
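For the missing-columns case, comparing the required names against the dataset's columns before a run gives a clearer error than a failure partway through generation. This is a minimal sketch: missing_columns is a hypothetical helper, and the column names in the demonstration are made up. It assumes flow.required_columns, as mentioned above, is an iterable of column names.

```python
def missing_columns(required, available):
    """Return the required column names that the dataset lacks, in order."""
    available = set(available)
    return [name for name in required if name not in available]

# Demonstration with plain lists; the column names are illustrative only.
required = ["document", "question"]
available = ["question", "source"]
missing = missing_columns(required, available)
if missing:
    print(f"Dataset is missing columns: {missing}")
```

With a loaded flow and a Hugging Face dataset, the call would look like missing_columns(flow.required_columns, dataset.column_names).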

Related Skills

  • /sdg-discover-flows - Find available flows
  • /sdg-create-block - Create custom blocks
  • /pipeline-design - Design end-to-end pipelines