Skip to content

Commit 98325ef

Browse files
authored
Merge pull request #21 from exasol/feature/#18-avro-format-import-support
Add support for importing avro formatted files
2 parents 519164a + 7f75a6c commit 98325ef

32 files changed

Lines changed: 726 additions & 293 deletions

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ can contact our support team.
1414

1515
* [Overview](#overview)
1616
* [Getting started](#getting-started)
17+
* [Storage Formats](#data-storage-formats)
1718
* [Import](#import)
1819
* [Export](#export)
1920
* [Building from source](#building-from-source)
@@ -110,6 +111,28 @@ CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS
110111
Please do not forget to change the bucket name or latest jar version according
111112
to your setup.
112113

114+
## Data Storage Formats
115+
116+
When performing import or export, the default data format is set as [Apache
117+
Parquet][parquet] format. However, you can specify the format using
118+
`DATA_FORMAT` configuration property.
119+
120+
For example in order to import [Apache Avro][avro] format:
121+
122+
```sql
123+
IMPORT INTO MY_SALES_POSITIONS_TABLE
124+
FROM SCRIPT ETL.IMPORT_PATH WITH
125+
BUCKET_PATH = 's3a://exa-bucket/data/avro/retail/sales_positions/*'
126+
DATA_FORMAT = 'AVRO'
127+
...
128+
PARALLELISM = 'nproc()';
129+
```
130+
131+
### Supported storage formats
132+
133+
* [Apache Parquet][parquet]
134+
* [Apache Avro][avro]: currently only import is supported
135+
113136
## IMPORT
114137

115138
Please follow instructions below in order to import from different cloud
@@ -268,4 +291,5 @@ The packaged jar should be located at
268291
[gcs]: https://cloud.google.com/storage/
269292
[azure]: https://azure.microsoft.com/en-us/services/storage/blobs/
270293
[parquet]: https://parquet.apache.org/
294+
[avro]: https://avro.apache.org/
271295
[jars]: https://github.com/exasol/cloud-storage-etl-udfs/releases

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.2.6
1+
sbt.version=1.2.8

project/plugins.sbt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")
44

55
// Adds a `wartremover` a flexible Scala code linting tool
66
// http://github.com/puffnfresh/wartremover
7-
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.3.7")
7+
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.1")
88

99
// Adds Contrib Warts
1010
// http://github.com/wartremover/wartremover-contrib/
@@ -32,7 +32,7 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1")
3232

3333
// Adds a `dependencyUpdates` task to check Maven repositories for dependency updates
3434
// http://github.com/rtimush/sbt-updates
35-
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.3.4")
35+
addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.4.0")
3636

3737
// Adds a `scalafmt` task for automatic source code formatting
3838
// https://github.com/lucidsoftware/neo-sbt-scalafmt
@@ -60,7 +60,7 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
6060

6161
// Adds a `sbt-explicit-dependencies` plugin
6262
// https://github.com/cb372/sbt-explicit-dependencies
63-
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.6")
63+
addSbtPlugin("com.github.cb372" % "sbt-explicit-dependencies" % "0.2.9")
6464

6565
// Setup this and project/project/plugins.sbt for formatting project/*.scala files with scalafmt
6666
inThisBuild(

sbtx

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66

77
set -o pipefail
88

9-
declare -r sbt_release_version="0.13.18"
10-
declare -r sbt_unreleased_version="0.13.18"
9+
declare -r sbt_release_version="1.2.8"
10+
declare -r sbt_unreleased_version="1.2.8"
1111

1212
declare -r latest_213="2.13.0-M5"
1313
declare -r latest_212="2.12.8"
@@ -43,11 +43,12 @@ declare -a extra_jvm_opts extra_sbt_opts
4343

4444
echoerr () { echo >&2 "$@"; }
4545
vlog () { [[ -n "$verbose" ]] && echoerr "$@"; }
46-
die () { echo "Aborting: $@" ; exit 1; }
46+
die () { echo "Aborting: $*" ; exit 1; }
4747

4848
setTrapExit () {
4949
# save stty and trap exit, to ensure echo is re-enabled if we are interrupted.
50-
export SBT_STTY="$(stty -g 2>/dev/null)"
50+
SBT_STTY="$(stty -g 2>/dev/null)"
51+
export SBT_STTY
5152

5253
# restore stty settings (echo in particular)
5354
onSbtRunnerExit() {
@@ -67,16 +68,18 @@ get_script_path () {
6768
local path="$1"
6869
[[ -L "$path" ]] || { echo "$path" ; return; }
6970

70-
local target="$(readlink "$path")"
71+
local -r target="$(readlink "$path")"
7172
if [[ "${target:0:1}" == "/" ]]; then
7273
echo "$target"
7374
else
7475
echo "${path%/*}/$target"
7576
fi
7677
}
7778

78-
declare -r script_path="$(get_script_path "$BASH_SOURCE")"
79-
declare -r script_name="${script_path##*/}"
79+
script_path="$(get_script_path "${BASH_SOURCE[0]}")"
80+
declare -r script_path
81+
script_name="${script_path##*/}"
82+
declare -r script_name
8083

8184
init_default_option_file () {
8285
local overriding_var="${!1}"
@@ -90,8 +93,8 @@ init_default_option_file () {
9093
echo "$default_file"
9194
}
9295

93-
declare sbt_opts_file="$(init_default_option_file SBT_OPTS .sbtopts)"
94-
declare jvm_opts_file="$(init_default_option_file JVM_OPTS .jvmopts)"
96+
sbt_opts_file="$(init_default_option_file SBT_OPTS .sbtopts)"
97+
jvm_opts_file="$(init_default_option_file JVM_OPTS .jvmopts)"
9598

9699
build_props_sbt () {
97100
[[ -r "$buildProps" ]] && \
@@ -142,9 +145,9 @@ addResidual () { vlog "[residual] arg = '$1'" ; residual_args+=("$1"); }
142145
addResolver () { addSbt "set resolvers += $1"; }
143146
addDebugger () { addJava "-Xdebug" ; addJava "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=$1"; }
144147
setThisBuild () {
145-
vlog "[addBuild] args = '$@'"
148+
vlog "[addBuild] args = '$*'"
146149
local key="$1" && shift
147-
addSbt "set $key in ThisBuild := $@"
150+
addSbt "set $key in ThisBuild := $*"
148151
}
149152
setScalaVersion () {
150153
[[ "$1" == *"-SNAPSHOT" ]] && addResolver 'Resolver.sonatypeRepo("snapshots")'
@@ -159,7 +162,7 @@ setJavaHome () {
159162
}
160163

161164
getJavaVersion() {
162-
local str=$("$1" -version 2>&1 | grep -E -e '(java|openjdk) version' | awk '{ print $3 }' | tr -d '"')
165+
local -r str=$("$1" -version 2>&1 | grep -E -e '(java|openjdk) version' | awk '{ print $3 }' | tr -d '"')
163166

164167
# java -version on java8 says 1.8.x
165168
# but on 9 and 10 it's 9.x.y and 10.x.y.
@@ -191,14 +194,14 @@ checkJava() {
191194
}
192195

193196
java_version () {
194-
local version=$(getJavaVersion "$java_cmd")
197+
local -r version=$(getJavaVersion "$java_cmd")
195198
vlog "Detected Java version: $version"
196199
echo "$version"
197200
}
198201

199202
# MaxPermSize critical on pre-8 JVMs but incurs noisy warning on 8+
200203
default_jvm_opts () {
201-
local v="$(java_version)"
204+
local -r v="$(java_version)"
202205
if [[ $v -ge 8 ]]; then
203206
echo "$default_jvm_opts_common"
204207
else
@@ -240,11 +243,11 @@ execRunner () {
240243

241244
jar_url () { make_url "$1"; }
242245

243-
is_cygwin () [[ "$(uname -a)" == "CYGWIN"* ]]
246+
is_cygwin () { [[ "$(uname -a)" == "CYGWIN"* ]]; }
244247

245248
jar_file () {
246249
is_cygwin \
247-
&& echo "$(cygpath -w $sbt_launch_dir/"$1"/sbt-launch.jar)" \
250+
&& cygpath -w "$sbt_launch_dir/$1/sbt-launch.jar" \
248251
|| echo "$sbt_launch_dir/$1/sbt-launch.jar"
249252
}
250253

@@ -420,7 +423,7 @@ process_args "$@"
420423
readConfigFile() {
421424
local end=false
422425
until $end; do
423-
read || end=true
426+
read -r || end=true
424427
[[ $REPLY =~ ^# ]] || [[ -z $REPLY ]] || echo "$REPLY"
425428
done < "$1"
426429
}
@@ -429,10 +432,10 @@ readConfigFile() {
429432
# can supply args to this runner
430433
if [[ -r "$sbt_opts_file" ]]; then
431434
vlog "Using sbt options defined in file $sbt_opts_file"
432-
while read opt; do extra_sbt_opts+=("$opt"); done < <(readConfigFile "$sbt_opts_file")
435+
while read -r opt; do extra_sbt_opts+=("$opt"); done < <(readConfigFile "$sbt_opts_file")
433436
elif [[ -n "$SBT_OPTS" && ! ("$SBT_OPTS" =~ ^@.*) ]]; then
434437
vlog "Using sbt options defined in variable \$SBT_OPTS"
435-
extra_sbt_opts=( $SBT_OPTS )
438+
IFS=" " read -r -a extra_sbt_opts <<< "$SBT_OPTS"
436439
else
437440
vlog "No extra sbt options have been defined"
438441
fi
@@ -452,18 +455,18 @@ checkJava
452455
setTraceLevel() {
453456
case "$sbt_version" in
454457
"0.7."* | "0.10."* | "0.11."* ) echoerr "Cannot set trace level in sbt version $sbt_version" ;;
455-
*) setThisBuild traceLevel $trace_level ;;
458+
*) setThisBuild traceLevel "$trace_level" ;;
456459
esac
457460
}
458461

459462
# set scalacOptions if we were given any -S opts
460-
[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[@]}\""
463+
[[ ${#scalac_args[@]} -eq 0 ]] || addSbt "set scalacOptions in ThisBuild += \"${scalac_args[*]}\""
461464

462465
[[ -n "$sbt_explicit_version" && -z "$sbt_new" ]] && addJava "-Dsbt.version=$sbt_explicit_version"
463466
vlog "Detected sbt version $sbt_version"
464467

465468
if [[ -n "$sbt_script" ]]; then
466-
residual_args=( $sbt_script ${residual_args[@]} )
469+
residual_args=( "$sbt_script" "${residual_args[@]}" )
467470
else
468471
# no args - alert them there's stuff in here
469472
(( argumentCount > 0 )) || {
@@ -484,6 +487,7 @@ EOM
484487
}
485488

486489
# pick up completion if present; todo
490+
# shellcheck disable=SC1091
487491
[[ -r .sbt_completion.sh ]] && source .sbt_completion.sh
488492

489493
# directory to store sbt launchers
@@ -518,13 +522,13 @@ fi
518522

519523
if [[ -r "$jvm_opts_file" ]]; then
520524
vlog "Using jvm options defined in file $jvm_opts_file"
521-
while read opt; do extra_jvm_opts+=("$opt"); done < <(readConfigFile "$jvm_opts_file")
525+
while read -r opt; do extra_jvm_opts+=("$opt"); done < <(readConfigFile "$jvm_opts_file")
522526
elif [[ -n "$JVM_OPTS" && ! ("$JVM_OPTS" =~ ^@.*) ]]; then
523527
vlog "Using jvm options defined in \$JVM_OPTS variable"
524-
extra_jvm_opts=( $JVM_OPTS )
528+
IFS=" " read -r -a extra_jvm_opts <<< "$JVM_OPTS"
525529
else
526530
vlog "Using default jvm options"
527-
extra_jvm_opts=( $(default_jvm_opts) )
531+
IFS=" " read -r -a extra_jvm_opts <<< "$(default_jvm_opts)"
528532
fi
529533

530534
# traceLevel is 0.12+
@@ -546,13 +550,12 @@ main () {
546550
# we're not going to print those lines anyway. We strip that bit of
547551
# line noise, but leave the other codes to preserve color.
548552
mainFiltered () {
549-
local ansiOverwrite='\r\x1BM\x1B[2K'
550-
local excludeRegex=$(egrep -v '^#|^$' ~/.sbtignore | paste -sd'|' -)
553+
local -r excludeRegex=$(grep -E -v '^#|^$' ~/.sbtignore | paste -sd'|' -)
551554

552555
echoLine () {
553-
local line="$1"
554-
local line1="$(echo "$line" | sed 's/\r\x1BM\x1B\[2K//g')" # This strips the OverwriteLine code.
555-
local line2="$(echo "$line1" | sed 's/\x1B\[[0-9;]*[JKmsu]//g')" # This strips all codes - we test regexes against this.
556+
local -r line="$1"
557+
local -r line1="${line//\r\x1BM\x1B\[2K//g}" # This strips the OverwriteLine code.
558+
local -r line2="${line1//\x1B\[[0-9;]*[JKmsu]//g}" # This strips all codes - we test regexes against this.
556559

557560
if [[ $line2 =~ $excludeRegex ]]; then
558561
[[ -n $debugUs ]] && echo "[X] $line1"
@@ -569,7 +572,7 @@ mainFiltered () {
569572
# Obviously this is super ad hoc but I don't know how to improve on it. Testing whether
570573
# stdin is a terminal is useless because most of my use cases for this filtering are
571574
# exactly when I'm at a terminal, running sbt non-interactively.
572-
shouldFilter () { [[ -f ~/.sbtignore ]] && ! egrep -q '\b(shell|console|consoleProject)\b' <<<"${residual_args[@]}"; }
575+
shouldFilter () { [[ -f ~/.sbtignore ]] && ! grep -E -q '\b(shell|console|consoleProject)\b' <<<"${residual_args[@]}"; }
573576

574577
# run sbt
575578
if shouldFilter; then mainFiltered; else main; fi
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.exasol.cloudetl.avro
2+
3+
import com.exasol.cloudetl.data.Row
4+
5+
import org.apache.avro.file.DataFileReader
6+
import org.apache.avro.generic.GenericRecord
7+
import org.apache.avro.util.Utf8
8+
9+
/**
10+
* An object that creates a [[com.exasol.cloudetl.data.Row]] iterator
11+
* given the Avro [[org.apache.avro.file.DataFileReader]] with
12+
* [[org.apache.avro.generic.GenericRecord]].
13+
*
14+
* Each next record is converted into an internal Row class.
15+
*/
16+
object AvroRowIterator {
17+
18+
def apply(reader: DataFileReader[GenericRecord]): Iterator[Row] =
19+
new Iterator[Row] {
20+
@SuppressWarnings(Array("org.wartremover.warts.Var"))
21+
private[this] var isCompleted = false
22+
23+
override def hasNext: Boolean =
24+
if (isCompleted) {
25+
false
26+
} else {
27+
val hasNext = reader.hasNext
28+
if (!hasNext) {
29+
reader.close()
30+
isCompleted = true
31+
}
32+
hasNext
33+
}
34+
35+
override def next(): Row = {
36+
if (!hasNext) {
37+
throw new NoSuchElementException("Avro reader next on empty iterator")
38+
}
39+
val record = reader.next()
40+
recordToRow(record)
41+
}
42+
}
43+
44+
private[this] def recordToRow(record: GenericRecord): Row = {
45+
val size = record.getSchema.getFields.size
46+
val values = Array.ofDim[Any](size)
47+
for { index <- 0 until size } {
48+
values.update(index, convertRecordValue(record.get(index)))
49+
}
50+
Row(values.toSeq)
51+
}
52+
53+
@SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
54+
private[this] def convertRecordValue(value: Any): Any = value match {
55+
case _: GenericRecord =>
56+
throw new IllegalArgumentException("Avro nested record type is not supported yet!")
57+
case _: java.util.Collection[_] =>
58+
throw new IllegalArgumentException("Avro collection type is not supported yet!")
59+
case _: java.util.Map[_, _] =>
60+
throw new IllegalArgumentException("Avro map type is not supported yet!")
61+
case _: Utf8 => value.asInstanceOf[Utf8].toString
62+
case primitiveType => primitiveType
63+
}
64+
65+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.exasol.cloudetl.bucket
2+
3+
import org.apache.hadoop.conf.Configuration
4+
5+
/** A [[Bucket]] implementation for the Azure Blob Storage */
6+
final case class AzureBlobBucket(path: String, params: Map[String, String]) extends Bucket {
7+
8+
/** @inheritdoc */
9+
override val bucketPath: String = path
10+
11+
/** @inheritdoc */
12+
override def validate(): Unit =
13+
Bucket.validate(params, Bucket.AZURE_PARAMETERS)
14+
15+
/**
16+
* @inheritdoc
17+
*
18+
* Additionally validates that all required parameters are available
19+
* in order to create a configuration.
20+
*/
21+
override def createConfiguration(): Configuration = {
22+
validate()
23+
24+
val conf = new Configuration()
25+
val accountName = Bucket.requiredParam(params, "AZURE_ACCOUNT_NAME")
26+
val accountSecretKey = Bucket.requiredParam(params, "AZURE_SECRET_KEY")
27+
conf.set("fs.azure", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
28+
conf.set("fs.wasb.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
29+
conf.set("fs.wasbs.impl", classOf[org.apache.hadoop.fs.azure.NativeAzureFileSystem].getName)
30+
conf.set("fs.AbstractFileSystem.wasb.impl", classOf[org.apache.hadoop.fs.azure.Wasb].getName)
31+
conf.set(
32+
"fs.AbstractFileSystem.wasbs.impl",
33+
classOf[org.apache.hadoop.fs.azure.Wasbs].getName
34+
)
35+
conf.set(s"fs.azure.account.key.$accountName.blob.core.windows.net", accountSecretKey)
36+
37+
conf
38+
}
39+
40+
}

0 commit comments

Comments
 (0)