Skip to content

Commit 5530bc3

Browse files
committed
feat: changes suggested in review
1 parent ea5a809 commit 5530bc3

File tree

1 file changed

+71
-67
lines changed

1 file changed

+71
-67
lines changed

crates/iceberg/src/arrow/delete_file_manager.rs

+71-67
Original file line numberDiff line numberDiff line change
@@ -138,73 +138,75 @@ impl CachingDeleteFileManager {
138138
}
139139
}
140140

141+
/// Load the deletes for all the specified tasks
142+
///
143+
/// Returned future completes once all loading has finished.
144+
///
145+
/// * Create a single stream of all delete file tasks irrespective of type,
146+
/// so that we can respect the combined concurrency limit
147+
/// * We then process each in two phases: load and parse.
148+
/// * for positional deletes the load phase instantiates an ArrowRecordBatchStream to
149+
/// stream the file contents out
150+
/// * for eq deletes, we first check if the EQ delete is already loaded or being loaded by
151+
/// another concurrently processing data file scan task. If it is, we return a future
152+
/// for the pre-existing task from the load phase. If not, we create such a future
153+
/// and store it in the state to prevent other data file tasks from starting to load
154+
/// the same equality delete file, and return a record batch stream from the load phase
155+
/// as per the other delete file types - only this time it is accompanied by a one-shot
156+
/// channel sender that we will eventually use to resolve the shared future that we stored
157+
/// in the state.
158+
/// * When this gets updated to add support for delete vectors, the load phase will return
159+
/// a PuffinReader for them.
160+
/// * The parse phase parses each record batch stream according to its associated data type.
161+
/// The result of this is a map of data file paths to delete vectors for the positional
162+
/// delete tasks (and in future for the delete vector tasks). For equality delete
163+
/// file tasks, this results in an unbound Predicate.
164+
/// * The unbound Predicates resulting from equality deletes are sent to their associated oneshot
165+
/// channel to store them in the right place in the delete file managers state.
166+
/// * The results of all of these futures are awaited on in parallel with the specified
167+
/// level of concurrency and collected into a vec. We then combine all of the delete
168+
/// vector maps that resulted from any positional delete or delete vector files into a
169+
/// single map and persist it in the state.
170+
///
171+
///
172+
/// Conceptually, the data flow is like this:
173+
///```none
174+
/// FileScanTaskDeleteFile
175+
/// |
176+
/// Already-loading EQ Delete | Everything Else
177+
/// +---------------------------------------------------+
178+
/// | |
179+
/// [get existing future] [load recordbatch stream / puffin]
180+
/// DeleteFileContext::InProgEqDel DeleteFileContext
181+
/// | |
182+
/// | |
183+
/// | +-----------------------------+--------------------------+
184+
/// | Pos Del Del Vec (Not yet Implemented) EQ Del
185+
/// | | | |
186+
/// | [parse pos del stream] [parse del vec puffin] [parse eq del]
187+
/// | HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
188+
/// | | | |
189+
/// | | | [persist to state]
190+
/// | | | ()
191+
/// | | | |
192+
/// | +-----------------------------+--------------------------+
193+
/// | |
194+
/// | [buffer unordered]
195+
/// | |
196+
/// | [combine del vectors]
197+
/// | HashMap<String, RoaringTreeMap>
198+
/// | |
199+
/// | [persist del vectors to state]
200+
/// | ()
201+
/// | |
202+
/// +-------------------------+-------------------------+
203+
/// |
204+
/// [join!]
205+
///```
141206
pub(crate) async fn load_deletes(
142207
&self,
143208
delete_file_entries: &[FileScanTaskDeleteFile],
144209
) -> Result<()> {
145-
/*
146-
* Create a single stream of all delete file tasks irrespective of type,
147-
so that we can respect the combined concurrency limit
148-
* We then process each in two phases: load and parse.
149-
* for positional deletes the load phase instantiates an ArrowRecordBatchStream to
150-
stream the file contents out
151-
* for eq deletes, we first check if the EQ delete is already loaded or being loaded by
152-
another concurrently processing data file scan task. If it is, we return a future
153-
for the pre-existing task from the load phase. If not, we create such a future
154-
and store it in the state to prevent other data file tasks from starting to load
155-
the same equality delete file, and return a record batch stream from the load phase
156-
as per the other delete file types - only this time it is accompanied by a one-shot
157-
channel sender that we will eventually use to resolve the shared future that we stored
158-
in the state.
159-
* When this gets updated to add support for delete vectors, the load phase will return
160-
a PuffinReader for them.
161-
* The parse phase parses each record batch stream according to its associated data type.
162-
The result of this is a map of data file paths to delete vectors for the positional
163-
delete tasks (and in future for the delete vector tasks). For equality delete
164-
file tasks, this results in an unbound Predicate.
165-
* The unbound Predicates resulting from equality deletes are sent to their associated oneshot
166-
channel to store them in the right place in the delete file manager's state.
167-
* The results of all of these futures are awaited on in parallel with the specified
168-
level of concurrency and collected into a vec. We then combine all of the delete
169-
vector maps that resulted from any positional delete or delete vector files into a
170-
single map and persist it in the state.
171-
172-
173-
Conceptually, the data flow is like this:
174-
175-
FileScanTaskDeleteFile
176-
|
177-
Already-loading EQ Delete | Everything Else
178-
+---------------------------------------------------+
179-
| |
180-
[get existing future] [load recordbatch stream / puffin]
181-
DeleteFileContext::InProgEqDel DeleteFileContext
182-
| |
183-
| |
184-
| +-----------------------------+--------------------------+
185-
| Pos Del Del Vec (Not yet Implemented) EQ Del
186-
| | | |
187-
| [parse pos del stream] [parse del vec puffin] [parse eq del]
188-
| HashMap<String, RoaringTreeMap> HashMap<String, RoaringTreeMap> (Predicate, Sender)
189-
| | | |
190-
| | | [persist to state]
191-
| | | ()
192-
| | | |
193-
| +-----------------------------+--------------------------+
194-
| |
195-
| [buffer unordered]
196-
| |
197-
| [combine del vectors]
198-
| HashMap<String, RoaringTreeMap>
199-
| |
200-
| [persist del vectors to state]
201-
| ()
202-
| |
203-
+-------------------------+-------------------------+
204-
|
205-
[join!]
206-
*/
207-
208210
let stream_items = delete_file_entries
209211
.iter()
210212
.map(|t| (t.clone(), self.file_io.clone(), self.state.clone()))
@@ -253,18 +255,20 @@ impl CachingDeleteFileManager {
253255
)),
254256

255257
DataContentType::EqualityDeletes => {
256-
let (sender, fut) = EqDelFuture::new();
257-
{
258+
let sender = {
258259
let mut state = state.write().unwrap();
259-
260260
if let Some(existing) = state.equality_deletes.get(&task.file_path) {
261261
return Ok(DeleteFileContext::InProgEqDel(existing.clone()));
262262
}
263263

264+
let (sender, fut) = EqDelFuture::new();
265+
264266
state
265267
.equality_deletes
266268
.insert(task.file_path.to_string(), fut);
267-
}
269+
270+
sender
271+
};
268272

269273
Ok(DeleteFileContext::FreshEqDel {
270274
batch_stream: Self::parquet_to_batch_stream(&task.file_path, file_io).await?,

0 commit comments

Comments
 (0)