@@ -17,7 +17,6 @@ import org.apache.spark.ml.classification.ClassificationModel
import org.apache.spark.ml.param._
import org.apache.spark.ml.regression.RegressionModel
import org.apache.spark.ml.util._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql._
import org.apache.spark.sql.types.StructType

@@ -149,9 +148,21 @@ class TuneHyperparameters(override val uid: String) extends Estimator[TuneHyperp
override def fit(dataset: Dataset[_]): TuneHyperparametersModel = { //scalastyle:ignore cyclomatic.complexity
logFit({
val sparkSession = dataset.sparkSession
val splits = MLUtils.kFold(dataset.toDF.rdd, getNumFolds, getSeed)
import org.apache.spark.sql.functions.{rand, lit}
val df = dataset.toDF
val nFolds = getNumFolds
// DataFrame-based k-fold splitting: assign each row to a fold by bucketing a uniform random value into nFolds buckets
val dfWithFold = df.withColumn("_kfold_rand", rand(getSeed))
val splits = (0 until nFolds).map { fold =>
val training = dfWithFold
.filter((dfWithFold("_kfold_rand") * lit(nFolds)).cast("int") =!= lit(fold))
.drop("_kfold_rand")
val validation = dfWithFold
.filter((dfWithFold("_kfold_rand") * lit(nFolds)).cast("int") === lit(fold))
.drop("_kfold_rand")
(training, validation)
}.toArray
Comment on lines +151 to +164 (Copilot AI, Mar 23, 2026):

dfWithFold is not cached, so each fold’s training and validation DataFrames will independently re-evaluate the rand(getSeed) expression and re-scan the input. Persisting dfWithFold once (and unpersisting after building splits) would both reduce work and ensure the fold assignment is identical across all derived DataFrames even if the optimizer rewrites the plan.

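A minimal sketch of the caching the comment suggests, reusing the names from the diff above (df, nFolds, getSeed); the unpersist placement is an assumption, since the cached data can only be released once every fold has actually been evaluated:

import org.apache.spark.sql.functions.{lit, rand}

// Materialize the random fold assignment once so every derived training and
// validation DataFrame sees the same _kfold_rand values and the input is not
// re-scanned for each fold.
val dfWithFold = df.withColumn("_kfold_rand", rand(getSeed)).persist()

val splits = (0 until nFolds).map { fold =>
  val foldId = (dfWithFold("_kfold_rand") * lit(nFolds)).cast("int")
  val training = dfWithFold.filter(foldId =!= lit(fold)).drop("_kfold_rand")
  val validation = dfWithFold.filter(foldId === lit(fold)).drop("_kfold_rand")
  (training, validation)
}.toArray

// ... fit and evaluate on each (training, validation) pair ...
// dfWithFold.unpersist()  // only after all folds have been consumed
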
val hyperParams = getParamSpace.paramMaps
val schema = dataset.schema
val executionContext = getExecutionContext
val (evaluationMetricColumnName, operator): (String, Ordering[Double]) =
EvaluationUtils.getMetricWithOperator(getModels.head, getEvaluationMetric)
@@ -163,8 +174,8 @@ class TuneHyperparameters(override val uid: String) extends Estimator[TuneHyperp
val numModels = getModels.length

val metrics = splits.zipWithIndex.map { case ((training, validation), _) =>
val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
val trainingDataset = training.cache()
val validationDataset = validation.cache()

val modelParams = ListBuffer[ParamMap]()
for (n <- 0 until getNumRuns) {
@@ -218,14 +218,8 @@ object SyntheticEstimator {
}

private[causal] def assignRowIndex(df: DataFrame, colName: String): DataFrame = {
df.sparkSession.createDataFrame(
df.rdd.zipWithIndex.map(element =>
Row.fromSeq(Seq(element._2) ++ element._1.toSeq)
),
StructType(
Array(StructField(colName, LongType, nullable = false)) ++ df.schema.fields
)
)
df.withColumn(colName, monotonically_increasing_id())
.select(col(colName) +: df.columns.map(col): _*)
Comment on lines +221 to +222 (Copilot AI, Mar 23, 2026):

monotonically_increasing_id() does not produce dense, 0-based consecutive indices (it encodes partition ID and can create very large gaps). Downstream MatrixOps.size computes matrix dimensions via max(i)+1, so this change can massively inflate inferred sizes and lead to incorrect behavior or OOMs. Please generate a dense index instead (e.g., row_number() over an explicit Window.orderBy(...) and subtract 1) so indices are contiguous and bounded by the distinct count.

Suggested change: replace

df.withColumn(colName, monotonically_increasing_id())
  .select(col(colName) +: df.columns.map(col): _*)

with

val windowSpec = Window.orderBy(df.columns.map(col): _*)
val indexedDf = df.withColumn(colName, row_number().over(windowSpec) - 1)
indexedDf.select(col(colName) +: df.columns.map(col): _*)

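For illustration only (not part of this PR), a small standalone snippet of the gap behaviour the comment describes; monotonically_increasing_id places the partition index in the upper bits of the id, so max(id) + 1 can dwarf the real row count:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[4]").appName("mono-id-demo").getOrCreate()

// 8 rows spread across 4 partitions
val df = spark.range(0, 8).repartition(4)
  .withColumn("idx", monotonically_increasing_id())

// Ids in partition p start at p * 2^33: 0, 8589934592, 17179869184, ...
df.agg(max("idx")).show()
// max(idx) + 1 is in the billions despite only 8 rows, which is exactly what
// inflates a matrix dimension computed downstream as max(i) + 1.
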
}

private[causal] def createIndex(data: DataFrame, inputCol: String, indexCol: String): DataFrame = {
@@ -96,10 +96,8 @@ object DVectorOps extends VectorOps[DVector] {
def make(size: Long, value: => Double): DVector = {
val spark = SparkSession.active
import spark.implicits._
val data = 0L until size
spark
.sparkContext
.parallelize(data)
.range(0, size)
.toDF("i")
.withColumn("value", lit(value))
.as[VectorEntry]
@@ -5,9 +5,7 @@ package com.microsoft.azure.synapse.ml.core.utils

import java.net.InetAddress
import org.apache.http.conn.util.InetAddressUtils
import org.apache.spark.SparkContext
import org.apache.spark.injections.BlockManagerUtils
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.slf4j.Logger

@@ -20,7 +18,7 @@ object ClusterUtil {
* @return The number of tasks per executor.
*/
def getNumTasksPerExecutor(spark: SparkSession, log: Logger): Int = {
val confTaskCpus = getTaskCpus(spark.sparkContext, log)
val confTaskCpus = getTaskCpus(spark, log)
try {
val confCores = spark.sparkContext.getConf.get("spark.executor.cores").toInt
val tasksPerExec = confCores / confTaskCpus
@@ -44,13 +42,18 @@
* @return The number of rows per partition (where partitionId is the array index).
*/
def getNumRowsPerPartition(df: DataFrame, labelCol: Column): Array[Long] = {
val indexedRowCounts: Array[(Int, Long)] = df
.select(typedLit(0.toByte))
.rdd
.mapPartitionsWithIndex({case (i,rows) => Iterator((i,rows.size.toLong))}, true)
import org.apache.spark.sql.functions.{spark_partition_id, count, lit}
val partitionCounts = df
.select(spark_partition_id().as("partId"))
.groupBy("partId")
.agg(count(lit(1)).as("cnt"))
.collect()
// Get an array where the index is implicitly the partition id
indexedRowCounts.sortBy(pair => pair._1).map(pair => pair._2)
val maxPartId = if (partitionCounts.isEmpty) 0 else partitionCounts.map(_.getInt(0)).max + 1
val result = Array.fill[Long](maxPartId)(0L)
partitionCounts.foreach { row =>
result(row.getInt(0)) = row.getLong(1)
}
result
}

/** Get number of default cores from sparkSession(required) or master(optional) for 1 executor.
@@ -104,9 +107,9 @@ object ClusterUtil {
}
}

def getTaskCpus(sparkContext: SparkContext, log: Logger): Int = {
def getTaskCpus(spark: SparkSession, log: Logger): Int = {
try {
val taskCpusConfig = sparkContext.getConf.getOption("spark.task.cpus")
val taskCpusConfig = spark.sparkContext.getConf.getOption("spark.task.cpus")
if (taskCpusConfig.isEmpty) {
log.info("ClusterUtils did not detect spark.task.cpus config set, using default 1 instead")
}
@@ -130,6 +130,7 @@ class RankingEvaluator(override val uid: String)
/** @group setParam */
def setPredictionCol(value: String): this.type = set(predictionCol, value)

// Note: RankingMetrics from MLlib requires RDD input - .rdd conversion is necessary
def getMetrics(dataset: Dataset[_]): AdvancedRankingMetrics = {
val predictionAndLabels = dataset
.select(getPredictionCol, getLabelCol)
@@ -104,6 +104,7 @@ class SARModel(override val uid: String) extends Model[SARModel]
dstOutputColumn: String,
num: Int): DataFrame = {

// Note: CoordinateMatrix from MLlib requires RDD input - .rdd conversion is necessary
def dfToRDDMatrxEntry(dataframe: DataFrame) = {
dataframe.rdd
.flatMap(row =>
@@ -6,7 +6,6 @@ package com.microsoft.azure.synapse.ml.stages
import com.microsoft.azure.synapse.ml.codegen.Wrappable
import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
import com.microsoft.azure.synapse.ml.param.UDFParam
import org.apache.spark.SparkContext
import org.apache.spark.injections.UDFUtils
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
@@ -59,8 +58,8 @@ class Lambda(val uid: String) extends Transformer with Wrappable with ComplexPar

def transformSchema(schema: StructType): StructType = {
if (get(transformSchemaFunc).isEmpty) {
val sc = SparkContext.getOrCreate()
val df = SparkSession.builder().getOrCreate().createDataFrame(sc.emptyRDD[Row], schema)
val spark = SparkSession.builder().getOrCreate()
val df = spark.createDataFrame(java.util.Collections.emptyList[Row](), schema)
transform(df).schema
} else {
getTransformSchema(schema)
@@ -8,9 +8,8 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.{DataFrame, Dataset}

object Repartition extends DefaultParamsReadable[Repartition]

@@ -50,12 +49,8 @@ class Repartition(val uid: String) extends Transformer with Wrappable with Defau
logTransform[DataFrame]({
if (getDisable)
dataset.toDF
else if (getN < dataset.rdd.getNumPartitions)
dataset.coalesce(getN).toDF()
else
dataset.sqlContext.createDataFrame(
dataset.rdd.repartition(getN).asInstanceOf[RDD[Row]],
dataset.schema)
dataset.repartition(getN).toDF()
}, dataset.columns.length)
Comment on lines 49 to 54 (Copilot AI, Mar 23, 2026):

Switching from conditional coalesce/RDD-based repartitioning to unconditional dataset.repartition(getN) means reducing partitions will now always incur a full shuffle (previously it could use coalesce without shuffle when decreasing partitions). If this transformer is expected to be used for downsizing partitions, consider adding a parameter to choose coalesce vs repartition, or reintroduce a non-shuffle path when getN is less than the current partition count (without relying on RDD APIs).

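One possible shape for the fallback the comment asks for, kept as a sketch rather than the PR's code; note it still reads the current partition count through .rdd.getNumPartitions, since the DataFrame API does not expose that number directly, which is part of the trade-off the reviewer raises:

import org.apache.spark.sql.{DataFrame, Dataset}

// Shrinking the partition count can avoid a shuffle via coalesce;
// growing it still requires a full repartition.
def repartitionTo(dataset: Dataset[_], n: Int): DataFrame = {
  val current = dataset.rdd.getNumPartitions  // builds lineage lazily; no job runs
  if (n < current) dataset.coalesce(n).toDF()
  else dataset.repartition(n).toDF()
}
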
}

@@ -6,13 +6,14 @@ package com.microsoft.azure.synapse.ml.stages
import com.microsoft.azure.synapse.ml.codegen.Wrappable
import com.microsoft.azure.synapse.ml.core.contracts.HasLabelCol
import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
import org.apache.spark.RangePartitioner
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.HasSeed
import org.apache.spark.ml.util._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max => sqlMax, rand, row_number, spark_partition_id}

/** Constants for <code>StratifiedRepartition</code>. */
object SPConstants {
@@ -49,32 +50,58 @@ class StratifiedRepartition(val uid: String) extends Transformer with Wrappable
*/
override def transform(dataset: Dataset[_]): DataFrame = {
logTransform[DataFrame]({
// Count unique values in label column
val distinctLabelCounts = dataset.select(getLabelCol).groupBy(getLabelCol).count().collect()
val labelToCount = distinctLabelCounts.map(row => (row.getInt(0), row.getLong(1)))
val labelToFraction =
getMode match {
case SPConstants.Equal => getEqualLabelCount(labelToCount, dataset)
case SPConstants.Mixed =>
val equalLabelToCount = getEqualLabelCount(labelToCount, dataset)
val normalizedRatio = equalLabelToCount.map { case (label, count) => count }.sum / labelToCount.length
labelToCount.map { case (label, count) => (label, count / normalizedRatio) }.toMap
case SPConstants.Original => labelToCount.map { case (label, count) => (label, 1.0) }.toMap
case _ => throw new Exception(s"Unknown mode specified to StratifiedRepartition: $getMode")
}
val labelColIndex = dataset.schema.fieldIndex(getLabelCol)
val spdata = dataset.toDF().rdd.keyBy(row => row.getInt(labelColIndex))
.sampleByKeyExact(true, labelToFraction, getSeed)
.mapPartitions(keyToRow => keyToRow.zipWithIndex.map { case ((key, row), index) => (index, row) })
val rangePartitioner = new RangePartitioner(dataset.rdd.getNumPartitions, spdata)
val rspdata = spdata.partitionBy(rangePartitioner).mapPartitions(keyToRow =>
keyToRow.map { case (key, row) => row }).persist()
dataset.sqlContext.createDataFrame(rspdata, dataset.schema)
val df = dataset.toDF()
val labelToFraction = computeLabelFractions(df)
val sampled = stratifiedSample(df, labelToFraction)
val numPartitions = getNumPartitions(df)
roundRobinRepartition(sampled, numPartitions)
}, dataset.columns.length)
}

private def getEqualLabelCount(labelToCount: Array[(Int, Long)], dataset: Dataset[_]): Map[Int, Double] = {
val maxLabelCount = Math.max(labelToCount.map { case (label, count) => count }.max, dataset.rdd.getNumPartitions)
private def computeLabelFractions(df: DataFrame): Map[Int, Double] = {
val distinctLabelCounts = df.select(getLabelCol).groupBy(getLabelCol).count().collect()
val labelToCount = distinctLabelCounts.map(row => (row.getInt(0), row.getLong(1)))
getMode match {
case SPConstants.Equal => getEqualLabelCount(labelToCount, df)
case SPConstants.Mixed =>
val equalLabelToCount = getEqualLabelCount(labelToCount, df)
val normalizedRatio = equalLabelToCount.map { case (_, count) => count }.sum / labelToCount.length
labelToCount.map { case (label, count) => (label, count / normalizedRatio) }.toMap
case SPConstants.Original => labelToCount.map { case (label, _) => (label, 1.0) }.toMap
case _ => throw new Exception(s"Unknown mode specified to StratifiedRepartition: $getMode")
}
}

private def stratifiedSample(df: DataFrame, labelToFraction: Map[Int, Double]): DataFrame = {
val spark = df.sparkSession
val emptyDF = spark.createDataFrame(java.util.Collections.emptyList[Row](), df.schema)
val labelDFs = labelToFraction.map { case (label, fraction) =>
val labelData = df.filter(col(getLabelCol) === lit(label))
val wholeReplicates = math.floor(fraction).toInt
val fractionalPart = fraction - wholeReplicates
val wholePart = if (wholeReplicates > 0) {
(1 to wholeReplicates).map(_ => labelData).reduce(_ union _)
} else emptyDF
val fracPart = if (fractionalPart > 0) {
labelData.sample(withReplacement = false, fractionalPart, getSeed)
} else emptyDF
wholePart.union(fracPart)
}
labelDFs.reduce(_ union _)
}

private def getNumPartitions(df: DataFrame): Int =
df.select(spark_partition_id().as("_pid")).agg(sqlMax("_pid")).head().getInt(0) + 1

private def roundRobinRepartition(df: DataFrame, numPartitions: Int): DataFrame = {
val windowSpec = Window.partitionBy(col(getLabelCol)).orderBy(rand(getSeed))
val withPartition = df.withColumn("_rr_idx", row_number().over(windowSpec) % lit(numPartitions))
withPartition.repartitionByRange(numPartitions, col("_rr_idx")).drop("_rr_idx")
}

private def getEqualLabelCount(labelToCount: Array[(Int, Long)], df: DataFrame): Map[Int, Double] = {
val numPartitions = getNumPartitions(df)
val maxLabelCount = Math.max(labelToCount.map { case (_, count) => count }.max, numPartitions)
labelToCount.map { case (label, count) => (label, maxLabelCount.toDouble / count) }.toMap
}

@@ -266,6 +266,9 @@ class ComputeModelStatistics(override val uid: String) extends Transformer
.drop(Array(predictionColumnName, labelColumnName))
}

// Note: MLlib metrics (BinaryClassificationMetrics, MulticlassMetrics, RegressionMetrics)
// require RDD input. These .rdd conversions are necessary until Spark provides
// DataFrame-based equivalents for all metrics.
private def selectAndCastToRDD(dataset: Dataset[_],
predictionColumnName: String,
labelColumnName: String): RDD[(Double, Double)] = {
@@ -71,7 +71,7 @@ class ComputePerInstanceStatistics(override val uid: String) extends Transformer
if (levels.get.length > 2) levels.get.length else 2
} else {
// Otherwise compute unique levels
dataset.select(col(labelColumnName).cast(DoubleType)).rdd.distinct().count().toInt
dataset.select(col(labelColumnName).cast(DoubleType)).distinct().count().toInt
}

val logLossFunc = udf((scoredLabel: Double, scores: org.apache.spark.ml.linalg.Vector) =>
@@ -5,7 +5,6 @@ package org.apache.spark.ml

import com.microsoft.azure.synapse.ml.core.serialize.ComplexParam
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext
import org.apache.spark.ml.param.{ParamPair, Params}
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.ml.util._
@@ -37,7 +36,7 @@ private[ml] class ComplexParamsWriter(instance: Params) extends MLWriter {
override protected def saveImpl(path: String): Unit = {
val complexParamLocs = ComplexParamsWriter.getComplexParamLocations(instance, path)
val complexParamJson = ComplexParamsWriter.getComplexMetadata(complexParamLocs)
ComplexParamsWriter.saveMetadata(instance, path, sc, complexParamJson)
ComplexParamsWriter.saveMetadata(instance, path, sparkSession, complexParamJson)
ComplexParamsWriter.saveComplexParams(path, complexParamLocs, shouldOverwrite)
}
}
@@ -90,12 +89,12 @@ private[ml] object ComplexParamsWriter {
*/
def saveMetadata(instance: Params,
path: String,
sc: SparkContext,
spark: SparkSession,
extraMetadata: Option[JObject] = None,
paramMap: Option[JValue] = None): Unit = {
val metadataPath = new Path(path, "metadata").toString
val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
val metadataJson = getMetadataToSave(instance, spark, extraMetadata, paramMap)
spark.createDataFrame(Seq(Tuple1(metadataJson))).toDF("value").write.text(metadataPath)
}

/** Helper for [[saveMetadata()]] which extracts the JSON to save.
@@ -104,7 +103,7 @@
* @see [[saveMetadata()]] for details on what this includes.
*/
def getMetadataToSave(instance: Params,
sc: SparkContext,
spark: SparkSession,
extraMetadata: Option[JObject] = None,
paramMap: Option[JValue] = None): String = {
val uid = instance.uid
@@ -121,7 +120,7 @@
}.toList)
val basicMetadata = ("class" -> cls) ~
("timestamp" -> System.currentTimeMillis()) ~
("sparkVersion" -> sc.version) ~
("sparkVersion" -> spark.version) ~
("uid" -> uid) ~
("paramMap" -> jsonParams) ~
("defaultParamMap" -> jsonDefaultParams)