17th-space-patterns
Space domain patterns for the Sudal project. Reference this when working with Space feature code.
Space FSM Architecture
State Machine Overview
The Space FSM is a YAML-defined state machine (configs/state.yaml) compiled to Go structures.
States (11 total; 10 are listed here):
| State | Terminal | Description |
|---|---|---|
| waiting | No | Initial lobby state, waiting for participants |
| active | No | Game activated, participants marking ready |
| predicting_decision | No | Host deciding on prediction phase |
| predicting | No | Prediction gameplay phase |
| charging | No | Charging phase, collecting participants |
| aggregating | No | Result aggregation in progress |
| completed | Yes | Successfully finished |
| failed | Yes | Aggregation hook failed |
| aborted | Yes | Host left or timeout |
| expired | Yes | Grace period ended after completion |
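The Terminal column maps naturally onto a status helper like the `snapshot.Status.IsTerminal()` call used later in this document. The following is a minimal sketch, assuming string-valued statuses; apart from `SpaceStatusActive` and `IsTerminal`, the constant names are illustrative and may differ from the real entity package.

```go
package main

import "fmt"

// SpaceStatus mirrors the state names in the table above.
// Most constant names here are assumptions made for illustration.
type SpaceStatus string

const (
	SpaceStatusWaiting            SpaceStatus = "waiting"
	SpaceStatusActive             SpaceStatus = "active"
	SpaceStatusPredictingDecision SpaceStatus = "predicting_decision"
	SpaceStatusPredicting         SpaceStatus = "predicting"
	SpaceStatusCharging           SpaceStatus = "charging"
	SpaceStatusAggregating        SpaceStatus = "aggregating"
	SpaceStatusCompleted          SpaceStatus = "completed"
	SpaceStatusFailed             SpaceStatus = "failed"
	SpaceStatusAborted            SpaceStatus = "aborted"
	SpaceStatusExpired            SpaceStatus = "expired"
)

// IsTerminal reports whether the status is marked "Yes" in the Terminal column.
func (s SpaceStatus) IsTerminal() bool {
	switch s {
	case SpaceStatusCompleted, SpaceStatusFailed, SpaceStatusAborted, SpaceStatusExpired:
		return true
	}
	return false
}

func main() {
	fmt.Println(SpaceStatusWaiting.IsTerminal(), SpaceStatusExpired.IsTerminal()) // prints "false true"
}
```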
Key Transitions
waiting → [activate] → active → [ready_all] → predicting_decision
predicting_decision → [yes] → predicting → charging
predicting_decision → [no/timer] → charging
charging → [timer + has_min] → aggregating
aggregating → [timer + result] → completed
aggregating → [timer + failed] → failed
completed → [timer 30min] → expired
Timer-Driven Transitions
| Timer | Duration | From | To | Guard |
|---|---|---|---|---|
| waiting_timeout | 600s | waiting | aborted | - |
| ready_timeout | 600s | active | aborted | - |
| predicting_decision_timeout | 10s | predicting_decision | charging | - |
| predicting_timeout | 30s | predicting | charging | - |
| to_aggregating | 3s | charging | aggregating | has_min_participants |
| grace_period | 1800s | completed | expired | - |
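To make the timer table concrete, here is a rough sketch of how the rows could be held in memory and evaluated. The `TimerTransition` type and `dueTransition` helper are hypothetical; the real definitions live in configs/state.yaml and the compiled Go structures, which are not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// TimerTransition is a hypothetical in-memory form of one table row.
type TimerTransition struct {
	Name     string
	Duration time.Duration
	From, To string
	Guard    string // empty when the row has no guard
}

var timers = []TimerTransition{
	{"waiting_timeout", 600 * time.Second, "waiting", "aborted", ""},
	{"ready_timeout", 600 * time.Second, "active", "aborted", ""},
	{"predicting_decision_timeout", 10 * time.Second, "predicting_decision", "charging", ""},
	{"predicting_timeout", 30 * time.Second, "predicting", "charging", ""},
	{"to_aggregating", 3 * time.Second, "charging", "aggregating", "has_min_participants"},
	{"grace_period", 1800 * time.Second, "completed", "expired", ""},
}

// dueTransition returns the timer transition that should fire for a state once
// the elapsed time has reached its duration and its guard (if any) passes.
func dueTransition(state string, elapsed time.Duration, guardOK func(name string) bool) (TimerTransition, bool) {
	for _, t := range timers {
		if t.From != state || elapsed < t.Duration {
			continue
		}
		if t.Guard != "" && !guardOK(t.Guard) {
			continue
		}
		return t, true
	}
	return TimerTransition{}, false
}

func main() {
	always := func(string) bool { return true }
	if t, ok := dueTransition("completed", 30*time.Minute, always); ok {
		fmt.Println(t.Name, "->", t.To) // prints "grace_period -> expired"
	}
}
```

Note that `to_aggregating` is the only row whose firing depends on a guard, so a charging space below the minimum stays put on this timer.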
FSM Event Pattern
TriggerEvent Flow
// Application layer calls SpaceFSMPort.Execute()
result, err := fsm.Execute(ctx, application.SpaceFSMCommand{
Space: snapshot.Clone(), // Always clone!
EventName: "activate",
Params: map[string]any{"participant_id": participantID},
Metadata: map[string]string{"trace_id": traceID},
Idempotency: &SpaceFSMIdempotencyCommit{...},
})
// Runtime flow:
// 1. Validate inputs
// 2. Lookup transition for (current_state, event)
// 3. Execute guards (with timeout)
// 4. Execute actions
// 5. Build JSON patches with placeholder resolution
// 6. CAS commit with version check
// 7. Notify observers on state change
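Steps 2 to 4 of the runtime flow can be sketched as a table lookup followed by guards and then actions. This is an illustrative model only: the `transitionKey`, `transition`, and `fire` names are assumptions, and the simplified function signatures omit the context and `EventContext` arguments of the real runtime.

```go
package main

import (
	"errors"
	"fmt"
)

type transitionKey struct{ State, Event string }

type transition struct {
	To      string
	Guards  []func(params map[string]any) (bool, error)
	Actions []func(params map[string]any) error
}

var (
	errNoTransition  = errors.New("no transition for state/event")
	errGuardRejected = errors.New("guard rejected transition")
)

// fire looks up the transition, runs every guard, then runs the actions.
// It returns the next state, or the current state together with an error.
func fire(table map[transitionKey]transition, state, event string, params map[string]any) (string, error) {
	tr, ok := table[transitionKey{state, event}]
	if !ok {
		return state, errNoTransition
	}
	for _, guard := range tr.Guards {
		pass, err := guard(params)
		if err != nil {
			return state, err
		}
		if !pass {
			return state, errGuardRejected
		}
	}
	for _, action := range tr.Actions {
		if err := action(params); err != nil {
			return state, err
		}
	}
	return tr.To, nil
}

func main() {
	table := map[transitionKey]transition{
		{"waiting", "activate"}: {To: "active"},
	}
	next, err := fire(table, "waiting", "activate", nil)
	fmt.Println(next, err) // prints "active <nil>"
}
```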
Guards vs Actions
Guards: Pure predicates, return (bool, error), must NOT mutate state
// Guard signature
type GuardFunc func(ctx context.Context, evt *EventContext, params map[string]any) (bool, error)
// Common guards
- has_min_participants // Minimum participant count met
- has_not_min_participants // Below minimum (for abort)
- all_participants_ready // All marked ready
- waiting_timeout_due // Timer expired
- has_aggregation_result // Hook job succeeded
- aggregation_failed // Hook job failed
Actions: Side effects, return error, run after all guards pass
// Action signature
type ActionFunc func(ctx context.Context, evt *EventContext, params map[string]any) error
// Common actions
- notify_activation // Notify participants of activation
- broadcast_ready_state // Broadcast ready to all
- log_current_status // Logging
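As an illustration of the guard contract, the sketch below implements two of the listed guards as pure predicates and registers them by name. The `EventContext` fields used here are stand-ins, since the real type's fields are not shown in this document.

```go
package main

import (
	"context"
	"fmt"
)

// EventContext is a stand-in; these two fields are assumed for the example.
type EventContext struct {
	ActiveParticipants int
	MinParticipants    int
}

type GuardFunc func(ctx context.Context, evt *EventContext, params map[string]any) (bool, error)

// hasMinParticipants only reads evt; it never writes to it.
func hasMinParticipants(_ context.Context, evt *EventContext, _ map[string]any) (bool, error) {
	return evt.ActiveParticipants >= evt.MinParticipants, nil
}

// hasNotMinParticipants is the complementary guard used on the abort path.
func hasNotMinParticipants(ctx context.Context, evt *EventContext, params map[string]any) (bool, error) {
	ok, err := hasMinParticipants(ctx, evt, params)
	return !ok, err
}

// guards maps guard names, as they would appear in the YAML, to implementations.
var guards = map[string]GuardFunc{
	"has_min_participants":     hasMinParticipants,
	"has_not_min_participants": hasNotMinParticipants,
}

func main() {
	evt := &EventContext{ActiveParticipants: 1, MinParticipants: 2}
	ok, _ := guards["has_min_participants"](context.Background(), evt, nil)
	fmt.Println(ok) // prints "false"
}
```

Because guards never mutate state, they are safe to re-run on every retry of the same event.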
CAS (Compare-And-Swap) Pattern
State Version Mechanism
Every SpaceSnapshot has a StateVersion field (uint64). On each FSM transition:
// Runtime increments version before CAS
next.StateVersion = current.StateVersion + 1
// Locker calculates expected version
expected := next.StateVersion - 1 // == current version
// SQL with version check
UPDATE spaces SET ... WHERE id = $1 AND state_version = $expected
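The version check can be modelled without a database. The sketch below is an in-memory stand-in for the `spaces` table, assuming the same "expected = next - 1" rule; `memStore` and `compareAndSwap` are hypothetical names, not the real locker API.

```go
package main

import (
	"fmt"
	"sync"
)

type snapshot struct {
	ID           string
	Status       string
	StateVersion uint64
}

// memStore stands in for the spaces table.
type memStore struct {
	mu   sync.Mutex
	rows map[string]snapshot
}

// compareAndSwap writes next only if the stored version equals
// next.StateVersion-1, mirroring "WHERE state_version = $expected".
// It reports whether the write was applied.
func (s *memStore) compareAndSwap(next snapshot) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.rows[next.ID]
	if !ok || next.StateVersion == 0 || cur.StateVersion != next.StateVersion-1 {
		return false
	}
	s.rows[next.ID] = next
	return true
}

func main() {
	store := &memStore{rows: map[string]snapshot{
		"s1": {ID: "s1", Status: "waiting", StateVersion: 4},
	}}
	next := snapshot{ID: "s1", Status: "active", StateVersion: 5}
	fmt.Println(store.compareAndSwap(next)) // prints "true": stored version was 4
	fmt.Println(store.compareAndSwap(next)) // prints "false": stored version is now 5
}
```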
Conflict Handling
applied, err := locker.CompareAndSwap(ctx, next, patches, event, commit)
if err != nil {
return err // Actual error
}
if !applied {
return entity.CASConflict(spaceID) // Retry needed
}
UseCase retry pattern:
for attempt := 0; attempt < maxRetries; attempt++ {
    // Reload on every attempt so the command carries the latest StateVersion
    snapshot, err := reader.GetByID(ctx, spaceID)
    if err != nil {
        return nil, err
    }
    command.Space = snapshot.Clone()
    result, err := fsm.Execute(ctx, command)
    if err == nil {
        return result, nil
    }
    if !errors.Is(err, ErrConcurrencyConflict) {
        return nil, err // Non-retryable
    }
    // CAS conflict: loop and retry with a fresh snapshot
}
return nil, ErrMaxRetriesExceeded
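A self-contained version of the retry loop, reduced to its control flow, might look like the following. The `retryOnConflict` helper and its callback signatures are assumptions for illustration; the sentinel errors are redeclared locally so the sketch compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrConcurrencyConflict = errors.New("concurrency conflict")
	ErrMaxRetriesExceeded  = errors.New("max retries exceeded")
)

// retryOnConflict reloads state and re-runs op until it succeeds, fails with a
// non-conflict error, or maxRetries attempts have been used.
func retryOnConflict(maxRetries int, load func() (uint64, error), op func(version uint64) error) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		version, err := load()
		if err != nil {
			return err
		}
		err = op(version)
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrConcurrencyConflict) {
			return err // non-retryable
		}
	}
	return ErrMaxRetriesExceeded
}

func main() {
	stored := uint64(7)
	calls := 0
	err := retryOnConflict(3,
		func() (uint64, error) { return stored, nil },
		func(v uint64) error {
			calls++
			if calls == 1 {
				stored++ // simulate a concurrent writer winning the first round
				return fmt.Errorf("space s1: %w", ErrConcurrencyConflict)
			}
			return nil
		})
	fmt.Println(err, calls) // prints "<nil> 2"
}
```

Using `errors.Is` rather than `==` matters here because the conflict error is usually wrapped with context by the time it reaches the use case.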
Patch Whitelist
Only these JSON-Patch operations are allowed:
| Path | Op | Validation |
|---|---|---|
| /status | replace | Valid SpaceStatus enum |
| /participants/$ID | add/remove | Valid UUID |
| /participants/$ID/ready | replace | Boolean |
| /leader_profile_id | replace | Valid UUID |
| /comparison/photo/version | replace | uint32 |
| /comparison/location/version | replace | uint32 |
| /comparison/memo/$ID/version | replace | uint32, valid UUID |
Any other patch path is rejected by the locker.
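One plausible way to enforce the whitelist is a list of path patterns, each paired with its permitted operations. The sketch below is not the locker's actual code: the `rule` type and `allowed` function are hypothetical, and value validation (enum, boolean, uint32) is left out.

```go
package main

import (
	"fmt"
	"regexp"
)

// uuidPat matches the canonical 8-4-4-4-12 hex form of a UUID.
const uuidPat = `[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}`

type rule struct {
	path *regexp.Regexp
	ops  map[string]bool
}

var replaceOnly = map[string]bool{"replace": true}

// whitelist mirrors the Path and Op columns of the table above.
var whitelist = []rule{
	{regexp.MustCompile(`^/status$`), replaceOnly},
	{regexp.MustCompile(`^/participants/` + uuidPat + `$`), map[string]bool{"add": true, "remove": true}},
	{regexp.MustCompile(`^/participants/` + uuidPat + `/ready$`), replaceOnly},
	{regexp.MustCompile(`^/leader_profile_id$`), replaceOnly},
	{regexp.MustCompile(`^/comparison/(photo|location)/version$`), replaceOnly},
	{regexp.MustCompile(`^/comparison/memo/` + uuidPat + `/version$`), replaceOnly},
}

// allowed reports whether the (op, path) pair matches a whitelisted rule.
func allowed(op, path string) bool {
	for _, r := range whitelist {
		if r.path.MatchString(path) && r.ops[op] {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(allowed("replace", "/status"), allowed("replace", "/host_user_id")) // prints "true false"
}
```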
Hook Job Lifecycle
State Hooks
on_enter hooks run after transition commits:
| Hook | Required | States |
|---|---|---|
| aggregate_quiz_result | Yes | aggregating |
| cleanup_terminated_space | Yes | failed, aborted, expired |
| update_friends_last_compared_at | No | completed |
| log_current_status | No | any |
Hook Job States
PENDING → RUNNING → DONE
RUNNING → FAILED (retry or terminal)
Required vs Optional Hooks
// Required hook failure blocks FSM progression
if job.Required && status == HookJobStatusFailed {
// Transition to failed state via guard check
}
// Optional hook failure is logged but doesn't block
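The required-versus-optional rule above could be expressed as a small filter over hook jobs. This is a sketch under assumed types: `HookJob` and `blockingFailures` are illustrative, and only `HookJobStatusFailed` and the `Required` flag are taken from the snippet above.

```go
package main

import "fmt"

type HookJobStatus string

const (
	HookJobStatusPending HookJobStatus = "PENDING"
	HookJobStatusRunning HookJobStatus = "RUNNING"
	HookJobStatusDone    HookJobStatus = "DONE"
	HookJobStatusFailed  HookJobStatus = "FAILED"
)

// HookJob is a simplified stand-in for the real job record.
type HookJob struct {
	Name     string
	Required bool
	Status   HookJobStatus
}

// blockingFailures returns the names of required hooks that have failed.
// Failed optional hooks are skipped: they are logged elsewhere and never block.
func blockingFailures(jobs []HookJob) []string {
	var names []string
	for _, j := range jobs {
		if j.Required && j.Status == HookJobStatusFailed {
			names = append(names, j.Name)
		}
	}
	return names
}

func main() {
	jobs := []HookJob{
		{"aggregate_quiz_result", true, HookJobStatusFailed},
		{"log_current_status", false, HookJobStatusFailed},
	}
	fmt.Println(blockingFailures(jobs)) // prints "[aggregate_quiz_result]"
}
```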
Reconcilers
| Name | Schedule | Purpose |
|---|---|---|
| cleanup_terminated_space_backstop | 30s | Clean up active participants in terminal states |
| broken_expired_space_cleanup | Daily 3AM | Fix broken expired spaces (ops tool) |
Streaming Architecture
Hub Pattern
// Stream service manages per-space hubs
type Service struct {
hubsMu sync.RWMutex
hubs map[uuid.UUID]*hub.Instance
}
// Client subscribes with resume token
func (s *Service) Subscribe(ctx context.Context, params SubscribeParams) (<-chan Event, error) {
hub := s.getOrCreateHub(params.SpaceID)
return hub.Subscribe(ctx, params.ResumeToken, params.UserID)
}
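For intuition, a per-space hub can be reduced to a set of buffered subscriber channels plus a broadcast method. The sketch below is an assumption-heavy model rather than the real `hub.Instance`: the names are hypothetical, events are plain strings, and dropping events for slow subscribers is one possible policy, not necessarily the one the project uses.

```go
package main

import (
	"fmt"
	"sync"
)

type hub struct {
	mu     sync.RWMutex
	nextID int
	subs   map[int]chan string
}

func newHub() *hub { return &hub{subs: make(map[int]chan string)} }

// Subscribe registers a buffered channel and returns its id for Unsubscribe.
func (h *hub) Subscribe(buffer int) (id int, events <-chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan string, buffer)
	h.nextID++
	h.subs[h.nextID] = ch
	return h.nextID, ch
}

// Unsubscribe removes the subscriber and closes its channel.
func (h *hub) Unsubscribe(id int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if ch, ok := h.subs[id]; ok {
		delete(h.subs, id)
		close(ch)
	}
}

// Broadcast sends without blocking and returns how many subscribers received
// the event; a subscriber whose buffer is full misses it.
func (h *hub) Broadcast(ev string) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	delivered := 0
	for _, ch := range h.subs {
		select {
		case ch <- ev:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	h := newHub()
	_, events := h.Subscribe(1)
	fmt.Println(h.Broadcast("status:active")) // prints "1"
	fmt.Println(<-events)                     // prints "status:active"
}
```

A drop-on-full policy only works if clients can recover what they missed, for example by reconnecting with a resume token.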
Resume Token Pattern
Resume tokens enable reconnection without missing events:
type ResumeToken struct {
SpaceID uuid.UUID
StateVersion uint64
EventSeq uint64
}
// On reconnect:
// 1. Parse resume token
// 2. Fetch events after token's sequence
// 3. Replay missed events
// 4. Continue live streaming
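The reconnect steps can be sketched end to end with a toy wire format. Everything about the encoding below (pipe-separated fields, unpadded URL-safe base64) is an assumption made for the example, as are the `Encode`, `ParseResumeToken`, and `missedEvents` names; `SpaceID` is a plain string here to avoid a UUID dependency.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
)

type ResumeToken struct {
	SpaceID      string // uuid.UUID in the real type
	StateVersion uint64
	EventSeq     uint64
}

// Encode serializes the token as base64("spaceID|stateVersion|eventSeq").
func (t ResumeToken) Encode() string {
	raw := fmt.Sprintf("%s|%d|%d", t.SpaceID, t.StateVersion, t.EventSeq)
	return base64.RawURLEncoding.EncodeToString([]byte(raw))
}

// ParseResumeToken reverses Encode and rejects malformed input.
func ParseResumeToken(s string) (ResumeToken, error) {
	raw, err := base64.RawURLEncoding.DecodeString(s)
	if err != nil {
		return ResumeToken{}, err
	}
	parts := strings.Split(string(raw), "|")
	if len(parts) != 3 {
		return ResumeToken{}, fmt.Errorf("resume token: want 3 fields, got %d", len(parts))
	}
	version, err := strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return ResumeToken{}, err
	}
	seq, err := strconv.ParseUint(parts[2], 10, 64)
	if err != nil {
		return ResumeToken{}, err
	}
	return ResumeToken{SpaceID: parts[0], StateVersion: version, EventSeq: seq}, nil
}

// missedEvents returns the sequence numbers strictly after the token's EventSeq.
func missedEvents(log []uint64, tok ResumeToken) []uint64 {
	var out []uint64
	for _, seq := range log {
		if seq > tok.EventSeq {
			out = append(out, seq)
		}
	}
	return out
}

func main() {
	tok := ResumeToken{SpaceID: "s1", StateVersion: 5, EventSeq: 41}
	parsed, err := ParseResumeToken(tok.Encode())
	fmt.Println(parsed == tok, err)                             // prints "true <nil>"
	fmt.Println(missedEvents([]uint64{40, 41, 42, 43}, parsed)) // prints "[42 43]"
}
```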
Stream Events
// Events pushed to clients
type Event struct {
Type EventType
SpaceID uuid.UUID
StateVersion uint64
Payload any
}
// Event types
- EventTypeSnapshot // Full state sync
- EventTypeParticipant // Participant change
- EventTypeStatus // Status change
- EventTypeComparison // Comparison update
Idempotency Patterns
Four Idempotency Trackers
| Operation | Key Format | Purpose |
|---|---|---|
| Create | {hostID}:{requestID} | Prevent duplicate space creation |
| Activate | {spaceID}:{hostID}:{key} | Prevent duplicate activation |
| Join | {spaceID}:{participantID}:{key} | Prevent duplicate joins |
| Leave | {spaceID}:{targetID}:{key} | Prevent duplicate leaves |
Reserve-Then-Consume Pattern
// Step 1: Reserve idempotency key (before FSM execution)
err := tracker.Reserve(ctx, spaceID, participantID, key)
if errors.Is(err, ErrIdempotencyConflict) { // errors.Is also matches wrapped errors
return existingResult // Already processed
}
// Step 2: Execute FSM transition
result, err := fsm.Execute(ctx, command)
// Step 3: Mark consumed (in same transaction via commit struct)
// Done automatically by CAS locker
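The reserve-then-consume sequence can be illustrated with an in-memory tracker keyed by the Join format from the table above. The `tracker` type and its methods are hypothetical; in the real system the consume step happens inside the CAS commit rather than as a separate call.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrIdempotencyConflict = errors.New("idempotency key already reserved")

// tracker records reserved keys and whether each has been consumed.
type tracker struct {
	mu      sync.Mutex
	entries map[string]bool // key -> consumed
}

func newTracker() *tracker { return &tracker{entries: make(map[string]bool)} }

// joinKey follows the Join key format: {spaceID}:{participantID}:{key}.
func joinKey(spaceID, participantID, key string) string {
	return spaceID + ":" + participantID + ":" + key
}

// Reserve claims the key or reports a conflict if it was already claimed.
func (t *tracker) Reserve(k string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, seen := t.entries[k]; seen {
		return ErrIdempotencyConflict
	}
	t.entries[k] = false
	return nil
}

// Consume marks a reserved key as fully processed.
func (t *tracker) Consume(k string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.entries[k] = true
}

// Consumed reports whether the key has been marked as processed.
func (t *tracker) Consumed(k string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.entries[k]
}

func main() {
	t := newTracker()
	k := joinKey("space-1", "user-9", "req-abc")
	fmt.Println(t.Reserve(k))                                    // prints "<nil>"
	fmt.Println(errors.Is(t.Reserve(k), ErrIdempotencyConflict)) // prints "true"
	t.Consume(k)
	fmt.Println(t.Consumed(k)) // prints "true"
}
```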
Error Handling
SpaceError Structure
type SpaceError struct {
code ErrorCode // e.g., "FAILED_PRECONDITION"
reason string // e.g., "SPACE_NOT_IN_WAITING"
domain string // "space.v1"
message string // Human-readable message
meta map[string]string // Contextual metadata
precondition *PreconditionViolation
inner error // Wrapped error
}
Factory Functions
Always use factory functions, never construct directly:
// Permission errors
entity.NotHost(spaceID, callerID)
entity.InsufficientRole(spaceID, userID, "host")
entity.InvalidEntryCode(spaceID, code)
// Precondition errors
entity.StateViolation(spaceID, "waiting", "active")
entity.SpaceNotInWaiting(spaceID, "active")
entity.MinParticipantsViolation(spaceID, 2, 1)
// Resource exhausted errors
entity.SpaceFull(spaceID, 10, 10)
entity.HostQuotaExceeded(hostID, true, false)
entity.JoinRateLimitExceeded(participantID, window, limit, retryAfter)
// Concurrency errors
entity.CASConflict(spaceID)
entity.IdempotencyConflict(spaceID, hostID, key)
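To show why factories plus an `AsSpaceError` helper are convenient, here is a trimmed-down model of the error type. It is a sketch only: the real `SpaceError` has more fields, the message text is invented, and the factory here takes plain strings instead of UUIDs.

```go
package main

import (
	"errors"
	"fmt"
)

// SpaceError is a reduced version of the structure shown above.
type SpaceError struct {
	reason  string
	message string
	inner   error
}

func (e *SpaceError) Error() string  { return e.reason + ": " + e.message }
func (e *SpaceError) Reason() string { return e.reason }
func (e *SpaceError) Unwrap() error  { return e.inner }

// SpaceNotInWaiting is a factory: callers never fill the struct by hand,
// so the reason code stays consistent everywhere.
func SpaceNotInWaiting(spaceID, actual string) error {
	return &SpaceError{
		reason:  "SPACE_NOT_IN_WAITING",
		message: fmt.Sprintf("space %s is %s, expected waiting", spaceID, actual),
	}
}

// AsSpaceError extracts a *SpaceError from anywhere in the wrap chain.
func AsSpaceError(err error) (*SpaceError, bool) {
	var se *SpaceError
	ok := errors.As(err, &se)
	return se, ok
}

func main() {
	err := fmt.Errorf("activate: %w", SpaceNotInWaiting("s1", "active"))
	if se, ok := AsSpaceError(err); ok {
		fmt.Println(se.Reason()) // prints "SPACE_NOT_IN_WAITING"
	}
}
```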
Protocol Layer Mapping
// rpcerr/from.go maps domain errors to Connect errors
func FromSpaceError(err error) *connect.Error {
spaceErr, ok := entity.AsSpaceError(err)
if !ok {
return connect.NewError(connect.CodeInternal, err) // connect-go codes are connect.Code values, not gRPC codes.*
}
code := mapCode(spaceErr.Code())
connErr := connect.NewError(code, errors.New(spaceErr.Error()))
// Add ErrorInfo detail
detail := &errdetails.ErrorInfo{
Reason: spaceErr.Reason(),
Domain: spaceErr.Domain(),
Metadata: spaceErr.Metadata(),
}
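// AddDetail takes a *connect.ErrorDetail, so wrap the proto message first
if d, derr := connect.NewErrorDetail(detail); derr == nil {
connErr.AddDetail(d)
}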
return connErr
}
UseCase Patterns
Guard-UseCase Pattern
UseCases that invoke FSM events follow this structure:
type ActivateQuizComparisonUseCase struct {
fsm SpaceFSMPort
snapshotReader SpaceSnapshotReader
quotaInspector SpaceQuotaInspector
idempotency SpaceActivateIdempotencyTracker
}
func (uc *ActivateQuizComparisonUseCase) Execute(ctx context.Context, req Request) (*Result, error) {
// 1. Load current state
snapshot, err := uc.snapshotReader.GetByID(ctx, req.SpaceID)
if err != nil { return nil, err }
// 2. Pre-FSM validation (outside FSM guards)
if snapshot.HostUserID != req.CallerID {
return nil, entity.NotHost(req.SpaceID, req.CallerID)
}
// 3. Idempotency check
record, err := uc.idempotency.ReserveActivation(ctx, reservation)
if err != nil {
return nil, err // Without this check, record may be nil below
}
if record.Consumed {
return record.Result, nil // Already activated
}
// 4. Execute FSM event
result, err := uc.fsm.Execute(ctx, command)
if err != nil {
return nil, uc.mapError(err)
}
// 5. Post-transition side effects (if needed)
return mapResult(result), nil
}
Stream Service Pattern
type Service struct {
hubsMu sync.RWMutex
hubs map[uuid.UUID]*hub.Instance
snapshotReader SpaceSnapshotReader
clock clock.Clock
}
// SpaceFSMTransitionObserver implementation
func (s *Service) OnTransition(ctx context.Context, event TransitionEvent) {
s.hubsMu.RLock()
h, ok := s.hubs[event.SpaceID]
s.hubsMu.RUnlock()
if ok {
h.Broadcast(ctx, mapToStreamEvent(event))
}
}
Participant Management
Participant Entity
type Participant struct {
SpaceID uuid.UUID
UserID uuid.UUID
ProfileID *uuid.UUID
Role Role // "host" or "participant"
IsActive bool
IsReady bool
JoinedAt time.Time
LeftAt *time.Time
}
// Roles
const (
RoleHost Role = "host"
RoleParticipant Role = "participant"
)
Participant Lifecycle
Created (host) → Join (participants) → Ready → Leave/Kick
Join Flow
- Pre-checks: quota, rate limits, active elsewhere
- Reserve idempotency key
- FSM add_participant event
- Update participant count
- Consume idempotency key
Leave Reasons
| Reason | Description | Initiator |
|---|---|---|
| SELF_INITIATED | Voluntary leave | Self |
| HOST_FORCED | Kicked by host | Host |
| ADMIN_FORCED | Removed by admin | Admin |
| SYSTEM | System cleanup | System |
Rate Limiting & Quotas
Host Quotas
type HostCapacity struct {
HasWaitingSpace bool
HasActivePresence bool
WaitingSpaces int32
ActiveSpaces int32
ChargingSpaces int32
AggregatingSpaces int32
}
// Check before creating space
if capacity.HasWaitingSpace {
return entity.HostQuotaExceeded(hostID, true, false)
}
Rate Limits
| Operation | Limit | Window | Error |
|---|---|---|---|
| Create Space | Configurable | Per host | HostCreateRateLimitExceeded |
| Join Space | Configurable | Per user | JoinRateLimitExceeded |
| Concurrent Streams | Max N | Per user | StreamLimitExceeded |
Testing Patterns
FSM Test Structure
func TestActivateTransition(t *testing.T) {
// Arrange: Create snapshot in waiting state
snapshot := testhelpers.NewWaitingSpace(t, 3) // 3 participants
// Act: Trigger activate event
result, err := fsm.Execute(ctx, SpaceFSMCommand{
Space: snapshot.Clone(), // Clone so the StateVersion assertion compares against the untouched original
EventName: "activate",
})
// Assert: Transition succeeded
require.NoError(t, err)
assert.Equal(t, SpaceStatusActive, result.NextSnapshot.Status)
assert.Equal(t, snapshot.StateVersion+1, result.NextSnapshot.StateVersion)
}
CAS Conflict Test
func TestCASConflict(t *testing.T) {
// Arrange: Two concurrent readers
snapshot1, err := reader.GetByID(ctx, spaceID)
require.NoError(t, err)
snapshot2, err := reader.GetByID(ctx, spaceID)
require.NoError(t, err)
// Act: First update succeeds
_, err1 := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot1.Clone(), EventName: "activate"})
require.NoError(t, err1)
// Second update carries the stale StateVersion and fails with a CAS conflict
_, err2 := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot2.Clone(), EventName: "activate"})
// Assert
assert.ErrorIs(t, err2, ErrConcurrencyConflict)
}
Integration Test Pattern
func TestJoinSpaceIntegration(t *testing.T) {
// Setup: Create space with host
space := suite.CreateTestSpace(t, hostID)
// Execute: Join as participant
resp, err := client.JoinSpace(ctx, &spacev1.JoinSpaceRequest{
SpaceId: space.ID.String(),
EntryCode: space.EntryCode,
})
// Verify: Participant added
require.NoError(t, err)
assert.Len(t, resp.Participants, 2) // host + new participant
}
Common Pitfalls
1. Always Clone Before FSM Execute
// WRONG: Mutates original
result, _ := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot})
// CORRECT: Clone first
result, _ := fsm.Execute(ctx, SpaceFSMCommand{Space: snapshot.Clone()})
2. Check Terminal States Early
// Before any operation on completed/failed/expired/aborted spaces
if snapshot.Status.IsTerminal() {
return entity.StateViolation(spaceID, "non-terminal", string(snapshot.Status))
}
3. Handle CAS Conflicts
// Never ignore CAS conflicts - always retry or propagate
if errors.Is(err, ErrConcurrencyConflict) {
// Retry with fresh snapshot or return to caller
}
4. Validate Placeholders in Params
// Runtime validates placeholders like $PARTICIPANT_ID
// Ensure params contain valid values
params := map[string]any{
"participant_id": participantID.String(), // Must be valid UUID string
"is_ready": true, // Must be boolean
}
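Placeholder resolution might work roughly as follows. This sketch assumes a convention in which `$PARTICIPANT_ID` maps to the lowercase param key `participant_id`; that mapping, the `resolvePath` name, and the error text are all assumptions, not the runtime's documented behavior.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// placeholder matches tokens such as $PARTICIPANT_ID inside a patch path.
var placeholder = regexp.MustCompile(`\$[A-Z_]+`)

// resolvePath substitutes each placeholder with the string param whose key is
// the lowercased placeholder name. Missing or non-string params are errors.
func resolvePath(tmpl string, params map[string]any) (string, error) {
	var firstErr error
	out := placeholder.ReplaceAllStringFunc(tmpl, func(ph string) string {
		key := strings.ToLower(strings.TrimPrefix(ph, "$"))
		value, ok := params[key].(string)
		if !ok || value == "" {
			if firstErr == nil {
				firstErr = fmt.Errorf("placeholder %s: missing or non-string param %q", ph, key)
			}
			return ph
		}
		return value
	})
	if firstErr != nil {
		return "", firstErr
	}
	return out, nil
}

func main() {
	path, err := resolvePath("/participants/$PARTICIPANT_ID/ready", map[string]any{
		"participant_id": "abc",
	})
	fmt.Println(path, err) // prints "/participants/abc/ready <nil>"
}
```

Failing loudly on a missing param is the safer default, since an unresolved placeholder would otherwise reach the patch whitelist as a literal path segment.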
Key File Locations
| Component | Path |
|---|---|
| FSM Definition | configs/state.yaml |
| FSM Runtime | internal/shared/fsm/runtime/runtime.go |
| FSM Adapter | internal/feature/space/data/adapter/spacefsm/executor.go |
| CAS Locker | internal/feature/space/data/repo/space/locker.go |
| Domain Entities | internal/feature/space/domain/entity/ |
| Error Factories | internal/feature/space/domain/entity/errors.go |
| UseCases | internal/feature/space/application/ |
| Stream Service | internal/feature/space/application/stream/service.go |
| Protocol Manager | internal/feature/space/protocol/manager.go |
| RPC Error Mapping | internal/feature/space/protocol/rpcerr/from.go |