Skip to content

Commit 4e6b4e7

Browse files
committed
[push] add send retries
1 parent 7f35b01 commit 4e6b4e7

File tree

8 files changed

+159
-38
lines changed

8 files changed

+159
-38
lines changed

internal/sms-gateway/handlers/upstream.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,12 @@ func (h *upstreamHandler) postPush(c *fiber.Ctx) error {
6868
return err
6969
}
7070

71-
event := push.Event{
72-
Event: anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
73-
Data: v.Data,
74-
}
71+
event := push.NewEvent(
72+
anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
73+
v.Data,
74+
)
7575

76-
if err := h.pushSvc.Enqueue(v.Token, &event); err != nil {
76+
if err := h.pushSvc.Enqueue(v.Token, event); err != nil {
7777
h.Logger.Error("Can't push message", zap.Error(err))
7878
}
7979
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package push
2+
3+
import "time"
4+
5+
const (
6+
maxRetries = 3
7+
blacklistTimeout = 15 * time.Minute
8+
)
9+
10+
type RetryOutcome string
11+
12+
const (
13+
RetryOutcomeRetried RetryOutcome = "retried"
14+
RetryOutcomeMaxAttempts RetryOutcome = "max_attempts"
15+
)
16+
17+
type BlacklistOperation string
18+
19+
const (
20+
BlacklistOperationAdded BlacklistOperation = "added"
21+
BlacklistOperationSkipped BlacklistOperation = "skipped"
22+
)

internal/sms-gateway/modules/push/domain/events.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,30 @@ import (
77
)
88

99
type Event struct {
10-
Event smsgateway.PushEventType
11-
Data map[string]string
10+
event smsgateway.PushEventType
11+
data map[string]string
12+
}
13+
14+
func (e *Event) Event() smsgateway.PushEventType {
15+
return e.event
16+
}
17+
18+
func (e *Event) Data() map[string]string {
19+
return e.data
1220
}
1321

1422
func (e *Event) Map() map[string]string {
15-
json, _ := json.Marshal(e.Data)
23+
json, _ := json.Marshal(e.data)
1624

1725
return map[string]string{
18-
"event": string(e.Event),
26+
"event": string(e.event),
1927
"data": string(json),
2028
}
2129
}
2230

2331
func NewEvent(event smsgateway.PushEventType, data map[string]string) *Event {
2432
return &Event{
25-
Event: event,
26-
Data: data,
33+
event: event,
34+
data: data,
2735
}
2836
}

internal/sms-gateway/modules/push/fcm/client.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package fcm
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
76
"sync"
87

@@ -53,8 +52,8 @@ func (c *Client) Open(ctx context.Context) error {
5352
return nil
5453
}
5554

56-
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
57-
errs := make([]error, 0, len(messages))
55+
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
56+
errs := make(map[string]error, len(messages))
5857
for address, payload := range messages {
5958
_, err := c.client.Send(ctx, &messaging.Message{
6059
Data: payload.Map(),
@@ -65,11 +64,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
6564
})
6665

6766
if err != nil {
68-
errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err))
67+
errs[address] = fmt.Errorf("can't send message to %s: %w", address, err)
6968
}
7069
}
7170

72-
return errors.Join(errs...)
71+
return errs, nil
7372
}
7473

7574
func (c *Client) Close(ctx context.Context) error {

internal/sms-gateway/modules/push/service.go

+88-12
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
99
"github.com/capcom6/go-helpers/cache"
10+
"github.com/capcom6/go-helpers/maps"
1011

1112
"github.com/prometheus/client_golang/prometheus"
1213
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,9 +39,12 @@ type Service struct {
3839

3940
client client
4041

41-
cache *cache.Cache[domain.Event]
42+
cache *cache.Cache[eventWrapper]
43+
blacklist *cache.Cache[struct{}]
4244

43-
enqueuedCounter *prometheus.CounterVec
45+
enqueuedCounter *prometheus.CounterVec
46+
retriesCounter *prometheus.CounterVec
47+
blacklistCounter *prometheus.CounterVec
4448

4549
logger *zap.Logger
4650
}
@@ -60,12 +64,34 @@ func New(params Params) *Service {
6064
Help: "Total number of messages enqueued",
6165
}, []string{"event"})
6266

