Skip to content

Commit 6ac81c0

Browse files
authored
Merge pull request #60 from civitaspo/develop
v0.2.2
2 parents 30be2c0 + 07889e4 commit 6ac81c0

File tree

10 files changed

+85
-37
lines changed

10 files changed

+85
-37
lines changed

Diff for: CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
0.2.2 (2019-07-19)
2+
==================
3+
4+
* [Enhancement] Use scala-logging for logging instead of using slf4j directly
5+
* [Enhancement] Use workgroup default output location for athena query result output location.
6+
* [Change - `athena.ctas>`] Introduce `location` option and `output` option become deprecated.
7+
8+
19
0.2.1 (2019-07-16)
210
==================
311

Diff for: README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ _export:
1515
repositories:
1616
- https://jitpack.io
1717
dependencies:
18-
- pro.civitaspo:digdag-operator-athena:0.2.1
18+
- pro.civitaspo:digdag-operator-athena:0.2.2
1919
athena:
2020
auth_method: profile
2121

@@ -200,7 +200,8 @@ Nothing
200200
- **database**: The database name for query execution context. (string, optional)
201201
- **table**: The table name for the new table (string, default: `digdag_athena_ctas_${session_uuid.replaceAll("-", "")}_${random}`)
202202
- **workgroup**: The name of the workgroup in which the query is being started. (string, optional)
203-
- **output**: Output location for data created by CTAS (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-<AWS_REGION>/Unsaved/${YEAR}/${MONTH}/${DAY}/${athena_query_id}/"`)
203+
- **output**: [**Deprecated**] Use **location** option instead.
204+
- **location**: Output location for data created by CTAS (string, default: `"s3://aws-athena-query-results-${AWS_ACCOUNT_ID}-<AWS_REGION>/Unsaved/${YEAR}/${MONTH}/${DAY}/${athena_query_id}/"`)
204205
- **format**: The data format for the CTAS query results, such as `"orc"`, `"parquet"`, `"avro"`, `"json"`, or `"textfile"`. (string, default: `"parquet"`)
205206
- **compression**: The compression type to use for `"orc"` or `"parquet"`. (string, default: `"snappy"`)
206207
- **field_delimiter**: The field delimiter for files in CSV, TSV, and text files. This option is applied only when **format** is specific to text-based data storage formats. (string, optional)

Diff for: build.gradle

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
group = 'pro.civitaspo'
8-
version = '0.2.1'
8+
version = '0.2.2'
99

1010
def digdagVersion = '0.9.37'
1111
def awsSdkVersion = "1.11.587"
@@ -34,6 +34,9 @@ dependencies {
3434
compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: awsSdkVersion
3535
// https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-glue
3636
compile group: 'com.amazonaws', name: 'aws-java-sdk-glue', version: awsSdkVersion
37+
// https://mvnrepository.com/artifact/com.typesafe.scala-logging/scala-logging
38+
compile group: 'com.typesafe.scala-logging', name: "scala-logging_$depScalaVersion", version: '3.9.2'
39+
3740
}
3841

3942
shadowJar {

Diff for: example/example.dig

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ _export:
44
- file://${repos}
55
# - https://jitpack.io
66
dependencies:
7-
- pro.civitaspo:digdag-operator-athena:0.2.1
7+
- pro.civitaspo:digdag-operator-athena:0.2.2
88
athena:
99
auth_method: profile
1010
value: 5
@@ -22,7 +22,7 @@ _export:
2222
athena.ctas>: template.sql
2323
database: ${database}
2424
table: hoge
25-
output: ${output}
25+
location: ${output}
2626

2727
+step5:
2828
echo>: ${athena}
@@ -37,7 +37,7 @@ _export:
3737
athena.ctas>: select 1 as a, 2 as b, 3 as c union all select 4 as a, 5 as b, 6 as c
3838
database: ${database}
3939
table: hoge
40-
output: ${output}
40+
location: ${output}
4141
partitioned_by: [b, c]
4242

4343
+step8:

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/AbstractAthenaOperator.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package pro.civitaspo.digdag.plugin.athena
22

33

4+
import com.typesafe.scalalogging.LazyLogging
45
import io.digdag.client.config.{Config, ConfigFactory}
56
import io.digdag.spi.{OperatorContext, SecretProvider, TemplateEngine}
67
import io.digdag.util.{BaseOperator, DurationParam}
7-
import org.slf4j.{Logger, LoggerFactory}
88
import pro.civitaspo.digdag.plugin.athena.aws.{Aws, AwsConf}
99

10+
1011
abstract class AbstractAthenaOperator(operatorName: String,
1112
context: OperatorContext,
1213
systemConfig: Config,
1314
templateEngine: TemplateEngine)
1415
extends BaseOperator(context)
16+
with LazyLogging
1517
{
16-
17-
protected val logger: Logger = LoggerFactory.getLogger(operatorName)
18-
if (!logger.isDebugEnabled) {
18+
if (!logger.underlying.isDebugEnabled) {
1919
// NOTE: suppress aws-java-sdk logs because of a bit noisy logging.
2020
System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog")
2121
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package pro.civitaspo.digdag.plugin.athena.aws
22

33

4+
import com.typesafe.scalalogging.LazyLogging
5+
6+
47
abstract class AwsService(aws: Aws)
8+
extends LazyLogging
59
{
610
}

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/athena/Athena.scala

+31-12
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package pro.civitaspo.digdag.plugin.athena.aws.athena
22

33

44
import com.amazonaws.services.athena.{AmazonAthena, AmazonAthenaClientBuilder}
5-
import com.amazonaws.services.athena.model.{GetQueryExecutionRequest, GetQueryResultsRequest, QueryExecution, QueryExecutionContext, QueryExecutionState, ResultConfiguration, ResultSet, StartQueryExecutionRequest}
5+
import com.amazonaws.services.athena.model.{GetQueryExecutionRequest, GetQueryResultsRequest, GetWorkGroupRequest, QueryExecution, QueryExecutionContext, QueryExecutionState, ResultConfiguration, ResultSet, StartQueryExecutionRequest}
66
import io.digdag.util.DurationParam
7-
import org.slf4j.Logger
87
import pro.civitaspo.digdag.plugin.athena.aws.{Aws, AwsService}
98

9+
import scala.util.{Failure, Success, Try}
10+
import scala.util.chaining._
11+
1012

1113
case class Athena(aws: Aws)
1214
extends AwsService(aws)
@@ -35,13 +37,34 @@ case class Athena(aws: Aws)
3537
req.setQueryString(query)
3638
database.foreach(db => req.setQueryExecutionContext(new QueryExecutionContext().withDatabase(db)))
3739
req.setWorkGroup(workGroup.getOrElse(DEFAULT_WORKGROUP))
38-
// TODO: overwrite by workgroup configurations if workgroup is not "primary".
39-
req.setResultConfiguration(new ResultConfiguration().withOutputLocation(outputLocation.getOrElse(DEFAULT_OUTPUT_LOCATION)))
40+
req.setResultConfiguration(new ResultConfiguration()
41+
.withOutputLocation(resolveWorkGroupOutputLocation(workGroup.getOrElse(DEFAULT_WORKGROUP))))
4042
requestToken.foreach(req.setClientRequestToken)
4143

4244
withAthena(_.startQueryExecution(req)).getQueryExecutionId
4345
}
4446

47+
def resolveWorkGroupOutputLocation(workGroup: String): String =
48+
{
49+
workGroup match {
50+
case DEFAULT_WORKGROUP => DEFAULT_OUTPUT_LOCATION
51+
case wg =>
52+
val t = Try {
53+
withAthena(_.getWorkGroup(new GetWorkGroupRequest().withWorkGroup(wg)))
54+
.getWorkGroup
55+
.getConfiguration
56+
.getResultConfiguration
57+
.getOutputLocation
58+
}
59+
t match {
60+
case Success(outputLocation) => outputLocation
61+
case Failure(ex) => DEFAULT_OUTPUT_LOCATION.tap { default =>
62+
logger.warn(s"Use $default as athena output location because the workgroup output location cannot be resolved due to '${ex.getMessage}'.", ex)
63+
}
64+
}
65+
}
66+
}
67+
4568
def getQueryExecution(executionId: String): QueryExecution =
4669
{
4770
withAthena(_.getQueryExecution(new GetQueryExecutionRequest().withQueryExecutionId(executionId))).getQueryExecution
@@ -50,14 +73,12 @@ case class Athena(aws: Aws)
5073
def waitQueryExecution(executionId: String,
5174
successStates: Seq[QueryExecutionState],
5275
failureStates: Seq[QueryExecutionState],
53-
timeout: DurationParam,
54-
loggerOption: Option[Logger] = None): Unit =
76+
timeout: DurationParam): Unit =
5577
{
5678
val waiter = AthenaQueryWaiter(athena = this,
5779
successStats = successStates,
5880
failureStats = failureStates,
59-
timeout = timeout,
60-
loggerOption = loggerOption)
81+
timeout = timeout)
6182
waiter.wait(executionId)
6283
}
6384

@@ -68,8 +89,7 @@ case class Athena(aws: Aws)
6889
requestToken: Option[String] = None,
6990
successStates: Seq[QueryExecutionState],
7091
failureStates: Seq[QueryExecutionState],
71-
timeout: DurationParam,
72-
loggerOption: Option[Logger] = None): QueryExecution =
92+
timeout: DurationParam): QueryExecution =
7393
{
7494
val executionId: String = startQueryExecution(query = query,
7595
database = database,
@@ -80,8 +100,7 @@ case class Athena(aws: Aws)
80100
waitQueryExecution(executionId = executionId,
81101
successStates = successStates,
82102
failureStates = failureStates,
83-
timeout = timeout,
84-
loggerOption = loggerOption)
103+
timeout = timeout)
85104

86105
getQueryExecution(executionId = executionId)
87106
}

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/athena/AthenaQueryWaiter.scala

+4-5
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@ import java.util.concurrent.{Executors, ExecutorService}
66
import com.amazonaws.services.athena.model.{GetQueryExecutionRequest, GetQueryExecutionResult, QueryExecutionState}
77
import com.amazonaws.waiters.{PollingStrategy, PollingStrategyContext, SdkFunction, Waiter, WaiterAcceptor, WaiterBuilder, WaiterParameters, WaiterState}
88
import com.amazonaws.waiters.PollingStrategy.{DelayStrategy, RetryStrategy}
9+
import com.typesafe.scalalogging.LazyLogging
910
import io.digdag.util.DurationParam
10-
import org.slf4j.{Logger, LoggerFactory}
11+
1112

1213
case class AthenaQueryWaiter(athena: Athena,
1314
successStats: Seq[QueryExecutionState],
1415
failureStats: Seq[QueryExecutionState],
1516
executorService: ExecutorService = Executors.newFixedThreadPool(50),
16-
timeout: DurationParam,
17-
loggerOption: Option[Logger] = None)
17+
timeout: DurationParam)
18+
extends LazyLogging
1819
{
19-
val logger: Logger = loggerOption.getOrElse(LoggerFactory.getLogger(classOf[AthenaQueryWaiter]))
20-
2120
def wait(executionId: String): Unit =
2221
{
2322
newWaiter().run(newWaiterParameters(executionId = executionId))

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/aws/sts/Sts.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import com.amazonaws.services.securitytoken.{AWSSecurityTokenService, AWSSecurit
66
import com.amazonaws.services.securitytoken.model.{AssumeRoleRequest, GetCallerIdentityRequest, PolicyDescriptorType}
77
import pro.civitaspo.digdag.plugin.athena.aws.{Aws, AwsService}
88

9-
import scala.collection.JavaConverters._
9+
import scala.jdk.CollectionConverters._
1010

1111

1212
case class Sts(aws: Aws)

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala

+23-9
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator
1212
import scala.jdk.CollectionConverters._
1313
import scala.util.{Failure, Random, Success, Try}
1414

15+
1516
class AthenaCtasOperator(operatorName: String,
1617
context: OperatorContext,
1718
systemConfig: Config,
@@ -85,7 +86,20 @@ class AthenaCtasOperator(operatorName: String,
8586
protected val database: Optional[String] = params.getOptional("database", classOf[String])
8687
protected val table: String = params.get("table", classOf[String], defaultTableName)
8788
protected val workGroup: Optional[String] = params.getOptional("workgroup", classOf[String])
89+
@deprecated(message = "Use location option instead", since = "0.2.2")
8890
protected val output: Optional[String] = params.getOptional("output", classOf[String])
91+
protected val location: Optional[String] = {
92+
val l = params.getOptional("location", classOf[String])
93+
if (output.isPresent && l.isPresent) {
94+
logger.warn(s"Use the value of location option: ${l.get()} although the value of output option (${output.get()}) is specified.")
95+
l
96+
}
97+
else if (output.isPresent) {
98+
logger.warn("output option is deprecated. Please use location option instead.")
99+
output
100+
}
101+
else l
102+
}
89103
protected val format: String = params.get("format", classOf[String], "parquet")
90104
protected val compression: String = params.get("compression", classOf[String], "snappy")
91105
protected val fieldDelimiter: Optional[String] = params.getOptional("field_delimiter", classOf[String])
@@ -135,15 +149,15 @@ class AthenaCtasOperator(operatorName: String,
135149
override def runTask(): TaskResult =
136150
{
137151
saveMode match {
138-
case SaveMode.ErrorIfExists if output.isPresent && hasObjects(output.get) =>
139-
throw new IllegalStateException(s"${output.get} already exists")
140-
case SaveMode.Ignore if output.isPresent && hasObjects(output.get) =>
141-
logger.info(s"${output.get} already exists, so ignore this session.")
152+
case SaveMode.ErrorIfExists if location.isPresent && hasObjects(location.get) =>
153+
throw new IllegalStateException(s"${location.get} already exists")
154+
case SaveMode.Ignore if location.isPresent && hasObjects(location.get) =>
155+
logger.info(s"${location.get} already exists, so ignore this session.")
142156
return TaskResult.empty(request)
143-
case SaveMode.Overwrite if output.isPresent =>
144-
logger.info(s"Overwrite ${output.get}")
145-
rmObjects(output.get)
146-
case _ => // do nothing
157+
case SaveMode.Overwrite if location.isPresent =>
158+
logger.info(s"Overwrite ${location.get}")
159+
rmObjects(location.get)
160+
case _ => // do nothing
147161
}
148162

149163
val subTask: Config = cf.create()
@@ -169,7 +183,7 @@ class AthenaCtasOperator(operatorName: String,
169183
protected def generateCtasQuery(): String =
170184
{
171185
val propsBuilder = Map.newBuilder[String, String]
172-
if (output.isPresent) propsBuilder += ("external_location" -> s"'${output.get}'")
186+
if (location.isPresent) propsBuilder += ("external_location" -> s"'${location.get}'")
173187
propsBuilder += ("format" -> s"'$format'")
174188
format match {
175189
case "parquet" => propsBuilder += ("parquet_compression" -> s"'$compression'")

0 commit comments

Comments
 (0)