diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 818e932..c0b16c3 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -14,9 +14,9 @@ ENV HOME="/root" # -------------------------------------- # Git # -------------------------------------- -# Need to add the devcontainer workspace folder as a safe directory to enable git +# Need to add the devcontainer workspace folder as a safe directory to enable git # version control system to be enabled in the containers file system. -RUN git config --global --add safe.directory "/workspaces/plugin-template" +RUN git config --global --add safe.directory "/workspaces/plugin-email" # -------------------------------------- # -------------------------------------- @@ -53,11 +53,11 @@ ENV PATH="$PATH:$JAVA_HOME/bin" # Will load a custom configuration file for Micronaut ENV MICRONAUT_ENVIRONMENTS=local,override # Sets the path where you save plugins as Jar and is loaded during the startup process -ENV KESTRA_PLUGINS_PATH="/workspaces/plugin-template/local/plugins" +ENV KESTRA_PLUGINS_PATH="/workspaces/plugin-email/local/plugins" # -------------------------------------- # -------------------------------------- -# SSH +# SSH # -------------------------------------- RUN mkdir -p ~/.ssh RUN touch ~/.ssh/config diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index b5184d6..54ce4a8 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,10 +1,10 @@ { - "name": "plugin-template", + "name": "plugin-email", "build": { "context": ".", "dockerfile": "Dockerfile" }, - "workspaceFolder": "/workspaces/plugin-template", + "workspaceFolder": "/workspaces/plugin-email", "forwardPorts": [8080], "customizations": { "vscode": { diff --git a/build.gradle b/build.gradle index e1d5ded..fd3be47 100644 --- a/build.gradle +++ b/build.gradle @@ -29,7 +29,7 @@ java { } group = "io.kestra.plugin" -description = 'Plugin template for Kestra' +description = 'Plugin Email for Kestra' tasks.withType(JavaCompile).configureEach { options.encoding = "UTF-8" @@ -49,6 +49,10 @@ dependencies { annotationProcessor group: "io.kestra", name: "processor", version: kestraVersion compileOnly group: "io.kestra", name: "core", version: kestraVersion compileOnly group: "io.kestra", name: "script", version: kestraVersion + + // libs + api 'org.simplejavamail:simple-java-mail:8.12.6' + api 'org.eclipse.angus:jakarta.mail:2.0.5' } @@ -91,11 +95,15 @@ dependencies { testImplementation group: "io.kestra", name: "repository-memory", version: kestraVersion testImplementation group: "io.kestra", name: "runner-memory", version: kestraVersion testImplementation group: "io.kestra", name: "storage-local", version: kestraVersion + testImplementation group: "io.kestra", name: "scheduler", version: kestraVersion + testImplementation group: "io.kestra", name: "worker", version: kestraVersion // test testImplementation "org.junit.jupiter:junit-jupiter-engine" testImplementation "org.hamcrest:hamcrest" testImplementation "org.hamcrest:hamcrest-library" + + testImplementation group: 'com.icegreen', name: 'greenmail-junit5', version: '2.1.7' } /**********************************************************************************************************************\ @@ -175,8 +183,8 @@ jar { manifest { attributes( "X-Kestra-Name": project.name, - "X-Kestra-Title": "Template", - "X-Kestra-Group": project.group + ".templates", + "X-Kestra-Title": "Email", + "X-Kestra-Group": project.group + ".email", "X-Kestra-Description": project.description, "X-Kestra-Version": project.version ) diff --git a/gradle.properties b/gradle.properties index 65dfcc5..ac643f9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ version=1.1.0-SNAPSHOT -kestraVersion=1.0.0 +kestraVersion=1.1.11 diff --git a/settings.gradle b/settings.gradle index aaa347f..825b030 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'plugin-template' +rootProject.name = 'plugin-email' diff --git a/src/main/java/io/kestra/plugin/email/AbstractMailTrigger.java b/src/main/java/io/kestra/plugin/email/AbstractMailTrigger.java new file mode 100644 index 0000000..724d637 --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/AbstractMailTrigger.java @@ -0,0 +1,73 @@ +package io.kestra.plugin.email; + +import io.kestra.core.models.property.Property; +import io.kestra.core.models.triggers.AbstractTrigger; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotNull; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import java.time.Duration; + +@SuperBuilder +@Getter +@NoArgsConstructor +public abstract class AbstractMailTrigger extends AbstractTrigger { + + public Duration getInterval(){ + return Duration.ofSeconds(60); + } + @Schema(title = "Mail server protocol", description = "The protocol to use for connecting to the mail server") + @Builder.Default + protected final Property protocol = Property.ofValue(MailService.Protocol.IMAP); + + @Schema(title = "Mail server host", description = "The hostname or IP address of the mail server") + @NotNull + protected Property host; + + @Schema(title = "Mail server port", description = "The port number of the mail server. Defaults: IMAP=993 (SSL), 143 (non-SSL); POP3=995 (SSL), 110 (non-SSL)") + protected Property port; + + @Schema(title = "Username", description = "The username for authentication") + @NotNull + protected Property username; + + @Schema(title = "Password", description = "The password for authentication") + @NotNull + protected Property password; + + @Schema(title = "Mail folder", description = "The mail folder to monitor (IMAP only)") + @Builder.Default + protected final Property folder = Property.ofValue("INBOX"); + + @Schema(title = "Use SSL", description = "Whether to use SSL/TLS encryption") + @Builder.Default + protected final Property ssl = Property.ofValue(true); + + @Schema(title = "Trust all certificates", description = "Whether to trust all SSL certificates (use with caution)") + @Builder.Default + protected final Property trustAllCertificates = Property.ofValue(false); + + @Schema(title = "Check interval", description = "How frequently to check for new emails") + @Builder.Default + protected final Property interval = Property.ofValue(Duration.ofSeconds(60)); + + protected MailService.MailConfiguration renderMailConfiguration(RunContext runContext) throws Exception { + String rProtocol = String.valueOf(runContext.render(this.protocol).as(MailService.Protocol.class).orElseThrow()); + String rHost = runContext.render(this.host).as(String.class).orElseThrow(); + String rUsername = runContext.render(this.username).as(String.class).orElseThrow(); + String rPassword = runContext.render(this.password).as(String.class).orElseThrow(); + String rFolder = runContext.render(this.folder).as(String.class).orElse("INBOX"); + Boolean rSsl = runContext.render(this.ssl).as(Boolean.class).orElse(true); + Boolean rTrustAllCertificates = runContext.render(this.trustAllCertificates).as(Boolean.class).orElse(false); + Duration rInterval = runContext.render(this.interval).as(Duration.class).orElse(getInterval()); + + Integer rPort = runContext.render(this.port).as(Integer.class) + .orElse(MailService.getDefaultPort(MailService.Protocol.valueOf(rProtocol), rSsl)); + + return new MailService.MailConfiguration(rProtocol, rHost, rPort, rUsername, rPassword, rFolder, rSsl, rTrustAllCertificates, rInterval); + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/email/MailExecution.java b/src/main/java/io/kestra/plugin/email/MailExecution.java new file mode 100644 index 0000000..a57b602 --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/MailExecution.java @@ -0,0 +1,82 @@ +package io.kestra.plugin.email; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.VoidOutput; +import io.kestra.core.plugins.notifications.ExecutionInterface; +import io.kestra.core.plugins.notifications.ExecutionService; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.*; +import lombok.experimental.SuperBuilder; + +import java.util.Map; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Send an email with the execution information.", + description = "The message will include a link to the execution page in the UI along with the execution ID, namespace, flow name, the start date, duration, and the final status of the execution. If the task failed, then the task that led to the failure is specified.\n\n" + + "Use this notification task only in a flow that has a [Flow trigger](https://kestra.io/docs/administrator-guide/monitoring#alerting), as shown in this example. Don't use this notification task in `errors` tasks. Instead, for `errors` tasks, use the [MailSend](https://kestra.io/plugins/plugin-email/io.kestra.plugin.email.mailsend) task." +) +@Plugin( + examples = { + @Example( + title = "Send an email notification on a failed flow execution", + full = true, + code = """ + id: failure_alert + namespace: company.team + + tasks: + - id: send_alert + type: io.kestra.plugin.email.MailExecution + to: hello@kestra.io + from: hello@kestra.io + subject: "The workflow execution {{trigger.executionId}} failed for the flow {{trigger.flowId}} in the namespace {{trigger.namespace}}" + host: mail.privateemail.com + port: 465 + username: "{{ secret('EMAIL_USERNAME') }}" + password: "{{ secret('EMAIL_PASSWORD') }}" + executionId: "{{ trigger.executionId }}" + + triggers: + - id: failed_prod_workflows + type: io.kestra.plugin.core.trigger.Flow + conditions: + - type: io.kestra.plugin.core.condition.ExecutionStatus + in: + - FAILED + - WARNING + - type: io.kestra.plugin.core.condition.ExecutionNamespace + namespace: prod + prefix: true + """ + ) + }, + aliases = "io.kestra.plugin.notifications.mail.MailExecution" +) +public class MailExecution extends MailTemplate implements ExecutionInterface { + @Builder.Default + private final Property executionId = Property.ofExpression("{{ execution.id }}"); + private Property> customFields; + private Property customMessage; + + @Schema( + hidden = true + ) + protected Property htmlTextContent; + + @Override + public VoidOutput run(RunContext runContext) throws Exception { + this.templateUri = Property.ofValue("mail-template.hbs.peb"); + this.textTemplateUri = Property.ofValue("text-template.hbs.peb"); + this.templateRenderMap = Property.ofValue(ExecutionService.executionMap(runContext, this)); + + return super.run(runContext); + } +} diff --git a/src/main/java/io/kestra/plugin/email/MailReceivedTrigger.java b/src/main/java/io/kestra/plugin/email/MailReceivedTrigger.java new file mode 100644 index 0000000..6d8e17b --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/MailReceivedTrigger.java @@ -0,0 +1,112 @@ +package io.kestra.plugin.email; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.triggers.PollingTriggerInterface; +import io.kestra.core.models.triggers.TriggerContext; +import io.kestra.core.models.triggers.TriggerOutput; +import io.kestra.core.models.triggers.TriggerService; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Trigger on new email messages.", + description = "Monitor a mailbox for new emails via IMAP or POP3 protocols." +) +@Plugin( + examples = { + @Example( + title = "Monitor Gmail inbox for new emails", + full = true, + code = """ + id: email_monitor + namespace: company.team + + tasks: + - id: process_email + type: io.kestra.core.tasks.log.Log + message: | + New email received: + Subject: {{ trigger.latestEmail.subject }} + From: {{ trigger.latestEmail.from }} + Date: {{ trigger.latestEmail.date }} + + triggers: + - id: gmail_inbox_trigger + type: io.kestra.plugin.email.MailReceivedTrigger + protocol: IMAP + host: imap.gmail.com + port: 993 + username: "{{ secret('GMAIL_USERNAME') }}" + password: "{{ secret('GMAIL_PASSWORD') }}" + folder: INBOX + interval: PT30S + ssl: true + """ + ) + }, + aliases = "io.kestra.plugin.notifications.mail.MailReceivedTrigger" +) +public class MailReceivedTrigger extends AbstractMailTrigger + implements PollingTriggerInterface, TriggerOutput { + + @Override + public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { + RunContext runContext = conditionContext.getRunContext(); + MailService.MailConfiguration mailConfig = renderMailConfiguration(runContext); + + try { + ZonedDateTime lastCheckTime = getLastCheckTime(context,mailConfig.interval); + List newEmails = MailService.fetchNewEmails(runContext, mailConfig.protocol, + mailConfig.host, mailConfig.port, + mailConfig.username, mailConfig.password, mailConfig.folder, mailConfig.ssl, + mailConfig.trustAllCertificates, lastCheckTime); + + if (newEmails.isEmpty()) { + return Optional.empty(); + } + + MailService.EmailData latest = newEmails.stream() + .max(Comparator.comparing(MailService.EmailData::getDate)) + .orElse(newEmails.getFirst()); + + MailService.Output output = MailService.Output.builder() + .latestEmail(latest) + .total(newEmails.size()) + .emails(newEmails) + .build(); + + Execution execution = TriggerService.generateExecution(this, conditionContext, context, output); + return Optional.of(execution); + + } catch (Exception e) { + runContext.logger().error("Error checking for new emails", e); + throw new RuntimeException("Failed to fetch emails: " + e.getMessage(), e); + } + } + + private ZonedDateTime getLastCheckTime(TriggerContext context,Duration interval){ + if(context.getNextExecutionDate()==null){ + return ZonedDateTime.now().minus(getInterval()); + } + return context.getNextExecutionDate().minus(interval); + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/email/MailSend.java b/src/main/java/io/kestra/plugin/email/MailSend.java new file mode 100644 index 0000000..02d9351 --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/MailSend.java @@ -0,0 +1,448 @@ +package io.kestra.plugin.email; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.models.tasks.Task; +import io.kestra.core.models.tasks.VoidOutput; +import io.kestra.core.runners.RunContext; +import io.kestra.core.serializers.JacksonMapper; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.mail.util.ByteArrayDataSource; +import jakarta.validation.constraints.NotNull; +import lombok.*; +import lombok.experimental.SuperBuilder; +import org.simplejavamail.api.email.AttachmentResource; +import org.simplejavamail.api.email.Email; +import org.simplejavamail.api.email.EmailPopulatingBuilder; +import org.simplejavamail.api.mailer.Mailer; +import org.simplejavamail.api.mailer.config.TransportStrategy; +import org.simplejavamail.email.EmailBuilder; +import org.simplejavamail.mailer.MailerBuilder; +import org.slf4j.Logger; + +import java.io.InputStream; +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static io.kestra.core.utils.Rethrow.throwFunction; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Send an automated email from a workflow." +) +@Plugin( + examples = { + @Example( + title = "Send an email on a failed flow execution.", + full = true, + code = """ + id: unreliable_flow + namespace: company.team + + tasks: + - id: fail + type: io.kestra.plugin.scripts.shell.Commands + runner: PROCESS + commands: + - exit 1 + + errors: + - id: send_email + type: io.kestra.plugin.email.MailSend + from: hello@kestra.io + to: hello@kestra.io + username: "{{ secret('EMAIL_USERNAME') }}" + password: "{{ secret('EMAIL_PASSWORD') }}" + host: mail.privateemail.com + port: 465 # or 587 + subject: "Kestra workflow failed for the flow {{flow.id}} in the namespace {{flow.namespace}}" + htmlTextContent: "Failure alert for flow {{ flow.namespace }}.{{ flow.id }} with ID {{ execution.id }}" + """ + ), + @Example( + title = "Send an email with attachments.", + full = true, + code = """ + id: send_email + namespace: company.team + + inputs: + - id: attachments + type: ARRAY + itemType: JSON + + tasks: + - id: send_email + type: io.kestra.plugin.email.MailSend + from: hello@kestra.io + to: hello@kestra.io + attachments: {{ inputs.attachments | toJson }} + """ + ), + @Example( + title = "Send an email with an embedded image.", + full = true, + code = """ + id: send_email + namespace: company.team + + inputs: + - id: embedded_image_uri + type: STRING + + tasks: + - id: send_email + type: io.kestra.plugin.email.MailSend + from: hello@kestra.io + to: hello@kestra.io + embeddedImages: + - name: kestra.png + uri: "{{ inputs.embedded_image_uri }}" + contentType: image/png + """ + ), + @Example( + title = "Export Kestra audit logs to a CSV file and send it by email.", + full = true, + code = """ + id: export_audit_logs_csv + namespace: company.team + + tasks: + - id: ship_audit_logs + type: "io.kestra.plugin.ee.core.log.AuditLogShipper" + lookbackPeriod: P1D + logExporters: + - id: file + type: io.kestra.plugin.ee.core.log.FileLogExporter + + - id: convert_to_csv + type: "io.kestra.plugin.serdes.csv.IonToCsv" + from: "{{ outputs.ship_audit_logs.outputs.file.uris | first }}" + + - id: send_email + type: io.kestra.plugin.email.MailSend + from: hello@kestra.io + to: hello@kestra.io + username: "{{ secret('EMAIL_USERNAME') }}" + password: "{{ secret('EMAIL_PASSWORD') }}" + host: mail.privateemail.com + port: 465 # or 587 + subject: "Weekly Kestra Audit Logs CSV Export" + htmlTextContent: "Weekly Kestra Audit Logs CSV Export" + attachments: + - name: audit_logs.csv + uri: "{{ outputs.convert_to_csv.uri }}" + contentType: text/csv + + triggers: + - id: schedule + type: io.kestra.plugin.core.trigger.Schedule + cron: 0 10 * * 5 + """ + ), + @Example( + title = "Send an email using an internal mail server with self-signed certificate and specific trusted hosts.", + full = true, + code = """ + id: send_email_internal + namespace: company.team + + tasks: + - id: send_email + type: io.kestra.plugin.email.MailSend + from: noreply@company.local + to: admin@company.local + username: "{{ secret('INTERNAL_SMTP_USER') }}" + password: "{{ secret('INTERNAL_SMTP_PASSWORD') }}" + host: mail.company.local + port: 587 + transportStrategy: SMTP_TLS + subject: "Internal notification" + htmlTextContent: "This email was sent from an internal mail server" + verifyServerIdentity: false + trustedHosts: + - mail.company.local + - smtp.company.local + - 192.168.1.100 + """ + ) + }, + aliases = "io.kestra.plugin.notifications.mail.MailSend" +) +public class MailSend extends Task implements RunnableTask { + /* Server info */ + @Schema( + title = "The email server host" + ) + protected Property host; + + @Schema( + title = "The email server port" + ) + private Property port; + + @Schema( + title = "The email server username" + ) + protected Property username; + + @Schema( + title = "The email server password" + ) + protected Property password; + + @Schema( + title = "The optional transport strategy", + description = "Will default to SMTPS if left empty" + ) + @Builder.Default + private final Property transportStrategy = Property.ofValue(TransportStrategy.SMTPS); + + @Schema( + title = "Integer value in milliseconds. Default is 10000 milliseconds, i.e. 10 seconds", + description = "It controls the maximum timeout value when sending emails." + ) + @Builder.Default + private final Property sessionTimeout = Property.ofValue(10000); + + @Schema( + title = "Whether to verify the server identity", + description = "Will default to true if left empty" + ) + @Builder.Default + private final Property verifyServerIdentity = Property.ofValue(true); + + @Schema( + title = "Trusted SSL/TLS hosts", + description = "If provided, only the specified hosts will be trusted for SSL/TLS connections" + ) + private Property> trustedHosts; + + /* Mail info */ + @Schema( + title = "The address of the sender of this email" + ) + protected Property from; + + @Schema( + title = "Email address(es) of the recipient(s). Use semicolon as delimiter to provide several email addresses.", + description = "Note that each email address must be compliant with the RFC2822 format." + ) + protected Property to; + + @Schema( + title = "One or more 'Cc' (carbon copy) optional recipient email address. Use semicolon as delimiter to provide several addresses.", + description = "Note that each email address must be compliant with the RFC2822 format." + ) + protected Property cc; + + @Schema( + title = "The optional subject of this email" + ) + protected Property subject; + + @Schema( + title = "The optional email message body in HTML text", + description = "Both text and HTML can be provided; either will be offered to the email client as alternative content. " + + "Email clients that support it, will favor HTML over plain text and ignore the text body completely." + ) + protected Property htmlTextContent; + + @Schema( + title = "The optional email message body in plain text", + description = "Both text and HTML can be provided; either will be offered to the email client as alternative content. " + + "Email clients that support it, will favor HTML over plain text and ignore the text body completely." + ) + protected Property plainTextContent; + + @Schema( + title = "Adds an attachment to the email message", + description = "The attachment will be shown in the email client as separate files available for download or display. " + + "Inline if the client supports it (for example, most browsers display PDF's in a popup window).", + anyOf = {List.class, String.class} // Can be a List or a String like "{{ inputs.attachments | toJson }})" + ) + private Property attachments; + + @Schema( + title = "Adds image data to this email that can be referred to from the email HTML body.", + description = "The provided images are assumed to be of MIME type png, jpg, or whatever the email client supports as valid image that can be embedded in HTML content.", + anyOf = {List.class, String.class} // Can be a List or a String like "{{ inputs.attachments | toJson }})" + ) + private Property embeddedImages; + + @Override + public VoidOutput run(RunContext runContext) throws Exception { + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + + Logger logger = runContext.logger(); + + logger.debug("Sending an email to {}", to); + + final String htmlContent = runContext.render(this.htmlTextContent).as(String.class).orElse(null); + final String textContent = runContext.render(this.plainTextContent).as(String.class) + .orElse("Please view this email in a modern email client"); + + EmailPopulatingBuilder builder = EmailBuilder.startingBlank() + .to(runContext.render(to).as(String.class).orElseThrow()) + .from(runContext.render(from).as(String.class).orElseThrow()) + .withSubject(runContext.render(subject).as(String.class).orElse(null)) + .withHTMLText(htmlContent) + .withPlainText(textContent) + .withReturnReceiptTo(); + + var renderedAttachments = runContext.render(attachments).as(Object.class).orElse(""); + var attachmentsList = getAttachments(renderedAttachments); + + if (!attachmentsList.isEmpty()) { + builder.withAttachments(this.attachmentResources(attachmentsList, runContext)); + } + + var renderedEmbeddedImages = runContext.render(embeddedImages).as(Object.class).orElse(""); + var embeddedImagesList = getAttachments(renderedEmbeddedImages); + + if (!embeddedImagesList.isEmpty()) { + builder.withEmbeddedImages(this.attachmentResources(embeddedImagesList, runContext)); + } + + runContext.render(cc).as(String.class).ifPresent(builder::cc); + + Email email = builder.buildEmail(); + + var rTrustedHosts = runContext.render(trustedHosts).asList(String.class); + var mailerBuilder = MailerBuilder + .withSMTPServer( + runContext.render(this.host).as(String.class).orElse(null), + runContext.render(this.port).as(Integer.class).orElse(null), + runContext.render(this.username).as(String.class).orElse(null), + runContext.render(this.password).as(String.class).orElse(null) + ) + .withTransportStrategy(runContext.render(transportStrategy).as(TransportStrategy.class).orElse(TransportStrategy.SMTPS)) + .withSessionTimeout(runContext.render(sessionTimeout).as(Integer.class).orElse(10000)) + .verifyingServerIdentity(runContext.render(verifyServerIdentity).as(Boolean.class).orElse(true)); + + if (!rTrustedHosts.isEmpty()) { + mailerBuilder = mailerBuilder + .trustingAllHosts(false) + .trustingSSLHosts(rTrustedHosts.toArray(new String[0])); + } + + try (Mailer mailer = mailerBuilder.buildMailer()) { + mailer.sendMail(email); + } + + return null; + } + + private List attachmentResources(List attachments, RunContext runContext) throws Exception { + return attachments + .stream() + .map(throwFunction(attachment -> { + InputStream inputStream = runContext.storage() + .getFile(URI.create(runContext.render(attachment.getUri()).as(String.class).orElseThrow())); + + return new AttachmentResource( + runContext.render(attachment.getName()).as(String.class).orElseThrow(), + new ByteArrayDataSource(inputStream, runContext.render(attachment.getContentType()).as(String.class).orElseThrow()) + ); + })) + .toList(); + } + + private List getAttachments(Object attachments) throws JsonProcessingException { + switch (attachments) { + case null -> { + return List.of(); + } + + case List list -> { + if (list.isEmpty()) return List.of(); + + if (list.getFirst() instanceof Attachment) { + @SuppressWarnings("unchecked") + List typed = (List) list; + return typed; + } else { + @SuppressWarnings("unchecked") + List> items = (List>) list; + return toAttachments(items); + } + } + + case String content -> { + String trimmed = content.trim(); + if (trimmed.isEmpty()) return List.of(); + + if (trimmed.startsWith("[") || trimmed.startsWith("{")) { + return parseJsonAttachmentString(trimmed); + } + + String innerJson = JacksonMapper.ofJson().readValue(trimmed, String.class); + return parseJsonAttachmentString(innerJson); + } + default -> { + } + } + + throw new IllegalArgumentException("The `attachments` attribute must be a String or a List"); + } + + private List parseJsonAttachmentString(String json) throws JsonProcessingException { + String t = json.trim(); + if (t.startsWith("[")) { + List> items = JacksonMapper.ofJson().readValue(t, new TypeReference<>() { + }); + return toAttachments(items); + } else if (t.startsWith("{")) { + Map item = JacksonMapper.ofJson().readValue(t, new TypeReference<>() { + }); + return toAttachments(List.of(item)); + } else { + return List.of(); + } + } + + private static List toAttachments(List> items) { + return items.stream() + .map(item -> Attachment.builder() + .name(Property.ofValue((String) item.get("name"))) + .uri(Property.ofValue((String) item.get("uri"))) + .contentType(Property.ofValue((String) item.getOrDefault("contentType", "application/octet-stream"))) + .build()) + .toList(); + } + + @Getter + @Builder + public static class Attachment { + @Schema( + title = "An attachment URI from Kestra internal storage" + ) + @NotNull + private Property uri; + + @Schema( + title = "The name of the attachment (e.g., 'filename.txt')" + ) + @NotNull + private Property name; + + @Schema( + title = "The media type or MIME (Multipurpose Internet Mail Extensions) type of the resource being sent", + description = "For example, 'text/plain', 'image/png', 'application/pdf', `text/csv`, etc. " + + "If not provided, it will default to 'application/octet-stream'." + ) + @NotNull + @Builder.Default + private Property contentType = Property.ofValue("application/octet-stream"); + } +} diff --git a/src/main/java/io/kestra/plugin/email/MailService.java b/src/main/java/io/kestra/plugin/email/MailService.java new file mode 100644 index 0000000..9b18323 --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/MailService.java @@ -0,0 +1,328 @@ +package io.kestra.plugin.email; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.mail.*; +import jakarta.mail.internet.InternetAddress; +import jakarta.mail.internet.MimeMessage; +import jakarta.mail.internet.MimeMultipart; +import lombok.Builder; +import lombok.Getter; +import lombok.experimental.UtilityClass; + +import java.io.IOException; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.*; + +@UtilityClass +public class MailService { + + public enum Protocol { + IMAP, + POP3 + } + + @Builder + @Getter + @JsonIgnoreProperties(ignoreUnknown = true) + public static class Output implements io.kestra.core.models.tasks.Output { + @Schema(title = "Latest email received", description = "The most recent email that triggered this execution") + private final EmailData latestEmail; + + @Schema(title = "Total number of new emails found") + private final Integer total; + + @Schema(title = "All new emails found") + private final List emails; + } + + @Builder + @Getter + @JsonIgnoreProperties(ignoreUnknown = true) + public static class EmailData implements io.kestra.core.models.tasks.Output { + @Schema(title = "Email subject") + private final String subject; + + @Schema(title = "Sender email address") + private final String from; + + @Schema(title = "Recipient email addresses") + private final List to; + + @Schema(title = "CC email addresses") + private final List cc; + + @Schema(title = "BCC email addresses") + private final List bcc; + + @Schema(title = "Email date") + private final ZonedDateTime date; + + @Schema(title = "Email body content") + private final String body; + + @Schema(title = "Message ID") + private final String messageId; + + @Schema(title = "Email attachments") + private final List attachments; + } + + @Builder + @Getter + @JsonIgnoreProperties(ignoreUnknown = true) + public static class AttachmentInfo { + @Schema(title = "Attachment filename") + private final String filename; + + @Schema(title = "Content type") + private final String contentType; + + @Schema(title = "File size in bytes") + private final Integer size; + } + + @Builder + @Getter + public static class MailConfiguration { + public final String protocol; + public final String host; + public final Integer port; + public final String username; + public final String password; + public final String folder; + public final Boolean ssl; + public final Boolean trustAllCertificates; + public final Duration interval; + } + + public static Properties setupMailProperties(String protocol, String host, Integer port, Boolean ssl, + Boolean trustAllCertificates, RunContext runContext) { + Properties props = new Properties(); + String protocolName = getProtocolName(protocol, ssl); + + props.put("mail.store.protocol", protocolName); + props.put("mail." + protocolName + ".host", host); + props.put("mail." + protocolName + ".port", port.toString()); + props.put("mail." + protocolName + ".auth", "true"); + + if (ssl) { + props.put("mail." + protocolName + ".ssl.enable", "true"); + props.put("mail." + protocolName + ".ssl.protocols", "TLSv1.2"); + } + + if (trustAllCertificates) { + props.put("mail." + protocolName + ".ssl.trust", "*"); + props.put("mail." + protocolName + ".ssl.checkserveridentity", "false"); + } + + if (runContext.logger().isDebugEnabled()) { + props.put("mail.debug", "true"); + } + + return props; + } + + public static String getProtocolName(String protocol, Boolean ssl) { + if (protocol.equals("IMAP")) { + return ssl ? "imaps" : "imap"; + } + return ssl ? "pop3s" : "pop3"; + } + + public static void connectToStore(Store store, String host, Integer port, String username, + String password, RunContext runContext) throws MessagingException { + try { + store.connect(host, port, username, password); + } catch (MessagingException e) { + store.connect(username, password); + } + runContext.logger().info("Connected to {}:{}", host, port); + } + + public static Integer getDefaultPort(Protocol protocol, Boolean ssl) { + return switch (protocol) { + case IMAP -> ssl ? 993 : 143; + case POP3 -> ssl ? 995 : 110; + }; + } + + public static EmailData parseEmailData(MimeMessage message) throws MessagingException, IOException { + Date receivedDate = message.getReceivedDate() != null ? message.getReceivedDate() : message.getSentDate(); + ZonedDateTime date = receivedDate != null + ? ZonedDateTime.ofInstant(receivedDate.toInstant(), ZonedDateTime.now().getZone()) + : ZonedDateTime.now(); + + return EmailData.builder() + .subject(message.getSubject()) + .from(getAddressString(message.getFrom())) + .to(getAddressList(message.getRecipients(Message.RecipientType.TO))) + .cc(getAddressList(message.getRecipients(Message.RecipientType.CC))) + .bcc(getAddressList(message.getRecipients(Message.RecipientType.BCC))) + .date(date) + .body(extractTextContent(message)) + .messageId(message.getMessageID()) + .attachments(extractAttachments(message)) + .build(); + } + + private static String getAddressString(Address[] addresses) { + if (addresses == null || addresses.length == 0) { + return null; + } + return ((InternetAddress) addresses[0]).getAddress(); + } + + private static List getAddressList(Address[] addresses) { + if (addresses == null || addresses.length == 0) { + return Collections.emptyList(); + } + return Arrays.stream(addresses) + .map(addr -> ((InternetAddress) addr).getAddress()) + .toList(); + } + + private static String extractTextContent(Message message) throws MessagingException, IOException { + if (message.isMimeType("multipart/*")) { + MimeMultipart multipart = (MimeMultipart) message.getContent(); + return extractTextFromMultipart(multipart); + } + return (String) message.getContent(); + } + + private static String extractTextFromMultipart(MimeMultipart multipart) throws MessagingException, IOException { + StringBuilder result = new StringBuilder(); + int count = multipart.getCount(); + + for (int i = 0; i < count; i++) { + BodyPart bodyPart = multipart.getBodyPart(i); + if (bodyPart.isMimeType("multipart/*")) { + result.append(extractTextFromMultipart((MimeMultipart) bodyPart.getContent())); + } else { + result.append(bodyPart.getContent().toString()); + } + } + + return result.toString(); + } + + private static List extractAttachments(Message message) throws MessagingException, IOException { + List attachments = new ArrayList<>(); + + if (message.isMimeType("multipart/*")) { + MimeMultipart multipart = (MimeMultipart) message.getContent(); + extractAttachmentsFromMultipart(multipart, attachments); + } + + return attachments; + } + + private static void extractAttachmentsFromMultipart(MimeMultipart multipart, List attachments) + throws MessagingException, IOException { + int count = multipart.getCount(); + + for (int i = 0; i < count; i++) { + BodyPart bodyPart = multipart.getBodyPart(i); + + if (Part.ATTACHMENT.equalsIgnoreCase(bodyPart.getDisposition()) || + (bodyPart.getFileName() != null && !bodyPart.getFileName().isEmpty())) { + + AttachmentInfo attachment = AttachmentInfo.builder() + .filename(bodyPart.getFileName()) + .contentType(bodyPart.getContentType()) + .size(bodyPart.getSize()) + .build(); + + attachments.add(attachment); + } else if (bodyPart.isMimeType("multipart/*")) { + extractAttachmentsFromMultipart((MimeMultipart) bodyPart.getContent(), attachments); + } + } + } + + public static List fetchNewEmails(RunContext runContext, String protocol, String host, Integer port, + String username, String password, String folder, Boolean ssl, Boolean trustAllCertificates, + ZonedDateTime lastCheckTime) throws MessagingException, IOException { + + Properties props = setupMailProperties(protocol, host, port, ssl, trustAllCertificates, runContext); + String protocolName = getProtocolName(protocol, ssl); + Session session = Session.getInstance(props, null); + Store store = session.getStore(protocolName); + + try { + connectToStore(store, host, port, username, password, runContext); + return processMessages(store, folder, lastCheckTime, runContext); + } finally { + if (store.isConnected()) { + try { + store.close(); + } catch (MessagingException e) { + runContext.logger().warn("Failed to close mail store", e); + } + } + } + } + + private static List processMessages(Store store, String folder, ZonedDateTime lastCheckTime, + RunContext runContext) throws MessagingException, IOException { + List newEmails = new ArrayList<>(); + Folder mailFolder = store.getFolder(folder); + try{ + mailFolder.open(Folder.READ_ONLY); + + int messageCount = mailFolder.getMessageCount(); + if (messageCount == 0) { + runContext.logger().info("No messages found in folder: {}", folder); + return Collections.emptyList(); + } + + runContext.logger().info("Checking for emails newer than: {}", lastCheckTime); + + int messagesToCheck = Math.min(messageCount, 10); + Message[] messages = mailFolder.getMessages(messageCount - messagesToCheck + 1, messageCount); + + runContext.logger().info("Checking {} messages out of {} total", messagesToCheck, messageCount); + + for (Message message : messages) { + if (message instanceof MimeMessage mimeMessage) { + Date receivedDate = message.getReceivedDate() != null ? message.getReceivedDate() + : message.getSentDate(); + + if (receivedDate != null) { + ZonedDateTime messageDate = ZonedDateTime.ofInstant(receivedDate.toInstant(), + lastCheckTime.getZone()); + + runContext.logger().debug("Message date: {}, Last check: {}, Is newer: {}", + messageDate, lastCheckTime, messageDate.isAfter(lastCheckTime)); + + if (messageDate.isAfter(lastCheckTime)) { + EmailData emailData = parseEmailData(mimeMessage); + if (emailData != null) { + newEmails.add(emailData); + runContext.logger().info("New email - Subject: '{}', From: '{}', Body: '{}'", + emailData.getSubject(), emailData.getFrom(), + emailData.getBody().length() > 100 ? emailData.getBody().substring(0, 100) + "..." + : emailData.getBody()); + } + } + } else { + runContext.logger().debug("Message has no received date or sent date."); + } + } + } + } finally { + if (mailFolder.isOpen()) { + try { + mailFolder.close(false); + } catch (MessagingException e) { + runContext.logger().warn("Failed to close mail folder", e); + } + } + } + + runContext.logger().info("Found {} new emails", newEmails.size()); + return newEmails; + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/email/MailTemplate.java b/src/main/java/io/kestra/plugin/email/MailTemplate.java new file mode 100644 index 0000000..b910b7c --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/MailTemplate.java @@ -0,0 +1,67 @@ +package io.kestra.plugin.email; + + +import io.kestra.core.models.property.Property; +import io.kestra.core.models.tasks.VoidOutput; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.commons.io.IOUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Objects; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +public abstract class MailTemplate extends MailSend { + @Schema( + title = "Template to use", + hidden = true + ) + protected Property templateUri; + + @Schema( + title = "Text template to use", + hidden = true + ) + protected Property textTemplateUri; + + @Schema( + title = "Map of variables to use for the message template" + ) + protected Property> templateRenderMap; + + @Override + public VoidOutput run(RunContext runContext) throws Exception { + String htmlTextTemplate = ""; + String plainTextTemplate = ""; + + final var renderedTemplateUri = runContext.render(this.templateUri).as(String.class); + if (renderedTemplateUri.isPresent()) { + htmlTextTemplate = IOUtils.toString( + Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(renderedTemplateUri.get())), + StandardCharsets.UTF_8 + ); + } + + if (runContext.render(this.textTemplateUri).as(String.class).isPresent()) { + plainTextTemplate = IOUtils.toString( + Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream(runContext.render(this.textTemplateUri).as(String.class).get())), + StandardCharsets.UTF_8 + ); + } + + this.htmlTextContent = Property.ofValue(runContext.render(htmlTextTemplate, runContext.render(templateRenderMap).asMap(String.class, Object.class))); + this.plainTextContent = Property.ofValue(runContext.render(plainTextTemplate, runContext.render(templateRenderMap).asMap(String.class, Object.class))); + + return super.run(runContext); + } +} diff --git a/src/main/java/io/kestra/plugin/email/RealTimeTrigger.java b/src/main/java/io/kestra/plugin/email/RealTimeTrigger.java new file mode 100644 index 0000000..ca9fa88 --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/RealTimeTrigger.java @@ -0,0 +1,348 @@ +package io.kestra.plugin.email; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.conditions.ConditionContext; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.triggers.RealtimeTriggerInterface; +import io.kestra.core.models.triggers.TriggerContext; +import io.kestra.core.models.triggers.TriggerOutput; +import io.kestra.core.models.triggers.TriggerService; +import io.kestra.core.runners.RunContext; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.mail.*; +import jakarta.mail.event.MessageCountEvent; +import jakarta.mail.event.MessageCountListener; +import jakarta.mail.internet.MimeMessage; +import lombok.*; +import lombok.experimental.SuperBuilder; +import org.eclipse.angus.mail.imap.IMAPFolder; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Trigger a flow when an email is received in real-time.", + description = """ + Monitor a mailbox for new emails via IMAP or POP3 protocols and create one execution per email received. + For IMAP, uses the IDLE command for true real-time monitoring. + For POP3, uses polling. + If you would like to process multiple emails in batch, use the MailReceivedTrigger instead. + """ +) +@Plugin( + examples = { + @Example( + title = "Monitor Gmail inbox for new emails in real-time", + full = true, + code = """ + id: realtime_email_monitor + namespace: company.team + + tasks: + - id: process_email + type: io.kestra.core.tasks.log.Log + message: | + Real-time email received: + Subject: {{ trigger.subject }} + From: {{ trigger.from }} + Date: {{ trigger.date }} + Body: {{ trigger.body }} + + triggers: + - id: realtime_gmail_trigger + type: io.kestra.plugin.email.RealTimeTrigger + protocol: IMAP + host: imap.gmail.com + port: 993 + username: "{{ secret('GMAIL_USERNAME') }}" + password: "{{ secret('GMAIL_PASSWORD') }}" + folder: INBOX + ssl: true + """ + ), + @Example( + title = "Monitor POP3 mailbox in real-time", + code = """ + triggers: + - id: realtime_pop3_trigger + type: io.kestra.plugin.email.RealTimeTrigger + protocol: POP3 + host: pop.example.com + port: 995 + username: "{{ secret('EMAIL_USERNAME') }}" + password: "{{ secret('EMAIL_PASSWORD') }}" + ssl: true + interval: PT30S + """ + ) + }, + aliases = "io.kestra.plugin.notifications.mail.RealTimeTrigger" +) +public class RealTimeTrigger extends AbstractMailTrigger + implements RealtimeTriggerInterface, TriggerOutput { + + @Builder.Default + @Getter(AccessLevel.NONE) + private final AtomicBoolean isActive = new AtomicBoolean(true); + + @Builder.Default + @Getter(AccessLevel.NONE) + private final CountDownLatch waitForTermination = new CountDownLatch(1); + + @Builder.Default + @Getter(AccessLevel.NONE) + private final AtomicReference activeStore = new AtomicReference<>(); + + @Builder.Default + @Getter(AccessLevel.NONE) + private final AtomicReference activeFolder = new AtomicReference<>(); + + @Builder.Default + @Getter(AccessLevel.NONE) + private final AtomicReference lastFetched = new AtomicReference<>(); + + @Override + public Publisher evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { + RunContext runContext = conditionContext.getRunContext(); + MailService.MailConfiguration mailConfig = renderMailConfiguration(runContext); + + runContext.logger().info("Starting real-time email monitoring using {} protocol on {}:{}", + mailConfig.protocol, mailConfig.host, mailConfig.port); + + return createRealtimeEmailStream(runContext, mailConfig) + .map(emailData -> { + runContext.logger().info("Real-time trigger: New email from '{}' with subject '{}'", + emailData.getFrom(), emailData.getSubject()); + return TriggerService.generateRealtimeExecution(this, conditionContext, context, emailData); + }) + .onErrorContinue( + (throwable, o) -> runContext.logger().error("Error in real-time email stream", throwable)) + .doFinally(signalType -> { + runContext.logger().info("Email stream finished with signal: {}", signalType); + this.waitForTermination.countDown(); + }); + } + + private Flux createRealtimeEmailStream(RunContext runContext, MailService.MailConfiguration config) { + if ("IMAP".equals(config.protocol)) { + return createImapIdleStream(runContext, config); + } + + return createPop3PollingStream(runContext, config); + } + + private Flux createImapIdleStream(RunContext runContext, MailService.MailConfiguration config) { + return Flux.create(sink -> { + Store store = null; + IMAPFolder Imapfolder = null; + + try { + Properties props = MailService.setupMailProperties(config.protocol, config.host, config.port, + config.ssl, config.trustAllCertificates, runContext); + Session session = Session.getInstance(props, null); + store = session.getStore(MailService.getProtocolName(config.protocol, config.ssl)); + + MailService.connectToStore(store, config.host, config.port, config.username, config.password, + runContext); + Imapfolder = (IMAPFolder) store.getFolder(config.folder); + Imapfolder.open(Folder.READ_ONLY); + + // Store references for cleanup + activeStore.set(store); + activeFolder.set(Imapfolder); + + runContext.logger().info("Connected to {}:{}", config.host, config.port); + runContext.logger().info("Starting IMAP IDLE monitoring on folder: {}", config.folder); + + final IMAPFolder folder = Imapfolder; + folder.addMessageCountListener(new MessageCountListener() { + @Override + public void messagesAdded(MessageCountEvent e) { + try { + for (Message message : e.getMessages()) { + if (!isActive.get()) + break; + + if (message instanceof MimeMessage mimeMessage) { + MailService.EmailData emailData = MailService.parseEmailData(mimeMessage); + if (emailData != null) { + runContext.logger().info("IMAP IDLE: New email - Subject: '{}', From: '{}'", + emailData.getSubject(), emailData.getFrom()); + sink.next(emailData); + } + } + } + } catch (Exception ex) { + if (isActive.get()) { + runContext.logger().error("Error processing new messages", ex); + } + } + } + + @Override + public void messagesRemoved(MessageCountEvent e) {} + }); + + while (isActive.get() && folder.isOpen()) { + try { + folder.idle(); + } catch (Exception e) { + if (isActive.get()) { + runContext.logger().error("IMAP IDLE error", e); + sink.error(e); + break; + } + } + + // Check if thread was interrupted + if (Thread.currentThread().isInterrupted()) { + isActive.set(false); + break; + } + } + + } catch (Exception e) { + if (isActive.get()) { + sink.error(e); + } + } finally { + sink.complete(); + cleanupImapResources(runContext, store, Imapfolder); + } + + }, FluxSink.OverflowStrategy.BUFFER); + } + + private void cleanupImapResources(RunContext runContext, Store store, IMAPFolder folder) { + try { + runContext.logger().info("Cleaning up IMAP resources"); + + if (folder != null && folder.isOpen()) { + try { + folder.close(false); + runContext.logger().debug("IMAP folder closed"); + } catch (Exception e) { + runContext.logger().warn("Error closing IMAP folder", e); + } + } + + if (store != null && store.isConnected()) { + try { + store.close(); + runContext.logger().debug("IMAP store closed"); + } catch (Exception e) { + runContext.logger().warn("Error closing IMAP store", e); + } + } + } catch (Exception e) { + runContext.logger().warn("Error during IMAP cleanup", e); + } + } + + private Flux createPop3PollingStream(RunContext runContext, MailService.MailConfiguration config) { + + if (lastFetched.get() == null) { + lastFetched.set(ZonedDateTime.now().minus(getInterval())); + runContext.logger().info("Initialized POP3 polling with lastFetched: {}", lastFetched.get()); + } + + return Flux.interval(Duration.ZERO, config.interval) + .takeWhile(tick -> isActive.get()) + .doOnNext(tick -> runContext.logger().info("POP3 polling cycle: {}", tick)) + .flatMap(tick -> { + try { + if (!isActive.get()) { + return Flux.empty(); + } + + ZonedDateTime currentLastFetched = lastFetched.get(); + runContext.logger().info("POP3 polling: checking for emails after {}", currentLastFetched); + + List newEmails = MailService.fetchNewEmails(runContext, config.protocol, + config.host, config.port, config.username, config.password, config.folder, + config.ssl, config.trustAllCertificates, currentLastFetched); + + if (!newEmails.isEmpty()) { + ZonedDateTime latestEmailDate = newEmails.stream() + .map(MailService.EmailData::getDate) + .filter(Objects::nonNull) + .max(ZonedDateTime::compareTo) + .orElse(currentLastFetched); + + lastFetched.set(latestEmailDate); + + runContext.logger().info("POP3 polling: found {} new emails, updated lastFetched to {}", + newEmails.size(), latestEmailDate); + } else { + runContext.logger().info("POP3 polling: no new emails found"); + } + + return Flux.fromIterable(newEmails); + } catch (Exception e) { + if (isActive.get()) { + runContext.logger().error("Error in POP3 polling", e); + } + return Flux.empty(); + } + }, 1); + } + + @Override + public void kill() { + stop(true); + } + + @Override + public void stop() { + stop(false); // must be non-blocking + } + + private void stop(boolean wait) { + if (!isActive.compareAndSet(true, false)) { + return; + } + + Folder folder = activeFolder.get(); + if (folder != null) { + try { + if (folder.isOpen()) { + folder.close(false); + } + } catch (Exception ignored) { + } + } + + Store store = activeStore.get(); + if (store != null) { + try { + if (store.isConnected()) { + store.close(); + } + } catch (Exception ignored) { + } + } + + if (wait) { + try { + this.waitForTermination.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/email/package-info.java b/src/main/java/io/kestra/plugin/email/package-info.java new file mode 100644 index 0000000..681ef5f --- /dev/null +++ b/src/main/java/io/kestra/plugin/email/package-info.java @@ -0,0 +1,7 @@ +@PluginSubGroup( + description = "This subgroup of plugins contains tasks for email notifications.", + categories = PluginSubGroup.PluginCategory.ALERTING +) +package io.kestra.plugin.email; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/java/io/kestra/plugin/templates/Example.java b/src/main/java/io/kestra/plugin/templates/Example.java deleted file mode 100644 index a760828..0000000 --- a/src/main/java/io/kestra/plugin/templates/Example.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.kestra.plugin.templates; - -import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.property.Property; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import lombok.experimental.SuperBuilder; -import org.apache.commons.lang3.StringUtils; -import io.kestra.core.models.tasks.RunnableTask; -import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.RunContext; -import org.slf4j.Logger; - -@SuperBuilder -@ToString -@EqualsAndHashCode -@Getter -@NoArgsConstructor -@Schema( - title = "Short description for this task", - description = "Full description of this task" -) -@Plugin( - examples = { - @io.kestra.core.models.annotations.Example( - title = "Simple revert", - code = { "format: \"Text to be reverted\"" } - ) - } -) -public class Example extends Task implements RunnableTask { - @Schema( - title = "Short description for this input", - description = "Full description of this input" - ) - private Property format; - - @Override - public Example.Output run(RunContext runContext) throws Exception { - Logger logger = runContext.logger(); - - String render = runContext.render(format).as(String.class).orElse(""); - logger.debug(render); - - return Output.builder() - .child(new OutputChild(StringUtils.reverse(render))) - .build(); - } - - /** - * Input or Output can be nested as you need - */ - @Builder - @Getter - public static class Output implements io.kestra.core.models.tasks.Output { - @Schema( - title = "Short description for this output", - description = "Full description of this output" - ) - private final OutputChild child; - } - - @Builder - @Getter - public static class OutputChild implements io.kestra.core.models.tasks.Output { - @Schema( - title = "Short description for this output", - description = "Full description of this output" - ) - private final String value; - } -} diff --git a/src/main/java/io/kestra/plugin/templates/Trigger.java b/src/main/java/io/kestra/plugin/templates/Trigger.java deleted file mode 100644 index 113a031..0000000 --- a/src/main/java/io/kestra/plugin/templates/Trigger.java +++ /dev/null @@ -1,59 +0,0 @@ -package io.kestra.plugin.templates; - -import io.kestra.core.exceptions.IllegalVariableEvaluationException; -import io.kestra.core.models.annotations.Plugin; -import io.kestra.core.models.conditions.ConditionContext; -import io.kestra.core.models.executions.Execution; -import io.kestra.core.models.property.Property; -import io.kestra.core.models.triggers.*; -import io.kestra.core.runners.RunContext; -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; -import lombok.experimental.SuperBuilder; - -import java.time.Duration; -import java.util.Optional; - -@SuperBuilder -@ToString -@EqualsAndHashCode -@Getter -@NoArgsConstructor -@Plugin -@Schema( - title = "Trigger an execution randomly", - description ="Trigger an execution randomly" -) -public class Trigger extends AbstractTrigger implements PollingTriggerInterface, TriggerOutput { - @Builder.Default - private final Duration interval = Duration.ofSeconds(60); - - @Builder.Default - protected Property min = Property.ofValue(0.5); - - @Override - public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws IllegalVariableEvaluationException { - RunContext runContext = conditionContext.getRunContext(); - - double random = Math.random(); - if (random < runContext.render(this.min).as(Double.class).orElseThrow()) { - return Optional.empty(); - } - - runContext.logger().info("Will create an execution"); - Execution execution = TriggerService.generateExecution( - this, - conditionContext, - context, - Output.builder().random(random).build() - ); - - return Optional.of(execution); - } - - @Builder - @Getter - public static class Output implements io.kestra.core.models.tasks.Output { - private Double random; - } -} diff --git a/src/main/java/io/kestra/plugin/templates/package-info.java b/src/main/java/io/kestra/plugin/templates/package-info.java deleted file mode 100644 index 50e452d..0000000 --- a/src/main/java/io/kestra/plugin/templates/package-info.java +++ /dev/null @@ -1,8 +0,0 @@ -@PluginSubGroup( - title = "Example plugin", - description = "A plugin to show how to build a plugin in Kestra.", - categories = PluginSubGroup.PluginCategory.TOOL -) -package io.kestra.plugin.templates; - -import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/src/main/resources/icons/io.kestra.plugin.email.MailExecution.svg b/src/main/resources/icons/io.kestra.plugin.email.MailExecution.svg new file mode 100644 index 0000000..7abd3dd --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.email.MailExecution.svg @@ -0,0 +1,28 @@ + + + + +
+ + + + + + + + + + + + + + + + + + + + + + +
diff --git a/src/main/resources/icons/io.kestra.plugin.email.MailReceivedTrigger.svg b/src/main/resources/icons/io.kestra.plugin.email.MailReceivedTrigger.svg new file mode 100644 index 0000000..40c7f3e --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.email.MailReceivedTrigger.svg @@ -0,0 +1,28 @@ + + + + +
+ + + + + + + + + + + + + + + + + + + + + + +
diff --git a/src/main/resources/icons/io.kestra.plugin.email.RealTimeTrigger.svg b/src/main/resources/icons/io.kestra.plugin.email.RealTimeTrigger.svg new file mode 100644 index 0000000..b667ccb --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.email.RealTimeTrigger.svg @@ -0,0 +1,28 @@ + + + + +
+ + + + + + + + + + + + + + + + + + + + + + +
diff --git a/src/main/resources/icons/io.kestra.plugin.email.svg b/src/main/resources/icons/io.kestra.plugin.email.svg new file mode 100644 index 0000000..65ab510 --- /dev/null +++ b/src/main/resources/icons/io.kestra.plugin.email.svg @@ -0,0 +1,28 @@ + + + + +
+ + + + + + + + + + + + + + + + + + + + + + +
diff --git a/src/main/resources/icons/plugin-icon.svg b/src/main/resources/icons/plugin-icon.svg new file mode 100644 index 0000000..65ab510 --- /dev/null +++ b/src/main/resources/icons/plugin-icon.svg @@ -0,0 +1,28 @@ + + + + +
+ + + + + + + + + + + + + + + + + + + + + + +
diff --git a/src/main/resources/mail-template.hbs.peb b/src/main/resources/mail-template.hbs.peb new file mode 100644 index 0000000..6c66611 --- /dev/null +++ b/src/main/resources/mail-template.hbs.peb @@ -0,0 +1,211 @@ + + + + + + Flow [{{execution.namespace}}] {{execution.flowId}} execution + + + + + + + + + +
  +
