33import com .fasterxml .jackson .annotation .JsonInclude ;
44import com .fasterxml .jackson .databind .ObjectMapper ;
55import io .kestra .core .exceptions .IllegalVariableEvaluationException ;
6+ import io .kestra .core .models .assets .AssetIdentifier ;
7+ import io .kestra .core .models .assets .AssetsInOut ;
8+ import io .kestra .core .models .assets .Custom ;
69import io .kestra .core .models .executions .TaskRun ;
710import io .kestra .core .models .executions .TaskRunAttempt ;
811import io .kestra .core .models .executions .metrics .Counter ;
912import io .kestra .core .models .flows .State ;
13+ import io .kestra .core .queues .QueueException ;
1014import io .kestra .core .runners .RunContext ;
1115import io .kestra .core .runners .WorkerTaskResult ;
1216import io .kestra .core .serializers .JacksonMapper ;
1317import io .kestra .core .utils .IdUtils ;
18+ import io .kestra .plugin .dbt .models .Manifest ;
1419import io .kestra .plugin .dbt .models .RunResult ;
1520
1621import java .io .File ;
1722import java .io .IOException ;
1823import java .net .URI ;
1924import java .time .Instant ;
20- import java .util .ArrayList ;
21- import java .util .List ;
22- import java .util .Objects ;
25+ import java .util .*;
2326
2427import static io .kestra .core .utils .Rethrow .throwFunction ;
2528
2629public abstract class ResultParser {
2730 static final protected ObjectMapper MAPPER = JacksonMapper .ofJson (false )
2831 .setSerializationInclusion (JsonInclude .Include .NON_NULL );
2932
30- public static URI parseManifest (RunContext runContext , File file ) throws IOException {
31- return runContext .storage ().putFile (file );
33+ private static final String TABLE_ASSET_TYPE = "io.kestra.plugin.ee.assets.Table" ;
34+ private static final String RESOURCE_TYPE_MODEL = "model" ;
35+
36+ public record ManifestResult (Manifest manifest , URI uri ) {
37+ }
38+
39+ public static ManifestResult parseManifestWithAssets (RunContext runContext , File file ) throws IOException , IllegalVariableEvaluationException {
40+ Manifest manifest = MAPPER .readValue (file , Manifest .class );
41+ emitAssets (runContext , manifest );
42+ return new ManifestResult (manifest , runContext .storage ().putFile (file ));
43+ }
44+
45+ public static URI parseManifest (RunContext runContext , File file ) throws IOException , IllegalVariableEvaluationException {
46+ return parseManifestWithAssets (runContext , file ).uri ();
3247 }
3348
3449 public static URI parseRunResult (RunContext runContext , File file ) throws IOException , IllegalVariableEvaluationException {
50+ return parseRunResult (runContext , file , null );
51+ }
52+
53+ public static URI parseRunResult (RunContext runContext , File file , Manifest manifest ) throws IOException , IllegalVariableEvaluationException {
3554 RunResult result = MAPPER .readValue (
3655 file ,
3756 RunResult .class
3857 );
3958
59+ Map <String , ModelAsset > modelAssets = manifest == null ? Map .of () : extractModelAssets (manifest );
60+
4061 java .util .List <WorkerTaskResult > workerTaskResults = result
4162 .getResults ()
4263 .stream ()
@@ -96,22 +117,26 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
96117 .filter (Objects ::nonNull )
97118 .forEach (runContext ::metric );
98119
99- return WorkerTaskResult .builder ()
100- .taskRun (TaskRun .builder ()
101- .id (IdUtils .create ())
102- .namespace (runContext .render ("{{ flow.namespace }}" ))
103- .flowId (runContext .render ("{{ flow.id }}" ))
104- .taskId (r .getUniqueId ())
105- .value (runContext .render ("{{ taskrun.id }}" ))
106- .executionId (runContext .render ("{{ execution.id }}" ))
107- .parentTaskRunId (runContext .render ("{{ taskrun.id }}" ))
120+ AssetsInOut assets = assetsFor (r .getUniqueId (), modelAssets );
121+ TaskRun .TaskRunBuilder taskRunBuilder = TaskRun .builder ()
122+ .id (IdUtils .create ())
123+ .namespace (runContext .render ("{{ flow.namespace }}" ))
124+ .flowId (runContext .render ("{{ flow.id }}" ))
125+ .taskId (r .getUniqueId ())
126+ .value (runContext .render ("{{ taskrun.id }}" ))
127+ .executionId (runContext .render ("{{ execution.id }}" ))
128+ .parentTaskRunId (runContext .render ("{{ taskrun.id }}" ))
129+ .state (state )
130+ .attempts (List .of (TaskRunAttempt .builder ()
108131 .state (state )
109- .attempts (List .of (TaskRunAttempt .builder ()
110- .state (state )
111- .build ()
112- ))
113132 .build ()
114- )
133+ ));
134+ if (assets != null ) {
135+ taskRunBuilder .assets (assets );
136+ }
137+
138+ return WorkerTaskResult .builder ()
139+ .taskRun (taskRunBuilder .build ())
115140 .build ();
116141 }))
117142 .toList ();
@@ -120,4 +145,139 @@ public static URI parseRunResult(RunContext runContext, File file) throws IOExce
120145
121146 return runContext .storage ().putFile (file );
122147 }
148+
149+ private static AssetsInOut assetsFor (String uniqueId , Map <String , ModelAsset > modelAssets ) {
150+ if (uniqueId == null ) {
151+ return null ;
152+ }
153+
154+ ModelAsset modelAsset = modelAssets .get (uniqueId );
155+ if (modelAsset == null ) {
156+ return null ;
157+ }
158+
159+ List <AssetIdentifier > inputs = modelAsset .dependsOn ()
160+ .stream ()
161+ .map (modelAssets ::get )
162+ .filter (Objects ::nonNull )
163+ .map (dependsOn -> new AssetIdentifier (null , null , dependsOn .assetId (), TABLE_ASSET_TYPE ))
164+ .toList ();
165+
166+ return new AssetsInOut (
167+ inputs ,
168+ List .of (Custom .builder ()
169+ .id (modelAsset .assetId ())
170+ .type (TABLE_ASSET_TYPE )
171+ .metadata (modelAsset .metadata ())
172+ .build ()
173+ )
174+ );
175+ }
176+
177+ private static void emitAssets (RunContext runContext , Manifest manifest ) throws IllegalVariableEvaluationException {
178+ Map <String , ModelAsset > modelAssets = extractModelAssets (manifest );
179+ runContext .logger ().info ("dbt assets extracted from manifest: {}" , modelAssets .size ());
180+ for (ModelAsset asset : modelAssets .values ()) {
181+ try {
182+ runContext .assets ().upsert (Custom .builder ()
183+ .id (asset .assetId ())
184+ .type (TABLE_ASSET_TYPE )
185+ .metadata (asset .metadata ())
186+ .build ()
187+ );
188+ } catch (UnsupportedOperationException | QueueException e ) {
189+ // UnsupportedOperationException for OSS or tests where EE is not configured (assets are EE only)
190+ runContext .logger ().warn ("Unable to upsert dbt asset '{}'" , asset .assetId (), e );
191+ }
192+ }
193+ }
194+
195+ private static Map <String , ModelAsset > extractModelAssets (Manifest manifest ) {
196+ if (manifest == null || manifest .getNodes () == null || manifest .getNodes ().isEmpty ()) {
197+ return Map .of ();
198+ }
199+
200+ String system = adapterType (manifest );
201+ Map <String , ModelAsset > modelAssets = new HashMap <>();
202+ for (Map .Entry <String , Manifest .Node > entry : manifest .getNodes ().entrySet ()) {
203+ Manifest .Node node = entry .getValue ();
204+ if (node == null || !RESOURCE_TYPE_MODEL .equalsIgnoreCase (node .getResourceType ())) {
205+ continue ;
206+ }
207+
208+ String uniqueId = firstNonBlank (node .getUniqueId (), entry .getKey ());
209+ if (uniqueId == null ) {
210+ continue ;
211+ }
212+
213+ String name = firstNonBlank (node .getAlias (), node .getName (), uniqueId );
214+ String assetId = assetIdFor (node .getDatabase (), node .getSchema (), name , uniqueId );
215+ Map <String , Object > metadata = new HashMap <>();
216+ if (hasValue (system )) {
217+ metadata .put ("system" , system );
218+ }
219+ if (hasValue (node .getDatabase ())) {
220+ metadata .put ("database" , node .getDatabase ());
221+ }
222+ if (hasValue (node .getSchema ())) {
223+ metadata .put ("schema" , node .getSchema ());
224+ }
225+ if (hasValue (name )) {
226+ metadata .put ("name" , name );
227+ }
228+
229+ List <String > dependsOn = List .of ();
230+ if (node .getDependsOn () != null ) {
231+ dependsOn = node .getDependsOn ().getOrDefault ("nodes" , List .of ());
232+ }
233+
234+ modelAssets .put (uniqueId , new ModelAsset (assetId , metadata , dependsOn ));
235+ }
236+
237+ return modelAssets ;
238+ }
239+
240+ private static String adapterType (Manifest manifest ) {
241+ if (manifest .getMetadata () == null ) {
242+ return null ;
243+ }
244+ Object adapterType = manifest .getMetadata ().get ("adapter_type" );
245+ return adapterType == null ? null : adapterType .toString ();
246+ }
247+
248+ private static String assetIdFor (String database , String schema , String name , String fallback ) {
249+ List <String > parts = new ArrayList <>();
250+ if (hasValue (database )) {
251+ parts .add (database );
252+ }
253+ if (hasValue (schema )) {
254+ parts .add (schema );
255+ }
256+ if (hasValue (name )) {
257+ parts .add (name );
258+ }
259+ if (!parts .isEmpty ()) {
260+ return String .join ("." , parts );
261+ }
262+ return fallback ;
263+ }
264+
265+ private static String firstNonBlank (String ... values ) {
266+ if (values == null ) {
267+ return null ;
268+ }
269+ for (String value : values ) {
270+ if (hasValue (value )) {
271+ return value ;
272+ }
273+ }
274+ return null ;
275+ }
276+
277+ private static boolean hasValue (String value ) {
278+ return value != null && !value .trim ().isEmpty ();
279+ }
280+
281+ private record ModelAsset (String assetId , Map <String , Object > metadata , List <String > dependsOn ) {
282+ }
123283}
0 commit comments