Skip to content

Commit 3c85815

Browse files
authored
SNOW-1708660 Enable parquet by default (#595)
* enable parquet by default * use parquet * add test * fix test * fix test
1 parent 53d404e commit 3c85815

File tree

6 files changed

+89
-9
lines changed

6 files changed

+89
-9
lines changed

src/it/scala/net/snowflake/spark/snowflake/ParquetSuite.scala

+51
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class ParquetSuite extends IntegrationSuiteBase {
2525
val test_column_map_not_match: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
2626
val test_nested_dataframe: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
2727
val test_no_staging_table: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
28+
val test_table_name: String = Random.alphanumeric.filter(_.isLetter).take(10).mkString
2829

2930
override def afterAll(): Unit = {
3031
jdbcUpdate(s"drop table if exists $test_all_type")
@@ -41,6 +42,7 @@ class ParquetSuite extends IntegrationSuiteBase {
4142
jdbcUpdate(s"drop table if exists $test_column_map_not_match")
4243
jdbcUpdate(s"drop table if exists $test_nested_dataframe")
4344
jdbcUpdate(s"drop table if exists $test_no_staging_table")
45+
jdbcUpdate(s"drop table if exists $test_table_name")
4446
super.afterAll()
4547
}
4648

@@ -707,4 +709,53 @@ class ParquetSuite extends IntegrationSuiteBase {
707709
val res = sparkSession.sql(s"show tables like '%${test_all_type}_STAGING%'").collect()
708710
assert(res.length == 0)
709711
}
712+
713+
test("use parquet in structured type by default") {
  // Writes the result of `query` into test_table_name with any extra writer
  // options applied, then returns the COPY INTO statement that was issued.
  def writeAndGetCopyLoad(query: String,
                          extraOptions: Map[String, String] = Map.empty): String = {
    val writer = sparkSession
      .sql(query)
      .write
      .format(SNOWFLAKE_SOURCE_NAME)
      .options(connectorOptionsNoTable)
      .option("dbtable", test_table_name)
      .mode(SaveMode.Overwrite)
    extraOptions.foldLeft(writer) { case (w, (key, value)) => w.option(key, value) }.save()
    Utils.getLastCopyLoad
  }

  // Plain (non-structured) schemas still default to CSV.
  assert(writeAndGetCopyLoad("select 1").contains("TYPE=CSV"))

  // A structured type (ARRAY) in the schema switches the default to Parquet.
  assert(writeAndGetCopyLoad("select array(1, 2)").contains("TYPE=PARQUET"))

  // Enabling PARAM_USE_JSON_IN_STRUCTURED_DATA forces JSON for structured data.
  assert(
    writeAndGetCopyLoad(
      "select array(1, 2)",
      Map(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA -> "true")
    ).contains("TYPE = JSON"))

  // PARAM_USE_PARQUET_IN_WRITE takes precedence over
  // PARAM_USE_JSON_IN_STRUCTURED_DATA when both are set.
  assert(
    writeAndGetCopyLoad(
      "select array(1, 2)",
      Map(
        Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA -> "true",
        Parameters.PARAM_USE_PARQUET_IN_WRITE -> "true"
      )
    ).contains("TYPE=PARQUET"))
}
710761
}

src/it/scala/net/snowflake/spark/snowflake/SnowflakeResultSetRDDSuite.scala

+5
Original file line numberDiff line numberDiff line change
@@ -1866,6 +1866,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
18661866
.format(SNOWFLAKE_SOURCE_NAME)
18671867
.options(thisConnectorOptionsNoTable)
18681868
.option("dbtable", test_table_write)
1869+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
18691870
.mode(SaveMode.Overwrite)
18701871
.save()
18711872

@@ -1922,6 +1923,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
19221923
.format(SNOWFLAKE_SOURCE_NAME)
19231924
.options(localSFOption)
19241925
.option("dbtable", test_table_write)
1926+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
19251927
.mode(SaveMode.Overwrite)
19261928
.save()
19271929

@@ -2004,6 +2006,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
20042006
.format(SNOWFLAKE_SOURCE_NAME)
20052007
.options(thisConnectorOptionsNoTable)
20062008
.option("dbtable", test_table_write)
2009+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
20072010
.mode(SaveMode.Overwrite)
20082011
.save()
20092012

@@ -2019,6 +2022,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
20192022
.format(SNOWFLAKE_SOURCE_NAME)
20202023
.options(thisConnectorOptionsNoTable)
20212024
.option("dbtable", test_table_write)
2025+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
20222026
.option(Parameters.PARAM_INTERNAL_QUOTE_JSON_FIELD_NAME, "false")
20232027
.mode(SaveMode.Overwrite)
20242028
.save()
@@ -2031,6 +2035,7 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {
20312035
.options(thisConnectorOptionsNoTable)
20322036
.option("dbtable", test_table_write)
20332037
.option(Parameters.PARAM_INTERNAL_QUOTE_JSON_FIELD_NAME, "false")
2038+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
20342039
.mode(SaveMode.Overwrite)
20352040
.save()
20362041

src/it/scala/net/snowflake/spark/snowflake/VariantTypeSuite.scala

+2
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ class VariantTypeSuite extends IntegrationSuiteBase {
164164
.options(connectorOptionsNoTable)
165165
.option("dbtable", tableName2)
166166
.option(Parameters.PARAM_INTERNAL_USE_PARSE_JSON_FOR_WRITE, "true")
167+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
167168
.mode(SaveMode.Overwrite)
168169
.save()
169170

@@ -242,6 +243,7 @@ class VariantTypeSuite extends IntegrationSuiteBase {
242243
.format(SNOWFLAKE_SOURCE_NAME)
243244
.options(connectorOptionsNoTable)
244245
.option("dbtable", tableName4)
246+
.option(Parameters.PARAM_USE_JSON_IN_STRUCTURED_DATA, "true")
245247
.mode(SaveMode.Overwrite)
246248
.save()
247249

src/main/scala/net/snowflake/spark/snowflake/Parameters.scala

+25-3
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,18 @@ object Parameters {
260260
val BOOLEAN_VALUES_FALSE: Set[String] =
261261
Set("off", "no", "false", "0", "disabled")
262262

263-
// enable parquet format
263+
// enable parquet format when loading data from Spark to Snowflake.
264+
// When enabled, Spark connector will only use Parquet file format.
264265
val PARAM_USE_PARQUET_IN_WRITE: String = knownParam("use_parquet_in_write")
265266

267+
// By default, Spark connector uses CSV format when loading data from Spark to Snowflake.
268+
// If the dataframe contains any structured type, Spark connector will use Parquet
269+
// format instead of CSV.
270+
// When this parameter is enabled, Spark connector will use JSON format when loading
271+
// structured data instead of Parquet.
272+
// It is ignored when the USE_PARQUET_IN_WRITE parameter is enabled.
273+
val PARAM_USE_JSON_IN_STRUCTURED_DATA: String = knownParam("use_json_in_structured_data")
274+
266275
/**
267276
* Helper method to check if a given string represents some form
268277
* of "true" value, see BOOLEAN_VALUES_TRUE
@@ -297,7 +306,8 @@ object Parameters {
297306
PARAM_TIMESTAMP_LTZ_OUTPUT_FORMAT -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3",
298307
PARAM_TIMESTAMP_TZ_OUTPUT_FORMAT -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF3",
299308
PARAM_TRIM_SPACE -> "false",
300-
PARAM_USE_PARQUET_IN_WRITE -> "false"
309+
PARAM_USE_PARQUET_IN_WRITE -> "false",
310+
PARAM_USE_JSON_IN_STRUCTURED_DATA -> "false"
301311

302312
)
303313

@@ -613,13 +623,25 @@ object Parameters {
613623
def createPerQueryTempDir(): String = Utils.makeTempPath(rootTempDir)
614624

615625
/**
 * Whether the connector should use the Parquet file format when loading
 * data from Spark into Snowflake. Driven by the
 * PARAM_USE_PARQUET_IN_WRITE parameter; disabled by default.
 */
def useParquetInWrite(): Boolean =
  isTrue(parameters.getOrElse(PARAM_USE_PARQUET_IN_WRITE, "false"))
622632

633+
/**
 * Whether the connector should use the JSON file format when loading
 * structured data from Spark into Snowflake. Driven by the
 * PARAM_USE_JSON_IN_STRUCTURED_DATA parameter; disabled by default.
 *
 * PARAM_USE_PARQUET_IN_WRITE takes precedence: when Parquet writes are
 * enabled this always returns false.
 */
def useJsonInWrite(): Boolean =
  !useParquetInWrite() &&
    isTrue(parameters.getOrElse(PARAM_USE_JSON_IN_STRUCTURED_DATA, "false"))
644+
623645
/**
624646
* The Snowflake table to be used as the target when loading or writing data.
625647
*/

src/main/scala/net/snowflake/spark/snowflake/SnowflakeWriter.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ private[snowflake] class SnowflakeWriter(jdbcWrapper: JDBCWrapper) {
5757
if (params.useParquetInWrite()) {
5858
SupportedFormat.PARQUET
5959
} else if (Utils.containVariant(data.schema)){
60-
SupportedFormat.JSON
60+
if (params.useJsonInWrite()) SupportedFormat.JSON else SupportedFormat.PARQUET
6161
}
6262
else {
6363
SupportedFormat.CSV
@@ -74,7 +74,7 @@ private[snowflake] class SnowflakeWriter(jdbcWrapper: JDBCWrapper) {
7474
)
7575
params.setColumnMap(Option(data.schema), toSchema)
7676
} finally conn.close()
77-
} else if (params.columnMap.isDefined && params.useParquetInWrite()){
77+
} else if (params.columnMap.isDefined && format == SupportedFormat.PARQUET){
7878
val conn = jdbcWrapper.getConnector(params)
7979
try {
8080
val toSchema = Utils.removeQuote(
@@ -94,7 +94,7 @@ private[snowflake] class SnowflakeWriter(jdbcWrapper: JDBCWrapper) {
9494
} finally conn.close()
9595
}
9696

97-
if (params.useParquetInWrite()){
97+
if (format == SupportedFormat.PARQUET){
9898
val conn = jdbcWrapper.getConnector(params)
9999
try{
100100
if (jdbcWrapper.tableExists(params, params.table.get.name)){

src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ private[io] object StageWriter {
327327
if (!tableExists)
328328
{
329329
writeTableState.createTable(tableName,
330-
if (params.useParquetInWrite()) params.toSnowflakeSchema(schema) else schema,
330+
if (format == SupportedFormat.PARQUET) params.toSnowflakeSchema(schema) else schema,
331331
params)
332332
} else if (params.truncateTable && saveMode == SaveMode.Overwrite) {
333333
writeTableState.truncateTable(tableName)
@@ -425,7 +425,7 @@ private[io] object StageWriter {
425425
if (saveMode == SaveMode.Overwrite || !tableExists)
426426
{
427427
conn.createTable(targetTable.name,
428-
if (params.useParquetInWrite()) params.toSnowflakeSchema(schema) else schema,
428+
if (format == SupportedFormat.PARQUET) params.toSnowflakeSchema(schema) else schema,
429429
params,
430430
overwrite = false, temporary = false)
431431
}
@@ -904,7 +904,7 @@ private[io] object StageWriter {
904904
val fromString = ConstantString(s"FROM @$tempStage/$prefix/") !
905905

906906
val mappingList: Option[List[(Int, String)]] =
907-
if (params.useParquetInWrite()) None else params.columnMap match {
907+
if (format == SupportedFormat.PARQUET) None else params.columnMap match {
908908
case Some(map) =>
909909
Some(map.toList.map {
910910
case (key, value) =>

0 commit comments

Comments
 (0)