1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
| #!/usr/bin/env python3
"""
OAuth 2.0 完整安全測試套件
"""
import requests
import json
import secrets
import hashlib
import base64
from urllib.parse import urlencode, parse_qs, urlparse
class OAuth2SecurityTester:
def __init__(self, config):
self.auth_endpoint = config['auth_endpoint']
self.token_endpoint = config['token_endpoint']
self.client_id = config['client_id']
self.client_secret = config.get('client_secret')
self.redirect_uri = config['redirect_uri']
self.scope = config.get('scope', 'openid profile')
self.results = []
def log_result(self, test_name, vulnerable, details=""):
result = {
"test": test_name,
"vulnerable": vulnerable,
"details": details
}
self.results.append(result)
status = "[!] 漏洞" if vulnerable else "[+] 安全"
print(f"{status}: {test_name} - {details}")
def test_redirect_uri_validation(self):
"""測試 redirect_uri 驗證"""
test_name = "Redirect URI 驗證"
payloads = [
"https://evil.com/callback",
self.redirect_uri + "/../evil",
self.redirect_uri + "@evil.com",
"//evil.com/callback",
]
for payload in payloads:
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": payload,
"scope": self.scope,
"state": secrets.token_urlsafe(16)
}
response = requests.get(
self.auth_endpoint,
params=params,
allow_redirects=False
)
if response.status_code in [301, 302, 303, 307]:
location = response.headers.get('Location', '')
if 'evil' in location:
self.log_result(test_name, True, f"接受惡意 redirect_uri: {payload}")
return
self.log_result(test_name, False, "正確驗證 redirect_uri")
def test_state_parameter(self):
"""測試 state 參數處理"""
test_name = "State 參數驗證"
# 測試無 state 參數
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": self.scope
}
response = requests.get(
self.auth_endpoint,
params=params,
allow_redirects=False
)
if response.status_code in [301, 302, 303, 307]:
self.log_result(test_name, True, "接受無 state 參數的請求")
else:
self.log_result(test_name, False, "要求 state 參數")
def test_pkce_enforcement(self):
"""測試 PKCE 強制執行"""
test_name = "PKCE 強制執行"
# 測試無 PKCE 參數
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": self.scope,
"state": secrets.token_urlsafe(16)
}
response = requests.get(
self.auth_endpoint,
params=params,
allow_redirects=False
)
if response.status_code in [301, 302, 303, 307]:
self.log_result(test_name, True, "不強制要求 PKCE")
else:
self.log_result(test_name, False, "強制要求 PKCE")
def test_implicit_flow(self):
"""測試是否支援不安全的 Implicit Flow"""
test_name = "Implicit Flow 支援"
params = {
"response_type": "token",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": self.scope,
"state": secrets.token_urlsafe(16)
}
response = requests.get(
self.auth_endpoint,
params=params,
allow_redirects=False
)
if response.status_code in [301, 302, 303, 307]:
self.log_result(test_name, True, "支援不安全的 Implicit Flow")
else:
self.log_result(test_name, False, "不支援 Implicit Flow")
def test_token_endpoint_auth(self):
"""測試 Token 端點認證"""
test_name = "Token 端點認證"
# 嘗試不提供 client_secret
data = {
"grant_type": "authorization_code",
"code": "fake_code",
"redirect_uri": self.redirect_uri,
"client_id": self.client_id
}
response = requests.post(self.token_endpoint, data=data)
# 如果回應不是認證錯誤,可能有問題
if response.status_code != 401:
self.log_result(test_name, True, "Token 端點可能不需要認證")
else:
self.log_result(test_name, False, "Token 端點需要認證")
def generate_report(self):
"""產生測試報告"""
print("\n" + "="*60)
print("OAuth 2.0 安全測試報告")
print("="*60 + "\n")
vulnerabilities = [r for r in self.results if r['vulnerable']]
print(f"總測試項目: {len(self.results)}")
print(f"發現漏洞: {len(vulnerabilities)}")
print(f"安全項目: {len(self.results) - len(vulnerabilities)}")
if vulnerabilities:
print("\n發現的漏洞:")
for vuln in vulnerabilities:
print(f" - {vuln['test']}: {vuln['details']}")
return {
"total_tests": len(self.results),
"vulnerabilities_found": len(vulnerabilities),
"details": self.results
}
def run_all_tests(self):
"""執行所有測試"""
print("開始 OAuth 2.0 安全測試...\n")
self.test_redirect_uri_validation()
self.test_state_parameter()
self.test_pkce_enforcement()
self.test_implicit_flow()
self.test_token_endpoint_auth()
return self.generate_report()
if __name__ == "__main__":
config = {
"auth_endpoint": "https://target.com/oauth/authorize",
"token_endpoint": "https://target.com/oauth/token",
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"redirect_uri": "https://your-app.com/callback",
"scope": "openid profile email"
}
tester = OAuth2SecurityTester(config)
report = tester.run_all_tests()
|