Skip to content

Commit 0e3a2eb

Browse files
committed
Events, panics & deltas
This commit has a few changes & bug fixes: * Fixes a panic when using `WithTLS` with a `nil` TLS config * Adds the `Events` method which returns a read-only channel with state updates for nodes. Each event contains a field denoting the type of change and the related peer's data. * Nodes no longer update their own delta every time they send it to a peer. With this, the events stream was super noisy with a lot of useless events. The caveat of this is that a node that is marked as gone may need to rejoin or perform a metadata update. * Nodes now share state with up to 3 peers at once to speed up convergance. Signed-off-by: David Bond <[email protected]>
1 parent 4d2e36a commit 0e3a2eb

File tree

8 files changed

+248
-93
lines changed

8 files changed

+248
-93
lines changed

README.md

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ Below is a very concise example of starting a whisper node in-code:
8282
// See the package documentation for all available configuration options.
8383
node := whisper.New(id)
8484

85+
// This is how you react to changes within the gossip network, by handling individual events. You'll want to do this
86+
// in its own goroutine. Events MUST be handled otherwise the node will eventually block on writing to the channel.
87+
for evt := range node.Events() {
88+
switch evt.Type {
89+
// ...
90+
}
91+
}
92+
8593
// This blocks until the given context is cancelled or a fatal error occurs, use it in a separate goroutine or
8694
// an error group
8795
node.Run(ctx)
@@ -105,17 +113,17 @@ within the network.
105113

106114
```go
107115
// The PeerStore interface describes types that can persist peer data.
108-
PeerStore interface {
109-
// FindPeer should return the peer.Peer whose identifier matches the one provided. It should return
110-
// store.ErrPeerNotFound if a matching peer does not exist.
111-
FindPeer(ctx context.Context, id uint64) (peer.Peer, error)
112-
// SavePeer should persist the given peer.Peer.
113-
SavePeer(ctx context.Context, peer peer.Peer) error
114-
// ListPeers should return all peers within the store.
115-
ListPeers(ctx context.Context) ([]peer.Peer, error)
116-
// RemovePeer should remove a peer from the store. It should return store.ErrPeerNotFound if a matching
117-
// peer does not exist.
118-
RemovePeer(ctx context.Context, id uint64) error
116+
type PeerStore interface {
117+
// FindPeer should return the peer.Peer whose identifier matches the one provided. It should return
118+
// store.ErrPeerNotFound if a matching peer does not exist.
119+
FindPeer(ctx context.Context, id uint64) (peer.Peer, error)
120+
// SavePeer should persist the given peer.Peer.
121+
SavePeer(ctx context.Context, peer peer.Peer) error
122+
// ListPeers should return all peers within the store.
123+
ListPeers(ctx context.Context) ([]peer.Peer, error)
124+
// RemovePeer should remove a peer from the store. It should return store.ErrPeerNotFound if a matching
125+
// peer does not exist.
126+
RemovePeer(ctx context.Context, id uint64) error
119127
}
120128
```
121129

cmd/whisper/start/start.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import (
99
"strconv"
1010

1111
"github.com/spf13/cobra"
12+
"golang.org/x/sync/errgroup"
1213

1314
"github.com/davidsbond/whisper"
1415
"github.com/davidsbond/whisper/internal/tlsutil"
16+
"github.com/davidsbond/whisper/pkg/event"
1517
)
1618

1719
func Command() *cobra.Command {
@@ -64,17 +66,45 @@ func Command() *cobra.Command {
6466
}
6567
}
6668

69+
logger := slog.Default().With("local_id", id)
70+
6771
node := whisper.New(id,
6872
whisper.WithPort(port),
6973
whisper.WithAddress(address),
7074
whisper.WithJoinAddress(joinAddress),
71-
whisper.WithLogger(slog.Default().With("local_id", id)),
75+
whisper.WithLogger(logger),
7276
whisper.WithCurve(curve),
7377
whisper.WithKey(key),
7478
whisper.WithTLS(tlsConfig),
7579
)
7680

