Skip to content

Commit c5b690b

Browse files
mrdroplet authored and copybara-github committed
Improve DataBag merge conflict on lists
PiperOrigin-RevId: 718393242 Change-Id: I4a0857516ad3c296899f4722dffe7dfbd7c86dcb
1 parent b19e1cd commit c5b690b

File tree

5 files changed

+212
-46
lines changed

5 files changed

+212
-46
lines changed

koladata/internal/data_bag.cc

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,9 @@ absl::StatusOr<internal::Error> MakeSchemaOrDictMergeError(
362362
const internal::ObjectId& object, const DataItem& key,
363363
const DataItem& expected_value, const DataItem& assigned_value) {
364364
internal::Error error;
365-
auto* schema_or_dict_conflict = error.mutable_schema_or_dict_conflict();
365+
auto* data_bag_merge_conflict = error.mutable_data_bag_merge_conflict();
366+
auto* schema_or_dict_conflict =
367+
data_bag_merge_conflict->mutable_schema_or_dict_conflict();
366368
ASSIGN_OR_RETURN(*schema_or_dict_conflict->mutable_object_id(),
367369
internal::EncodeDataItem(internal::DataItem(object)));
368370
ASSIGN_OR_RETURN(*schema_or_dict_conflict->mutable_key(),
@@ -373,6 +375,39 @@ absl::StatusOr<internal::Error> MakeSchemaOrDictMergeError(
373375
internal::EncodeDataItem(assigned_value));
374376
return error;
375377
}
378+
379+
absl::StatusOr<internal::Error> MakeListMergeError(
380+
const internal::ObjectId& list_object_id,
381+
const std::optional<int64_t> first_list_size,
382+
const std::optional<int64_t> second_list_size,
383+
const std::optional<int64_t> list_item_conflict_index,
384+
const DataItem& first_list_item, const DataItem& second_list_item) {
385+
DCHECK(list_object_id.IsList());
386+
internal::Error error;
387+
auto* data_bag_merge_conflict = error.mutable_data_bag_merge_conflict();
388+
auto* list_conflict = data_bag_merge_conflict->mutable_list_conflict();
389+
ASSIGN_OR_RETURN(
390+
*list_conflict->mutable_list_object_id(),
391+
internal::EncodeDataItem(internal::DataItem(list_object_id)));
392+
if (first_list_size.has_value()) {
393+
list_conflict->set_first_list_size(*first_list_size);
394+
}
395+
if (second_list_size.has_value()) {
396+
list_conflict->set_second_list_size(*second_list_size);
397+
}
398+
if (list_item_conflict_index.has_value()) {
399+
list_conflict->set_list_item_conflict_index(*list_item_conflict_index);
400+
}
401+
if (first_list_item.has_value()) {
402+
ASSIGN_OR_RETURN(*list_conflict->mutable_first_conflicting_item(),
403+
internal::EncodeDataItem(first_list_item));
404+
}
405+
if (second_list_item.has_value()) {
406+
ASSIGN_OR_RETURN(*list_conflict->mutable_second_conflicting_item(),
407+
internal::EncodeDataItem(second_list_item));
408+
}
409+
return error;
410+
}
376411
} // namespace
377412

378413
MergeOptions ReverseMergeOptions(const MergeOptions& options) {
@@ -2987,15 +3022,23 @@ absl::Status DataBagImpl::MergeListsInplace(const DataBagImpl& other,
29873022
}
29883023
if (options.data_conflict_policy == MergeOptions::kRaiseOnConflict) {
29893024
if (this_list.size() != other_list.size()) {
2990-
return absl::FailedPreconditionError(
2991-
absl::StrCat("conflicting list sizes for ", alloc_id, ": ",
2992-
this_list.size(), " vs ", other_list.size()));
3025+
return internal::WithErrorPayload(
3026+
absl::FailedPreconditionError(
3027+
absl::StrCat("conflicting list sizes for ", alloc_id, ": ",
3028+
this_list.size(), " vs ", other_list.size())),
3029+
MakeListMergeError(alloc_id.ObjectByOffset(i), this_list.size(),
3030+
other_list.size(), std::nullopt, DataItem(),
3031+
DataItem()));
29933032
}
29943033
for (size_t j = 0; j < other_list.size(); ++j) {
29953034
if (this_list[j] != other_list[j]) {
2996-
return absl::FailedPreconditionError(absl::StrCat(
2997-
"conflicting list values for ", alloc_id, "at index ", j,
2998-
": ", this_list[j], " vs ", other_list[j]));
3035+
return internal::WithErrorPayload(
3036+
absl::FailedPreconditionError(absl::StrCat(
3037+
"conflicting list values for ", alloc_id, "at index ", j,
3038+
": ", this_list[j], " vs ", other_list[j])),
3039+
MakeListMergeError(alloc_id.ObjectByOffset(i),
3040+
this_list.size(), other_list.size(), j,
3041+
this_list[j], other_list[j]));
29993042
}
30003043
}
30013044
}
@@ -3039,7 +3082,7 @@ absl::Status DataBagImpl::MergeDictsInplace(const DataBagImpl& other,
30393082
"conflicting dict values for ", object_id, " key", key,
30403083
": ", this_value, " vs ", other_value)),
30413084
MakeSchemaOrDictMergeError(object_id, key, this_value,
3042-
other_value));
3085+
other_value));
30433086
}
30443087
}
30453088
}

