Skip to content

Commit b9f3345

Browse files
authored
Add unit tests and benchmarks for SqlSegmentsMetadataQuery.retrieveUsedSegments (#18108)
Changes:
- Add utility methods to `TestDerbyConnector`
- Add `SqlSegmentsMetadataQueryBenchmark`
- Add new tests in `SqlSegmentsMetadataQueryTest`
1 parent 821769c commit b9f3345

6 files changed

Lines changed: 358 additions & 42 deletions

File tree

benchmarks/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
</parent>
3232

3333
<dependencies>
34+
<dependency>
35+
<groupId>org.jdbi</groupId>
36+
<artifactId>jdbi</artifactId>
37+
<scope>provided</scope>
38+
</dependency>
3439
<dependency>
3540
<groupId>org.openjdk.jmh</groupId>
3641
<artifactId>jmh-core</artifactId>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing;
21+
22+
import com.google.common.collect.ImmutableSet;
23+
import org.apache.druid.java.util.common.DateTimes;
24+
import org.apache.druid.java.util.common.granularity.Granularities;
25+
import org.apache.druid.java.util.common.parsers.CloseableIterator;
26+
import org.apache.druid.metadata.IndexerSqlMetadataStorageCoordinatorTestBase;
27+
import org.apache.druid.metadata.MetadataStorageTablesConfig;
28+
import org.apache.druid.metadata.SqlSegmentsMetadataQuery;
29+
import org.apache.druid.metadata.TestDerbyConnector;
30+
import org.apache.druid.segment.TestDataSource;
31+
import org.apache.druid.segment.TestHelper;
32+
import org.apache.druid.server.coordinator.CreateDataSegments;
33+
import org.apache.druid.timeline.DataSegment;
34+
import org.joda.time.DateTime;
35+
import org.joda.time.Interval;
36+
import org.openjdk.jmh.annotations.Benchmark;
37+
import org.openjdk.jmh.annotations.BenchmarkMode;
38+
import org.openjdk.jmh.annotations.Fork;
39+
import org.openjdk.jmh.annotations.Level;
40+
import org.openjdk.jmh.annotations.Measurement;
41+
import org.openjdk.jmh.annotations.Mode;
42+
import org.openjdk.jmh.annotations.OutputTimeUnit;
43+
import org.openjdk.jmh.annotations.Scope;
44+
import org.openjdk.jmh.annotations.Setup;
45+
import org.openjdk.jmh.annotations.State;
46+
import org.openjdk.jmh.annotations.TearDown;
47+
import org.openjdk.jmh.annotations.Warmup;
48+
import org.openjdk.jmh.infra.Blackhole;
49+
50+
import java.util.List;
51+
import java.util.Map;
52+
import java.util.Set;
53+
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Function;
55+
56+
@State(Scope.Benchmark)
57+
@Fork(value = 1)
58+
@Warmup(iterations = 1, time = 1)
59+
@Measurement(iterations = 20, time = 2)
60+
@BenchmarkMode({Mode.AverageTime})
61+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
62+
public class SqlSegmentsMetadataQueryBenchmark
63+
{
64+
65+
private static final DateTime JAN_1 = DateTimes.of("2025-01-01");
66+
private static final String V1 = JAN_1.toString();
67+
private static final List<DataSegment> WIKI_SEGMENTS_1000X100D
68+
= CreateDataSegments.ofDatasource(TestDataSource.WIKI)
69+
.forIntervals(100, Granularities.DAY)
70+
.withNumPartitions(1000)
71+
.startingAt(JAN_1)
72+
.withVersion(V1)
73+
.eachOfSizeInMb(500);
74+
75+
private TestDerbyConnector derbyConnector;
76+
77+
@Setup(Level.Trial)
78+
public void setup() throws Exception
79+
{
80+
this.derbyConnector = new TestDerbyConnector();
81+
derbyConnector.createDatabase();
82+
derbyConnector.createSegmentTable();
83+
insertSegments(WIKI_SEGMENTS_1000X100D.toArray(new DataSegment[0]));
84+
}
85+
86+
@TearDown(Level.Trial)
87+
public void tearDown() throws Exception
88+
{
89+
derbyConnector.tearDown();
90+
}
91+
92+
@Benchmark
93+
public void benchmarkRetrieveUsedSegments_returnAllSegments(Blackhole blackhole)
94+
{
95+
final Interval queryInterval = new Interval(JAN_1, JAN_1.plusDays(3));
96+
blackhole.consume(readAsSet(q -> q.retrieveUsedSegments(TestDataSource.WIKI, List.of(queryInterval))));
97+
}
98+
99+
@Benchmark
100+
public void benchmarkRetrieveUsedSegments_returnEmpty(Blackhole blackhole)
101+
{
102+
final Interval queryInterval = new Interval(JAN_1.minusDays(2), JAN_1.minusDays(1));
103+
blackhole.consume(readAsSet(q -> q.retrieveUsedSegments(TestDataSource.WIKI, List.of(queryInterval))));
104+
}
105+
106+
@Benchmark
107+
public void benchmarkRetrieveUsedSegments_returnFirstInterval(Blackhole blackhole)
108+
{
109+
final Interval queryInterval = new Interval(JAN_1, JAN_1.plusDays(1));
110+
blackhole.consume(readAsSet(q -> q.retrieveUsedSegments(TestDataSource.WIKI, List.of(queryInterval))));
111+
}
112+
113+
@Benchmark
114+
public void benchmarkRetrieveUsedSegments_returnLastInterval(Blackhole blackhole)
115+
{
116+
final Interval queryInterval = new Interval(JAN_1.plusDays(99), JAN_1.plusDays(100));
117+
blackhole.consume(readAsSet(q -> q.retrieveUsedSegments(TestDataSource.WIKI, List.of(queryInterval))));
118+
}
119+
120+
121+
@Benchmark
122+
public void benchmarkRetrieveUsedSegments_multipleIntervalsWithOverlaps(Blackhole blackhole)
123+
{
124+
List<Interval> intervals = List.of(
125+
new Interval(JAN_1, JAN_1.plusDays(3)),
126+
new Interval(JAN_1.plusDays(2), JAN_1.plusDays(17)),
127+
new Interval(JAN_1.plusDays(31), JAN_1.plusDays(36)),
128+
new Interval(JAN_1.plusDays(35), JAN_1.plusDays(54)),
129+
new Interval(JAN_1.plusDays(68), JAN_1.plusDays(98))
130+
);
131+
blackhole.consume(readAsSet(q -> q.retrieveUsedSegments(TestDataSource.WIKI, intervals)));
132+
}
133+
134+
private <T> Set<T> readAsSet(Function<SqlSegmentsMetadataQuery, CloseableIterator<T>> iterableReader)
135+
{
136+
final MetadataStorageTablesConfig tablesConfig = derbyConnector.getMetadataTablesConfig();
137+
138+
return derbyConnector.inReadOnlyTransaction((handle, status) -> {
139+
final SqlSegmentsMetadataQuery query =
140+
SqlSegmentsMetadataQuery.forHandle(handle, derbyConnector, tablesConfig, TestHelper.JSON_MAPPER);
141+
142+
try (CloseableIterator<T> iterator = iterableReader.apply(query)) {
143+
return ImmutableSet.copyOf(iterator);
144+
}
145+
});
146+
}
147+
148+
private void insertSegments(DataSegment... segments)
149+
{
150+
IndexerSqlMetadataStorageCoordinatorTestBase.insertUsedSegments(
151+
Set.of(segments),
152+
Map.of(),
153+
derbyConnector,
154+
TestHelper.JSON_MAPPER
155+
);
156+
}
157+
}

indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ public void before()
117117
emitter = new StubServiceEmitter();
118118
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
119119
testDerbyConnector = new TestDerbyConnector(
120-
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
121-
Suppliers.ofInstance(metadataStorageTablesConfig)
120+
new MetadataStorageConnectorConfig(),
121+
metadataStorageTablesConfig
122122
);
123123
final ObjectMapper objectMapper = new TestUtils().getTestObjectMapper();
124124
final SegmentSchemaManager segmentSchemaManager = new SegmentSchemaManager(

server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorTestBase.java

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,8 @@
3131
import org.apache.druid.java.util.common.StringUtils;
3232
import org.apache.druid.java.util.common.jackson.JacksonUtils;
3333
import org.apache.druid.java.util.common.parsers.CloseableIterator;
34-
import org.apache.druid.segment.SegmentSchemaMapping;
3534
import org.apache.druid.segment.TestDataSource;
3635
import org.apache.druid.segment.TestHelper;
37-
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
3836
import org.apache.druid.segment.metadata.FingerprintGenerator;
3937
import org.apache.druid.segment.metadata.SegmentSchemaManager;
4038
import org.apache.druid.segment.metadata.SegmentSchemaTestUtils;
@@ -332,14 +330,14 @@ protected List<DataSegment> createAndGetUsedYearSegments(final int startYear, fi
332330

333331
for (int year = startYear; year < endYear; year++) {
334332
segments.add(createSegment(
335-
Intervals.of("%d/%d", year, year + 1),
336-
"version",
337-
new LinearShardSpec(0))
333+
Intervals.of("%d/%d", year, year + 1),
334+
"version",
335+
new LinearShardSpec(0)
336+
)
338337
);
339338
}
340339
final Set<DataSegment> segmentsSet = new HashSet<>(segments);
341-
final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet, new SegmentSchemaMapping(
342-
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION));
340+
final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet, null);
343341
Assert.assertTrue(committedSegments.containsAll(segmentsSet));
344342

