Skip to content

Commit 4d1c353

Browse files
Vasist10AndreyMarkinPPC
authored andcommitted
[bid-manager-api] feat: Add query cache and report reuse
1 parent deb861d commit 4d1c353

File tree

2 files changed

+204
-26
lines changed

2 files changed

+204
-26
lines changed

libs/community/google/bid-manager/garf_bid_manager/api_clients.py

Lines changed: 92 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import csv
1717
import io
18+
import json
1819
import logging
1920
import os
2021
import pathlib
@@ -34,6 +35,8 @@
3435
_DEFAULT_API_SCOPES = ['https://www.googleapis.com/auth/doubleclickbidmanager']
3536

3637
_SERVICE_ACCOUNT_CREDENTIALS_FILE = str(pathlib.Path.home() / 'dbm.json')
38+
_QUERY_CACHE_ENV = 'GARF_BID_MANAGER_QUERY_CACHE_DIR'
39+
_DEFAULT_QUERY_CACHE_DIR = pathlib.Path.home() / '.garf/bid_manager'
3740

3841

3942
class BidManagerApiClientError(exceptions.BidManagerApiError):
@@ -50,6 +53,7 @@ def __init__(
5053
'GARF_BID_MANAGER_CREDENTIALS_FILE', _SERVICE_ACCOUNT_CREDENTIALS_FILE
5154
),
5255
auth_mode: Literal['oauth', 'service_account'] = 'oauth',
56+
query_cache_dir: str | pathlib.Path | None = None,
5357
**kwargs: str,
5458
) -> None:
5559
"""Initializes BidManagerApiClient."""
@@ -59,6 +63,10 @@ def __init__(
5963
self.kwargs = kwargs
6064
self._client = None
6165
self._credentials = None
66+
cache_dir = query_cache_dir or os.getenv(_QUERY_CACHE_ENV)
67+
self.query_cache_dir = (
68+
pathlib.Path(cache_dir) if cache_dir else _DEFAULT_QUERY_CACHE_DIR
69+
)
6270

6371
@property
6472
def credentials(self):
@@ -87,36 +95,36 @@ def client(self):
8795
def get_response(
8896
self, request: query_editor.BidManagerApiQuery, **kwargs: str
8997
) -> api_clients.GarfApiResponse:
90-
query = _build_request(request)
91-
query_response = self.client.queries().create(body=query).execute()
92-
report_response = (
93-
self.client.queries()
94-
.run(queryId=query_response['queryId'], synchronous=False)
95-
.execute()
96-
)
97-
query_id = report_response['key']['queryId']
98-
report_id = report_response['key']['reportId']
99-
logging.info(
100-
'Query %s is running, report %s has been created and is '
101-
'currently being generated.',
102-
query_id,
103-
report_id,
104-
)
98+
query_hash = request.hash
99+
query_id = None
100+
report_id = None
101+
status = None
105102

106-
get_request = (
107-
self.client.queries()
108-
.reports()
109-
.get(
110-
queryId=report_response['key']['queryId'],
111-
reportId=report_response['key']['reportId'],
103+
if cached_ids := self._load_cached_query_reference(query_hash):
104+
cached_query_id, cached_report_id = cached_ids
105+
logging.info(
106+
'Attempting to reuse DV360 report %s for query hash %s.',
107+
cached_report_id,
108+
query_hash,
112109
)
113-
)
110+
try:
111+
status = self._get_report_status(cached_query_id, cached_report_id)
112+
query_id, report_id = cached_query_id, cached_report_id
113+
except Exception as exc: # pylint: disable=broad-except
114+
logging.warning(
115+
'Unable to reuse DV360 report %s (hash %s), regenerating. Reason: %s',
116+
cached_report_id,
117+
query_hash,
118+
exc,
119+
)
120+
status = None
114121

115-
status = _check_if_report_is_done(get_request)
122+
if status is None:
123+
query_id, report_id = self._run_query(request)
124+
self._save_cached_query_reference(query_hash, query_id, report_id)
125+
status = self._get_report_status(query_id, report_id)
116126

117-
logging.info(
118-
'Report %s generated successfully. Now downloading.', report_id
119-
)
127+
logging.info('Report %s generated successfully. Now downloading.', report_id)
120128
with smart_open.open(
121129
status['metadata']['googleCloudStoragePath'], 'r', encoding='utf-8'
122130
) as f:
@@ -144,6 +152,64 @@ def _get_oauth_credentials(self):
144152
)
145153

146154

155+
def _run_query(
156+
self, request: query_editor.BidManagerApiQuery
157+
) -> tuple[str, str]:
158+
query = _build_request(request)
159+
query_response = self.client.queries().create(body=query).execute()
160+
report_response = (
161+
self.client.queries()
162+
.run(queryId=query_response['queryId'], synchronous=False)
163+
.execute()
164+
)
165+
query_id = report_response['key']['queryId']
166+
report_id = report_response['key']['reportId']
167+
logging.info(
168+
'Query %s is running, report %s has been created and is currently '
169+
'being generated.',
170+
query_id,
171+
report_id,
172+
)
173+
return query_id, report_id
174+
175+
def _get_report_status(self, query_id: str, report_id: str):
176+
get_request = (
177+
self.client.queries()
178+
.reports()
179+
.get(
180+
queryId=query_id,
181+
reportId=report_id,
182+
)
183+
)
184+
return _check_if_report_is_done(get_request)
185+
186+
def _load_cached_query_reference(
187+
self, query_hash: str
188+
) -> tuple[str, str] | None:
189+
cache_path = self.query_cache_dir / f'{query_hash}.txt'
190+
if not cache_path.is_file():
191+
return None
192+
try:
193+
with open(cache_path, 'r', encoding='utf-8') as cache_file:
194+
data = json.load(cache_file)
195+
return data['query_id'], data['report_id']
196+
except (OSError, ValueError, KeyError) as exc:
197+
logging.warning(
198+
'Failed to load DV360 cache file %s, ignoring. Reason: %s',
199+
cache_path,
200+
exc,
201+
)
202+
return None
203+
204+
def _save_cached_query_reference(
205+
self, query_hash: str, query_id: str, report_id: str
206+
) -> None:
207+
self.query_cache_dir.mkdir(parents=True, exist_ok=True)
208+
cache_path = self.query_cache_dir / f'{query_hash}.txt'
209+
with open(cache_path, 'w', encoding='utf-8') as cache_file:
210+
json.dump({'query_id': query_id, 'report_id': report_id}, cache_file)
211+
212+
147213
def _build_request(request: query_editor.BidManagerApiQuery):
148214
"""Builds Bid Manager API query object from BidManagerApiQuery."""
149215
query = {

libs/community/google/bid-manager/tests/unit/test_api_clients.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import contextlib
16+
import io
17+
import json
18+
from unittest import mock
19+
1520
import pytest
1621
from garf_bid_manager import api_clients, query_editor
1722

@@ -85,3 +90,110 @@ def test_process_api_response():
8590
]
8691

8792
assert result == expected_result
93+
94+
95+
def _build_query_spec():
96+
query = """
97+
SELECT
98+
advertiser AS advertiser,
99+
metric_impressions AS impressions
100+
FROM standard
101+
WHERE advertiser = 1
102+
"""
103+
return query_editor.BidManagerApiQuery(text=query, title='test').generate()
104+
105+
106+
def _mock_report_status():
107+
return {
108+
'key': {'queryId': '123', 'reportId': '456'},
109+
'metadata': {
110+
'status': {'state': 'DONE'},
111+
'googleCloudStoragePath': 'gs://bucket/report.csv',
112+
},
113+
}
114+
115+
116+
def _patch_open(monkeypatch, data: str):
117+
@contextlib.contextmanager
118+
def fake_open(*_, **__):
119+
yield io.StringIO(data)
120+
121+
monkeypatch.setattr(api_clients.smart_open, 'open', fake_open)
122+
123+
124+
def test_get_response_reuses_cached_report(tmp_path, monkeypatch):
125+
spec = _build_query_spec()
126+
cache_file = tmp_path / f'{spec.hash}.txt'
127+
cache_file.write_text(
128+
json.dumps({'query_id': '123', 'report_id': '456'}),
129+
encoding='utf-8',
130+
)
131+
132+
client = api_clients.BidManagerApiClient(query_cache_dir=tmp_path)
133+
134+
mock_client = mock.Mock()
135+
mock_queries = mock.Mock()
136+
mock_reports = mock.Mock()
137+
mock_get_request = mock.Mock()
138+
mock_get_request.execute.return_value = _mock_report_status()
139+
mock_reports.get.return_value = mock_get_request
140+
mock_queries.reports.return_value = mock_reports
141+
mock_client.queries.return_value = mock_queries
142+
client._client = mock_client
143+
144+
_patch_open(monkeypatch, 'col1,col2\nvalue1,value2\n')
145+
146+
response = client.get_response(spec)
147+
148+
assert response.results == [
149+
{'FILTER_ADVERTISER': 'value1', 'METRIC_IMPRESSIONS': 'value2'}
150+
]
151+
mock_queries.create.assert_not_called()
152+
mock_queries.run.assert_not_called()
153+
154+
155+
def test_get_response_creates_and_caches_report(tmp_path, monkeypatch):
156+
spec = _build_query_spec()
157+
client = api_clients.BidManagerApiClient(query_cache_dir=tmp_path)
158+
159+
mock_client = mock.Mock()
160+
mock_queries = mock.Mock()
161+
mock_reports = mock.Mock()
162+
163+
mock_create = mock.Mock()
164+
mock_create.execute.return_value = {'queryId': '999'}
165+
mock_run = mock.Mock()
166+
mock_run.execute.return_value = {
167+
'key': {'queryId': '999', 'reportId': '555'}
168+
}
169+
mock_get_request = mock.Mock()
170+
mock_get_request.execute.return_value = {
171+
'key': {'queryId': '999', 'reportId': '555'},
172+
'metadata': {
173+
'status': {'state': 'DONE'},
174+
'googleCloudStoragePath': 'gs://bucket/report.csv',
175+
},
176+
}
177+
178+
mock_reports.get.return_value = mock_get_request
179+
mock_queries.create.return_value = mock_create
180+
mock_queries.run.return_value = mock_run
181+
mock_queries.reports.return_value = mock_reports
182+
mock_client.queries.return_value = mock_queries
183+
client._client = mock_client
184+
185+
_patch_open(monkeypatch, 'col1,col2\nvalue1,value2\n')
186+
187+
response = client.get_response(spec)
188+
189+
assert response.results == [
190+
{'FILTER_ADVERTISER': 'value1', 'METRIC_IMPRESSIONS': 'value2'}
191+
]
192+
mock_queries.create.assert_called_once()
193+
mock_queries.run.assert_called_once()
194+
cache_file = tmp_path / f'{spec.hash}.txt'
195+
assert cache_file.is_file()
196+
assert json.loads(cache_file.read_text(encoding='utf-8')) == {
197+
'query_id': '999',
198+
'report_id': '555',
199+
}

0 commit comments

Comments
 (0)