Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ jobs:
with:
skip-test: ${{ github.event.inputs.skip-test == 'true' }}
kestra-version: ${{ github.event.inputs.kestra-version }}
java-version: '25'
secrets: inherit
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ tasks.withType(JavaCompile) {
options.compilerArgs.add("-parameters")
}

final targetJavaVersion = JavaVersion.VERSION_21
final targetJavaVersion = JavaVersion.VERSION_25

group "io.kestra.plugin"

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.7.1-SNAPSHOT
kestraVersion=1.3.0
kestraVersion=2.0.0-SNAPSHOT
6 changes: 6 additions & 0 deletions plugin-script-bun/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
6 changes: 6 additions & 0 deletions plugin-script-deno/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.queues.DispatchQueueInterface;

import io.kestra.core.utils.Await;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
Expand All @@ -30,8 +29,6 @@

import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import reactor.core.publisher.Flux;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -48,13 +45,10 @@ class CommandsTriggerTest {
protected FlowListeners flowListenersService;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
protected QueueInterface<Execution> executionQueue;
protected DispatchQueueInterface<Execution> executionQueue;

@Test
void commandsTrigger_shouldTriggerOnImplicitFailureExit1() throws Exception {
FlowListeners flowListenersServiceSpy = spy(this.flowListenersService);

CommandsTrigger trigger = CommandsTrigger.builder()
.id("commands-trigger")
.type(CommandsTrigger.class.getName())
Expand Down Expand Up @@ -93,10 +87,10 @@ void commandsTrigger_shouldTriggerOnImplicitFailureExit1() throws Exception {
CountDownLatch queueCount = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();

Flux<Execution> receive = TestsUtils.receive(executionQueue, execution ->
executionQueue.addListener(execution ->
{
if (execution.getLeft().getFlowId().equals("commands-trigger-flow")) {
lastExecution.set(execution.getLeft());
if (execution.getFlowId().equals("commands-trigger-flow")) {
lastExecution.set(execution);
queueCount.countDown();
}
});
Expand Down Expand Up @@ -140,10 +134,6 @@ void commandsTrigger_shouldTriggerOnImplicitFailureExit1() throws Exception {
scheduler.close();
} catch (Exception ignored) {
}
try {
receive.blockLast();
} catch (Exception ignored) {
}
}
}

Expand Down Expand Up @@ -189,10 +179,10 @@ void commandsTrigger_shouldTriggerOnStdoutMatchUsingStructuredOutputs() throws E
CountDownLatch queueCount = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();

Flux<Execution> receive = TestsUtils.receive(executionQueue, execution ->
executionQueue.addListener(execution ->
{
if (execution.getLeft().getFlowId().equals("commands-stdout-match-flow")) {
lastExecution.set(execution.getLeft());
if (execution.getFlowId().equals("commands-stdout-match-flow")) {
lastExecution.set(execution);
queueCount.countDown();
}
});
Expand Down Expand Up @@ -239,10 +229,6 @@ void commandsTrigger_shouldTriggerOnStdoutMatchUsingStructuredOutputs() throws E
scheduler.close();
} catch (Exception ignored) {
}
try {
receive.blockLast();
} catch (Exception ignored) {
}
}
}

Expand Down Expand Up @@ -280,8 +266,8 @@ void commandsTrigger_edgeModeShouldSuppressSecondEmission() throws Exception {
AtomicInteger executionCount = new AtomicInteger(0);
CountDownLatch firstExecution = new CountDownLatch(1);

Flux<Execution> receive = TestsUtils.receive(executionQueue, execution -> {
if (execution.getLeft().getFlowId().equals("commands-edge-flow")) {
executionQueue.addListener(execution -> {
if (execution.getFlowId().equals("commands-edge-flow")) {
executionCount.incrementAndGet();
firstExecution.countDown();
}
Expand All @@ -304,7 +290,6 @@ void commandsTrigger_edgeModeShouldSuppressSecondEmission() throws Exception {
} finally {
try { worker.shutdown(); } catch (Exception ignored) {}
try { scheduler.close(); } catch (Exception ignored) {}
try { receive.blockLast(); } catch (Exception ignored) {}
}
}

Expand Down Expand Up @@ -344,9 +329,9 @@ void commandsTrigger_shouldMatchRegexAgainstStructuredOutputs() throws Exception
CountDownLatch queueCount = new CountDownLatch(1);
AtomicReference<Execution> lastExecution = new AtomicReference<>();

Flux<Execution> receive = TestsUtils.receive(executionQueue, execution -> {
if (execution.getLeft().getFlowId().equals("commands-regex-flow")) {
lastExecution.set(execution.getLeft());
executionQueue.addListener(execution -> {
if (execution.getFlowId().equals("commands-regex-flow")) {
lastExecution.set(execution);
queueCount.countDown();
}
});
Expand All @@ -370,7 +355,6 @@ void commandsTrigger_shouldMatchRegexAgainstStructuredOutputs() throws Exception
} finally {
try { worker.shutdown(); } catch (Exception ignored) {}
try { scheduler.close(); } catch (Exception ignored) {}
try { receive.blockLast(); } catch (Exception ignored) {}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.DispatchQueueInterface;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.go.Commands;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -37,16 +35,15 @@ public class CommandsTest {
RunContextFactory runContextFactory;

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
private DispatchQueueInterface<LogEntry> logQueue;

@Inject
StorageInterface storageInterface;

@Test
void should_print_hello_there() throws Exception {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
var receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));
logQueue.addListener(logs::add);

var goScript = storageInterface.put(
TenantService.MAIN_TENANT,
Expand Down Expand Up @@ -91,7 +88,6 @@ func main() {
assertThat(run.getStdErrLineCount(), is(3)); // go logs everything to stderr

TestsUtils.awaitLog(logs, log -> log.getMessage() != null && log.getMessage().contains("hello there!"));
receive.blockLast();
assertThat(List.copyOf(logs).stream().filter(logEntry -> logEntry.getMessage() != null && logEntry.getMessage().contains("hello there!")).count(), is(1L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.models.property.Property;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.queues.DispatchQueueInterface;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.tenant.TenantService;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.scripts.go.Script;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -33,16 +31,15 @@ public class ScriptTest {
RunContextFactory runContextFactory;

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
private QueueInterface<LogEntry> logQueue;
private DispatchQueueInterface<LogEntry> logQueue;

@Inject
StorageInterface storageInterface;

@Test
void should_print_hello_there() throws Exception {
List<LogEntry> logs = new CopyOnWriteArrayList<>();
var receive = TestsUtils.receive(logQueue, l -> logs.add(l.getLeft()));
logQueue.addListener(logs::add);

var script = Script.builder()
.id("go-script-" + UUID.randomUUID())
Expand All @@ -65,7 +62,6 @@ func main() {
assertThat(run.getStdErrLineCount(), is(0));

TestsUtils.awaitLog(logs, log -> log.getMessage() != null && log.getMessage().contains("hello there!"));
receive.blockLast();
assertThat(List.copyOf(logs).stream().filter(logEntry -> logEntry.getMessage() != null && logEntry.getMessage().contains("hello there!")).count(), is(1L));
}

Expand Down
6 changes: 6 additions & 0 deletions plugin-script-go/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
6 changes: 6 additions & 0 deletions plugin-script-groovy/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
6 changes: 6 additions & 0 deletions plugin-script-lua/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
6 changes: 6 additions & 0 deletions plugin-script-nashorn/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
6 changes: 6 additions & 0 deletions plugin-script-perl/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
6 changes: 6 additions & 0 deletions plugin-script-php/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@

tasks:
- id: python
type: io.kestra.core.tasks.scripts.Python
type: io.kestra.plugin.core.scripts.Python
inputFiles:
data.json: |
{"status": "OK"}
Expand Down Expand Up @@ -94,7 +94,7 @@

tasks:
- id: python
type: io.kestra.core.tasks.scripts.Python
type: io.kestra.plugin.core.scripts.Python
inputFiles:
data.csv: {{outputs.previousTaskId.uri}}
main.py: |
Expand Down
6 changes: 6 additions & 0 deletions plugin-script-python/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: memory
repository:
type: memory
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
Loading
Loading