Skip to content

Commit 24e35d1

Browse files
authored
[To rel/0.12][IOTDB-2624] Fix "overlapped data should be consumed first" occurs when executing query (#5154)
1 parent 2bd8500 commit 24e35d1

File tree

2 files changed

+229
-7
lines changed

2 files changed

+229
-7
lines changed

server/src/main/java/org/apache/iotdb/db/engine/merge/selector/MaxFileMergeFileSelector.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
240240
boolean noMoreOverlap = false;
241241
for (int i = 0; i < resource.getSeqFiles().size() && !noMoreOverlap; i++) {
242242
TsFileResource seqFile = resource.getSeqFiles().get(i);
243-
if (!seqFile.getDevices().contains(deviceId)) {
243+
if (tmpSelectedSeqFiles.contains(i) || !seqFile.getDevices().contains(deviceId)) {
244244
continue;
245245
}
246246

@@ -257,23 +257,17 @@ private void selectOverlappedSeqFiles(TsFileResource unseqFile) {
257257
} else if (!seqFile.isClosed()) {
258258
// we cannot make sure whether unclosed file has overlap or not, so we just add it.
259259
tmpSelectedSeqFiles.add(i);
260-
tmpSelectedNum++;
261260
} else if (unseqEndTime <= seqEndTime) {
262261
// if time range in unseq file is 10-20, seq file is 15-25, then select this seq file and
263262
// there is no more overlap later.
264263
tmpSelectedSeqFiles.add(i);
265-
tmpSelectedNum++;
266264
noMoreOverlap = true;
267265
} else if (unseqStartTime <= seqEndTime) {
268266
// if time range in unseq file is 10-20, seq file is 0-15, then select this seq file and
269267
// there may be overlap later.
270268
tmpSelectedSeqFiles.add(i);
271-
tmpSelectedNum++;
272269
}
273270
}
274-
if (tmpSelectedNum + seqSelectedNum == resource.getSeqFiles().size()) {
275-
break;
276-
}
277271
}
278272
}
279273

server/src/test/java/org/apache/iotdb/db/engine/merge/MaxFileMergeFileSelectorTest.java

+228
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
import org.apache.iotdb.db.engine.storagegroup.timeindex.ITimeIndex;
3030
import org.apache.iotdb.db.exception.MergeException;
3131
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
32+
import org.apache.iotdb.tsfile.read.common.Path;
33+
import org.apache.iotdb.tsfile.write.TsFileWriter;
34+
import org.apache.iotdb.tsfile.write.record.TSRecord;
35+
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
36+
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
3237

