Skip to content

Commit 0f9541b

Browse files
committed
Fix multiple LoRa Server instances processing same uplink data.
Closes #85.
1 parent 26feb5a commit 0f9541b

File tree

6 files changed

+111
-20
lines changed

6 files changed

+111
-20
lines changed

cmd/loraserver/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func mustGetContext(netID lorawan.NetID, c *cli.Context) common.Context {
115115
rp := common.NewRedisPool(c.String("redis-url"))
116116

117117
// setup gateway backend
118-
gw, err := gateway.NewBackend(c.String("gw-mqtt-server"), c.String("gw-mqtt-username"), c.String("gw-mqtt-password"))
118+
gw, err := gateway.NewBackend(rp, c.String("gw-mqtt-server"), c.String("gw-mqtt-username"), c.String("gw-mqtt-password"))
119119
if err != nil {
120120
log.Fatalf("gateway-backend setup failed: %s", err)
121121
}

docs/changelog.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.12.1
4+
5+
* Fix multiple LoRa Server instances processing the same gateway payloads
6+
(resulting in the gateway count multiplied by the number of LoRa Server
7+
instances).
8+
39
## 0.12.0
410

511
This release decouples the node "inventory" part from LoRa Server. This

internal/backend/gateway/backend.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,25 @@ import (
1111
"github.com/brocaar/loraserver/api/gw"
1212
"github.com/brocaar/loraserver/internal/backend"
1313
"github.com/eclipse/paho.mqtt.golang"
14+
"github.com/garyburd/redigo/redis"
1415
)
1516

1617
const rxTopic = "gateway/+/rx"
18+
const uplinkLockTTL = time.Millisecond * 500
1719

1820
// Backend implements a MQTT pub-sub backend.
1921
type Backend struct {
2022
conn mqtt.Client
2123
rxPacketChan chan gw.RXPacket
2224
wg sync.WaitGroup
25+
redisPool *redis.Pool
2326
}
2427

2528
// NewBackend creates a new Backend.
26-
func NewBackend(server, username, password string) (backend.Gateway, error) {
29+
func NewBackend(p *redis.Pool, server, username, password string) (backend.Gateway, error) {
2730
b := Backend{
2831
rxPacketChan: make(chan gw.RXPacket),
32+
redisPool: p,
2933
}
3034

3135
opts := mqtt.NewClientOptions()
@@ -95,6 +99,29 @@ func (b *Backend) rxPacketHandler(c mqtt.Client, msg mqtt.Message) {
9599
return
96100
}
97101

102+
// Since with MQTT all subscribers will receive the uplink messages sent
103+
// by all the gatewyas, the first instance receiving the message must lock it,
104+
// so that other instances can ignore the same message (from the same gw).
105+
// As an unique id, the gw mac + base64 encoded payload is used. This is because
106+
// we can't trust any of the data, as the MIC hasn't been validated yet.
107+
strB, err := rxPacket.PHYPayload.MarshalText()
108+
if err != nil {
109+
log.Errorf("backend/gateway: marshal text error: %s", err)
110+
}
111+
key := fmt.Sprintf("lora:ns:uplink:lock:%s:%s", rxPacket.RXInfo.MAC, string(strB))
112+
redisConn := b.redisPool.Get()
113+
defer redisConn.Close()
114+
115+
_, err = redis.String(redisConn.Do("SET", key, "lock", "PX", int64(uplinkLockTTL/time.Millisecond), "NX"))
116+
if err != nil {
117+
if err == redis.ErrNil {
118+
// the payload is already being processed by an other instance
119+
return
120+
}
121+
log.Errorf("backend/gateway:: acquire uplink payload lock error: %s", err)
122+
return
123+
}
124+
98125
b.rxPacketChan <- rxPacket
99126
}
100127

internal/backend/gateway/backend_test.go

+68-16
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ import (
66
"time"
77

88
"github.com/brocaar/loraserver/api/gw"
9+
"github.com/brocaar/loraserver/internal/common"
10+
"github.com/brocaar/loraserver/internal/test"
911
"github.com/brocaar/lorawan"
1012
"github.com/eclipse/paho.mqtt.golang"
1113
. "github.com/smartystreets/goconvey/convey"
1214
)
1315

1416
func TestBackend(t *testing.T) {
1517
conf := getConfig()
18+
r := common.NewRedisPool(conf.RedisURL)
1619

1720
Convey("Given a MQTT client", t, func() {
1821
opts := mqtt.NewClientOptions().AddBroker(conf.Server).SetUsername(conf.Username).SetPassword(conf.Password)
@@ -22,7 +25,8 @@ func TestBackend(t *testing.T) {
2225
So(token.Error(), ShouldBeNil)
2326

2427
Convey("Given a new Backend", func() {
25-
backend, err := NewBackend(conf.Server, conf.Username, conf.Password)
28+
test.MustFlushRedis(r)
29+
backend, err := NewBackend(r, conf.Server, conf.Username, conf.Password)
2630
So(err, ShouldBeNil)
2731
defer backend.Close()
2832
time.Sleep(time.Millisecond * 100) // give the backend some time to subscribe to the topic
@@ -39,7 +43,7 @@ func TestBackend(t *testing.T) {
3943
token.Wait()
4044
So(token.Error(), ShouldBeNil)
4145

42-
Convey("When sending a TXPacket (from the Backend)", func() {
46+
Convey("Given a TXPacket", func() {
4347
txPacket := gw.TXPacket{
4448
TXInfo: gw.TXInfo{
4549
MAC: [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
@@ -52,15 +56,19 @@ func TestBackend(t *testing.T) {
5256
MACPayload: &lorawan.MACPayload{},
5357
},
5458
}
55-
So(backend.SendTXPacket(txPacket), ShouldBeNil)
5659

57-
Convey("Then the same packet is consumed by the MQTT client", func() {
58-
packet := <-txPacketChan
59-
So(packet, ShouldResemble, txPacket)
60+
Convey("When sending it from the backend", func() {
61+
So(backend.SendTXPacket(txPacket), ShouldBeNil)
62+
63+
Convey("Then the same packet has been received", func() {
64+
packet := <-txPacketChan
65+
So(packet, ShouldResemble, txPacket)
66+
})
6067
})
68+
6169
})
6270

63-
Convey("When sending a RXPacket (from the MQTT client)", func() {
71+
Convey("Given an RXPacket", func() {
6472
rxPacket := gw.RXPacket{
6573
RXInfo: gw.RXInfo{
6674
Time: time.Now().UTC(),
@@ -74,15 +82,59 @@ func TestBackend(t *testing.T) {
7482
MACPayload: &lorawan.MACPayload{},
7583
},
7684
}
77-
b, err := json.Marshal(rxPacket)
78-
So(err, ShouldBeNil)
79-
token := c.Publish("gateway/0102030405060708/rx", 0, false, b)
80-
token.Wait()
81-
So(token.Error(), ShouldBeNil)
82-
83-
Convey("Then the same packet is consumed by the backend", func() {
84-
packet := <-backend.RXPacketChan()
85-
So(packet, ShouldResemble, rxPacket)
85+
86+
Convey("When sending it once", func() {
87+
b, err := json.Marshal(rxPacket)
88+
So(err, ShouldBeNil)
89+
token := c.Publish("gateway/0102030405060708/rx", 0, false, b)
90+
token.Wait()
91+
So(token.Error(), ShouldBeNil)
92+
93+
Convey("Then the same packet is consumed by the backend", func() {
94+
packet := <-backend.RXPacketChan()
95+
So(packet, ShouldResemble, rxPacket)
96+
})
97+
})
98+
99+
Convey("When sending it twice with the same MAC", func() {
100+
b, err := json.Marshal(rxPacket)
101+
So(err, ShouldBeNil)
102+
token := c.Publish("gateway/0102030405060708/rx", 0, false, b)
103+
token.Wait()
104+
So(token.Error(), ShouldBeNil)
105+
token = c.Publish("gateway/0102030405060708/rx", 0, false, b)
106+
token.Wait()
107+
So(token.Error(), ShouldBeNil)
108+
109+
Convey("Then it is received only once", func() {
110+
<-backend.RXPacketChan()
111+
112+
var received bool
113+
select {
114+
case <-backend.RXPacketChan():
115+
received = true
116+
case <-time.After(time.Millisecond * 100):
117+
}
118+
So(received, ShouldBeFalse)
119+
})
120+
})
121+
122+
Convey("When sending it twice with different MACs", func() {
123+
b, err := json.Marshal(rxPacket)
124+
So(err, ShouldBeNil)
125+
token := c.Publish("gateway/0102030405060708/rx", 0, false, b)
126+
token.Wait()
127+
128+
rxPacket.RXInfo.MAC = [8]byte{8, 7, 6, 5, 4, 3, 2, 1}
129+
b, err = json.Marshal(rxPacket)
130+
So(err, ShouldBeNil)
131+
token = c.Publish("gateway/0102030405060708/rx", 0, false, b)
132+
token.Wait()
133+
134+
Convey("Then it is received twice", func() {
135+
<-backend.RXPacketChan()
136+
<-backend.RXPacketChan()
137+
})
86138
})
87139
})
88140
})

internal/backend/gateway/base_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ type config struct {
1414
Server string
1515
Username string
1616
Password string
17+
RedisURL string
1718
}
1819

1920
func getConfig() *config {
2021
c := &config{
21-
Server: "tcp://127.0.0.1:1883",
22+
Server: "tcp://127.0.0.1:1883",
23+
RedisURL: "redis://localhost:6379",
2224
}
2325

2426
if v := os.Getenv("TEST_MQTT_SERVER"); v != "" {
@@ -33,5 +35,9 @@ func getConfig() *config {
3335
c.Password = v
3436
}
3537

38+
if v := os.Getenv("TEST_REDIS_URL"); v != "" {
39+
c.RedisURL = v
40+
}
41+
3642
return c
3743
}

mkdocs.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pages:
1313
- changelog.md
1414

1515
extra:
16-
version: '0.12.0'
16+
version: '0.12.1'
1717
github:
1818
download_release: true
1919

0 commit comments

Comments
 (0)