Skip to content

Commit d0a7fe0

Browse files
feat(core): attach logs to dynamically-generated taskruns and nest them in the UI
Plugins that synthesize dynamic taskruns via dynamicWorkerResult (e.g. the Ansible and dbt plugins building one taskrun per unit of work) had no way to attach logs to them, so all the output landed on the parent task's root taskrun. Add an overload dynamicWorkerResult(WorkerTaskResult, List<DynamicTaskRunLog>) that registers the taskrun and its log lines in a single call. The lines are emitted through the regular logging pipeline (a child logger bound to the taskrun), so the standard appender behaviour - secret masking, long-message splitting, level filtering, server-log forwarding - applies natively. The logs ride with the taskrun being registered, so they can only target it: execution/tenant/namespace/flow are taken from the run context and the attempt is fixed to 0, so a plugin never builds (nor can forge) a LogEntry for another execution or taskrun. The default registers the taskrun and drops the logs so runtimes that don't support per-taskrun logs still show the taskrun. Nest these taskruns under their parent in the execution views (they carry parentTaskRunId): indent child rows by depth in Gantt and the Logs view, and make the topology "Show task logs" action include a node's dynamic child taskruns so their logs show alongside the task's own.
1 parent 0bf78c9 commit d0a7fe0

9 files changed

Lines changed: 331 additions & 29 deletions

File tree

core/src/main/java/io/kestra/core/runners/DefaultRunContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,15 @@ public List<WorkerTaskResult> dynamicWorkerResults() {
506506
return dynamicWorkerTaskResult;
507507
}
508508

509+
@Override
510+
public void dynamicWorkerResult(WorkerTaskResult workerTaskResult, List<DynamicTaskRunLog> logs) {
511+
this.dynamicWorkerTaskResult.add(workerTaskResult);
512+
513+
if (logs != null && !logs.isEmpty() && workerTaskResult.getTaskRun() != null) {
514+
this.logger.emitDynamicTaskRunLogs(workerTaskResult.getTaskRun(), logs);
515+
}
516+
}
517+
509518
/**
510519
* {@inheritDoc}
511520
*/
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.kestra.core.runners;
2+
3+
import org.slf4j.event.Level;
4+
5+
/**
6+
* A log line to attach to a dynamically-generated taskrun when it is registered through
7+
* {@link RunContext#dynamicWorkerResult(WorkerTaskResult, java.util.List)}.
8+
* <p>
9+
* Callers only provide the level and message — the execution, tenant, namespace, flow, taskrun
10+
* identity and attempt are filled in from the run context, and secrets are masked, so a plugin
11+
* never builds (nor can tamper with) a full {@link io.kestra.core.models.executions.LogEntry}.
12+
*/
13+
public record DynamicTaskRunLog(Level level, String message) {
14+
}

