在現代雲端架構中,事件驅動設計模式已成為建構可擴展、鬆耦合系統的關鍵方法。AWS EventBridge 作為一個無伺服器事件匯流排服務,與 Lambda 函數的整合提供了強大的事件處理能力。本文將深入探討如何運用這兩項服務建構高效的事件驅動架構。
EventBridge 服務概述
Amazon EventBridge 是一個無伺服器事件匯流排服務,能夠輕鬆連接應用程式與來自各種來源的資料。它前身為 CloudWatch Events,但提供了更多進階功能和更好的整合能力。
核心特性
EventBridge 的主要特性包括:
- 事件匯流排(Event Bus):接收和路由事件的中央管道
- 規則(Rules):定義事件匹配模式和目標
- Schema Registry:自動發現和儲存事件結構
- 封存與重播(Archive & Replay):事件的持久化和重新處理
- API Destinations:將事件發送到外部 HTTP 端點
事件來源類型
1
2
3
4
5
6
7
8
9
10
| ┌─────────────────────────────────────────────────────────────┐
│ EventBridge 事件來源 │
├─────────────────┬─────────────────┬─────────────────────────┤
│ AWS 服務事件 │ 自訂應用程式 │ SaaS 合作夥伴 │
├─────────────────┼─────────────────┼─────────────────────────┤
│ • EC2 狀態變更 │ • PutEvents API │ • Zendesk │
│ • S3 物件操作 │ • SDK 整合 │ • Datadog │
│ • CodePipeline │ • 自訂事件格式 │ • Auth0 │
│ • CloudTrail │ │ • Shopify │
└─────────────────┴─────────────────┴─────────────────────────┘
|
事件匯流排與規則
建立自訂事件匯流排
預設情況下,每個 AWS 帳戶都有一個 default 事件匯流排。但在實務中,建議為不同用途建立自訂事件匯流排。
1
2
3
4
5
6
7
8
9
10
| # 建立自訂事件匯流排
aws events create-event-bus --name "order-processing-bus"
# 查看所有事件匯流排
aws events list-event-buses
# 建立帶有標籤的事件匯流排
aws events create-event-bus \
--name "payment-events-bus" \
--tags Key=Environment,Value=Production Key=Team,Value=Backend
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
| # EventBridge 事件匯流排
resource "aws_cloudwatch_event_bus" "order_bus" {
name = "order-processing-bus"
tags = {
Environment = "production"
Application = "order-system"
}
}
# 事件匯流排政策
resource "aws_cloudwatch_event_bus_policy" "order_bus_policy" {
event_bus_name = aws_cloudwatch_event_bus.order_bus.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowAccountToPutEvents"
Effect = "Allow"
Principal = {
AWS = "arn:aws:iam::123456789012:root"
}
Action = "events:PutEvents"
Resource = aws_cloudwatch_event_bus.order_bus.arn
}
]
})
}
|
建立事件規則
事件規則定義了哪些事件應該被路由到哪些目標。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| # 建立規則
aws events put-rule \
--name "order-created-rule" \
--event-bus-name "order-processing-bus" \
--event-pattern '{
"source": ["com.myapp.orders"],
"detail-type": ["OrderCreated"]
}' \
--state ENABLED \
--description "處理新訂單建立事件"
# 將 Lambda 函數設為目標
aws events put-targets \
--rule "order-created-rule" \
--event-bus-name "order-processing-bus" \
--targets '[{
"Id": "OrderProcessorLambda",
"Arn": "arn:aws:lambda:ap-northeast-1:123456789012:function:order-processor",
"Input": "{\"processor\": \"order-handler\"}"
}]'
|
事件模式匹配
EventBridge 提供了強大的事件模式匹配功能,讓您能夠精確地篩選需要處理的事件。
基本模式匹配
1
2
3
4
5
6
7
| {
"source": ["com.myapp.orders"],
"detail-type": ["OrderCreated", "OrderUpdated"],
"detail": {
"status": ["pending", "processing"]
}
}
|
進階模式匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| {
"source": ["com.myapp.orders"],
"detail": {
"amount": [{
"numeric": [">", 1000]
}],
"region": [{
"prefix": "ap-"
}],
"customer": {
"tier": ["premium", "enterprise"]
},
"items": {
"category": [{
"anything-but": ["digital"]
}]
}
}
}
|
模式匹配運算子
| 運算子 | 用途 | 範例 |
|---|
prefix | 字串前綴匹配 | {"prefix": "order-"} |
suffix | 字串後綴匹配 | {"suffix": ".json"} |
anything-but | 排除特定值 | {"anything-but": ["test"]} |
numeric | 數值比較 | {"numeric": [">=", 100]} |
cidr | IP 範圍匹配 | {"cidr": "10.0.0.0/24"} |
exists | 欄位存在檢查 | {"exists": true} |
複合模式範例
1
2
3
4
5
6
7
8
9
10
11
12
13
| {
"source": ["aws.ec2"],
"detail-type": ["EC2 Instance State-change Notification"],
"detail": {
"state": ["stopped", "terminated"],
"instance-id": [{
"prefix": "i-prod-"
}]
},
"region": [{
"anything-but": "us-east-1"
}]
}
|
Lambda 函數整合
Lambda 函數範例
以下是一個處理 EventBridge 事件的 Lambda 函數範例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
| import json
import logging
import boto3
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
def lambda_handler(event, context):
"""
處理 EventBridge 傳入的訂單事件
"""
logger.info(f"Received event: {json.dumps(event)}")
# 解析事件
detail_type = event.get('detail-type')
source = event.get('source')
detail = event.get('detail', {})
event_time = event.get('time')
# 根據事件類型處理
if detail_type == 'OrderCreated':
return process_order_created(detail)
elif detail_type == 'OrderUpdated':
return process_order_updated(detail)
elif detail_type == 'OrderCancelled':
return process_order_cancelled(detail)
else:
logger.warning(f"Unknown event type: {detail_type}")
return {
'statusCode': 400,
'body': json.dumps({'error': f'Unknown event type: {detail_type}'})
}
def process_order_created(detail):
"""處理新訂單建立"""
order_id = detail.get('orderId')
customer_id = detail.get('customerId')
items = detail.get('items', [])
total_amount = detail.get('totalAmount')
# 儲存訂單到 DynamoDB
table = dynamodb.Table('Orders')
table.put_item(
Item={
'orderId': order_id,
'customerId': customer_id,
'items': items,
'totalAmount': str(total_amount),
'status': 'pending',
'createdAt': datetime.utcnow().isoformat()
}
)
# 發送通知
sns.publish(
TopicArn='arn:aws:sns:ap-northeast-1:123456789012:order-notifications',
Message=json.dumps({
'orderId': order_id,
'message': f'New order created with amount: ${total_amount}'
}),
Subject='New Order Created'
)
logger.info(f"Order {order_id} processed successfully")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Order processed successfully',
'orderId': order_id
})
}
def process_order_updated(detail):
"""處理訂單更新"""
order_id = detail.get('orderId')
updates = detail.get('updates', {})
table = dynamodb.Table('Orders')
update_expression = "SET " + ", ".join([f"#{k} = :{k}" for k in updates.keys()])
expression_names = {f"#{k}": k for k in updates.keys()}
expression_values = {f":{k}": v for k, v in updates.items()}
table.update_item(
Key={'orderId': order_id},
UpdateExpression=update_expression,
ExpressionAttributeNames=expression_names,
ExpressionAttributeValues=expression_values
)
return {
'statusCode': 200,
'body': json.dumps({'message': f'Order {order_id} updated'})
}
def process_order_cancelled(detail):
"""處理訂單取消"""
order_id = detail.get('orderId')
reason = detail.get('reason', 'No reason provided')
table = dynamodb.Table('Orders')
table.update_item(
Key={'orderId': order_id},
UpdateExpression='SET #status = :status, #cancelReason = :reason, #cancelledAt = :time',
ExpressionAttributeNames={
'#status': 'status',
'#cancelReason': 'cancelReason',
'#cancelledAt': 'cancelledAt'
},
ExpressionAttributeValues={
':status': 'cancelled',
':reason': reason,
':time': datetime.utcnow().isoformat()
}
)
return {
'statusCode': 200,
'body': json.dumps({'message': f'Order {order_id} cancelled'})
}
|
Lambda 權限設定
Lambda 函數需要適當的權限才能被 EventBridge 調用:
1
2
3
4
5
6
7
| # 授予 EventBridge 調用 Lambda 的權限
aws lambda add-permission \
--function-name order-processor \
--statement-id EventBridgeInvoke \
--action lambda:InvokeFunction \
--principal events.amazonaws.com \
--source-arn "arn:aws:events:ap-northeast-1:123456789012:rule/order-processing-bus/order-created-rule"
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
| # Lambda 函數
resource "aws_lambda_function" "order_processor" {
filename = "order_processor.zip"
function_name = "order-processor"
role = aws_iam_role.lambda_role.arn
handler = "handler.lambda_handler"
runtime = "python3.11"
timeout = 30
memory_size = 256
environment {
variables = {
ORDERS_TABLE = aws_dynamodb_table.orders.name
SNS_TOPIC = aws_sns_topic.order_notifications.arn
}
}
tags = {
Environment = "production"
}
}
# IAM 角色
resource "aws_iam_role" "lambda_role" {
name = "order-processor-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
# Lambda 執行政策
resource "aws_iam_role_policy" "lambda_policy" {
name = "order-processor-policy"
role = aws_iam_role.lambda_role.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "arn:aws:logs:*:*:*"
},
{
Effect = "Allow"
Action = [
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:GetItem"
]
Resource = aws_dynamodb_table.orders.arn
},
{
Effect = "Allow"
Action = "sns:Publish"
Resource = aws_sns_topic.order_notifications.arn
}
]
})
}
# EventBridge 規則
resource "aws_cloudwatch_event_rule" "order_created" {
name = "order-created-rule"
event_bus_name = aws_cloudwatch_event_bus.order_bus.name
description = "Capture order created events"
event_pattern = jsonencode({
source = ["com.myapp.orders"]
detail-type = ["OrderCreated"]
})
}
# EventBridge 目標
resource "aws_cloudwatch_event_target" "order_processor" {
rule = aws_cloudwatch_event_rule.order_created.name
event_bus_name = aws_cloudwatch_event_bus.order_bus.name
target_id = "OrderProcessorLambda"
arn = aws_lambda_function.order_processor.arn
}
# Lambda 權限
resource "aws_lambda_permission" "eventbridge_invoke" {
statement_id = "AllowEventBridgeInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.order_processor.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.order_created.arn
}
|
發送自訂事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| import boto3
import json
from datetime import datetime
def publish_order_event(order_data, event_type):
"""
發布訂單事件到 EventBridge
"""
client = boto3.client('events')
response = client.put_events(
Entries=[
{
'Time': datetime.utcnow(),
'Source': 'com.myapp.orders',
'DetailType': event_type,
'Detail': json.dumps(order_data),
'EventBusName': 'order-processing-bus',
'TraceHeader': 'Root=1-5f84c7a1-sample-trace-id'
}
]
)
failed_count = response.get('FailedEntryCount', 0)
if failed_count > 0:
print(f"Failed to publish {failed_count} events")
for entry in response.get('Entries', []):
if 'ErrorCode' in entry:
print(f"Error: {entry['ErrorCode']} - {entry['ErrorMessage']}")
return response
# 使用範例
order_data = {
'orderId': 'ORD-2025-001',
'customerId': 'CUST-123',
'items': [
{'productId': 'PROD-001', 'quantity': 2, 'price': 29.99},
{'productId': 'PROD-002', 'quantity': 1, 'price': 49.99}
],
'totalAmount': 109.97,
'shippingAddress': {
'city': 'Taipei',
'country': 'Taiwan'
}
}
publish_order_event(order_data, 'OrderCreated')
|
排程規則設定
EventBridge 支援使用 cron 或 rate 表達式建立排程規則,可用於定期觸發 Lambda 函數。
Rate 表達式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| # 每 5 分鐘執行一次
aws events put-rule \
--name "every-5-minutes" \
--schedule-expression "rate(5 minutes)" \
--state ENABLED
# 每小時執行一次
aws events put-rule \
--name "hourly-task" \
--schedule-expression "rate(1 hour)" \
--state ENABLED
# 每天執行一次
aws events put-rule \
--name "daily-cleanup" \
--schedule-expression "rate(1 day)" \
--state ENABLED
|
Cron 表達式
EventBridge cron 表達式格式:cron(分鐘 小時 日期 月份 星期 年份)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| # 每天早上 9 點(UTC)執行
aws events put-rule \
--name "daily-morning-report" \
--schedule-expression "cron(0 9 * * ? *)" \
--state ENABLED
# 每週一早上 8 點執行
aws events put-rule \
--name "weekly-monday-task" \
--schedule-expression "cron(0 8 ? * MON *)" \
--state ENABLED
# 每月第一天午夜執行
aws events put-rule \
--name "monthly-first-day" \
--schedule-expression "cron(0 0 1 * ? *)" \
--state ENABLED
# 每週一到週五的上午 9 點到下午 5 點,每 15 分鐘執行
aws events put-rule \
--name "business-hours-check" \
--schedule-expression "cron(0/15 9-17 ? * MON-FRI *)" \
--state ENABLED
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
| # 定期清理任務
resource "aws_cloudwatch_event_rule" "cleanup_schedule" {
name = "database-cleanup-schedule"
description = "Trigger database cleanup every night at 2 AM UTC"
schedule_expression = "cron(0 2 * * ? *)"
tags = {
Purpose = "DatabaseMaintenance"
}
}
resource "aws_cloudwatch_event_target" "cleanup_lambda" {
rule = aws_cloudwatch_event_rule.cleanup_schedule.name
target_id = "DatabaseCleanupLambda"
arn = aws_lambda_function.cleanup_function.arn
# 可選:傳入額外參數
input = jsonencode({
action = "cleanup"
retention = 30
table_name = "logs"
})
}
# 帶有重試設定的排程
resource "aws_cloudwatch_event_rule" "report_schedule" {
name = "weekly-report-schedule"
description = "Generate weekly report every Monday at 8 AM"
schedule_expression = "cron(0 8 ? * MON *)"
}
resource "aws_cloudwatch_event_target" "report_lambda" {
rule = aws_cloudwatch_event_rule.report_schedule.name
target_id = "WeeklyReportLambda"
arn = aws_lambda_function.report_function.arn
retry_policy {
maximum_event_age_in_seconds = 3600
maximum_retry_attempts = 3
}
dead_letter_config {
arn = aws_sqs_queue.dlq.arn
}
}
|
排程任務 Lambda 函數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
| import json
import logging
import boto3
from datetime import datetime, timedelta
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def cleanup_handler(event, context):
"""
定期清理過期資料
"""
logger.info(f"Cleanup triggered at {datetime.utcnow().isoformat()}")
# 從事件中取得參數
retention_days = event.get('retention', 30)
table_name = event.get('table_name', 'logs')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
# 計算過期時間
expiry_date = datetime.utcnow() - timedelta(days=retention_days)
expiry_timestamp = expiry_date.isoformat()
# 掃描並刪除過期項目
deleted_count = 0
scan_kwargs = {
'FilterExpression': '#ts < :expiry',
'ExpressionAttributeNames': {'#ts': 'timestamp'},
'ExpressionAttributeValues': {':expiry': expiry_timestamp}
}
while True:
response = table.scan(**scan_kwargs)
items = response.get('Items', [])
with table.batch_writer() as batch:
for item in items:
batch.delete_item(Key={'id': item['id']})
deleted_count += 1
# 檢查是否還有更多項目
if 'LastEvaluatedKey' not in response:
break
scan_kwargs['ExclusiveStartKey'] = response['LastEvaluatedKey']
logger.info(f"Cleanup completed. Deleted {deleted_count} items.")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Cleanup completed',
'deletedCount': deleted_count,
'timestamp': datetime.utcnow().isoformat()
})
}
|
跨帳戶事件傳遞
EventBridge 支援在不同 AWS 帳戶之間傳遞事件,這對於微服務架構和多帳戶策略非常重要。
架構概述
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| ┌─────────────────────────────────────────────────────────────────────────┐
│ 跨帳戶事件傳遞架構 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ Account A (發送者) Account B (接收者) │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Application │ │ Event Bus │ │
│ │ ┌─────────────┐ │ │ (custom-bus) │ │
│ │ │ PutEvents │───┼──────────────┼──▶ ┌─────────────┐ │ │
│ │ └─────────────┘ │ │ │ Rule │ │ │
│ │ │ │ │ └──────┬──────┘ │ │
│ │ ▼ │ │ │ │ │
│ │ ┌─────────────┐ │ │ ▼ │ │
│ │ │ Event Bus │ │ │ ┌─────────────┐ │ │
│ │ │ (default) │ │ │ │ Lambda │ │ │
│ │ └─────────────┘ │ │ └─────────────┘ │ │
│ └─────────────────────┘ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
|
設定接收端(Account B)
首先,在接收帳戶中設定事件匯流排政策:
1
2
3
4
5
6
| # Account B: 建立允許 Account A 發送事件的政策
aws events put-permission \
--event-bus-name "partner-events-bus" \
--statement-id "AllowAccountA" \
--action "events:PutEvents" \
--principal "111111111111" # Account A 的帳戶 ID
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| # ============================================
# Account B(接收端)設定
# ============================================
resource "aws_cloudwatch_event_bus" "partner_bus" {
name = "partner-events-bus"
}
resource "aws_cloudwatch_event_bus_policy" "cross_account" {
event_bus_name = aws_cloudwatch_event_bus.partner_bus.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowCrossAccountPutEvents"
Effect = "Allow"
Principal = {
AWS = [
"arn:aws:iam::111111111111:root", # Account A
"arn:aws:iam::222222222222:root" # Account C
]
}
Action = "events:PutEvents"
Resource = aws_cloudwatch_event_bus.partner_bus.arn
Condition = {
StringEquals = {
"events:source" = ["com.partner.orders", "com.partner.inventory"]
}
}
}
]
})
}
# 接收事件的規則
resource "aws_cloudwatch_event_rule" "cross_account_orders" {
name = "cross-account-order-events"
event_bus_name = aws_cloudwatch_event_bus.partner_bus.name
description = "Process order events from partner accounts"
event_pattern = jsonencode({
source = ["com.partner.orders"]
})
}
resource "aws_cloudwatch_event_target" "process_partner_orders" {
rule = aws_cloudwatch_event_rule.cross_account_orders.name
event_bus_name = aws_cloudwatch_event_bus.partner_bus.name
target_id = "PartnerOrderProcessor"
arn = aws_lambda_function.partner_order_processor.arn
}
|
設定發送端(Account A)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
| # ============================================
# Account A(發送端)設定
# ============================================
# 建立轉發規則
resource "aws_cloudwatch_event_rule" "forward_to_partner" {
name = "forward-orders-to-partner"
description = "Forward order events to partner account"
event_pattern = jsonencode({
source = ["com.myapp.orders"]
detail-type = ["OrderCreated", "OrderShipped"]
})
}
# 目標為其他帳戶的事件匯流排
resource "aws_cloudwatch_event_target" "partner_bus" {
rule = aws_cloudwatch_event_rule.forward_to_partner.name
target_id = "PartnerEventBus"
arn = "arn:aws:events:ap-northeast-1:999999999999:event-bus/partner-events-bus"
role_arn = aws_iam_role.eventbridge_cross_account.arn
}
# 跨帳戶發送所需的 IAM 角色
resource "aws_iam_role" "eventbridge_cross_account" {
name = "eventbridge-cross-account-role"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
Service = "events.amazonaws.com"
}
Action = "sts:AssumeRole"
}
]
})
}
resource "aws_iam_role_policy" "eventbridge_cross_account" {
name = "eventbridge-cross-account-policy"
role = aws_iam_role.eventbridge_cross_account.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = "events:PutEvents"
Resource = "arn:aws:events:ap-northeast-1:999999999999:event-bus/partner-events-bus"
}
]
})
}
|
使用 AWS Organizations 簡化設定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| # 使用 Organization ID 簡化跨帳戶存取
resource "aws_cloudwatch_event_bus_policy" "org_policy" {
event_bus_name = aws_cloudwatch_event_bus.central_bus.name
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "AllowOrganizationAccounts"
Effect = "Allow"
Principal = "*"
Action = "events:PutEvents"
Resource = aws_cloudwatch_event_bus.central_bus.arn
Condition = {
StringEquals = {
"aws:PrincipalOrgID" = "o-example12345"
}
}
}
]
})
}
|
Schema Registry 使用
Schema Registry 可自動發現和記錄事件結構,幫助團隊理解和使用事件。
啟用 Schema Discovery
1
2
3
4
5
6
7
8
9
10
11
12
13
| # 在事件匯流排上啟用 Schema Discovery
aws schemas create-discoverer \
--source-arn "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus" \
--description "Discover order event schemas"
# 查看發現的 schemas
aws schemas list-schemas \
--registry-name "discovered-schemas"
# 取得特定 schema 詳情
aws schemas describe-schema \
--registry-name "discovered-schemas" \
--schema-name "com.myapp.orders@OrderCreated"
|
建立自訂 Schema Registry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| # 建立自訂 registry
aws schemas create-registry \
--registry-name "order-events-registry" \
--description "Schema registry for order events"
# 建立 schema
aws schemas create-schema \
--registry-name "order-events-registry" \
--schema-name "OrderCreated" \
--type "OpenApi3" \
--content '{
"openapi": "3.0.0",
"info": {
"title": "OrderCreated",
"version": "1"
},
"paths": {},
"components": {
"schemas": {
"OrderCreated": {
"type": "object",
"required": ["orderId", "customerId", "totalAmount"],
"properties": {
"orderId": {
"type": "string",
"description": "Unique order identifier"
},
"customerId": {
"type": "string",
"description": "Customer identifier"
},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"productId": {"type": "string"},
"quantity": {"type": "integer"},
"price": {"type": "number"}
}
}
},
"totalAmount": {
"type": "number",
"description": "Total order amount"
},
"createdAt": {
"type": "string",
"format": "date-time"
}
}
}
}
}
}'
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| resource "aws_schemas_registry" "order_events" {
name = "order-events-registry"
description = "Schema registry for order processing events"
tags = {
Environment = "production"
}
}
resource "aws_schemas_schema" "order_created" {
name = "OrderCreated"
registry_name = aws_schemas_registry.order_events.name
type = "OpenApi3"
description = "Schema for OrderCreated events"
content = jsonencode({
openapi = "3.0.0"
info = {
title = "OrderCreated"
version = "1"
}
paths = {}
components = {
schemas = {
OrderCreated = {
type = "object"
required = ["orderId", "customerId", "totalAmount"]
properties = {
orderId = {
type = "string"
description = "Unique order identifier"
}
customerId = {
type = "string"
description = "Customer identifier"
}
totalAmount = {
type = "number"
description = "Total order amount"
}
}
}
}
}
})
}
resource "aws_schemas_discoverer" "order_bus" {
source_arn = aws_cloudwatch_event_bus.order_bus.arn
description = "Auto-discover schemas from order event bus"
tags = {
Environment = "production"
}
}
|
產生程式碼綁定
Schema Registry 可以自動產生各種程式語言的類型定義:
1
2
3
4
5
6
7
8
9
10
11
12
13
| # 下載 Python 程式碼綁定
aws schemas get-code-binding-source \
--registry-name "order-events-registry" \
--schema-name "OrderCreated" \
--language "Python36" \
--schema-version "1" \
order_created.zip
# 支援的語言
# - Python36
# - TypeScript3
# - Java8
# - Go1
|
使用產生的類型(Python 範例)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| # 使用從 Schema Registry 產生的類型
from schema_registry.order_created import OrderCreated, OrderItem
def create_order_event(order_id: str, customer_id: str, items: list) -> OrderCreated:
"""
使用類型安全的方式建立訂單事件
"""
order_items = [
OrderItem(
productId=item['product_id'],
quantity=item['quantity'],
price=item['price']
)
for item in items
]
total_amount = sum(item['quantity'] * item['price'] for item in items)
return OrderCreated(
orderId=order_id,
customerId=customer_id,
items=order_items,
totalAmount=total_amount
)
|
監控與故障排除
CloudWatch 指標
EventBridge 自動發布以下指標到 CloudWatch:
| 指標名稱 | 描述 | 單位 |
|---|
Invocations | 規則觸發次數 | Count |
FailedInvocations | 目標調用失敗次數 | Count |
TriggeredRules | 匹配事件的規則數 | Count |
MatchedEvents | 匹配規則的事件數 | Count |
ThrottledRules | 被限流的規則數 | Count |
建立 CloudWatch 儀表板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| aws cloudwatch put-dashboard \
--dashboard-name "EventBridge-Monitoring" \
--dashboard-body '{
"widgets": [
{
"type": "metric",
"x": 0,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
["AWS/Events", "Invocations", "RuleName", "order-created-rule"],
[".", "FailedInvocations", ".", "."]
],
"title": "Order Rule Invocations",
"region": "ap-northeast-1",
"period": 300
}
},
{
"type": "metric",
"x": 12,
"y": 0,
"width": 12,
"height": 6,
"properties": {
"metrics": [
["AWS/Events", "MatchedEvents", "EventBusName", "order-processing-bus"]
],
"title": "Matched Events",
"region": "ap-northeast-1",
"period": 300
}
}
]
}'
|
CloudWatch 告警設定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| # 失敗調用告警
resource "aws_cloudwatch_metric_alarm" "failed_invocations" {
alarm_name = "eventbridge-failed-invocations"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 2
metric_name = "FailedInvocations"
namespace = "AWS/Events"
period = 300
statistic = "Sum"
threshold = 5
alarm_description = "Alert when EventBridge target invocations fail"
dimensions = {
RuleName = aws_cloudwatch_event_rule.order_created.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
ok_actions = [aws_sns_topic.alerts.arn]
tags = {
Environment = "production"
}
}
# 限流告警
resource "aws_cloudwatch_metric_alarm" "throttled_rules" {
alarm_name = "eventbridge-throttled-rules"
comparison_operator = "GreaterThanThreshold"
evaluation_periods = 1
metric_name = "ThrottledRules"
namespace = "AWS/Events"
period = 60
statistic = "Sum"
threshold = 0
alarm_description = "Alert when rules are being throttled"
dimensions = {
EventBusName = aws_cloudwatch_event_bus.order_bus.name
}
alarm_actions = [aws_sns_topic.alerts.arn]
}
|
死信佇列(DLQ)設定
當目標無法成功處理事件時,可以將失敗的事件發送到 DLQ:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| # SQS 死信佇列
resource "aws_sqs_queue" "event_dlq" {
name = "eventbridge-dlq"
message_retention_seconds = 1209600 # 14 天
tags = {
Purpose = "EventBridge-DeadLetterQueue"
}
}
resource "aws_sqs_queue_policy" "event_dlq" {
queue_url = aws_sqs_queue.event_dlq.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
Service = "events.amazonaws.com"
}
Action = "sqs:SendMessage"
Resource = aws_sqs_queue.event_dlq.arn
}
]
})
}
# 帶有 DLQ 的事件目標
resource "aws_cloudwatch_event_target" "with_dlq" {
rule = aws_cloudwatch_event_rule.order_created.name
event_bus_name = aws_cloudwatch_event_bus.order_bus.name
target_id = "OrderProcessorWithDLQ"
arn = aws_lambda_function.order_processor.arn
dead_letter_config {
arn = aws_sqs_queue.event_dlq.arn
}
retry_policy {
maximum_event_age_in_seconds = 86400 # 24 小時
maximum_retry_attempts = 185 # 最大重試次數
}
}
|
事件封存與重播
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| # 建立事件封存
aws events create-archive \
--archive-name "order-events-archive" \
--event-source-arn "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus" \
--description "Archive for order processing events" \
--event-pattern '{
"source": ["com.myapp.orders"]
}' \
--retention-days 90
# 重播封存的事件
aws events start-replay \
--replay-name "order-replay-20250706" \
--event-source-arn "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus" \
--destination '{
"Arn": "arn:aws:events:ap-northeast-1:123456789012:event-bus/order-processing-bus"
}' \
--event-start-time "2025-07-01T00:00:00Z" \
--event-end-time "2025-07-05T23:59:59Z"
# 查看重播狀態
aws events describe-replay \
--replay-name "order-replay-20250706"
|
故障排除 Lambda 函數
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
| import json
import logging
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def dlq_processor(event, context):
"""
處理死信佇列中的失敗事件
"""
for record in event['Records']:
try:
# 解析原始事件
body = json.loads(record['body'])
original_event = body.get('detail', {})
error_message = body.get('ErrorMessage', 'Unknown error')
logger.info(f"Processing failed event: {json.dumps(original_event)}")
logger.info(f"Error message: {error_message}")
# 分析失敗原因
if 'ValidationError' in error_message:
handle_validation_error(original_event)
elif 'ThrottlingException' in error_message:
requeue_event(original_event)
else:
store_for_manual_review(original_event, error_message)
except Exception as e:
logger.error(f"Error processing DLQ message: {str(e)}")
raise
def handle_validation_error(event):
"""處理驗證錯誤"""
# 記錄到 S3 供後續分析
s3 = boto3.client('s3')
s3.put_object(
Bucket='failed-events-bucket',
Key=f"validation-errors/{event.get('orderId', 'unknown')}.json",
Body=json.dumps(event)
)
def requeue_event(event):
"""重新排隊被限流的事件"""
eventbridge = boto3.client('events')
eventbridge.put_events(
Entries=[{
'Source': event.get('source', 'com.myapp.orders'),
'DetailType': event.get('detail-type', 'Unknown'),
'Detail': json.dumps(event.get('detail', {})),
'EventBusName': 'order-processing-bus'
}]
)
def store_for_manual_review(event, error):
"""儲存需要人工審查的事件"""
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('FailedEvents')
table.put_item(
Item={
'eventId': event.get('id', 'unknown'),
'event': json.dumps(event),
'error': error,
'status': 'pending_review'
}
)
|
日誌分析查詢
使用 CloudWatch Logs Insights 分析 Lambda 日誌:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| -- 查詢失敗的事件處理
fields @timestamp, @message
| filter @message like /ERROR/
| sort @timestamp desc
| limit 100
-- 統計各事件類型的處理時間
fields @timestamp, detail_type, @duration
| filter @type = "REPORT"
| stats avg(@duration) as avg_duration,
max(@duration) as max_duration,
count(*) as invocations
by detail_type
-- 查詢特定訂單的處理記錄
fields @timestamp, @message
| filter @message like /ORD-2025-001/
| sort @timestamp asc
|
最佳實踐總結
事件設計原則
使用明確的來源和類型
source 應反映產生事件的應用程式或服務detail-type 應清楚描述事件的類型
保持事件不可變
- 事件一旦發布就不應該被修改
- 需要更新時應發布新的事件
包含足夠的上下文
- 事件應該是自包含的
- 消費者不應該需要查詢其他系統來理解事件
效能最佳化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| # 批次發送事件以提高效能
def batch_publish_events(events, batch_size=10):
"""批次發布事件到 EventBridge"""
client = boto3.client('events')
for i in range(0, len(events), batch_size):
batch = events[i:i + batch_size]
entries = [
{
'Source': event['source'],
'DetailType': event['detail_type'],
'Detail': json.dumps(event['detail']),
'EventBusName': 'order-processing-bus'
}
for event in batch
]
response = client.put_events(Entries=entries)
if response['FailedEntryCount'] > 0:
# 處理失敗的事件
handle_failed_events(response['Entries'], batch)
|
安全性考量
- 最小權限原則:僅授予必要的權限
- 使用資源政策:限制誰可以發送事件到您的事件匯流排
- 加密傳輸中的資料:EventBridge 預設使用 TLS
- 監控異常活動:設定告警以偵測異常的事件模式
透過本文的介紹,您應該對 AWS Lambda 與 EventBridge 的整合有了全面的了解。事件驅動架構能夠幫助您建構更加靈活、可擴展的雲端應用程式。建議從小規模的用例開始實踐,逐步擴展到更複雜的場景。