Skip to content

Commit b0f0a61

Browse files
committed
Implement index checking and merging logic
1 parent a4d4fe1 commit b0f0a61

File tree

1 file changed

+100
-33
lines changed

1 file changed

+100
-33
lines changed

cpp/arcticdb/processing/clause.cpp

+100-33
Original file line numberDiff line numberDiff line change
@@ -1382,26 +1382,78 @@ std::vector<EntityId> ConcatClause::process(std::vector<EntityId>&& entity_ids)
13821382
}
13831383

13841384
// TODO: Move somewhere else
1385-
// Verifies that all of the index information specified in the input schemas are compatible with each other, and that
1386-
// the requisite fields are present in the StreamDescriptors if necessary
1387-
std::pair<IndexDescriptorImpl, arcticdb::proto::descriptors::NormalizationMetadata> generate_index_schema(
1388-
const std::vector<OutputSchema>& input_schemas) {
1389-
util::check<ErrorCode::E_ASSERTION_FAILURE>(!input_schemas.empty(), "Cannot join empty list of schemas");
1385+
using namespace arcticdb::proto::descriptors;
1386+
1387+
std::tuple<std::vector<IndexDescriptorImpl>, std::vector<NormalizationMetadata>, std::vector<std::shared_ptr<FieldCollection>>>
1388+
split_schemas(std::vector<OutputSchema>&& schemas) {
1389+
std::vector<IndexDescriptorImpl> index_descs;
1390+
std::vector<NormalizationMetadata> norm_metas;
1391+
std::vector<std::shared_ptr<FieldCollection>> field_collections;
1392+
index_descs.reserve(schemas.size());
1393+
norm_metas.reserve(schemas.size());
1394+
field_collections.reserve(schemas.size());
1395+
std::for_each(schemas.begin(), schemas.end(), [&](OutputSchema& schema) {
1396+
index_descs.emplace_back(schema.stream_descriptor().index());
1397+
norm_metas.emplace_back(std::move(schema.norm_metadata_));
1398+
field_collections.emplace_back(schema.stream_descriptor().fields_ptr());
1399+
});
1400+
1401+
return {std::move(index_descs), std::move(norm_metas), std::move(field_collections)};
1402+
}
1403+
1404+
IndexDescriptorImpl generate_index_descriptor(const std::vector<IndexDescriptorImpl>& index_descs) {
1405+
// Ensure:
1406+
// - Type is the same (empty matches anything)
1407+
// - Field count is the same
1408+
std::optional<IndexDescriptor::Type> index_type;
1409+
std::optional<uint32_t> index_desc_field_count;
1410+
for (const auto& index_desc: index_descs) {
1411+
if (!index_type.has_value()) {
1412+
index_type = index_desc.type();
1413+
} else {
1414+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
1415+
index_desc.type() == IndexDescriptor::Type::EMPTY || index_desc.type() == *index_type,
1416+
"Mismatching IndexDescriptor in schema join");
1417+
}
1418+
if (!index_desc_field_count.has_value()) {
1419+
index_desc_field_count = index_desc.field_count();
1420+
} else {
1421+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
1422+
index_desc.field_count() == *index_desc_field_count,
1423+
"Mismatching IndexDescriptor in schema join");
1424+
}
1425+
}
1426+
return {*index_desc_field_count, *index_type};
1427+
}
1428+
1429+
NormalizationMetadata generate_norm_meta(const std::vector<NormalizationMetadata>& norm_metas) {
1430+
// Ensure:
1431+
// All have PandasIndex OR PandasMultiIndex
1432+
// If PandasIndex:
1433+
// - name - if all the same maintain, otherwise empty string
1434+
// - is_int - means name is a string representation of an integer, so fold into name logic
1435+
// - tz - same as name
1436+
// - is_physically stored must all be the same
1437+
// - RangeIndex
1438+
// - start==0/step==1 - maintain
1439+
// - All steps the same, use start from first schema and maintain step
1440+
// - Otherwise, log warning, set start==0/step==1
1441+
// PandasMultiIndex same, minus is_physically stored, plus field_count matching
13901442
std::optional<bool> has_multi_index;
13911443
std::optional<std::string> name;
13921444
std::optional<bool> is_int;
13931445
std::optional<std::string> tz;
13941446
std::optional<bool> is_physically_stored;
1395-
std::optional<int> field_count;
1447+
std::optional<uint32_t> field_count;
13961448
std::optional<int> start;
13971449
std::optional<int> step;
1398-
for (const auto& schema: input_schemas) {
1399-
const auto& common = schema.norm_metadata_.df().common();
1450+
for (const auto& norm_meta: norm_metas) {
1451+
const auto& common = norm_meta.df().common();
14001452
if (!has_multi_index.has_value()) {
14011453
has_multi_index = common.has_multi_index();
14021454
} else {
1403-
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
1404-
schema.norm_metadata_.df().common().has_multi_index() == has_multi_index,
1455+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
1456+
common.has_multi_index() == has_multi_index,
14051457
"Mismatching norm metadata in schema join");
14061458
}
14071459
if (has_multi_index) {
@@ -1425,7 +1477,7 @@ std::pair<IndexDescriptorImpl, arcticdb::proto::descriptors::NormalizationMetada
14251477
if (!field_count.has_value()) {
14261478
field_count = index.field_count();
14271479
} else {
1428-
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
1480+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
14291481
index.field_count() == *field_count,
14301482
"Mismatching norm metadata in schema join");
14311483
}
@@ -1450,42 +1502,57 @@ std::pair<IndexDescriptorImpl, arcticdb::proto::descriptors::NormalizationMetada
14501502
if (!is_physically_stored.has_value()) {
14511503
is_physically_stored = index.is_physically_stored();
14521504
} else {
1453-
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
1505+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
14541506
index.is_physically_stored() == *is_physically_stored,
14551507
"Mismatching norm metadata in schema join");
14561508
}
14571509
if (!start.has_value()) {
14581510
start = index.start();
14591511
step = index.step();
14601512
} else {
1461-
1513+
if (index.step() != *step) {
1514+
log::version().warn("Mismatching RangeIndexes being combined, setting to start=0, step=1");
1515+
start = 0;
1516+
step = 1;
1517+
}
14621518
}
14631519
}
14641520
}
1521+
NormalizationMetadata norm_meta;
1522+
if (*has_multi_index) {
1523+
auto* index = norm_meta.mutable_df()->mutable_common()->mutable_multi_index();
1524+
index->set_name(*name);
1525+
index->set_is_int(*is_int);
1526+
index->set_tz(*tz);
1527+
index->set_field_count(*field_count);
1528+
} else {
1529+
auto* index = norm_meta.mutable_df()->mutable_common()->mutable_index();
1530+
index->set_name(*name);
1531+
index->set_is_int(*is_int);
1532+
index->set_tz(*tz);
1533+
index->set_is_physically_stored(*is_physically_stored);
1534+
index->set_start(*start);
1535+
index->set_step(*step);
1536+
}
1537+
return norm_meta;
14651538
}
14661539

