Skip to content

Commit d6a6261

Browse files
committed
shard
1 parent 1fe82d9 commit d6a6261

5 files changed

Lines changed: 142 additions & 24 deletions

File tree

multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.google.common.base.Preconditions;
24+
import com.google.common.collect.ImmutableList;
2425
import com.google.common.collect.Iterables;
2526
import com.google.common.util.concurrent.Futures;
2627
import com.google.inject.Guice;
@@ -36,7 +37,9 @@
3637
import org.apache.druid.guice.annotations.Json;
3738
import org.apache.druid.indexer.TaskStatus;
3839
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
40+
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
3941
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
42+
import org.apache.druid.indexer.partitions.PartitionsSpec;
4043
import org.apache.druid.indexing.common.LockGranularity;
4144
import org.apache.druid.indexing.common.TaskToolbox;
4245
import org.apache.druid.indexing.common.TestUtils;
@@ -46,6 +49,7 @@
4649
import org.apache.druid.indexing.common.task.CompactionTaskRunBase;
4750
import org.apache.druid.indexing.common.task.IndexTask;
4851
import org.apache.druid.indexing.common.task.Tasks;
52+
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
4953
import org.apache.druid.java.util.common.Intervals;
5054
import org.apache.druid.java.util.common.Pair;
5155
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -76,6 +80,7 @@
7680
import org.apache.druid.segment.IndexSpec;
7781
import org.apache.druid.segment.QueryableIndexSegment;
7882
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
83+
import org.apache.druid.segment.indexing.TuningConfig;
7984
import org.apache.druid.segment.loading.AcquireSegmentAction;
8085
import org.apache.druid.segment.loading.AcquireSegmentResult;
8186
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -89,6 +94,7 @@
8994
import org.apache.druid.timeline.CompactionState;
9095
import org.apache.druid.timeline.DataSegment;
9196
import org.apache.druid.timeline.partition.NumberedShardSpec;
97+
import org.apache.druid.timeline.partition.ShardSpec;
9298
import org.joda.time.Interval;
9399
import org.junit.Assert;
94100
import org.junit.Assume;
@@ -581,18 +587,82 @@ public void testIncrementalCompaction() throws Exception
581587
Assert.assertEquals(1, resultPair2.rhs.getSegments().size());
582588
final DataSegment compactedSegment2 = Iterables.getOnlyElement(resultPair2.rhs.getSegments());
583589

584-
final List<DataSegment> usedSegments =
585-
coordinatorClient.fetchUsedSegments(DATA_SOURCE, List.of(Intervals.of("2014-01-01/2014-01-02"))).get();
590+
final List<String> usedSegments =
591+
coordinatorClient.fetchUsedSegments(DATA_SOURCE, List.of(Intervals.of("2014-01-01/2014-01-02")))
592+
.get()
593+
.stream()
594+
.map(DataSegment::toString)
595+
.collect(Collectors.toList());
586596
Assert.assertEquals(
587597
List.of(
588-
compactedSegment2,
598+
compactedSegment2.withShardSpec(new NumberedShardSpec(0, 2)).toString(),
599+
// shard spec in compactedSegment2 has been updated
589600
compactedSegment1.toBuilder()
590601
.shardSpec(new NumberedShardSpec(1, 2))
591602
.version(compactedSegment2.getVersion())
592-
.build() // compactedSegment1 has been upgraded with the new version & shardSpec
603+
.build()
604+
.toString() // compactedSegment1 has been upgraded with the new version & shardSpec
593605
), usedSegments);
594606
}
595607

