雲端 KMS 整合憑證管理

Cloud KMS Integration with Certificate Management

在現代企業環境中,憑證管理與金鑰管理是資安架構的核心組件。本文將深入探討如何利用各大雲端平台的金鑰管理服務(KMS)來強化憑證安全,實現私鑰保護與自動化憑證生命週期管理。

雲端 KMS 概述

金鑰管理服務(Key Management Service,KMS)是雲端平台提供的託管式加密金鑰管理解決方案。它讓企業能夠在雲端環境中安全地建立、儲存、管理和使用加密金鑰,而無需自行維護硬體安全模組(HSM)基礎設施。

KMS 的核心功能

功能說明
金鑰生成使用經過認證的隨機數生成器產生加密金鑰
金鑰儲存將金鑰安全儲存於 HSM 或軟體保護環境中
金鑰輪替自動或手動更換金鑰以降低洩漏風險
存取控制細粒度的 IAM 策略控制金鑰使用權限
稽核日誌記錄所有金鑰操作供合規審查

主流雲端 KMS 比較

1
2
3
4
5
6
7
8
9
┌─────────────────┬──────────────────┬──────────────────┬──────────────────┐
│     特性        │    AWS KMS       │ Azure Key Vault  │  GCP Cloud KMS   │
├─────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ HSM 支援        │ CloudHSM 整合    │ HSM 層級可選     │ HSM 保護金鑰     │
│ 自動輪替        │ 支援(年度)     │ 支援(可自訂)   │ 支援(可自訂)   │
│ 區域複寫        │ Multi-Region     │ 異地備援         │ 全域金鑰         │
│ FIPS 認證       │ FIPS 140-2 L3    │ FIPS 140-2 L2/L3 │ FIPS 140-2 L3    │
│ 價格模式        │ 按請求計費       │ 按操作計費       │ 按版本計費       │
└─────────────────┴──────────────────┴──────────────────┴──────────────────┘

AWS KMS 與憑證整合

AWS KMS 提供了強大的金鑰管理功能,可與 AWS Certificate Manager (ACM) 及其他憑證服務深度整合。

建立 KMS 金鑰用於憑證簽署

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 建立非對稱簽署金鑰
aws kms create-key \
    --key-usage SIGN_VERIFY \
    --key-spec RSA_2048 \
    --description "Certificate Signing Key" \
    --tags TagKey=Purpose,TagValue=CertificateSigning

# 建立金鑰別名
aws kms create-alias \
    --alias-name alias/cert-signing-key \
    --target-key-id <key-id>

使用 KMS 金鑰生成 CSR

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import boto3
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
import base64

class KMSCertificateManager:
    def __init__(self, key_id):
        self.kms_client = boto3.client('kms')
        self.key_id = key_id

    def get_public_key(self):
        """取得 KMS 金鑰的公鑰"""
        response = self.kms_client.get_public_key(KeyId=self.key_id)
        return response['PublicKey']

    def sign_data(self, data, algorithm='RSASSA_PKCS1_V1_5_SHA_256'):
        """使用 KMS 金鑰簽署資料"""
        response = self.kms_client.sign(
            KeyId=self.key_id,
            Message=data,
            MessageType='RAW',
            SigningAlgorithm=algorithm
        )
        return response['Signature']

    def generate_csr(self, common_name, organization, country='TW'):
        """生成憑證簽署請求 (CSR)"""
        from cryptography.hazmat.primitives.serialization import (
            load_der_public_key, Encoding
        )

        # 載入公鑰
        public_key = load_der_public_key(self.get_public_key())

        # 建立 CSR 屬性
        subject = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, country),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
            x509.NameAttribute(NameOID.COMMON_NAME, common_name),
        ])

        # 建立 CSR(需要自訂簽署流程以使用 KMS)
        csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject)

        return csr_builder, public_key

# 使用範例
if __name__ == "__main__":
    manager = KMSCertificateManager('alias/cert-signing-key')
    public_key = manager.get_public_key()
    print(f"Public key retrieved: {len(public_key)} bytes")

ACM Private CA 與 KMS 整合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 建立私有 CA(使用 KMS 加密)
aws acm-pca create-certificate-authority \
    --certificate-authority-configuration '{
        "KeyAlgorithm": "RSA_2048",
        "SigningAlgorithm": "SHA256WITHRSA",
        "Subject": {
            "Country": "TW",
            "Organization": "Example Corp",
            "CommonName": "Example Private CA"
        }
    }' \
    --certificate-authority-type "ROOT" \
    --tags Key=Environment,Value=Production

# 取得 CA CSR
aws acm-pca get-certificate-authority-csr \
    --certificate-authority-arn <ca-arn> \
    --output text > ca-csr.pem

# 使用 KMS 金鑰簽署 CA 憑證(透過 Lambda 函數)
aws lambda invoke \
    --function-name sign-ca-certificate \
    --payload '{"csr": "ca-csr.pem", "kms_key_id": "alias/root-ca-key"}' \
    response.json

Terraform 配置 AWS KMS 與憑證

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# providers.tf
provider "aws" {
  region = "ap-northeast-1"
}

# KMS 金鑰
resource "aws_kms_key" "certificate_key" {
  description              = "KMS key for certificate signing"
  key_usage               = "SIGN_VERIFY"
  customer_master_key_spec = "RSA_2048"
  deletion_window_in_days = 30
  enable_key_rotation     = false  # 非對稱金鑰不支援自動輪替

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "Enable IAM User Permissions"
        Effect = "Allow"
        Principal = {
          AWS = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"
        }
        Action   = "kms:*"
        Resource = "*"
      },
      {
        Sid    = "Allow Certificate Manager"
        Effect = "Allow"
        Principal = {
          Service = "acm-pca.amazonaws.com"
        }
        Action = [
          "kms:Sign",
          "kms:GetPublicKey"
        ]
        Resource = "*"
      }
    ]
  })

  tags = {
    Purpose     = "CertificateSigning"
    Environment = "Production"
  }
}

resource "aws_kms_alias" "certificate_key_alias" {
  name          = "alias/certificate-signing-key"
  target_key_id = aws_kms_key.certificate_key.key_id
}

# ACM Private CA
resource "aws_acmpca_certificate_authority" "private_ca" {
  type = "ROOT"

  certificate_authority_configuration {
    key_algorithm     = "RSA_2048"
    signing_algorithm = "SHA256WITHRSA"

    subject {
      common_name         = "Example Private CA"
      country             = "TW"
      organization        = "Example Corp"
      organizational_unit = "Security Team"
    }
  }

  permanent_deletion_time_in_days = 7

  tags = {
    Name        = "private-ca"
    Environment = "Production"
  }
}

