Skip to content

Commit edba39c

Browse files
authored
refactor: migrate to dynamic properties (#81)
* refactor: migrate to dynamic properties migrate Abstract Postgres properties MySQL properties DB2, Oracle properties mongodb properties SQLServer properties
1 parent 8866847 commit edba39c

File tree

36 files changed

+320
-304
lines changed

36 files changed

+320
-304
lines changed

plugin-debezium-db2/src/main/java/io/kestra/plugin/debezium/db2/Capture.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.debezium.connector.db2.Db2Connector;
44
import io.kestra.core.models.annotations.Example;
55
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.runners.RunContext;
78
import io.kestra.plugin.debezium.AbstractDebeziumTask;
89
import io.swagger.v3.oas.annotations.media.Schema;
@@ -38,10 +39,10 @@
3839
)
3940
public class Capture extends AbstractDebeziumTask implements Db2Interface {
4041

41-
protected String database;
42+
protected Property<String> database;
4243

4344
@Builder.Default
44-
private Db2Interface.SnapshotMode snapshotMode = Db2Interface.SnapshotMode.INITIAL;
45+
private Property<Db2Interface.SnapshotMode> snapshotMode = Property.of(SnapshotMode.INITIAL);
4546

4647
@Override
4748
protected boolean needDatabaseHistory() {
@@ -54,12 +55,12 @@ protected Properties properties(RunContext runContext, Path offsetFile, Path his
5455

5556
props.setProperty("connector.class", Db2Connector.class.getName());
5657

57-
props.setProperty("database.dbname", runContext.render(this.database));
58+
props.setProperty("database.dbname", runContext.render(this.database).as(String.class).orElseThrow());
5859

5960
props.setProperty("include.schema.changes", "false");
6061

6162
if (this.snapshotMode != null) {
62-
props.setProperty("snapshot.mode", this.snapshotMode.name().toLowerCase(Locale.ROOT));
63+
props.setProperty("snapshot.mode", runContext.render(this.snapshotMode).as(SnapshotMode.class).orElseThrow().name().toLowerCase(Locale.ROOT));
6364
}
6465

6566
return props;

plugin-debezium-db2/src/main/java/io/kestra/plugin/debezium/db2/Db2Interface.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kestra.plugin.debezium.db2;
22

33
import io.kestra.core.models.annotations.PluginProperty;
4+
import io.kestra.core.models.property.Property;
45
import io.swagger.v3.oas.annotations.media.Schema;
56

67
import jakarta.validation.constraints.NotNull;
@@ -9,9 +10,8 @@ public interface Db2Interface {
910
@Schema(
1011
title = "The name of the DB2 database from which to stream the changes."
1112
)
12-
@PluginProperty(dynamic = true)
1313
@NotNull
14-
String getDatabase();
14+
Property<String> getDatabase();
1515

1616
@Schema(
1717
title = "Specifies the criteria for running a snapshot when the connector starts.",
@@ -23,9 +23,8 @@ public interface Db2Interface {
2323
"- `NO_DATA`: The connector captures the structure of all relevant tables, performing all the steps described in the INITIAL, except that it does not create READ events to represent the data set at the point of the connector’s start-up.\n" +
2424
"- `RECOVERY`: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables."
2525
)
26-
@PluginProperty(dynamic = false)
2726
@NotNull
28-
SnapshotMode getSnapshotMode();
27+
Property<SnapshotMode> getSnapshotMode();
2928

3029
public enum SnapshotMode {
3130
ALWAYS,

plugin-debezium-db2/src/main/java/io/kestra/plugin/debezium/db2/RealtimeTrigger.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kestra.core.models.annotations.Plugin;
55
import io.kestra.core.models.conditions.ConditionContext;
66
import io.kestra.core.models.executions.Execution;
7+
import io.kestra.core.models.property.Property;
78
import io.kestra.core.models.triggers.*;
89
import io.kestra.plugin.debezium.AbstractDebeziumInterface;
910
import io.kestra.plugin.debezium.AbstractDebeziumRealtimeTrigger;
@@ -50,9 +51,9 @@
5051
)
5152
public class RealtimeTrigger extends AbstractDebeziumRealtimeTrigger implements Db2Interface, AbstractDebeziumInterface {
5253
@Builder.Default
53-
private SnapshotMode snapshotMode = SnapshotMode.INITIAL;
54+
private Property<SnapshotMode> snapshotMode = Property.of(SnapshotMode.INITIAL);
5455

55-
private String database;
56+
private Property<String> database;
5657

5758
@Override
5859
public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {

plugin-debezium-db2/src/main/java/io/kestra/plugin/debezium/db2/Trigger.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kestra.core.models.annotations.Plugin;
55
import io.kestra.core.models.conditions.ConditionContext;
66
import io.kestra.core.models.executions.Execution;
7+
import io.kestra.core.models.property.Property;
78
import io.kestra.core.models.triggers.*;
89
import io.kestra.core.runners.RunContext;
910
import io.kestra.plugin.debezium.AbstractDebeziumInterface;
@@ -42,9 +43,9 @@
4243
)
4344
public class Trigger extends AbstractDebeziumTrigger implements Db2Interface, AbstractDebeziumInterface {
4445
@Builder.Default
45-
private Db2Interface.SnapshotMode snapshotMode = Db2Interface.SnapshotMode.INITIAL;
46+
private Property<SnapshotMode> snapshotMode = Property.of(SnapshotMode.INITIAL);
4647

47-
private String database;
48+
private Property<String> database;
4849

4950
@Override
5051
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {

plugin-debezium-db2/src/test/java/io/kestra/plugin/debezium/db2/CaptureTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kestra.plugin.debezium.db2;
22

33
import com.google.common.base.Charsets;
4+
import io.kestra.core.models.property.Property;
45
import io.kestra.core.serializers.FileSerde;
56
import io.kestra.core.storages.StorageInterface;
67
import io.kestra.core.utils.IdUtils;
@@ -59,13 +60,13 @@ void run() throws Exception {
5960
Capture task = Capture.builder()
6061
.id(IdUtils.create())
6162
.type(Capture.class.getName())
62-
.snapshotMode(Db2Interface.SnapshotMode.INITIAL)
63-
.hostname("127.0.0.1")
64-
.port("5023")
65-
.username(getUsername())
66-
.password(getPassword())
67-
.database("kestra")
68-
.maxRecords(5)
63+
.snapshotMode(Property.of(Db2Interface.SnapshotMode.INITIAL))
64+
.hostname(Property.of("127.0.0.1"))
65+
.port(Property.of("5023"))
66+
.username(Property.of(getUsername()))
67+
.password(Property.of(getPassword()))
68+
.database(Property.of("kestra"))
69+
.maxRecords(Property.of(5))
6970
.includedTables(List.of("DB2INST1.EVENTS"))
7071
.build();
7172

plugin-debezium-mongodb/src/main/java/io/kestra/plugin/debezium/mongodb/Capture.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.debezium.connector.mongodb.MongoDbConnector;
44
import io.kestra.core.models.annotations.Example;
55
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.runners.RunContext;
78
import io.kestra.plugin.debezium.AbstractDebeziumTask;
89
import io.swagger.v3.oas.annotations.media.Schema;
@@ -65,10 +66,10 @@ public class Capture extends AbstractDebeziumTask implements MongodbInterface {
6566
private Object excludedCollections;
6667

6768
@NotNull
68-
private String connectionString;
69+
private Property<String> connectionString;
6970

7071
@Builder.Default
71-
private MongodbInterface.SnapshotMode snapshotMode = MongodbInterface.SnapshotMode.INITIAL;
72+
private Property<MongodbInterface.SnapshotMode> snapshotMode = Property.of(SnapshotMode.INITIAL);
7273

7374
@Override
7475
protected boolean needDatabaseHistory() {
@@ -81,7 +82,7 @@ protected Properties properties(RunContext runContext, Path offsetFile, Path his
8182

8283
props.setProperty("connector.class", MongoDbConnector.class.getName());
8384

84-
props.setProperty("mongodb.connection.string", runContext.render(this.connectionString));
85+
props.setProperty("mongodb.connection.string", runContext.render(this.connectionString).as(String.class).orElse(null));
8586

8687
if (this.includedCollections != null) {
8788
props.setProperty("collection.include.list", joinProperties(runContext, this.includedCollections));
@@ -94,7 +95,7 @@ protected Properties properties(RunContext runContext, Path offsetFile, Path his
9495
props.setProperty("capture.mode", "change_streams_update_full_with_pre_image");
9596

9697
if (this.snapshotMode != null) {
97-
props.setProperty("snapshot.mode", this.snapshotMode.name().toLowerCase(Locale.ROOT));
98+
props.setProperty("snapshot.mode", runContext.render(this.snapshotMode).as(SnapshotMode.class).orElseThrow().name().toLowerCase(Locale.ROOT));
9899
}
99100

100101
return props;

plugin-debezium-mongodb/src/main/java/io/kestra/plugin/debezium/mongodb/MongodbInterface.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kestra.plugin.debezium.mongodb;
22

33
import io.kestra.core.models.annotations.PluginProperty;
4+
import io.kestra.core.models.property.Property;
45
import io.swagger.v3.oas.annotations.media.Schema;
56
import jakarta.validation.constraints.NotNull;
67
import lombok.Getter;
@@ -15,8 +16,7 @@ public interface MongodbInterface {
1516
"mongodb://mongo_user:mongo_passwd@mongos0.example.com:27017,mongos1.example.com:27017/"
1617
}
1718
)
18-
@PluginProperty(dynamic = true)
19-
String getConnectionString();
19+
Property<String> getConnectionString();
2020

2121
@Schema(
2222
title = "The name of the MongoDB database collection included from which to stream the changes.",
@@ -42,9 +42,8 @@ public interface MongodbInterface {
4242
"- `NO_DATA`: The connector captures the structure of all relevant tables, performing all the steps described in the default snapshot workflow, except that it does not create READ events to represent the data set at the point of the connector’s start-up.\n" +
4343
"- `WHEN_NEEDED`: The connector runs a snapshot upon startup whenever it deems it necessary. That is, when no offsets are available, or when a previously recorded offset specifies a binlog location or GTID that is not available in the server."
4444
)
45-
@PluginProperty(dynamic = false)
4645
@NotNull
47-
SnapshotMode getSnapshotMode();
46+
Property<SnapshotMode> getSnapshotMode();
4847

4948
public enum SnapshotMode {
5049
INITIAL,

plugin-debezium-mongodb/src/main/java/io/kestra/plugin/debezium/mongodb/RealtimeTrigger.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kestra.core.models.annotations.Plugin;
55
import io.kestra.core.models.conditions.ConditionContext;
66
import io.kestra.core.models.executions.Execution;
7+
import io.kestra.core.models.property.Property;
78
import io.kestra.core.models.triggers.TriggerContext;
89
import io.kestra.core.models.triggers.TriggerService;
910
import io.kestra.plugin.debezium.AbstractDebeziumInterface;
@@ -69,14 +70,14 @@
6970
)
7071
public class RealtimeTrigger extends AbstractDebeziumRealtimeTrigger implements MongodbInterface, AbstractDebeziumInterface {
7172
@Builder.Default
72-
private MongodbInterface.SnapshotMode snapshotMode = MongodbInterface.SnapshotMode.INITIAL;
73+
private Property<MongodbInterface.SnapshotMode> snapshotMode = Property.of(SnapshotMode.INITIAL);
7374

7475
private Object includedCollections;
7576

7677
private Object excludedCollections;
7778

7879
@NotNull
79-
private String connectionString;
80+
private Property<String> connectionString;
8081

8182
@Override
8283
public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {

plugin-debezium-mongodb/src/main/java/io/kestra/plugin/debezium/mongodb/Trigger.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.kestra.core.models.annotations.Plugin;
55
import io.kestra.core.models.conditions.ConditionContext;
66
import io.kestra.core.models.executions.Execution;
7+
import io.kestra.core.models.property.Property;
78
import io.kestra.core.models.triggers.*;
89
import io.kestra.core.runners.RunContext;
910
import io.kestra.plugin.debezium.AbstractDebeziumInterface;
@@ -75,41 +76,41 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
7576
private final Duration interval = Duration.ofSeconds(60);
7677

7778
@Builder.Default
78-
protected AbstractDebeziumTask.Format format = AbstractDebeziumTask.Format.INLINE;
79+
protected Property<AbstractDebeziumTask.Format> format = Property.of(AbstractDebeziumTask.Format.INLINE);
7980

8081
@Builder.Default
81-
protected AbstractDebeziumTask.Deleted deleted = AbstractDebeziumTask.Deleted.ADD_FIELD;
82+
protected Property<AbstractDebeziumTask.Deleted> deleted = Property.of(AbstractDebeziumTask.Deleted.ADD_FIELD);
8283

8384
@Builder.Default
84-
protected String deletedFieldName = "deleted";
85+
protected Property<String> deletedFieldName = Property.of("deleted");
8586

8687
@Builder.Default
87-
protected AbstractDebeziumTask.Key key = AbstractDebeziumTask.Key.ADD_FIELD;
88+
protected Property<AbstractDebeziumTask.Key> key = Property.of(AbstractDebeziumTask.Key.ADD_FIELD);
8889

8990
@Builder.Default
90-
protected AbstractDebeziumTask.Metadata metadata = AbstractDebeziumTask.Metadata.ADD_FIELD;
91+
protected Property<AbstractDebeziumTask.Metadata> metadata = Property.of(AbstractDebeziumTask.Metadata.ADD_FIELD);
9192

9293
@Builder.Default
93-
protected String metadataFieldName = "metadata";
94+
protected Property<String> metadataFieldName = Property.of("metadata");
9495

9596
@Builder.Default
96-
protected AbstractDebeziumTask.SplitTable splitTable = AbstractDebeziumTask.SplitTable.TABLE;
97+
protected Property<AbstractDebeziumTask.SplitTable> splitTable = Property.of(AbstractDebeziumTask.SplitTable.TABLE);
9798

9899
@Builder.Default
99-
protected Boolean ignoreDdl = true;
100+
protected Property<Boolean> ignoreDdl = Property.of(true);
100101

101102
@NotNull
102-
private String connectionString;
103+
private Property<String> connectionString;
103104

104105
@Builder.Default
105-
protected String hostname = "";
106+
protected Property<String> hostname = Property.of("");
106107

107108
@Builder.Default
108-
protected String port = "";
109+
protected Property<String> port = Property.of("");
109110

110-
protected String username;
111+
protected Property<String> username;
111112

112-
protected String password;
113+
protected Property<String> password;
113114

114115
private Object includedDatabases;
115116

@@ -127,20 +128,20 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
127128

128129
private Object excludedColumns;
129130

130-
private Map<String, String> properties;
131+
private Property<Map<String, String>> properties;
131132

132133
@Builder.Default
133-
protected String stateName = "debezium-state";
134+
protected Property<String> stateName = Property.of("debezium-state");
134135

135-
private Integer maxRecords;
136+
private Property<Integer> maxRecords;
136137

137-
private Duration maxDuration;
138+
private Property<Duration> maxDuration;
138139

139140
@Builder.Default
140-
private Duration maxWait = Duration.ofSeconds(10);
141+
private Property<Duration> maxWait = Property.of(Duration.ofSeconds(10));
141142

142143
@Builder.Default
143-
private MongodbInterface.SnapshotMode snapshotMode = MongodbInterface.SnapshotMode.INITIAL;
144+
private Property<MongodbInterface.SnapshotMode> snapshotMode = Property.of(SnapshotMode.INITIAL);
144145

145146
@Override
146147
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {

plugin-debezium-mongodb/src/test/java/io/kestra/plugin/debezium/mongodb/CaptureTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.kestra.plugin.debezium.mongodb;
22

33
import com.google.common.base.Charsets;
4+
import io.kestra.core.models.property.Property;
45
import io.kestra.core.serializers.FileSerde;
56
import io.kestra.core.storages.StorageInterface;
67
import io.kestra.core.utils.IdUtils;
@@ -36,9 +37,9 @@ void run() throws Exception {
3637
Capture task = Capture.builder()
3738
.id(IdUtils.create())
3839
.type(Capture.class.getName())
39-
.snapshotMode(MongodbInterface.SnapshotMode.INITIAL)
40-
.connectionString("mongodb://mongo_user:mongo_passwd@127.0.0.1:27017/?replicaSet=rs0")
41-
.maxRecords(20)
40+
.snapshotMode(Property.of(MongodbInterface.SnapshotMode.INITIAL))
41+
.connectionString(Property.of("mongodb://mongo_user:mongo_passwd@127.0.0.1:27017/?replicaSet=rs0"))
42+
.maxRecords(Property.of(20))
4243
.build();
4344

4445
RunContext runContext = TestsUtils.mockRunContext(runContextFactory, task, Map.of());

0 commit comments

Comments
 (0)