AgentSkillsCN

go-concurrency-patterns

精通 Go 并发编程,熟练运用 goroutine、channel、同步原语以及 context。适用于构建高并发 Go 应用程序、实现工作池机制,或调试竞态条件等问题。

SKILL.md
--- frontmatter
name: go-concurrency-patterns
description: Master Go concurrency with goroutines, channels, sync primitives, and context. Use when building concurrent Go applications, implementing worker pools, or debugging race conditions.

Go Concurrency Patterns

Production patterns for Go concurrency including goroutines, channels, synchronization primitives, and context management.

When to Use This Skill

  • Building concurrent Go applications
  • Implementing worker pools and pipelines
  • Managing goroutine lifecycles
  • Using channels for communication
  • Debugging race conditions
  • Implementing graceful shutdown

Core Concepts

1. Go Concurrency Primitives

PrimitivePurpose
goroutineLightweight concurrent execution
channelCommunication between goroutines
selectMultiplex channel operations
sync.MutexMutual exclusion
sync.WaitGroupWait for goroutines to complete
context.ContextCancellation and deadlines

2. Go Concurrency Mantra

code
Don't communicate by sharing memory;
share memory by communicating.

Quick Start

go
package main

import (
  "context"
  "fmt"
  "sync"
  "time"
)

func main() {
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  defer cancel()

  results := make(chan string, 10)
  var wg sync.WaitGroup

  // Spawn workers
  for i := 0; i < 3; i++ {
    wg.Go(func() {
      worker(ctx, i, results)
    })
  }

  // Close results when done
  go func() {
    wg.Wait()
    close(results)
  }()

  // Collect results
  for result := range results {
    fmt.Println(result)
  }
}

func worker(ctx context.Context, id int, results chan<- string) {
  select {
  case <-ctx.Done():
    return
  case results <- fmt.Sprintf("Worker %d done", id):
  }
}

Patterns

Pattern 1: Worker Pool

go
package main

import (
  "context"
  "fmt"
  "sync"
)

type Job struct {
  ID   int
  Data string
}

type Result struct {
  JobID int
  Output string
  Err   error
}

func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
  results := make(chan Result, len(jobs))

  var wg sync.WaitGroup
  for i := 0; i < numWorkers; i++ {
    wg.Go(func() {
      for job := range jobs {
        select {
        case <-ctx.Done():
          return
        default:
          result := processJob(job)
          results <- result
        }
      }
    })
  }

  go func() {
    wg.Wait()
    close(results)
  }()

  return results
}

func processJob(job Job) Result {
  // Simulate work
  return Result{
    JobID:  job.ID,
    Output: fmt.Sprintf("Processed: %s", job.Data),
  }
}

// Usage
func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  jobs := make(chan Job, 100)

  // Send jobs
  go func() {
    for i := 0; i < 50; i++ {
      jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
    }
    close(jobs)
  }()

  // Process with 5 workers
  results := WorkerPool(ctx, 5, jobs)

  for result := range results {
    fmt.Printf("Result: %+v\n", result)
  }
}

Pattern 2: Fan-Out/Fan-In Pipeline

go
package main

import (
  "context"
  "sync"
)

// Stage 1: Generate numbers
func generate(ctx context.Context, nums ...int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for _, n := range nums {
      select {
      case <-ctx.Done():
        return
      case out <- n:
      }
    }
  }()
  return out
}

// Stage 2: Square numbers (can run multiple instances)
func square(ctx context.Context, in <-chan int) <-chan int {
  out := make(chan int)
  go func() {
    defer close(out)
    for n := range in {
      select {
      case <-ctx.Done():
        return
      case out <- n * n:
      }
    }
  }()
  return out
}

// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
  var wg sync.WaitGroup
  out := make(chan int)

  // Start output goroutine for each input channel
  for _, c := range cs {
    ch := c
    wg.Go(func() {
      for n := range ch {
        select {
        case <-ctx.Done():
          return
        case out <- n:
        }
      }
    })
  }

  // Close out after all inputs are done
  go func() {
    wg.Wait()
    close(out)
  }()

  return out
}