data "aws_caller_identity" "current" {}

output "kms_key_arn" {
  value = aws_kms_key.certificate_key.arn
}

output "ca_arn" {
  value = aws_acmpca_certificate_authority.private_ca.arn
}

Azure Key Vault 整合

Azure Key Vault 提供了完整的憑證管理功能,包含憑證的生成、儲存、續訂及整合部署能力。

建立 Key Vault 與憑證

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
# 建立資源群組
az group create \
    --name rg-certificates \
    --location eastasia

# 建立 Key Vault(啟用軟刪除和清除保護)
az keyvault create \
    --name kv-cert-management \
    --resource-group rg-certificates \
    --location eastasia \
    --sku premium \
    --enable-soft-delete true \
    --enable-purge-protection true \
    --enable-rbac-authorization true

# 指派 Key Vault 管理員角色
az role assignment create \
    --role "Key Vault Administrator" \
    --assignee <user-principal-id> \
    --scope /subscriptions/<subscription-id>/resourceGroups/rg-certificates/providers/Microsoft.KeyVault/vaults/kv-cert-management

建立憑證策略並生成憑證

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 建立憑證策略
az keyvault certificate create \
    --vault-name kv-cert-management \
    --name web-server-cert \
    --policy '{
        "issuerParameters": {
            "name": "Self"
        },
        "keyProperties": {
            "exportable": false,
            "keySize": 2048,
            "keyType": "RSA",
            "reuseKey": true
        },
        "secretProperties": {
            "contentType": "application/x-pkcs12"
        },
        "x509CertificateProperties": {
            "keyUsage": [
                "digitalSignature",
                "keyEncipherment"
            ],
            "extendedKeyUsage": ["1.3.6.1.5.5.7.3.1"],
            "subject": "CN=web.example.com,O=Example Corp,C=TW",
            "validityInMonths": 12,
            "subjectAlternativeNames": {
                "dnsNames": [
                    "web.example.com",
                    "www.example.com"
                ]
            }
        },
        "lifetimeActions": [
            {
                "action": {
                    "actionType": "AutoRenew"
                },
                "trigger": {
                    "daysBeforeExpiry": 30
                }
            }
        ]
    }'

# 取得憑證資訊
az keyvault certificate show \
    --vault-name kv-cert-management \
    --name web-server-cert

使用 Python SDK 管理憑證

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
from azure.identity import DefaultAzureCredential
from azure.keyvault.certificates import (
    CertificateClient,
    CertificatePolicy,
    CertificateContentType,
    WellKnownIssuerNames
)
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, SignatureAlgorithm
import datetime

class AzureKeyVaultCertManager:
    def __init__(self, vault_url):
        self.credential = DefaultAzureCredential()
        self.vault_url = vault_url
        self.cert_client = CertificateClient(
            vault_url=vault_url,
            credential=self.credential
        )
        self.key_client = KeyClient(
            vault_url=vault_url,
            credential=self.credential
        )

    def create_self_signed_certificate(self, cert_name, subject,
                                        validity_months=12, dns_names=None):
        """建立自簽憑證"""
        policy = CertificatePolicy(
            issuer_name=WellKnownIssuerNames.self,
            subject=subject,
            exportable=False,
            key_type="RSA",
            key_size=2048,
            reuse_key=True,
            content_type=CertificateContentType.pkcs12,
            validity_in_months=validity_months,
            san_dns_names=dns_names or []
        )

        # 開始建立憑證
        poller = self.cert_client.begin_create_certificate(
            certificate_name=cert_name,
            policy=policy
        )

        # 等待完成
        certificate = poller.result()
        return certificate

    def get_certificate(self, cert_name):
        """取得憑證"""
        return self.cert_client.get_certificate(cert_name)

    def sign_with_certificate_key(self, cert_name, data):
        """使用憑證的金鑰進行簽署"""
        # 取得憑證關聯的金鑰
        cert = self.cert_client.get_certificate(cert_name)
        key = self.key_client.get_key(cert_name)

        # 建立加密客戶端
        crypto_client = CryptographyClient(
            key=key,
            credential=self.credential
        )

        # 簽署資料
        result = crypto_client.sign(
            algorithm=SignatureAlgorithm.rs256,
            digest=data
        )

        return result.signature

    def list_certificates(self, include_pending=True):
        """列出所有憑證"""
        return list(self.cert_client.list_properties_of_certificates(
            include_pending=include_pending
        ))

    def get_certificate_versions(self, cert_name):
        """取得憑證的所有版本"""
        return list(self.cert_client.list_properties_of_certificate_versions(
            certificate_name=cert_name
        ))

# 使用範例
if __name__ == "__main__":
    vault_url = "https://kv-cert-management.vault.azure.net/"
    manager = AzureKeyVaultCertManager(vault_url)

    # 建立憑證
    cert = manager.create_self_signed_certificate(
        cert_name="api-server-cert",
        subject="CN=api.example.com,O=Example Corp,C=TW",
        validity_months=12,
        dns_names=["api.example.com", "api-internal.example.com"]
    )

    print(f"Certificate created: {cert.name}")
    print(f"Thumbprint: {cert.properties.x509_thumbprint.hex()}")

整合 Azure Application Gateway

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# Terraform 配置 Application Gateway 使用 Key Vault 憑證

# Key Vault
resource "azurerm_key_vault" "cert_vault" {
  name                       = "kv-cert-management"
  location                   = azurerm_resource_group.main.location
  resource_group_name        = azurerm_resource_group.main.name
  tenant_id                  = data.azurerm_client_config.current.tenant_id
  sku_name                   = "premium"
  soft_delete_retention_days = 90
  purge_protection_enabled   = true

  enable_rbac_authorization = true
}

# 使用者指派的受控識別
resource "azurerm_user_assigned_identity" "appgw_identity" {
  name                = "appgw-identity"
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name
}

# 授權受控識別存取 Key Vault
resource "azurerm_role_assignment" "appgw_keyvault_access" {
  scope                = azurerm_key_vault.cert_vault.id
  role_definition_name = "Key Vault Secrets User"
  principal_id         = azurerm_user_assigned_identity.appgw_identity.principal_id
}

# 在 Key Vault 中建立憑證
resource "azurerm_key_vault_certificate" "web_cert" {
  name         = "web-server-cert"
  key_vault_id = azurerm_key_vault.cert_vault.id

  certificate_policy {
    issuer_parameters {
      name = "Self"
    }

    key_properties {
      exportable = false
      key_size   = 2048
      key_type   = "RSA"
      reuse_key  = true
    }

    secret_properties {
      content_type = "application/x-pkcs12"
    }

    x509_certificate_properties {
      extended_key_usage = ["1.3.6.1.5.5.7.3.1"]
      key_usage = [
        "digitalSignature",
        "keyEncipherment"
      ]
      subject            = "CN=web.example.com"
      validity_in_months = 12

      subject_alternative_names {
        dns_names = ["web.example.com", "www.example.com"]
      }
    }

    lifetime_action {
      action {
        action_type = "AutoRenew"
      }
      trigger {
        days_before_expiry = 30
      }
    }
  }
}

