Skip to content

Commit f083e9f

Browse files
petrmikheevcopybara-github
authored andcommitted
Distinguish missing attribute values and explicitly removed attribute values in DataBagImpl.
PiperOrigin-RevId: 714912235 Change-Id: I2e4af3b5584a1184050aa23a74c544b4aaf461b1
1 parent dadaa9e commit f083e9f

File tree

14 files changed

+493
-242
lines changed

14 files changed

+493
-242
lines changed

koladata/internal/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,7 @@ cc_library(
572572
":object_id",
573573
":schema_utils",
574574
":sparse_source",
575+
":types_buffer",
575576
":uuid_object",
576577
"//koladata/internal/op_utils:presence_or",
577578
"@com_google_absl//absl/base:core_headers",

koladata/internal/data_bag.cc

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include "koladata/internal/schema_utils.h"
5656
#include "koladata/internal/slice_builder.h"
5757
#include "koladata/internal/sparse_source.h"
58+
#include "koladata/internal/types_buffer.h"
5859
#include "koladata/internal/uuid_object.h"
5960
#include "arolla/dense_array/dense_array.h"
6061
#include "arolla/dense_array/edge.h"
@@ -184,21 +185,20 @@ absl::Status MergeToMutableDenseSourceOnlySparse(
184185
auto process_source = [&](const SparseSource* source,
185186
auto skip_obj_fn) -> absl::Status {
186187
for (const auto& [key, item] : source->GetAll()) {
187-
if (!item.has_value() || skip_obj_fn(key)) {
188+
if (skip_obj_fn(key)) {
188189
continue;
189190
}
190191
if (options.data_conflict_policy == MergeOptions::kOverwrite) {
191192
RETURN_IF_ERROR(result.Set(key, item));
192193
continue;
193194
}
194-
if (auto this_result = result.Get(key).value_or(DataItem());
195-
!this_result.has_value()) {
195+
if (auto this_result = result.Get(key); !this_result.has_value()) {
196196
RETURN_IF_ERROR(result.Set(key, item));
197197
} else if (options.data_conflict_policy ==
198198
MergeOptions::kRaiseOnConflict &&
199-
this_result != item) {
199+
*this_result != item) {
200200
return absl::FailedPreconditionError(
201-
absl::StrCat("conflict ", key, ": ", this_result, " vs ", item));
201+
absl::StrCat("conflict ", key, ": ", *this_result, " vs ", item));
202202
}
203203
}
204204
return absl::OkStatus();
@@ -256,6 +256,10 @@ absl::Status MergeToMutableDenseSource(
256256
ASSIGN_OR_RETURN(
257257
auto other_items,
258258
GetAttributeFromSources(objects, {&dense_source}, sparse_sources));
259+
// GetAttributeFromSources returns information about removed values in
260+
// TypesBuffer withing DataSliceImpl. Ensure that TypesBuffer is there.
261+
DCHECK_GT(size, 0);
262+
DCHECK(!other_items.types_buffer().id_to_typeidx.empty());
259263

260264
if (options.data_conflict_policy == MergeOptions::kOverwrite) {
261265
const auto& objects_array = objects.values<ObjectId>();
@@ -271,19 +275,19 @@ absl::Status MergeToMutableDenseSource(
271275

272276
for (int64_t offset = 0; offset < size; ++offset) {
273277
// TODO: try to use batch `GetAttributeFromSources`.
274-
auto other_item = other_items[offset];
275-
if (!other_item.has_value()) {
278+
if (other_items.types_buffer().id_to_typeidx[offset] ==
279+
TypesBuffer::kUnset) {
276280
continue;
277281
}
282+
auto other_item = other_items[offset];
278283
auto obj_id = alloc.ObjectByOffset(offset);
279-
if (auto this_result = result.Get(obj_id).value_or(DataItem());
280-
!this_result.has_value()) {
284+
if (auto this_result = result.Get(obj_id); !this_result.has_value()) {
281285
RETURN_IF_ERROR(result.Set(obj_id, other_item));
282286
} else {
283287
if (options.data_conflict_policy == MergeOptions::kRaiseOnConflict &&
284-
this_result != other_item) {
288+
*this_result != other_item) {
285289
return absl::FailedPreconditionError(absl::StrCat(
286-
"conflict ", obj_id, ": ", this_result, " vs ", other_item));
290+
"conflict ", obj_id, ": ", *this_result, " vs ", other_item));
287291
}
288292
}
289293
}
@@ -452,7 +456,6 @@ absl::StatusOr<DataSliceImpl> DataBagImpl::GetAttr(
452456
}
453457
source->Get(objs_span, *bldr);
454458
}
455-
bldr->ConvertMaybeRemovedToUnset();
456459
if (bldr->is_finalized()) {
457460
break;
458461
}

koladata/internal/data_bag_merge_test.cc

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "koladata/internal/dtype.h"
3838
#include "koladata/internal/object_id.h"
3939
#include "koladata/internal/schema_utils.h"
40+
#include "koladata/internal/slice_builder.h"
4041
#include "koladata/internal/uuid_object.h"
4142
#include "arolla/dense_array/qtype/types.h"
4243
#include "arolla/qtype/base_types.h"
@@ -442,22 +443,22 @@ TEST(DataBagTest, MergeObjectsOverwriteOnlyDenseSources) {
442443
ASSERT_OK(db->SetAttr(a, "a", a_value));
443444
auto db2 = DataBagImpl::CreateEmptyDatabag();
444445
ASSERT_OK(db2->SetAttr(a[2], "a", DataItem(17.0)));
446+
ASSERT_OK(db2->SetAttr(a[3], "a", DataItem()));
445447
ASSERT_OK(db2->SetAttr(a[5], "a", DataItem(57)));
446448
ASSERT_OK(db->MergeInplace(
447449
*db2, MergeOptions{.data_conflict_policy = MergeOptions::kOverwrite}));
448450
std::vector<DataItem> a_value_expected(a_value.begin(), a_value.end());
449451
a_value_expected[2] = DataItem(17.0);
452+
a_value_expected[3] = DataItem();
450453
a_value_expected[5] = DataItem(57);
451454
EXPECT_THAT(db->GetAttr(a, "a"),
452455
IsOkAndHolds(ElementsAreArray(a_value_expected)));
453456
}
454457
{
455-
SCOPED_TRACE("merge dense with sparse overwrite nothing");
458+
SCOPED_TRACE("merge dense with empty sparse overwrite nothing");
456459
auto db = DataBagImpl::CreateEmptyDatabag();
457460
ASSERT_OK(db->SetAttr(a, "a", a_value));
458461
auto db2 = DataBagImpl::CreateEmptyDatabag();
459-
ASSERT_OK(db2->SetAttr(a[5], "a", a_value[0]));
460-
ASSERT_OK(db2->SetAttr(a[5], "a", DataItem()));
461462
ASSERT_OK(db->MergeInplace(
462463
*db2, MergeOptions{.data_conflict_policy = MergeOptions::kOverwrite}));
463464
EXPECT_THAT(db->GetAttr(a, "a"), IsOkAndHolds(ElementsAreArray(a_value)));
@@ -480,8 +481,13 @@ TEST(DataBagTest, MergeObjectsOverwriteOnlyDenseSources) {
480481
auto db2 = DataBagImpl::CreateEmptyDatabag();
481482

482483
auto b_value = DataSliceImpl::AllocateEmptyObjects(kSize);
483-
ASSERT_OK(db2->SetAttr(a, "a", b_value));
484-
ASSERT_OK(db2->SetAttr(a[5], "a", DataItem()));
484+
SliceBuilder a_but_one(a.size());
485+
for (int i = 0; i < a.size(); ++i) {
486+
if (i != 5) {
487+
a_but_one.InsertIfNotSetAndUpdateAllocIds(i, a[i]);
488+
}
489+
}
490+
ASSERT_OK(db2->SetAttr(std::move(a_but_one).Build(), "a", b_value));
485491
ASSERT_OK(db->MergeInplace(
486492
*db2, MergeOptions{.data_conflict_policy = MergeOptions::kOverwrite}));
487493

@@ -497,10 +503,15 @@ TEST(DataBagTest, MergeObjectsOverwriteOnlyDenseSources) {
497503
auto db2 = DataBagImpl::CreateEmptyDatabag();
498504

499505
auto b_value = DataSliceImpl::AllocateEmptyObjects(kSize);
500-
ASSERT_OK(db2->SetAttr(a, "a", b_value));
506+
SliceBuilder a_but_one(a.size());
507+
for (int i = 0; i < a.size(); ++i) {
508+
if (i != 5) {
509+
a_but_one.InsertIfNotSetAndUpdateAllocIds(i, a[i]);
510+
}
511+
}
512+
ASSERT_OK(db2->SetAttr(std::move(a_but_one).Build(), "a", b_value));
501513
ASSERT_OK(db2->SetAttr(a[1], "a", DataItem(27.0)));
502514
ASSERT_OK(db2->SetAttr(a[3], "a", DataItem(57)));
503-
ASSERT_OK(db2->SetAttr(a[5], "a", DataItem()));
504515
ASSERT_OK(db->MergeInplace(
505516
*db2, MergeOptions{.data_conflict_policy = MergeOptions::kOverwrite}));
506517

koladata/internal/data_bag_test.cc

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -439,32 +439,47 @@ TEST(DataBagTest, SetGetWithFallbackObjectId) {
439439
}
440440

441441
// General case
442+
std::vector<arolla::OptionalValue<ObjectId>> main_b_ids(size);
442443
std::vector<arolla::OptionalValue<ObjectId>> main_b_values(size);
444+
std::vector<arolla::OptionalValue<ObjectId>> fallback_b_ids(size);
443445
std::vector<arolla::OptionalValue<ObjectId>> fallback_b_values(size);
444446
std::vector<arolla::OptionalValue<ObjectId>> merge_b_values(size);
445447
AllocationId alloc_b1 = Allocate(size);
446448
AllocationId alloc_b2 = Allocate(size);
447449
for (size_t i = 0; i < size; ++i) {
448450
ObjectId id1 = alloc_b1.ObjectByOffset(i);
449451
ObjectId id2 = alloc_b2.ObjectByOffset(i);
450-
if (i % 4 == 0) {
452+
if (int group = i % 5; group == 0) {
453+
main_b_ids[i] = ds[i].value<ObjectId>();
451454
main_b_values[i] = id1;
452455
merge_b_values[i] = id1;
453-
} else if (i % 4 == 1) {
456+
} else if (group == 1) {
457+
fallback_b_ids[i] = ds[i].value<ObjectId>();
454458
fallback_b_values[i] = id2;
455459
merge_b_values[i] = id2;
456-
} else if (i % 4 == 2) {
460+
} else if (group == 2) {
461+
main_b_ids[i] = ds[i].value<ObjectId>();
457462
main_b_values[i] = id1;
463+
fallback_b_ids[i] = ds[i].value<ObjectId>();
458464
fallback_b_values[i] = id2;
459465
merge_b_values[i] = id1;
466+
} else if (group == 3) {
467+
// Note: main_b_values contains removed values, fallback not used
468+
main_b_ids[i] = ds[i].value<ObjectId>();
469+
fallback_b_ids[i] = ds[i].value<ObjectId>();
470+
fallback_b_values[i] = id2;
460471
}
461472
}
473+
auto main_ds_ids_b = DataSliceImpl::Create(
474+
arolla::CreateDenseArray<ObjectId>(main_b_ids));
462475
auto main_ds_b = DataSliceImpl::Create(
463476
arolla::CreateDenseArray<ObjectId>(main_b_values));
464-
ASSERT_OK(db->SetAttr(ds, "b", main_ds_b));
477+
ASSERT_OK(db->SetAttr(main_ds_ids_b, "b", main_ds_b));
478+
auto fallback_ds_ids_b = DataSliceImpl::Create(
479+
arolla::CreateDenseArray<ObjectId>(fallback_b_ids));
465480
auto fallback_ds_b = DataSliceImpl::Create(
466481
arolla::CreateDenseArray<ObjectId>(fallback_b_values));
467-
ASSERT_OK(db_f->SetAttr(ds, "b", fallback_ds_b));
482+
ASSERT_OK(db_f->SetAttr(fallback_ds_ids_b, "b", fallback_ds_b));
468483
ASSERT_OK_AND_ASSIGN(DataSliceImpl ds_b_get,
469484
db->GetAttr(ds, "b", {db_f.get()}));
470485
auto merge_ds_b = DataSliceImpl::Create(
@@ -526,7 +541,7 @@ TEST(DataBagTest, SetGetWithFallbackPrimitive) {
526541
merge_b_values[i] = val1;
527542
} else if (i % 4 == 1) {
528543
fallback_b_values[i] = val2;
529-
merge_b_values[i] = val2;
544+
// Note: main_b_values contains removed values, fallback not used
530545
} else if (i % 4 == 2) {
531546
main_b_values[i] = val1;
532547
fallback_b_values[i] = val2;
@@ -559,31 +574,42 @@ TEST(DataBagTest, SetGetWithFallbackPrimitiveMixedType) {
559574
for (size_t size : {1, 2, 4, 17, 126}) {
560575
auto ds = DataSliceImpl::AllocateEmptyObjects(size);
561576

577+
std::vector<arolla::OptionalValue<ObjectId>> main_b_ids(size);
562578
std::vector<arolla::OptionalValue<int64_t>> main_b_values(size);
579+
std::vector<arolla::OptionalValue<ObjectId>> fallback_b_ids(size);
563580
std::vector<arolla::OptionalValue<double>> fallback_b_values(size);
581+
564582
std::vector<arolla::OptionalValue<int64_t>> merge_b_values_int64(size);
565583
std::vector<arolla::OptionalValue<double>> merge_b_values_double(size);
566584
for (size_t i = 0; i < size; ++i) {
567585
int64_t val1 = i + 1;
568586
double val2 = -i - 1;
569587
if (i % 4 == 0) {
588+
main_b_ids[i] = ds[i].value<ObjectId>();
570589
main_b_values[i] = val1;
571590
merge_b_values_int64[i] = val1;
572591
} else if (i % 4 == 1) {
592+
fallback_b_ids[i] = ds[i].value<ObjectId>();
573593
fallback_b_values[i] = val2;
574594
merge_b_values_double[i] = val2;
575595
} else if (i % 4 == 2) {
596+
main_b_ids[i] = ds[i].value<ObjectId>();
597+
fallback_b_ids[i] = ds[i].value<ObjectId>();
576598
main_b_values[i] = val1;
577599
fallback_b_values[i] = val2;
578600
merge_b_values_int64[i] = val1;
579601
}
580602
}
603+
auto main_ds_ids_b =
604+
DataSliceImpl::Create(arolla::CreateDenseArray<ObjectId>(main_b_ids));
581605
auto main_ds_b =
582606
DataSliceImpl::Create(arolla::CreateDenseArray<int64_t>(main_b_values));
583-
ASSERT_OK(db->SetAttr(ds, "b", main_ds_b));
607+
ASSERT_OK(db->SetAttr(main_ds_ids_b, "b", main_ds_b));
608+
auto fallback_ds_ids_b = DataSliceImpl::Create(
609+
arolla::CreateDenseArray<ObjectId>(fallback_b_ids));
584610
auto fallback_ds_b = DataSliceImpl::Create(
585611
arolla::CreateDenseArray<double>(fallback_b_values));
586-
ASSERT_OK(db_f->SetAttr(ds, "b", fallback_ds_b));
612+
ASSERT_OK(db_f->SetAttr(fallback_ds_ids_b, "b", fallback_ds_b));
587613
ASSERT_OK_AND_ASSIGN(DataSliceImpl ds_b_get,
588614
db->GetAttr(ds, "b", {db_f.get()}));
589615
auto merge_ds_b = DataSliceImpl::Create(

0 commit comments

Comments
 (0)