func main() {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  // Generate input
  in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

  // Fan out to multiple squarers
  c1 := square(ctx, in)
  c2 := square(ctx, in)
  c3 := square(ctx, in)

  // Fan in results
  for result := range merge(ctx, c1, c2, c3) {
    fmt.Println(result)
  }
}

Pattern 3: Bounded Concurrency with Semaphore

go
package main

import (
  "context"
  "fmt"
  "golang.org/x/sync/semaphore"
  "sync"
)

type RateLimitedWorker struct {
  sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
  return &RateLimitedWorker{
    sem: semaphore.NewWeighted(maxConcurrent),
  }
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
  var (
    wg     sync.WaitGroup
    mu     sync.Mutex
    errors []error
  )

  for _, task := range tasks {
    // Acquire semaphore (blocks if at limit)
    if err := w.sem.Acquire(ctx, 1); err != nil {
      return []error{err}
    }

    t := task
    wg.Go(func() {
      defer w.sem.Release(1)

      if err := t(); err != nil {
        mu.Lock()
        errors = append(errors, err)
        mu.Unlock()
      }
    })
  }

  wg.Wait()
  return errors
}

// Alternative: Channel-based semaphore
type Semaphore chan struct{}

func NewSemaphore(n int) Semaphore {
  return make(chan struct{}, n)
}

func (s Semaphore) Acquire() {
  s <- struct{}{}
}

func (s Semaphore) Release() {
  <-s
}

Pattern 4: Graceful Shutdown

go
package main

import (
  "context"
  "fmt"
  "os"
  "os/signal"
  "sync"
  "syscall"
  "time"
)

type Server struct {
  shutdown chan struct{}
  wg       sync.WaitGroup
}

func NewServer() *Server {
  return &Server{
    shutdown: make(chan struct{}),
  }
}

func (s *Server) Start(ctx context.Context) {
  // Start workers
  for i := 0; i < 5; i++ {
    s.wg.Go(func() {
      s.worker(ctx, i)
    })
  }
}

func (s *Server) worker(ctx context.Context, id int) {
  defer fmt.Printf("Worker %d stopped\n", id)

  ticker := time.NewTicker(time.Second)
  defer ticker.Stop()

  for {
    select {
    case <-ctx.Done():
      // Cleanup
      fmt.Printf("Worker %d cleaning up...\n", id)
      time.Sleep(500 * time.Millisecond) // Simulated cleanup
      return
    case <-ticker.C:
      fmt.Printf("Worker %d working...\n", id)
    }
  }
}

func (s *Server) Shutdown(timeout time.Duration) {
  // Signal shutdown
  close(s.shutdown)

  // Wait with timeout
  done := make(chan struct{})
  go func() {
    s.wg.Wait()
    close(done)
  }()

  select {
  case <-done:
    fmt.Println("Clean shutdown completed")
  case <-time.After(timeout):
    fmt.Println("Shutdown timed out, forcing exit")
  }
}

func main() {
  // Setup signal handling
  ctx, cancel := context.WithCancel(context.Background())

  sigCh := make(chan os.Signal, 1)
  signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

  server := NewServer()
  server.Start(ctx)

  // Wait for signal
  sig := <-sigCh
  fmt.Printf("\nReceived signal: %v\n", sig)

  // Cancel context to stop workers
  cancel()

  // Wait for graceful shutdown
  server.Shutdown(5 * time.Second)
}

Pattern 5: Error Group with Cancellation

go
package main

import (
  "context"
  "fmt"
  "golang.org/x/sync/errgroup"
  "net/http"
)

func fetchAllURLs(ctx context.Context, urls []string) ([]string, error) {
  g, ctx := errgroup.WithContext(ctx)

  results := make([]string, len(urls))

  for i, url := range urls {
    i, url := i, url // Capture loop variables

    g.Go(func() error {
      req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
      if err != nil {
        return fmt.Errorf("creating request for %s: %w", url, err)
      }

      resp, err := http.DefaultClient.Do(req)
      if err != nil {
        return fmt.Errorf("fetching %s: %w", url, err)
      }
      defer resp.Body.Close()

      results[i] = fmt.Sprintf("%s: %d", url, resp.StatusCode)
      return nil
    })
  }

  // Wait for all goroutines to complete or one to fail
  if err := g.Wait(); err != nil {
    return nil, err // First error cancels all others
  }

  return results, nil
}

