diff --git a/src/include/rest_catalog/objects/assert_ref_snapshot_id.hpp b/src/include/rest_catalog/objects/assert_ref_snapshot_id.hpp index e2258bb98..418c6fd02 100644 --- a/src/include/rest_catalog/objects/assert_ref_snapshot_id.hpp +++ b/src/include/rest_catalog/objects/assert_ref_snapshot_id.hpp @@ -30,6 +30,7 @@ class AssertRefSnapshotId { TableRequirementType type; string ref; int64_t snapshot_id; + bool has_snapshot_id = false; }; } // namespace rest_api_objects diff --git a/src/rest_catalog/objects/assert_ref_snapshot_id.cpp b/src/rest_catalog/objects/assert_ref_snapshot_id.cpp index 9a44c60d9..e5ab372cf 100644 --- a/src/rest_catalog/objects/assert_ref_snapshot_id.cpp +++ b/src/rest_catalog/objects/assert_ref_snapshot_id.cpp @@ -50,10 +50,14 @@ string AssertRefSnapshotId::TryFromJSON(yyjson_val *obj) { if (!snapshot_id_val) { return "AssertRefSnapshotId required property 'snapshot-id' is missing"; } else { - if (yyjson_is_sint(snapshot_id_val)) { + if (yyjson_is_null(snapshot_id_val)) { + has_snapshot_id = false; + } else if (yyjson_is_sint(snapshot_id_val)) { snapshot_id = yyjson_get_sint(snapshot_id_val); + has_snapshot_id = true; } else if (yyjson_is_uint(snapshot_id_val)) { snapshot_id = yyjson_get_uint(snapshot_id_val); + has_snapshot_id = true; } else { return StringUtil::Format( "AssertRefSnapshotId property 'snapshot_id' is not of type 'integer', found '%s' instead", diff --git a/src/storage/irc_transaction.cpp b/src/storage/irc_transaction.cpp index 288539048..7f753253c 100644 --- a/src/storage/irc_transaction.cpp +++ b/src/storage/irc_transaction.cpp @@ -47,7 +47,11 @@ void CommitTableToJSON(yyjson_mut_doc *doc, yyjson_mut_val *root_object, auto requirement_json = yyjson_mut_arr_add_obj(doc, requirements_array); yyjson_mut_obj_add_strcpy(doc, requirement_json, "type", assert_ref_snapshot_id.type.value.c_str()); yyjson_mut_obj_add_strcpy(doc, requirement_json, "ref", assert_ref_snapshot_id.ref.c_str()); - yyjson_mut_obj_add_uint(doc, requirement_json, "snapshot-id", assert_ref_snapshot_id.snapshot_id); + if (assert_ref_snapshot_id.has_snapshot_id) { + yyjson_mut_obj_add_uint(doc, requirement_json, "snapshot-id", assert_ref_snapshot_id.snapshot_id); + } else { + yyjson_mut_obj_add_null(doc, requirement_json, "snapshot-id"); + } } else if (requirement.has_assert_create) { auto &assert_create = requirement.assert_create; auto requirement_json = yyjson_mut_arr_add_obj(doc, requirements_array); @@ -230,6 +234,18 @@ static rest_api_objects::TableRequirement CreateAssertRefSnapshotIdRequirement(I auto &res = req.assert_ref_snapshot_id; res.ref = "main"; res.snapshot_id = old_snapshot.snapshot_id; + res.has_snapshot_id = true; + res.type.value = "assert-ref-snapshot-id"; + return req; +} + +static rest_api_objects::TableRequirement CreateAssertNoSnapshotRequirement() { + rest_api_objects::TableRequirement req; + req.has_assert_ref_snapshot_id = true; + + auto &res = req.assert_ref_snapshot_id; + res.ref = "main"; + res.has_snapshot_id = false; res.type.value = "assert-ref-snapshot-id"; return req; } @@ -307,6 +323,10 @@ TableTransactionInfo IRCTransaction::GetTransactionRequest(ClientContext &contex //! If any changes were made to the state of the table, we should assert that our parent snapshot has //! not changed. We don't want to change the table location if someone has added a snapshot commit_state.table_change.requirements.push_back(CreateAssertRefSnapshotIdRequirement(*current_snapshot)); + } else if (!info.has_assert_create) { + //! If the table had no snapshots and isn't created by this transaction, we should assert that no snapshot + //! has been added in the meantime + commit_state.table_change.requirements.push_back(CreateAssertNoSnapshotRequirement()); } transaction.table_changes.push_back(std::move(table_change)); diff --git a/test/sql/local/irc/insert/test_concurrent_insert_new_table.test b/test/sql/local/irc/insert/test_concurrent_insert_new_table.test new file mode 100644 index 000000000..9076cf1aa --- /dev/null +++ b/test/sql/local/irc/insert/test_concurrent_insert_new_table.test @@ -0,0 +1,73 @@ +# name: test/sql/local/irc/insert/test_concurrent_insert_new_table.test +# description: Test concurrent inserts to a newly created table with no snapshots +# group: [insert] + +require-env ICEBERG_SERVER_AVAILABLE + +require avro + +require parquet + +require iceberg + +require httpfs + +# Do not ignore 'HTTP' error messages! +set ignore_error_messages + +statement ok +CREATE SECRET ( + TYPE S3, + KEY_ID 'admin', + SECRET 'password', + ENDPOINT '127.0.0.1:9000', + URL_STYLE 'path', + USE_SSL 0 +); + +statement ok +ATTACH '' AS my_datalake ( + TYPE ICEBERG, + CLIENT_ID 'admin', + CLIENT_SECRET 'password', + ENDPOINT 'http://127.0.0.1:8181' +); + +statement ok +DROP TABLE IF EXISTS my_datalake.default.concurrent_new_table_test; + +statement ok +CREATE TABLE my_datalake.default.concurrent_new_table_test (id INTEGER, name VARCHAR); + +# con1 starts a transaction and inserts +statement ok con1 +begin + +statement ok con1 +INSERT INTO my_datalake.default.concurrent_new_table_test VALUES (1, 'con1'); + +# con2 starts a transaction and inserts +statement ok con2 +begin + +statement ok con2 +INSERT INTO my_datalake.default.concurrent_new_table_test VALUES (2, 'con2'); + +# con2 commits first +statement ok con2 +commit + +# con1 should fail because con2 already added a snapshot +statement error con1 +commit +---- +:.*Requirement failed: branch main was created concurrently.* + +# only con2's data is in the table +query II +SELECT * FROM my_datalake.default.concurrent_new_table_test ORDER BY id; +---- +2 con2 + +statement ok +DROP TABLE my_datalake.default.concurrent_new_table_test;