Skip to content

Commit 6710d1f

Browse files
mrdroplet and copybara-github
authored and committed
Improve the error message when DataBag has schema/dict conflict during merge.
PiperOrigin-RevId: 718056822 Change-Id: I818d938b56fc0b2c818a0fee434e663c669131c1
1 parent d37f6fe commit 6710d1f

File tree

11 files changed

+183
-21
lines changed

11 files changed

+183
-21
lines changed

koladata/BUILD

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -102,6 +102,7 @@ cc_test(
102102
":test_utils",
103103
"//koladata/internal:data_bag",
104104
"//koladata/testing:matchers",
105+
"//koladata/testing:test_env",
105106
"@com_google_absl//absl/status",
106107
"@com_google_absl//absl/status:status_matchers",
107108
"@com_google_arolla//arolla/util:status_backport",

koladata/adoption_utils.cc

Lines changed: 14 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
#include "koladata/data_bag.h"
2727
#include "koladata/data_slice.h"
2828
#include "koladata/extract_utils.h"
29+
#include "koladata/repr_utils.h"
2930
#include "koladata/internal/dtype.h"
3031
#include "koladata/internal/schema_utils.h"
3132
#include "arolla/util/status_macros_backport.h"
@@ -41,7 +42,12 @@ absl::Status AdoptionQueue::AdoptInto(DataBag& db) const {
4142
visited_bags.insert(other_db.get());
4243
RETURN_IF_ERROR(db.MergeInplace(other_db, /*overwrite=*/false,
4344
/*allow_data_conflicts=*/false,
44-
/*allow_schema_conflicts=*/false));
45+
/*allow_schema_conflicts=*/false))
46+
.With([&](const absl::Status& status) {
47+
return AssembleErrorMessage(status, {.db = db.Freeze(),
48+
.ds = std::nullopt,
49+
.to_be_merged_db = other_db});
50+
});
4551
}
4652
for (const DataSlice& slice : slices_to_merge_) {
4753
if (visited_bags.contains(slice.GetBag().get())) {
@@ -55,7 +61,13 @@ absl::Status AdoptionQueue::AdoptInto(DataBag& db) const {
5561
}
5662
RETURN_IF_ERROR(db.MergeInplace(extracted_db, /*overwrite=*/false,
5763
/*allow_data_conflicts=*/false,
58-
/*allow_schema_conflicts=*/false));
64+
/*allow_schema_conflicts=*/false))
65+
.With([&](const absl::Status& status) {
66+
return AssembleErrorMessage(status,
67+
{.db = db.Freeze(),
68+
.ds = std::nullopt,
69+
.to_be_merged_db = extracted_db});
70+
});
5971
}
6072
return absl::OkStatus();
6173
}

koladata/data_bag.cc

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,6 @@
1818
#include <utility>
1919
#include <vector>
2020

21-
2221
#include "absl/base/no_destructor.h"
2322
#include "absl/container/flat_hash_set.h"
2423
#include "absl/functional/function_ref.h"
@@ -33,6 +32,7 @@
3332
#include "arolla/qtype/simple_qtype.h"
3433
#include "arolla/qtype/typed_value.h"
3534
#include "arolla/util/fingerprint.h"
35+
#include "arolla/util/meta.h"
3636
#include "arolla/util/repr.h"
3737
#include "arolla/util/status_macros_backport.h"
3838