koladata/internal/data_bag_merge_test.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,9 @@ TYPED_TEST(DataBagMergeTest, MergeLists) {
798798
EXPECT_THAT(status,
799799
StatusIs(absl::StatusCode::kFailedPrecondition,
800800
HasSubstr("conflict")));
801+
std::optional<Error> error = GetErrorPayload(status);
802+
ASSERT_TRUE(error.has_value());
803+
EXPECT_TRUE(error->data_bag_merge_conflict().has_list_conflict());
801804
} else if (merge_options.data_conflict_policy == MergeOptions::kOverwrite) {
802805
EXPECT_OK(status);
803806
verify_lists(lists, db.get());
@@ -823,6 +826,9 @@ TYPED_TEST(DataBagMergeTest, MergeLists) {
823826
EXPECT_THAT(status,
824827
StatusIs(absl::StatusCode::kFailedPrecondition,
825828
HasSubstr("conflict")));
829+
std::optional<Error> error = GetErrorPayload(status);
830+
ASSERT_TRUE(error.has_value());
831+
EXPECT_TRUE(error->data_bag_merge_conflict().has_list_conflict());
826832
} else if (merge_options.data_conflict_policy == MergeOptions::kOverwrite) {
827833
EXPECT_OK(status);
828834
verify_lists(lists, db.get());
@@ -962,7 +968,8 @@ TYPED_TEST(DataBagMergeTest, MergeDictsOnly) {
962968
HasSubstr("conflict")));
963969
std::optional<Error> error = GetErrorPayload(status);
964970
ASSERT_TRUE(error.has_value());
965-
EXPECT_TRUE(error->has_schema_or_dict_conflict());
971+
EXPECT_TRUE(
972+
error->data_bag_merge_conflict().has_schema_or_dict_conflict());
966973
} else if (merge_options.data_conflict_policy == MergeOptions::kOverwrite) {
967974
EXPECT_OK(status);
968975
EXPECT_THAT(db->GetFromDict(a, k), IsOkAndHolds(DataItem(75)));
@@ -1073,7 +1080,7 @@ TEST(DataBagTest, MergeExplicitSchemas) {
10731080
HasSubstr("conflict")));
10741081
std::optional<Error> error = GetErrorPayload(status);
10751082
ASSERT_TRUE(error.has_value());
1076-
EXPECT_TRUE(error->has_schema_or_dict_conflict());
1083+
EXPECT_TRUE(error->data_bag_merge_conflict().has_schema_or_dict_conflict());
10771084
}
10781085
{
10791086
SCOPED_TRACE("keep schema");
@@ -1094,7 +1101,7 @@ TEST(DataBagTest, MergeExplicitSchemas) {
10941101
HasSubstr("conflict")));
10951102
std::optional<Error> error = GetErrorPayload(status);
10961103
ASSERT_TRUE(error.has_value());
1097-
EXPECT_TRUE(error->has_schema_or_dict_conflict());
1104+
EXPECT_TRUE(error->data_bag_merge_conflict().has_schema_or_dict_conflict());
10981105
}
10991106
{
11001107
SCOPED_TRACE("raise on conflict");
@@ -1104,7 +1111,7 @@ TEST(DataBagTest, MergeExplicitSchemas) {
11041111
HasSubstr("conflict")));
11051112
std::optional<Error> error = GetErrorPayload(status);
11061113
ASSERT_TRUE(error.has_value());
1107-
EXPECT_TRUE(error->has_schema_or_dict_conflict());
1114+
EXPECT_TRUE(error->data_bag_merge_conflict().has_schema_or_dict_conflict());
11081115
}
11091116
}
11101117