77-
return node.Run(cmd.Context())
81+
group, ctx := errgroup.WithContext(cmd.Context())
82+
group.Go(func() error {
83+
return node.Run(ctx)
84+
})
85+
86+
group.Go(func() error {
87+
for evt := range node.Events() {
88+
switch evt.Type {
89+
case event.TypeDiscovered:
90+
logger.With("peer", evt.Peer.ID).InfoContext(ctx, "peer discovered")
91+
case event.TypeUpdated:
92+
logger.With("peer", evt.Peer.ID).InfoContext(ctx, "peer updated")
93+
case event.TypeRemoved:
94+
logger.With("peer", evt.Peer.ID).WarnContext(ctx, "peer removed")
95+
case event.TypeLeft:
96+
logger.With("peer", evt.Peer.ID).WarnContext(ctx, "peer left")
97+
case event.TypeGone:
98+
logger.With("peer", evt.Peer.ID).ErrorContext(ctx, "peer gone")
99+
default:
100+
continue
101+
}
102+
}
103+
104+
return nil
105+
})
106+
107+
return group.Wait()
78108
},
79109
}
80110

internal/service/service.go

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ import (
1414
"google.golang.org/grpc"
1515
"google.golang.org/grpc/codes"
1616
"google.golang.org/grpc/status"
17-
"google.golang.org/protobuf/types/known/anypb"
1817

1918
whispersvcv1 "github.com/davidsbond/whisper/internal/generated/proto/whisper/service/v1"
2019
whisperv1 "github.com/davidsbond/whisper/internal/generated/proto/whisper/v1"
20+
"github.com/davidsbond/whisper/pkg/event"
2121
"github.com/davidsbond/whisper/pkg/peer"
2222
"github.com/davidsbond/whisper/pkg/store"
2323
)
@@ -33,6 +33,7 @@ type (
3333
curve ecdh.Curve
3434
logger *slog.Logger
3535
tls *tls.Config
36+
events chan<- event.Event
3637
}
3738

3839
// The PeerStore interface describes types that persist the current state of all peers within the gossip network.
@@ -48,13 +49,14 @@ type (
4849
)
4950

