
Commit 9cafa2c

Create IndexParallelTaskSpec (#58)
* Create IndexParallelTaskSpec; also clean up folder structure and sync with the Druid project structure
* Clean up DimensionsSpec
* KinesisIndexTaskIOConfig
* DruidInputSource
* DimensionsSpec
* Update tests
* Fix test
* Fix tests for real
1 parent 14a5854 commit 9cafa2c

19 files changed, +311 −32 lines changed

Sources/DataTransferObjects/Supervisor/InputFormat.swift renamed to Sources/DataTransferObjects/Druid/data/input/InputFormat.swift

Lines changed: 2 additions & 0 deletions

@@ -1,4 +1,6 @@
 /// https://druid.apache.org/docs/latest/ingestion/data-formats/#input-format
+///
+/// https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/data/input/InputFormat.java
 public struct InputFormat: Codable, Hashable, Equatable {
     public init(type: InputFormat.InputFormatType, keepNullColumns: Bool? = nil) {
         self.type = type
Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+/// https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/data/input/InputSource.java#L61
+public indirect enum InputSource: Codable, Hashable, Equatable {
+    case druid(DruidInputSource)
+
+    enum CodingKeys: String, CodingKey {
+        case type
+    }
+
+    public init(from decoder: Decoder) throws {
+        let values = try decoder.container(keyedBy: CodingKeys.self)
+        let type = try values.decode(String.self, forKey: .type)
+
+        switch type {
+        case "druid":
+            self = try .druid(DruidInputSource(from: decoder))
+
+        default:
+            throw EncodingError.invalidValue("Invalid type", .init(codingPath: [CodingKeys.type], debugDescription: "Invalid Type", underlyingError: nil))
+        }
+    }
+
+    public func encode(to encoder: Encoder) throws {
+        var container = encoder.container(keyedBy: CodingKeys.self)
+
+        switch self {
+        case let .druid(spec):
+            try container.encode("druid", forKey: .type)
+            try spec.encode(to: encoder)
+        }
+    }
+}
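The new InputSource enum uses a single type key as its discriminator and flattens the wrapped payload into the same JSON object by handing the outer encoder to the associated value. A minimal round-trip sketch (not part of the commit), assuming the QueryTimeInterval(beginningDate:endDate:) and Date(iso8601String:) helpers used elsewhere in this repository's tests, and plain Foundation coders in place of the package's own:

    import Foundation

    // Sketch only: encode an InputSource and decode it back.
    func roundTripDruidInputSource() throws {
        // Placeholder datasource name and dates.
        let source = InputSource.druid(DruidInputSource(
            dataSource: "telemetry-signals",
            interval: QueryTimeInterval(
                beginningDate: Date(iso8601String: "2023-04-01T00:00:00.000Z")!,
                endDate: Date(iso8601String: "2023-05-31T00:00:00.000Z")!
            )
        ))

        // Encodes to one flat object, roughly {"type":"druid","dataSource":...,"interval":...}
        let data = try JSONEncoder().encode(source)

        // The decoder switches on "type" and rebuilds the .druid case from the same payload.
        let decoded = try JSONDecoder().decode(InputSource.self, from: data)
        print(decoded == source)
    }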

Sources/DataTransferObjects/DataSchema/IngestionDimensionSpec.swift renamed to Sources/DataTransferObjects/Druid/data/input/impl/DimensionsSpec.swift

Lines changed: 4 additions & 3 deletions

@@ -1,7 +1,8 @@
 /// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#dimensionsspec
-public struct IngestionDimensionSpec: Codable, Hashable, Equatable {
+/// https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/data/input/impl/DimensionsSpec.java
+public struct DimensionsSpec: Codable, Hashable, Equatable {
     public init(
-        dimensions: [IngestionDimensionSpecDimension],
+        dimensions: [IngestionDimensionSpecDimension]? = nil,
         dimensionExclusions: [String]? = nil,
         spatialDimensions: [IngestionDimensionSpecSpatialDimension]? = nil,
         includeAllDimensions: Bool? = nil,
@@ -23,7 +24,7 @@ public struct IngestionDimensionSpec: Codable, Hashable, Equatable {
     ///
     /// As a best practice, put the most frequently filtered dimensions at the beginning of the dimensions list. In this case, it
     /// would also be good to consider partitioning by those same dimensions.
-    public let dimensions: [IngestionDimensionSpecDimension]
+    public let dimensions: [IngestionDimensionSpecDimension]?

     /// The names of dimensions to exclude from ingestion. Only names are supported here, not objects.
     ///
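Because dimensions now defaults to nil, a DimensionsSpec no longer has to enumerate its columns up front. A hedged sketch (assuming the initializer parameters not shown in this hunk also default to nil, as the full declaration suggests):

    // Hypothetical: exclude bookkeeping columns and leave the dimension list unset,
    // relying on the remaining parameters' nil defaults.
    let exclusionsOnly = DimensionsSpec(
        dimensionExclusions: ["__time", "userId"]
    )

    // DataSchema accepts the renamed type directly; other fields keep their defaults.
    let schema = DataSchema(dataSource: "telemetry-signals", dimensionsSpec: exclusionsOnly)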

Sources/DataTransferObjects/DataSchema/TimestampSpec.swift renamed to Sources/DataTransferObjects/Druid/data/input/impl/TimestampSpec.swift

Lines changed: 2 additions & 0 deletions

@@ -7,6 +7,8 @@
 /// You can also set the expression name to __time to replace the value of the timestamp.
 ///
 /// Treat __time as a millisecond timestamp: the number of milliseconds since Jan 1, 1970 at midnight UTC.
+///
+/// https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/data/input/impl/TimestampSpec.java
 public struct TimestampSpec: Codable, Hashable, Equatable {
     public init(
         column: String? = nil,

Sources/DataTransferObjects/DataSchema/GranularitySpec.swift renamed to Sources/DataTransferObjects/Druid/indexer/granularity/GranularitySpec.swift

Lines changed: 1 addition & 0 deletions

@@ -1,4 +1,5 @@
 /// https://druid.apache.org/docs/latest/ingestion/ingestion-spec/#granularityspec
+/// https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/indexer/granularity/GranularitySpec.java
 public struct GranularitySpec: Codable, Hashable, Equatable {
     public init(
         type: GranularitySpec.GranularitySpecType? = nil,
Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+/// https://druid.apache.org/docs/latest/ingestion/native-batch#ioconfig
+/// https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
+public struct ParallelIndexIOConfig: Codable, Hashable, Equatable {
+    public init(inputFormat: InputFormat, inputSource: InputSource? = nil, appendToExisting: Bool? = nil, dropExisting: Bool? = nil) {
+        self.inputFormat = inputFormat
+        self.inputSource = inputSource
+        self.appendToExisting = appendToExisting
+        self.dropExisting = dropExisting
+    }
+
+    /// inputFormat to specify how to parse input data.
+    public let inputFormat: InputFormat
+
+    public let inputSource: InputSource?
+
+    /// Creates segments as additional shards of the latest version
+    ///
+    /// effectively appending to the segment set instead of replacing it. This means that you can append new segments to any
+    /// datasource regardless of its original partitioning scheme. You must use the dynamic partitioning type for the appended
+    /// segments. If you specify a different partitioning type, the task fails with an error.
+    public let appendToExisting: Bool?
+
+    /// If true and appendToExisting is false and the granularitySpec contains an interval, then the ingestion task replaces
+    /// all existing segments fully contained by the specified interval when the task publishes new segments. If ingestion
+    /// fails, Druid doesn't change any existing segments. In the case of misconfiguration where either appendToExisting is
+    /// true or interval isn't specified in granularitySpec, Druid doesn't replace any segments even if dropExisting is true.
+    ///
+    /// WARNING: this feature is still experimental.
+    public let dropExisting: Bool?
+}
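Usage sketch (not part of the commit): a replace-style ioConfig that re-reads existing segments through the Druid input source. The .json case of InputFormatType is an assumption here; substitute whichever case the enum actually defines.

    // Hypothetical helper: build a replace-mode ioConfig for re-indexing.
    func replaceModeIOConfig(reading source: InputSource) -> ParallelIndexIOConfig {
        ParallelIndexIOConfig(
            inputFormat: InputFormat(type: .json), // assumed case name
            inputSource: source,                   // e.g. the .druid value from the earlier sketch
            appendToExisting: false,               // replace rather than add shards
            dropExisting: true                     // experimental: drop segments fully inside the interval
        )
    }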

Sources/DataTransferObjects/Supervisor/SupervisorSpec.swift renamed to Sources/DataTransferObjects/Druid/indexing/common/task/batch/parallel/ParallelIndexIngestionSpec.swift

Lines changed: 1 addition & 1 deletion

@@ -1,5 +1,5 @@
 /// The container object for the supervisor configuration.
-public struct SupervisorSpec: Codable, Hashable, Equatable {
+public struct ParallelIndexIngestionSpec: Codable, Hashable, Equatable {
     public init(ioConfig: IoConfig? = nil, tuningConfig: TuningConfig? = nil, dataSchema: DataSchema? = nil) {
         self.ioConfig = ioConfig
         self.tuningConfig = tuningConfig
Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@
+/// The Druid input source is to support reading data directly from existing Druid segments, potentially using a new schema
+/// and changing the name, dimensions, metrics, rollup, etc. of the segment. The Druid input source is splittable and can be
+/// used by the parallel task. This input source has a fixed input format for reading from Druid segments; no inputFormat
+/// field needs to be specified in the ingestion spec when using this input source.
+/// https://github.com/apache/druid/blob/master/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+///
+public struct DruidInputSource: Codable, Hashable, Equatable {
+    public init(dataSource: String, interval: QueryTimeInterval, filter: Filter? = nil) {
+        self.dataSource = dataSource
+        self.interval = interval
+        self.filter = filter
+    }
+
+    /// A String defining the Druid datasource to fetch rows from
+    public let dataSource: String
+
+    /// A String representing an ISO-8601 interval, which defines the time range to fetch the data over.
+    public let interval: QueryTimeInterval
+
+    /// Only rows that match the filter, if specified, will be returned.
+    public let filter: Filter?
+}

Sources/DataTransferObjects/Supervisor/ioConfig/KinesisIOConfig.swift renamed to Sources/DataTransferObjects/Druid/indexing/kinesis/KinesisIndexTaskIOConfig.swift

Lines changed: 2 additions & 1 deletion

@@ -1,6 +1,7 @@
 /// https://druid.apache.org/docs/latest/ingestion/supervisor/#io-configuration
 /// https://druid.apache.org/docs/latest/ingestion/kinesis-ingestion#io-configuration
-public struct KinesisIOConfig: Codable, Hashable, Equatable {
+/// https://github.com/apache/druid/blob/master/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+public struct KinesisIndexTaskIOConfig: Codable, Hashable, Equatable {
     public init(
         stream: String,
         inputFormat: InputFormat,
Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+/// The parallel task type index_parallel is a task for multi-threaded batch indexing. Parallel task indexing only relies on Druid
+/// resources. It doesn't depend on other external systems like Hadoop.
+///
+/// The index_parallel task is a supervisor task that orchestrates the whole indexing process. The supervisor task splits the input
+/// data and creates worker tasks to process the individual portions of data.
+///
+/// Druid issues the worker tasks to the Overlord. The Overlord schedules and runs the workers on Middle Managers or Indexers. After a
+/// worker task successfully processes the assigned input portion, it reports the resulting segment list to the Supervisor task.
+///
+/// The Supervisor task periodically checks the status of worker tasks. If a task fails, the Supervisor retries the task until the number
+/// of retries reaches the configured limit. If all worker tasks succeed, it publishes the reported segments at once and finalizes
+/// ingestion.
+///
+/// The detailed behavior of the parallel task is different depending on the partitionsSpec. See partitionsSpec for more details.
+///
+/// https://druid.apache.org/docs/latest/ingestion/native-batch
+public struct IndexParallelTaskSpec: Codable, Hashable, Equatable {
+    /// The task ID. If omitted, Druid generates the task ID using the task type, data source name, interval, and date-time stamp.
+    public let id: String?
+
+    /// The ingestion spec that defines the data schema, IO config, and tuning config.
+    public let spec: ParallelIndexIngestionSpec
+}
Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+/// Tasks do all ingestion-related work in Druid.
+///
+/// For batch ingestion, you will generally submit tasks directly to Druid using the Tasks APIs. For streaming ingestion,
+/// tasks are generally submitted for you by a supervisor.
+///
+/// https://druid.apache.org/docs/latest/ingestion/tasks
+public indirect enum TaskSpec: Codable, Hashable, Equatable {
+    case indexParallel(IndexParallelTaskSpec)
+
+    enum CodingKeys: String, CodingKey {
+        case type
+    }
+
+    public init(from decoder: Decoder) throws {
+        let values = try decoder.container(keyedBy: CodingKeys.self)
+        let type = try values.decode(String.self, forKey: .type)
+
+        switch type {
+        case "index_parallel":
+            self = try .indexParallel(IndexParallelTaskSpec(from: decoder))
+
+        default:
+            throw EncodingError.invalidValue("Invalid type", .init(codingPath: [CodingKeys.type], debugDescription: "Invalid Type", underlyingError: nil))
+        }
+    }
+
+    public func encode(to encoder: Encoder) throws {
+        var container = encoder.container(keyedBy: CodingKeys.self)
+
+        switch self {
+        case let .indexParallel(spec):
+            try container.encode("index_parallel", forKey: .type)
+            try spec.encode(to: encoder)
+        }
+    }
+}
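TaskSpec follows the same single-key discriminator pattern as InputSource and IoConfig: read type, then hand the whole decoder to the matching payload type so its fields decode from the same flat object. A minimal decoding sketch (not part of the commit); the empty spec object relies on every ParallelIndexIngestionSpec field being optional, and telemetryDecoder is the decoder already used in the package's tests:

    import Foundation

    // Sketch only: decode a bare index_parallel task payload.
    func decodeIndexParallelTask() throws {
        // Hypothetical minimal payload; a real task would carry a full spec.
        let json = Data("""
        { "type": "index_parallel", "spec": {} }
        """.utf8)

        let task = try JSONDecoder.telemetryDecoder.decode(TaskSpec.self, from: json)

        if case let .indexParallel(parallel) = task {
            // id was omitted, so Druid would generate one when the task is submitted.
            print(parallel.id ?? "no id", parallel.spec.ioConfig as Any)
        }
    }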

Sources/DataTransferObjects/DataSchema/DataSchema.swift renamed to Sources/DataTransferObjects/Druid/segment/indexing/DataSchema.swift

Lines changed: 4 additions & 2 deletions

@@ -1,12 +1,14 @@
 /// https://druid.apache.org/docs/latest/ingestion/ingestion-spec#dataschema
+///
+/// https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/segment/indexing/DataSchema.java
 public struct DataSchema: Codable, Hashable, Equatable {
     public init(
         dataSource: String,
         timestampSpec: TimestampSpec? = nil,
         metricsSpec: [Aggregator]? = nil,
         granularitySpec: GranularitySpec? = nil,
         transformSpec: TransformSpec? = nil,
-        dimensionsSpec: IngestionDimensionSpec? = nil
+        dimensionsSpec: DimensionsSpec? = nil
     ) {
         self.dataSource = dataSource
         self.timestampSpec = timestampSpec
@@ -31,5 +33,5 @@ public struct DataSchema: Codable, Hashable, Equatable {

     public let transformSpec: TransformSpec?

-    public let dimensionsSpec: IngestionDimensionSpec?
+    public let dimensionsSpec: DimensionsSpec?
 }

Sources/DataTransferObjects/Supervisor/ioConfig/IoConfig.swift renamed to Sources/DataTransferObjects/Druid/segment/indexing/IoConfig.swift

Lines changed: 9 additions & 3 deletions

@@ -1,6 +1,7 @@
+/// https://github.com/apache/druid/blob/master/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
 public indirect enum IoConfig: Codable, Hashable, Equatable {
-    case kinesis(KinesisIOConfig)
-    // space for Kafka
+    case kinesis(KinesisIndexTaskIOConfig)
+    case indexParallel(ParallelIndexIOConfig)

     enum CodingKeys: String, CodingKey {
         case type
@@ -12,7 +13,9 @@ public indirect enum IoConfig: Codable, Hashable, Equatable {

         switch type {
         case "kinesis":
-            self = try .kinesis(KinesisIOConfig(from: decoder))
+            self = try .kinesis(KinesisIndexTaskIOConfig(from: decoder))
+        case "index_parallel":
+            self = try .indexParallel(ParallelIndexIOConfig(from: decoder))

         default:
             throw EncodingError.invalidValue("Invalid type", .init(codingPath: [CodingKeys.type], debugDescription: "Invalid Type", underlyingError: nil))
@@ -26,6 +29,9 @@ public indirect enum IoConfig: Codable, Hashable, Equatable {
         case let .kinesis(ioConfig):
             try container.encode("kinesis", forKey: .type)
             try ioConfig.encode(to: encoder)
+        case let .indexParallel(ioConfig):
+            try container.encode("index_parallel", forKey: .type)
+            try ioConfig.encode(to: encoder)
         }
     }
 }

Sources/DataTransferObjects/Supervisor/Supervisor.swift

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
 /// Datasource / Namespace Supervisor definition
 public struct Supervisor: Codable, Hashable, Equatable {
-    public init(type: Supervisor.SupervisorType, spec: SupervisorSpec? = nil, suspended: Bool? = nil) {
+    public init(type: Supervisor.SupervisorType, spec: ParallelIndexIngestionSpec? = nil, suspended: Bool? = nil) {
         self.type = type
         self.spec = spec
         self.suspended = suspended
@@ -17,7 +17,7 @@ public struct Supervisor: Codable, Hashable, Equatable {
     public let type: SupervisorType

     /// The container object for the supervisor configuration.
-    public let spec: SupervisorSpec?
+    public let spec: ParallelIndexIngestionSpec?

     /// Indicates whether the supervisor is in a suspended state.
     public let suspended: Bool?

Tests/DataSchemaTests/IngestionDimensionSpecTests.swift

Lines changed: 3 additions & 3 deletions

@@ -15,7 +15,7 @@ final class IngestionDimensionSpecTests: XCTestCase {
     """
     .filter { !$0.isWhitespace }

-    let docsValue = IngestionDimensionSpec(
+    let docsValue = DimensionsSpec(
         dimensions: [
             .init(type: .long, name: "page", createBitmapIndex: nil, multiValueHandling: nil),
             .init(type: .long, name: "userId", createBitmapIndex: nil, multiValueHandling: nil),
@@ -66,7 +66,7 @@ final class IngestionDimensionSpecTests: XCTestCase {
     """
     .filter { !$0.isWhitespace }

-    let tdValue = IngestionDimensionSpec(
+    let tdValue = DimensionsSpec(
         dimensions: [
             .init(type: .string, name: "appID", createBitmapIndex: true, multiValueHandling: .sorted_array),
             .init(type: .string, name: "type", createBitmapIndex: true, multiValueHandling: .sorted_array),
@@ -85,7 +85,7 @@ final class IngestionDimensionSpecTests: XCTestCase {
         forceSegmentSortByTime: nil
     )

-    let testedType = IngestionDimensionSpec.self
+    let testedType = DimensionsSpec.self

     func testDecodingDocsExample() throws {
         let decodedValue = try JSONDecoder.telemetryDecoder.decode(testedType, from: docsValueString.data(using: .utf8)!)

Tests/QueryGenerationTests/CompileDownTests.swift

Lines changed: 0 additions & 16 deletions

@@ -281,22 +281,6 @@ final class CompileDownTests: XCTestCase {
         XCTAssertEqual(compiledQuery.dataSource?.name, "com.telemetrydeck.test")
     }

-    func testNamespaceWithAvailabilityDateInTheFuture() throws {
-        let intervals: [QueryTimeInterval] = [
-            .init(beginningDate: Date(iso8601String: "2023-04-01T00:00:00.000Z")!, endDate: Date(iso8601String: "2023-05-31T00:00:00.000Z")!),
-        ]
-
-        var query = CustomQuery(queryType: .timeseries, intervals: intervals, granularity: .day)
-        query.dataSource = nil
-        let precompiledQuery = try query.precompile(
-            namespace: "some-unknown-namespace",
-            organizationAppIDs: [appID1, appID2],
-            isSuperOrg: false
-        )
-        let compiledQuery = try precompiledQuery.compileToRunnableQuery()
-        XCTAssertEqual(compiledQuery.dataSource?.name, "telemetry-signals")
-    }
-
     func testAllowsHourlyGranularityForTimeseries() throws {
         let intervals: [QueryTimeInterval] = [
             .init(beginningDate: Date(iso8601String: "2023-04-01T00:00:00.000Z")!, endDate: Date(iso8601String: "2023-05-31T00:00:00.000Z")!),