# Application Gateway
resource "azurerm_application_gateway" "main" {
  name                = "appgw-main"
  location            = azurerm_resource_group.main.location
  resource_group_name = azurerm_resource_group.main.name

  sku {
    name     = "Standard_v2"
    tier     = "Standard_v2"
    capacity = 2
  }

  identity {
    type         = "UserAssigned"
    identity_ids = [azurerm_user_assigned_identity.appgw_identity.id]
  }

  ssl_certificate {
    name                = "web-ssl-cert"
    key_vault_secret_id = azurerm_key_vault_certificate.web_cert.secret_id
  }

  # ... 其他配置
}

GCP Cloud KMS 整合

Google Cloud KMS 提供了強大的金鑰管理功能,可與 Certificate Authority Service 整合實現企業級 PKI。

建立 Cloud KMS 金鑰環與金鑰

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# 建立金鑰環
gcloud kms keyrings create certificate-keyring \
    --location=asia-east1

# 建立非對稱簽署金鑰
gcloud kms keys create cert-signing-key \
    --location=asia-east1 \
    --keyring=certificate-keyring \
    --purpose=asymmetric-signing \
    --default-algorithm=rsa-sign-pkcs1-2048-sha256 \
    --protection-level=hsm

# 取得公鑰
gcloud kms keys versions get-public-key 1 \
    --location=asia-east1 \
    --keyring=certificate-keyring \
    --key=cert-signing-key \
    --output-file=public-key.pem

使用 Cloud KMS 簽署憑證

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from google.cloud import kms
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import hashlib
import datetime

class GCPKMSCertificateManager:
    def __init__(self, project_id, location, keyring, key_name, key_version="1"):
        self.client = kms.KeyManagementServiceClient()
        self.key_version_name = self.client.crypto_key_version_path(
            project_id, location, keyring, key_name, key_version
        )
        self.project_id = project_id

    def get_public_key(self):
        """取得 KMS 金鑰的公鑰"""
        response = self.client.get_public_key(
            request={"name": self.key_version_name}
        )
        return serialization.load_pem_public_key(
            response.pem.encode('utf-8')
        )

    def sign_digest(self, digest):
        """使用 KMS 金鑰簽署摘要"""
        # 計算 SHA256 摘要
        digest_crc32c = self._crc32c(digest)

        response = self.client.asymmetric_sign(
            request={
                "name": self.key_version_name,
                "digest": {"sha256": digest},
                "digest_crc32c": digest_crc32c
            }
        )

        # 驗證回應完整性
        if not response.verified_digest_crc32c:
            raise Exception("Request corrupted in-transit")
        if response.name != self.key_version_name:
            raise Exception("Response corrupted in-transit")

        return response.signature

    def _crc32c(self, data):
        """計算 CRC32C 校驗和"""
        import crc32c
        return crc32c.crc32c(data)

    def create_ca_certificate(self, common_name, organization, validity_days=365):
        """建立 CA 憑證"""
        public_key = self.get_public_key()

        # 建立憑證
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "TW"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization),
            x509.NameAttribute(NameOID.COMMON_NAME, common_name),
        ])

        now = datetime.datetime.utcnow()

        cert_builder = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(public_key)
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=validity_days))
            .add_extension(
                x509.BasicConstraints(ca=True, path_length=0),
                critical=True,
            )
            .add_extension(
                x509.KeyUsage(
                    digital_signature=True,
                    content_commitment=False,
                    key_encipherment=False,
                    data_encipherment=False,
                    key_agreement=False,
                    key_cert_sign=True,
                    crl_sign=True,
                    encipher_only=False,
                    decipher_only=False,
                ),
                critical=True,
            )
        )

        # 取得待簽署的資料
        tbs_certificate = cert_builder._build_tbs_certificate()
        tbs_bytes = tbs_certificate.public_bytes(serialization.Encoding.DER)

        # 計算摘要並簽署
        digest = hashlib.sha256(tbs_bytes).digest()
        signature = self.sign_digest(digest)

        return cert_builder, signature

# 使用範例
if __name__ == "__main__":
    manager = GCPKMSCertificateManager(
        project_id="my-project",
        location="asia-east1",
        keyring="certificate-keyring",
        key_name="cert-signing-key"
    )

    public_key = manager.get_public_key()
    print(f"Public key type: {type(public_key).__name__}")

整合 Certificate Authority Service

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 建立 CA Pool
gcloud privateca pools create enterprise-ca-pool \
    --location=asia-east1 \
    --tier=enterprise

# 建立根 CA(使用 Cloud KMS 金鑰)
gcloud privateca roots create enterprise-root-ca \
    --pool=enterprise-ca-pool \
    --location=asia-east1 \
    --kms-key-version="projects/my-project/locations/asia-east1/keyRings/certificate-keyring/cryptoKeys/cert-signing-key/cryptoKeyVersions/1" \
    --subject="CN=Enterprise Root CA,O=Example Corp,C=TW" \
    --validity="10y" \
    --max-chain-length=1

# 啟用 CA
gcloud privateca roots enable enterprise-root-ca \
    --pool=enterprise-ca-pool \
    --location=asia-east1

# 簽發終端實體憑證
gcloud privateca certificates create web-server-cert \
    --issuer-pool=enterprise-ca-pool \
    --issuer-location=asia-east1 \
    --generate-key \
    --key-output-file=private-key.pem \
    --cert-output-file=certificate.pem \
    --dns-san="web.example.com,www.example.com" \
    --subject="CN=web.example.com,O=Example Corp,C=TW" \
    --validity="P365D"

Terraform 配置 GCP CA Service

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
# providers.tf
provider "google" {
  project = var.project_id
  region  = var.region
}

# KMS 金鑰環
resource "google_kms_key_ring" "certificate_keyring" {
  name     = "certificate-keyring"
  location = var.region
}

# KMS 非對稱簽署金鑰
resource "google_kms_crypto_key" "ca_signing_key" {
  name     = "ca-signing-key"
  key_ring = google_kms_key_ring.certificate_keyring.id
  purpose  = "ASYMMETRIC_SIGN"

  version_template {
    algorithm        = "RSA_SIGN_PKCS1_2048_SHA256"
    protection_level = "HSM"
  }

  lifecycle {
    prevent_destroy = true
  }
}

