Skip to content

Commit e0a749b

Browse files
authored
Merge branch 'master' into jazzz/fix_example_link
2 parents 5582c4c + 5dea6d3 commit e0a749b

File tree

3 files changed

+20
-16
lines changed

3 files changed

+20
-16
lines changed

waku/v2/protocol/relay/subscription.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package relay
33
import (
44
"context"
55

6-
"github.com/waku-org/go-waku/waku/v2/protocol"
76
"golang.org/x/exp/slices"
7+
8+
"github.com/waku-org/go-waku/waku/v2/protocol"
89
)
910

1011
// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
@@ -55,3 +56,7 @@ func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
5556
subType: subType,
5657
}
5758
}
59+
60+
func (s *Subscription) ContentFilter() protocol.ContentFilter {
61+
return s.contentFilter
62+
}

waku/v2/protocol/relay/waku_relay.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import (
1212
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1313
"github.com/prometheus/client_golang/prometheus"
1414
"go.uber.org/zap"
15-
proto "google.golang.org/protobuf/proto"
15+
"google.golang.org/protobuf/proto"
1616

1717
pubsub "github.com/libp2p/go-libp2p-pubsub"
18+
1819
"github.com/waku-org/go-waku/logging"
1920
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
2021
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
@@ -388,7 +389,7 @@ func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool {
388389
}
389390

390391
// subscribe returns list of Subscription to receive messages based on content filter
391-
func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
392+
func (w *WakuRelay) subscribe(contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
392393

393394
var subscriptions []*Subscription
394395
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)
@@ -438,23 +439,20 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
438439
w.topicsMutex.Unlock()
439440

440441
subscriptions = append(subscriptions, subscription)
441-
go func() {
442-
defer utils.LogOnPanic()
443-
<-ctx.Done()
444-
subscription.Unsubscribe()
445-
}()
446442
}
447443

448444
return subscriptions, nil
449445
}
450446

451447
// Subscribe returns a Subscription to receive messages as per contentFilter
452448
// contentFilter can contain pubSubTopic and contentTopics or only contentTopics(in case of autosharding)
449+
// ctx argument is ignored and left for compatibility.
453450
func (w *WakuRelay) Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error) {
454-
return w.subscribe(ctx, contentFilter, opts...)
451+
return w.subscribe(contentFilter, opts...)
455452
}
456453

457454
// Unsubscribe closes a subscription to a pubsub topic
455+
// ctx argument is ignored and left for compatibility.
458456
func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error {
459457

460458
pubSubTopicMap, err := waku_proto.ContentFilterToPubSubTopicMap(contentFilter)

waku/v2/protocol/relay/waku_relay_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/libp2p/go-libp2p/core/peerstore"
1313
"github.com/prometheus/client_golang/prometheus"
1414
"github.com/stretchr/testify/require"
15+
1516
"github.com/waku-org/go-waku/logging"
1617
"github.com/waku-org/go-waku/tests"
1718
"github.com/waku-org/go-waku/waku/v2/protocol"
@@ -41,7 +42,7 @@ func TestWakuRelay(t *testing.T) {
4142
require.NoError(t, err)
4243
defer relay.Stop()
4344

44-
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
45+
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
4546

4647
require.NoError(t, err)
4748

@@ -92,7 +93,7 @@ func TestWakuRelayUnsubscribedTopic(t *testing.T) {
9293
require.NoError(t, err)
9394
defer relay.Stop()
9495

95-
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
96+
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
9697

9798
require.NoError(t, err)
9899

@@ -278,7 +279,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
278279
defer bcaster.Stop()
279280

280281
//Create a contentTopic level subscription
281-
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter("", testcTopic))
282+
subs, err := relay.subscribe(protocol.NewContentFilter("", testcTopic))
282283
require.NoError(t, err)
283284
require.Equal(t, relay.IsSubscribed(subs[0].contentFilter.PubsubTopic), true)
284285

@@ -299,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
299300
defer cancel()
300301

301302
//Create a pubSub level subscription
302-
subs1, err := relay.subscribe(context.Background(), protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
303+
subs1, err := relay.subscribe(protocol.NewContentFilter(subs[0].contentFilter.PubsubTopic))
303304
require.NoError(t, err)
304305

305306
msg := &pb.WakuMessage{
@@ -382,7 +383,7 @@ func TestInvalidMessagePublish(t *testing.T) {
382383

383384
ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second)
384385

385-
subs, err := relay.subscribe(context.Background(), protocol.NewContentFilter(testTopic))
386+
subs, err := relay.subscribe(protocol.NewContentFilter(testTopic))
386387
require.NoError(t, err)
387388

388389
// Test empty contentTopic
@@ -459,10 +460,10 @@ func TestWakuRelayStaticSharding(t *testing.T) {
459460
time.Sleep(2 * time.Second)
460461

461462
// Subscribe to valid static shard topic on both hosts
462-
subs1, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
463+
subs1, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
463464
require.NoError(t, err)
464465

465-
subs2, err := relay2.subscribe(context.Background(), protocol.NewContentFilter(testTopic, testContentTopic))
466+
subs2, err := relay2.subscribe(protocol.NewContentFilter(testTopic, testContentTopic))
466467
require.NoError(t, err)
467468
require.True(t, relay2.IsSubscribed(testTopic))
468469
require.Equal(t, testContentTopic, subs2[0].contentFilter.ContentTopics.ToList()[0])

0 commit comments

Comments
 (0)