+ + + + Kestra logo + + + + + + + + + +
+ + + + +
+

+ [{{execution.namespace}}] {{execution.flowId}} ➛ + {{execution.state.current}} + +

+ {% if customMessage is defined %} +

+ {{ customMessage }} +

+ {% endif %} +

+ {% if firstFailed == false %}Succeeded{% else %}Failed on task {{firstFailed.taskId}} {% endif %} after {{duration}} +

+ {% if link is defined %} + + + + + + +
+ + + + + + +
+ See + details
+
+ {% endif %} +

+ Namespace : {{execution.namespace}}

+

+ Flow : {{execution.flowId}}

+

+ Execution : {{execution.id}}

+

+ Status : {{execution.state.current}}

+ {% if lastTask is defined %} +

+ Final task ID : {{lastTask.taskId}}

+ {% endif %} + {% if customFields is defined %} + {% for entry in customFields %} +

+ {{entry.key}} : {{entry.value}}

+ {% endfor %} + {% endif %} +
+
+ + + + + + +
+
 
+ + diff --git a/src/main/resources/metadata/index.yaml b/src/main/resources/metadata/index.yaml index fd65c6b..1bfe654 100644 --- a/src/main/resources/metadata/index.yaml +++ b/src/main/resources/metadata/index.yaml @@ -1,8 +1,8 @@ -group: io.kestra.plugin.template -name: "template" -title: "Template" -description: "Plugin template for Kestra" -body: "" +group: io.kestra.plugin.email +name: "email" +title: "Email" +description: "Tasks that send email notifications." +body: "Configure SMTP server/port, credentials, from/to addresses, subject, and body/attachments to send emails from workflows; supports TLS/SSL settings for secure delivery." videos: [] createdBy: "Kestra Core Team" managedBy: "Kestra Core Team" diff --git a/src/main/resources/text-template.hbs.peb b/src/main/resources/text-template.hbs.peb new file mode 100644 index 0000000..4cc8efd --- /dev/null +++ b/src/main/resources/text-template.hbs.peb @@ -0,0 +1 @@ +Please view this email in a modern email client diff --git a/src/test/java/io/kestra/plugin/email/AbstractEmailTest.java b/src/test/java/io/kestra/plugin/email/AbstractEmailTest.java new file mode 100644 index 0000000..b56aabf --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/AbstractEmailTest.java @@ -0,0 +1,104 @@ +package io.kestra.plugin.email; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.runners.TestRunnerUtils; +import io.kestra.core.utils.Await; +import io.kestra.core.utils.TestsUtils; +import io.micronaut.context.ApplicationContext; +import io.micronaut.runtime.server.EmbeddedServer; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInstance; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import static io.kestra.core.tenant.TenantService.MAIN_TENANT; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@KestraTest +public class AbstractEmailTest { + @Inject + protected EmbeddedServer embeddedServer; + + @Inject + protected ApplicationContext applicationContext; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + protected QueueInterface executionQueue; + + @Inject + protected TestRunnerUtils runnerUtils; + + @BeforeEach + void reset() { + FakeWebhookController.data = null; + } + + /** + * waits for a webhook data to return a non-null value. + * + * @param dataSupplier supplier function that provides the data to check + * @param timeoutMs The maximum time to wait in milliseconds. + * @return The received data string. + * @throws InterruptedException if the thread is interrupted while sleeping. + * @throws TimeoutException if the data does not become non-null within the timeout period. + */ + public static String waitForWebhookData(Supplier dataSupplier, long timeoutMs) throws InterruptedException, TimeoutException { + try { + return Await.until( + dataSupplier::get, + Duration.ofMillis(100), + Duration.ofSeconds(5) + ); + } catch (TimeoutException e) { + throw new TimeoutException("Webhook data did not arrive within " + timeoutMs + "ms."); + } + } + + protected Execution runAndCaptureExecution(String triggeringFlowId, String notificationFlowId) throws Exception { + CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference last = new AtomicReference<>(); + + Flux receive = TestsUtils.receive(executionQueue, execution -> { + if (execution.getLeft().getFlowId().equals(notificationFlowId)) { + last.set(execution.getLeft()); + queueCount.countDown(); + } + }); + + Execution execution; + + execution = runnerUtils.runOne( + MAIN_TENANT, + "io.kestra.tests", + triggeringFlowId + ); + + boolean await = queueCount.await(20, TimeUnit.SECONDS); + assertThat(await, is(true)); + + Execution triggeredExecution = last.get(); + assertThat(triggeredExecution, notNullValue()); + assertThat(triggeredExecution.getTrigger().getVariables().get("executionId"), is(execution.getId())); + + receive.blockLast(); + + return execution; + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/email/AbstractTriggerTest.java b/src/test/java/io/kestra/plugin/email/AbstractTriggerTest.java new file mode 100644 index 0000000..aa98107 --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/AbstractTriggerTest.java @@ -0,0 +1,100 @@ +package io.kestra.plugin.email; + +import com.icegreen.greenmail.configuration.GreenMailConfiguration; +import com.icegreen.greenmail.junit5.GreenMailExtension; +import com.icegreen.greenmail.util.ServerSetup; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.queues.QueueFactoryInterface; +import io.kestra.core.queues.QueueInterface; +import io.kestra.core.runners.FlowListeners; +import io.kestra.core.runners.TestRunner; +import io.kestra.core.utils.IdUtils; +import io.kestra.core.utils.TestsUtils; +import io.kestra.jdbc.runner.JdbcScheduler; +import io.kestra.scheduler.AbstractScheduler; +import io.kestra.worker.DefaultWorker; +import io.micronaut.context.ApplicationContext; +import io.kestra.core.junit.annotations.KestraTest; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import jakarta.mail.*; +import jakarta.mail.internet.InternetAddress; +import jakarta.mail.internet.MimeMessage; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; + +import java.util.Properties; +import java.util.concurrent.CountDownLatch; + +@KestraTest +public abstract class AbstractTriggerTest extends AbstractEmailTest { + + @RegisterExtension + static GreenMailExtension greenMail = new GreenMailExtension( + new ServerSetup[] { + new ServerSetup(3144, "127.0.0.1", ServerSetup.PROTOCOL_IMAP), + new ServerSetup(3145,"127.0.0.1",ServerSetup.PROTOCOL_POP3) + }).withConfiguration(GreenMailConfiguration.aConfig().withUser("test@localhost", "password")) + .withPerMethodLifecycle(false); + + @Inject + protected TestRunner runner; + + @Inject + protected ApplicationContext applicationContext; + + @Inject + protected FlowListeners flowListenersService; + + @Inject + @Named(QueueFactoryInterface.EXECUTION_NAMED) + protected QueueInterface executionQueue; + + @BeforeEach + void cleanupBeforeEach() throws Exception { + greenMail.purgeEmailFromAllMailboxes(); + } + + protected static class TestContext { + final DefaultWorker worker; + final AbstractScheduler scheduler; + final Flux receive; + + TestContext(ApplicationContext applicationContext, FlowListeners flowListeners, QueueInterface queue, + String flowId, CountDownLatch latch) { + this.worker = applicationContext.createBean(DefaultWorker.class, IdUtils.create(), 8, null); + this.scheduler = new JdbcScheduler(applicationContext, flowListeners); + this.receive = TestsUtils.receive(queue, execution -> { + if (execution.getLeft().getFlowId().equals(flowId)) { + latch.countDown(); + } + }); + } + + void start() { + worker.run(); + scheduler.run(); + } + + void shutdown() { + try { + worker.shutdown(); + scheduler.close(); + receive.blockLast(); + } catch (Exception ignored) {} + } + } + + protected void sendTestEmail(String subject, String from, String body) throws MessagingException { + Properties props = new Properties(); + Session session = Session.getDefaultInstance(props); + + MimeMessage message = new MimeMessage(session); + message.setFrom(new InternetAddress(from)); + message.addRecipient(Message.RecipientType.TO, new InternetAddress("test@localhost")); + message.setSubject(subject); + message.setText(body); + greenMail.getUserManager().getUser("test@localhost").deliver(message); + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/email/FakeWebhookController.java b/src/test/java/io/kestra/plugin/email/FakeWebhookController.java new file mode 100644 index 0000000..e34daeb --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/FakeWebhookController.java @@ -0,0 +1,39 @@ +package io.kestra.plugin.email; + +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.MediaType; +import io.micronaut.http.annotation.Body; +import io.micronaut.http.annotation.Consumes; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Post; + +import java.util.HashMap; +import java.util.Map; + +@Controller("/webhook-unit-test") +public class FakeWebhookController { + public static String data; + public static Map headers = new HashMap<>(); + + @Post + @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_FORM_URLENCODED}) + public HttpResponse post(@Body String data) { + FakeWebhookController.data = data; + return HttpResponse.ok("ok"); + } + + @Post("/with-headers") + @Consumes({MediaType.APPLICATION_JSON, MediaType.APPLICATION_FORM_URLENCODED}) + public HttpResponse postWithHeaders(HttpRequest request, @Body String data) { + + FakeWebhookController.data = data; + request.getHeaders().forEach((name, values) -> { + if (!values.isEmpty()) { + headers.put(name, values.get(0)); + } + }); + + return HttpResponse.ok("ok"); + } +} diff --git a/src/test/java/io/kestra/plugin/email/MailExecutionTest.java b/src/test/java/io/kestra/plugin/email/MailExecutionTest.java new file mode 100644 index 0000000..63a51f8 --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/MailExecutionTest.java @@ -0,0 +1,43 @@ +package io.kestra.plugin.email; + +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.flows.State; +import io.kestra.core.repositories.LocalFlowRepositoryLoader; +import io.kestra.core.runners.TestRunner; +import jakarta.inject.Inject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Objects; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +@KestraTest +class MailExecutionTest extends AbstractEmailTest { + @Inject + protected TestRunner runner; + + @Inject + protected LocalFlowRepositoryLoader repositoryLoader; + + @BeforeEach + void init() throws IOException, URISyntaxException { + repositoryLoader.load(Objects.requireNonNull(MailExecutionTest.class.getClassLoader().getResource("flows"))); + this.runner.run(); + } + + @Test + void testFlow() throws Exception { + var failedExecution = runAndCaptureExecution( + "main-flow-that-fails", + "mail" + ); + + assertThat(failedExecution.getTaskRunList(), hasSize(1)); + assertThat(failedExecution.getTaskRunList().getFirst().getState().getCurrent(), is(State.Type.FAILED)); + } +} diff --git a/src/test/java/io/kestra/plugin/email/MailReceivedTriggerTest.java b/src/test/java/io/kestra/plugin/email/MailReceivedTriggerTest.java new file mode 100644 index 0000000..89d8749 --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/MailReceivedTriggerTest.java @@ -0,0 +1,199 @@ +package io.kestra.plugin.email; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.FlowWithSource; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.FlowListeners; +import io.kestra.core.utils.Await; +import io.kestra.core.utils.TestsUtils; +import io.kestra.plugin.core.debug.Return; +import io.kestra.plugin.email.MailReceivedTrigger; +import io.kestra.plugin.email.MailService; +import io.micronaut.test.context.TestContext; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +class MailReceivedTriggerTest extends AbstractTriggerTest { + + @Test + void MailReceivedTriggerWithPop3() throws Exception { + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + MailReceivedTrigger mailTrigger = MailReceivedTrigger.builder() + .id("pop3-mail-trigger") + .type(MailReceivedTrigger.class.getName()) + .protocol(Property.ofValue(MailService.Protocol.POP3)) + .host(Property.ofValue("127.0.0.1")) + .port(Property.ofValue(3145)) + .username(Property.ofValue("test@localhost")) + .password(Property.ofValue("password")) + .ssl(Property.ofValue(false)) + .trustAllCertificates(Property.ofValue(true)) + .interval(Property.ofValue(Duration.ofSeconds(1))) + .build(); + + Flow testFlow = Flow.builder() + .id("mail-received-trigger-pop3") + .namespace("io.kestra.tests") + .revision(1) + .tasks(Collections.singletonList(Return.builder() + .id("process-pop3-emails") + .type(Return.class.getName()) + .format(Property.ofValue( + "POP3 emails received: {{trigger.total}} emails, latest: {{trigger.latestEmail.subject}}")) + .build())) + .triggers(Collections.singletonList(mailTrigger)) + .build(); + + FlowWithSource flow = FlowWithSource.of(testFlow, null); + doReturn(List.of(flow)).when(flowListenersServiceSpy).flows(); + + CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); + + Flux receive = TestsUtils.receive(executionQueue, execution -> { + if (execution.getLeft().getFlowId().equals("mail-received-trigger-pop3")) { + lastExecution.set(execution.getLeft()); + queueCount.countDown(); + } + }); + + TestContext testContext = new TestContext(applicationContext, flowListenersServiceSpy, executionQueue, + "mail-received-trigger-pop3", queueCount); + + try { + testContext.start(); + + Thread.sleep(Duration.ofSeconds(1).toMillis()); + + sendTestEmail("First Email", "sender1@example.com", "First test email body"); + + boolean await = queueCount.await(25, TimeUnit.SECONDS); + assertThat("POP3 emails trigger should execute", await, is(true)); + + try { + Await.until( + () -> lastExecution.get() != null, + Duration.ofMillis(100), + Duration.ofSeconds(2) + ); + } catch (TimeoutException e) { + throw new AssertionError("Execution was not captured within 2 seconds", e); + } + + Execution execution = lastExecution.get(); + + Map triggerVars = execution.getTrigger().getVariables(); + assertThat("Should have received emails", (Integer) triggerVars.get("total"), greaterThan(0)); + + @SuppressWarnings("unchecked") + Map latestEmail = (Map) triggerVars.get("latestEmail"); + assertThat("Latest email should have a subject", latestEmail.get("subject"), notNullValue()); + assertThat("Latest email should have a from address", latestEmail.get("from"), notNullValue()); + assertThat("Latest email subject should be one of the sent emails", + latestEmail.get("subject"), + (is("First Email"))); + } finally { + testContext.shutdown(); + receive.blockLast(); + } + } + + @Test + void MailReceivedTriggerWithImap() throws Exception { + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + + MailReceivedTrigger mailTrigger = MailReceivedTrigger.builder() + .id("imap-mail-trigger") + .type(MailReceivedTrigger.class.getName()) + .protocol(Property.ofValue(MailService.Protocol.IMAP)) + .host(Property.ofValue("127.0.0.1")) + .port(Property.ofValue(3144)) + .username(Property.ofValue("test@localhost")) + .password(Property.ofValue("password")) + .ssl(Property.ofValue(false)) + .trustAllCertificates(Property.ofValue(true)) + .interval(Property.ofValue(Duration.ofSeconds(1))) + .build(); + + Flow testFlow = Flow.builder() + .id("mail-received-trigger-imap") + .namespace("io.kestra.tests") + .revision(1) + .tasks(Collections.singletonList(Return.builder() + .id("process-imap-emails") + .type(Return.class.getName()) + .format(Property.ofValue( + "IMAP emails received: {{trigger.total}} emails, latest: {{trigger.latestEmail.subject}}")) + .build())) + .triggers(Collections.singletonList(mailTrigger)) + .build(); + + FlowWithSource flow = FlowWithSource.of(testFlow, null); + doReturn(List.of(flow)).when(flowListenersServiceSpy).flows(); + + CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); + + Flux receive = TestsUtils.receive(executionQueue, execution -> { + if (execution.getLeft().getFlowId().equals("mail-received-trigger-imap")) { + lastExecution.set(execution.getLeft()); + queueCount.countDown(); + } + }); + + TestContext testContext = new TestContext(applicationContext, flowListenersServiceSpy, executionQueue, + "mail-received-trigger-imap", queueCount); + + try { + testContext.start(); + + Thread.sleep(Duration.ofSeconds(1).toMillis()); + + sendTestEmail("First Email", "sender1@example.com", "First test email body"); + + boolean await = queueCount.await(25, TimeUnit.SECONDS); + assertThat("IMAP emails trigger should execute", await, is(true)); + + try { + Await.until( + () -> lastExecution.get() != null, + Duration.ofMillis(100), + Duration.ofSeconds(2) + ); + } catch (TimeoutException e) { + throw new AssertionError("Execution was not captured within 2 seconds", e); + } + + Execution execution = lastExecution.get(); + + Map triggerVars = execution.getTrigger().getVariables(); + assertThat("Should have received emails", (Integer) triggerVars.get("total"), greaterThan(0)); + + @SuppressWarnings("unchecked") + Map latestEmail = (Map) triggerVars.get("latestEmail"); + assertThat("Latest email should have a subject", latestEmail.get("subject"), notNullValue()); + assertThat("Latest email should have a from address", latestEmail.get("from"), notNullValue()); + assertThat("Latest email subject should be one of the sent emails", + latestEmail.get("subject"), + (is("First Email"))); + } finally { + testContext.shutdown(); + receive.blockLast(); + } + } +} diff --git a/src/test/java/io/kestra/plugin/email/MailSendTest.java b/src/test/java/io/kestra/plugin/email/MailSendTest.java new file mode 100644 index 0000000..40fabcd --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/MailSendTest.java @@ -0,0 +1,299 @@ +package io.kestra.plugin.email; + +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import com.icegreen.greenmail.junit5.GreenMailExtension; +import com.icegreen.greenmail.util.ServerSetupTest; +import io.kestra.core.junit.annotations.KestraTest; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.RunContextFactory; +import io.kestra.core.storages.StorageInterface; +import io.kestra.plugin.email.MailExecution; +import io.kestra.plugin.email.MailSend; +import jakarta.inject.Inject; +import jakarta.mail.Message; +import jakarta.mail.internet.InternetAddress; +import jakarta.mail.internet.MimeBodyPart; +import jakarta.mail.internet.MimeMessage; +import jakarta.mail.internet.MimeMultipart; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.simplejavamail.MailException; +import org.simplejavamail.api.mailer.config.TransportStrategy; + +import java.io.File; +import java.io.FileInputStream; +import java.net.URI; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static io.kestra.core.tenant.TenantService.MAIN_TENANT; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@KestraTest +public class MailSendTest { + @RegisterExtension + static GreenMailExtension greenMail = new GreenMailExtension(ServerSetupTest.SMTP); + + private static final String FROM = "from@mail.com"; + private static final String TO = "to@mail.com"; + private static final String SUBJECT = "Mail subject"; + private static String template = null; + private static String textTemplate = null; + + @Inject + StorageInterface storageInterface; + + @BeforeAll + public static void setup() throws Exception { + + template = Files.asCharSource( + new File(Objects.requireNonNull(MailExecution.class.getClassLoader() + .getResource("mail-template.hbs.peb")) + .toURI()), + StandardCharsets.UTF_8 + ).read(); + + textTemplate = Files.asCharSource( + new File(Objects.requireNonNull(MailExecution.class.getClassLoader() + .getResource("text-template.hbs.peb")) + .toURI()), + StandardCharsets.UTF_8 + ).read(); + } + + @Inject + private RunContextFactory runContextFactory; + + private RunContext getRunContext() { + return runContextFactory.of(Map.of( + "firstFailed", false, + "execution", ImmutableMap.of( + "id", "#aBcDeFgH", + "flowId", "mail", + "namespace", "org.test", + "state", ImmutableMap.of( + "current", "SUCCESS" + ) + ), + "duration", Duration.ofMillis(123456), + "flow", ImmutableMap.of( + "id", "mail" + ), + "link", "http://todo.com", + "customFields", ImmutableMap.of( + "Env", "dev" + ), + "customMessage", "myCustomMessage", + "attachments", """ + [ + { + "name": "application.yml", + "uri": "kestra:///file/storage/get.yml", + "contentType": "text/yaml" + } + ]""" + )); + } + + @Test + @DisplayName("Send email with html and plain text contents") + void sendEmail() throws Exception { + RunContext runContext = getRunContext(); + String attachmentFilename = "application.yml"; + URL attachmentResource = MailSendTest.class.getClassLoader().getResource(attachmentFilename); + URI putAttachment = storageInterface.put( + MAIN_TENANT, + null, + new URI("/file/storage/get.yml"), + new FileInputStream(Objects.requireNonNull(attachmentResource).getFile()) + ); + String embeddedImageFilename = "kestra.png"; + URL embeddedImageResource = MailSendTest.class.getClassLoader().getResource(embeddedImageFilename); + URI putEmbeddedImage = storageInterface.put( + MAIN_TENANT, + null, + new URI("/file/storage/get-embedded-image.yml"), + new FileInputStream(Objects.requireNonNull(embeddedImageResource).getFile()) + ); + + MailSend mailSend = MailSend.builder() + .host(Property.ofValue("localhost")) + .port(Property.ofValue(greenMail.getSmtp().getPort())) + .from(Property.ofValue(FROM)) + .to(Property.ofValue(TO)) + .subject(Property.ofValue(SUBJECT)) + .htmlTextContent(Property.ofExpression(template)) + .plainTextContent(Property.ofValue(textTemplate)) + .transportStrategy(Property.ofValue(TransportStrategy.SMTP)) + .attachments(Property.ofExpression("{{ attachments | toJson }}")) + .embeddedImages(Property.ofValue( + List.of(MailSend.Attachment.builder() + .name(Property.ofValue(embeddedImageFilename)) + .uri(Property.ofValue(putEmbeddedImage.toString())) + .contentType(Property.ofValue("image/png")) + .build() + ) + ) + ) + .build(); + + mailSend.run(runContext); + + MimeMessage[] receivedMessages = greenMail.getReceivedMessages(); + + assertThat(receivedMessages.length, is(1)); + + MimeMessage mimeMessage = receivedMessages[0]; + MimeMultipart content = (MimeMultipart) mimeMessage.getContent(); + + assertThat(content.getCount(), is(2)); + + MimeBodyPart bodyPart = ((MimeBodyPart) content.getBodyPart(0)); + String body = IOUtils.toString(bodyPart.getInputStream(), StandardCharsets.UTF_8); + + assertThat(mimeMessage.getFrom()[0].toString(), is(FROM)); + assertThat(((InternetAddress) mimeMessage.getRecipients(Message.RecipientType.TO)[0]).getAddress(), is(TO)); + assertThat(mimeMessage.getSubject(), is(SUBJECT)); + assertThat(body, containsString("Please view this email in a modern email client")); + assertThat(body, containsString("Namespace : or=\r\ng.test")); + assertThat(body, containsString("Flow : mail")); + assertThat(body, containsString("Execution : #a=\r\nBcDeFgH")); + assertThat(body, containsString("Status : SUCCE=\r\nSS")); + assertThat(body, containsString("Env : dev")); + assertThat(body, containsString("myCustomMessage")); + assertThat(body, containsString("Content-Type: image/png; filename=kestra.png; name=kestra.png")); + + MimeBodyPart filePart = ((MimeBodyPart) content.getBodyPart(1)); + String file = IOUtils.toString(filePart.getInputStream(), StandardCharsets.UTF_8); + + assertThat(filePart.getContentType(), is("text/yaml; filename=" + attachmentFilename + "; name=" + attachmentFilename)); + assertThat(filePart.getFileName(), is(attachmentFilename)); + assertThat(file.replace("\r", ""), is(IOUtils.toString(storageInterface.get(MAIN_TENANT, null, putAttachment), StandardCharsets.UTF_8))); + } + + @Test + @DisplayName("Send email with exception with an html and plain text contents") + void testThrowsMailException() { + RunContext runContext = getRunContext(); + + Assertions.assertThrows(MailException.class, () -> { + MailSend mailSend = MailSend.builder() + .host(Property.ofValue("fake-host-unknown.com")) + .port(Property.ofValue(465)) + .from(Property.ofValue(FROM)) + .to(Property.ofValue(TO)) + .subject(Property.ofValue(SUBJECT)) + .htmlTextContent(Property.ofExpression(template)) + .plainTextContent(Property.ofValue(textTemplate)) + .transportStrategy(Property.ofValue(TransportStrategy.SMTP)) + .build(); + + mailSend.run(runContext); + }); + } + + @Test + void sendsEmailWithCsvAttachment_fromYamlList() throws Exception { + RunContext runContext = getRunContext(); + + String src = "application.yml"; + URL res = MailSendTest.class.getClassLoader().getResource(src); + URI putDataset = storageInterface.put( + MAIN_TENANT, + null, + new URI("/file/storage/products.csv"), + new FileInputStream(Objects.requireNonNull(res).getFile()) + ); + + List> attachments = List.of( + Map.of( + "name", "data.csv", + "uri", putDataset.toString(), + "contentType", "text/csv" + ) + ); + + MailSend mailSend = MailSend.builder() + .host(Property.ofValue("localhost")) + .port(Property.ofValue(greenMail.getSmtp().getPort())) + .from(Property.ofValue(FROM)) + .to(Property.ofValue(TO)) + .subject(Property.ofValue(SUBJECT)) + .htmlTextContent(Property.ofValue("find attached your dataset as a CSV file")) + .plainTextContent(Property.ofValue(textTemplate)) + .transportStrategy(Property.ofValue(TransportStrategy.SMTP)) + .attachments(Property.ofValue(attachments)) + .build(); + + mailSend.run(runContext); + + MimeMessage[] received = greenMail.getReceivedMessages(); + assertThat(received.length, is(1)); + + MimeMessage mimeMessage = received[0]; + MimeMultipart content = (MimeMultipart) mimeMessage.getContent(); + + assertThat(content.getCount(), is(2)); + + MimeBodyPart bodyPart = (MimeBodyPart) content.getBodyPart(0); + String body = IOUtils.toString(bodyPart.getInputStream(), StandardCharsets.UTF_8); + + assertThat(mimeMessage.getFrom()[0].toString(), is(FROM)); + assertThat(((InternetAddress) mimeMessage.getRecipients(Message.RecipientType.TO)[0]).getAddress(), is(TO)); + assertThat(mimeMessage.getSubject(), is(SUBJECT)); + assertThat(body, containsString("find attached your dataset as a CSV file")); + + MimeBodyPart filePart = (MimeBodyPart) content.getBodyPart(1); + String file = IOUtils.toString(filePart.getInputStream(), StandardCharsets.UTF_8); + + assertThat(filePart.getContentType(), containsString("text/csv")); + assertThat(filePart.getFileName(), is("data.csv")); + assertThat(file.replace("\r", ""), + is(IOUtils.toString(storageInterface.get(MAIN_TENANT, null, putDataset), StandardCharsets.UTF_8))); + } + + @Test + @DisplayName("Send email with SSL trust configuration") + void sendEmailWithSslTrustConfig() throws Exception { + RunContext runContext = getRunContext(); + + MailSend mailSend = MailSend.builder() + .host(Property.ofValue("localhost")) + .port(Property.ofValue(greenMail.getSmtp().getPort())) + .from(Property.ofValue(FROM)) + .to(Property.ofValue(TO)) + .subject(Property.ofValue(SUBJECT)) + .htmlTextContent(Property.ofValue("

Test email with SSL trust configuration

")) + .plainTextContent(Property.ofValue("Test email with SSL trust configuration")) + .transportStrategy(Property.ofValue(TransportStrategy.SMTP)) + .verifyServerIdentity(Property.ofValue(false)) + .trustedHosts(Property.ofValue(List.of("localhost", "127.0.0.1"))) + .build(); + + mailSend.run(runContext); + + MimeMessage[] receivedMessages = greenMail.getReceivedMessages(); + + assertThat(receivedMessages.length, is(1)); + + MimeMessage mimeMessage = receivedMessages[0]; + assertThat(mimeMessage.getFrom()[0].toString(), is(FROM)); + assertThat(((InternetAddress) mimeMessage.getRecipients(Message.RecipientType.TO)[0]).getAddress(), is(TO)); + assertThat(mimeMessage.getSubject(), is(SUBJECT)); + + String body = IOUtils.toString(mimeMessage.getInputStream(), StandardCharsets.UTF_8); + assertThat(body, containsString("Test email with SSL trust configuration")); + } +} diff --git a/src/test/java/io/kestra/plugin/email/RealTimeTriggerTest.java b/src/test/java/io/kestra/plugin/email/RealTimeTriggerTest.java new file mode 100644 index 0000000..a907dce --- /dev/null +++ b/src/test/java/io/kestra/plugin/email/RealTimeTriggerTest.java @@ -0,0 +1,183 @@ +package io.kestra.plugin.email; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.FlowWithSource; +import io.kestra.core.models.property.Property; +import io.kestra.core.runners.FlowListeners; +import io.kestra.core.utils.Await; +import io.kestra.core.utils.TestsUtils; +import io.kestra.plugin.core.debug.Return; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + +class RealTimeTriggerTest extends AbstractTriggerTest { + + @Test + void RealTimeTriggerWithPop3() throws Exception { + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + + RealTimeTrigger mailTrigger = RealTimeTrigger.builder() + .id("pop3-mail-trigger") + .type(RealTimeTrigger.class.getName()) + .protocol(Property.ofValue(MailService.Protocol.POP3)) + .host(Property.ofValue("127.0.0.1")) + .port(Property.ofValue(3145)) + .username(Property.ofValue("test@localhost")) + .password(Property.ofValue("password")) + .ssl(Property.ofValue(false)) + .trustAllCertificates(Property.ofValue(true)) + .interval(Property.ofValue(Duration.ofSeconds(1))) + .build(); + + Flow testFlow = Flow.builder() + .id("real-time-trigger-pop3") + .namespace("io.kestra.tests") + .revision(1) + .tasks(Collections.singletonList(Return.builder() + .id("process-pop3-emails") + .type(Return.class.getName()) + .format(Property.ofValue( + "POP3 email received: {{trigger.subject}} from {{trigger.from}}")) + .build())) + .triggers(Collections.singletonList(mailTrigger)) + .build(); + + FlowWithSource flow = FlowWithSource.of(testFlow, null); + doReturn(List.of(flow)).when(flowListenersServiceSpy).flows(); + + CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); + + Flux receive = TestsUtils.receive(executionQueue, execution -> { + if (execution.getLeft().getFlowId().equals("real-time-trigger-pop3")) { + lastExecution.set(execution.getLeft()); + queueCount.countDown(); + } + }); + + TestContext testContext = new TestContext(applicationContext, flowListenersServiceSpy, executionQueue, + "real-time-trigger-pop3", queueCount); + + try { + testContext.start(); + sendTestEmail("Test Email", "sender@example.com", "Test email body"); + + Thread.sleep(Duration.ofSeconds(2).toMillis()); + + boolean await = queueCount.await(20, TimeUnit.SECONDS); + assertThat("POP3 emails trigger should execute", await, is(true)); + + try { + Await.until( + () -> lastExecution.get() != null, + Duration.ofMillis(100), + Duration.ofSeconds(2) + ); + } catch (TimeoutException e) { + throw new AssertionError("Execution was not captured within 2 seconds", e); + } + + Execution execution = lastExecution.get(); + + Map triggerVars = execution.getTrigger().getVariables(); + assertThat("Latest email subject should be present", triggerVars.get("subject"), notNullValue()); + assertThat("Latest email sender should be present", triggerVars.get("from"), notNullValue()); + assertThat("Latest email body should be present", triggerVars.get("body"), notNullValue()); + } finally { + testContext.shutdown(); + receive.blockLast(); + } + } + + @Test + void RealTimeTriggerWithImap() throws Exception { + FlowListeners flowListenersServiceSpy = spy(this.flowListenersService); + + RealTimeTrigger mailTrigger = RealTimeTrigger.builder() + .id("imap-real-time-trigger") + .type(RealTimeTrigger.class.getName()) + .protocol(Property.ofValue(MailService.Protocol.IMAP)) + .host(Property.ofValue("127.0.0.1")) + .port(Property.ofValue(3144)) + .username(Property.ofValue("test@localhost")) + .password(Property.ofValue("password")) + .ssl(Property.ofValue(false)) + .trustAllCertificates(Property.ofValue(true)) + .interval(Property.ofValue(Duration.ofSeconds(1))) + .build(); + + Flow testFlow = Flow.builder() + .id("real-time-trigger-imap") + .namespace("io.kestra.tests") + .revision(1) + .tasks(Collections.singletonList(Return.builder() + .id("process-imap-emails") + .type(Return.class.getName()) + .format(Property.ofValue( + "IMAP email received: {{trigger.subject}} from {{trigger.from}}")) + .build())) + .triggers(Collections.singletonList(mailTrigger)) + .build(); + + FlowWithSource flow = FlowWithSource.of(testFlow, null); + doReturn(List.of(flow)).when(flowListenersServiceSpy).flows(); + + CountDownLatch queueCount = new CountDownLatch(1); + AtomicReference lastExecution = new AtomicReference<>(); + + Flux receive = TestsUtils.receive(executionQueue, execution -> { + if (execution.getLeft().getFlowId().equals("real-time-trigger-imap")) { + lastExecution.set(execution.getLeft()); + queueCount.countDown(); + } + }); + + TestContext testContext = new TestContext(applicationContext, flowListenersServiceSpy, executionQueue, + "real-time-trigger-imap", queueCount); + + try { + testContext.start(); + + Thread.sleep(Duration.ofSeconds(2).toMillis()); + + sendTestEmail("Test Email", "sender@example.com", "Test email body"); + boolean await = queueCount.await(30, TimeUnit.SECONDS); + assertThat("IMAP emails trigger should execute", await, is(true)); + + try { + Await.until( + () -> lastExecution.get() != null, + Duration.ofMillis(100), + Duration.ofSeconds(2) + ); + } catch (TimeoutException e) { + throw new AssertionError("Execution was not captured within 2 seconds", e); + } + + Execution execution = lastExecution.get(); + + Map triggerVars = execution.getTrigger().getVariables(); + assertThat("Latest email subject should be present", triggerVars.get("subject"), notNullValue()); + assertThat("Latest email sender should be present", triggerVars.get("from"), notNullValue()); + assertThat("Latest email body should be present", triggerVars.get("body"), notNullValue()); + } finally { + testContext.shutdown(); + receive.blockLast(); + } + } +} \ No newline at end of file diff --git a/src/test/java/io/kestra/plugin/templates/ExampleRunnerTest.java b/src/test/java/io/kestra/plugin/templates/ExampleRunnerTest.java deleted file mode 100644 index e3e6cfa..0000000 --- a/src/test/java/io/kestra/plugin/templates/ExampleRunnerTest.java +++ /dev/null @@ -1,26 +0,0 @@ -package io.kestra.plugin.templates; - -import io.kestra.core.junit.annotations.ExecuteFlow; -import io.kestra.core.junit.annotations.KestraTest; -import org.junit.jupiter.api.Test; -import io.kestra.core.models.executions.Execution; - -import java.util.Map; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; - -/** - * This test will start Kestra, load the flow located in `src/test/resources/flows/example.yaml`, and execute it. - * The Kestra configuration file is in `src/test/resources/application.yml`, it configures the in-memory backend for tests. - */ -@KestraTest(startRunner = true) // This annotation starts an embedded Kestra for tests -class ExampleRunnerTest { - @Test - @ExecuteFlow("flows/example.yaml") - void flow(Execution execution) { - assertThat(execution.getTaskRunList(), hasSize(3)); - assertThat(((Map)execution.getTaskRunList().get(2).getOutputs().get("child")).get("value"), is("task-id")); - } -} diff --git a/src/test/java/io/kestra/plugin/templates/ExampleTest.java b/src/test/java/io/kestra/plugin/templates/ExampleTest.java deleted file mode 100644 index d2e8d38..0000000 --- a/src/test/java/io/kestra/plugin/templates/ExampleTest.java +++ /dev/null @@ -1,38 +0,0 @@ -package io.kestra.plugin.templates; - -import io.kestra.core.junit.annotations.KestraTest; -import io.kestra.core.models.property.Property; -import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Test; -import io.kestra.core.runners.RunContext; -import io.kestra.core.runners.RunContextFactory; - -import jakarta.inject.Inject; - -import java.util.Map; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; - -/** - * This test will only test the main task, this allow you to send any input - * parameters to your task and test the returning behaviour easily. - */ -@KestraTest -class ExampleTest { - @Inject - private RunContextFactory runContextFactory; - - @Test - void run() throws Exception { - RunContext runContext = runContextFactory.of(Map.of("variable", "John Doe")); - - Example task = Example.builder() - .format(new Property<>("Hello {{ variable }}")) - .build(); - - Example.Output runOutput = task.run(runContext); - - assertThat(runOutput.getChild().getValue(), is(StringUtils.reverse("Hello John Doe"))); - } -} diff --git a/src/test/resources/flows/common/main-flow-that-fails.yaml b/src/test/resources/flows/common/main-flow-that-fails.yaml new file mode 100644 index 0000000..1812354 --- /dev/null +++ b/src/test/resources/flows/common/main-flow-that-fails.yaml @@ -0,0 +1,5 @@ +id: main-flow-that-fails +namespace: io.kestra.tests +tasks: + - id: failed + type: io.kestra.core.tasks.executions.Fail \ No newline at end of file diff --git a/src/test/resources/flows/common/main-flow-that-succeeds.yaml b/src/test/resources/flows/common/main-flow-that-succeeds.yaml new file mode 100644 index 0000000..2838235 --- /dev/null +++ b/src/test/resources/flows/common/main-flow-that-succeeds.yaml @@ -0,0 +1,6 @@ +id: main-flow-that-succeeds +namespace: io.kestra.tests +tasks: + - id: success + type: io.kestra.plugin.core.debug.Return + format: "{{ task.id }} > {{ taskrun.startDate }}" \ No newline at end of file diff --git a/src/test/resources/flows/example.yaml b/src/test/resources/flows/example.yaml deleted file mode 100644 index c475b05..0000000 --- a/src/test/resources/flows/example.yaml +++ /dev/null @@ -1,13 +0,0 @@ -id: example -namespace: io.kestra.templates - -tasks: -- id: date - type: io.kestra.plugin.templates.Example - format: "{{taskrun.startDate}}" -- id: task-id - type: io.kestra.plugin.templates.Example - format: "{{task.id}}" -- id: flow-id - type: io.kestra.plugin.templates.Example - format: "{{outputs['task-id'].child.value}}" diff --git a/src/test/resources/flows/mail.yaml b/src/test/resources/flows/mail.yaml new file mode 100644 index 0000000..81f0bfe --- /dev/null +++ b/src/test/resources/flows/mail.yaml @@ -0,0 +1,25 @@ +id: mail +namespace: io.kestra.tests + +tasks: + - id: mail + type: io.kestra.plugin.email.MailExecution + to: to@mail.com + from: from@mail.com + subject: This is the subject + host: nohost-mail.site + port: 465 + username: user + password: pass + sessionTimeout: 1000 + transportStrategy: SMTPS + +triggers: + - id: on_success + type: io.kestra.plugin.core.trigger.Flow + preconditions: + id: flow_trigger + flows: + - namespace: io.kestra.tests + flowId: main-flow-that-fails + states: [ FAILED ] diff --git a/src/test/resources/kestra.png b/src/test/resources/kestra.png new file mode 100644 index 0000000..99044bc Binary files /dev/null and b/src/test/resources/kestra.png differ diff --git a/src/test/resources/mail.hbs.html b/src/test/resources/mail.hbs.html new file mode 100644 index 0000000..3dbf758 --- /dev/null +++ b/src/test/resources/mail.hbs.html @@ -0,0 +1,208 @@ + + + + + + Flow [org.test] mail execution + + + + + + + + + +
  + +
+ + + + Kestra logo + + + + + + + + +
+ + + + +
+

[org.test] mail → SUCCESS +

+ + + + + + +
+ + + + + + +
See details
+
+

Namespace : org.test

+

Flow : mail

+

Execution : #aBcDeFgH

+

Status : SUCCESS +

+
+ + + + + + +
+
  +
+ +