Skip to content

Commit c0a5e09

Browse files
committed
[core] Disable PARTITION_IDLE_TIME_TO_REPORT_STATISTIC by default
1 parent 19cf86d commit c0a5e09

File tree

3 files changed

+16
-34
lines changed

3 files changed

+16
-34
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@
703703
</tr>
704704
<tr>
705705
<td><h5>partition.idle-time-to-report-statistic</h5></td>
706-
<td style="word-wrap: break-word;">1 h</td>
706+
<td style="word-wrap: break-word;">0 ms</td>
707707
<td>Duration</td>
708708
<td>Set a time duration when a partition has no new data after this time duration, start to report the partition statistics to hms.</td>
709709
</tr>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1640,7 +1640,7 @@ public class CoreOptions implements Serializable {
16401640
public static final ConfigOption<Duration> PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
16411641
key("partition.idle-time-to-report-statistic")
16421642
.durationType()
1643-
.defaultValue(Duration.ofHours(1))
1643+
.defaultValue(Duration.ofMillis(0))
16441644
.withDescription(
16451645
"Set a time duration when a partition has no new data after this time duration, "
16461646
+ "start to report the partition statistics to hms.");

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ import org.apache.paimon.deletionvectors.DeletionVector
2727
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
2828
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
2929
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
30-
import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
31-
import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkRow, SparkTableWrite, SparkTypeUtils}
30+
import org.apache.paimon.manifest.FileKind
31+
import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
3232
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, ROW_KIND_COL}
3333
import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
3434
import org.apache.paimon.spark.util.SparkRowUtils
@@ -40,10 +40,8 @@ import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils
4040

4141
import org.apache.spark.{Partitioner, TaskContext}
4242
import org.apache.spark.rdd.RDD
43-
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
44-
import org.apache.spark.sql.catalyst.InternalRow
43+
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
4544
import org.apache.spark.sql.functions._
46-
import org.apache.spark.sql.types.StructType
4745
import org.slf4j.LoggerFactory
4846

4947
import java.io.IOException
@@ -59,6 +57,15 @@ case class PaimonSparkWriter(table: FileStoreTable) {
5957

6058
private lazy val bucketMode = table.bucketMode
6159

60+
private lazy val disableReportStats = {
61+
val options = table.coreOptions()
62+
val config = options.toConfiguration
63+
config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
64+
table.partitionKeys.isEmpty ||
65+
!options.partitionedTableInMetastore ||
66+
table.catalogEnvironment.partitionHandler() == null
67+
}
68+
6269
private lazy val log = LoggerFactory.getLogger(classOf[PaimonSparkWriter])
6370

6471
@transient private lazy val serializer = new CommitMessageSerializer
@@ -326,37 +333,12 @@ case class PaimonSparkWriter(table: FileStoreTable) {
326333
.map(deserializeCommitMessage(serializer, _))
327334
}
328335

329-
def buildCommitMessageFromIndexManifestEntry(
330-
indexManifestEntries: Seq[IndexManifestEntry]): Seq[CommitMessage] = {
331-
indexManifestEntries
332-
.groupBy(entry => (entry.partition(), entry.bucket()))
333-
.map {
334-
case ((partition, bucket), entries) =>
335-
val (added, removed) = entries.partition(_.kind() == FileKind.ADD)
336-
new CommitMessageImpl(
337-
partition,
338-
bucket,
339-
null,
340-
DataIncrement.emptyIncrement(),
341-
CompactIncrement.emptyIncrement(),
342-
new IndexIncrement(added.map(_.indexFile()).asJava, removed.map(_.indexFile()).asJava))
343-
}
344-
.toSeq
345-
}
346-
347336
private def reportToHms(messages: Seq[CommitMessage]): Unit = {
348-
val options = table.coreOptions()
349-
val config = options.toConfiguration
350-
351-
if (
352-
config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis <= 0 ||
353-
table.partitionKeys.isEmpty ||
354-
!options.partitionedTableInMetastore ||
355-
table.catalogEnvironment.partitionHandler() == null
356-
) {
337+
if (disableReportStats) {
357338
return
358339
}
359340

341+
val options = table.coreOptions()
360342
val partitionComputer = new InternalRowPartitionComputer(
361343
options.partitionDefaultName,
362344
table.schema.logicalPartitionType,

0 commit comments

Comments (0)