Skip to content

Commit c280ef6

Browse files
authored
Merge pull request #66 from civitaspo/develop
v0.2.4
2 parents fa82f07 + 922cc09 commit c280ef6

File tree

7 files changed

+265
-6
lines changed

7 files changed

+265
-6
lines changed

Diff for: CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
0.2.4 (2019-07-29)
2+
==================
3+
4+
* [New Feature] Add `athena.table_exists?>` operator
5+
* [New Feature] Add `athena.partition_exists?>` operator
6+
17
0.2.3 (2019-07-19)
28
==================
39
* [New Feature] Add `athena.drop_table_multi>` operator

Diff for: README.md

+35-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ _export:
1515
repositories:
1616
- https://jitpack.io
1717
dependencies:
18-
- pro.civitaspo:digdag-operator-athena:0.2.3
18+
- pro.civitaspo:digdag-operator-athena:0.2.4
1919
athena:
2020
auth_method: profile
2121

@@ -32,6 +32,8 @@ _export:
3232
output: s3://mybucket/prefix/
3333
```
3434
35+
See [examples](./example/example.dig) for more cases.
36+
3537
# Configuration
3638
3739
## Remarks
@@ -112,6 +114,22 @@ Nothing
112114

113115
Nothing
114116

117+
## Configuration for `athena.partition_exists?>` operator
118+
119+
### Options
120+
121+
- **database**: The name of the database. (string, required)
122+
- **table**: The name of the partitioned table. (string, required)
123+
- **partition_kv**: key-value pairs for the partition (string to string map, required)
124+
- **with_location**: Check the partition existence with the location (boolean, default: `false`)
125+
- **error_if_not_exist**: Raise the exception if the partition or location does not exist. (boolean, default: `false`)
126+
- **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
127+
128+
### Output Parameters
129+
130+
- **athena.last_partition_exists.partition_exists**: `true` if the partition exists, or `false` (boolean)
131+
- **athena.last_partition_exists.location_exists**: `true` if the partition location exists, or `false`. `null` unless the **with_location** option is `true`. (boolean)
132+
115133
## Configuration for `athena.apas>` operator
116134

117135
`apas` means *Add a partition as select*: it creates a partition in which the query result is stored.
@@ -249,6 +267,22 @@ Nothing
249267

250268
Nothing
251269

270+
## Configuration for `athena.table_exists?>` operator
271+
272+
### Options
273+
274+
- **database**: The name of the database. (string, required)
275+
- **table**: The name of the table. (string, required)
276+
- **with_location**: Check the table existence with the location (boolean, default: `false`)
277+
- **error_if_not_exist**: Raise the exception if the table or location does not exist. (boolean, default: `false`)
278+
- **catalog_id**: glue data catalog id if you use a catalog different from account/region default catalog. (string, optional)
279+
280+
### Output Parameters
281+
282+
- **athena.last_table_exists.table_exists**: `true` if the table exists, or `false` (boolean)
283+
- **athena.last_table_exists.location_exists**: `true` if the table location exists, or `false`. `null` unless the **with_location** option is `true`. (boolean)
284+
285+
252286
# Development
253287

254288
## Run an Example

Diff for: build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
group = 'pro.civitaspo'
8-
version = '0.2.3'
8+
version = '0.2.4'
99

1010
def digdagVersion = '0.9.37'
1111
def awsSdkVersion = "1.11.587"

Diff for: example/example.dig

+75-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ _export:
44
- file://${repos}
55
# - https://jitpack.io
66
dependencies:
7-
- pro.civitaspo:digdag-operator-athena:0.2.3
7+
- pro.civitaspo:digdag-operator-athena:0.2.4
88
athena:
99
auth_method: profile
1010
value: 5
@@ -92,6 +92,27 @@ _export:
9292
save_mode: overwrite
9393

9494
+step14:
95+
+step1:
96+
athena.partition_exists?>:
97+
database: ${database}
98+
table: hoge
99+
with_location: true
100+
partition_kv:
101+
b: "9"
102+
c: "10"
103+
+step2:
104+
echo>: ${athena.last_partition_exists}
105+
+step3:
106+
athena.partition_exists?>:
107+
database: ${database}
108+
table: hoge
109+
partition_kv:
110+
b: "9"
111+
c: "10"
112+
+step4:
113+
echo>: ${athena.last_partition_exists}
114+
115+
+step15:
95116
loop>: 10
96117
_parallel: true
97118
_do:
@@ -101,15 +122,66 @@ _export:
101122
location: ${output}/hoge_${i}/
102123
save_mode: overwrite
103124

104-
+step15:
125+
+step16:
126+
+step1:
127+
athena.table_exists?>:
128+
database: ${database}
129+
table: hoge_1
130+
with_location: true
131+
+step2:
132+
echo>: ${athena.last_table_exists}
133+
+step3:
134+
athena.table_exists?>:
135+
database: ${database}
136+
table: hoge
137+
+step4:
138+
echo>: ${athena.last_table_exists}
139+
140+
+step17:
105141
athena.drop_table_multi>:
106142
database: ${database}
107143
regexp: 'hoge_\d+'
108144
with_location: true
109145
limit: 3
110146

111-
+step16:
147+
+step18:
112148
athena.drop_table_multi>:
113149
database: ${database}
114150
regexp: 'hoge_\d+'
115151
with_location: true
152+
153+
+step19:
154+
+step1:
155+
athena.table_exists?>:
156+
database: ${database}
157+
table: hoge_1
158+
with_location: true
159+
+step2:
160+
echo>: ${athena.last_table_exists}
161+
+step3:
162+
athena.table_exists?>:
163+
database: ${database}
164+
table: hoge
165+
+step4:
166+
echo>: ${athena.last_table_exists}
167+
168+
+step20:
169+
+step1:
170+
athena.partition_exists?>:
171+
database: ${database}
172+
table: hoge_1
173+
with_location: true
174+
partition_kv:
175+
b: "9"
176+
c: "10"
177+
+step2:
178+
echo>: ${athena.last_partition_exists}
179+
+step3:
180+
athena.partition_exists?>:
181+
database: ${database}
182+
table: hoge_1
183+
partition_kv:
184+
b: "9"
185+
c: "10"
186+
+step4:
187+
echo>: ${athena.last_partition_exists}

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/AthenaPlugin.scala

+5-1
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import pro.civitaspo.digdag.plugin.athena.ctas.AthenaCtasOperator
1313
import pro.civitaspo.digdag.plugin.athena.drop_partition.AthenaDropPartitionOperator
1414
import pro.civitaspo.digdag.plugin.athena.drop_table.AthenaDropTableOperator
1515
import pro.civitaspo.digdag.plugin.athena.drop_table_multi.AthenaDropTableMultiOperator
16+
import pro.civitaspo.digdag.plugin.athena.partition_exists.AthenaPartitionExistsOperator
1617
import pro.civitaspo.digdag.plugin.athena.preview.AthenaPreviewOperator
1718
import pro.civitaspo.digdag.plugin.athena.query.AthenaQueryOperator
19+
import pro.civitaspo.digdag.plugin.athena.table_exists.AthenaTableExistsOperator
1820

1921

2022
object AthenaPlugin
@@ -38,7 +40,9 @@ object AthenaPlugin
3840
operatorFactory("athena.query", classOf[AthenaQueryOperator]),
3941
operatorFactory("athena.preview", classOf[AthenaPreviewOperator]),
4042
operatorFactory("athena.drop_table", classOf[AthenaDropTableOperator]),
41-
operatorFactory("athena.drop_table_multi", classOf[AthenaDropTableMultiOperator])
43+
operatorFactory("athena.drop_table_multi", classOf[AthenaDropTableMultiOperator]),
44+
operatorFactory("athena.partition_exists?", classOf[AthenaPartitionExistsOperator]),
45+
operatorFactory("athena.table_exists?", classOf[AthenaTableExistsOperator])
4246
)
4347
}
4448

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package pro.civitaspo.digdag.plugin.athena.partition_exists
2+
3+
4+
import com.amazonaws.services.glue.model.Partition
5+
import com.google.common.collect.ImmutableList
6+
import io.digdag.client.config.{Config, ConfigKey}
7+
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
8+
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator
9+
10+
import scala.jdk.CollectionConverters._
11+
import scala.util.{Failure, Success, Try}
12+
13+
14+
/**
  * Operator behind `athena.partition_exists?>`.
  *
  * Asks the Glue client whether the partition identified by `partition_kv` exists and,
  * when `with_location` is enabled, whether `aws.s3.hasObjects` reports objects under the
  * partition's storage location. The results are stored under `athena.last_partition_exists`
  * as `partition_exists` and (only when `with_location` is enabled) `location_exists`.
  *
  * Options read from `params`:
  *  - `database` (required), `table` (required), `partition_kv` (required string-to-string map)
  *  - `with_location` (default `false`), `error_if_not_exist` (default `false`)
  *  - `catalog_id` (optional)
  */
class AthenaPartitionExistsOperator(operatorName: String,
                                    context: OperatorContext,
                                    systemConfig: Config,
                                    templateEngine: TemplateEngine)
    extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine)
{
    protected val database: String = params.get("database", classOf[String])
    protected val table: String = params.get("table", classOf[String])
    protected val partitionKv: Map[String, String] = params.getMap("partition_kv", classOf[String], classOf[String]).asScala.toMap
    protected val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
    protected val errorIfNotExist: Boolean = params.get("error_if_not_exist", classOf[Boolean], false)
    protected val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

    /**
      * Runs the existence checks and publishes their results.
      *
      * @return a task result whose store params contain `athena.last_partition_exists`;
      *         any previously stored value of that key is reset first.
      */
    override def runTask(): TaskResult =
    {
        val storeParams: Config = cf.create()
        val lastPartitionExistsParams = storeParams.getNestedOrSetEmpty("athena").getNestedOrSetEmpty("last_partition_exists")

        run(storeParams = lastPartitionExistsParams)

        val builder = TaskResult.defaultBuilder(request)
        builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_partition_exists")))
        builder.storeParams(storeParams)
        builder.build()
    }

    /**
      * Fills `storeParams` with `partition_exists` and, when `with_location` is enabled,
      * `location_exists`.
      *
      * @param storeParams the nested config that receives the results.
      * @throws IllegalStateException if `error_if_not_exist` is enabled and the partition
      *                               or its location does not exist.
      */
    protected def run(storeParams: Config): Unit =
    {
        val partitionExists: Boolean = aws.glue.partition.exists(catalogId, database, table, partitionKv)
        if (!partitionExists && errorIfNotExist) {
            throw new IllegalStateException(s"The partition{${partitionKv.mkString(",")}} does not exist.")
        }
        storeParams.set("partition_exists", partitionExists)

        if (withLocation) {
            // A partition already known to be absent cannot have an existing location, so the
            // describe call (and the warning it would log on failure) is skipped in that case.
            // This also keeps both stored flags derived from the same existence answer.
            storeParams.set("location_exists", partitionExists && partitionLocationExists())
        }
    }

    /**
      * Describes the partition and checks its storage location for objects.
      *
      * @return `true` if objects are found under the location; `false` if none are found
      *         or if describing the partition fails (the failure is logged as a warning).
      * @throws IllegalStateException if `error_if_not_exist` is enabled and no objects are found.
      */
    private def partitionLocationExists(): Boolean =
    {
        Try(aws.glue.partition.describe(catalogId, database, table, partitionKv)) match {
            case Success(p) =>
                // Normalize to a trailing slash before asking S3 about the location.
                val location: String = {
                    val l = p.getStorageDescriptor.getLocation
                    if (l.endsWith("/")) l
                    else l + "/"
                }
                val found: Boolean = aws.s3.hasObjects(location)
                if (!found && errorIfNotExist) {
                    throw new IllegalStateException(s"The location '$location' does not exist, although the partition{${partitionKv.mkString(",")}} exists.")
                }
                found

            case Failure(e) =>
                // The partition may have disappeared between the existence check and the describe call.
                logger.warn(s"The partition{${partitionKv.mkString(",")}} may not exist.", e)
                false
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package pro.civitaspo.digdag.plugin.athena.table_exists
2+
3+
4+
import com.google.common.collect.ImmutableList
5+
import io.digdag.client.config.{Config, ConfigKey}
6+
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
7+
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator
8+
9+
import scala.util.{Failure, Success, Try}
10+
11+
12+
/**
  * Operator behind `athena.table_exists?>`.
  *
  * Asks the Glue client whether the table exists and, when `with_location` is enabled,
  * whether `aws.s3.hasObjects` reports objects under the table's storage location.
  * The results are stored under `athena.last_table_exists` as `table_exists` and
  * (only when `with_location` is enabled) `location_exists`.
  *
  * Options read from `params`:
  *  - `database` (required), `table` (required)
  *  - `with_location` (default `false`), `error_if_not_exist` (default `false`)
  *  - `catalog_id` (optional)
  */
class AthenaTableExistsOperator(operatorName: String,
                                context: OperatorContext,
                                systemConfig: Config,
                                templateEngine: TemplateEngine)
    extends AbstractAthenaOperator(operatorName, context, systemConfig, templateEngine)
{

    val database: String = params.get("database", classOf[String])
    val table: String = params.get("table", classOf[String])
    val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
    val errorIfNotExist: Boolean = params.get("error_if_not_exist", classOf[Boolean], false)
    val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())

    /**
      * Runs the existence checks and publishes their results.
      *
      * @return a task result whose store params contain `athena.last_table_exists`;
      *         any previously stored value of that key is reset first.
      */
    override def runTask(): TaskResult =
    {
        val storeParams: Config = cf.create()
        val lastTableExistsParams = storeParams.getNestedOrSetEmpty("athena").getNestedOrSetEmpty("last_table_exists")

        run(storeParams = lastTableExistsParams)

        val builder = TaskResult.defaultBuilder(request)
        builder.resetStoreParams(ImmutableList.of(ConfigKey.of("athena", "last_table_exists")))
        builder.storeParams(storeParams)
        builder.build()
    }

    /**
      * Fills `storeParams` with `table_exists` and, when `with_location` is enabled,
      * `location_exists`.
      *
      * @param storeParams the nested config that receives the results.
      * @throws IllegalStateException if `error_if_not_exist` is enabled and the table
      *                               or its location does not exist.
      */
    protected def run(storeParams: Config): Unit =
    {
        val tableExists: Boolean = aws.glue.table.exists(catalogId, database, table)
        if (!tableExists && errorIfNotExist) {
            throw new IllegalStateException(s"The table '$database.$table' does not exist.")
        }
        storeParams.set("table_exists", tableExists)

        if (withLocation) {
            // A table already known to be absent cannot have an existing location, so the
            // describe call (and the warning it would log on failure) is skipped in that case.
            // This also keeps both stored flags derived from the same existence answer.
            storeParams.set("location_exists", tableExists && tableLocationExists())
        }
    }

    /**
      * Describes the table and checks its storage location for objects.
      *
      * @return `true` if objects are found under the location; `false` if none are found
      *         or if describing the table fails (the failure is logged as a warning).
      * @throws IllegalStateException if `error_if_not_exist` is enabled and no objects are found.
      */
    private def tableLocationExists(): Boolean =
    {
        Try(aws.glue.table.describe(catalogId, database, table)) match {
            case Success(t) =>
                // Normalize to a trailing slash before asking S3 about the location.
                val location: String = {
                    val l = t.getStorageDescriptor.getLocation
                    if (l.endsWith("/")) l
                    else l + "/"
                }
                val found: Boolean = aws.s3.hasObjects(location)
                if (!found && errorIfNotExist) {
                    throw new IllegalStateException(s"The location '$location' does not exist, although the table '$database.$table' exists.")
                }
                found

            case Failure(e) =>
                // The table may have disappeared between the existence check and the describe call.
                logger.warn(s"The table '$database.$table' may not exist.", e)
                false
        }
    }
}

0 commit comments

Comments
 (0)