-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgoogle_play_integrity.py
More file actions
165 lines (139 loc) · 6.28 KB
/
google_play_integrity.py
File metadata and controls
165 lines (139 loc) · 6.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import logging
from django.conf import settings
from google.oauth2 import service_account
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from users.models import DeviceIntegritySample
from utils.app_integrity.exceptions import (
AccountDetailsError,
AppIntegrityError,
DeviceIntegrityError,
DuplicateSampleRequestError,
IntegrityRequestError,
)
from utils.app_integrity.schemas import AccountDetails, AppIntegrity, DeviceIntegrity, RequestDetails, VerdictResponse
logger = logging.getLogger(__name__)
APP_PACKAGE_NAME = "org.commcare.dalvik"
GOOGLE_SERVICE_NAME = "playintegrity"
class AppIntegrityService:
"""
Verifies the application integrity of the app using Google Play Integrity API.
"""
def __init__(self, token: str, request_hash: str, app_package: str | None = None, is_demo_user: bool = False):
self.token = token
self.request_hash = request_hash
self.package_name = app_package or APP_PACKAGE_NAME
self.is_demo_user = is_demo_user
@property
def evaluators(self):
"""
Returns a list of functions that evaluate the verdict.
The order of evaluation matters, as some checks depend on previous ones.
"""
return [
lambda x: self.check_request_details(x.requestDetails),
lambda x: self.check_app_integrity(x.appIntegrity),
lambda x: self.check_device_integrity(x.deviceIntegrity),
lambda x: self.check_account_details(x.accountDetails),
]
def verify_integrity(self):
"""
Raises an exception if the app integrity is compromised, otherwise does nothing.
"""
raw_verdict = self.obtain_verdict()
self.analyze_verdict(self.parse_raw_verdict(raw_verdict))
def obtain_verdict(self) -> dict:
"""
This method uses the Google Play Integrity API to decode the integrity token
Documentation:
https://github.com/googleapis/google-api-python-client/blob/main/docs/start.md#building-and-calling-a-service
"""
service_spec = {
"serviceName": GOOGLE_SERVICE_NAME,
"version": "v1",
"credentials": self._google_service_account_credentials,
}
with build(**service_spec) as service:
body = {"integrityToken": self.token}
response = service.v1().decodeIntegrityToken(packageName=self.package_name, body=body).execute()
return response["tokenPayloadExternal"]
def parse_raw_verdict(self, raw_verdict: dict) -> VerdictResponse:
logger.info(f"Integrity token verdict for app({self.package_name}): {raw_verdict}")
return VerdictResponse.from_dict(raw_verdict)
@property
def _google_service_account_credentials(self) -> Credentials:
if not settings.GOOGLE_APPLICATION_CREDENTIALS:
raise Exception("GOOGLE_APPLICATION_CREDENTIALS must be set")
return service_account.Credentials.from_service_account_info(
settings.GOOGLE_APPLICATION_CREDENTIALS,
scopes=["https://www.googleapis.com/auth/playintegrity"],
)
def analyze_verdict(self, verdict: VerdictResponse):
"""
Checks the verdict and raises appropriate exceptions if
the app integrity is compromised.
"""
[evaluator(verdict) for evaluator in self.evaluators]
def check_request_details(self, request_details: RequestDetails):
if request_details.requestHash != self.request_hash:
raise IntegrityRequestError("Request hash mismatch")
if request_details.requestPackageName != self.package_name:
raise IntegrityRequestError("Request package name mismatch")
def check_app_integrity(self, app_integrity: AppIntegrity):
if app_integrity.packageName != self.package_name:
raise AppIntegrityError("App package name mismatch")
def check_device_integrity(self, device_integrity: DeviceIntegrity):
verdicts = device_integrity.deviceRecognitionVerdict
if self.is_demo_user and "MEETS_VIRTUAL_INTEGRITY" in verdicts:
return
if "MEETS_DEVICE_INTEGRITY" not in verdicts:
raise DeviceIntegrityError("Device integrity compromised")
def check_account_details(self, account_details: AccountDetails):
if self.is_demo_user:
return
verdict = account_details.appLicensingVerdict
if verdict == "UNLICENSED":
raise AccountDetailsError("Account not licensed")
def log_sample_request(self, request_id: str, device_id: str):
"""
Performs a sampling request to log the integrity check results.
"""
raw_verdict = self.obtain_verdict()
verdict = self.parse_raw_verdict(raw_verdict)
passed_request_check = True
passed_app_integrity_check = True
passed_device_integrity_check = True
passed_account_details_check = True
for evaluator in self.evaluators:
try:
evaluator(verdict)
except IntegrityRequestError:
passed_request_check = False
except AppIntegrityError:
passed_app_integrity_check = False
except DeviceIntegrityError:
passed_device_integrity_check = False
except AccountDetailsError:
passed_account_details_check = False
check_passed = (
passed_request_check
and passed_app_integrity_check
and passed_device_integrity_check
and passed_account_details_check
)
sample, created = DeviceIntegritySample.objects.get_or_create(
request_id=request_id,
defaults={
"device_id": device_id,
"is_demo_user": self.is_demo_user,
"google_verdict": raw_verdict,
"passed": check_passed,
"passed_request_check": passed_request_check,
"passed_app_integrity_check": passed_app_integrity_check,
"passed_device_integrity_check": passed_device_integrity_check,
"passed_account_details_check": passed_account_details_check,
},
)
if not created:
raise DuplicateSampleRequestError("Duplicate sample request")
return sample