Skip to content

Commit 1c8da74

Browse files
authored
On suscribe avoid sleeping in transaction (#12)
1 parent daeef17 commit 1c8da74

File tree

1 file changed

+30
-24
lines changed

1 file changed

+30
-24
lines changed

pkg/sql/subscriber.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ func (s *Subscriber) consume(ctx context.Context, topic string, out chan *messag
173173
"consumer_group": s.config.ConsumerGroup,
174174
})
175175

176+
var sleepTime time.Duration = 0
176177
for {
177178
select {
178179
case <-s.closing:
@@ -183,19 +184,30 @@ func (s *Subscriber) consume(ctx context.Context, topic string, out chan *messag
183184
logger.Info("Stopping consume, context canceled", nil)
184185
return
185186

186-
default:
187-
// go on querying
187+
case <-time.After(sleepTime): // Wait if needed
188+
sleepTime = 0
189+
188190
}
189191

190-
messageUUID, err := s.query(ctx, topic, out, logger)
191-
if err != nil && isDeadlock(err) {
192-
logger.Debug("Deadlock during querying message, trying again", watermill.LogFields{
193-
"err": err.Error(),
194-
"message_uuid": messageUUID,
192+
messageUUID, noMsg, err := s.query(ctx, topic, out, logger)
193+
if err != nil {
194+
if isDeadlock(err) {
195+
logger.Debug("Deadlock during querying message, trying again", watermill.LogFields{
196+
"err": err.Error(),
197+
"message_uuid": messageUUID,
198+
})
199+
} else {
200+
logger.Error("Error querying for message", err, watermill.LogFields{
201+
"wait_time": s.config.RetryInterval,
202+
})
203+
sleepTime = s.config.RetryInterval
204+
}
205+
} else if noMsg {
206+
// wait until polling for the next message
207+
logger.Debug("No messages, waiting until next query", watermill.LogFields{
208+
"wait_time": s.config.PollInterval,
195209
})
196-
} else if err != nil {
197-
logger.Error("Error querying for message", err, nil)
198-
time.Sleep(s.config.RetryInterval)
210+
sleepTime = s.config.PollInterval
199211
}
200212
}
201213
}
@@ -205,19 +217,19 @@ func (s *Subscriber) query(
205217
topic string,
206218
out chan *message.Message,
207219
logger watermill.LoggerAdapter,
208-
) (messageUUID string, err error) {
220+
) (messageUUID string, noMsg bool, err error) {
209221
txOptions := &sql.TxOptions{
210222
Isolation: sql.LevelRepeatableRead,
211223
}
212224
tx, err := s.db.BeginTx(ctx, txOptions)
213225
if err != nil {
214-
return "", errors.Wrap(err, "could not begin tx for querying")
226+
return "", false, errors.Wrap(err, "could not begin tx for querying")
215227
}
216228

217229
defer func() {
218230
if err != nil {
219231
rollbackErr := tx.Rollback()
220-
if rollbackErr != nil {
232+
if rollbackErr != nil && rollbackErr != sql.ErrTxDone {
221233
logger.Error("could not rollback tx for querying message", rollbackErr, nil)
222234
}
223235
} else {
@@ -241,15 +253,9 @@ func (s *Subscriber) query(
241253

242254
offset, msg, err := s.config.SchemaAdapter.UnmarshalMessage(row)
243255
if errors.Cause(err) == sql.ErrNoRows {
244-
// wait until polling for the next message
245-
logger.Debug("No more messages, waiting until next query", watermill.LogFields{
246-
"wait_time": s.config.PollInterval,
247-
})
248-
tx.Rollback()
249-
time.Sleep(s.config.PollInterval)
250-
return "", nil
256+
return "", true, nil
251257
} else if err != nil {
252-
return "", errors.Wrap(err, "could not unmarshal message from query")
258+
return "", false, errors.Wrap(err, "could not unmarshal message from query")
253259
}
254260

255261
logger = logger.With(watermill.LogFields{
@@ -271,7 +277,7 @@ func (s *Subscriber) query(
271277

272278
_, err := tx.ExecContext(ctx, consumedQuery, consumedArgs...)
273279
if err != nil {
274-
return msg.UUID, errors.Wrap(err, "cannot send consumed query")
280+
return msg.UUID, false, errors.Wrap(err, "cannot send consumed query")
275281
}
276282
}
277283

@@ -286,7 +292,7 @@ func (s *Subscriber) query(
286292

287293
result, err := tx.ExecContext(ctx, ackQuery, ackArgs...)
288294
if err != nil {
289-
return msg.UUID, errors.Wrap(err, "could not get args for acking the message")
295+
return msg.UUID, false, errors.Wrap(err, "could not get args for acking the message")
290296
}
291297

292298
rowsAffected, _ := result.RowsAffected()
@@ -296,7 +302,7 @@ func (s *Subscriber) query(
296302
})
297303
}
298304

299-
return msg.UUID, nil
305+
return msg.UUID, false, nil
300306
}
301307

302308
// sendMessages sends messages on the output channel.

0 commit comments

Comments
 (0)