koladata/internal/BUILD

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -591,9 +591,13 @@ cc_test(
591591
":data_item",
592592
":data_slice",
593593
":dtype",
594+
":error_cc_proto",
595+
":error_utils",
594596
":object_id",
595597
":schema_utils",
596598
":uuid_object",
599+
"//koladata/s11n",
600+
"//koladata/testing:test_env",
597601
"@com_google_absl//absl/base:no_destructor",
598602
"@com_google_absl//absl/status",
599603
"@com_google_absl//absl/status:status_matchers",

koladata/internal/data_bag.cc

Lines changed: 22 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -358,6 +358,21 @@ MergeOptions::ConflictHandlingOption ReverseConflictHandlingOption(
358358
return MergeOptions::kRaiseOnConflict;
359359
}
360360

361+
// Builds an internal::Error whose `schema_or_dict_conflict` field describes a
// single conflicting entry found while merging.
//
// Args:
//   object: id of the object whose entry conflicts; it is wrapped in a
//     DataItem before being encoded.
//   key: the conflicting key.
//   expected_value: stored into the `expected_value` field.
//   assigned_value: stored into the `assigned_value` field.
//
// Returns the populated Error. If any internal::EncodeDataItem call fails,
// ASSIGN_OR_RETURN returns that failure instead.
absl::StatusOr<internal::Error> MakeSchemaOrDictMergeError(
362+
const internal::ObjectId& object, const DataItem& key,
363+
const DataItem& expected_value, const DataItem& assigned_value) {
364+
internal::Error error;
365+
// Accessing the mutable sub-message selects the schema_or_dict_conflict
// variant of the error; the four fields below are then filled one by one.
auto* schema_or_dict_conflict = error.mutable_schema_or_dict_conflict();
366+
ASSIGN_OR_RETURN(*schema_or_dict_conflict->mutable_object_id(),
367+
internal::EncodeDataItem(internal::DataItem(object)));
368+
ASSIGN_OR_RETURN(*schema_or_dict_conflict->mutable_key(),
369+
internal::EncodeDataItem(key));
370+
ASSIGN_OR_RETURN(*schema_or_dict_conflict->mutable_expected_value(),
371+
internal::EncodeDataItem(expected_value));
372+
ASSIGN_OR_RETURN(*schema_or_dict_conflict->mutable_assigned_value(),
373+
internal::EncodeDataItem(assigned_value));
374+
return error;
375+
}
361376
} // namespace
362377

363378
MergeOptions ReverseMergeOptions(const MergeOptions& options) {
@@ -3017,9 +3032,13 @@ absl::Status DataBagImpl::MergeDictsInplace(const DataBagImpl& other,
30173032
const DataItem& this_value = this_dict.GetOrAssign(key, other_value);
30183033
if (conflict_policy == MergeOptions::kRaiseOnConflict &&
30193034
this_value != other_value) {
3020-
return absl::FailedPreconditionError(absl::StrCat(
3021-
"conflicting dict values for ", alloc_id.ObjectByOffset(i),
3022-
" key", key, ": ", this_value, " vs ", other_value));
3035+
internal::ObjectId object_id = alloc_id.ObjectByOffset(i);
3036+
return internal::WithErrorPayload(
3037+
absl::FailedPreconditionError(absl::StrCat(
3038+
"conflicting dict values for ", object_id, " key", key,
3039+
": ", this_value, " vs ", other_value)),
3040+
MakeSchemaOrDictMergeError(object_id, key, this_value,
3041+
other_value));
30233042
}
30243043
}
30253044
}

koladata/internal/data_bag_merge_test.cc