koladata/internal/error.proto

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,28 @@ message IncompatibleSchema {
4040
optional arolla.serialization_base.ContainerProto assigned_schema = 3;
4141
}
4242

43-
// Schema or dict conflicts when merging two DataBags.
44-
message SchemaOrDictConflict {
45-
optional arolla.serialization_base.ContainerProto object_id = 1;
46-
optional arolla.serialization_base.ContainerProto key = 2;
47-
optional arolla.serialization_base.ContainerProto expected_value = 3;
48-
optional arolla.serialization_base.ContainerProto assigned_value = 4;
43+
// Conflicts when merging two DataBags.
44+
message DataBagMergeConflict {
45+
message ListConflict {
46+
optional arolla.serialization_base.ContainerProto list_object_id = 1;
47+
optional uint64 first_list_size = 2;
48+
optional uint64 second_list_size = 3;
49+
optional uint64 list_item_conflict_index = 4;
50+
optional arolla.serialization_base.ContainerProto first_conflicting_item =
51+
5;
52+
optional arolla.serialization_base.ContainerProto second_conflicting_item =
53+
6;
54+
}
55+
message SchemaOrDictConflict {
56+
optional arolla.serialization_base.ContainerProto object_id = 1;
57+
optional arolla.serialization_base.ContainerProto key = 2;
58+
optional arolla.serialization_base.ContainerProto expected_value = 3;
59+
optional arolla.serialization_base.ContainerProto assigned_value = 4;
60+
}
61+
oneof conflict {
62+
ListConflict list_conflict = 1;
63+
SchemaOrDictConflict schema_or_dict_conflict = 2;
64+
}
4965
}
5066

5167
// Koda error message transportation proto.
@@ -56,6 +72,6 @@ message Error {
5672
NoCommonSchema no_common_schema = 3;
5773
MissingObjectSchema missing_object_schema = 4;
5874
IncompatibleSchema incompatible_schema = 5;
59-
SchemaOrDictConflict schema_or_dict_conflict = 6;
75+
DataBagMergeConflict data_bag_merge_conflict = 6;
6076
}
6177
}

koladata/repr_utils.cc

Lines changed: 91 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -216,52 +216,116 @@ constexpr const char* kDataBagMergeErrorDictConflict =
216216
"The conflicting dict in the second DataBag: %s\n\n"
217217
"The cause is the value of the key %s is incompatible: %s vs %s\n";
218218

219-
absl::StatusOr<Error> SetDataBagMergeError(Error cause, const DataBagPtr& db1,
220-
const DataBagPtr& db2) {
221-
const auto& schema_or_dict_conflict = cause.schema_or_dict_conflict();
222-
ASSIGN_OR_RETURN(
223-
internal::DataItem object_item,
224-
DecodeDataItem(schema_or_dict_conflict.object_id()));
225-
ASSIGN_OR_RETURN(internal::DataItem key_item,
226-
DecodeDataItem(schema_or_dict_conflict.key()));
219+
absl::StatusOr<std::string> SetSchemaOrDictErrorMessage(
220+
const internal::DataBagMergeConflict::SchemaOrDictConflict& conflict,
221+
const DataBagPtr& db1, const DataBagPtr& db2) {
222+
ASSIGN_OR_RETURN(internal::DataItem object_item,
223+
DecodeDataItem(conflict.object_id()));
224+
ASSIGN_OR_RETURN(internal::DataItem key_item, DecodeDataItem(conflict.key()));
227225
internal::DataItem schema = internal::DataItem(
228226
object_item.is_schema() ? schema::kSchema : schema::kAny);
229227
ASSIGN_OR_RETURN(
230228
DataSlice expected_value,
231-
DataSlice::Create(
232-
DecodeDataItem(schema_or_dict_conflict.expected_value()),
233-
DataSlice::JaggedShape::Empty(), schema,
234-
db1));
229+
DataSlice::Create(DecodeDataItem(conflict.expected_value()),
230+
DataSlice::JaggedShape::Empty(), schema, db1));
235231
ASSIGN_OR_RETURN(
236232
DataSlice assigned_value,
237-
DataSlice::Create(
238-
DecodeDataItem(schema_or_dict_conflict.assigned_value()),
239-
DataSlice::JaggedShape::Empty(), schema,
240-
db2));
233+
DataSlice::Create(DecodeDataItem(conflict.assigned_value()),
234+
DataSlice::JaggedShape::Empty(), schema, db2));
241235
ASSIGN_OR_RETURN(
242236
DataSlice item,
243237
DataSlice::Create(object_item, schema, db1));
244238
ASSIGN_OR_RETURN(
245239
DataSlice conflicting_item,
246240
DataSlice::Create(object_item, std::move(schema), db2));
247241

