Skip to content

Commit 2e1d12c

Browse files
Merge pull request #2461 from RandithaK/feature/flex-schema-email-executor
Flexible schema support with default fallback in Email Executor
2 parents 60ca881 + 256493a commit 2e1d12c

7 files changed

Lines changed: 569 additions & 76 deletions

File tree

backend/internal/flow/common/constants.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,8 @@ const (
193193
RuntimeKeyUserAttributesCacheTTLSeconds = "user_attributes_cache_ttl_seconds"
194194
// RuntimeKeyInviteLink holds the generated invite link for downstream executors (e.g., EmailExecutor).
195195
RuntimeKeyInviteLink = "inviteLink"
196+
// RuntimeKeySkipDelivery indicates that delivery should be skipped for the current flow.
197+
RuntimeKeySkipDelivery = "skipDelivery"
196198
// RuntimeKeyCandidateUsers holds serialized candidate users during disambiguation in resolve mode.
197199
RuntimeKeyCandidateUsers = "candidateUsers"
198200
)
@@ -203,6 +205,8 @@ const (
203205
const (
204206
// InputTypeText represents a text input type.
205207
InputTypeText = "TEXT_INPUT"
208+
// InputTypeEmail represents an email input type.
209+
InputTypeEmail = "EMAIL_INPUT"
206210
// InputTypePassword represents a password credential input type.
207211
InputTypePassword = "PASSWORD_INPUT"
208212
// InputTypeOTP represents a one-time password input type.

backend/internal/flow/executor/email_executor.go

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"errors"
2323
"fmt"
2424

25+
"github.com/asgardeo/thunder/internal/entityprovider"
2526
"github.com/asgardeo/thunder/internal/flow/common"
2627
"github.com/asgardeo/thunder/internal/flow/core"
2728
"github.com/asgardeo/thunder/internal/system/email"
@@ -36,25 +37,36 @@ type emailExecutor struct {
3637
logger *log.Logger
3738
emailClient email.EmailClientInterface
3839
templateService template.TemplateServiceInterface
40+
entityProvider entityprovider.EntityProviderInterface
41+
}
42+
43+
// defaultEmailInput is the default input definition for email collection.
44+
var defaultEmailInput = common.Input{
45+
Ref: "email_input",
46+
Identifier: userAttributeEmail,
47+
Type: common.InputTypeEmail,
48+
Required: true,
3949
}
4050

4151
// newEmailExecutor creates a new instance of the email executor.
4252
func newEmailExecutor(flowFactory core.FlowFactoryInterface, emailClient email.EmailClientInterface,
43-
templateService template.TemplateServiceInterface) *emailExecutor {
53+
templateService template.TemplateServiceInterface,
54+
entityProvider entityprovider.EntityProviderInterface) *emailExecutor {
4455
logger := log.GetLogger().With(log.String(log.LoggerKeyComponentName, "EmailExecutor"))
4556
base := flowFactory.CreateExecutor(
4657
ExecutorNameEmailExecutor,
4758
common.ExecutorTypeUtility,
4859
[]common.Input{},
4960
[]common.Input{
50-
{Identifier: userAttributeEmail, Type: common.InputTypeText, Required: true},
61+
defaultEmailInput,
5162
},
5263
)
5364
return &emailExecutor{
5465
ExecutorInterface: base,
5566
logger: logger,
5667
emailClient: emailClient,
5768
templateService: templateService,
69+
entityProvider: entityProvider,
5870
}
5971
}
6072

@@ -79,6 +91,12 @@ func (e *emailExecutor) executeSend(ctx *core.NodeContext) (*common.ExecutorResp
7991
RuntimeData: make(map[string]string),
8092
}
8193

94+
if ctx.RuntimeData[common.RuntimeKeySkipDelivery] == dataValueTrue {
95+
logger.Debug("Delivery marked as skipped, completing without sending email")
96+
execResp.Status = common.ExecComplete
97+
return execResp, nil
98+
}
99+
82100
if e.emailClient == nil {
83101
execResp.AdditionalData[common.DataEmailSent] = dataValueFalse
84102
execResp.Status = common.ExecFailure
@@ -92,7 +110,10 @@ func (e *emailExecutor) executeSend(ctx *core.NodeContext) (*common.ExecutorResp
92110
}
93111

94112
// Resolve recipient email from user inputs or runtime data.
95-
recipient := resolveRecipientEmail(ctx)
113+
recipient, err := e.resolveRecipientEmail(ctx, logger)
114+
if err != nil {
115+
return nil, err
116+
}
96117
if recipient == "" {
97118
logger.Debug("Email recipient not found in user inputs or runtime data")
98119
execResp.Status = common.ExecFailure
@@ -125,7 +146,6 @@ func (e *emailExecutor) executeSend(ctx *core.NodeContext) (*common.ExecutorResp
125146
"inviteLink": inviteLink,
126147
"appName": ctx.Application.Name,
127148
}
128-
129149
rendered, svcErr := e.templateService.Render(ctx.Context, scenario, template.TemplateTypeEmail, templateData)
130150
if svcErr != nil {
131151
return nil, fmt.Errorf("failed to render email template: %s", svcErr.Code)
@@ -156,15 +176,41 @@ func (e *emailExecutor) executeSend(ctx *core.NodeContext) (*common.ExecutorResp
156176
return execResp, nil
157177
}
158178

159-
// resolveRecipientEmail retrieves the recipient email from user inputs or runtime data.
160-
func resolveRecipientEmail(ctx *core.NodeContext) string {
161-
if recipientEmail, ok := ctx.UserInputs[userAttributeEmail]; ok && recipientEmail != "" {
162-
return recipientEmail
179+
// resolveRecipientEmail retrieves the recipient email from user inputs, runtime data, or forwarded data.
180+
func (e *emailExecutor) resolveRecipientEmail(ctx *core.NodeContext, logger *log.Logger) (string, error) {
181+
emailAttr := e.resolveEmailInput(ctx).Identifier
182+
183+
if recipientEmail, ok := ctx.ForwardedData[emailAttr]; ok {
184+
if emailStr, isString := recipientEmail.(string); isString && emailStr != "" {
185+
return emailStr, nil
186+
}
163187
}
164-
if recipientEmail, ok := ctx.RuntimeData[userAttributeEmail]; ok && recipientEmail != "" {
165-
return recipientEmail
188+
if recipientEmail, ok := ctx.RuntimeData[emailAttr]; ok && recipientEmail != "" {
189+
return recipientEmail, nil
166190
}
167-
return ""
191+
if recipientEmail, ok := ctx.UserInputs[emailAttr]; ok && recipientEmail != "" {
192+
return recipientEmail, nil
193+
}
194+
195+
if userID, ok := ctx.RuntimeData[userAttributeUserID]; ok && userID != "" {
196+
if e.entityProvider == nil {
197+
return "", errors.New("entity provider is not configured for email resolution")
198+
}
199+
user, providerErr := e.entityProvider.GetEntity(userID)
200+
if providerErr != nil {
201+
if providerErr.Code == entityprovider.ErrorCodeEntityNotFound {
202+
return "", nil
203+
}
204+
return "", fmt.Errorf("failed to fetch user from entity provider: %w", providerErr)
205+
}
206+
207+
if recipientEmail, err := GetUserAttribute(user, emailAttr); err == nil {
208+
return recipientEmail, nil
209+
}
210+
logger.Debug("Email attribute not found in user entity", log.String("attribute", emailAttr))
211+
}
212+
213+
return "", nil
168214
}
169215

170216
// isEmailError returns true if the error originated from the email subsystem,
@@ -180,3 +226,14 @@ func isEmailError(err error) bool {
180226
errors.Is(err, email.ErrorSMTPAuth) ||
181227
errors.Is(err, email.ErrorEmailSendFailed)
182228
}
229+
230+
// resolveEmailInput returns the EMAIL_INPUT definition from the node context inputs,
231+
// falling back to the default if none is found.
232+
func (e *emailExecutor) resolveEmailInput(ctx *core.NodeContext) common.Input {
233+
for _, input := range ctx.NodeInputs {
234+
if input.Type == common.InputTypeEmail {
235+
return input
236+
}
237+
}
238+
return defaultEmailInput
239+
}

0 commit comments

Comments
 (0)