Skip to content

Commit 573b926

Browse files
committed
rewrite poc
1 parent 5d9d0f5 commit 573b926

8 files changed

Lines changed: 1577 additions & 9 deletions

File tree

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub const MAIN_BRANCH: &str = "main";
3535

3636
/// Reference to [`Snapshot`].
3737
pub type SnapshotRef = Arc<Snapshot>;
38-
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)]
38+
#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
3939
#[serde(rename_all = "lowercase")]
4040
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
4141
pub enum Operation {

crates/iceberg/src/transaction/append.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ impl SnapshotProduceOperation for FastAppendOperation {
121121
}
122122

123123
async fn existing_manifest(
124-
&self,
125-
snapshot_produce: &SnapshotProducer<'_>,
124+
&mut self,
125+
snapshot_produce: &mut SnapshotProducer<'_>,
126126
) -> Result<Vec<ManifestFile>> {
127127
let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
128128
return Ok(vec![]);
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
use std::future::Future;
20+
21+
use futures::SinkExt;
22+
use futures::future::try_join_all;
23+
use once_cell::sync::Lazy;
24+
25+
use crate::delete_file_index::DeleteFileIndex;
26+
use crate::error::Result;
27+
use crate::scan::DeleteFileContext;
28+
use crate::spec::{
29+
DataContentType, DataFile, FormatVersion, INITIAL_SEQUENCE_NUMBER, ManifestContentType,
30+
ManifestFile, Operation,
31+
};
32+
use crate::table::Table;
33+
use crate::transaction::manifest_filter::ManifestFilterManager;
34+
use crate::transaction::snapshot::SnapshotProduceOperation;
35+
use crate::util::snapshot::ancestors_between;
36+
use crate::{Error, ErrorKind};
37+
38+
/// Operations whose snapshots may add delete files.
39+
static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy<HashSet<Operation>> =
40+
Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete]));
41+
42+
/// An additive sub-trait of [`SnapshotProduceOperation`] implemented only by delete-class
43+
/// operations (delete, overwrite, rewrite).
44+
///
45+
/// Append-only operations implement only the base [`SnapshotProduceOperation`] and carry
46+
/// none of the validation/filtering surface defined here. A `DeleteAwareOperation` adds two
47+
/// capabilities on top of the base trait:
48+
///
49+
/// - **Write-time conflict validation** via [`validate`](DeleteAwareOperation::validate),
50+
/// invoked by the action against the refreshed base table before any snapshot is written.
51+
/// - **Manifest filtering** via the owned [`ManifestFilterManager`]s reached through
52+
/// [`data_filter`](DeleteAwareOperation::data_filter) and
53+
/// [`delete_filter`](DeleteAwareOperation::delete_filter), which rewrite carried-forward
54+
/// manifests to drop the files the operation removes.
55+
///
56+
/// The filter managers are owned fields on the operation, populated incrementally as the
57+
/// action builds the operation. Because the operation is rebuilt on each commit attempt, the
58+
/// managers are reconstructed deterministically per attempt from the action's stored inputs.
59+
#[allow(unused)]
60+
pub(crate) trait DeleteAwareOperation: SnapshotProduceOperation {
61+
/// Per-operation conflict check against the refreshed base table, run before any write.
62+
///
63+
/// Implemented per operation; each operation composes the reusable validation helpers to
64+
/// detect concurrent changes that would make the pending operation incorrect.
65+
fn validate(
66+
&self,
67+
base: &Table,
68+
parent_snapshot_id: Option<i64>,
69+
) -> impl Future<Output = Result<()>> + Send;
70+
71+
/// Accessor to the operation's owned data-manifest filter manager.
72+
///
73+
/// Built up as the operation is constructed and mutated during manifest production.
74+
fn data_filter(&mut self) -> &mut ManifestFilterManager;
75+
76+
/// Accessor to the operation's owned delete-manifest filter manager.
77+
///
78+
/// Built up as the operation is constructed and mutated during manifest production.
79+
fn delete_filter(&mut self) -> &mut ManifestFilterManager;
80+
81+
/// Retrieves the history of snapshots between two points with matching operations and content
82+
/// type.
83+
///
84+
/// # Arguments
85+
///
86+
/// * `base` - The base table to retrieve history from.
87+
/// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the
88+
/// beginning.
89+
/// * `to_snapshot_id` - The ending snapshot ID (inclusive).
90+
/// * `matching_operations` - Set of operations to match when collecting snapshots.
91+
/// * `manifest_content_type` - The content type of manifests to collect.
92+
///
93+
/// # Returns
94+
///
95+
/// A tuple containing:
96+
/// * A vector of manifest files matching the criteria.
97+
/// * A set of snapshot IDs that were collected.
98+
///
99+
/// # Errors
100+
///
101+
/// Returns an error if the history between the snapshots cannot be determined.
102+
fn validation_history<'a>(
103+
&'a self,
104+
base: &'a Table,
105+
from_snapshot_id: Option<i64>,
106+
to_snapshot_id: i64,
107+
matching_operations: &'a HashSet<Operation>,
108+
manifest_content_type: ManifestContentType,
109+
) -> impl Future<Output = Result<(Vec<ManifestFile>, HashSet<i64>)>> + Send + 'a {
110+
async move {
111+
let mut manifests: Vec<ManifestFile> = vec![];
112+
let mut new_snapshots = HashSet::new();
113+
let mut last_snapshot = None;
114+
115+
let metadata = base.metadata_ref();
116+
let snapshots = ancestors_between(&metadata, to_snapshot_id, from_snapshot_id);
117+
118+
for current_snapshot in snapshots {
119+
last_snapshot = Some(current_snapshot.clone());
120+
121+
// Find all snapshots with the matching operations
122+
// and their manifest files with the matching content type
123+
if matching_operations.contains(&current_snapshot.summary().operation) {
124+
new_snapshots.insert(current_snapshot.snapshot_id());
125+
126+
let manifest_list = base.manifest_list_reader(&current_snapshot).load().await?;
127+
128+
for manifest in manifest_list.entries() {
129+
if manifest.content == manifest_content_type
130+
&& manifest.added_snapshot_id == current_snapshot.snapshot_id()
131+
{
132+
manifests.push(manifest.clone());
133+
}
134+
}
135+
}
136+
}
137+
138+
if let Some(last_snapshot) = last_snapshot
139+
&& last_snapshot.parent_snapshot_id() != from_snapshot_id
140+
{
141+
return Err(Error::new(
142+
ErrorKind::DataInvalid,
143+
format!(
144+
"Cannot determine history between starting snapshot {} and the last known ancestor {}",
145+
from_snapshot_id.unwrap_or(-1),
146+
last_snapshot.snapshot_id()
147+
),
148+
));
149+
}
150+
151+
Ok((manifests, new_snapshots))
152+
}
153+
}
154+
155+
/// Validates that there are no new delete files for the given data files.
156+
///
157+
/// # Arguments
158+
///
159+
/// * `base` - The base table to validate against.
160+
/// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the
161+
/// beginning.
162+
/// * `to_snapshot_id` - The ending snapshot ID (inclusive), or None if there is no current
163+
/// table state.
164+
/// * `data_files` - The data files to check for conflicting delete files.
165+
/// * `ignore_equality_deletes` - Whether to ignore equality deletes and only check for
166+
/// positional deletes.
167+
///
168+
/// # Returns
169+
///
170+
/// A `Result` indicating success or an error if validation fails.
171+
///
172+
/// # Errors
173+
///
174+
/// Returns an error if new delete files are found for any of the data files.
175+
fn validate_no_new_deletes_for_data_files<'a>(
176+
&'a self,
177+
base: &'a Table,
178+
from_snapshot_id: Option<i64>,
179+
to_snapshot_id: Option<i64>,
180+
data_files: &'a [DataFile],
181+
ignore_equality_deletes: bool,
182+
) -> impl Future<Output = Result<()>> + Send + 'a {
183+
async move {
184+
// Delete files only exist in format version 2 and above. If there is no current table
185+
// state, or the table is V1, there cannot be any new delete files to conflict with.
186+
let Some(to_snapshot_id) = to_snapshot_id else {
187+
return Ok(());
188+
};
189+
if base.metadata().format_version() == FormatVersion::V1 {
190+
return Ok(());
191+
}
192+
193+
// Get matching delete files that have been added since the from_snapshot_id
194+
let (delete_manifests, _) = self
195+
.validation_history(
196+
base,
197+
from_snapshot_id,
198+
to_snapshot_id,
199+
&VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
200+
ManifestContentType::Deletes,
201+
)
202+
.await?;
203+
204+
// Build the delete file index from the matching delete manifests.
205+
//
206+
// `DeleteFileIndex::new` spawns a background task that populates the index by
207+
// collecting from the channel until *all* senders are dropped; queries on the
208+
// returned index (`get_deletes_for_data_file`) block until that population
209+
// completes. We therefore scope the sender to this block so it is dropped as
210+
// soon as every delete file has been sent, allowing the index to finish
211+
// populating before we query it below. Holding the sender past this point
212+
// would deadlock.
213+
let delete_file_index = {
214+
let (delete_file_index, mut delete_file_tx) =
215+
DeleteFileIndex::new(base.runtime().clone());
216+
let manifests = try_join_all(
217+
delete_manifests
218+
.iter()
219+
.map(|f| f.load_manifest(base.file_io())),
220+
)
221+
.await?;
222+
for entry in manifests.iter().flat_map(|manifest| manifest.entries()) {
223+
let delete_file_ctx = DeleteFileContext {
224+
manifest_entry: entry.clone(),
225+
partition_spec_id: entry.data_file().partition_spec_id,
226+
};
227+
delete_file_tx.send(delete_file_ctx).await?;
228+
}
229+
// `delete_file_tx` is dropped here as the block ends, closing the channel.
230+
delete_file_index
231+
};
232+
233+
// Get starting seq num from starting snapshot if available
234+
let starting_sequence_number = if let Some(from_snapshot_id) = from_snapshot_id {
235+
match base.metadata().snapshot_by_id(from_snapshot_id) {
236+
Some(snapshot) => snapshot.sequence_number(),
237+
None => INITIAL_SEQUENCE_NUMBER,
238+
}
239+
} else {
240+
INITIAL_SEQUENCE_NUMBER
241+
};
242+
243+
// Validate if there are deletes using delete file index
244+
for data_file in data_files {
245+
let delete_files = delete_file_index
246+
.get_deletes_for_data_file(data_file, Some(starting_sequence_number))
247+
.await;
248+
249+
if ignore_equality_deletes {
250+
if delete_files.iter().any(|delete_file| {
251+
delete_file.file_type == DataContentType::PositionDeletes
252+
}) {
253+
return Err(Error::new(
254+
ErrorKind::DataInvalid,
255+
format!(
256+
"Cannot commit, found new positional delete for added data file: {}",
257+
data_file.file_path
258+
),
259+
));
260+
}
261+
} else if !delete_files.is_empty() {
262+
return Err(Error::new(
263+
ErrorKind::DataInvalid,
264+
format!(
265+
"Cannot commit, found new delete for added data file: {}",
266+
data_file.file_path
267+
),
268+
));
269+
}
270+
}
271+
272+
Ok(())
273+
}
274+
}
275+
}

0 commit comments

Comments
 (0)