refactor: improve MPUChunk flush logic and Dask finalization #223

Open

wants to merge 2 commits into base: develop

Conversation

wietzesuijker (Contributor)

This PR makes writing large COGs more flexible and fixes failures seen on Azure when writing large COGs with many overviews.

The multi-part upload process has been improved with refined data flushing logic. Instead of a single flush, data is collected in a local buffer and sent in smaller chunks based on write size and available credits.

Error handling has also been enhanced, with clearer messages for problems such as insufficient write credits or invalid part IDs, so write failures are reported immediately.
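For illustration only, a minimal sketch of what credit-limited buffered flushing can look like; the names (`flush_buffered`, `write_part`, `MIN_WRITE_SZ`) and the exact bookkeeping are assumptions for this example, not the code in this PR:

```python
# Hypothetical sketch of credit-limited buffered flushing; names and the exact
# bookkeeping are illustrative, not the code in this PR.
from typing import Callable, Dict, List, Tuple

MIN_WRITE_SZ = 5 * 2**20  # assumed S3-style minimum part size (5 MiB)


def flush_buffered(
    buffer: bytearray,
    next_part_id: int,
    credits: int,
    write_part: Callable[[int, bytes], Dict],
    finalise: bool = False,
) -> Tuple[List[Dict], int, int]:
    """Upload full-sized parts from ``buffer`` while part-id credits remain."""
    parts: List[Dict] = []
    # Flush while there is a full part's worth of data, or any data at all
    # when finalising (the last part is allowed to be smaller).
    while len(buffer) >= MIN_WRITE_SZ or (finalise and len(buffer) > 0):
        if credits <= 0:
            # Fail loudly instead of silently buffering forever.
            raise RuntimeError(
                f"Insufficient write credits: {len(buffer)} bytes still buffered"
            )
        chunk = bytes(buffer[:MIN_WRITE_SZ])
        del buffer[:MIN_WRITE_SZ]
        parts.append(write_part(next_part_id, chunk))
        next_part_id += 1
        credits -= 1
    return parts, next_part_id, credits
```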

The finalization process can now be executed in a distributed manner using Dask. It retrieves a Dask client, submits the flush as a remote task with a timeout, and handles cancellation, making the final output assembly more reliable.
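As a rough sketch of that flow (the helper name, the flush callable, and the default timeout below are assumptions, not this PR's exact API):

```python
# Illustrative sketch only: finalise_remote, flush_fn and the default timeout
# are assumptions, not this PR's exact API.
from typing import Any, Callable

from dask.distributed import get_client


def finalise_remote(flush_fn: Callable[..., Any], *args: Any, timeout: float = 600) -> Any:
    """Run the final flush as a remote Dask task, with timeout and cancellation."""
    client = get_client()  # reuse the Dask client that is already running
    fut = client.submit(flush_fn, *args, pure=False)
    try:
        return fut.result(timeout=timeout)
    except Exception:
        # On timeout or any other failure, cancel the remote task so it does
        # not keep running in the background, then re-raise.
        fut.cancel()
        raise
```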

@@ -296,7 +299,7 @@ def gen_bunch(
partId + idx * writes_per_chunk,
writes_per_chunk,
is_final=is_final,
lhs_keep=lhs_keep,
lhs_keep=lhs_keep if idx == 0 else 0,
@Kirill888 (Member) commented Apr 10, 2025

no, every chunk must keep the same amount regardless of its position relative to other chunks. A good way to think of it is "extreme scenarios": what happens when only one chunk was written to and it did not keep left-side bytes? Now you need to write a header with too few bytes for a chunk write, and since you didn't keep any lhs bytes you end up in a scenario like this:

HDR{10KiB}, P{id=2}, P{id=7}, ...

You can't write out HDR{10KiB} even though you reserved PartId=1 for it, because it's too small. That's why we need to keep at least 5MiB of bytes (in the S3 case) on the left, so we always end up with:

HDR{10KiB}, L{5MiB}, P{id=2}, P{id=7}, ...

We can then always write out HDR{10KiB}+L{5MiB} => P{id=1}. Alternatively, we end up with:

HDR{10KiB}, D{100KiB}

No parts have been written out yet, since we only had 100KiB worth of non-header data D{100KiB}, so we can just perform:

HDR{10KiB} + D{100KiB} => P{id=1}, or even cancel the multi-part upload and write a single object instead; since this time P{id=1} is the last part of the upload, it is allowed to be any size.
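For illustration, that invariant could be sketched like this (placeholder names, not odc-geo's actual code):

```python
# Illustration of the invariant above; MIN_WRITE_SZ, write_part,
# put_single_object and cancel_mpu are placeholder names, not odc-geo's code.
MIN_WRITE_SZ = 5 * 2**20  # S3: every part except the last must be >= 5 MiB


def finalise(header: bytes, left_data: bytes, parts_written: bool,
             write_part, put_single_object, cancel_mpu):
    if parts_written:
        # Parts with id >= 2 already exist, so P{id=1} is NOT the last part
        # and must meet the minimum size -- this is why every chunk keeps at
        # least MIN_WRITE_SZ bytes on its left-hand side.
        assert len(header) + len(left_data) >= MIN_WRITE_SZ
        write_part(1, header + left_data)        # HDR + L{5MiB} => P{id=1}
    else:
        # Nothing was uploaded yet, so P{id=1} is also the last part and may
        # be any size:
        write_part(1, header + left_data)        # HDR + D{100KiB} => P{id=1}
        # ...or cancel the multi-part upload and do a single object write:
        # cancel_mpu(); put_single_object(header + left_data)
```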

@@ -65,6 +66,7 @@ class MPUChunk:
"observed",
"is_final",
"lhs_keep",
"__dict__",
)
Member

why?

@@ -390,33 +389,29 @@ def mpu_write(
partId,
ch,
writes_per_chunk=writes_per_chunk,
lhs_keep=lhs_keep,
lhs_keep=lhs_keep if idx == 0 else 0,
Member

no, see other comment about lhs_keep

@Kirill888 (Member) commented Apr 10, 2025

What we need are more tests for MPUChunk, in particular for the merge and flush behaviours, before we go and start changing things all over the place.

I'm pretty sure that merge/collate/append might end up changing one or all of the inputs, which goes unnoticed most of the time, but on some occasions Dask might re-do the computation with already-modified inputs and you end up with incorrect state, either on disk or even in code (triggering asserts). So I would start with adding tests to detect that scenario, and then fix the MPUChunk logic. It could also be that part writing produces chunks that do not satisfy the expected state, so adding more thorough tests for that would help either detect the error or convince us that the issue is somewhere else.

We really must change MPUChunk to be properly "final", and not allow in-place changes of the chunk object itself or any of its mutable parts.
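One way to start is a regression test along these lines (pytest-style sketch; `make_chunk` and `merge_chunks` stand in for whatever the real test harness provides, and the attributes captured are assumed to compare by value):

```python
# Pytest-style sketch of such a test; make_chunk / merge_chunks are placeholder
# fixtures, and the attributes captured are assumed to compare by value.
import copy


def chunk_state(chunk):
    """Capture the state we care about as plain data, so mutation is visible."""
    return copy.deepcopy(
        (chunk.left_data, chunk.observed, chunk.is_final, chunk.lhs_keep)
    )


def test_merge_does_not_mutate_inputs(make_chunk, merge_chunks):
    lhs, rhs = make_chunk(part_id=1), make_chunk(part_id=2)
    lhs_before, rhs_before = chunk_state(lhs), chunk_state(rhs)

    merged = merge_chunks(lhs, rhs)

    # Dask may re-run a task with the very same input objects (work stealing,
    # recomputation after a lost worker), so inputs must be left untouched.
    assert chunk_state(lhs) == lhs_before
    assert chunk_state(rhs) == rhs_before
    # The result should be a new object, not a recycled (mutated) input.
    assert merged is not lhs and merged is not rhs
```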

I mentioned before that I really don't think this whole mpu functionality belongs in odc-geo; I was thinking of taking that part out into a separate module and possibly changing the interface. I'm just not convinced that using a dask.Bag of byte chunks was a good choice. At the very least we should be able to support a simpler interface using a fixed-size sequence of futures that each evaluate to bytes of arbitrary size.

In the case of COG generation we know up-front how many logical parts there are, even if some of the parts end up being empty, so there is no need to use Bag, which is for "don't yet know how many elements are in it".
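A rough sketch of that simpler interface; `mpu_from_futures`, `write_part` and the 5 MiB minimum are placeholders rather than a concrete API proposal:

```python
# Rough sketch only: mpu_from_futures, write_part and the 5 MiB minimum are
# placeholders, not a concrete odc-geo API proposal.
from typing import Callable, Dict, List, Sequence

from dask.distributed import Future

MIN_PART_SZ = 5 * 2**20  # S3-style minimum size for every part except the last


def mpu_from_futures(
    futures: Sequence[Future],            # fixed length, known up-front
    write_part: Callable[[int, bytes], Dict],
) -> List[Dict]:
    parts: List[Dict] = []
    buf = bytearray()
    part_id = 1
    for fut in futures:
        buf += fut.result()               # each future evaluates to bytes
        while len(buf) >= MIN_PART_SZ:
            parts.append(write_part(part_id, bytes(buf[:MIN_PART_SZ])))
            part_id += 1
            del buf[:MIN_PART_SZ]
    if buf:                               # the last part may be any size
        parts.append(write_part(part_id, bytes(buf)))
    return parts
```

The byte stream has to be concatenated in order before it can be cut into minimum-sized parts, so the driver-side loop consumes the futures sequentially; the underlying computations can still run in parallel on the cluster.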

Also, you still have not explained to me exactly what problems YOU are having with this code, and in what scenarios.

@wietzesuijker (Contributor, Author)

Thanks for the rigorous feedback; that's very helpful for refactoring the implementation, and I'll follow up. The problem occurs when writing large COGs (50k*50k-ish) with multiple overview levels. This triggers an AssertionError in MPUChunk.flush due to left_data being too small during finalization: assert len(self.left_data) >= write.min_write_sz fails when finalise=True.

@Kirill888 (Member)

> Thanks for the rigorous feedback; that's very helpful for refactoring the implementation, and I'll follow up. The problem occurs when writing large COGs (50k*50k-ish) with multiple overview levels. This triggers an AssertionError in MPUChunk.flush due to left_data being too small during finalization: assert len(self.left_data) >= write.min_write_sz fails when finalise=True.

  1. assert len(self.left_data) >= write.min_write_sz

    • could be bad config or a bug for lhs_keep; it needs to be at least write.min_write_sz, or things won't work
    • could be an error in merge (not keeping lhs_keep in some circumstances)
    • could be due to Dask task stealing combined with us modifying input MPUChunk objects when we should not; we know we are doing it, but we don't know whether Dask ever reuses those objects.
  2. 50Kx50K is not that huge, but what matters is not the size in pixels but the number of tiles

    • increase the block size for the native layer and the first overview levels: I'd say 4096 for native, 2048 for the first 4 overviews, then 1024 after that
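For example, that schedule could be written as a small helper; how the resulting list is passed to the COG writer depends on the odc-geo version, so treat the wiring as an assumption:

```python
# One way to express that schedule; how the list is passed to the COG writer
# depends on the odc-geo version, so treat the wiring as an assumption.
def blocksize_schedule(n_overviews: int) -> list:
    """4096 for native resolution, 2048 for the first 4 overviews, 1024 after."""
    sizes = [4096]                              # native layer
    sizes += [2048] * min(4, n_overviews)       # first 4 overview levels
    sizes += [1024] * max(0, n_overviews - 4)   # remaining overviews
    return sizes


print(blocksize_schedule(7))  # [4096, 2048, 2048, 2048, 2048, 1024, 1024, 1024]
```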

Other notes: please run the tests on your dev machine; tests are failing on this PR, for code that was changed by this PR. Linting is failing too.
