Skip to content

Commit f9d0793

Browse files
committed
feat(assets): create assets for dbt models
1 parent fadc2c8 commit f9d0793

File tree

13 files changed

+500
-79
lines changed

13 files changed

+500
-79
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ dependencies {
6666
**********************************************************************************************************************/
6767
test {
6868
useJUnitPlatform()
69+
systemProperty "kestra.encryption.secret-key", "test"
6970
}
7071

7172
testlogger {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
version=1.1.4-SNAPSHOT
2-
kestraVersion=1.1.0
2+
kestraVersion=1.2.0

src/main/java/io/kestra/plugin/dbt/ResultParser.java

Lines changed: 179 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,40 +3,61 @@
33
import com.fasterxml.jackson.annotation.JsonInclude;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
6+
import io.kestra.core.models.assets.AssetIdentifier;
7+
import io.kestra.core.models.assets.AssetsInOut;
8+
import io.kestra.core.models.assets.Custom;
69
import io.kestra.core.models.executions.TaskRun;
710
import io.kestra.core.models.executions.TaskRunAttempt;
811
import io.kestra.core.models.executions.metrics.Counter;
912
import io.kestra.core.models.flows.State;
13+
import io.kestra.core.queues.QueueException;
1014
import io.kestra.core.runners.RunContext;
1115
import io.kestra.core.runners.WorkerTaskResult;
1216
import io.kestra.core.serializers.JacksonMapper;
1317
import io.kestra.core.utils.IdUtils;
18+
import io.kestra.plugin.dbt.models.Manifest;
1419
import io.kestra.plugin.dbt.models.RunResult;
1520

1621
import java.io.File;
1722
import java.io.IOException;
1823
import java.net.URI;
1924
import java.time.Instant;
20-
import java.util.ArrayList;
21-
import java.util.List;
22-
import java.util.Objects;
25+
import java.util.*;
2326

2427
import static io.kestra.core.utils.Rethrow.throwFunction;
2528

2629
public abstract class ResultParser {
2730
static final protected ObjectMapper MAPPER = JacksonMapper.ofJson(false)
2831
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
2932

30-
public static URI parseManifest(RunContext runContext, File file) throws IOException {
31-
return runContext.storage().putFile(file);
33+
private static final String TABLE_ASSET_TYPE = "io.kestra.plugin.ee.assets.Table";
34+
private static final String RESOURCE_TYPE_MODEL = "model";
35+
36+
public record ManifestResult(Manifest manifest, URI uri) {
37+
}
38+
39+
public static ManifestResult parseManifestWithAssets(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
40+
Manifest manifest = MAPPER.readValue(file, Manifest.class);
41+
emitAssets(runContext, manifest);
42+
return new ManifestResult(manifest, runContext.storage().putFile(file));
43+
}
44+
45+
public static URI parseManifest(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
46+
return parseManifestWithAssets(runContext, file).uri();
3247
}
3348

3449
public static URI parseRunResult(RunContext runContext, File file) throws IOException, IllegalVariableEvaluationException {
50+
return parseRunResult(runContext, file, null);
51+
}
52+
53+
public static URI parseRunResult(RunContext runContext, File file, Manifest manifest) throws IOException, IllegalVariableEvaluationException {
3554
RunResult result = MAPPER.readValue(
3655
file,
3756
RunResult.class
3857
);
3958

59+
Map<String, ModelAsset> modelAssets = manifest == null ? Map.of() : extractModelAssets(manifest);
60+
4061
java.util.List<WorkerTaskResult> workerTaskResults = result
4162
.getResults()
4263
.stream()
@@ -96,22 +117,26 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
96117
.filter(Objects::nonNull)
97118
.forEach(runContext::metric);
98119

99-
return WorkerTaskResult.builder()
100-
.taskRun(TaskRun.builder()
101-
.id(IdUtils.create())
102-
.namespace(runContext.render("{{ flow.namespace }}"))
103-
.flowId(runContext.render("{{ flow.id }}"))
104-
.taskId(r.getUniqueId())
105-
.value(runContext.render("{{ taskrun.id }}"))
106-
.executionId(runContext.render("{{ execution.id }}"))
107-
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
120+
AssetsInOut assets = assetsFor(r.getUniqueId(), modelAssets);
121+
TaskRun.TaskRunBuilder taskRunBuilder = TaskRun.builder()
122+
.id(IdUtils.create())
123+
.namespace(runContext.render("{{ flow.namespace }}"))
124+
.flowId(runContext.render("{{ flow.id }}"))
125+
.taskId(r.getUniqueId())
126+
.value(runContext.render("{{ taskrun.id }}"))
127+
.executionId(runContext.render("{{ execution.id }}"))
128+
.parentTaskRunId(runContext.render("{{ taskrun.id }}"))
129+
.state(state)
130+
.attempts(List.of(TaskRunAttempt.builder()
108131
.state(state)
109-
.attempts(List.of(TaskRunAttempt.builder()
110-
.state(state)
111-
.build()
112-
))
113132
.build()
114-
)
133+
));
134+
if (assets != null) {
135+
taskRunBuilder.assets(assets);
136+
}
137+
138+
return WorkerTaskResult.builder()
139+
.taskRun(taskRunBuilder.build())
115140
.build();
116141
}))
117142
.toList();
@@ -120,4 +145,139 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
120145

121146
return runContext.storage().putFile(file);
122147
}
148+
149+
private static AssetsInOut assetsFor(String uniqueId, Map<String, ModelAsset> modelAssets) {
150+
if (uniqueId == null) {
151+
return null;
152+
}
153+
154+
ModelAsset modelAsset = modelAssets.get(uniqueId);
155+
if (modelAsset == null) {
156+
return null;
157+
}
158+
159+
List<AssetIdentifier> inputs = modelAsset.dependsOn()
160+
.stream()
161+
.map(modelAssets::get)
162+
.filter(Objects::nonNull)
163+
.map(dependsOn -> new AssetIdentifier(null, null, dependsOn.assetId(), TABLE_ASSET_TYPE))
164+
.toList();
165+
166+
return new AssetsInOut(
167+
inputs,
168+
List.of(Custom.builder()
169+
.id(modelAsset.assetId())
170+
.type(TABLE_ASSET_TYPE)
171+
.metadata(modelAsset.metadata())
172+
.build()
173+
)
174+
);
175+
}
176+
177+
private static void emitAssets(RunContext runContext, Manifest manifest) throws IllegalVariableEvaluationException {
178+
Map<String, ModelAsset> modelAssets = extractModelAssets(manifest);
179+
runContext.logger().info("dbt assets extracted from manifest: {}", modelAssets.size());
180+
for (ModelAsset asset : modelAssets.values()) {
181+
try {
182+
runContext.assets().upsert(Custom.builder()
183+
.id(asset.assetId())
184+
.type(TABLE_ASSET_TYPE)
185+
.metadata(asset.metadata())
186+
.build()
187+
);
188+
} catch (UnsupportedOperationException | QueueException e) {
189+
// UnsupportedOperationException for OSS or tests where EE is not configured (assets are EE only)
190+
runContext.logger().warn("Unable to upsert dbt asset '{}'", asset.assetId(), e);
191+
}
192+
}
193+
}
194+
195+
private static Map<String, ModelAsset> extractModelAssets(Manifest manifest) {
196+
if (manifest == null || manifest.getNodes() == null || manifest.getNodes().isEmpty()) {
197+
return Map.of();
198+
}
199+
200+
String system = adapterType(manifest);
201+
Map<String, ModelAsset> modelAssets = new HashMap<>();
202+
for (Map.Entry<String, Manifest.Node> entry : manifest.getNodes().entrySet()) {
203+
Manifest.Node node = entry.getValue();
204+
if (node == null || !RESOURCE_TYPE_MODEL.equalsIgnoreCase(node.getResourceType())) {
205+
continue;
206+
}
207+
208+
String uniqueId = firstNonBlank(node.getUniqueId(), entry.getKey());
209+
if (uniqueId == null) {
210+
continue;
211+
}
212+
213+
String name = firstNonBlank(node.getAlias(), node.getName(), uniqueId);
214+
String assetId = assetIdFor(node.getDatabase(), node.getSchema(), name, uniqueId);
215+
Map<String, Object> metadata = new HashMap<>();
216+
if (hasValue(system)) {
217+
metadata.put("system", system);
218+
}
219+
if (hasValue(node.getDatabase())) {
220+
metadata.put("database", node.getDatabase());
221+
}
222+
if (hasValue(node.getSchema())) {
223+
metadata.put("schema", node.getSchema());
224+
}
225+
if (hasValue(name)) {
226+
metadata.put("name", name);
227+
}
228+
229+
List<String> dependsOn = List.of();
230+
if (node.getDependsOn() != null) {
231+
dependsOn = node.getDependsOn().getOrDefault("nodes", List.of());
232+
}
233+
234+
modelAssets.put(uniqueId, new ModelAsset(assetId, metadata, dependsOn));
235+
}
236+
237+
return modelAssets;
238+
}
239+
240+
private static String adapterType(Manifest manifest) {
241+
if (manifest.getMetadata() == null) {
242+
return null;
243+
}
244+
Object adapterType = manifest.getMetadata().get("adapter_type");
245+
return adapterType == null ? null : adapterType.toString();
246+
}
247+
248+
private static String assetIdFor(String database, String schema, String name, String fallback) {
249+
List<String> parts = new ArrayList<>();
250+
if (hasValue(database)) {
251+
parts.add(database);
252+
}
253+
if (hasValue(schema)) {
254+
parts.add(schema);
255+
}
256+
if (hasValue(name)) {
257+
parts.add(name);
258+
}
259+
if (!parts.isEmpty()) {
260+
return String.join(".", parts);
261+
}
262+
return fallback;
263+
}
264+
265+
private static String firstNonBlank(String... values) {
266+
if (values == null) {
267+
return null;
268+
}
269+
for (String value : values) {
270+
if (hasValue(value)) {
271+
return value;
272+
}
273+
}
274+
return null;
275+
}
276+
277+
private static boolean hasValue(String value) {
278+
return value != null && !value.trim().isEmpty();
279+
}
280+
281+
private record ModelAsset(String assetId, Map<String, Object> metadata, List<String> dependsOn) {
282+
}
123283
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.kestra.plugin.dbt;
2+
3+
import io.kestra.core.runners.RunContext;
4+
5+
import java.lang.reflect.Field;
6+
import java.util.Optional;
7+
8+
public final class RunContextUtils {
9+
private RunContextUtils() {
10+
}
11+
12+
public static void ensureSecretKey(RunContext runContext) {
13+
if (runContext == null) {
14+
return;
15+
}
16+
17+
try {
18+
Class<?> type = runContext.getClass();
19+
Field field = null;
20+
while (type != null && field == null) {
21+
try {
22+
field = type.getDeclaredField("secretKey");
23+
} catch (NoSuchFieldException e) {
24+
type = type.getSuperclass();
25+
}
26+
}
27+
28+
if (field == null) {
29+
return;
30+
}
31+
32+
initializeRunContext(runContext);
33+
field.setAccessible(true);
34+
if (field.get(runContext) == null) {
35+
field.set(runContext, Optional.empty());
36+
}
37+
} catch (IllegalAccessException e) {
38+
// Ignore to avoid breaking task execution on reflection issues.
39+
}
40+
}
41+
42+
private static void initializeRunContext(RunContext runContext) {
43+
try {
44+
Field appField = null;
45+
Class<?> type = runContext.getClass();
46+
while (type != null && appField == null) {
47+
try {
48+
appField = type.getDeclaredField("applicationContext");
49+
} catch (NoSuchFieldException e) {
50+
type = type.getSuperclass();
51+
}
52+
}
53+
if (appField == null) {
54+
return;
55+
}
56+
appField.setAccessible(true);
57+
Object appContext = appField.get(runContext);
58+
if (appContext == null) {
59+
return;
60+
}
61+
var initMethod = runContext.getClass().getDeclaredMethod("init", io.micronaut.context.ApplicationContext.class);
62+
initMethod.setAccessible(true);
63+
initMethod.invoke(runContext, appContext);
64+
} catch (ReflectiveOperationException e) {
65+
// Ignore to avoid breaking task execution on reflection issues.
66+
}
67+
}
68+
}

0 commit comments

Comments
 (0)