Support for wide row tables on Redshift #1082

Draft: wants to merge 1 commit into base: master

6 changes: 5 additions & 1 deletion config/redshift.config.reference.hocon
@@ -32,7 +32,11 @@
# Custom JDBC configuration. Optional, default value { "ssl": true }
"jdbc": { "ssl": true },
# MAXERROR, amount of acceptable loading errors. Optional, default value 10
"maxError": 10
"maxError": 10,
# unlock experimental features
"experimental": {
"enableWideRow": false
}
},

"schedules": {
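The new experimental.enableWideRow key defaults to false, so existing configs keep today's behavior unless the flag is switched on. A minimal sketch of reading the block with the Typesafe Config library (the bare "experimental" path and WideRowFlagDemo name are illustrative; in the real file the block sits inside the Redshift storage target):

import com.typesafe.config.ConfigFactory

object WideRowFlagDemo extends App {
  // Parse just the new block; HOCON accepts this abbreviated syntax
  val cfg = ConfigFactory.parseString("experimental { enableWideRow = false }")
  // Prints "false": wide-row loading stays off unless explicitly unlocked
  println(cfg.getBoolean("experimental.enableWideRow"))
}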
@@ -81,7 +81,8 @@ object StorageTarget {
username: String,
password: PasswordConfig,
maxError: Int,
- sshTunnel: Option[TunnelConfig]) extends StorageTarget {
+ sshTunnel: Option[TunnelConfig],
+ experimental: RedshiftExperimentalFeatures) extends StorageTarget {
override def driver: String = "com.amazon.redshift.jdbc42.Driver"

override def connectionUrl: String = s"jdbc:redshift://$host:$port/$database"
@@ -275,6 +276,8 @@ object StorageTarget {
j.sslRootCert, j.tcpKeepAlive, j.tcpKeepAliveMinutes))
}

+ final case class RedshiftExperimentalFeatures(enableWideRow: Boolean)

/** Reference to encrypted entity inside EC2 Parameter Store */
final case class ParameterStoreConfig(parameterName: String)

@@ -328,6 +331,9 @@ object StorageTarget {
*/
final case class TunnelConfig(bastion: BastionConfig, localPort: Int, destination: DestinationConfig)

+ implicit def redshiftExperimentalFeaturesDecoder: Decoder[RedshiftExperimentalFeatures] =
+   deriveDecoder[RedshiftExperimentalFeatures]

implicit def redshiftConfigDecoder: Decoder[Redshift] =
deriveDecoder[Redshift]

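RedshiftExperimentalFeatures is wired in with circe's semi-automatic derivation, so the experimental JSON object maps field-for-field onto the case class; since deriveDecoder supplies no defaults, the field is required, which is presumably why both bundled configs now ship the block explicitly. A self-contained sketch of the decoding (assumes circe-generic and circe-parser on the classpath; WideRowDecoderDemo is a demo name, not part of the PR):

import io.circe.Decoder
import io.circe.generic.semiauto.deriveDecoder
import io.circe.parser.decode

object WideRowDecoderDemo extends App {
  final case class RedshiftExperimentalFeatures(enableWideRow: Boolean)

  implicit val featuresDecoder: Decoder[RedshiftExperimentalFeatures] =
    deriveDecoder[RedshiftExperimentalFeatures]

  // Mirrors the "experimental" object in the storage target JSON
  println(decode[RedshiftExperimentalFeatures]("""{ "enableWideRow": true }"""))
  // Right(RedshiftExperimentalFeatures(true))
}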
@@ -104,7 +104,8 @@ object ConfigSpec {
"admin",
StorageTarget.PasswordConfig.PlainText("Supersecret1"),
10,
- None
+ None,
+ StorageTarget.RedshiftExperimentalFeatures(false)
)
val exampleSnowflake = StorageTarget.Snowflake(
Some("us-west-2"),
@@ -33,7 +33,8 @@ class StorageTargetSpec extends Specification {
"schema": "atomic",
"maxError": 1,
"compRows": 20000,
"purpose": "ENRICHED_EVENTS"
"purpose": "ENRICHED_EVENTS",
"experimental": {"enableWideRow": true}
}"""

val expected = StorageTarget.Redshift(
@@ -46,7 +47,8 @@
"ADD HERE",
StorageTarget.PasswordConfig.PlainText("ADD HERE"),
1,
- None)
+ None,
+ StorageTarget.RedshiftExperimentalFeatures(true))

config.as[StorageTarget] must beRight(expected)
}
3 changes: 3 additions & 0 deletions modules/redshift-loader/src/main/resources/application.conf
@@ -4,5 +4,8 @@
"port": 5439
"jdbc": { "ssl": true }
"maxError": 10
"experimental": {
"enableWideRow": false
}
}
}
@@ -21,6 +21,8 @@ import com.snowplowanalytics.iglu.schemaddl.redshift.generators.{DdlFile, DdlGenerator}
import com.snowplowanalytics.snowplow.rdbloader.LoadStatements
import com.snowplowanalytics.snowplow.rdbloader.common.Common
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
+ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat.{JSON, PARQUET}
+ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo
import com.snowplowanalytics.snowplow.rdbloader.config.{Config, StorageTarget}
import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip, EventTableColumns}
import com.snowplowanalytics.snowplow.rdbloader.db.Migration.{Block, Entity, Item, NoPreStatements, NoStatements}
@@ -43,7 +45,7 @@ object Redshift {

def build(config: Config[StorageTarget]): Either[String, Target] = {
config.storage match {
- case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _) =>
+ case StorageTarget.Redshift(_, _, _, _, roleArn, schema, _, _, maxError, _, experimentalFeatures) =>
val result = new Target {

override val requiresEventsColumns: Boolean = false
@@ -67,19 +69,39 @@
Block(preTransaction.reverse, inTransaction.reverse, Entity.Table(schema, target))
}

- override def extendTable(info: ShreddedType.Info): Option[Block] =
-   throw new IllegalStateException("Redshift Loader does not support loading wide row")
-
- override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements = {
-   val shreddedStatements = discovery
-     .shreddedTypes
-     .filterNot(_.isAtomic)
-     .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))
-
-   val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod)
-   NonEmptyList(atomic, shreddedStatements)
- }
+ override def extendTable(info: ShreddedType.Info): Option[Block] = {
+   val frColumnName = Fragment.const(info.getNameFull)
+   val frTableName = Fragment.const(EventsTable.withSchema(schema))
+   val addColumnSql = sql"ALTER TABLE $frTableName ADD COLUMN $frColumnName SUPER"
+   Some(Block(List(Item.AddColumn(addColumnSql, Nil)), Nil, Entity.Column(info)))
+ }
+
+ override def getLoadStatements(discovery: DataDiscovery, eventTableColumns: EventTableColumns, loadAuthMethod: LoadAuthMethod): LoadStatements =
+   discovery.typesInfo match {
+     case TypesInfo.Shredded(_) =>
+       val shreddedStatements = discovery
+         .shreddedTypes
+         .filterNot(_.isAtomic)
+         .map(shreddedType => Statement.ShreddedCopy(shreddedType, discovery.compression))
+
+       val atomic = Statement.EventsCopy(discovery.base, discovery.compression, ColumnsToCopy(AtomicColumns.Columns), ColumnsToSkip.none, discovery.typesInfo, loadAuthMethod)
+       NonEmptyList(atomic, shreddedStatements)
+     case TypesInfo.WideRow(_, _) if !experimentalFeatures.enableWideRow =>
+       throw new IllegalStateException("Experimental widerow loading for Redshift is not enabled")
+     case TypesInfo.WideRow(_, _) =>
+       val columnsToCopy = ColumnsToCopy.fromDiscoveredData(discovery)
+       NonEmptyList.one(
+         Statement.EventsCopy(
+           discovery.base,
+           discovery.compression,
+           columnsToCopy,
+           ColumnsToSkip.none,
+           discovery.typesInfo,
+           loadAuthMethod
+         )
+       )
+   }
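This dispatch is the core behavioral change: shredded data keeps the legacy plan (one atomic COPY plus one COPY per shredded type), wide-row data collapses to a single EventsCopy over all discovered columns, and the wide-row path is fenced off unless the experimental flag is set. A stripped-down sketch of the rule with stand-in types (the Discovered ADT and planLoad are illustrative, not the loader's API):

sealed trait Discovered
case object ShreddedData extends Discovered
case object WideRowData extends Discovered

def planLoad(data: Discovered, enableWideRow: Boolean): List[String] =
  data match {
    case ShreddedData =>
      // Legacy plan: atomic table first, then each shredded entity table
      List("COPY atomic.events (atomic columns)", "COPY each shredded table")
    case WideRowData if !enableWideRow =>
      throw new IllegalStateException("Experimental widerow loading for Redshift is not enabled")
    case WideRowData =>
      // Single statement covering atomic plus discovered entity columns
      List("COPY atomic.events (atomic + entity columns)")
  }

The new extendTable is the other half of the story: instead of throwing, each discovered entity becomes an ALTER TABLE ... ADD COLUMN ... SUPER on the events table, so a single wide-row COPY can carry self-describing entities alongside the atomic fields.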

override def createTable(schemas: SchemaList): Block = {
val subschemas = FlatSchema.extractProperties(schemas)
val tableName = StringUtils.getTableName(schemas.latest)
@@ -110,7 +132,7 @@ object Redshift {
val frRoleArn = Fragment.const0(s"aws_iam_role=$roleArn")
val frPath = Fragment.const0(source)
sql"COPY $frTableName FROM '$frPath' CREDENTIALS '$frRoleArn' DELIMITER '$EventFieldSeparator'"
- case Statement.EventsCopy(path, compression, columnsToCopy, _, _, _) =>
+ case Statement.EventsCopy(path, compression, columnsToCopy, _, typesInfo, _) =>
// For some reason, Redshift JDBC doesn't handle interpolation in COPY statements
val frTableName = Fragment.const(EventsTable.withSchema(schema))
val frPath = Fragment.const0(Common.entityPathFull(path, Common.AtomicType))
@@ -119,13 +141,18 @@
val frMaxError = Fragment.const0(maxError.toString)
val frCompression = getCompressionFormat(compression)
val frColumns = Fragment.const0(columnsToCopy.names.map(_.value).mkString(","))
+ val frFileFormat = typesInfo match {
+   case TypesInfo.Shredded(_) => sql"CSV DELIMITER '$EventFieldSeparator'"
+   case TypesInfo.WideRow(JSON, _) => Fragment.const0("JSON 'auto'")
+   case TypesInfo.WideRow(PARQUET, _) => Fragment.const0("PARQUET")
+ }

sql"""COPY $frTableName ($frColumns) FROM '$frPath'
| CREDENTIALS '$frRoleArn'
+ | FORMAT $frFileFormat
| REGION '$frRegion'
| MAXERROR $frMaxError
| TIMEFORMAT 'auto'
- | DELIMITER '$EventFieldSeparator'
| EMPTYASNULL
| FILLRECORD
| TRUNCATECOLUMNS
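Deriving the FORMAT clause from typesInfo lets one EventsCopy template serve all three inputs, and the fixed DELIMITER line disappears because the delimiter now travels with the shredded branch of the clause. A sketch of the selection outside doobie (the Layout ADT and FormatClauseDemo name are illustrative, and it assumes EventFieldSeparator is the loader's escaped tab):

object FormatClauseDemo extends App {
  sealed trait Layout
  case object ShreddedTsv extends Layout
  case object WideRowJson extends Layout
  case object WideRowParquet extends Layout

  // Assumption: matches the tab separator used by the shredded TSV output
  val EventFieldSeparator = "\\t"

  def fileFormat(layout: Layout): String = layout match {
    case ShreddedTsv    => s"CSV DELIMITER '$EventFieldSeparator'"
    case WideRowJson    => "JSON 'auto'"
    case WideRowParquet => "PARQUET"
  }

  // e.g. "JSON 'auto'" is spliced after FORMAT in the COPY statement
  println(fileFormat(WideRowJson))
}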
@@ -163,7 +190,7 @@
| ACCEPTINVCHARS
| $frCompression""".stripMargin
case ShreddedType.Widerow(_) =>
throw new IllegalStateException("Widerow loading is not yet supported for Redshift")
throw new IllegalStateException("Cannot perform a shredded copy from widerow files")
}
case Statement.CreateTransient =>
Fragment.const0(s"CREATE TABLE ${EventsTable.TransitTable(schema).withSchema} ( LIKE ${EventsTable.AtomicEvents(schema).withSchema} )")