1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
# ocsp_exporter.py
from prometheus_client import start_http_server, Gauge, Counter, Histogram
import time
import subprocess
from datetime import datetime
# Metric definitions.
# OCSP metrics are labelled per responder URL; CRL metrics per CRL URL.

# Count of OCSP requests, split by outcome status and responder.
OCSP_REQUEST_TOTAL = Counter(
    name='ocsp_requests_total',
    documentation='Total number of OCSP requests',
    labelnames=['status', 'responder'],
)

# Distribution of OCSP response times, in seconds.
OCSP_RESPONSE_TIME = Histogram(
    name='ocsp_response_time_seconds',
    documentation='OCSP response time in seconds',
    labelnames=['responder'],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

# Availability flag per responder.
OCSP_RESPONDER_UP = Gauge(
    name='ocsp_responder_up',
    documentation='OCSP responder availability',
    labelnames=['responder'],
)

# CRL freshness and size gauges.
CRL_LAST_UPDATE = Gauge(
    name='crl_last_update_timestamp',
    documentation='CRL last update timestamp',
    labelnames=['crl_url'],
)

CRL_NEXT_UPDATE = Gauge(
    name='crl_next_update_timestamp',
    documentation='CRL next update timestamp',
    labelnames=['crl_url'],
)

CRL_ENTRIES_COUNT = Gauge(
    name='crl_entries_total',
    documentation='Total number of entries in CRL',
    labelnames=['crl_url'],
)
class OCSPMonitor:
    """Probe OCSP responders in a loop and publish the results as metrics.

    Each probe shells out to ``openssl ocsp`` and records its outcome in the
    module-level ``OCSP_RESPONDER_UP``, ``OCSP_REQUEST_TOTAL`` and
    ``OCSP_RESPONSE_TIME`` metrics.
    """

    def __init__(
        self,
        responders: list,
        crls: list,
        check_interval: int = 60,
        *,
        issuer_cert: str = '/etc/pki/CA/certs/ca.crt',
        cert: str = '/etc/pki/test/test.crt',
        timeout: float = 10,
    ):
        """Store the monitoring targets and probe settings.

        Args:
            responders: OCSP responder URLs to probe.
            crls: CRL URLs. Stored on the instance; no method of this class
                reads them.
            check_interval: Seconds to sleep between probe rounds.
            issuer_cert: Path passed to ``openssl ocsp -issuer``.
            cert: Path passed to ``openssl ocsp -cert``.
            timeout: Seconds to wait for one ``openssl`` invocation.
        """
        self.responders = responders
        self.crls = crls
        self.check_interval = check_interval
        self.issuer_cert = issuer_cert
        self.cert = cert
        self.timeout = timeout

    def _record_failure(self, responder_url: str, status: str) -> None:
        """Mark the responder as down and count one failed request."""
        OCSP_RESPONDER_UP.labels(responder=responder_url).set(0)
        OCSP_REQUEST_TOTAL.labels(
            status=status,
            responder=responder_url
        ).inc()

    def check_ocsp_responder(self, responder_url: str) -> dict:
        """Check the availability of one OCSP responder.

        Runs ``openssl ocsp`` against ``responder_url`` and updates the
        metrics for that responder.

        Args:
            responder_url: URL passed to ``openssl ocsp -url``.

        Returns:
            One of:
            ``{'status': 'up', 'response_time': <seconds>}`` when openssl
            exits with code 0;
            ``{'status': 'down', 'error': <stderr text>}`` when it exits
            with a non-zero code;
            ``{'status': 'timeout'}`` when it exceeds ``self.timeout``;
            ``{'status': 'error', 'error': <message>}`` when the command
            could not be run at all.
        """
        # Send the OCSP request (using OpenSSL).
        command = [
            'openssl', 'ocsp',
            '-issuer', self.issuer_cert,
            '-cert', self.cert,
            '-url', responder_url,
            '-resp_text'
        ]

        # Monotonic clock: the measured duration must not be affected by
        # system clock adjustments while the probe is running.
        started = time.monotonic()
        try:
            result = subprocess.run(
                command, capture_output=True, timeout=self.timeout
            )
        except subprocess.TimeoutExpired:
            self._record_failure(responder_url, 'timeout')
            return {'status': 'timeout'}
        except Exception as e:
            # e.g. the openssl binary is missing. Count it like any other
            # failed request so the counter agrees with the gauge.
            self._record_failure(responder_url, 'error')
            return {'status': 'error', 'error': str(e)}
        response_time = time.monotonic() - started

        if result.returncode != 0:
            self._record_failure(responder_url, 'error')
            # errors='replace': stderr is only diagnostic text, so undecodable
            # bytes must not turn a "down" result into an exception.
            return {
                'status': 'down',
                'error': result.stderr.decode(errors='replace'),
            }

        OCSP_RESPONDER_UP.labels(responder=responder_url).set(1)
        OCSP_REQUEST_TOTAL.labels(
            status='success',
            responder=responder_url
        ).inc()
        OCSP_RESPONSE_TIME.labels(
            responder=responder_url
        ).observe(response_time)
        return {'status': 'up', 'response_time': response_time}

    def run(self):
        """Run the monitoring loop.

        Probes every configured responder, sleeps ``check_interval`` seconds,
        and repeats; the loop has no exit condition.
        """
        while True:
            for responder in self.responders:
                self.check_ocsp_responder(responder)
            time.sleep(self.check_interval)
if __name__ == '__main__':
    # Start the Prometheus metrics endpoint on port 9100.
    start_http_server(9100)

    # Monitoring targets.
    responder_urls = [
        'http://ocsp.example.com',
        'http://ocsp-backup.example.com',
    ]
    crl_urls = ['http://crl.example.com/ca.crl']

    # Build the monitor and enter its loop (run() does not return).
    ocsp_monitor = OCSPMonitor(responders=responder_urls, crls=crl_urls)
    ocsp_monitor.run()
|