AgentSkillsCN

azure-servicebus-to-sqs-sns

将 Azure Service Bus 迁移到 AWS SQS 和 SNS。将 Azure.Messaging.ServiceBus 转换为 AWSSDK.SQS 和 AWSSDK.SimpleNotificationService,全面支持队列与主题/订阅模式、消息路由以及死信队列功能。

SKILL.md
--- frontmatter
name: azure-servicebus-to-sqs-sns
description: |
  Migrate Azure Service Bus to AWS SQS and SNS. Converts Azure.Messaging.ServiceBus
  to AWSSDK.SQS and AWSSDK.SimpleNotificationService, handling queue and topic/subscription
  patterns, message routing, and dead letter queues.
disposition: contextual
filePatterns:
  - "**/*.cs"
  - "**/*.csproj"
  - "**/appsettings*.json"
  - "**/Program.cs"
  - "**/Startup.cs"
version: 1.0.0

Azure Service Bus → AWS SQS + SNS Migration

Overview

Migrate from Azure.Messaging.ServiceBus to AWS SQS (queues) and SNS (topics/subscriptions) while preserving messaging extension method signatures.

Package Migration

Azure Packages (Remove)

xml
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.x.x" />
<PackageReference Include="Azure.Identity" Version="1.x.x" />

AWS Packages (Add)

xml
<PackageReference Include="AWSSDK.SQS" Version="3.7.x" />
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.7.x" />
<PackageReference Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.x" />

Architecture Mapping

Azure Service BusAWS EquivalentPattern
QueueSQS QueuePoint-to-point messaging
TopicSNS TopicPub/sub messaging
SubscriptionSQS Queue + SNS SubscriptionTopic subscriber
SessionSQS FIFO + MessageGroupIdMessage ordering
Dead letter queueSQS DLQPoison message handling

Key difference: Azure Service Bus combines queues and pub/sub in one service. AWS uses SQS for queues and SNS for pub/sub, requiring both services for topic patterns.

Configuration Migration

Azure Configuration (appsettings.json)

json
{
  "ServiceBus": {
    "ConnectionString": "Endpoint=sb://...",
    "QueueName": "orders",
    "TopicName": "events",
    "SubscriptionName": "order-processor"
  }
}

AWS Configuration (appsettings.json)

json
{
  "AWS": {
    "Region": "us-east-1"
  },
  "Messaging": {
    "QueueUrl": "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
    "TopicArn": "arn:aws:sns:us-east-1:123456789012:events"
  }
}

Service Registration

Azure (Before)

csharp
services.AddSingleton(sp =>
{
    var config = sp.GetRequiredService<IConfiguration>();
    var connectionString = config["ServiceBus:ConnectionString"];
    return new ServiceBusClient(connectionString);
});

services.AddSingleton<IMessageSender, ServiceBusMessageSender>();
services.AddSingleton<IMessageReceiver, ServiceBusMessageReceiver>();

AWS (After)

csharp
services.AddDefaultAWSOptions(configuration.GetAWSOptions());
services.AddAWSService<IAmazonSQS>();
services.AddAWSService<IAmazonSimpleNotificationService>();

services.AddSingleton<IMessageSender, AwsMessageSender>();
services.AddSingleton<IMessageReceiver, AwsMessageReceiver>();

Extension Method Preservation

CRITICAL: Keep messaging API surface identical.

Example Extension Methods (Keep Signatures)

csharp
public interface IMessageSender
{
    // Queue operations
    Task<bool> SendQueueMessageAsync<T>(T message, CancellationToken cancellationToken = default) where T : class;
    
    // Topic operations
    Task<bool> PublishTopicMessageAsync<T>(T message, Dictionary<string, string>? attributes = null, CancellationToken cancellationToken = default) where T : class;
}

public interface IMessageReceiver
{
    // Queue operations
    Task<IEnumerable<ReceivedMessage<T>>> ReceiveQueueMessagesAsync<T>(int maxMessages = 1, CancellationToken cancellationToken = default) where T : class;
    Task<bool> CompleteMessageAsync(string messageId, string lockToken, CancellationToken cancellationToken = default);
    Task<bool> AbandonMessageAsync(string messageId, string lockToken, CancellationToken cancellationToken = default);
    
