Conversation

nemesiscodex

What changes are being made and why?

Closes #4129 (Implements resume of container for the Docker task runner).


How have the changes been QAed?

Steps to reproduce:

  1. Open a terminal at the path of the docker-compose file (a minimal example is sketched after these steps)
  2. Execute docker compose up
  3. In the browser, navigate to the Kestra instance and create a new flow with the content below
  4. Execute the flow and observe its logs
  5. Once you see the log message starting with "Starting Job", go to the terminal running the Kestra instance and press CTRL+C to shut it down
  6. Start Kestra again and observe the execution you triggered before shutting down
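
For reference, a minimal sketch of a docker-compose.yml along these lines (image tag, command, ports, and volume names are assumptions, not the exact file from this PR):

services:
  kestra:
    image: kestra/kestra:latest
    command: server local
    ports:
      - "8080:8080"
    volumes:
      # the Docker task runner needs access to the host Docker daemon
      - /var/run/docker.sock:/var/run/docker.sock
      - kestra-data:/app/storage

volumes:
  kestra-data: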

Alternative with devcontainers:

  1. Run Kestra with ./gradlew runLocal
  2. In the browser, navigate to the Kestra instance and create a new flow with the content below
  3. Execute the flow and observe its logs
  4. Once you see the message starting with "Starting Job", go to your terminal, find the container started by the devcontainer, and run docker stop <devcontainerid>
  5. Go back to your IDE, reload the window, and start Kestra again with ./gradlew runLocal
  6. Observe the execution you triggered before shutting down

Flow used for testing:

id: selfupdate
namespace: dev
description: Update Kestra itself with docker-compose

labels:
  env: dev

tasks:
  - id: pull_images
    type: io.kestra.plugin.scripts.shell.Commands
    commands:
      - export EXECUTION_DATE=$(date) # keep the date constant so we can tell whether the first or the second start finished
      - echo Starting Job $EXECUTION_DATE
      - sleep 30
      - echo Finished Job $EXECUTION_DATE
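
For context, a hypothetical variant of the task that sets the Docker task runner explicitly and enables the new resume property (the runner type string is an assumption; the property name comes from this PR):

  - id: pull_images_resumable
    type: io.kestra.plugin.scripts.shell.Commands
    taskRunner:
      type: io.kestra.plugin.scripts.runner.docker.Docker
      resume: true # assumed: reattach to the container created before the shutdown instead of starting a new one
    commands:
      - echo Starting resumable job
      - sleep 30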

logger.trace("Container created: {}", exec.getId());
// evaluate resume (task property overrides plugin configuration if set)
Boolean resumeProp = runContext.render(this.resume).as(Boolean.class).orElse(null);
Optional<Boolean> resumeConfig = runContext.pluginConfiguration(RESUME_ENABLED_CONFIG);
Member

We didn't offer a plugin configuration for resume because, as a task property, it can already be enabled globally via task defaults.
We never offer both a plugin configuration and a task property for the same setting.
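
For example, a minimal sketch of what enabling it globally could look like (assuming the Docker task runner type string and the pluginDefaults syntax; adapt to the actual property name):

pluginDefaults:
  - type: io.kestra.plugin.scripts.runner.docker.Docker
    values:
      resume: true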

Member

Can you remove the ability to configure it via plugin configuration?

Comment on lines +393 to +395
if (logger.isDebugEnabled()) {
logger.debug("Resuming existing container {}", containerId);
}
Member

Suggested change
if (logger.isDebugEnabled()) {
logger.debug("Resuming existing container {}", containerId);
}
logger.info("Resuming existing container {}", containerId);

Await.until(ended::get);

if (exitCode != 0) {
if (needVolume && FileHandlingStrategy.VOLUME.equals(strategy) && filesVolumeName != null) {
Member

Why did you remove the check on filesVolumeName?

if (needVolume && FileHandlingStrategy.VOLUME.equals(strategy) && filesVolumeName != null) {
downloadOutputFiles(exec.getId(), dockerClient, runContext, taskCommands);
if (needVolume && FileHandlingStrategy.VOLUME.equals(strategy) && filesVolumeName != null) {
// For newly created containers, original condition holds; for resumed ones, filesVolumeName is null
Member

In this case, for a resumed container we would never arrive here.
I think you should set filesVolumeName by inspecting the resumed container; it should be doable.
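
A rough sketch of what that could look like with docker-java (the WORKING_DIR constant and the null fallback are assumptions):

import com.github.dockerjava.api.command.InspectContainerResponse;

// inspect the resumed container and recover the name of the files volume from its mounts
InspectContainerResponse inspect = dockerClient.inspectContainerCmd(containerId).exec();
filesVolumeName = inspect.getMounts().stream()
    .filter(mount -> mount.getName() != null) // named volumes only
    .filter(mount -> WORKING_DIR.equals(mount.getDestination().getPath())) // assumed mount destination used by the runner
    .map(InspectContainerResponse.Mount::getName)
    .findFirst()
    .orElse(null);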

"echo \"::{\\\"outputs\\\":{\\\"msg\\\":\\\"Token\\\"}}::\" && sleep 1"
)));

var first = dockerCreate.run(runContext, createCommands, Collections.emptyList());
Member

Calling run will end the container, so I'm not sure the test does what you think it does.

@loicmathieu
Member

For testing, a single test could be added to check that resuming works.
This is how it is done for our Kubernetes task runner; you can take inspiration from it and adapt it for Docker.

The idea is to start the task runner in a thread, interrupt the thread once the container is started, start another task runner, and check that the same container is used.

    @Test
    void killAfterResume() throws Exception {
        var runContext = runContext(this.runContextFactory);
        var commands = initScriptCommands(runContext);
        Mockito.when(commands.getCommands()).thenReturn(Property.ofValue(
            ScriptService.scriptCommands(List.of("/bin/sh", "-c"), Collections.emptyList(), List.of("sleep 50"))
        ));

        var taskRunner = ((Kubernetes) taskRunner()).toBuilder().delete(Property.ofValue(false)).resume(Property.ofValue(true)).build();
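        // start the first run in a background thread so the test can interrupt it while the pod is still running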
        Thread initialPodThread = new Thread(throwRunnable(() -> taskRunner.run(runContext, commands, Collections.emptyList())));
        initialPodThread.start();
        try (var client = PodService.client(runContext, taskRunner.getConfig())) {
            String labelSelector = "kestra.io/taskrun-id=" + ((Map<String, Object>) runContext.getVariables().get("taskrun")).get("id");
            Await.until(() -> {
                PodList existingPods;
                try {
                    existingPods = client.pods().inNamespace(runContext.render(taskRunner.getNamespace()).as(String.class).orElseThrow()).list(new ListOptionsBuilder().withLabelSelector(labelSelector).build());
                } catch (IllegalVariableEvaluationException e) {
                    throw new RuntimeException(e);
                }
                return !existingPods.getItems().isEmpty();
            });
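            // interrupting the first run simulates an abrupt shutdown while the pod keeps running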
            initialPodThread.interrupt();

            Map<String, Object> taskRunProps = new HashMap<>((Map<String, Object>) runContext.getVariables().get("taskrun"));
            RunContext anotherRunContext = runContext(this.runContextFactory, Map.of("taskrun", taskRunProps));
            var anotherTaskRunner = ((Kubernetes) taskRunner()).toBuilder().delete(Property.ofValue(false)).resume(Property.ofValue(true)).build();

            List<LogEntry> logs = new CopyOnWriteArrayList<>();
            Flux<LogEntry> receive = TestsUtils.receive(logQueue, (logEntry) -> {
                logs.add(logEntry.getLeft());
            });
            Thread resumePodThread = new Thread(throwRunnable(() -> anotherTaskRunner.run(anotherRunContext, commands, Collections.emptyList())));
            resumePodThread.start();
            TestsUtils.awaitLog(logs, logEntry -> logEntry.getMessage().contains("resumed from an already running pod"));
            receive.blockLast();

            anotherTaskRunner.kill();
            resumePodThread.interrupt();

            try {
                PodList existingPods = client.pods().inNamespace(runContext.render(taskRunner.getNamespace()).as(String.class).orElseThrow()).list(new ListOptionsBuilder().withLabelSelector(labelSelector).build());
                assertThat(existingPods.getItems().isEmpty(), is(true));
            } catch (IllegalVariableEvaluationException e) {
                throw new RuntimeException(e);
            }
        }
    }
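
Adapted to Docker, the "wait until the container exists" step could look roughly like this (a sketch assuming docker-java is available in the test and that the runner labels its containers with the taskrun id; the label key is an assumption):

String taskrunId = (String) ((Map<String, Object>) runContext.getVariables().get("taskrun")).get("id");
Await.until(() -> !dockerClient.listContainersCmd()
    .withShowAll(true) // also match containers that are created but not yet (or no longer) running
    .withLabelFilter(Map.of("kestra.io/taskrun-id", taskrunId))
    .exec()
    .isEmpty());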

@nemesiscodex
Author

Thanks for the review, I will check and fix tonight 🙂
