This file was deleted.

@@ -170,7 +170,6 @@ trait SparkAdapter extends Serializable {
def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
schema: Schema,
globPaths: Array[StoragePath],
parameters: java.util.Map[String, String]): BaseRelation

/**
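For orientation, a minimal caller-side sketch of the trimmed createRelation signature. This is not code from the PR; sqlContext, metaClient, avroSchema and readOptions are placeholder identifiers.

```scala
// Hypothetical sketch only: invoking the adapter method after the globPaths parameter is removed.
// All identifiers below are placeholders for illustration.
import scala.collection.JavaConverters._

val readOptions = Map("hoodie.datasource.query.type" -> "snapshot")
val relation: BaseRelation = sparkAdapter.createRelation(
  sqlContext,        // active SQLContext
  metaClient,        // HoodieTableMetaClient built for the table base path
  avroSchema,        // Avro Schema describing the table
  readOptions.asJava // read options passed as java.util.Map[String, String]
)
```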
@@ -51,7 +51,6 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val userSchema: Option[StructType],
private val globPaths: Seq[StoragePath],
private val prunedDataSchema: Option[StructType] = None)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema)
with SparkAdapterSupport {
@@ -110,7 +109,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
val fileSlices = listLatestFileSlices(partitionFilters, dataFilters)
val fileSplits = fileSlices.flatMap { fileSlice =>
// TODO fix, currently assuming parquet as underlying format
val pathInfo: StoragePathInfo = fileSlice.getBaseFile.get.getPathInfo
@@ -139,7 +138,7 @@ case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
def toHadoopFsRelation: HadoopFsRelation = {
val enableFileIndex = HoodieSparkConfUtils.getConfigValue(optParams, sparkSession.sessionState.conf,
ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
if (enableFileIndex && globPaths.isEmpty) {
if (enableFileIndex) {
// NOTE: There are currently 2 ways partition values could be fetched:
// - Source columns (producing the values used for physical partitioning) will be read
// from the data file
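A hedged construction sketch reflecting the new four-argument constructor above; the argument values are placeholders and prunedDataSchema keeps its default.

```scala
// Sketch only: without glob paths, the relation always resolves files through the table's
// file index (see collectFileSplits above) before optionally converting to a HadoopFsRelation.
val relation = new BaseFileOnlyRelation(
  sqlContext,
  metaClient,
  optParams, // Map[String, String] of read options
  None       // userSchema: let Hudi resolve the table schema
)
val fsRelation: HadoopFsRelation = relation.toHadoopFsRelation
```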
@@ -79,6 +79,7 @@ object DataSourceReadOptions {
val REALTIME_PAYLOAD_COMBINE_OPT_VAL = HoodieReaderConfig.REALTIME_PAYLOAD_COMBINE
val REALTIME_MERGE: ConfigProperty[String] = HoodieReaderConfig.MERGE_TYPE

@Deprecated
val READ_PATHS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.paths")
.noDefaultValue()
@@ -88,6 +89,7 @@ object DataSourceReadOptions {
@Deprecated
val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_NAME

@Deprecated
val ENABLE_HOODIE_FILE_INDEX: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.file.index.enable")
.defaultValue(true)
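With hoodie.datasource.read.paths and hoodie.file.index.enable now deprecated, reads should target the table base path directly. A usage sketch, assuming an active SparkSession named spark; the path is a placeholder.

```scala
// Sketch only: load by table base path rather than the deprecated read-paths / glob options.
val df = spark.read
  .format("hudi")
  .load("/tmp/hudi_trips_table") // placeholder path; wildcard patterns are rejected (see DefaultSource below)
df.show(false)
```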
@@ -35,7 +35,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.io.storage.HoodieSparkIOFactory
import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage
import org.apache.hudi.util.{PathUtils, SparkConfigUtils}
import org.apache.hudi.util.SparkConfigUtils

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
@@ -107,27 +107,15 @@ class DefaultSource extends RelationProvider
val storage = HoodieStorageUtils.getStorage(
allPaths.head, HadoopFSUtils.getStorageConf(sqlContext.sparkContext.hadoopConfiguration))

val globPaths = if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
PathUtils.checkAndGlobPathIfNecessary(allPaths, storage)
} else {
Seq.empty
if (path.exists(_.contains("*")) || readPaths.nonEmpty) {
throw new HoodieException("Glob paths are not supported for read paths as of Hudi 1.2.0")
}

// Add default options for unspecified read options keys.
val parameters = (if (globPaths.nonEmpty) {
Map(
"glob.paths" -> globPaths.mkString(",")
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams)
val parameters = DataSourceOptionsHelper.parametersWithReadDefaults(sqlContext.getAllConfs.filter(k => k._1.startsWith("hoodie.")) ++ optParams)

// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
DataSourceUtils.getTablePath(storage, globPaths.asJava)
} else {
DataSourceUtils.getTablePath(storage, Seq(new StoragePath(path.get)).asJava)
}
val tablePath = DataSourceUtils.getTablePath(storage, Seq(new StoragePath(path.get)).asJava)
log.info("Obtained hudi table path: " + tablePath)

val metaClient = HoodieTableMetaClient.builder().setMetaserverConfig(parameters.toMap.asJava)
@@ -141,7 +129,7 @@ class DefaultSource extends RelationProvider
parameters
}

DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, options.toMap)
DefaultSource.createRelation(sqlContext, metaClient, schema, options.toMap)
}
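
The rejection of wildcard paths shown above is strict. A hedged sketch of the caller-visible behavior, assuming an active SparkSession named spark; the paths are placeholders.

```scala
// Sketch only: wildcard read paths now fail fast instead of being expanded.
val ok = spark.read.format("hudi").load("/data/hudi_table") // plain base path still works
// spark.read.format("hudi").load("/data/hudi_table/*/*")
//   => HoodieException: "Glob paths are not supported for read paths as of Hudi 1.2.0"
```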

/**
@@ -279,7 +267,6 @@ object DefaultSource {
def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
schema: StructType,
globPaths: Seq[StoragePath],
parameters: Map[String, String]): BaseRelation = {
val tableType = metaClient.getTableType
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
@@ -308,8 +295,10 @@
}

lazy val enableFileGroupReader = SparkConfigUtils
.getStringWithAltKeys(parameters, HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean &&
!metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
.getStringWithAltKeys(parameters, HoodieReaderConfig.FILE_GROUP_READER_ENABLED).toBoolean
if (!enableFileGroupReader) {
throw new IllegalArgumentException("File group reader is disabled")
}
lazy val tableVersion = if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
} else {
@@ -340,7 +329,7 @@ object DefaultSource {
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
} else {
resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
resolveBaseFileOnlyRelation(sqlContext, userSchema, metaClient, parameters)
}
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
(hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
@@ -357,15 +346,15 @@ object DefaultSource {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = false).build()
} else {
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, userSchema)
}

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
if (enableFileGroupReader) {
new HoodieMergeOnReadSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
} else {
HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
HoodieBootstrapMORRelation(sqlContext, userSchema, metaClient, parameters)
}

case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
@@ -383,7 +372,7 @@ object DefaultSource {
new HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(
sqlContext, metaClient, parameters, userSchema, isBootstrap = true).build()
} else {
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
resolveHoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters)
}

case (_, _, _) =>
@@ -395,7 +384,6 @@ object DefaultSource {
}

private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
globPaths: Seq[StoragePath],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
parameters: Map[String, String]): BaseRelation = {
@@ -404,20 +392,18 @@ object DefaultSource {
val isSchemaEvolutionEnabledOnRead = HoodieSparkConfUtils.getConfigValue(parameters,
sqlContext.sparkSession.sessionState.conf, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
|| globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
if (!enableFileIndex || isSchemaEvolutionEnabledOnRead || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
} else {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters).toHadoopFsRelation
HoodieBootstrapRelation(sqlContext, userSchema, metaClient, parameters).toHadoopFsRelation
}
}

private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[StoragePath],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
optParams: Map[String, String]): BaseRelation = {
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, globPaths)
val baseRelation = new BaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema)

// NOTE: We fallback to [[HadoopFsRelation]] in all of the cases except ones requiring usage of
// [[BaseFileOnlyRelation]] to function correctly. This is necessary to maintain performance parity w/
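Because the relation factories above now require the file group reader, here is a hedged configuration sketch, assuming an active SparkSession named spark and the standard key behind HoodieReaderConfig.FILE_GROUP_READER_ENABLED; the path is a placeholder.

```scala
// Sketch only: hoodie.file.group.reader.enabled must stay enabled for these read paths;
// explicitly disabling it now surfaces as IllegalArgumentException("File group reader is disabled").
val df = spark.read
  .format("hudi")
  .option("hoodie.file.group.reader.enabled", "true") // default value, shown explicitly
  .load("/data/hudi_table")                           // placeholder base path
```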
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig, HoodieMetadataConfig}
import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
@@ -33,7 +33,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineLayout}
import org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
@@ -55,7 +55,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SerializableWritable
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession, SQLContext}
@@ -412,18 +411,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*/
protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit]

protected def listLatestFileSlices(globPaths: Seq[StoragePath], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
protected def listLatestFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
queryTimestamp match {
case Some(ts) =>
specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t))

val partitionDirs = if (globPaths.isEmpty) {
fileIndex.listFiles(partitionFilters, dataFilters)
} else {
val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
}

val partitionDirs = fileIndex.listFiles(partitionFilters, dataFilters)
val fsView = new HoodieTableFileSystemView(
metaClient, timeline, sparkAdapter.getSparkPartitionedFileUtils.toFileStatuses(partitionDirs)
.map(fileStatus => HadoopFSUtils.convertToStoragePathInfo(fileStatus))
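A minimal subclass-side sketch of the simplified listing call; the empty filter sequences are placeholders for whatever filters Spark pushes down.

```scala
// Sketch only: callers pass just the pushed-down filters; partition directories now always
// come from the Hudi file index rather than a glob-backed HoodieInMemoryFileIndex.
val fileSlices: Seq[FileSlice] =
  listLatestFileSlices(partitionFilters = Seq.empty, dataFilters = Seq.empty)
```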
@@ -54,11 +54,10 @@ case class HoodieBootstrapMORSplit(dataFile: PartitionedFile, skeletonFile: Opti
*/
case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
private val userSchema: Option[StructType],
private val globPaths: Seq[StoragePath],
override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val prunedDataSchema: Option[StructType] = None)
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient,
extends BaseHoodieBootstrapRelation(sqlContext, userSchema, metaClient,
optParams, prunedDataSchema) {

override type Relation = HoodieBootstrapMORRelation
@@ -69,12 +68,8 @@ case class HoodieBootstrapMORRelation(override val sqlContext: SQLContext,
override lazy val mandatoryFields: Seq[String] = mandatoryFieldsForMerging

protected override def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
if (globPaths.isEmpty) {
fileIndex.listFileSlices(HoodieFileIndex.
convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)).values.flatten.toSeq
} else {
listLatestFileSlices(globPaths, partitionFilters, dataFilters)
}
fileIndex.listFileSlices(HoodieFileIndex.
convertFilterForTimestampKeyGenerator(metaClient, partitionFilters)).values.flatten.toSeq
}

protected override def createFileSplit(fileSlice: FileSlice, dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile]): FileSplit = {
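A hedged construction sketch matching the reduced parameter list above; the argument values are placeholders and prunedDataSchema keeps its default.

```scala
// Sketch only: the bootstrap MOR relation is now built without glob paths.
val morRelation = HoodieBootstrapMORRelation(
  sqlContext,
  None,       // userSchema: resolve the schema from the table
  metaClient,
  optParams   // Map[String, String] of read options
)
```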