Skip to content

Commit 7ea9ee7

Browse files
authored
Merge pull request #74 from civitaspo/protect-created-within
Add a new feature: `protect` option for `athena.drop_table_multi>` op…
2 parents 32215cf + fb60431 commit 7ea9ee7

File tree

2 files changed

+82
-16
lines changed

2 files changed

+82
-16
lines changed

Diff for: example/example.dig

+13-5
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,19 @@ _export:
138138
echo>: ${athena.last_table_exists}
139139

140140
+step17:
141-
athena.drop_table_multi>:
142-
database: ${database}
143-
regexp: 'hoge_\d+'
144-
with_location: true
145-
limit: 3
141+
+step1:
142+
athena.drop_table_multi>:
143+
database: ${database}
144+
regexp: 'hoge_\d+'
145+
with_location: true
146+
limit: 3
147+
+step2:
148+
athena.drop_table_multi>:
149+
database: ${database}
150+
regexp: 'hoge_\d+'
151+
with_location: true
152+
protect:
153+
created_within: 1h
146154

147155
+step18:
148156
athena.drop_table_multi>:
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package pro.civitaspo.digdag.plugin.athena.drop_table_multi
22

33

4+
import java.util.Date
5+
6+
import com.amazonaws.services.glue.model.Table
47
import io.digdag.client.config.Config
58
import io.digdag.spi.{OperatorContext, TaskResult, TemplateEngine}
9+
import io.digdag.util.DurationParam
610
import pro.civitaspo.digdag.plugin.athena.AbstractAthenaOperator
711

812

@@ -14,29 +18,83 @@ class AthenaDropTableMultiOperator(operatorName: String,
1418
{
1519
val database: String = params.get("database", classOf[String])
1620
val regexp: String = params.getOptional("regexp", classOf[String]).orNull()
21+
val protect: Option[Config] = Option(params.getOptionalNested("protect").orNull())
1722
val limit: Option[Int] = Option(params.getOptional("limit", classOf[Int]).orNull())
1823
val withLocation: Boolean = params.get("with_location", classOf[Boolean], false)
1924
val catalogId: Option[String] = Option(params.getOptional("catalog_id", classOf[String]).orNull())
2025

26+
val now: Long = System.currentTimeMillis()
27+
2128
override def runTask(): TaskResult =
2229
{
2330
logger.info(s"Drop tables matched by the expression: /$regexp/ in $database")
2431
aws.glue.table.list(catalogId, database, Option(regexp), limit).foreach { t =>
25-
if (withLocation) {
26-
val location: String = {
27-
val l = t.getStorageDescriptor.getLocation
28-
if (l.endsWith("/")) l
29-
else l + "/"
30-
}
31-
if (aws.s3.hasObjects(location)) {
32-
logger.info(s"Delete objects because the location $location has objects.")
33-
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
32+
if (!isProtected(t)) {
33+
if (withLocation) {
34+
val location: String = {
35+
val l = t.getStorageDescriptor.getLocation
36+
if (l.endsWith("/")) l
37+
else l + "/"
38+
}
39+
if (aws.s3.hasObjects(location)) {
40+
logger.info(s"Delete objects because the location $location has objects.")
41+
aws.s3.rm_r(location).foreach(uri => logger.info(s"Deleted: ${uri.toString}"))
42+
}
3443
}
44+
logger.info(s"Drop the table '$database.${t.getName}'")
45+
aws.glue.table.delete(catalogId, database, t.getName)
3546
}
36-
logger.info(s"Drop the table '$database.${t.getName}'")
37-
aws.glue.table.delete(catalogId, database, t.getName)
3847
}
3948
TaskResult.empty(cf)
4049
}
4150

51+
protected def isProtected(t: Table): Boolean =
52+
{
53+
protect match {
54+
case None => return false
55+
case Some(c) =>
56+
Option(c.getOptional("created_within", classOf[DurationParam]).orNull()) match {
57+
case None => // do nothing
58+
case Some(d) =>
59+
Option(t.getCreateTime).foreach { date =>
60+
if (isDateWithin(date, d)) {
61+
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is created" +
62+
s" within ${d.toString} (created at ${t.getCreateTime.toString}).")
63+
return true
64+
}
65+
}
66+
}
67+
Option(c.getOptional("updated_within", classOf[DurationParam]).orNull()) match {
68+
case None => // do nothing
69+
case Some(d) =>
70+
Option(t.getUpdateTime).foreach { date =>
71+
if (isDateWithin(date, d)) {
72+
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is updated" +
73+
s" within ${d.toString} (updated at ${t.getUpdateTime.toString}).")
74+
return true
75+
}
76+
}
77+
}
78+
Option(c.getOptional("accessed_within", classOf[DurationParam]).orNull()) match {
79+
case None => // do nothing
80+
case Some(d) =>
81+
Option(t.getLastAccessTime).foreach { date =>
82+
if (isDateWithin(date, d)) {
83+
logger.info(s"Protect the table ${t.getDatabaseName}.${t.getName} because this is accessed" +
84+
s" within ${d.toString} (last accessed at ${t.getLastAccessTime.toString}).")
85+
return true
86+
}
87+
}
88+
}
89+
}
90+
false
91+
}
92+
93+
protected def isDateWithin(target: Date,
94+
durationWithin: DurationParam): Boolean =
95+
{
96+
val dateWithin: Date = new Date(now - durationWithin.getDuration.toMillis)
97+
target.after(dateWithin)
98+
}
99+
42100
}

0 commit comments

Comments
 (0)