Fix multipart upload #217
Conversation
@Kirill888 fyi, WiP after #212
@wietzesuijker I don't think this is the correct way to approach this, looks like we are trying to debug by code change, no good. Need a proper test suite for chunk merging and its interaction with chunk flushing.
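For reference, a hedged sketch of what such a test could look like, using stand-in helpers rather than the real MPUChunk API (every name below is an assumption, not the library's interface):

# Sketch only: make_chunk and merge_chunks stand in for whatever
# constructor / merge entry point the library actually exposes.
def make_chunk(next_part_id, parts):
    return {"nextPartId": next_part_id, "parts": parts}

def merge_chunks(lhs, rhs):
    # Concatenation only: relies on parts being ordered by construction.
    return {"nextPartId": rhs["nextPartId"], "parts": lhs["parts"] + rhs["parts"]}

def test_merge_keeps_parts_ordered():
    lhs = make_chunk(3, [{"PartNumber": 1}, {"PartNumber": 2}])
    rhs = make_chunk(5, [{"PartNumber": 3}, {"PartNumber": 4}])
    merged = merge_chunks(lhs, rhs)
    part_ids = [p["PartNumber"] for p in merged["parts"]]
    assert part_ids == sorted(part_ids)
    assert merged["nextPartId"] == 5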
Force-pushed from dc28e63 to e42f822
@Kirill888 Thanks! I added quite a bit of debugging to work out how to make this work. With this code I wrote a set of sizeable COGs to ABS without errors, as quickly as you'd expect. The style of the code is still a bit out of sync with the rest of the library, but I'm curious to get your thoughts on the implementation.
odc/geo/_version.py
Outdated
@@ -1,3 +1,3 @@
"""version information only."""

__version__ = "0.4.10"
please revert, version is handled differently now
Force-pushed from e42f822 to f77224e
Force-pushed from f77224e to 31b0a56
@wietzesuijker it's too much for me to review in a hurry, but we certainly don't need global locking during merge or part-id sorting; parts MUST be sorted by construction, see my comments from a few days back.
logger = logging.getLogger("odc.geo.cog._az")
# Set default level (can be overridden by application config)
logger.setLevel(logging.INFO)
# Avoid adding handlers multiple times if this module is reloaded
if not logger.hasHandlers():
    # Configure console handler
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(name)s (%(funcName)s): %(message)s"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Prevent logs from propagating to the root logger if handlers are added here
    logger.propagate = False

# Example: Increase log level for debugging
# logger.setLevel(logging.DEBUG)
no logging setup like that on import please, do it inside your test app if needed for debugging
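A minimal sketch of that suggestion, assuming the logger name from the diff above and configuring everything from a standalone debug script rather than at import time:

# debug_upload.py -- illustrative only: configure logging in the test app,
# not at import time inside odc.geo.cog._az.
import logging
import sys

logging.basicConfig(
    stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s (%(funcName)s): %(message)s",
)
# Turn up only the module under investigation while debugging.
logging.getLogger("odc.geo.cog._az").setLevel(logging.DEBUG)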
logging.getLogger(__name__).debug(
    "Leftover data size (%s bytes) is less than min_write_sz; deferring flush.",
    len(self.left_data),
)
NO: it's a non-recoverable error and a sign of either misconfiguration or a mistake in the code
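A hedged sketch of the behaviour being asked for, failing loudly instead of deferring (left_data and min_write_sz come from the diff; the exception type and surrounding code are assumptions):

# Sketch only: raise instead of logging and deferring; assumes
# min_write_sz is available alongside left_data on the same object.
if len(self.left_data) < self.min_write_sz:
    raise RuntimeError(
        f"Leftover data ({len(self.left_data)} bytes) is smaller than "
        f"min_write_sz ({self.min_write_sz}); this points at a "
        "misconfiguration or a bug in chunk merging/flushing."
    )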
for part in self.parts:
    if "PartNumber" not in part or "BlockId" not in part:
        raise ValueError(f"Malformed part metadata: {part}")
NO, it's up to the impl to decide what is malformed or not.
@@ -86,6 +92,7 @@ def __init__(
        self.observed: list[tuple[int, Any]] = [] if observed is None else observed
        self.is_final = is_final
        self.lhs_keep = lhs_keep
        self._global_counter = 0
NO, can't have that
# Use a Dask distributed lock to ensure the nextPartId update is atomic
lock = Lock("mpu_merge_lock")
with lock:
    all_parts = lhs.parts + rhs.parts
    all_parts.sort(key=lambda p: int(p["PartNumber"]))
    new_nextPartId = (
        all_parts[-1]["PartNumber"] + 1
        if all_parts
        else max(lhs.nextPartId, rhs.nextPartId)
    )
    return MPUChunk(
        new_nextPartId,      # was: rhs.nextPartId
        rhs.write_credits,
        rhs.data,
        lhs.left_data,
        all_parts,           # was: lhs.parts + rhs.parts
        lhs.observed + rhs.observed,
        rhs.is_final,
        lhs.lhs_keep,
    )
NO, I commented on this before, parts must be ordered by construction, and this logic for next part selection is bogus.
and no lock is needed, nor will this work in a distributed setting
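A hedged sketch of the lock-free merge being described, using a simplified stand-in for MPUChunk with just the fields that appear in the snippet above (the real class and its constructor signature may differ):

# Sketch only: a stand-in MPUChunk to show that plain concatenation keeps
# parts ordered when lhs always covers part ids strictly before rhs.
from dataclasses import dataclass, field

@dataclass
class MPUChunk:  # simplified stand-in, not the library class
    nextPartId: int
    write_credits: int
    data: bytes = b""
    left_data: bytes = b""
    parts: list = field(default_factory=list)
    observed: list = field(default_factory=list)
    is_final: bool = False
    lhs_keep: bool = False

def merge(lhs: MPUChunk, rhs: MPUChunk) -> MPUChunk:
    # No lock, no sort: lhs parts come strictly before rhs parts by
    # construction, so lhs.parts + rhs.parts is already ordered.
    assert lhs.nextPartId <= rhs.nextPartId
    return MPUChunk(
        rhs.nextPartId,
        rhs.write_credits,
        rhs.data,
        lhs.left_data,
        lhs.parts + rhs.parts,
        lhs.observed + rhs.observed,
        rhs.is_final,
        lhs.lhs_keep,
    )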
this is too much change and it contradicts the original design; I think there is a misunderstanding of how this is supposed to work.
Thanks, I've opened a clean alternative in #223
This PR optimizes Azure multipart uploads by ensuring that only non-empty, strictly ordered block IDs are committed.
However, this error still persists: HttpResponseError: The specified block list is invalid / ErrorCode:InvalidBlockList.
Next steps
I suspect a 0-byte block (or an out-of-order/duplicate block) is still being staged. I will try things along these lines next:
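One concrete check along those lines (an illustrative sketch, not code from the PR; the Size field is an assumption, while PartNumber/BlockId match the part metadata shown earlier):

# Illustrative only: sanity-check the block list before committing it.
# An InvalidBlockList error typically points at empty, duplicate, or
# out-of-order blocks.
def validate_block_list(parts):
    block_ids = [p["BlockId"] for p in parts]
    if len(set(block_ids)) != len(block_ids):
        raise ValueError("duplicate block ids in commit list")

    empty = [p["BlockId"] for p in parts if p.get("Size") == 0]
    if empty:
        raise ValueError(f"0-byte blocks staged: {empty}")

    part_numbers = [int(p["PartNumber"]) for p in parts]
    if part_numbers != sorted(set(part_numbers)):
        raise ValueError("block list is not strictly ordered by PartNumber")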