AWS Lambda and SQS Asynchronous Processing

In modern cloud architectures, asynchronous processing is a key technique for achieving high scalability and decoupling system components. The integration of AWS Lambda with Amazon SQS (Simple Queue Service) provides a powerful and reliable asynchronous processing solution. This article takes a deep look at how to integrate these two services effectively and build a high-performance event-driven architecture.

Overview of the SQS and Lambda Integration

What is Amazon SQS?

Amazon SQS is a fully managed message queuing service that lets you decouple and scale microservices, distributed systems, and serverless applications. SQS eliminates the complexity and overhead of managing and operating message-oriented middleware.

Lambda and SQS Integration Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Producer  │────▶│     SQS     │────▶│   Lambda    │
│  (API/App)  │     │    Queue    │     │  Function   │
└─────────────┘     └──────┬──────┘     └─────────────┘
                           │ after maxReceiveCount
                    ┌──────▼──────┐
                    │ Dead Letter │
                    │    Queue    │
                    └─────────────┘

Benefits of the Integration

  1. Decoupled architecture: producers and consumers operate independently of each other
  2. Automatic scaling: Lambda adjusts the number of concurrent executions based on queue depth
  3. Fault tolerance: built-in retry mechanism and dead-letter queue support
  4. Cost efficiency: pay only for what you use, with no resources to pre-provision

Basic Integration Example

The following is a simple Lambda function that processes SQS messages:

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Process messages from SQS.
    """
    processed_count = 0
    failed_count = 0

    for record in event['Records']:
        try:
            # Extract the message content
            message_body = json.loads(record['body'])
            message_id = record['messageId']
            receipt_handle = record['receiptHandle']

            logger.info(f"Processing message: {message_id}")
            logger.info(f"Message body: {message_body}")

            # Run the business logic
            process_message(message_body)

            processed_count += 1

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            failed_count += 1
            # Re-raise so Lambda retries; note this retries the entire batch
            # (see the partial batch failure section for per-message handling)
            raise

    return {
        'statusCode': 200,
        'body': json.dumps({
            'processed': processed_count,
            'failed': failed_count
        })
    }

def process_message(message):
    """
    The actual business-processing logic.
    """
    # Implement your business logic here
    order_id = message.get('order_id')
    action = message.get('action')

    logger.info(f"Processing order {order_id} with action {action}")

Standard Queues vs. FIFO Queues

Amazon SQS offers two queue types, each with different characteristics and use cases.

Standard Queue

Standard queues provide maximum throughput, best-effort ordering, and at-least-once delivery.

Characteristics:

  • Nearly unlimited throughput
  • At-least-once delivery (duplicates possible)
  • Best-effort ordering (messages may arrive out of order)

Use cases:

  • High-throughput workloads
  • Duplicate messages can be tolerated (see the idempotency sketch below)
  • Strict ordering is not required
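
Because a standard queue can deliver the same message more than once, consumers should be idempotent. A minimal sketch, assuming a hypothetical DynamoDB table named processed-orders with order_id as its partition key, is to record each business key with a conditional write and skip duplicates:

import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
# Hypothetical deduplication table; order_id is the partition key
dedup_table = dynamodb.Table('processed-orders')

def process_once(message):
    """Run the business logic only if this order_id has not been seen."""
    try:
        # The conditional put fails if the key already exists
        dedup_table.put_item(
            Item={'order_id': message['order_id']},
            ConditionExpression='attribute_not_exists(order_id)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return False  # Duplicate delivery; skip safely
        raise
    # First delivery: safe to run the business logic
    return True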

Creating a standard queue (AWS CLI):

# Create a standard queue
aws sqs create-queue \
    --queue-name my-standard-queue \
    --attributes '{
        "VisibilityTimeout": "30",
        "MessageRetentionPeriod": "345600",
        "ReceiveMessageWaitTimeSeconds": "20"
    }'

# Get the queue URL
aws sqs get-queue-url --queue-name my-standard-queue

FIFO Queue (First-In-First-Out)

FIFO queues guarantee that messages are processed in the order they were sent and that each message is delivered exactly once.

Characteristics:

  • Strict ordering
  • Exactly-once processing
  • Up to 3,000 messages per second with batching, or 300 without
  • Queue names must end with .fifo

Use cases:

  • Financial transaction processing
  • Order processing systems
  • Workflows that require strict ordering

Creating a FIFO queue (AWS CLI):

# Create a FIFO queue
aws sqs create-queue \
    --queue-name my-fifo-queue.fifo \
    --attributes '{
        "FifoQueue": "true",
        "ContentBasedDeduplication": "true",
        "VisibilityTimeout": "30",
        "MessageRetentionPeriod": "345600"
    }'

Comparison

Feature              Standard Queue             FIFO Queue
Throughput           Nearly unlimited           3,000 TPS (with batching)
Ordering             Best-effort                Strict
Delivery guarantee   At-least-once              Exactly-once
Duplicate messages   Possible                   No
Lambda concurrency   Scales with queue depth    Limited by message groups

Sending Messages

import boto3
import json
import uuid

sqs = boto3.client('sqs')

# Send a message to a standard queue
def send_to_standard_queue(queue_url, message):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
        MessageAttributes={
            'MessageType': {
                'DataType': 'String',
                'StringValue': 'OrderCreated'
            }
        }
    )
    return response['MessageId']

# Send a message to a FIFO queue
def send_to_fifo_queue(queue_url, message, group_id):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
        MessageGroupId=group_id,
        MessageDeduplicationId=str(uuid.uuid4()),
        MessageAttributes={
            'MessageType': {
                'DataType': 'String',
                'StringValue': 'OrderCreated'
            }
        }
    )
    return response['MessageId']
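
A quick usage sketch of the two helpers (the queue URLs are placeholders):

# Placeholder queue URLs for illustration
standard_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-standard-queue'
fifo_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-fifo-queue.fifo'

order = {'order_id': 'A-1001', 'action': 'create'}
print(send_to_standard_queue(standard_url, order))
# Messages sharing a group ID are delivered in order relative to each other
print(send_to_fifo_queue(fifo_url, order, group_id='customer-42'))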

Configuring the Event Source Mapping

An event source mapping is the mechanism Lambda uses to read messages from an SQS queue. It polls the queue automatically and invokes the Lambda function when messages are available.

Creating an Event Source Mapping

Using the AWS CLI:

# Create the event source mapping
aws lambda create-event-source-mapping \
    --function-name my-lambda-function \
    --event-source-arn arn:aws:sqs:ap-northeast-1:123456789012:my-queue \
    --batch-size 10 \
    --maximum-batching-window-in-seconds 5 \
    --enabled

# List event source mappings
aws lambda list-event-source-mappings \
    --function-name my-lambda-function

# Update an event source mapping
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" \
    --batch-size 20 \
    --maximum-batching-window-in-seconds 10

Key Parameters

Parameter                        Description                                 Default
BatchSize                        Maximum messages delivered per invocation   10 (max 10,000)
MaximumBatchingWindowInSeconds   Maximum wait time to gather a batch         0 (max 300 seconds)
FunctionResponseTypes            Enables partial batch failure reporting     -
ScalingConfig                    Concurrency control settings                -
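
The same mapping can also be managed from code. A minimal boto3 sketch of the CLI example above (the function name and queue ARN are the same placeholders):

import boto3

lambda_client = boto3.client('lambda')

# Create an event source mapping equivalent to the CLI example above
response = lambda_client.create_event_source_mapping(
    FunctionName='my-lambda-function',
    EventSourceArn='arn:aws:sqs:ap-northeast-1:123456789012:my-queue',
    BatchSize=10,
    MaximumBatchingWindowInSeconds=5,
    Enabled=True,
)
print(response['UUID'])  # Needed later for update/delete calls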

Lambda Execution Role Permissions

Make sure the Lambda function's execution role has the required SQS permissions:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:ChangeMessageVisibility"
            ],
            "Resource": "arn:aws:sqs:ap-northeast-1:123456789012:my-queue"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

Managing Event Source Mapping State

# Disable an event source mapping
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" \
    --no-enabled

# Enable an event source mapping
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" \
    --enabled

# Delete an event source mapping
aws lambda delete-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"

Batch Processing and Concurrency Control

Optimizing Batch Processing

Batch processing can significantly improve efficiency by reducing the number of Lambda invocations.

import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Process a batch of SQS messages efficiently.
    """
    records = event['Records']
    batch_size = len(records)

    logger.info(f"Processing batch of {batch_size} messages")

    # Process messages in parallel with a thread pool
    results = []
    with ThreadPoolExecutor(max_workers=min(batch_size, 10)) as executor:
        future_to_record = {
            executor.submit(process_single_message, record): record
            for record in records
        }

        for future in as_completed(future_to_record):
            record = future_to_record[future]
            try:
                result = future.result()
                results.append({
                    'messageId': record['messageId'],
                    'status': 'success'
                })
            except Exception as e:
                logger.error(f"Error processing {record['messageId']}: {str(e)}")
                # NOTE: failures are only logged here; combine this pattern with
                # ReportBatchItemFailures (below) so failed messages are retried
                results.append({
                    'messageId': record['messageId'],
                    'status': 'failed',
                    'error': str(e)
                })

    return {
        'statusCode': 200,
        'body': json.dumps({
            'total': batch_size,
            'results': results
        })
    }

def process_single_message(record):
    """
    Process a single message.
    """
    message_body = json.loads(record['body'])
    # Implement the business logic
    return True

Concurrency Control

Lambda automatically adjusts the number of concurrent executions based on queue depth. You can control this in the following ways:

Setting maximum concurrency:

# Set reserved concurrency
aws lambda put-function-concurrency \
    --function-name my-lambda-function \
    --reserved-concurrent-executions 100

# Set a scaling config on the event source mapping
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" \
    --scaling-config '{"MaximumConcurrency": 50}'

Concurrency Behavior with Standard Queues

Queue depth    Lambda concurrency
───────────    ──────────────────
100            5
1,000          50
10,000         500
100,000        1,000 (default cap)

Concurrency Limits with FIFO Queues

With FIFO queues, concurrency is limited by the number of message groups:

# Specify a message group when sending
def send_fifo_message(queue_url, message, user_id):
    """
    Use user_id as the MessageGroupId so that messages for
    the same user are processed in order.
    """
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
        MessageGroupId=f"user-{user_id}",  # Messages in the same group are processed in order
        MessageDeduplicationId=str(uuid.uuid4())
    )
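
The 3,000 TPS figure quoted earlier assumes batching. The sketch below reuses the sqs client, json, and uuid from the examples above and sends up to 10 messages per request, which is the SQS batch limit:

def send_fifo_batch(queue_url, messages, group_id):
    """Send up to 10 messages in a single request (the SQS batch limit)."""
    entries = [
        {
            'Id': str(i),  # Must be unique within this request
            'MessageBody': json.dumps(msg),
            'MessageGroupId': group_id,
            'MessageDeduplicationId': str(uuid.uuid4()),
        }
        for i, msg in enumerate(messages[:10])
    ]
    response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
    # Entries that could not be sent are listed under 'Failed'
    return response.get('Failed', [])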

Dead-Letter Queue Handling

A dead-letter queue (DLQ) stores messages that could not be processed successfully, preventing message loss.

Configuring a Dead-Letter Queue

Creating a dead-letter queue:

# Create the dead-letter queue
aws sqs create-queue \
    --queue-name my-queue-dlq \
    --attributes '{
        "MessageRetentionPeriod": "1209600"
    }'

# Get the dead-letter queue ARN
DLQ_ARN=$(aws sqs get-queue-attributes \
    --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue-dlq \
    --attribute-names QueueArn \
    --query 'Attributes.QueueArn' \
    --output text)

# Set the redrive policy on the main queue
aws sqs set-queue-attributes \
    --queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue \
    --attributes "{
        \"RedrivePolicy\": \"{\\\"deadLetterTargetArn\\\":\\\"${DLQ_ARN}\\\",\\\"maxReceiveCount\\\":\\\"3\\\"}\"
    }"

A Lambda Function for Processing the DLQ

import json
import logging
import boto3
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs = boto3.client('sqs')
sns = boto3.client('sns')

ALERT_TOPIC_ARN = 'arn:aws:sns:ap-northeast-1:123456789012:dlq-alerts'

def lambda_handler(event, context):
    """
    Process messages from the dead-letter queue.
    """
    for record in event['Records']:
        try:
            message_id = record['messageId']
            message_body = json.loads(record['body'])

            # Attributes of the original message
            attributes = record.get('messageAttributes', {})
            approximate_receive_count = record.get('attributes', {}).get(
                'ApproximateReceiveCount', 'unknown'
            )

            logger.warning(f"DLQ Message: {message_id}")
            logger.warning(f"Receive count: {approximate_receive_count}")
            logger.warning(f"Body: {json.dumps(message_body, indent=2)}")

            # Analyze and record the failure cause
            failure_analysis = analyze_failure(message_body, attributes)

            # Send an alert notification
            send_alert(message_id, message_body, failure_analysis)

            # Decide how to handle the message based on its type
            handle_dead_letter(message_id, message_body, failure_analysis)

        except Exception as e:
            logger.error(f"Error processing DLQ message: {str(e)}")
            # A failure while processing a DLQ message requires manual intervention
            raise

def analyze_failure(message_body, attributes):
    """
    Analyze why the message failed to process.
    """
    analysis = {
        'timestamp': datetime.utcnow().isoformat(),
        'possible_causes': []
    }

    # Check for common problems
    if 'order_id' in message_body and not message_body.get('order_id'):
        analysis['possible_causes'].append('Missing order_id')

    if 'amount' in message_body:
        try:
            float(message_body['amount'])
        except (ValueError, TypeError):
            analysis['possible_causes'].append('Invalid amount format')

    return analysis

def send_alert(message_id, message_body, analysis):
    """
    Send an alert notification.
    """
    alert_message = {
        'alert_type': 'DLQ_MESSAGE',
        'message_id': message_id,
        'message_body': message_body,
        'analysis': analysis
    }

    sns.publish(
        TopicArn=ALERT_TOPIC_ARN,
        Message=json.dumps(alert_message, indent=2),
        Subject='[ALERT] Dead Letter Queue Message Detected'
    )

def handle_dead_letter(message_id, message_body, analysis):
    """
    Handle a dead-letter message.
    """
    # Choose a handling strategy based on message type or failure cause
    message_type = message_body.get('type', 'unknown')

    if message_type == 'order':
        # Order messages: flag for manual handling
        logger.info(f"Order message {message_id} requires manual intervention")
    elif message_type == 'notification':
        # Notification messages: safe to discard
        logger.info(f"Notification message {message_id} can be discarded")
    else:
        # Unknown type: keep and notify
        logger.warning(f"Unknown message type: {message_type}")

Redriving Dead-Letter Queue Messages

#!/bin/bash
# redrive-dlq.sh - Move dead-letter queue messages back to the main queue

SOURCE_QUEUE_URL="https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue-dlq"
TARGET_QUEUE_URL="https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue"

# Receive and resend messages
while true; do
    # Receive a message from the DLQ
    MESSAGE=$(aws sqs receive-message \
        --queue-url "$SOURCE_QUEUE_URL" \
        --max-number-of-messages 1 \
        --wait-time-seconds 5 \
        --output json)

    # Check whether a message was returned
    if [ "$(echo "$MESSAGE" | jq '.Messages | length')" == "0" ] || [ "$(echo "$MESSAGE" | jq '.Messages')" == "null" ]; then
        echo "No more messages in DLQ"
        break
    fi

    BODY=$(echo "$MESSAGE" | jq -r '.Messages[0].Body')
    RECEIPT_HANDLE=$(echo "$MESSAGE" | jq -r '.Messages[0].ReceiptHandle')

    # Send to the main queue
    aws sqs send-message \
        --queue-url "$TARGET_QUEUE_URL" \
        --message-body "$BODY"

    # Delete from the DLQ
    aws sqs delete-message \
        --queue-url "$SOURCE_QUEUE_URL" \
        --receipt-handle "$RECEIPT_HANDLE"

    echo "Redrived message successfully"
done
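
SQS also offers a managed DLQ redrive API that can replace a hand-rolled script like the one above. A minimal boto3 sketch (the ARN is the same placeholder DLQ; by default, messages move back to the queues they originally came from):

import boto3

sqs = boto3.client('sqs')

# Start a managed move of all messages out of the DLQ
task = sqs.start_message_move_task(
    SourceArn='arn:aws:sqs:ap-northeast-1:123456789012:my-queue-dlq'
)
print(task['TaskHandle'])

# Check the progress of move tasks for this DLQ
status = sqs.list_message_move_tasks(
    SourceArn='arn:aws:sqs:ap-northeast-1:123456789012:my-queue-dlq'
)
print(status['Results'])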

Error Handling and Retry Strategies

Partial Batch Failure Reporting

Enabling partial batch failure reporting prevents an entire batch from being reprocessed because a single message failed:

# Enable partial batch failure reporting
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" \
    --function-response-types "ReportBatchItemFailures"

Lambda function implementation:

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Lambda function with partial batch failure reporting.
    """
    batch_item_failures = []

    for record in event['Records']:
        try:
            message_id = record['messageId']
            message_body = json.loads(record['body'])

            # Process the message
            process_message(message_body)

            logger.info(f"Successfully processed message: {message_id}")

        except Exception as e:
            logger.error(f"Failed to process message {record['messageId']}: {str(e)}")
            # Record the ID of the failed message
            batch_item_failures.append({
                'itemIdentifier': record['messageId']
            })

    # Report the failed messages back to Lambda
    return {
        'batchItemFailures': batch_item_failures
    }

def process_message(message):
    """
    Message-processing logic; may raise an exception.
    """
    if not message.get('valid', True):
        raise ValueError("Invalid message format")

    # Actual processing logic
    pass
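
The contract is simple: only the reported IDs are returned to the queue, an empty list deletes the whole batch, and an unhandled exception causes the whole batch to be retried. A synthetic local check (illustrative values):

if __name__ == '__main__':
    event = {'Records': [
        {'messageId': 'ok-1', 'body': json.dumps({'valid': True})},
        {'messageId': 'bad-1', 'body': json.dumps({'valid': False})},
    ]}
    # Expect only 'bad-1' to be reported for retry
    print(lambda_handler(event, None))
    # -> {'batchItemFailures': [{'itemIdentifier': 'bad-1'}]}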

Designing a Retry Strategy

import json
import logging
import time
from functools import wraps

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """
    Exponential backoff retry decorator.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries >= max_retries:
                        logger.error(f"Max retries ({max_retries}) exceeded")
                        raise

                    delay = min(base_delay * (2 ** retries), max_delay)
                    logger.warning(f"Retry {retries}/{max_retries} after {delay}s: {str(e)}")
                    time.sleep(delay)

            return None
        return wrapper
    return decorator

class MessageProcessor:
    """
    Message processor with complete error-handling logic.
    """

    def __init__(self):
        self.processed_count = 0
        self.failed_count = 0
        self.errors = []

    @retry_with_backoff(max_retries=3)
    def process_with_retry(self, message):
        """
        Process a message with retries.
        """
        # Operations that may fail
        self._call_external_api(message)
        self._update_database(message)
        return True

    def _call_external_api(self, message):
        """
        Call an external API.
        """
        # The actual API call
        pass

    def _update_database(self, message):
        """
        Update the database.
        """
        # The actual database operation
        pass

def lambda_handler(event, context):
    """
    Process messages with MessageProcessor.
    """
    processor = MessageProcessor()
    batch_item_failures = []

    for record in event['Records']:
        try:
            message = json.loads(record['body'])
            processor.process_with_retry(message)
            processor.processed_count += 1

        except Exception as e:
            processor.failed_count += 1
            processor.errors.append({
                'messageId': record['messageId'],
                'error': str(e)
            })
            batch_item_failures.append({
                'itemIdentifier': record['messageId']
            })

    logger.info(f"Processed: {processor.processed_count}, Failed: {processor.failed_count}")

    return {
        'batchItemFailures': batch_item_failures
    }

Optimizing the Visibility Timeout

As a rule of thumb, AWS recommends setting the queue's visibility timeout to at least six times the function timeout so that in-flight batches do not become visible again while still being processed. For workloads whose duration varies per message, the timeout can also be extended dynamically:

import boto3
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs = boto3.client('sqs')

def lambda_handler(event, context):
    """
    Adjust the visibility timeout dynamically.
    """
    queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/my-queue'

    for record in event['Records']:
        receipt_handle = record['receiptHandle']

        try:
            # Estimate the processing time
            estimated_time = estimate_processing_time(record)

            # If the estimate exceeds the default visibility timeout, extend it
            if estimated_time > 30:
                new_timeout = min(estimated_time + 30, 43200)  # 12-hour maximum
                sqs.change_message_visibility(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle,
                    VisibilityTimeout=new_timeout
                )
                logger.info(f"Extended visibility timeout to {new_timeout}s")

            # Process the message
            process_message(record)

        except Exception as e:
            logger.error(f"Error: {str(e)}")
            raise

def estimate_processing_time(record):
    """
    Estimate processing time from the message content.
    """
    message = json.loads(record['body'])

    # Estimate based on message type or size
    if message.get('type') == 'large_batch':
        return 120  # 2 minutes
    elif message.get('type') == 'standard':
        return 30   # 30 seconds
    else:
        return 15   # 15-second default

def process_message(record):
    """
    Message-processing logic.
    """
    pass
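
When processing time cannot be estimated up front, an alternative sketch is a heartbeat that repeatedly extends the visibility timeout from a background thread until the work finishes, reusing the sqs client above:

import threading

def visibility_heartbeat(queue_url, receipt_handle, interval=20, extension=60):
    """Extend message visibility every `interval` seconds until stopped."""
    stop = threading.Event()

    def beat():
        while not stop.wait(interval):
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=extension
            )

    threading.Thread(target=beat, daemon=True).start()
    return stop

# Usage: start the heartbeat, do the work, then stop it
# stop = visibility_heartbeat(queue_url, receipt_handle)
# try:
#     process_message(record)
# finally:
#     stop.set()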

Terraform Deployment Example

The following is a complete Terraform configuration for deploying the Lambda and SQS integration architecture:

Main Configuration File

# main.tf

terraform {
  required_version = ">= 1.0.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    archive = {
      source  = "hashicorp/archive"
      version = "~> 2.0"
    }
  }
}

provider "aws" {
  region = var.aws_region
}

# Variable definitions
variable "aws_region" {
  description = "AWS Region"
  type        = string
  default     = "ap-northeast-1"
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "project_name" {
  description = "Project name"
  type        = string
  default     = "sqs-lambda-demo"
}

# Local values
locals {
  common_tags = {
    Environment = var.environment
    Project     = var.project_name
    ManagedBy   = "Terraform"
  }
}

SQS Queue Configuration

# sqs.tf

# Main queue
resource "aws_sqs_queue" "main_queue" {
  name                       = "${var.project_name}-queue"
  visibility_timeout_seconds = 60
  message_retention_seconds  = 345600  # 4 days
  receive_wait_time_seconds  = 20      # Long polling

  # Configure the dead-letter queue
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dlq.arn
    maxReceiveCount     = 3
  })

  tags = local.common_tags
}

# Dead-letter queue
resource "aws_sqs_queue" "dlq" {
  name                       = "${var.project_name}-dlq"
  message_retention_seconds  = 1209600  # 14 days
  visibility_timeout_seconds = 60

  tags = local.common_tags
}

# FIFO queue (optional)
resource "aws_sqs_queue" "fifo_queue" {
  name                        = "${var.project_name}-queue.fifo"
  fifo_queue                  = true
  content_based_deduplication = true
  visibility_timeout_seconds  = 60
  message_retention_seconds   = 345600

  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.fifo_dlq.arn
    maxReceiveCount     = 3
  })

  tags = local.common_tags
}

# FIFO dead-letter queue
resource "aws_sqs_queue" "fifo_dlq" {
  name                       = "${var.project_name}-dlq.fifo"
  fifo_queue                 = true
  message_retention_seconds  = 1209600
  visibility_timeout_seconds = 60

  tags = local.common_tags
}

# Queue policy
resource "aws_sqs_queue_policy" "main_queue_policy" {
  queue_url = aws_sqs_queue.main_queue.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "AllowLambdaAccess"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.main_queue.arn
      }
    ]
  })
}

Lambda Function Configuration

# lambda.tf

# Lambda execution role
resource "aws_iam_role" "lambda_role" {
  name = "${var.project_name}-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })

  tags = local.common_tags
}

# Lambda permissions policy
resource "aws_iam_role_policy" "lambda_policy" {
  name = "${var.project_name}-lambda-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
          "sqs:ChangeMessageVisibility"
        ]
        Resource = [
          aws_sqs_queue.main_queue.arn,
          aws_sqs_queue.fifo_queue.arn
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "xray:PutTraceSegments",
          "xray:PutTelemetryRecords"
        ]
        Resource = "*"
      }
    ]
  })
}

# Package the Lambda function code
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "${path.module}/lambda"
  output_path = "${path.module}/lambda.zip"
}

# Main Lambda function
resource "aws_lambda_function" "sqs_processor" {
  function_name    = "${var.project_name}-processor"
  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  handler          = "handler.lambda_handler"
  runtime          = "python3.11"
  role             = aws_iam_role.lambda_role.arn
  timeout          = 60
  memory_size      = 256

  reserved_concurrent_executions = 100

  environment {
    variables = {
      ENVIRONMENT = var.environment
      LOG_LEVEL   = "INFO"
    }
  }

  tracing_config {
    mode = "Active"
  }

  tags = local.common_tags
}

# DLQ-processing Lambda function
resource "aws_lambda_function" "dlq_processor" {
  function_name    = "${var.project_name}-dlq-processor"
  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  handler          = "dlq_handler.lambda_handler"
  runtime          = "python3.11"
  role             = aws_iam_role.lambda_role.arn
  timeout          = 60
  memory_size      = 256

  environment {
    variables = {
      ENVIRONMENT     = var.environment
      ALERT_TOPIC_ARN = aws_sns_topic.alerts.arn
    }
  }

  tags = local.common_tags
}

# CloudWatch Log Group
resource "aws_cloudwatch_log_group" "lambda_logs" {
  name              = "/aws/lambda/${aws_lambda_function.sqs_processor.function_name}"
  retention_in_days = 14

  tags = local.common_tags
}

Event Source Mapping Configuration

# event_source_mapping.tf

# Event source mapping for the standard queue
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn = aws_sqs_queue.main_queue.arn
  function_name    = aws_lambda_function.sqs_processor.arn

  batch_size                         = 10
  maximum_batching_window_in_seconds = 5

  # Enable partial batch failure reporting
  function_response_types = ["ReportBatchItemFailures"]

  # Concurrency control
  scaling_config {
    maximum_concurrency = 50
  }

  enabled = true
}

# Event source mapping for the FIFO queue
resource "aws_lambda_event_source_mapping" "fifo_trigger" {
  event_source_arn = aws_sqs_queue.fifo_queue.arn
  function_name    = aws_lambda_function.sqs_processor.arn

  batch_size                         = 10
  maximum_batching_window_in_seconds = 0

  function_response_types = ["ReportBatchItemFailures"]

  enabled = true
}

# Event source mapping for the DLQ
resource "aws_lambda_event_source_mapping" "dlq_trigger" {
  event_source_arn = aws_sqs_queue.dlq.arn
  function_name    = aws_lambda_function.dlq_processor.arn

  batch_size = 1
  enabled    = true
}

Monitoring and Alarm Configuration

# monitoring.tf

# SNS topic for alerts
resource "aws_sns_topic" "alerts" {
  name = "${var.project_name}-alerts"

  tags = local.common_tags
}

# Queue depth alarm
resource "aws_cloudwatch_metric_alarm" "queue_depth" {
  alarm_name          = "${var.project_name}-queue-depth-high"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 300
  statistic           = "Average"
  threshold           = 1000
  alarm_description   = "Queue depth exceeds 1000 messages"

  dimensions = {
    QueueName = aws_sqs_queue.main_queue.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
  ok_actions    = [aws_sns_topic.alerts.arn]

  tags = local.common_tags
}

# DLQ message alarm
resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
  alarm_name          = "${var.project_name}-dlq-messages"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ApproximateNumberOfMessagesVisible"
  namespace           = "AWS/SQS"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Messages detected in DLQ"

  dimensions = {
    QueueName = aws_sqs_queue.dlq.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]

  tags = local.common_tags
}

# Lambda error alarm
resource "aws_cloudwatch_metric_alarm" "lambda_errors" {
  alarm_name          = "${var.project_name}-lambda-errors"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "Errors"
  namespace           = "AWS/Lambda"
  period              = 60
  statistic           = "Sum"
  threshold           = 5
  alarm_description   = "Lambda function errors exceed threshold"

  dimensions = {
    FunctionName = aws_lambda_function.sqs_processor.function_name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]

  tags = local.common_tags
}

# Lambda duration alarm
resource "aws_cloudwatch_metric_alarm" "lambda_duration" {
  alarm_name          = "${var.project_name}-lambda-duration"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 3
  metric_name         = "Duration"
  namespace           = "AWS/Lambda"
  period              = 60
  statistic           = "Average"
  threshold           = 50000  # 50 seconds
  alarm_description   = "Lambda duration approaching timeout"

  dimensions = {
    FunctionName = aws_lambda_function.sqs_processor.function_name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]

  tags = local.common_tags
}

Outputs

# outputs.tf

output "main_queue_url" {
  description = "Main SQS queue URL"
  value       = aws_sqs_queue.main_queue.url
}

output "main_queue_arn" {
  description = "Main SQS queue ARN"
  value       = aws_sqs_queue.main_queue.arn
}

output "fifo_queue_url" {
  description = "FIFO SQS queue URL"
  value       = aws_sqs_queue.fifo_queue.url
}

output "dlq_url" {
  description = "Dead letter queue URL"
  value       = aws_sqs_queue.dlq.url
}

output "lambda_function_name" {
  description = "Lambda function name"
  value       = aws_lambda_function.sqs_processor.function_name
}

output "lambda_function_arn" {
  description = "Lambda function ARN"
  value       = aws_lambda_function.sqs_processor.arn
}

output "alerts_topic_arn" {
  description = "SNS alerts topic ARN"
  value       = aws_sns_topic.alerts.arn
}

Deployment Commands

# Initialize Terraform
terraform init

# Review the execution plan
terraform plan -var="environment=production"

# Deploy the infrastructure
terraform apply -var="environment=production" -auto-approve

# Show output values
terraform output

# Destroy the resources
terraform destroy -var="environment=production" -auto-approve

Monitoring and Performance Optimization

CloudWatch Monitoring Dashboard

import boto3
import json

cloudwatch = boto3.client('cloudwatch')

def create_dashboard(project_name, queue_name, function_name):
    """
    Create a CloudWatch monitoring dashboard.
    """
    dashboard_body = {
        "widgets": [
            {
                "type": "metric",
                "x": 0,
                "y": 0,
                "width": 12,
                "height": 6,
                "properties": {
                    "title": "SQS Queue Metrics",
                    "metrics": [
                        ["AWS/SQS", "ApproximateNumberOfMessagesVisible",
                         "QueueName", queue_name],
                        [".", "ApproximateNumberOfMessagesNotVisible", ".", "."],
                        [".", "NumberOfMessagesSent", ".", "."],
                        [".", "NumberOfMessagesReceived", ".", "."]
                    ],
                    "period": 60,
                    "stat": "Sum",
                    "region": "ap-northeast-1"
                }
            },
            {
                "type": "metric",
                "x": 12,
                "y": 0,
                "width": 12,
                "height": 6,
                "properties": {
                    "title": "Lambda Invocations & Errors",
                    "metrics": [
                        ["AWS/Lambda", "Invocations",
                         "FunctionName", function_name],
                        [".", "Errors", ".", "."],
                        [".", "Throttles", ".", "."]
                    ],
                    "period": 60,
                    "stat": "Sum",
                    "region": "ap-northeast-1"
                }
            },
            {
                "type": "metric",
                "x": 0,
                "y": 6,
                "width": 12,
                "height": 6,
                "properties": {
                    "title": "Lambda Duration",
                    "metrics": [
                        ["AWS/Lambda", "Duration",
                         "FunctionName", function_name,
                         {"stat": "Average"}],
                        ["...", {"stat": "p99"}],
                        ["...", {"stat": "Maximum"}]
                    ],
                    "period": 60,
                    "region": "ap-northeast-1"
                }
            },
            {
                "type": "metric",
                "x": 12,
                "y": 6,
                "width": 12,
                "height": 6,
                "properties": {
                    "title": "Lambda Concurrent Executions",
                    "metrics": [
                        ["AWS/Lambda", "ConcurrentExecutions",
                         "FunctionName", function_name]
                    ],
                    "period": 60,
                    "stat": "Maximum",
                    "region": "ap-northeast-1"
                }
            }
        ]
    }

    cloudwatch.put_dashboard(
        DashboardName=f"{project_name}-monitoring",
        DashboardBody=json.dumps(dashboard_body)
    )

Key Metrics to Monitor

Metric                               Description                     Suggested alarm threshold
ApproximateNumberOfMessagesVisible   Visible messages in the queue   > 1,000
ApproximateAgeOfOldestMessage        Age of the oldest message       > 3,600 seconds
NumberOfMessagesDeleted              Messages deleted                Compare against throughput
Lambda Errors                        Lambda error count              > 5 per minute
Lambda Duration                      Lambda execution time           > 80% of timeout
Lambda ConcurrentExecutions          Concurrent executions           > 80% of limit
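
As a sketch of how these metrics can be pulled programmatically, the following queries ApproximateAgeOfOldestMessage, a good early-warning signal for a stalled consumer (the queue name is a placeholder):

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

def oldest_message_age(queue_name, minutes=15):
    """Return the max ApproximateAgeOfOldestMessage over the recent window."""
    stats = cloudwatch.get_metric_statistics(
        Namespace='AWS/SQS',
        MetricName='ApproximateAgeOfOldestMessage',
        Dimensions=[{'Name': 'QueueName', 'Value': queue_name}],
        StartTime=datetime.utcnow() - timedelta(minutes=minutes),
        EndTime=datetime.utcnow(),
        Period=60,
        Statistics=['Maximum'],
    )
    return max((dp['Maximum'] for dp in stats['Datapoints']), default=0)

print(oldest_message_age('my-queue'))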

Performance Optimization Tips

1. Batch Size Optimization

# Adjust the batch size based on message size and processing time
def calculate_optimal_batch_size(avg_message_size_kb, avg_processing_time_ms):
    """
    Calculate an optimal batch size.
    """
    # Lambda maximum payload: 6MB
    max_payload_kb = 6 * 1024

    # Limit based on message size
    size_based_limit = max_payload_kb // avg_message_size_kb

    # Limit based on processing time (assuming a 60-second timeout)
    timeout_ms = 60000
    time_based_limit = timeout_ms // avg_processing_time_ms

    # Take the minimum, capped at the SQS maximum
    optimal_size = min(size_based_limit, time_based_limit, 10000)

    return max(1, optimal_size)
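
For example, with 64 KB messages that take 500 ms each, the size limit is 6144 // 64 = 96 and the time limit is 60000 // 500 = 120, so the function returns 96:

# Illustrative inputs: message size in KB, processing time in ms
print(calculate_optimal_batch_size(64, 500))   # size-bound: min(96, 120, 10000) -> 96
print(calculate_optimal_batch_size(2, 2000))   # time-bound: min(3072, 30, 10000) -> 30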

2. Memory Configuration Optimization

# Use the AWS Lambda Power Tuning tool
# https://github.com/alexcasalboni/aws-lambda-power-tuning

# Deploy Power Tuning
sam deploy \
    --template-file template.yml \
    --stack-name lambda-power-tuning \
    --capabilities CAPABILITY_IAM

# Run the performance test
aws stepfunctions start-execution \
    --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:powerTuningStateMachine \
    --input '{
        "lambdaARN": "arn:aws:lambda:ap-northeast-1:123456789012:function:my-function",
        "powerValues": [128, 256, 512, 1024, 2048],
        "num": 50,
        "payload": "{\"test\": true}"
    }'

3. Connection Pool Optimization

import json
import boto3
from botocore.config import Config

# Initialize connections during Lambda cold start
config = Config(
    max_pool_connections=50,
    retries={
        'max_attempts': 3,
        'mode': 'adaptive'
    }
)

# Create clients outside the handler so connections are reused
dynamodb = boto3.resource('dynamodb', config=config)
table = dynamodb.Table('my-table')

def lambda_handler(event, context):
    """
    Reuse the pre-established connections.
    """
    for record in event['Records']:
        message = json.loads(record['body'])

        # Use the pre-created connection
        table.put_item(Item=message)

4. Cold Start Optimization

# Use Provisioned Concurrency
# Terraform configuration
"""
resource "aws_lambda_provisioned_concurrency_config" "sqs_processor" {
  function_name                     = aws_lambda_function.sqs_processor.function_name
  provisioned_concurrent_executions = 10
  qualifier                         = aws_lambda_alias.live.name
}

resource "aws_lambda_alias" "live" {
  name             = "live"
  function_name    = aws_lambda_function.sqs_processor.function_name
  function_version = aws_lambda_function.sqs_processor.version
}
"""

# Use lightweight packages
# requirements.txt
"""
boto3>=1.28.0
aws-lambda-powertools>=2.0.0
"""

# Lazy-load modules that are not always needed
def lambda_handler(event, context):
    # Import heavy modules only when required
    if event.get('requires_pandas'):
        import pandas as pd
        # Use pandas
Cost Optimization

Batching efficiency can be estimated from CloudWatch metrics; a low average batch size suggests raising BatchSize or the batching window:

def analyze_cost_efficiency(queue_url, function_name, days=7):
    """
    Analyze the cost efficiency of the SQS + Lambda setup.
    """
    import boto3
    from datetime import datetime, timedelta

    cloudwatch = boto3.client('cloudwatch')

    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=days)

    # Get the Lambda invocation count
    invocations = cloudwatch.get_metric_statistics(
        Namespace='AWS/Lambda',
        MetricName='Invocations',
        Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
        StartTime=start_time,
        EndTime=end_time,
        Period=86400,
        Statistics=['Sum']
    )

    # Get the SQS message count
    messages = cloudwatch.get_metric_statistics(
        Namespace='AWS/SQS',
        MetricName='NumberOfMessagesReceived',
        Dimensions=[{'Name': 'QueueName', 'Value': queue_url.split('/')[-1]}],
        StartTime=start_time,
        EndTime=end_time,
        Period=86400,
        Statistics=['Sum']
    )

    total_invocations = sum([dp['Sum'] for dp in invocations['Datapoints']])
    total_messages = sum([dp['Sum'] for dp in messages['Datapoints']])

    # Compute batching efficiency
    if total_invocations > 0:
        avg_batch_size = total_messages / total_invocations
        print(f"Average batch size: {avg_batch_size:.2f}")
        print(f"Recommendation: {'Increase batch size' if avg_batch_size < 5 else 'Batch size is optimal'}")

    return {
        'total_invocations': total_invocations,
        'total_messages': total_messages,
        'avg_batch_size': total_messages / total_invocations if total_invocations > 0 else 0
    }

Summary

The integration of AWS Lambda and SQS provides a powerful, scalable asynchronous processing architecture. With the techniques and best practices covered in this article, you can:

  1. Choose the right queue type: pick a standard or FIFO queue based on business requirements
  2. Configure the event source mapping correctly: tune batch size and concurrency controls
  3. Implement robust error handling: use partial batch failure reporting and dead-letter queues
  4. Use infrastructure as code: manage and version your architecture with Terraform
  5. Monitor and optimize continuously: build dashboards and adjust the configuration based on metrics

When implementing, keep the following key principles in mind:

  • Design for idempotency: make sure message-processing logic can safely run more than once
  • Set appropriate timeouts: the Lambda timeout should exceed the expected processing time
  • Log thoroughly: make troubleshooting and performance analysis easier
  • Review the DLQ regularly: deal with failed messages promptly

With these practices, you can build a robust, scalable, and maintainable asynchronous processing system.