Lines changed: 27 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
#include <cstdint>
1818
#include <functional>
1919
#include <initializer_list>
20+
#include <optional>
2021
#include <string>
2122
#include <tuple>
2223
#include <type_traits>
@@ -35,6 +36,8 @@
3536
#include "koladata/internal/data_item.h"
3637
#include "koladata/internal/data_slice.h"
3738
#include "koladata/internal/dtype.h"
39+
#include "koladata/internal/error.pb.h"
40+
#include "koladata/internal/error_utils.h"
3841
#include "koladata/internal/object_id.h"
3942
#include "koladata/internal/schema_utils.h"
4043
#include "koladata/internal/slice_builder.h"
@@ -957,6 +960,9 @@ TYPED_TEST(DataBagMergeTest, MergeDictsOnly) {
957960
if (merge_options.data_conflict_policy == MergeOptions::kRaiseOnConflict) {
958961
EXPECT_THAT(status, StatusIs(absl::StatusCode::kFailedPrecondition,
959962
HasSubstr("conflict")));
963+
std::optional<Error> error = GetErrorPayload(status);
964+
ASSERT_TRUE(error.has_value());
965+
EXPECT_TRUE(error->has_schema_or_dict_conflict());
960966
} else if (merge_options.data_conflict_policy == MergeOptions::kOverwrite) {
961967
EXPECT_OK(status);
962968
EXPECT_THAT(db->GetFromDict(a, k), IsOkAndHolds(DataItem(75)));
@@ -1061,10 +1067,13 @@ TEST(DataBagTest, MergeExplicitSchemas) {
10611067
{
10621068
SCOPED_TRACE("overwrite data");
10631069
auto res = db->PartiallyPersistentFork();
1064-
EXPECT_THAT(
1065-
res->MergeInplace(*db2, MergeOptions{.data_conflict_policy =
1066-
MergeOptions::kOverwrite}),
1067-
StatusIs(absl::StatusCode::kFailedPrecondition, HasSubstr("conflict")));
1070+
absl::Status status = res->MergeInplace(
1071+
*db2, MergeOptions{.data_conflict_policy = MergeOptions::kOverwrite});
1072+
EXPECT_THAT(status, StatusIs(absl::StatusCode::kFailedPrecondition,
1073+
HasSubstr("conflict")));
1074+
std::optional<Error> error = GetErrorPayload(status);
1075+
ASSERT_TRUE(error.has_value());
1076+
EXPECT_TRUE(error->has_schema_or_dict_conflict());
10681077
}
10691078
{
10701079
SCOPED_TRACE("keep schema");
@@ -1078,17 +1087,24 @@ TEST(DataBagTest, MergeExplicitSchemas) {
10781087
{
10791088
SCOPED_TRACE("keep data");
10801089
auto res = db->PartiallyPersistentFork();
1081-
EXPECT_THAT(
1082-
res->MergeInplace(*db2, MergeOptions{.data_conflict_policy =
1083-
MergeOptions::kKeepOriginal}),
1084-
StatusIs(absl::StatusCode::kFailedPrecondition, HasSubstr("conflict")));
1090+
absl::Status status = res->MergeInplace(
1091+
*db2,
1092+
MergeOptions{.data_conflict_policy = MergeOptions::kKeepOriginal});
1093+
EXPECT_THAT(status, StatusIs(absl::StatusCode::kFailedPrecondition,
1094+
HasSubstr("conflict")));
1095+
std::optional<Error> error = GetErrorPayload(status);
1096+
ASSERT_TRUE(error.has_value());
1097+
EXPECT_TRUE(error->has_schema_or_dict_conflict());
10851098
}
10861099
{
10871100
SCOPED_TRACE("raise on conflict");
10881101
auto res = db->PartiallyPersistentFork();
1089-
EXPECT_THAT(
1090-
res->MergeInplace(*db2, MergeOptions()),
1091-
StatusIs(absl::StatusCode::kFailedPrecondition, HasSubstr("conflict")));
1102+
absl::Status status = res->MergeInplace(*db2, MergeOptions());
1103+
EXPECT_THAT(status, StatusIs(absl::StatusCode::kFailedPrecondition,
1104+
HasSubstr("conflict")));
1105+
std::optional<Error> error = GetErrorPayload(status);
1106+
ASSERT_TRUE(error.has_value());
1107+
EXPECT_TRUE(error->has_schema_or_dict_conflict());
10921108
}
10931109
}
10941110

koladata/internal/error.proto

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,14 @@ message IncompatibleSchema {
4040
optional arolla.serialization_base.ContainerProto assigned_schema = 3;
4141
}
4242

43+
// Schema or dict conflicts when merging two DataBags.
//
// Every field holds a single item serialized as an arolla ContainerProto.
// In the dict-merge path shown in this commit, `expected_value` is filled from
// the value already present in the DataBag being merged into and
// `assigned_value` from the value in the DataBag being merged in.
// NOTE(review): the schema-merge call site is not visible in this diff;
// presumably it follows the same convention - confirm.
44+
message SchemaOrDictConflict {
45+
// The object (schema or dict) that owns the conflicting entry.
optional arolla.serialization_base.ContainerProto object_id = 1;
46+
// The key (dict key or schema attribute) whose values disagree.
optional arolla.serialization_base.ContainerProto key = 2;
47+
// One side of the conflict; reported first in the "%s vs %s" message.
optional arolla.serialization_base.ContainerProto expected_value = 3;
48+
// The other side of the conflict; reported second in the message.
optional arolla.serialization_base.ContainerProto assigned_value = 4;
49+
}
50+
4351
// Koda error message transportation proto.
4452
message Error {
4553
optional string error_message = 1;
@@ -48,5 +56,6 @@ message Error {
4856
NoCommonSchema no_common_schema = 3;
4957
MissingObjectSchema missing_object_schema = 4;
5058
IncompatibleSchema incompatible_schema = 5;
59+
SchemaOrDictConflict schema_or_dict_conflict = 6;
5160
}
5261
}

koladata/repr_utils.cc

Lines changed: 69 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -202,6 +202,69 @@ absl::StatusOr<Error> SetIncompatibleSchemaError(
202202
return cause;
203203
}
204204

205+
// absl::StrFormat template for a schema conflict found while merging DataBags.
// Takes five %s arguments, in this order (as passed by SetDataBagMergeError):
//   1. repr of the conflicting schema in the first DataBag,
//   2. repr of the conflicting schema in the second DataBag,
//   3. repr of the conflicting attribute (key),
//   4. repr of the expected value,
//   5. repr of the assigned value.
constexpr const char* kDataBagMergeErrorSchemaConflict =
206+
"cannot merge DataBags due to an exception encountered when merging "
207+
"schemas.\n\n"
208+
"The conflicting schema in the first DataBag: %s\n"
209+
"The conflicting schema in the second DataBag: %s\n\n"
210+
"The cause is the schema for attribute %s is incompatible: %s vs %s\n";
211+
212+
// absl::StrFormat template for a dict conflict found while merging DataBags.
// Takes the same five %s arguments, in the same order, as the schema template
// above, with "dict" in place of "schema" and "key" in place of "attribute".
constexpr const char* kDataBagMergeErrorDictConflict =
213+
"cannot merge DataBags due to an exception encountered when merging "
214+
"dicts.\n\n"
215+
"The conflicting dict in the first DataBag: %s\n"
216+
"The conflicting dict in the second DataBag: %s\n\n"
217+
"The cause is the value of the key %s is incompatible: %s vs %s\n";
218+
219+
// Fills `cause.error_message` with a human-readable description of the
// schema/dict merge conflict stored in `cause.schema_or_dict_conflict()`.
//
// Args:
//   cause: error proto carrying the schema_or_dict_conflict payload; taken by
//     value, updated and returned.
//   db1: DataBag used to render the "first DataBag" object and the expected
//     value.
//   db2: DataBag used to render the "second DataBag" object and the assigned
//     value.
//
// Returns the updated Error. If decoding an item, creating a DataSlice or
// rendering it to a string fails, ASSIGN_OR_RETURN returns that failure.
absl::StatusOr<Error> SetDataBagMergeError(Error cause, const DataBagPtr& db1,
220+
const DataBagPtr& db2) {
221+
const auto& schema_or_dict_conflict = cause.schema_or_dict_conflict();
222+
ASSIGN_OR_RETURN(
223+
internal::DataItem object_item,
224+
DecodeDataItem(schema_or_dict_conflict.object_id()));
225+
ASSIGN_OR_RETURN(internal::DataItem key_item,
226+
DecodeDataItem(schema_or_dict_conflict.key()));
227+
// Schema assigned to every DataSlice built below: kSchema when the conflicting
// object is itself a schema, kAny otherwise (the dict case).
internal::DataItem schema = internal::DataItem(
228+
object_item.is_schema() ? schema::kSchema : schema::kAny);
229+
// The expected value is attached to db1 and the assigned value to db2, so each
// is rendered against its own DataBag.
ASSIGN_OR_RETURN(
230+
DataSlice expected_value,
231+
DataSlice::Create(
232+
DecodeDataItem(schema_or_dict_conflict.expected_value()),
233+
DataSlice::JaggedShape::Empty(), schema,
234+
db1));
235+
ASSIGN_OR_RETURN(
236+
DataSlice assigned_value,
237+
DataSlice::Create(
238+
DecodeDataItem(schema_or_dict_conflict.assigned_value()),
239+
DataSlice::JaggedShape::Empty(), schema,
240+
db2));
241+
// The same object id is wrapped twice, once per DataBag, to show how the
// conflicting schema/dict looks on each side.
ASSIGN_OR_RETURN(
242+
DataSlice item,
243+
DataSlice::Create(object_item, schema, db1));
244+
ASSIGN_OR_RETURN(
245+
DataSlice conflicting_item,
246+
DataSlice::Create(object_item, std::move(schema), db2));
247+
248+
// The key is rendered directly from the DataItem, with quotes kept.
std::string key_str = internal::DataItemRepr(
249+
key_item, {.strip_quotes = false});
250+
ASSIGN_OR_RETURN(std::string item_str, DataSliceToStr(item));
251+
ASSIGN_OR_RETURN(std::string conflicting_item_str,
252+
DataSliceToStr(conflicting_item));
253+
ASSIGN_OR_RETURN(std::string expected_value_str,
254+
DataSliceToStr(expected_value));
255+
ASSIGN_OR_RETURN(std::string assigned_value_str,
256+
DataSliceToStr(assigned_value));
257+
// Both templates take the same five arguments in the same order; only the
// wording (schema/attribute vs dict/key) differs.
std::string error_str =
258+
object_item.is_schema()
259+
? absl::StrFormat(kDataBagMergeErrorSchemaConflict, item_str,
260+
conflicting_item_str, key_str, expected_value_str,
261+
assigned_value_str)
262+
: absl::StrFormat(kDataBagMergeErrorDictConflict, item_str,
263+
conflicting_item_str, key_str, expected_value_str,
264+
assigned_value_str);
265+
cause.set_error_message(std::move(error_str));
266+
return cause;
267+
}
205268
} // namespace
206269

207270
absl::Status AssembleErrorMessage(const absl::Status& status,
@@ -225,6 +288,12 @@ absl::Status AssembleErrorMessage(const absl::Status& status,
225288
data.db, data.ds));
226289
return WithErrorPayload(status, error);
227290
}
291+
if (cause->has_schema_or_dict_conflict()) {
292+
ASSIGN_OR_RETURN(Error error,
293+
SetDataBagMergeError(*std::move(cause), data.db,
294+
data.to_be_merged_db));
295+
return WithErrorPayload(status, error);
296+
}
228297
return status;
229298
}
230299

koladata/repr_utils.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ namespace koladata {
2929
// Extra context handed to AssembleErrorMessage alongside the failing Status.
struct SupplementalData {
3030
// DataBag the error relates to. In the merge call sites of this commit it is
// the DataBag being merged into. May be null.
absl::Nullable<const koladata::DataBagPtr> db;
3131
// Optional DataSlice related to the error; the merge call sites pass
// std::nullopt.
std::optional<const koladata::DataSlice> ds;
32+
// DataBag that was being merged into `db` when the error occurred. May be
// null. NOTE(review): presumably only meaningful for schema/dict merge
// conflicts - confirm against other AssembleErrorMessage callers.
absl::Nullable<const koladata::DataBagPtr> to_be_merged_db;
3233
};
3334

3435
// Creates the readable error message and sets it in the payload of Status if

py/koladata/types/data_bag.cc

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1196,7 +1196,11 @@ absl::Nullable<PyObject*> PyDataBag_merge_inplace(PyObject* self,
11961196
}
11971197
RETURN_IF_ERROR(db->MergeInplace(*other, overwrite, allow_data_conflicts,
11981198
allow_schema_conflicts))
1199-
.With(arolla::python::SetPyErrFromStatus);
1199+
.With([&](const absl::Status& status) {
1200+
return arolla::python::SetPyErrFromStatus(AssembleErrorMessage(
1201+
status,
1202+
{.db = db, .ds = std::nullopt, .to_be_merged_db = *other}));
1203+
});
12001204
}
12011205
Py_RETURN_NONE;
12021206
}

0 commit comments

Comments
 (0)