-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmessageconverter.go
More file actions
115 lines (99 loc) · 3.44 KB
/
messageconverter.go
File metadata and controls
115 lines (99 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package gomessagestore
import (
"errors"
"strings"
"github.com/blackhatbrigade/gomessagestore/repository"
"github.com/blackhatbrigade/gomessagestore/uuid"
"github.com/sirupsen/logrus"
)
// MessageConverter is a function that takes in a MessageEnvelope and returns
// a Message. Custom converters can be passed to MsgEnvelopesToMessages to
// produce custom message types; there, a conversion only counts as successful
// when the returned Message is non-nil and the returned error is nil.
type MessageConverter func(*repository.MessageEnvelope) (Message, error)
// MsgEnvelopesToMessages converts envelopes into Messages, which may be any
// number of different structs that implement the Message interface.
//
// For every envelope the caller-supplied converters are tried first, in the
// order given, followed by the package's default converters. The first
// converter that returns a non-nil Message together with a nil error wins and
// no further converters are tried for that envelope.
//
// Nil envelopes are logged and skipped, nil converters are skipped, and an
// envelope that no converter accepts contributes nothing to the result, so
// the returned slice may be shorter than msgEnvelopes.
func MsgEnvelopesToMessages(msgEnvelopes []*repository.MessageEnvelope, converters ...MessageConverter) []Message {
	// Build a fresh slice instead of appending onto the variadic argument:
	// when the caller spreads a slice that has spare capacity, appending to
	// it directly would overwrite elements in the caller's backing array.
	defaults := defaultConverters()
	allConverters := make([]MessageConverter, 0, len(converters)+len(defaults))
	allConverters = append(allConverters, converters...)
	allConverters = append(allConverters, defaults...)

	messages := make([]Message, 0, len(msgEnvelopes))
	for _, messageEnvelope := range msgEnvelopes {
		if messageEnvelope == nil {
			logrus.Error("Found a nil in the message envelope slice, can't transform to a message")
			continue
		}
		for _, converter := range allConverters {
			if converter == nil {
				// Calling a nil func value panics; treat it as "no conversion".
				continue
			}
			message, err := converter(messageEnvelope)
			if message != nil && err == nil {
				messages = append(messages, message)
				break // only one successful conversion per envelope
			}
		}
	}
	return messages
}
// convertEnvelopeToCommand builds a command Message from a MessageEnvelope.
//
// The stream name is split once on "-". The part before the dash (or the
// whole name when there is no dash) must end in ":command", and when a dash
// is present the remainder must parse as a UUID. The category handed to
// NewCommand is that leading part with the ":command" suffix removed.
// Any other stream name produces an error so the next converter can be tried.
func convertEnvelopeToCommand(messageEnvelope *repository.MessageEnvelope) (Message, error) {
	const (
		commandSuffix = ":command"
		notACommand   = "Failed converting Envelope to Command, moving on to next converter"
	)

	pieces := strings.SplitN(messageEnvelope.StreamName, "-", 2)
	streamCategory := pieces[0]

	// With a dash in the name, whatever follows it has to be a valid UUID.
	if len(pieces) == 2 {
		if _, parseErr := uuid.Parse(pieces[1]); parseErr != nil {
			return nil, errors.New(notACommand)
		}
	}
	if !strings.HasSuffix(streamCategory, commandSuffix) {
		return nil, errors.New(notACommand)
	}

	command := NewCommand(
		messageEnvelope.ID,
		messageEnvelope.EntityID,
		strings.TrimSuffix(streamCategory, commandSuffix),
		messageEnvelope.MessageType,
		messageEnvelope.Data,
		messageEnvelope.Metadata,
	)
	command.MessageVersion = messageEnvelope.Version
	command.GlobalPosition = messageEnvelope.GlobalPosition
	command.Time = messageEnvelope.Time
	return command, nil
}
// convertEnvelopeToEvent builds an event Message from a MessageEnvelope.
//
// The stream name is split once on "-": the leading part becomes the
// category and, when a dash is present, the remainder is parsed as the
// entity ID. This converter never returns an error.
func convertEnvelopeToEvent(messageEnvelope *repository.MessageEnvelope) (Message, error) {
	var entityID uuid.UUID

	// SplitN with a non-empty separator and n=2 always yields one or two
	// segments, so segments[0] is always present.
	segments := strings.SplitN(messageEnvelope.StreamName, "-", 2)
	if len(segments) > 1 {
		// A parse failure is deliberately ignored: the entity ID is simply
		// whatever uuid.Parse hands back (presumably a blank UUID; confirm
		// against the uuid package).
		entityID, _ = uuid.Parse(segments[1])
	}

	event := NewEvent(
		messageEnvelope.ID,
		entityID,
		segments[0],
		messageEnvelope.MessageType,
		messageEnvelope.Data,
		messageEnvelope.Metadata,
	)
	event.MessageVersion = messageEnvelope.Version
	event.GlobalPosition = messageEnvelope.GlobalPosition
	event.Time = messageEnvelope.Time
	return event, nil
}
// defaultConverters returns the built-in converters in the order they should
// be tried: the command converter first, then the event converter.
func defaultConverters() []MessageConverter {
	builtins := make([]MessageConverter, 0, 2)
	builtins = append(builtins, convertEnvelopeToCommand)
	// always run this one last, as it always passes
	builtins = append(builtins, convertEnvelopeToEvent)
	return builtins
}