AWS Lambda and EventBridge Event-Driven Architecture

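In modern cloud architectures, event-driven design has become a key approach to building scalable, loosely coupled systems. Amazon EventBridge is a serverless event bus service, and integrating it with Lambda functions gives you a flexible way to process events. This article walks through how to use the two services together to build an event-driven architecture.

EventBridge Overview

Amazon EventBridge is a serverless event bus service that connects applications with data from a variety of sources. It evolved from CloudWatch Events and adds features such as custom event buses, SaaS partner integrations, and a schema registry.

Core features

EventBridge's main features include:

  • Event bus: the central channel that receives and routes events
  • Rules: define event-matching patterns and their targets
  • Schema Registry: automatically discovers and stores event structures
  • Archive and replay: persists events and re-processes them later
  • API destinations: send events to external HTTP endpoints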

Event source types

┌───────────────────────────────────────────────────────────────────┐
│                     EventBridge event sources                     │
├────────────────────────┬────────────────────────┬─────────────────┤
│ AWS service events     │ Custom applications    │ SaaS partners   │
├────────────────────────┼────────────────────────┼─────────────────┤
│ • EC2 state changes    │ • PutEvents API        │ • Zendesk       │
│ • S3 object operations │ • SDK integration      │ • Datadog       │
│ • CodePipeline         │ • Custom event formats │ • Auth0         │
│ • CloudTrail           │                        │ • Shopify       │
└────────────────────────┴────────────────────────┴─────────────────┘

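Event Buses and Rules

Creating a custom event bus

By default, every AWS account has a default event bus. In practice, it is advisable to create separate custom event buses for different purposes.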

# Create a custom event bus
aws events create-event-bus --name "order-processing-bus"

# List all event buses
aws events list-event-buses

# Create an event bus with tags
aws events create-event-bus \
    --name "payment-events-bus" \
    --tags Key=Environment,Value=Production Key=Team,Value=Backend

Creating an event bus with Terraform

# EventBridge event bus
resource "aws_cloudwatch_event_bus" "order_bus" {
  name = "order-processing-bus"

  tags = {
    Environment = "production"
    Application = "order-system"
  }
}

# Event bus policy
resource "aws_cloudwatch_event_bus_policy" "order_bus_policy" {
  event_bus_name = aws_cloudwatch_event_bus.order_bus.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "AllowAccountToPutEvents"
        Effect    = "Allow"
        Principal = {
          AWS = "arn:aws:iam::123456789012:root"
        }
        Action    = "events:PutEvents"
        Resource  = aws_cloudwatch_event_bus.order_bus.arn
      }
    ]
  })
}

Creating event rules

An event rule defines which events are routed to which targets.

# Create the rule
aws events put-rule \
    --name "order-created-rule" \
    --event-bus-name "order-processing-bus" \
    --event-pattern '{
        "source": ["com.myapp.orders"],
        "detail-type": ["OrderCreated"]
    }' \
    --state ENABLED \
    --description "Handle newly created order events"

# Attach the Lambda function as a target.
# No "Input" override is set: a constant Input would replace the matched event,
# and the handler later in this article reads detail-type and detail from it.
aws events put-targets \
    --rule "order-created-rule" \
    --event-bus-name "order-processing-bus" \
    --targets '[{
        "Id": "OrderProcessorLambda",
        "Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:order-processor"
    }]'
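
If you script this step in Python rather than the CLI, the same two calls can be sketched with boto3's `put_rule` and `put_targets`. The helper name `create_rule_with_target` and the `RecordingClient` stub are illustrative assumptions so the sketch runs without AWS credentials; in real use you would pass `boto3.client('events')` instead of the stub.

```python
import json

def create_rule_with_target(events_client, bus_name, rule_name, pattern,
                            target_id, target_arn):
    """Create (or update) a rule on a bus and attach one target to it."""
    events_client.put_rule(
        Name=rule_name,
        EventBusName=bus_name,
        EventPattern=json.dumps(pattern),  # the API takes the pattern as a JSON string
        State="ENABLED",
    )
    return events_client.put_targets(
        Rule=rule_name,
        EventBusName=bus_name,
        Targets=[{"Id": target_id, "Arn": target_arn}],
    )

class RecordingClient:
    """Hypothetical stand-in for boto3.client('events') that only records calls."""
    def __init__(self):
        self.calls = []

    def put_rule(self, **kwargs):
        self.calls.append(("put_rule", kwargs))
        return {}

    def put_targets(self, **kwargs):
        self.calls.append(("put_targets", kwargs))
        return {"FailedEntryCount": 0, "FailedEntries": []}

client = RecordingClient()
result = create_rule_with_target(
    client,
    "order-processing-bus",
    "order-created-rule",
    {"source": ["com.myapp.orders"], "detail-type": ["OrderCreated"]},
    "OrderProcessorLambda",
    "arn:aws:lambda:ap-northeast-1:123456789012:function:order-processor",
)
```

Checking `FailedEntryCount` on the `put_targets` result is worthwhile, because a target can be rejected even when the call itself succeeds.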

Event Pattern Matching

EventBridge event patterns let you filter precisely which events a rule should handle.

Basic pattern matching

{
  "source": ["com.myapp.orders"],
  "detail-type": ["OrderCreated", "OrderUpdated"],
  "detail": {
    "status": ["pending", "processing"]
  }
}
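
To make the matching rules concrete, here is a minimal local model of exact-value matching. It is a simplified sketch, not EventBridge's implementation: the function name `matches` is an assumption, and it only covers literal values and nested objects. Every field named in the pattern must be present in the event, each leaf lists the allowed values, and fields the pattern does not mention are ignored.

```python
def matches(pattern, event):
    """Return True if every field in the pattern is satisfied by the event."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: descend into the corresponding sub-object
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        else:
            # Leaf: a list of allowed values, any one of which may match
            candidates = actual if isinstance(actual, list) else [actual]
            if not any(value in expected for value in candidates):
                return False
    return True

pattern = {
    "source": ["com.myapp.orders"],
    "detail-type": ["OrderCreated", "OrderUpdated"],
    "detail": {"status": ["pending", "processing"]},
}

pending_order = {
    "source": "com.myapp.orders",
    "detail-type": "OrderCreated",
    "detail": {"status": "pending", "orderId": "ORD-2025-001"},
}
shipped_order = {
    "source": "com.myapp.orders",
    "detail-type": "OrderUpdated",
    "detail": {"status": "shipped"},
}
```

A model like this is handy in unit tests for catching patterns that silently match nothing before they are deployed.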

Advanced pattern matching

{
  "source": ["com.myapp.orders"],
  "detail": {
    "amount": [{
      "numeric": [">", 1000]
    }],
    "region": [{
      "prefix": "ap-"
    }],
    "customer": {
      "tier": ["premium", "enterprise"]
    },
    "items": {
      "category": [{
        "anything-but": ["digital"]
      }]
    }
  }
}

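Pattern-matching operators

Operator       Purpose                         Example
prefix         Matches a string prefix         {"prefix": "order-"}
suffix         Matches a string suffix         {"suffix": ".json"}
anything-but   Excludes specific values        {"anything-but": ["test"]}
numeric        Numeric comparison              {"numeric": [">=", 100]}
cidr           IP address range match          {"cidr": "10.0.0.0/24"}
exists         Checks whether a field exists   {"exists": true}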
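
The operators in the table can be approximated locally to reason about what a pattern element will accept. The sketch below is an assumption-laden model rather than the service's own logic: `value_matches` is a hypothetical helper, and `exists` is left out because it tests whether a field is present rather than what its value is.

```python
import ipaddress
import operator

_COMPARATORS = {
    "<": operator.lt, "<=": operator.le, "=": operator.eq,
    ">": operator.gt, ">=": operator.ge,
}

def value_matches(rule, value):
    """Check one event value against one pattern element (literal or operator)."""
    if not isinstance(rule, dict):
        return value == rule
    if "prefix" in rule:
        return isinstance(value, str) and value.startswith(rule["prefix"])
    if "suffix" in rule:
        return isinstance(value, str) and value.endswith(rule["suffix"])
    if "anything-but" in rule:
        excluded = rule["anything-but"]
        excluded = excluded if isinstance(excluded, list) else [excluded]
        return value not in excluded
    if "numeric" in rule:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        spec = rule["numeric"]  # e.g. [">", 0, "<=", 5]: operator/bound pairs
        return all(_COMPARATORS[op](value, bound)
                   for op, bound in zip(spec[0::2], spec[1::2]))
    if "cidr" in rule:
        try:
            return ipaddress.ip_address(value) in ipaddress.ip_network(rule["cidr"])
        except ValueError:
            return False
    raise ValueError(f"unsupported operator in this sketch: {rule}")
```

For example, `value_matches({"numeric": [">", 0, "<=", 5]}, 6)` is false because both comparisons in the list must hold.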

Compound pattern example

{
  "source": ["aws.ec2"],
  "detail-type": ["EC2 Instance State-change Notification"],
  "detail": {
    "state": ["stopped", "terminated"],
    "instance-id": [{
      "prefix": "i-prod-"
    }]
  },
  "region": [{
    "anything-but": "us-east-1"
  }]
}

Lambda Function Integration

Lambda function example

The following Lambda function handles events delivered by EventBridge:

import json
import logging
import boto3
from datetime import datetime
from decimal import Decimal

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')

def lambda_handler(event, context):
    """
    Handle order events delivered by EventBridge.
    """
    logger.info(f"Received event: {json.dumps(event)}")

    # Parse the event envelope
    detail_type = event.get('detail-type')
    source = event.get('source')
    detail = event.get('detail', {})
    event_time = event.get('time')

    # Dispatch on the event type
    if detail_type == 'OrderCreated':
        return process_order_created(detail)
    elif detail_type == 'OrderUpdated':
        return process_order_updated(detail)
    elif detail_type == 'OrderCancelled':
        return process_order_cancelled(detail)
    else:
        logger.warning(f"Unknown event type: {detail_type}")
        return {
            'statusCode': 400,
            'body': json.dumps({'error': f'Unknown event type: {detail_type}'})
        }

def process_order_created(detail):
    """Handle a newly created order."""
    order_id = detail.get('orderId')
    customer_id = detail.get('customerId')
    # The DynamoDB resource API rejects Python floats (such as item prices),
    # so re-parse the items with Decimal for fractional numbers
    items = json.loads(json.dumps(detail.get('items', [])), parse_float=Decimal)
    total_amount = detail.get('totalAmount')

    # Store the order in DynamoDB
    table = dynamodb.Table('Orders')
    table.put_item(
        Item={
            'orderId': order_id,
            'customerId': customer_id,
            'items': items,
            'totalAmount': str(total_amount),
            'status': 'pending',
            'createdAt': datetime.utcnow().isoformat()
        }
    )

    # Send a notification
    sns.publish(
        TopicArn='arn:aws:sns:ap-northeast-1:123456789012:order-notifications',
        Message=json.dumps({
            'orderId': order_id,
            'message': f'New order created with amount: ${total_amount}'
        }),
        Subject='New Order Created'
    )

    logger.info(f"Order {order_id} processed successfully")

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Order processed successfully',
            'orderId': order_id
        })
    }

def process_order_updated(detail):
    """Handle an order update."""
    order_id = detail.get('orderId')
    updates = detail.get('updates', {})

    table = dynamodb.Table('Orders')

    update_expression = "SET " + ", ".join([f"#{k} = :{k}" for k in updates.keys()])
    expression_names = {f"#{k}": k for k in updates.keys()}
    expression_values = {f":{k}": v for k, v in updates.items()}

    table.update_item(
        Key={'orderId': order_id},
        UpdateExpression=update_expression,
        ExpressionAttributeNames=expression_names,
        ExpressionAttributeValues=expression_values
    )

    return {
        'statusCode': 200,
        'body': json.dumps({'message': f'Order {order_id} updated'})
    }

def process_order_cancelled(detail):
    """Handle an order cancellation."""
    order_id = detail.get('orderId')
    reason = detail.get('reason', 'No reason provided')

    table = dynamodb.Table('Orders')
    table.update_item(
        Key={'orderId': order_id},
        UpdateExpression='SET #status = :status, #cancelReason = :reason, #cancelledAt = :time',
        ExpressionAttributeNames={
            '#status': 'status',
            '#cancelReason': 'cancelReason',
            '#cancelledAt': 'cancelledAt'
        },
        ExpressionAttributeValues={
            ':status': 'cancelled',
            ':reason': reason,
            ':time': datetime.utcnow().isoformat()
        }
    )

    return {
        'statusCode': 200,
        'body': json.dumps({'message': f'Order {order_id} cancelled'})
    }
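
The handler above reads `detail-type`, `source`, `time`, and `detail` from the event envelope. For local testing it helps to build an event of that shape by hand. The sketch below assumes hypothetical helper names (`make_event`, `route`) and placeholder account and region values; it also shows a dictionary-based dispatch as one alternative to the if/elif chain.

```python
import json
import uuid
from datetime import datetime, timezone

def make_event(detail_type, detail, source="com.myapp.orders"):
    """Build a dict shaped like the envelope EventBridge passes to a Lambda target."""
    return {
        "version": "0",
        "id": str(uuid.uuid4()),
        "detail-type": detail_type,
        "source": source,
        "account": "123456789012",          # placeholder account ID
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "region": "ap-northeast-1",         # placeholder region
        "resources": [],
        "detail": detail,
    }

def route(event, handlers):
    """Dispatch on detail-type using a dict of handler functions."""
    handler = handlers.get(event.get("detail-type"))
    if handler is None:
        return {"statusCode": 400,
                "body": json.dumps({"error": "Unknown event type"})}
    return handler(event.get("detail", {}))

def fake_order_created(detail):
    """Test double that echoes the order ID instead of calling AWS."""
    return {"statusCode": 200, "body": json.dumps({"orderId": detail["orderId"]})}

handlers = {"OrderCreated": fake_order_created}
ok = route(make_event("OrderCreated", {"orderId": "ORD-2025-001"}), handlers)
unknown = route(make_event("OrderArchived", {}), handlers)
```

Swapping the real `process_order_*` functions into `handlers` keeps the routing logic testable without touching DynamoDB or SNS.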

Lambda permissions

The Lambda function needs a resource-based permission before EventBridge can invoke it:

# Grant EventBridge permission to invoke the Lambda function
aws lambda add-permission \
    --function-name order-processor \
    --statement-id EventBridgeInvoke \
    --action lambda:InvokeFunction \
    --principal events.amazonaws.com \
    --source-arn "arn:aws:events:ap-northeast-1:123456789012:rule/order-processing-bus/order-created-rule"

Complete Terraform configuration

# Lambda function
resource "aws_lambda_function" "order_processor" {
  filename         = "order_processor.zip"
  function_name    = "order-processor"
  role             = aws_iam_role.lambda_role.arn
  handler          = "handler.lambda_handler"
  runtime          = "python3.11"
  timeout          = 30
  memory_size      = 256

  environment {
    variables = {
      ORDERS_TABLE = aws_dynamodb_table.orders.name
      SNS_TOPIC    = aws_sns_topic.order_notifications.arn
    }
  }

  tags = {
    Environment = "production"
  }
}

# IAM role
resource "aws_iam_role" "lambda_role" {
  name = "order-processor-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

# Lambda execution policy
resource "aws_iam_role_policy" "lambda_policy" {
  name = "order-processor-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect = "Allow"
        Action = [
          "dynamodb:PutItem",
          "dynamodb:UpdateItem",
          "dynamodb:GetItem"
        ]
        Resource = aws_dynamodb_table.orders.arn
      },
      {
        Effect = "Allow"
        Action = "sns:Publish"
        Resource = aws_sns_topic.order_notifications.arn
      }
    ]
  })
}

# EventBridge rule
resource "aws_cloudwatch_event_rule" "order_created" {
  name           = "order-created-rule"
  event_bus_name = aws_cloudwatch_event_bus.order_bus.name
  description    = "Capture order created events"

  event_pattern = jsonencode({
    source      = ["com.myapp.orders"]
    detail-type = ["OrderCreated"]
  })
}

# EventBridge target
resource "aws_cloudwatch_event_target" "order_processor" {
  rule           = aws_cloudwatch_event_rule.order_created.name
  event_bus_name = aws_cloudwatch_event_bus.order_bus.name
  target_id      = "OrderProcessorLambda"
  arn            = aws_lambda_function.order_processor.arn
}

# Lambda permission
resource "aws_lambda_permission" "eventbridge_invoke" {
  statement_id  = "AllowEventBridgeInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.order_processor.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.order_created.arn
}

Publishing Custom Events

import boto3
import json
from datetime import datetime

def publish_order_event(order_data, event_type):
    """
    Publish an order event to EventBridge.
    """
    client = boto3.client('events')

    response = client.put_events(
        Entries=[
            {
                'Time': datetime.utcnow(),
                'Source': 'com.myapp.orders',
                'DetailType': event_type,
                'Detail': json.dumps(order_data),
                'EventBusName': 'order-processing-bus',
                'TraceHeader': 'Root=1-5f84c7a1-sample-trace-id'
            }
        ]
    )

    failed_count = response.get('FailedEntryCount', 0)
    if failed_count > 0:
        print(f"Failed to publish {failed_count} events")
        for entry in response.get('Entries', []):
            if 'ErrorCode' in entry:
                print(f"Error: {entry['ErrorCode']} - {entry['ErrorMessage']}")

    return response

# Usage example
order_data = {
    'orderId': 'ORD-2025-001',
    'customerId': 'CUST-123',
    'items': [
        {'productId': 'PROD-001', 'quantity': 2, 'price': 29.99},
        {'productId': 'PROD-002', 'quantity': 1, 'price': 49.99}
    ],
    'totalAmount': 109.97,
    'shippingAddress': {
        'city': 'Taipei',
        'country': 'Taiwan'
    }
}

publish_order_event(order_data, 'OrderCreated')
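
A single `PutEvents` call accepts at most 10 entries, and a call can succeed overall while individual entries fail. The sketch below shows one way to batch entries and pick out the failed ones for a retry, relying on the response listing results in the same order as the request. The helper names and the simulated response are assumptions for illustration; no AWS call is made here.

```python
def chunk_entries(entries, size=10):
    """Split entries into batches no larger than the PutEvents per-call limit."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]

def failed_entries(batch, response):
    """Pair each request entry with its result and keep the ones that failed."""
    results = response.get("Entries", [])
    return [entry for entry, result in zip(batch, results) if "ErrorCode" in result]

entries = [
    {
        "Source": "com.myapp.orders",
        "DetailType": "OrderCreated",
        "Detail": '{"orderId": "ORD-%d"}' % n,
        "EventBusName": "order-processing-bus",
    }
    for n in range(23)
]
batches = chunk_entries(entries)

# Simulated response for the first batch: the second entry failed
simulated_response = {
    "FailedEntryCount": 1,
    "Entries": [{"EventId": "simulated-0"},
                {"ErrorCode": "SimulatedError", "ErrorMessage": "simulated failure"}]
               + [{"EventId": "simulated"}] * 8,
}
to_retry = failed_entries(batches[0], simulated_response)
```

In a real publisher, `to_retry` would be passed back to `put_events`, ideally with a backoff between attempts.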

Scheduled Rules

EventBridge supports scheduled rules defined with cron or rate expressions, which can trigger Lambda functions periodically.

Rate expressions

# Run every 5 minutes
aws events put-rule \
    --name "every-5-minutes" \
    --schedule-expression "rate(5 minutes)" \
    --state ENABLED

# Run once an hour
aws events put-rule \
    --name "hourly-task" \
    --schedule-expression "rate(1 hour)" \
    --state ENABLED

# Run once a day
aws events put-rule \
    --name "daily-cleanup" \
    --schedule-expression "rate(1 day)" \
    --state ENABLED
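
Note that the commands above use a singular unit when the value is 1 (`rate(1 hour)`) and a plural unit otherwise (`rate(5 minutes)`). A small helper, sketched here under the assumed name `rate_expression`, avoids getting that wrong when schedules are generated in code.

```python
def rate_expression(value, unit):
    """Build a rate() schedule expression; unit is 'minute', 'hour', or 'day'."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError(f"unsupported unit: {unit}")
    if not isinstance(value, int) or value < 1:
        raise ValueError("value must be a positive integer")
    suffix = "" if value == 1 else "s"  # singular for 1, plural otherwise
    return f"rate({value} {unit}{suffix})"
```

The result can be passed directly as the `--schedule-expression` value.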

Cron expressions

EventBridge cron expressions have six fields: cron(minutes hours day-of-month month day-of-week year)

# Run every day at 09:00 (UTC)
aws events put-rule \
    --name "daily-morning-report" \
    --schedule-expression "cron(0 9 * * ? *)" \
    --state ENABLED

# Run every Monday at 08:00 (UTC)
aws events put-rule \
    --name "weekly-monday-task" \
    --schedule-expression "cron(0 8 ? * MON *)" \
    --state ENABLED

# Run at midnight (UTC) on the first day of every month
aws events put-rule \
    --name "monthly-first-day" \
    --schedule-expression "cron(0 0 1 * ? *)" \
    --state ENABLED

# Run every 15 minutes from 09:00 through 17:45 (UTC), Monday to Friday
aws events put-rule \
    --name "business-hours-check" \
    --schedule-expression "cron(0/15 9-17 ? * MON-FRI *)" \
    --state ENABLED
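
Every cron example above puts `?` in either the day-of-month or the day-of-week field, because the two cannot both be specified in the same expression. The sketch below splits an expression into its six named fields and checks that rule. `parse_cron` and the field names are assumptions for illustration, and the check is far from a full validator.

```python
FIELDS = ("minutes", "hours", "day_of_month", "month", "day_of_week", "year")

def parse_cron(expression):
    """Split 'cron(...)' into six named fields and check the '?' rule."""
    if not (expression.startswith("cron(") and expression.endswith(")")):
        raise ValueError("expected an expression of the form cron(...)")
    parts = expression[5:-1].split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected 6 fields, got {len(parts)}")
    fields = dict(zip(FIELDS, parts))
    # If one day field has a value or '*', the other must be '?'
    if "?" not in (fields["day_of_month"], fields["day_of_week"]):
        raise ValueError("use '?' in day-of-month or day-of-week")
    return fields

business_hours = parse_cron("cron(0/15 9-17 ? * MON-FRI *)")
```

Here `business_hours["hours"]` is `"9-17"`, which is why the last run of the day is at 17:45 rather than 17:00.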

Terraform scheduled rule example

# Scheduled cleanup task
resource "aws_cloudwatch_event_rule" "cleanup_schedule" {
  name                = "database-cleanup-schedule"
  description         = "Trigger database cleanup every night at 2 AM UTC"
  schedule_expression = "cron(0 2 * * ? *)"

  tags = {
    Purpose = "DatabaseMaintenance"
  }
}

resource "aws_cloudwatch_event_target" "cleanup_lambda" {
  rule      = aws_cloudwatch_event_rule.cleanup_schedule.name
  target_id = "DatabaseCleanupLambda"
  arn       = aws_lambda_function.cleanup_function.arn

  # Optional: pass extra parameters to the function
  input = jsonencode({
    action     = "cleanup"
    retention  = 30
    table_name = "logs"
  })
}

# Schedule with a retry policy and a dead-letter queue
resource "aws_cloudwatch_event_rule" "report_schedule" {
  name                = "weekly-report-schedule"
  description         = "Generate weekly report every Monday at 8 AM"
  schedule_expression = "cron(0 8 ? * MON *)"
}

resource "aws_cloudwatch_event_target" "report_lambda" {
  rule      = aws_cloudwatch_event_rule.report_schedule.name
  target_id = "WeeklyReportLambda"
  arn       = aws_lambda_function.report_function.arn

  retry_policy {
    maximum_event_age_in_seconds = 3600
    maximum_retry_attempts       = 3
  }

  dead_letter_config {
    arn = aws_sqs_queue.dlq.arn
  }
}

Lambda function for the scheduled task

import json
import logging
import boto3
from datetime import datetime, timedelta

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def cleanup_handler(event, context):
    """
    Periodically delete expired items.
    """
    logger.info(f"Cleanup triggered at {datetime.utcnow().isoformat()}")

    # Read parameters from the event
    retention_days = event.get('retention', 30)
    table_name = event.get('table_name', 'logs')

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)

    # Compute the expiry cutoff
    expiry_date = datetime.utcnow() - timedelta(days=retention_days)
    expiry_timestamp = expiry_date.isoformat()

    # Scan for expired items and delete them
    deleted_count = 0
    scan_kwargs = {
        'FilterExpression': '#ts < :expiry',
        'ExpressionAttributeNames': {'#ts': 'timestamp'},
        'ExpressionAttributeValues': {':expiry': expiry_timestamp}
    }

    while True:
        response = table.scan(**scan_kwargs)
        items = response.get('Items', [])

        with table.batch_writer() as batch:
            for item in items:
                batch.delete_item(Key={'id': item['id']})
                deleted_count += 1

        # Stop when there are no more pages to scan
        if 'LastEvaluatedKey' not in response:
            break
        scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']

    logger.info(f"Cleanup completed. Deleted {deleted_count} items.")

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Cleanup completed',
            'deletedCount': deleted_count,
            'timestamp': datetime.utcnow().isoformat()
        })
    }

Cross-Account Event Delivery

EventBridge can deliver events between AWS accounts, which matters for microservice architectures and multi-account strategies.

Architecture overview

┌─────────────────────────────────────────────────────────────────────────┐
│                Cross-Account Event Delivery Architecture                │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│   Account A (sender)                    Account B (receiver)            │
│   ┌─────────────────────┐              ┌─────────────────────┐          │
│   │   Application       │              │   Event Bus         │          │
│   │   ┌─────────────┐   │              │   (custom-bus)      │          │
│   │   │ PutEvents   │───┼──────────────┼──▶ ┌─────────────┐  │          │
│   │   └─────────────┘   │              │    │    Rule     │  │          │
│   │         │           │              │    └──────┬──────┘  │          │
│   │         ▼           │              │           │         │          │
│   │   ┌─────────────┐   │              │           ▼         │          │
│   │   │ Event Bus   │   │              │    ┌─────────────┐  │          │
│   │   │ (default)   │   │              │    │   Lambda    │  │          │
│   │   └─────────────┘   │              │    └─────────────┘  │          │
│   └─────────────────────┘              └─────────────────────┘          │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘

Configuring the receiver (Account B)

First, set an event bus policy in the receiving account:

# Account B: add a policy statement that allows Account A to send events
aws events put-permission \
    --event-bus-name "partner-events-bus" \
    --statement-id "AllowAccountA" \
    --action "events:PutEvents" \
    --principal "111111111111"  # Account A's account ID

Terraform cross-account configuration

# ============================================
# Account B (receiver) configuration
# ============================================

resource "aws_cloudwatch_event_bus" "partner_bus" {
  name = "partner-events-bus"
}

resource "aws_cloudwatch_event_bus_policy" "cross_account" {
  event_bus_name = aws_cloudwatch_event_bus.partner_bus.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "AllowCrossAccountPutEvents"
        Effect    = "Allow"
        Principal = {
          AWS = [
            "arn:aws:iam::111111111111:root",  # Account A
            "arn:aws:iam::222222222222:root"   # Account C
          ]
        }
        Action    = "events:PutEvents"
        Resource  = aws_cloudwatch_event_bus.partner_bus.arn
        Condition = {
          StringEquals = {
            "events:source" = ["com.partner.orders", "com.partner.inventory"]
          }
        }
      }
    ]
  })
}

# Rule that receives the events
resource "aws_cloudwatch_event_rule" "cross_account_orders" {
  name           = "cross-account-order-events"
  event_bus_name = aws_cloudwatch_event_bus.partner_bus.name
  description    = "Process order events from partner accounts"

  event_pattern = jsonencode({
    source = ["com.partner.orders"]
  })
}

resource "aws_cloudwatch_event_target" "process_partner_orders" {
  rule           = aws_cloudwatch_event_rule.cross_account_orders.name
  event_bus_name = aws_cloudwatch_event_bus.partner_bus.name
  target_id      = "PartnerOrderProcessor"
  arn            = aws_lambda_function.partner_order_processor.arn
}

Configuring the sender (Account A)

# ============================================
# Account A (sender) configuration
# ============================================

# Create the forwarding rule
resource "aws_cloudwatch_event_rule" "forward_to_partner" {
  name        = "forward-orders-to-partner"
  description = "Forward order events to partner account"

  event_pattern = jsonencode({
    source      = ["com.myapp.orders"]
    detail-type = ["OrderCreated", "OrderShipped"]
  })
}

# The target is the event bus in the other account
resource "aws_cloudwatch_event_target" "partner_bus" {
  rule      = aws_cloudwatch_event_rule.forward_to_partner.name
  target_id = "PartnerEventBus"
  arn       = "arn:aws:events:ap-northeast-1:999999999999:event-bus/partner-events-bus"
  role_arn  = aws_iam_role.eventbridge_cross_account.arn
}

# IAM role required for cross-account delivery
resource "aws_iam_role" "eventbridge_cross_account" {
  name = "eventbridge-cross-account-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "events.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      }
    ]
  })
}

resource "aws_iam_role_policy" "eventbridge_cross_account" {
  name = "eventbridge-cross-account-policy"
  role = aws_iam_role.eventbridge_cross_account.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = "events:PutEvents"
        Resource = "arn:aws:events:ap-northeast-1:999999999999:event-bus/partner-events-bus"
      }
    ]
  })
}

Simplifying setup with AWS Organizations

# Use the organization ID to simplify cross-account access
resource "aws_cloudwatch_event_bus_policy" "org_policy" {
  event_bus_name = aws_cloudwatch_event_bus.central_bus.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "AllowOrganizationAccounts"
        Effect    = "Allow"
        Principal = "*"
        Action    = "events:PutEvents"
        Resource  = aws_cloudwatch_event_bus.central_bus.arn
        Condition = {
          StringEquals = {
            "aws:PrincipalOrgID" = "o-example12345"
          }
        }
      }
    ]
  })
}

Using the Schema Registry

The Schema Registry can automatically discover and record event structures, which helps teams understand and consume events.

Enabling schema discovery

# Enable schema discovery on the event bus
aws schemas create-discoverer \
    --source-arn "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus" \
    --description "Discover order event schemas"

# List the discovered schemas
aws schemas list-schemas \
    --registry-name "discovered-schemas"

# Get the details of a specific schema
aws schemas describe-schema \
    --registry-name "discovered-schemas" \
    --schema-name "com.myapp.orders@OrderCreated"

Creating a custom schema registry

# Create a custom registry
aws schemas create-registry \
    --registry-name "order-events-registry" \
    --description "Schema registry for order events"

# Create a schema
aws schemas create-schema \
    --registry-name "order-events-registry" \
    --schema-name "OrderCreated" \
    --type "OpenApi3" \
    --content '{
        "openapi": "3.0.0",
        "info": {
            "title": "OrderCreated",
            "version": "1"
        },
        "paths": {},
        "components": {
            "schemas": {
                "OrderCreated": {
                    "type": "object",
                    "required": ["orderId", "customerId", "totalAmount"],
                    "properties": {
                        "orderId": {
                            "type": "string",
                            "description": "Unique order identifier"
                        },
                        "customerId": {
                            "type": "string",
                            "description": "Customer identifier"
                        },
                        "items": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "productId": {"type": "string"},
                                    "quantity": {"type": "integer"},
                                    "price": {"type": "number"}
                                }
                            }
                        },
                        "totalAmount": {
                            "type": "number",
                            "description": "Total order amount"
                        },
                        "createdAt": {
                            "type": "string",
                            "format": "date-time"
                        }
                    }
                }
            }
        }
    }'
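
Registering a schema documents the event shape but does not by itself reject malformed events, so producers may still want a quick check before publishing. The sketch below validates an event detail against the `required` and `properties` parts of an object schema like the one above. `validate_detail` is a hypothetical helper that covers only presence and basic types, not the full OpenAPI specification.

```python
TYPE_CHECKS = {
    "string": str,
    "number": (int, float),
    "integer": int,
    "array": list,
    "object": dict,
}

def validate_detail(detail, schema):
    """Return a list of problems found when checking detail against the schema."""
    problems = [
        f"missing required field: {name}"
        for name in schema.get("required", [])
        if name not in detail
    ]
    for name, spec in schema.get("properties", {}).items():
        expected = TYPE_CHECKS.get(spec.get("type"))
        if name in detail and expected and not isinstance(detail[name], expected):
            problems.append(f"wrong type for {name}: expected {spec['type']}")
    return problems

# Subset of the OrderCreated schema registered above
order_created_schema = {
    "type": "object",
    "required": ["orderId", "customerId", "totalAmount"],
    "properties": {
        "orderId": {"type": "string"},
        "customerId": {"type": "string"},
        "totalAmount": {"type": "number"},
    },
}

good = {"orderId": "ORD-2025-001", "customerId": "CUST-123", "totalAmount": 109.97}
bad = {"orderId": "ORD-2025-002", "totalAmount": "109.97"}
```

An empty list means the detail passed these basic checks; for stricter validation a dedicated JSON Schema or OpenAPI validator would be the better tool.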

Terraform Schema Registry configuration

resource "aws_schemas_registry" "order_events" {
  name        = "order-events-registry"
  description = "Schema registry for order processing events"

  tags = {
    Environment = "production"
  }
}

resource "aws_schemas_schema" "order_created" {
  name          = "OrderCreated"
  registry_name = aws_schemas_registry.order_events.name
  type          = "OpenApi3"
  description   = "Schema for OrderCreated events"

  content = jsonencode({
    openapi = "3.0.0"
    info = {
      title   = "OrderCreated"
      version = "1"
    }
    paths = {}
    components = {
      schemas = {
        OrderCreated = {
          type     = "object"
          required = ["orderId", "customerId", "totalAmount"]
          properties = {
            orderId = {
              type        = "string"
              description = "Unique order identifier"
            }
            customerId = {
              type        = "string"
              description = "Customer identifier"
            }
            totalAmount = {
              type        = "number"
              description = "Total order amount"
            }
          }
        }
      }
    }
  })
}

resource "aws_schemas_discoverer" "order_bus" {
  source_arn  = aws_cloudwatch_event_bus.order_bus.arn
  description = "Auto-discover schemas from order event bus"

  tags = {
    Environment = "production"
  }
}

Generating code bindings

The Schema Registry can generate type definitions (code bindings) for several programming languages:

# Download the Python code bindings
aws schemas get-code-binding-source \
    --registry-name "order-events-registry" \
    --schema-name "OrderCreated" \
    --language "Python36" \
    --schema-version "1" \
    order_created.zip

# Supported languages
# - Python36
# - TypeScript3
# - Java8
# - Go1

Using the generated types (Python example)

# Use the types generated from the Schema Registry
# (the import path depends on how the downloaded package is laid out)
from schema_registry.order_created import OrderCreated, OrderItem

def create_order_event(order_id: str, customer_id: str, items: list) -> OrderCreated:
    """
    Build an order event in a type-safe way.
    """
    order_items = [
        OrderItem(
            productId=item['product_id'],
            quantity=item['quantity'],
            price=item['price']
        )
        for item in items
    ]

    total_amount = sum(item['quantity'] * item['price'] for item in items)

    return OrderCreated(
        orderId=order_id,
        customerId=customer_id,
        items=order_items,
        totalAmount=total_amount
    )

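Monitoring and Troubleshooting

CloudWatch metrics

EventBridge automatically publishes the following metrics to CloudWatch:

Metric              Description                                 Unit
Invocations         Number of times a rule invoked a target     Count
FailedInvocations   Number of target invocations that failed    Count
TriggeredRules      Number of rules that matched an event       Count
MatchedEvents       Number of events that matched a rule        Count
ThrottledRules      Number of rules that were throttled         Count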

Creating a CloudWatch dashboard

aws cloudwatch put-dashboard \
    --dashboard-name "EventBridge-Monitoring" \
    --dashboard-body '{
        "widgets": [
            {
                "type": "metric",
                "x": 0,
                "y": 0,
                "width": 12,
                "height": 6,
                "properties": {
                    "metrics": [
                        ["AWS/Events", "Invocations", "RuleName", "order-created-rule"],
                        [".", "FailedInvocations", ".", "."]
                    ],
                    "title": "Order Rule Invocations",
                    "region": "ap-northeast-1",
                    "period": 300
                }
            },
            {
                "type": "metric",
                "x": 12,
                "y": 0,
                "width": 12,
                "height": 6,
                "properties": {
                    "metrics": [
                        ["AWS/Events", "MatchedEvents", "EventBusName", "order-processing-bus"]
                    ],
                    "title": "Matched Events",
                    "region": "ap-northeast-1",
                    "period": 300
                }
            }
        ]
    }'

CloudWatch alarms

# Failed-invocation alarm
resource "aws_cloudwatch_metric_alarm" "failed_invocations" {
  alarm_name          = "eventbridge-failed-invocations"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 2
  metric_name         = "FailedInvocations"
  namespace           = "AWS/Events"
  period              = 300
  statistic           = "Sum"
  threshold           = 5
  alarm_description   = "Alert when EventBridge target invocations fail"

  dimensions = {
    RuleName = aws_cloudwatch_event_rule.order_created.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
  ok_actions    = [aws_sns_topic.alerts.arn]

  tags = {
    Environment = "production"
  }
}

# Throttling alarm
resource "aws_cloudwatch_metric_alarm" "throttled_rules" {
  alarm_name          = "eventbridge-throttled-rules"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ThrottledRules"
  namespace           = "AWS/Events"
  period              = 60
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Alert when rules are being throttled"

  dimensions = {
    EventBusName = aws_cloudwatch_event_bus.order_bus.name
  }

  alarm_actions = [aws_sns_topic.alerts.arn]
}

Dead-Letter Queue (DLQ) Configuration

When EventBridge cannot deliver an event to a target, even after exhausting the retry policy, the failed event can be sent to a DLQ:

# SQS dead-letter queue
resource "aws_sqs_queue" "event_dlq" {
  name                      = "eventbridge-dlq"
  message_retention_seconds = 1209600  # 14 days

  tags = {
    Purpose = "EventBridge-DeadLetterQueue"
  }
}

resource "aws_sqs_queue_policy" "event_dlq" {
  queue_url = aws_sqs_queue.event_dlq.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Principal = {
          Service = "events.amazonaws.com"
        }
        Action    = "sqs:SendMessage"
        Resource  = aws_sqs_queue.event_dlq.arn
      }
    ]
  })
}

# 帶有 DLQ 的事件目標
resource "aws_cloudwatch_event_target" "with_dlq" {
  rule           = aws_cloudwatch_event_rule.order_created.name
  event_bus_name = aws_cloudwatch_event_bus.order_bus.name
  target_id      = "OrderProcessorWithDLQ"
  arn            = aws_lambda_function.order_processor.arn

  dead_letter_config {
    arn = aws_sqs_queue.event_dlq.arn
  }

  retry_policy {
    maximum_event_age_in_seconds = 86400   # 24 小時
    maximum_retry_attempts       = 185     # 最大重試次數
  }
}

Event Archive and Replay

# Create an event archive
aws events create-archive \
    --archive-name "order-events-archive" \
    --event-source-arn "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus" \
    --description "Archive for order processing events" \
    --event-pattern '{
        "source": ["com.myapp.orders"]
    }' \
    --retention-days 90

# Replay archived events
# Note: for start-replay, --event-source-arn is the ARN of the archive, not of the event bus
aws events start-replay \
    --replay-name "order-replay-20250706" \
    --event-source-arn "arn:aws:events:ap-northeast-1:123456789012:archive/order-events-archive" \
    --destination '{
        "Arn": "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus"
    }' \
    --event-start-time "2025-07-01T00:00:00Z" \
    --event-end-time "2025-07-05T23:59:59Z"

# Check the replay status
aws events describe-replay \
    --replay-name "order-replay-20250706"
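A replay sends the archived events back onto the event bus, so every matching rule fires again. EventBridge marks replayed events with a top-level replay-name field, which a consumer can use to skip side effects that should not run twice. The handler below is a minimal sketch of that check; the function names and return shapes are illustrative assumptions, not part of the original article.

```python
def is_replayed(event: dict) -> bool:
    """Return True if EventBridge delivered this event as part of a replay."""
    # Replayed events carry a top-level "replay-name" field
    return "replay-name" in event


def handler(event, context):
    """Hypothetical order handler that avoids repeating side effects on replay."""
    if is_replayed(event):
        # Skip non-idempotent work (e-mails, charges) when the event is a replay
        return {"status": "replayed", "replay": event["replay-name"]}

    order_id = event.get("detail", {}).get("orderId")
    return {"status": "processed", "order": order_id}


if __name__ == "__main__":
    print(handler({"detail": {"orderId": "ORD-2025-001"}}, None))
    print(handler({"replay-name": "order-replay-20250706", "detail": {}}, None))
```

Whether a replayed event should be skipped or reprocessed depends on the consumer; idempotent handlers can usually ignore the distinction.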

Lambda Function for Troubleshooting Failed Events

import json
import logging

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Create clients once per execution environment so warm invocations reuse them
s3 = boto3.client('s3')
eventbridge = boto3.client('events')
dynamodb = boto3.resource('dynamodb')


def dlq_processor(event, context):
    """
    Process failed events from the dead-letter queue (SQS trigger).
    """
    for record in event['Records']:
        try:
            # The message body is the original EventBridge event
            original_event = json.loads(record['body'])

            # EventBridge attaches failure details as SQS message attributes
            attributes = record.get('messageAttributes', {})
            error_message = attributes.get('ERROR_MESSAGE', {}).get(
                'stringValue', 'Unknown error')

            logger.info("Processing failed event: %s", json.dumps(original_event))
            logger.info("Error message: %s", error_message)

            # Route the event according to the failure cause
            if 'ValidationError' in error_message:
                handle_validation_error(original_event)
            elif 'ThrottlingException' in error_message:
                requeue_event(original_event)
            else:
                store_for_manual_review(original_event, error_message)

        except Exception:
            # Re-raise so the message returns to the queue for another attempt
            logger.exception("Error processing DLQ message")
            raise


def handle_validation_error(event):
    """Handle validation errors."""
    # Write the event to S3 for later analysis
    order_id = event.get('detail', {}).get('orderId', 'unknown')
    s3.put_object(
        Bucket='failed-events-bucket',
        Key=f"validation-errors/{order_id}.json",
        Body=json.dumps(event)
    )


def requeue_event(event):
    """Republish a throttled event to the event bus."""
    response = eventbridge.put_events(
        Entries=[{
            'Source': event.get('source', 'com.myapp.orders'),
            'DetailType': event.get('detail-type', 'Unknown'),
            'Detail': json.dumps(event.get('detail', {})),
            'EventBusName': 'order-processing-bus'
        }]
    )
    # put_events reports per-entry failures in the response instead of raising
    if response['FailedEntryCount'] > 0:
        raise RuntimeError(f"Requeue failed: {response['Entries'][0]}")


def store_for_manual_review(event, error):
    """Store events that need manual review."""
    table = dynamodb.Table('FailedEvents')
    table.put_item(
        Item={
            'eventId': event.get('id', 'unknown'),
            'event': json.dumps(event),
            'error': error,
            'status': 'pending_review'
        }
    )
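When EventBridge sends an event to an SQS dead-letter queue, the message body holds the original event and the failure details arrive as message attributes such as ERROR_CODE, ERROR_MESSAGE, RULE_ARN and TARGET_ARN. The sketch below pulls both out of one SQS record so the routing logic can be exercised locally. The helper name and the sample record are illustrative assumptions, and the attribute values are made up.

```python
import json


def extract_failure(record: dict) -> dict:
    """Split one SQS record from an EventBridge DLQ into event and failure metadata."""
    attributes = record.get("messageAttributes", {})

    def attr(name: str, default: str = "unknown") -> str:
        # Lambda passes each SQS message attribute as {"stringValue": ..., "dataType": ...}
        return attributes.get(name, {}).get("stringValue", default)

    return {
        "event": json.loads(record["body"]),  # the original event, unmodified
        "error_code": attr("ERROR_CODE"),
        "error_message": attr("ERROR_MESSAGE"),
        "rule_arn": attr("RULE_ARN"),
        "target_arn": attr("TARGET_ARN"),
    }


# Hand-written sample record for local testing; every value is illustrative
sample_record = {
    "body": json.dumps({
        "id": "evt-0001",
        "source": "com.myapp.orders",
        "detail-type": "OrderCreated",
        "detail": {"orderId": "ORD-2025-001"},
    }),
    "messageAttributes": {
        "ERROR_CODE": {"stringValue": "NO_PERMISSIONS", "dataType": "String"},
        "ERROR_MESSAGE": {"stringValue": "Not authorized to invoke target", "dataType": "String"},
    },
}

failure = extract_failure(sample_record)
print(failure["error_code"], failure["event"]["detail"]["orderId"])
```

Attributes missing from a record fall back to "unknown", so the helper also tolerates messages placed on the queue by hand.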

Log Analysis Queries

Use CloudWatch Logs Insights to analyze the Lambda logs:

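# Find failed event processing
fields @timestamp, @message
| filter @message like /ERROR/
| sort @timestamp desc
| limit 100

# Processing time per event type.
# Assumes the function writes JSON log lines with detail_type and duration_ms
# fields (hypothetical names); Lambda REPORT lines carry @duration but no event type.
fields @timestamp, detail_type, duration_ms
| filter ispresent(detail_type)
| stats avg(duration_ms) as avg_duration,
        max(duration_ms) as max_duration,
        count(*) as invocations
  by detail_type

# Find the processing records for a specific order
fields @timestamp, @message
| filter @message like /ORD-2025-001/
| sort @timestamp asc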
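Grouping by detail_type only works if that field appears in the log events themselves, because the REPORT lines Lambda writes contain timing data but nothing about the event type. One way to make the field queryable is to print one JSON object per processed event, since Logs Insights discovers fields in JSON log lines automatically. The following is a sketch under that assumption; the function name and the duration_ms and order_id field names are hypothetical.

```python
import json
import time
from typing import Optional


def log_event_processed(event: dict, started_at: float,
                        now: Optional[float] = None) -> str:
    """Print one JSON log line describing a processed event and return it."""
    now = time.time() if now is None else now
    line = json.dumps({
        "level": "INFO",
        "detail_type": event.get("detail-type", "Unknown"),
        "order_id": event.get("detail", {}).get("orderId"),
        # Wall-clock processing time measured by the handler, in milliseconds
        "duration_ms": round((now - started_at) * 1000, 1),
    })
    # A bare JSON line on stdout becomes one queryable log event
    print(line)
    return line


if __name__ == "__main__":
    start = time.time()
    sample = {"detail-type": "OrderCreated", "detail": {"orderId": "ORD-2025-001"}}
    log_event_processed(sample, start)
```

Calling the helper once at the end of the handler yields one log event per invocation that can be filtered and aggregated by event type.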

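Best Practices Summary

Event Design Principles

  1. Use explicit sources and types

    • source should identify the application or service that produced the event
    • detail-type should clearly describe what kind of event it is
  2. Keep events immutable

    • Once published, an event should never be modified
    • When something changes, publish a new event
  3. Include enough context

    • Events should be self-contained
    • Consumers should not need to query other systems to understand an event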
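The principles above can be sketched as a small builder that produces one PutEvents entry. The source com.myapp.orders and the bus name come from the earlier examples; the OrderCreated detail-type and the fields inside detail are illustrative assumptions about what a self-contained order event might carry.

```python
import json
from datetime import datetime, timezone


def build_order_created_entry(order_id, customer_id, total, currency,
                              occurred_at=None,
                              bus_name="order-processing-bus"):
    """Build one PutEvents entry for a hypothetical OrderCreated event."""
    occurred_at = occurred_at or datetime.now(timezone.utc)
    detail = {
        # Enough context that a consumer needs no extra lookup
        "orderId": order_id,
        "customerId": customer_id,
        "total": total,  # passed as a string to avoid float rounding
        "currency": currency,
        "occurredAt": occurred_at.isoformat(),
    }
    return {
        "Source": "com.myapp.orders",   # the producing service
        "DetailType": "OrderCreated",   # a fact that has already happened
        "Detail": json.dumps(detail),
        "EventBusName": bus_name,
    }


if __name__ == "__main__":
    entry = build_order_created_entry("ORD-2025-001", "CUST-42", "1280.00", "TWD")
    print(entry["DetailType"], entry["Detail"])
```

Because the entry describes something that already happened, a later change to the order would be published as a separate event rather than as an edit to this one.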

Performance Optimization

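import json
import logging

import boto3

logger = logging.getLogger(__name__)


# Publish events in batches to improve throughput
def batch_publish_events(events, batch_size=10):
    """Publish events to EventBridge in batches and return those that failed."""
    client = boto3.client('events')
    batch_size = min(batch_size, 10)  # PutEvents accepts at most 10 entries per call
    failed = []

    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]
        entries = [
            {
                'Source': event['source'],
                'DetailType': event['detail_type'],
                'Detail': json.dumps(event['detail']),
                'EventBusName': 'order-processing-bus'
            }
            for event in batch
        ]

        response = client.put_events(Entries=entries)

        if response['FailedEntryCount'] > 0:
            # Collect the events that failed
            failed.extend(handle_failed_events(response['Entries'], batch))

    return failed


def handle_failed_events(results, batch):
    """Log failed entries and return them; results follow the request order."""
    failed = [(e, r) for e, r in zip(batch, results) if 'ErrorCode' in r]
    for _, result in failed:
        logger.error("PutEvents failed (%s): %s",
                     result['ErrorCode'], result.get('ErrorMessage'))
    return [event for event, _ in failed]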
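PutEvents can succeed for some entries and fail for others in the same call, so a batch publisher usually needs a way to resend only the failed entries. The sketch below retries them with exponential backoff. The function name, the retry limits, and the injected client and sleep parameters are assumptions made for illustration; in practice the client would be boto3.client('events') and each call would carry at most 10 entries.

```python
import time


def put_events_with_retry(client, entries, max_attempts=3,
                          base_delay=0.5, sleep=time.sleep):
    """Send entries with put_events, retrying only the ones that failed.

    Returns the entries that still failed after the last attempt.
    """
    pending = list(entries)
    for attempt in range(max_attempts):
        response = client.put_events(Entries=pending)
        if response.get("FailedEntryCount", 0) == 0:
            return []

        # Results come back in request order; failed ones carry an ErrorCode
        pending = [
            entry
            for entry, result in zip(pending, response["Entries"])
            if "ErrorCode" in result
        ]

        if attempt < max_attempts - 1:
            # Back off exponentially: base_delay, then 2x, then 4x, ...
            sleep(base_delay * 2 ** attempt)

    return pending
```

Passing the client and the sleep function in as arguments keeps the helper testable without AWS credentials, and anything it returns can be handed to a DLQ or logged for manual follow-up.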

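Security Considerations

  1. Principle of least privilege: grant only the permissions that are actually needed
  2. Use resource policies: restrict who can send events to your event bus
  3. Encrypt data in transit: EventBridge uses TLS by default
  4. Monitor for anomalous activity: set up alarms to detect unusual event patterns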
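As an illustration of the least-privilege point, the sketch below builds an IAM policy document that lets a publisher call PutEvents on a single event bus and nothing else. The function name and the Sid are hypothetical, and the account ID in the ARN is the placeholder used throughout this article.

```python
import json


def put_events_policy(event_bus_arn: str) -> dict:
    """Build an IAM policy allowing PutEvents on exactly one event bus."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowPutEventsToOrderBus",
                "Effect": "Allow",
                "Action": "events:PutEvents",  # no other EventBridge actions
                "Resource": event_bus_arn,     # one bus, no wildcard
            }
        ],
    }


if __name__ == "__main__":
    bus_arn = ("arn:aws:events:ap-northeast-1:123456789012:"
               "event-bus/order-processing-bus")
    print(json.dumps(put_events_policy(bus_arn), indent=2))
```

The resulting document can be attached to the publishing function's execution role in place of a broad events:* grant.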

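This article has covered how AWS Lambda and EventBridge work together, from event buses and rules through monitoring and failure handling. An event-driven architecture helps you build cloud applications that are more flexible and scalable. Start with a small use case and extend to more complex scenarios step by step.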