5051
// New returns a new instance of the Service type that will persist peer data using the provided PeerStore implementation.
51-
func New(id uint64, peers PeerStore, curve ecdh.Curve, logger *slog.Logger, tls *tls.Config) *Service {
52+
func New(id uint64, peers PeerStore, curve ecdh.Curve, logger *slog.Logger, tls *tls.Config, events chan<- event.Event) *Service {
5253
return &Service{
5354
id: id,
5455
peers: peers,
5556
curve: curve,
5657
logger: logger,
5758
tls: tls,
59+
events: events,
5860
}
5961
}
6062

@@ -103,23 +105,23 @@ func (svc *Service) Join(ctx context.Context, r *whispersvcv1.JoinRequest) (*whi
103105

104106
response := &whispersvcv1.JoinResponse{Peers: make([]*whisperv1.Peer, len(peers))}
105107
for i, p := range peers {
106-
response.Peers[i] = &whisperv1.Peer{
107-
Id: p.ID,
108-
Address: p.Address,
109-
PublicKey: p.PublicKey.Bytes(),
110-
Delta: p.Delta,
111-
Status: whisperv1.PeerStatus(p.Status),
108+
response.Peers[i], err = peer.ToProto(p)
109+
if err != nil {
110+
return nil, status.Errorf(codes.Internal, "failed to parse peer %d: %v", p.ID, err)
112111
}
112+
}
113113

114-
if p.Metadata != nil {
115-
response.Peers[i].Metadata, err = anypb.New(p.Metadata)
116-
if err != nil {
117-
return nil, status.Errorf(codes.Internal, "failed to marshal metadata for peer %d: %v", p.ID, err)
118-
}
119-
}
114+
eventType := event.TypeDiscovered
115+
if !existing.IsEmpty() {
116+
eventType = event.TypeUpdated
120117
}
121118

122-
return response, nil
119+
select {
120+
case <-ctx.Done():
121+
return nil, status.FromContextError(ctx.Err()).Err()
122+
case svc.events <- event.Event{Type: eventType, Peer: p}:
123+
return response, nil
124+
}
123125
}
124126

125127
func (svc *Service) validateJoinRequest(r *whispersvcv1.JoinRequest) error {
@@ -170,7 +172,13 @@ func (svc *Service) Leave(ctx context.Context, r *whispersvcv1.LeaveRequest) (*w
170172

171173
svc.logger.With("peer", r.GetId()).DebugContext(ctx, "peer is leaving gossip network")
172174

173-
return &whispersvcv1.LeaveResponse{}, nil
175+
// We have to publish an event here, otherwise the local peer will never hear about peers leaving.
176+
select {
177+
case <-ctx.Done():
178+
return nil, status.FromContextError(ctx.Err()).Err()
179+
case svc.events <- event.Event{Type: event.TypeLeft, Peer: p}:
180+
return &whispersvcv1.LeaveResponse{}, nil
181+
}
174182
}
175183

176184
// Status handles an inbound gRPC request querying this peer's current view of the gossip network.
@@ -185,19 +193,9 @@ func (svc *Service) Status(ctx context.Context, _ *whispersvcv1.StatusRequest) (
185193
}
186194

187195
for _, p := range peers {
188-
record := &whisperv1.Peer{
189-
Id: p.ID,
190-
Address: p.Address,
191-
PublicKey: p.PublicKey.Bytes(),
192-
Delta: p.Delta,
193-
Status: whisperv1.PeerStatus(p.Status),
194-
}
195-
196-
if p.Metadata != nil {
197-
record.Metadata, err = anypb.New(p.Metadata)
198-
if err != nil {
199-
return nil, status.Errorf(codes.Internal, "failed to marshal metadata for peer %d: %v", p.ID, err)
200-
}
196+
record, err := peer.ToProto(p)
197+
if err != nil {
198+
return nil, status.Errorf(codes.Internal, "failed to parse peer %d: %v", p.ID, err)
201199
}
202200

203201
if p.ID == svc.id {

internal/service/service_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package service_test
22

33
import (
4+
"context"
45
"crypto/ecdh"
56
"crypto/rand"
67
"log/slog"
@@ -18,6 +19,7 @@ import (
1819
whispersvcv1 "github.com/davidsbond/whisper/internal/generated/proto/whisper/service/v1"
1920
whisperv1 "github.com/davidsbond/whisper/internal/generated/proto/whisper/v1"
2021
"github.com/davidsbond/whisper/internal/service"
22+
"github.com/davidsbond/whisper/pkg/event"
2123
"github.com/davidsbond/whisper/pkg/peer"
2224
"github.com/davidsbond/whisper/pkg/store"
2325
)
@@ -178,7 +180,9 @@ func TestService_Join(t *testing.T) {
178180
tc.Setup(s)
179181
}
180182

181-
response, err := service.New(tc.ID, s, curve, logger, nil).Join(t.Context(), tc.Request)
183+
events := make(chan event.Event, 1)
184+
185+
response, err := service.New(tc.ID, s, curve, logger, nil, events).Join(t.Context(), tc.Request)
182186
if tc.ExpectsError {
183187
require.Error(t, err)
184188
assert.Nil(t, response)
@@ -197,6 +201,16 @@ func TestService_Join(t *testing.T) {
197201
assert.EqualValues(t, expected.GetMetadata(), actual.GetMetadata())
198202
assert.EqualValues(t, expected.GetPublicKey(), actual.GetPublicKey())
199203
}
204+
205+
ctx, cancel := context.WithTimeout(t.Context(), time.Second)
206+
defer cancel()
207+
208+
select {
209+
case evt := <-events:
210+
assert.EqualValues(t, tc.Request.GetPeer().GetId(), evt.Peer.ID)
211+
case <-ctx.Done():
212+
assert.Fail(t, "timed out waiting for event")
213+
}
200214
})
201215
}
202216
}

pkg/event/event.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package event
2+
3+
import (
4+
"github.com/davidsbond/whisper/pkg/peer"
5+
)
6+
7+
type (
8+
// The Event type represents a single change in peer state.
9+
Event struct {
10+
// The type of event that has occurred.
11+
Type Type
12+
// The peer that has changed.
13+
Peer peer.Peer
14+
}
15+
16+
// Type is used to denote the kind of state change that has occurred to a single peer.
17+
Type int
18+
)
19+
20+
const (
21+
TypeUnspecified Type = iota
22+
// TypeDiscovered denotes that a Peer has been newly discovered by the local peer.
23+
TypeDiscovered
24+
// TypeUpdated denotes that a Peer's delta has changed, possibly due to a metadata update.
25+
TypeUpdated
26+
// TypeLeft denotes that a Peer has left the gossip network.
27+
TypeLeft
28+
// TypeGone denotes that a Peer has failed or is not accessible from other peers within the gossip network.
29+
TypeGone
30+
// TypeRemoved denotes that a Peer has been completely removed from storage.
31+
TypeRemoved
32+
)
33+
34+
func (e Type) String() string {
35+
switch e {
36+
case TypeDiscovered:
37+
return "DISCOVERED"
38+
case TypeUpdated:
39+
return "UPDATED"
40+
case TypeLeft:
41+
return "LEFT"
42+
case TypeGone:
43+
return "GONE"
44+
case TypeRemoved:
45+
return "REMOVED"
46+
default:
47+
return "UNSPECIFIED"
48+
}
49+
}

0 commit comments

Comments
 (0)