AWS Lambda and Step Functions Workflow Orchestration

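Serverless computing has become a mainstream choice in modern cloud architectures. AWS Lambda provides function-as-a-service (FaaS) compute, and AWS Step Functions lets you orchestrate multiple Lambda functions into complex workflows. This article looks at how to combine the two services to build reliable, scalable applications.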

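Step Functions Overview

AWS Step Functions is a fully managed workflow orchestration service that lets you coordinate the components of distributed applications and microservices using visual workflows. Its core advantages are:

Key Features

  • Visual workflows: express application logic clearly through Workflow Studio or a JSON definition
  • Built-in error handling: automatic retries, exception catching, and fallback paths
  • State tracking: every execution keeps a complete history
  • AWS service integration: native support for more than 200 AWS services

Workflow Types

Step Functions offers two workflow types:

| Type | Characteristics | Use cases |
|------|-----------------|-----------|
| Standard Workflows | Up to 1 year of execution time, exactly-once execution | Long-running workflows, workflows that need an audit trail |
| Express Workflows | Up to 5 minutes of execution time, high throughput | High-frequency event processing, streaming data processing |

# Create a state machine with the AWS CLI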
aws stepfunctions create-state-machine \
    --name "OrderProcessingWorkflow" \
    --definition file://state-machine.json \
    --role-arn arn:aws:iam::123456789012:role/StepFunctionsExecutionRole \
    --type STANDARD

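State Machine Definition Language (ASL)

Amazon States Language (ASL) is the JSON-based language used to define Step Functions state machines. It describes each state in a workflow and the transitions between them.

Basic Structure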

{
  "Comment": "Order processing workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ValidateOrder",
      "Next": "ProcessPayment",
      "Catch": [
        {
          "ErrorEquals": ["ValidationError"],
          "Next": "OrderFailed"
        }
      ]
    },
    "ProcessPayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessPayment",
      "Next": "ShipOrder"
    },
    "ShipOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ShipOrder",
      "End": true
    },
    "OrderFailed": {
      "Type": "Fail",
      "Error": "OrderProcessingFailed",
      "Cause": "Order validation failed"
    }
  }
}
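
Every transition in ASL is a plain string that names another state, so a typo in a Next field only shows up when the definition is rejected or an execution fails. The sketch below is one way to check those references locally before deploying. It is an illustration rather than an AWS tool: the function name find_dangling_targets is hypothetical, and it only inspects StartAt, Next, Default, and the Next field of Choices and Catch entries at the top level of a definition.

```python
def find_dangling_targets(definition):
    """Return sorted transition targets that do not name a state in the definition."""
    states = definition.get("States", {})
    targets = {definition.get("StartAt")}
    for state in states.values():
        targets.add(state.get("Next"))
        targets.add(state.get("Default"))
        # Choice rules and Catch entries each carry their own Next target
        for rule in state.get("Choices", []) + state.get("Catch", []):
            targets.add(rule.get("Next"))
    targets.discard(None)
    return sorted(targets - set(states))


# Trimmed copy of the order workflow (Resource ARNs omitted for brevity)
definition = {
    "StartAt": "ValidateOrder",
    "States": {
        "ValidateOrder": {
            "Type": "Task",
            "Next": "ProcessPayment",
            "Catch": [{"ErrorEquals": ["ValidationError"], "Next": "OrderFailed"}],
        },
        "ProcessPayment": {"Type": "Task", "Next": "ShipOrder"},
        "ShipOrder": {"Type": "Task", "End": True},
        "OrderFailed": {"Type": "Fail"},
    },
}

print(find_dangling_targets(definition))  # → []
```

An empty list means every referenced state exists. Nested definitions inside Parallel branches or a Map ItemProcessor would need the same check applied recursively.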

Input and Output Processing

ASL provides fine-grained control over how data flows through a state:

{
  "Type": "Task",
  "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessData",
  "InputPath": "$.orderDetails",
  "ResultPath": "$.processingResult",
  "OutputPath": "$.processingResult",
  "Parameters": {
    "orderId.$": "$.orderId",
    "items.$": "$.items",
    "timestamp.$": "$$.State.EnteredTime"
  },
  "ResultSelector": {
    "statusCode.$": "$.statusCode",
    "processedAt.$": "$.timestamp"
  },
  "Next": "NextState"
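}

  • InputPath: selects a subset of the state input
  • Parameters: builds the JSON passed to the task
  • ResultSelector: selects data from the task result
  • ResultPath: decides where the result is placed in the state input
  • OutputPath: selects a subset of the state output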
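
The order in which these five fields apply is easier to see when traced by hand. The following sketch simulates that order in plain Python. It is a simplified model under stated assumptions, not the Step Functions implementation: the helper names (get_path, set_path, build_payload, run_task_state) are hypothetical, only simple dotted paths such as $.a.b are supported, intrinsic functions are not handled, and fake_task stands in for the Lambda function.

```python
import copy


def get_path(doc, path):
    """Resolve a simple path such as '$' or '$.a.b' (no wildcards or filters)."""
    node = doc
    for key in path.split(".")[1:]:
        node = node[key]
    return node


def set_path(doc, path, value):
    """Return a copy of doc with value written at path; '$' replaces doc entirely."""
    keys = path.split(".")[1:]
    if not keys:
        return value
    result = copy.deepcopy(doc)
    node = result
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return result


def build_payload(template, doc, context):
    """Copy static fields; resolve '.$' keys against doc, or context for '$$' paths."""
    payload = {}
    for key, value in template.items():
        if key.endswith(".$"):
            source, path = (context, value[1:]) if value.startswith("$$") else (doc, value)
            payload[key[:-2]] = get_path(source, path)
        else:
            payload[key] = value
    return payload


def run_task_state(state, state_input, context, task):
    """Apply InputPath, Parameters, ResultSelector, ResultPath, OutputPath in order."""
    effective = get_path(state_input, state.get("InputPath", "$"))
    if "Parameters" in state:
        effective = build_payload(state["Parameters"], effective, context)
    result = task(effective)
    if "ResultSelector" in state:
        result = build_payload(state["ResultSelector"], result, context)
    # ResultPath writes into the original state input, not the InputPath selection
    combined = set_path(state_input, state.get("ResultPath", "$"), result)
    return get_path(combined, state.get("OutputPath", "$"))


state = {
    "InputPath": "$.orderDetails",
    "Parameters": {"orderId.$": "$.orderId", "timestamp.$": "$$.State.EnteredTime"},
    "ResultSelector": {"statusCode.$": "$.statusCode", "processedAt.$": "$.timestamp"},
    "ResultPath": "$.processingResult",
    "OutputPath": "$.processingResult",
}
state_input = {"orderDetails": {"orderId": "A-1", "items": ["pen"]}, "customer": "c-9"}
context = {"State": {"EnteredTime": "2024-01-01T00:00:00Z"}}


def fake_task(payload):
    # Stand-in for the Lambda function: echoes the timestamp it received
    return {"statusCode": 200, "timestamp": payload["timestamp"], "debug": "ignored"}


print(run_task_state(state, state_input, context, fake_task))
# → {'statusCode': 200, 'processedAt': '2024-01-01T00:00:00Z'}
```

Because OutputPath here points at the same location as ResultPath, the rest of the original input (such as customer) is dropped from the state output. Leaving OutputPath out would pass the whole merged document to the next state.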

State Types

Step Functions supports eight state types, each with a specific purpose:

1. Task State

The core state that does the work. It can invoke a Lambda function or another AWS service:

{
  "ProcessOrder": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",
    "Parameters": {
      "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessOrder",
      "Payload.$": "$"
    },
    "TimeoutSeconds": 300,
    "HeartbeatSeconds": 60,
    "Next": "NotifyCustomer"
  }
}

2. Choice State

Chooses the next state based on conditions. Rules are evaluated in order and the first match wins:

{
  "CheckOrderAmount": {
    "Type": "Choice",
    "Choices": [
      {
        "Variable": "$.orderAmount",
        "NumericGreaterThan": 10000,
        "Next": "RequireApproval"
      },
      {
        "Variable": "$.orderAmount",
        "NumericGreaterThanEquals": 1000,
        "Next": "StandardProcess"
      }
    ],
    "Default": "FastTrackProcess"
  }
}
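
Rule order matters in a Choice state, which is why the stricter threshold is listed first above. The sketch below mimics that first-match evaluation in Python. It is a minimal model under assumptions: the next_state helper is hypothetical, it handles only top-level $.field variables, and it covers just four of the many comparison operators ASL defines.

```python
import operator

# Comparison operators covered by this sketch; ASL defines many more
COMPARATORS = {
    "NumericGreaterThan": operator.gt,
    "NumericGreaterThanEquals": operator.ge,
    "NumericLessThan": operator.lt,
    "StringEquals": operator.eq,
}


def next_state(choice_state, state_input):
    """Return the Next of the first matching rule, or Default when none match."""
    for rule in choice_state["Choices"]:
        value = state_input[rule["Variable"][2:]]  # top-level '$.field' paths only
        for name, compare in COMPARATORS.items():
            if name in rule and compare(value, rule[name]):
                return rule["Next"]
    return choice_state["Default"]


check_order_amount = {
    "Type": "Choice",
    "Choices": [
        {"Variable": "$.orderAmount", "NumericGreaterThan": 10000, "Next": "RequireApproval"},
        {"Variable": "$.orderAmount", "NumericGreaterThanEquals": 1000, "Next": "StandardProcess"},
    ],
    "Default": "FastTrackProcess",
}

for amount in (15000, 10000, 999):
    print(amount, next_state(check_order_amount, {"orderAmount": amount}))
# → 15000 RequireApproval
# → 10000 StandardProcess
# → 999 FastTrackProcess
```

Swapping the two rules would send every order of 1000 or more to StandardProcess, and RequireApproval would never be reached.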

3. Parallel State

Runs multiple branches at the same time:

{
  "ParallelProcessing": {
    "Type": "Parallel",
    "Branches": [
      {
        "StartAt": "SendEmail",
        "States": {
          "SendEmail": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:SendEmail",
            "End": true
          }
        }
      },
      {
        "StartAt": "SendSMS",
        "States": {
          "SendSMS": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:SendSMS",
            "End": true
          }
        }
      }
    ],
    "Next": "LogNotification"
  }
}
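
A Parallel state hands the same input to every branch and produces an array with one entry per branch, in the order the branches are declared. The sketch below models that behavior with threads. It is only an analogy under assumptions: run_parallel, send_email, and send_sms are hypothetical local functions standing in for the branches, not calls to AWS.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(branches, state_input):
    """Run every branch with the same input; return results in branch order."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = [pool.submit(branch, state_input) for branch in branches]
        # result() re-raises a branch exception, which fails the whole state
        return [future.result() for future in futures]


def send_email(order):
    return {"channel": "email", "orderId": order["orderId"]}


def send_sms(order):
    return {"channel": "sms", "orderId": order["orderId"]}


print(run_parallel([send_email, send_sms], {"orderId": "A-1"}))
# → [{'channel': 'email', 'orderId': 'A-1'}, {'channel': 'sms', 'orderId': 'A-1'}]
```

The next state (LogNotification above) therefore receives a list, so it should index into the array rather than expect the original object.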

4. Map State

Runs the same steps for each element of an array:

{
  "ProcessItems": {
    "Type": "Map",
    "ItemsPath": "$.items",
    "MaxConcurrency": 10,
    "ItemProcessor": {
      "ProcessorConfig": {
        "Mode": "INLINE"
      },
      "StartAt": "ProcessSingleItem",
      "States": {
        "ProcessSingleItem": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessItem",
          "End": true
        }
      }
    },
    "Next": "AggregateResults"
  }
}
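
An inline Map state behaves much like a bounded parallel map: at most MaxConcurrency iterations run at once, and the output array keeps the order of the input items. The sketch below shows that shape locally. It is a rough model under assumptions: run_map and subtotal are hypothetical names, and unlike Step Functions (where a MaxConcurrency of 0 means no limit) the thread pool needs a positive worker count.

```python
from concurrent.futures import ThreadPoolExecutor


def run_map(items, process_item, max_concurrency=10):
    """Apply process_item to each item with bounded concurrency, preserving order."""
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        return list(pool.map(process_item, items))


def subtotal(item):
    # Stand-in for the per-item Lambda function
    return {"sku": item["sku"], "subtotal": item["price"] * item["quantity"]}


items = [
    {"sku": "A", "price": 100, "quantity": 2},
    {"sku": "B", "price": 50, "quantity": 1},
]
print(run_map(items, subtotal))
# → [{'sku': 'A', 'subtotal': 200}, {'sku': 'B', 'subtotal': 50}]
```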

5. Wait State

Pauses execution for a fixed duration or until a given timestamp:

{
  "WaitForPayment": {
    "Type": "Wait",
    "Seconds": 3600,
    "Next": "CheckPaymentStatus"
  },
  "WaitUntilTimestamp": {
    "Type": "Wait",
    "TimestampPath": "$.scheduledTime",
    "Next": "ExecuteScheduledTask"
  }
}
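
The two forms above differ in how the pause is computed: Seconds is a fixed duration, while TimestampPath reads an ISO 8601 timestamp from the input and waits until that moment. The sketch below works out the resulting delay. It is a simplified illustration: seconds_to_wait is a hypothetical helper, it handles only top-level $.field paths, and it treats a timestamp in the past as a zero-second wait.

```python
from datetime import datetime, timezone


def seconds_to_wait(wait_state, state_input, now):
    """Return how long a Wait state would pause, in seconds (never negative)."""
    if "Seconds" in wait_state:
        return float(wait_state["Seconds"])
    field = wait_state["TimestampPath"][2:]  # top-level '$.field' paths only
    # fromisoformat() only accepts a trailing 'Z' from Python 3.11, so normalize it
    target = datetime.fromisoformat(state_input[field].replace("Z", "+00:00"))
    return max(0.0, (target - now).total_seconds())


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(seconds_to_wait({"Type": "Wait", "Seconds": 3600}, {}, now))  # → 3600.0
print(seconds_to_wait(
    {"Type": "Wait", "TimestampPath": "$.scheduledTime"},
    {"scheduledTime": "2024-01-01T13:30:00Z"},
    now,
))  # → 5400.0
```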

6. Pass State

Passes data through or transforms it:

{
  "SetDefaults": {
    "Type": "Pass",
    "Result": {
      "status": "pending",
      "retryCount": 0
    },
    "ResultPath": "$.defaults",
    "Next": "ProcessOrder"
  }
}

7. Succeed State

Marks the workflow as having ended successfully:

{
  "OrderCompleted": {
    "Type": "Succeed"
  }
}

8. Fail State

Marks the workflow as failed:

{
  "OrderFailed": {
    "Type": "Fail",
    "Error": "OrderProcessingError",
    "Cause": "Unable to process the order"
  }
}

Lambda Integration Patterns

Step Functions offers several ways to integrate with Lambda:

1. Request-Response Pattern

The most basic integration: Step Functions invokes the Lambda function and waits for the response:

{
  "InvokeLambda": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:MyFunction",
    "Next": "NextState"
  }
}

2. Using the Lambda Invoke API

Gives finer control, including the invocation type:

{
  "InvokeLambdaWithAPI": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",
    "Parameters": {
      "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:MyFunction",
      "InvocationType": "RequestResponse",
      "Payload": {
        "key1.$": "$.value1",
        "key2": "staticValue"
      }
    },
    "ResultSelector": {
      "result.$": "$.Payload"
    },
    "Next": "NextState"
  }
}

3. Wait-for-Callback Pattern

Suited to scenarios that must wait for a response from an external system:

{
  "WaitForCallback": {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
    "Parameters": {
      "FunctionName": "arn:aws:lambda:ap-northeast-1:123456789012:function:StartExternalProcess",
      "Payload": {
        "taskToken.$": "$$.Task.Token",
        "orderId.$": "$.orderId"
      }
    },
    "TimeoutSeconds": 86400,
    "Next": "ProcessResult"
  }
}

The Lambda function needs to use the task token to complete the task:

import json

import boto3

# Create the client once so warm invocations reuse it
sfn_client = boto3.client('stepfunctions')


def lambda_handler(event, context):
    task_token = event['taskToken']
    order_id = event['orderId']

    # Run the external processing...
    result = process_external_system(order_id)

    # Report success so the waiting state can continue
    sfn_client.send_task_success(
        taskToken=task_token,
        output=json.dumps({
            'status': 'completed',
            'result': result
        })
    )

    return {'statusCode': 200}


def process_external_system(order_id):
    # External system processing logic
    return {'processed': True}
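
A callback task that is never completed stays paused until TimeoutSeconds expires, so whoever holds the task token usually reports failures as well as successes. The sketch below wraps both outcomes in one helper. It is a sketch under assumptions: complete_task and RecordingClient are hypothetical names, and the client is injected so the example runs without AWS access. In a real worker the client would be boto3.client('stepfunctions'), whose send_task_success and send_task_failure methods take the keyword arguments used here.

```python
import json


def complete_task(sfn_client, task_token, work):
    """Run work() and report its outcome to Step Functions through the task token."""
    try:
        result = work()
    except Exception as exc:
        # The error name becomes matchable in the state's Retry and Catch fields
        sfn_client.send_task_failure(
            taskToken=task_token,
            error=type(exc).__name__,
            cause=str(exc),
        )
        return False
    sfn_client.send_task_success(taskToken=task_token, output=json.dumps(result))
    return True


class RecordingClient:
    """Test double that records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def send_task_success(self, **kwargs):
        self.calls.append(("success", kwargs))

    def send_task_failure(self, **kwargs):
        self.calls.append(("failure", kwargs))


def failing_work():
    raise ValueError("payment gateway unreachable")


client = RecordingClient()
complete_task(client, "token-1", lambda: {"processed": True})
complete_task(client, "token-2", failing_work)
for call in client.calls:
    print(call)
```

Reporting the exception class name as the error lets the state machine route it with an ordinary Catch entry, for example one matching ValueError.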

Lambda Function Example

The following is a complete Lambda function example for order processing:

import json
import logging
import os
from datetime import datetime, timezone
from decimal import Decimal

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

dynamodb = boto3.resource('dynamodb')
# Read the table name from the environment; 'Orders' is the fallback used in this example
table = dynamodb.Table(os.environ.get('TABLE_NAME', 'Orders'))


class ValidationError(Exception):
    """Step Functions matches ErrorEquals against the exception class name,
    so a dedicated class is what makes "ValidationError" catchable."""


def lambda_handler(event, context):
    """
    Order processing Lambda function
    """
    logger.info(f"Received event: {json.dumps(event)}")

    try:
        order_id = event.get('orderId')
        items = event.get('items', [])
        customer_id = event.get('customerId')

        # Validate the order
        if not order_id or not items:
            raise ValidationError("Missing required order information")

        # Calculate the order total
        total_amount = sum(item.get('price', 0) * item.get('quantity', 0) for item in items)

        # Save the order to DynamoDB (the resource API rejects float, so convert to Decimal)
        table.put_item(
            Item={
                'orderId': order_id,
                'customerId': customer_id,
                'items': json.loads(json.dumps(items), parse_float=Decimal),
                'totalAmount': str(total_amount),
                'status': 'PROCESSING',
                'createdAt': datetime.now(timezone.utc).isoformat()
            }
        )

        return {
            'statusCode': 200,
            'orderId': order_id,
            'totalAmount': total_amount,
            'status': 'PROCESSING',
            'message': 'Order processed successfully'
        }

    except ValidationError as e:
        logger.error(f"Validation error: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Processing error: {str(e)}")
        raise

Error Handling and Retries

Step Functions handles errors declaratively through the Retry and Catch fields of a state:

Retry Configuration

{
  "ProcessPayment": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessPayment",
    "Retry": [
      {
        "ErrorEquals": ["Lambda.ServiceException", "Lambda.TooManyRequestsException"],
        "IntervalSeconds": 2,
        "MaxAttempts": 6,
        "BackoffRate": 2,
        "JitterStrategy": "FULL"
      },
      {
        "ErrorEquals": ["States.Timeout"],
        "IntervalSeconds": 5,
        "MaxAttempts": 3,
        "BackoffRate": 1.5
      }
    ],
    "Catch": [
      {
        "ErrorEquals": ["PaymentDeclined"],
        "ResultPath": "$.error",
        "Next": "HandlePaymentDeclined"
      },
      {
        "ErrorEquals": ["States.ALL"],
        "ResultPath": "$.error",
        "Next": "HandleGeneralError"
      }
    ],
    "Next": "ShipOrder"
  }
}

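Retry Parameters

| Parameter | Description |
|-----------|-------------|
| ErrorEquals | Array of error names to match |
| IntervalSeconds | Seconds to wait before the first retry (default 1) |
| MaxAttempts | Maximum number of retries (default 3; 0 disables retries) |
| BackoffRate | Multiplier applied to the retry interval after each attempt (default 2.0) |
| JitterStrategy | Jitter strategy (FULL or NONE) |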
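
These parameters combine into an exponential schedule: the wait before retry n is IntervalSeconds multiplied by BackoffRate raised to the power n - 1. The sketch below computes that schedule for the two retriers in the configuration above. The helper name retry_delays is hypothetical, and jitter is deliberately left out: with JitterStrategy set to FULL, the actual wait is randomized within the computed delay, so these numbers are upper bounds for that retrier.

```python
def retry_delays(interval_seconds=1, max_attempts=3, backoff_rate=2.0, max_delay_seconds=None):
    """Return the wait before each retry attempt, ignoring jitter."""
    delays = []
    for attempt in range(max_attempts):
        delay = interval_seconds * backoff_rate ** attempt
        # MaxDelaySeconds, when set, caps how far the backoff can grow
        if max_delay_seconds is not None:
            delay = min(delay, max_delay_seconds)
        delays.append(delay)
    return delays


print(retry_delays(2, 6, 2))    # → [2, 4, 8, 16, 32, 64]
print(retry_delays(5, 3, 1.5))  # → [5.0, 7.5, 11.25]
```

The first retrier can therefore keep a task waiting for roughly two minutes in total before giving up, which is worth comparing against the state's TimeoutSeconds.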

Common Error Types

{
  "Catch": [
    {
      "ErrorEquals": ["Lambda.ServiceException"],
      "Comment": "Lambda service error",
      "Next": "HandleLambdaError"
    },
    {
      "ErrorEquals": ["Lambda.AWSLambdaException"],
      "Comment": "Lambda invocation error",
      "Next": "HandleLambdaError"
    },
    {
      "ErrorEquals": ["Lambda.SdkClientException"],
      "Comment": "SDK client error",
      "Next": "HandleLambdaError"
    },
    {
      "ErrorEquals": ["States.TaskFailed"],
      "Comment": "Task failed",
      "Next": "HandleTaskError"
    },
    {
      "ErrorEquals": ["States.Timeout"],
      "Comment": "Task timed out",
      "Next": "HandleTimeout"
    },
    {
      "ErrorEquals": ["States.ALL"],
      "Comment": "Catch all other errors",
      "Next": "HandleGeneralError"
    }
  ]
}
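
Catchers are scanned in array order and the first match wins, so specific names belong before the wildcards. The sketch below reproduces that lookup for the list above. It is a simplified model: select_catcher and matches are hypothetical helpers, States.TaskFailed is treated as a wildcard for every error except States.Timeout, and the few terminal errors that wildcards do not match are ignored.

```python
def matches(error_equals, error_name):
    """Return True if any pattern in ErrorEquals matches the error name."""
    for pattern in error_equals:
        if pattern == "States.ALL" or pattern == error_name:
            return True
        # States.TaskFailed acts as a wildcard for everything except timeouts
        if pattern == "States.TaskFailed" and error_name != "States.Timeout":
            return True
    return False


def select_catcher(catchers, error_name):
    """Return the Next target of the first matching catcher, or None."""
    for catcher in catchers:
        if matches(catcher["ErrorEquals"], error_name):
            return catcher["Next"]
    return None


catchers = [
    {"ErrorEquals": ["Lambda.ServiceException"], "Next": "HandleLambdaError"},
    {"ErrorEquals": ["States.TaskFailed"], "Next": "HandleTaskError"},
    {"ErrorEquals": ["States.Timeout"], "Next": "HandleTimeout"},
    {"ErrorEquals": ["States.ALL"], "Next": "HandleGeneralError"},
]

for name in ("Lambda.ServiceException", "PaymentDeclined", "States.Timeout"):
    print(name, "->", select_catcher(catchers, name))
# → Lambda.ServiceException -> HandleLambdaError
# → PaymentDeclined -> HandleTaskError
# → States.Timeout -> HandleTimeout
```

Note that with States.TaskFailed in the list, a custom error such as PaymentDeclined never reaches the States.ALL catcher; give it its own entry earlier in the array if it needs dedicated handling.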

Error-Handling State Example

{
  "HandleGeneralError": {
    "Type": "Task",
    "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:LogError",
    "Parameters": {
      "errorInfo.$": "$.error",
      "executionId.$": "$$.Execution.Id",
      "stateName.$": "$$.State.Name"
    },
    "ResultPath": "$.logResult",
    "Next": "NotifyAdmin"
  },
  "NotifyAdmin": {
    "Type": "Task",
    "Resource": "arn:aws:states:::sns:publish",
    "Parameters": {
      "TopicArn": "arn:aws:sns:ap-northeast-1:123456789012:WorkflowErrors",
      "Message.$": "States.Format('Workflow error: {}', $.error.Cause)",
      "Subject": "Step Functions error notification"
    },
    "Next": "WorkflowFailed"
  },
  "WorkflowFailed": {
    "Type": "Fail",
    "Error": "WorkflowExecutionFailed",
    "Cause": "Workflow execution failed; check the error logs"
  }
}

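Parallel and Branching Execution

Parallel Execution (Parallel State)

Runs multiple independent branches at the same time and continues only after all branches have finished: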

{
  "Comment": "Parallel order processing workflow",
  "StartAt": "ValidateOrder",
  "States": {
    "ValidateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ValidateOrder",
      "Next": "ParallelProcessing"
    },
    "ParallelProcessing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "ProcessInventory",
          "States": {
            "ProcessInventory": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessInventory",
              "Next": "UpdateStock"
            },
            "UpdateStock": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:UpdateStock",
              "End": true
            }
          }
        },
        {
          "StartAt": "ProcessPayment",
          "States": {
            "ProcessPayment": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessPayment",
              "Next": "SendReceipt"
            },
            "SendReceipt": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:SendReceipt",
              "End": true
            }
          }
        },
        {
          "StartAt": "UpdateAnalytics",
          "States": {
            "UpdateAnalytics": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:UpdateAnalytics",
              "End": true
            }
          }
        }
      ],
      "ResultPath": "$.parallelResults",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "ResultPath": "$.error",
          "Next": "HandleParallelError"
        }
      ],
      "Next": "FinalizeOrder"
    },
    "FinalizeOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:FinalizeOrder",
      "End": true
    },
    "HandleParallelError": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:RollbackOrder",
      "Next": "OrderFailed"
    },
    "OrderFailed": {
      "Type": "Fail",
      "Error": "OrderProcessingFailed",
      "Cause": "An error occurred during parallel processing"
    }
  }
}

Distributed Map

Use Distributed Map when processing large volumes of data:

{
  "ProcessLargeDataset": {
    "Type": "Map",
    "ItemProcessor": {
      "ProcessorConfig": {
        "Mode": "DISTRIBUTED",
        "ExecutionType": "STANDARD"
      },
      "StartAt": "ProcessRecord",
      "States": {
        "ProcessRecord": {
          "Type": "Task",
          "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:ProcessRecord",
          "End": true
        }
      }
    },
    "ItemReader": {
      "Resource": "arn:aws:states:::s3:getObject",
      "ReaderConfig": {
        "InputType": "JSON"
      },
      "Parameters": {
        "Bucket": "my-data-bucket",
        "Key": "input/large-dataset.json"
      }
    },
    "MaxConcurrency": 100,
    "ResultWriter": {
      "Resource": "arn:aws:states:::s3:putObject",
      "Parameters": {
        "Bucket": "my-data-bucket",
        "Prefix": "output/results"
      }
    },
    "Next": "AggregateResults"
  }
}

Terraform Deployment Example

Complete Terraform Configuration

# providers.tf
terraform {
  required_version = ">= 1.0.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

provider "aws" {
  region = var.aws_region
}

# variables.tf
variable "aws_region" {
  description = "AWS region"
  type        = string
  default     = "ap-northeast-1"
}

variable "environment" {
  description = "Environment name"
  type        = string
  default     = "production"
}

variable "project_name" {
  description = "Project name"
  type        = string
  default     = "order-processing"
}

# locals.tf
locals {
  common_tags = {
    Project     = var.project_name
    Environment = var.environment
    ManagedBy   = "terraform"
  }
}

Lambda Function Resources

# lambda.tf

# Lambda execution role
resource "aws_iam_role" "lambda_execution_role" {
  name = "${var.project_name}-lambda-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })

  tags = local.common_tags
}

# Lambda basic execution policy
resource "aws_iam_role_policy_attachment" "lambda_basic_execution" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}

# Lambda custom policy
resource "aws_iam_role_policy" "lambda_custom_policy" {
  name = "${var.project_name}-lambda-policy"
  role = aws_iam_role.lambda_execution_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "dynamodb:GetItem",
          "dynamodb:PutItem",
          "dynamodb:UpdateItem",
          "dynamodb:Query"
        ]
        Resource = aws_dynamodb_table.orders.arn
      },
      {
        Effect = "Allow"
        Action = [
          "sns:Publish"
        ]
        Resource = aws_sns_topic.notifications.arn
      }
    ]
  })
}

# Validate-order Lambda
resource "aws_lambda_function" "validate_order" {
  filename         = "${path.module}/lambda/validate_order.zip"
  function_name    = "${var.project_name}-validate-order"
  role             = aws_iam_role.lambda_execution_role.arn
  handler          = "index.handler"
  runtime          = "python3.11"
  timeout          = 30
  memory_size      = 256

  environment {
    variables = {
      ENVIRONMENT = var.environment
      TABLE_NAME  = aws_dynamodb_table.orders.name
    }
  }

  tags = local.common_tags
}

# Process-payment Lambda
resource "aws_lambda_function" "process_payment" {
  filename         = "${path.module}/lambda/process_payment.zip"
  function_name    = "${var.project_name}-process-payment"
  role             = aws_iam_role.lambda_execution_role.arn
  handler          = "index.handler"
  runtime          = "python3.11"
  timeout          = 60
  memory_size      = 256

  environment {
    variables = {
      ENVIRONMENT = var.environment
    }
  }

  tags = local.common_tags
}

# Send-notification Lambda
resource "aws_lambda_function" "send_notification" {
  filename         = "${path.module}/lambda/send_notification.zip"
  function_name    = "${var.project_name}-send-notification"
  role             = aws_iam_role.lambda_execution_role.arn
  handler          = "index.handler"
  runtime          = "python3.11"
  timeout          = 30
  memory_size      = 128

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.notifications.arn
    }
  }

  tags = local.common_tags
}

Step Functions Resources

# step_functions.tf

# Step Functions execution role
resource "aws_iam_role" "step_functions_role" {
  name = "${var.project_name}-sfn-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "states.amazonaws.com"
        }
      }
    ]
  })

  tags = local.common_tags
}

# Step Functions policy
resource "aws_iam_role_policy" "step_functions_policy" {
  name = "${var.project_name}-sfn-policy"
  role = aws_iam_role.step_functions_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction"
        ]
        Resource = [
          aws_lambda_function.validate_order.arn,
          aws_lambda_function.process_payment.arn,
          aws_lambda_function.send_notification.arn,
          "${aws_lambda_function.validate_order.arn}:*",
          "${aws_lambda_function.process_payment.arn}:*",
          "${aws_lambda_function.send_notification.arn}:*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogDelivery",
          "logs:GetLogDelivery",
          "logs:UpdateLogDelivery",
          "logs:DeleteLogDelivery",
          "logs:ListLogDeliveries",
          "logs:PutLogEvents",
          "logs:PutResourcePolicy",
          "logs:DescribeResourcePolicies",
          "logs:DescribeLogGroups"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "xray:PutTraceSegments",
          "xray:PutTelemetryRecords",
          "xray:GetSamplingRules",
          "xray:GetSamplingTargets"
        ]
        Resource = "*"
      }
    ]
  })
}

# CloudWatch Log Group for Step Functions
resource "aws_cloudwatch_log_group" "step_functions_logs" {
  name              = "/aws/vendedlogs/states/${var.project_name}-workflow"
  retention_in_days = 30

  tags = local.common_tags
}

# Step Functions state machine
resource "aws_sfn_state_machine" "order_processing" {
  name     = "${var.project_name}-workflow"
  role_arn = aws_iam_role.step_functions_role.arn

  definition = jsonencode({
    Comment = "Order processing workflow"
    StartAt = "ValidateOrder"
    States = {
      ValidateOrder = {
        Type     = "Task"
        Resource = aws_lambda_function.validate_order.arn
        Next     = "CheckOrderType"
        Retry = [
          {
            ErrorEquals     = ["Lambda.ServiceException", "Lambda.TooManyRequestsException"]
            IntervalSeconds = 2
            MaxAttempts     = 3
            BackoffRate     = 2
          }
        ]
        Catch = [
          {
            ErrorEquals = ["ValidationError"]
            ResultPath  = "$.error"
            Next        = "OrderFailed"
          }
        ]
      }
      CheckOrderType = {
        Type = "Choice"
        Choices = [
          {
            Variable              = "$.orderType"
            StringEquals          = "express"
            Next                  = "ExpressProcessing"
          },
          {
            Variable              = "$.totalAmount"
            NumericGreaterThan    = 10000
            Next                  = "HighValueProcessing"
          }
        ]
        Default = "StandardProcessing"
      }
      ExpressProcessing = {
        Type = "Parallel"
        Branches = [
          {
            StartAt = "ProcessPaymentExpress"
            States = {
              ProcessPaymentExpress = {
                Type     = "Task"
                Resource = aws_lambda_function.process_payment.arn
                End      = true
              }
            }
          },
          {
            StartAt = "SendExpressNotification"
            States = {
              SendExpressNotification = {
                Type     = "Task"
                Resource = aws_lambda_function.send_notification.arn
                End      = true
              }
            }
          }
        ]
        Next = "OrderCompleted"
      }
      HighValueProcessing = {
        Type = "Task"
        Resource = "arn:aws:states:::lambda:invoke.waitForTaskToken"
        Parameters = {
          FunctionName = aws_lambda_function.process_payment.arn
          Payload = {
            "taskToken.$"  = "$$.Task.Token"
            "orderData.$"  = "$"
            "requiresApproval" = true
          }
        }
        TimeoutSeconds = 86400
        Next           = "OrderCompleted"
      }
      StandardProcessing = {
        Type     = "Task"
        Resource = aws_lambda_function.process_payment.arn
        Next     = "SendNotification"
        Retry = [
          {
            ErrorEquals     = ["PaymentGatewayError"]
            IntervalSeconds = 5
            MaxAttempts     = 3
            BackoffRate     = 1.5
          }
        ]
      }
      SendNotification = {
        Type     = "Task"
        Resource = aws_lambda_function.send_notification.arn
        Next     = "OrderCompleted"
      }
      OrderCompleted = {
        Type = "Succeed"
      }
      OrderFailed = {
        Type  = "Fail"
        Error = "OrderProcessingFailed"
        Cause = "Order processing failed"
      }
    }
  })

  logging_configuration {
    log_destination        = "${aws_cloudwatch_log_group.step_functions_logs.arn}:*"
    include_execution_data = true
    level                  = "ALL"
  }

  tracing_configuration {
    enabled = true
  }

  tags = local.common_tags
}

Supporting Resources

# supporting_resources.tf

# DynamoDB table
resource "aws_dynamodb_table" "orders" {
  name           = "${var.project_name}-orders"
  billing_mode   = "PAY_PER_REQUEST"
  hash_key       = "orderId"

  attribute {
    name = "orderId"
    type = "S"
  }

  attribute {
    name = "customerId"
    type = "S"
  }

  global_secondary_index {
    name            = "CustomerIndex"
    hash_key        = "customerId"
    projection_type = "ALL"
  }

  tags = local.common_tags
}

# SNS Topic
resource "aws_sns_topic" "notifications" {
  name = "${var.project_name}-notifications"

  tags = local.common_tags
}

# EventBridge rule that starts the workflow
resource "aws_cloudwatch_event_rule" "order_created" {
  name        = "${var.project_name}-order-created"
  description = "Triggers the order processing workflow"

  event_pattern = jsonencode({
    source      = ["custom.orders"]
    detail-type = ["OrderCreated"]
  })

  tags = local.common_tags
}

resource "aws_cloudwatch_event_target" "step_functions" {
  rule      = aws_cloudwatch_event_rule.order_created.name
  target_id = "StartOrderProcessing"
  arn       = aws_sfn_state_machine.order_processing.arn
  role_arn  = aws_iam_role.eventbridge_sfn_role.arn
}

# Role that EventBridge assumes to start Step Functions executions
resource "aws_iam_role" "eventbridge_sfn_role" {
  name = "${var.project_name}-eventbridge-sfn-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "events.amazonaws.com"
        }
      }
    ]
  })

  tags = local.common_tags
}

resource "aws_iam_role_policy" "eventbridge_sfn_policy" {
  name = "${var.project_name}-eventbridge-sfn-policy"
  role = aws_iam_role.eventbridge_sfn_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = "states:StartExecution"
        Resource = aws_sfn_state_machine.order_processing.arn
      }
    ]
  })
}

# outputs.tf
output "state_machine_arn" {
  description = "Step Functions state machine ARN"
  value       = aws_sfn_state_machine.order_processing.arn
}

output "state_machine_name" {
  description = "Step Functions state machine name"
  value       = aws_sfn_state_machine.order_processing.name
}

output "lambda_functions" {
  description = "Lambda function ARNs"
  value = {
    validate_order    = aws_lambda_function.validate_order.arn
    process_payment   = aws_lambda_function.process_payment.arn
    send_notification = aws_lambda_function.send_notification.arn
  }
}

Deployment Commands

# Initialize Terraform
terraform init

# Validate the configuration
terraform validate

# Review the execution plan
terraform plan -out=tfplan

# Apply the deployment
terraform apply tfplan

# Read an output value
terraform output state_machine_arn

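Monitoring and Troubleshooting

CloudWatch Metrics

Step Functions automatically publishes the following metrics to CloudWatch:

| Metric | Description |
|--------|-------------|
| ExecutionsStarted | Number of executions started |
| ExecutionsSucceeded | Number of executions that completed successfully |
| ExecutionsFailed | Number of executions that failed |
| ExecutionsAborted | Number of executions that were aborted |
| ExecutionsTimedOut | Number of executions that timed out |
| ExecutionTime | Execution time (milliseconds) |

CloudWatch Alarm Configuration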

# cloudwatch_alarms.tf

resource "aws_cloudwatch_metric_alarm" "execution_failed" {
  alarm_name          = "${var.project_name}-execution-failed"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ExecutionsFailed"
  namespace           = "AWS/States"
  period              = 300
  statistic           = "Sum"
  threshold           = 0
  alarm_description   = "Step Functions execution failure alarm"

  dimensions = {
    StateMachineArn = aws_sfn_state_machine.order_processing.arn
  }

  alarm_actions = [aws_sns_topic.notifications.arn]

  tags = local.common_tags
}

resource "aws_cloudwatch_metric_alarm" "execution_throttled" {
  alarm_name          = "${var.project_name}-execution-throttled"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 1
  metric_name         = "ExecutionThrottled"
  namespace           = "AWS/States"
  period              = 60
  statistic           = "Sum"
  threshold           = 5
  alarm_description   = "Step Functions execution throttling alarm"

  dimensions = {
    StateMachineArn = aws_sfn_state_machine.order_processing.arn
  }

  alarm_actions = [aws_sns_topic.notifications.arn]

  tags = local.common_tags
}

resource "aws_cloudwatch_metric_alarm" "long_execution_time" {
  alarm_name          = "${var.project_name}-long-execution"
  comparison_operator = "GreaterThanThreshold"
  evaluation_periods  = 3
  metric_name         = "ExecutionTime"
  namespace           = "AWS/States"
  period              = 300
  statistic           = "Average"
  threshold           = 300000  # 5 minutes (ExecutionTime is reported in milliseconds)
  alarm_description   = "Step Functions long execution time alarm"

  dimensions = {
    StateMachineArn = aws_sfn_state_machine.order_processing.arn
  }

  alarm_actions = [aws_sns_topic.notifications.arn]

  tags = local.common_tags
}
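
The evaluation_periods setting explains why the third alarm reacts more slowly than the first two: it fires only when every one of the last three 5-minute averages is above the threshold. The sketch below is a simplified model of that rule for a GreaterThanThreshold alarm. The function name alarm_state is hypothetical, the sample datapoints are invented for illustration, and real CloudWatch alarms also account for missing data and the optional datapoints_to_alarm setting, which this model ignores.

```python
def alarm_state(datapoints, threshold, evaluation_periods):
    """Evaluate a GreaterThanThreshold alarm over the most recent periods."""
    recent = datapoints[-evaluation_periods:]
    if len(recent) < evaluation_periods:
        return "INSUFFICIENT_DATA"
    # Every datapoint in the window must breach for the alarm to fire
    return "ALARM" if all(value > threshold for value in recent) else "OK"


# Illustrative ExecutionTime averages in milliseconds, one per 5-minute period
print(alarm_state([120000, 310000, 320000, 305000], 300000, 3))  # → ALARM
print(alarm_state([310000, 200000, 320000], 300000, 3))          # → OK

# Illustrative ExecutionsFailed sums: a single failure trips the one-period alarm
print(alarm_state([0, 0, 1], 0, 1))                              # → ALARM
```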

CloudWatch Logs Insights Queries

# Query failed executions
fields @timestamp, execution_arn, type, details.error, details.cause
| filter type = "ExecutionFailed" or type = "TaskFailed"
| sort @timestamp desc
| limit 50

# Execution time statistics (assumes the log events carry a numeric duration field)
fields @timestamp, execution_arn
| filter type = "ExecutionSucceeded"
| stats avg(duration) as avg_duration,
        max(duration) as max_duration,
        min(duration) as min_duration
  by bin(1h)

# Trace the execution of a specific order
fields @timestamp, type, details
| filter execution_arn like /order-12345/
| sort @timestamp asc
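
Execution durations can also be derived from exported log events by subtracting the first event time from the last one for each execution. The sketch below does that in Python. It rests on an assumption about the event shape: each event is taken to be a dict with an execution_arn field and an event_timestamp field holding epoch milliseconds as a string. The helper name execution_durations_ms and the shortened ARNs are illustrative only.

```python
from collections import defaultdict


def execution_durations_ms(events):
    """Map each execution_arn to (last event time - first event time) in ms."""
    timestamps = defaultdict(list)
    for event in events:
        timestamps[event["execution_arn"]].append(int(event["event_timestamp"]))
    return {arn: max(times) - min(times) for arn, times in timestamps.items()}


# Illustrative events; real ARNs are longer
events = [
    {"execution_arn": "exec-1", "type": "ExecutionStarted", "event_timestamp": "1700000000000"},
    {"execution_arn": "exec-2", "type": "ExecutionStarted", "event_timestamp": "1700000001000"},
    {"execution_arn": "exec-2", "type": "ExecutionFailed", "event_timestamp": "1700000001400"},
    {"execution_arn": "exec-1", "type": "ExecutionSucceeded", "event_timestamp": "1700000002500"},
]

print(execution_durations_ms(events))  # → {'exec-1': 2500, 'exec-2': 400}
```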

AWS CLI Troubleshooting Commands

# List recent failed executions
aws stepfunctions list-executions \
    --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:order-processing \
    --status-filter FAILED \
    --max-results 10

# Get execution details
aws stepfunctions describe-execution \
    --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:order-processing:exec-123

# Get the execution history, newest events first
aws stepfunctions get-execution-history \
    --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:order-processing:exec-123 \
    --reverse-order

# Stop a running execution
aws stepfunctions stop-execution \
    --execution-arn arn:aws:states:ap-northeast-1:123456789012:execution:order-processing:exec-123 \
    --error "ManualStop" \
    --cause "Stopped manually"

# Start an execution manually
aws stepfunctions start-execution \
    --state-machine-arn arn:aws:states:ap-northeast-1:123456789012:stateMachine:order-processing \
    --name "manual-test-$(date +%s)" \
    --input '{"orderId": "test-001", "items": [{"name": "Product A", "quantity": 1, "price": 100}]}'

X-Ray Tracing

Enable X-Ray tracing for more detailed performance analysis:

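# Enable X-Ray in a Lambda function
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

# Instrument supported libraries so their calls appear as subsegments
patch_all()


def validate_order(event):
    # Placeholder validation logic
    if not event.get('orderId'):
        raise ValueError('orderId is required')


def process_payment(event):
    # Placeholder payment logic
    return {'orderId': event['orderId'], 'status': 'PAID'}


@xray_recorder.capture('process_order')
def lambda_handler(event, context):
    # Add a custom annotation
    xray_recorder.put_annotation('orderId', event.get('orderId'))

    # Add custom metadata
    xray_recorder.put_metadata('orderDetails', event)

    with xray_recorder.in_subsegment('validate_input'):
        # Input validation logic
        validate_order(event)

    with xray_recorder.in_subsegment('process_payment'):
        # Payment processing logic
        result = process_payment(event)

    return result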

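Common Problems and Solutions

| Problem | Possible cause | Solution |
|---------|----------------|----------|
| Execution timeout | Lambda cold starts, slow external APIs | Increase TimeoutSeconds, use Provisioned Concurrency |
| Permission errors | Insufficient IAM role permissions | Check and update the Step Functions execution role policy |
| State machine definition errors | ASL syntax errors | Validate the definition in Workflow Studio |
| Input/output errors | Incorrect JSONPath expressions | Run a test execution in the AWS Console and inspect each state's input and output |
| Throttling errors | Service quota exceeded | Request a quota increase or implement retries with backoff |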

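Summary

Combining AWS Step Functions with Lambda gives you a solid foundation for building complex serverless applications. With the concepts and practices covered in this article, you can:

  1. Design reliable workflows: use ASL to define clear state transitions and data flow
  2. Implement robust error handling: use Retry and Catch to keep the application resilient
  3. Optimize performance: use parallel execution and Distributed Map to raise throughput
  4. Treat infrastructure as code: use Terraform to manage and version your workflows
  5. Monitor and debug effectively: use CloudWatch and X-Ray to locate problems quickly

With these techniques in hand, you can build cloud applications that are more reliable, more scalable, and easier to maintain.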
