diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java index 181c2e440f3e..f2a87f6da3ed 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java @@ -549,11 +549,11 @@ private void extractTsFiles( && ( // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. - !resource.isClosed() - || mayTsFileContainUnprocessedData(resource) - && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) - && mayTsFileResourceOverlappedWithPattern(resource))) + resource.getProcessor().alreadyMarkedClosing() + && mayTsFileContainUnprocessedData(resource) + && isTsFileResourceOverlappedWithTimeRange(resource) + && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) + && mayTsFileResourceOverlappedWithPattern(resource))) .collect(Collectors.toList()); resourceList.addAll(sequenceTsFileResources); @@ -566,11 +566,11 @@ && mayTsFileResourceOverlappedWithPattern(resource))) && ( // Some resource may not be closed due to the control of // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. - !resource.isClosed() - || mayTsFileContainUnprocessedData(resource) - && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) - && mayTsFileResourceOverlappedWithPattern(resource))) + resource.getProcessor().alreadyMarkedClosing() + && mayTsFileContainUnprocessedData(resource) + && isTsFileResourceOverlappedWithTimeRange(resource) + && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) + && mayTsFileResourceOverlappedWithPattern(resource))) .collect(Collectors.toList()); resourceList.addAll(unsequenceTsFileResources); @@ -619,13 +619,13 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) { // "equals" max progressIndex must be transmitted to avoid data loss final ProgressIndex innerProgressIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex(); - return !innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose()) - && !innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose()); + return !innerProgressIndex.isAfter(resource.getMaxProgressIndex()) + && !innerProgressIndex.equals(resource.getMaxProgressIndex()); } // Some different tsFiles may share the same max progressIndex, thus tsFiles with an // "equals" max progressIndex must be transmitted to avoid data loss - return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()); + return !startIndex.isAfter(resource.getMaxProgressIndex()); } private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 861b72c923a7..86ddf8e7bda8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1294,6 +1294,11 @@ public Future asyncClose() { IMemTable tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable; try { + // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke + // flushing memTable in System module. + Future future = addAMemtableIntoFlushingList(tmpMemTable); + shouldClose = true; + PipeInsertionDataNodeListener.getInstance() .listenToTsFile( dataRegionInfo.getDataRegion().getDataRegionId(), @@ -1301,11 +1306,6 @@ public Future asyncClose() { tsFileResource, false, tmpMemTable.isTotallyGeneratedByPipe()); - - // When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke - // flushing memTable in System module. - Future future = addAMemtableIntoFlushingList(tmpMemTable); - shouldClose = true; return future; } catch (Exception e) { logger.error(