Certificate Transparency Log Monitoring in Practice

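Certificate Transparency (CT) is an essential part of modern web security infrastructure. By monitoring CT logs, an organization can detect unauthorized certificate issuance early, keep track of its domain assets, and strengthen its overall security posture. This article walks through the practical steps and best practices of CT log monitoring.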

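1. Review of the Certificate Transparency Mechanism

What Is Certificate Transparency?

Certificate Transparency is an open framework proposed by Google in 2013 (RFC 6962). It is designed to make the issuance of SSL/TLS certificates publicly monitorable and auditable. Its core concepts are:

  • Public Logs: CAs submit the certificates they issue to public log servers, and major browsers require proof of logging before they trust a publicly issued certificate
  • Cryptographic Proofs: a Merkle Tree structure makes the logs append-only and tamper-evident
  • Monitoring & Auditing: anyone can query the logs and verify their contents

The Three Roles in CT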

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│     CA      │────▶│   CT Log    │◀────│   Monitor   │
│  (issues)   │     │  (records)  │     │  (watches)  │
└─────────────┘     └─────────────┘     └─────────────┘
       │                   │                   │
       ▼                   ▼                   ▼
  SCT receipt         Merkle Tree            Alerts

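SCT (Signed Certificate Timestamp)

When a CA submits a certificate to a CT log, the log server returns an SCT: a cryptographically signed timestamp attesting that the log has accepted the certificate for inclusion. Modern browsers check that SCTs are present: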

# Check the SCTs embedded in a site's certificate
echo | openssl s_client -connect example.com:443 2>/dev/null | \
  openssl x509 -noout -text | grep -A 5 "CT Precertificate SCTs"
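
To make the SCT layout more concrete, here is a minimal sketch that decodes a serialized v1 SCT following the structure in RFC 6962 section 3.2 (version, log ID, millisecond timestamp, extensions, then the signature). The function name `parse_sct` and the sample bytes are illustrative assumptions, and the signature is only extracted, not verified against the log's public key.

```python
import struct
from datetime import datetime, timezone


def parse_sct(data: bytes) -> dict:
    """Decode the fields of a serialized RFC 6962 v1 SCT (no signature check)."""
    if len(data) < 47:  # 1 + 32 + 8 + 2 + 1 + 1 + 2 bytes at minimum
        raise ValueError("SCT too short")
    version = data[0]
    log_id = data[1:33]                                # SHA-256 of the log's public key
    (timestamp_ms,) = struct.unpack(">Q", data[33:41])
    (ext_len,) = struct.unpack(">H", data[41:43])
    pos = 43 + ext_len                                 # skip the extensions block
    if len(data) < pos + 4:
        raise ValueError("truncated SCT")
    hash_alg, sig_alg = data[pos], data[pos + 1]       # TLS hash / signature algorithm IDs
    (sig_len,) = struct.unpack(">H", data[pos + 2:pos + 4])
    signature = data[pos + 4:pos + 4 + sig_len]
    if len(signature) != sig_len:
        raise ValueError("truncated signature")
    return {
        "version": version,
        "log_id": log_id,
        "timestamp_ms": timestamp_ms,
        "timestamp": datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc),
        "hash_alg": hash_alg,
        "sig_alg": sig_alg,
        "signature": signature,
    }


# Synthetic SCT for illustration only: placeholder log ID and signature bytes
sample = (
    b"\x00"                                   # version v1
    + bytes(range(32))                        # log ID
    + struct.pack(">Q", 1_700_000_000_000)    # timestamp in milliseconds
    + b"\x00\x00"                             # no extensions
    + b"\x04\x03"                             # sha256 (4), ecdsa (3)
    + struct.pack(">H", 4) + b"\xde\xad\xbe\xef"
)
info = parse_sct(sample)
print(info["version"], info["timestamp"].isoformat(), info["log_id"].hex()[:16])
```

The log ID is what ties an SCT back to a specific log, so a monitor would normally look it up in a trusted log list before checking the signature.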

2. CT Log Structure and Queries

Merkle Tree Structure

CT logs organize certificate data in a Merkle Tree (hash tree):

                    Root Hash
                   /         \
              Hash01         Hash23
             /     \        /     \
          Hash0   Hash1  Hash2   Hash3
            |       |      |       |
          Cert0   Cert1  Cert2   Cert3

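This structure provides two important properties:

  • Inclusion Proof: proves that a given certificate is present in the log
  • Consistency Proof: proves that the log has only been appended to, with no entries modified or deleted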
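
As a rough illustration of how an inclusion proof works, the sketch below builds a small tree with the hashing rules from RFC 6962 (leaves are hashed with a 0x00 prefix, interior nodes with 0x01), derives the audit path for one leaf, and verifies it against the root. The helper names and the four `Cert*` byte strings are assumptions made for this example. A real log serves audit paths through its API rather than computing them client-side.

```python
import hashlib


def leaf_hash(data: bytes) -> bytes:
    return hashlib.sha256(b"\x00" + data).digest()


def node_hash(left: bytes, right: bytes) -> bytes:
    return hashlib.sha256(b"\x01" + left + right).digest()


def _split(n: int) -> int:
    """Largest power of two strictly smaller than n (for n >= 2)."""
    k = 1
    while k * 2 < n:
        k *= 2
    return k


def merkle_root(leaves) -> bytes:
    if not leaves:
        return hashlib.sha256(b"").digest()
    if len(leaves) == 1:
        return leaf_hash(leaves[0])
    k = _split(len(leaves))
    return node_hash(merkle_root(leaves[:k]), merkle_root(leaves[k:]))


def audit_path(index: int, leaves) -> list:
    """Sibling hashes needed to recompute the root from leaves[index]."""
    if len(leaves) <= 1:
        return []
    k = _split(len(leaves))
    if index < k:
        return audit_path(index, leaves[:k]) + [merkle_root(leaves[k:])]
    return audit_path(index - k, leaves[k:]) + [merkle_root(leaves[:k])]


def verify_inclusion(leaf: bytes, index: int, tree_size: int, path, root: bytes) -> bool:
    if index >= tree_size:
        return False
    fn, sn = index, tree_size - 1
    r = leaf_hash(leaf)
    for p in path:
        if sn == 0:
            return False
        if (fn & 1) or fn == sn:
            r = node_hash(p, r)
            # Skip levels where this node has no right sibling
            while fn != 0 and (fn & 1) == 0:
                fn >>= 1
                sn >>= 1
        else:
            r = node_hash(r, p)
        fn >>= 1
        sn >>= 1
    return sn == 0 and r == root


leaves = [b"Cert0", b"Cert1", b"Cert2", b"Cert3"]
root = merkle_root(leaves)
path = audit_path(2, leaves)
print(len(path), verify_inclusion(b"Cert2", 2, len(leaves), path, root))
```

The proof size grows with the logarithm of the tree size, which is why a client can check inclusion without downloading the whole log.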

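Major CT Log Servers

Log     Operator        Endpoint
Argon   Google          ct.googleapis.com/logs/argon2024
Xenon   Google          ct.googleapis.com/logs/xenon2024
Nimbus  Cloudflare      ct.cloudflare.com/logs/nimbus2024
Oak     Let's Encrypt   oak.ct.letsencrypt.org/2024
Yeti    DigiCert        yeti2024.ct.digicert.com/log

Logs are sharded by certificate expiry period, so the 2024 shards listed here are examples. Exact paths vary by operator and should be confirmed against the browsers' published log lists.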

Querying with the CT API

# Fetch the log's STH (Signed Tree Head)
curl -s "https://ct.googleapis.com/logs/us1/argon2024/ct/v1/get-sth" | jq

# Fetch log entries
curl -s "https://ct.googleapis.com/logs/us1/argon2024/ct/v1/get-entries?start=0&end=10" | jq
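
The `get-sth` response is a small JSON object with `tree_size`, `timestamp`, `sha256_root_hash` and `tree_head_signature` fields (per RFC 6962). As a minimal sketch, the helpers below decode such a response and work out which entry range is new since the last poll. The function names and the sample values are assumptions for illustration, and signature verification is left out.

```python
import base64
from datetime import datetime, timezone


def decode_sth(sth: dict) -> dict:
    """Turn a get-sth JSON object into native Python types."""
    return {
        "tree_size": int(sth["tree_size"]),
        "timestamp": datetime.fromtimestamp(sth["timestamp"] / 1000, tz=timezone.utc),
        "root_hash": base64.b64decode(sth["sha256_root_hash"]),
    }


def new_entry_range(old_size: int, sth: dict):
    """Return the inclusive (start, end) range added since old_size, or None."""
    size = int(sth["tree_size"])
    if size < old_size:
        # An append-only log never shrinks, so this deserves investigation
        raise ValueError("tree size decreased")
    if size == old_size:
        return None
    return old_size, size - 1


# Synthetic STH for illustration; a real one comes from the log's get-sth endpoint
sample_sth = {
    "tree_size": 1200,
    "timestamp": 1_700_000_000_000,
    "sha256_root_hash": base64.b64encode(bytes(32)).decode(),
    "tree_head_signature": "",
}
print(decode_sth(sample_sth)["tree_size"], new_entry_range(1000, sample_sth))
```

The resulting range maps directly onto the `start` and `end` parameters of `get-entries`, which are both inclusive.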

Parsing CT Log Entries with Python

import base64
import struct
from cryptography import x509
from cryptography.hazmat.backends import default_backend

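def parse_ct_entry(entry_data):
    """Parse one entry returned by the CT get-entries API"""
    leaf_input = base64.b64decode(entry_data['leaf_input'])
    extra_data = base64.b64decode(entry_data['extra_data'])

    # Parse the MerkleTreeLeaf header:
    # version (1 byte), leaf type (1 byte), timestamp in ms (8 bytes), entry type (2 bytes)
    version = leaf_input[0]
    leaf_type = leaf_input[1]
    timestamp = struct.unpack('>Q', leaf_input[2:10])[0]
    entry_type = struct.unpack('>H', leaf_input[10:12])[0]

    if entry_type == 0:  # X509 entry: the leaf holds the full certificate
        cert_length = struct.unpack('>I', b'\x00' + leaf_input[12:15])[0]
        cert_data = leaf_input[15:15+cert_length]
    elif entry_type == 1:  # Precert entry
        # The leaf holds only the issuer key hash and the TBSCertificate, which
        # cannot be loaded as a certificate. The full precertificate is the
        # first item in extra_data, prefixed by a 3-byte length.
        cert_length = struct.unpack('>I', b'\x00' + extra_data[0:3])[0]
        cert_data = extra_data[3:3+cert_length]
    else:
        raise ValueError(f"Unknown CT entry type: {entry_type}")

    # Parse the DER-encoded certificate
    cert = x509.load_der_x509_certificate(cert_data, default_backend())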

    return {
        'timestamp': timestamp,
        'subject': cert.subject.rfc4514_string(),
        'issuer': cert.issuer.rfc4514_string(),
        'not_before': cert.not_valid_before,
        'not_after': cert.not_valid_after,
        'san': get_san(cert)
    }

def get_san(cert):
    """Return the certificate's Subject Alternative Names"""
    try:
        san_ext = cert.extensions.get_extension_for_oid(
            x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
        )
        return [name.value for name in san_ext.value]
    except x509.ExtensionNotFound:
        return []
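
The `leaf_input` bytes are also what a log hashes to form a Merkle tree leaf, so the same field can be used to ask the log for an inclusion proof. The sketch below computes the leaf hash (SHA-256 over a 0x00 byte followed by the leaf) and builds a `get-proof-by-hash` request URL as described in RFC 6962. The helper names and the `https://log.example/` base URL are placeholders, not a real log.

```python
import base64
import hashlib
from urllib.parse import urlencode


def leaf_hash_b64(leaf_input_b64: str) -> str:
    """Base64 Merkle leaf hash for a base64-encoded leaf_input value."""
    leaf = base64.b64decode(leaf_input_b64)
    digest = hashlib.sha256(b"\x00" + leaf).digest()
    return base64.b64encode(digest).decode()


def proof_query_url(log_url: str, leaf_input_b64: str, tree_size: int) -> str:
    """Build a get-proof-by-hash URL; urlencode escapes '+', '/' and '='."""
    query = urlencode({
        "hash": leaf_hash_b64(leaf_input_b64),
        "tree_size": tree_size,
    })
    return f"{log_url.rstrip('/')}/ct/v1/get-proof-by-hash?{query}"


# Placeholder leaf bytes; a real value comes from a get-entries response
demo_leaf = base64.b64encode(b"example leaf bytes").decode()
print(proof_query_url("https://log.example/", demo_leaf, 8))
```

The response to such a request carries the leaf index and the audit path, which can then be checked against the root hash from a signed tree head.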

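3. Monitoring with crt.sh

About crt.sh

crt.sh is a free CT log search engine maintained by Sectigo. It aggregates data from multiple CT logs and offers a convenient web interface and API.

Web Queries

Open the following URLs directly in a browser: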

https://crt.sh/?q=example.com
https://crt.sh/?q=%.example.com  # search all subdomains

API Queries

# Query all certificates for a specific domain (JSON output)
curl -s "https://crt.sh/?q=example.com&output=json" | jq

# Query all certificates including subdomains
curl -s "https://crt.sh/?q=%.example.com&output=json" | jq

# Exclude certificates that have already expired
curl -s "https://crt.sh/?q=example.com&output=json&exclude=expired" | jq

Wrapping the crt.sh API in Python

import requests
from datetime import datetime, timedelta, timezone

class CrtShMonitor:
    """Wrapper class for monitoring a domain through crt.sh"""

    BASE_URL = "https://crt.sh"

    def __init__(self, domain):
        self.domain = domain.lower()

    def get_certificates(self, include_subdomains=True, exclude_expired=True):
        """Fetch all certificates for the domain"""
        query = f"%.{self.domain}" if include_subdomains else self.domain
        params = {
            'q': query,
            'output': 'json'
        }
        if exclude_expired:
            params['exclude'] = 'expired'

        response = requests.get(self.BASE_URL, params=params, timeout=30)
        response.raise_for_status()

        return response.json()

    def get_unique_subdomains(self):
        """Collect every unique subdomain name"""
        certs = self.get_certificates()
        subdomains = set()

        for cert in certs:
            name = cert.get('name_value', '')
            # name_value may hold several SANs separated by newlines
            for subdomain in name.split('\n'):
                subdomain = subdomain.strip().lower()
                # Match on a label boundary so "badexample.com" is not accepted
                if subdomain == self.domain or subdomain.endswith('.' + self.domain):
                    subdomains.add(subdomain)

        return sorted(subdomains)

    def get_recent_certificates(self, days=30):
        """Return certificates logged within the last N days"""
        certs = self.get_certificates()
        recent = []
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)

        for cert in certs:
            entry_timestamp = cert.get('entry_timestamp', '')
            if entry_timestamp:
                # The timestamps carry no offset; they are assumed here to be UTC
                cert_time = datetime.fromisoformat(
                    entry_timestamp.split('.')[0]
                ).replace(tzinfo=timezone.utc)
                if cert_time > cutoff:
                    recent.append(cert)

        return recent

    def find_wildcard_certs(self):
        """Find wildcard certificates"""
        certs = self.get_certificates()
        wildcards = []

        for cert in certs:
            name = cert.get('name_value', '')
            if '*' in name:
                wildcards.append(cert)

        return wildcards


# Usage example
if __name__ == "__main__":
    monitor = CrtShMonitor("example.com")

    # List all subdomains
    subdomains = monitor.get_unique_subdomains()
    print(f"Found {len(subdomains)} subdomains:")
    for subdomain in subdomains:
        print(f"  - {subdomain}")

    # Check recently logged certificates
    recent = monitor.get_recent_certificates(days=7)
    print(f"\nCertificates logged in the last 7 days: {len(recent)}")
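
One practical wrinkle: crt.sh commonly returns both the precertificate and the final certificate for the same issuance, so raw counts can be roughly double the number of certificates actually issued. A minimal sketch of collapsing such pairs is shown below. It assumes each row carries `id`, `issuer_ca_id` and `serial_number` fields, which should be checked against the JSON actually returned. The function name and sample rows are made up for illustration.

```python
def dedupe_issuances(rows):
    """Keep one row per (issuer_ca_id, serial_number), preferring the lowest id."""
    best = {}
    for row in rows:
        key = (row.get("issuer_ca_id"), row.get("serial_number"))
        if key not in best or row["id"] < best[key]["id"]:
            best[key] = row
    return sorted(best.values(), key=lambda r: r["id"])


# Hypothetical rows shaped like crt.sh JSON output
rows = [
    {"id": 102, "issuer_ca_id": 7, "serial_number": "0a1b", "name_value": "www.example.com"},
    {"id": 101, "issuer_ca_id": 7, "serial_number": "0a1b", "name_value": "www.example.com"},
    {"id": 205, "issuer_ca_id": 9, "serial_number": "0a1b", "name_value": "api.example.com"},
]
unique = dedupe_issuances(rows)
print([r["id"] for r in unique])
```

Serial numbers are only unique per issuer, which is why the key includes the issuer as well.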

Scheduled Monitoring Script

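#!/bin/bash
# ct_monitor.sh - CT log monitoring script
set -euo pipefail  # abort on errors so a failed query never overwrites the state file

DOMAIN="example.com"
STATE_FILE="/var/lib/ct-monitor/${DOMAIN}.state"
WEBHOOK_URL="https://hooks.slack.com/services/xxx/yyy/zzz"

mkdir -p "$(dirname "$STATE_FILE")"

# Fetch the current certificate list once and keep the full response
response=$(curl -sf "https://crt.sh/?q=%.${DOMAIN}&output=json")
current_certs=$(printf '%s' "$response" | jq -r '.[].id' | sort -u)

# Load the state saved by the previous run
if [ -f "$STATE_FILE" ]; then
    previous_certs=$(cat "$STATE_FILE")
else
    previous_certs=""
fi

# Find IDs that were not present last time
new_certs=$(comm -23 <(echo "$current_certs") <(echo "$previous_certs"))

if [ -n "$new_certs" ]; then
    echo "New certificates found:"
    for cert_id in $new_certs; do
        # Look the certificate up in the response that was already fetched
        cert_info=$(printf '%s' "$response" | \
            jq --argjson id "$cert_id" 'map(select(.id == $id))[0]')
        common_name=$(echo "$cert_info" | jq -r '.common_name')
        issuer=$(echo "$cert_info" | jq -r '.issuer_name')

        echo "  ID: $cert_id"
        echo "  CN: $common_name"
        echo "  Issuer: $issuer"

        # Send a Slack notification; jq builds the JSON so values are escaped safely
        text=$(printf 'New certificate issued\nDomain: %s\nIssuer: %s\nhttps://crt.sh/?id=%s' \
            "$common_name" "$issuer" "$cert_id")
        curl -s -X POST -H 'Content-type: application/json' \
            --data "$(jq -n --arg text "$text" '{text: $text}')" \
            "$WEBHOOK_URL"
    done
fi

# Update the state file
echo "$current_certs" > "$STATE_FILE"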

4. Building Your Own Monitoring System

System Architecture

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  CT Log API  │───▶│  Collector   │───▶│  PostgreSQL  │
└──────────────┘    └──────────────┘    └──────────────┘
                                               │
┌──────────────┐    ┌──────────────┐           │
│   Alerting   │◀───│   Analyzer   │◀──────────┘
└──────────────┘    └──────────────┘
       │
       ▼
┌──────────────┐
│  Slack/PD    │
│  Webhook     │
└──────────────┘

Database Schema

-- Create the certificates table
CREATE TABLE certificates (
    id SERIAL PRIMARY KEY,
    log_id VARCHAR(255) NOT NULL,
    log_index BIGINT NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    common_name VARCHAR(255),
    issuer VARCHAR(512),
    not_before TIMESTAMP,
    not_after TIMESTAMP,
    serial_number VARCHAR(128),
    sha256_fingerprint VARCHAR(64) UNIQUE,
    raw_certificate BYTEA,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    UNIQUE(log_id, log_index)
);

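-- Create the SAN table
CREATE TABLE subject_alternative_names (
    id SERIAL PRIMARY KEY,
    certificate_id INTEGER REFERENCES certificates(id),
    san_type VARCHAR(20),
    san_value VARCHAR(255)
);

-- PostgreSQL defines indexes separately from CREATE TABLE
CREATE INDEX idx_san_value ON subject_alternative_names (san_value);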

-- Create the monitoring rules table
CREATE TABLE monitoring_rules (
    id SERIAL PRIMARY KEY,
    pattern VARCHAR(255) NOT NULL,
    rule_type VARCHAR(50) NOT NULL,  -- 'domain', 'wildcard', 'issuer'
    notify_email VARCHAR(255),
    notify_webhook VARCHAR(512),
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create the alerts table
CREATE TABLE alerts (
    id SERIAL PRIMARY KEY,
    rule_id INTEGER REFERENCES monitoring_rules(id),
    certificate_id INTEGER REFERENCES certificates(id),
    alert_type VARCHAR(50),
    message TEXT,
    is_acknowledged BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CT Log Collector

import asyncio
import aiohttp
import base64
import hashlib
from datetime import datetime
from cryptography import x509
from cryptography.hazmat.backends import default_backend
import psycopg2
from psycopg2.extras import execute_values

class CTLogCollector:
    """CT log collector"""

    LOGS = [
        {
            'name': 'Google Argon 2024',
            'url': 'https://ct.googleapis.com/logs/us1/argon2024/',
        },
        {
            'name': 'Cloudflare Nimbus 2024',
            'url': 'https://ct.cloudflare.com/logs/nimbus2024/',
        },
    ]

    def __init__(self, db_config):
        self.db_config = db_config
        self.batch_size = 256

    async def get_tree_size(self, session, log_url):
        """Return the current size of the log"""
        async with session.get(f"{log_url}ct/v1/get-sth") as response:
            data = await response.json()
            return data['tree_size']

    async def get_entries(self, session, log_url, start, end):
        """Fetch log entries in the inclusive range start..end"""
        url = f"{log_url}ct/v1/get-entries?start={start}&end={end}"
        async with session.get(url) as response:
            data = await response.json()
            return data.get('entries', [])

    def parse_certificate(self, entry):
        """Parse the certificate carried by a log entry"""
        leaf_input = base64.b64decode(entry['leaf_input'])
        extra_data = base64.b64decode(entry['extra_data'])

        # MerkleTreeLeaf header: skip version and leaf type, then read the fields
        timestamp = int.from_bytes(leaf_input[2:10], 'big')
        entry_type = int.from_bytes(leaf_input[10:12], 'big')

        try:
            if entry_type == 0:  # X509LogEntry
                cert_len = int.from_bytes(leaf_input[12:15], 'big')
                cert_der = leaf_input[15:15+cert_len]
            else:  # PrecertLogEntry
                # The leaf only holds the TBSCertificate; the full
                # precertificate comes first in extra_data
                cert_len = int.from_bytes(extra_data[0:3], 'big')
                cert_der = extra_data[3:3+cert_len]

            cert = x509.load_der_x509_certificate(cert_der, default_backend())

            # Extract the SANs
            sans = []
            try:
                san_ext = cert.extensions.get_extension_for_oid(
                    x509.oid.ExtensionOID.SUBJECT_ALTERNATIVE_NAME
                )
                for name in san_ext.value:
                    if isinstance(name, x509.DNSName):
                        sans.append(('DNS', name.value))
                    elif isinstance(name, x509.IPAddress):
                        sans.append(('IP', str(name.value)))
            except x509.ExtensionNotFound:
                pass

            return {
                'timestamp': datetime.fromtimestamp(timestamp / 1000),
                'common_name': self._get_common_name(cert),
                'issuer': cert.issuer.rfc4514_string(),
                'not_before': cert.not_valid_before,
                'not_after': cert.not_valid_after,
                'serial_number': format(cert.serial_number, 'x'),
                'sha256_fingerprint': hashlib.sha256(cert_der).hexdigest(),
                'raw_certificate': cert_der,
                'sans': sans
            }
        except Exception:
            # Skip entries that cannot be parsed
            return None

    def _get_common_name(self, cert):
        """Return the Common Name, or None if absent"""
        try:
            cn = cert.subject.get_attributes_for_oid(
                x509.oid.NameOID.COMMON_NAME
            )
            return cn[0].value if cn else None
        except Exception:
            return None

    async def collect_log(self, log_info, start_index=None):
        """Collect entries from a single log"""
        async with aiohttp.ClientSession() as session:
            tree_size = await self.get_tree_size(session, log_info['url'])

            if start_index is None:
                start_index = max(0, tree_size - 1000)  # start from the last 1000 entries

            print(f"Collecting {log_info['name']}: {start_index} -> {tree_size}")

            conn = psycopg2.connect(**self.db_config)
            cursor = conn.cursor()

            next_index = start_index
            while next_index < tree_size:
                batch_end = min(next_index + self.batch_size - 1, tree_size - 1)

                entries = await self.get_entries(
                    session, log_info['url'], next_index, batch_end
                )
                if not entries:
                    break  # nothing returned; stop rather than loop forever

                certs_to_insert = []
                for i, entry in enumerate(entries):
                    parsed = self.parse_certificate(entry)
                    if parsed:
                        parsed['log_id'] = log_info['name']
                        parsed['log_index'] = next_index + i
                        certs_to_insert.append(parsed)

                if certs_to_insert:
                    self._batch_insert(cursor, certs_to_insert)
                    conn.commit()

                # A log may return fewer entries than requested, so advance
                # by the number actually received
                next_index += len(entries)
                print(f"  Processed: {next_index}/{tree_size}")

            cursor.close()
            conn.close()

    def _batch_insert(self, cursor, certs):
        """Insert certificates in a batch"""
        cert_values = [
            (
                c['log_id'], c['log_index'], c['timestamp'],
                c['common_name'], c['issuer'], c['not_before'],
                c['not_after'], c['serial_number'], c['sha256_fingerprint'],
                psycopg2.Binary(c['raw_certificate'])
            )
            for c in certs
        ]

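        # fetch=True gathers the RETURNING rows from every page; without it only
        # the last page (100 rows by default) would be left on the cursor
        rows = execute_values(
            cursor,
            """
            INSERT INTO certificates
            (log_id, log_index, timestamp, common_name, issuer,
             not_before, not_after, serial_number, sha256_fingerprint, raw_certificate)
            VALUES %s
            ON CONFLICT (sha256_fingerprint) DO NOTHING
            RETURNING id, sha256_fingerprint
            """,
            cert_values,
            fetch=True
        )

        # Insert SANs for the certificates that were newly stored
        inserted = {row[1]: row[0] for row in rows}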

        san_values = []
        for cert in certs:
            cert_id = inserted.get(cert['sha256_fingerprint'])
            if cert_id:
                for san_type, san_value in cert['sans']:
                    san_values.append((cert_id, san_type, san_value))

        if san_values:
            execute_values(
                cursor,
                """
                INSERT INTO subject_alternative_names
                (certificate_id, san_type, san_value)
                VALUES %s
                """,
                san_values
            )


# Run the collector
async def main():
    db_config = {
        'host': 'localhost',
        'database': 'ct_monitor',
        'user': 'ct_user',
        'password': 'secret'
    }

    collector = CTLogCollector(db_config)

    for log in collector.LOGS:
        await collector.collect_log(log)

if __name__ == "__main__":
    asyncio.run(main())

Alert Analyzer

import re
import psycopg2
import requests
from datetime import datetime, timedelta

class AlertAnalyzer:
    """Alert analyzer"""

    def __init__(self, db_config):
        self.conn = psycopg2.connect(**db_config)

    def check_new_certificates(self, since_minutes=60):
        """Return certificates stored within the last N minutes"""
        cursor = self.conn.cursor()

        since = datetime.now() - timedelta(minutes=since_minutes)

        # FILTER drops the NULL that the LEFT JOIN yields for certificates without SANs
        cursor.execute("""
            SELECT c.id, c.common_name, c.issuer, c.timestamp,
                   array_agg(s.san_value) FILTER (WHERE s.san_value IS NOT NULL) as sans
            FROM certificates c
            LEFT JOIN subject_alternative_names s ON c.id = s.certificate_id
            WHERE c.created_at > %s
            GROUP BY c.id, c.common_name, c.issuer, c.timestamp
        """, (since,))

        new_certs = cursor.fetchall()
        cursor.close()

        return new_certs

    def match_rules(self, certificate):
        """Match a certificate against the active monitoring rules"""
        cursor = self.conn.cursor()

        cursor.execute("""
            SELECT id, pattern, rule_type, notify_email, notify_webhook
            FROM monitoring_rules
            WHERE is_active = TRUE
        """)

        rules = cursor.fetchall()
        cursor.close()

        matched_rules = []
        cert_id, common_name, issuer, timestamp, sans = certificate

        all_names = [common_name] + (sans or [])

        for rule in rules:
            rule_id, pattern, rule_type, email, webhook = rule

            if rule_type == 'domain':
                for name in all_names:
                    if name and pattern.lower() in name.lower():
                        matched_rules.append(rule)
                        break

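            elif rule_type == 'wildcard':
                # Escape regex metacharacters, then let '*' match any run of
                # characters; fullmatch anchors both ends of the name
                regex = re.escape(pattern).replace(r'\*', '.*')
                for name in all_names:
                    if name and re.fullmatch(regex, name, re.IGNORECASE):
                        matched_rules.append(rule)
                        break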

            elif rule_type == 'issuer':
                if issuer and pattern.lower() in issuer.lower():
                    matched_rules.append(rule)

        return matched_rules

    def create_alert(self, rule, certificate):
        """Create an alert and send notifications"""
        cursor = self.conn.cursor()

        rule_id, pattern, rule_type, email, webhook = rule
        cert_id, common_name, issuer, timestamp, sans = certificate

        message = f"""
New certificate alert
=====================
Rule: {pattern} ({rule_type})
Common Name: {common_name}
Issuer: {issuer}
Time: {timestamp}
SANs: {', '.join(s for s in (sans or []) if s)}
"""

        cursor.execute("""
            INSERT INTO alerts (rule_id, certificate_id, alert_type, message)
            VALUES (%s, %s, %s, %s)
            RETURNING id
        """, (rule_id, cert_id, 'new_certificate', message))

        alert_id = cursor.fetchone()[0]
        self.conn.commit()
        cursor.close()

        # Send notifications
        if webhook:
            self._send_webhook(webhook, message)

        if email:
            self._send_email(email, "CT monitoring alert", message)

        return alert_id

    def _send_webhook(self, webhook_url, message):
        """Send a webhook notification"""
        try:
            if 'slack.com' in webhook_url:
                payload = {'text': message}
            elif 'discord.com' in webhook_url:
                payload = {'content': message}
            else:
                payload = {'message': message}

            requests.post(webhook_url, json=payload, timeout=10)
        except Exception as e:
            print(f"Failed to send webhook: {e}")

    def _send_email(self, to_email, subject, body):
        """Send an email notification (requires SMTP configuration)"""
        # Implement the mail-sending logic here
        pass

    def run_analysis(self):
        """Run the analysis"""
        new_certs = self.check_new_certificates()

        for cert in new_certs:
            matched_rules = self.match_rules(cert)
            for rule in matched_rules:
                self.create_alert(rule, cert)

        return len(new_certs)

5. Using the certspotter Tool

Installing certspotter

certspotter is an open-source CT monitoring tool developed by SSLMate.

# Install with Go
go install software.sslmate.com/src/certspotter/cmd/certspotter@latest

# Or build from source
git clone https://github.com/SSLMate/certspotter.git
cd certspotter
go build ./cmd/certspotter
sudo mv certspotter /usr/local/bin/

Basic Usage

# Create the watchlist
mkdir -p ~/.certspotter
echo "example.com" > ~/.certspotter/watchlist
echo ".example.com" >> ~/.certspotter/watchlist  # watch all subdomains

# Run the monitor
certspotter

# Use a specific state directory
certspotter -state_dir /var/lib/certspotter

# Print verbose output
certspotter -verbose

Configuring a Notification Script

#!/bin/bash
# /etc/certspotter/hooks.d/notify.sh

# Environment variables:
# $CERT_PARSEABLE - certificate details in a parseable form
# $CERT_SUMMARY - certificate summary

SLACK_WEBHOOK="https://hooks.slack.com/services/xxx"

# Parse the certificate details
SUBJECT=$(echo "$CERT_PARSEABLE" | grep "^Subject:" | cut -d: -f2-)
ISSUER=$(echo "$CERT_PARSEABLE" | grep "^Issuer:" | cut -d: -f2-)
NOT_BEFORE=$(echo "$CERT_PARSEABLE" | grep "^Not Before:" | cut -d: -f2-)
NOT_AFTER=$(echo "$CERT_PARSEABLE" | grep "^Not After:" | cut -d: -f2-)
DNS_NAMES=$(echo "$CERT_PARSEABLE" | grep "^DNS:" | cut -d: -f2- | tr '\n' ', ')

# Send a Slack notification
curl -X POST -H 'Content-type: application/json' \
    --data "{
        \"attachments\": [{
            \"color\": \"warning\",
            \"title\": \"New certificate issued\",
            \"fields\": [
                {\"title\": \"Subject\", \"value\": \"$SUBJECT\", \"short\": false},
                {\"title\": \"Issuer\", \"value\": \"$ISSUER\", \"short\": false},
                {\"title\": \"Validity\", \"value\": \"$NOT_BEFORE - $NOT_AFTER\", \"short\": false},
                {\"title\": \"DNS Names\", \"value\": \"$DNS_NAMES\", \"short\": false}
            ]
        }]
    }" \
    "$SLACK_WEBHOOK"

systemd Service Configuration

# /etc/systemd/system/certspotter.service
[Unit]
Description=Certificate Transparency Log Monitor
After=network.target

[Service]
Type=simple
User=certspotter
Group=certspotter
ExecStart=/usr/local/bin/certspotter -state_dir /var/lib/certspotter -watchlist /etc/certspotter/watchlist
Restart=always
RestartSec=60

# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=true
ReadWritePaths=/var/lib/certspotter

[Install]
WantedBy=multi-user.target

Enable and start the service:

# Reload systemd units, then enable and start certspotter
sudo systemctl daemon-reload
sudo systemctl enable certspotter
sudo systemctl start certspotter

Using the SSLMate Hosted Service

SSLMate offers a free hosted CT monitoring service:

# After registering, query issuances for a domain through the Cert Spotter API
curl -s -G "https://api.certspotter.com/v1/issuances" \
    -H "Authorization: Bearer YOUR_API_KEY" \
    -d "domain=example.com" \
    -d "include_subdomains=true" \
    -d "expand=dns_names"

6. Alerting and Notification Integration

Slack Integration

import requests
from datetime import datetime

class SlackNotifier:
    """Slack notifier"""

    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send_certificate_alert(self, cert_info):
        """Send a certificate alert"""
        payload = {
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": ":lock: New SSL certificate issued"
                    }
                },
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": f"*Domain:*\n{cert_info['common_name']}"
                        },
                        {
                            "type": "mrkdwn",
                            "text": f"*Issuer:*\n{cert_info['issuer']}"
                        }
                    ]
                },
                {
                    "type": "section",
                    "fields": [
                        {
                            "type": "mrkdwn",
                            "text": f"*Valid from:*\n{cert_info['not_before']}"
                        },
                        {
                            "type": "mrkdwn",
                            "text": f"*Valid until:*\n{cert_info['not_after']}"
                        }
                    ]
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*SANs:*\n```{chr(10).join(cert_info.get('sans', []))}```"
                    }
                },
                {
                    "type": "actions",
                    "elements": [
                        {
                            "type": "button",
                            "text": {
                                "type": "plain_text",
                                "text": "View details"
                            },
                            "url": f"https://crt.sh/?id={cert_info.get('crtsh_id', '')}"
                        }
                    ]
                }
            ]
        }

        response = requests.post(
            self.webhook_url,
            json=payload,
            timeout=10
        )
        return response.status_code == 200

PagerDuty Integration

import requests
from datetime import datetime, timezone

class PagerDutyNotifier:
    """PagerDuty notifier"""

    EVENTS_API = "https://events.pagerduty.com/v2/enqueue"

    def __init__(self, routing_key):
        self.routing_key = routing_key

    def trigger_incident(self, cert_info, severity="warning"):
        """Trigger an incident"""
        payload = {
            "routing_key": self.routing_key,
            "event_action": "trigger",
            "dedup_key": f"ct-{cert_info['sha256_fingerprint'][:16]}",
            "payload": {
                "summary": f"New certificate issued: {cert_info['common_name']}",
                "severity": severity,
                "source": "ct-monitor",
                # Timezone-aware UTC timestamp in ISO 8601 form
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "custom_details": {
                    "common_name": cert_info['common_name'],
                    "issuer": cert_info['issuer'],
                    "sans": cert_info.get('sans', []),
                    "not_before": str(cert_info['not_before']),
                    "not_after": str(cert_info['not_after']),
                }
            },
            "links": [
                {
                    "href": f"https://crt.sh/?id={cert_info.get('crtsh_id', '')}",
                    "text": "View on crt.sh"
                }
            ]
        }

        response = requests.post(
            self.EVENTS_API,
            json=payload,
            timeout=10
        )
        return response.json()

Integrating Prometheus Alertmanager

# alertmanager.yml
global:
  slack_api_url: 'https://hooks.slack.com/services/xxx'

route:
  receiver: 'ct-alerts'
  group_by: ['alertname', 'domain']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h

receivers:
  - name: 'ct-alerts'
    slack_configs:
      - channel: '#security-alerts'
        title: 'CT Monitor Alert'
        text: |
          {{ range .Alerts }}
          *Alert:* {{ .Labels.alertname }}
          *Domain:* {{ .Labels.domain }}
          *Issuer:* {{ .Labels.issuer }}
          *Details:* {{ .Annotations.description }}
          {{ end }}

Collecting Prometheus Metrics

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Define the metrics
certificates_discovered = Counter(
    'ct_certificates_discovered_total',
    'Total number of certificates discovered',
    ['domain', 'issuer']
)

certificates_expiring_soon = Gauge(
    'ct_certificates_expiring_soon',
    'Number of certificates expiring within 30 days',
    ['domain']
)

query_duration = Histogram(
    'ct_query_duration_seconds',
    'Duration of CT log queries',
    ['log_name']
)

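# Use the metrics inside the monitoring logic
def on_certificate_found(cert):
    certificates_discovered.labels(
        domain=cert['common_name'],
        issuer=cert['issuer']
    ).inc()

# Start the metrics HTTP server; 9090 is also Prometheus's own default port,
# so pick another port if both run on the same host
start_http_server(9090)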
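
The `ct_certificates_expiring_soon` gauge is defined above but never given a value. One way to compute it, sketched under the assumption that each certificate is a dict with a `domain` string and a timezone-aware `not_after` datetime, is a small pure function like the one below. The function name and sample data are illustrative.

```python
from datetime import datetime, timedelta, timezone


def count_expiring(certs, now=None, days=30):
    """Count, per domain, certificates whose not_after falls within `days` of now."""
    now = now or datetime.now(timezone.utc)
    horizon = now + timedelta(days=days)
    counts = {}
    for cert in certs:
        if now <= cert["not_after"] <= horizon:
            counts[cert["domain"]] = counts.get(cert["domain"], 0) + 1
    return counts


# Illustrative data with a fixed reference time
now = datetime(2024, 1, 1, tzinfo=timezone.utc)
certs = [
    {"domain": "example.com", "not_after": now + timedelta(days=10)},
    {"domain": "example.com", "not_after": now + timedelta(days=29)},
    {"domain": "example.com", "not_after": now + timedelta(days=40)},
    {"domain": "example.org", "not_after": now - timedelta(days=1)},
]
expiring = count_expiring(certs, now=now)
print(expiring)
# Each value could then be applied with
# certificates_expiring_soon.labels(domain=name).set(count)
```

Keeping the calculation separate from the metric objects makes it easy to test without a running metrics server.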

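7. Subdomain Discovery

Enumerating Subdomains from CT Logs

CT logs are a valuable resource for discovering subdomains, because the names in every logged certificate are publicly recorded.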

import requests
from collections import defaultdict

class SubdomainEnumerator:
    """Subdomain enumerator"""

    def __init__(self, domain):
        self.domain = domain.lower()
        self.subdomains = set()

    def enumerate_crtsh(self):
        """Enumerate subdomains through crt.sh"""
        url = f"https://crt.sh/?q=%.{self.domain}&output=json"

        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()

            for cert in response.json():
                name_value = cert.get('name_value', '')
                for name in name_value.split('\n'):
                    name = name.strip().lower()
                    # Match on a label boundary and skip wildcard names
                    in_scope = name == self.domain or name.endswith('.' + self.domain)
                    if in_scope and '*' not in name:
                        self.subdomains.add(name)
        except Exception as e:
            print(f"crt.sh query failed: {e}")

        return self.subdomains

    def enumerate_censys(self, api_id, api_secret):
        """Enumerate subdomains through Censys (requires the censys package)"""
        from censys.search import CensysCertificates

        api = CensysCertificates(api_id=api_id, api_secret=api_secret)

        query = f"parsed.names: *.{self.domain}"

        for cert in api.search(query, per_page=100, pages=10):
            for name in cert.get('parsed', {}).get('names', []):
                name = name.lower()
                # Match on a label boundary and skip wildcard names
                in_scope = name == self.domain or name.endswith('.' + self.domain)
                if in_scope and '*' not in name:
                    self.subdomains.add(name)

        return self.subdomains

    def verify_subdomains(self):
        """驗證子網域是否存在"""
        import socket

        verified = {}

        for subdomain in self.subdomains:
            try:
                socket.gethostbyname(subdomain)
                verified[subdomain] = True
            except socket.gaierror:
                verified[subdomain] = False

        return verified

    def analyze_by_issuer(self):
        """按簽發者分析"""
        url = f"https://crt.sh/?q=%.{self.domain}&output=json"

        issuer_stats = defaultdict(int)

        try:
            response = requests.get(url, timeout=30)
            for cert in response.json():
                issuer = cert.get('issuer_name', 'Unknown')
                issuer_stats[issuer] += 1
        except Exception as e:
            print(f"分析失敗: {e}")

        return dict(issuer_stats)


# Usage example
def main():
    enumerator = SubdomainEnumerator("example.com")

    # Enumerate subdomains
    subdomains = enumerator.enumerate_crtsh()
    print(f"Found {len(subdomains)} subdomains")

    # Verify which ones resolve
    verified = enumerator.verify_subdomains()
    active = [s for s, v in verified.items() if v]
    print(f"Active subdomains: {len(active)}")

    # Break down by issuer
    issuer_stats = enumerator.analyze_by_issuer()
    print("\nIssuer statistics:")
    for issuer, count in sorted(issuer_stats.items(), key=lambda x: -x[1])[:10]:
        print(f"  {issuer}: {count}")

    # Print the results
    print("\nSubdomain list:")
    for subdomain in sorted(active):
        print(f"  {subdomain}")

if __name__ == "__main__":
    main()
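
Filtering the name_value field has a few edge cases that are easy to miss: a plain suffix match also accepts look-alike domains, wildcard entries still reveal a parent hostname, and the field can contain non-hostname identities such as e-mail addresses. The helper below is a small sketch of one way to normalize it; the function name is hypothetical and the rules are a judgment call, not crt.sh behavior.

```python
def normalize_names(name_value, domain):
    """Split a crt.sh-style name_value field into in-scope hostnames."""
    domain = domain.lower().rstrip('.')
    names = set()
    for raw in name_value.splitlines():
        name = raw.strip().lower().rstrip('.')
        # A wildcard entry such as *.dev.example.com still reveals dev.example.com.
        if name.startswith('*.'):
            name = name[2:]
        # Skip empty lines, embedded wildcards and e-mail style identities.
        if not name or '*' in name or '@' in name:
            continue
        # Require a label boundary so notexample.com does not match example.com.
        if name == domain or name.endswith('.' + domain):
            names.add(name)
    return names


if __name__ == "__main__":
    field = "www.example.com\n*.dev.example.com\nnotexample.com"
    print(sorted(normalize_names(field, "example.com")))
```

Unlike the enumerator above, this variant keeps the parent of a wildcard name instead of dropping the entry; whether that is desirable depends on how the results are used.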

Integrating into an Asset Discovery Workflow

#!/bin/bash
# subdomain_discovery.sh - combined subdomain discovery script

DOMAIN="$1"
if [ -z "$DOMAIN" ]; then
    echo "Usage: $0 <domain>" >&2
    exit 1
fi

OUTPUT_DIR="./recon/${DOMAIN}"

mkdir -p "$OUTPUT_DIR"

echo "[*] Starting subdomain discovery: $DOMAIN"

# 1. CT log query (%25 is the URL-encoded % wildcard)
echo "[+] Querying crt.sh..."
curl -s "https://crt.sh/?q=%25.${DOMAIN}&output=json" | \
    jq -r '.[].name_value' | \
    sed 's/\*\.//g' | \
    sort -u > "$OUTPUT_DIR/crtsh.txt"

echo "    Found $(wc -l < "$OUTPUT_DIR/crtsh.txt") subdomains"

# 2. Passive enumeration with subfinder (optional)
: > "$OUTPUT_DIR/subfinder.txt"
if command -v subfinder &> /dev/null; then
    echo "[+] Running subfinder..."
    subfinder -d "$DOMAIN" -silent > "$OUTPUT_DIR/subfinder.txt"
    echo "    Found $(wc -l < "$OUTPUT_DIR/subfinder.txt") subdomains"
fi

# 3. Merge results (only the per-source files, so reruns do not re-ingest old output)
cat "$OUTPUT_DIR/crtsh.txt" "$OUTPUT_DIR/subfinder.txt" | sort -u > "$OUTPUT_DIR/all_subdomains.txt"
echo "[*] Found $(wc -l < "$OUTPUT_DIR/all_subdomains.txt") unique subdomains in total"

# 4. Check which names resolve
echo "[+] Verifying subdomains..."
: > "$OUTPUT_DIR/active_subdomains.txt"   # start from an empty file on every run
while IFS= read -r subdomain; do
    if host "$subdomain" &> /dev/null; then
        echo "$subdomain" >> "$OUTPUT_DIR/active_subdomains.txt"
    fi
done < "$OUTPUT_DIR/all_subdomains.txt"

echo "[*] Active subdomains: $(wc -l < "$OUTPUT_DIR/active_subdomains.txt")"

# 5. Print the report
echo ""
echo "=== Discovery Report ==="
echo "Domain: $DOMAIN"
echo "Total subdomains: $(wc -l < "$OUTPUT_DIR/all_subdomains.txt")"
echo "Active subdomains: $(wc -l < "$OUTPUT_DIR/active_subdomains.txt")"
echo "Results directory: $OUTPUT_DIR"
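
A discovery run tells you more when it is compared with the previous one, because newly appearing hostnames are usually what needs attention. The sketch below diffs two host lists such as the all_subdomains.txt file produced by the script. It assumes the previous run's output was copied aside first; the file name all_subdomains.prev.txt is a hypothetical convention.

```python
from pathlib import Path


def load_hosts(path):
    """Read one hostname per line; a missing file counts as an empty set."""
    p = Path(path)
    if not p.exists():
        return set()
    lines = p.read_text(encoding='utf-8').splitlines()
    return {line.strip().lower() for line in lines if line.strip()}


def diff_runs(previous_path, current_path):
    """Return (added, removed) hostnames between two runs, both sorted."""
    previous = load_hosts(previous_path)
    current = load_hosts(current_path)
    return sorted(current - previous), sorted(previous - current)


if __name__ == "__main__":
    # Hypothetical paths; adjust to wherever the script's output is kept.
    added, removed = diff_runs(
        "recon/example.com/all_subdomains.prev.txt",
        "recon/example.com/all_subdomains.txt",
    )
    print(f"new: {len(added)}, gone: {len(removed)}")
    for host in added:
        print(f"  + {host}")
```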

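8. Best Practices and Automation

Recommended Monitoring Strategy

  1. Tiered monitoring

    • Core domains: real-time monitoring; alert immediately on any new certificate
    • Secondary domains: hourly checks with batched notifications
    • Third-party services: daily checks, summarized in a weekly report
  2. Alert classification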

    def classify_alert(cert):
        """Classify an alert by severity."""
        common_name = cert.get('common_name', '').lower()
        issuer = cert.get('issuer', '')
    
        # Critical: sensitive hosts (api., auth., admin. prefixes)
        if common_name.startswith(('api.', 'auth.', 'admin.')):
            return 'critical'
    
        # High: issuer is not on the expected list
        expected_issuers = ["Let's Encrypt", 'DigiCert', 'Sectigo']
        if not any(exp in issuer for exp in expected_issuers):
            return 'high'
    
        # Low: everything else
        return 'low'
    
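  3. False-positive handling

    • Maintain an allowlist of known, legitimate issuance patterns
    • Auto-acknowledge certificates issued by the internal CA
    • Apply a quiet period to newly launched services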
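
The three false-positive rules above can be sketched as one suppression check that runs before an alert is sent. Everything in the configuration below (patterns, CA name, dates) is a hypothetical placeholder, and the record keys follow the 'common_name' / 'issuer' convention used in the classification example.

```python
from datetime import datetime, timezone
from fnmatch import fnmatch

# Hypothetical configuration; replace with your own values.
ALLOWLIST = [("*.staging.example.com", "Let's Encrypt")]  # (name pattern, issuer substring)
INTERNAL_ISSUERS = ["Example Corp Internal CA"]
QUIET_UNTIL = {"shop.example.com": datetime(2024, 7, 1, tzinfo=timezone.utc)}


def should_suppress(cert, now=None):
    """Return the reason an alert is suppressed, or None if it should fire."""
    now = now or datetime.now(timezone.utc)
    name = cert.get('common_name', '').lower()
    issuer = cert.get('issuer', '')

    # 1. Allowlist: known name pattern issued by the expected CA.
    for pattern, expected_issuer in ALLOWLIST:
        if fnmatch(name, pattern) and expected_issuer in issuer:
            return 'allowlist'

    # 2. Auto-acknowledge certificates from the internal CA.
    if any(ca in issuer for ca in INTERNAL_ISSUERS):
        return 'internal-ca'

    # 3. Quiet period for services that just launched.
    quiet_until = QUIET_UNTIL.get(name)
    if quiet_until is not None and now < quiet_until:
        return 'quiet-period'

    return None


if __name__ == "__main__":
    cert = {'common_name': 'a.staging.example.com', 'issuer': "O=Let's Encrypt, CN=R3"}
    print(should_suppress(cert))
```

Returning the reason rather than a bare boolean makes it possible to log why an alert was dropped, which helps when reviewing the allowlist later.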

Automated Workflow

# GitHub Actions workflow
name: CT Log Monitor

on:
  schedule:
    - cron: '0 * * * *'  # run hourly
  workflow_dispatch:

jobs:
  monitor:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install requests

      - name: Run CT monitor
        env:
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}
          DOMAINS: ${{ secrets.MONITORED_DOMAINS }}
        run: |
          python scripts/ct_monitor.py

      - name: Upload results
        uses: actions/upload-artifact@v4
        with:
          name: ct-results
          path: results/
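
The workflow passes the domain list through the DOMAINS variable and archives whatever lands in results/. The script itself is not shown, so the following is only a sketch of those two ends of a hypothetical scripts/ct_monitor.py. It assumes a comma- or whitespace-separated domain list and one JSON file per run, and it leaves the actual CT query as a placeholder.

```python
import json
import os
from datetime import datetime, timezone
from pathlib import Path


def parse_domains(raw):
    """Split a comma- or whitespace-separated list, dropping blanks and duplicates."""
    seen, domains = set(), []
    for item in raw.replace(',', ' ').split():
        domain = item.strip().lower().rstrip('.')
        if domain and domain not in seen:
            seen.add(domain)
            domains.append(domain)
    return domains


def write_results(findings, out_dir='results'):
    """Write one timestamped JSON file so the upload step has something to archive."""
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    path = out / f'ct-{stamp}.json'
    path.write_text(json.dumps(findings, indent=2, sort_keys=True), encoding='utf-8')
    return path


if __name__ == '__main__':
    domains = parse_domains(os.environ.get('DOMAINS', ''))
    if domains:
        # Placeholder: a real script would query CT data for each domain here.
        findings = {domain: [] for domain in domains}
        print(f'wrote {write_results(findings)}')
    else:
        print('DOMAINS is empty; nothing to monitor')
```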

Containerized Deployment with Docker

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY config/ ./config/

ENV PYTHONPATH=/app

CMD ["python", "-m", "src.monitor"]

# docker-compose.yml
version: '3.8'

services:
  ct-monitor:
    build: .
    restart: always
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/ct_monitor
      - SLACK_WEBHOOK=${SLACK_WEBHOOK}
    depends_on:
      - db
      - redis
    volumes:
      - ./config:/app/config:ro

  db:
    image: postgres:15
    restart: always
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=ct_monitor
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    restart: always

  prometheus:
    image: prom/prometheus
    restart: always
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana
    restart: always
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana

volumes:
  postgres_data:
  grafana_data:
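
The compose file hands DATABASE_URL and SLACK_WEBHOOK to the ct-monitor container as environment variables. The snippet below is a minimal sketch of how the service could read them at startup and fail fast when one is missing. The Settings class and load_settings function are hypothetical names, not part of the src.monitor module referenced in the Dockerfile.

```python
import os
from dataclasses import dataclass
from urllib.parse import urlsplit


@dataclass(frozen=True)
class Settings:
    database_url: str
    slack_webhook: str

    @property
    def database_host(self):
        """Host part of the database URL, e.g. the compose service name."""
        return urlsplit(self.database_url).hostname


def load_settings(env=None):
    """Build Settings from environment variables, raising if any is unset."""
    env = os.environ if env is None else env
    required = ('DATABASE_URL', 'SLACK_WEBHOOK')
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise RuntimeError(
            f"missing required environment variables: {', '.join(missing)}"
        )
    return Settings(
        database_url=env['DATABASE_URL'],
        slack_webhook=env['SLACK_WEBHOOK'],
    )


if __name__ == '__main__':
    try:
        settings = load_settings()
        print(f'database host: {settings.database_host}')
    except RuntimeError as exc:
        print(f'configuration error: {exc}')
```

Failing at startup is a deliberate choice here: with restart: always, a misconfigured container then shows up immediately in the logs instead of failing on the first alert.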

Security Considerations

  1. API key management

    # Use environment variables or a secrets manager
    export CT_API_KEY=$(aws secretsmanager get-secret-value \
        --secret-id ct-monitor/api-key \
        --query SecretString --output text)
    
  2. Rate limiting

    import time
    from functools import wraps
    
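    def rate_limit(calls_per_second=1):
        """Decorator that spaces calls at least 1/calls_per_second seconds apart.

        Not thread-safe; guard it with a lock if called from several threads.
        """
        min_interval = 1.0 / calls_per_second
        last_call = [None]  # monotonic timestamp of the previous call
    
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                if last_call[0] is not None:
                    elapsed = time.monotonic() - last_call[0]
                    if elapsed < min_interval:
                        time.sleep(min_interval - elapsed)
                last_call[0] = time.monotonic()
                return func(*args, **kwargs)
            return wrapper
        return decorator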
    
    @rate_limit(calls_per_second=2)
    def query_crtsh(domain):
        # ...
        pass
    
  3. Logging and auditing

    import logging
    
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('/var/log/ct-monitor/audit.log'),
            logging.StreamHandler()
        ]
    )
    
    logger = logging.getLogger('ct-monitor')
    
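    def log_alert(cert, rule):
        # The format string above only prints %(message)s, so the details go
        # into the message; `extra` keeps them available to structured handlers.
        logger.info(
            "Alert triggered: cn=%s issuer=%s rule_id=%s pattern=%s",
            cert['common_name'], cert['issuer'], rule['id'], rule['pattern'],
            extra={
                'cert_cn': cert['common_name'],
                'cert_issuer': cert['issuer'],
                'rule_id': rule['id'],
                'rule_pattern': rule['pattern']
            }
        )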
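
Fields passed through `extra` are attached to the log record but only appear in the output if the formatter asks for them. For an audit trail, one option is a formatter that serializes every non-standard record attribute as JSON. The class below is a sketch of that idea using only the standard library; JsonFormatter is a hypothetical name, not an existing logging class.

```python
import json
import logging

# Attributes present on every LogRecord; anything else came from `extra=`.
_STANDARD_ATTRS = set(
    logging.LogRecord('', 0, '', 0, '', (), None).__dict__
) | {'message', 'asctime'}


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per record, including fields passed via `extra`."""

    def format(self, record):
        payload = {
            'time': self.formatTime(record),
            'logger': record.name,
            'level': record.levelname,
            'message': record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        # default=str keeps non-JSON values (e.g. datetimes) from raising.
        return json.dumps(payload, ensure_ascii=False, default=str)


if __name__ == '__main__':
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    demo = logging.getLogger('ct-monitor-demo')
    demo.setLevel(logging.INFO)
    demo.addHandler(handler)
    demo.info('Alert triggered', extra={'cert_cn': 'api.example.com', 'rule_id': 'r1'})
```

Attaching this formatter to the file handler would make the audit log machine-readable while the console handler keeps the human-readable format.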
    

Performance Optimization Tips

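  1. Batching: avoid one request per record; use batch APIs where available
  2. Caching: cache frequent query results in Redis
  3. Incremental sync: record the last processed log index and only handle new entries
  4. Parallelism: use asyncio or threads to monitor several logs at once

import asyncio
import aiohttp
from asyncio import Semaphore

class ParallelCTMonitor:
    """Query crt.sh for several domains concurrently."""

    def __init__(self, max_concurrent=5):
        self.semaphore = Semaphore(max_concurrent)

    async def check_domain(self, session, domain):
        async with self.semaphore:
            # params lets aiohttp URL-encode the % wildcard
            params = {'q': f'%.{domain}', 'output': 'json'}
            async with session.get('https://crt.sh/', params=params) as response:
                response.raise_for_status()
                # content_type=None: parse the body even if the server mislabels it
                return await response.json(content_type=None)

    async def monitor_domains(self, domains):
        timeout = aiohttp.ClientTimeout(total=60)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            tasks = [
                self.check_domain(session, domain)
                for domain in domains
            ]
            # return_exceptions=True: a failed domain yields its exception
            # as the value instead of cancelling the whole batch
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return dict(zip(domains, results))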
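
The incremental-sync item in the list above comes down to two pieces: a checkpoint that remembers the next unread index, and a planner that turns the gap between that index and the log's current tree_size (from get-sth) into get-entries ranges. The sketch below shows both with the standard library only. The checkpoint file format and the batch size of 256 are assumptions; RFC 6962 defines start and end as inclusive, zero-based indices.

```python
import json
from pathlib import Path


def load_checkpoint(path):
    """Return the next index to fetch, or 0 when no checkpoint exists yet."""
    p = Path(path)
    if not p.exists():
        return 0
    return int(json.loads(p.read_text(encoding='utf-8'))['next_index'])


def save_checkpoint(path, next_index):
    """Persist the next index to fetch."""
    Path(path).write_text(json.dumps({'next_index': next_index}), encoding='utf-8')


def plan_batches(next_index, tree_size, batch_size=256):
    """Yield inclusive (start, end) ranges covering [next_index, tree_size)."""
    start = next_index
    while start < tree_size:
        end = min(start + batch_size, tree_size) - 1
        yield start, end
        start = end + 1


if __name__ == '__main__':
    # Example: 1000 entries already processed, the log now reports 1600.
    for start, end in plan_batches(1000, 1600):
        print(f'get-entries?start={start}&end={end}')
```

A log may return fewer entries than requested, so a real loop should advance the checkpoint by the number of entries actually received rather than by the planned range.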

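Summary

Certificate Transparency log monitoring is an important part of modern security operations. With the tools and techniques covered in this article, you can:

  • Detect unauthorized certificate issuance promptly
  • Track the organization's full certificate inventory
  • Uncover unknown subdomains and shadow IT
  • Verify that certificate issuance follows policy

A good starting point is simple crt.sh monitoring, growing into a more complete system as needs increase. Continuous monitoring and automation are key to keeping certificates secure.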
