Skip to content

Commit 057b044

Browse files
committed
fix: log better
1 parent 70cacec commit 057b044

File tree

6 files changed

+58
-18
lines changed

6 files changed

+58
-18
lines changed

lavender_data/client/api.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -377,8 +377,7 @@ def get_next_item(
377377
no_cache=no_cache,
378378
max_retry_count=max_retry_count,
379379
)
380-
content = self._check_response(response).payload.read()
381-
return deserialize_sample(content)
380+
return self._check_response(response).payload.read()
382381

383382
def submit_next_item(
384383
self,
@@ -406,8 +405,7 @@ def get_submitted_result(self, iteration_id: str, cache_key: str):
406405
)
407406
if response.status_code == 202:
408407
raise LavenderDataApiError(response.content.decode("utf-8"))
409-
content = self._check_response(response).payload.read()
410-
return deserialize_sample(content)
408+
return self._check_response(response).payload.read()
411409

412410
def complete_index(self, iteration_id: str, index: int):
413411
with self._get_client() as client:

lavender_data/client/cli/api_call.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from lavender_data.client import LavenderDataClient
2+
from lavender_data.serialize import deserialize_sample
23

34

45
def _api(api_url: str, api_key: str):
@@ -77,11 +78,13 @@ def get_next_item(
7778
no_cache: bool,
7879
max_retry_count: int,
7980
):
80-
return _api(api_url=api_url, api_key=api_key).get_next_item(
81-
iteration_id=iteration_id,
82-
rank=rank,
83-
no_cache=no_cache,
84-
max_retry_count=max_retry_count,
81+
return deserialize_sample(
82+
_api(api_url=api_url, api_key=api_key).get_next_item(
83+
iteration_id=iteration_id,
84+
rank=rank,
85+
no_cache=no_cache,
86+
max_retry_count=max_retry_count,
87+
)
8588
)
8689

8790

@@ -102,8 +105,10 @@ def submit_next_item(
102105

103106

104107
def get_submitted_result(api_url: str, api_key: str, iteration_id: str, cache_key: str):
105-
return _api(api_url=api_url, api_key=api_key).get_submitted_result(
106-
iteration_id=iteration_id, cache_key=cache_key
108+
return deserialize_sample(
109+
_api(api_url=api_url, api_key=api_key).get_submitted_result(
110+
iteration_id=iteration_id, cache_key=cache_key
111+
)
107112
)
108113

109114

lavender_data/client/iteration.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Optional, Union, Literal
44
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
55

6+
from lavender_data.serialize import deserialize_sample, DeserializeException
67
from lavender_data.client.api import (
78
get_client,
89
LavenderDataClient,
@@ -140,6 +141,8 @@ def __init__(
140141
self._api_key = api_key
141142
self._api = _api(self._api_url, self._api_key)
142143

144+
self._bytes = 0
145+
143146
self.id = self._iteration_id
144147

145148
def torch(
@@ -215,19 +218,21 @@ def _set_last_indices(self, sample_or_batch):
215218

216219
def _get_next_item(self):
217220
try:
218-
sample_or_batch = self._api.get_next_item(
221+
serialized = self._api.get_next_item(
219222
iteration_id=self._iteration_id,
220223
rank=self._rank,
221224
no_cache=self._no_cache,
222225
max_retry_count=self._max_retry_count,
223226
)
227+
self._bytes += len(serialized)
228+
return deserialize_sample(serialized)
224229
except LavenderDataApiError as e:
225230
if "No more indices to pop" in str(e):
226231
raise StopIteration
227232
else:
228233
raise e
229-
230-
return sample_or_batch
234+
except DeserializeException as e:
235+
raise ValueError(f"Failed to deserialize sample: {e}")
231236

232237
def _submit_next_item(self) -> str:
233238
cache_key = self._api.submit_next_item(
@@ -240,10 +245,12 @@ def _submit_next_item(self) -> str:
240245

241246
def _get_submitted_result(self, cache_key: str):
242247
try:
243-
return self._api.get_submitted_result(
248+
serialized = self._api.get_submitted_result(
244249
iteration_id=self._iteration_id,
245250
cache_key=cache_key,
246251
)
252+
self._bytes += len(serialized)
253+
return deserialize_sample(serialized)
247254
except LavenderDataApiError as e:
248255
if "Data is still being processed" in str(e):
249256
return None
@@ -253,6 +260,8 @@ def _get_submitted_result(self, cache_key: str):
253260
raise StopIteration
254261
else:
255262
raise e
263+
except DeserializeException as e:
264+
raise ValueError(f"Failed to deserialize sample: {e}")
256265

257266
def __next__(self):
258267
self._complete_last_indices()

lavender_data/serialize.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ def serialize_sample(sample: dict):
128128
return header + body
129129

130130

131+
class DeserializeException(ValueError):
    """Raised by ``deserialize_sample`` when a sample cannot be deserialized.

    ``deserialize_sample`` used to raise a plain ``ValueError`` at the point
    where it now raises this exception. Deriving from ``ValueError`` keeps
    existing ``except ValueError`` handlers working, while new code can catch
    the more specific type.
    """
133+
134+
131135
def deserialize_sample(content: bytes, strict: bool = True):
132136
header_length, current = detach_length(content)
133137
keys = json.loads(current[:header_length].decode("utf-8"))
@@ -152,7 +156,7 @@ def deserialize_sample(content: bytes, strict: bool = True):
152156
warnings.warn(msg)
153157
values.append(None)
154158
else:
155-
raise ValueError(msg)
159+
raise DeserializeException(msg)
156160
current = value[value_length:]
157161
i += 1
158162

lavender_data/server/app.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,13 @@ async def add_process_time_header(request: Request, call_next):
126126
process_time = time.perf_counter() - start_time
127127

128128
if log_filter(request, response):
129+
path = (
130+
f"{request.url.path}?{request.url.query}"
131+
if request.url.query
132+
else request.url.path
133+
)
129134
logger.info(
130-
f"{request.client.host}:{request.client.port} - {request.method} {request.url.path}?{request.url.query} {response.status_code} {process_time:.2f}s"
135+
f"{request.client.host}:{request.client.port} - {request.method} {path} {response.status_code} {process_time:.2f}s"
131136
)
132137

133138
return response

lavender_data/server/iteration/process.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,21 @@ def process_next_samples(
165165
raise error
166166

167167

168+
def _format_number(number: int):
169+
if number < 1000:
170+
return f"{number} "
171+
elif number < 1000**2:
172+
return f"{number/1000:.2f} K"
173+
elif number < 1000**3:
174+
return f"{number/1000**2:.2f} M"
175+
else:
176+
return f"{number/1000**3:.2f} G"
177+
178+
179+
def _ms(seconds: float):
180+
return f"{seconds*1000:.2f} ms"
181+
182+
168183
@pool_task()
169184
def process_next_samples_and_store(
170185
params: ProcessNextSamplesParams,
@@ -184,7 +199,11 @@ def process_next_samples_and_store(
184199
shared_memory.set(cache_key, content, ex=cache_ttl)
185200
_store_time = time.perf_counter()
186201
logger.debug(
187-
f"Done processing {cache_key} in {_store_time-_start:.2f}s, process: {_process_time-_start:.2f}s, serialize: {_serialize_time - _process_time:.2f}s, store: {_store_time - _serialize_time:.2f}s, size: {len(content)} bytes"
202+
f"Done processing {cache_key} in {_ms(_store_time - _start)}, "
203+
f"process: {_ms(_process_time - _start)}, "
204+
f"serialize: {_ms(_serialize_time - _process_time)}, "
205+
f"store: {_ms(_store_time - _serialize_time)}, "
206+
f"size: {_format_number(len(content))}B"
188207
)
189208
except ProcessNextSamplesException as e:
190209
shared_memory.set(cache_key, f"processing_error:{e.json()}", ex=cache_ttl)

0 commit comments

Comments
 (0)