@@ -1258,33 +1258,43 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_with_join_internal(
1258
1258
clause->set_component_manager (component_manager);
1259
1259
}
1260
1260
auto clauses_ptr = std::make_shared<std::vector<std::shared_ptr<Clause>>>(std::move (clauses));
1261
-
1261
+ auto pipeline_context = std::make_shared<PipelineContext>();
1262
1262
return folly::collect (symbol_processing_result_futs).via (&async::io_executor ())
1263
- .thenValueInline ([clauses_ptr](auto && symbol_processing_results) {
1263
+ .thenValueInline ([this , &handler_data, pipeline_context, clauses_ptr, component_manager ](auto && symbol_processing_results) mutable {
1264
1264
util::check (!clauses_ptr->empty (), " Cannot join with no joining clause provided" );
1265
1265
std::vector<OutputSchema> output_schemas;
1266
+ std::vector<std::vector<EntityId>> entity_ids;
1267
+ std::vector<VersionedItem> res_versioned_items;
1268
+ std::vector<arcticdb::proto::descriptors::UserDefinedMetadata> res_metadatas;
1266
1269
output_schemas.reserve (symbol_processing_results.size ());
1270
+ entity_ids.reserve (symbol_processing_results.size ());
1271
+ res_versioned_items.reserve (symbol_processing_results.size ());
1272
+ res_metadatas.reserve (symbol_processing_results.size ());
1267
1273
for (const auto & symbol_processing_result: symbol_processing_results) {
1268
1274
output_schemas.emplace_back (std::move (symbol_processing_result.output_schema_ ));
1275
+ entity_ids.emplace_back (std::move (symbol_processing_result.entity_ids_ ));
1276
+ res_versioned_items.emplace_back (std::move (symbol_processing_result.versioned_item_ ));
1277
+ res_metadatas.emplace_back (std::move (symbol_processing_result.metadata_ ));
1269
1278
}
1270
- auto output_schema = clauses_ptr->front ()->join_schema (std::move (output_schemas));
1271
- }). thenValueInline (
1272
- return schedule_remaining_iterations (std::move (entity_ids_vec_fut), clauses_ptr, true )
1273
- . thenValueInline ([component_manager]( std::vector<EntityId>&& processed_entity_ids) {
1274
- auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(*component_manager, std::move ( processed_entity_ids));
1275
- return collect_segments ( std::move (proc ));
1276
- }). thenValueInline ([store= store (), &handler_data, pipeline_context] (std::vector<SliceAndKey>&& slice_and_keys) {
1277
- // TODO: Make constructing the output descriptor a standard end of pipeline operation
1278
- pipeline_context-> set_descriptor ( descriptor_from_segments (slice_and_keys));
1279
- return prepare_output_frame ( std::move (slice_and_keys) , pipeline_context, store, ReadOptions{}, handler_data);
1280
- }). thenValueInline ([&handler_data, pipeline_context, res_versioned_items, res_metadatas](SegmentInMemory&& frame) mutable {
1281
- return reduce_and_fix_columns ( pipeline_context, frame, ReadOptions{}, handler_data)
1282
- . thenValue ([pipeline_context, frame, res_versioned_items, res_metadatas]( auto &&) mutable {
1283
- return MultiSymbolReadOutput{ std::move (*res_versioned_items ),
1284
- std::move (*res_metadatas),
1285
- {frame, timeseries_descriptor_from_pipeline_context (pipeline_context, {}, pipeline_context-> bucketize_dynamic_ ), {}}} ;
1279
+ auto output_schema = clauses_ptr->front ()->join_schemas (std::move (output_schemas));
1280
+ pipeline_context-> set_descriptor (output_schema. stream_descriptor ());
1281
+ pipeline_context-> norm_meta_ = std::make_shared<arcticdb::proto::descriptors::NormalizationMetadata> (std::move (output_schema. norm_metadata_ ));
1282
+ return schedule_remaining_iterations ( std::move (entity_ids), clauses_ptr, true )
1283
+ . thenValueInline ([component_manager]( std::vector<EntityId>&& processed_entity_ids) {
1284
+ auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(*component_manager, std::move (processed_entity_ids ));
1285
+ return collect_segments (std::move (proc));
1286
+ }). thenValueInline ([store= store (), &handler_data, pipeline_context](std::vector<SliceAndKey>&& slice_and_keys) mutable {
1287
+ return prepare_output_frame ( std::move (slice_and_keys), pipeline_context, store, ReadOptions{}, handler_data );
1288
+ }). thenValueInline ([&handler_data , pipeline_context, &res_versioned_items, &res_metadatas](SegmentInMemory&& frame) mutable {
1289
+ return reduce_and_fix_columns (pipeline_context, frame, ReadOptions{}, handler_data)
1290
+ . thenValue ([ pipeline_context, frame, &res_versioned_items, &res_metadatas]( auto &&) mutable {
1291
+ return MultiSymbolReadOutput{ std::move (res_versioned_items),
1292
+ std::move (res_metadatas ),
1293
+ {frame, timeseries_descriptor_from_pipeline_context (pipeline_context, {}, pipeline_context-> bucketize_dynamic_ ), {}}};
1294
+ }) ;
1286
1295
});
1287
1296
}).get ();
1297
+
1288
1298
}
1289
1299
1290
1300
void LocalVersionedEngine::write_version_and_prune_previous (
0 commit comments