-
Notifications
You must be signed in to change notification settings - Fork 85
Open
Description
app-functions-sdk-go/internal/app/triggermessageprocessor.go
Lines 102 to 176 in 0835601
// MessageReceived provides runtime orchestration to pass the envelope / context to configured pipeline(s) along with a response callback to execute on each completion. | |
func (mp *triggerMessageProcessor) MessageReceived(ctx interfaces.AppFunctionContext, envelope types.MessageEnvelope, responseHandler interfaces.PipelineResponseHandler) error { | |
mp.messagesReceived.Inc(1) | |
lc := mp.serviceBinding.LoggingClient() | |
lc.Debugf("trigger attempting to find pipeline(s) for topic %s", envelope.ReceivedTopic) | |
// ensure we have a context established that we can safely cast to *appfunction.Context to pass to runtime | |
if _, ok := ctx.(*appfunction.Context); ctx == nil || !ok { | |
ctx = mp.serviceBinding.BuildContext(envelope) | |
} | |
pipelines := mp.serviceBinding.GetMatchingPipelines(envelope.ReceivedTopic) | |
lc.Debugf("trigger found %d pipeline(s) that match the incoming topic '%s'", len(pipelines), envelope.ReceivedTopic) | |
var finalErr error | |
errorCollectionLock := sync.RWMutex{} | |
pipelinesWaitGroup := sync.WaitGroup{} | |
appContext, ok := ctx.(*appfunction.Context) | |
if !ok { | |
return fmt.Errorf("context received was not *appfunction.Context (%T)", ctx) | |
} | |
targetData, err, isInvalidMessage := mp.serviceBinding.DecodeMessage(appContext, envelope) | |
if err != nil { | |
if isInvalidMessage { | |
mp.invalidMessagesReceived.Inc(1) | |
} | |
return fmt.Errorf("unable to decode message: %s", err.Err.Error()) | |
} | |
for _, pipeline := range pipelines { | |
pipelinesWaitGroup.Add(1) | |
pipeline.MessagesProcessed.Inc(1) | |
go func(p *interfaces.FunctionPipeline, wg *sync.WaitGroup, errCollector func(error)) { | |
startedAt := time.Now() | |
defer p.MessageProcessingTime.UpdateSince(startedAt) | |
defer wg.Done() | |
lc.Debugf("trigger sending message to pipeline %s (%s)", p.Id, envelope.CorrelationID) | |
childCtx, ok := ctx.Clone().(*appfunction.Context) | |
if !ok { | |
errCollector(fmt.Errorf("context received was not *appfunction.Context (%T)", childCtx)) | |
return | |
} | |
if msgErr := mp.serviceBinding.ProcessMessage(childCtx, targetData, p); msgErr != nil { | |
lc.Errorf("message error in pipeline %s (%s): %s", p.Id, envelope.CorrelationID, msgErr.Err.Error()) | |
errCollector(msgErr.Err) | |
} else { | |
if responseHandler != nil { | |
if outputErr := responseHandler(childCtx, p); outputErr != nil { | |
lc.Errorf("failed to process output for message '%s' on pipeline %s: %s", ctx.CorrelationID(), p.Id, outputErr.Error()) | |
errCollector(outputErr) | |
return | |
} | |
} | |
lc.Debugf("trigger successfully processed message '%s' in pipeline %s", p.Id, envelope.CorrelationID) | |
} | |
}(pipeline, &pipelinesWaitGroup, func(e error) { | |
errorCollectionLock.Lock() | |
defer errorCollectionLock.Unlock() | |
finalErr = multierror.Append(finalErr, e) | |
}) | |
} | |
pipelinesWaitGroup.Wait() | |
return finalErr | |
} |
In the method MssageReceived
within the triggerMessageProcessor
implementation, if no pipelines are found for the received topic, it would be more efficient to return early to avoid unnecessary allocations, message decoding, and WaitGroup usage.
Metadata
Metadata
Assignees
Labels
No labels