Skip to content

Commit a4d4fe1

Browse files
committed
WIP: Join schema index construction
1 parent a995492 commit a4d4fe1

File tree

5 files changed

+221
-86
lines changed

5 files changed

+221
-86
lines changed

cpp/arcticdb/CMakeLists.txt

+86-85
Original file line numberDiff line numberDiff line change
@@ -910,92 +910,93 @@ if(${TEST})
910910
python_utils_dump_vars_if_enabled("Python for test compilation")
911911

912912
set(unit_test_srcs
913-
async/test/test_async.cpp
914-
codec/test/test_codec.cpp
915-
codec/test/test_encode_field_collection.cpp
916-
codec/test/test_segment_header.cpp
917-
codec/test/test_encoded_field.cpp
918-
column_store/test/ingestion_stress_test.cpp
919-
column_store/test/test_column.cpp
920-
column_store/test/test_column_data_random_accessor.cpp
921-
column_store/test/test_index_filtering.cpp
922-
column_store/test/test_memory_segment.cpp
923-
column_store/test/test_statistics.cpp
924-
entity/test/test_atom_key.cpp
925-
entity/test/test_key_serialization.cpp
926-
entity/test/test_metrics.cpp
927-
entity/test/test_ref_key.cpp
928-
entity/test/test_tensor.cpp
929-
log/test/test_log.cpp
930-
pipeline/test/test_container.hpp
931-
pipeline/test/test_pipeline.cpp
932-
pipeline/test/test_query.cpp
933-
pipeline/test/test_frame_allocation.cpp
934-
util/test/test_regex.cpp
935-
processing/test/test_arithmetic_type_promotion.cpp
936-
processing/test/test_clause.cpp
937-
processing/test/test_component_manager.cpp
938-
processing/test/test_expression.cpp
939-
processing/test/test_filter_and_project_sparse.cpp
940-
processing/test/test_type_promotion.cpp
941-
processing/test/test_operation_dispatch.cpp
942-
processing/test/test_output_schema_aggregator_types.cpp
943-
processing/test/test_output_schema_ast_validity.cpp
944-
processing/test/test_output_schema_basic.cpp
945-
processing/test/test_parallel_processing.cpp
946-
processing/test/test_resample.cpp
947-
processing/test/test_set_membership.cpp
948-
processing/test/test_signed_unsigned_comparison.cpp
949-
processing/test/test_type_comparison.cpp
950-
storage/test/test_local_storages.cpp
951-
storage/test/test_memory_storage.cpp
952-
storage/test/test_s3_storage.cpp
953-
storage/test/test_storage_factory.cpp
954-
storage/test/common.hpp
955-
storage/test/test_storage_exceptions.cpp
956-
storage/test/test_azure_storage.cpp
957-
storage/test/common.hpp
958-
storage/test/test_storage_operations.cpp
959-
stream/test/stream_test_common.cpp
960-
stream/test/test_aggregator.cpp
961-
stream/test/test_incompletes.cpp
962-
stream/test/test_protobuf_mappings.cpp
963-
stream/test/test_row_builder.cpp
964-
stream/test/test_segment_aggregator.cpp
965-
stream/test/test_types.cpp
966-
util/memory_tracing.hpp
913+
# async/test/test_async.cpp
914+
# codec/test/test_codec.cpp
915+
# codec/test/test_encode_field_collection.cpp
916+
# codec/test/test_segment_header.cpp
917+
# codec/test/test_encoded_field.cpp
918+
# column_store/test/ingestion_stress_test.cpp
919+
# column_store/test/test_column.cpp
920+
# column_store/test/test_column_data_random_accessor.cpp
921+
# column_store/test/test_index_filtering.cpp
922+
# column_store/test/test_memory_segment.cpp
923+
# column_store/test/test_statistics.cpp
924+
# entity/test/test_atom_key.cpp
925+
# entity/test/test_key_serialization.cpp
926+
# entity/test/test_metrics.cpp
927+
# entity/test/test_ref_key.cpp
928+
# entity/test/test_tensor.cpp
929+
# log/test/test_log.cpp
930+
# pipeline/test/test_container.hpp
931+
# pipeline/test/test_pipeline.cpp
932+
# pipeline/test/test_query.cpp
933+
# pipeline/test/test_frame_allocation.cpp
934+
# util/test/test_regex.cpp
935+
# processing/test/test_arithmetic_type_promotion.cpp
936+
# processing/test/test_clause.cpp
937+
# processing/test/test_component_manager.cpp
938+
# processing/test/test_expression.cpp
939+
# processing/test/test_filter_and_project_sparse.cpp
940+
processing/test/test_join_schemas.cpp
941+
# processing/test/test_type_promotion.cpp
942+
# processing/test/test_operation_dispatch.cpp
943+
# processing/test/test_output_schema_aggregator_types.cpp
944+
# processing/test/test_output_schema_ast_validity.cpp
945+
# processing/test/test_output_schema_basic.cpp
946+
# processing/test/test_parallel_processing.cpp
947+
# processing/test/test_resample.cpp
948+
# processing/test/test_set_membership.cpp
949+
# processing/test/test_signed_unsigned_comparison.cpp
950+
# processing/test/test_type_comparison.cpp
951+
# storage/test/test_local_storages.cpp
952+
# storage/test/test_memory_storage.cpp
953+
# storage/test/test_s3_storage.cpp
954+
# storage/test/test_storage_factory.cpp
955+
# storage/test/common.hpp
956+
# storage/test/test_storage_exceptions.cpp
957+
# storage/test/test_azure_storage.cpp
958+
# storage/test/common.hpp
959+
# storage/test/test_storage_operations.cpp
960+
# stream/test/stream_test_common.cpp
961+
# stream/test/test_aggregator.cpp
962+
# stream/test/test_incompletes.cpp
963+
# stream/test/test_protobuf_mappings.cpp
964+
# stream/test/test_row_builder.cpp
965+
# stream/test/test_segment_aggregator.cpp
966+
# stream/test/test_types.cpp
967+
# util/memory_tracing.hpp
967968
util/test/gtest_main.cpp
968-
util/test/random_throw.hpp
969-
util/test/test_bitmagic.cpp
970-
util/test/test_buffer_pool.cpp
971-
util/test/test_composite.cpp
972-
util/test/test_cursor.cpp
973-
util/test/test_decimal.cpp
974-
util/test/test_exponential_backoff.cpp
975-
util/test/test_folly.cpp
976-
util/test/test_format_date.cpp
977-
util/test/test_hash.cpp
978-
util/test/test_id_transformation.cpp
979-
util/test/test_key_utils.cpp
980-
util/test/test_ranges_from_future.cpp
981-
util/test/test_reliable_storage_lock.cpp
982-
util/test/test_slab_allocator.cpp
983-
util/test/test_storage_lock.cpp
984-
util/test/test_string_pool.cpp
985-
util/test/test_string_utils.cpp
986-
util/test/test_tracing_allocator.cpp
987-
version/test/test_append.cpp
988-
version/test/test_key_block.cpp
989-
version/test/test_sort_index.cpp
990-
version/test/test_sorting_info_state_machine.cpp
991-
version/test/test_sparse.cpp
992-
version/test/test_stream_version_data.cpp
993-
version/test/test_symbol_list.cpp
994-
version/test/test_version_map.cpp
995-
version/test/test_version_map_batch.cpp
996-
version/test/test_version_store.cpp
997-
version/test/version_map_model.hpp
998-
python/python_handlers.cpp
969+
# util/test/random_throw.hpp
970+
# util/test/test_bitmagic.cpp
971+
# util/test/test_buffer_pool.cpp
972+
# util/test/test_composite.cpp
973+
# util/test/test_cursor.cpp
974+
# util/test/test_decimal.cpp
975+
# util/test/test_exponential_backoff.cpp
976+
# util/test/test_folly.cpp
977+
# util/test/test_format_date.cpp
978+
# util/test/test_hash.cpp
979+
# util/test/test_id_transformation.cpp
980+
# util/test/test_key_utils.cpp
981+
# util/test/test_ranges_from_future.cpp
982+
# util/test/test_reliable_storage_lock.cpp
983+
# util/test/test_slab_allocator.cpp
984+
# util/test/test_storage_lock.cpp
985+
# util/test/test_string_pool.cpp
986+
# util/test/test_string_utils.cpp
987+
# util/test/test_tracing_allocator.cpp
988+
# version/test/test_append.cpp
989+
# version/test/test_key_block.cpp
990+
# version/test/test_sort_index.cpp
991+
# version/test/test_sorting_info_state_machine.cpp
992+
# version/test/test_sparse.cpp
993+
# version/test/test_stream_version_data.cpp
994+
# version/test/test_symbol_list.cpp
995+
# version/test/test_version_map.cpp
996+
# version/test/test_version_map_batch.cpp
997+
# version/test/test_version_store.cpp
998+
# version/test/version_map_model.hpp
999+
# python/python_handlers.cpp
9991000
)
10001001

