Skip to content

Commit eb918f9

Browse files
authored
Merge pull request #76 from civitaspo/develop
v0.3.1
2 parents 56e7d18 + 679e7fc commit eb918f9

File tree

6 files changed

+104
-25
lines changed

6 files changed

+104
-25
lines changed

Diff for: CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
0.3.1 (2019-08-05)
2+
==================
3+
4+
* [Fix -- `athena.ctas>`] When using `save_mode: overwrite`, delete the specified table and location, not the table location that the data catalog has.
5+
* [New feature -- `athena.drop_table_multi>`] `protect` option.
6+
17
0.3.0 (2019-07-30)
28
==================
39

Diff for: README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ _export:
1515
repositories:
1616
- https://jitpack.io
1717
dependencies:
18-
- pro.civitaspo:digdag-operator-athena:0.3.0
18+
- pro.civitaspo:digdag-operator-athena:0.3.1
1919
athena:
2020
auth_method: profile
2121

Diff for: build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ plugins {
55
}
66

77
group = 'pro.civitaspo'
8-
version = '0.3.0'
8+
version = '0.3.1'
99

1010
def digdagVersion = '0.9.37'
1111
def awsSdkVersion = "1.11.587"

Diff for: example/example.dig

+14-6
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ _export:
44
- file://${repos}
55
# - https://jitpack.io
66
dependencies:
7-
- pro.civitaspo:digdag-operator-athena:0.3.0
7+
- pro.civitaspo:digdag-operator-athena:0.3.1
88
athena:
99
auth_method: profile
1010
value: 5
@@ -138,11 +138,19 @@ _export:
138138
echo>: ${athena.last_table_exists}
139139

140140
+step17:
141-
athena.drop_table_multi>:
142-
database: ${database}
143-
regexp: 'hoge_\d+'
144-
with_location: true
145-
limit: 3
141+
+step1:
142+
athena.drop_table_multi>:
143+
database: ${database}
144+
regexp: 'hoge_\d+'
145+
with_location: true
146+
limit: 3
147+
+step2:
148+
athena.drop_table_multi>:
149+
database: ${database}
150+
regexp: 'hoge_\d+'
151+
with_location: true
152+
protect:
153+
created_within: 1h
146154

147155
+step18:
148156
athena.drop_table_multi>:

Diff for: src/main/scala/pro/civitaspo/digdag/plugin/athena/ctas/AthenaCtasOperator.scala

+13-6
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,23 @@ class AthenaCtasOperator(operatorName: String,
151151
return TaskResult.empty(request)
152152
}
153153

154+
case SaveMode.Overwrite =>
155+
if (aws.glue.table.exists(catalogId, database, table)) {
156+
logger.info(s"'$database.$table' already exists, so delete this.")
157+
aws.glue.table.delete(catalogId, database, table)
158+
}
159+
if (location.exists(aws.s3.hasObjects)) {
160+
logger.info(s"${location.get} already exists, so delete this.")
161+
aws.s3.rm_r(location.get).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
162+
}
163+
154164
case _ => // do nothing
155165
}
156166

157167
val subTask: Config = cf.create()
158-
if (saveMode.equals(SaveMode.Overwrite)) {
159-
subTask.setNested("+drop-before-ctas", buildDropTableSubTaskConfig(with_location = true))
160-
}
161168
subTask.setNested("+ctas", buildCtasQuerySubTaskConfig())
162169
if (tableMode.equals(TableMode.DataOnly)) {
163-
subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig(with_location = false))
170+
subTask.setNested("+drop-after-ctas", buildDropTableSubTaskConfig())
164171
}
165172

166173
val builder: ImmutableTaskResult.Builder = TaskResult.defaultBuilder(cf)
@@ -235,14 +242,14 @@ class AthenaCtasOperator(operatorName: String,
235242
subTask
236243
}
237244

