Skip to content

Commit c9b0e4d

Browse files
petrmikheev and copybara-github
authored and committed
Optimize serialization of big allocation with SparseDataSources.
PiperOrigin-RevId: 871792692 Change-Id: I5888c934ab2e9fef64ab8f4b465116c3002122a1
1 parent 76d9043 commit c9b0e4d

File tree

5 files changed

+85
-12
lines changed

5 files changed

+85
-12
lines changed

koladata/internal/data_bag.cc

Lines changed: 27 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -3411,11 +3411,6 @@ DataBagImpl::ExtractSmallAllocAttrContent(absl::string_view attr_name) const {
34113411
}
34123412
}
34133413
}
3414-
std::sort(res.begin(), res.end(),
3415-
[](const DataBagContent::AttrItemContent& lhs,
3416-
const DataBagContent::AttrItemContent& rhs) {
3417-
return lhs.object_id < rhs.object_id;
3418-
});
34193414
return res;
34203415
}
34213416

@@ -3433,11 +3428,34 @@ absl::StatusOr<DataBagContent::AttrContent> DataBagImpl::ExtractAttrContent(
34333428
if (size == 0) {
34343429
continue;
34353430
}
3436-
auto objects = DataSliceImpl::ObjectsFromAllocation(alloc, size);
3437-
ASSIGN_OR_RETURN(auto values, GetAttributeFromSources(
3438-
objects, dense_sources, sparse_sources));
3439-
content.allocs.push_back({alloc, values});
3431+
if (dense_sources.empty()) {
3432+
if (sparse_sources.size() == 1) {
3433+
for (const auto& [obj, value] : sparse_sources.front()->GetAll()) {
3434+
content.items.push_back({obj, value});
3435+
}
3436+
} else {
3437+
absl::flat_hash_set<ObjectId> visited_ids;
3438+
for (const SparseSource* source : sparse_sources) {
3439+
for (const auto& [obj, value] : source->GetAll()) {
3440+
if (bool inserted = visited_ids.insert(obj).second; inserted) {
3441+
content.items.push_back({obj, value});
3442+
}
3443+
}
3444+
}
3445+
}
3446+
} else {
3447+
auto objects = DataSliceImpl::ObjectsFromAllocation(alloc, size);
3448+
ASSIGN_OR_RETURN(
3449+
auto values,
3450+
GetAttributeFromSources(objects, dense_sources, sparse_sources));
3451+
content.allocs.push_back({alloc, std::move(values)});
3452+
}
34403453
}
3454+
std::sort(content.items.begin(), content.items.end(),
3455+
[](const DataBagContent::AttrItemContent& lhs,
3456+
const DataBagContent::AttrItemContent& rhs) {
3457+
return lhs.object_id < rhs.object_id;
3458+
});
34413459
return content;
34423460
}
34433461

koladata/internal/data_bag.h

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -210,7 +210,7 @@ class DataBagImpl : public arolla::RefcountedBase {
210210
// DataSource with lower index will override values of sources with
211211
// higher indices.
212212
// Sparse sources always override dense sources.
213-
// Returns size of the alloc (can be less then alloc.Capacity() e.g. if
213+
// Returns size of the alloc (can be less than alloc.Capacity() e.g. if
214214
// an initial dense source for this allocation was created from a pre-existing
215215
// DenseArray).
216216
int64_t GetAttributeDataSources(AllocationId alloc, absl::string_view attr,

koladata/internal/data_bag_extract_content_test.cc

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -72,6 +72,44 @@ TEST(DataBagTest, ExtractSingleItemBigAllocRemovedValues) {
7272
std::bit_ceil(size), DataItem())));
7373
}
7474