10011002
set(EXECUTABLE_PERMS OWNER_WRITE OWNER_READ OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE) # 755

cpp/arcticdb/processing/clause.cpp

+113
Original file line numberDiff line numberDiff line change
@@ -1381,7 +1381,120 @@ std::vector<EntityId> ConcatClause::process(std::vector<EntityId>&& entity_ids)
13811381
return std::move(entity_ids);
13821382
}
13831383

1384+
// TODO: Move somewhere else
1385+
// Verifies that all of the index information specified in the input schemas are compatible with each other, and that
1386+
// the requisite fields are present in the StreamDescriptors if necessary
1387+
std::pair<IndexDescriptorImpl, arcticdb::proto::descriptors::NormalizationMetadata> generate_index_schema(
1388+
const std::vector<OutputSchema>& input_schemas) {
1389+
util::check<ErrorCode::E_ASSERTION_FAILURE>(!input_schemas.empty(), "Cannot join empty list of schemas");
1390+
std::optional<bool> has_multi_index;
1391+
std::optional<std::string> name;
1392+
std::optional<bool> is_int;
1393+
std::optional<std::string> tz;
1394+
std::optional<bool> is_physically_stored;
1395+
std::optional<int> field_count;
1396+
std::optional<int> start;
1397+
std::optional<int> step;
1398+
for (const auto& schema: input_schemas) {
1399+
const auto& common = schema.norm_metadata_.df().common();
1400+
if (!has_multi_index.has_value()) {
1401+
has_multi_index = common.has_multi_index();
1402+
} else {
1403+
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
1404+
schema.norm_metadata_.df().common().has_multi_index() == has_multi_index,
1405+
"Mismatching norm metadata in schema join");
1406+
}
1407+
if (has_multi_index) {
1408+
const auto& index = common.multi_index();
1409+
if (!name.has_value()) {
1410+
name = index.name();
1411+
is_int = index.is_int();
1412+
} else {
1413+
if ((index.name() != *name) || (index.is_int() != *is_int)) {
1414+
name = "";
1415+
is_int = false;
1416+
}
1417+
}
1418+
if (!tz.has_value()) {
1419+
tz = index.tz();
1420+
} else {
1421+
if (index.tz() != *tz) {
1422+
tz = "";
1423+
}
1424+
}
1425+
if (!field_count.has_value()) {
1426+
field_count = index.field_count();
1427+
} else {
1428+
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
1429+
index.field_count() == *field_count,
1430+
"Mismatching norm metadata in schema join");
1431+
}
1432+
} else {
1433+
const auto& index = common.index();
1434+
if (!name.has_value()) {
1435+
name = index.name();
1436+
is_int = index.is_int();
1437+
} else {
1438+
if ((index.name() != *name) || (index.is_int() != *is_int)) {
1439+
name = "";
1440+
is_int = false;
1441+
}
1442+
}
1443+
if (!tz.has_value()) {
1444+
tz = index.tz();
1445+
} else {
1446+
if (index.tz() != *tz) {
1447+
tz = "";
1448+
}
1449+
}
1450+
if (!is_physically_stored.has_value()) {
1451+
is_physically_stored = index.is_physically_stored();
1452+
} else {
1453+
user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
1454+
index.is_physically_stored() == *is_physically_stored,
1455+
"Mismatching norm metadata in schema join");
1456+
}
1457+
if (!start.has_value()) {
1458+
start = index.start();
1459+
step = index.step();
1460+
} else {
1461+
1462+
}
1463+
}
1464+
}
1465+
}
1466+
13841467
OutputSchema ConcatClause::join_schemas(std::vector<OutputSchema>&& output_schemas) const {
1468+
// Norm meta ensure:
1469+
// All have PandasIndex OR PandasMultiIndex
1470+
// If PandasIndex:
1471+
// - name - if all the same maintain, otherwise empty string
1472+
// - is_int - means name is a string representation of an integer, so fold into name logic
1473+
// - tz - same as name
1474+
// - is_physically_stored must all be the same
1475+
// - RangeIndex
1476+
// - start==0/step==1 - maintain
1477+
// - All steps the same, use start from first schema and maintain step
1478+
// - Otherwise, log warning and set start==0/step==1
1479+
// PandasMultiIndex same, minus is_physically_stored, plus field_count matching
1480+
1481+
// StreamDescriptor index ensure:
1482+
// - Type is the same (empty matches anything)
1483+
// - Field count is the same
1484+
1485+
// Indexing ensure:
1486+
// - norm meta version and stream descriptor version semantically match
1487+
// - (check that this is the case for all of the modify_schema methods on single-symbol clauses)
1488+
// - all descriptors have requisite index columns
1489+
1490+
// Inner join:
1491+
// - Create map from column name to type for first input schema
1492+
// - Iterate remaining schema, reducing key set for any column names not in each schema
1493+
// - For columns being retained, identify common type
1494+
1495+
// Outer join:
1496+
// - As above, but step 2 is union rather than intersection
1497+
13851498
// TODO: Implement this properly for inner/outer options
13861499
// For now, just check they are all the same
13871500
const auto& reference_schema = output_schemas.front();

cpp/arcticdb/processing/clause.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ struct ConcatClause {
787787
std::shared_ptr<ComponentManager> component_manager_;
788788
JoinType join_type_;
789789

790-
ConcatClause(JoinType join_type);
790+
explicit ConcatClause(JoinType join_type);
791791

792792
ARCTICDB_MOVE_COPY_DEFAULT(ConcatClause)
793793

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/* Copyright 2025 Man Group Operations Limited
2+
*
3+
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
4+
*
5+
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
6+
*/
7+
8+
#include <google/protobuf/util/message_differencer.h>
9+
10+
#include <gtest/gtest.h>
11+
#include <arcticdb/processing/clause.hpp>
12+
13+
using namespace arcticdb;
14+
using namespace arcticdb::pipelines;
15+
using namespace google::protobuf::util;
16+

cpp/arcticdb/processing/test/test_output_schema_basic.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ TEST(OutputSchema, NonModifyingClauses) {
5959
output_schema = merge_clause.modify_schema({stream_desc.clone(), norm_meta});
6060
ASSERT_EQ(output_schema.stream_descriptor(), stream_desc);
6161
ASSERT_TRUE(MessageDifferencer::Equals(output_schema.norm_metadata_, norm_meta));
62+
63+
ConcatClause concat_clause{{}};
64+
output_schema = concat_clause.modify_schema({stream_desc.clone(), norm_meta});
65+
ASSERT_EQ(output_schema.stream_descriptor(), stream_desc);
66+
ASSERT_TRUE(MessageDifferencer::Equals(output_schema.norm_metadata_, norm_meta));
6267
}
6368

6469
TEST(OutputSchema, DateRangeClauseRequiresTimeseries) {

0 commit comments

Comments
 (0)