    // Subscription operations (topic receiver)
    Task<IEnumerable<ReceivedMessage<T>>> ReceiveSubscriptionMessagesAsync<T>(int maxMessages = 1, CancellationToken cancellationToken = default) where T : class;
}

public class ReceivedMessage<T>
{
    public string MessageId { get; set; } = string.Empty;
    public string LockToken { get; set; } = string.Empty;
    public T Body { get; set; } = default!;
    public Dictionary<string, string> Properties { get; set; } = new();
    public int DeliveryCount { get; set; }
}

Azure Implementation (Before)

csharp
public class ServiceBusMessageSender : IMessageSender
{
    private readonly ServiceBusClient _client;
    private readonly IConfiguration _configuration;
    private readonly ILogger<ServiceBusMessageSender> _logger;

    public ServiceBusMessageSender(ServiceBusClient client, IConfiguration configuration, ILogger<ServiceBusMessageSender> logger)
    {
        _client = client;
        _configuration = configuration;
        _logger = logger;
    }

    public async Task<bool> SendQueueMessageAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var queueName = _configuration["ServiceBus:QueueName"];
            var sender = _client.CreateSender(queueName);
            
            var json = JsonSerializer.Serialize(message);
            var serviceBusMessage = new ServiceBusMessage(json);
            
            await sender.SendMessageAsync(serviceBusMessage, cancellationToken);
            _logger.LogInformation("Sent message to queue {QueueName}", queueName);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send queue message");
            return false;
        }
    }

    public async Task<bool> PublishTopicMessageAsync<T>(T message, Dictionary<string, string>? attributes = null, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var topicName = _configuration["ServiceBus:TopicName"];
            var sender = _client.CreateSender(topicName);
            
            var json = JsonSerializer.Serialize(message);
            var serviceBusMessage = new ServiceBusMessage(json);
            
            if (attributes != null)
            {
                foreach (var attr in attributes)
                {
                    serviceBusMessage.ApplicationProperties[attr.Key] = attr.Value;
                }
            }
            
            await sender.SendMessageAsync(serviceBusMessage, cancellationToken);
            _logger.LogInformation("Published message to topic {TopicName}", topicName);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish topic message");
            return false;
        }
    }
}

AWS Implementation (After)

csharp
public class AwsMessageSender : IMessageSender
{
    private readonly IAmazonSQS _sqsClient;
    private readonly IAmazonSimpleNotificationService _snsClient;
    private readonly IConfiguration _configuration;
    private readonly ILogger<AwsMessageSender> _logger;

    public AwsMessageSender(
        IAmazonSQS sqsClient,
        IAmazonSimpleNotificationService snsClient,
        IConfiguration configuration,
        ILogger<AwsMessageSender> logger)
    {
        _sqsClient = sqsClient;
        _snsClient = snsClient;
        _configuration = configuration;
        _logger = logger;
    }

    public async Task<bool> SendQueueMessageAsync<T>(T message, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var queueUrl = _configuration["Messaging:QueueUrl"];
            var json = JsonSerializer.Serialize(message);
            
            var request = new SendMessageRequest
            {
                QueueUrl = queueUrl,
                MessageBody = json
            };

            await _sqsClient.SendMessageAsync(request, cancellationToken);
            _logger.LogInformation("Sent message to SQS queue");
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to send SQS message");
            return false;
        }
    }

    public async Task<bool> PublishTopicMessageAsync<T>(T message, Dictionary<string, string>? attributes = null, CancellationToken cancellationToken = default) where T : class
    {
        try
        {
            var topicArn = _configuration["Messaging:TopicArn"];
            var json = JsonSerializer.Serialize(message);
            
            var request = new PublishRequest
            {
                TopicArn = topicArn,
                Message = json
            };

            if (attributes != null)
            {
                foreach (var attr in attributes)
                {
                    request.MessageAttributes[attr.Key] = new MessageAttributeValue
                    {
                        DataType = "String",
                        StringValue = attr.Value
                    };
                }
            }

            await _snsClient.PublishAsync(request, cancellationToken);
            _logger.LogInformation("Published message to SNS topic");
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish SNS message");
            return false;
        }
    }
}