67+
retriesCounter := promauto.NewCounterVec(prometheus.CounterOpts{
68+
Namespace: "sms",
69+
Subsystem: "push",
70+
Name: "retries_total",
71+
Help: "Total retry attempts",
72+
}, []string{"outcome"})
73+
74+
blacklistCounter := promauto.NewCounterVec(prometheus.CounterOpts{
75+
Namespace: "sms",
76+
Subsystem: "push",
77+
Name: "blacklist_total",
78+
Help: "Blacklist operations",
79+
}, []string{"operation"})
80+
6381
return &Service{
64-
config: params.Config,
65-
client: params.Client,
66-
cache: cache.New[domain.Event](cache.Config{}),
67-
enqueuedCounter: enqueuedCounter,
68-
logger: params.Logger,
82+
config: params.Config,
83+
client: params.Client,
84+
85+
cache: cache.New[eventWrapper](cache.Config{}),
86+
blacklist: cache.New[struct{}](cache.Config{
87+
TTL: blacklistTimeout,
88+
}),
89+
90+
enqueuedCounter: enqueuedCounter,
91+
retriesCounter: retriesCounter,
92+
blacklistCounter: blacklistCounter,
93+
94+
logger: params.Logger,
6995
}
7096
}
7197

@@ -86,11 +112,23 @@ func (s *Service) Run(ctx context.Context) {
86112

87113
// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
88114
func (s *Service) Enqueue(token string, event *domain.Event) error {
89-
if err := s.cache.Set(token, *event); err != nil {
115+
if _, err := s.blacklist.Get(token); err == nil {
116+
s.blacklistCounter.WithLabelValues(string(BlacklistOperationSkipped)).Inc()
117+
s.logger.Debug("Skipping blacklisted token", zap.String("token", token))
118+
return nil
119+
}
120+
121+
wrapper := eventWrapper{
122+
token: token,
123+
event: event,
124+
retries: 0,
125+
}
126+
127+
if err := s.cache.Set(token, wrapper); err != nil {
90128
return fmt.Errorf("can't add message to cache: %w", err)
91129
}
92130

93-
s.enqueuedCounter.WithLabelValues(string(event.Event)).Inc()
131+
s.enqueuedCounter.WithLabelValues(string(event.Event())).Inc()
94132

95133
return nil
96134
}
@@ -102,10 +140,48 @@ func (s *Service) sendAll(ctx context.Context) {
102140
return
103141
}
104142

105-
s.logger.Info("Sending messages", zap.Int("count", len(targets)))
143+
messages := maps.MapValues(targets, func(w eventWrapper) domain.Event {
144+
return *w.event
145+
})
146+
147+
s.logger.Info("Sending messages", zap.Int("count", len(messages)))
106148
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
107-
if err := s.client.Send(ctx, targets); err != nil {
149+
defer cancel()
150+
151+
errs, err := s.client.Send(ctx, messages)
152+
if len(errs) == 0 && err == nil {
153+
s.logger.Info("Messages sent successfully", zap.Int("count", len(messages)))
154+
return
155+
}
156+
157+
if err != nil {
108158
s.logger.Error("Can't send messages", zap.Error(err))
159+
return
160+
}
161+
162+
for token, sendErr := range errs {
163+
s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token))
164+
165+
wrapper := targets[token]
166+
wrapper.retries++
167+
168+
if wrapper.retries >= maxRetries {
169+
if err := s.blacklist.Set(token, struct{}{}); err != nil {
170+
s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err))
171+
}
172+
173+
s.blacklistCounter.WithLabelValues(string(BlacklistOperationAdded)).Inc()
174+
s.retriesCounter.WithLabelValues(string(RetryOutcomeMaxAttempts)).Inc()
175+
s.logger.Warn("Retries exceeded, blacklisting token",
176+
zap.String("token", token),
177+
zap.Duration("ttl", blacklistTimeout))
178+
continue
179+
}
180+
181+
if setErr := s.cache.SetOrFail(token, wrapper); setErr != nil {
182+
s.logger.Info("Can't set message to cache", zap.Error(setErr))
183+
}
184+
185+
s.retriesCounter.WithLabelValues(string(RetryOutcomeRetried)).Inc()
109186
}
110-
cancel()
111187
}

