Substreams Sink Development Expert
Expert assistant for consuming Substreams data - building production-grade sinks and data pipelines.
Core Concepts
What is a Substreams Sink?
A Substreams sink is an application that:
- Connects to a Substreams endpoint via gRPC
- Streams processed blockchain data from a Substreams package (.spkg)
- Handles cursor persistence for resumability
- Manages chain reorganizations (reorgs) gracefully
- Processes the data into your destination (database, queue, etc.)
Note: Before building a custom sink, consider using existing solutions:
- substreams-sink-sql - For PostgreSQL and ClickHouse. Handles cursor management, reorgs, batching, and schema management out of the box.
- substreams-sink-kv - For key-value stores.
- substreams-sink-files - For file-based outputs (JSON, CSV, Parquet).
The examples in this guide use database code for illustration purposes. For production SQL database sinks,
substreams-sink-sql is highly recommended as it already solves cursor persistence, reorg handling, batching, and many edge cases.
Key Components
- Endpoint: gRPC server providing Substreams data (e.g., mainnet.eth.streamingfast.io:443)
- Package (.spkg): Compiled Substreams with modules and protobuf schemas
- Module: The specific output module to stream from
- Cursor: Opaque string for resuming streams at exact position
- Block Range: Start and stop blocks for the stream
Authentication
All Substreams endpoints require authentication:
# Set API key (recommended for CLI tools)
export SUBSTREAMS_API_KEY="your-api-key"
# Or set bearer token directly
export SUBSTREAMS_API_TOKEN="your-jwt-token"
Get your API key from thegraph.market or pinax.network.
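The token variable above can be turned into per-call gRPC metadata. The following is a minimal sketch, assuming the `authorization: Bearer` header used in the Python quick start; `auth_metadata` is a hypothetical helper name, and API-key handling is deliberately left out.

```python
import os


def auth_metadata(env=os.environ):
    """Build gRPC metadata from SUBSTREAMS_API_TOKEN, failing early if it is unset."""
    token = env.get("SUBSTREAMS_API_TOKEN", "").strip()
    if not token:
        raise RuntimeError("SUBSTREAMS_API_TOKEN is not set")
    # Header name and scheme follow the Python quick start in this guide.
    return [("authorization", f"Bearer {token}")]
```

Failing at startup when the variable is missing tends to be easier to diagnose than an `Unauthenticated` error surfacing mid-stream.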
Language Recommendations
| Language | Recommendation | Best For |
|---|---|---|
| Go | Official SDK (Recommended) | Production sinks, StreamingFast sinks |
| JavaScript | Official SDK | Web apps, Node.js services |
| Python | Reference implementation | Prototyping, data analysis |
| Rust | Reference implementation | High-performance custom sinks |
Quick Start by Language
Go (Recommended)
package main

import (
	"context"
	"log"

	pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
	"github.com/streamingfast/substreams/sink"
)

func main() {
	ctx := context.Background()

	sinker, err := sink.New(
		sink.NewFromManifest("substreams.spkg", "map_events"),
		sink.WithBlockRange(":+1000"),
	)
	if err != nil {
		log.Fatalf("create sinker: %v", err)
	}

	sinker.Run(ctx, sink.NewSinker(
		handleBlockScopedData,
		handleBlockUndoSignal,
	))
}

func handleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *sink.Cursor) error {
	// Process block data
	// Persist cursor after successful processing
	return nil
}

func handleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *sink.Cursor) error {
	// Handle reorg: rewind data to undoSignal.LastValidBlock
	// Persist undoSignal.LastValidCursor
	return nil
}
See references/go-sink.md for complete guide.
JavaScript (Node.js)
import { createRegistry, createRequest } from "@substreams/core";
import { createGrpcTransport } from "@connectrpc/connect-node";

// `pkg`, `registry`, and `stream` are not defined in this excerpt;
// they are assumed to be set up elsewhere (loaded package, its registry, stream helper).
const transport = createGrpcTransport({
  baseUrl: "https://mainnet.eth.streamingfast.io:443",
  httpVersion: "2",
});

const request = createRequest({
  substreamPackage: pkg,
  outputModule: "map_events",
  startBlockNum: 17000000n,
  stopBlockNum: "+1000",
});

for await (const response of stream(request, registry, transport)) {
  if (response.message.case === "blockScopedData") {
    // Process block data, then persist the cursor
    await persistCursor(response.message.value.cursor);
  } else if (response.message.case === "blockUndoSignal") {
    // Handle reorg
    await handleUndo(response.message.value);
  }
}
See references/javascript-sink.md for complete guide.
Python
import grpc
from sf.substreams.rpc.v2 import service_pb2, service_pb2_grpc

# `token` (JWT) and `package` (the loaded .spkg) are assumed to be defined earlier.
creds = grpc.ssl_channel_credentials()
with grpc.secure_channel("mainnet.eth.streamingfast.io:443", creds) as channel:
    stub = service_pb2_grpc.StreamStub(channel)
    metadata = [("authorization", f"Bearer {token}")]
    request = service_pb2.Request(
        start_block_num=17000000,
        stop_block_num=17001000,
        modules=package.modules,
        output_module="map_events",
        production_mode=True,
    )
    for response in stub.Blocks(request, metadata=metadata):
        if response.WhichOneof("message") == "block_scoped_data":
            # Process block data
            pass
        elif response.WhichOneof("message") == "block_undo_signal":
            # Handle reorg
            pass
See references/python-sink.md for complete guide.
Rust
use substreams_stream::{BlockResponse, SubstreamsStream};

// `stream` must be mutable because `next()` advances it;
// a `StreamExt` trait is assumed to be in scope for `next()`.
let mut stream = SubstreamsStream::new(
    endpoint,
    cursor,
    package,
    modules.clone(),
    "map_events".to_string(),
    start_block,
    stop_block,
);

while let Some(response) = stream.next().await {
    match response? {
        BlockResponse::New(data) => {
            // Process block data
            persist_cursor(&data.cursor);
        }
        BlockResponse::Undo(signal) => {
            // Handle reorg
            rewind_to_block(signal.last_valid_block);
            persist_cursor(&signal.last_valid_cursor);
        }
    }
}
See references/rust-sink.md for complete guide.
Critical Concepts
Cursor Management
The cursor is the most critical piece of sink development.
RULE #1: Persist cursor AFTER successful processing, never before.
RULE #2: On restart, load persisted cursor and resume from there.
RULE #3: Blank/empty cursor means start from the beginning.
The cursor is an opaque string that encodes:
- Block number and hash
- Module execution state
- Position within the block
Cursor persistence patterns:
| Storage | Use Case | Example |
|---|---|---|
| File | Development, single instance | cursor.txt |
| Database | Production, multi-instance | Cursors table with module key |
| Redis | High availability | Key-value with TTL |
See references/cursor-reorg.md for detailed patterns.
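For the file-based pattern in the table, a minimal sketch of the three rules could look like the following. The function names `load_cursor` and `save_cursor` are hypothetical; the write goes through a temporary file and an atomic rename so that a crash mid-write cannot leave a truncated cursor behind.

```python
import os
import tempfile


def load_cursor(path):
    """Return the persisted cursor, or "" (start from the beginning) if none exists."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read().strip()
    except FileNotFoundError:
        return ""


def save_cursor(path, cursor):
    """Write the cursor atomically: temp file, fsync, then rename over the target."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".cursor-")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(cursor)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)  # atomic on the same filesystem
    except BaseException:
        # Remove the temp file if anything failed before the rename.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Call `save_cursor` only after the block's data has been written to the destination; calling it earlier would let a crash skip that block on restart.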
Chain Reorganization (Reorg) Handling
When a chain reorganizes, you receive a BlockUndoSignal:
BlockUndoSignal {
  last_valid_block: BlockRef { num: 17000100, id: "0xabc..." }
  last_valid_cursor: "opaque-cursor-string"
}
Required actions:
- Delete/revert all data for blocks > last_valid_block.num
- Persist the last_valid_cursor
- Continue streaming (new blocks will follow automatically)
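The required actions can be sketched against a database destination as follows. This is an illustration only, using SQLite from the Python standard library; the table layout (`events`, `cursors`) and the function names are assumptions, not part of any Substreams library. Data changes and the cursor update share one transaction, so the two cannot drift apart.

```python
import sqlite3


def open_store(path=":memory:"):
    """Create the hypothetical events and cursors tables if they do not exist."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS events (block_num INTEGER NOT NULL, payload TEXT NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS cursors (module TEXT PRIMARY KEY, cursor TEXT NOT NULL)")
    conn.commit()
    return conn


def apply_block(conn, module, block_num, payloads, cursor):
    """Store a block's rows and its cursor in a single transaction."""
    with conn:  # commits on success, rolls back on exception
        conn.executemany(
            "INSERT INTO events (block_num, payload) VALUES (?, ?)",
            [(block_num, p) for p in payloads],
        )
        conn.execute("INSERT OR REPLACE INTO cursors (module, cursor) VALUES (?, ?)", (module, cursor))


def apply_undo(conn, module, last_valid_block_num, last_valid_cursor):
    """Delete rows above the last valid block and persist the last valid cursor."""
    with conn:
        conn.execute("DELETE FROM events WHERE block_num > ?", (last_valid_block_num,))
        conn.execute(
            "INSERT OR REPLACE INTO cursors (module, cursor) VALUES (?, ?)",
            (module, last_valid_cursor),
        )
```

After `apply_undo` returns, the sink simply keeps reading the stream; the replacement blocks arrive as ordinary block data.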
Final blocks only mode (recommended for most sinks):
- Set final_blocks_only: true in request
- Only receive blocks that cannot be reorganized
- Eliminates need for undo handling
- Trade-off: data lags the chain tip (~2-3 minutes; the exact delay depends on the chain's finality)
Error Handling & Retry
Fatal errors (do not retry):
- Unauthenticated - Invalid or expired token
- InvalidArgument - Bad request parameters
- Internal - Server-side bug
Retryable errors (implement exponential backoff):
- Unavailable - Server temporarily unavailable
- ResourceExhausted - Rate limited
- Connection timeouts
Exponential backoff pattern:
Base delay: 500ms
Max delay: 45-90 seconds
Jitter: Add random 0-100ms
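A minimal sketch of this pattern, assuming status codes are available as the names listed above. The set and function names are hypothetical, and the 45-second cap is simply the low end of the stated range.

```python
import random

# Status names taken from the lists above.
FATAL = {"Unauthenticated", "InvalidArgument", "Internal"}
RETRYABLE = {"Unavailable", "ResourceExhausted"}


def is_retryable(status_name):
    """Retry only statuses explicitly listed as retryable."""
    return status_name in RETRYABLE


def backoff_delay(attempt, base=0.5, max_delay=45.0, jitter=0.1, rng=random.random):
    """Delay in seconds for a 0-based attempt: base * 2^attempt, capped, plus jitter."""
    exponent = min(attempt, 32)  # keeps the power small enough to avoid float overflow
    delay = min(base * (2 ** exponent), max_delay)
    return delay + rng() * jitter  # adds 0-100ms by default
```

A retry loop would sleep for `backoff_delay(attempt)` after each retryable failure and reset `attempt` to zero once a block is received successfully. How to treat statuses that appear in neither set is a design choice this sketch leaves to the caller.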
Production Mode vs Development Mode
| Feature | Production Mode | Development Mode |
|---|---|---|
| Parallel execution | Yes | No |
| Output | Single module only | All modules |
| Performance | Optimized | Debug-friendly |
| Use case | Sinks | Testing, debugging |
Always use production mode for sinks:
sink.WithProductionMode() // Go
production_mode=True // Python
productionMode: true // JavaScript
Common Endpoints
| Network | Endpoint |
|---|---|
| Ethereum Mainnet | mainnet.eth.streamingfast.io:443 |
| Ethereum Sepolia | sepolia.eth.streamingfast.io:443 |
| Polygon | polygon.streamingfast.io:443 |
| Arbitrum One | arb-one.streamingfast.io:443 |
| Optimism | optimism.streamingfast.io:443 |
| Base | base.streamingfast.io:443 |
| BSC | bsc.streamingfast.io:443 |
| Solana | mainnet.sol.streamingfast.io:443 |
| Near | mainnet.near.streamingfast.io:443 |
Full list: thegraph.market/supported-networks
Block Range Syntax
# Explicit range
--start-block 17000000 --stop-block 17001000
# From manifest initialBlock
--start-block : --stop-block 17001000
# Relative stop (process 1000 blocks)
--start-block 17000000 --stop-block +1000
# To chain head (live streaming)
--start-block 17000000 --stop-block 0
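To make the four forms concrete, here is a small sketch that resolves them to absolute block numbers exactly as described above. The function `resolve_block_range` is hypothetical and only mirrors the syntax shown; the CLI and the sink libraries do their own parsing.

```python
def resolve_block_range(start, stop, initial_block):
    """Return (start_block, stop_block); stop_block is None when streaming to chain head.

    start: a block number as a string, or ":" for the manifest's initialBlock.
    stop:  a block number, "+N" for N blocks past start, or "0" for no stop.
    """
    start_num = initial_block if start == ":" else int(start)
    if stop.startswith("+"):
        stop_num = start_num + int(stop[1:])
    else:
        stop_num = int(stop)
    if stop_num == 0:
        return start_num, None  # live streaming, no stop block
    if stop_num <= start_num:
        raise ValueError(f"stop block {stop_num} must be greater than start block {start_num}")
    return start_num, stop_num
```

For example, `resolve_block_range("17000000", "+1000", 0)` yields the same range as the explicit form in the first line of the listing.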
Module Parameters
Pass runtime parameters to modules:
# Single parameter
-p "map_events=0xa0b86a33e6..."
# Multiple parameters
-p "map_events=0xa0b86a33..." -p "filter_module=type:transfer"
# JSON parameters
-p 'map_events={"contracts":["0x123","0x456"],"min_value":1000}'
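When parameters are assembled programmatically, building the `module=value` strings by hand is error-prone, especially for JSON. The sketch below shows one way to do it; `format_param` and `params_to_args` are hypothetical helpers, and the only assumption about the CLI is the `-p "module=value"` form shown above.

```python
import json


def format_param(module, value):
    """Format one module parameter; non-string values are encoded as compact JSON."""
    if isinstance(value, str):
        return f"{module}={value}"
    return f"{module}=" + json.dumps(value, separators=(",", ":"))


def params_to_args(params):
    """Turn a {module: value} mapping into a flat ["-p", "module=value", ...] list."""
    args = []
    for module, value in params.items():
        args.extend(["-p", format_param(module, value)])
    return args
```

Passing the resulting list as separate arguments to a subprocess avoids shell quoting problems with the braces and quotes in JSON values.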
Protobuf Code Generation
Generate language bindings from .spkg files:
# Install buf
go install github.com/bufbuild/buf/cmd/buf@latest
# Generate from local .spkg
buf generate --exclude-path="google" ./my-substreams.spkg#format=bin
# Generate from URL
buf generate "https://spkg.io/streamingfast/substreams-eth-block-meta-v0.4.3.spkg#format=binpb"
# Generate from buf registry
buf generate buf.build/streamingfast/substreams --include-imports
Troubleshooting
Connection Issues
"Unauthenticated" error:
- Verify API key/token is set correctly
- Check token hasn't expired
- Ensure correct environment variable name
"Connection refused" error:
- Verify endpoint URL and port
- Check TLS is enabled for https://
- Test network connectivity
Empty Output
No data received:
- Verify initialBlock in manifest is before your start block
- Check the output module name is correct
- Ensure the block range contains relevant data
- Try a known-good block range first
Performance Issues
Slow processing:
- Enable production mode
- Use gzip compression
- Increase connection timeout
- Consider final_blocks_only for non-realtime needs
Resources
- Official Documentation
- Go Sink Reference
- JavaScript Sink Reference
- Python Sink Reference
- Rust Sink Reference
- Cursor & Reorg Handling