1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
| import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""密鑰輪換 Lambda 處理程式"""
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
client = boto3.client('secretsmanager')
# 輪換包含四個步驟
if step == "createSecret":
create_secret(client, arn, token)
elif step == "setSecret":
set_secret(client, arn, token)
elif step == "testSecret":
test_secret(client, arn, token)
elif step == "finishSecret":
finish_secret(client, arn, token)
else:
raise ValueError(f"無效的步驟: {step}")
def create_secret(client, arn, token):
"""建立新的密鑰版本"""
# 取得目前密鑰
current_secret = client.get_secret_value(
SecretId=arn,
VersionStage="AWSCURRENT"
)
# 產生新密碼
new_password = client.get_random_password(
PasswordLength=32,
ExcludeCharacters='"@/\\'
)['RandomPassword']
# 更新密鑰值
secret_dict = json.loads(current_secret['SecretString'])
secret_dict['password'] = new_password
# 儲存為待處理版本
client.put_secret_value(
SecretId=arn,
ClientRequestToken=token,
SecretString=json.dumps(secret_dict),
VersionStages=['AWSPENDING']
)
logger.info(f"已建立新的密鑰版本: {token}")
def set_secret(client, arn, token):
"""在資料庫中設定新密碼"""
# 取得待處理密鑰
pending = client.get_secret_value(
SecretId=arn,
VersionId=token,
VersionStage="AWSPENDING"
)
secret_dict = json.loads(pending['SecretString'])
# 連接資料庫並更新密碼
# 這裡需要根據您的資料庫類型實作
update_database_password(secret_dict)
logger.info("資料庫密碼已更新")
def test_secret(client, arn, token):
"""測試新密鑰是否可用"""
pending = client.get_secret_value(
SecretId=arn,
VersionId=token,
VersionStage="AWSPENDING"
)
secret_dict = json.loads(pending['SecretString'])
# 嘗試使用新憑證連接資料庫
if not test_database_connection(secret_dict):
raise ValueError("新密鑰測試失敗")
logger.info("新密鑰測試成功")
def finish_secret(client, arn, token):
"""完成輪換,將待處理版本設為目前版本"""
# 取得目前版本 ID
metadata = client.describe_secret(SecretId=arn)
current_version = None
for version_id, stages in metadata['VersionIdsToStages'].items():
if 'AWSCURRENT' in stages:
current_version = version_id
break
# 更新版本階段
client.update_secret_version_stage(
SecretId=arn,
VersionStage='AWSCURRENT',
MoveToVersionId=token,
RemoveFromVersionId=current_version
)
logger.info(f"密鑰輪換完成: {token}")
|