3338
import org.junit.Assert;
3439
import org.junit.Test;
@@ -633,4 +638,227 @@ public void testAllUnseqFilesOverlappedWithSeqFileOpen()
633638
Assert.assertEquals(3, result[0].size());
634639
Assert.assertEquals(2, result[1].size());
635640
}
641+
642+
@Test
643+
public void testMultiFileOverlapWithOneFile()
644+
throws IOException, WriteProcessException, MergeException {
645+
List<TsFileResource> seqList = new ArrayList<>();
646+
List<TsFileResource> unseqList = new ArrayList<>();
647+
// first file [0, 10]
648+
// first device [0, 5]
649+
// second device [0, 10]
650+
File firstFile =
651+
new File(
652+
TestConstant.OUTPUT_DATA_DIR.concat(
653+
1
654+
+ IoTDBConstant.FILE_NAME_SEPARATOR
655+
+ 1
656+
+ IoTDBConstant.FILE_NAME_SEPARATOR
657+
+ 0
658+
+ IoTDBConstant.FILE_NAME_SEPARATOR
659+
+ 0
660+
+ ".tsfile"));
661+
TsFileResource firstTsFileResource = new TsFileResource(firstFile);
662+
firstTsFileResource.setClosed(true);
663+
firstTsFileResource.setMinPlanIndex(1);
664+
firstTsFileResource.setMaxPlanIndex(1);
665+
firstTsFileResource.setVersion(1);
666+
seqList.add(firstTsFileResource);
667+
if (!firstFile.getParentFile().exists()) {
668+
Assert.assertTrue(firstFile.getParentFile().mkdirs());
669+
}
670+
671+
TsFileWriter fileWriter = new TsFileWriter(firstFile);
672+
for (String deviceId : deviceIds) {
673+
for (MeasurementSchema measurementSchema : measurementSchemas) {
674+
fileWriter.registerTimeseries(
675+
new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
676+
}
677+
}
678+
for (long i = 0; i < 10; ++i) {
679+
for (int j = 0; j < deviceNum; j++) {
680+
if (j == 3 && i > 5) {
681+
continue;
682+
}
683+
TSRecord record = new TSRecord(i, deviceIds[j]);
684+
for (int k = 0; k < measurementNum; ++k) {
685+
record.addTuple(
686+
DataPoint.getDataPoint(
687+
measurementSchemas[k].getType(),
688+
measurementSchemas[k].getMeasurementId(),
689+
String.valueOf(i)));
690+
}
691+
fileWriter.write(record);
692+
firstTsFileResource.updateStartTime(deviceIds[j], i);
693+
firstTsFileResource.updateEndTime(deviceIds[j], i);
694+
}
695+
}
696+
697+
fileWriter.flushAllChunkGroups();
698+
fileWriter.close();
699+
700+
// second file time range: [11, 20]
701+
// first measurement: [11, 20]
702+
// second measurement: [11, 20]
703+
File secondFile =
704+
new File(
705+
TestConstant.OUTPUT_DATA_DIR.concat(
706+
2
707+
+ IoTDBConstant.FILE_NAME_SEPARATOR
708+
+ 2
709+
+ IoTDBConstant.FILE_NAME_SEPARATOR
710+
+ 0
711+
+ IoTDBConstant.FILE_NAME_SEPARATOR
712+
+ 0
713+
+ ".tsfile"));
714+
TsFileResource secondTsFileResource = new TsFileResource(secondFile);
715+
secondTsFileResource.setClosed(true);
716+
secondTsFileResource.setMinPlanIndex(2);
717+
secondTsFileResource.setMaxPlanIndex(2);
718+
secondTsFileResource.setVersion(2);
719+
seqList.add(secondTsFileResource);
720+
721+
if (!secondFile.getParentFile().exists()) {
722+
Assert.assertTrue(secondFile.getParentFile().mkdirs());
723+
}
724+
fileWriter = new TsFileWriter(secondFile);
725+
for (String deviceId : deviceIds) {
726+
for (MeasurementSchema measurementSchema : measurementSchemas) {
727+
fileWriter.registerTimeseries(
728+
new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
729+
}
730+
}
731+
for (long i = 11; i < 21; ++i) {
732+
for (int j = 0; j < deviceNum; j++) {
733+
TSRecord record = new TSRecord(i, deviceIds[j]);
734+
for (int k = 0; k < measurementNum; k++) {
735+
record.addTuple(
736+
DataPoint.getDataPoint(
737+
measurementSchemas[k].getType(),
738+
measurementSchemas[k].getMeasurementId(),
739+
String.valueOf(i)));
740+
}
741+
fileWriter.write(record);
742+
secondTsFileResource.updateStartTime(deviceIds[j], i);
743+
secondTsFileResource.updateEndTime(deviceIds[j], i);
744+
}
745+
}
746+
fileWriter.flushAllChunkGroups();
747+
fileWriter.close();
748+
749+
// unseq file: [0, 1]
750+
File thirdFile =
751+
new File(
752+
TestConstant.OUTPUT_DATA_DIR.concat(
753+
3
754+
+ IoTDBConstant.FILE_NAME_SEPARATOR
755+
+ 3
756+
+ IoTDBConstant.FILE_NAME_SEPARATOR
757+
+ 0
758+
+ IoTDBConstant.FILE_NAME_SEPARATOR
759+
+ 0
760+
+ ".tsfile"));
761+
TsFileResource thirdTsFileResource = new TsFileResource(thirdFile);
762+
thirdTsFileResource.setClosed(true);
763+
thirdTsFileResource.setMinPlanIndex(3);
764+
thirdTsFileResource.setMaxPlanIndex(3);
765+
thirdTsFileResource.setVersion(3);
766+
unseqList.add(thirdTsFileResource);
767+
768+
if (!secondFile.getParentFile().exists()) {
769+
Assert.assertTrue(thirdFile.getParentFile().mkdirs());
770+
}
771+
fileWriter = new TsFileWriter(thirdFile);
772+
for (String deviceId : deviceIds) {
773+
for (MeasurementSchema measurementSchema : measurementSchemas) {
774+
fileWriter.registerTimeseries(
775+
new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
776+
}
777+
}
778+
for (long i = 0; i < 2; ++i) {
779+
for (int j = 0; j < deviceNum; j++) {
780+
TSRecord record = new TSRecord(i, deviceIds[j]);
781+
for (int k = 0; k < measurementNum; k++) {
782+
record.addTuple(
783+
DataPoint.getDataPoint(
784+
measurementSchemas[k].getType(),
785+
measurementSchemas[k].getMeasurementId(),
786+
String.valueOf(i)));
787+
}
788+
fileWriter.write(record);
789+
thirdTsFileResource.updateStartTime(deviceIds[j], i);
790+
thirdTsFileResource.updateEndTime(deviceIds[j], i);
791+
}
792+
}
793+
fileWriter.flushAllChunkGroups();
794+
fileWriter.close();
795+
796+
// unseq file: [6, 8]
797+
File fourthFile =
798+
new File(
799+
TestConstant.OUTPUT_DATA_DIR.concat(
800+
4
801+
+ IoTDBConstant.FILE_NAME_SEPARATOR
802+
+ 4
803+
+ IoTDBConstant.FILE_NAME_SEPARATOR
804+
+ 0
805+
+ IoTDBConstant.FILE_NAME_SEPARATOR
806+
+ 0
807+
+ ".tsfile"));
808+
TsFileResource fourthTsFileResource = new TsFileResource(fourthFile);
809+
fourthTsFileResource.setClosed(true);
810+
fourthTsFileResource.setMinPlanIndex(4);
811+
fourthTsFileResource.setMaxPlanIndex(4);
812+
fourthTsFileResource.setVersion(4);
813+
unseqList.add(fourthTsFileResource);
814+
815+
if (!fourthFile.getParentFile().exists()) {
816+
Assert.assertTrue(fourthFile.getParentFile().mkdirs());
817+
}
818+
fileWriter = new TsFileWriter(fourthFile);
819+
for (String deviceId : deviceIds) {
820+
for (MeasurementSchema measurementSchema : measurementSchemas) {
821+
fileWriter.registerTimeseries(
822+
new Path(deviceId, measurementSchema.getMeasurementId()), measurementSchema);
823+
}
824+
}
825+
for (long i = 6; i < 15; ++i) {
826+
for (int j = 0; j < deviceNum; j++) {
827+
if (j == 3) {
828+
continue;
829+
}
830+
TSRecord record = new TSRecord(i, deviceIds[j]);
831+
for (int k = 0; k < measurementNum; k++) {
832+
record.addTuple(
833+
DataPoint.getDataPoint(
834+
measurementSchemas[k].getType(),
835+
measurementSchemas[k].getMeasurementId(),
836+
String.valueOf(i)));
837+
}
838+
fileWriter.write(record);
839+
fourthTsFileResource.updateStartTime(deviceIds[j], i);
840+
fourthTsFileResource.updateEndTime(deviceIds[j], i);
841+
}
842+
}
843+
TSRecord record = new TSRecord(1, deviceIds[3]);
844+
for (int k = 0; k < measurementNum; k++) {
845+
record.addTuple(
846+
DataPoint.getDataPoint(
847+
measurementSchemas[k].getType(),
848+
measurementSchemas[k].getMeasurementId(),
849+
String.valueOf(1)));
850+
}
851+
fileWriter.write(record);
852+
fourthTsFileResource.updateStartTime(deviceIds[3], 1);
853+
fourthTsFileResource.updateEndTime(deviceIds[3], 1);
854+
fileWriter.flushAllChunkGroups();
855+
fileWriter.close();
856+
857+
MergeResource resource = new MergeResource(seqList, unseqList);
858+
IMergeFileSelector mergeFileSelector =
859+
new MaxFileMergeFileSelector(resource, 500 * 1024 * 1024);
860+
List[] result = mergeFileSelector.select();
861+
Assert.assertEquals(2, result[0].size());
862+
Assert.assertEquals(2, result[1].size());
863+
}
636864
}

0 commit comments

Comments
 (0)