azure-storagequeues-to-sqs

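Migrate Azure Storage Queues to AWS SQS. Converts Azure.Storage.Queues to AWSSDK.SQS, covering message enqueue and dequeue, visibility timeout, and dead letter queues, while keeping the queue extension method signatures unchanged.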

SKILL.md
---
name: azure-storagequeues-to-sqs
description: |
  Migrate Azure Storage Queues to AWS SQS. Converts Azure.Storage.Queues to AWSSDK.SQS,
  handling message enqueue/dequeue, visibility timeout, and dead letter queue patterns.
  Preserves queue extension method signatures.
disposition: contextual
filePatterns:
  - "**/*.cs"
  - "**/*.csproj"
  - "**/appsettings*.json"
  - "**/Program.cs"
  - "**/Startup.cs"
version: 1.0.0
---

Azure Storage Queues → AWS SQS Migration

Overview

Migrate from Azure.Storage.Queues to AWS Simple Queue Service (SQS) while preserving extension method signatures for queue operations.

Package Migration

Azure Packages (Remove)

xml
<PackageReference Include="Azure.Storage.Queues" Version="12.x.x" />
<PackageReference Include="Azure.Identity" Version="1.x.x" />

AWS Packages (Add)

xml
<PackageReference Include="AWSSDK.SQS" Version="3.7.x" />
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.x" />

Configuration Migration

Azure Configuration (appsettings.json)

json
{
  "AzureStorage": {
    "ConnectionString": "DefaultEndpointsProtocol=https;AccountName=...",
    "QueueName": "myqueue"
  }
}

AWS Configuration (appsettings.json)

json
{
  "AWS": {
    "Region": "us-east-1"
  },
  "SQS": {
    "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/myqueue",
    "VisibilityTimeoutSeconds": 30,
    "MaxNumberOfMessages": 10,
    "WaitTimeSeconds": 20
  }
}

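Environment Variables:

  • AWS_REGION - AWS region (read by the AWS SDK for .NET)
  • SQS_QUEUE_URL - Full SQS queue URL (application-defined; the code below reads SQS:QueueUrl, so either map this variable yourself or set SQS__QueueUrl, which the .NET environment-variable configuration provider binds to SQS:QueueUrl when it is registered)

Authentication: use IAM roles rather than static access keys.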
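
Because the service code in this skill reads `SQS:QueueUrl` from configuration, a variable named `SQS_QUEUE_URL` is not picked up on its own. One way to honor it is a small resolver that prefers the environment variable and falls back to the configured value. This is a minimal sketch: `QueueUrlResolver` and its `getEnv` parameter are hypothetical names introduced here, not part of the original skill or the AWS SDK.

```csharp
using System;

// Hypothetical helper (not part of the AWS SDK): picks the queue URL from
// SQS_QUEUE_URL when it is set, otherwise from the configured value.
public static class QueueUrlResolver
{
    public static string Resolve(string? configuredUrl, Func<string, string?>? getEnv = null)
    {
        // Allow callers (for example tests) to inject a fake environment lookup.
        getEnv ??= name => Environment.GetEnvironmentVariable(name);

        var fromEnv = getEnv("SQS_QUEUE_URL");
        var candidate = string.IsNullOrWhiteSpace(fromEnv) ? configuredUrl : fromEnv;

        // Fail fast at startup instead of on the first send or receive.
        var isHttpUrl = Uri.TryCreate(candidate, UriKind.Absolute, out var uri)
            && (uri.Scheme == Uri.UriSchemeHttps || uri.Scheme == Uri.UriSchemeHttp);
        if (!isHttpUrl)
        {
            throw new InvalidOperationException(
                "No valid SQS queue URL found in SQS_QUEUE_URL or SQS:QueueUrl.");
        }

        return candidate!;
    }
}
```

In the SQS service constructor this could replace the direct lookup, for example `_queueUrl = QueueUrlResolver.Resolve(configuration["SQS:QueueUrl"]);`.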

Service Registration

Azure (Before)

csharp
services.AddSingleton(sp =>
{
    var config = sp.GetRequiredService<IConfiguration>();
    var connectionString = config["AzureStorage:ConnectionString"];
    var queueName = config["AzureStorage:QueueName"];
    
    var queueClient = new QueueClient(connectionString, queueName);
    queueClient.CreateIfNotExists();
    
    return queueClient;
});

AWS (After)

csharp
services.AddDefaultAWSOptions(configuration.GetAWSOptions());
services.AddAWSService<IAmazonSQS>();

services.AddSingleton<IQueueService, AwsSqsQueueService>();

Concept Mapping

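| Azure Storage Queues | AWS SQS | Notes |
|---|---|---|
| SendMessageAsync() | SendMessageAsync() | Direct equivalent |
| ReceiveMessagesAsync() | ReceiveMessageAsync() | SQS returns at most 10 messages per call and supports long polling via WaitTimeSeconds |
| DeleteMessageAsync() | DeleteMessageAsync() | SQS needs only the receipt handle; Azure needs message ID and pop receipt |
| Visibility timeout | Visibility timeout | Same concept |
| Message TTL (default 7 days) | Message retention (default 4 days, max 14 days) | Different defaults |
| Poison queue | Dead letter queue (DLQ) | Similar pattern |
| Message size (64 KB) | Message size (256 KB) | SQS allows larger |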
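
The size limits in the mapping are measured in bytes, not characters, so a payload with multi-byte characters can exceed the limit earlier than its string length suggests. The sketch below checks a serialized body before sending. `SqsLimits` and `FitsInMessage` are hypothetical names, and the default of 262,144 bytes assumes the 256 KB figure from the mapping; message attributes also count toward the limit and are not measured here.

```csharp
using System;
using System.Text;

// Hypothetical guard: checks a serialized body against the queue's size limit.
public static class SqsLimits
{
    // Assumes the 256 KB limit from the concept mapping (262,144 bytes).
    public const int MaxMessageBytes = 262_144;

    // SQS counts bytes, so measure the UTF-8 encoding rather than string length.
    public static bool FitsInMessage(string body, int maxBytes = MaxMessageBytes)
    {
        if (body is null) throw new ArgumentNullException(nameof(body));
        return Encoding.UTF8.GetByteCount(body) <= maxBytes;
    }
}
```

A caller could run `SqsLimits.FitsInMessage(json)` before building the `SendMessageRequest` and log or reject oversized payloads instead of waiting for the service to refuse them.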

Extension Method Preservation

CRITICAL: Keep queue API surface identical.

Example Extension Methods (Keep Signatures)

csharp
public interface IQueueService
{
    Task<bool> SendMessageAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
    Task<IEnumerable<QueueMessage<T>>> ReceiveMessagesAsync<T>(int maxMessages = 1, CancellationToken cancellationToken = default) where T : class;
    Task<bool> DeleteMessageAsync(string messageId, string popReceipt, CancellationToken cancellationToken = default);
    Task<int> GetApproximateMessageCountAsync(CancellationToken cancellationToken = default);
}

public class QueueMessage<T>
{
    public string MessageId { get; set; } = string.Empty;
    public string PopReceipt { get; set; } = string.Empty;
    public T Body { get; set; } = default!;
    public int DequeueCount { get; set; }
}

Azure Implementation (Before)

csharp
public class AzureQueueService : IQueueService
{
    private readonly QueueClient _queueClient;
    private readonly ILogger<AzureQueueService> _logger;

    public AzureQueueService(QueueClient queueClient, ILogger<AzureQueueService> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }

    public async Task<bool> SendMessageAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var json = JsonSerializer.Serialize(message);
            var base64 = Convert.ToBase64String(Encoding.UTF8.GetBytes(json));
            
            await _queueClient.SendMessageAsync(base64, cancellationToken: cancellationToken);
            _logger.LogInformation("Message sent to queue");
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send message");
            return false;
        }
    }

    public async Task<IEnumerable<QueueMessage<T>>> ReceiveMessagesAsync<T>(
        int maxMessages = 1, CancellationToken cancellationToken = default) where T : class
    {
        var response = await _queueClient.ReceiveMessagesAsync(
            maxMessages: maxMessages, 
            cancellationToken: cancellationToken);

        return response.Value.Select(msg =>
        {
            var json = Encoding.UTF8.GetString(Convert.FromBase64String(msg.MessageText));
            var body = JsonSerializer.Deserialize<T>(json)!;

            return new QueueMessage<T>
            {
                MessageId = msg.MessageId,
                PopReceipt = msg.PopReceipt,
                Body = body,
                DequeueCount = (int)msg.DequeueCount // Azure exposes DequeueCount as long
            };
        }).ToList();
    }

    public async Task<bool> DeleteMessageAsync(string messageId, string popReceipt, CancellationToken cancellationToken = default)
    {
        try
        {
            await _queueClient.DeleteMessageAsync(messageId, popReceipt, cancellationToken);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to delete message {MessageId}", messageId);
            return false;
        }
    }

    public async Task<int> GetApproximateMessageCountAsync(CancellationToken cancellationToken = default)
    {
        var properties = await _queueClient.GetPropertiesAsync(cancellationToken);
        return properties.Value.ApproximateMessagesCount;
    }
}

AWS Implementation (After)

csharp
public class AwsSqsQueueService : IQueueService
{
    private readonly IAmazonSQS _sqsClient;
    private readonly ILogger<AwsSqsQueueService> _logger;
    private readonly string _queueUrl;

    public AwsSqsQueueService(
        IAmazonSQS sqsClient,
        IConfiguration configuration,
        ILogger<AwsSqsQueueService> logger)
    {
        _sqsClient = sqsClient;
        _logger = logger;
        _queueUrl = configuration["SQS:QueueUrl"]!;
    }

    public async Task<bool> SendMessageAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var json = JsonSerializer.Serialize(message);
            
            var request = new SendMessageRequest
            {
                QueueUrl = _queueUrl,
                MessageBody = json
            };

            await _sqsClient.SendMessageAsync(request, cancellationToken);
            _logger.LogInformation("Message sent to SQS queue");
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send message to SQS");
            return false;
        }
    }

    public async Task<IEnumerable<QueueMessage<T>>> ReceiveMessagesAsync<T>(
        int maxMessages = 1, CancellationToken cancellationToken = default) where T : class
    {
        var request = new ReceiveMessageRequest
        {
            QueueUrl = _queueUrl,
            MaxNumberOfMessages = Math.Clamp(maxMessages, 1, 10), // SQS accepts 1 to 10 per call
            WaitTimeSeconds = 20, // Long polling
            AttributeNames = new List<string> { "ApproximateReceiveCount" }
        };

        var response = await _sqsClient.ReceiveMessageAsync(request, cancellationToken);

        return response.Messages.Select(msg =>
        {
            var body = JsonSerializer.Deserialize<T>(msg.Body)!;
            
            var dequeueCount = 0;
            if (msg.Attributes.TryGetValue("ApproximateReceiveCount", out var countStr))
            {
                int.TryParse(countStr, out dequeueCount);
            }

            return new QueueMessage<T>
            {
                MessageId = msg.MessageId,
                PopReceipt = msg.ReceiptHandle, // SQS uses ReceiptHandle
                Body = body,
                DequeueCount = dequeueCount
            };
        }).ToList();
    }

    public async Task<bool> DeleteMessageAsync(string messageId, string popReceipt, CancellationToken cancellationToken = default)
    {
        try
        {
            var request = new DeleteMessageRequest
            {
                QueueUrl = _queueUrl,
                ReceiptHandle = popReceipt // SQS uses receipt handle, not messageId
            };

            await _sqsClient.DeleteMessageAsync(request, cancellationToken);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to delete message from SQS");
            return false;
        }
    }

    public async Task<int> GetApproximateMessageCountAsync(CancellationToken cancellationToken = default)
    {
        var request = new GetQueueAttributesRequest
        {
            QueueUrl = _queueUrl,
            AttributeNames = new List<string> { "ApproximateNumberOfMessages" }
        };

        var response = await _sqsClient.GetQueueAttributesAsync(request, cancellationToken);
        
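        // Fall back to 0 when the attribute is missing or is not a valid integer.
        if (response.Attributes.TryGetValue("ApproximateNumberOfMessages", out var countStr)
            && int.TryParse(countStr, out var count))
        {
            return count;
        }

        return 0;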
    }
}

Queue Infrastructure (Terraform)

hcl
# Standard SQS Queue
resource "aws_sqs_queue" "main" {
  name                       = "myqueue"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 1209600  # 14 days
  max_message_size           = 262144   # 256 KB
  delay_seconds              = 0
  receive_wait_time_seconds  = 20       # Long polling

  # Dead letter queue
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })

  tags = {
    Environment = "production"
  }
}

# Dead Letter Queue
resource "aws_sqs_queue" "dlq" {
  name                       = "myqueue-dlq"
  message_retention_seconds  = 1209600  # 14 days
}

# Optional: FIFO Queue (for ordering guarantees)
resource "aws_sqs_queue" "fifo" {
  name                        = "myqueue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
  deduplication_scope         = "messageGroup"
  fifo_throughput_limit       = "perMessageGroupId"
}

IAM Requirements

json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl"
      ],
      "Resource": "arn:aws:sqs:*:*:myqueue"
    }
  ]
}

Testing with Localstack

docker-compose.yml:

yaml
version: '3.8'
services:
  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs
      - DEBUG=1

Test configuration (appsettings.Development.json):

json
{
  "AWS": {
    "Region": "us-east-1",
    "ServiceURL": "http://localhost:4566"
  },
  "SQS": {
    "QueueUrl": "http://localhost:4566/000000000000/myqueue"
  }
}
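
The test configuration points at a queue that has to exist in the container before the first send. A minimal sketch of a test helper that builds a client for the local endpoint and creates the queue is shown here. `LocalStackSqs`, `CreateClient`, and `EnsureQueueAsync` are hypothetical names, and the placeholder credentials assume the container does not validate them.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical test helper: builds an SQS client for the local endpoint
// and makes sure the queue exists.
public static class LocalStackSqs
{
    public const string ServiceUrl = "http://localhost:4566";

    public static AmazonSQSClient CreateClient()
    {
        var config = new AmazonSQSConfig
        {
            ServiceURL = ServiceUrl,
            AuthenticationRegion = "us-east-1" // region used when signing requests
        };

        // Placeholder credentials; assumed to be accepted by the local container.
        return new AmazonSQSClient(new BasicAWSCredentials("test", "test"), config);
    }

    public static async Task<string> EnsureQueueAsync(
        IAmazonSQS client, string queueName, CancellationToken cancellationToken = default)
    {
        // CreateQueue returns the existing queue's URL when the attributes match.
        var response = await client.CreateQueueAsync(
            new CreateQueueRequest { QueueName = queueName }, cancellationToken);
        return response.QueueUrl;
    }
}
```

A test fixture could call `await LocalStackSqs.EnsureQueueAsync(client, "myqueue")` once during setup and pass the returned URL to the service under test.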

Migration Checklist

  • Remove Azure Storage Queues NuGet packages
  • Add AWS SQS NuGet packages
  • Create SQS queues (Terraform/CloudFormation)
  • Configure dead letter queue
  • Update service registration
  • Convert QueueClient to IAmazonSQS
  • Update configuration (connection string → queue URL)
  • Add IAM policies
  • Update visibility timeout settings
  • Configure long polling (WaitTimeSeconds)
  • Update integration tests to use Localstack
  • Verify extension method signatures unchanged
  • Test message processing and error handling

Common Pitfalls

⚠️ Long polling: SQS defaults to short polling; set WaitTimeSeconds (1-20) on the request or receive_wait_time_seconds on the queue to reduce empty responses and cost
⚠️ Receipt handle: SQS uses ReceiptHandle (not MessageId) for delete operations, and each receive of the same message returns a new handle
⚠️ Message size: SQS allows 256 KB (vs Azure's 64 KB); the limit covers the body plus message attributes, so measure the serialized payload
⚠️ FIFO queues: If you need ordering, use the .fifo suffix, set MessageGroupId on every send, and either enable content-based deduplication or supply MessageDeduplicationId
⚠️ Visibility timeout: SQS default is 30s (can extend per-message with ChangeMessageVisibility)
⚠️ Batch operations: SQS supports batch send/delete (up to 10 messages) for efficiency
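
The batch and visibility pitfalls can be sketched with the SDK's `SendMessageBatchAsync` and `ChangeMessageVisibilityAsync` calls. The helper class and method names here are hypothetical and are not part of `IQueueService`; the sketch assumes message bodies are already serialized. The combined size of one batch request is also limited, which this sketch does not check.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helpers; class and method names are illustrative only.
public static class SqsBatchHelpers
{
    private const int MaxBatchSize = 10; // SQS limit per batch request

    // Splits serialized bodies into batch requests of at most 10 entries each.
    public static List<SendMessageBatchRequest> BuildBatches(
        string queueUrl, IReadOnlyList<string> bodies)
    {
        var requests = new List<SendMessageBatchRequest>();
        for (var start = 0; start < bodies.Count; start += MaxBatchSize)
        {
            var entries = new List<SendMessageBatchRequestEntry>();
            var end = Math.Min(start + MaxBatchSize, bodies.Count);
            for (var i = start; i < end; i++)
            {
                // Id only needs to be unique within a single batch request.
                entries.Add(new SendMessageBatchRequestEntry
                {
                    Id = i.ToString(),
                    MessageBody = bodies[i]
                });
            }
            requests.Add(new SendMessageBatchRequest { QueueUrl = queueUrl, Entries = entries });
        }
        return requests;
    }

    // Sends every batch and returns how many entries SQS reported as failed.
    public static async Task<int> SendAllAsync(
        IAmazonSQS client, string queueUrl, IReadOnlyList<string> bodies,
        CancellationToken cancellationToken = default)
    {
        var failed = 0;
        foreach (var request in BuildBatches(queueUrl, bodies))
        {
            var response = await client.SendMessageBatchAsync(request, cancellationToken);
            failed += response.Failed?.Count ?? 0; // a batch call can partially fail
        }
        return failed;
    }

    // Extends the visibility timeout of one in-flight message.
    public static Task ExtendVisibilityAsync(
        IAmazonSQS client, string queueUrl, string receiptHandle, TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        var request = new ChangeMessageVisibilityRequest
        {
            QueueUrl = queueUrl,
            ReceiptHandle = receiptHandle,
            VisibilityTimeout = (int)timeout.TotalSeconds // SQS allows at most 12 hours
        };
        return client.ChangeMessageVisibilityAsync(request, cancellationToken);
    }
}
```

A long-running handler could call `ExtendVisibilityAsync` with the message's `PopReceipt` value before the current timeout expires, so the message is not redelivered while it is still being processed.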

Advanced: FIFO Queue Migration

If you need message ordering or exactly-once processing, use SQS FIFO:

csharp
var request = new SendMessageRequest
{
    QueueUrl = _queueUrl,
    MessageBody = json,
    MessageGroupId = "order-processing", // Required for FIFO
    MessageDeduplicationId = Guid.NewGuid().ToString() // Unique per send, so retries are not deduplicated; omit it when the queue has content-based deduplication enabled
};
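
A fresh GUID per send means a retried send of the same payload gets a new ID and is treated as a new message. If retries should collapse into one delivery and content-based deduplication is not enabled on the queue, one option is to derive the ID from the message body. This is a sketch under that assumption: `FifoDeduplication` is a hypothetical helper, and it relies on `SHA256.HashData` and `Convert.ToHexString`, which require .NET 5 or later.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: derives a stable deduplication ID from the message body.
public static class FifoDeduplication
{
    public static string DeduplicationIdFor(string messageBody)
    {
        if (messageBody is null) throw new ArgumentNullException(nameof(messageBody));

        // Identical bodies hash to the same ID, so a retry within the
        // deduplication window is recognized as a duplicate.
        var hash = SHA256.HashData(Encoding.UTF8.GetBytes(messageBody));

        // 64 hex characters, within the 128-character limit for the ID.
        return Convert.ToHexString(hash);
    }
}
```

With this helper the request would set `MessageDeduplicationId = FifoDeduplication.DeduplicationIdFor(json)`. Two distinct business events with byte-identical bodies would then be deduplicated as well, so include a distinguishing field in the payload if that matters.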

Success Criteria

✅ All Azure Storage Queues code replaced with SQS
✅ Extension method signatures unchanged
✅ Dead letter queue configured
✅ Long polling enabled for efficiency
✅ IAM policies applied
✅ Localstack tests passing
✅ Message processing working correctly
✅ No breaking changes to consuming applications