17th Space Server Patterns

SKILL.md

17th-space-patterns

Space domain patterns for the Sudal project. Reference this when working with Space feature code.


Space FSM Architecture

State Machine Overview

The Space FSM is a YAML-defined state machine (configs/state.yaml) compiled to Go structures.

States (10 listed below):

| State | Terminal | Description |
|---|---|---|
| waiting | No | Initial lobby state, waiting for participants |
| active | No | Game activated, participants marking ready |
| predicting_decision | No | Host deciding on prediction phase |
| predicting | No | Prediction gameplay phase |
| charging | No | Charging phase, collecting participants |
| aggregating | No | Result aggregation in progress |
| completed | Yes | Successfully finished |
| failed | Yes | Aggregation hook failed |
| aborted | Yes | Host left or timeout |
| expired | Yes | Grace period ended after completion |
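
The Terminal column maps naturally onto a status predicate. The following is a minimal sketch, assuming the status is a string enum. `SpaceStatusActive` and `IsTerminal()` appear elsewhere in this document; the other constant names are assumptions made for illustration.

```go
package main

// SpaceStatus is modeled as a string enum. Apart from SpaceStatusActive,
// the constant names below are assumptions, not confirmed project names.
type SpaceStatus string

const (
	SpaceStatusWaiting            SpaceStatus = "waiting"
	SpaceStatusActive             SpaceStatus = "active"
	SpaceStatusPredictingDecision SpaceStatus = "predicting_decision"
	SpaceStatusPredicting         SpaceStatus = "predicting"
	SpaceStatusCharging           SpaceStatus = "charging"
	SpaceStatusAggregating        SpaceStatus = "aggregating"
	SpaceStatusCompleted          SpaceStatus = "completed"
	SpaceStatusFailed             SpaceStatus = "failed"
	SpaceStatusAborted            SpaceStatus = "aborted"
	SpaceStatusExpired            SpaceStatus = "expired"
)

// IsTerminal reports whether the status is marked "Yes" in the Terminal column.
// Unknown values are treated as non-terminal.
func (s SpaceStatus) IsTerminal() bool {
	switch s {
	case SpaceStatusCompleted, SpaceStatusFailed, SpaceStatusAborted, SpaceStatusExpired:
		return true
	default:
		return false
	}
}
```

Note that `completed` is terminal in the table even though a timer later moves it to `expired`, so the sketch follows the table rather than the transition diagram.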

Key Transitions

code
waiting → [activate] → active → [ready_all] → predicting_decision
                                              ├→ [yes] → predicting → charging
                                              └→ [no/timer] → charging
charging → [timer + has_min] → aggregating → [timer + result] → completed
                                           → [timer + failed] → failed
completed → [timer 30min] → expired

Timer-Driven Transitions

| Timer | Duration | From | To | Guard |
|---|---|---|---|---|
| waiting_timeout | 600s | waiting | aborted | - |
| ready_timeout | 600s | active | aborted | - |
| predicting_decision_timeout | 10s | predicting_decision | charging | - |
| predicting_timeout | 30s | predicting | charging | - |
| to_aggregating | 3s | charging | aggregating | has_min_participants |
| grace_period | 1800s | completed | expired | - |
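
To make the timer table concrete, here is a minimal sketch that holds the rows as data and finds the timers that are due for a given state. The `TimerRule` type, the `timerRules` slice, and `dueTimers` are hypothetical names; the real definitions live in configs/state.yaml and are not reproduced here.

```go
package main

import "time"

// TimerRule is a hypothetical in-memory form of one row of the timer table.
type TimerRule struct {
	Name     string
	Duration time.Duration
	From, To string
	Guard    string // empty when the transition has no guard
}

// timerRules mirrors the durations listed in the table.
var timerRules = []TimerRule{
	{"waiting_timeout", 600 * time.Second, "waiting", "aborted", ""},
	{"ready_timeout", 600 * time.Second, "active", "aborted", ""},
	{"predicting_decision_timeout", 10 * time.Second, "predicting_decision", "charging", ""},
	{"predicting_timeout", 30 * time.Second, "predicting", "charging", ""},
	{"to_aggregating", 3 * time.Second, "charging", "aggregating", "has_min_participants"},
	{"grace_period", 1800 * time.Second, "completed", "expired", ""},
}

// dueTimers returns the rules that start in the given state and whose
// duration is less than or equal to the time spent in that state.
// Guards are not evaluated here; the caller still has to check them.
func dueTimers(state string, elapsed time.Duration) []TimerRule {
	var due []TimerRule
	for _, r := range timerRules {
		if r.From == state && elapsed >= r.Duration {
			due = append(due, r)
		}
	}
	return due
}
```

For example, `dueTimers("charging", 3*time.Second)` returns the `to_aggregating` rule, and the caller would then evaluate `has_min_participants` before firing the transition.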

FSM Event Pattern

TriggerEvent Flow

go
// Application layer calls SpaceFSMPort.Execute()
result, err := fsm.Execute(ctx, application.SpaceFSMCommand{
    Space:       snapshot.Clone(),  // Always clone!
    EventName:   "activate",
    Params:      map[string]any{"participant_id": participantID},
    Metadata:    map[string]string{"trace_id": traceID},
    Idempotency: &SpaceFSMIdempotencyCommit{...},
})

// Runtime flow:
// 1. Validate inputs
// 2. Lookup transition for (current_state, event)
// 3. Execute guards (with timeout)
// 4. Execute actions
// 5. Build JSON patches with placeholder resolution
// 6. CAS commit with version check
// 7. Notify observers on state change
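
Step 2 of the runtime flow, the lookup by (current_state, event), can be sketched as a map keyed on both values. This is an illustration only: `transitionKey`, `transitions`, and `nextState` are hypothetical names, and the entries are a subset taken from the Key Transitions diagram, with the bracketed labels used as event names.

```go
package main

import "fmt"

// transitionKey identifies a transition by source state and event name.
type transitionKey struct{ State, Event string }

// transitions is a hypothetical subset of the compiled table.
var transitions = map[transitionKey]string{
	{"waiting", "activate"}:        "active",
	{"active", "ready_all"}:        "predicting_decision",
	{"predicting_decision", "yes"}: "predicting",
	{"predicting_decision", "no"}:  "charging",
}

// nextState returns the target state for (current, event), or an error
// when the pair has no entry in the table.
func nextState(current, event string) (string, error) {
	to, ok := transitions[transitionKey{current, event}]
	if !ok {
		return "", fmt.Errorf("no transition from %q on event %q", current, event)
	}
	return to, nil
}
```

Rejecting unknown pairs at lookup time means guards and actions never run for an event that is invalid in the current state.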

Guards vs Actions

Guards: Pure predicates, return (bool, error), must NOT mutate state

go
// Guard signature
type GuardFunc func(ctx context.Context, evt *EventContext, params map[string]any) (bool, error)

// Common guards:
//   has_min_participants      minimum participant count met
//   has_not_min_participants  below minimum (for abort)
//   all_participants_ready    all marked ready
//   waiting_timeout_due       timer expired
//   has_aggregation_result    hook job succeeded
//   aggregation_failed        hook job failed

Actions: Side effects, return error, run after all guards pass

go
// Action signature
type ActionFunc func(ctx context.Context, evt *EventContext, params map[string]any) error

// Common actions:
//   notify_activation       notify participants of activation
//   broadcast_ready_state   broadcast ready state to all participants
//   log_current_status      logging
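
The ordering rule (all guards first, actions only if every guard passes) can be sketched as follows. `EventContext` is reduced to a single field, and the `min_participants` param key, `recordAction`, and `runTransition` are assumptions for illustration, not the runtime's actual API.

```go
package main

import (
	"context"
	"errors"
)

// EventContext is a stand-in for the runtime's event context; only the
// field this example needs is modeled.
type EventContext struct {
	ParticipantCount int
}

type GuardFunc func(ctx context.Context, evt *EventContext, params map[string]any) (bool, error)
type ActionFunc func(ctx context.Context, evt *EventContext, params map[string]any) error

// hasMinParticipants reads state but never mutates it.
// The "min_participants" param key is hypothetical.
func hasMinParticipants(_ context.Context, evt *EventContext, params map[string]any) (bool, error) {
	minCount, ok := params["min_participants"].(int)
	if !ok {
		return false, errors.New(`params["min_participants"] must be an int`)
	}
	return evt.ParticipantCount >= minCount, nil
}

// recordAction returns an action whose only side effect is appending its name to log.
func recordAction(log *[]string, name string) ActionFunc {
	return func(context.Context, *EventContext, map[string]any) error {
		*log = append(*log, name)
		return nil
	}
}

// runTransition evaluates every guard first and runs the actions only if all
// guards pass. It reports whether the actions ran.
func runTransition(ctx context.Context, evt *EventContext, params map[string]any,
	guards []GuardFunc, actions []ActionFunc) (bool, error) {
	for _, g := range guards {
		ok, err := g(ctx, evt, params)
		if err != nil || !ok {
			return false, err
		}
	}
	for _, a := range actions {
		if err := a(ctx, evt, params); err != nil {
			return false, err
		}
	}
	return true, nil
}
```

Because guards are pure, a rejected transition leaves no trace: in this sketch the log stays empty whenever any guard returns false or an error.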

CAS (Compare-And-Swap) Pattern

State Version Mechanism

Every SpaceSnapshot has a StateVersion field (uint64). On each FSM transition:

go
// Runtime increments version before CAS
next.StateVersion = current.StateVersion + 1

// Locker calculates expected version
expected := next.StateVersion - 1  // == current version

// SQL with version check ($2 is bound to expected):
//   UPDATE spaces SET ... WHERE id = $1 AND state_version = $2
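
The version check can be illustrated without a database. The sketch below uses a hypothetical in-memory `memStore` in place of the `spaces` table; it shows only the compare-and-swap rule, not the project's locker.

```go
package main

import "sync"

// row holds the two columns relevant to the version check.
type row struct {
	Status       string
	StateVersion uint64
}

// memStore is a hypothetical in-memory stand-in for the spaces table.
type memStore struct {
	mu   sync.Mutex
	rows map[string]row
}

// compareAndSwap writes next only if the stored version equals
// next.StateVersion-1, mirroring "WHERE state_version = expected".
// It reports whether the write was applied.
func (s *memStore) compareAndSwap(id string, next row) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.rows[id]
	if !ok || next.StateVersion == 0 || cur.StateVersion != next.StateVersion-1 {
		return false
	}
	s.rows[id] = next
	return true
}
```

Two writers that both read version 1 will both try to write version 2; the first succeeds and the second sees `false`, which corresponds to zero rows affected in the SQL form.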

Conflict Handling

go
applied, err := locker.CompareAndSwap(ctx, next, patches, event, commit)
if err != nil {
    return err  // Actual error
}
if !applied {
    return entity.CASConflict(spaceID)  // Retry needed
}

UseCase retry pattern:

go
for attempt := 0; attempt < maxRetries; attempt++ {
    // Reload on every attempt so the command carries the latest StateVersion
    snapshot, err := reader.GetByID(ctx, spaceID)
    if err != nil {
        return nil, err
    }
    command.Space = snapshot.Clone()

    result, err := fsm.Execute(ctx, command)
    if err == nil {
        return result, nil
    }
    if !errors.Is(err, ErrConcurrencyConflict) {
        return nil, err  // Non-retryable
    }
    // CAS conflict: loop and retry with a fresh snapshot
}
return nil, ErrMaxRetriesExceeded

Patch Whitelist

Only these JSON-Patch operations are allowed:

| Path | Op | Validation |
|---|---|---|
| /status | replace | Valid SpaceStatus enum |
| /participants/$ID | add/remove | Valid UUID |
| /participants/$ID/ready | replace | Boolean |
| /leader_profile_id | replace | Valid UUID |
| /comparison/photo/version | replace | uint32 |
| /comparison/location/version | replace | uint32 |
| /comparison/memo/$ID/version | replace | uint32, valid UUID |

Any other patch path is rejected by the locker.
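
A whitelist like this can be enforced with a small rule table. The sketch below checks only the path and the operation; value validation (enum, boolean, uint32) is left out. `patchRule`, `patchRules`, and `checkPatch` are hypothetical names, and the UUID pattern is a generic hex form rather than the project's validator.

```go
package main

import (
	"fmt"
	"regexp"
)

// uuidPattern matches the canonical 8-4-4-4-12 hex form.
const uuidPattern = `[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}`

// patchRule pairs a path pattern with the operations allowed on it.
type patchRule struct {
	path *regexp.Regexp
	ops  map[string]bool
}

var replaceOnly = map[string]bool{"replace": true}

// patchRules mirrors the rows of the whitelist table.
var patchRules = []patchRule{
	{regexp.MustCompile(`^/status$`), replaceOnly},
	{regexp.MustCompile(`^/participants/` + uuidPattern + `$`), map[string]bool{"add": true, "remove": true}},
	{regexp.MustCompile(`^/participants/` + uuidPattern + `/ready$`), replaceOnly},
	{regexp.MustCompile(`^/leader_profile_id$`), replaceOnly},
	{regexp.MustCompile(`^/comparison/(photo|location)/version$`), replaceOnly},
	{regexp.MustCompile(`^/comparison/memo/` + uuidPattern + `/version$`), replaceOnly},
}

// checkPatch returns nil when the (op, path) pair is whitelisted.
func checkPatch(op, path string) error {
	for _, r := range patchRules {
		if r.path.MatchString(path) {
			if r.ops[op] {
				return nil
			}
			return fmt.Errorf("op %q is not allowed on %s", op, path)
		}
	}
	return fmt.Errorf("path %s is not whitelisted", path)
}
```

Anchoring every pattern with `^` and `$` matters here: without it, a path such as `/status/extra` would slip through the `/status` rule.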


Hook Job Lifecycle

State Hooks

on_enter hooks run after transition commits:

| Hook | Required | States |
|---|---|---|
| aggregate_quiz_result | Yes | aggregating |
| cleanup_terminated_space | Yes | failed, aborted, expired |
| update_friends_last_compared_at | No | completed |
| log_current_status | No | any |

Hook Job States

code
PENDING → RUNNING → DONE
                  → FAILED (retry or terminal)

Required vs Optional Hooks

go
// Required hook failure blocks FSM progression
if job.Required && status == HookJobStatusFailed {
    // Transition to failed state via guard check
}

// Optional hook failure is logged but doesn't block
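
The required-versus-optional rule reduces to a scan over the jobs attached to a state. A minimal sketch follows, assuming a `HookJob` struct with `Name`, `Required`, and `Status` fields. Only `HookJobStatusFailed` and the `Required` flag appear in this document; the other names are assumptions.

```go
package main

// HookJobStatus mirrors the lifecycle PENDING → RUNNING → DONE / FAILED.
// Only HookJobStatusFailed is named in the document; the rest are assumed.
type HookJobStatus string

const (
	HookJobStatusPending HookJobStatus = "PENDING"
	HookJobStatusRunning HookJobStatus = "RUNNING"
	HookJobStatusDone    HookJobStatus = "DONE"
	HookJobStatusFailed  HookJobStatus = "FAILED"
)

// HookJob is a hypothetical, reduced form of a hook job record.
type HookJob struct {
	Name     string
	Required bool
	Status   HookJobStatus
}

// blockingFailure returns the name of the first required job that has failed.
// Failed optional jobs are ignored because they do not block the FSM.
func blockingFailure(jobs []HookJob) (string, bool) {
	for _, j := range jobs {
		if j.Required && j.Status == HookJobStatusFailed {
			return j.Name, true
		}
	}
	return "", false
}
```

A guard such as `aggregation_failed` could be built on a check of this shape, although how the project wires it is not shown in this document.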

Reconcilers

| Name | Schedule | Purpose |
|---|---|---|
| cleanup_terminated_space_backstop | 30s | Clean up active participants in terminal states |
| broken_expired_space_cleanup | Daily 3AM | Fix broken expired spaces (ops tool) |

Streaming Architecture

Hub Pattern

go
// Stream service manages per-space hubs
type Service struct {
    hubsMu sync.RWMutex
    hubs   map[uuid.UUID]*hub.Instance
}

// Client subscribes with resume token
func (s *Service) Subscribe(ctx context.Context, params SubscribeParams) (<-chan Event, error) {
    // Named h rather than hub so the local variable does not shadow the hub package
    h := s.getOrCreateHub(params.SpaceID)
    return h.Subscribe(ctx, params.ResumeToken, params.UserID)
}

Resume Token Pattern

Resume tokens enable reconnection without missing events:

go
type ResumeToken struct {
    SpaceID      uuid.UUID
    StateVersion uint64
    EventSeq     uint64
}

// On reconnect:
// 1. Parse resume token
// 2. Fetch events after token's sequence
// 3. Replay missed events
// 4. Continue live streaming
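
Steps 2 and 3 of the reconnect flow amount to selecting every stored event with a sequence number above the token's `EventSeq`. The sketch below assumes an event log kept in ascending sequence order; `storedEvent` and `missedEvents` are hypothetical names, and the real hub may store events differently.

```go
package main

import "sort"

// storedEvent is a hypothetical buffered event with its sequence number.
type storedEvent struct {
	Seq     uint64
	Payload string
}

// missedEvents returns the events with Seq greater than lastSeq.
// It assumes log is sorted by Seq in ascending order and uses a binary
// search to find the first event the client has not yet seen.
func missedEvents(log []storedEvent, lastSeq uint64) []storedEvent {
	i := sort.Search(len(log), func(i int) bool { return log[i].Seq > lastSeq })
	return log[i:]
}
```

A client that reconnects with `EventSeq` equal to the newest stored sequence gets an empty slice and moves straight to live streaming.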

Stream Events

go
// Events pushed to clients
type Event struct {
    Type        EventType
    SpaceID     uuid.UUID
    StateVersion uint64
    Payload     any
}

// Event types:
//   EventTypeSnapshot     full state sync
//   EventTypeParticipant  participant change
//   EventTypeStatus       status change
//   EventTypeComparison   comparison update

Idempotency Patterns

Four Idempotency Trackers

| Operation | Key Format | Purpose |
|---|---|---|
| Create | {hostID}:{requestID} | Prevent duplicate space creation |
| Activate | {spaceID}:{hostID}:{key} | Prevent duplicate activation |
| Join | {spaceID}:{participantID}:{key} | Prevent duplicate joins |
| Leave | {spaceID}:{targetID}:{key} | Prevent duplicate leaves |

Reserve-Then-Consume Pattern

go
// Step 1: Reserve idempotency key (before FSM execution)
err := tracker.Reserve(ctx, spaceID, participantID, key)
if errors.Is(err, ErrIdempotencyConflict) {
    return existingResult, nil  // Already processed
}
if err != nil {
    return nil, err  // Reservation itself failed
}

// Step 2: Execute FSM transition
result, err := fsm.Execute(ctx, command)

// Step 3: Mark consumed (in same transaction via commit struct)
// Done automatically by CAS locker
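
The reserve step can be illustrated with an in-memory tracker. This is a sketch under stated assumptions: `memTracker`, `newMemTracker`, and `joinKey` are hypothetical, and `Consume` is called explicitly here, whereas the document says the CAS locker marks the key consumed inside the commit transaction.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrIdempotencyConflict is returned when a key has already been reserved.
var ErrIdempotencyConflict = errors.New("idempotency key already reserved")

// memTracker is a hypothetical in-memory tracker. The map value records
// whether a reserved key has been consumed.
type memTracker struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newMemTracker() *memTracker {
	return &memTracker{seen: make(map[string]bool)}
}

// joinKey builds a key in the Join format {spaceID}:{participantID}:{key}.
func joinKey(spaceID, participantID, key string) string {
	return fmt.Sprintf("%s:%s:%s", spaceID, participantID, key)
}

// Reserve claims the key, or reports a conflict if it was claimed before.
func (t *memTracker) Reserve(k string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.seen[k]; ok {
		return ErrIdempotencyConflict
	}
	t.seen[k] = false
	return nil
}

// Consume marks a reserved key as consumed. It reports false if the key
// was never reserved.
func (t *memTracker) Consume(k string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, ok := t.seen[k]; !ok {
		return false
	}
	t.seen[k] = true
	return true
}
```

Reserving before the FSM runs means a duplicate request is turned away before it can trigger a second transition.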

Error Handling

SpaceError Structure

go
type SpaceError struct {
    code         ErrorCode            // e.g., "FAILED_PRECONDITION"
    reason       string               // e.g., "SPACE_NOT_IN_WAITING"
    domain       string               // "space.v1"
    message      string               // Human-readable message
    meta         map[string]string    // Contextual metadata
    precondition *PreconditionViolation
    inner        error                // Wrapped error
}

Factory Functions

Always use factory functions, never construct directly:

go
// Permission errors
entity.NotHost(spaceID, callerID)
entity.InsufficientRole(spaceID, userID, "host")
entity.InvalidEntryCode(spaceID, code)

// Precondition errors
entity.StateViolation(spaceID, "waiting", "active")
entity.SpaceNotInWaiting(spaceID, "active")
entity.MinParticipantsViolation(spaceID, 2, 1)

// Resource exhausted errors
entity.SpaceFull(spaceID, 10, 10)
entity.HostQuotaExceeded(hostID, true, false)
entity.JoinRateLimitExceeded(participantID, window, limit, retryAfter)

// Concurrency errors
entity.CASConflict(spaceID)
entity.IdempotencyConflict(spaceID, hostID, key)
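
To show why factories are preferred over direct construction, here is a reduced sketch of one factory and the matching accessor. The struct keeps only a few of the fields listed above, and the bodies of `SpaceNotInWaiting` and `AsSpaceError` are assumptions; only their names and the example code, reason, and domain values come from this document.

```go
package main

import (
	"errors"
	"fmt"
)

// SpaceError is a reduced version of the structure shown above.
type SpaceError struct {
	code    string
	reason  string
	domain  string
	message string
	meta    map[string]string
	inner   error
}

func (e *SpaceError) Error() string  { return fmt.Sprintf("%s (%s): %s", e.code, e.reason, e.message) }
func (e *SpaceError) Unwrap() error  { return e.inner }
func (e *SpaceError) Reason() string { return e.reason }
func (e *SpaceError) Domain() string { return e.domain }

// SpaceNotInWaiting fills code, reason, and domain consistently, which is
// what callers would have to remember if they built the struct by hand.
func SpaceNotInWaiting(spaceID, actual string) *SpaceError {
	return &SpaceError{
		code:    "FAILED_PRECONDITION",
		reason:  "SPACE_NOT_IN_WAITING",
		domain:  "space.v1",
		message: fmt.Sprintf("space %s is in state %q, expected \"waiting\"", spaceID, actual),
		meta:    map[string]string{"space_id": spaceID, "actual_state": actual},
	}
}

// AsSpaceError finds a *SpaceError anywhere in err's wrap chain.
func AsSpaceError(err error) (*SpaceError, bool) {
	var se *SpaceError
	ok := errors.As(err, &se)
	return se, ok
}
```

Because the fields are unexported, code outside the package cannot set them and has to go through a factory, which keeps reason strings and domains uniform across the protocol layer.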

Protocol Layer Mapping

go
// rpcerr/from.go maps domain errors to Connect errors
func FromSpaceError(err error) *connect.Error {
    spaceErr, ok := entity.AsSpaceError(err)
    if !ok {
        return connect.NewError(connect.CodeInternal, err)
    }

    code := mapCode(spaceErr.Code())
    connErr := connect.NewError(code, errors.New(spaceErr.Error()))

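    // Add ErrorInfo detail
    detail := &errdetails.ErrorInfo{
        Reason:   spaceErr.Reason(),
        Domain:   spaceErr.Domain(),
        Metadata: spaceErr.Metadata(),
    }
    // connect-go attaches details as *connect.ErrorDetail, built from a proto message
    if d, detailErr := connect.NewErrorDetail(detail); detailErr == nil {
        connErr.AddDetail(d)
    }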

    return connErr
}

UseCase Patterns

Guard-UseCase Pattern

UseCases that invoke FSM events follow this structure:

go
type ActivateQuizComparisonUseCase struct {
    fsm              SpaceFSMPort
    snapshotReader   SpaceSnapshotReader
    quotaInspector   SpaceQuotaInspector
    idempotency      SpaceActivateIdempotencyTracker
}

func (uc *ActivateQuizComparisonUseCase) Execute(ctx context.Context, req Request) (*Result, error) {
    // 1. Load current state
    snapshot, err := uc.snapshotReader.GetByID(ctx, req.SpaceID)
    if err != nil { return nil, err }

    // 2. Pre-FSM validation (outside FSM guards)
    if snapshot.HostUserID != req.CallerID {
        return nil, entity.NotHost(req.SpaceID, req.CallerID)
    }

    // 3. Idempotency check
    record, err := uc.idempotency.ReserveActivation(ctx, reservation)
    if err != nil {
        return nil, err
    }
    if record.Consumed {
        return record.Result, nil  // Already activated
    }

    // 4. Execute FSM event
    result, err := uc.fsm.Execute(ctx, command)
    if err != nil {
        return nil, uc.mapError(err)
    }

    // 5. Post-transition side effects (if needed)
    return mapResult(result), nil
}

Stream Service Pattern

go
type Service struct {
    hubsMu      sync.RWMutex
    hubs        map[uuid.UUID]*hub.Instance
    snapshotReader SpaceSnapshotReader
    clock       clock.Clock
}

// SpaceFSMTransitionObserver implementation
func (s *Service) OnTransition(ctx context.Context, event TransitionEvent) {
    s.hubsMu.RLock()
    h, ok := s.hubs[event.SpaceID]
    s.hubsMu.RUnlock()

    if ok {
        h.Broadcast(ctx, mapToStreamEvent(event))
    }
}

Participant Management

Participant Entity

go
type Participant struct {
    SpaceID    uuid.UUID
    UserID     uuid.UUID
    ProfileID  *uuid.UUID
    Role       Role          // "host" or "participant"
    IsActive   bool
    IsReady    bool
    JoinedAt   time.Time
    LeftAt     *time.Time
}

// Roles
const (
    RoleHost        Role = "host"
    RoleParticipant Role = "participant"
)

Participant Lifecycle

code
Created (host) → Join (participants) → Ready → Leave/Kick

Join Flow

  1. Pre-checks: quota, rate limits, active elsewhere
  2. Reserve idempotency key
  3. FSM add_participant event
  4. Update participant count
  5. Consume idempotency key

Leave Reasons

| Reason | Description | Initiator |
|---|---|---|
| SELF_INITIATED | Voluntary leave | Self |
| HOST_FORCED | Kicked by host | Host |
| ADMIN_FORCED | Removed by admin | Admin |
| SYSTEM | System cleanup | System |

Rate Limiting & Quotas

Host Quotas

go
type HostCapacity struct {
    HasWaitingSpace   bool
    HasActivePresence bool
    WaitingSpaces     int32
    ActiveSpaces      int32
    ChargingSpaces    int32
    AggregatingSpaces int32
}

// Check before creating space
if capacity.HasWaitingSpace {
    return entity.HostQuotaExceeded(hostID, true, false)
}

Rate Limits

| Operation | Limit | Window | Error |
|---|---|---|---|
| Create Space | Configurable | Per host | HostCreateRateLimitExceeded |
| Join Space | Configurable | Per user | JoinRateLimitExceeded |
| Concurrent Streams | Max N | Per user | StreamLimitExceeded |

Testing Patterns

FSM Test Structure

go
func TestActivateTransition(t *testing.T) {
    // Arrange: Create snapshot in waiting state
    snapshot := testhelpers.NewWaitingSpace(t, 3)  // 3 participants

    // Act: Trigger activate event
    result, err := fsm.Execute(ctx, SpaceFSMCommand{
        Space:     snapshot,
        EventName: "activate",
    })

    // Assert: Transition succeeded
    require.NoError(t, err)
    assert.Equal(t, SpaceStatusActive, result.NextSnapshot.Status)
    assert.Equal(t, snapshot.StateVersion+1, result.NextSnapshot.StateVersion)
}

CAS Conflict Test

go
func TestCASConflict(t *testing.T) {
    // Arrange: Two concurrent readers
    snapshot1, _ := reader.GetByID(ctx, spaceID)
    snapshot2, _ := reader.GetByID(ctx, spaceID)

    // Act: First update succeeds
    _, err1 := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot1, EventName: "activate"})
    require.NoError(t, err1)

    // Second update fails with CAS conflict
    _, err2 := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot2, EventName: "activate"})

    // Assert
    assert.ErrorIs(t, err2, ErrConcurrencyConflict)
}

Integration Test Pattern

go
func TestJoinSpaceIntegration(t *testing.T) {
    // Setup: Create space with host
    space := suite.CreateTestSpace(t, hostID)

    // Execute: Join as participant
    resp, err := client.JoinSpace(ctx, &spacev1.JoinSpaceRequest{
        SpaceId:   space.ID.String(),
        EntryCode: space.EntryCode,
    })

    // Verify: Participant added
    require.NoError(t, err)
    assert.Len(t, resp.Participants, 2)  // host + new participant
}

Common Pitfalls

1. Always Clone Before FSM Execute

go
// WRONG: Mutates original
result, _ := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot})

// CORRECT: Clone first
result, _ := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot.Clone()})

2. Check Terminal States Early

go
// Before any operation on completed/failed/expired/aborted spaces
if snapshot.Status.IsTerminal() {
    return entity.StateViolation(spaceID, "non-terminal", string(snapshot.Status))
}

3. Handle CAS Conflicts

go
// Never ignore CAS conflicts - always retry or propagate
if errors.Is(err, ErrConcurrencyConflict) {
    // Retry with fresh snapshot or return to caller
}

4. Validate Placeholders in Params

go
// Runtime validates placeholders like $PARTICIPANT_ID
// Ensure params contain valid values
params := map[string]any{
    "participant_id": participantID.String(),  // Must be valid UUID string
    "is_ready":       true,                     // Must be boolean
}

Key File Locations

| Component | Path |
|---|---|
| FSM Definition | configs/state.yaml |
| FSM Runtime | internal/shared/fsm/runtime/runtime.go |
| FSM Adapter | internal/feature/space/data/adapter/spacefsm/executor.go |
| CAS Locker | internal/feature/space/data/repo/space/locker.go |
| Domain Entities | internal/feature/space/domain/entity/ |
| Error Factories | internal/feature/space/domain/entity/errors.go |
| UseCases | internal/feature/space/application/ |
| Stream Service | internal/feature/space/application/stream/service.go |
| Protocol Manager | internal/feature/space/protocol/manager.go |
| RPC Error Mapping | internal/feature/space/protocol/rpcerr/from.go |