SDG Run Flow
Configure and execute synthetic data generation flows in sdg_hub.
Basic Execution
```python
from datasets import load_dataset
from sdg_hub import FlowRegistry, Flow

# Load your seed dataset
dataset = load_dataset("json", data_files="seed_data.jsonl", split="train")

# Load the flow
flow = Flow.from_yaml(FlowRegistry.get_flow_path("simple-math-qa"))

# Configure model access
flow.set_model_config(
    model="gpt-4",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
)

# Run generation
result = flow.generate(dataset)

# Save results
result.to_json("generated_data.jsonl")
```
Model Configuration
OpenAI-compatible endpoints
```python
flow.set_model_config(
    model="gpt-4",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
)
```
vLLM local server
```python
flow.set_model_config(
    model="meta-llama/Llama-3.1-8B-Instruct",
    api_base="http://localhost:8000/v1",
    api_key="EMPTY",  # Any value works unless the vLLM server was started with --api-key
)
```
Multiple models (for different blocks)
```python
flow.set_model_config(
    model="gpt-4",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
    block_name="generator_block",  # Only applies to this block
)

flow.set_model_config(
    model="gpt-3.5-turbo",
    api_base="https://api.openai.com/v1",
    api_key="sk-...",
    block_name="evaluator_block",
)
```
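The per-block calls above imply a lookup where a block-specific model wins over anything configured flow-wide. The sketch below illustrates that resolution rule in plain Python. It is not sdg_hub's internal code: `ModelConfig`, `ModelConfigRegistry`, and the assumption that a call without `block_name` sets a flow-wide default are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ModelConfig:
    model: str
    api_base: str
    api_key: str


class ModelConfigRegistry:
    """Hypothetical illustration: a flow-wide default plus per-block overrides."""

    def __init__(self) -> None:
        self._default: Optional[ModelConfig] = None
        self._per_block: dict[str, ModelConfig] = {}

    def set(self, config: ModelConfig, block_name: Optional[str] = None) -> None:
        # No block_name sets the flow-wide default; otherwise store an override.
        if block_name is None:
            self._default = config
        else:
            self._per_block[block_name] = config

    def resolve(self, block_name: str) -> ModelConfig:
        # A block-specific override wins; otherwise fall back to the default.
        if block_name in self._per_block:
            return self._per_block[block_name]
        if self._default is None:
            raise LookupError(f"no model configured for block {block_name!r}")
        return self._default


registry = ModelConfigRegistry()
registry.set(ModelConfig("gpt-4", "https://api.openai.com/v1", "sk-..."))
registry.set(
    ModelConfig("gpt-3.5-turbo", "https://api.openai.com/v1", "sk-..."),
    block_name="evaluator_block",
)
print(registry.resolve("generator_block").model)  # gpt-4
print(registry.resolve("evaluator_block").model)  # gpt-3.5-turbo
```

A common pattern is to use a strong model for generation and a cheaper one for evaluation or filtering, which this lookup rule supports without repeating the default for every block.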
Execution Options
Batched processing
```python
result = flow.generate(
    dataset,
    batch_size=32,  # Process 32 examples at a time
    num_workers=4,  # Parallel workers
)
```
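To make `batch_size` and `num_workers` concrete, here is a generic sketch of batched, parallel processing with the standard library. It assumes batches are independent and the per-batch work is I/O-bound (model API calls), which is why it uses threads. It does not show how sdg_hub schedules work internally; `fake_llm_call` is a hypothetical stand-in for a model request.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batched(items: list[T], batch_size: int) -> Iterator[list[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def process_in_batches(
    items: list[T],
    fn: Callable[[list[T]], list[R]],
    batch_size: int = 32,
    num_workers: int = 4,
) -> list[R]:
    """Run fn on each batch in a thread pool; results keep input order."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # pool.map yields results in submission order, not completion order.
        results_per_batch = pool.map(fn, batched(items, batch_size))
        return [row for batch in results_per_batch for row in batch]


def fake_llm_call(batch: list[dict]) -> list[dict]:
    # Hypothetical stand-in for an I/O-bound model request.
    return [{**row, "answer": row["question"].upper()} for row in batch]


seed = [{"question": f"q{i}"} for i in range(100)]
out = process_in_batches(seed, fake_llm_call, batch_size=32, num_workers=4)
print(len(out), out[0])  # 100 {'question': 'q0', 'answer': 'Q0'}
```

Raising `num_workers` increases the number of concurrent requests, which is also what tends to trigger provider rate limits, so the two knobs usually need tuning together.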
Resume from checkpoint
```python
result = flow.generate(
    dataset,
    checkpoint_dir="./checkpoints",
    resume=True,  # Continue from last checkpoint
)
```
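The idea behind `checkpoint_dir` and `resume` can be sketched with one JSONL file per finished batch: on a rerun, batches whose file already exists are loaded instead of regenerated. This is a hypothetical illustration, not sdg_hub's checkpoint format. It assumes the input rows and `batch_size` are unchanged between runs, since files are keyed by batch index.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def generate_with_checkpoints(
    rows: list[dict],
    fn: Callable[[list[dict]], list[dict]],
    checkpoint_dir: str,
    batch_size: int = 32,
) -> list[dict]:
    """Process rows batch by batch, saving each finished batch to disk."""
    ckpt = Path(checkpoint_dir)
    ckpt.mkdir(parents=True, exist_ok=True)
    results: list[dict] = []
    for batch_idx, start in enumerate(range(0, len(rows), batch_size)):
        path = ckpt / f"batch_{batch_idx:05d}.jsonl"
        if path.exists():
            # Resume: reuse the saved output for this batch.
            with path.open(encoding="utf-8") as f:
                results.extend(json.loads(line) for line in f)
            continue
        out = fn(rows[start:start + batch_size])
        # Write to a temp name, then rename, so a crash never leaves a partial file.
        tmp = path.with_suffix(".tmp")
        with tmp.open("w", encoding="utf-8") as f:
            for row in out:
                f.write(json.dumps(row) + "\n")
        tmp.replace(path)
        results.extend(out)
    return results


calls = []


def double(batch: list[dict]) -> list[dict]:
    calls.append(len(batch))  # record each real (non-resumed) call
    return [{"x": r["x"] * 2} for r in batch]


with tempfile.TemporaryDirectory() as d:
    rows = [{"x": i} for i in range(10)]
    first = generate_with_checkpoints(rows, double, d, batch_size=4)
    second = generate_with_checkpoints(rows, double, d, batch_size=4)

print(calls)  # [4, 4, 2]: the second run recomputed nothing
print(first == second)  # True
```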
Dry run (preview without execution)
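```python
# Preview on a small sample; dataset[:5] would return a plain dict, not a Dataset
preview = flow.dry_run(dataset.select(range(5)))
print(preview)  # Shows what would be generated
```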
Flow Customization
Override block parameters
```python
flow = Flow.from_yaml(flow_path)

# Modify specific block configs
flow.blocks["llm_block"].temperature = 0.9
flow.blocks["filter_block"].min_score = 0.8

result = flow.generate(dataset)
```
Skip blocks
```python
result = flow.generate(
    dataset,
    skip_blocks=["optional_enhancement_block"],
)
```
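Conceptually, a flow runs its blocks in order and feeds each block's output to the next, so skipping a block simply passes its input straight through. The plain-Python sketch below shows that behavior. `run_blocks`, the `(name, function)` block shape, and the choice to reject unknown names in `skip_blocks` are hypothetical and may differ from what sdg_hub does.

```python
from typing import Callable, Iterable

Block = tuple[str, Callable[[list[dict]], list[dict]]]


def run_blocks(
    rows: list[dict],
    blocks: Iterable[Block],
    skip_blocks: Iterable[str] = (),
) -> list[dict]:
    """Run named blocks in order, passing each block's output to the next."""
    skip = set(skip_blocks)
    blocks = list(blocks)
    unknown = skip - {name for name, _ in blocks}
    if unknown:
        # Catch typos early instead of silently skipping nothing.
        raise KeyError(f"unknown block names in skip_blocks: {sorted(unknown)}")
    for name, fn in blocks:
        if name in skip:
            continue  # skipped block: rows flow through unchanged
        rows = fn(rows)
    return rows


pipeline = [
    ("llm_block", lambda rs: [{**r, "answer": "42"} for r in rs]),
    ("optional_enhancement_block", lambda rs: [{**r, "answer": r["answer"] + "!"} for r in rs]),
    ("filter_block", lambda rs: [r for r in rs if r["answer"]]),
]
print(run_blocks([{"q": "?"}], pipeline, skip_blocks=["optional_enhancement_block"]))
# [{'q': '?', 'answer': '42'}]
```

Skipping only works cleanly when later blocks do not depend on columns the skipped block would have produced.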
Add custom blocks inline
```python
from sdg_hub.core.blocks.base import BaseBlock

class MyPostProcessor(BaseBlock):
    def generate(self, dataset, **kwargs):
        return dataset.map(lambda x: {**x, "processed": True})

flow.add_block(MyPostProcessor(), position="end")
result = flow.generate(dataset)
```
Output Handling
Save to various formats
```python
# JSONL (recommended)
result.to_json("output.jsonl")

# Parquet (efficient for large datasets)
result.to_parquet("output.parquet")

# Push to HuggingFace Hub
result.push_to_hub("username/dataset-name")
```
Streaming output
```python
for batch in flow.generate_streaming(dataset, batch_size=100):
    # Process each batch as it's generated
    batch.to_json(f"output_batch_{batch.batch_id}.jsonl")
```
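Streaming helps with memory because a generator hands back one finished batch at a time and the caller writes it out immediately, so generated rows never accumulate in a single result object. The stdlib sketch below shows that pattern; `generate_streaming` and `write_jsonl` here are hypothetical helpers, not the sdg_hub method of the same name, and the identity lambda stands in for real generation.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Iterator


def generate_streaming(
    rows: list[dict],
    fn: Callable[[list[dict]], list[dict]],
    batch_size: int = 100,
) -> Iterator[tuple[int, list[dict]]]:
    """Yield (batch_id, generated_rows) as soon as each batch finishes."""
    for batch_id, start in enumerate(range(0, len(rows), batch_size)):
        yield batch_id, fn(rows[start:start + batch_size])


def write_jsonl(path: Path, rows: list[dict]) -> None:
    # JSON Lines: one JSON object per line.
    with path.open("w", encoding="utf-8") as f:
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


with tempfile.TemporaryDirectory() as d:
    seed = [{"id": i} for i in range(250)]
    for batch_id, batch in generate_streaming(seed, lambda b: b, batch_size=100):
        write_jsonl(Path(d) / f"output_batch_{batch_id}.jsonl", batch)
    written = sorted(p.name for p in Path(d).iterdir())

print(written)
# ['output_batch_0.jsonl', 'output_batch_1.jsonl', 'output_batch_2.jsonl']
```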
Monitoring & Logging
```python
import logging

logging.basicConfig(level=logging.INFO)

# Enable detailed flow logging
flow.set_logging(
    level="DEBUG",
    log_file="flow_execution.log",
    log_samples=True,  # Log sample inputs/outputs
)

result = flow.generate(dataset)
```
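If you want the same kind of split with only the standard `logging` module (verbose detail in a file, a quieter console), a handler-per-destination setup like the sketch below is one option. The logger name `sdg_run` is hypothetical; `propagate = False` keeps these records from being printed a second time by the root handler that `logging.basicConfig` installs.

```python
import logging
import tempfile
from pathlib import Path


def configure_logging(log_file: str) -> logging.Logger:
    """INFO and above to the console, DEBUG and above to a file."""
    logger = logging.getLogger("sdg_run")  # hypothetical logger name
    logger.setLevel(logging.DEBUG)  # let each handler decide what to drop
    logger.propagate = False  # avoid duplicate lines via the root logger
    logger.handlers.clear()  # avoid stacking handlers on re-run

    console = logging.StreamHandler()
    console.setLevel(logging.INFO)

    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )

    logger.addHandler(console)
    logger.addHandler(file_handler)
    return logger


with tempfile.TemporaryDirectory() as d:
    log_path = Path(d) / "flow_execution.log"
    log = configure_logging(str(log_path))
    log.debug("sample input: %s", {"question": "2+2?"})  # file only
    log.info("batch 0 done")  # console and file
    for h in log.handlers:
        h.close()  # release the file before the directory is removed
    contents = log_path.read_text(encoding="utf-8")
```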
Error Handling
```python
import time

from sdg_hub.exceptions import BlockExecutionError, RateLimitError

try:
    result = flow.generate(dataset, checkpoint_dir="./checkpoints")
except RateLimitError as e:
    print(f"Rate limited, waiting {e.retry_after}s")
    time.sleep(e.retry_after)
    # Resume from the last checkpoint instead of starting over
    result = flow.generate(dataset, checkpoint_dir="./checkpoints", resume=True)
except BlockExecutionError as e:
    print(f"Block {e.block_name} failed: {e.message}")
    # Inspect failed examples
    print(e.failed_examples[:5])
```
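The handler above retries once. When rate limits are frequent, a bounded retry loop with exponential backoff is the usual pattern: honor `retry_after` when the server provides it, otherwise wait a random "full jitter" delay in `[0, min(max_delay, base_delay * 2**attempt)]`. The sketch below is generic; `RateLimited` and `call_with_backoff` are hypothetical names, not part of `sdg_hub.exceptions`.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Hypothetical stand-in for a provider's rate-limit error."""

    def __init__(self, retry_after: Optional[float] = None) -> None:
        super().__init__("rate limited")
        self.retry_after = retry_after


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on RateLimited; re-raise after max_attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            if exc.retry_after is not None:
                delay = exc.retry_after  # the server said how long to wait
            else:
                delay = random.uniform(0, min(max_delay, base_delay * 2**attempt))
            sleep(delay)
    raise AssertionError("unreachable")


# A fake endpoint that is rate limited twice, then succeeds.
attempts = []
delays = []


def flaky() -> str:
    attempts.append(1)
    if len(attempts) < 3:
        raise RateLimited(retry_after=0.5)
    return "ok"


print(call_with_backoff(flaky, sleep=delays.append))  # ok
print(delays)  # [0.5, 0.5]
```

Injecting `sleep` keeps the function testable without real waiting.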
Common Issues
| Issue | Solution |
|---|---|
| Rate limiting | Reduce batch_size, add delays, use checkpoints |
| OOM on large datasets | Use generate_streaming() or smaller batches |
| Missing columns | Check flow.required_columns matches your data |
| Slow generation | Increase num_workers, use faster model |
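For the "Missing columns" row, a pre-flight check before a long run catches both absent columns and empty values. The sketch below works on plain dicts parsed from JSONL, and you would pass in whatever column list the flow reports as required. The column names `document` and `domain` are hypothetical examples.

```python
import json
from typing import Iterable


def missing_columns(rows: Iterable[dict], required: Iterable[str]) -> dict[str, int]:
    """Count, per required column, how many rows lack it or have it empty."""
    required = list(required)
    counts = {col: 0 for col in required}
    for row in rows:
        for col in required:
            value = row.get(col)
            if value is None or value == "":
                counts[col] += 1
    # Only report columns that actually have problems.
    return {col: n for col, n in counts.items() if n}


lines = [
    '{"document": "Paris is the capital of France.", "domain": "geo"}',
    '{"document": "", "domain": "geo"}',
    '{"domain": "math"}',
]
rows = [json.loads(line) for line in lines]
print(missing_columns(rows, ["document", "domain"]))  # {'document': 2}
```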
Related Skills
- /sdg-discover-flows - Find available flows
- /sdg-create-block - Create custom blocks
- /pipeline-design - Design end-to-end pipelines