Streams in Effect
Overview
Effect Streams provide:
- Lazy evaluation - Elements are produced on demand
- Resource safety - Automatic cleanup
- Backpressure - Producer/consumer coordination
- Composition - Transform, filter, and merge streams
- Error handling - Typed errors in the stream pipeline
typescript
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R
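To make the lazy-evaluation point concrete, here is a minimal sketch, assuming the `effect` package is installed. It builds an infinite stream and pulls only three elements from it. The variable names are illustrative only.

```typescript
import { Chunk, Effect, Stream } from "effect"

// An infinite stream: 1, 2, 3, ... Nothing runs until the stream is consumed.
const naturalNumbers = Stream.iterate(1, (n) => n + 1)

// Describe a pipeline; take(3) bounds how many elements are ever pulled.
const firstThreeTens = naturalNumbers.pipe(
  Stream.map((n) => n * 10),
  Stream.take(3)
)

// runCollect yields a Chunk; convert it to a plain array for inspection.
const lazyResult: ReadonlyArray<number> = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(firstThreeTens))
)

console.log(lazyResult) // [10, 20, 30]
```

Because the stream is only a description, the program terminates even though the source never ends.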
Creating Streams
From Values
typescript
import { Stream } from "effect"
const numbers = Stream.make(1, 2, 3, 4, 5)
const fromArray = Stream.fromIterable([1, 2, 3])
const empty = Stream.empty
const single = Stream.succeed(42)
const infinite = Stream.iterate(1, (n) => n + 1)
From Effects
typescript
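import { Schedule, Stream } from "effect"
// fetchData and checkStatus are assumed to return Effects
const fromEffect = Stream.fromEffect(fetchData())
const polling = Stream.repeatEffect(checkStatus())
const scheduled = Stream.repeatEffectWithSchedule(
  checkStatus(),
  Schedule.spaced("5 seconds")
)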
From Async Sources
typescript
// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(
  asyncGenerator(),
  (error) => new StreamError({ cause: error })
)
// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value)
  eventEmitter.on("data", handler)
  // The returned Effect runs as a finalizer and detaches the listener
  return Effect.sync(() => eventEmitter.off("data", handler))
})
// From queue
const fromQueue = Stream.fromQueue(queue)
Generating Streams
typescript
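import { Option, Stream } from "effect"
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]))
const range = Stream.range(1, 100)
// Stream.repeat expects a Schedule, so use repeatValue for an endless constant
const repeated = Stream.repeatValue("ping").pipe(
  Stream.take(5)
)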
Transforming Streams
map - Transform Elements
typescript
const doubled = numbers.pipe(
  Stream.map((n) => n * 2)
)
const enriched = users.pipe(
  Stream.mapEffect((user) => fetchProfile(user.id))
)
const parallel = items.pipe(
  Stream.mapEffect(process, { concurrency: 10 })
)
filter - Select Elements
typescript
const evens = numbers.pipe(
  Stream.filter((n) => n % 2 === 0)
)
const valid = items.pipe(
  Stream.filterEffect((item) => validate(item))
)
flatMap - Nested Streams
typescript
const expanded = numbers.pipe(
  Stream.flatMap((n) => Stream.make(n, n * 10, n * 100))
)
// 1, 10, 100, 2, 20, 200, ...
take/drop
typescript
const first5 = numbers.pipe(Stream.take(5))
const skip5 = numbers.pipe(Stream.drop(5))
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10))
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10))
Combining Streams
concat - Sequential
typescript
const combined = Stream.concat(stream1, stream2)
// or, equivalently, in pipeline form
const combinedPiped = stream1.pipe(Stream.concat(stream2))
merge - Interleaved
typescript
// Interleave elements from both
const merged = Stream.merge(stream1, stream2)
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 })
zip - Pair Elements
typescript
const zipped = Stream.zip(names, ages)
// Stream<[string, number]>
// With function
const combined = Stream.zipWith(
  names,
  ages,
  (name, age) => ({ name, age })
)
interleave
typescript
const interleaved = Stream.interleave(stream1, stream2)
// a1, b1, a2, b2, ...
Consuming Streams
Running to Collection
typescript
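// runCollect yields a Chunk, not an array
const collected = yield* Stream.runCollect(numbers)
// runHead yields an Option, which is None for an empty stream
const first = yield* Stream.runHead(numbers)
const sum = yield* Stream.runFold(
  numbers,
  0,
  (acc, n) => acc + n
)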
Running for Effects
typescript
yield* numbers.pipe(
  Stream.runForEach((n) => Effect.log(`Got: ${n}`))
)
yield* numbers.pipe(Stream.runDrain)
Running to Sink
typescript
import { Sink } from "effect"
const sum = yield* numbers.pipe(
  Stream.run(Sink.sum)
)
// Sink.collectAll yields a Chunk of every element
const collected = yield* numbers.pipe(
  Stream.run(Sink.collectAll<number>())
)
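The `yield*` expressions in this section assume an enclosing generator. A minimal sketch of one, using `Effect.gen` and a small hypothetical `sampleNumbers` stream, might look like this:

```typescript
import { Chunk, Effect, Option, Stream } from "effect"

// Hypothetical sample stream for illustration.
const sampleNumbers = Stream.make(1, 2, 3, 4, 5)

const consumeProgram = Effect.gen(function* () {
  // Each run* call consumes the stream from the start.
  const collected = yield* Stream.runCollect(sampleNumbers)
  const head = yield* Stream.runHead(sampleNumbers)
  const total = yield* Stream.runFold(sampleNumbers, 0, (acc, n) => acc + n)
  return {
    values: Chunk.toReadonlyArray(collected), // Chunk converted to an array
    first: Option.getOrNull(head), // Option unwrapped, null if the stream is empty
    total
  }
})

// These streams are pure and synchronous, so runSync is sufficient here.
const consumed = Effect.runSync(consumeProgram)
console.log(consumed.values, consumed.first, consumed.total)
```

Streams that perform asynchronous work would need `Effect.runPromise` instead of `Effect.runSync`.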
Chunking
Streams process elements in chunks for efficiency:
typescript
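// Group elements into chunks of 10
const chunked = numbers.pipe(
  Stream.grouped(10)
)
// Transform a whole chunk at a time
const processed = numbers.pipe(
  Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2))
)
// Change the internal chunk size
const rechunked = numbers.pipe(
  Stream.rechunk(100)
)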
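As a small runnable illustration of `Stream.grouped`, the sketch below batches six elements into groups of three. It assumes the `effect` package is available; the names are illustrative.

```typescript
import { Chunk, Effect, Stream } from "effect"

// Group six elements into batches of three, then convert each Chunk to an array.
const batches = Stream.make(1, 2, 3, 4, 5, 6).pipe(
  Stream.grouped(3),
  Stream.map((batch) => Chunk.toReadonlyArray(batch))
)

const batchArrays = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(batches))
)

console.log(batchArrays) // [[1, 2, 3], [4, 5, 6]]
```

Each downstream element is now a whole batch, which is the shape a bulk API call typically wants.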
Error Handling
typescript
const safe = stream.pipe(
  Stream.catchAll((error) => Stream.succeed(fallbackValue))
)
const handled = stream.pipe(
  Stream.catchTag("NetworkError", (error) =>
    Stream.succeed(cachedValue)
  )
)
const resilient = stream.pipe(
  Stream.retry(Schedule.exponential("1 second"))
)
const withFallback = stream.pipe(
  Stream.orElse(() => fallbackStream)
)
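The following sketch shows `Stream.catchTag` recovering from a tagged failure mid-stream. The `NetworkError` class and its `reason` field are hypothetical, defined here only for illustration.

```typescript
import { Chunk, Data, Effect, Stream } from "effect"

// Hypothetical tagged error; the tag is what catchTag matches on.
class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly reason: string
}> {}

// Emits 1 and 2, then fails with a NetworkError.
const flaky = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail(new NetworkError({ reason: "timeout" })))
)

// On failure, switch to a fallback stream; the error type is removed.
const recovered = flaky.pipe(
  Stream.catchTag("NetworkError", () => Stream.make(0))
)

const recoveredValues = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(recovered))
)

console.log(recoveredValues) // [1, 2, 0]
```

Elements emitted before the failure are kept, and the fallback stream continues from the point of failure.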
Resource Management
typescript
// Stream with resource lifecycle: the file descriptor is closed when the stream ends
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd))
).pipe(
  Stream.flatMap((fd) =>
    // repeatEffectOption ends the stream when the effect fails with Option.none()
    Stream.repeatEffectOption(
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024)
        const bytes = fs.readSync(fd, buffer)
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none())
      })
    )
  )
)
// Scoped streams
const scoped = Stream.scoped(
  Effect.acquireRelease(openConnection, closeConnection)
)
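To observe the acquire/release ordering without touching the file system, here is a minimal sketch that records lifecycle events in an array. The `"handle"` resource and the `lifecycle` log are stand-ins invented for this example.

```typescript
import { Effect, Stream } from "effect"

// Records the order in which lifecycle events happen.
const lifecycle: Array<string> = []

// A stand-in resource: acquiring logs "open", releasing logs "close".
const handleStream = Stream.acquireRelease(
  Effect.sync(() => {
    lifecycle.push("open")
    return "handle"
  }),
  () =>
    Effect.sync(() => {
      lifecycle.push("close")
    })
)

const lifecycleProgram = handleStream.pipe(
  // Use the resource to produce elements.
  Stream.flatMap(() => Stream.make(1, 2, 3)),
  Stream.tap((n) => Effect.sync(() => lifecycle.push(`item ${n}`))),
  Stream.runDrain
)

Effect.runSync(lifecycleProgram)
console.log(lifecycle)
```

Under these assumptions the log should start with `"open"` and end with `"close"`, with the three items in between, showing that the finalizer runs only after the stream has been fully consumed.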
Sinks
Sinks consume stream elements:
typescript
import { Sink } from "effect"
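Sink.sum           // sum of numeric inputs
Sink.count         // number of elements
Sink.head()        // first element, as an Option
Sink.last()        // last element, as an Option
Sink.collectAll()  // all elements, as a Chunk
Sink.forEach(f)    // run an effect for each element
// Custom sink: fold the inputs into a running maximum
const maxSink = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max, n: number) => Math.max(max, n)
)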
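A sink only does work once a stream is run into it. The sketch below runs a small hypothetical stream into a custom `Sink.foldLeft` sink and into the built-in `Sink.sum`.

```typescript
import { Effect, Sink, Stream } from "effect"

// Custom sink that folds its inputs into a running maximum.
const maxOf = Sink.foldLeft(
  Number.NEGATIVE_INFINITY,
  (max: number, n: number) => Math.max(max, n)
)

// Hypothetical sample data for illustration.
const readings = Stream.make(3, 9, 4)

// Stream.run feeds every element of the stream into the sink.
const largest = Effect.runSync(readings.pipe(Stream.run(maxOf)))
const sumTotal = Effect.runSync(readings.pipe(Stream.run(Sink.sum)))

console.log(largest, sumTotal) // 9 16
```

The same stream value can be run into different sinks, since each run consumes the stream description from the start.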
Common Patterns
Batched Processing
typescript
const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) =>
    Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))
  )
)
Rate Limiting
typescript
const rateLimited = stream.pipe(
  Stream.throttle({
    cost: Chunk.size, // each element costs one unit
    units: 1,
    duration: "100 millis",
    strategy: "shape"
  })
)
Debouncing
typescript
const debounced = stream.pipe(
  Stream.debounce("500 millis")
)
Windowing
typescript
// Time-based windows: emit after 1000 elements or 1 second, whichever comes first
const windows = stream.pipe(
  Stream.groupedWithin(1000, "1 second")
)
Best Practices
- Use chunking for efficiency - Batch operations when possible
- Handle backpressure - Use appropriate buffer strategies
- Clean up resources - Use acquireRelease for external resources
- Process in parallel - Use the concurrency option in mapEffect
- Handle errors early - Catch or retry before final consumption
Additional Resources
For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks