Skip to content

Commit 3d8df11

Browse files
authored
Merge pull request #118 from Flagsmith/feat/webhook-validate
feat: Add utility functions for webhooks
2 parents d59b7a3 + 9ee29e8 commit 3d8df11

File tree

3 files changed

+93
-1
lines changed

3 files changed

+93
-1
lines changed

flagsmith/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from . import webhooks
12
from .flagsmith import Flagsmith
23

3-
__all__ = ("Flagsmith",)
4+
__all__ = ("Flagsmith", "webhooks")

flagsmith/webhooks.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import hashlib
2+
import hmac
3+
from typing import Union
4+
5+
6+
def generate_signature(
7+
request_body: Union[str, bytes],
8+
shared_secret: str,
9+
) -> str:
10+
"""Generates a signature for a webhook request body using HMAC-SHA256.
11+
12+
:param request_body: The raw request body, as string or bytes.
13+
:param shared_secret: The shared secret configured for this specific webhook.
14+
:return: The hex-encoded signature.
15+
"""
16+
if isinstance(request_body, str):
17+
request_body = request_body.encode()
18+
19+
shared_secret_bytes = shared_secret.encode()
20+
21+
return hmac.new(
22+
key=shared_secret_bytes,
23+
msg=request_body,
24+
digestmod=hashlib.sha256,
25+
).hexdigest()
26+
27+
28+
def verify_signature(
29+
request_body: Union[str, bytes],
30+
received_signature: str,
31+
shared_secret: str,
32+
) -> bool:
33+
"""Verifies a webhook's signature to determine if the request was sent by Flagsmith.
34+
35+
:param request_body: The raw request body, as string or bytes.
36+
:param received_signature: The signature as received in the X-Flagsmith-Signature request header.
37+
:param shared_secret: The shared secret configured for this specific webhook.
38+
:return: True if the signature is valid, False otherwise.
39+
"""
40+
expected_signature = generate_signature(request_body, shared_secret)
41+
return hmac.compare_digest(expected_signature, received_signature)

tests/test_webhooks.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import json
2+
3+
from flagsmith.webhooks import generate_signature, verify_signature
4+
5+
6+
def test_generate_signature() -> None:
7+
# Given
8+
request_body = json.dumps({"data": {"foo": 123}})
9+
shared_secret = "shh"
10+
11+
# When
12+
signature = generate_signature(request_body, shared_secret)
13+
14+
# Then
15+
assert isinstance(signature, str)
16+
assert len(signature) == 64 # SHA-256 hex digest is 64 characters
17+
18+
19+
def test_verify_signature_valid() -> None:
20+
# Given
21+
request_body = json.dumps({"data": {"foo": 123}})
22+
shared_secret = "shh"
23+
24+
# When
25+
signature = generate_signature(request_body, shared_secret)
26+
27+
# Then
28+
assert verify_signature(
29+
request_body=request_body,
30+
received_signature=signature,
31+
shared_secret=shared_secret,
32+
)
33+
# Test with bytes instead of str
34+
assert verify_signature(
35+
request_body=request_body.encode(),
36+
received_signature=signature,
37+
shared_secret=shared_secret,
38+
)
39+
40+
41+
def test_verify_signature_invalid() -> None:
42+
# Given
43+
request_body = json.dumps({"event": "flag_updated", "data": {"id": 123}})
44+
45+
# Then
46+
assert not verify_signature(
47+
request_body=request_body,
48+
received_signature="bad",
49+
shared_secret="?",
50+
)

0 commit comments

Comments
 (0)