Convert: Optimize for wide schema and fix Sharding logic #92
Open
ViciousEagle03 wants to merge 1 commit into thanos-io:main from
Conversation
ViciousEagle03
commented
Apr 3, 2026
```go
const (
	targetBatchSizeBytes = 64 * 1024 * 1024
)
```
Author
targetBatchSizeBytes is the memory limit for each batch of rows in the buffer.
Signed-off-by: Piyush Sharma <piyushsharma04321@gmail.com>
3e1fb0e to
b1f9764
Author
@GiedriusS, I noticed the #33 issue was labeled
Fixes: #33
While trying to run `TestConverterIndexWithManyLabelNames` and understand why it would not even run on extremely wide schemas (for this specific case, 34,000 label columns), I ran into a few bottlenecks. From what I can gather as I learn the architecture, the pipeline likely wasn't originally designed for schemas of this extreme width: the write and read pipelines had several bottlenecks, which together caused a chain of thread contention, I/O backpressure, and GC pressure.

Note: The fixes proposed below are based on my profiling and my understanding of the system's limits and the thanos-parquet-gateway codebase. If I have misinterpreted any part of the Thanos architecture or the parquet-go internals, please let me know; I am more than happy to adjust the logic if any of these changes conflict with the intended design of the gateway.
Logic fix 1: (`ShardSeries`)

In the current implementation of the `ShardSeries` function, there is a small logical error in how the function determines when to cut a new shard based on the maximum column limit `math.MaxInt16`. When a new shard is cut, the labels of the series that triggered the shard-cutting logic have already been pushed into `shardLabelColumns[shardIdx]` of the current shard; only after that is the check executed. Because of this, the old schema map contains label columns that do not belong to it. When this corrupted schema map is eventually passed down the pipeline to `BuildSchemaFromLabels` -> `parquet.NewSchema()`, it triggers a panic inside the parquet-go library because the column limit is exceeded. We fix this with a `newLabels` buffer: we perform the check first, and only then push the labels into `shardLabelColumns[shardIdx]`.

I wrote the `TestShardSeriesIndexBoundary` suite to test this.
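A minimal sketch of the check-before-push ordering described above. This is not the gateway's actual API; apart from `newLabels` (which mirrors the PR text), all names and the shard representation here are illustrative assumptions:

```go
package main

import "fmt"

// shardLabelSets is an illustrative sketch (not the gateway's actual API) of
// the check-before-push fix: a series' not-yet-seen label names are collected
// into a temporary newLabels buffer, the column-limit check runs BEFORE
// anything is committed, and only then are the names pushed into the current
// shard's column set. In the real code the limit is math.MaxInt16.
func shardLabelSets(series [][]string, maxCols int) []map[string]struct{} {
	shards := []map[string]struct{}{{}}
	cur := shards[0]
	for _, labels := range series {
		// Buffer the label names that are new to the current shard.
		newLabels := make([]string, 0, len(labels))
		for _, l := range labels {
			if _, ok := cur[l]; !ok {
				newLabels = append(newLabels, l)
			}
		}
		// Check first: would committing these names exceed the limit?
		if len(cur)+len(newLabels) > maxCols {
			cur = map[string]struct{}{}
			shards = append(shards, cur)
			newLabels = labels // a fresh shard starts with all of this series' labels
		}
		// Only now push into the shard's column set.
		for _, l := range newLabels {
			cur[l] = struct{}{}
		}
	}
	return shards
}

func main() {
	shards := shardLabelSets([][]string{{"a", "b"}, {"b", "c"}, {"d", "e"}}, 3)
	fmt.Println(len(shards), len(shards[0]), len(shards[1]))
}
```

The buggy ordering would push `d` and `e` into the first shard before noticing the overflow, leaving the first shard's schema map with columns that belong to the second shard.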
Fix 2:

When dealing with extremely wide shards (e.g., the first shard with 32,762 columns), blindly passing the default `rowGroupSize` (1,000,000) to `parquet.MaxRowsPerRowGroup` causes the Parquet writer to attempt to buffer 1,000,000 rows of 32,762 columns. Though many of the cells are empty, the buffer still skyrockets to around 32 billion cells (1M * 32,762), requiring a ridiculous amount of RAM.

The fix: we dynamically calculate `MaxRowsPerRowGroup` by introducing a `MaxCellBudget` of 100,000,000, while ensuring the row-group size stays reasonably large for the serialization logic. Because rows per group scale inversely with column count, this computes exactly how many rows per group will stay under the RAM buffer limit.
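The inverse scaling can be sketched as below. The constant names follow the PR description; the exact lower clamp (`minRowGroup`) is an assumption, not the gateway's actual value:

```go
package main

import "fmt"

// rowsPerRowGroup sketches the dynamic row-group sizing: cap the total number
// of buffered cells (rows * columns) at a fixed cell budget, clamped between
// an assumed lower bound and the original default of 1,000,000 rows.
const (
	maxCellBudget   = 100_000_000
	defaultRowGroup = 1_000_000
	minRowGroup     = 1024 // assumed lower bound so row groups stay serializable
)

func rowsPerRowGroup(numColumns int) int {
	if numColumns <= 0 {
		return defaultRowGroup
	}
	rows := maxCellBudget / numColumns // inverse scaling with schema width
	if rows > defaultRowGroup {
		rows = defaultRowGroup
	}
	if rows < minRowGroup {
		rows = minRowGroup
	}
	return rows
}

func main() {
	fmt.Println(rowsPerRowGroup(10))    // narrow schema: capped at the default
	fmt.Println(rowsPerRowGroup(32762)) // wide shard from the PR: budget-bound
}
```

For the 32,762-column shard this yields 3,052 rows per group, i.e. just under 10^8 buffered cells instead of ~32 billion.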
Fix 3: (`newBufferedReader`)

The current logic of `newBufferedReader` uses a hardcoded batch size (128 rows) and a hardcoded channel capacity (128 batches). The problem arises when the pipeline is fed a very wide schema (in our case, 32,762 unique label names in a single block): the number of bytes held in RAM instantly skyrockets. Because of this, the background goroutine went ahead and decoded 13 GB of data into memory, causing it to hang before the writer even started processing the first batch. Since the writer is CPU-bound, the `BufferedReader` channel was pushing data at a speed the writer couldn't digest.

The fix: we maintain the batch size via a reader budget and dynamically calculate how many rows to process per batch, ensuring the batch size is range-bounded according to the incoming schema. Since the Parquet writer can only process so many batches, we reduced the channel capacity to 2 (double buffering) to force the TSDB reader to pause for the writer, ensuring the pipeline never hoards more than 2 batches of data in RAM at any given time.

I am still not sure what the best hardcoded channel capacity is; it is a point of discussion.
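A sketch of the two ideas combined, under stated assumptions: the budget value, clamp bounds, and all names here are hypothetical stand-ins for the gateway's actual constants:

```go
package main

import "fmt"

// batchRows sketches the reader budget: given a per-batch byte budget and an
// estimated bytes-per-row for the incoming schema (roughly proportional to
// column count), derive rows per batch, range-bounded between 1 and the old
// hardcoded batch size of 128.
const (
	readerBudgetBytes = 64 << 20 // assumed: ~64 MiB per in-flight batch
	minBatchRows      = 1
	maxBatchRows      = 128
)

func batchRows(estBytesPerRow int) int {
	if estBytesPerRow <= 0 {
		return maxBatchRows
	}
	rows := readerBudgetBytes / estBytesPerRow
	if rows < minBatchRows {
		rows = minBatchRows
	}
	if rows > maxBatchRows {
		rows = maxBatchRows
	}
	return rows
}

func main() {
	// A channel of capacity 2 gives double buffering: the reader goroutine
	// blocks on send once two batches are in flight, pausing for the writer
	// instead of decoding gigabytes ahead of it.
	batches := make(chan []int, 2)
	go func() {
		defer close(batches)
		for i := 0; i < 5; i++ {
			batches <- make([]int, batchRows(1<<20)) // ~1 MiB/row estimate
		}
	}()
	total := 0
	for b := range batches {
		total += len(b)
	}
	fmt.Println(batchRows(1<<20), total)
}
```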
Fix 4: (`WriteRows`)

Though we limited the number of row batches in flight to save memory, the `WriteRows()` function currently uses an errgroup to write the batches of the `.labels.parquet` and `.chunks.parquet` files concurrently. For a wide schema, this kills performance. The traces show that for a single shard with 32,762 columns, around 2,000 ephemeral goroutines are created, and because of the memory-budget introduction the code was constantly spinning up and tearing down an errgroup. The goroutines spent 90% of their time blocked on internal mutex locks within the parquet-go writer. At the same time, concurrent `row.Clone()` calls were flooding the heap with millions of `parquet.Value` objects, which triggered Go's GC mark assists; most of the time was spent scanning pointers to free up memory instead of actually writing data.

The fix: by using sequential writes instead of concurrent ones, the goroutine count drops from around 2,000 to 7. This removes the scheduler churn, reduces the instantaneous allocation by 50%, and keeps us below the GC threshold.
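The shape of the change can be sketched as follows. The writer type and function names are stand-ins, not the gateway's actual `WriteRows` signature:

```go
package main

import "fmt"

// batchWriter stands in for one parquet file's writer; in the real code these
// would be the .labels.parquet and .chunks.parquet writers.
type batchWriter struct {
	name string
	rows int
}

func (w *batchWriter) writeBatch(batch []int) error {
	w.rows += len(batch) // stands in for parquet-go's WriteRows on this file
	return nil
}

// writeRowsSequential sketches the fix: instead of spawning an errgroup that
// writes both files concurrently (contending on parquet-go's internal mutexes
// and cloning rows for each goroutine), write the two files back-to-back on
// the calling goroutine, one batch at a time.
func writeRowsSequential(labels, chunks *batchWriter, batches [][]int) error {
	for _, b := range batches {
		if err := labels.writeBatch(b); err != nil {
			return err
		}
		if err := chunks.writeBatch(b); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	labels := &batchWriter{name: "labels.parquet"}
	chunks := &batchWriter{name: "chunks.parquet"}
	batches := [][]int{{1, 2}, {3, 4, 5}}
	if err := writeRowsSequential(labels, chunks, batches); err != nil {
		panic(err)
	}
	fmt.Println(labels.rows, chunks.rows)
}
```

Since each batch is written to both files before the next batch is pulled, no `row.Clone()` is needed to hand rows across goroutines.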
I ran both versions and found a negligible time difference. Trace attached below.
I ran this using the sequential logic with all the other fixes applied.

I ran this one with the `errgroup.Group` concurrency logic with all the other fixes applied.



Fix 5:
Whenever `parquet.Convert(targetSchema, sourceSchema)` is executed, it attempts to pair every single column in the target schema with the matching column in the source schema. It does this by repeatedly calling a helper function `fieldByName()`, which in turn calls `Group.Fields()`.

In older versions of parquet-go, `Group.Fields()` dynamically evaluates the underlying node map, allocating a slice and running a full string-comparison sort every single time. For a single huge shard containing 32,762 columns, this results in massive struct allocations (32,762 calls * 32,762 fields). The sorting was not optimized for such huge schemas, costing around 32,762 calls * (32,762 * log(32,762)) comparisons, for an overall time complexity of O(N^2 log N).

Initially, to fix this, I wrote a custom wrapper to intercept and cache the field lookups. Fortunately, I found PR #423 in the parquet-go codebase fixing this exact problem, released in `v0.27.0` of the parquet-go library. Bumping the dependency natively drops the complexity to O(N log N).

Below, I have attached the execution traces, along with CPU and memory profile diffs.
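The caching-wrapper idea mentioned above can be sketched like this. The types are simplified stand-ins for parquet-go's `Group`/`Field`, not its actual API:

```go
package main

import (
	"fmt"
	"sort"
)

type field struct{ name string }

type group struct {
	fields []field
	byName map[string]int // lazily built lookup cache
}

// fieldsSorted mimics the old behavior: allocate a slice and sort it on every
// call, which is O(N log N) per lookup and O(N^2 log N) across a conversion.
func (g *group) fieldsSorted() []field {
	out := append([]field(nil), g.fields...)
	sort.Slice(out, func(i, j int) bool { return out[i].name < out[j].name })
	return out
}

// fieldByName answers lookups from a one-time name->index map instead, so
// each of the N lookups during Convert costs O(1) after an O(N) build.
func (g *group) fieldByName(name string) (field, bool) {
	if g.byName == nil {
		g.byName = make(map[string]int, len(g.fields))
		for i, f := range g.fields {
			g.byName[f.name] = i
		}
	}
	i, ok := g.byName[name]
	if !ok {
		return field{}, false
	}
	return g.fields[i], true
}

func main() {
	g := &group{fields: []field{{"c"}, {"a"}, {"b"}}}
	f, ok := g.fieldByName("b")
	fmt.Println(f.name, ok)
}
```

The upstream fix in parquet-go `v0.27.0` makes this wrapper unnecessary, which is why the PR bumps the dependency instead.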

Note: Since the original pipeline couldn't complete the 34,000-column conversion due to the above bottlenecks, I ran the original code at 25,000 columns and the new patch with the test as-is, i.e., with the default 34,000 columns.
Fix (left) vs. the current codebase (right) goroutine trace
I also ran the commands below to compare the runs.
Both the Flame Graphs are attached below
We can see some red blocks in the flame graphs below. I believe it's because in the old version of the parquet-go lib these functions were inlined by the compiler, but in `v0.27.0` they are not inlined, so we see a jump in the `CopyRows` and `AppendRows` blocks.



Final result