238-
protected def buildDropTableSubTaskConfig(with_location: Boolean): Config =
245+
protected def buildDropTableSubTaskConfig(): Config =
239246
{
240247
val subTask: Config = cf.create()
241248

242249
subTask.set("_type", "athena.drop_table")
243250
subTask.set("database", database)
244251
subTask.set("table", table)
245-
subTask.set("with_location", with_location)
252+
subTask.set("with_location", false)
246253
catalogId.foreach(cid => subTask.set("catalog_id", cid))
247254

248255
putCommonSettingToSubTask(subTask)
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package pro.civitaspo.digdag.plugin.athena.drop_table_multi
22

33

4+
import java.util.Date
5+
6+
import com.amazonaws.services.glue.model.Table
47
import io.digdag.client.config.Config
58
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
9+
import io.digdag.util.DurationParam
610
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator
711

812

@@ -14,29 +18,83 @@ class AthenaDropTableMultiOperator(operatorName: String,
1418
{
1519
val database: String = params.get("database", classOf[String])
1620
val regexp: String = params.getOptional("regexp", classOf[String]).orNull()
21+
val protect: Option[Config] = Option(params.getOptionalNested("protect").orNull())
1722
val limit: Option[Int] = Option(params.getOptional("limit", classOf[Int]).orNull())
1823
val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
1924
val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())
2025

26+
val now: Long = System.currentTimeMillis()
27+
2128
override def runTask(): TaskResult =
2229
{
2330
logger.info(s"Drop tables matched by the expression: /$regexp/ in $database")
2431
aws.glue.table.list(catalogId, database, Option(regexp), limit).foreach { t =>
25-
if (withLocation) {
26-
val location: String = {
27-
val l = t.getStorageDescriptor.getLocation
28-
if (l.endsWith("/")) l
29-
else l + "/"
30-
}
31-
if (aws.s3.hasObjects(location)) {
32-
logger.info(s"Delete objects because the location $location has objects.")
33-
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
32+
if (!isProtected(t)) {
33+
if (withLocation) {
34+
val location: String = {
35+
val l = t.getStorageDescriptor.getLocation
36+
if (l.endsWith("/")) l
37+
else l + "/"
38+
}
39+
if (aws.s3.hasObjects(location)) {
40+
logger.info(s"Delete objects because the location $location has objects.")
41+
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
42+
}
3443
}
44+
logger.info(s"Drop the table '$database.${t.getName}'")
45+
aws.glue.table.delete(catalogId, database, t.getName)
3546
}
36-
logger.info(s"Drop the table '$database.${t.getName}'")
37-
aws.glue.table.delete(catalogId, database, t.getName)
3847
}
3948
TaskResult.empty(cf)
4049
}
4150

51+
protected def isProtected(t: Table): Boolean =
52+
{
53+
protect match {
54+
case None => return false
55+
case Some(c) =>
56+
Option(c.getOptional("created_within", classOf[DurationParam]).orNull()) match {
57+
case None => // do nothing
58+
case Some(d) =>
59+
Option(t.getCreateTime).foreach { date =>
60+
if (isDateWithin(date, d)) {
61+
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is created" +
62+
s" within ${d.toString} (created at ${t.getCreateTime.toString}).")
63+
return true
64+
}
65+
}
66+
}
67+
Option(c.getOptional("updated_within", classOf[DurationParam]).orNull()) match {
68+
case None => // do nothing
69+
case Some(d) =>
70+
Option(t.getUpdateTime).foreach { date =>
71+
if (isDateWithin(date, d)) {
72+
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is updated" +
73+
s" within ${d.toString} (updated at ${t.getUpdateTime.toString}).")
74+
return true
75+
}
76+
}
77+
}
78+
Option(c.getOptional("accessed_within", classOf[DurationParam]).orNull()) match {
79+
case None => // do nothing
80+
case Some(d) =>
81+
Option(t.getLastAccessTime).foreach { date =>
82+
if (isDateWithin(date, d)) {
83+
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is accessed" +
84+
s" within ${d.toString} (last accessed at ${t.getLastAccessTime.toString}).")
85+
return true
86+
}
87+
}
88+
}
89+
}
90+
false
91+
}
92+
93+
protected def isDateWithin(target: Date,
94+
durationWithin: DurationParam): Boolean =
95+
{
96+
val dateWithin: Date = new Date(now - durationWithin.getDuration.toMillis)
97+
target.after(dateWithin)
98+
}
99+
42100
}

0 commit comments

Comments
 (0)