75+
TEST(DataBagTest, ExtractAttrFromSparseSources) {
76+
auto db = DataBagImpl::CreateEmptyDatabag();
77+
DataSliceImpl objs =
78+
DataSliceImpl::ObjectsFromAllocation(Allocate(1000), 1000);
79+
80+
ASSERT_OK(db->SetAttr(objs[43], "a", DataItem(57)));
81+
ASSERT_OK(db->SetAttr(objs[57], "a", DataItem(75)));
82+
83+
{
84+
ASSERT_OK_AND_ASSIGN(auto content, db->ExtractContent());
85+
EXPECT_TRUE(content.dicts.empty());
86+
EXPECT_TRUE(content.lists.empty());
87+
auto a = content.attrs["a"];
88+
EXPECT_TRUE(a.allocs.empty());
89+
ASSERT_EQ(a.items.size(), 2);
90+
EXPECT_EQ(a.items[0].object_id, objs[43].value<ObjectId>());
91+
EXPECT_EQ(a.items[0].value, DataItem(57));
92+
EXPECT_EQ(a.items[1].object_id, objs[57].value<ObjectId>());
93+
EXPECT_EQ(a.items[1].value, DataItem(75));
94+
}
95+
96+
db = db->PartiallyPersistentFork();
97+
ASSERT_OK(db->SetAttr(objs[57], "a", DataItem(43)));
98+
99+
{
100+
ASSERT_OK_AND_ASSIGN(auto content, db->ExtractContent());
101+
EXPECT_TRUE(content.dicts.empty());
102+
EXPECT_TRUE(content.lists.empty());
103+
auto a = content.attrs["a"];
104+
EXPECT_TRUE(a.allocs.empty());
105+
ASSERT_EQ(a.items.size(), 2);
106+
EXPECT_EQ(a.items[0].object_id, objs[43].value<ObjectId>());
107+
EXPECT_EQ(a.items[0].value, DataItem(57));
108+
EXPECT_EQ(a.items[1].object_id, objs[57].value<ObjectId>());
109+
EXPECT_EQ(a.items[1].value, DataItem(43));
110+
}
111+
}
112+
75113
TEST(DataBagTest, ExtractRemovedValues) {
76114
int64_t size = 5;
77115
auto db = DataBagImpl::CreateEmptyDatabag();

koladata/s11n/decoder.cc

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -576,8 +576,14 @@ absl::Status DecodeAttrProto(const KodaV1Proto::AttrProto& attr_proto,
576576
}
577577
const TypedValue& tval = input_values[chunk_proto.values_subindex()];
578578
if (tval.GetType() == arolla::GetQType<internal::DataItem>()) {
579-
RETURN_IF_ERROR(db.SetAttr(internal::DataItem(obj), attr_proto.name(),
580-
tval.UnsafeAs<internal::DataItem>()));
579+
if (obj.IsSchema() && attr_proto.name() != schema::kSchemaNameAttr) {
580+
RETURN_IF_ERROR(db.SetSchemaAttr(internal::DataItem(obj),
581+
attr_proto.name(),
582+
tval.UnsafeAs<internal::DataItem>()));
583+
} else {
584+
RETURN_IF_ERROR(db.SetAttr(internal::DataItem(obj), attr_proto.name(),
585+
tval.UnsafeAs<internal::DataItem>()));
586+
}
581587
} else {
582588
ASSIGN_OR_RETURN(const internal::DataSliceImpl& values,
583589
tval.As<internal::DataSliceImpl>());

py/koladata/s11n/data_slice_s11n_test.py

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -552,6 +552,17 @@ def test_dtype(self):
552552
with self.assertRaisesRegex(ValueError, 'unsupported DType: ANY'):
553553
arolla.s11n.loads(container_proto.SerializeToString())
554554

555+
def test_memory_usage(self):
556+
a = kd.obj_shaped_as(kd.item(1).repeat(int(1e6))).with_attrs(x=1)
557+
b = a.S[0].extract()
558+
# `b` is a single ObjectId from a big allocation. Here we test that we
559+
# serialize the allocation in sparse form (1 ObjectId + value) rather than
560+
# in dense form (1 base ObjectId + 1e6 bytes with type and presence
561+
# information + value)
562+
data = kd.dumps(b)
563+
self.assertLess(len(data), 1000)
564+
kd.testing.assert_equivalent(kd.loads(data), b)
565+
555566

556567
if __name__ == '__main__':
557568
absltest.main()

0 commit comments

Comments
 (0)