core/src/main/java/io/kestra/core/runners/RunContext.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,22 @@ public abstract class RunContext implements PropertyContext {
121121

122122
public abstract List<WorkerTaskResult> dynamicWorkerResults();
123123

124+
/**
125+
* Registers a dynamically-generated taskrun together with the log lines to attach to it, in a
126+
* single call — the way plugins surface logs under the sub-taskruns they create at runtime
127+
* (these taskruns are synthesized after their underlying work has finished, so their logs are
128+
* already available at registration time).
129+
* <p>
130+
* The logs ride with the taskrun being registered, so they can only ever target that taskrun;
131+
* their execution, tenant, namespace, flow and attempt are taken from this context (a plugin
132+
* cannot target another execution or tenant) and secrets are masked. The default registers the
133+
* taskrun and drops the logs, so runtimes that don't support per-taskrun logs (and older ones)
134+
* still show the taskrun rather than failing the task.
135+
*/
136+
public void dynamicWorkerResult(WorkerTaskResult workerTaskResult, List<DynamicTaskRunLog> logs) {
137+
this.dynamicWorkerResult(List.of(workerTaskResult));
138+
}
139+
124140
/**
125141
* Gets access to the working directory.
126142
*

core/src/main/java/io/kestra/core/runners/RunContextLogger.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
1717
import io.kestra.core.models.executions.LogEntry;
18+
import io.kestra.core.models.executions.TaskRun;
1819

1920
import ch.qos.logback.classic.Level;
2021
import ch.qos.logback.classic.Logger;
@@ -213,6 +214,59 @@ public void emitLogs(List<LogEntry> logEntries) {
213214
this.logEmitter.emits(logEntries);
214215
}
215216

217+
/**
218+
* Emit the log lines attached to a dynamically-generated taskrun through the regular logging
219+
* pipeline, so the standard appender behaviour (secret masking, long-message splitting, level
220+
* filtering, forwarding to the server log, file vs queue routing) applies natively — the only
221+
* thing that changes is the taskrun the lines are attributed to.
222+
* <p>
223+
* When this context logs to a file ({@code logToFile}), its logs are file-only (no inline/queue
224+
* display): the lines are routed through this context's own logger so they land in the same
225+
* downloadable file. A flat file has no taskrun, so there is nothing to link there.
226+
* <p>
227+
* Otherwise the lines go through a child logger bound to a {@link LogEntry} whose execution,
228+
* tenant, namespace and flow are taken from this context's bound entry — a caller cannot target
229+
* another execution or tenant — with the dynamic taskrun's id and a fixed attempt 0 (these
230+
* taskruns have a single attempt and the log view groups by the 0-based attempt). The level
231+
* filter and the secrets known to this context are inherited so nothing else is lost.
232+
*/
233+
public void emitDynamicTaskRunLogs(TaskRun dynamicTaskRun, List<DynamicTaskRunLog> logs) {
234+
// A logToFile task logs to a file only (no inline display): route the lines through this
235+
// context's own logger so they join the same downloadable file (a flat file has no taskrun
236+
// to link to). Otherwise emit through a logger bound to the dynamic taskrun, so the lines
237+
// are attributed to it while still passing through the regular appender pipeline.
238+
org.slf4j.Logger logger = this.logToFile ? this.logger() : deriveLoggerFor(dynamicTaskRun).logger();
239+
240+
for (DynamicTaskRunLog log : logs) {
241+
logger.atLevel(log.level()).log(log.message());
242+
}
243+
}
244+
245+
/**
246+
* Derive a logger bound to a dynamically-generated taskrun: a child of this context that shares
247+
* its log emitter, level filter and known secrets, but emits under the taskrun's id with a fixed
248+
* attempt 0 (these taskruns have a single attempt and the log view groups by the 0-based
249+
* attempt). Execution, tenant, namespace and flow are taken from this context, so a caller can
250+
* only ever target this execution's taskrun — never forge a log for another execution or tenant.
251+
*/
252+
private RunContextLogger deriveLoggerFor(TaskRun dynamicTaskRun) {
253+
LogEntry boundLogEntry = LogEntry.builder()
254+
.tenantId(this.logEntry.getTenantId())
255+
.executionId(this.logEntry.getExecutionId())
256+
.namespace(this.logEntry.getNamespace())
257+
.flowId(this.logEntry.getFlowId())
258+
.executionKind(this.logEntry.getExecutionKind())
259+
.taskId(dynamicTaskRun.getTaskId())
260+
.taskRunId(dynamicTaskRun.getId())
261+
.attemptNumber(0)
262+
.build();
263+
264+
RunContextLogger taskRunLogger = new RunContextLogger(this.logEmitter, boundLogEntry, null, false);
265+
taskRunLogger.loglevel = this.loglevel; // inherit this context's level filter as-is (logback Level)
266+
taskRunLogger.useSecrets.addAll(this.useSecrets);
267+
return taskRunLogger;
268+
}
269+
216270
private Logger initializeLogger() {
217271
LoggerContext loggerContext = new LoggerContext();
218272
LogbackMDCAdapter mdcAdapter = new LogbackMDCAdapter();

core/src/test/java/io/kestra/core/runners/RunContextLoggerTest.java

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import io.kestra.core.models.executions.Execution;
1818
import io.kestra.core.models.executions.LogEntry;
19+
import io.kestra.core.models.executions.TaskRun;
20+
import io.kestra.core.models.executions.TaskRunAttempt;
1921
import io.kestra.core.models.flows.Flow;
2022
import io.kestra.core.queues.BroadcastQueueInterface;
2123
import io.kestra.core.queues.DispatchQueueInterface;
@@ -213,6 +215,156 @@ void emitLogs_sendsToBothQueues() {
213215
assertThat(followQueueLogs).containsExactlyInAnyOrder(FollowLogEvent.from(e1), FollowLogEvent.from(e2));
214216
}
215217

218+
@Test
219+
void emitDynamicTaskRunLogs_forcesContextAndAttemptZeroAndMasks() {
220+
List<LogEntry> logs = new CopyOnWriteArrayList<>();
221+
logQueue.addListener(logs::add);
222+
223+
Flow flow = TestsUtils.mockFlow();
224+
Execution execution = TestsUtils.mockExecution(flow, Map.of());
225+
226+
RunContextLogger runContextLogger = new RunContextLogger(
227+
logEntryEmitter,
228+
LogEntry.of(execution),
229+
Level.TRACE,
230+
false
231+
);
232+
runContextLogger.usedSecret("super-secret-value");
233+
234+
// a dynamic taskrun that (deliberately) carries a foreign execution/tenant and one attempt:
235+
// the emitted entry must NOT inherit those — execution/tenant/namespace/flow come from the context.
236+
TaskRun dynamicTaskRun = TaskRun.builder()
237+
.id("dyn-taskrun-id")
238+
.taskId("Play | Task 1")
239+
.tenantId("other-tenant")
240+
.executionId("other-execution")
241+
.namespace("other.namespace")
242+
.flowId("other-flow")
243+
.attempts(List.of(TaskRunAttempt.builder().build()))
244+
.build();
245+
246+
runContextLogger.emitDynamicTaskRunLogs(dynamicTaskRun, List.of(new DynamicTaskRunLog(Level.ERROR, "leak super-secret-value here")));
247+
248+
List<LogEntry> queueLogs = TestsUtils.awaitLogs(logs, 1);
249+
assertThat(queueLogs).hasSize(1);
250+
LogEntry emitted = queueLogs.getFirst();
251+
// taskrun identity comes from the dynamic taskrun
252+
assertThat(emitted.getTaskRunId()).isEqualTo("dyn-taskrun-id");
253+
assertThat(emitted.getTaskId()).isEqualTo("Play | Task 1");
254+
// attempt is forced to 0 (these taskruns have one attempt; the log view groups by the 0-based attempt)
255+
assertThat(emitted.getAttemptNumber()).isEqualTo(0);
256+
// execution/tenant/namespace/flow are forced from the context, never from the (foreign) taskrun
257+
assertThat(emitted.getExecutionId()).isEqualTo(execution.getId());
258+
assertThat(emitted.getExecutionId()).isNotEqualTo("other-execution");
259+
assertThat(emitted.getTenantId()).isEqualTo(execution.getTenantId());
260+
assertThat(emitted.getNamespace()).isEqualTo(execution.getNamespace());
261+
assertThat(emitted.getFlowId()).isEqualTo(execution.getFlowId());
262+
// level preserved + secret masked
263+
assertThat(emitted.getLevel()).isEqualTo(Level.ERROR);
264+
assertThat(emitted.getMessage()).isEqualTo("leak ****** here");
265+
}
266+
267+
@Test
268+
void emitDynamicTaskRunLogs_inheritsLevelFilter() {
269+
List<LogEntry> logs = new CopyOnWriteArrayList<>();
270+
logQueue.addListener(logs::add);
271+
272+
Flow flow = TestsUtils.mockFlow();
273+
Execution execution = TestsUtils.mockExecution(flow, Map.of());
274+
275+
// the context filters at WARN: an INFO dynamic line must be dropped, like any task log
276+
RunContextLogger runContextLogger = new RunContextLogger(
277+
logEntryEmitter,
278+
LogEntry.of(execution),
279+
Level.WARN,
280+
false
281+
);
282+
283+
TaskRun dynamicTaskRun = TaskRun.builder()
284+
.id("dyn-taskrun-id")
285+
.taskId("Play | Task 1")
286+
.attempts(List.of(TaskRunAttempt.builder().build()))
287+
.build();
288+
289+
runContextLogger.emitDynamicTaskRunLogs(dynamicTaskRun, List.of(
290+
new DynamicTaskRunLog(Level.INFO, "info dropped by filter"),
291+
new DynamicTaskRunLog(Level.ERROR, "error kept")
292+
));
293+
294+
List<LogEntry> queueLogs = TestsUtils.awaitLogs(logs, 1);
295+
assertThat(queueLogs).hasSize(1);
296+
assertThat(queueLogs.getFirst().getLevel()).isEqualTo(Level.ERROR);
297+
assertThat(queueLogs.getFirst().getMessage()).isEqualTo("error kept");
298+
assertThat(queueLogs).noneMatch(l -> l.getLevel().equals(Level.INFO));
299+
}
300+
301+
@Test
302+
void emitDynamicTaskRunLogs_underLogToFileGoesToFileNotQueue() throws Exception {
303+
List<LogEntry> logs = new CopyOnWriteArrayList<>();
304+
logQueue.addListener(logs::add);
305+
306+
Flow flow = TestsUtils.mockFlow();
307+
Execution execution = TestsUtils.mockExecution(flow, Map.of());
308+
309+
// logToFile=true: task logs are file-only, so the dynamic lines must land in the file
310+
// (with masking) and never reach the inline log queue
311+
RunContextLogger runContextLogger = new RunContextLogger(
312+
logEntryEmitter,
313+
LogEntry.of(execution),
314+
Level.TRACE,
315+
true
316+
);
317+
runContextLogger.usedSecret("super-secret-value");
318+
319+
TaskRun dynamicTaskRun = TaskRun.builder()
320+
.id("dyn-taskrun-id")
321+
.taskId("Play | Task 1")
322+
.attempts(List.of(TaskRunAttempt.builder().build()))
323+
.build();
324+
325+
runContextLogger.emitDynamicTaskRunLogs(dynamicTaskRun, List.of(
326+
new DynamicTaskRunLog(Level.INFO, "to file super-secret-value")
327+
));
328+
329+
runContextLogger.closeLogFile();
330+
String fileContent = java.nio.file.Files.readString(runContextLogger.getLogFile().toPath());
331+
assertThat(fileContent).contains("to file ******");
332+
// file-only: ContextAppender is not attached, so nothing reaches the inline queue
333+
assertThat(logs).isEmpty();
334+
}
335+
336+
@Test
337+
void emitDynamicTaskRunLogs_seedsMDCWithDynamicTaskRunIdentity() {
338+
Flow flow = TestsUtils.mockFlow();
339+
Execution execution = TestsUtils.mockExecution(flow, Map.of());
340+
341+
// mirror exactly how emitDynamicTaskRunLogs binds the child logger for a dynamic taskrun:
342+
// execution context + the dynamic taskrun's id/taskId, attempt 0
343+
LogEntry boundLogEntry = LogEntry.of(execution).toBuilder()
344+
.taskId("Play | Task 1")
345+
.taskRunId("dyn-taskrun-id")
346+
.attemptNumber(0)
347+
.build();
348+
349+
RunContextLogger runContextLogger = new RunContextLogger(
350+
logEntryEmitter,
351+
boundLogEntry,
352+
Level.TRACE,
353+
false
354+
);
355+
ch.qos.logback.classic.Logger perRunLogger =
356+
(ch.qos.logback.classic.Logger) runContextLogger.logger();
357+
358+
// the per-run MDC carries the dynamic taskrun identity (taskRunId/taskId), not just the
359+
// execution context — so forwarded server logs are attributed to the dynamic taskrun too
360+
assertThat(perRunLogger.getLoggerContext().getMDCAdapter().getCopyOfContextMap())
361+
.containsEntry("taskRunId", "dyn-taskrun-id")
362+
.containsEntry("taskId", "Play | Task 1")
363+
.containsEntry("executionId", execution.getId())
364+
.containsEntry("namespace", execution.getNamespace())
365+
.containsEntry("flowId", execution.getFlowId());
366+
}
367+
216368
@Test
217369
void transformPreservesMDC() throws Exception {
218370
Flow flow = TestsUtils.mockFlow();

ui/packages/topology/src/nodes/TaskNode.vue

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
interface TaskRun {
120120
id: string
121121
taskId: string;
122+
parentTaskRunId?: string;
122123
state: {
123124
current: [string, string];
124125
duration?: string;
@@ -227,6 +228,16 @@
227228
)
228229
})
229230
231+
// The task's own taskruns plus any dynamically-generated child taskruns (e.g. Ansible
232+
// plays/tasks) so "show task logs" surfaces their logs too, not just the task's root logs.
233+
const taskRunsWithDynamicChildren = computed(() => {
234+
const ids = new Set(taskRuns.value.map((t: TaskRun) => t.id))
235+
const children = taskRunList.value.filter(
236+
(t: TaskRun) => t.parentTaskRunId && ids.has(t.parentTaskRunId),
237+
)
238+
return [...taskRuns.value, ...children]
239+
})
240+
230241
const state = computed(() => {
231242
if (!taskRuns.value?.length) {
232243
return null
@@ -341,7 +352,7 @@
341352
key: "logs",
342353
label: t("show task logs"),
343354
icon: TextBoxSearch,
344-
onClick: () => emit(EVENTS.SHOW_LOGS, {id: taskId.value, execution: taskExecution.value, taskRuns: taskRuns.value}),
355+
onClick: () => emit(EVENTS.SHOW_LOGS, {id: taskId.value, execution: taskExecution.value, taskRuns: taskRunsWithDynamicChildren.value}),
345356
})
346357
}
347358
if (taskExecution.value) {

ui/src/components/executions/Gantt.vue

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@
9999
<ChevronRight v-if="!selectedTaskRuns.includes(item.id)" />
100100
<ChevronDown v-else />
101101
</div>
102-
<div class="task-label">
102+
<div
103+
class="task-label"
104+
:style="{'--depth': item.depth || 0}"
105+
>
103106
<div v-if="taskTypeByTaskRunId[item.id]" class="task-icon-box">
104107
<KsTaskIcon :cls="taskTypeByTaskRunId[item.id]" onlyIcon :icons="pluginsStore.icons" />
105108
</div>
@@ -858,6 +861,7 @@
858861
display: flex;
859862
align-items: center;
860863
gap: var(--ks-spacing-4);
864+
padding-left: calc(var(--depth, 0) * var(--ks-spacing-5));
861865
862866
code {
863867
color: var(--ks-text-primary);

ui/src/components/executions/TaskRunLine.vue

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
<template>
2-
<div v-if="!hideHeader" class="taskrun-header">
2+
<div
3+
v-if="!hideHeader"
4+
class="taskrun-header"
5+
:style="{'--depth': depth}"
6+
>
37
<div>
48
<KsIcon
59
v-if="!taskRunId && shouldDisplayChevron(currentTaskRun)"
@@ -135,6 +139,7 @@
135139
logs?: any[]
136140
filter?: string
137141
hideHeader?: boolean
142+
depth?: number
138143
}
139144
140145
const props = withDefaults(defineProps<Props>(), {
@@ -146,6 +151,7 @@
146151
logs: () => [],
147152
filter: "",
148153
hideHeader: false,
154+
depth: 0,
149155
})
150156
151157
const emit = defineEmits<{
@@ -247,6 +253,7 @@
247253
248254
.taskrun-header {
249255
background-color: var(--ks-bg-surface);
256+
padding-left: calc(var(--ks-spacing-4) + var(--depth, 0) * var(--ks-spacing-5));
250257
251258
.task-icon {
252259
width: 36px;

0 commit comments

Comments
 (0)