feat: avoid oom snapshot #26043
base: main
Conversation
Force-pushed 56024af to 2213e14
influxdb3/src/commands/serve.rs
Outdated
@@ -181,7 +181,7 @@ pub struct Config {
#[clap(
    long = "gen1-duration",
    env = "INFLUXDB3_GEN1_DURATION",
-    default_value = "10m",
+    default_value = "1m",
Defaulting to 1m means there are more query chunks in QueryableBuffer (10 times more), but this hasn't been an issue so far.
I don't think this should be set to 1m. I think the gen1 files are still 10m, but the buffer structure is different. Its organization into 1m chunks shouldn't be configuration.
If it comes in with a 10m chunk time, then breaking it down into 1m chunks (non-overlapping) had been very tricky. Having said that, I was trying to break it down at the point of force snapshotting, so everything was in arrow arrays, and creating smaller record batches out of bigger ones after grouping them into 1m chunks still required allocation, thereby running into OOM sooner.
Are you thinking that at the point of adding data to the buffer (buffer_contents) we can separate them into 1m non-overlapping chunks and then add them into the table buffer? I haven't explored that option; I certainly can try that out - that way the gen1 duration will be left at 10m and the query buffer handles them as 1m chunks in the table buffer internally, I suppose.
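For illustration only, here is a minimal sketch of what bucketing incoming rows into non-overlapping 1m windows at buffer-insert time could look like. The `Row` type, map layout, and names are assumptions for this sketch, not the PR's actual code:

```rust
use std::collections::BTreeMap;

const ONE_MINUTE_NANOS: i64 = 60 * 1_000_000_000;

// hypothetical row type; the real buffer stores column builders, not lines
struct Row {
    time_ns: i64,
    line: String,
}

#[derive(Default)]
struct TableBufferSketch {
    // chunk_time (start of the 1m window) -> rows that fall inside it
    chunk_time_to_rows: BTreeMap<i64, Vec<Row>>,
}

impl TableBufferSketch {
    fn add_rows(&mut self, rows: Vec<Row>) {
        for row in rows {
            // floor the timestamp to the start of its 1m window so chunks never overlap
            let chunk_time = row.time_ns - row.time_ns.rem_euclid(ONE_MINUTE_NANOS);
            self.chunk_time_to_rows.entry(chunk_time).or_default().push(row);
        }
    }
}

fn main() {
    let mut buf = TableBufferSketch::default();
    buf.add_rows(vec![
        Row { time_ns: 30 * 1_000_000_000, line: "cpu f=1".into() },
        Row { time_ns: 90 * 1_000_000_000, line: "cpu f=2".into() },
    ]);
    // two separate 1m chunks: one starting at 0s, one at 60s
    assert_eq!(buf.chunk_time_to_rows.len(), 2);
}
```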
-for chunk in snapshot_chunks {
+for chunk in snapshot_chunks_iter {
This snapshot_chunks_iter produces SnapshotChunk lazily, uses the chunk to create a PersistJob, and then moves it to TableBuffer's snapshotting_chunks. Because there's a write lock on this buffer above, it is ok to remove the key and then add it back. Previously the snapshotting_chunks was cloned; this avoids the cloning.
Hmm, you say it gets removed then added back, right? I see that it's removed from TableBuffer.chunk_time_to_chunks as we iterate over SnaphotChunkIter, but I don't see where it's added back to that map. I also see where the new SnapshotChunk you refer to in this comment is added to TableBuffer.snapshotting_chunks, but I don't see where anything is removed from that vec. When you say "added back" are you just referring to it being added to the snapshotting chunks?
> are you just referring to it being added to the snapshotting chunks?

Yes - I can see how I've confused you; I should rephrase that. It is added (as opposed to added back) to the snapshotting_chunks. I was trying to imply it's added to TableBuffer's snapshotting_chunks.
The previous operation was:
- remove those keys from the map
- convert them to record batches
- hold them in snapshotting_chunks
- clone it and return the copy back to create persist jobs out of it

Now, the iterator holds all the keys to remove:
- each call to next() converts a chunk to a record batch and yields it
- this loop creates a persist job
- then adds it to snapshotting_chunks at the end

The main thing is there's a write lock for this operation, and hence both these operations should leave TableBuffer in the same state.
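A rough sketch of that new flow, using simplified stand-in types rather than the real arrow RecordBatch-based ones (names here are illustrative, not the PR's SnaphotChunkIter):

```rust
use std::collections::BTreeMap;

// stand-ins for the real types; illustrative only
struct SnapshotChunk { chunk_time: i64, batch: Vec<String> }
struct PersistJob { chunk_time: i64, rows: usize }

struct TableBufferSketch {
    chunk_time_to_chunks: BTreeMap<i64, Vec<String>>,
    snapshotting_chunks: Vec<SnapshotChunk>,
}

impl TableBufferSketch {
    // called while holding the buffer's write lock, so removing keys and
    // pushing into snapshotting_chunks in the same pass leaves a consistent state
    fn snapshot(&mut self, keys_to_remove: Vec<i64>) -> Vec<PersistJob> {
        let mut persist_jobs = Vec::new();
        for chunk_time in keys_to_remove {
            // materialize one chunk at a time instead of cloning them all up front
            if let Some(rows) = self.chunk_time_to_chunks.remove(&chunk_time) {
                let chunk = SnapshotChunk { chunk_time, batch: rows };
                persist_jobs.push(PersistJob { chunk_time, rows: chunk.batch.len() });
                // moved (not cloned) into the buffer's snapshotting chunks
                self.snapshotting_chunks.push(chunk);
            }
        }
        persist_jobs
    }
}

fn main() {
    let mut buf = TableBufferSketch {
        chunk_time_to_chunks: BTreeMap::from([(0, vec!["cpu f=1".to_string()])]),
        snapshotting_chunks: Vec::new(),
    };
    let jobs = buf.snapshot(vec![0]);
    assert_eq!(jobs.len(), 1);
    assert_eq!(buf.snapshotting_chunks.len(), 1);
    assert!(buf.chunk_time_to_chunks.is_empty());
}
```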
Has this been tested with high query concurrency to verify that this isn't a performance regression?
Yes, it has been. If I make further changes I'd probably need this to be perf tested again.
persisted_files: Arc<PersistedFiles>,
persisted_snapshot: Arc<Mutex<PersistedSnapshot>>,
) {
let iterator = PersistJobGroupedIterator::new(
This allows chunks to be grouped: since the gen1 duration is 1m, it aggregates up to 10 chunks to write a single parquet file for a 10m window.
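Conceptually the grouping behaves like an iterator adapter that merges up to 10 consecutive persist jobs into one. The sketch below is a hypothetical simplification that just sums row counts, not the PR's PersistJobGroupedIterator (which concatenates record batches):

```rust
struct PersistJob { chunk_time: i64, num_rows: usize }

struct GroupedIter<I: Iterator<Item = PersistJob>> {
    inner: I,
    group_size: usize, // 10 x 1m chunks => one roughly 10m parquet file
}

impl<I: Iterator<Item = PersistJob>> Iterator for GroupedIter<I> {
    type Item = PersistJob;

    fn next(&mut self) -> Option<Self::Item> {
        let mut merged = self.inner.next()?;
        for _ in 1..self.group_size {
            match self.inner.next() {
                // the real code concatenates record batches; summing row
                // counts here just shows the shape of the merge
                Some(job) => merged.num_rows += job.num_rows,
                None => break,
            }
        }
        Some(merged)
    }
}

fn main() {
    let jobs = (0..25).map(|i| PersistJob { chunk_time: i * 60, num_rows: 100 });
    let grouped: Vec<_> = GroupedIter { inner: jobs, group_size: 10 }.collect();
    // 25 one-minute jobs collapse into 3 persist groups (10 + 10 + 5)
    assert_eq!(grouped.len(), 3);
    assert_eq!(grouped[2].num_rows, 500);
}
```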
Force-pushed 3a4a9ab to d85460b
}

#[test_log::test(tokio::test)]
async fn test_snapshot_serially_two_tables_with_varying_throughput() {
@pauldix - this should cover the case we discussed with 2 tables receiving different amounts of writes.
Force-pushed 9aa8fdf to 0760147
Force-pushed 0760147 to 19c29ab
let mut set = JoinSet::new();
// TODO: may be this concurrency level needs to be externalised
let sempahore = Arc::new(Semaphore::new(5));
This might be good to externalise, but it has a direct impact on memory consumption. Doing 5 at a time has been working pretty well, both for memory consumption and for the rate at which the snapshotting itself completes - so it's hard-coded for now.
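For reference, a bounded-concurrency pattern of this shape looks roughly like the following, assuming tokio's JoinSet and Semaphore as in the diff above (the per-job work is a placeholder; tokio's rt, macros, sync, and time features are assumed):

```rust
use std::sync::Arc;
use tokio::{sync::Semaphore, task::JoinSet};

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    // limits how many sort/dedupe/persist jobs run at once; a higher value
    // finishes snapshotting faster but uses proportionally more memory
    let semaphore = Arc::new(Semaphore::new(5));

    for job_id in 0..20u32 {
        let semaphore = Arc::clone(&semaphore);
        set.spawn(async move {
            // acquire a permit before doing the memory-heavy work for one chunk
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            // placeholder for sort/dedupe + parquet persist of one chunk
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            job_id
        });
    }

    while let Some(done) = set.join_next().await {
        let _job_id = done.expect("task panicked");
    }
}
```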
I mostly have questions, which makes me think I might not understand this PR well enough to approve it, so I'm just leaving it at comments for now.
@@ -179,7 +191,35 @@ impl TableBuffer {
}

pub fn clear_snapshots(&mut self) {
self.snapshotting_chunks.clear();
// vec clear still holds the mem (capacity), so use take
Is it a bad thing to hold the memory in this case? Wouldn't that mean fewer allocations when filling the vec on the next pass?
It is a good point. Previously we were replacing the snapshotting_chunks each time - it was creating a vec each time to replace the original one, so I changed it to reclaim the memory. But now that I'm looping through and adding them, I can reuse it. I didn't revisit this code after I made the change to use the iterator. I'll change it and run through the profiler.
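The trade-off being discussed, as a small illustration using only std (no project types):

```rust
fn main() {
    let mut snapshotting_chunks: Vec<u64> = Vec::with_capacity(1024);
    snapshotting_chunks.extend(0..1024);

    // clear() drops the elements but keeps the allocation, so the next
    // fill needs no reallocation
    snapshotting_chunks.clear();
    assert!(snapshotting_chunks.capacity() >= 1024);

    snapshotting_chunks.extend(0..1024);

    // std::mem::take swaps in an empty Vec; once the taken value is dropped
    // the old allocation is released - memory is reclaimed, but the next
    // fill has to reallocate
    let taken = std::mem::take(&mut snapshotting_chunks);
    drop(taken);
    assert_eq!(snapshotting_chunks.capacity(), 0);
}
```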
}

pub(crate) struct SnaphotChunkIter<'a> {
pub keys_to_remove: Iter<'a, i64>,
nit: should fields be more public than the struct itself?
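The nit refers to a pattern like the simplified sketch below: a `pub` field on a `pub(crate)` struct grants no more access than the struct itself allows, so the narrower modifier states the intent more precisely (names here are illustrative):

```rust
// visibility sketch only; `pub` on the field can't exceed the
// `pub(crate)` struct, so `pub(crate)` (or private) reads more consistently
pub(crate) struct SnapshotChunkIterSketch {
    pub keys_to_remove: Vec<i64>,
}

fn main() {
    let it = SnapshotChunkIterSketch { keys_to_remove: vec![1, 2, 3] };
    assert_eq!(it.keys_to_remove.len(), 3);
}
```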
}
}

pub(crate) struct SnaphotChunkIter<'a> {
nit: typo in name
@@ -435,6 +453,148 @@ impl QueryableBuffer {
}
}

#[allow(clippy::too_many_arguments)]
Can we avoid ignoring lints if we create a type with methods, where the type holds all the parameters that are currently being passed to these top-level functions (sort_dedupe_parallel, sort_dedupe_serial, process_single_persist_job)?
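The suggestion is the common context-struct pattern, sketched below with made-up field names and trivial method bodies (not the actual function signatures from the PR):

```rust
use std::sync::Arc;

// hypothetical bundle of what the free functions currently take as parameters
struct PersistContext {
    db_name: Arc<str>,
    table_name: Arc<str>,
    max_memory_bytes: u64,
}

impl PersistContext {
    // methods take &self instead of repeating many parameters, so no
    // #[allow(clippy::too_many_arguments)] is needed
    fn sort_dedupe_serial(&self, jobs: Vec<u64>) -> usize {
        jobs.len()
    }

    fn sort_dedupe_parallel(&self, jobs: Vec<u64>) -> usize {
        jobs.len()
    }
}

fn main() {
    let ctx = PersistContext {
        db_name: "db".into(),
        table_name: "cpu".into(),
        max_memory_bytes: 100_000,
    };
    assert_eq!(ctx.sort_dedupe_serial(vec![1, 2, 3]), 3);
    let _ = ctx.sort_dedupe_parallel(vec![]);
    let _ = (&ctx.db_name, &ctx.table_name, ctx.max_memory_bytes);
}
```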
These should've been removed; the code went through a few passes, and at one point I was creating the PersistJobGroupedIterator within those functions, which ended up with those lint failures. They don't even need them anymore (process_single_persist_job - this doesn't have the ignore, does it?).
let mut current_size_bytes = current_data.total_batch_size();
debug!(?current_size_bytes, table_name = ?current_data.table_name, ">>> current_size_bytes for table");
self.system.refresh_memory();
let system_mem_bytes = self.system.free_memory() - 100_000_000;
I don't think this will take into account cgroup limits -- might be worth considering that as well since cgroup memory limits can lead to OOM kills as well: https://docs.rs/sysinfo/latest/sysinfo/struct.System.html#method.cgroup_limits
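A minimal sketch of taking cgroup limits into account, assuming a recent sysinfo release (0.30+) where `cgroup_limits()` lives on `System` as in the linked docs; the helper name and 100 MB headroom mirror the diff above but are otherwise illustrative:

```rust
use sysinfo::System;

/// Estimate the memory (bytes) available for snapshotting, preferring the
/// cgroup limit when the process runs inside one (e.g. in a container) and
/// keeping a fixed headroom.
fn available_snapshot_memory(sys: &mut System, headroom: u64) -> u64 {
    sys.refresh_memory();
    // cgroup_limits() returns Some(..) when cgroup v1/v2 limits are readable
    let free = match sys.cgroup_limits() {
        Some(limits) => limits.free_memory,
        None => sys.free_memory(),
    };
    free.saturating_sub(headroom)
}

fn main() {
    let mut sys = System::new();
    let budget = available_snapshot_memory(&mut sys, 100_000_000);
    println!("snapshot memory budget: {budget} bytes");
}
```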
for i in 0..10 {
// create another write, this time with two tags, in a different gen1 block
let ts = Gen1Duration::new_1m().as_duration().as_nanos() as i64 + (i * 240_000_000_000);
// let line = format!("bar,t1=a,t2=b f1=3i,f2=3 {}", ts);
Did you forget to remove this comment?
.persisted_files
.get_files(db.id, table.table_id);
debug!(?files, ">>> test: queryable buffer persisted files");
assert_eq!(4, files.len());
The persisted files are gen1 snapshots, right? What exactly determines which writes end up in which files? I'm asking because I'm a little confused -- I see 10 writes with three lines each, each separated from the previous by roughly 240s (4m) in their timestamps, yet I keep seeing Gen1Duration::new_1m(), which makes me think we should actually have a separate file for each write since they are outside of the same 1m duration.
My rough understanding is that in terms of what is written we would have:
- 3 lines at ts = 300s
- 3 lines at ts = 540s
- 3 lines at ts = 780s
- 3 lines at ts = 1020s
- 3 lines at ts = 1260s
- 3 lines at ts = 1500s
- 3 lines at ts = 1740s
- 3 lines at ts = 1980s
- 3 lines at ts = 2220s
- 3 lines at ts = 2460s
And since everything is configured for a 1m gen1 duration each of these writes would end up in their own gen1 file so we would have 10 files each with 3 lines and none with 9 lines.
Am I misunderstanding the meaning of duration for gen1 files? Does it mean something other than all contained rows have data within the same min and max timestamp?
I saw something in this PR about multiple gen1 chunks being batched into 10min intervals, but I don't think that explains it for me since if that were the case then I would expect 3 files with 6 lines each (300s&540s, 780s&1020s, 2220s&1980), 1 file with 9 lines (1260s&1500s&1740s), and one file with 3 lines (2460s)
> The persisted files are gen1 snapshots, right? What exactly determines which writes end up in which files?

They're gen1 files; we'd like to group as many 1m chunks for a table into a single file, with time (10 minutes) and memory (whatever is passed in) as the two bounds.

> Am I misunderstanding the meaning of duration for gen1 files? Does it mean something other than all contained rows have data within the same min and max timestamp?

No, your understanding is right; it is a mechanism to batch together the writes that fall within the same boundary.

> I saw something in this PR about multiple gen1 chunks being batched into 10min intervals, but I don't think that explains it for me since if that were the case then I would expect 3 files with 6 lines each

It is grouping multiple query buffer chunks (1m) along with a bound on memory, so 100_000 bytes (see here; an arbitrary choice) for this test breaks it into 4 files instead of 3. If that is increased, then more persist jobs (chunks) will be grouped together - in this case, if I change 100_000 to 150_000 (just checked locally), it writes 3 files instead of 4 (as you mentioned). I'll try and explain the size implication in the test as well, as this might be easy to miss.
I've now changed it to 150_000, but when checking the actual bytes I realized it was not doing the check before adding the next one; it was always adding the next one and then going above the max allowed by one item. So, even if I increased it to 150_000 it still should remain as 4 files. I've added more comments to explain the memory limit too.
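The fix described here is the check-before-add pattern: stop growing a group once adding the next chunk would exceed the budget, rather than adding it first and checking afterwards. A simplified sketch under assumed names:

```rust
struct Chunk { size_bytes: usize }

// groups chunks greedily but never lets a group exceed `max_bytes`
// (except when a single chunk is already larger than the budget)
fn group_by_size(chunks: &[Chunk], max_bytes: usize) -> Vec<Vec<&Chunk>> {
    let mut groups: Vec<Vec<&Chunk>> = Vec::new();
    let mut current: Vec<&Chunk> = Vec::new();
    let mut current_bytes = 0usize;

    for chunk in chunks {
        // check *before* adding, so the group stays within the limit
        if !current.is_empty() && current_bytes + chunk.size_bytes > max_bytes {
            groups.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current_bytes += chunk.size_bytes;
        current.push(chunk);
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

fn main() {
    let chunks: Vec<Chunk> = (0..10).map(|_| Chunk { size_bytes: 40_000 }).collect();
    // with a 100_000-byte budget, only two 40_000-byte chunks fit per group
    let groups = group_by_size(&chunks, 100_000);
    assert_eq!(groups.len(), 5);
}
```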
I think it's actually that gen1 files are 10m by default. This changes the organization of the buffer to hold 1m chunks of data and then combine those into 10m chunks if there is enough memory available to do so. Otherwise, the 1m buffer chunks are written out into smaller gen1 chunks. We shouldn't refer to the buffer structure as "files" as it's by definition not files.
> It is grouping multiple query buffer chunks (1m) along with a bound on memory, so 100_000 (see here) bytes (arbitrary choice) for this test breaks it to 4 files instead of 3 files.

Thanks for all the explanation! Yeah, the size bound was what I was missing reading through the test case.
Between the different test cases there looks to be a lot of setup boilerplate that could potentially be abstracted into fixture types to make the test cases themselves easier to read. Maybe not worth doing that here, but worth keeping in mind in the future.
Force-pushed 5e7ac00 to d6b7d8b
One main question about testing query concurrency and some comments about gen1 files vs. the write buffer that I think can be cleaned up for clarity.
influxdb3/src/commands/serve.rs
Outdated
#[clap(
long = "max-memory-for-snapshot",
env = "INFLUXDB3_MAX_MEMORY_FOR_SNAPSHOT",
default_value = "100",
The default is a percentage? Do those usually have % at the end? Or can it be there for clarity? Or if this is 100MB, that seems exceptionally low as a default. A percentage would be better since it will run on a wide variety of memory configurations.
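For comparison, a percentage-style flag could look roughly like this, using the same #[clap(...)] attribute style as the existing flag; the flag name, env var, and default below are hypothetical, and clap's derive and env features are assumed:

```rust
use clap::Parser;

#[derive(Debug, Parser)]
struct SnapshotConfig {
    /// Share of available memory the snapshot process may use, as a
    /// percentage (1-100), so the same default scales across hosts with
    /// very different RAM sizes.
    #[clap(
        long = "max-memory-for-snapshot-percent",
        env = "INFLUXDB3_MAX_MEMORY_FOR_SNAPSHOT_PERCENT",
        default_value = "20"
    )]
    max_memory_for_snapshot_percent: u8,
}

fn main() {
    let cfg = SnapshotConfig::parse();
    println!("snapshot memory cap: {}%", cfg.max_memory_for_snapshot_percent);
}
```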
Force-pushed d6b7d8b to 9fa5ec0
Force-pushed 7498d14 to 8d26c43
Couple of small comments, but I think it looks good.
/// This iterator groups persist jobs together to create a single persist job out of it with the
/// combined record batches from all of them. By default it'll try to pick as many as 10 persist
/// jobs (gen1 defaults to 1m so groups 10 of them to get to 10m) whilst maintaining memory
Comment is off? Gen1 is a file and defaults to 10m. The chunks inside the buffer default to 1m?
@@ -898,4 +1234,456 @@ mod tests {
.get_files(db.id, table.table_id);
assert_eq!(files.len(), 2);
}

#[test_log::test(tokio::test)]
async fn test_when_snapshot_in_parallel_group_multiple_gen_1_durations() {
Suggested change:
-async fn test_when_snapshot_in_parallel_group_multiple_gen_1_durations() {
+async fn test_when_snapshot_in_parallel_group_multiple_bufffer_chunk_durations() {
I assume you don't mean gen1, but instead mean that there are multiple chunks inside the table buffer that match up to a larger gen1?
Looks good to me 😎
This PR addresses the OOM issue (or reduces the chances of running into OOM when snapshotting) with the following main changes:
- defaults gen1 duration to 1m (instead of 10m)
- snapshot chunks are built lazily, and
- the sort/dedupe step itself is done serially (i.e. 1 at a time)

As an optimisation, when _not_ forcing a snapshot it aggregates up to 10m worth of chunks and writes them in parallel; the assumption is that, given it's a normal snapshot, there is enough memory to run it.
closes: #25991
- extra debug logs added
- test fixes

- when grouping persist jobs try checking if adding persist job will tip it over the max allowed before adding it
- consider the cgroup limits for memory
- any typo fixes
- extra docs for one of the tests

- gen1 duration is set to 10m, but the internal query buffer is now divided into 1m chunks
- additional debug logs to measure the times to see how long it takes to build persist jobs and table buffer updates

- updated the comments for env var
Force-pushed 8d26c43 to 3aaf3d5
Force-pushed 3aaf3d5 to d493deb