forked from linuxfoundation/lfx-v2-auth-service
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_handler.go
More file actions
83 lines (70 loc) · 2.89 KB
/
message_handler.go
File metadata and controls
83 lines (70 loc) · 2.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright The Linux Foundation and each contributor to LFX.
// SPDX-License-Identifier: MIT
package service
import (
"context"
"encoding/json"
"log/slog"
"github.com/linuxfoundation/lfx-v2-auth-service/internal/domain/port"
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/constants"
"github.com/linuxfoundation/lfx-v2-auth-service/pkg/log"
)
// MessageHandlerService is the service-layer entry point for NATS messages.
// It wraps a port.MessageHandler and forwards each incoming message to it.
type MessageHandlerService struct {
	// messageHandler supplies the per-subject operations that HandleMessage
	// dispatches to.
	messageHandler port.MessageHandler
}
// HandleMessage dispatches an incoming NATS message to the handler that
// matches its subject and sends the handler's result back with msg.Respond.
//
// The subject is added to ctx with log.AppendCtx before any logging happens.
// When no handler matches the subject, or the matched handler returns an
// error, an error reply is sent through respondWithError instead. The method
// returns nothing: every failure is surfaced only through logs and the reply.
func (mhs *MessageHandlerService) HandleMessage(ctx context.Context, msg port.TransportMessenger) {
	subject := msg.Subject()
	ctx = log.AppendCtx(ctx, slog.String("subject", subject))

	slog.DebugContext(ctx, "handling NATS message")

	// Pick the handler with a switch rather than a map literal: a map built
	// here would be allocated, and every handler method value bound, on each
	// message only to look up a single entry. The cases are grouped as: user
	// read/write, lookups, email linking, and identity link/unlink/list.
	var handler func(ctx context.Context, msg port.TransportMessenger) ([]byte, error)
	switch subject {
	case constants.UserMetadataUpdateSubject:
		handler = mhs.messageHandler.UpdateUser
	case constants.UserMetadataReadSubject:
		handler = mhs.messageHandler.GetUserMetadata
	case constants.UserEmailReadSubject:
		handler = mhs.messageHandler.GetUserEmails

	case constants.UserEmailToUserSubject:
		handler = mhs.messageHandler.EmailToUsername
	case constants.UserEmailToSubSubject:
		handler = mhs.messageHandler.EmailToSub

	case constants.EmailLinkingSendVerificationSubject:
		handler = mhs.messageHandler.StartEmailLinking
	case constants.EmailLinkingVerifySubject:
		handler = mhs.messageHandler.VerifyEmailLinking

	case constants.UserIdentityLinkSubject:
		handler = mhs.messageHandler.LinkIdentity
	case constants.UserIdentityUnlinkSubject:
		handler = mhs.messageHandler.UnlinkIdentity
	case constants.UserIdentityListSubject:
		handler = mhs.messageHandler.ListIdentities
	}

	// No case matched: tell the requester instead of leaving it waiting.
	if handler == nil {
		slog.WarnContext(ctx, "unknown subject")
		mhs.respondWithError(ctx, msg, "unknown subject")
		return
	}

	response, errHandler := handler(ctx, msg)
	if errHandler != nil {
		slog.ErrorContext(ctx, "error handling message",
			"error", errHandler,
			"subject", subject,
		)
		// The handler's error text is what the requester receives.
		mhs.respondWithError(ctx, msg, errHandler.Error())
		return
	}

	if errRespond := msg.Respond(response); errRespond != nil {
		slog.ErrorContext(ctx, "error responding to NATS message", "error", errRespond)
		return
	}

	slog.DebugContext(ctx, "responded to NATS message", "response", string(response))
}
// respondWithError replies to msg with a JSON object of the form
// {"error": "<errorMsg>"}.
//
// Nothing is returned; failures are only logged. If the payload cannot be
// encoded, that failure is logged and a fixed fallback body is sent so the
// requester still receives a well-formed error reply. A failure to send the
// reply is logged as well.
func (mhs *MessageHandlerService) respondWithError(ctx context.Context, msg port.TransportMessenger, errorMsg string) {
	payload, err := json.Marshal(map[string]string{"error": errorMsg})
	if err != nil {
		// Not expected for a map[string]string, but never answer with an
		// empty body if encoding does fail.
		slog.ErrorContext(ctx, "failed to marshal error response", "error", err)
		payload = []byte(`{"error":"internal error"}`)
	}

	if err := msg.Respond(payload); err != nil {
		slog.ErrorContext(ctx, "failed to send error response", "error", err)
	}
}
// NewMessageHandlerService returns a MessageHandlerService that delegates
// message handling to the given port.MessageHandler.
func NewMessageHandlerService(messageHandler port.MessageHandler) *MessageHandlerService {
	svc := new(MessageHandlerService)
	svc.messageHandler = messageHandler
	return svc
}