608+
@Test
609+
public void testIncrementalCompactionRangePartition() throws Exception
610+
{
611+
List<String> rows = ImmutableList.of(
612+
"2014-01-01T00:00:10Z,a,1\n",
613+
"2014-01-01T00:00:10Z,b,2\n",
614+
"2014-01-01T00:00:10Z,c,3\n",
615+
"2014-01-01T01:00:20Z,a,1\n",
616+
"2014-01-01T01:00:20Z,b,2\n",
617+
"2014-01-01T01:00:20Z,c,3\n",
618+
"2014-01-01T02:00:30Z,a,1\n",
619+
"2014-01-01T02:00:30Z,b,2\n",
620+
"2014-01-01T02:00:30Z,c,3\n"
621+
);
622+
Assume.assumeTrue(lockGranularity == LockGranularity.TIME_CHUNK);
623+
Assume.assumeTrue("Incremental compaction depends on concurrent lock", useConcurrentLocks);
624+
verifyTaskSuccessRowsAndSchemaMatch(
625+
runTask(buildIndexTask(DEFAULT_PARSE_SPEC, rows, inputInterval, false)),
626+
9
627+
);
628+
629+
PartitionsSpec rangePartitionSpec = new DimensionRangePartitionsSpec(null, 3, List.of("dim"), false);
630+
TuningConfig tuningConfig = TuningConfigBuilder.forCompactionTask()
631+
.withMaxTotalRows(Long.MAX_VALUE)
632+
.withPartitionsSpec(rangePartitionSpec)
633+
.withForceGuaranteedRollup(true)
634+
.build();
635+
final CompactionTask compactionTask1 =
636+
compactionTaskBuilder(segmentGranularity).interval(inputInterval, true).tuningConfig(tuningConfig).build();
637+
638+
final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair1 = runTask(compactionTask1);
639+
verifyTaskSuccessRowsAndSchemaMatch(resultPair1, 9);
640+
Assert.assertEquals(3, resultPair1.rhs.getSegments().size());
641+
642+
Pair<TaskStatus, DataSegmentsWithSchemas> appendTask =
643+
runTask(buildIndexTask(DEFAULT_PARSE_SPEC, rows, inputInterval, true));
644+
verifyTaskSuccessRowsAndSchemaMatch(appendTask, 9);
645+
646+
List<SegmentDescriptor> uncompacted = appendTask.rhs.getSegments()
647+
.stream()
648+
.map(DataSegment::toDescriptor)
649+
.collect(Collectors.toList());
650+
final CompactionTask compactionTask2 =
651+
compactionTaskBuilder(segmentGranularity)
652+
.inputSpec(new CompactionIntervalSpec(inputInterval, uncompacted, null), true)
653+
.tuningConfig(tuningConfig)
654+
.build();
655+
final Pair<TaskStatus, DataSegmentsWithSchemas> resultPair2 = runTask(compactionTask2);
656+
verifyTaskSuccessRowsAndSchemaMatch(resultPair2, 9);
657+
Assert.assertEquals(3, resultPair2.rhs.getSegments().size());
658+
659+
final List<DataSegment> usedSegments =
660+
coordinatorClient.fetchUsedSegments(DATA_SOURCE, List.of(Intervals.of("2014-01-01/2014-01-02"))).get();
661+
Assert.assertEquals(6, usedSegments.size());
662+
final List<ShardSpec> shards = usedSegments.stream().map(DataSegment::getShardSpec).collect(Collectors.toList());
663+
Assert.assertEquals(Set.of("range"), shards.stream().map(ShardSpec::getType).collect(Collectors.toSet()));
664+
}
665+
596666
@Test
597667
public void testIncrementalCompactionOverlappingInterval() throws Exception
598668
{

processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,18 @@ public int getNumCorePartitions()
104104
return numCorePartitions;
105105
}
106106

107+
@Override
108+
public ShardSpec withPartitionNum(int partitionNum1)
109+
{
110+
return new DimensionRangeShardSpec(dimensions, start, end, partitionNum1, numCorePartitions);
111+
}
112+
113+
@Override
114+
public ShardSpec withCorePartitions(int partitions1)
115+
{
116+
return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, partitions1);
117+
}
118+
107119
public boolean isNumCorePartitionsUnknown()
108120
{
109121
return numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS;

processing/src/main/java/org/apache/druid/timeline/partition/NumberedShardSpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ public int getNumCorePartitions()
100100
return partitions;
101101
}
102102

