Skip to content

Commit 3309028

Browse files
authored
SNOW-1002378 Add new Parameter "string_timestamp_format" (#539)
* add new parameter * fix test
1 parent 1daa083 commit 3309028

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

src/it/scala/net/snowflake/spark/snowflake/DataTypesIntegrationSuite.scala

+46-3
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616

1717
package net.snowflake.spark.snowflake
1818

19-
import java.sql.Timestamp
19+
import java.sql.{SQLException, Timestamp}
2020
import java.text.{DateFormat, SimpleDateFormat}
21-
2221
import org.apache.spark.sql.{Row, SaveMode}
2322
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME
2423
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_SHORT_NAME
2524
import org.apache.spark.sql.functions._
26-
import org.apache.spark.sql.types.{DateType, TimestampType}
25+
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType, TimestampType}
2726

2827
/**
2928
* Created by mzukowski on 8/12/16.
@@ -98,6 +97,50 @@ class DataTypesIntegrationSuite extends IntegrationSuiteBase {
9897
)
9998
}
10099

100+
test("change timestamp format") {
101+
jdbcUpdate(s"CREATE OR REPLACE TABLE $test_table (START_TIME TIMESTAMP)")
102+
103+
val spark = sparkSession
104+
import spark.implicits._
105+
106+
val df1 = Seq("2023-11-28 10:23:59.123456").toDF()
107+
df1.write.format(SNOWFLAKE_SOURCE_NAME)
108+
.options(connectorOptionsNoTable).option("dbtable", test_table)
109+
.option(Parameters.PARAM_STRING_TIMESTAMP_FORMAT, "YYYY-MM-DD HH24:MI:SS.FF9")
110+
.mode(SaveMode.Append).save()
111+
112+
val df = sparkSession.read.format(SNOWFLAKE_SOURCE_NAME)
113+
.options(connectorOptionsNoTable).option("dbtable", test_table).load()
114+
assert(df.collect().head.getTimestamp(0).toString.startsWith("2023-11-28 10:23:59.123"))
115+
116+
// The parameter is ignored when the source DataFrame has a timestamp column, so this write is expected to fail.
117+
val dateFormat: DateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss.SSS")
118+
val time: Timestamp =
119+
new Timestamp(dateFormat.parse("28/10/1996 00:00:00.000").getTime)
120+
121+
val data = sc.parallelize(Seq(
122+
Row(time, "2023-11-28 10:23:59.123456")
123+
))
124+
125+
val schema = new StructType(
126+
Array(
127+
StructField("time", TimestampType),
128+
StructField("str", StringType)
129+
)
130+
)
131+
132+
jdbcUpdate(s"CREATE OR REPLACE TABLE $test_table (START_TIME TIMESTAMP, END_TIME TIMESTAMP)")
133+
134+
val df2 = sparkSession.createDataFrame(data, schema)
135+
136+
assertThrows[SQLException]{
137+
df2.write.format(SNOWFLAKE_SOURCE_NAME).options(connectorOptionsNoTable)
138+
.options(connectorOptionsNoTable).option("dbtable", test_table)
139+
.option(Parameters.PARAM_STRING_TIMESTAMP_FORMAT, "YYYY-MM-DD HH24:MI:SS.FF9")
140+
.mode(SaveMode.Append).save()
141+
}
142+
}
143+
101144
test("insert timestamp into date") {
102145
jdbcUpdate(s"create or replace table $test_table(i date)")
103146

src/main/scala/net/snowflake/spark/snowflake/Parameters.scala

+6
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ object Parameters {
114114
val PARAM_TIMESTAMP_NTZ_OUTPUT_FORMAT: String = knownParam("timestamp_ntz_output_format")
115115
val PARAM_TIMESTAMP_LTZ_OUTPUT_FORMAT: String = knownParam("timestamp_ltz_output_format")
116116
val PARAM_TIMESTAMP_TZ_OUTPUT_FORMAT: String = knownParam("timestamp_tz_output_format")
117+
// Overrides the timestamp format used when copying data from a string column into a timestamp column.
118+
// It only takes effect when the source DataFrame has no timestamp columns; otherwise it is ignored.
119+
val PARAM_STRING_TIMESTAMP_FORMAT: String = knownParam("string_timestamp_format")
117120
val TIMESTAMP_OUTPUT_FORMAT_SF_CURRENT: String = "sf_current"
118121
val PARAM_JDBC_QUERY_RESULT_FORMAT: String = knownParam(
119122
"jdbc_query_result_format"
@@ -1017,6 +1020,9 @@ object Parameters {
10171020
}
10181021
}
10191022

1023+
def getStringTimestampFormat: Option[String] =
1024+
parameters.get(PARAM_STRING_TIMESTAMP_FORMAT)
1025+
10201026
def streamingStage: Option[String] = parameters.get(PARAM_STREAMING_STAGE)
10211027

10221028
def storagePath: Option[String] = {

src/main/scala/net/snowflake/spark/snowflake/io/StageWriter.scala

+19-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import net.snowflake.spark.snowflake.io.SupportedFormat.SupportedFormat
2525
import net.snowflake.spark.snowflake.DefaultJDBCWrapper.DataBaseOperations
2626
import net.snowflake.spark.snowflake.test.{TestHook, TestHookFlag}
2727
import org.apache.spark.rdd.RDD
28-
import org.apache.spark.sql.types.{BinaryType, StructType}
28+
import org.apache.spark.sql.types.{BinaryType, StructType, TimestampType}
2929
import org.apache.spark.sql.{SQLContext, SaveMode}
3030
import org.slf4j.LoggerFactory
3131

@@ -191,6 +191,8 @@ private[io] object StageWriter {
191191

192192
private[io] val log = LoggerFactory.getLogger(getClass)
193193

194+
private val DEFAULT_TIMESTAMP_FORMAT: String = "TZHTZM YYYY-MM-DD HH24:MI:SS.FF9"
195+
194196
def writeToStage(sqlContext: SQLContext,
195197
rdd: RDD[String],
196198
schema: StructType,
@@ -892,6 +894,21 @@ private[io] object StageWriter {
892894

893895
val mappingFromString = getMappingFromString(mappingList, fromString)
894896

897+
val hasTimestampColumn: Boolean =
898+
schema.exists(field => field.dataType == TimestampType)
899+
900+
val timestampFormat: String =
901+
if (params.getStringTimestampFormat.isEmpty) {
902+
DEFAULT_TIMESTAMP_FORMAT
903+
} else if (hasTimestampColumn) {
904+
log.error("The source Dataframe contains timestamp columns, "
905+
+ "the parameter `string_timestamp_format` has been ignored.")
906+
DEFAULT_TIMESTAMP_FORMAT
907+
} else {
908+
params.getStringTimestampFormat.get
909+
}
910+
911+
895912
val formatString =
896913
format match {
897914
case SupportedFormat.CSV =>
@@ -901,7 +918,7 @@ private[io] object StageWriter {
901918
| FIELD_DELIMITER='|'
902919
| NULL_IF=()
903920
| FIELD_OPTIONALLY_ENCLOSED_BY='"'
904-
| TIMESTAMP_FORMAT='TZHTZM YYYY-MM-DD HH24:MI:SS.FF9'
921+
| TIMESTAMP_FORMAT='$timestampFormat'
905922
| DATE_FORMAT='TZHTZM YYYY-MM-DD HH24:MI:SS.FF9'
906923
| BINARY_FORMAT=BASE64
907924
| )

0 commit comments

Comments
 (0)