# CA Pool
resource "google_privateca_ca_pool" "enterprise_pool" {
  name     = "enterprise-ca-pool"
  location = var.region
  tier     = "ENTERPRISE"

  publishing_options {
    publish_ca_cert = true
    publish_crl     = true
  }

  issuance_policy {
    allowed_key_types {
      rsa {
        min_modulus_size = 2048
        max_modulus_size = 4096
      }
    }

    allowed_issuance_modes {
      allow_csr_based_issuance    = true
      allow_config_based_issuance = true
    }

    maximum_lifetime = "31536000s"  # 1 年
  }
}

# 根 CA
resource "google_privateca_certificate_authority" "root_ca" {
  pool                     = google_privateca_ca_pool.enterprise_pool.name
  certificate_authority_id = "enterprise-root-ca"
  location                 = var.region
  type                     = "SELF_SIGNED"

  config {
    subject_config {
      subject {
        country_code        = "TW"
        organization        = "Example Corp"
        organizational_unit = "Security"
        common_name         = "Enterprise Root CA"
      }
    }

    x509_config {
      ca_options {
        is_ca                  = true
        max_issuer_path_length = 1
      }

      key_usage {
        base_key_usage {
          cert_sign = true
          crl_sign  = true
        }
        extended_key_usage {
          server_auth = false
          client_auth = false
        }
      }
    }
  }

  key_spec {
    cloud_kms_key_version = google_kms_crypto_key.ca_signing_key.id
  }

  lifetime = "315360000s"  # 10 年

  depends_on = [google_kms_crypto_key.ca_signing_key]
}

# 輸出
output "ca_pool_id" {
  value = google_privateca_ca_pool.enterprise_pool.id
}

output "root_ca_id" {
  value = google_privateca_certificate_authority.root_ca.id
}

私鑰安全儲存

私鑰是憑證安全的核心,必須採用適當的保護措施。

金鑰保護等級

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
┌────────────────────────────────────────────────────────────────────────┐
│                        金鑰保護等級金字塔                               │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│                          ┌──────────┐                                  │
│                          │   HSM    │  ← 最高安全等級                  │
│                          │ FIPS L3  │    硬體保護、防篡改              │
│                        ┌─┴──────────┴─┐                                │
│                        │   Cloud HSM   │  ← 雲端 HSM 服務              │
│                        │   FIPS L2/L3  │    專用硬體、合規認證         │
│                      ┌─┴───────────────┴─┐                             │
│                      │    Managed KMS     │  ← 雲端 KMS 標準           │
│                      │   軟體保護+HSM後端  │    便利性與安全性平衡      │
│                    ┌─┴───────────────────┴─┐                           │
│                    │    Encrypted Storage   │  ← 加密儲存              │
│                    │    檔案系統加密+ACL    │    基本保護               │
│                  ┌─┴───────────────────────┴─┐                         │
│                  │      Plain Text Storage    │  ← 不建議使用          │
│                  │      無保護                │    高風險              │
│                  └─────────────────────────────┘                       │
└────────────────────────────────────────────────────────────────────────┘

AWS CloudHSM 整合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 初始化 CloudHSM 叢集
aws cloudhsmv2 create-cluster \
    --hsm-type hsm1.medium \
    --subnet-ids subnet-12345678 \
    --region ap-northeast-1

# 建立 HSM
aws cloudhsmv2 create-hsm \
    --cluster-id cluster-12345678 \
    --availability-zone ap-northeast-1a

# 設定 CloudHSM 客戶端
sudo yum install -y cloudhsm-client

# 配置連線
sudo /opt/cloudhsm/bin/configure -a <HSM_IP>

# 使用 PKCS#11 生成金鑰對
pkcs11-tool --module /opt/cloudhsm/lib/libcloudhsm_pkcs11.so \
    --login --pin <USER_PIN> \
    --keypairgen --key-type rsa:2048 \
    --label "cert-signing-key"

私鑰存取控制策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowKeyAdministration",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/KeyAdminRole"
            },
            "Action": [
                "kms:Create*",
                "kms:Describe*",
                "kms:Enable*",
                "kms:List*",
                "kms:Put*",
                "kms:Update*",
                "kms:Revoke*",
                "kms:Disable*",
                "kms:Get*",
                "kms:Delete*",
                "kms:ScheduleKeyDeletion",
                "kms:CancelKeyDeletion"
            ],
            "Resource": "*"
        },
        {
            "Sid": "AllowCertificateSigning",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/CertificateManagerRole"
            },
            "Action": [
                "kms:Sign",
                "kms:GetPublicKey",
                "kms:DescribeKey"
            ],
            "Resource": "*",
            "Condition": {
                "StringEquals": {
                    "kms:SigningAlgorithm": "RSASSA_PKCS1_V1_5_SHA_256"
                }
            }
        },
        {
            "Sid": "DenyExport",
            "Effect": "Deny",
            "Principal": "*",
            "Action": [
                "kms:Export*"
            ],
            "Resource": "*"
        }
    ]
}

自動化憑證簽發

使用 AWS Lambda 自動簽發憑證

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import boto3
import json
from datetime import datetime, timedelta
import base64

