Skip to content
This repository was archived by the owner on Feb 13, 2026. It is now read-only.

Commit 0eb1165

Browse files
authored
refactor: migrate to dynamic properties (#84)
* refactor: migrate to dynamic properties migrate abstract python singer to dynamic properties and BigQuery tap * refactor: migrate target migrate abstract target AdswerveBigQuery DatamillCoPostgres GenericTarget MeltanoSnowflake Oracle * refactor: migrate taps tasks
1 parent 2d1a561 commit 0eb1165

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+730
-877
lines changed

src/main/java/io/kestra/plugin/singer/AbstractPythonSinger.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.kestra.core.models.annotations.PluginProperty;
99
import io.kestra.core.models.executions.metrics.Counter;
1010
import io.kestra.core.models.executions.metrics.Timer;
11+
import io.kestra.core.models.property.Property;
1112
import io.kestra.core.models.tasks.Task;
1213
import io.kestra.core.models.tasks.runners.AbstractLogConsumer;
1314
import io.kestra.core.models.tasks.runners.ScriptService;
@@ -65,22 +66,19 @@ public abstract class AbstractPythonSinger extends Task {
6566
@Schema(
6667
title = "The name of Singer state file stored in KV Store."
6768
)
68-
@PluginProperty(dynamic = true)
6969
@NotNull
7070
@Builder.Default
71-
protected String stateName = "singer-state";
71+
protected Property<String> stateName = Property.of("singer-state");
7272

7373
@Schema(
7474
title = "Override default pip packages to use a specific version."
7575
)
76-
@PluginProperty(dynamic = true)
77-
protected List<String> pipPackages;
76+
protected Property<List<String>> pipPackages;
7877

7978
@Schema(
8079
title = "Override default singer command."
8180
)
82-
@PluginProperty(dynamic = true)
83-
protected String command;
81+
protected Property<String> command;
8482

8583
@Schema(
8684
title = "Deprecated, use 'taskRunner' instead"
@@ -98,9 +96,8 @@ public abstract class AbstractPythonSinger extends Task {
9896
private TaskRunner taskRunner = Docker.instance();
9997

10098
@Schema(title = "The task runner container image, only used if the task runner is container-based.")
101-
@PluginProperty(dynamic = true)
10299
@Builder.Default
103-
private String containerImage = DEFAULT_IMAGE;
100+
private Property<String> containerImage = Property.of(DEFAULT_IMAGE);
104101

105102
protected DockerOptions injectDefaults(DockerOptions original) {
106103
if (original == null) {
@@ -115,14 +112,16 @@ protected DockerOptions injectDefaults(DockerOptions original) {
115112
return builder.build();
116113
}
117114

118-
abstract public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException, IOException;
115+
public abstract Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException, IOException;
119116

120-
abstract public List<String> pipPackages();
117+
public abstract Property<List<String>> pipPackages();
121118

122-
abstract protected String command();
119+
protected abstract Property<String> command();
123120

124121
protected String finalCommand(RunContext runContext) throws IllegalVariableEvaluationException {
125-
return this.command != null ? runContext.render(this.command) : this.command();
122+
return this.command != null ?
123+
runContext.render(this.command).as(String.class).orElseThrow() :
124+
runContext.render(this.command()).as(String.class).orElse(null);
126125
}
127126

128127
protected void run(RunContext runContext, String command, AbstractLogConsumer logConsumer) throws Exception {
@@ -141,7 +140,7 @@ protected void run(RunContext runContext, String command, AbstractLogConsumer lo
141140
.withWarningOnStdErr(true)
142141
.withDockerOptions(this.injectDefaults(getDocker()))
143142
.withTaskRunner(taskRunner)
144-
.withContainerImage(this.containerImage)
143+
.withContainerImage(runContext.render(this.containerImage).as(String.class).orElseThrow())
145144
.withLogConsumer(logConsumer)
146145
.withCommands(ScriptService.scriptCommands(
147146
List.of("/bin/sh", "-c"),
@@ -156,7 +155,11 @@ protected void run(RunContext runContext, String command, AbstractLogConsumer lo
156155
}
157156

158157
protected Stream<String> pipInstallCommands(RunContext runContext) throws Exception {
159-
ArrayList<String> finalRequirements = new ArrayList<>(this.pipPackages != null ? runContext.render(this.pipPackages) : this.pipPackages());
158+
ArrayList<String> finalRequirements = new ArrayList<>(
159+
this.pipPackages != null ?
160+
runContext.render(this.pipPackages).asList(String.class) :
161+
runContext.render(this.pipPackages()).asList(String.class)
162+
);
160163
finalRequirements.add("python-json-logger");
161164

162165
return Stream.of(

src/main/java/io/kestra/plugin/singer/taps/AbstractPythonTap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void initEnvDiscoveryAndState(RunContext runContext) throws Exception {
6969
if (this.features().contains(Feature.STATE)) {
7070
try {
7171
InputStream taskStateFile = runContext.stateStore().getState(
72-
runContext.render(this.stateName),
72+
runContext.render(this.stateName).as(String.class).orElseThrow(),
7373
"state.json",
7474
runContext.storage().getTaskStorageContext().map(StorageContext.Task::getTaskRunValue).orElse(null)
7575
);
@@ -102,7 +102,7 @@ public Output run(RunContext runContext) throws Exception {
102102
.raw(runContext.storage().putFile(this.rawSingerStream.getLeft()));
103103

104104
if (this.features().contains(Feature.STATE)) {
105-
this.saveState(runContext, runContext.render(this.stateName), this.stateRecords);
105+
this.saveState(runContext, runContext.render(this.stateName).as(String.class).orElseThrow(), this.stateRecords);
106106
}
107107

108108
return outputBuilder

src/main/java/io/kestra/plugin/singer/taps/BigQuery.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.common.collect.ImmutableMap;
55
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
66
import io.kestra.core.models.annotations.PluginProperty;
7+
import io.kestra.core.models.property.Property;
78
import io.kestra.core.models.tasks.RunnableTask;
89
import io.kestra.core.runners.RunContext;
910
import io.kestra.plugin.singer.models.Feature;
@@ -30,8 +31,7 @@ public class BigQuery extends AbstractPythonTap implements RunnableTask<Abstract
3031
@Schema(
3132
title = "The JSON service account key as string."
3233
)
33-
@PluginProperty(dynamic = true)
34-
protected String serviceAccount;
34+
protected Property<String> serviceAccount;
3535

3636
@NotNull
3737
@NotEmpty
@@ -45,17 +45,15 @@ public class BigQuery extends AbstractPythonTap implements RunnableTask<Abstract
4545
@Schema(
4646
title = "Limits the number of records returned in each stream, applied as a limit in the query."
4747
)
48-
@PluginProperty
49-
private Integer limit;
48+
private Property<Integer> limit;
5049

5150
@NotNull
5251
@Schema(
5352
title = "When replicating incrementally, disable to only select records whose `datetime_key` is greater than the maximum value replicated in the last run, by excluding records whose timestamps match exactly.",
5453
description = "This could cause records to be missed that were created after the last run finished, but during the same second and with the same timestamp."
5554
)
56-
@PluginProperty
5755
@Builder.Default
58-
private Boolean startAlwaysInclusive = true;
56+
private Property<Boolean> startAlwaysInclusive = Property.of(true);
5957

6058
@NotNull
6159
@Schema(
@@ -83,7 +81,7 @@ public List<Feature> features() {
8381
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException {
8482
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
8583
.put("streams", this.streams)
86-
.put("start_always_inclusive", this.startAlwaysInclusive);
84+
.put("start_always_inclusive", runContext.render(this.startAlwaysInclusive).as(Boolean.class).orElseThrow());
8785

8886
if (this.startDateTime != null) {
8987
builder.put("start_datetime", runContext.render(this.startDateTime.toString()));
@@ -97,13 +95,13 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
9795
}
9896

9997
@Override
100-
public List<String> pipPackages() {
101-
return List.of("git+https://github.com/kestra-io/tap-bigquery.git@fix");
98+
public Property<List<String>> pipPackages() {
99+
return Property.of(List.of("git+https://github.com/kestra-io/tap-bigquery.git@fix"));
102100
}
103101

104102
@Override
105-
protected String command() {
106-
return "tap-bigquery";
103+
protected Property<String> command() {
104+
return Property.of("tap-bigquery");
107105
}
108106

109107
@SuppressWarnings("DuplicatedCode")
@@ -112,7 +110,7 @@ protected Map<String, String> environmentVariables(RunContext runContext) throws
112110
HashMap<String, String> env = new HashMap<>(super.environmentVariables(runContext));
113111

114112
if (this.serviceAccount != null) {
115-
this.writeSingerFiles("google-credentials.json", runContext.render(this.serviceAccount));
113+
this.writeSingerFiles("google-credentials.json", runContext.render(this.serviceAccount).as(String.class).orElseThrow());
116114
env.put("GOOGLE_APPLICATION_CREDENTIALS", workingDirectory.toAbsolutePath() + "/google-credentials.json");
117115
}
118116

src/main/java/io/kestra/plugin/singer/taps/BingAds.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.collect.ImmutableMap;
44
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
55
import io.kestra.core.models.annotations.PluginProperty;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.RunnableTask;
78
import io.kestra.core.runners.RunContext;
89
import io.kestra.plugin.singer.models.Feature;
@@ -107,12 +108,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
107108
}
108109

109110
@Override
110-
public List<String> pipPackages() {
111-
return Collections.singletonList("tap-bing-ads");
111+
public Property<List<String>> pipPackages() {
112+
return Property.of(Collections.singletonList("tap-bing-ads"));
112113
}
113114

114115
@Override
115-
protected String command() {
116-
return "tap-bing-ads";
116+
protected Property<String> command() {
117+
return Property.of("tap-bing-ads");
117118
}
118119
}

src/main/java/io/kestra/plugin/singer/taps/ChargeBee.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.collect.ImmutableMap;
44
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
55
import io.kestra.core.models.annotations.PluginProperty;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.RunnableTask;
78
import io.kestra.core.runners.RunContext;
89
import io.kestra.plugin.singer.models.Feature;
@@ -84,12 +85,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
8485
}
8586

8687
@Override
87-
public List<String> pipPackages() {
88-
return Collections.singletonList("git+https://github.com/hotgluexyz/tap-chargebee.git");
88+
public Property<List<String>> pipPackages() {
89+
return Property.of(Collections.singletonList("git+https://github.com/hotgluexyz/tap-chargebee.git"));
8990
}
9091

9192
@Override
92-
protected String command() {
93-
return "tap-chargebee";
93+
protected Property<String> command() {
94+
return Property.of("tap-chargebee");
9495
}
9596
}

src/main/java/io/kestra/plugin/singer/taps/ExchangeRateHost.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.collect.ImmutableMap;
44
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
55
import io.kestra.core.models.annotations.PluginProperty;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.RunnableTask;
78
import io.kestra.core.runners.RunContext;
89
import io.kestra.plugin.singer.models.Feature;
@@ -69,12 +70,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
6970
}
7071

7172
@Override
72-
public List<String> pipPackages() {
73-
return Collections.singletonList("tap-exchangeratehost");
73+
public Property<List<String>> pipPackages() {
74+
return Property.of(Collections.singletonList("tap-exchangeratehost"));
7475
}
7576

7677
@Override
77-
protected String command() {
78-
return "tap-exchangeratehost";
78+
protected Property<String> command() {
79+
return Property.of("tap-exchangeratehost");
7980
}
8081
}

src/main/java/io/kestra/plugin/singer/taps/FacebookAds.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.collect.ImmutableMap;
44
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
55
import io.kestra.core.models.annotations.PluginProperty;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.RunnableTask;
78
import io.kestra.core.runners.RunContext;
89
import io.kestra.plugin.singer.models.Feature;
@@ -48,9 +49,8 @@ public class FacebookAds extends AbstractPythonTap implements RunnableTask<Abstr
4849
@Schema(
4950
title = "How many Days before the Start Date to fetch Ads Insights for."
5051
)
51-
@PluginProperty(dynamic = true)
5252
@Builder.Default
53-
private final Integer insightsBufferDays = 0;
53+
private final Property<Integer> insightsBufferDays = Property.of(0);
5454

5555
@NotNull
5656
@Schema(
@@ -79,7 +79,7 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
7979
ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
8080
.put("account_id", runContext.render(this.accountId))
8181
.put("access_token", runContext.render(this.accessToken))
82-
.put("insights_buffer_days", this.insightsBufferDays)
82+
.put("insights_buffer_days", runContext.render(this.insightsBufferDays).as(Integer.class).orElseThrow())
8383
.put("start_date", runContext.render(this.startDate.toString()));
8484

8585
if (this.endDate != null) {
@@ -90,12 +90,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
9090
}
9191

9292
@Override
93-
public List<String> pipPackages() {
94-
return Collections.singletonList("tap-facebook");
93+
public Property<List<String>> pipPackages() {
94+
return Property.of(Collections.singletonList("tap-facebook"));
9595
}
9696

9797
@Override
98-
protected String command() {
99-
return "tap-facebook";
98+
protected Property<String> command() {
99+
return Property.of("tap-facebook");
100100
}
101101
}

src/main/java/io/kestra/plugin/singer/taps/Fastly.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.google.common.collect.ImmutableMap;
44
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
55
import io.kestra.core.models.annotations.PluginProperty;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.RunnableTask;
78
import io.kestra.core.runners.RunContext;
89
import io.kestra.plugin.singer.models.Feature;
@@ -65,12 +66,12 @@ public Map<String, Object> configuration(RunContext runContext) throws IllegalVa
6566
}
6667

6768
@Override
68-
public List<String> pipPackages() {
69-
return Collections.singletonList("git+https://gitlab.com/meltano/tap-fastly.git");
69+
public Property<List<String>> pipPackages() {
70+
return Property.of(Collections.singletonList("git+https://gitlab.com/meltano/tap-fastly.git"));
7071
}
7172

7273
@Override
73-
protected String command() {
74-
return "tap-fastly";
74+
protected Property<String> command() {
75+
return Property.of("tap-fastly");
7576
}
7677
}

src/main/java/io/kestra/plugin/singer/taps/GenericTap.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
44
import io.kestra.core.models.annotations.PluginProperty;
5+
import io.kestra.core.models.property.Property;
56
import io.kestra.core.models.tasks.RunnableTask;
67
import io.kestra.core.runners.RunContext;
78
import io.kestra.plugin.singer.models.Feature;
@@ -11,6 +12,7 @@
1112
import lombok.experimental.SuperBuilder;
1213

1314
import java.util.Arrays;
15+
import java.util.HashMap;
1416
import java.util.List;
1517
import java.util.Map;
1618

@@ -27,15 +29,13 @@ public class GenericTap extends AbstractPythonTap implements RunnableTask<Abstra
2729
@Schema(
2830
title = "The list of pip package to install."
2931
)
30-
@PluginProperty
31-
private List<String> pipPackages;
32+
private Property<List<String>> pipPackages;
3233

3334
@NotNull
3435
@Schema(
3536
title = "The command to start."
3637
)
37-
@PluginProperty
38-
private String command;
38+
private Property<String> command;
3939

4040
@NotNull
4141
@Schema(
@@ -53,25 +53,25 @@ public class GenericTap extends AbstractPythonTap implements RunnableTask<Abstra
5353
title = "The configuration to use",
5454
description = "Will be save on config.json and used as arguments"
5555
)
56-
@PluginProperty(dynamic = true)
57-
private Map<String, Object> configs;
56+
private Property<Map<String, Object>> configs;
5857

5958
public List<Feature> features() {
6059
return this.features;
6160
}
6261

6362
@Override
6463
public Map<String, Object> configuration(RunContext runContext) throws IllegalVariableEvaluationException {
65-
return runContext.render(configs);
64+
var config = runContext.render(configs).asMap(String.class, Object.class);
65+
return config.isEmpty() ? new HashMap<>() : config;
6666
}
6767

6868
@Override
69-
public List<String> pipPackages() {
69+
public Property<List<String>> pipPackages() {
7070
return this.pipPackages;
7171
}
7272

7373
@Override
74-
protected String command() {
74+
protected Property<String> command() {
7575
return this.command;
7676
}
7777
}

0 commit comments

Comments
 (0)