kotlin-coroutines

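Advanced Kotlin coroutine patterns for the Amethyst multiplatform project. Use this skill for: (1) structured concurrency (supervisorScope, coroutineScope); (2) advanced Flow operators (flatMapLatest, combine, merge, shareIn, stateIn); (3) Channels and callbackFlow; (4) dispatcher management and context switching; (5) exception handling (CoroutineExceptionHandler, SupervisorJob); (6) testing async code (runTest, Turbine); (7) Nostr relay connection pools and subscriptions; (8) backpressure handling in event streams. Basic StateFlow/SharedFlow patterns are delegated to kotlin-expert, and relay communication is handled together with nostr-expert.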

SKILL.md
--- frontmatter
name: kotlin-coroutines
description: |
  Advanced Kotlin coroutines patterns for AmethystMultiplatform. Use when working with:
  (1) Structured concurrency (supervisorScope, coroutineScope), (2) Advanced Flow operators
  (flatMapLatest, combine, merge, shareIn, stateIn), (3) Channels and callbackFlow,
  (4) Dispatcher management and context switching, (5) Exception handling
  (CoroutineExceptionHandler, SupervisorJob), (6) Testing async code (runTest, Turbine),
  (7) Nostr relay connection pools and subscriptions, (8) Backpressure handling in event streams.
  Delegates to kotlin-expert for basic StateFlow/SharedFlow patterns.
  Complements nostr-expert for relay communication.

Kotlin Coroutines - Advanced Async Patterns

Expert guidance for complex async operations in Amethyst: relay pools, event streams, structured concurrency, and testing.

Mental Model

code
Async Architecture in Amethyst:

Relay Pool (supervisorScope)
    ├── Relay 1 (launch) → callbackFlow → Events
    ├── Relay 2 (launch) → callbackFlow → Events
    └── Relay 3 (launch) → callbackFlow → Events
            ↓
    merge() → distinctBy(id) → shareIn
            ↓
    Multiple Collectors (ViewModels, Services)

Key principles:

  • supervisorScope - Children fail independently
  • callbackFlow - Bridge callbacks to Flow
  • shareIn/stateIn - Hot flows from cold
  • Backpressure - buffer(), conflate(), DROP_OLDEST

When to Use This Skill

Use for advanced async patterns:

  • Multi-relay subscriptions with supervisorScope
  • Complex Flow operators (flatMapLatest, combine, merge)
  • callbackFlow for Android callbacks (connectivity, location)
  • Backpressure handling in high-frequency streams
  • Exception handling with CoroutineExceptionHandler
  • Testing coroutines with runTest and Turbine

Delegate to kotlin-expert for:

  • Basic StateFlow/SharedFlow patterns
  • Simple viewModelScope.launch
  • MutableStateFlow → asStateFlow()

Core Patterns

Pattern: callbackFlow for Relay Subscriptions

kotlin
// Real pattern from NostrClientStaticReqAsStateFlow.kt
fun INostrClient.reqAsFlow(
    relay: NormalizedRelayUrl,
    filters: List<Filter>,
): Flow<List<Event>> = callbackFlow {
    val subId = RandomInstance.randomChars(10)
    var hasBeenLive = false
    val eventIds = mutableSetOf<HexKey>()
    var currentEvents = listOf<Event>()

    val listener = object : IRequestListener {
        override fun onEvent(event: Event, ...) {
            if (event.id !in eventIds) {
                currentEvents = if (hasBeenLive) {
                    // After EOSE: prepend
                    listOf(event) + currentEvents
                } else {
                    // Before EOSE: append
                    currentEvents + event
                }
                eventIds.add(event.id)
                trySend(currentEvents)
            }
        }

        override fun onEose(...) {
            hasBeenLive = true
        }
    }

    openReqSubscription(subId, mapOf(relay to filters), listener)

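    // Runs when the collector cancels or the channel closes. The qualified receiver
    // makes it explicit that this is the client's close(subId), not ProducerScope.close().
    awaitClose { this@reqAsFlow.close(subId) }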
}

Key techniques:

  1. Deduplication with Set
  2. EOSE handling (append → prepend strategy)
  3. trySend (non-blocking from callback)
  4. awaitClose for cleanup

Pattern: Structured Concurrency for Relays

kotlin
suspend fun connectToRelays(relays: List<Relay>) = supervisorScope {
    relays.forEach { relay ->
        launch {
            try {
                relay.connect()
                relay.subscribe(filters).collect { event ->
                    eventChannel.send(event)
                }
            } catch (e: IOException) {
                Log.e("Relay", "Connection failed: ${relay.url}", e)
                // Other relays continue
            }
        }
    }
}

Why supervisorScope:

  • One relay failure doesn't cancel others
  • All cancelled together when scope cancelled
  • The scope returns only after every child has finished, so no relay coroutine outlives the call
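To see the isolation property on its own, here is a small sketch that uses `async` children so that one failure is observable without a `try`/`catch` inside the child. `connect` and `connectAll` are hypothetical helpers, and the "bad" URL rule is an assumption made only to force a failure.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope
import java.io.IOException

// Hypothetical stand-in for a relay connection: fails for URLs containing "bad".
suspend fun connect(url: String): String {
    delay(10)
    if ("bad" in url) throw IOException("cannot reach $url")
    return "connected:$url"
}

// Each attempt is a child of supervisorScope, so a failed attempt
// does not cancel its siblings; the failure surfaces only at await().
suspend fun connectAll(urls: List<String>): List<String> = supervisorScope {
    val attempts = urls.map { url -> url to async { connect(url) } }
    attempts.map { (url, attempt) ->
        try {
            attempt.await()
        } catch (e: IOException) {
            "failed:$url"
        }
    }
}

fun main() = runBlocking {
    println(connectAll(listOf("wss://a", "wss://bad", "wss://c")))
    // [connected:wss://a, failed:wss://bad, connected:wss://c]
}
```

With `coroutineScope` in place of `supervisorScope`, the failing child would cancel the scope and the other attempts with it. Note that a `launch` child in a `supervisorScope` still needs its own `try`/`catch` (as in the pattern above) or a `CoroutineExceptionHandler`, because its uncaught exception is not stored anywhere for a caller to inspect.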

Pattern: Exception Handling for Services

kotlin
// Real pattern from PushNotificationReceiverService.kt
class MyService : Service() {
    val exceptionHandler = CoroutineExceptionHandler { _, throwable ->
        Log.e("Service", "Caught: ${throwable.message}", throwable)
    }

    private val scope = CoroutineScope(
        Dispatchers.IO + SupervisorJob() + exceptionHandler
    )

    override fun onDestroy() {
        scope.cancel()
        super.onDestroy()
    }
}

Pattern benefits:

  • SupervisorJob: children fail independently
  • ExceptionHandler: log instead of crash
  • Scoped lifecycle: cancel all on destroy
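The same three benefits can be checked without Android. In this sketch `Worker` is a hypothetical plain-Kotlin stand-in for the service, and it records errors in a list instead of logging them.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical non-Android owner of a long-lived scope.
class Worker {
    val errors = CopyOnWriteArrayList<String>()
    private val handler = CoroutineExceptionHandler { _, throwable ->
        errors.add(throwable.message ?: "unknown") // record instead of crashing
    }
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob() + handler)

    val alive: Boolean get() = scope.isActive
    fun start(task: suspend () -> Unit): Job = scope.launch { task() }
    fun shutdown() = scope.cancel() // counterpart of onDestroy()
}

// Returns (recorded errors, was the healthy task cancelled, is the scope still active).
fun runDemo(): Triple<List<String>, Boolean, Boolean> = runBlocking {
    val worker = Worker()
    val failing = worker.start { error("boom") }
    val healthy = worker.start { delay(50) }
    joinAll(failing, healthy)
    val result = Triple(worker.errors.toList(), healthy.isCancelled, worker.alive)
    worker.shutdown()
    result
}

fun main() {
    println(runDemo()) // ([boom], false, true)
}
```

The handler only sees exceptions from `launch` children; exceptions from `async` children stay inside the `Deferred` until `await()` is called.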

Pattern: Network Connectivity as Flow

kotlin
// Real pattern from ConnectivityFlow.kt
val status = callbackFlow {
    val networkCallback = object : NetworkCallback() {
        override fun onAvailable(network: Network) {
            trySend(ConnectivityStatus.Active(...))
        }
        override fun onLost(network: Network) {
            trySend(ConnectivityStatus.Off)
        }
    }

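    // ConnectivityManager has no registerCallback(); this sketch assumes the
    // default-network variant (API 24+) paired with unregisterNetworkCallback()
    connectivityManager.registerDefaultNetworkCallback(networkCallback)

    // Initial state
    activeNetwork?.let { trySend(ConnectivityStatus.Active(...)) }

    awaitClose {
        connectivityManager.unregisterNetworkCallback(networkCallback)
    }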
}
    .distinctUntilChanged()
    .debounce(200)  // Stabilize flapping
    .flowOn(Dispatchers.IO)

Key patterns:

  1. Emit initial state immediately
  2. Register callback in flow body
  3. Cleanup in awaitClose
  4. Stabilize with debounce + distinctUntilChanged
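The stabilizing step (item 4) can be sketched in isolation. `stabilized` and `flappingNetwork` are hypothetical names, and the timings are arbitrary values chosen so that the gaps inside a burst are far shorter than the quiet window.

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Drops consecutive duplicates, then waits for a quiet period before emitting.
@OptIn(FlowPreview::class)
fun <T> Flow<T>.stabilized(quietMillis: Long): Flow<T> =
    distinctUntilChanged().debounce(quietMillis)

// Hypothetical flapping signal: a quick burst, a repeated value, then a final change.
val flappingNetwork: Flow<String> = flow {
    emit("off"); delay(5)
    emit("on"); delay(5)
    emit("off"); delay(5)
    emit("on"); delay(400)  // quiet period: the burst settles on "on"
    emit("on"); delay(400)  // consecutive duplicate, removed by distinctUntilChanged
    emit("off")             // last value, emitted when the flow completes
}

fun main() = runBlocking {
    println(flappingNetwork.stabilized(quietMillis = 150).toList())
}
```

With these timings the collected list should contain only the settled values, "on" followed by "off", rather than all six raw emissions. The example runs on real time, so the exact margins are an assumption about a reasonably unloaded machine.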

Pattern: Merge Events from Multiple Relays

kotlin
fun observeFromRelays(
    relays: List<NormalizedRelayUrl>,
    filters: List<Filter>
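): Flow<Event> =
    relays.map { relay ->
        client.reqAsFlow(relay, filters)
            // Each emission is the accumulated list, so the same events repeat across emissions
            .flatMapConcat { it.asFlow() }
    }.merge()
        // distinctBy is not a kotlinx.coroutines operator; it is assumed here to be a
        // project-defined Flow extension that drops events whose id was already seen
        .distinctBy { it.id }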

Flow:

  • Each relay: Flow<List<Event>>
  • flatMapConcat: flatten to Flow<Event>
  • merge(): combine all relays
  • distinctBy: deduplicate across relays
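Since `kotlinx.coroutines` does not ship a `distinctBy` operator for `Flow`, the pipeline above depends on a helper. One possible sketch of such a helper follows; the `Event` class here is a simplified placeholder, not the project's event type, and `dedupe` exists only to exercise the operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits a value only the first time its key is seen.
fun <T, K> Flow<T>.distinctBy(selector: (T) -> K): Flow<T> = flow {
    val seen = HashSet<K>() // one set per collection, grows with the number of keys
    collect { value ->
        if (seen.add(selector(value))) emit(value)
    }
}

// Simplified placeholder for a Nostr event.
data class Event(val id: String, val content: String)

// Merges several relay flows and keeps one event per id.
fun dedupe(vararg relays: Flow<Event>): List<Event> = runBlocking {
    merge(*relays).distinctBy { it.id }.toList()
}

fun main() {
    val relay1 = flowOf(Event("1", "hello"), Event("2", "world"))
    val relay2 = flowOf(Event("2", "world"), Event("3", "again"))
    println(dedupe(relay1, relay2).map { it.id }.sorted()) // [1, 2, 3]
}
```

The `seen` set is never trimmed, so for a long-lived subscription a bounded structure (for example an LRU of recent ids) may be a better fit.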

Advanced Operators

For comprehensive coverage of Flow operators:

  • flatMapLatest, combine, zip, merge → See references/advanced-flow-operators.md
  • shareIn, stateIn → Conversion to hot flows
  • buffer, conflate → Backpressure strategies
  • debounce, sample → Rate limiting

Quick Reference

| Operator | Use Case | Example |
|---|---|---|
| flatMapLatest | Cancel previous, switch to new | Search (cancel old query) |
| combine | Latest from ALL flows | combine(account, settings, connectivity) |
| merge | Single stream from multiple | merge(relay1, relay2, relay3) |
| shareIn | Multiple collectors, single upstream | Share expensive computation |
| stateIn | StateFlow from Flow | ViewModel state |
| buffer(DROP_OLDEST) | High-frequency streams | Real-time event feed |
| conflate | Latest only | UI updates |
| debounce | Wait for quiet period | Search input |
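
As a small illustration of the `combine` row, the sketch below derives one value from the latest state of two flows. `statusLine` and the two state holders are hypothetical names for this example only.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Recomputes the line whenever either input emits, using the latest value of both.
fun statusLine(account: Flow<String>, online: Flow<Boolean>): Flow<String> =
    combine(account, online) { name, isOnline ->
        "$name is ${if (isOnline) "online" else "offline"}"
    }

fun main() = runBlocking {
    val account = MutableStateFlow("alice")
    val online = MutableStateFlow(false)
    val line = statusLine(account, online)

    println(line.first()) // alice is offline
    online.value = true
    println(line.first()) // alice is online
}
```

`combine` emits nothing until every input has produced at least one value, which is why `StateFlow` inputs (which always have a value) make the first emission immediate.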

Nostr Relay Patterns

For complete relay-specific patterns: → See references/relay-patterns.md

Covers:

  • Multi-relay subscription management
  • Connection lifecycle and reconnection
  • Event deduplication strategies
  • Backpressure for high-frequency events
  • EOSE handling patterns

Testing

For comprehensive testing patterns: → See references/testing-coroutines.md

Quick testing pattern:

kotlin
@Test
fun `relay subscription receives events`() = runTest {
    val client = FakeNostrClient()

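    // relay, filters and event1 are test fixtures assumed to be defined elsewhere
    client.reqAsFlow(relay, filters).test {
        // reqAsFlow as shown under Core Patterns emits only when an event arrives,
        // so there is no initial empty list to await
        client.sendEvent(event1)
        assertEquals(listOf(event1), awaitItem())

        cancelAndIgnoreRemainingEvents()
    }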
}

Testing tools:

  • runTest - Virtual time, auto cleanup
  • Turbine .test {} - Flow assertions
  • advanceTimeBy() - Control time
  • Fake implementations over mocks
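To make the virtual-time point concrete, here is a sketch that needs only `kotlinx-coroutines-test` and `kotlin.test` (no Turbine). The class and test names are made up for the example.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class VirtualTimeTest {
    @OptIn(ExperimentalCoroutinesApi::class)
    @Test
    fun `heartbeat ticks on virtual time`() = runTest {
        val ticks = mutableListOf<Int>()
        val job = launch {
            repeat(3) { i ->
                delay(1_000) // virtual delay, no real waiting
                ticks.add(i)
            }
        }

        advanceTimeBy(2_000) // runs tasks scheduled strictly before t = 2000
        runCurrent()         // runs the task scheduled exactly at t = 2000
        assertEquals(listOf(0, 1), ticks)
        assertEquals(2_000L, currentTime)

        job.join()           // the scheduler skips ahead to the last delay
        assertEquals(listOf(0, 1, 2), ticks)
        assertEquals(3_000L, currentTime)
    }
}
```

The `runCurrent()` call matters: `advanceTimeBy` stops just short of tasks scheduled at the target instant, so the second tick would otherwise still be pending.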

Common Scenarios

Scenario: Implement New Relay Feature

Steps:

  1. callbackFlow for subscription
  2. Deduplication (Set of event IDs)
  3. awaitClose for cleanup
  4. Test with FakeNostrClient

Example: Add subscription for specific event kind

kotlin
fun observeKind(kind: Int): Flow<Event> = callbackFlow {
    val listener = object : IRequestListener {
        override fun onEvent(event: Event, ...) {
            if (event.kind == kind) {
                trySend(event)
            }
        }
    }
    client.subscribe(listener)
    awaitClose { client.unsubscribe(listener) }
}

Scenario: Handle Network Connectivity Changes

Steps:

  1. callbackFlow for connectivity
  2. flatMapLatest to reconnect
  3. debounce to stabilize
  4. Exception handling for failures

Example: Reconnect relays on connectivity

kotlin
connectivityFlow
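    .flatMapLatest { status ->
        when (status) {
            // Active carries data (see ConnectivityStatus.Active(...)), so match by type
            is ConnectivityStatus.Active -> relayPool.observeEvents()
            else -> emptyFlow()
        }
    }
    // Log.e takes (tag, message, throwable); there is no (tag, throwable) overload
    .catch { e -> Log.e("Relay", "Event stream failed", e) }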
    .collect { event -> handleEvent(event) }
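
The switching behavior of `flatMapLatest` can be observed with a sketch that has no Android or relay dependencies. `Link`, `whileActive`, and `demo` are hypothetical names, and the delays are arbitrary real-time values.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical simplified connectivity state.
enum class Link { ACTIVE, OFF }

// Opens a fresh inner flow on ACTIVE and cancels the current one on any new state.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<Link>.whileActive(open: () -> Flow<T>): Flow<T> =
    flatMapLatest { link -> if (link == Link.ACTIVE) open() else emptyFlow() }

fun demo(): List<String> = runBlocking {
    var connection = 0
    val links = flow {
        emit(Link.ACTIVE); delay(250)
        emit(Link.OFF); delay(250)
        emit(Link.ACTIVE)
    }
    links.whileActive {
        val n = ++connection
        // Each "connection" tries to deliver five events, 100 ms apart.
        flow { repeat(5) { i -> emit("c$n-e$i"); delay(100) } }
    }.toList()
}

fun main() {
    println(demo())
}
```

The first connection is cut off when `OFF` arrives, so it delivers only part of its events, while the second connection runs to completion because no newer state replaces it.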

Scenario: Optimize Multi-Collector Performance

Steps:

  1. Use shareIn for expensive upstream
  2. Configure SharingStarted strategy
  3. Set replay buffer size
  4. Test with multiple collectors

Example: Share relay subscription

kotlin
val events: SharedFlow<Event> = client
    .reqAsFlow(relay, filters)
    .flatMapConcat { it.asFlow() }
    .shareIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        replay = 0
    )
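
To check that sharing really collapses several collectors onto one upstream collection, a sketch like the following counts upstream starts. It deliberately uses `SharingStarted.Eagerly` and `replay = 3` instead of the settings above so that the result does not depend on subscription timing; `sharedDemo` is a hypothetical name.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns (number of upstream collections, what each of two collectors received).
fun sharedDemo(): Pair<Int, List<List<Int>>> = runBlocking {
    var upstreamStarts = 0
    val upstream = flow {
        upstreamStarts++ // counts how often the cold flow is actually collected
        emit(1); emit(2); emit(3)
    }

    // Separate scope for sharing so it can be cancelled independently.
    val sharingScope = CoroutineScope(coroutineContext + Job())
    val shared = upstream.shareIn(sharingScope, SharingStarted.Eagerly, replay = 3)

    val first = async { shared.take(3).toList() }
    val second = async { shared.take(3).toList() }
    val received = listOf(first.await(), second.await())

    sharingScope.cancel()
    upstreamStarts to received
}

fun main() {
    println(sharedDemo()) // (1, [[1, 2, 3], [1, 2, 3]])
}
```

With `replay = 0`, as in the relay example, a collector that subscribes late simply misses earlier events, which is usually what a live event feed wants.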

Anti-Patterns

Avoid: Using GlobalScope

kotlin
GlobalScope.launch { /* Leaks, no structured concurrency */ }

Prefer: Use scoped coroutines

kotlin
viewModelScope.launch { /* Cancelled with ViewModel */ }

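Avoid: Forgetting awaitClose

kotlin
callbackFlow {
    registerCallback()
    // Missing cleanup! callbackFlow throws IllegalStateException
    // when its block returns without calling awaitClose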
}

Prefer: Always clean up

kotlin
callbackFlow {
    registerCallback()
    awaitClose { unregisterCallback() }
}

Avoid: Blocking in Flow

kotlin
flow.map { Thread.sleep(1000); process(it) }

Prefer: Suspend, don't block

kotlin
flow.map { delay(1000); process(it) }.flowOn(Dispatchers.Default)

Avoid: Ignoring backpressure

kotlin
fastProducer.collect { slowConsumer(it) }  // Producer is suspended until each item is consumed

Prefer: Handle backpressure

kotlin
fastProducer
    .buffer(64, BufferOverflow.DROP_OLDEST)
    .collect { slowConsumer(it) }
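
The effect of `DROP_OLDEST` is easiest to see by counting what actually reaches a slow consumer. In this sketch `slowlyCollect` is a hypothetical helper and the 10 ms consumer delay is an arbitrary value.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits `total` integers as fast as possible into a small lossy buffer
// and returns the items that a slow consumer actually received.
fun slowlyCollect(total: Int, capacity: Int): List<Int> = runBlocking {
    flow { repeat(total) { emit(it) } }                // fast producer, never suspended
        .buffer(capacity, BufferOverflow.DROP_OLDEST)  // keep only the newest items
        .onEach { delay(10) }                          // slow consumer
        .toList()
}

fun main() {
    val received = slowlyCollect(total = 100, capacity = 4)
    println("received ${received.size} of 100, last = ${received.last()}")
}
```

The consumer sees far fewer than 100 items, but the newest one always arrives, which is the trade-off to accept before using this strategy: it suits feeds where stale items lose value, not streams where every event must be processed.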

Delegation

Use kotlin-expert for:

  • Basic StateFlow/SharedFlow patterns
  • viewModelScope.launch usage
  • Simple MutableStateFlow → asStateFlow()

Use nostr-expert for:

  • Nostr protocol details (NIPs, event structure)
  • Event creation and signing
  • Cryptographic operations

This skill provides:

  • Advanced async patterns
  • Structured concurrency
  • Complex Flow operators
  • Testing strategies
  • Relay-specific async patterns

Resources

  • references/advanced-flow-operators.md - All Flow operators with examples
  • references/relay-patterns.md - Nostr relay async patterns from codebase
  • references/testing-coroutines.md - Complete testing guide

Quick Decision Tree

code
Need async operation?
    ├─ Simple ViewModel state update → kotlin-expert (StateFlow)
    ├─ Android callback → This skill (callbackFlow)
    ├─ Multiple concurrent operations → This skill (supervisorScope)
    ├─ Complex Flow transformation → This skill (references/advanced-flow-operators.md)
    ├─ Relay subscription → This skill (references/relay-patterns.md)
    └─ Testing async code → This skill (references/testing-coroutines.md)