def lambda_handler(event, context):
    """
    自動憑證簽發 Lambda 函數

    Event 格式:
    {
        "common_name": "web.example.com",
        "san": ["www.example.com", "api.example.com"],
        "validity_days": 365,
        "ca_arn": "arn:aws:acm-pca:..."
    }
    """

    acm_pca_client = boto3.client('acm-pca')

    common_name = event['common_name']
    san = event.get('san', [])
    validity_days = event.get('validity_days', 365)
    ca_arn = event['ca_arn']

    # 生成 CSR(使用臨時金鑰對)
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography import x509
    from cryptography.x509.oid import NameOID
    from cryptography.hazmat.primitives import hashes

    # 生成金鑰對
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048
    )

    # 建立 CSR
    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "TW"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Example Corp"),
        x509.NameAttribute(NameOID.COMMON_NAME, common_name),
    ])

    csr_builder = x509.CertificateSigningRequestBuilder().subject_name(subject)

    # 添加 SAN
    if san:
        san_list = [x509.DNSName(common_name)] + [x509.DNSName(name) for name in san]
        csr_builder = csr_builder.add_extension(
            x509.SubjectAlternativeName(san_list),
            critical=False
        )

    csr = csr_builder.sign(private_key, hashes.SHA256())
    csr_pem = csr.public_bytes(serialization.Encoding.PEM)

    # 使用 ACM PCA 簽發憑證
    response = acm_pca_client.issue_certificate(
        CertificateAuthorityArn=ca_arn,
        Csr=csr_pem,
        SigningAlgorithm='SHA256WITHRSA',
        Validity={
            'Value': validity_days,
            'Type': 'DAYS'
        },
        IdempotencyToken=f"{common_name}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    )

    certificate_arn = response['CertificateArn']

    # 等待憑證簽發完成
    import time
    for _ in range(30):
        try:
            cert_response = acm_pca_client.get_certificate(
                CertificateAuthorityArn=ca_arn,
                CertificateArn=certificate_arn
            )
            break
        except acm_pca_client.exceptions.RequestInProgressException:
            time.sleep(1)

    # 將私鑰儲存到 Secrets Manager
    secrets_client = boto3.client('secretsmanager')

    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    )

    secret_name = f"certificates/{common_name}"

    try:
        secrets_client.create_secret(
            Name=secret_name,
            SecretString=json.dumps({
                'certificate': cert_response['Certificate'],
                'certificate_chain': cert_response['CertificateChain'],
                'private_key': private_key_pem.decode('utf-8'),
                'common_name': common_name,
                'issued_at': datetime.now().isoformat(),
                'expires_at': (datetime.now() + timedelta(days=validity_days)).isoformat()
            })
        )
    except secrets_client.exceptions.ResourceExistsException:
        secrets_client.update_secret(
            SecretId=secret_name,
            SecretString=json.dumps({
                'certificate': cert_response['Certificate'],
                'certificate_chain': cert_response['CertificateChain'],
                'private_key': private_key_pem.decode('utf-8'),
                'common_name': common_name,
                'issued_at': datetime.now().isoformat(),
                'expires_at': (datetime.now() + timedelta(days=validity_days)).isoformat()
            })
        )

    return {
        'statusCode': 200,
        'body': json.dumps({
            'certificate_arn': certificate_arn,
            'secret_name': secret_name,
            'common_name': common_name,
            'expires_at': (datetime.now() + timedelta(days=validity_days)).isoformat()
        })
    }

使用 Kubernetes cert-manager 與 Cloud KMS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# cert-manager ClusterIssuer 配置
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
  name: enterprise-ca-issuer
spec:
  ca:
    secretName: enterprise-ca-secret

---
# 憑證請求
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: web-server-cert
  namespace: production
spec:
  secretName: web-server-tls
  duration: 8760h  # 1 年
  renewBefore: 720h  # 30 天前更新

  subject:
    organizations:
      - Example Corp

  commonName: web.example.com

  dnsNames:
    - web.example.com
    - www.example.com
    - api.example.com

  privateKey:
    algorithm: RSA
    size: 2048

  issuerRef:
    name: enterprise-ca-issuer
    kind: ClusterIssuer

---
# 使用 Google CAS 作為 Issuer
apiVersion: cas-issuer.jetstack.io/v1beta1
kind: GoogleCASClusterIssuer
metadata:
  name: gcp-cas-issuer
spec:
  project: my-project
  location: asia-east1
  caPoolId: enterprise-ca-pool
  # 可選:指定特定 CA
  # certificateAuthorityId: enterprise-root-ca

自動化憑證續訂流程

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
import boto3
from datetime import datetime, timedelta
import json

class CertificateRenewalManager:
    def __init__(self):
        self.acm_client = boto3.client('acm')
        self.acm_pca_client = boto3.client('acm-pca')
        self.sns_client = boto3.client('sns')
        self.secrets_client = boto3.client('secretsmanager')

    def check_expiring_certificates(self, days_threshold=30):
        """檢查即將到期的憑證"""
        expiring_certs = []
        threshold_date = datetime.now() + timedelta(days=days_threshold)

        # 列出所有 ACM 憑證
        paginator = self.acm_client.get_paginator('list_certificates')

        for page in paginator.paginate():
            for cert_summary in page['CertificateSummaryList']:
                cert_details = self.acm_client.describe_certificate(
                    CertificateArn=cert_summary['CertificateArn']
                )

                cert = cert_details['Certificate']
                not_after = cert.get('NotAfter')

                if not_after and not_after.replace(tzinfo=None) < threshold_date:
                    expiring_certs.append({
                        'arn': cert_summary['CertificateArn'],
                        'domain': cert_summary['DomainName'],
                        'expires': not_after.isoformat(),
                        'status': cert['Status'],
                        'type': cert['Type']
                    })

        return expiring_certs

    def renew_private_certificate(self, cert_arn, ca_arn):
        """續訂私有憑證"""
        # 取得現有憑證資訊
        cert_details = self.acm_client.describe_certificate(
            CertificateArn=cert_arn
        )['Certificate']

        domain = cert_details['DomainName']
        san = cert_details.get('SubjectAlternativeNames', [])

        # 簽發新憑證
        # ... 實作與 lambda_handler 類似的邏輯

        return {
            'old_cert_arn': cert_arn,
            'domain': domain,
            'status': 'renewed'
        }

    def send_expiry_notification(self, expiring_certs, sns_topic_arn):
        """發送到期通知"""
        if not expiring_certs:
            return

        message = {
            'subject': f'Certificate Expiry Alert: {len(expiring_certs)} certificates expiring soon',
            'expiring_certificates': expiring_certs,
            'timestamp': datetime.now().isoformat()
        }

        self.sns_client.publish(
            TopicArn=sns_topic_arn,
            Subject='Certificate Expiry Alert',
            Message=json.dumps(message, indent=2, default=str)
        )

def lambda_handler(event, context):
    """
    定期檢查並續訂憑證的 Lambda 函數
    可透過 CloudWatch Events 每日觸發
    """
    manager = CertificateRenewalManager()

    # 檢查即將到期的憑證
    expiring_certs = manager.check_expiring_certificates(days_threshold=30)

    if expiring_certs:
        # 發送通知
        manager.send_expiry_notification(
            expiring_certs,
            event.get('sns_topic_arn', 'arn:aws:sns:...')
        )

        # 自動續訂私有憑證
        for cert in expiring_certs:
            if cert['type'] == 'PRIVATE':
                try:
                    manager.renew_private_certificate(
                        cert['arn'],
                        event.get('ca_arn')
                    )
                except Exception as e:
                    print(f"Failed to renew {cert['domain']}: {str(e)}")

    return {
        'statusCode': 200,
        'body': json.dumps({
            'checked': datetime.now().isoformat(),
            'expiring_count': len(expiring_certs),
            'expiring_certificates': expiring_certs
        }, default=str)
    }

多雲金鑰管理

在多雲環境中管理金鑰需要統一的策略和工具。

多雲 KMS 抽象層

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any
import boto3
from azure.identity import DefaultAzureCredential
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, SignatureAlgorithm
from google.cloud import kms

