Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AssertRefSnapshotId {
TableRequirementType type;
string ref;
int64_t snapshot_id;
bool has_snapshot_id = false;
};

} // namespace rest_api_objects
Expand Down
6 changes: 5 additions & 1 deletion src/rest_catalog/objects/assert_ref_snapshot_id.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,14 @@ string AssertRefSnapshotId::TryFromJSON(yyjson_val *obj) {
if (!snapshot_id_val) {
return "AssertRefSnapshotId required property 'snapshot-id' is missing";
} else {
if (yyjson_is_sint(snapshot_id_val)) {
if (yyjson_is_null(snapshot_id_val)) {
has_snapshot_id = false;
} else if (yyjson_is_sint(snapshot_id_val)) {
snapshot_id = yyjson_get_sint(snapshot_id_val);
has_snapshot_id = true;
} else if (yyjson_is_uint(snapshot_id_val)) {
snapshot_id = yyjson_get_uint(snapshot_id_val);
has_snapshot_id = true;
} else {
return StringUtil::Format(
"AssertRefSnapshotId property 'snapshot_id' is not of type 'integer', found '%s' instead",
Expand Down
22 changes: 21 additions & 1 deletion src/storage/irc_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ void CommitTableToJSON(yyjson_mut_doc *doc, yyjson_mut_val *root_object,
auto requirement_json = yyjson_mut_arr_add_obj(doc, requirements_array);
yyjson_mut_obj_add_strcpy(doc, requirement_json, "type", assert_ref_snapshot_id.type.value.c_str());
yyjson_mut_obj_add_strcpy(doc, requirement_json, "ref", assert_ref_snapshot_id.ref.c_str());
yyjson_mut_obj_add_uint(doc, requirement_json, "snapshot-id", assert_ref_snapshot_id.snapshot_id);
if (assert_ref_snapshot_id.has_snapshot_id) {
yyjson_mut_obj_add_uint(doc, requirement_json, "snapshot-id", assert_ref_snapshot_id.snapshot_id);
} else {
yyjson_mut_obj_add_null(doc, requirement_json, "snapshot-id");
}
} else if (requirement.has_assert_create) {
auto &assert_create = requirement.assert_create;
auto requirement_json = yyjson_mut_arr_add_obj(doc, requirements_array);
Expand Down Expand Up @@ -230,6 +234,18 @@ static rest_api_objects::TableRequirement CreateAssertRefSnapshotIdRequirement(I
auto &res = req.assert_ref_snapshot_id;
res.ref = "main";
res.snapshot_id = old_snapshot.snapshot_id;
res.has_snapshot_id = true;
res.type.value = "assert-ref-snapshot-id";
return req;
}

static rest_api_objects::TableRequirement CreateAssertNoSnapshotRequirement() {
rest_api_objects::TableRequirement req;
req.has_assert_ref_snapshot_id = true;

auto &res = req.assert_ref_snapshot_id;
res.ref = "main";
res.has_snapshot_id = false;
res.type.value = "assert-ref-snapshot-id";
return req;
}
Expand Down Expand Up @@ -307,6 +323,10 @@ TableTransactionInfo IRCTransaction::GetTransactionRequest(ClientContext &contex
//! If any changes were made to the state of the table, we should assert that our parent snapshot has
//! not changed. We don't want to change the table location if someone has added a snapshot
commit_state.table_change.requirements.push_back(CreateAssertRefSnapshotIdRequirement(*current_snapshot));
} else if (!info.has_assert_create) {
//! If the table had no snapshots and isn't created by this transaction, we should assert that no snapshot
//! has been added in the meantime
commit_state.table_change.requirements.push_back(CreateAssertNoSnapshotRequirement());
}

transaction.table_changes.push_back(std::move(table_change));
Expand Down
73 changes: 73 additions & 0 deletions test/sql/local/irc/insert/test_concurrent_insert_new_table.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# name: test/sql/local/irc/insert/test_concurrent_insert_new_table.test
# description: Test concurrent inserts to a newly created table with no snapshots
# group: [insert]

require-env ICEBERG_SERVER_AVAILABLE

require avro

require parquet

require iceberg

require httpfs

# Do not ignore 'HTTP' error messages!
set ignore_error_messages

statement ok
CREATE SECRET (
TYPE S3,
KEY_ID 'admin',
SECRET 'password',
ENDPOINT '127.0.0.1:9000',
URL_STYLE 'path',
USE_SSL 0
);

statement ok
ATTACH '' AS my_datalake (
TYPE ICEBERG,
CLIENT_ID 'admin',
CLIENT_SECRET 'password',
ENDPOINT 'http://127.0.0.1:8181'
);

statement ok
DROP TABLE IF EXISTS my_datalake.default.concurrent_new_table_test;

statement ok
CREATE TABLE my_datalake.default.concurrent_new_table_test (id INTEGER, name VARCHAR);

# con1 starts a transaction and inserts
statement ok con1
begin

statement ok con1
INSERT INTO my_datalake.default.concurrent_new_table_test VALUES (1, 'con1');

# con2 starts a transaction and inserts
statement ok con2
begin

statement ok con2
INSERT INTO my_datalake.default.concurrent_new_table_test VALUES (2, 'con2');

# con2 commits first
statement ok con2
commit

# con1 should fail because con2 already added a snapshot
statement error con1
commit
----
<REGEX>:.*Requirement failed: branch main was created concurrently.*

# only con2's data is in the table
query II
SELECT * FROM my_datalake.default.concurrent_new_table_test ORDER BY id;
----
2 con2

statement ok
DROP TABLE my_datalake.default.concurrent_new_table_test;
Loading