// With concurrency limit
func fetchWithLimit(ctx context.Context, urls []string, limit int) ([]string, error) {
  g, ctx := errgroup.WithContext(ctx)
  g.SetLimit(limit) // Max concurrent goroutines

  results := make([]string, len(urls))
  var mu sync.Mutex

  for i, url := range urls {
    i, url := i, url

    g.Go(func() error {
      result, err := fetchURL(ctx, url)
      if err != nil {
        return err
      }

      mu.Lock()
      results[i] = result
      mu.Unlock()
      return nil
    })
  }

  if err := g.Wait(); err != nil {
    return nil, err
  }

  return results, nil
}

Pattern 6: Concurrent Map with sync.Map

go
package main

import (
  "sync"
)

// For frequent reads, infrequent writes
type Cache struct {
  m sync.Map
}

func (c *Cache) Get(key string) (interface{}, bool) {
  return c.m.Load(key)
}

func (c *Cache) Set(key string, value interface{}) {
  c.m.Store(key, value)
}

func (c *Cache) GetOrSet(key string, value interface{}) (interface{}, bool) {
  return c.m.LoadOrStore(key, value)
}

func (c *Cache) Delete(key string) {
  c.m.Delete(key)
}

// For write-heavy workloads, use sharded map
type ShardedMap struct {
  shards    []*shard
  numShards int
}

type shard struct {
  sync.RWMutex
  data map[string]interface{}
}

func NewShardedMap(numShards int) *ShardedMap {
  m := &ShardedMap{
    shards:    make([]*shard, numShards),
    numShards: numShards,
  }
  for i := range m.shards {
    m.shards[i] = &shard{data: make(map[string]interface{})}
  }
  return m
}

func (m *ShardedMap) getShard(key string) *shard {
  // Simple hash
  h := 0
  for _, c := range key {
    h = 31*h + int(c)
  }
  return m.shards[h%m.numShards]
}

func (m *ShardedMap) Get(key string) (interface{}, bool) {
  shard := m.getShard(key)
  shard.RLock()
  defer shard.RUnlock()
  v, ok := shard.data[key]
  return v, ok
}

func (m *ShardedMap) Set(key string, value interface{}) {
  shard := m.getShard(key)
  shard.Lock()
  defer shard.Unlock()
  shard.data[key] = value
}

Pattern 7: Select with Timeout and Default

go
func selectPatterns() {
  ch := make(chan int)

  // Timeout pattern
  select {
  case v := <-ch:
    fmt.Println("Received:", v)
  case <-time.After(time.Second):
    fmt.Println("Timeout!")
  }

  // Non-blocking send/receive
  select {
  case ch <- 42:
    fmt.Println("Sent")
  default:
    fmt.Println("Channel full, skipping")
  }

  // Priority select (check high priority first)
  highPriority := make(chan int)
  lowPriority := make(chan int)

  for {
    select {
    case msg := <-highPriority:
      fmt.Println("High priority:", msg)
    default:
      select {
      case msg := <-highPriority:
        fmt.Println("High priority:", msg)
      case msg := <-lowPriority:
        fmt.Println("Low priority:", msg)
      }
    }
  }
}

Race Detection

bash
# Run tests with race detector
go test -race ./...

# Build with race detector
go build -race .

# Run with race detector
go run -race main.go

Best Practices

Do's

  • Use context - For cancellation and deadlines
  • Close channels - From sender side only
  • Use errgroup - For concurrent operations with errors
  • Buffer channels - When you know the count
  • Prefer channels - Over mutexes when possible

Don'ts

  • Don't leak goroutines - Always have exit path
  • Don't close from receiver - Causes panic
  • Don't use shared memory - Unless necessary
  • Don't ignore context cancellation - Check ctx.Done()
  • Don't use time.Sleep for sync - Use proper primitives

Resources