|
6 | 6 |
|
7 | 7 | class JWTAuth: |
8 | 8 | encoder = jwt.encode |
| 9 | + decoder = jwt.decode |
| 10 | + algorithm = "HS256" |
| 11 | + |
| 12 | + class InvalidToken(Exception): |
| 13 | + pass |
9 | 14 |
|
10 | 15 | def __init__(self, secret: str): |
11 | 16 | self.secret = secret |
12 | 17 |
|
13 | 18 | def issue_token(self, client: str) -> str: |
14 | | - return self.encoder(self.get_claims(client), self.secret, algorithm="HS256") |
| 19 | + return self.encoder( |
| 20 | + self.get_claims(client), self.secret, algorithm=self.algorithm |
| 21 | + ) |
| 22 | + |
| 23 | + def verify_token(self, token: str, client: str) -> dict: |
| 24 | + data = self.decoder( |
| 25 | + token, |
| 26 | + self.secret, |
| 27 | + algorithms=self.algorithm, |
| 28 | + audience=self._audience, |
| 29 | + issuer=self._issuer, |
| 30 | + ) |
| 31 | + if data.get("jti") != client: |
| 32 | + raise self.InvalidToken(token) |
| 33 | + return data |
15 | 34 |
|
16 | | - @staticmethod |
17 | | - def get_claims(client: str, ttl_hours=24) -> dict: |
| 35 | + def get_claims(self, client: str, ttl_hours=24) -> dict: |
18 | 36 | return { |
19 | | - "Audience": "localhost", |
20 | | - "ExpiresAt": datetime.now(tz=timezone.utc) + timedelta(hours=ttl_hours), |
21 | | - "Id": client, |
22 | | - "IssuedAt": datetime.now(tz=timezone.utc), |
23 | | - "Issuer": f"MargayPythonSDK-{socket.gethostname()}", |
24 | | - "Subject": "client", |
| 37 | + "aud": self._audience, |
| 38 | + "exp": datetime.now(tz=timezone.utc) + timedelta(hours=ttl_hours), |
| 39 | + "jti": client, |
| 40 | + "iat": datetime.now(tz=timezone.utc), |
| 41 | + "iss": self._issuer, |
| 42 | + "sub": "client", |
25 | 43 | } |
| 44 | + |
| 45 | + _audience = "localhost" |
| 46 | + _issuer = f"MargayPythonSDK-{socket.gethostname()}" |
0 commit comments