Skip to content

Commit 2d6ab07

Browse files
committed
feat(assets): create assets for dbt models
1 parent fadc2c8 commit 2d6ab07

File tree

16 files changed

+688
-93
lines changed

16 files changed

+688
-93
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies {
6666
**********************************************************************************************************************/
6767
test {
6868
useJUnitPlatform()
69+
systemProperty "kestra.encryption.secret-key", "test"
6970
}
7071

7172
testlogger {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
version=1.1.4-SNAPSHOT
2-
kestraVersion=1.1.0
2+
kestraVersion=1.2.0

src/main/java/io/kestra/plugin/dbt/ResultParser.java

Lines changed: 175 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,40 +3,53 @@
33
import com.fasterxml.jackson.annotation.JsonInclude;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
6+
import io.kestra.core.models.assets.AssetIdentifier;
7+
import io.kestra.core.models.assets.AssetsInOut;
8+
import io.kestra.core.models.assets.Custom;
69
import io.kestra.core.models.executions.TaskRun;
710
import io.kestra.core.models.executions.TaskRunAttempt;
811
import io.kestra.core.models.executions.metrics.Counter;
912
import io.kestra.core.models.flows.State;
13+
import io.kestra.core.queues.QueueException;
1014
import io.kestra.core.runners.RunContext;
1115
import io.kestra.core.runners.WorkerTaskResult;
1216
import io.kestra.core.serializers.JacksonMapper;
1317
import io.kestra.core.utils.IdUtils;
18+
import io.kestra.plugin.dbt.models.Manifest;
1419
import io.kestra.plugin.dbt.models.RunResult;
1520

1621
import java.io.File;
1722
import java.io.IOException;
1823
import java.net.URI;
1924
import java.time.Instant;
20-
import java.util.ArrayList;
21-
import java.util.List;
22-
import java.util.Objects;
25+
import java.util.*;
2326

2427
import static io.kestra.core.utils.Rethrow.throwFunction;
2528

2629
public abstract class ResultParser {
2730
static final protected ObjectMapper MAPPER = JacksonMapper.ofJson(false)
2831
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
2932

30-
public static URI parseManifest(RunContext runContext, File file) throws IOException {
31-
return runContext.storage().putFile(file);
33+
private static final String TABLE_ASSET_TYPE = "io.kestra.plugin.ee.assets.Table";
34+
private static final String RESOURCE_TYPE_MODEL = "model";
35+
36+
public record ManifestResult(Manifest manifest, URI uri) {
3237
}
3338

34-
public static URI parseRunResult(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
39+
public static ManifestResult parseManifestWithAssets(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
40+
Manifest manifest = MAPPER.readValue(file, Manifest.class);
41+
emitAssets(runContext, manifest);
42+
return new ManifestResult(manifest, runContext.storage().putFile(file));
43+
}
44+
45+
public static URI parseRunResult(RunContext runContext, File file, Manifest manifest) throws IOException, IllegalVariableEvaluationException {
3546
RunResult result = MAPPER.readValue(
3647
file,
3748
RunResult.class
3849
);
3950

51+
Map<String, ModelAsset> modelAssets = manifest == null ? Map.of() : extractModelAssets(manifest);
52+
4053
java.util.List<WorkerTaskResult> workerTaskResults = result
4154
.getResults()
4255
.stream()
@@ -96,22 +109,26 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
96109
.filter(Objects::nonNull)
97110
.forEach(runContext::metric);
98111

99-
return WorkerTaskResult.builder()
100-
.taskRun(TaskRun.builder()
101-
.id(IdUtils.create())
102-
.namespace(runContext.render("{{ flow.namespace }}"))
103-
.flowId(runContext.render("{{ flow.id }}"))
104-
.taskId(r.getUniqueId())
105-
.value(runContext.render("{{ taskrun.id }}"))
106-
.executionId(runContext.render("{{ execution.id }}"))
107-
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
112+
AssetsInOut assets = assetsFor(r.getUniqueId(), modelAssets);
113+
TaskRun.TaskRunBuilder taskRunBuilder = TaskRun.builder()
114+
.id(IdUtils.create())
115+
.namespace(runContext.render("{{ flow.namespace }}"))
116+
.flowId(runContext.render("{{ flow.id }}"))
117+
.taskId(r.getUniqueId())
118+
.value(runContext.render("{{ taskrun.id }}"))
119+
.executionId(runContext.render("{{ execution.id }}"))
120+
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
121+
.state(state)
122+
.attempts(List.of(TaskRunAttempt.builder()
108123
.state(state)
109-
.attempts(List.of(TaskRunAttempt.builder()
110-
.state(state)
111-
.build()
112-
))
113124
.build()
114-
)
125+
));
126+
if (assets != null) {
127+
taskRunBuilder.assets(assets);
128+
}
129+
130+
return WorkerTaskResult.builder()
131+
.taskRun(taskRunBuilder.build())
115132
.build();
116133
}))
117134
.toList();
@@ -120,4 +137,142 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
120137

121138
return runContext.storage().putFile(file);
122139
}
140+
141+
private static AssetsInOut assetsFor(String uniqueId, Map<String, ModelAsset> modelAssets) {
142+
if (uniqueId == null) {
143+
return null;
144+
}
145+
146+
ModelAsset modelAsset = modelAssets.get(uniqueId);
147+
if (modelAsset == null) {
148+
return null;
149+
}
150+
151+
List<AssetIdentifier> inputs = modelAsset.dependsOn().stream()
152+
.map(modelAssets::get)
153+
.filter(Objects::nonNull)
154+
.map(dep -> new AssetIdentifier(null, null, dep.assetId(), TABLE_ASSET_TYPE))
155+
.toList();
156+
157+
return new AssetsInOut(
158+
inputs,
159+
List.of(Custom.builder()
160+
.id(modelAsset.assetId())
161+
.type(TABLE_ASSET_TYPE)
162+
.metadata(modelAsset.metadata())
163+
.build()
164+
)
165+
);
166+
}
167+
168+
private static void emitAssets(RunContext runContext, Manifest manifest) throws IllegalVariableEvaluationException {
169+
Map<String, ModelAsset> modelAssets = extractModelAssets(manifest);
170+
runContext.logger().info("dbt assets extracted from manifest: {}", modelAssets.size());
171+
172+
for (ModelAsset asset : modelAssets.values()) {
173+
try {
174+
runContext.assets().upsert(Custom.builder()
175+
.id(asset.assetId())
176+
.type(TABLE_ASSET_TYPE)
177+
.metadata(asset.metadata())
178+
.build()
179+
);
180+
} catch (UnsupportedOperationException | QueueException e) {
181+
// UnsupportedOperationException for OSS or tests where EE is not configured (assets are EE only)
182+
runContext.logger().warn("Unable to upsert dbt asset '{}'", asset.assetId(), e);
183+
}
184+
}
185+
}
186+
187+
private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
188+
if (manifest == null || manifest.getNodes() == null || manifest.getNodes().isEmpty()) {
189+
return Map.of();
190+
}
191+
192+
String system = adapterType(manifest);
193+
Map<String, ModelAsset> modelAssets = new HashMap<>();
194+
195+
for (Map.Entry<String, Manifest.Node> entry : manifest.getNodes().entrySet()) {
196+
Manifest.Node node = entry.getValue();
197+
if (node == null || !RESOURCE_TYPE_MODEL.equalsIgnoreCase(node.getResourceType())) {
198+
continue;
199+
}
200+
201+
String uniqueId = firstNonBlank(node.getUniqueId(), entry.getKey());
202+
if (uniqueId == null) {
203+
continue;
204+
}
205+
206+
String name = firstNonBlank(node.getAlias(), node.getName(), uniqueId);
207+
String assetId = assetIdFor(node.getDatabase(), node.getSchema(), name, uniqueId);
208+
209+
Map<String, Object> metadata = new HashMap<>();
210+
if (hasValue(system)) metadata.put("system", system);
211+
if (hasValue(node.getDatabase())) metadata.put("database", node.getDatabase());
212+
if (hasValue(node.getSchema())) metadata.put("schema", node.getSchema());
213+
if (hasValue(name)) metadata.put("name", name);
214+
215+
List<String> dependsOn = List.of();
216+
if (node.getDependsOn() != null) {
217+
dependsOn = node.getDependsOn().getOrDefault("nodes", List.of());
218+
}
219+
220+
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn));
221+
}
222+
223+
Map<String, ModelAsset> filtered = new HashMap<>(modelAssets.size());
224+
for (Map.Entry<String, ModelAsset> e : modelAssets.entrySet()) {
225+
ModelAsset a = e.getValue();
226+
List<String> deps = a.dependsOn() == null ? List.of() : a.dependsOn().stream()
227+
.filter(modelAssets::containsKey)
228+
.toList();
229+
filtered.put(e.getKey(), new ModelAsset(a.assetId(), a.metadata(), deps));
230+
}
231+
232+
return filtered;
233+
}
234+
235+
private static String adapterType(Manifest manifest) {
236+
if (manifest.getMetadata() == null) {
237+
return null;
238+
}
239+
Object adapterType = manifest.getMetadata().get("adapter_type");
240+
return adapterType == null ? null : adapterType.toString();
241+
}
242+
243+
private static String assetIdFor(String database, String schema, String name, String fallback) {
244+
List<String> parts = new ArrayList<>();
245+
if (hasValue(database)) {
246+
parts.add(database);
247+
}
248+
if (hasValue(schema)) {
249+
parts.add(schema);
250+
}
251+
if (hasValue(name)) {
252+
parts.add(name);
253+
}
254+
if (!parts.isEmpty()) {
255+
return String.join(".", parts);
256+
}
257+
return fallback;
258+
}
259+
260+
private static String firstNonBlank(String... values) {
261+
if (values == null) {
262+
return null;
263+
}
264+
for (String value : values) {
265+
if (hasValue(value)) {
266+
return value;
267+
}
268+
}
269+
return null;
270+
}
271+
272+
private static boolean hasValue(String value) {
273+
return value != null && !value.trim().isEmpty();
274+
}
275+
276+
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn) {
277+
}
123278
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.kestra.plugin.dbt;
2+
3+
import io.kestra.core.runners.RunContext;
4+
5+
import java.lang.reflect.Field;
6+
import java.util.Optional;
7+
8+
public final class RunContextUtils {
9+
private RunContextUtils() {
10+
}
11+
12+
public static void ensureSecretKey(RunContext runContext) {
13+
if (runContext == null) {
14+
return;
15+
}
16+
17+
try {
18+
Class<?> type = runContext.getClass();
19+
Field field = null;
20+
while (type != null && field == null) {
21+
try {
22+
field = type.getDeclaredField("secretKey");
23+
} catch (NoSuchFieldException e) {
24+
type = type.getSuperclass();
25+
}
26+
}
27+
28+
if (field == null) {
29+
return;
30+
}
31+
32+
initializeRunContext(runContext);
33+
field.setAccessible(true);
34+
if (field.get(runContext) == null) {
35+
field.set(runContext, Optional.empty());
36+
}
37+
} catch (IllegalAccessException e) {
38+
// Ignore to avoid breaking task execution on reflection issues.
39+
}
40+
}
41+
42+
private static void initializeRunContext(RunContext runContext) {
43+
try {
44+
Field appField = null;
45+
Class<?> type = runContext.getClass();
46+
while (type != null && appField == null) {
47+
try {
48+
appField = type.getDeclaredField("applicationContext");
49+
} catch (NoSuchFieldException e) {
50+
type = type.getSuperclass();
51+
}
52+
}
53+
if (appField == null) {
54+
return;
55+
}
56+
appField.setAccessible(true);
57+
Object appContext = appField.get(runContext);
58+
if (appContext == null) {
59+
return;
60+
}
61+
var initMethod = runContext.getClass().getDeclaredMethod("init", io.micronaut.context.ApplicationContext.class);
62+
initMethod.setAccessible(true);
63+
initMethod.invoke(runContext, appContext);
64+
} catch (ReflectiveOperationException e) {
65+
// Ignore to avoid breaking task execution on reflection issues.
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)