Skip to content

Commit 9928411

Browse files
committed
update uploader.py
1 parent 8b86858 commit 9928411

6 files changed

Lines changed: 159 additions & 137 deletions

File tree

mapillary_tools/api_v4.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
import enum
12
import logging
23
import os
34
import ssl
45
import typing as T
5-
import enum
66
from json import dumps
77

88
import requests

mapillary_tools/authenticate.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import sys
88
import typing as T
99

10-
import requests
1110
import jsonschema
1211

12+
import requests
13+
1314
from . import api_v4, config, constants, exceptions, types
1415

1516

mapillary_tools/upload.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ def _upload_everything(
516516
T.cast(T.BinaryIO, camm_fp),
517517
upload_api_v4.ClusterFileType.CAMM,
518518
video_metadata.md5sum,
519-
event_payload=event_payload,
519+
progress=event_payload,
520520
)
521521
except Exception as ex:
522522
raise UploadError(ex) from ex

mapillary_tools/upload_api_v4.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,7 @@
88

99
import requests
1010

11-
from .api_v4 import (
12-
request_get,
13-
request_post,
14-
REQUESTS_TIMEOUT,
15-
ClusterFileType,
16-
)
11+
from .api_v4 import ClusterFileType, request_get, request_post, REQUESTS_TIMEOUT
1712

