
Commit 5a6cbb3

Add Automatic Compaction Configuration
1 parent 652865b commit 5a6cbb3

7 files changed: +346 −1 lines changed
@@ -0,0 +1,58 @@
/// Defines Automatic Compaction settings for a data source
public struct AutoCompactionDynamicConfig: Codable, Hashable, Equatable {
    public init(
        dataSource: String,
        taskPriority: Int?,
        inputSegmentSizeBytes: Int?,
        skipOffsetFromLatest: String?,
        tuningConfig: TuningConfig?,
        granularitySpec: GranularitySpec?,
        maxRowsPerSegment: Int?
    ) {
        self.dataSource = dataSource
        self.taskPriority = taskPriority
        self.inputSegmentSizeBytes = inputSegmentSizeBytes
        self.skipOffsetFromLatest = skipOffsetFromLatest
        self.tuningConfig = tuningConfig
        self.granularitySpec = granularitySpec
        self.maxRowsPerSegment = maxRowsPerSegment
    }

    /// The datasource name to be compacted.
    public let dataSource: String

    /// Priority of the compaction task. (default = 25)
    public let taskPriority: Int?

    /// Maximum number of total segment bytes processed per compaction task.
    ///
    /// Since a time chunk must be processed in its entirety, if the segments for a particular time chunk have a total size in bytes greater than this parameter, compaction will not run for that time chunk. (default = 100,000,000,000,000, i.e. 100 TB)
    public let inputSegmentSizeBytes: Int?

    /// The offset for searching segments to be compacted, in ISO 8601 duration format. Strongly recommended for realtime datasources. See "Data handling with compaction" in the Druid documentation. (default = "P1D")
    public let skipOffsetFromLatest: String?

    /// Tuning config for compaction tasks.
    public let tuningConfig: TuningConfig?

    // not implemented:
    // public let taskContext: TaskContext?

    /// Custom granularitySpec.
    public let granularitySpec: GranularitySpec?

    // not implemented:
    // public let dimensionsSpec: DimensionsSpec?

    // not implemented:
    // public let transformSpec: TransformSpec?

    // not implemented:
    // public let metricsSpec: MetricsSpec?

    // not implemented:
    // public let iOConfig: IOConfig?

    // undocumented
    public let maxRowsPerSegment: Int?
}
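
A minimal usage sketch, assuming the package's types are in scope; the datasource name and values below are hypothetical, not part of the commit. Only dataSource is required to be non-nil; since this initializer has no default arguments, every optional is passed explicitly.

import Foundation

let compaction = AutoCompactionDynamicConfig(
    dataSource: "example-datasource", // hypothetical datasource name
    taskPriority: 25,
    inputSegmentSizeBytes: nil,
    skipOffsetFromLatest: "P1D",
    tuningConfig: nil,
    granularitySpec: nil,
    maxRowsPerSegment: nil
)

do {
    let json = try JSONEncoder().encode(compaction)
    print(String(data: json, encoding: .utf8)!)
    // The synthesized Codable conformance omits nil fields,
    // so Druid applies its documented server-side defaults.
} catch {
    print("Encoding failed: \(error)")
}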
Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
public struct IndexParallelTuningConfig: Codable, Hashable, Equatable {
    public init(
        maxRowsInMemory: Int? = nil,
        maxBytesInMemory: Int? = nil,
        maxColumnsToMerge: Int? = nil,
        splitHintSpec: SplitHintSpec? = nil,
        partitionsSpec: PartitionsSpec? = nil,
        indexSpec: IndexSpec? = nil,
        indexSpecForIntermediatePersists: IndexSpec? = nil,
        maxPendingPersists: Int? = nil,
        forceGuaranteedRollup: Bool? = nil,
        reportParseExceptions: Bool? = nil,
        pushTimeout: Int? = nil,
        maxNumConcurrentSubTasks: Int? = nil,
        maxRetry: Int? = nil,
        maxNumSegmentsToMerge: Int? = nil,
        totalNumMergeTasks: Int? = nil,
        taskStatusCheckPeriodMs: Int? = nil,
        chatHandlerTimeout: String? = nil,
        chatHandlerNumRetries: Int? = nil,
        awaitSegmentAvailabilityTimeoutMillis: Int? = nil
    ) {
        self.maxRowsInMemory = maxRowsInMemory
        self.maxBytesInMemory = maxBytesInMemory
        self.maxColumnsToMerge = maxColumnsToMerge
        self.splitHintSpec = splitHintSpec
        self.partitionsSpec = partitionsSpec
        self.indexSpec = indexSpec
        self.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists
        self.maxPendingPersists = maxPendingPersists
        self.forceGuaranteedRollup = forceGuaranteedRollup
        self.reportParseExceptions = reportParseExceptions
        self.pushTimeout = pushTimeout
        self.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks
        self.maxRetry = maxRetry
        self.maxNumSegmentsToMerge = maxNumSegmentsToMerge
        self.totalNumMergeTasks = totalNumMergeTasks
        self.taskStatusCheckPeriodMs = taskStatusCheckPeriodMs
        self.chatHandlerTimeout = chatHandlerTimeout
        self.chatHandlerNumRetries = chatHandlerNumRetries
        self.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis
    }

    /// Used to determine when intermediate persists to disk should occur. Normally you do not need to set this, but if your rows are small in terms of bytes, you may not want to store a million rows in memory and should set this value. (default = 1000000)
    public let maxRowsInMemory: Int?

    /// Used to determine when intermediate persists to disk should occur. Normally this is computed internally and you do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting, based on a rough estimate of memory usage rather than actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). (default = 1/6 of max JVM memory)
    public let maxBytesInMemory: Int?

    /// Limit on the number of segments to merge in a single phase when merging segments for publishing. This limit affects the total number of columns present in a set of segments to merge. If the limit is exceeded, segment merging occurs in multiple phases. Druid merges at least 2 segments per phase, regardless of this setting. (default = -1, i.e. no limit)
    public let maxColumnsToMerge: Int?

    /// Hint to control the amount of data that each first-phase task reads. Druid may ignore the hint depending on the implementation of the input source. (default: size-based split hint spec)
    public let splitHintSpec: SplitHintSpec?

    /// Defines how to partition data in each timeChunk.
    public let partitionsSpec: PartitionsSpec?

    /// Defines segment storage format options to use at indexing time.
    public let indexSpec: IndexSpec?

    /// Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use indexSpecForIntermediatePersists to disable dimension/metric compression on intermediate segments and reduce the memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are in use before getting merged into the final published segment.
    public let indexSpecForIntermediatePersists: IndexSpec?

    /// Maximum number of persists that can be pending but not started. If a new intermediate persist would exceed this limit, Druid blocks ingestion until the currently running persist finishes. One persist can run concurrently with ingestion, and none can be queued up. The maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).
    public let maxPendingPersists: Int?

    /// Forces perfect rollup. Perfect rollup optimizes the total size of generated segments and querying time but increases indexing time. If true, specify intervals in the granularitySpec and use either hashed or single_dim for the partitionsSpec. You cannot use this flag in conjunction with appendToExisting of IOConfig. (default = false)
    public let forceGuaranteedRollup: Bool?

    /// If true, Druid throws exceptions encountered during parsing and halts ingestion. If false, Druid skips unparseable rows and fields. (default = false)
    public let reportParseExceptions: Bool?

    /// Milliseconds to wait to push segments. Must be >= 0, where 0 means to wait forever. (default = 0)
    public let pushTimeout: Int?

    // not implemented:
    // public let segmentWriteOutMediumFactory

    /// Maximum number of worker tasks that can run in parallel at the same time. The supervisor task spawns worker tasks up to maxNumConcurrentSubTasks regardless of the currently available task slots. If set to 1, the supervisor task processes data ingestion on its own instead of spawning worker tasks. If set too large, too many worker tasks may be created, which might block other ingestion. (default = 1)
    public let maxNumConcurrentSubTasks: Int?

    /// Maximum number of retries on task failures. (default = 3)
    public let maxRetry: Int?

    /// Maximum number of segments that a single task can merge at the same time in the second phase. Used only with hashed or single_dim partitionsSpec. (default = 100)
    public let maxNumSegmentsToMerge: Int?

    /// Total number of tasks that merge segments in the merge phase when partitionsSpec is set to hashed or single_dim. (default = 10)
    public let totalNumMergeTasks: Int?

    /// Polling period in milliseconds to check running task statuses. (default = 1000)
    public let taskStatusCheckPeriodMs: Int?

    /// Timeout for reporting the pushed segments in worker tasks. (default = PT10S)
    public let chatHandlerTimeout: String?

    /// Retries for reporting the pushed segments in worker tasks. (default = 5)
    public let chatHandlerNumRetries: Int?

    /// Milliseconds to wait for the newly indexed segments to become available for querying after ingestion completes. If <= 0, no wait occurs. If > 0, the task waits for the Coordinator to indicate that the new segments are available for querying. If the timeout expires, the task exits as successful, but the segments are not confirmed to be available for querying. (default = 0)
    public let awaitSegmentAvailabilityTimeoutMillis: Int?
}
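
A hedged construction sketch: every parameter defaults to nil, so you only set the knobs you care about, and nil fields are omitted from the encoded JSON so Druid falls back to its documented defaults. The field values below are illustrative, not recommendations.

import Foundation

// Illustrative values only; unset fields stay nil and are omitted
// from the JSON payload by the synthesized Codable conformance.
let tuning = IndexParallelTuningConfig(
    maxRowsInMemory: 500_000,
    maxNumConcurrentSubTasks: 2,
    maxRetry: 3
)

do {
    let encoder = JSONEncoder()
    encoder.outputFormatting = .sortedKeys
    let json = try encoder.encode(tuning)
    print(String(data: json, encoding: .utf8)!)
    // {"maxNumConcurrentSubTasks":2,"maxRetry":3,"maxRowsInMemory":500000}
} catch {
    print("Encoding failed: \(error)")
}

Note that encoding the struct directly produces no "type" field; wrapping it in the TuningConfig enum (diff below) is what injects the discriminator.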

Sources/DataTransferObjects/Supervisor/tuningConfig/TuningConfig.swift renamed to Sources/DataTransferObjects/Druid/configuration/TuningConfig/TuningConfig.swift

Lines changed: 7 additions & 1 deletion
@@ -1,6 +1,7 @@
 public indirect enum TuningConfig: Codable, Hashable, Equatable {
     case kinesis(KinesisTuningConfig)
-    // space for Kafka
+    case indexParallel(IndexParallelTuningConfig)
+    // case kafka not implemented

     enum CodingKeys: String, CodingKey {
         case type
@@ -13,6 +14,8 @@ public indirect enum TuningConfig: Codable, Hashable, Equatable {
         switch type {
         case "kinesis":
             self = try .kinesis(KinesisTuningConfig(from: decoder))
+        case "index_parallel":
+            self = try .indexParallel(IndexParallelTuningConfig(from: decoder))

         default:
             throw EncodingError.invalidValue("Invalid type", .init(codingPath: [CodingKeys.type], debugDescription: "Invalid Type", underlyingError: nil))
@@ -26,6 +29,9 @@ public indirect enum TuningConfig: Codable, Hashable, Equatable {
         case let .kinesis(ioConfig):
             try container.encode("kinesis", forKey: .type)
             try ioConfig.encode(to: encoder)
+        case let .indexParallel(tuningConfig):
+            try container.encode("index_parallel", forKey: .type)
+            try tuningConfig.encode(to: encoder)
         }
     }
 }
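
A round-trip sketch of the new case: the hand-written Codable conformance uses the "type" field as a discriminator, so a payload tagged index_parallel decodes into the new enum case, and re-encoding injects the tag back. The payload below is a hypothetical example, not from the commit.

import Foundation

let payload = Data("""
{"type": "index_parallel", "maxRetry": 5}
""".utf8)

do {
    // init(from:) reads "type", then delegates to IndexParallelTuningConfig;
    // keys absent from the payload decode to nil.
    let decoded = try JSONDecoder().decode(TuningConfig.self, from: payload)
    if case let .indexParallel(tuning) = decoded {
        print(tuning.maxRetry as Any) // Optional(5)
    }

    // encode(to:) writes the "type" tag, then the wrapped config's fields,
    // e.g. {"type":"index_parallel","maxRetry":5}
    let reencoded = try JSONEncoder().encode(decoded)
    print(String(data: reencoded, encoding: .utf8)!)
} catch {
    print("Round trip failed: \(error)")
}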
