@@ -5,8 +5,8 @@ use crate::engine::{
5
5
CanonicOperatorName , OperatorData , OperatorName , QueryProcessor , WorkflowOperatorPath ,
6
6
} ;
7
7
use crate :: error:: Error ;
8
- use crate :: util:: Result ;
9
8
use crate :: util:: input:: StringOrNumberRange ;
9
+ use crate :: util:: { Result , safe_lock_mutex} ;
10
10
use crate :: {
11
11
engine:: {
12
12
InitializedVectorOperator , MetaData , QueryContext , SourceOperator ,
@@ -560,7 +560,7 @@ where
560
560
}
561
561
}
562
562
563
- type TimeExtractorType = Box < dyn Fn ( & Feature ) -> Result < TimeInterval > + Send + Sync + ' static > ;
563
+ type TimeExtractorType = Box < dyn FnMut ( & Feature ) -> Result < TimeInterval > + Send + Sync + ' static > ;
564
564
565
565
#[ pin_project( project = OgrSourceStreamProjection ) ]
566
566
pub struct OgrSourceStream < G >
@@ -571,7 +571,7 @@ where
571
571
dataset_iterator : Arc < Mutex < OgrDatasetIterator > > ,
572
572
data_types : Arc < HashMap < String , FeatureDataType > > ,
573
573
feature_collection_builder : FeatureCollectionBuilder < G > ,
574
- time_extractor : Arc < TimeExtractorType > ,
574
+ time_extractor : Arc < std :: sync :: Mutex < TimeExtractorType > > ,
575
575
time_attribute_parser :
576
576
Arc < Box < dyn Fn ( FieldValue ) -> Result < TimeInstance > + Send + Sync + ' static > > ,
577
577
query_rectangle : VectorQueryRectangle ,
@@ -849,7 +849,7 @@ where
849
849
data_types : Arc :: new ( data_types) ,
850
850
feature_collection_builder,
851
851
query_rectangle,
852
- time_extractor : Arc :: new ( time_extractor) ,
852
+ time_extractor : Arc :: new ( std :: sync :: Mutex :: new ( time_extractor) ) ,
853
853
time_attribute_parser : Arc :: new ( time_attribute_parser) ,
854
854
chunk_byte_size,
855
855
future : None ,
@@ -867,7 +867,7 @@ where
867
867
feature_collection_builder : FeatureCollectionBuilder < G > ,
868
868
data_types : Arc < HashMap < String , FeatureDataType > > ,
869
869
query_rectangle : VectorQueryRectangle ,
870
- time_extractor : Arc < TimeExtractorType > ,
870
+ time_extractor : Arc < std :: sync :: Mutex < TimeExtractorType > > ,
871
871
time_attribute_parser : Arc < Box < dyn Fn ( FieldValue ) -> Result < TimeInstance > + Send + Sync > > ,
872
872
chunk_byte_size : usize ,
873
873
) -> Result < FeatureCollection < G > > {
@@ -880,7 +880,7 @@ where
880
880
& dataset_information,
881
881
& data_types,
882
882
& query_rectangle,
883
- time_extractor. as_ref ( ) ,
883
+ safe_lock_mutex ( & time_extractor) . as_mut ( ) ,
884
884
time_attribute_parser. as_ref ( ) ,
885
885
chunk_byte_size,
886
886
) ;
@@ -975,10 +975,12 @@ where
975
975
} => {
976
976
let time_start_parser = Self :: create_time_parser ( start_format) ;
977
977
978
+ let mut field_index = None ;
978
979
Box :: new ( move |feature : & Feature | {
979
- let field_value = feature
980
- . field_index ( & start_field)
981
- . and_then ( |i| feature. field ( i) ) ?;
980
+ let field_index =
981
+ get_or_insert_field_index ( & mut field_index, feature, & start_field) ?;
982
+
983
+ let field_value = feature. field ( field_index) ?;
982
984
if let Some ( field_value) = field_value {
983
985
let time_start = time_start_parser ( field_value) ?;
984
986
TimeInterval :: new ( time_start, ( time_start + duration) ?) . map_err ( Into :: into)
@@ -997,13 +999,17 @@ where
997
999
let time_start_parser = Self :: create_time_parser ( start_format) ;
998
1000
let time_end_parser = Self :: create_time_parser ( end_format) ;
999
1001
1002
+ let mut start_field_index = None ;
1003
+ let mut end_field_index = None ;
1004
+
1000
1005
Box :: new ( move |feature : & Feature | {
1001
- let start_field_value = feature
1002
- . field_index ( & start_field)
1003
- . and_then ( |i| feature. field ( i) ) ?;
1004
- let end_field_value = feature
1005
- . field_index ( & end_field)
1006
- . and_then ( |i| feature. field ( i) ) ?;
1006
+ let start_field_index =
1007
+ get_or_insert_field_index ( & mut start_field_index, feature, & start_field) ?;
1008
+ let end_field_index =
1009
+ get_or_insert_field_index ( & mut end_field_index, feature, & end_field) ?;
1010
+
1011
+ let start_field_value = feature. field ( start_field_index) ?;
1012
+ let end_field_value = feature. field ( end_field_index) ?;
1007
1013
1008
1014
if let ( Some ( start_field_value) , Some ( end_field_value) ) =
1009
1015
( start_field_value, end_field_value)
@@ -1025,13 +1031,20 @@ where
1025
1031
} => {
1026
1032
let time_start_parser = Self :: create_time_parser ( start_format) ;
1027
1033
1034
+ let mut start_field_index = None ;
1035
+ let mut duration_field_index = None ;
1036
+
1028
1037
Box :: new ( move |feature : & Feature | {
1029
- let start_field_value = feature
1030
- . field_index ( & start_field)
1031
- . and_then ( |i| feature. field ( i) ) ?;
1032
- let duration_field_value = feature
1033
- . field_index ( & duration_field)
1034
- . and_then ( |i| feature. field ( i) ) ?;
1038
+ let start_field_index =
1039
+ get_or_insert_field_index ( & mut start_field_index, feature, & start_field) ?;
1040
+ let duration_field_index = get_or_insert_field_index (
1041
+ & mut duration_field_index,
1042
+ feature,
1043
+ & duration_field,
1044
+ ) ?;
1045
+
1046
+ let start_field_value = feature. field ( start_field_index) ?;
1047
+ let duration_field_value = feature. field ( duration_field_index) ?;
1035
1048
1036
1049
if let ( Some ( start_field_value) , Some ( duration_field_value) ) =
1037
1050
( start_field_value, duration_field_value)
@@ -1110,7 +1123,7 @@ where
1110
1123
dataset_information : & OgrSourceDataset ,
1111
1124
data_types : & HashMap < String , FeatureDataType > ,
1112
1125
query_rectangle : & VectorQueryRectangle ,
1113
- time_extractor : & dyn Fn ( & Feature ) -> Result < TimeInterval > ,
1126
+ time_extractor : & mut dyn FnMut ( & Feature ) -> Result < TimeInterval > ,
1114
1127
time_attribute_parser : & dyn Fn ( FieldValue ) -> Result < TimeInstance > ,
1115
1128
chunk_byte_size : usize ,
1116
1129
) -> Result < FeatureCollection < G > > {
@@ -1265,7 +1278,7 @@ where
1265
1278
default_geometry : & Option < G > ,
1266
1279
data_types : & HashMap < String , FeatureDataType > ,
1267
1280
query_rectangle : & VectorQueryRectangle ,
1268
- time_extractor : & dyn Fn ( & Feature ) -> Result < TimeInterval , Error > ,
1281
+ time_extractor : & mut dyn FnMut ( & Feature ) -> Result < TimeInterval , Error > ,
1269
1282
time_attribute_parser : & dyn Fn ( FieldValue ) -> Result < TimeInstance > ,
1270
1283
builder : & mut FeatureCollectionRowBuilder < G > ,
1271
1284
feature : & Feature ,
@@ -1306,8 +1319,14 @@ where
1306
1319
builder. push_generic_geometry ( geometry) ;
1307
1320
builder. push_time_interval ( time_interval) ;
1308
1321
1322
+ let mut field_indices = HashMap :: with_capacity ( data_types. len ( ) ) ;
1323
+
1309
1324
for ( column, data_type) in data_types {
1310
- let field = feature. field_index ( column) . and_then ( |i| feature. field ( i) ) ;
1325
+ let field_index = field_indices
1326
+ . entry ( column. as_str ( ) )
1327
+ . or_insert_with ( || feature. field_index ( column) ) ;
1328
+
1329
+ let field = field_index. clone ( ) . and_then ( |i| feature. field ( i) ) ;
1311
1330
let value =
1312
1331
Self :: convert_field_value ( * data_type, field, time_attribute_parser, error_spec) ?;
1313
1332
builder. push_data ( column, value) ?;
@@ -1319,6 +1338,20 @@ where
1319
1338
}
1320
1339
}
1321
1340
1341
+ fn get_or_insert_field_index (
1342
+ field_index : & mut Option < usize > ,
1343
+ feature : & Feature ,
1344
+ field_name : & str ,
1345
+ ) -> Result < usize > {
1346
+ if let Some ( i) = field_index {
1347
+ return Ok ( * i) ;
1348
+ }
1349
+
1350
+ let i = feature. field_index ( field_name) ?;
1351
+ * field_index = Some ( i) ;
1352
+ Ok ( i)
1353
+ }
1354
+
1322
1355
impl < G > Stream for OgrSourceStream < G >
1323
1356
where
1324
1357
G : Geometry + ArrowTyped + ' static + std:: marker:: Unpin + TryFromOgrGeometry ,
0 commit comments