class CloudKMSProvider(ABC):
    """雲端 KMS 抽象基類"""

    @abstractmethod
    def get_public_key(self, key_id: str) -> bytes:
        """取得公鑰"""
        pass

    @abstractmethod
    def sign(self, key_id: str, data: bytes, algorithm: str) -> bytes:
        """簽署資料"""
        pass

    @abstractmethod
    def verify(self, key_id: str, data: bytes, signature: bytes, algorithm: str) -> bool:
        """驗證簽章"""
        pass

class AWSKMSProvider(CloudKMSProvider):
    """AWS KMS 實作"""

    def __init__(self, region: str = 'ap-northeast-1'):
        self.client = boto3.client('kms', region_name=region)

    def get_public_key(self, key_id: str) -> bytes:
        response = self.client.get_public_key(KeyId=key_id)
        return response['PublicKey']

    def sign(self, key_id: str, data: bytes, algorithm: str = 'RSASSA_PKCS1_V1_5_SHA_256') -> bytes:
        response = self.client.sign(
            KeyId=key_id,
            Message=data,
            MessageType='RAW',
            SigningAlgorithm=algorithm
        )
        return response['Signature']

    def verify(self, key_id: str, data: bytes, signature: bytes,
               algorithm: str = 'RSASSA_PKCS1_V1_5_SHA_256') -> bool:
        try:
            response = self.client.verify(
                KeyId=key_id,
                Message=data,
                MessageType='RAW',
                Signature=signature,
                SigningAlgorithm=algorithm
            )
            return response['SignatureValid']
        except Exception:
            return False

class AzureKeyVaultProvider(CloudKMSProvider):
    """Azure Key Vault 實作"""

    def __init__(self, vault_url: str):
        self.credential = DefaultAzureCredential()
        self.vault_url = vault_url
        self.key_client = KeyClient(vault_url=vault_url, credential=self.credential)

    def get_public_key(self, key_id: str) -> bytes:
        key = self.key_client.get_key(key_id)
        # 轉換為 DER 格式
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import rsa

        public_key = rsa.RSAPublicNumbers(
            e=int.from_bytes(key.key.e, 'big'),
            n=int.from_bytes(key.key.n, 'big')
        ).public_key()

        return public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    def sign(self, key_id: str, data: bytes, algorithm: str = 'RS256') -> bytes:
        key = self.key_client.get_key(key_id)
        crypto_client = CryptographyClient(key=key, credential=self.credential)

        import hashlib
        digest = hashlib.sha256(data).digest()

        result = crypto_client.sign(
            algorithm=SignatureAlgorithm.rs256,
            digest=digest
        )
        return result.signature

    def verify(self, key_id: str, data: bytes, signature: bytes, algorithm: str = 'RS256') -> bool:
        key = self.key_client.get_key(key_id)
        crypto_client = CryptographyClient(key=key, credential=self.credential)

        import hashlib
        digest = hashlib.sha256(data).digest()

        result = crypto_client.verify(
            algorithm=SignatureAlgorithm.rs256,
            digest=digest,
            signature=signature
        )
        return result.is_valid

class GCPKMSProvider(CloudKMSProvider):
    """GCP Cloud KMS 實作"""

    def __init__(self, project_id: str, location: str, keyring: str):
        self.client = kms.KeyManagementServiceClient()
        self.project_id = project_id
        self.location = location
        self.keyring = keyring

    def _get_key_version_name(self, key_id: str, version: str = '1') -> str:
        return self.client.crypto_key_version_path(
            self.project_id, self.location, self.keyring, key_id, version
        )

    def get_public_key(self, key_id: str) -> bytes:
        key_version_name = self._get_key_version_name(key_id)
        response = self.client.get_public_key(request={"name": key_version_name})

        from cryptography.hazmat.primitives import serialization
        public_key = serialization.load_pem_public_key(response.pem.encode())

        return public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    def sign(self, key_id: str, data: bytes, algorithm: str = 'SHA256') -> bytes:
        import hashlib
        digest = hashlib.sha256(data).digest()

        key_version_name = self._get_key_version_name(key_id)

        response = self.client.asymmetric_sign(
            request={
                "name": key_version_name,
                "digest": {"sha256": digest}
            }
        )
        return response.signature

    def verify(self, key_id: str, data: bytes, signature: bytes, algorithm: str = 'SHA256') -> bool:
        # GCP 不直接提供驗證 API,使用公鑰本地驗證
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import padding
        from cryptography.hazmat.primitives import serialization

        try:
            public_key_der = self.get_public_key(key_id)
            public_key = serialization.load_der_public_key(public_key_der)

            public_key.verify(
                signature,
                data,
                padding.PKCS1v15(),
                hashes.SHA256()
            )
            return True
        except Exception:
            return False

class MultiCloudKMSManager:
    """多雲 KMS 管理器"""

    def __init__(self):
        self.providers: Dict[str, CloudKMSProvider] = {}

    def register_provider(self, name: str, provider: CloudKMSProvider):
        """註冊雲端提供者"""
        self.providers[name] = provider

    def get_provider(self, name: str) -> Optional[CloudKMSProvider]:
        """取得雲端提供者"""
        return self.providers.get(name)

    def sign_with_provider(self, provider_name: str, key_id: str,
                           data: bytes, algorithm: str = None) -> bytes:
        """使用指定提供者簽署"""
        provider = self.get_provider(provider_name)
        if not provider:
            raise ValueError(f"Provider {provider_name} not found")

        return provider.sign(key_id, data, algorithm or 'default')

# 使用範例
if __name__ == "__main__":
    manager = MultiCloudKMSManager()

    # 註冊各雲端提供者
    manager.register_provider('aws', AWSKMSProvider(region='ap-northeast-1'))
    manager.register_provider('azure', AzureKeyVaultProvider(
        vault_url='https://my-vault.vault.azure.net/'
    ))
    manager.register_provider('gcp', GCPKMSProvider(
        project_id='my-project',
        location='asia-east1',
        keyring='my-keyring'
    ))

    # 使用 AWS KMS 簽署
    data = b"Hello, World!"
    signature = manager.sign_with_provider('aws', 'alias/my-key', data)
    print(f"AWS signature: {signature.hex()[:32]}...")

HashiCorp Vault 統一金鑰管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# Vault Transit Secrets Engine 配置
# 啟用 Transit 引擎
path "transit" {
  capabilities = ["create", "read", "update", "delete", "list"]
}

# 建立簽署金鑰
resource "vault_transit_secret_backend_key" "cert_signing" {
  backend = vault_mount.transit.path
  name    = "cert-signing-key"
  type    = "rsa-2048"

  deletion_allowed = false
  exportable       = false

  auto_rotate_period = 0  # 手動輪替
}