public class AwsMessageReceiver : IMessageReceiver
{
    private readonly IAmazonSQS _sqsClient;
    private readonly IConfiguration _configuration;
    private readonly ILogger<AwsMessageReceiver> _logger;

    public AwsMessageReceiver(IAmazonSQS sqsClient, IConfiguration configuration, ILogger<AwsMessageReceiver> logger)
    {
        _sqsClient = sqsClient;
        _configuration = configuration;
        _logger = logger;
    }

    public async Task<IEnumerable<ReceivedMessage<T>>> ReceiveQueueMessagesAsync<T>(int maxMessages = 1, CancellationToken cancellationToken = default) where T : class
    {
        var queueUrl = _configuration["Messaging:QueueUrl"];
        
        var request = new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            MaxNumberOfMessages = Math.Min(maxMessages, 10),
            WaitTimeSeconds = 20,
            AttributeNames = new List<string> { "ApproximateReceiveCount" },
            MessageAttributeNames = new List<string> { "All" }
        };

        var response = await _sqsClient.ReceiveMessageAsync(request, cancellationToken);

        return response.Messages.Select(msg =>
        {
            var body = JsonSerializer.Deserialize<T>(msg.Body)!;
            var deliveryCount = int.Parse(msg.Attributes.GetValueOrDefault("ApproximateReceiveCount", "1"));
            
            var properties = msg.MessageAttributes.ToDictionary(
                kvp => kvp.Key,
                kvp => kvp.Value.StringValue
            );

            return new ReceivedMessage<T>
            {
                MessageId = msg.MessageId,
                LockToken = msg.ReceiptHandle,
                Body = body,
                Properties = properties,
                DeliveryCount = deliveryCount
            };
        }).ToList();
    }

    public async Task<bool> CompleteMessageAsync(string messageId, string lockToken, CancellationToken cancellationToken = default)
    {
        try
        {
            var queueUrl = _configuration["Messaging:QueueUrl"];
            
            var request = new DeleteMessageRequest
            {
                QueueUrl = queueUrl,
                ReceiptHandle = lockToken
            };

            await _sqsClient.DeleteMessageAsync(request, cancellationToken);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to complete message {MessageId}", messageId);
            return false;
        }
    }

    public async Task<bool> AbandonMessageAsync(string messageId, string lockToken, CancellationToken cancellationToken = default)
    {
        try
        {
            var queueUrl = _configuration["Messaging:QueueUrl"];
            
            // Change visibility timeout to 0 to make message immediately available
            var request = new ChangeMessageVisibilityRequest
            {
                QueueUrl = queueUrl,
                ReceiptHandle = lockToken,
                VisibilityTimeout = 0
            };

            await _sqsClient.ChangeMessageVisibilityAsync(request, cancellationToken);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to abandon message {MessageId}", messageId);
            return false;
        }
    }

    public async Task<IEnumerable<ReceivedMessage<T>>> ReceiveSubscriptionMessagesAsync<T>(int maxMessages = 1, CancellationToken cancellationToken = default) where T : class
    {
        // In AWS, subscriptions receive via their own SQS queue
        // This requires a different queue URL configured for the subscription
        var subscriptionQueueUrl = _configuration["Messaging:SubscriptionQueueUrl"];
        
        var request = new ReceiveMessageRequest
        {
            QueueUrl = subscriptionQueueUrl,
            MaxNumberOfMessages = Math.Min(maxMessages, 10),
            WaitTimeSeconds = 20,
            AttributeNames = new List<string> { "ApproximateReceiveCount" }
        };

        var response = await _sqsClient.ReceiveMessageAsync(request, cancellationToken);

        return response.Messages.Select(msg =>
        {
            // SNS wraps message in envelope, need to unwrap
            var snsMessage = JsonSerializer.Deserialize<SnsMessage>(msg.Body)!;
            var body = JsonSerializer.Deserialize<T>(snsMessage.Message)!;
            
            var deliveryCount = int.Parse(msg.Attributes.GetValueOrDefault("ApproximateReceiveCount", "1"));

            return new ReceivedMessage<T>
            {
                MessageId = msg.MessageId,
                LockToken = msg.ReceiptHandle,
                Body = body,
                Properties = snsMessage.MessageAttributes.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.Value),
                DeliveryCount = deliveryCount
            };
        }).ToList();
    }

    private class SnsMessage
    {
        public string Message { get; set; } = string.Empty;
        public Dictionary<string, SnsMessageAttribute> MessageAttributes { get; set; } = new();
    }

    private class SnsMessageAttribute
    {
        public string Type { get; set; } = string.Empty;
        public string Value { get; set; } = string.Empty;
    }
}