345343
return segments;
@@ -363,7 +361,15 @@ protected ImmutableList<DataSegment> retrieveUnusedSegments(
363361
tablesConfig,
364362
mapper
365363
)
366-
.retrieveUnusedSegments(TestDataSource.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
364+
.retrieveUnusedSegments(
365+
TestDataSource.WIKI,
366+
intervals,
367+
null,
368+
limit,
369+
lastSegmentId,
370+
sortOrder,
371+
maxUsedStatusLastUpdatedTime
372+
)) {
367373
return ImmutableList.copyOf(iterator);
368374
}
369375
}
@@ -383,7 +389,15 @@ protected ImmutableList<DataSegmentPlus> retrieveUnusedSegmentsPlus(
383389
(handle, status) -> {
384390
try (final CloseableIterator<DataSegmentPlus> iterator =
385391
SqlSegmentsMetadataQuery.forHandle(handle, derbyConnector, tablesConfig, mapper)
386-
.retrieveUnusedSegmentsPlus(TestDataSource.WIKI, intervals, null, limit, lastSegmentId, sortOrder, maxUsedStatusLastUpdatedTime)) {
392+
.retrieveUnusedSegmentsPlus(
393+
TestDataSource.WIKI,
394+
intervals,
395+
null,
396+
limit,
397+
lastSegmentId,
398+
sortOrder,
399+
maxUsedStatusLastUpdatedTime
400+
)) {
387401
return ImmutableList.copyOf(iterator);
388402
}
389403
}
@@ -393,11 +407,20 @@ protected ImmutableList<DataSegmentPlus> retrieveUnusedSegmentsPlus(
393407
protected void verifyContainsAllSegmentsPlus(
394408
List<DataSegment> expectedSegments,
395409
List<DataSegmentPlus> actualUnusedSegmentsPlus,
396-
DateTime usedStatusLastUpdatedTime)
410+
DateTime usedStatusLastUpdatedTime
411+
)
397412
{
398-
Map<SegmentId, DataSegment> expectedIdToSegment = expectedSegments.stream().collect(Collectors.toMap(DataSegment::getId, Function.identity()));
413+
Map<SegmentId, DataSegment> expectedIdToSegment = expectedSegments.stream()
414+
.collect(Collectors.toMap(
415+
DataSegment::getId,
416+
Function.identity()
417+
));
399418
Map<SegmentId, DataSegmentPlus> actualIdToSegmentPlus = actualUnusedSegmentsPlus.stream()
400-
.collect(Collectors.toMap(d -> d.getDataSegment().getId(), Function.identity()));
419+
.collect(Collectors.toMap(
420+
d -> d.getDataSegment()
421+
.getId(),
422+
Function.identity()
423+
));
401424
Assert.assertTrue(expectedIdToSegment.entrySet().stream().allMatch(e -> {
402425
DataSegmentPlus segmentPlus = actualIdToSegmentPlus.get(e.getKey());
403426
return segmentPlus != null
@@ -482,7 +505,11 @@ protected List<DataSegment> retrieveUsedSegments(MetadataStorageTablesConfig tab
482505
final String table = tablesConfig.getSegmentsTable();
483506
return derbyConnector.retryWithHandle(
484507
handle -> handle.createQuery("SELECT payload FROM " + table + " WHERE used = true ORDER BY id")
485-
.map((index, result, context) -> JacksonUtils.readValue(mapper, result.getBytes(1), DataSegment.class))
508+
.map((index, result, context) -> JacksonUtils.readValue(
509+
mapper,
510+
result.getBytes(1),
511+
DataSegment.class
512+
))
486513
.list()
487514
);
488515
}
@@ -497,7 +524,10 @@ protected List<String> retrieveUnusedSegmentIds(MetadataStorageTablesConfig tabl
497524
);
498525
}
499526

500-
protected Map<String, String> getSegmentsCommittedDuringReplaceTask(String taskId, MetadataStorageTablesConfig tablesConfig)
527+
protected Map<String, String> getSegmentsCommittedDuringReplaceTask(
528+
String taskId,
529+
MetadataStorageTablesConfig tablesConfig
530+
)
501531
{
502532
final String table = tablesConfig.getUpgradeSegmentsTable();
503533
return derbyConnector.retryWithHandle(handle -> {
@@ -523,7 +553,10 @@ protected Map<String, String> getSegmentsCommittedDuringReplaceTask(String taskI
523553
});
524554
}
525555

526-
protected void insertIntoUpgradeSegmentsTable(Map<DataSegment, ReplaceTaskLock> segmentToTaskLockMap, MetadataStorageTablesConfig tablesConfig)
556+
protected void insertIntoUpgradeSegmentsTable(
557+
Map<DataSegment, ReplaceTaskLock> segmentToTaskLockMap,
558+
MetadataStorageTablesConfig tablesConfig
559+
)
527560
{
528561
final String table = tablesConfig.getUpgradeSegmentsTable();
529562
derbyConnector.retryWithHandle(
@@ -562,6 +595,16 @@ public static void insertUsedSegments(
562595
TestDerbyConnector.DerbyConnectorRule derbyConnectorRule,
563596
ObjectMapper jsonMapper
564597
)
598+
{
599+
insertUsedSegments(dataSegments, upgradedFromSegmentIdMap, derbyConnectorRule.getConnector(), jsonMapper);
600+
}
601+
602+
public static void insertUsedSegments(
603+
Set<DataSegment> dataSegments,
604+
Map<String, String> upgradedFromSegmentIdMap,
605+
TestDerbyConnector connector,
606+
ObjectMapper jsonMapper
607+
)
565608
{
566609
final Set<DataSegmentPlus> usedSegments = new HashSet<>();
567610
for (DataSegment segment : dataSegments) {
@@ -579,7 +622,7 @@ public static void insertUsedSegments(
579622
);
580623
}
581624

582-
insertSegments(usedSegments, false, derbyConnectorRule, jsonMapper);
625+
insertSegments(usedSegments, false, connector, jsonMapper);
583626
}
584627

585628
public static void insertSegments(
@@ -589,8 +632,17 @@ public static void insertSegments(
589632
ObjectMapper jsonMapper
590633
)
591634
{
592-
final TestDerbyConnector connector = derbyConnectorRule.getConnector();
593-
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
635+
insertSegments(dataSegments, includeSchema, derbyConnectorRule.getConnector(), jsonMapper);
636+
}
637+
638+
public static void insertSegments(
639+
Set<DataSegmentPlus> dataSegments,
640+
boolean includeSchema,
641+
TestDerbyConnector connector,
642+
ObjectMapper jsonMapper
643+
)
644+
{
645+
final String table = connector.getMetadataTablesConfig().getSegmentsTable();
594646

595647
final String sql = getSegmentInsertSql(includeSchema, table, connector);
596648
connector.retryWithHandle(

0 commit comments

Comments
 (0)