1813
MAPILLARY_UPLOAD_ENDPOINT = os.getenv(
1914
"MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads"

mapillary_tools/uploader.py

Lines changed: 153 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import jsonschema
1616
import requests
1717

18-
from . import constants, exif_write, types, upload_api_v4, utils
18+
from . import api_v4, constants, exif_write, types, upload_api_v4, utils
1919

2020

2121
LOG = logging.getLogger(__name__)
@@ -28,6 +28,8 @@ class Progress(T.TypedDict, total=False):
2828
# File type
2929
file_type: str
3030

31+
begin_offset: int
32+
3133
# How many bytes has been uploaded so far since "upload_start"
3234
offset: int
3335

@@ -74,7 +76,7 @@ class UploadCancelled(Exception):
7476

7577

7678
class EventEmitter:
77-
events: dict[EventName, T.List]
79+
events: dict[EventName, list]
7880

7981
def __init__(self):
8082
self.events = {}
@@ -195,7 +197,7 @@ def __init__(
195197
self,
196198
user_items: types.UserItem,
197199
emitter: EventEmitter | None = None,
198-
chunk_size: int = upload_api_v4.DEFAULT_CHUNK_SIZE,
200+
chunk_size: int = 2 * 1024 * 1024, # 2MB
199201
dry_run=False,
200202
):
201203
jsonschema.validate(instance=user_items, schema=types.UserItemSchema)
@@ -219,7 +221,7 @@ def upload_zipfile(
219221
return None
220222

221223
final_event_payload: Progress = {
222-
**event_payload, # type: ignore
224+
# **event_payload, # type: ignore
223225
"sequence_image_count": len(namelist),
224226
}
225227

@@ -235,7 +237,7 @@ def upload_zipfile(
235237
zip_fp,
236238
upload_api_v4.ClusterFileType.ZIP,
237239
upload_md5sum,
238-
event_payload=final_event_payload,
240+
progress=final_event_payload,
239241
)
240242

241243
def upload_images(
@@ -269,15 +271,88 @@ def upload_images(
269271
ret[sequence_uuid] = cluster_id
270272
return ret
271273

274+
def _create_upload_service(
275+
self, session_key: str, cluster_filetype: upload_api_v4.ClusterFileType
276+
) -> upload_api_v4.UploadService:
277+
upload_service: upload_api_v4.UploadService
278+
279+
if self.dry_run:
280+
upload_service = upload_api_v4.FakeUploadService(
281+
user_access_token=self.user_items["user_upload_token"],
282+
session_key=session_key,
283+
cluster_filetype=cluster_filetype,
284+
)
285+
else:
286+
upload_service = upload_api_v4.UploadService(
287+
user_access_token=self.user_items["user_upload_token"],
288+
session_key=session_key,
289+
cluster_filetype=cluster_filetype,
290+
)
291+
292+
return upload_service
293+
294+
def _handle_upload_exception(self, ex: Exception, progress: Progress) -> None:
295+
retries = progress["retries"]
296+
begin_offset = progress.get("begin_offset")
297+
chunk_size = progress["chunk_size"]
298+
299+
if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex):
300+
if self.emitter:
301+
self.emitter.emit("upload_interrupted", progress)
302+
LOG.warning(
303+
# use %s instead of %d because offset could be None
304+
"Error uploading chunk_size %d at begin_offset %s: %s: %s",
305+
chunk_size,
306+
begin_offset,
307+
ex.__class__.__name__,
308+
str(ex),
309+
)
310+
# Keep things immutable here. Will increment retries in the caller
311+
retries += 1
312+
if _is_immediate_retry(ex):
313+
sleep_for = 0
314+
else:
315+
sleep_for = min(2**retries, 16)
316+
LOG.info(
317+
"Retrying in %d seconds (%d/%d)",
318+
sleep_for,
319+
retries,
320+
constants.MAX_UPLOAD_RETRIES,
321+
)
322+
if sleep_for:
323+
time.sleep(sleep_for)
324+
else:
325+
raise ex
326+
327+
def _finish_upload_retryable(
328+
self, upload_service: upload_api_v4.UploadService, file_handle: str
329+
):
330+
if self.dry_run:
331+
cluster_id = "0"
332+
else:
333+
resp = api_v4.finish_upload(
334+
self.user_items["user_upload_token"],
335+
file_handle,
336+
upload_service.cluster_filetype,
337+
organization_id=self.user_items.get("MAPOrganizationKey"),
338+
)
339+
340+
data = resp.json()
341+
cluster_id = data.get("cluster_id")
342+
343+
# TODO: validate cluster_id
344+
345+
return cluster_id
346+
272347
def upload_stream(
273348
self,
274349
fp: T.IO[bytes],
275350
cluster_filetype: upload_api_v4.ClusterFileType,
276351
upload_md5sum: str,
277-
event_payload: Progress | None = None,
352+
progress: Progress | None = None,
278353
) -> str | None:
279-
if event_payload is None:
280-
event_payload = {}
354+
if progress is None:
355+
progress = {}
281356

282357
fp.seek(0, io.SEEK_END)
283358
entity_size = fp.tell()
@@ -289,40 +364,79 @@ def upload_stream(
289364
}
290365
session_key = f"mly_tools_{upload_md5sum}{SUFFIX_MAP[cluster_filetype]}"
291366

292-
if self.dry_run:
293-
upload_service: upload_api_v4.UploadService = (
294-
upload_api_v4.FakeUploadService(
295-
user_access_token=self.user_items["user_upload_token"],
296-
session_key=session_key,
297-
organization_id=self.user_items.get("MAPOrganizationKey"),
298-
cluster_filetype=cluster_filetype,
299-
chunk_size=self.chunk_size,
367+
upload_service = self._create_upload_service(session_key, cluster_filetype)
368+
369+
progress["entity_size"] = entity_size
370+
progress["md5sum"] = upload_md5sum
371+
progress["chunk_size"] = self.chunk_size
372+
progress["retries"] = 0
373+
374+
if self.emitter:
375+
try:
376+
self.emitter.emit("upload_start", progress)
377+
except UploadCancelled:
378+
# throw in upload_start only
379+
return None
380+
381+
while True:
382+
try:
383+
file_handle = self._upload_stream_retryable(
384+
upload_service, fp, progress
300385
)
301-
)
302-
else:
303-
upload_service = upload_api_v4.UploadService(
304-
user_access_token=self.user_items["user_upload_token"],
305-
session_key=session_key,
306-
organization_id=self.user_items.get("MAPOrganizationKey"),
307-
cluster_filetype=cluster_filetype,
308-
chunk_size=self.chunk_size,
309-
)
386+
except Exception as ex:
387+
self._handle_upload_exception(ex, progress)
388+
else:
389+
break
310390

311-
final_event_payload: Progress = {
312-
**event_payload, # type: ignore
313-
"entity_size": entity_size,
314-
"md5sum": upload_md5sum,
315-
}
391+
progress["retries"] += 1
316392

317-
try:
318-
return _upload_stream_with_retries(
319-
upload_service,
320-
fp,
321-
event_payload=final_event_payload,
322-
emitter=self.emitter,
323-
)
324-
except UploadCancelled:
325-
return None
393+
if self.emitter:
394+
self.emitter.emit("upload_end", progress)
395+
396+
# TODO: retry here
397+
cluster_id = self._finish_upload_retryable(upload_service, file_handle)
398+
399+
if self.emitter:
400+
self.emitter.emit("upload_finished", progress)
401+
402+
return cluster_id
403+
404+
def _chunkize_byte_stream(
405+
self,
406+
stream: T.IO[bytes],
407+
progress: Progress,
408+
) -> T.Generator[bytes, None, None]:
409+
while True:
410+
data = stream.read(self.chunk_size)
411+
if not data:
412+
break
413+
yield data
414+
progress["offset"] += len(data)
415+
progress["chunk_size"] = len(data)
416+
if self.emitter:
417+
self.emitter.emit("upload_progress", progress)
418+
419+
def _upload_stream_retryable(
420+
self,
421+
upload_service: upload_api_v4.UploadService,
422+
fp: T.IO[bytes],
423+
progress: Progress,
424+
) -> str:
425+
begin_offset = upload_service.fetch_offset()
426+
427+
progress["begin_offset"] = begin_offset
428+
progress["offset"] = begin_offset
429+
430+
if self.emitter:
431+
self.emitter.emit("upload_fetch_offset", progress)
432+
433+
fp.seek(begin_offset, io.SEEK_SET)
434+
435+
shifted_chunks = self._chunkize_byte_stream(fp, progress)
436+
437+
file_handle = upload_service.upload_shifted_chunks(shifted_chunks, begin_offset)
438+
439+
return file_handle
326440

327441

328442
def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]):
@@ -384,91 +498,3 @@ def _is_retriable_exception(ex: Exception):
384498
return True
385499

386500
return False
387-
388-
389-
def _setup_callback(emitter: EventEmitter, mutable_payload: Progress):
390-
def _callback(chunk: bytes, _):
391-
assert isinstance(emitter, EventEmitter)
392-
mutable_payload["offset"] += len(chunk)
393-
mutable_payload["chunk_size"] = len(chunk)
394-
emitter.emit("upload_progress", mutable_payload)
395-
396-
return _callback
397-
398-
399-
def _upload_stream_with_retries(
400-
upload_service: upload_api_v4.UploadService,
401-
fp: T.IO[bytes],
402-
event_payload: Progress | None = None,
403-
emitter: EventEmitter | None = None,
404-
) -> str:
405-
retries = 0
406-
407-
if event_payload is None:
408-
event_payload = {}
409-
410-
mutable_payload = T.cast(Progress, {**event_payload})
411-
412-
# when it progresses, we reset retries
413-
def _reset_retries(_, __):
414-
nonlocal retries
415-
retries = 0
416-
417-
if emitter:
418-
emitter.emit("upload_start", mutable_payload)
419-
420-
while True:
421-
fp.seek(0, io.SEEK_SET)
422-
begin_offset: int | None = None
423-
try:
424-
begin_offset = upload_service.fetch_offset()
425-
upload_service.callbacks = [_reset_retries]
426-
if emitter:
427-
mutable_payload["offset"] = begin_offset
428-
mutable_payload["retries"] = retries
429-
emitter.emit("upload_fetch_offset", mutable_payload)
430-
upload_service.callbacks.append(
431-
_setup_callback(emitter, mutable_payload)
432-
)
433-
file_handle = upload_service.upload(fp, offset=begin_offset)
434-
except Exception as ex:
435-
if retries < constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex):
436-
if emitter:
437-
emitter.emit("upload_interrupted", mutable_payload)
438-
LOG.warning(
439-
# use %s instead of %d because offset could be None
440-
"Error uploading chunk_size %d at begin_offset %s: %s: %s",
441-
upload_service.chunk_size,
442-
begin_offset,
443-
ex.__class__.__name__,
444-
str(ex),
445-
)
446-
retries += 1
447-
if _is_immediate_retry(ex):
448-
sleep_for = 0
449-
else:
450-
sleep_for = min(2**retries, 16)
451-
LOG.info(
452-
"Retrying in %d seconds (%d/%d)",
453-
sleep_for,
454-
retries,
455-
constants.MAX_UPLOAD_RETRIES,
456-
)
457-
if sleep_for:
458-
time.sleep(sleep_for)
459-
else:
460-
raise ex
461-
else:
462-
break
463-
464-
if emitter:
465-
emitter.emit("upload_end", mutable_payload)
466-
467-
# TODO: retry here
468-
cluster_id = upload_service.finish(file_handle)
469-
470-
if emitter:
471-
mutable_payload["cluster_id"] = cluster_id
472-
emitter.emit("upload_finished", mutable_payload)
473-
474-
return cluster_id

0 commit comments

Comments
 (0)