Kotlin Coroutines - Advanced Async Patterns
Expert guidance for complex async operations in Amethyst: relay pools, event streams, structured concurrency, and testing.
Mental Model
Async Architecture in Amethyst:
Relay Pool (supervisorScope)
├── Relay 1 (launch) → callbackFlow → Events
├── Relay 2 (launch) → callbackFlow → Events
└── Relay 3 (launch) → callbackFlow → Events
↓
merge() → distinctBy(id) → shareIn
↓
Multiple Collectors (ViewModels, Services)
Key principles:
- •supervisorScope - Children fail independently
- •callbackFlow - Bridge callbacks to Flow
- •shareIn/stateIn - Hot flows from cold
- •Backpressure - buffer(), conflate(), DROP_OLDEST
When to Use This Skill
Use for advanced async patterns:
- •Multi-relay subscriptions with supervisorScope
- •Complex Flow operators (flatMapLatest, combine, merge)
- •callbackFlow for Android callbacks (connectivity, location)
- •Backpressure handling in high-frequency streams
- •Exception handling with CoroutineExceptionHandler
- •Testing coroutines with runTest and Turbine
Delegate to kotlin-expert for:
- •Basic StateFlow/SharedFlow patterns
- •Simple viewModelScope.launch
- •MutableStateFlow → asStateFlow()
Core Patterns
Pattern: callbackFlow for Relay Subscriptions
// Real pattern from NostrClientStaticReqAsStateFlow.kt
fun INostrClient.reqAsFlow(
relay: NormalizedRelayUrl,
filters: List<Filter>,
): Flow<List<Event>> = callbackFlow {
val subId = RandomInstance.randomChars(10)
var hasBeenLive = false
val eventIds = mutableSetOf<HexKey>()
var currentEvents = listOf<Event>()
val listener = object : IRequestListener {
override fun onEvent(event: Event, ...) {
if (event.id !in eventIds) {
currentEvents = if (hasBeenLive) {
// After EOSE: prepend
listOf(event) + currentEvents
} else {
// Before EOSE: append
currentEvents + event
}
eventIds.add(event.id)
trySend(currentEvents)
}
}
override fun onEose(...) {
hasBeenLive = true
}
}
openReqSubscription(subId, mapOf(relay to filters), listener)
// Qualify the receiver so this cannot be read as ProducerScope.close(cause)
awaitClose { this@reqAsFlow.close(subId) }
}
Key techniques:
- •Deduplication with Set
- •EOSE handling (append → prepend strategy)
- •trySend (non-blocking from callback)
- •awaitClose for cleanup
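The four techniques above can be tried without a relay. The following is a minimal sketch, assuming a hypothetical `FakeSource` class in place of the real client; `FakeSource`, `idsAsFlow`, and `demoCallbackFlow` are illustrative names, not Amethyst APIs.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based source standing in for a relay client.
class FakeSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (String) -> Unit) { listeners.add(listener) }
    fun unregister(listener: (String) -> Unit) { listeners.remove(listener) }
    fun push(id: String) = listeners.toList().forEach { it(id) }
}

// Emits the growing list of unique ids: Set-based dedup, trySend, awaitClose.
fun FakeSource.idsAsFlow(): Flow<List<String>> = callbackFlow {
    val seen = mutableSetOf<String>()
    var current = listOf<String>()
    val listener: (String) -> Unit = { id ->
        if (seen.add(id)) {          // add() returns false for a duplicate
            current = current + id
            trySend(current)         // non-suspending, safe inside a callback
        }
    }
    register(listener)
    awaitClose { unregister(listener) } // runs when the collector is cancelled
}

fun demoCallbackFlow(): Pair<List<List<String>>, Int> = runBlocking {
    val source = FakeSource()
    val received = mutableListOf<List<String>>()
    val job = launch { source.idsAsFlow().collect { received.add(it) } }
    while (source.listenerCount == 0) yield() // wait until the flow body has registered
    source.push("a")
    source.push("a")                          // duplicate, filtered out
    source.push("b")
    while (received.size < 2) yield()         // let the collector drain the buffer
    job.cancelAndJoin()                       // triggers awaitClose
    received.toList() to source.listenerCount
}
```

After cancellation the listener count drops back to zero, which is the property `awaitClose` is there to protect.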
Pattern: Structured Concurrency for Relays
suspend fun connectToRelays(relays: List<Relay>) = supervisorScope {
relays.forEach { relay ->
launch {
try {
relay.connect()
relay.subscribe(filters).collect { event ->
eventChannel.send(event)
}
} catch (e: IOException) {
Log.e("Relay", "Connection failed: ${relay.url}", e)
// Other relays continue
}
}
}
}
Why supervisorScope:
- •One relay failure doesn't cancel others
- •Cancelling the caller cancels every relay coroutine together
- •supervisorScope returns only after all children finish, so no relay coroutine outlives the call
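To see the isolation in action, here is a small sketch in which one child throws an uncaught exception while its siblings keep running. The URLs, the `connectAll` function, and the simulated handshake are assumptions made for illustration only.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Simulates connecting to several relays; any URL containing "bad" fails.
suspend fun connectAll(urls: List<String>): Pair<List<String>, List<String>> {
    val connected = CopyOnWriteArrayList<String>()
    val failed = CopyOnWriteArrayList<String>()
    supervisorScope {
        urls.forEach { url ->
            // Direct children of supervisorScope use their own exception handler.
            val onFailure = CoroutineExceptionHandler { _, _ -> failed.add(url) }
            launch(onFailure) {
                delay(10)
                if ("bad" in url) throw IllegalStateException("handshake failed: $url")
                delay(10) // siblings are still alive after the failure above
                connected.add(url)
            }
        }
    } // suspends here until every child has finished
    return connected.sorted() to failed.toList()
}

fun demoSupervisorScope(): Pair<List<String>, List<String>> = runBlocking {
    connectAll(listOf("wss://a.example", "wss://bad.example", "wss://c.example"))
}
```

With `coroutineScope` instead of `supervisorScope`, the first failure would cancel the two healthy children and rethrow to the caller.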
Pattern: Exception Handling for Services
// Real pattern from PushNotificationReceiverService.kt
class MyService : Service() {
val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
Log.e("Service", "Caught: ${throwable.message}", throwable)
}
private val scope = CoroutineScope(
Dispatchers.IO + SupervisorJob() + exceptionHandler
)
override fun onDestroy() {
scope.cancel()
super.onDestroy()
}
}
Pattern benefits:
- •SupervisorJob: children fail independently
- •CoroutineExceptionHandler: uncaught exceptions from launch are logged instead of crashing the app
- •Scoped lifecycle: cancel all on destroy
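The same three benefits can be checked outside Android. This is a sketch under the assumption that a plain class, here called `BackgroundWorker` (a hypothetical name), plays the role of the Service and `shutdown()` plays the role of `onDestroy()`.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a Service that owns a coroutine scope.
class BackgroundWorker {
    val errors = CopyOnWriteArrayList<String>()
    private val handler = CoroutineExceptionHandler { _, t -> errors.add(t.message ?: "unknown") }
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob() + handler)
    val isActive: Boolean get() = scope.isActive
    fun submit(task: suspend () -> Unit): Job = scope.launch { task() }
    fun shutdown() = scope.cancel() // the onDestroy() equivalent
}

fun demoWorker(): Triple<List<String>, Boolean, Boolean> = runBlocking {
    val worker = BackgroundWorker()
    worker.submit { error("boom") }.join()        // fails; the handler records it
    val secondRan = AtomicBoolean(false)
    worker.submit { secondRan.set(true) }.join()  // scope is still usable
    worker.shutdown()
    Triple(worker.errors.toList(), secondRan.get(), worker.isActive)
}
```

With a regular `Job()` in place of `SupervisorJob()`, the first failure would cancel the scope and the second task would never run.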
Pattern: Network Connectivity as Flow
// Real pattern from ConnectivityFlow.kt
val status = callbackFlow {
val networkCallback = object : NetworkCallback() {
override fun onAvailable(network: Network) {
trySend(ConnectivityStatus.Active(...))
}
override fun onLost(network: Network) {
trySend(ConnectivityStatus.Off)
}
}
connectivityManager.registerDefaultNetworkCallback(networkCallback) // API 24+
// Initial state
activeNetwork?.let { trySend(ConnectivityStatus.Active(...)) }
awaitClose {
connectivityManager.unregisterNetworkCallback(networkCallback)
}
}
.distinctUntilChanged()
.debounce(200) // Stabilize flapping
.flowOn(Dispatchers.IO)
Key patterns:
- •Emit initial state immediately
- •Register callback in flow body
- •Cleanup in awaitClose
- •Stabilize with debounce + distinctUntilChanged
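The stabilizing step can be exercised without Android classes. The sketch below assumes string statuses and a hypothetical `stabilized` helper that applies the same `distinctUntilChanged().debounce(200)` chain; it uses real delays, so it takes just under a second to run.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same operator chain as the connectivity flow, on any element type.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.stabilized(quietMillis: Long = 200): Flow<T> =
    distinctUntilChanged().debounce(quietMillis)

fun demoStabilized(): List<String> = runBlocking {
    val raw = flow {
        emit("Active"); delay(50)   // flapping: three changes within 100 ms
        emit("Off"); delay(50)
        emit("Active"); delay(400)  // quiet period, so this value gets through
        emit("Off"); delay(400)     // a real loss followed by a quiet period
    }
    raw.stabilized().toList()
}
```

Only values followed by 200 ms of silence reach the collector, so the short Active/Off/Active burst collapses into a single "Active".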
Pattern: Merge Events from Multiple Relays
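// distinctBy is not a built-in Flow operator; this small helper remembers every key it has emitted
fun <T, K> Flow<T>.distinctBy(key: (T) -> K): Flow<T> = flow {
    val seen = HashSet<K>()
    collect { if (seen.add(key(it))) emit(it) }
}

fun observeFromRelays(
    relays: List<NormalizedRelayUrl>,
    filters: List<Filter>
): Flow<Event> =
    relays.map { relay ->
        client.reqAsFlow(relay, filters)
            .flatMapConcat { it.asFlow() }
    }.merge()
        .distinctBy { it.id }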
Flow:
- •Each relay: Flow<List<Event>>
- •flatMapConcat: flatten to Flow<Event>
- •merge(): combine all relays
- •distinctBy: deduplicate across relays
Advanced Operators
For comprehensive coverage of Flow operators:
- •flatMapLatest, combine, zip, merge → See advanced-flow-operators.md
- •shareIn, stateIn → Conversion to hot flows
- •buffer, conflate → Backpressure strategies
- •debounce, sample → Rate limiting
Quick Reference
| Operator | Use Case | Example |
|---|---|---|
| flatMapLatest | Cancel previous, switch to new | Search (cancel old query) |
| combine | Latest from ALL flows | combine(account, settings, connectivity) |
| merge | Single stream from multiple | merge(relay1, relay2, relay3) |
| shareIn | Multiple collectors, single upstream | Share expensive computation |
| stateIn | StateFlow from Flow | ViewModel state |
| buffer(DROP_OLDEST) | High-frequency streams | Real-time event feed |
| conflate | Latest only | UI updates |
| debounce | Wait for quiet period | Search input |
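As an illustration of the `combine` row, the sketch below derives one UI state from three independent sources. `UiState` and the three `MutableStateFlow` inputs are hypothetical stand-ins for account, settings, and connectivity.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

data class UiState(val account: String, val darkMode: Boolean, val online: Boolean)

fun demoCombine(): List<UiState> = runBlocking {
    val account = MutableStateFlow("alice")
    val darkMode = MutableStateFlow(false)
    val online = MutableStateFlow(true)

    // Emits once every source has a value, then again whenever any source changes.
    val ui = combine(account, darkMode, online) { a, d, o -> UiState(a, d, o) }

    val before = ui.first()
    online.value = false      // only one input changes
    val after = ui.first()    // the other two keep their latest values
    listOf(before, after)
}
```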
Nostr Relay Patterns
For complete relay-specific patterns: → See relay-patterns.md
Covers:
- •Multi-relay subscription management
- •Connection lifecycle and reconnection
- •Event deduplication strategies
- •Backpressure for high-frequency events
- •EOSE handling patterns
Testing
For comprehensive testing patterns: → See testing-coroutines.md
Quick testing pattern:
@Test
fun `relay subscription receives events`() = runTest {
    val client = FakeNostrClient()
    client.reqAsFlow(relay, filters).test {
        // reqAsFlow emits only when an event arrives, so there is no initial item to await
        client.sendEvent(event1)
        assertEquals(listOf(event1), awaitItem())
        cancelAndIgnoreRemainingEvents()
    }
}
Testing tools:
- •runTest - Virtual time, auto cleanup
- •Turbine .test {} - Flow assertions
- •advanceTimeBy() - Control time
- •Fake implementations over mocks
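Virtual time is easiest to understand with a timer. The following sketch assumes the kotlinx-coroutines-test artifact is on the classpath; the five-second reconnect delay and the function name are made up for the example.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Records whether a 5 s "reconnect" timer has fired just before and at the deadline.
@OptIn(ExperimentalCoroutinesApi::class)
fun observeReconnectTimer(): List<Boolean> {
    val observations = mutableListOf<Boolean>()
    runTest {
        var reconnected = false
        launch {
            delay(5_000)          // virtual delay, finishes instantly in wall-clock time
            reconnected = true
        }
        advanceTimeBy(4_999)      // one millisecond short of the deadline
        observations.add(reconnected)
        advanceTimeBy(1)          // clock is now at 5_000
        runCurrent()              // run tasks scheduled for exactly the current time
        observations.add(reconnected)
    }
    return observations
}
```

The `runCurrent()` call matters: `advanceTimeBy` does not run tasks scheduled for exactly the time it stops at.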
Common Scenarios
Scenario: Implement New Relay Feature
Steps:
- •callbackFlow for subscription
- •Deduplication (Set of event IDs)
- •awaitClose for cleanup
- •Test with FakeNostrClient
Example: Add subscription for specific event kind
fun observeKind(kind: Int): Flow<Event> = callbackFlow {
val listener = object : IRequestListener {
override fun onEvent(event: Event, ...) {
if (event.kind == kind) {
trySend(event)
}
}
}
client.subscribe(listener)
awaitClose { client.unsubscribe(listener) }
}
Scenario: Handle Network Connectivity Changes
Steps:
- •callbackFlow for connectivity
- •flatMapLatest to reconnect
- •debounce to stabilize
- •Exception handling for failures
Example: Reconnect relays on connectivity
connectivityFlow
    .flatMapLatest { status ->
        when (status) {
            is ConnectivityStatus.Active -> relayPool.observeEvents()
            else -> emptyFlow()
        }
    }
    .catch { e -> Log.e("Relay", "Event stream failed", e) }
    .collect { event -> handleEvent(event) }
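The reconnect behaviour can be observed with plain flows. This sketch assumes a Boolean connectivity signal and a hypothetical `relayEvents()` flow that numbers each subscription attempt; it relies on real delays of a few hundred milliseconds.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun demoReconnect(): List<String> = runBlocking {
    var attempt = 0
    // Hypothetical relay stream: opens, then delivers one event 200 ms later.
    fun relayEvents(): Flow<String> = flow {
        val n = ++attempt
        emit("open $n")
        delay(200)
        emit("event $n")
    }
    val connectivity = flow {
        emit(true); delay(50)   // online: subscription 1 starts
        emit(false); delay(50)  // offline: subscription 1 is cancelled before its event
        emit(true)              // online again: subscription 2 starts and completes
    }
    connectivity
        .flatMapLatest { online -> if (online) relayEvents() else emptyFlow() }
        .catch { e -> emit("failed: ${e.message}") }
        .toList()
}
```

The first subscription never delivers its event because the switch to offline cancels it, which is the point of `flatMapLatest` here.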
Scenario: Optimize Multi-Collector Performance
Steps:
- •Use shareIn for expensive upstream
- •Configure SharingStarted strategy
- •Set replay buffer size
- •Test with multiple collectors
Example: Share relay subscription
val events: SharedFlow<Event> = client
.reqAsFlow(relay, filters)
.flatMapConcat { it.asFlow() }
.shareIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
replay = 0
)
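To check that sharing really gives a single upstream, the sketch below counts how often the upstream block starts while two collectors are attached. The gate, the counters, and the separate `sharingJob` are scaffolding assumed for the demo so that it terminates deterministically; a ViewModel would pass `viewModelScope` instead.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.plus
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun demoShareIn(): Pair<Int, List<List<Int>>> = runBlocking {
    val upstreamStarts = AtomicInteger(0)
    val subscribed = AtomicInteger(0)
    val gate = CompletableDeferred<Unit>()
    val sharingJob = Job() // lets the demo stop the sharing coroutine explicitly

    val shared = flow {
        upstreamStarts.incrementAndGet() // counts upstream collections
        gate.await()                     // hold emissions until both collectors are attached
        emit(1); emit(2); emit(3)
    }.shareIn(
        scope = this + sharingJob,
        started = SharingStarted.WhileSubscribed(5_000),
        replay = 0,
    )

    val collectors = List(2) {
        async { shared.onSubscription { subscribed.incrementAndGet() }.take(3).toList() }
    }
    while (subscribed.get() < 2) yield() // with replay = 0, late subscribers would miss values
    gate.complete(Unit)
    val results = collectors.map { it.await() }
    sharingJob.cancel()
    upstreamStarts.get() to results
}
```

Both collectors see the same three values while the upstream block runs once. The gate also illustrates the cost of `replay = 0`: a collector that subscribes after an emission does not receive it.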
Anti-Patterns
❌ Using GlobalScope
GlobalScope.launch { /* Leaks, no structured concurrency */ }
✅ Use scoped coroutines
viewModelScope.launch { /* Cancelled with ViewModel */ }
❌ Forgetting awaitClose
callbackFlow {
registerCallback()
// Missing cleanup!
}
✅ Always cleanup
callbackFlow {
registerCallback()
awaitClose { unregisterCallback() }
}
❌ Blocking in Flow
flow.map { Thread.sleep(1000); process(it) }
✅ Suspend, don't block
flow.map { delay(1000); process(it) }.flowOn(Dispatchers.Default)
❌ Ignoring backpressure
fastProducer.collect { slowConsumer(it) } // Producer is suspended while each item is consumed
✅ Handle backpressure
fastProducer
.buffer(64, BufferOverflow.DROP_OLDEST)
.collect { slowConsumer(it) }
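A quick way to see what DROP_OLDEST does is to pair a producer that never pauses with a deliberately slow consumer. The numbers below (100 items, capacity 4, 10 ms per item) are arbitrary choices for the sketch.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun demoDropOldest(): List<Int> = runBlocking {
    flow { for (i in 1..100) emit(i) }   // fast producer: emit never suspends here
        .buffer(capacity = 4, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .onEach { delay(10) }            // slow consumer
        .toList()
}
```

The consumer receives far fewer than 100 items, still in order, and the newest value always survives. Which of the early items get through depends on scheduling, so code should not rely on a specific subset.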
Delegation
Use kotlin-expert for:
- •Basic StateFlow/SharedFlow patterns
- •viewModelScope.launch usage
- •Simple MutableStateFlow → asStateFlow()
Use nostr-expert for:
- •Nostr protocol details (NIPs, event structure)
- •Event creation and signing
- •Cryptographic operations
This skill provides:
- •Advanced async patterns
- •Structured concurrency
- •Complex Flow operators
- •Testing strategies
- •Relay-specific async patterns
Resources
- •references/advanced-flow-operators.md - All Flow operators with examples
- •references/relay-patterns.md - Nostr relay async patterns from codebase
- •references/testing-coroutines.md - Complete testing guide
Quick Decision Tree
Need async operation?
├─ Simple ViewModel state update → kotlin-expert (StateFlow)
├─ Android callback → This skill (callbackFlow)
├─ Multiple concurrent operations → This skill (supervisorScope)
├─ Complex Flow transformation → This skill (references/advanced-flow-operators.md)
├─ Relay subscription → This skill (references/relay-patterns.md)
└─ Testing async code → This skill (references/testing-coroutines.md)