14671540
OutputSchema ConcatClause::join_schemas(std::vector<OutputSchema>&& output_schemas) const {
1468-
// Norm meta ensure:
1469-
// All have PandasIndex OR PandasMultiIndex
1470-
// If PandasIndex:
1471-
// - name - if all the same maintain, otherwise empty string
1472-
// - is_int - means name is a string representation of an integer, so fold into name logic
1473-
// - tz - same as name
1474-
// - is_physically stored must all be the same
1475-
// - RangeIndex
1476-
// - start==0/step==1 - maintain
1477-
// - All steps the same, use start from first schema and maintain step
1478-
// - Otherwise, log warning, pick set start==0/step==1
1479-
// PandasMultiIndex same, minus is_physically stored, plus field_count matching
1480-
1481-
// StreamDescriptor index ensure:
1482-
// - Type is the same (empty matches anything)
1483-
// - Field count is the same
1541+
util::check(!output_schemas.empty(), "Cannot join empty list of schemas");
1542+
// Decompose output_schemas vector into vectors of IndexDescriptorImpl, NormalizationMetadata, and FieldCollection
1543+
ARCTICDB_UNUSED auto [index_descs, norm_metas, field_collections] = split_schemas(std::move(output_schemas));
1544+
ARCTICDB_UNUSED auto index_desc = generate_index_descriptor(index_descs);
1545+
ARCTICDB_UNUSED auto norm_meta = generate_norm_meta(norm_metas);
1546+
if (norm_meta.df().common().has_multi_index()) {
1547+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
1548+
norm_meta.df().common().multi_index().field_count() == index_desc.field_count(),
1549+
"Mismatching index field counts in schema join");
1550+
} else {
1551+
schema::check<ErrorCode::E_DESCRIPTOR_MISMATCH>(
1552+
index_desc.field_count() <= 1,
1553+
"Mismatching index field counts in schema join");
1554+
}
14841555

1485-
// Indexing ensure:
1486-
// - norm meta version and stream descriptor version semantically match
1487-
// - (check that this the case for all of the modify_schema methods on single-symbol clauses)
1488-
// - all descriptors have requisite index columns
14891556

14901557
// Inner join:
14911558
// - Create map from column name to type for first input schema

0 commit comments

Comments
 (0)