diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala
index b895a533992..b6b1eef9281 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/automl/TuneHyperparameters.scala
@@ -17,7 +17,6 @@ import org.apache.spark.ml.classification.ClassificationModel
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.regression.RegressionModel
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.types.StructType

@@ -149,9 +148,21 @@ class TuneHyperparameters(override val uid: String) extends Estimator[TuneHyperp
   override def fit(dataset: Dataset[_]): TuneHyperparametersModel = { //scalastyle:ignore cyclomatic.complexity
     logFit({
       val sparkSession = dataset.sparkSession
-      val splits = MLUtils.kFold(dataset.toDF.rdd, getNumFolds, getSeed)
+      import org.apache.spark.sql.functions.{rand, lit}
+      val df = dataset.toDF
+      val nFolds = getNumFolds
+      // DataFrame-based k-fold split: bucket a seeded random draw into nFolds bins
+      val dfWithFold = df.withColumn("_kfold_rand", rand(getSeed))
+      val splits = (0 until nFolds).map { fold =>
+        val training = dfWithFold
+          .filter((dfWithFold("_kfold_rand") * lit(nFolds)).cast("int") =!= lit(fold))
+          .drop("_kfold_rand")
+        val validation = dfWithFold
+          .filter((dfWithFold("_kfold_rand") * lit(nFolds)).cast("int") === lit(fold))
+          .drop("_kfold_rand")
+        (training, validation)
+      }.toArray
       val hyperParams = getParamSpace.paramMaps
-      val schema = dataset.schema
       val executionContext = getExecutionContext
       val (evaluationMetricColumnName, operator): (String, Ordering[Double]) =
         EvaluationUtils.getMetricWithOperator(getModels.head, getEvaluationMetric)
@@ -163,8 +174,8 @@ class TuneHyperparameters(override val uid: String) extends Estimator[TuneHyperp
     val numModels = getModels.length
     val metrics = splits.zipWithIndex.map { case ((training, validation), _) =>
-      val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
-      val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
+      val trainingDataset = training.cache()
+      val validationDataset = validation.cache()
       val modelParams = ListBuffer[ParamMap]()
       for (n <- 0 until getNumRuns) {
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/SyntheticEstimator.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/SyntheticEstimator.scala
index 06fbbe20412..be0862811fe 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/SyntheticEstimator.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/SyntheticEstimator.scala
@@ -218,14 +218,8 @@ object SyntheticEstimator {
   }

   private[causal] def assignRowIndex(df: DataFrame, colName: String): DataFrame = {
-    df.sparkSession.createDataFrame(
-      df.rdd.zipWithIndex.map(element =>
-        Row.fromSeq(Seq(element._2) ++ element._1.toSeq)
-      ),
-      StructType(
-        Array(StructField(colName, LongType, nullable = false)) ++ df.schema.fields
-      )
-    )
+    df.withColumn(colName, monotonically_increasing_id())
+      .select(col(colName) +: df.columns.map(col): _*)
   }

   private[causal] def createIndex(data: DataFrame, inputCol: String, indexCol: String): DataFrame = {
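A note on the assignRowIndex change above: unlike the removed zipWithIndex path, monotonically_increasing_id() guarantees unique, increasing ids but not consecutive ones, since the partition id is encoded in the upper bits. If any downstream consumer of this index column relies on a dense 0..n-1 numbering, a row_number-based variant restores it at the cost of pushing the ordering through a single-partition window. The sketch below is illustrative only (hypothetical helper and temporary column names), not part of the patch:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, monotonically_increasing_id, row_number}

    // Dense, zero-based row indices (the previous zipWithIndex semantics).
    // The global Window.orderBy funnels all rows through one partition, so prefer
    // monotonically_increasing_id() whenever contiguity is not actually required.
    def assignDenseRowIndex(df: DataFrame, colName: String): DataFrame = {
      val tmp = "_mono_id"
      val window = Window.orderBy(col(tmp))
      df.withColumn(tmp, monotonically_increasing_id())
        .withColumn(colName, (row_number().over(window) - 1).cast("long"))
        .drop(tmp)
        .select(col(colName) +: df.columns.map(col): _*)
    }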
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/linalg/VectorOps.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/linalg/VectorOps.scala
index db284948952..379ccb31e0b 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/linalg/VectorOps.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/causal/linalg/VectorOps.scala
@@ -96,10 +96,8 @@ object DVectorOps extends VectorOps[DVector] {
   def make(size: Long, value: => Double): DVector = {
     val spark = SparkSession.active
     import spark.implicits._
-    val data = 0L until size
     spark
-      .sparkContext
-      .parallelize(data)
+      .range(0, size)
       .toDF("i")
       .withColumn("value", lit(value))
       .as[VectorEntry]
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/utils/ClusterUtil.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/utils/ClusterUtil.scala
index 72a748708b6..f3a8a9b15bc 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/core/utils/ClusterUtil.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/core/utils/ClusterUtil.scala
@@ -5,9 +5,7 @@ package com.microsoft.azure.synapse.ml.core.utils
 import java.net.InetAddress
 import org.apache.http.conn.util.InetAddressUtils
-import org.apache.spark.SparkContext
 import org.apache.spark.injections.BlockManagerUtils
-import org.apache.spark.sql.functions.typedLit
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.slf4j.Logger

@@ -20,7 +18,7 @@ object ClusterUtil {
    * @return The number of tasks per executor.
    */
   def getNumTasksPerExecutor(spark: SparkSession, log: Logger): Int = {
-    val confTaskCpus = getTaskCpus(spark.sparkContext, log)
+    val confTaskCpus = getTaskCpus(spark, log)
     try {
       val confCores = spark.sparkContext.getConf.get("spark.executor.cores").toInt
       val tasksPerExec = confCores / confTaskCpus
@@ -44,13 +42,18 @@ object ClusterUtil {
    * @return The number of rows per partition (where partitionId is the array index).
    */
   def getNumRowsPerPartition(df: DataFrame, labelCol: Column): Array[Long] = {
-    val indexedRowCounts: Array[(Int, Long)] = df
-      .select(typedLit(0.toByte))
-      .rdd
-      .mapPartitionsWithIndex({case (i,rows) => Iterator((i,rows.size.toLong))}, true)
+    import org.apache.spark.sql.functions.{spark_partition_id, count, lit}
+    val partitionCounts = df
+      .select(spark_partition_id().as("partId"))
+      .groupBy("partId")
+      .agg(count(lit(1)).as("cnt"))
       .collect()
-    // Get an array where the index is implicitly the partition id
-    indexedRowCounts.sortBy(pair => pair._1).map(pair => pair._2)
+    val maxPartId = if (partitionCounts.isEmpty) 0 else partitionCounts.map(_.getInt(0)).max + 1
+    val result = Array.fill[Long](maxPartId)(0L)
+    partitionCounts.foreach { row =>
+      result(row.getInt(0)) = row.getLong(1)
+    }
+    result
   }

   /** Get number of default cores from sparkSession(required) or master(optional) for 1 executor.
@@ -104,9 +107,9 @@ object ClusterUtil {
     }
   }

-  def getTaskCpus(sparkContext: SparkContext, log: Logger): Int = {
+  def getTaskCpus(spark: SparkSession, log: Logger): Int = {
     try {
-      val taskCpusConfig = sparkContext.getConf.getOption("spark.task.cpus")
+      val taskCpusConfig = spark.sparkContext.getConf.getOption("spark.task.cpus")
       if (taskCpusConfig.isEmpty) {
         log.info("ClusterUtils did not detect spark.task.cpus config set, using default 1 instead")
       }
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingEvaluator.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingEvaluator.scala
index 93ee686c5bd..c7b21a94c0a 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingEvaluator.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/RankingEvaluator.scala
@@ -130,6 +130,7 @@ class RankingEvaluator(override val uid: String)
   /** @group setParam */
   def setPredictionCol(value: String): this.type = set(predictionCol, value)

+  // Note: RankingMetrics from MLlib requires RDD input - .rdd conversion is necessary
   def getMetrics(dataset: Dataset[_]): AdvancedRankingMetrics = {
     val predictionAndLabels = dataset
       .select(getPredictionCol, getLabelCol)
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala
index 83a5de4022b..0347ff4008e 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala
@@ -104,6 +104,7 @@ class SARModel(override val uid: String) extends Model[SARModel]
                            dstOutputColumn: String,
                            num: Int): DataFrame = {

+    // Note: CoordinateMatrix from MLlib requires RDD input - .rdd conversion is necessary
     def dfToRDDMatrxEntry(dataframe: DataFrame) = {
       dataframe.rdd
         .flatMap(row =>
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala
index 8be7ba1700a..57bae22c2f4 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Lambda.scala
@@ -6,7 +6,6 @@ package com.microsoft.azure.synapse.ml.stages
 import com.microsoft.azure.synapse.ml.codegen.Wrappable
 import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
 import com.microsoft.azure.synapse.ml.param.UDFParam
-import org.apache.spark.SparkContext
 import org.apache.spark.injections.UDFUtils
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.Identifiable
@@ -59,8 +58,8 @@ class Lambda(val uid: String) extends Transformer with Wrappable with ComplexPar

   def transformSchema(schema: StructType): StructType = {
     if (get(transformSchemaFunc).isEmpty) {
-      val sc = SparkContext.getOrCreate()
-      val df = SparkSession.builder().getOrCreate().createDataFrame(sc.emptyRDD[Row], schema)
+      val spark = SparkSession.builder().getOrCreate()
+      val df = spark.createDataFrame(java.util.Collections.emptyList[Row](), schema)
       transform(df).schema
     } else {
       getTransformSchema(schema)
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala
index 94d4a1f9aa8..031a5255da0 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/Repartition.scala
@@ -8,9 +8,8 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset}

 object Repartition extends DefaultParamsReadable[Repartition]

@@ -50,12 +49,8 @@ class Repartition(val uid: String) extends Transformer with Wrappable with Defau
     logTransform[DataFrame]({
       if (getDisable)
         dataset.toDF
-      else if (getN < dataset.rdd.getNumPartitions)
-        dataset.coalesce(getN).toDF()
       else
-        dataset.sqlContext.createDataFrame(
-          dataset.rdd.repartition(getN).asInstanceOf[RDD[Row]],
-          dataset.schema)
+        dataset.repartition(getN).toDF()
     }, dataset.columns.length)
   }
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala
index d8808fd44f1..ba0db3a9299 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/stages/StratifiedRepartition.scala
@@ -6,13 +6,14 @@ package com.microsoft.azure.synapse.ml.stages
 import com.microsoft.azure.synapse.ml.codegen.Wrappable
 import com.microsoft.azure.synapse.ml.core.contracts.HasLabelCol
 import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
-import org.apache.spark.RangePartitioner
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared.HasSeed
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{col, lit, max => sqlMax, rand, row_number, spark_partition_id}

 /** Constants for StratifiedRepartition.
  */
 object SPConstants {
@@ -49,32 +50,58 @@ class StratifiedRepartition(val uid: String) extends Transformer with Wrappable
    */
   override def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
-      // Count unique values in label column
-      val distinctLabelCounts = dataset.select(getLabelCol).groupBy(getLabelCol).count().collect()
-      val labelToCount = distinctLabelCounts.map(row => (row.getInt(0), row.getLong(1)))
-      val labelToFraction =
-        getMode match {
-          case SPConstants.Equal => getEqualLabelCount(labelToCount, dataset)
-          case SPConstants.Mixed =>
-            val equalLabelToCount = getEqualLabelCount(labelToCount, dataset)
-            val normalizedRatio = equalLabelToCount.map { case (label, count) => count }.sum / labelToCount.length
-            labelToCount.map { case (label, count) => (label, count / normalizedRatio) }.toMap
-          case SPConstants.Original => labelToCount.map { case (label, count) => (label, 1.0) }.toMap
-          case _ => throw new Exception(s"Unknown mode specified to StratifiedRepartition: $getMode")
-        }
-      val labelColIndex = dataset.schema.fieldIndex(getLabelCol)
-      val spdata = dataset.toDF().rdd.keyBy(row => row.getInt(labelColIndex))
-        .sampleByKeyExact(true, labelToFraction, getSeed)
-        .mapPartitions(keyToRow => keyToRow.zipWithIndex.map { case ((key, row), index) => (index, row) })
-      val rangePartitioner = new RangePartitioner(dataset.rdd.getNumPartitions, spdata)
-      val rspdata = spdata.partitionBy(rangePartitioner).mapPartitions(keyToRow =>
-        keyToRow.map { case (key, row) => row }).persist()
-      dataset.sqlContext.createDataFrame(rspdata, dataset.schema)
+      val df = dataset.toDF()
+      val labelToFraction = computeLabelFractions(df)
+      val sampled = stratifiedSample(df, labelToFraction)
+      val numPartitions = getNumPartitions(df)
+      roundRobinRepartition(sampled, numPartitions)
     }, dataset.columns.length)
   }

-  private def getEqualLabelCount(labelToCount: Array[(Int, Long)], dataset: Dataset[_]): Map[Int, Double] = {
-    val maxLabelCount = Math.max(labelToCount.map { case (label, count) => count }.max, dataset.rdd.getNumPartitions)
+  private def computeLabelFractions(df: DataFrame): Map[Int, Double] = {
+    val distinctLabelCounts = df.select(getLabelCol).groupBy(getLabelCol).count().collect()
+    val labelToCount = distinctLabelCounts.map(row => (row.getInt(0), row.getLong(1)))
+    getMode match {
+      case SPConstants.Equal => getEqualLabelCount(labelToCount, df)
+      case SPConstants.Mixed =>
+        val equalLabelToCount = getEqualLabelCount(labelToCount, df)
+        val normalizedRatio = equalLabelToCount.map { case (_, count) => count }.sum / labelToCount.length
+        labelToCount.map { case (label, count) => (label, count / normalizedRatio) }.toMap
+      case SPConstants.Original => labelToCount.map { case (label, _) => (label, 1.0) }.toMap
+      case _ => throw new Exception(s"Unknown mode specified to StratifiedRepartition: $getMode")
+    }
+  }
+
+  private def stratifiedSample(df: DataFrame, labelToFraction: Map[Int, Double]): DataFrame = {
+    val spark = df.sparkSession
+    val emptyDF = spark.createDataFrame(java.util.Collections.emptyList[Row](), df.schema)
+    val labelDFs = labelToFraction.map { case (label, fraction) =>
+      val labelData = df.filter(col(getLabelCol) === lit(label))
+      val wholeReplicates = math.floor(fraction).toInt
+      val fractionalPart = fraction - wholeReplicates
+      val wholePart = if (wholeReplicates > 0) {
+        (1 to wholeReplicates).map(_ => labelData).reduce(_ union _)
+      } else emptyDF
+      val fracPart = if (fractionalPart > 0) {
+        labelData.sample(withReplacement = false, fractionalPart, getSeed)
+      } else emptyDF
+      wholePart.union(fracPart)
+    }
+    labelDFs.reduce(_ union _)
+  }
+
+  private def getNumPartitions(df: DataFrame): Int =
+    df.select(spark_partition_id().as("_pid")).agg(sqlMax("_pid")).head().getInt(0) + 1
+
+  private def roundRobinRepartition(df: DataFrame, numPartitions: Int): DataFrame = {
+    val windowSpec = Window.partitionBy(col(getLabelCol)).orderBy(rand(getSeed))
+    val withPartition = df.withColumn("_rr_idx", row_number().over(windowSpec) % lit(numPartitions))
+    withPartition.repartitionByRange(numPartitions, col("_rr_idx")).drop("_rr_idx")
+  }
+
+  private def getEqualLabelCount(labelToCount: Array[(Int, Long)], df: DataFrame): Map[Int, Double] = {
+    val numPartitions = getNumPartitions(df)
+    val maxLabelCount = Math.max(labelToCount.map { case (_, count) => count }.max, numPartitions)
     labelToCount.map { case (label, count) => (label, maxLabelCount.toDouble / count) }.toMap
   }
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala
index fa609a6e51d..a203fc31642 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala
@@ -266,6 +266,9 @@ class ComputeModelStatistics(override val uid: String) extends Transformer
       .drop(Array(predictionColumnName, labelColumnName))
   }

+  // Note: MLlib metrics (BinaryClassificationMetrics, MulticlassMetrics, RegressionMetrics)
+  // require RDD input. These .rdd conversions are necessary until Spark provides
+  // DataFrame-based equivalents for all metrics.
   private def selectAndCastToRDD(dataset: Dataset[_],
                                  predictionColumnName: String,
                                  labelColumnName: String): RDD[(Double, Double)] = {
diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala
index cceecf77ea4..2c4d80a3b80 100644
--- a/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala
+++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputePerInstanceStatistics.scala
@@ -71,7 +71,7 @@ class ComputePerInstanceStatistics(override val uid: String) extends Transformer
         if (levels.get.length > 2) levels.get.length else 2
       } else {
         // Otherwise compute unique levels
-        dataset.select(col(labelColumnName).cast(DoubleType)).rdd.distinct().count().toInt
+        dataset.select(col(labelColumnName).cast(DoubleType)).distinct().count().toInt
       }

     val logLossFunc = udf((scoredLabel: Double, scores: org.apache.spark.ml.linalg.Vector) =>
diff --git a/core/src/main/scala/org/apache/spark/ml/ComplexParamsSerializer.scala b/core/src/main/scala/org/apache/spark/ml/ComplexParamsSerializer.scala
index a0e7f41d0ca..00fe7aa2c34 100644
--- a/core/src/main/scala/org/apache/spark/ml/ComplexParamsSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/ml/ComplexParamsSerializer.scala
@@ -5,7 +5,6 @@ package org.apache.spark.ml

 import com.microsoft.azure.synapse.ml.core.serialize.ComplexParam
 import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkContext
 import org.apache.spark.ml.param.{ParamPair, Params}
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.ml.util._
@@ -37,7 +36,7 @@ private[ml] class ComplexParamsWriter(instance: Params) extends MLWriter {

   override protected def saveImpl(path: String): Unit = {
     val complexParamLocs = ComplexParamsWriter.getComplexParamLocations(instance, path)
     val complexParamJson = ComplexParamsWriter.getComplexMetadata(complexParamLocs)
-    ComplexParamsWriter.saveMetadata(instance, path, sc, complexParamJson)
+    ComplexParamsWriter.saveMetadata(instance, path, sparkSession, complexParamJson)
     ComplexParamsWriter.saveComplexParams(path, complexParamLocs, shouldOverwrite)
   }
 }
@@ -90,12 +89,12 @@ private[ml] object ComplexParamsWriter {
    */
   def saveMetadata(instance: Params,
                    path: String,
-                   sc: SparkContext,
+                   spark: SparkSession,
                    extraMetadata: Option[JObject] = None,
                    paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
-    val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val metadataJson = getMetadataToSave(instance, spark, extraMetadata, paramMap)
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).toDF("value").write.text(metadataPath)
   }

   /** Helper for [[saveMetadata()]] which extracts the JSON to save.
@@ -104,7 +103,7 @@ private[ml] object ComplexParamsWriter {
    * @see [[saveMetadata()]] for details on what this includes.
    */
   def getMetadataToSave(instance: Params,
-                        sc: SparkContext,
+                        spark: SparkSession,
                         extraMetadata: Option[JObject] = None,
                         paramMap: Option[JValue] = None): String = {
     val uid = instance.uid
@@ -121,7 +120,7 @@ private[ml] object ComplexParamsWriter {
     }.toList)
     val basicMetadata = ("class" -> cls) ~
       ("timestamp" -> System.currentTimeMillis()) ~
-      ("sparkVersion" -> sc.version) ~
+      ("sparkVersion" -> spark.version) ~
       ("uid" -> uid) ~
       ("paramMap" -> jsonParams) ~
       ("defaultParamMap" -> jsonDefaultParams)
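For reference on the saveMetadata rewrite above: DefaultParamsReader-style loaders read the metadata JSON back from the first line of the text output under <path>/metadata, so the write needs to remain a plain single-column text file. A Dataset[String] version that also keeps a single part file (the same shape the LightGBMBooster change later in this patch uses) could look like the following; it is a sketch under that assumption, with a hypothetical helper name, not part of the patch:

    import org.apache.spark.sql.SparkSession

    // Illustrative only: write one JSON string as a one-part text file, mirroring
    // the old sc.parallelize(Seq(json), 1).saveAsTextFile(path) output layout.
    def writeMetadataJson(spark: SparkSession, metadataJson: String, metadataPath: String): Unit = {
      import spark.implicits._
      Seq(metadataJson).toDS()
        .coalesce(1) // keep a single part file, like the RDD version with numSlices = 1
        .write
        .text(metadataPath)
    }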
diff --git a/core/src/main/scala/org/apache/spark/ml/Serializer.scala b/core/src/main/scala/org/apache/spark/ml/Serializer.scala
index 478b6089dc7..b27cf429cc5 100644
--- a/core/src/main/scala/org/apache/spark/ml/Serializer.scala
+++ b/core/src/main/scala/org/apache/spark/ml/Serializer.scala
@@ -6,7 +6,6 @@ package org.apache.spark.ml
 import com.microsoft.azure.synapse.ml.core.env.StreamUtilities._
 import com.microsoft.azure.synapse.ml.core.utils.ContextObjectInputStream
 import org.apache.hadoop.fs.Path
-import org.apache.spark.SparkContext
 import org.apache.spark.ml.util.MLWritable
 import org.apache.spark.sql._

@@ -42,7 +41,7 @@ object Serializer {
     (if (tpe <:< typeOf[PipelineStage]) new PipelineSerializer()
      else if (tpe <:< typeOf[Array[PipelineStage]]) new PipelineArraySerializer()
      else if (tpe <:< typeOf[Dataset[_]]) new DFSerializer(sparkSession)
-     else new ObjectSerializer(sparkSession.sparkContext)(typeToTypeTag(tpe)))
+     else new ObjectSerializer(sparkSession)(typeToTypeTag(tpe)))
       .asInstanceOf[Serializer[T]]
   }

@@ -69,9 +68,9 @@ object Serializer {
    * @param obj The object to write.
    * @param outputPath Where to write the object
    */
-  def writeToHDFS[O](sc: SparkContext, obj: O, outputPath: Path, overwrite: Boolean)
+  def writeToHDFS[O](spark: SparkSession, obj: O, outputPath: Path, overwrite: Boolean)
                     (implicit ttag: TypeTag[O]): Unit = {
-    val hadoopConf = sc.hadoopConfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
     using(outputPath.getFileSystem(hadoopConf).create(outputPath, overwrite)) { os =>
       write[O](obj, os)(ttag)
     }.get
@@ -82,16 +81,16 @@ object Serializer {
    * @param path The main path for model to load the object from.
    * @return The loaded object.
    */
-  def readFromHDFS[O](sc: SparkContext, path: Path)(implicit ttag: TypeTag[O]): O = {
-    val hadoopConf = sc.hadoopConfiguration
+  def readFromHDFS[O](spark: SparkSession, path: Path)(implicit ttag: TypeTag[O]): O = {
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
     using(path.getFileSystem(hadoopConf).open(path)) { in =>
       read[O](in)(ttag)
     }.get
   }

-  def makeQualifiedPath(sc: SparkContext, path: String): Path = {
+  def makeQualifiedPath(spark: SparkSession, path: String): Path = {
     val modelPath = new Path(path)
-    val hadoopConf = sc.hadoopConfiguration
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
     // Note: to get correct working dir, must use root path instead of root + part
     val fs = modelPath.getFileSystem(hadoopConf)
     modelPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -99,10 +98,10 @@ object Serializer {
 }

-class ObjectSerializer[O](sc: SparkContext)(implicit ttag: TypeTag[O]) extends Serializer[O] {
-  def write(obj: O, path: Path, overwrite: Boolean): Unit = Serializer.writeToHDFS(sc, obj, path, overwrite)
+class ObjectSerializer[O](spark: SparkSession)(implicit ttag: TypeTag[O]) extends Serializer[O] {
+  def write(obj: O, path: Path, overwrite: Boolean): Unit = Serializer.writeToHDFS(spark, obj, path, overwrite)

-  def read(path: Path): O = Serializer.readFromHDFS(sc, path)
+  def read(path: Path): O = Serializer.readFromHDFS(spark, path)
 }

 class DFSerializer(spark: SparkSession) extends Serializer[DataFrame] {
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala
index f41218b0e0d..39e507d3c6f 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala
@@ -224,9 +224,8 @@ class DistributedHTTPSource(name: String,
   // TODO do this by hooking deeper into spark,
   // TODO allow for dynamic allocation
   private[spark] val serverInfoDF: DataFrame = {
-    val serverInfo = sqlContext.sparkContext
-      .parallelize(Seq(Tuple1("placeholder")),
-        maxPartitions.getOrElse(sqlContext.sparkContext.defaultParallelism))
+    val numParts = maxPartitions.getOrElse(sqlContext.sparkSession.sparkContext.defaultParallelism)
+    val serverInfo = sqlContext.sparkSession.range(0, numParts, 1, numParts)
       .toDF("empty")
       .mapPartitions { _ =>
         val s = server.get
@@ -249,6 +248,7 @@ class DistributedHTTPSource(name: String,
   }

   private[spark] val serverInfoDFStreaming = {
+    // Note: internalCreateDataFrame with isStreaming requires RDD - no DataFrame alternative exists
     val serializer = infoEnc.createSerializer()
     val serverInfoConfRDD = serverInfoDF.rdd.map(serializer)
     sqlContext.sparkSession.internalCreateDataFrame(
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/HTTPSource.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HTTPSource.scala
index d5dfadf7137..d350f3afcb7 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/HTTPSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/HTTPSource.scala
@@ -100,6 +100,7 @@ class HTTPSource(name: String, host: String, port: Int, sqlContext: SQLContext)
         row.asInstanceOf[InternalRow]
       }
     }
+    // Note: internalCreateDataFrame with isStreaming requires RDD - no DataFrame alternative exists
     val rawBatch = if (rawList.nonEmpty) {
       sqlContext.sparkContext.parallelize(rawList)
     } else {
diff --git a/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/HTTPSourceV2.scala b/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/HTTPSourceV2.scala
index c185f8d6251..d004c2da932 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/HTTPSourceV2.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/HTTPSourceV2.scala
@@ -179,6 +179,7 @@ private[streaming] object DriverServiceUtils {
   }

   def getDriverHost: String = {
+    // Note: SparkContext needed for BlockManager access - no SparkSession equivalent
     val blockManager = SparkContext.getActive.get.env.blockManager
     blockManager.master.getMemoryStatus.toList.flatMap({ case (blockManagerId, _) =>
       if (blockManagerId.executorId == "driver") Some(getHostToIP(blockManagerId.host))
diff --git a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXHub.scala b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXHub.scala
index 5c0ce2a93a0..1d8e0ac4f78 100644
--- a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXHub.scala
+++ b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXHub.scala
@@ -11,8 +11,8 @@ import org.apache.commons.codec.digest.DigestUtils
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{IOUtils => HUtils}
-import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
 import spray.json._

 import java.io.BufferedInputStream
@@ -84,7 +84,7 @@ object ONNXHub {
       .map(xch => new Path(new Path(xch, "onnx"), "hub")))
       .getOrElse({
         val home = new Path("placeholder")
-          .getFileSystem(SparkContext.getOrCreate().hadoopConfiguration)
+          .getFileSystem(SparkSession.active.sparkContext.hadoopConfiguration)
           .getHomeDirectory
         FileUtilities.join(home, ".cache", "onnx", "hub")
       })
@@ -201,7 +201,7 @@ class ONNXHub(val modelCacheDir: Path,
     urlCon.setReadTimeout(readTimeout)
     using(new BufferedInputStream(urlCon.getInputStream)) { is =>
       using(fs.create(path)) { os =>
-        HUtils.copyBytes(is, os, SparkContext.getOrCreate().hadoopConfiguration)
+        HUtils.copyBytes(is, os, SparkSession.active.sparkContext.hadoopConfiguration)
       }
     }
   }
@@ -219,7 +219,7 @@ class ONNXHub(val modelCacheDir: Path,
       Seq(selectedModel.metadata.modelSha.map(sha => s"${sha}_${modelPathArr.last}").getOrElse(modelPathArr.last))
     val localModelPath = FileUtilities.join(getDir, localModelDirs: _*)
-    val fs = localModelPath.getFileSystem(SparkContext.getOrCreate().hadoopConfiguration)
+    val fs = localModelPath.getFileSystem(SparkSession.active.sparkContext.hadoopConfiguration)

     if (forceReload || !fs.exists(localModelPath)) {
       if (!verifyRepoRef(repo)) {
diff --git a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala
index 491ee64d675..b97076eb504 100644
--- a/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala
+++ b/deep-learning/src/main/scala/com/microsoft/azure/synapse/ml/onnx/ONNXModel.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
-import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.TaskContext

 import java.util
 import scala.collection.JavaConverters._
@@ -196,8 +196,19 @@ class ONNXModel(override val uid: String)
   }

   def setModelLocation(path: String): this.type = {
-    val modelBytes = SparkContext.getOrCreate().binaryFiles(path).first()._2.toArray
-    this.setModelPayload(modelBytes)
+    val spark = SparkSession.builder().getOrCreate()
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val hadoopPath = new org.apache.hadoop.fs.Path(path)
+    val fs = hadoopPath.getFileSystem(hadoopConf)
+    val stream = fs.open(hadoopPath)
+    try {
+      val fileLength = fs.getFileStatus(hadoopPath).getLen.toInt
+      val modelBytes = new Array[Byte](fileLength)
+      stream.readFully(modelBytes)
+      this.setModelPayload(modelBytes)
+    } finally {
+      stream.close()
+    }
   }

   def sliceAtOutput(output: String): ONNXModel = {
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala
index 79a17931c7c..0d90e16f071 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala
@@ -132,12 +132,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams]
      * barrier execution, which is unfortunate as repartition is more expensive than coalesce.
      */
     if (getUseBarrierExecutionMode) {
-      val numPartitions = df.rdd.getNumPartitions
-      if (numPartitions > numTasks) {
-        df.repartition(numTasks)
-      } else {
-        df
-      }
+      df.repartition(numTasks)
     } else {
       df.coalesce(numTasks)
     }
@@ -450,8 +445,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams]
     // By default, we try to intelligently calculate the number of executors, but user can override this with numTasks
     if (configNumTasks > 0) configNumTasks
     else {
-      val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset.sparkSession, numTasksPerExecutor, log)
-      min(numExecutorTasks, dataset.rdd.getNumPartitions)
+      ClusterUtil.getNumExecutorTasks(dataset.sparkSession, numTasksPerExecutor, log)
     }
   }
@@ -616,6 +610,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel] with LightGBMModelParams]
     val encoder = Encoders.kryo[PartitionResult]

     measures.markTrainingStart()
+    // Note: barrier execution requires RDD API - no DataFrame equivalent exists in Spark
     val results: Array[PartitionResult] =
       if (getUseBarrierExecutionMode)
         dataframe.rdd.barrier().mapPartitions(mapPartitionsFunc).collect()
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala
index ceaa39c4621..85bfd940156 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMRanker.scala
@@ -96,17 +96,9 @@ class LightGBMRanker(override val uid: String)
     val repartitionedDataset = getOptGroupCol match {
       case None => dataset
       case Some(groupingCol) =>
-        val numPartitions = dataset.rdd.getNumPartitions
-
-        // in barrier mode, will use repartition in super.prepareDataframe,
-        // this will let repartition on groupingCol fail
-        // so repartition here, then super.prepareDataframe won't repartition
         if (getUseBarrierExecutionMode) {
-          if (numPartitions > numTasks) {
-            dataset.repartition(numTasks, new Column(groupingCol))
-          } else {
-            dataset.repartition(numPartitions, new Column(groupingCol))
-          }
+          // In barrier mode, repartition by grouping column with target partition count
+          dataset.repartition(numTasks, new Column(groupingCol))
         } else {
           // if not in barrier mode, coalesce won't break repartition by groupingCol
           dataset.repartition(new Column(groupingCol))
diff --git a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/booster/LightGBMBooster.scala b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/booster/LightGBMBooster.scala
index e5a10b371ac..84d3324ee81 100644
--- a/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/booster/LightGBMBooster.scala
+++ b/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/booster/LightGBMBooster.scala
@@ -459,9 +459,8 @@ class LightGBMBooster(val trainDataset: Option[LightGBMDataset] = None,
     if (filename == null || filename.isEmpty) {
       throw new IllegalArgumentException("filename should not be empty or null.")
     }
-    val rdd = session.sparkContext.parallelize(Seq(modelStr.get))
     import session.sqlContext.implicits._
-    val dataset = session.sqlContext.createDataset(rdd)
+    val dataset = Seq(modelStr.get).toDS()
     val mode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
     dataset.coalesce(1).write.mode(mode).text(filename)
   }
@@ -480,9 +479,8 @@ class LightGBMBooster(val trainDataset: Option[LightGBMDataset] = None,
   def dumpModel(session: SparkSession, filename: String, overwrite: Boolean): Unit = {
     val json = lightgbmlib.LGBM_BoosterDumpModelSWIG(boosterHandler.boosterPtr, 0, -1, 0, 1,
       boosterHandler.dumpModelOutPtr.get().ptr)
-    val rdd = session.sparkContext.parallelize(Seq(json))
     import session.sqlContext.implicits._
-    val dataset = session.sqlContext.createDataset(rdd)
+    val dataset = Seq(json).toDS()
     val mode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
     dataset.coalesce(1).write.mode(mode).text(filename)
   }
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBase.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBase.scala
index c612cbce3fe..47e54b33921 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBase.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBase.scala
@@ -119,25 +119,22 @@ trait VowpalWabbitBase
   // get list of columns needed as input
   protected def getInputColumns: Seq[String]

-  protected def prepareDataSet(dataset: Dataset[_]): DataFrame = {
+  protected def prepareDataSet(dataset: Dataset[_]): (DataFrame, Int) = {
     // follow LightGBM pattern
     val numTasksPerExec = ClusterUtil.getNumTasksPerExecutor(dataset.sparkSession, log)
     val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset.sparkSession, numTasksPerExec, log)
-    val numTasks = min(numExecutorTasks, dataset.rdd.getNumPartitions)

     // Need to pass all columns as sub-cl
     val dfSubset = dataset.toDF()

     // Reduce number of partitions to number of executor cores
-    if (dataset.rdd.getNumPartitions > numTasks) {
-      if (getUseBarrierExecutionMode) { // see [SPARK-24820][SPARK-24821]
-        dfSubset.repartition(numTasks)
-      }
-      else {
-        dfSubset.coalesce(numTasks)
-      }
-    } else
-      dfSubset
+    // coalesce is a no-op when numExecutorTasks >= the current partition count
+    if (getUseBarrierExecutionMode) {
+      (dfSubset.repartition(numExecutorTasks), numExecutorTasks)
+    }
+    else {
+      (dfSubset.coalesce(numExecutorTasks), numExecutorTasks)
+    }
   }

   /**
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala
index 699a737da84..f457a42bf3c 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseLearner.scala
@@ -181,6 +181,7 @@ trait VowpalWabbitBaseLearner extends VowpalWabbitBase {

     // dispatch to exectuors and collect the model of the first partition (everybody has the same at the end anyway)
     // important to trigger collect() here so that the spanning tree is still up
+    // Note: barrier execution requires RDD API - no DataFrame equivalent exists in Spark
     if (getUseBarrierExecutionMode)
       df.rdd.barrier().mapPartitions(inputRows => trainIteration(inputRows, localInitialModel)).collect().toSeq
     else
@@ -395,8 +396,7 @@ trait VowpalWabbitBaseLearner extends VowpalWabbitBase {
     }
     else {
       // VW internal coordination
-      val df = prepareDataSet(dataset)
-      val numTasks = df.rdd.getNumPartitions
+      val (df, numTasks) = prepareDataSet(dataset)

       // get the final command line args
       val vwArgs = getCommandLineArgs
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala
index 7c781175eee..0972fda4abd 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseProgressive.scala
@@ -92,10 +92,9 @@ trait VowpalWabbitBaseProgressive
   def getAdditionalOutputSchema: StructType

   override def transform(dataset: Dataset[_]): DataFrame = {
-    val df = prepareDataSet(dataset)
+    val (df, numTasks) = prepareDataSet(dataset)
     val schema = transformSchema(df.schema)

-    val numTasks = df.rdd.getNumPartitions
     val synchronizationSchedule = interPassSyncSchedule(df)

     // schedule multiple mapPartitions in
diff --git a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseSpark.scala b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseSpark.scala
index 84448cec8d3..1ae60c2c50a 100644
--- a/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseSpark.scala
+++ b/vw/src/main/scala/com/microsoft/azure/synapse/ml/vw/VowpalWabbitBaseSpark.scala
@@ -5,7 +5,6 @@ package com.microsoft.azure.synapse.ml.vw

 import com.microsoft.azure.synapse.ml.core.contracts.HasWeightCol
 import com.microsoft.azure.synapse.ml.core.env.StreamUtilities
-import org.apache.spark.SparkContext
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession, types => T}
 import org.vowpalwabbit.spark.VowpalWabbitExample
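One reviewer-level caveat on the DataFrame-based k-fold split introduced in TuneHyperparameters at the top of this patch: rand(getSeed) is only stable across actions while the input partitioning stays fixed, so caching the frame that carries the fold assignment is the cheapest way to keep training and validation rows disjoint. A self-contained sketch (hypothetical object and column names, not part of the patch) that exercises the same bucketing rule:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, lit, rand}

    object KFoldSplitCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("kfold-split-check").getOrCreate()
        val nFolds = 3
        // Same rule as the patch: scale a seeded uniform draw to [0, nFolds) and truncate.
        val withFold = spark.range(0, 1000).toDF("id")
          .withColumn("fold", (rand(42) * lit(nFolds)).cast("int"))
          .cache() // pin the random draw so repeated actions see the same fold assignment
        val total = withFold.count()
        val foldSizes = (0 until nFolds).map(f => withFold.filter(col("fold") === f).count())
        assert(foldSizes.sum == total, "every row must land in exactly one fold")
        println(foldSizes.mkString(", "))
        spark.stop()
      }
    }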