# Vault 策略
resource "vault_policy" "cert_manager" {
  name = "cert-manager"

  policy = <<EOT
# 允許簽署操作
path "transit/sign/cert-signing-key" {
  capabilities = ["create", "update"]
}

# 允許取得公鑰
path "transit/keys/cert-signing-key" {
  capabilities = ["read"]
}

# 允許驗證簽章
path "transit/verify/cert-signing-key" {
  capabilities = ["create", "update"]
}
EOT
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import hvac
import base64

class VaultTransitManager:
    """HashiCorp Vault Transit 管理器"""

    def __init__(self, vault_url: str, token: str):
        self.client = hvac.Client(url=vault_url, token=token)

    def sign_data(self, key_name: str, data: bytes,
                  algorithm: str = 'sha2-256') -> str:
        """使用 Vault Transit 簽署資料"""
        # Base64 編碼資料
        encoded_data = base64.b64encode(data).decode('utf-8')

        response = self.client.secrets.transit.sign_data(
            name=key_name,
            hash_input=encoded_data,
            hash_algorithm=algorithm,
            signature_algorithm='pkcs1v15'
        )

        return response['data']['signature']

    def verify_signature(self, key_name: str, data: bytes,
                         signature: str, algorithm: str = 'sha2-256') -> bool:
        """驗證簽章"""
        encoded_data = base64.b64encode(data).decode('utf-8')

        response = self.client.secrets.transit.verify_signed_data(
            name=key_name,
            hash_input=encoded_data,
            signature=signature,
            hash_algorithm=algorithm
        )

        return response['data']['valid']

    def get_public_key(self, key_name: str) -> dict:
        """取得公鑰"""
        response = self.client.secrets.transit.read_key(name=key_name)
        return response['data']['keys']

# 使用範例
if __name__ == "__main__":
    vault_manager = VaultTransitManager(
        vault_url='https://vault.example.com:8200',
        token='s.xxxxx'
    )

    # 簽署資料
    data = b"Certificate data to sign"
    signature = vault_manager.sign_data('cert-signing-key', data)
    print(f"Signature: {signature}")

    # 驗證簽章
    is_valid = vault_manager.verify_signature('cert-signing-key', data, signature)
    print(f"Signature valid: {is_valid}")

最佳實務與合規

金鑰管理最佳實務

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
┌────────────────────────────────────────────────────────────────────────┐
│                        金鑰管理最佳實務清單                             │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  1. 金鑰生命週期管理                                                    │
│     ├─ 制定明確的金鑰輪替策略(建議:對稱金鑰每年、非對稱金鑰每2-3年)    │
│     ├─ 建立金鑰版本控制機制                                            │
│     ├─ 實施金鑰到期自動通知                                            │
│     └─ 定義金鑰銷毀程序                                                │
│                                                                        │
│  2. 存取控制                                                           │
│     ├─ 最小權限原則:僅授予必要的金鑰操作權限                           │
│     ├─ 職責分離:金鑰管理員與金鑰使用者分開                             │
│     ├─ 多因素驗證:關鍵操作需要 MFA                                     │
│     └─ 定期審查存取權限                                                │
│                                                                        │
│  3. 稽核與監控                                                         │
│     ├─ 啟用所有金鑰操作的日誌記錄                                       │
│     ├─ 設定異常使用警報                                                │
│     ├─ 定期審查稽核日誌                                                │
│     └─ 保留日誌至少 1 年(依合規要求調整)                               │
│                                                                        │
│  4. 災難復原                                                           │
│     ├─ 金鑰備份策略(考慮 HSM 的備份限制)                               │
│     ├─ 跨區域複寫(如適用)                                            │
│     ├─ 定期測試恢復程序                                                │
│     └─ 建立緊急金鑰恢復流程                                            │
│                                                                        │
│  5. 合規要求                                                           │
│     ├─ 確保 HSM 符合 FIPS 140-2 Level 2/3                               │
│     ├─ 遵循 PCI DSS 金鑰管理要求(如適用)                               │
│     ├─ 符合 GDPR 加密要求(如處理歐盟資料)                              │
│     └─ 定期進行合規性評估                                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

合規性檢查腳本

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import boto3
from datetime import datetime, timedelta
from typing import List, Dict, Any
import json

class KMSComplianceChecker:
    """KMS 合規性檢查器"""

    def __init__(self, region: str = 'ap-northeast-1'):
        self.kms_client = boto3.client('kms', region_name=region)
        self.cloudtrail_client = boto3.client('cloudtrail', region_name=region)

    def check_key_rotation(self) -> List[Dict[str, Any]]:
        """檢查金鑰輪替狀態"""
        issues = []

        paginator = self.kms_client.get_paginator('list_keys')

        for page in paginator.paginate():
            for key in page['Keys']:
                key_id = key['KeyId']

                try:
                    # 取得金鑰資訊
                    key_info = self.kms_client.describe_key(KeyId=key_id)['KeyMetadata']

                    # 跳過 AWS 管理的金鑰
                    if key_info['KeyManager'] == 'AWS':
                        continue

                    # 檢查對稱金鑰的輪替狀態
                    if key_info['KeySpec'] == 'SYMMETRIC_DEFAULT':
                        rotation_status = self.kms_client.get_key_rotation_status(
                            KeyId=key_id
                        )

                        if not rotation_status['KeyRotationEnabled']:
                            issues.append({
                                'key_id': key_id,
                                'issue': 'Key rotation not enabled',
                                'severity': 'MEDIUM',
                                'recommendation': 'Enable automatic key rotation'
                            })

                    # 檢查金鑰年齡
                    key_age = (datetime.now(key_info['CreationDate'].tzinfo) -
                              key_info['CreationDate']).days

                    if key_age > 365 * 3:  # 超過 3 年
                        issues.append({
                            'key_id': key_id,
                            'issue': f'Key is {key_age} days old',
                            'severity': 'LOW',
                            'recommendation': 'Consider key rotation or replacement'
                        })

                except Exception as e:
                    print(f"Error checking key {key_id}: {str(e)}")

        return issues

    def check_key_policies(self) -> List[Dict[str, Any]]:
        """檢查金鑰策略"""
        issues = []

        paginator = self.kms_client.get_paginator('list_keys')

        for page in paginator.paginate():
            for key in page['Keys']:
                key_id = key['KeyId']

                try:
                    policy = self.kms_client.get_key_policy(
                        KeyId=key_id,
                        PolicyName='default'
                    )

                    policy_doc = json.loads(policy['Policy'])

                    for statement in policy_doc.get('Statement', []):
                        # 檢查是否有過於寬鬆的權限
                        if statement.get('Principal') == '*':
                            issues.append({
                                'key_id': key_id,
                                'issue': 'Key policy allows access to all principals',
                                'severity': 'HIGH',
                                'recommendation': 'Restrict key policy to specific principals'
                            })

                        # 檢查是否允許所有操作
                        actions = statement.get('Action', [])
                        if isinstance(actions, str):
                            actions = [actions]

                        if 'kms:*' in actions:
                            issues.append({
                                'key_id': key_id,
                                'issue': 'Key policy allows all KMS actions',
                                'severity': 'MEDIUM',
                                'recommendation': 'Use principle of least privilege'
                            })

                except Exception as e:
                    print(f"Error checking policy for key {key_id}: {str(e)}")

        return issues

    def check_key_usage_logging(self) -> List[Dict[str, Any]]:
        """檢查金鑰使用日誌"""
        issues = []

        # 檢查 CloudTrail 是否啟用
        trails = self.cloudtrail_client.describe_trails()['trailList']

        kms_logging_enabled = False

        for trail in trails:
            if trail.get('IsMultiRegionTrail', False) or trail.get('HomeRegion') == self.kms_client.meta.region_name:
                # 檢查是否記錄 KMS 事件
                selectors = self.cloudtrail_client.get_event_selectors(
                    TrailName=trail['Name']
                )

                for selector in selectors.get('EventSelectors', []):
                    if selector.get('ReadWriteType') in ['All', 'WriteOnly']:
                        kms_logging_enabled = True
                        break

        if not kms_logging_enabled:
            issues.append({
                'issue': 'KMS operations may not be logged in CloudTrail',
                'severity': 'HIGH',
                'recommendation': 'Enable CloudTrail logging for KMS operations'
            })

        return issues

    def generate_compliance_report(self) -> Dict[str, Any]:
        """生成合規性報告"""
        rotation_issues = self.check_key_rotation()
        policy_issues = self.check_key_policies()
        logging_issues = self.check_key_usage_logging()

        all_issues = rotation_issues + policy_issues + logging_issues

        report = {
            'generated_at': datetime.now().isoformat(),
            'summary': {
                'total_issues': len(all_issues),
                'high_severity': len([i for i in all_issues if i.get('severity') == 'HIGH']),
                'medium_severity': len([i for i in all_issues if i.get('severity') == 'MEDIUM']),
                'low_severity': len([i for i in all_issues if i.get('severity') == 'LOW'])
            },
            'issues': {
                'key_rotation': rotation_issues,
                'key_policies': policy_issues,
                'logging': logging_issues
            }
        }

        return report

# 使用範例
if __name__ == "__main__":
    checker = KMSComplianceChecker()
    report = checker.generate_compliance_report()

    print(json.dumps(report, indent=2, default=str))

憑證與 KMS 整合架構圖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
┌─────────────────────────────────────────────────────────────────────────────┐
│                     企業級憑證與 KMS 整合架構                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                           管理層 (Management Plane)                    │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │  │
│  │  │   Terraform │  │   Ansible   │  │ Vault Agent │  │ cert-manager│   │  │
│  │  │   (IaC)     │  │ (Config Mgmt│  │   (Secrets) │  │ (K8s Certs) │   │  │
│  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘   │  │
│  └─────────┼────────────────┼────────────────┼────────────────┼──────────┘  │
│            │                │                │                │             │
│            ▼                ▼                ▼                ▼             │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                         統一 API 層 (Unified API Layer)               │  │
│  │  ┌────────────────────────────────────────────────────────────────┐   │  │
│  │  │              Multi-Cloud KMS Abstraction Layer                 │   │  │
│  │  └────────────────────────────────────────────────────────────────┘   │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│            │                │                │                │             │
│            ▼                ▼                ▼                ▼             │
│  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌───────────┐  │
│  │    AWS KMS      │ │ Azure Key Vault │ │ GCP Cloud KMS   │ │ HashiCorp │  │
│  │                 │ │                 │ │                 │ │   Vault   │  │
│  │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌─────────────┐ │ │ ┌───────┐ │  │
│  │ │  CloudHSM   │ │ │ │  HSM Pool   │ │ │ │  HSM Keys   │ │ │ │Transit│ │  │
│  │ │  (FIPS L3)  │ │ │ │  (FIPS L2)  │ │ │ │  (FIPS L3)  │ │ │ │ Engine│ │  │
│  │ └─────────────┘ │ │ └─────────────┘ │ │ └─────────────┘ │ │ └───────┘ │  │
│  └─────────────────┘ └─────────────────┘ └─────────────────┘ └───────────┘  │
│            │                │                │                │             │
│            └────────────────┴────────┬───────┴────────────────┘             │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                        憑證授權單位 (Certificate Authorities)         │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │  │
│  │  │ ACM Private │  │ Azure PKI   │  │  GCP CAS    │  │  Vault PKI  │   │  │
│  │  │     CA      │  │             │  │             │  │             │   │  │
│  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘   │  │
│  └─────────┼────────────────┼────────────────┼────────────────┼──────────┘  │
│            │                │                │                │             │
│            └────────────────┴────────┬───────┴────────────────┘             │
│                                      │                                      │
│                                      ▼                                      │
│  ┌───────────────────────────────────────────────────────────────────────┐  │
│  │                          應用層 (Application Layer)                   │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │  │
│  │  │ Web Servers │  │ API Gateway │  │ Service Mesh│  │  IoT Devices│   │  │
│  │  │  (TLS 1.3)  │  │   (mTLS)    │  │   (Istio)   │  │ (Device Cert│   │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘   │  │
│  └───────────────────────────────────────────────────────────────────────┘  │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

總結

雲端 KMS 與憑證管理的整合是現代企業資安架構的關鍵組成部分。透過本文介紹的技術和最佳實務,您可以:

  1. 安全儲存私鑰:利用雲端 KMS 的 HSM 保護能力,確保私鑰永不離開安全邊界
  2. 自動化憑證生命週期:結合雲端 CA 服務實現憑證的自動簽發、續訂和撤銷
  3. 統一多雲金鑰管理:透過抽象層簡化跨雲端環境的金鑰管理
  4. 符合合規要求:滿足 FIPS、PCI DSS、GDPR 等法規對加密金鑰的要求

在實施過程中,請記住:

  • 始終遵循最小權限原則
  • 啟用完整的稽核日誌
  • 定期進行合規性檢查
  • 建立災難復原計畫
  • 持續監控金鑰使用狀況

透過適當的規劃和實施,雲端 KMS 可以成為您憑證基礎設施的堅實基石。

comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy