-
Notifications
You must be signed in to change notification settings - Fork 994
/
Copy pathset.go
104 lines (90 loc) · 2.03 KB
/
set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package discovery
import (
"context"
"sync"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)
// limitedSet is a thread safe set of peers with given limit.
type limitedSet struct {
lk sync.RWMutex
ps map[peer.ID]struct{}
limit uint
waitPeer chan peer.ID
}
// newLimitedSet constructs a set with the maximum peers amount.
func newLimitedSet(limit uint) *limitedSet {
ps := new(limitedSet)
ps.ps = make(map[peer.ID]struct{})
ps.limit = limit
ps.waitPeer = make(chan peer.ID)
return ps
}
func (ps *limitedSet) Contains(p peer.ID) bool {
ps.lk.RLock()
_, ok := ps.ps[p]
ps.lk.RUnlock()
return ok
}
func (ps *limitedSet) Limit() uint {
return ps.limit
}
func (ps *limitedSet) Size() uint {
ps.lk.RLock()
defer ps.lk.RUnlock()
return uint(len(ps.ps))
}
// Add attempts to add the given peer into the set.
func (ps *limitedSet) Add(p peer.ID) (added bool) {
ps.lk.Lock()
if _, ok := ps.ps[p]; ok {
ps.lk.Unlock()
return false
}
ps.ps[p] = struct{}{}
ps.lk.Unlock()
for {
// peer will be pushed to the channel only when somebody is reading from it.
// this is done to handle case when Peers() was called on empty set.
select {
case ps.waitPeer <- p:
default:
return true
}
}
}
func (ps *limitedSet) Remove(id peer.ID) {
ps.lk.Lock()
delete(ps.ps, id)
ps.lk.Unlock()
}
// Peers returns all discovered peers from the set.
func (ps *limitedSet) Peers(ctx context.Context) ([]peer.ID, error) {
ps.lk.RLock()
if len(ps.ps) > 0 {
out := make([]peer.ID, 0, len(ps.ps))
for p := range ps.ps {
out = append(out, p)
}
ps.lk.RUnlock()
return out, nil
}
ps.lk.RUnlock()
// block until a new peer will be discovered
select {
case <-ctx.Done():
return nil, ctx.Err()
case p := <-ps.waitPeer:
return []peer.ID{p}, nil
}
}
func (ps *limitedSet) peerInfos(pstore peerstore.Peerstore) []peer.AddrInfo {
infos := make([]peer.AddrInfo, 0, len(ps.ps))
for p := range ps.ps {
infos = append(infos, peer.AddrInfo{
ID: p,
Addrs: pstore.Addrs(p),
})
}
return infos
}