Commit d7f4823

Support for wide row tables on Redshift
1 parent 1cc45f9 commit d7f4823

6 files changed (+63 -20 lines)


config/redshift.config.reference.hocon (+5 -1)

@@ -32,7 +32,11 @@
       # Custom JDBC configuration. Optional, default value { "ssl": true }
       "jdbc": { "ssl": true },
       # MAXERROR, amount of acceptable loading errors. Optional, default value 10
-      "maxError": 10
+      "maxError": 10,
+      # unlock experimental features
+      "experimental": {
+        "enableWideRow": false
+      }
     },
 
     "schedules": {

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTarget.scala (+7 -1)

@@ -81,7 +81,8 @@ object StorageTarget {
       username: String,
       password: PasswordConfig,
       maxError: Int,
-      sshTunnel: Option[TunnelConfig]) extends StorageTarget {
+      sshTunnel: Option[TunnelConfig],
+      experimental: RedshiftExperimentalFeatures) extends StorageTarget {
     override def driver: String = "com.amazon.redshift.jdbc42.Driver"
 
     override def connectionUrl: String = s"jdbc:redshift://$host:$port/$database"
@@ -275,6 +276,8 @@
         j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes))
     }
 
+  final case class RedshiftExperimentalFeatures(enableWideRow: Boolean)
+
   /** Reference to encrypted entity inside EC2 Parameter Store */
   final case class ParameterStoreConfig(parameterName: String)
 
@@ -328,6 +331,9 @@ object StorageTarget {
     */
   final case class TunnelConfig(bastion: BastionConfig, localPort: Int, destination: DestinationConfig)
 
+  implicit def redshiftExperimentalFeaturesDecoder: Decoder[RedshiftExperimentalFeatures] =
+    deriveDecoder[RedshiftExperimentalFeatures]
+
   implicit def redshiftConfigDecoder: Decoder[Redshift] =
     deriveDecoder[Redshift]
 
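
Both decoders use circe's semi-automatic derivation, so the new block decodes like any other case class field. A self-contained sketch of the same mechanics (assumes circe-generic on the classpath, as in the loader):

    import io.circe.Decoder
    import io.circe.generic.semiauto.deriveDecoder
    import io.circe.parser.decode

    object ExperimentalDecoderSketch {
      // Mirrors StorageTarget.RedshiftExperimentalFeatures from this commit
      final case class RedshiftExperimentalFeatures(enableWideRow: Boolean)

      implicit val featuresDecoder: Decoder[RedshiftExperimentalFeatures] =
        deriveDecoder[RedshiftExperimentalFeatures]

      def main(args: Array[String]): Unit =
        // Prints: Right(RedshiftExperimentalFeatures(true))
        println(decode[RedshiftExperimentalFeatures]("""{"enableWideRow": true}"""))
    }

Note that experimental is not an Option, so a Redshift target JSON without the block fails to decode; the defaults added to application.conf below keep existing deployments working.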

modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/ConfigSpec.scala (+2 -1)

@@ -104,7 +104,8 @@ object ConfigSpec {
     "admin",
     StorageTarget.PasswordConfig.PlainText("Supersecret1"),
     10,
-    None
+    None,
+    StorageTarget.RedshiftExperimentalFeatures(false)
   )
   val exampleSnowflake = StorageTarget.Snowflake(
     Some("us-west-2"),

modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/config/StorageTargetSpec.scala (+4 -2)

@@ -33,7 +33,8 @@ class StorageTargetSpec extends Specification {
         "schema": "atomic",
         "maxError": 1,
         "compRows": 20000,
-        "purpose": "ENRICHED_EVENTS"
+        "purpose": "ENRICHED_EVENTS",
+        "experimental": {"enableWideRow": true}
       }"""
 
     val expected = StorageTarget.Redshift(
@@ -46,7 +47,8 @@ class StorageTargetSpec extends Specification {
       "ADD HERE",
       StorageTarget.PasswordConfig.PlainText("ADD HERE"),
       1,
-      None)
+      None,
+      StorageTarget.RedshiftExperimentalFeatures(true))
 
     config.as[StorageTarget] must beRight(expected)
   }

modules/redshift-loader/src/main/resources/application.conf (+3)

@@ -4,5 +4,8 @@
     "port": 5439
     "jdbc": { "ssl": true }
     "maxError": 10
+    "experimental": {
+      "enableWideRow": false
+    }
   }
 }
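
Because the new case class field is mandatory, this bundled default is what keeps pre-existing user configs decoding: the user's HOCON is merged with these defaults before decoding. A sketch of that fallback behaviour with Lightbend Config (the enclosing "storage" key is an assumption, since the fragments above elide the outer key):

    import com.typesafe.config.ConfigFactory

    object ConfigFallbackSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the bundled application.conf fragment above
        val defaults = ConfigFactory.parseString(
          """"storage": { "experimental": { "enableWideRow": false } }""")
        // A user config written before this commit, with no "experimental" block
        val user = ConfigFactory.parseString(""""storage": { "maxError": 7 }""")
        // User values win; anything missing falls back to the defaults
        val merged = user.withFallback(defaults).resolve()
        println(merged.getBoolean("storage.experimental.enableWideRow")) // false
        println(merged.getInt("storage.maxError"))                       // 7
      }
    }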

modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala (+42 -15)

@@ -21,6 +21,8 @@ import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlFile, DdlGen
 import com.snowplowanalytics.snowplow.rdbloader.LoadStatements
 import com.snowplowanalytics.snowplow.rdbloader.common.Common
 import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat.{JSON, PARQUET}
+import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo
 import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
 import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns}
 import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item, NoPreStatements, NoStatements}
@@ -43,7 +45,7 @@ object Redshift {
 
   def build(config: Config[StorageTarget]): Either[String, Target] = {
     config.storage match {
-      case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _) =>
+      case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _, experimentalFeatures) =>
         val result = new Target {
 
           override val requiresEventsColumns: Boolean = false
@@ -67,19 +69,39 @@
             Block(preTransaction.reverse, inTransaction.reverse, Entity.Table(schema, target))
           }
 
-          override def extendTable(info: ShreddedType.Info): Option[Block] =
-            throw new IllegalStateException("Redshift Loader does not support loading wide row")
-
-          override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = {
-            val shreddedStatements = discovery
-              .shreddedTypes
-              .filterNot(_.isAtomic)
-              .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))
-
-            val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod)
-            NonEmptyList(atomic, shreddedStatements)
+          override def extendTable(info: ShreddedType.Info): Option[Block] = {
+            val columnName = info.getNameFull
+            val frTableName = Fragment.const(EventsTable.withSchema(schema))
+            val addColumnSql = sql"ALTER TABLE $frTableName ADD COLUMN ${Fragment.const0(columnName)} SUPER"
+            Some(Block(List(Item.AddColumn(addColumnSql, Nil)), Nil, Entity.Column(info)))
           }
 
+          override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements =
+            discovery.typesInfo match {
+              case TypesInfo.Shredded(_) =>
+                val shreddedStatements = discovery
+                  .shreddedTypes
+                  .filterNot(_.isAtomic)
+                  .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))
+
+                val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod)
+                NonEmptyList(atomic, shreddedStatements)
+              case TypesInfo.WideRow(_, _) if !experimentalFeatures.enableWideRow =>
+                throw new IllegalStateException("Experimental widerow loading for Redshift is not enabled")
+              case TypesInfo.WideRow(_, _) =>
+                val columnsToCopy = ColumnsToCopy.fromDiscoveredData(discovery)
+                NonEmptyList.one(
+                  Statement.EventsCopy(
+                    discovery.base,
+                    discovery.compression,
+                    columnsToCopy,
+                    ColumnsToSkip.none,
+                    discovery.typesInfo,
+                    loadAuthMethod
+                  )
+                )
+            }
+
           override def createTable(schemas: SchemaList): Block = {
             val subschemas = FlatSchema.extractProperties(schemas)
             val tableName = StringUtils.getTableName(schemas.latest)
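
extendTable now emits an in-place migration instead of throwing: one ALTER TABLE per newly discovered entity, typed as SUPER (Redshift's semi-structured type) so the whole payload lands in a single column. A runnable sketch of the rendered DDL, with hypothetical values standing in for the config schema and info.getNameFull:

    import doobie.Fragment
    import doobie.implicits._

    object ExtendTableSketch {
      // Hypothetical stand-ins; the loader derives these from config and the Iglu schema key
      val schema: String     = "atomic"
      val columnName: String = "contexts_com_acme_product_1"

      val frTableName = Fragment.const(s"$schema.events")
      val addColumnSql =
        sql"ALTER TABLE $frTableName ADD COLUMN ${Fragment.const0(columnName)} SUPER"

      def main(args: Array[String]): Unit =
        // Prints the rendered DDL (modulo whitespace):
        //   ALTER TABLE atomic.events ADD COLUMN contexts_com_acme_product_1 SUPER
        println(addColumnSql.update.sql)
    }

getLoadStatements likewise dispatches on discovery.typesInfo: shredded runs keep the old plan (atomic COPY plus one ShreddedCopy per entity), wide row runs collapse to a single EventsCopy over the discovered columns, and wide row with the flag off fails fast.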
@@ -110,7 +132,7 @@
           val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn")
           val frPath = Fragment.const0(source)
           sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'"
-        case Statement.EventsCopy(path, compression, columnsToCopy, _, _, _) =>
+        case Statement.EventsCopy(path, compression, columnsToCopy, _, typesInfo, _) =>
           // For some reason Redshift JDBC doesn't handle interpolation in COPY statements
           val frTableName = Fragment.const(EventsTable.withSchema(schema))
           val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType))
@@ -119,13 +141,18 @@
           val frMaxError = Fragment.const0(maxError.toString)
           val frCompression = getCompressionFormat(compression)
           val frColumns = Fragment.const0(columnsToCopy.names.map(_.value).mkString(","))
+          val frFileFormat = Fragment.const0(typesInfo match {
+            case TypesInfo.Shredded(_) => s"CSV DELIMITER '$EventFieldSeparator'"
+            case TypesInfo.WideRow(JSON, _) => "JSON 'auto'"
+            case TypesInfo.WideRow(PARQUET, _) => "PARQUET"
+          })
 
           sql"""COPY $frTableName ($frColumns) FROM '$frPath'
                | CREDENTIALS '$frRoleArn'
+               | FORMAT $frFileFormat
                | REGION '$frRegion'
                | MAXERROR $frMaxError
                | TIMEFORMAT 'auto'
-               | DELIMITER '$EventFieldSeparator'
                | EMPTYASNULL
                | FILLRECORD
                | TRUNCATECOLUMNS
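
For a wide row JSON run this renders to a single COPY roughly like the following (bucket, role, region, and column list are hypothetical placeholders; GZIP assumes gzip-compressed transformer output):

    COPY atomic.events (app_id,collector_tstamp,contexts_com_acme_product_1) FROM 's3://bucket/run=.../output=good/'
     CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/loader'
     FORMAT JSON 'auto'
     REGION 'us-east-1'
     MAXERROR 10
     TIMEFORMAT 'auto'
     EMPTYASNULL
     FILLRECORD
     TRUNCATECOLUMNS
     ACCEPTINVCHARS
     GZIP

With JSON 'auto', Redshift matches top-level JSON fields to column names, which is how the SUPER columns created by extendTable receive their entities.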
@@ -163,7 +190,7 @@
                | ACCEPTINVCHARS
                | $frCompression""".stripMargin
           case ShreddedType.Widerow(_) =>
-            throw new IllegalStateException("Widerow loading is not yet supported for Redshift")
+            throw new IllegalStateException("Cannot perform a shredded copy from widerow files")
         }
       case Statement.CreateTransient =>
         Fragment.const0(s"CREATE TABLE ${EventsTable.TransitTable(schema).withSchema} ( LIKE ${EventsTable.AtomicEvents(schema).withSchema} )")
