
Commit 65eb63e

[To rel/0.12][IOTDB-2641] Fix time range of TsFile resource overlaps after unseq compaction (#5155)
1 parent 24e35d1 commit 65eb63e
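
A note on the change (inferred from the diff and the new regression test): when a sequence file was rewritten during an unseq merge, unsequence points that fell after a series' last chunk but still inside the file's device-level time range were not merged into that file; they were later written into the next sequence file, whose rewritten time range could then start before the previous file's end time. The patch threads the current resource's end time into the chunk-merge path and widens the overflow check for the last chunk of each series. A minimal sketch of the widened condition, using illustrative names rather than the production signature (the real check is MergeUtils.isChunkOverflowed, shown in the diff below):

    // Sketch only; mirrors the boolean logic added to MergeUtils.isChunkOverflowed.
    static boolean belongsToCurrentFile(
        long unseqTimestamp, long chunkEndTime, boolean isLastChunk, long resourceEndTime) {
      // a point overlaps the file being rewritten if it falls inside the current
      // chunk, or (for the series' last chunk) anywhere up to the file's own
      // end time for this device
      return unseqTimestamp <= chunkEndTime
          || (isLastChunk && unseqTimestamp <= resourceEndTime);
    }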

File tree

4 files changed (+296 −5 lines)

server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java

+18-2
@@ -315,6 +315,7 @@ private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders)
       mergeFileWriter.startChunkGroup(deviceId);
       boolean dataWritten =
           mergeChunks(
+              deviceId,
               seqChunkMeta,
               isLastFile,
               fileSequenceReader,
@@ -343,6 +344,7 @@ private List<Integer> filterNoDataPaths(List[] seqChunkMeta, int seqFileIdx) {
 
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private boolean mergeChunks(
+      String deviceId,
       List<ChunkMetadata>[] seqChunkMeta,
       boolean isLastFile,
       TsFileSequenceReader reader,
@@ -396,6 +398,7 @@ private boolean mergeChunks(
                       unseqReaders,
                       currFile,
                       isLastFile,
+                      currFile.getEndTime(deviceId),
                       i)));
 
       if (Thread.interrupted()) {
@@ -451,6 +454,8 @@ private int mergeChunkV2(
       ChunkMetadata currMeta,
       boolean chunkOverflowed,
       boolean chunkTooSmall,
+      boolean isLastChunk,
+      long resourceEndTime,
       Chunk chunk,
       int lastUnclosedChunkPoint,
       int pathIdx,
@@ -500,7 +505,12 @@ private int mergeChunkV2(
       } else {
         // 3.2 SK is overflowed, uncompress sequence chunk and merge with unseq chunk, then write
         unclosedChunkPoint +=
-            writeChunkWithUnseq(chunk, chunkWriter, unseqReader, currMeta.getEndTime(), pathIdx);
+            writeChunkWithUnseq(
+                chunk,
+                chunkWriter,
+                unseqReader,
+                isLastChunk ? resourceEndTime + 1 : currMeta.getEndTime(),
+                pathIdx);
         mergedChunkNum.incrementAndGet();
       }
 
@@ -589,6 +599,7 @@ public class MergeChunkHeapTask implements Callable<Void> {
     private TsFileResource currFile;
     private boolean isLastFile;
     private int taskNum;
+    private long endTimeOfCurrentResource;
 
     private int totalSeriesNum;
 
@@ -601,6 +612,7 @@ public MergeChunkHeapTask(
         IPointReader[] unseqReaders,
         TsFileResource currFile,
         boolean isLastFile,
+        long endTimeOfCurrentResource,
         int taskNum) {
       this.chunkIdxHeap = chunkIdxHeap;
       this.metaListEntries = metaListEntries;
@@ -612,6 +624,7 @@ public MergeChunkHeapTask(
       this.isLastFile = isLastFile;
       this.taskNum = taskNum;
       this.totalSeriesNum = chunkIdxHeap.size();
+      this.endTimeOfCurrentResource = endTimeOfCurrentResource;
     }
 
     @Override
@@ -637,7 +650,8 @@ private void mergeChunkHeap() throws IOException, MetadataException {
         ChunkMetadata currMeta = metaListEntry.current();
         boolean isLastChunk = !metaListEntry.hasNext();
         boolean chunkOverflowed =
-            MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta);
+            MergeUtils.isChunkOverflowed(
+                currTimeValuePairs[pathIdx], currMeta, isLastChunk, endTimeOfCurrentResource);
         boolean chunkTooSmall =
             MergeUtils.isChunkTooSmall(
                 ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum);
@@ -651,6 +665,8 @@ private void mergeChunkHeap() throws IOException, MetadataException {
               currMeta,
               chunkOverflowed,
               chunkTooSmall,
+              isLastChunk,
+              endTimeOfCurrentResource,
               chunk,
               ptWrittens[pathIdx],
               pathIdx,
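
Alongside the new parameters, the time bound passed to writeChunkWithUnseq changes for a series' last chunk: it becomes the resource end time (the patch passes resourceEndTime + 1) instead of the chunk's own end time, so remaining unseq data inside the file's device time range is written into this file rather than carried over to the next one. An illustrative restatement of that bound selection (the helper is not part of the patch):

    // Illustrative only; mirrors the ternary added in mergeChunkV2.
    static long unseqWriteBound(boolean isLastChunk, long resourceEndTime, long chunkEndTime) {
      // for the series' last chunk, cover the file's whole remaining device time range
      return isLastChunk ? resourceEndTime + 1 : chunkEndTime;
    }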

server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java

+8-2
@@ -225,8 +225,14 @@ private static void collectUnseqChunks(
     }
   }
 
-  public static boolean isChunkOverflowed(TimeValuePair timeValuePair, ChunkMetadata metaData) {
-    return timeValuePair != null && timeValuePair.getTimestamp() <= metaData.getEndTime();
+  public static boolean isChunkOverflowed(
+      TimeValuePair timeValuePair,
+      ChunkMetadata metaData,
+      boolean isLastChunk,
+      long currentResourceEndTime) {
+    return timeValuePair != null
+        && (timeValuePair.getTimestamp() <= metaData.getEndTime()
+            || (isLastChunk && timeValuePair.getTimestamp() <= currentResourceEndTime));
   }
 
   public static boolean isChunkTooSmall(
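
Applied to the data prepared by the new test below (the first sequence file's first measurement stops at t = 89 while the device-level end time is 100, and the unseq file begins at t = 90), the extended check reports an overlap where the old one did not. A self-contained illustration with those numbers, using plain locals instead of IoTDB types:

    long unseqTimestamp = 90;    // first unseq point for the series
    long chunkEndTime = 89;      // end of the series' last chunk in the first seq file
    long resourceEndTime = 100;  // device-level end time of the first seq file
    boolean isLastChunk = true;

    boolean oldCheck = unseqTimestamp <= chunkEndTime;          // false: point deferred
    boolean newCheck = unseqTimestamp <= chunkEndTime
        || (isLastChunk && unseqTimestamp <= resourceEndTime);  // true: point merged here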

server/src/test/java/org/apache/iotdb/db/engine/merge/MergeManagerTest.java

+1-1
@@ -144,7 +144,7 @@ class FakedSubMergeTask extends MergeChunkHeapTask {
     private String progress = "0";
 
     public FakedSubMergeTask(int serialNum) {
-      super(new PriorityQueue<>(), null, null, null, null, null, null, false, serialNum);
+      super(new PriorityQueue<>(), null, null, null, null, null, null, false, 0, serialNum);
       this.serialNum = serialNum;
    }
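
The test change above is mechanical: FakedSubMergeTask extends MergeChunkHeapTask, whose constructor gained the endTimeOfCurrentResource parameter, so the fake now passes 0 for it.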

server/src/test/java/org/apache/iotdb/db/engine/merge/MergeVerificationTest.java

+269 −0
@@ -0,0 +1,269 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iotdb.db.engine.merge;

import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.merge.manage.MergeResource;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class MergeVerificationTest extends MergeTest {

  private int prevMergeChunkThreshold;
  File tempSGDir;

  @Before
  public void setUp() throws MetadataException, IOException, WriteProcessException {
    IoTDB.metaManager.init();
    MergeManager.getINSTANCE().start();
    prevMergeChunkThreshold =
        IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold();
    IoTDBDescriptor.getInstance().getConfig().setMergeChunkPointNumberThreshold(-1);
    this.measurementNum = 2;
    this.deviceNum = 1;
    prepareSeries();
    this.prepareFiles();
    tempSGDir = new File(TestConstant.OUTPUT_DATA_DIR);
    if (!tempSGDir.exists()) {
      Assert.assertTrue(tempSGDir.mkdirs());
    }
  }

  @After
  public void tearDown() throws StorageEngineException, IOException {
    IoTDBDescriptor.getInstance()
        .getConfig()
        .setMergeChunkPointNumberThreshold(prevMergeChunkThreshold);
    super.tearDown();
  }

  private void prepareFiles() throws IOException, WriteProcessException {
    // first file time range: [0, 100]
    // first measurement: [0, 90]
    // second measurement: [0, 100]
    File firstFile =
        new File(
            TestConstant.OUTPUT_DATA_DIR.concat(
                1
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 1
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 0
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 0
                    + ".tsfile"));
    TsFileResource firstTsFileResource = new TsFileResource(firstFile);
    firstTsFileResource.setClosed(true);
    firstTsFileResource.setMinPlanIndex(1);
    firstTsFileResource.setMaxPlanIndex(1);
    firstTsFileResource.setVersion(1);
    seqResources.add(firstTsFileResource);
    if (!firstFile.getParentFile().exists()) {
      Assert.assertTrue(firstFile.getParentFile().mkdirs());
    }

    TsFileWriter fileWriter = new TsFileWriter(firstFile);
    for (String deviceId : deviceIds) {
      for (MeasurementSchema measurementSchema : measurementSchemas) {
        fileWriter.registerTimeseries(
            new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
      }
    }
    for (long i = 0; i < 100; ++i) {
      for (int j = 0; j < deviceNum; j++) {
        TSRecord record = new TSRecord(i, deviceIds[j]);
        if (i < 90) {
          record.addTuple(
              DataPoint.getDataPoint(
                  measurementSchemas[0].getType(),
                  measurementSchemas[0].getMeasurementId(),
                  String.valueOf(i)));
        }
        record.addTuple(
            DataPoint.getDataPoint(
                measurementSchemas[1].getType(),
                measurementSchemas[1].getMeasurementId(),
                String.valueOf(i)));
        fileWriter.write(record);
        firstTsFileResource.updateStartTime(deviceIds[j], i);
        firstTsFileResource.updateEndTime(deviceIds[j], i);
      }
    }
    for (int j = 0; j < deviceNum; j++) {
      TSRecord record = new TSRecord(100L, deviceIds[j]);
      record.addTuple(
          DataPoint.getDataPoint(
              measurementSchemas[1].getType(),
              measurementSchemas[1].getMeasurementId(),
              String.valueOf(100)));
      fileWriter.write(record);
      firstTsFileResource.updateStartTime(deviceIds[j], 100L);
      firstTsFileResource.updateEndTime(deviceIds[j], 100L);
    }

    fileWriter.flushAllChunkGroups();
    fileWriter.close();

    // second file time range: [101, 200]
    // first measurement: [101, 200]
    // second measurement: [101, 200]
    File secondFile =
        new File(
            TestConstant.OUTPUT_DATA_DIR.concat(
                2
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 2
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 0
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 0
                    + ".tsfile"));
    TsFileResource secondTsFileResource = new TsFileResource(secondFile);
    secondTsFileResource.setClosed(true);
    secondTsFileResource.setMinPlanIndex(2);
    secondTsFileResource.setMaxPlanIndex(2);
    secondTsFileResource.setVersion(2);
    seqResources.add(secondTsFileResource);

    if (!secondFile.getParentFile().exists()) {
      Assert.assertTrue(secondFile.getParentFile().mkdirs());
    }
    fileWriter = new TsFileWriter(secondFile);
    for (String deviceId : deviceIds) {
      for (MeasurementSchema measurementSchema : measurementSchemas) {
        fileWriter.registerTimeseries(
            new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
      }
    }
    for (long i = 101; i < 201; ++i) {
      for (int j = 0; j < deviceNum; j++) {
        TSRecord record = new TSRecord(i, deviceIds[j]);
        for (int k = 0; k < measurementNum; k++) {
          record.addTuple(
              DataPoint.getDataPoint(
                  measurementSchemas[k].getType(),
                  measurementSchemas[k].getMeasurementId(),
                  String.valueOf(i)));
        }
        fileWriter.write(record);
        secondTsFileResource.updateStartTime(deviceIds[j], i);
        secondTsFileResource.updateEndTime(deviceIds[j], i);
      }
    }
    fileWriter.flushAllChunkGroups();
    fileWriter.close();

    // unseq file: [90, 110]
    // first measurement: [90, 110]
    // second measurement: [90, 110]
    File thirdFile =
        new File(
            TestConstant.OUTPUT_DATA_DIR.concat(
                3
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 3
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 0
                    + IoTDBConstant.FILE_NAME_SEPARATOR
                    + 0
                    + ".tsfile"));
    TsFileResource thirdTsFileResource = new TsFileResource(thirdFile);
    thirdTsFileResource.setClosed(true);
    thirdTsFileResource.setMinPlanIndex(2);
    thirdTsFileResource.setMaxPlanIndex(2);
    thirdTsFileResource.setVersion(2);
    unseqResources.add(thirdTsFileResource);

    if (!secondFile.getParentFile().exists()) {
      Assert.assertTrue(thirdFile.getParentFile().mkdirs());
    }
    fileWriter = new TsFileWriter(thirdFile);
    for (String deviceId : deviceIds) {
      for (MeasurementSchema measurementSchema : measurementSchemas) {
        fileWriter.registerTimeseries(
            new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
      }
    }
    for (long i = 90; i < 111; ++i) {
      for (int j = 0; j < deviceNum; j++) {
        TSRecord record = new TSRecord(i, deviceIds[j]);
        for (int k = 0; k < measurementNum; k++) {
          record.addTuple(
              DataPoint.getDataPoint(
                  measurementSchemas[k].getType(),
                  measurementSchemas[k].getMeasurementId(),
                  String.valueOf(i)));
        }
        fileWriter.write(record);
        thirdTsFileResource.updateStartTime(deviceIds[j], i);
        thirdTsFileResource.updateEndTime(deviceIds[j], i);
      }
    }
    fileWriter.flushAllChunkGroups();
    fileWriter.close();
  }

  @Test
  public void testNotOverlapAfterMerge() throws Exception {
    MergeTask mergeTask =
        new MergeTask(
            new MergeResource(seqResources, unseqResources),
            tempSGDir.getPath(),
            (k, v, l) -> {},
            "test",
            true,
            1,
            MERGE_TEST_SG);
    mergeTask.call();
    Map<String, Long> deviceToEndTimeMap = new HashMap<>();
    for (TsFileResource resource : seqResources) {
      for (String deviceId : deviceIds) {
        long currentEndTime = resource.getEndTime(deviceId);
        long currentStartTime = resource.getStartTime(deviceId);
        if (deviceToEndTimeMap.containsKey(deviceId)) {
          Assert.assertTrue(deviceToEndTimeMap.get(deviceId) < currentStartTime);
        }
        deviceToEndTimeMap.put(deviceId, currentEndTime);
      }
    }
  }
}
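
The regression test builds two sequence files covering [0, 100] and [101, 200] plus an unseq file covering [90, 110], runs a full MergeTask, and then walks the sequence resources in order, asserting for every device that each resource's end time is strictly smaller than the next resource's start time. With the fix, unseq points up to t = 100 are folded into the first rewritten file and the rest are merged into the second, so the per-device time ranges stay disjoint.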
