-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdigestauth.py
138 lines (122 loc) · 5.23 KB
/
digestauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/python
"""
HTTP Digest Authentication
http://code.activestate.com/recipes/302378-digest-authentication/
http://pythonweb.org/projects/webmodules/doc/0.5.3/html_multipage/lib/example-wsgiAuth.html
"""
import hashlib
import md5
import re
import time
class DigestAuth:
    """Server-side verification of HTTP Digest Authentication (RFC 2617).

    Incomplete implementation: only the plain "MD5" algorithm is handled,
    the "auth-int" quality of protection is not implemented, and nonces
    handed out in challenges are neither stored nor validated afterwards
    (so stale/replayed nonces are not detected).

    realm = AuthName in httpd.conf
    users = a dict of users containing {username: password}
    """

    # One ``name=value`` directive of an Authorization header. The value is
    # either a quoted-string (which may contain commas and backslash-escaped
    # characters) or a bare token such as ``qop=auth`` / ``nc=00000001``.
    _PARAM_RE = re.compile(
        r'([\w.-]+)\s*=\s*(?:"((?:[^"\\]|\\.)*)"|([^\s,"]+))')

    # Directives that must be present in every Digest Authorization header.
    _REQUIRED = ("username", "realm", "nonce", "uri", "response")

    def __init__(self, realm, users):
        """Store the realm and the {username: password} mapping."""
        self.realm = realm
        self.users = users
        self._headers = []   # response headers for the current request
        self.params = {}     # directives parsed from the Authorization header

    def H(self, data):
        """Return the hex MD5 digest of ``data``.

        Text is encoded as UTF-8 first so the method accepts both byte
        strings and text strings.
        """
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        return hashlib.md5(data).hexdigest()

    def KD(self, secret, data):
        """Return H(secret ":" data), the keyed digest of RFC 2617."""
        return self.H(secret + ":" + data)

    def A1(self):
        """Return A1 = username ":" realm ":" passwd for the parsed username.

        The password is looked up in ``self.users``; an unknown user yields
        an empty password. The "MD5-sess" variant
        (H(username:realm:passwd) ":" nonce ":" cnonce) is not implemented.
        """
        username = self.params["username"]
        passwd = self.users.get(username, "")
        return "%s:%s:%s" % (username, self.realm, passwd)

    def A2(self):
        """Return A2 = Method ":" digest-uri for the current request.

        Uses the method and uri passed to authenticate(). The "auth-int"
        form (Method ":" uri ":" H(entity-body)) is not implemented.
        """
        return self.method + ":" + self.uri

    def response(self):
        """Compute the request-digest expected from the client.

        With a "qop" directive:
            KD(H(A1), nonce ":" nc ":" cnonce ":" qop ":" H(A2))
        Without it (RFC 2069 compatibility):
            KD(H(A1), nonce ":" H(A2))

        Raises KeyError if a directive needed by the chosen form is missing
        from ``self.params``.
        """
        ha1 = self.H(self.A1())
        ha2 = self.H(self.A2())
        if "qop" in self.params:
            data = ":".join([
                self.params["nonce"],
                self.params["nc"],
                self.params["cnonce"],
                self.params["qop"],
                ha2,
            ])
        else:
            data = self.params["nonce"] + ":" + ha2
        return self.KD(ha1, data)

    def parseHeader(self, authheader):
        """Parse a "Digest ..." Authorization header into ``self.params``.

        ``self.params`` becomes a dict of directive name -> value with the
        surrounding quotes of quoted values removed. It is left empty when
        the header does not use the Digest scheme or contains no directives.
        """
        self.params = {}
        parts = authheader.strip().split(None, 1)
        if len(parts) != 2 or parts[0].lower() != "digest":
            return
        for match in self._PARAM_RE.finditer(parts[1]):
            key, quoted, bare = match.groups()
            self.params[key] = quoted if quoted is not None else bare

    def _returnTuple(self, code):
        """Build the (code, headers, username) result of authenticate()."""
        return (code, self._headers, self.params.get("username", ""))

    def _createNonce(self):
        """Return a nonce: the MD5 hex digest of "<unix time>:<realm>"."""
        return self.H("%d:%s" % (time.time(), self.realm))

    def setUsers(self, users):
        """Replace the {username: password} mapping."""
        self.users = users

    def createAuthheaer(self):
        """Append a WWW-Authenticate challenge to the response headers.

        The misspelled name is kept for backward compatibility; see
        createAuthHeader for the correctly spelled alias.
        """
        self._headers.append((
            "WWW-Authenticate",
            'Digest realm="%s", nonce="%s", algorithm="MD5", qop="auth"'
            % (self.realm, self._createNonce())
        ))

    def createAuthHeader(self):
        """Correctly spelled alias of createAuthheaer()."""
        self.createAuthheaer()

    def authenticate(self, method, uri, authheader=""):
        """Check the Authorization header of a request.

        method     -- HTTP method of the request (e.g. "GET")
        uri        -- request URI, used as the digest-uri when hashing
        authheader -- value of the Authorization header; empty or None
                      when the client sent none

        Returns a tuple (code, headers, username):
          - code: 200 on success, 401 when credentials are missing or
            wrong, 400 when the header is malformed
          - headers: list of (name, value) tuples to send back; holds one
            WWW-Authenticate challenge for 401, otherwise empty
          - username: the username from the header, or "" if none was sent
        """
        self.method = method
        self.uri = uri
        # Start from a clean slate so that challenges and the username of a
        # previous call on this instance do not leak into this result.
        self._headers = []
        self.params = {}

        if not authheader or not authheader.strip():
            self.createAuthheaer()
            return self._returnTuple(401)

        self.parseHeader(authheader)
        if not self.params:
            return self._returnTuple(400)

        # Check for required parameters
        for key in self._REQUIRED:
            if key not in self.params:
                return self._returnTuple(400)

        # If the user is unknown we can deny access right away
        if self.params["username"] not in self.users:
            self.createAuthheaer()
            return self._returnTuple(401)

        # If qop is sent then cnonce and nc MUST be present; response()
        # needs both of them to build the digest.
        if "qop" in self.params:
            if "cnonce" not in self.params or "nc" not in self.params:
                return self._returnTuple(400)

        # All else is OK, now check the response.
        if self.response() == self.params["response"]:
            return self._returnTuple(200)

        self.createAuthheaer()
        return self._returnTuple(401)