103+
@Override
104+
public ShardSpec withPartitionNum(int partitionNum1)
105+
{
106+
return new NumberedShardSpec(partitionNum1, partitions);
107+
}
108+
109+
@Override
110+
public ShardSpec withCorePartitions(int partitions1)
111+
{
112+
return new NumberedShardSpec(partitionNum, partitions1);
113+
}
114+
103115
@Override
104116
public <T> PartitionChunk<T> createChunk(T obj)
105117
{

processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.fasterxml.jackson.annotation.JsonSubTypes;
2424
import com.fasterxml.jackson.annotation.JsonTypeInfo;
2525
import com.google.common.collect.RangeSet;
26+
import org.apache.druid.error.DruidException;
2627

2728
import java.util.List;
2829
import java.util.Map;
@@ -66,6 +67,16 @@ public interface ShardSpec
6667

6768
int getNumCorePartitions();
6869

70+
default ShardSpec withPartitionNum(int partitionNum1)
71+
{
72+
throw DruidException.defensive("ShardSpec[%s] does not implement withPartitionNum", this.getClass().toString());
73+
}
74+
75+
default ShardSpec withCorePartitions(int partitions)
76+
{
77+
throw DruidException.defensive("ShardSpec[%s] does not implement withCorePartitions", this.getClass().toString());
78+
}
79+
6980
/**
7081
* Returns the start root partition ID of the atomic update group which this segment belongs to.
7182
*
@@ -119,6 +130,7 @@ default short getAtomicUpdateGroupSize()
119130

120131
/**
121132
* if given domain ranges are not possible in this shard, return false; otherwise return true;
133+
*
122134
* @return possibility of in domain
123135
*/
124136
@JsonIgnore

server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -495,19 +495,16 @@ public SegmentPublishResult commitReplaceSegments(
495495
final SegmentPublishResult result = inReadWriteDatasourceTransaction(
496496
dataSource,
497497
transaction -> {
498-
final Set<DataSegment> segmentsToInsert = new HashSet<>(replaceSegments);
499-
500-
Set<DataSegmentPlus> upgradedSegments = createNewIdsOfAppendSegmentsAfterReplace(
498+
final Pair<Set<DataSegmentPlus>, Set<DataSegment>> newSegments = createNewSegmentsAfterReplace(
501499
dataSource,
502500
transaction,
503501
replaceSegments,
504502
locksHeldByReplaceTask
505503
);
506-
504+
final Set<DataSegment> segmentsToInsert = newSegments.rhs;
507505
Map<SegmentId, SegmentMetadata> upgradeSegmentMetadata = new HashMap<>();
508506
final Map<String, String> upgradedFromSegmentIdMap = new HashMap<>();
509-
for (DataSegmentPlus dataSegmentPlus : upgradedSegments) {
510-
segmentsToInsert.add(dataSegmentPlus.getDataSegment());
507+
for (DataSegmentPlus dataSegmentPlus : newSegments.lhs) {
511508
if (dataSegmentPlus.getSchemaFingerprint() != null && dataSegmentPlus.getNumRows() != null) {
512509
upgradeSegmentMetadata.put(
513510
dataSegmentPlus.getDataSegment().getId(),
@@ -1856,9 +1853,18 @@ protected Set<DataSegment> insertSegments(
18561853
}
18571854

18581855
/**
1859-
* Creates new versions of segments appended while a "REPLACE" task was in progress.
1856+
* Creates upgraded versions of segments that were appended while a REPLACE task was in progress.
1857+
* Upgraded segments get new intervals, versions, and partition numbers to maintain consistency
1858+
* with the version created by the REPLACE task.
1859+
*
1860+
* @param dataSource The datasource being modified
1861+
* @param transaction The segment metadata transaction
1862+
* @param replaceSegments Segments being committed by the REPLACE task
1863+
* @param locksHeldByReplaceTask Replace locks held by the task
1864+
* @return Pair of (upgraded segments, all segments to insert with updated shard specs)
1865+
* @throws DruidException if a replace interval partially overlaps an appended segment
18601866
*/
1861-
private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
1867+
private Pair<Set<DataSegmentPlus>, Set<DataSegment>> createNewSegmentsAfterReplace(
18621868
final String dataSource,
18631869
final SegmentMetadataTransaction transaction,
18641870
final Set<DataSegment> replaceSegments,
@@ -1868,15 +1874,12 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
18681874
// If a "REPLACE" task has locked an interval, it would commit some segments
18691875
// (or at least tombstones) in that interval (except in LEGACY_REPLACE ingestion mode)
18701876
if (replaceSegments.isEmpty() || locksHeldByReplaceTask.isEmpty()) {
1871-
return Collections.emptySet();
1877+
return Pair.of(Collections.emptySet(), Collections.emptySet());
18721878
}
18731879

1874-
// For each replace interval, find the number of core partitions and total partitions
1875-
final Map<Interval, Integer> intervalToNumCorePartitions = new HashMap<>();
1880+
// For each replace interval, find the current partition number
18761881
final Map<Interval, Integer> intervalToCurrentPartitionNum = new HashMap<>();
18771882
for (DataSegment segment : replaceSegments) {
1878-
intervalToNumCorePartitions.put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions());
1879-
18801883
int partitionNum = segment.getShardSpec().getPartitionNum();
18811884
intervalToCurrentPartitionNum.compute(
18821885
segment.getInterval(),
@@ -1895,12 +1898,12 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
18951898
= retrieveSegmentsById(dataSource, transaction, upgradeSegmentToLockVersion.keySet());
18961899

18971900
if (segmentsToUpgrade.isEmpty()) {
1898-
return Collections.emptySet();
1901+
return Pair.of(Collections.emptySet(), replaceSegments);
18991902
}
19001903

1901-
final Set<Interval> replaceIntervals = intervalToNumCorePartitions.keySet();
1902-
1904+
final Set<Interval> replaceIntervals = intervalToCurrentPartitionNum.keySet();
19031905
final Set<DataSegmentPlus> upgradedSegments = new HashSet<>();
1906+
final Set<DataSegment> segmentsToInsert = new HashSet<>(replaceSegments);
19041907
for (DataSegmentPlus oldSegmentMetadata : segmentsToUpgrade) {
19051908
// Determine interval of the upgraded segment
19061909
DataSegment oldSegment = oldSegmentMetadata.getDataSegment();
@@ -1935,9 +1938,7 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
19351938
newInterval,
19361939
(i, value) -> value == null ? 0 : value + 1
19371940
);
1938-
final int numCorePartitions = intervalToNumCorePartitions.get(newInterval);
1939-
ShardSpec shardSpec = new NumberedShardSpec(partitionNum, numCorePartitions);
1940-
1941+
final ShardSpec shardSpec = oldSegment.getShardSpec().withPartitionNum(partitionNum);
19411942
// Create upgraded segment with the correct interval, version and shard spec
19421943
String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
19431944
DataSegment dataSegment = DataSegment.builder(oldSegment)
@@ -1963,9 +1964,20 @@ private Set<DataSegmentPlus> createNewIdsOfAppendSegmentsAfterReplace(
19631964
oldSegmentMetadata.getIndexingStateFingerprint()
19641965
)
19651966
);
1967+
segmentsToInsert.add(dataSegment);
19661968
}
19671969

1968-
return upgradedSegments;
1970+
// update corePartitions in shard spec
1971+
return Pair.of(upgradedSegments, segmentsToInsert.stream().map(segment -> {
1972+
Integer partitionNum = intervalToCurrentPartitionNum.get(segment.getInterval());
1973+
if (!segment.isTombstone()
1974+
&& partitionNum != null
1975+
&& partitionNum + 1 != segment.getShardSpec().getNumCorePartitions()) {
1976+
return segment.withShardSpec(segment.getShardSpec().withCorePartitions(partitionNum + 1));
1977+
} else {
1978+
return segment;
1979+
}
1980+
}).collect(Collectors.toSet()));
19691981
}
19701982

19711983
/**

0 commit comments

Comments
 (0)