Skip to content

Commit 692138f

Browse files
authored
Properly handle tables/views in insert_append (#544)
1 parent 7685d93 commit 692138f

3 files changed

Lines changed: 175 additions & 11 deletions

File tree

.github/workflows/pr.yaml

Lines changed: 8 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -93,7 +93,8 @@ jobs:
9393
/usr/local/share/powershell /usr/share/swift /usr/local/.ghcup \
9494
/usr/lib/jvm || true
9595
echo "some directories deleted"
96-
sudo apt install aptitude -y >/dev/null 2>&1
96+
sudo apt-get update >/dev/null 2>&1
97+
sudo apt install aptitude -y >/dev/null 2>&1 || true
9798
sudo aptitude purge aria2 ansible azure-cli shellcheck rpm xorriso zsync \
9899
esl-erlang firefox gfortran-8 gfortran-9 google-chrome-stable \
99100
google-cloud-sdk imagemagick \
@@ -103,14 +104,14 @@ jobs:
103104
libfreetype6 libfreetype6-dev libfontconfig1 libfontconfig1-dev \
104105
snmp pollinate libpq-dev postgresql-client powershell ruby-full \
105106
sphinxsearch subversion mongodb-org azure-cli microsoft-edge-stable \
106-
-y -f >/dev/null 2>&1
107-
sudo aptitude purge google-cloud-sdk -f -y >/dev/null 2>&1
107+
-y -f >/dev/null 2>&1 || true
108+
sudo aptitude purge google-cloud-sdk -f -y >/dev/null 2>&1 || true
108109
sudo aptitude purge microsoft-edge-stable -f -y >/dev/null 2>&1 || true
109110
sudo apt purge microsoft-edge-stable -f -y >/dev/null 2>&1 || true
110-
sudo aptitude purge '~n ^php' -f -y >/dev/null 2>&1
111-
sudo aptitude purge '~n ^dotnet' -f -y >/dev/null 2>&1
112-
sudo apt-get autoremove -y >/dev/null 2>&1
113-
sudo apt-get autoclean -y >/dev/null 2>&1
111+
sudo aptitude purge '~n ^php' -f -y >/dev/null 2>&1 || true
112+
sudo aptitude purge '~n ^dotnet' -f -y >/dev/null 2>&1 || true
113+
sudo apt-get autoremove -y >/dev/null 2>&1 || true
114+
sudo apt-get autoclean -y >/dev/null 2>&1 || true
114115
echo "some packages purged"
115116
df -h
116117

core/src/duckdb/creator.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -538,7 +538,9 @@ impl TableManager {
538538
Ok(indexes)
539539
}
540540

541-
#[cfg(test)]
541+
/// Creates a `TableManager` with a specific internal table name.
542+
/// This is useful when you need to target an existing internal table
543+
/// (e.g., for appending to a table created by a previous Overwrite operation).
542544
pub(crate) fn from_table_name(
543545
table_definition: Arc<TableDefinition>,
544546
table_name: RelationName,

core/src/duckdb/write.rs

Lines changed: 164 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -351,9 +351,35 @@ fn insert_append(
351351
.context(super::UnableToBeginTransactionSnafu)
352352
.map_err(to_retriable_data_write_error)?;
353353

354-
let append_table = TableManager::new(Arc::clone(table_definition))
355-
.with_internal(false)
356-
.map_err(to_retriable_data_write_error)?;
354+
// Check if there's an internal table from a previous Overwrite operation.
355+
// If so, we need to append to that internal table instead of the base table name
356+
// (which would be a view after Overwrite).
357+
let append_table = {
358+
let base_table_manager = TableManager::new(Arc::clone(table_definition))
359+
.with_internal(false)
360+
.map_err(to_retriable_data_write_error)?;
361+
362+
// List internal tables to see if a previous Overwrite created a view structure
363+
let internal_tables = table_definition
364+
.list_internal_tables(&tx)
365+
.map_err(to_retriable_data_write_error)?;
366+
367+
if let Some((latest_internal_table_name, _)) = internal_tables.last() {
368+
// There's an internal table from a previous Overwrite - use it for appending
369+
tracing::debug!(
370+
"Found internal table {} from previous overwrite, using it for append instead of base table {}",
371+
latest_internal_table_name,
372+
base_table_manager.table_name()
373+
);
374+
TableManager::from_table_name(
375+
Arc::clone(table_definition),
376+
latest_internal_table_name.clone(),
377+
)
378+
} else {
379+
// No internal tables - use the base table as normal
380+
base_table_manager
381+
}
382+
};
357383

358384
let should_have_indexes = !append_table.indexes_vec().is_empty();
359385
let has_indexes = !append_table
@@ -936,6 +962,141 @@ mod test {
936962
tx.rollback().expect("to rollback");
937963
}
938964

965+
#[tokio::test]
966+
async fn test_write_to_table_append_after_overwrite() {
967+
// Test scenario: Write with Overwrite, then write with Append
968+
// This tests the fix for the issue where Append fails after Overwrite because
969+
// Overwrite creates a view pointing to an internal table, and Append was trying
970+
// to insert into the view instead of the internal table.
971+
// Expected behavior: Append should detect the internal table and insert into it.
972+
973+
let _guard = init_tracing(None);
974+
let pool = get_mem_duckdb();
975+
976+
let table_definition = get_basic_table_definition();
977+
978+
// Step 1: Do an Overwrite operation (creates internal table + view)
979+
let duckdb_sink = DuckDBDataSink::new(
980+
Arc::clone(&pool),
981+
Arc::clone(&table_definition),
982+
InsertOp::Overwrite,
983+
None,
984+
table_definition.schema(),
985+
);
986+
let data_sink: Arc<dyn DataSink> = Arc::new(duckdb_sink);
987+
988+
let batches = vec![RecordBatch::try_new(
989+
Arc::clone(&table_definition.schema()),
990+
vec![
991+
Arc::new(Int64Array::from(vec![Some(1), Some(2)])),
992+
Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
993+
],
994+
)
995+
.expect("should create a record batch")];
996+
997+
let stream = Box::pin(
998+
MemoryStream::try_new(batches, table_definition.schema(), None).expect("to get stream"),
999+
);
1000+
1001+
data_sink
1002+
.write_all(stream, &Arc::new(TaskContext::default()))
1003+
.await
1004+
.expect("to write all with overwrite");
1005+
1006+
// Verify Overwrite created an internal table and view
1007+
{
1008+
let mut conn = Arc::clone(&pool).connect_sync().expect("to connect");
1009+
let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn");
1010+
let tx = duckdb.conn.transaction().expect("to begin transaction");
1011+
let internal_tables = table_definition
1012+
.list_internal_tables(&tx)
1013+
.expect("to list internal tables");
1014+
assert_eq!(
1015+
internal_tables.len(),
1016+
1,
1017+
"Overwrite should create 1 internal table"
1018+
);
1019+
1020+
let view_rows: i64 = tx
1021+
.query_row(
1022+
&format!("SELECT COUNT(1) FROM {}", table_definition.name()),
1023+
[],
1024+
|row| row.get(0),
1025+
)
1026+
.expect("to query view");
1027+
assert_eq!(view_rows, 2, "View should have 2 rows after overwrite");
1028+
tx.rollback().expect("to rollback");
1029+
}
1030+
1031+
// Step 2: Do an Append operation (should append to the internal table, not fail)
1032+
let duckdb_sink_append = DuckDBDataSink::new(
1033+
Arc::clone(&pool),
1034+
Arc::clone(&table_definition),
1035+
InsertOp::Append,
1036+
None,
1037+
table_definition.schema(),
1038+
);
1039+
let data_sink_append: Arc<dyn DataSink> = Arc::new(duckdb_sink_append);
1040+
1041+
let batches_append = vec![RecordBatch::try_new(
1042+
Arc::clone(&table_definition.schema()),
1043+
vec![
1044+
Arc::new(Int64Array::from(vec![Some(3), Some(4)])),
1045+
Arc::new(StringArray::from(vec![Some("c"), Some("d")])),
1046+
],
1047+
)
1048+
.expect("should create a record batch")];
1049+
1050+
let stream_append = Box::pin(
1051+
MemoryStream::try_new(batches_append, table_definition.schema(), None)
1052+
.expect("to get stream"),
1053+
);
1054+
1055+
// This should NOT fail with "is not a table" error
1056+
data_sink_append
1057+
.write_all(stream_append, &Arc::new(TaskContext::default()))
1058+
.await
1059+
.expect("to write all with append after overwrite");
1060+
1061+
// Verify the append worked - should now have 4 rows total
1062+
let mut conn = pool.connect_sync().expect("to connect");
1063+
let duckdb = DuckDB::duckdb_conn(&mut conn).expect("to get duckdb conn");
1064+
let tx = duckdb.conn.transaction().expect("to begin transaction");
1065+
1066+
// Still should have just 1 internal table
1067+
let internal_tables = table_definition
1068+
.list_internal_tables(&tx)
1069+
.expect("to list internal tables");
1070+
assert_eq!(
1071+
internal_tables.len(),
1072+
1,
1073+
"Should still have 1 internal table after append"
1074+
);
1075+
1076+
// Query through the view - should see all 4 rows
1077+
let total_rows: i64 = tx
1078+
.query_row(
1079+
&format!("SELECT COUNT(1) FROM {}", table_definition.name()),
1080+
[],
1081+
|row| row.get(0),
1082+
)
1083+
.expect("to query view");
1084+
assert_eq!(total_rows, 4, "View should have 4 rows after append");
1085+
1086+
// Query the internal table directly - should also have 4 rows
1087+
let (internal_table_name, _) = internal_tables.first().expect("should have internal table");
1088+
let internal_rows: i64 = tx
1089+
.query_row(
1090+
&format!("SELECT COUNT(1) FROM {internal_table_name}"),
1091+
[],
1092+
|row| row.get(0),
1093+
)
1094+
.expect("to query internal table");
1095+
assert_eq!(internal_rows, 4, "Internal table should have 4 rows");
1096+
1097+
tx.rollback().expect("to rollback");
1098+
}
1099+
9391100
#[tokio::test]
9401101
async fn test_write_to_table_append_with_previous_table() {
9411102
// Test scenario: Write to a table with append mode with a previous table

0 commit comments

Comments
 (0)