forked from facebookincubator/velox
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathQueryConfig.h
More file actions
1594 lines (1284 loc) · 64 KB
/
QueryConfig.h
File metadata and controls
1594 lines (1284 loc) · 64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "velox/common/compression/Compression.h"
#include "velox/common/config/Config.h"
#include "velox/vector/TypeAliases.h"
namespace facebook::velox::core {
/// A simple wrapper around velox::IConfig. Defines constants for query
/// config properties and accessor methods.
/// Create per query context. Does not have a singleton instance.
/// Does not allow altering properties on the fly. Only at creation time.
class QueryConfig {
public:
explicit QueryConfig(std::unordered_map<std::string, std::string> values);
// This is needed only to resolve correct ctor for cases like
// QueryConfig{{}} or QueryConfig({}).
struct ConfigTag {};
explicit QueryConfig(
ConfigTag /*tag*/,
std::shared_ptr<const config::IConfig> config);
/// Maximum memory that a query can use on a single host.
static constexpr const char* kQueryMaxMemoryPerNode =
"query_max_memory_per_node";
/// User provided session timezone. Stores a string with the actual timezone
/// name, e.g: "America/Los_Angeles".
static constexpr const char* kSessionTimezone = "session_timezone";
/// Session start time in milliseconds since Unix epoch. This represents when
/// the query session began execution. Used for functions that need to know
/// the session start time (e.g., current_date, localtime).
static constexpr const char* kSessionStartTime = "start_time";
/// If true, timezone-less timestamp conversions (e.g. string to timestamp,
/// when the string does not specify a timezone) will be adjusted to the user
/// provided session timezone (if any).
///
/// For instance:
///
/// if this option is true and user supplied "America/Los_Angeles",
/// "1970-01-01" will be converted to -28800 instead of 0.
///
/// False by default.
static constexpr const char* kAdjustTimestampToTimezone =
"adjust_timestamp_to_session_timezone";
/// Whether to use the simplified expression evaluation path. False by
/// default.
static constexpr const char* kExprEvalSimplified =
"expression.eval_simplified";
/// Whether to enable the FlatNoNulls fast path for expression evaluation.
/// When enabled, expressions skip null checking and vector decoding when all
/// inputs are flat-encoded with no nulls. True by default.
static constexpr const char* kExprEvalFlatNoNulls =
"expression.eval_flat_no_nulls";
/// Whether to track CPU usage for individual expressions (supported by call
/// and cast expressions). False by default. Can be expensive when processing
/// small batches, e.g. < 10K rows.
static constexpr const char* kExprTrackCpuUsage =
"expression.track_cpu_usage";
/// Takes a comma separated list of function names to track CPU usage for.
/// Only applicable when kExprTrackCpuUsage is set to false. Is empty by
/// default. This allows fine-grained control over CPU tracking overhead when
/// only specific functions need to be monitored.
static constexpr const char* kExprTrackCpuUsageForFunctions =
"expression.track_cpu_usage_for_functions";
/// Controls whether non-deterministic expressions are deduplicated during
/// compilation. This is intended for testing and debugging purposes. By
/// default, this is set to true to preserve standard behavior. If set to
/// false, non-deterministic functions (such as rand()) will not be
/// deduplicated. Since non-deterministic functions may yield different
/// outputs on each call, disabling deduplication guarantees that the function
/// is executed only when the original expression is evaluated, rather than
/// being triggered for every deduplicated instance. This ensures each
/// invocation corresponds directly to the actual expression, maintaining
/// independent behavior for each call.
static constexpr const char* kExprDedupNonDeterministic =
"expression.dedup_non_deterministic";
/// Whether to track CPU usage for stages of individual operators. True by
/// default. Can be expensive when processing small batches, e.g. < 10K rows.
static constexpr const char* kOperatorTrackCpuUsage =
"track_operator_cpu_usage";
/// Flags used to configure the CAST operator:
static constexpr const char* kLegacyCast = "legacy_cast";
/// This flag makes the Row conversion be applied in a way that the casted
/// row fields are matched by name instead of position.
static constexpr const char* kCastMatchStructByName =
"cast_match_struct_by_name";
/// Reduce() function will throw an error if it encounters an array of size
/// greater than this.
static constexpr const char* kExprMaxArraySizeInReduce =
"expression.max_array_size_in_reduce";
/// Controls maximum number of compiled regular expression patterns per
/// function instance per thread of execution.
static constexpr const char* kExprMaxCompiledRegexes =
"expression.max_compiled_regexes";
/// Used for backpressure to block local exchange producers when the local
/// exchange buffer reaches or exceeds this size.
static constexpr const char* kMaxLocalExchangeBufferSize =
"max_local_exchange_buffer_size";
/// Limits the number of partitions created by a local exchange.
/// Partitioning data too granularly can lead to poor performance.
/// This setting allows increasing the task concurrency for all
/// pipelines except the ones that require a local partitioning.
/// Affects the number of drivers for pipelines containing
/// LocalPartitionNode and cannot exceed the maximum number of
/// pipeline drivers configured for the task.
static constexpr const char* kMaxLocalExchangePartitionCount =
"max_local_exchange_partition_count";
/// Minimum number of local exchange output partitions to use buffered
/// partitioning.
///
/// When the number of output partitions is low, it is preferred to process
/// one input vector at a time. For example, with 10 output partitions
/// splitting a single 100KB input vector into 10 10KB vectors is acceptable.
/// However, when the number of output partitions is high it may result in a
/// large number of tiny vectors generated. For example, with 100 output
/// partitions splitting a single 100KB input vector results in 100 1KB
/// vectors. Exchanging and processing tiny vectors may negatively impact
/// performance. To avoid this, buffered partitioning is used to accumulate
/// larger vectors.
static constexpr const char*
kMinLocalExchangePartitionCountToUsePartitionBuffer =
"min_local_exchange_partition_count_to_use_partition_buffer";
/// Maximum size in bytes to accumulate for a single partition of a local
/// exchange before flushing.
///
/// The total amount of memory used by a single
/// local exchange operator is the sum of the sizes of all partitions. For
/// example, if the number of downstream pipeline drivers is 10 and the max
/// local exchange partition buffer size is 100KB, then the total memory used
/// by a single local exchange operator is 1MB. The total memory needed to
/// perform a local exchange is equal to the single local exchange
/// operator memory multiplied by the number of upstream pipeline drivers. For
/// example, if the number of upstream pipeline drivers is 10 the total memory
/// used by the local exchange operator is 10MB.
static constexpr const char* kMaxLocalExchangePartitionBufferSize =
"max_local_exchange_partition_buffer_size";
/// Try to preserve the encoding of the input vector when copying it to the
/// buffer.
static constexpr const char* kLocalExchangePartitionBufferPreserveEncoding =
"local_exchange_partition_buffer_preserve_encoding";
/// Maximum number of vectors buffered in each local merge source before
/// blocking to wait for consumers.
static constexpr const char* kLocalMergeSourceQueueSize =
"local_merge_source_queue_size";
/// Maximum size in bytes to accumulate in ExchangeQueue. Enforced
/// approximately, not strictly.
static constexpr const char* kMaxExchangeBufferSize =
"exchange.max_buffer_size";
/// Maximum size in bytes to accumulate among all sources of the merge
/// exchange. Enforced approximately, not strictly.
static constexpr const char* kMaxMergeExchangeBufferSize =
"merge_exchange.max_buffer_size";
/// The minimum number of bytes to accumulate in the ExchangeQueue
/// before unblocking a consumer. This is used to avoid creating tiny
/// batches which may have a negative impact on performance when the
/// cost of creating vectors is high (for example, when there are many
/// columns). To avoid latency degradation, the exchange client unblocks a
/// consumer when 1% of the data size observed so far is accumulated.
static constexpr const char* kMinExchangeOutputBatchBytes =
"min_exchange_output_batch_bytes";
static constexpr const char* kMaxPartialAggregationMemory =
"max_partial_aggregation_memory";
static constexpr const char* kMaxExtendedPartialAggregationMemory =
"max_extended_partial_aggregation_memory";
static constexpr const char* kAbandonPartialAggregationMinRows =
"abandon_partial_aggregation_min_rows";
static constexpr const char* kAbandonPartialAggregationMinPct =
"abandon_partial_aggregation_min_pct";
/// Memory threshold in bytes for triggering string compaction during
/// global aggregation. When total string storage exceeds this limit with
/// high unused memory ratio, compaction is triggered to reclaim dead strings.
/// Disabled by default (0).
///
/// NOTE: currently only applies to approx_most_frequent aggregate with
/// StringView type during global aggregation. May extend to other types.
static constexpr const char* kAggregationCompactionBytesThreshold =
"aggregation_compaction_bytes_threshold";
/// Ratio of unused (evicted) bytes to total bytes that triggers compaction.
/// Value is between 0.0 and 1.0. Default is 0.25.
///
/// NOTE: currently only applies to approx_most_frequent aggregate with
/// StringView type during global aggregation. May extend to other types.
static constexpr const char* kAggregationCompactionUnusedMemoryRatio =
"aggregation_compaction_unused_memory_ratio";
/// If true, enables lightweight memory compaction before spilling during
/// memory reclaim in aggregation. When enabled, the aggregation operator
/// will try to compact aggregate function state (e.g., free dead strings)
/// before resorting to spilling.
/// Disabled by default.
static constexpr const char* kAggregationMemoryCompactionReclaimEnabled =
"aggregation_memory_compaction_reclaim_enabled";
static constexpr const char* kAbandonPartialTopNRowNumberMinRows =
"abandon_partial_topn_row_number_min_rows";
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
"abandon_partial_topn_row_number_min_pct";
/// Number of input rows to receive before starting to check whether to
/// abandon building a HashTable without duplicates in HashBuild for left
/// semi/anti join.
static constexpr const char* kAbandonDedupHashMapMinRows =
"abandon_dedup_hashmap_min_rows";
/// Abandons building a HashTable without duplicates in HashBuild for left
/// semi/anti join if the percentage of distinct keys in the HashTable exceeds
/// this threshold. Zero means 'disable this optimization'.
static constexpr const char* kAbandonDedupHashMapMinPct =
"abandon_dedup_hashmap_min_pct";
static constexpr const char* kMaxElementsSizeInRepeatAndSequence =
"max_elements_size_in_repeat_and_sequence";
/// If true, the PartitionedOutput operator will flush rows eagerly, without
/// waiting until buffers reach certain size. Default is false.
static constexpr const char* kPartitionedOutputEagerFlush =
"partitioned_output_eager_flush";
/// The maximum number of bytes to buffer in PartitionedOutput operator to
/// avoid creating tiny SerializedPages.
///
/// For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator
/// would buffer up to that number of bytes / number of destinations for each
/// destination before producing a SerializedPage.
static constexpr const char* kMaxPartitionedOutputBufferSize =
"max_page_partitioning_buffer_size";
/// The maximum size in bytes for the task's buffered output.
///
/// The producer Drivers are blocked when the buffered size exceeds
/// this. The Drivers are resumed when the buffered size goes below
/// OutputBufferManager::kContinuePct % of this.
static constexpr const char* kMaxOutputBufferSize = "max_output_buffer_size";
/// Preferred size of batches in bytes to be returned by operators from
/// Operator::getOutput. It is used when an estimate of average row size is
/// known. Otherwise kPreferredOutputBatchRows is used.
static constexpr const char* kPreferredOutputBatchBytes =
"preferred_output_batch_bytes";
/// Preferred number of rows to be returned by operators from
/// Operator::getOutput. It is used when an estimate of average row size is
/// not known. When the estimate of average row size is known,
/// kPreferredOutputBatchBytes is used.
static constexpr const char* kPreferredOutputBatchRows =
"preferred_output_batch_rows";
/// Max number of rows that could be returned by operators from
/// Operator::getOutput. It is used when an estimate of average row size is
/// known and kPreferredOutputBatchBytes is used to compute the number of
/// output rows.
static constexpr const char* kMaxOutputBatchRows = "max_output_batch_rows";
/// Initial output batch size in rows for MergeJoin operator. When non-zero,
/// the batch size starts at this value and is dynamically adjusted based on
/// the average row size of previous output batches. When zero (default),
/// dynamic adjustment is disabled and the batch size is fixed at
/// preferredOutputBatchRows.
static constexpr const char* kMergeJoinOutputBatchStartSize =
"merge_join_output_batch_start_size";
/// TableScan operator will exit getOutput() method after this many
/// milliseconds even if it has no data to return yet. Zero means 'no time
/// limit'.
static constexpr const char* kTableScanGetOutputTimeLimitMs =
"table_scan_getoutput_time_limit_ms";
/// If non-zero, overrides the number of rows in each output batch produced
/// by the TableScan operator, bypassing the dynamic batch size calculation.
/// Zero means 'no override'.
static constexpr const char* kTableScanOutputBatchRowsOverride =
"table_scan_output_batch_rows_override";
/// If false, the 'group by' code is forced to use generic hash mode
/// hashtable.
static constexpr const char* kHashAdaptivityEnabled =
"hash_adaptivity_enabled";
/// If true, the conjunction expression can reorder inputs based on the time
/// taken to calculate them.
static constexpr const char* kAdaptiveFilterReorderingEnabled =
"adaptive_filter_reordering_enabled";
/// If true, allow hash probe drivers to generate build-side rows in parallel.
static constexpr const char* kParallelOutputJoinBuildRowsEnabled =
"parallel_output_join_build_rows_enabled";
/// Global enable spilling flag.
static constexpr const char* kSpillEnabled = "spill_enabled";
/// Aggregation spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kAggregationSpillEnabled =
"aggregation_spill_enabled";
/// Join spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kJoinSpillEnabled = "join_spill_enabled";
/// Config to enable hash join spill for mixed grouped execution mode.
static constexpr const char* kMixedGroupedModeHashJoinSpillEnabled =
"mixed_grouped_mode_hash_join_spill_enabled";
/// OrderBy spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kOrderBySpillEnabled = "order_by_spill_enabled";
/// Window spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kWindowSpillEnabled = "window_spill_enabled";
/// When processing spilled window data, read batches of whole partitions
/// having at least that many rows. Set to 1 to read one whole partition at a
/// time. Each driver processing the Window operator will process that much
/// data at once.
static constexpr const char* kWindowSpillMinReadBatchRows =
"window_spill_min_read_batch_rows";
/// If true, the memory arbitrator will reclaim memory from table writer by
/// flushing its buffered data to disk. only applies if "spill_enabled" flag
/// is set.
static constexpr const char* kWriterSpillEnabled = "writer_spill_enabled";
/// RowNumber spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kRowNumberSpillEnabled =
"row_number_spill_enabled";
/// MarkDistinct spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kMarkDistinctSpillEnabled =
"mark_distinct_spill_enabled";
/// TopNRowNumber spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kTopNRowNumberSpillEnabled =
"topn_row_number_spill_enabled";
/// LocalMerge spilling flag, only applies if "spill_enabled" flag is set.
static constexpr const char* kLocalMergeSpillEnabled =
"local_merge_spill_enabled";
/// Specify the max number of local sources to merge at a time.
static constexpr const char* kLocalMergeMaxNumMergeSources =
"local_merge_max_num_merge_sources";
/// The max row numbers to fill and spill for each spill run. This is used to
/// cap the memory used for spilling. If it is zero, then there is no limit
/// and spilling might run out of memory.
/// Based on offline test results, the default value is set to 12 million rows
/// which uses ~128MB of memory when filling a spill run.
static constexpr const char* kMaxSpillRunRows = "max_spill_run_rows";
/// The max spill bytes limit set for each query. This is used to cap the
/// storage used for spilling. If it is zero, then there is no limit and
/// spilling might exhaust the storage or take too long to run. The default
/// value is set to 100 GB.
static constexpr const char* kMaxSpillBytes = "max_spill_bytes";
/// The max allowed spilling level with zero being the initial spilling level.
/// This only applies for hash build spilling which might trigger recursive
/// spilling when the build table is too big. If it is set to -1, then there
/// is no limit and then some extreme large query might run out of spilling
/// partition bits (see kSpillPartitionBits) at the end. The max spill level
/// is used in production to prevent some bad user queries from using too much
/// io and cpu resources.
static constexpr const char* kMaxSpillLevel = "max_spill_level";
/// The max allowed spill file size. If it is zero, then there is no limit.
static constexpr const char* kMaxSpillFileSize = "max_spill_file_size";
static constexpr const char* kSpillCompressionKind =
"spill_compression_codec";
/// The max number of files to merge at a time when merging sorted files into
/// a single ordered stream. 0 means unlimited. This is used to reduce memory
/// pressure by capping the number of open files when merging spilled sorted
/// files to avoid using too much memory and causing OOM. Note that this is
/// only applicable for ordered spill.
static constexpr const char* kSpillNumMaxMergeFiles =
"spill_num_max_merge_files";
/// Enable the prefix sort or fallback to timsort in spill. The prefix sort is
/// faster than std::sort but requires the memory to build normalized prefix
/// keys, which might have potential risk of running out of server memory.
static constexpr const char* kSpillPrefixSortEnabled =
"spill_prefixsort_enabled";
/// Specifies spill write buffer size in bytes. The spiller tries to buffer
/// serialized spill data up to the specified size before write to storage
/// underneath for io efficiency. If it is set to zero, then spill write
/// buffering is disabled.
static constexpr const char* kSpillWriteBufferSize =
"spill_write_buffer_size";
/// Specifies the buffer size in bytes to read from one spilled file. If the
/// underlying filesystem supports async read, we do read-ahead with double
/// buffering, which doubles the buffer used to read from each spill file.
static constexpr const char* kSpillReadBufferSize = "spill_read_buffer_size";
/// Config used to create spill files. This config is provided to underlying
/// file system and the config is free form. The form should be defined by the
/// underlying file system.
static constexpr const char* kSpillFileCreateConfig =
"spill_file_create_config";
/// Config used to create aggregation spill files. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kAggregationSpillFileCreateConfig =
"aggregation_spill_file_create_config";
/// Config used to create hash join spill files. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kHashJoinSpillFileCreateConfig =
"hash_join_spill_file_create_config";
/// Config used to create row number spill files. This config is provided to
/// underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kRowNumberSpillFileCreateConfig =
"row_number_spill_file_create_config";
/// Default offset of the spill start partition bit. It is used together with
/// 'kSpillNumPartitionBits' to calculate the spilling partition number for
/// join spill or aggregation spill.
static constexpr const char* kSpillStartPartitionBit =
"spiller_start_partition_bit";
/// Default number of spill partition bits. It is the number of bits used to
/// calculate the spill partition number for hash join and RowNumber. The
/// number of spill partitions will be power of two.
///
/// NOTE: as for now, we only support up to 8-way spill partitioning.
static constexpr const char* kSpillNumPartitionBits =
"spiller_num_partition_bits";
/// The minimal available spillable memory reservation in percentage of the
/// current memory usage. Suppose the current memory usage size of M,
/// available memory reservation size of N and min reservation percentage of
/// P, if M * P / 100 > N, then spiller operator needs to grow the memory
/// reservation with percentage of spillableReservationGrowthPct(). This
/// ensures we have sufficient amount of memory reservation to process the
/// large input outlier.
static constexpr const char* kMinSpillableReservationPct =
"min_spillable_reservation_pct";
/// The spillable memory reservation growth percentage of the previous memory
/// reservation size. 10 means exponential growth along a series of integer
/// powers of 11/10. The reservation grows by this much until it no longer
/// can, after which it starts spilling.
static constexpr const char* kSpillableReservationGrowthPct =
"spillable_reservation_growth_pct";
/// Minimum memory footprint size required to reclaim memory from a file
/// writer by flushing its buffered data to disk.
static constexpr const char* kWriterFlushThresholdBytes =
"writer_flush_threshold_bytes";
/// If true, array_agg() aggregation function will ignore nulls in the input.
static constexpr const char* kPrestoArrayAggIgnoreNulls =
"presto.array_agg.ignore_nulls";
/// If true, Spark function's behavior is ANSI-compliant, e.g. throws runtime
/// exception instead of returning null on invalid inputs. It affects only
/// functions explicitly marked as "ANSI compliant".
/// Note: This feature is still under development to achieve full ANSI
/// compliance. Users can refer to the Spark function documentation to verify
/// the current support status of a specific function.
static constexpr const char* kSparkAnsiEnabled = "spark.ansi_enabled";
/// The default number of expected items for the bloomfilter.
static constexpr const char* kSparkBloomFilterExpectedNumItems =
"spark.bloom_filter.expected_num_items";
/// The default number of bits to use for the bloom filter.
static constexpr const char* kSparkBloomFilterNumBits =
"spark.bloom_filter.num_bits";
/// The max number of bits to use for the bloom filter.
static constexpr const char* kSparkBloomFilterMaxNumBits =
"spark.bloom_filter.max_num_bits";
/// The max number of items to use for the bloom filter.
static constexpr const char* kSparkBloomFilterMaxNumItems =
"spark.bloom_filter.max_num_items";
/// The current spark partition id.
static constexpr const char* kSparkPartitionId = "spark.partition_id";
/// If true, simple date formatter is used for time formatting and parsing.
/// Joda date formatter is used by default.
static constexpr const char* kSparkLegacyDateFormatter =
"spark.legacy_date_formatter";
/// If true, Spark statistical aggregation functions including skewness,
/// kurtosis, stddev, stddev_samp, variance, var_samp, covar_samp and corr
/// will return NaN instead of NULL when dividing by zero during expression
/// evaluation.
static constexpr const char* kSparkLegacyStatisticalAggregate =
"spark.legacy_statistical_aggregate";
/// If true, ignore null fields when generating JSON string.
/// If false, null fields are included with a null value.
static constexpr const char* kSparkJsonIgnoreNullFields =
"spark.json_ignore_null_fields";
/// If true, collect_list aggregate function will ignore nulls in the input.
/// Defaults to true to match Spark's default behavior. Set to false to
/// include nulls (RESPECT NULLS). Introduced in Spark 4.2 (SPARK-55256).
static constexpr const char* kSparkCollectListIgnoreNulls =
"spark.collect_list.ignore_nulls";
/// The number of local parallel table writer operators per task.
static constexpr const char* kTaskWriterCount = "task_writer_count";
/// The number of local parallel table writer operators per task for
/// partitioned writes. If not set, use "task_writer_count".
static constexpr const char* kTaskPartitionedWriterCount =
"task_partitioned_writer_count";
/// If true, finish the hash probe on an empty build table for a specific set
/// of hash joins.
static constexpr const char* kHashProbeFinishEarlyOnEmptyBuild =
"hash_probe_finish_early_on_empty_build";
/// Whether hash probe can generate any dynamic filter (including Bloom
/// filter) and push down to upstream operators.
static constexpr const char* kHashProbeDynamicFilterPushdownEnabled =
"hash_probe_dynamic_filter_pushdown_enabled";
/// Whether hash probe can generate dynamic filter for string types and
/// push down to upstream operators.
static constexpr const char* kHashProbeStringDynamicFilterPushdownEnabled =
"hash_probe_string_dynamic_filter_pushdown_enabled";
/// The maximum byte size of Bloom filter that can be generated from hash
/// probe. When set to 0, no Bloom filter will be generated. To achieve
/// optimal performance, this should not be much larger than the CPU cache
/// size on the host.
static constexpr const char* kHashProbeBloomFilterPushdownMaxSize =
"hash_probe_bloom_filter_pushdown_max_size";
/// The minimum number of table rows that can trigger the parallel hash join
/// table build.
static constexpr const char* kMinTableRowsForParallelJoinBuild =
"min_table_rows_for_parallel_join_build";
/// If set to true, then during execution of tasks, the output vectors of
/// every operator are validated for consistency. This is an expensive check
/// so should only be used for debugging. It can help debug issues where
/// malformed vector cause failures or crashes by helping identify which
/// operator is generating them.
static constexpr const char* kValidateOutputFromOperators =
"debug.validate_output_from_operators";
/// If true, enable caches in expression evaluation for performance, including
/// ExecCtx::vectorPool_, ExecCtx::decodedVectorPool_,
/// ExecCtx::selectivityVectorPool_, Expr::baseDictionary_,
/// Expr::dictionaryCache_, and Expr::cachedDictionaryIndices_. Otherwise,
/// disable the caches.
static constexpr const char* kEnableExpressionEvaluationCache =
"enable_expression_evaluation_cache";
/// For a given shared subexpression, the maximum distinct sets of inputs we
/// cache results for. Lambdas can call the same expression with different
/// inputs many times, causing the results we cache to explode in size.
/// Putting a limit contains the memory usage.
static constexpr const char* kMaxSharedSubexprResultsCached =
"max_shared_subexpr_results_cached";
/// Maximum number of splits to preload. Set to 0 to disable preloading.
static constexpr const char* kMaxSplitPreloadPerDriver =
"max_split_preload_per_driver";
/// If not zero, specifies the cpu time slice limit in ms that a driver thread
/// can continuously run without yielding. If it is zero, then there is no
/// limit.
static constexpr const char* kDriverCpuTimeSliceLimitMs =
"driver_cpu_time_slice_limit_ms";
/// Window operator can be configured to sub-divide window partitions on each
/// thread of execution into groups of partitions for sequential processing.
/// This setting specifies how many sub-partitions to create for each thread.
static constexpr const char* kWindowNumSubPartitions =
"window_num_sub_partitions";
/// Maximum number of bytes to use for the normalized key in prefix-sort. Use
/// 0 to disable prefix-sort.
static constexpr const char* kPrefixSortNormalizedKeyMaxBytes =
"prefixsort_normalized_key_max_bytes";
/// Minimum number of rows to use prefix-sort. The default value has been
/// derived using micro-benchmarking.
static constexpr const char* kPrefixSortMinRows = "prefixsort_min_rows";
/// Maximum number of bytes to be stored in prefix-sort buffer for a string
/// key.
static constexpr const char* kPrefixSortMaxStringPrefixLength =
"prefixsort_max_string_prefix_length";
/// If true, enables query tracing.
static constexpr const char* kQueryTraceEnabled = "query_trace_enabled";
/// Base dir of a query to store tracing data.
static constexpr const char* kQueryTraceDir = "query_trace_dir";
/// The plan node id whose input data will be traced.
/// Empty string if only want to trace the query metadata.
static constexpr const char* kQueryTraceNodeId = "query_trace_node_id";
/// The max trace bytes limit. Tracing is disabled if zero.
static constexpr const char* kQueryTraceMaxBytes = "query_trace_max_bytes";
/// The regexp of traced task id. We only enable trace on a task if its id
/// matches.
static constexpr const char* kQueryTraceTaskRegExp =
    "query_trace_task_reg_exp";
/// If true, we only collect the input trace for a given operator but without
/// the actual execution.
static constexpr const char* kQueryTraceDryRun = "query_trace_dry_run";
/// Config used to create operator trace directory. This config is provided to
/// the underlying file system and the config is free form. The form should be
/// defined by the underlying file system.
static constexpr const char* kOpTraceDirectoryCreateConfig =
    "op_trace_directory_create_config";
/// Disable optimization in expression evaluation to peel common dictionary
/// layer from inputs.
static constexpr const char* kDebugDisableExpressionWithPeeling =
    "debug_disable_expression_with_peeling";
/// Disable optimization in expression evaluation to re-use cached results for
/// common sub-expressions.
static constexpr const char* kDebugDisableCommonSubExpressions =
    "debug_disable_common_sub_expressions";
/// Disable optimization in expression evaluation to re-use cached results
/// between subsequent input batches that are dictionary encoded and have the
/// same alphabet (underlying flat vector).
static constexpr const char* kDebugDisableExpressionWithMemoization =
    "debug_disable_expression_with_memoization";
/// Disable optimization in expression evaluation to delay loading of lazy
/// inputs unless required.
static constexpr const char* kDebugDisableExpressionWithLazyInputs =
    "debug_disable_expression_with_lazy_inputs";
/// Fix the random seed used to create the data structure used in
/// approx_percentile. This makes the query result deterministic on a single
/// node; multi-node partial aggregation is still subject to non-determinism
/// due to non-deterministic merge order.
static constexpr const char*
    kDebugAggregationApproxPercentileFixedRandomSeed =
        "debug_aggregation_approx_percentile_fixed_random_seed";
/// When debug is enabled for memory manager, this is used to match the memory
/// pools that need allocation callsites tracking. Defaults to track nothing.
static constexpr const char* kDebugMemoryPoolNameRegex =
    "debug_memory_pool_name_regex";
/// Warning threshold in bytes for debug memory pools. When set to a
/// non-zero value, a warning will be logged once per memory pool when
/// allocations cause the pool to exceed this threshold. This is useful for
/// identifying memory usage patterns during debugging. Requires allocation
/// tracking to be enabled via `debug_memory_pool_name_regex` for the pool. A
/// value of 0 means no warning threshold is enforced.
static constexpr const char* kDebugMemoryPoolWarnThresholdBytes =
    "debug_memory_pool_warn_threshold_bytes";
/// Some lambda functions over arrays and maps are evaluated in batches of the
/// underlying elements that comprise the arrays/maps. This is done to make
/// the batch size manageable as array vectors can have thousands of elements
/// each and hit scaling limits as implementations typically expect
/// BaseVectors to have a couple of thousand entries. This lets us tune those
/// batch sizes.
static constexpr const char* kDebugLambdaFunctionEvaluationBatchSize =
    "debug_lambda_function_evaluation_batch_size";
/// The UDF `bing_tile_children` generates the children of a Bing tile based
/// on a specified target zoom level. The number of children produced is
/// determined by the difference between the target zoom level and the zoom
/// level of the input tile. This configuration limits the number of children
/// by capping the maximum zoom level difference; see
/// debugBingTileChildrenMaxZoomShift() for the default cap. This cap is
/// necessary to prevent excessively large array outputs, which can exceed
/// the size limits of the elements vector in the Velox array vector.
static constexpr const char* kDebugBingTileChildrenMaxZoomShift =
    "debug_bing_tile_children_max_zoom_shift";
/// Temporary flag to control whether selective Nimble reader should be used
/// in this query or not. Will be removed after the selective Nimble reader
/// is fully rolled out.
static constexpr const char* kSelectiveNimbleReaderEnabled =
    "selective_nimble_reader_enabled";
/// The max ratio of a query used memory to its max capacity, and the scale
/// writer exchange stops scaling writer processing if the query's current
/// memory usage exceeds this ratio. The value is in the range of (0, 1].
static constexpr const char* kScaleWriterRebalanceMaxMemoryUsageRatio =
    "scaled_writer_rebalance_max_memory_usage_ratio";
/// The max number of logical table partitions that can be assigned to a
/// single table writer thread. The logical table partition is used by local
/// exchange writer for writer scaling, and multiple physical table
/// partitions can be mapped to the same logical table partition based on the
/// hash value of calculated partitioned ids.
static constexpr const char* kScaleWriterMaxPartitionsPerWriter =
    "scaled_writer_max_partitions_per_writer";
/// Minimum amount of data processed by a logical table partition to trigger
/// writer scaling if it is detected as overloaded by scale writer exchange.
static constexpr const char*
    kScaleWriterMinPartitionProcessedBytesRebalanceThreshold =
        "scaled_writer_min_partition_processed_bytes_rebalance_threshold";
/// Minimum amount of data processed by all the logical table partitions to
/// trigger skewed partition rebalancing by scale writer exchange.
static constexpr const char* kScaleWriterMinProcessedBytesRebalanceThreshold =
    "scaled_writer_min_processed_bytes_rebalance_threshold";
/// If true, enables the scaled table scan processing. For each table scan
/// plan node, a scan controller is used to control the number of running scan
/// threads based on the query memory usage. It keeps increasing the number of
/// running threads until the query memory usage exceeds the threshold defined
/// by 'table_scan_scale_up_memory_usage_ratio'.
static constexpr const char* kTableScanScaledProcessingEnabled =
    "table_scan_scaled_processing_enabled";
/// The query memory usage ratio used by scan controller to decide if it can
/// increase the number of running scan threads. When the query memory usage
/// is below this ratio, the scan controller keeps increasing the running scan
/// thread for scale up, and stops once it exceeds this ratio. The value is in
/// the range of [0, 1].
///
/// NOTE: this only applies if 'table_scan_scaled_processing_enabled' is true.
static constexpr const char* kTableScanScaleUpMemoryUsageRatio =
    "table_scan_scale_up_memory_usage_ratio";
/// Specifies the shuffle compression kind which is defined by
/// CompressionKind. If it is CompressionKind_NONE, then no compression.
static constexpr const char* kShuffleCompressionKind =
    "shuffle_compression_codec";
/// If a key is found in multiple given maps, by default that key's value in
/// the resulting map comes from the last one of those maps. When true, throw
/// exception on duplicate map key.
static constexpr const char* kThrowExceptionOnDuplicateMapKeys =
    "throw_exception_on_duplicate_map_keys";
/// Specifies the max number of input batches to prefetch to do index lookup
/// ahead. If it is zero, then process one input batch at a time.
static constexpr const char* kIndexLookupJoinMaxPrefetchBatches =
    "index_lookup_join_max_prefetch_batches";
/// If this is true, then the index join operator might split output for each
/// input batch based on the output batch size control. Otherwise, it tries to
/// produce a single output for each input batch.
static constexpr const char* kIndexLookupJoinSplitOutput =
    "index_lookup_join_split_output";
/// Max wait time for exchange request in seconds.
static constexpr const char* kRequestDataSizesMaxWaitSec =
    "request_data_sizes_max_wait_sec";
/// In streaming aggregation, wait until we have enough number of output rows
/// to produce a batch of size specified by this. If set to 0, then
/// Operator::outputBatchRows will be used as the min output batch rows.
static constexpr const char* kStreamingAggregationMinOutputBatchRows =
    "streaming_aggregation_min_output_batch_rows";
/// TODO: Remove after dependencies are cleaned up.
static constexpr const char* kStreamingAggregationEagerFlush =
    "streaming_aggregation_eager_flush";
/// If true, skip request data size if there is only a single source.
/// This is used to optimize the Presto-on-Spark use case where each
/// exchange client has only one shuffle partition source.
static constexpr const char* kSkipRequestDataSizeWithSingleSourceEnabled =
    "skip_request_data_size_with_single_source_enabled";
/// If true, exchange clients defer data fetching until next() is called.
/// This enables waiter tasks using cached hash tables to skip I/O entirely
/// when the table is already cached. If false (default), exchange clients
/// start fetching data immediately when remote tasks are added.
static constexpr const char* kExchangeLazyFetchingEnabled =
    "exchange_lazy_fetching_enabled";
/// If this is true, then it allows you to get the struct field names
/// as json element names when casting a row to json.
static constexpr const char* kFieldNamesInJsonCastEnabled =
    "field_names_in_json_cast_enabled";
/// If this is true, then operators that evaluate expressions will track
/// stats for expressions that are not special forms and return them as
/// part of their operator stats. Tracking these stats can be expensive
/// (especially if operator stats are retrieved frequently) and this allows
/// the user to explicitly enable it.
static constexpr const char* kOperatorTrackExpressionStats =
    "operator_track_expression_stats";
/// If this is true, enable the operator input/output batch size stats
/// collection in driver execution. This can be expensive for data types with
/// a large number of columns (e.g., ROW types) as it calls estimateFlatSize()
/// which recursively calculates sizes for all child vectors.
static constexpr const char* kEnableOperatorBatchSizeStats =
    "enable_operator_batch_size_stats";
/// If this is true, then the unnest operator might split output for each
/// input batch based on the output batch size control. Otherwise, it produces
/// a single output for each input batch. This can be overridden on a per
/// operator basis by the splitOutput parameter in the UnnestPlanNode.
static constexpr const char* kUnnestSplitOutput = "unnest_split_output";
/// Priority of the query in the memory pool reclaimer. Lower value means
/// higher priority. This is used in global arbitration victim selection.
static constexpr const char* kQueryMemoryReclaimerPriority =
    "query_memory_reclaimer_priority";
/// The max number of input splits to listen to by SplitListener per table
/// scan node per worker. It's up to the SplitListener implementation to
/// respect this config.
static constexpr const char* kMaxNumSplitsListenedTo =
    "max_num_splits_listened_to";
/// Source of the query. Used by Presto to identify the file system username.
static constexpr const char* kSource = "source";
/// Client tags of the query. Used by Presto to identify the file system
/// username.
static constexpr const char* kClientTags = "client_tags";
/// Enable (reader) row size tracker as a fallback to file level row size
/// estimates.
static constexpr const char* kRowSizeTrackingMode = "row_size_tracking_mode";
/// Maximum number of distinct values to keep when merging vector hashers in
/// join HashBuild.
static constexpr const char* kJoinBuildVectorHasherMaxNumDistinct =
    "join_build_vector_hasher_max_num_distinct";
/// Modes for the reader row size tracker controlled by
/// 'row_size_tracking_mode'.
enum class RowSizeTrackingMode {
  // Row size tracking is turned off.
  DISABLED = 0,
  // Presumably tracks row sizes for all splits except delta splits
  // (name-derived; confirm against the reader implementation).
  EXCLUDE_DELTA_SPLITS = 1,
  // Row size tracking is enabled for all splits.
  ENABLED_FOR_ALL = 2,
};
/// Returns whether the selective Nimble reader may be used for this query.
/// Defaults to true.
bool selectiveNimbleReaderEnabled() const {
  constexpr bool kDefault = true;
  return get<bool>(kSelectiveNimbleReaderEnabled, kDefault);
}
/// Returns the reader row size tracking mode. Defaults to ENABLED_FOR_ALL.
RowSizeTrackingMode rowSizeTrackingMode() const {
  constexpr auto kDefault = RowSizeTrackingMode::ENABLED_FOR_ALL;
  return get<RowSizeTrackingMode>(kRowSizeTrackingMode, kDefault);
}
/// Returns true if dictionary peeling is disabled in expression evaluation.
/// Defaults to false.
bool debugDisableExpressionsWithPeeling() const {
  constexpr bool kDefault = false;
  return get<bool>(kDebugDisableExpressionWithPeeling, kDefault);
}
/// Returns true if common sub-expression result re-use is disabled.
/// Defaults to false.
bool debugDisableCommonSubExpressions() const {
  constexpr bool kDefault = false;
  return get<bool>(kDebugDisableCommonSubExpressions, kDefault);
}
/// Returns true if cross-batch dictionary memoization is disabled in
/// expression evaluation. Defaults to false.
bool debugDisableExpressionsWithMemoization() const {
  constexpr bool kDefault = false;
  return get<bool>(kDebugDisableExpressionWithMemoization, kDefault);
}
/// Returns true if delayed loading of lazy inputs is disabled in expression
/// evaluation. Defaults to false.
bool debugDisableExpressionsWithLazyInputs() const {
  constexpr bool kDefault = false;
  return get<bool>(kDebugDisableExpressionWithLazyInputs, kDefault);
}
/// Returns the regex matching memory pools that should track allocation
/// callsites. Empty (the default) tracks nothing.
std::string debugMemoryPoolNameRegex() const {
  constexpr const char* kDefault = "";
  return get<std::string>(kDebugMemoryPoolNameRegex, kDefault);
}
/// Returns the warning threshold in bytes for debug memory pools, parsed
/// from a capacity string (e.g. "1GB"). 0 (the default) disables warnings.
uint64_t debugMemoryPoolWarnThresholdBytes() const {
  const auto threshold =
      get<std::string>(kDebugMemoryPoolWarnThresholdBytes, "0B");
  return config::toCapacity(threshold, config::CapacityUnit::BYTE);
}
/// Returns the fixed random seed for the approx_percentile data structure,
/// or std::nullopt (the default) when the seed is not fixed.
std::optional<uint32_t> debugAggregationApproxPercentileFixedRandomSeed()
    const {
  return get<uint32_t>(kDebugAggregationApproxPercentileFixedRandomSeed);
}
/// Returns the element batch size used when evaluating lambda functions over
/// arrays/maps. Defaults to 10,000.
int32_t debugLambdaFunctionEvaluationBatchSize() const {
  constexpr int32_t kDefault = 10'000;
  return get<int32_t>(kDebugLambdaFunctionEvaluationBatchSize, kDefault);
}
/// Returns the max zoom level difference allowed for bing_tile_children.
/// Defaults to 7.
uint8_t debugBingTileChildrenMaxZoomShift() const {
  constexpr uint8_t kDefault = 7;
  return get<uint8_t>(kDebugBingTileChildrenMaxZoomShift, kDefault);
}
/// Returns the per-node query memory cap in bytes, parsed from a capacity
/// string. Defaults to 0 ("0B").
uint64_t queryMaxMemoryPerNode() const {
  const auto capacity = get<std::string>(kQueryMaxMemoryPerNode, "0B");
  return config::toCapacity(capacity, config::CapacityUnit::BYTE);
}
/// Returns the memory limit for partial aggregation. Defaults to 16MB
/// (1 << 24 bytes).
uint64_t maxPartialAggregationMemoryUsage() const {
  return get<uint64_t>(kMaxPartialAggregationMemory, uint64_t(1) << 24);
}
/// Returns the extended memory limit for partial aggregation. Defaults to
/// 64MB (1 << 26 bytes).
uint64_t maxExtendedPartialAggregationMemoryUsage() const {
  return get<uint64_t>(kMaxExtendedPartialAggregationMemory, uint64_t(1) << 26);
}
/// Returns the minimum number of rows before partial aggregation may be
/// abandoned. Defaults to 100,000.
int32_t abandonPartialAggregationMinRows() const {
  constexpr int32_t kDefault = 100'000;
  return get<int32_t>(kAbandonPartialAggregationMinRows, kDefault);
}
/// Returns the minimum percentage threshold used to decide whether to
/// abandon partial aggregation. Defaults to 80.
int32_t abandonPartialAggregationMinPct() const {
  constexpr int32_t kDefault = 80;
  return get<int32_t>(kAbandonPartialAggregationMinPct, kDefault);
}
/// Returns the byte threshold for aggregation memory compaction. Defaults
/// to 0.
uint64_t aggregationCompactionBytesThreshold() const {
  constexpr uint64_t kDefault = 0;
  return get<uint64_t>(kAggregationCompactionBytesThreshold, kDefault);
}
/// Returns the unused-memory ratio that triggers aggregation memory
/// compaction. Defaults to 0.25.
double aggregationCompactionUnusedMemoryRatio() const {
  constexpr double kDefault = 0.25;
  return get<double>(kAggregationCompactionUnusedMemoryRatio, kDefault);
}
/// Returns whether memory-compaction-based reclaim is enabled for
/// aggregation. Defaults to false.
bool aggregationMemoryCompactionReclaimEnabled() const {
  constexpr bool kDefault = false;
  return get<bool>(kAggregationMemoryCompactionReclaimEnabled, kDefault);
}
/// Returns the minimum number of rows before partial TopNRowNumber may be
/// abandoned. Defaults to 100,000.
int32_t abandonPartialTopNRowNumberMinRows() const {
  constexpr int32_t kDefault = 100'000;
  return get<int32_t>(kAbandonPartialTopNRowNumberMinRows, kDefault);
}
/// Returns the minimum percentage threshold used to decide whether to
/// abandon partial TopNRowNumber. Defaults to 80.
int32_t abandonPartialTopNRowNumberMinPct() const {
  constexpr int32_t kDefault = 80;
  return get<int32_t>(kAbandonPartialTopNRowNumberMinPct, kDefault);
}
/// Returns the minimum number of rows before hash-build dedup may be
/// abandoned. Defaults to 100,000.
int32_t abandonHashBuildDedupMinRows() const {
  constexpr int32_t kDefault = 100'000;
  return get<int32_t>(kAbandonDedupHashMapMinRows, kDefault);
}
/// Returns the minimum percentage threshold used to decide whether to
/// abandon hash-build dedup. Defaults to 0.
int32_t abandonHashBuildDedupMinPct() const {
  constexpr int32_t kDefault = 0;
  return get<int32_t>(kAbandonDedupHashMapMinPct, kDefault);
}
/// Returns the max number of elements the repeat/sequence functions may
/// produce. Defaults to 10,000.
int32_t maxElementsSizeInRepeatAndSequence() const {
  constexpr int32_t kDefault = 10'000;
  return get<int32_t>(kMaxElementsSizeInRepeatAndSequence, kDefault);
}
/// Returns the max number of rows per spill run. Defaults to 12M rows
/// (12 << 20).
uint64_t maxSpillRunRows() const {
  return get<uint64_t>(kMaxSpillRunRows, uint64_t(12) << 20);
}