Skip to content

Commit 652865b

Browse files
winsmith and Copilot authored
Use custom struct for DataSketches ThetaSketch Aggregator (#56)
* Use custom struct for DataSketches ThetaSketch Aggregator

  Also automatically set a size for the theta sketch in funnel queries. Fix #55

* Update Sources/DataTransferObjects/Query/CustomQuery+CompileDown.swift

  Co-authored-by: Copilot <[email protected]>

---------

Co-authored-by: Copilot <[email protected]>
1 parent 64828a5 commit 652865b

14 files changed

+52
-35
lines changed

Sources/DataTransferObjects/Query/Aggregator.swift

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public indirect enum Aggregator: Codable, Hashable, Equatable {
118118
/// querying faster.
119119
///
120120
/// https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta.html
121-
case thetaSketch(GenericAggregator)
121+
case thetaSketch(ThetaSketchAggregator)
122122

123123
case quantilesDoublesSketch(QuantilesDoublesSketchAggregator)
124124

@@ -199,7 +199,7 @@ public indirect enum Aggregator: Codable, Hashable, Equatable {
199199
case "stringAny":
200200
self = try .stringAny(GenericAggregator(from: decoder))
201201
case "thetaSketch":
202-
self = try .thetaSketch(GenericAggregator(from: decoder))
202+
self = try .thetaSketch(ThetaSketchAggregator(from: decoder))
203203
case "quantilesDoublesSketch":
204204
self = try .quantilesDoublesSketch(QuantilesDoublesSketchAggregator(from: decoder))
205205
case "filtered":
@@ -424,7 +424,7 @@ public enum AggregatorType: String, Codable, Hashable {
424424
case quantilesDoublesSketch
425425
case filtered
426426

427-
// JavaScript aggregator missing
427+
// JavaScript aggregator missing on purpose
428428
}
429429

430430
/// A filtered aggregator wraps any given aggregator, but only aggregates the values for which the given dimension filter matches.
@@ -450,6 +450,41 @@ public struct FilteredAggregator: Codable, Hashable {
450450
public let name: String?
451451
}
452452

453+
/// DataSketches Theta Sketch Aggregator
454+
///
455+
/// https://druid.apache.org/docs/latest/development/extensions-core/datasketches-theta/
456+
public struct ThetaSketchAggregator: Codable, Hashable {
457+
public init(name: String, fieldName: String, size: Int? = nil, shouldFinalize: Bool? = nil) {
458+
type = .thetaSketch
459+
self.name = name
460+
self.fieldName = fieldName
461+
self.size = size
462+
self.shouldFinalize = shouldFinalize
463+
}
464+
465+
public let type: AggregatorType
466+
467+
/// String representing the output column to store sketch values.
468+
public let name: String
469+
470+
/// A string for the name of the aggregator used at ingestion time.
471+
public let fieldName: String
472+
473+
/// The maximum number of entries the sketch object retains.
474+
///
475+
/// Must be a power of 2. Higher size means higher accuracy but more space to store sketches. After you index with a particular size, Druid persists the sketch in segments.
476+
/// At query time you must use a size greater than or equal to the ingested size. See the DataSketches site for details. The default is recommended for the majority of use cases.
477+
///
478+
/// Defaults to 16384
479+
public let size: Int?
480+
481+
/// Return the final double type representing the estimate rather than the intermediate sketch type itself. In addition to controlling the finalization of this aggregator, you can control whether
482+
/// all aggregators are finalized with the query context parameters finalize and sqlFinalizeOuterSketches.
483+
///
484+
/// Defaults to true
485+
public let shouldFinalize: Bool?
486+
}
487+
453488
public struct QuantilesDoublesSketchAggregator: Codable, Hashable {
454489
public init(
455490
name: String,
@@ -509,7 +544,7 @@ public struct UserCountAggregator: Codable, Hashable, PrecompilableAggregator {
509544
public let name: String?
510545

511546
public func precompile() -> (aggregators: [Aggregator], postAggregators: [PostAggregator]) {
512-
let aggregators = [Aggregator.thetaSketch(.init(type: .thetaSketch, name: name ?? "Users", fieldName: "clientUser"))]
547+
let aggregators = [Aggregator.thetaSketch(.init(name: name ?? "Users", fieldName: "clientUser"))]
513548

514549
return (aggregators: aggregators, postAggregators: [])
515550
}

Sources/DataTransferObjects/Query/CustomQuery+CompileDown.swift

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ public extension CustomQuery {
4242

4343
// Custom Query Types
4444
if query.queryType == .funnel {
45-
query = try precompiledFunnelQuery()
45+
// If a namespace is set, increase the accuracy of the funnel query (i.e. the size of the theta sketch).
46+
// This helps increase accuracy for bigger customers who have their own namespace while hopefully keeping costs down in telemetry-signals.
47+
// https://github.com/TelemetryDeck/SwiftDataTransferObjects/issues/55
48+
query = try namespace == nil ? precompiledFunnelQuery() : precompiledFunnelQuery(accuracy: 65536)
4649
} else if query.queryType == .experiment {
4750
query = try precompiledExperimentQuery()
4851
}

Sources/DataTransferObjects/QueryGeneration/CustomQuery+Experiment.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ extension CustomQuery {
2020
.filtered(.init(
2121
filter: combined.1.filter ?? Filter.empty,
2222
aggregator: .thetaSketch(.init(
23-
type: .thetaSketch,
2423
name: combined.0,
2524
fieldName: "clientUser"
2625
))
@@ -34,7 +33,6 @@ extension CustomQuery {
3433
sample2.filter ?? Filter.empty,
3534
])),
3635
aggregator: .thetaSketch(.init(
37-
type: .thetaSketch,
3836
name: "users",
3937
fieldName: "clientUser"
4038
))

Sources/DataTransferObjects/QueryGeneration/CustomQuery+Funnel.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
extension CustomQuery {
2-
func precompiledFunnelQuery() throws -> CustomQuery {
2+
func precompiledFunnelQuery(accuracy: Int? = nil) throws -> CustomQuery {
33
var query = self
44

55
guard let steps = steps else { throw QueryGenerationError.keyMissing(reason: "Missing key 'steps'") }
@@ -16,9 +16,9 @@ extension CustomQuery {
1616
aggregations.append(.filtered(.init(
1717
filter: step.filter ?? Filter.empty,
1818
aggregator: .thetaSketch(.init(
19-
type: .thetaSketch,
2019
name: "\(aggregationNamePrefix)\(index)",
21-
fieldName: "clientUser"
20+
fieldName: "clientUser",
21+
size: accuracy
2222
))
2323
)))
2424
}

Sources/DataTransferObjects/QueryGeneration/RetentionQueryGenerator.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public enum RetentionQueryGenerator {
9494
intervals: [.init(dateInterval: interval)]
9595
)),
9696
aggregator: .thetaSketch(.init(
97-
type: .thetaSketch,
9897
name: "_\(title(for: interval))",
9998
fieldName: "clientUser"
10099
))

Tests/QueryGenerationTests/CompileDownTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ final class CompileDownTests: XCTestCase {
5858
// This should succeed because an app ID is provided
5959
let appID = UUID()
6060
let query = CustomQuery(queryType: .timeseries, appID: appID, baseFilters: .thisApp, relativeIntervals: relativeIntervals, granularity: .all)
61-
let precompiledQuery = try query.precompile(organizationAppIDs: [appID1, appID2], isSuperOrg: false)
61+
let precompiledQuery = try query.precompile(organizationAppIDs: [appID, appID1, appID2], isSuperOrg: false)
6262

6363
XCTAssertEqual(
6464
precompiledQuery.filter,

Tests/QueryGenerationTests/ConvenienceAggregatorTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ final class ConvenienceAggregatorTests: XCTestCase {
55
func testUserCountQueryGetsPrecompiled() throws {
66
let query = CustomQuery(queryType: .timeseries, aggregations: [.userCount(.init())])
77
let precompiled = try query.precompile(organizationAppIDs: [UUID()], isSuperOrg: false)
8-
let expectedAggregations: [Aggregator] = [.thetaSketch(.init(type: .thetaSketch, name: "Users", fieldName: "clientUser"))]
8+
let expectedAggregations: [Aggregator] = [.thetaSketch(.init(name: "Users", fieldName: "clientUser"))]
99
XCTAssertEqual(precompiled.aggregations, expectedAggregations)
1010
}
1111

Tests/QueryGenerationTests/ExperimentQueryGenerationTests.swift

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ final class ExperimentQueryGenerationTests: XCTestCase {
3131
filter: .selector(.init(dimension: "type", value: "payScreenALaunched")),
3232
aggregator: .thetaSketch(
3333
.init(
34-
type: .thetaSketch,
3534
name: "cohort_1",
3635
fieldName: "clientUser"
3736
)
@@ -41,7 +40,6 @@ final class ExperimentQueryGenerationTests: XCTestCase {
4140
filter: .selector(.init(dimension: "type", value: "payScreenBLaunched")),
4241
aggregator: .thetaSketch(
4342
.init(
44-
type: .thetaSketch,
4543
name: "cohort_2",
4644
fieldName: "clientUser"
4745
)
@@ -51,7 +49,6 @@ final class ExperimentQueryGenerationTests: XCTestCase {
5149
filter: .selector(.init(dimension: "type", value: "paymentSucceeded")),
5250
aggregator: .thetaSketch(
5351
.init(
54-
type: .thetaSketch,
5552
name: "success",
5653
fieldName: "clientUser"
5754
)
@@ -64,7 +61,6 @@ final class ExperimentQueryGenerationTests: XCTestCase {
6461
])),
6562
aggregator: .thetaSketch(
6663
.init(
67-
type: .thetaSketch,
6864
name: "users",
6965
fieldName: "clientUser"
7066
)

Tests/QueryGenerationTests/FunnelQueryGenerationTests.swift

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ final class FunnelQueryGenerationTests: XCTestCase {
2626
filter: .selector(.init(dimension: "type", value: "appLaunchedRegularly")),
2727
aggregator: .thetaSketch(
2828
.init(
29-
type: AggregatorType.thetaSketch,
3029
name: "_funnel_step_0",
3130
fieldName: "clientUser"
3231
)
@@ -36,7 +35,6 @@ final class FunnelQueryGenerationTests: XCTestCase {
3635
filter: .selector(.init(dimension: "type", value: "dataEntered")),
3736
aggregator: .thetaSketch(
3837
.init(
39-
type: AggregatorType.thetaSketch,
4038
name: "_funnel_step_1",
4139
fieldName: "clientUser"
4240
)
@@ -46,7 +44,6 @@ final class FunnelQueryGenerationTests: XCTestCase {
4644
filter: .selector(.init(dimension: "type", value: "paywallSeen")),
4745
aggregator: .thetaSketch(
4846
.init(
49-
type: AggregatorType.thetaSketch,
5047
name: "_funnel_step_2",
5148
fieldName: "clientUser"
5249
)
@@ -56,7 +53,6 @@ final class FunnelQueryGenerationTests: XCTestCase {
5653
filter: .selector(.init(dimension: "type", value: "conversion")),
5754
aggregator: .thetaSketch(
5855
.init(
59-
type: AggregatorType.thetaSketch,
6056
name: "_funnel_step_3",
6157
fieldName: "clientUser"
6258
)

Tests/QueryGenerationTests/RetentionQueryGenerationTests.swift

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ final class RetentionQueryGenerationTests: XCTestCase {
3636
)),
3737
aggregator: .thetaSketch(
3838
.init(
39-
type: AggregatorType.thetaSketch,
4039
name: "_2022-08-01T00:00:00.000Z_2022-08-31T23:59:59.000Z",
4140
fieldName: "clientUser"
4241
)
@@ -54,7 +53,6 @@ final class RetentionQueryGenerationTests: XCTestCase {
5453
)),
5554
aggregator: .thetaSketch(
5655
.init(
57-
type: .thetaSketch,
5856
name: "_2022-09-01T00:00:00.000Z_2022-09-30T23:59:59.000Z",
5957
fieldName: "clientUser"
6058
)

Tests/QueryTests/AggregatorTests.swift

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ final class AggregatorTests: XCTestCase {
6868
func testThetaSketchAggregatorDecoding() throws {
6969
let decodedAggregators = try JSONDecoder.telemetryDecoder.decode([Aggregator].self, from: exampleDruidAggregatorsThetaSketch)
7070

71-
XCTAssertEqual(decodedAggregators, [Aggregator.thetaSketch(.init(type: .thetaSketch, name: "count", fieldName: "clientUser"))])
71+
XCTAssertEqual(decodedAggregators, [Aggregator.thetaSketch(.init(name: "count", fieldName: "clientUser"))])
7272
}
7373

7474
func testQuantilesDoublesSketchAggregator() throws {
@@ -134,7 +134,6 @@ final class AggregatorTests: XCTestCase {
134134
),
135135
aggregator: .thetaSketch(
136136
.init(
137-
type: .thetaSketch,
138137
name: "newSessionBegan",
139138
fieldName: "clientUser"
140139
)
@@ -158,7 +157,6 @@ final class AggregatorTests: XCTestCase {
158157
),
159158
aggregator: .thetaSketch(
160159
.init(
161-
type: .thetaSketch,
162160
name: "InsightShown",
163161
fieldName: "clientUser"
164162
)
@@ -180,7 +178,6 @@ final class AggregatorTests: XCTestCase {
180178
),
181179
aggregator: .thetaSketch(
182180
.init(
183-
type: .thetaSketch,
184181
name: "newSessionBegan",
185182
fieldName: "clientUser"
186183
)
@@ -204,7 +201,6 @@ final class AggregatorTests: XCTestCase {
204201
),
205202
aggregator: .thetaSketch(
206203
.init(
207-
type: .thetaSketch,
208204
name: "InsightShown",
209205
fieldName: "clientUser"
210206
)

Tests/QueryTests/HashingTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class HashingTests: XCTestCase {
191191
granularity: .day,
192192
aggregations: [
193193
.count(.init(name: "signals")),
194-
.thetaSketch(.init(type: .thetaSketch, name: "users", fieldName: "clientUser")),
194+
.thetaSketch(.init(name: "users", fieldName: "clientUser")),
195195
]
196196
)
197197

@@ -203,7 +203,7 @@ class HashingTests: XCTestCase {
203203
granularity: .day,
204204
aggregations: [
205205
.count(.init(name: "signals")),
206-
.thetaSketch(.init(type: .thetaSketch, name: "users", fieldName: "clientUser")),
206+
.thetaSketch(.init(name: "users", fieldName: "clientUser")),
207207
]
208208
)
209209

Tests/QueryTests/RetentionQueryTests.swift

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ class RetentionQueryTests: XCTestCase {
2222
)),
2323
aggregator: .thetaSketch(
2424
.init(
25-
type: AggregatorType.thetaSketch,
2625
name: "_august_clientUser_count",
2726
fieldName: "clientUser"
2827
)
@@ -40,7 +39,6 @@ class RetentionQueryTests: XCTestCase {
4039
)),
4140
aggregator: .thetaSketch(
4241
.init(
43-
type: .thetaSketch,
4442
name: "_september_clientUser_count",
4543
fieldName: "clientUser"
4644
)
@@ -58,7 +56,6 @@ class RetentionQueryTests: XCTestCase {
5856
)),
5957
aggregator: .thetaSketch(
6058
.init(
61-
type: .thetaSketch,
6259
name: "_october_clientUser_count",
6360
fieldName: "clientUser"
6461
)
@@ -76,7 +73,6 @@ class RetentionQueryTests: XCTestCase {
7673
)),
7774
aggregator: .thetaSketch(
7875
.init(
79-
type: .thetaSketch,
8076
name: "_november_clientUser_count",
8177
fieldName: "clientUser"
8278
)

Tests/QueryTests/VirtualColumnTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ final class VirtualColumnTests: XCTestCase {
4141
],
4242
granularity: .all,
4343
aggregations: [
44-
.thetaSketch(.init(type: .thetaSketch, name: "count", fieldName: "clientUser"))
44+
.thetaSketch(.init(name: "count", fieldName: "clientUser"))
4545
],
4646
threshold: 10,
4747
metric: .numeric(.init(metric: "count")),

0 commit comments

Comments
 (0)