internal/sms-gateway/modules/push/types.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,25 @@ import (
1111
type Mode string
1212
type Event = domain.Event
1313

14+
var NewEvent = domain.NewEvent
15+
1416
const (
1517
ModeFCM Mode = "fcm"
1618
ModeUpstream Mode = "upstream"
1719
)
1820

1921
type client interface {
2022
Open(ctx context.Context) error
21-
Send(ctx context.Context, messages map[string]domain.Event) error
23+
Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error)
2224
Close(ctx context.Context) error
2325
}
2426

27+
type eventWrapper struct {
28+
token string
29+
event *domain.Event
30+
retries int
31+
}
32+
2533
func NewMessageEnqueuedEvent() *domain.Event {
2634
return domain.NewEvent(smsgateway.PushMessageEnqueued, nil)
2735
}

internal/sms-gateway/modules/push/upstream/client.go

+16-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/android-sms-gateway/client-go/smsgateway"
1313
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
14+
"github.com/capcom6/go-helpers/maps"
1415
)
1516

1617
const BASE_URL = "https://api.sms-gate.app/upstream/v1"
@@ -41,33 +42,34 @@ func (c *Client) Open(ctx context.Context) error {
4142
return nil
4243
}
4344

44-
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
45+
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
4546
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))
4647

4748
for address, data := range messages {
4849
payload = append(payload, smsgateway.PushNotification{
4950
Token: address,
50-
Event: data.Event,
51-
Data: data.Data,
51+
Event: data.Event(),
52+
Data: data.Data(),
5253
})
5354
}
5455

5556
payloadBytes, err := json.Marshal(payload)
57+
5658
if err != nil {
57-
return fmt.Errorf("can't marshal payload: %w", err)
59+
return nil, fmt.Errorf("can't marshal payload: %w", err)
5860
}
5961

6062
req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes))
6163
if err != nil {
62-
return fmt.Errorf("can't create request: %w", err)
64+
return nil, fmt.Errorf("can't create request: %w", err)
6365
}
6466

6567
req.Header.Set("Content-Type", "application/json")
6668
req.Header.Set("User-Agent", "android-sms-gateway/1.x (server; golang)")
6769

6870
resp, err := c.client.Do(req)
6971
if err != nil {
70-
return fmt.Errorf("can't send request: %w", err)
72+
return c.mapErrors(messages, fmt.Errorf("can't send request: %w", err)), nil
7173
}
7274

7375
defer func() {
@@ -76,10 +78,16 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
7678
}()
7779

7880
if resp.StatusCode >= 400 {
79-
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
81+
return c.mapErrors(messages, fmt.Errorf("unexpected status code: %d", resp.StatusCode)), nil
8082
}
8183

82-
return nil
84+
return nil, nil
85+
}
86+
87+
func (c *Client) mapErrors(messages map[string]domain.Event, err error) map[string]error {
88+
return maps.MapValues(messages, func(e domain.Event) error {
89+
return err
90+
})
8391
}
8492

8593
func (c *Client) Close(ctx context.Context) error {

pkg/swagger/docs/requests.http

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Authorization: Basic {{credentials}}
2121
"{{phone}}"
2222
],
2323
"withDeliveryReport": true,
24-
"priority": 128,
24+
"priority": 127,
2525
"simNumber": {{$randomInt 1 2}}
2626
}
2727

0 commit comments

Comments
 (0)