1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
| #!/usr/bin/env python3
"""
Certificate Exporter for Prometheus
匯出 SSL 憑證指標供 Prometheus 收集
"""
import ssl
import socket
from datetime import datetime
from prometheus_client import start_http_server, Gauge, Info
import time
import yaml
# 定義指標
CERT_EXPIRY_SECONDS = Gauge(
'ssl_certificate_expiry_seconds',
'SSL certificate expiry time in seconds',
['domain', 'issuer']
)
CERT_VALID = Gauge(
'ssl_certificate_valid',
'SSL certificate validity (1=valid, 0=invalid)',
['domain']
)
CERT_DAYS_REMAINING = Gauge(
'ssl_certificate_days_remaining',
'Days remaining until certificate expires',
['domain']
)
CERT_INFO = Info(
'ssl_certificate',
'SSL certificate information',
['domain']
)
def get_certificate_info(hostname: str, port: int = 443) -> dict:
"""取得 SSL 憑證資訊"""
context = ssl.create_default_context()
try:
with socket.create_connection((hostname, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
cert = ssock.getpeercert()
# 解析到期日期
not_after = datetime.strptime(
cert['notAfter'],
'%b %d %H:%M:%S %Y %Z'
)
not_before = datetime.strptime(
cert['notBefore'],
'%b %d %H:%M:%S %Y %Z'
)
# 取得 issuer
issuer = dict(x[0] for x in cert['issuer'])
issuer_name = issuer.get('organizationName', 'Unknown')
# 取得 subject
subject = dict(x[0] for x in cert['subject'])
common_name = subject.get('commonName', hostname)
# 計算剩餘天數
now = datetime.utcnow()
days_remaining = (not_after - now).days
expiry_seconds = (not_after - now).total_seconds()
return {
'valid': True,
'common_name': common_name,
'issuer': issuer_name,
'not_before': not_before.isoformat(),
'not_after': not_after.isoformat(),
'days_remaining': days_remaining,
'expiry_seconds': expiry_seconds,
'serial_number': cert.get('serialNumber', ''),
}
except Exception as e:
return {
'valid': False,
'error': str(e),
'days_remaining': -1,
'expiry_seconds': -1,
}
def update_metrics(domains: list):
"""更新所有網域的指標"""
for domain in domains:
info = get_certificate_info(domain)
CERT_VALID.labels(domain=domain).set(1 if info['valid'] else 0)
if info['valid']:
CERT_EXPIRY_SECONDS.labels(
domain=domain,
issuer=info['issuer']
).set(info['expiry_seconds'])
CERT_DAYS_REMAINING.labels(domain=domain).set(info['days_remaining'])
CERT_INFO.labels(domain=domain).info({
'common_name': info['common_name'],
'issuer': info['issuer'],
'not_before': info['not_before'],
'not_after': info['not_after'],
'serial_number': info['serial_number'],
})
def main():
# 載入設定
with open('/etc/cert-exporter/config.yaml', 'r') as f:
config = yaml.safe_load(f)
domains = config.get('domains', [])
port = config.get('port', 9117)
interval = config.get('interval', 300)
# 啟動 HTTP 伺服器
start_http_server(port)
print(f"Certificate exporter started on port {port}")
while True:
update_metrics(domains)
time.sleep(interval)
if __name__ == "__main__":
main()
|