Skip to content

Commit c4baba8

Browse files
Handling messages concurrently
1 parent 1066aec commit c4baba8

File tree

1 file changed

+60
-37
lines changed

1 file changed

+60
-37
lines changed

pulsar/trigger/subscriber/trigger.go

+60-37
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func init() {
2121
type Trigger struct {
2222
client pulsar.Client
2323
handlers []*Handler
24+
logger log.Logger
2425
}
2526
type Handler struct {
2627
handler trigger.Handler
@@ -31,8 +32,6 @@ type Handler struct {
3132
type Factory struct {
3233
}
3334

34-
var logger log.Logger
35-
3635
func (*Factory) New(config *trigger.Config) (trigger.Trigger, error) {
3736
s := &Settings{}
3837
err := metadata.MapToStruct(config.Settings, s, true)
@@ -57,7 +56,7 @@ func (t *Trigger) Metadata() *trigger.Metadata {
5756
}
5857

5958
func (t *Trigger) Initialize(ctx trigger.InitContext) error {
60-
logger = ctx.Logger()
59+
t.logger = ctx.Logger()
6160
// Init handlers
6261
for _, handler := range ctx.GetHandlers() {
6362

@@ -95,6 +94,7 @@ func (t *Trigger) Initialize(ctx trigger.InitContext) error {
9594
consumeroptions.SubscriptionInitialPosition = pulsar.SubscriptionPositionEarliest
9695
}
9796

97+
consumeroptions.MessageChannel = make(chan pulsar.ConsumerMessage)
9898
consumer, err := t.client.Subscribe(consumeroptions)
9999
if err != nil {
100100
return err
@@ -107,66 +107,89 @@ func (t *Trigger) Initialize(ctx trigger.InitContext) error {
107107

108108
// Start implements util.Managed.Start
109109
func (t *Trigger) Start() error {
110+
t.logger.Info("Starting Trigger")
110111
for _, handler := range t.handlers {
111112
go handler.consume()
112113
}
114+
t.logger.Info("Trigger Started")
113115
return nil
114116
}
115117

116118
// Stop implements util.Managed.Stop
117119
func (t *Trigger) Stop() error {
118-
logger.Info("Stopping Trigger")
120+
t.logger.Info("Stopping Trigger")
119121
for _, handler := range t.handlers {
120122
// Stop polling
121123
handler.done <- true
122124
handler.consumer.Close()
123125
}
126+
t.logger.Info("Trigger Stopped")
127+
return nil
128+
}
129+
130+
func (t *Trigger) Resume() error {
131+
t.logger.Info("Resuming Trigger")
132+
err := t.Start()
133+
if err == nil {
134+
t.logger.Info("Trigger Resumed")
135+
}
136+
return err
137+
}
138+
139+
func (t *Trigger) Pause() error {
140+
for _, handler := range t.handlers {
141+
handler.done <- true
142+
}
143+
t.logger.Info("Trigger Paused")
124144
return nil
125145
}
126146

127147
func (handler *Handler) consume() {
148+
handler.handler.Logger().Info("Pulsar Message consumer is started")
149+
defer handler.handler.Logger().Info("Pulsar Message consumer is stopped")
128150
for {
129151
select {
130152
case msg, ok := <-handler.consumer.Chan():
131153
if !ok {
132-
logger.Error("Error while recieveing message")
133-
time.Sleep(5 * time.Second)
154+
handler.handler.Logger().Error("Error while receiving message")
155+
time.Sleep(1 * time.Second)
134156
continue
135157
}
136-
out := &Output{}
137-
if handler.handler.Settings()["format"] != nil &&
138-
handler.handler.Settings()["format"].(string) == "JSON" {
139-
var obj interface{}
140-
err := json.Unmarshal(msg.Payload(), &obj)
141-
if err != nil {
142-
logger.Errorf("Pulsar consumer, configured to receive JSON formatted messages, was unable to parse message: [%v]", msg.Payload())
143-
handler.consumer.Nack(msg)
144-
time.Sleep(5 * time.Second)
145-
continue
146-
} else {
147-
out.Payload = obj
148-
}
149-
} else {
150-
out.Payload = string(msg.Payload())
151-
}
152-
out.Properties = msg.Properties()
153-
out.Topic = msg.Topic()
154-
logger.Debugf("Message recieved [%v]", out.Payload)
155-
// Do something with the message
156-
_, err := handler.handler.Handle(context.Background(), out)
157-
if err == nil {
158-
// Message processed successfully
159-
handler.consumer.Ack(msg)
160-
} else {
161-
// Failed to process messages
162-
handler.consumer.Nack(msg)
163-
time.Sleep(2 * time.Second)
164-
}
158+
// Handle messages concurrently on separate goroutine
159+
go handler.handleMessage(msg)
165160
case <-handler.done:
166-
logger.Info("Stopping Message Consumer")
167161
return
168162
}
169-
170163
}
164+
}
171165

166+
func (handler *Handler) handleMessage(msg pulsar.ConsumerMessage) {
167+
handler.handler.Logger().Debugf("Message received - %s", string(msg.ID().Serialize()))
168+
out := &Output{}
169+
if handler.handler.Settings()["format"] != nil &&
170+
handler.handler.Settings()["format"].(string) == "JSON" {
171+
var obj interface{}
172+
err := json.Unmarshal(msg.Payload(), &obj)
173+
if err != nil {
174+
handler.handler.Logger().Errorf("Pulsar consumer, configured to receive JSON formatted messages, was unable to parse message: [%v]", msg.Payload())
175+
handler.consumer.Nack(msg)
176+
return
177+
} else {
178+
out.Payload = obj
179+
}
180+
} else {
181+
out.Payload = string(msg.Payload())
182+
}
183+
out.Properties = msg.Properties()
184+
out.Topic = msg.Topic()
185+
handler.handler.Logger().Debugf("Message received [%v]", out.Payload)
186+
// Do something with the message
187+
_, err := handler.handler.Handle(context.Background(), out)
188+
if err == nil {
189+
// Message processed successfully
190+
handler.consumer.Ack(msg)
191+
} else {
192+
// Failed to process messages
193+
handler.consumer.Nack(msg)
194+
}
172195
}

0 commit comments

Comments
 (0)