Infrastructure (Terraform)

hcl
# SQS Queue for direct messaging
resource "aws_sqs_queue" "orders" {
  name                       = "orders"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 1209600
  max_message_size           = 262144
  receive_wait_time_seconds  = 20

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.orders_dlq.arn
    maxReceiveCount     = 3
  })
}

resource "aws_sqs_queue" "orders_dlq" {
  name = "orders-dlq"
}

# SNS Topic for pub/sub
resource "aws_sns_topic" "events" {
  name = "events"
}

# SQS Queue for subscription
resource "aws_sqs_queue" "order_processor" {
  name                       = "order-processor"
  visibility_timeout_seconds = 30
  message_retention_seconds  = 1209600
  receive_wait_time_seconds  = 20
}

# Subscribe SQS queue to SNS topic
resource "aws_sns_topic_subscription" "order_processor" {
  topic_arn = aws_sns_topic.events.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.order_processor.arn
}

# Allow SNS to send messages to SQS
resource "aws_sqs_queue_policy" "order_processor" {
  queue_url = aws_sqs_queue.order_processor.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "sns.amazonaws.com"
        }
        Action   = "sqs:SendMessage"
        Resource = aws_sqs_queue.order_processor.arn
        Condition = {
          ArnEquals = {
            "aws:SourceArn" = aws_sns_topic.events.arn
          }
        }
      }
    ]
  })
}

IAM Requirements

json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:sqs:*:*:orders",
        "arn:aws:sqs:*:*:order-processor"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "arn:aws:sns:*:*:events"
    }
  ]
}

Testing with Localstack

docker-compose.yml:

yaml
version: '3.8'
services:
  localstack:
    image: localstack/localstack:latest
    ports:
      - "4566:4566"
    environment:
      - SERVICES=sqs,sns
      - DEBUG=1

Migration Checklist

  • Remove Azure Service Bus NuGet packages
  • Add AWS SQS and SNS NuGet packages
  • Create SQS queues (Terraform/CloudFormation)
  • Create SNS topics for pub/sub patterns
  • Subscribe SQS queues to SNS topics
  • Configure queue policies to allow SNS
  • Update service registration
  • Convert ServiceBusClient to IAmazonSQS + IAmazonSimpleNotificationService
  • Update message unwrapping for SNS → SQS subscriptions
  • Add IAM policies
  • Update integration tests
  • Verify extension method signatures unchanged

Common Pitfalls

⚠️ SNS message envelope: SNS wraps messages in JSON envelope when delivering to SQS - must unwrap
⚠️ No sessions: SQS doesn't have native session support - use FIFO queues with MessageGroupId
⚠️ No transactions: SQS doesn't support transactions across multiple queues
⚠️ Message filters: SNS subscription filters use different syntax than Service Bus topic filters
⚠️ Visibility timeout: Must be long enough to process message, or extend with ChangeMessageVisibility
⚠️ Fan-out: Use SNS topic → multiple SQS queues for broadcast patterns

Success Criteria

✅ All Azure Service Bus code replaced with SQS + SNS
✅ Extension method signatures unchanged
✅ Queue and topic patterns working
✅ Dead letter queues configured
✅ Subscriptions receiving messages
✅ IAM policies applied
✅ Message ordering preserved (if using FIFO)
✅ No breaking changes to consuming applications