248-
std::string key_str = internal::DataItemRepr(
249-
key_item, {.strip_quotes = false});
242+
std::string key_str =
243+
internal::DataItemRepr(key_item, {.strip_quotes = false});
250244
ASSIGN_OR_RETURN(std::string item_str, DataSliceToStr(item));
251245
ASSIGN_OR_RETURN(std::string conflicting_item_str,
252246
DataSliceToStr(conflicting_item));
253247
ASSIGN_OR_RETURN(std::string expected_value_str,
254248
DataSliceToStr(expected_value));
255249
ASSIGN_OR_RETURN(std::string assigned_value_str,
256250
DataSliceToStr(assigned_value));
257-
std::string error_str =
258-
object_item.is_schema()
259-
? absl::StrFormat(kDataBagMergeErrorSchemaConflict, item_str,
260-
conflicting_item_str, key_str, expected_value_str,
261-
assigned_value_str)
262-
: absl::StrFormat(kDataBagMergeErrorDictConflict, item_str,
263-
conflicting_item_str, key_str, expected_value_str,
264-
assigned_value_str);
251+
return object_item.is_schema()
252+
? absl::StrFormat(kDataBagMergeErrorSchemaConflict, item_str,
253+
conflicting_item_str, key_str,
254+
expected_value_str, assigned_value_str)
255+
: absl::StrFormat(kDataBagMergeErrorDictConflict, item_str,
256+
conflicting_item_str, key_str,
257+
expected_value_str, assigned_value_str);
258+
}
259+
260+
constexpr const char* kDataBagMergeErrorListSizeConflict =
261+
"cannot merge DataBags due to an exception encountered when merging "
262+
"lists.\n\n"
263+
"The conflicting list in the first DataBag: %s\n"
264+
"The conflicting list in the second DataBag: %s\n\n"
265+
"The cause is the list sizes are incompatible: %d vs %d\n";
266+
267+
constexpr const char* kDataBagMergeErrorListItemConflict =
268+
"cannot merge DataBags due to an exception encountered when merging "
269+
"lists.\n\n"
270+
"The conflicting list in the first DataBag: %s\n"
271+
"The conflicting list in the second DataBag: %s\n\n"
272+
"The cause is the value at index %d is incompatible: %s vs %s\n";
273+
274+
absl::StatusOr<std::string> SetListErrorMessage(
275+
const internal::DataBagMergeConflict::ListConflict& conflict,
276+
const DataBagPtr& db1, const DataBagPtr& db2) {
277+
ASSIGN_OR_RETURN(internal::DataItem list_item,
278+
DecodeDataItem(conflict.list_object_id()));
279+
ASSIGN_OR_RETURN(DataSlice list,
280+
DataSlice::Create(list_item, DataSlice::JaggedShape::Empty(),
281+
internal::DataItem(schema::kAny), db1));
282+
ASSIGN_OR_RETURN(DataSlice conflicting_list,
283+
DataSlice::Create(list_item, DataSlice::JaggedShape::Empty(),
284+
internal::DataItem(schema::kAny), db2));
285+
ASSIGN_OR_RETURN(std::string list_str, DataSliceToStr(list));
286+
ASSIGN_OR_RETURN(std::string conflicting_list_str,
287+
DataSliceToStr(conflicting_list));
288+
if (conflict.has_list_item_conflict_index()) {
289+
ASSIGN_OR_RETURN(
290+
DataSlice first_conflicting_item,
291+
DataSlice::Create(DecodeDataItem(conflict.first_conflicting_item()),
292+
DataSlice::JaggedShape::Empty(),
293+
internal::DataItem(schema::kAny), db1));
294+
ASSIGN_OR_RETURN(
295+
DataSlice second_conflicting_item,
296+
DataSlice::Create(DecodeDataItem(conflict.second_conflicting_item()),
297+
DataSlice::JaggedShape::Empty(),
298+
internal::DataItem(schema::kAny), db2));
299+
ASSIGN_OR_RETURN(std::string first_conflicting_item_str,
300+
DataSliceToStr(first_conflicting_item));
301+
ASSIGN_OR_RETURN(std::string second_conflicting_item_str,
302+
DataSliceToStr(second_conflicting_item));
303+
return absl::StrFormat(
304+
kDataBagMergeErrorListItemConflict, list_str, conflicting_list_str,
305+
conflict.list_item_conflict_index(), first_conflicting_item_str,
306+
second_conflicting_item_str);
307+
}
308+
return absl::StrFormat(kDataBagMergeErrorListSizeConflict, list_str,
309+
conflicting_list_str, conflict.first_list_size(),
310+
conflict.second_list_size());
311+
}
312+
313+
absl::StatusOr<Error> SetDataBagMergeError(Error cause, const DataBagPtr& db1,
314+
const DataBagPtr& db2) {
315+
std::string error_str;
316+
if (cause.data_bag_merge_conflict().has_schema_or_dict_conflict()) {
317+
ASSIGN_OR_RETURN(
318+
error_str,
319+
SetSchemaOrDictErrorMessage(
320+
cause.data_bag_merge_conflict().schema_or_dict_conflict(), db1,
321+
db2));
322+
}
323+
if (cause.data_bag_merge_conflict().has_list_conflict()) {
324+
ASSIGN_OR_RETURN(
325+
error_str,
326+
SetListErrorMessage(cause.data_bag_merge_conflict().list_conflict(),
327+
db1, db2));
328+
}
265329
cause.set_error_message(std::move(error_str));
266330
return cause;
267331
}
@@ -288,7 +352,7 @@ absl::Status AssembleErrorMessage(const absl::Status& status,
288352
data.db, data.ds));
289353
return WithErrorPayload(status, error);
290354
}
291-
if (cause->has_schema_or_dict_conflict()) {
355+
if (cause->has_data_bag_merge_conflict()) {
292356
ASSIGN_OR_RETURN(Error error,
293357
SetDataBagMergeError(*std::move(cause), data.db,
294358
data.to_be_merged_db));

py/koladata/types/data_bag_test.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1610,6 +1610,42 @@ def test_merge_inplace_dict_conflict(self):
16101610
The conflicting dict in the second DataBag: Dict\{1=Entity\(\):\$[0-9a-zA-Z]{22}\}
16111611
16121612
The cause is the value of the key 1 is incompatible: Entity\(\):\$[0-9a-zA-Z]{22} vs Entity\(\):\$[0-9a-zA-Z]{22}
1613+
""",
1614+
):
1615+
db1.merge_inplace(db2, allow_data_conflicts=False)
1616+
1617+
def test_merge_inplace_list_item_conflict(self):
1618+
itemid = kde.allocation.new_listid().eval()
1619+
db1 = bag()
1620+
db1.list([db1.obj(x=1), db1.obj(y=2)], itemid=itemid)
1621+
db2 = bag()
1622+
db2.list([db2.obj(x=1), db2.obj(y=3)], itemid=itemid)
1623+
with self.assertRaisesRegex(
1624+
exceptions.KodaError,
1625+
r"""cannot merge DataBags due to an exception encountered when merging lists.
1626+
1627+
The conflicting list in the first DataBag: List\[Entity\(\):\$[0-9a-zA-Z]{22}, Entity\(\):\$[0-9a-zA-Z]{22}\]
1628+
The conflicting list in the second DataBag: List\[Entity\(\):\$[0-9a-zA-Z]{22}, Entity\(\):\$[0-9a-zA-Z]{22}\]
1629+
1630+
The cause is the value at index 0 is incompatible: Entity\(\):\$[0-9a-zA-Z]{22} vs Entity\(\):\$[0-9a-zA-Z]{22}
1631+
""",
1632+
):
1633+
db1.merge_inplace(db2, allow_data_conflicts=False)
1634+
1635+
def test_merge_inplace_list_size_conflict(self):
1636+
itemid = kde.allocation.new_listid().eval()
1637+
db1 = bag()
1638+
db1.list([db1.obj(x=1), db1.obj(y=2)], itemid=itemid)
1639+
db2 = bag()
1640+
db2.list([db2.obj(x=1)], itemid=itemid)
1641+
with self.assertRaisesRegex(
1642+
exceptions.KodaError,
1643+
r"""cannot merge DataBags due to an exception encountered when merging lists.
1644+
1645+
The conflicting list in the first DataBag: List\[Entity\(\):\$[0-9a-zA-Z]{22}, Entity\(\):\$[0-9a-zA-Z]{22}\]
1646+
The conflicting list in the second DataBag: List\[Entity\(\):\$[0-9a-zA-Z]{22}\]
1647+
1648+
The cause is the list sizes are incompatible: 2 vs 1
16131649
""",
16141650
):
16151651
db1.merge_inplace(db2, allow_data_conflicts=False)

0 commit comments

Comments
 (0)