@@ -790,8 +790,9 @@ class UMAP(UMAPClass, _CumlEstimatorSupervised, _UMAPCumlParams):
 
     sample_fraction : float (optional, default=1.0)
         The fraction of the dataset to be used for fitting the model. Since fitting is done on a single node, very large
-        datasets must be subsampled to fit within the node's memory and execute in a reasonable time. Smaller fractions
-        will result in faster training, but may result in sub-optimal embeddings.
+        datasets must be subsampled to fit within the node's memory. Smaller fractions will result in faster training, but
+        may decrease embedding quality. Note: this is not guaranteed to provide exactly the specified fraction of the total
+        row count of the given DataFrame.
 
     featuresCol: str or List[str]
         The feature column names, spark-rapids-ml supports vector, array and columnar as the input.\n
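
To make the `sample_fraction` behavior concrete, here is a minimal usage sketch (not part of this diff); it assumes the `spark_rapids_ml.umap.UMAP` estimator and an input DataFrame `large_df` with an assembled `"features"` column, both of which are illustrative:

```python
# Illustrative only: fit UMAP on a subsample of a large DataFrame.
from spark_rapids_ml.umap import UMAP  # assumed import path

umap = UMAP(
    n_components=2,
    sample_fraction=0.1,      # fit on roughly 10% of the rows (not an exact count)
    featuresCol="features",   # hypothetical assembled vector/array column
)
model = umap.fit(large_df)              # single-node fit runs on the sampled subset
embeddings = model.transform(large_df)  # transform still embeds every row
```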
@@ -1463,22 +1464,30 @@ def write_sparse_array(array: scipy.sparse.spmatrix, df_dir: str) -> None:
                 schema=indices_data_schema,
             )
 
-            indptr_df.write.parquet(
-                os.path.join(df_dir, "indptr.parquet"), mode="overwrite"
-            )
-            indices_data_df.write.parquet(
-                os.path.join(df_dir, "indices_data.parquet"), mode="overwrite"
-            )
+            indptr_df.write.parquet(os.path.join(df_dir, "indptr.parquet"))
+            indices_data_df.write.parquet(os.path.join(df_dir, "indices_data.parquet"))
 
         def write_dense_array(array: np.ndarray, df_path: str) -> None:
+            assert (
+                spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") == "true"
+            ), "spark.sql.execution.arrow.pyspark.enabled must be set to true to persist array attributes"
+
             schema = StructType(
                 [
-                    StructField(f"_{i}", FloatType(), False)
-                    for i in range(1, array.shape[1] + 1)
+                    StructField("row_id", LongType(), False),
+                    StructField("data", ArrayType(FloatType(), False), False),
                 ]
             )
-            data_df = spark.createDataFrame(pd.DataFrame(array), schema=schema)
-            data_df.write.parquet(df_path, mode="overwrite")
+            data_df = spark.createDataFrame(
+                pd.DataFrame(
+                    {
+                        "row_id": range(array.shape[0]),
+                        "data": list(array),
+                    }
+                ),
+                schema=schema,
+            )
+            data_df.write.parquet(df_path)
 
         DefaultParamsWriter.saveMetadata(
             self.instance,
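
The schema change above persists each matrix row as a single `ArrayType` value keyed by an explicit `row_id`, instead of one float column per feature. A small pandas-only sketch (illustrative, not library code) of the row layout that `list(array)` produces:

```python
import numpy as np
import pandas as pd

# Toy 3x4 matrix standing in for an embedding_ / raw_data_ attribute.
array = np.arange(12, dtype=np.float32).reshape(3, 4)

# list(array) yields one 1-D row array per DataFrame row, so each parquet row
# stores (row_id, whole feature vector) rather than one column per feature.
pdf = pd.DataFrame({"row_id": range(array.shape[0]), "data": list(array)})
print(pdf)
#    row_id                    data
# 0       0    [0.0, 1.0, 2.0, 3.0]
# 1       1    [4.0, 5.0, 6.0, 7.0]
# 2       2  [8.0, 9.0, 10.0, 11.0]
```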
@@ -1491,12 +1500,12 @@ def write_dense_array(array: np.ndarray, df_path: str) -> None:
             },
         )
 
+        # get a copy, since we're going to modify the array attributes
        model_attributes = self.instance._get_model_attributes()
        assert model_attributes is not None
+        model_attributes = model_attributes.copy()
 
        data_path = os.path.join(path, "data")
-        if not os.path.exists(data_path):
-            os.makedirs(data_path)
 
        for key in ["embedding_", "raw_data_"]:
            array = model_attributes[key]
@@ -1547,8 +1556,10 @@ def read_sparse_array(
            return scipy.sparse.csr_matrix((data, indices, indptr), shape=csr_shape)
 
        def read_dense_array(df_path: str) -> np.ndarray:
-            data_df = spark.read.parquet(df_path)
-            return np.array(data_df.collect(), dtype=np.float32)
+            data_df = spark.read.parquet(df_path).orderBy("row_id")
+            pdf = data_df.toPandas()
+            assert type(pdf) == pd.DataFrame
+            return np.array(list(pdf.data), dtype=np.float32)
 
        metadata = DefaultParamsReader.loadMetadata(path, self.sc)
        data_path = os.path.join(path, "data")
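
For completeness, a self-contained sketch of the write/read round trip that mirrors the logic above (it is not the library code itself). It assumes a local SparkSession with Arrow enabled, as the writer's new assert requires, and uses a hypothetical output path:

```python
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType, LongType, StructField, StructType

spark = (
    SparkSession.builder.master("local[2]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

array = np.random.rand(100, 8).astype(np.float32)
df_path = "/tmp/dense_array.parquet"  # hypothetical path

schema = StructType(
    [
        StructField("row_id", LongType(), False),
        StructField("data", ArrayType(FloatType(), False), False),
    ]
)
pdf = pd.DataFrame({"row_id": range(array.shape[0]), "data": list(array)})
spark.createDataFrame(pdf, schema=schema).write.parquet(df_path)

# orderBy("row_id") makes the read deterministic no matter how the parquet
# partitions come back, which is what the reader above relies on.
restored_pdf = spark.read.parquet(df_path).orderBy("row_id").toPandas()
restored = np.array(list(restored_pdf.data), dtype=np.float32)

assert restored.shape == array.shape and np.allclose(restored, array)
```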