Skip to content

Commit ad5cc46

Browse files
authored
Merge pull request #118 from Warp-net/develop
Develop
2 parents 9834eba + 3bffdcc commit ad5cc46

338 files changed

Lines changed: 8239 additions & 37249 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.githooks/pre-commit

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ NEW_VERSION="$MAJOR.$MINOR.$PATCH"
3434
sed -i "s/^$CURRENT_VERSION\$/$NEW_VERSION/" "$CONFIG"
3535
sed -i -E "s/^version:[[:space:]]*'?([0-9]+\.[0-9]+\.[0-9]+)'?/version: ${NEW_VERSION}/" "$SNAPCRAFT"
3636

37+
go mod tidy
38+
go mod vendor
39+
3740
git add "$CONFIG" "$SNAPCRAFT"
3841

3942

.github/workflows/check-static.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ env:
55

66
on:
77
workflow_dispatch:
8-
# push:
9-
# branches:
10-
# - main
11-
# pull_request:
12-
# branches:
13-
# - main
8+
push:
9+
branches:
10+
- main
11+
pull_request:
12+
branches:
13+
- main
1414

1515
jobs:
1616
lint:

cmd/node/bootstrap/socks5/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ type MetricsPusher interface {
3333
RemoveSocksConnections(ip string)
3434
}
3535

36-
const reqIpKey = "ip-key"
36+
type ctxKey string
37+
38+
const reqIpKey ctxKey = "ip-key"
3739

3840
type rule struct {
3941
m MetricsPusher
@@ -133,13 +135,13 @@ func (s *socksServer) warpnetOverlayHandler(ctx context.Context, proto, addr str
133135

134136
peer, isRedirect := s.balancer.route()
135137
if peer == "" {
136-
return nil, fmt.Errorf("no peers found") //nolint:errcheck
138+
return nil, fmt.Errorf("no peers found") //nolint:err113
137139
}
138140
if isRedirect {
139141
peerAddrs := s.streamer.Peerstore().Addrs(peer)
140142
log.Infof("socks5 server: redirect to %v", peerAddrs)
141143
for _, pAddr := range peerAddrs {
142-
conn, err := net.DialTimeout(proto, toNetAddr(pAddr).String(), time.Second)
144+
conn, err := net.DialTimeout(proto, toNetAddr(pAddr).String(), time.Second) //nolint:noctx
143145
if err != nil {
144146
continue
145147
}

cmd/node/member/node/member-node.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type MemberNode struct {
6464
pubsubService PubSubProvider
6565
dHashTable DistributedHashTableCloser
6666
nodeRepo NodeProvider
67-
statsRepo NodeProvider
67+
statsRepo StatsProvider
6868
authRepo AuthProvider
6969
userRepo UserProvider
7070
followRepo FollowStorer
@@ -88,13 +88,13 @@ func NewMemberNode(
8888
if len(privKey) == 0 {
8989
return nil, node.ErrPrivateKeyRequired
9090
}
91-
nodeRepo := database.NewNodeRepo(db, "NODES")
91+
nodeRepo := database.NewNodeRepo(db)
9292
store, err := warpnet.NewPeerstore(ctx, nodeRepo)
9393
if err != nil {
9494
return nil, err
9595
}
9696

97-
statsRepo := database.NewNodeRepo(db, "CRDT")
97+
statsRepo := database.NewStatsRepo(db)
9898
userRepo := database.NewUserRepo(db)
9999
followRepo := database.NewFollowRepo(db)
100100
owner := authRepo.GetOwner()

cmd/node/member/node/types.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ type DistributedHashTableCloser interface {
7575

7676
type NodeProvider interface {
7777
datastore.Datastore
78-
Prefix() string
78+
}
79+
80+
type StatsProvider interface {
81+
datastore.Datastore
7982
}
8083

8184
type AuthProvider interface {

core/crdt/stats.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ type Broadcaster interface {
5858

5959
type CRDTStorer interface {
6060
ds.Datastore
61-
Prefix() string
6261
}
6362

6463
type CRDTRouter interface {
@@ -84,10 +83,6 @@ func NewCRDTStatsStore(
8483
node host.Host,
8584
router CRDTRouter,
8685
) (*CRDTStatsStore, error) {
87-
prefix := datastore.Prefix()
88-
if prefix != "/CRDT" {
89-
return nil, warpnet.WarpError("CRDT datastore namespace must start with '/CRDT' prefix")
90-
}
9186
ctx, cancel := context.WithCancel(ctx)
9287

9388
baseStore := ds.MutexWrap(datastore)

core/transport/integration_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,6 @@ func TestIntegration_MDNSDiscovery(t *testing.T) {
641641
libp2p.DisableRelay(),
642642
)
643643
require.NoError(t, err)
644-
defer hostA.Close()
645644

646645
// Host B.
647646
hostB, err := libp2p.New(
@@ -653,7 +652,6 @@ func TestIntegration_MDNSDiscovery(t *testing.T) {
653652
libp2p.DisableRelay(),
654653
)
655654
require.NoError(t, err)
656-
defer hostB.Close()
657655

658656
t.Logf("Host A: %s addrs=%v", hostA.ID(), hostA.Addrs())
659657
t.Logf("Host B: %s addrs=%v", hostB.ID(), hostB.Addrs())
@@ -681,7 +679,6 @@ func TestIntegration_MDNSDiscovery(t *testing.T) {
681679
}
682680
mdnsA := mdns.NewMdnsService(hostA, mdnsServiceName, notifeeA)
683681
require.NoError(t, mdnsA.Start())
684-
defer mdnsA.Close()
685682

686683
notifeeB := &mdnsNotifee{
687684
h: hostB,
@@ -690,7 +687,15 @@ func TestIntegration_MDNSDiscovery(t *testing.T) {
690687
}
691688
mdnsB := mdns.NewMdnsService(hostB, mdnsServiceName, notifeeB)
692689
require.NoError(t, mdnsB.Start())
693-
defer mdnsB.Close()
690+
defer func() {
691+
mdnsA.Close()
692+
mdnsB.Close()
693+
694+
time.Sleep(100 * time.Millisecond)
695+
696+
hostA.Close()
697+
hostB.Close()
698+
}()
694699

695700
// Wait for at least one side to discover and connect to the other.
696701
t.Log("Waiting for mDNS discovery...")
@@ -765,11 +770,9 @@ func TestIntegration_MDNSDiscoveryWithPSKAndRelay(t *testing.T) {
765770

766771
hostA, err := libp2p.New(commonOpts()...)
767772
require.NoError(t, err)
768-
defer hostA.Close()
769773

770774
hostB, err := libp2p.New(commonOpts()...)
771775
require.NoError(t, err)
772-
defer hostB.Close()
773776

774777
t.Logf("Host A: %s addrs=%v", hostA.ID(), hostA.Addrs())
775778
t.Logf("Host B: %s addrs=%v", hostB.ID(), hostB.Addrs())
@@ -797,7 +800,6 @@ func TestIntegration_MDNSDiscoveryWithPSKAndRelay(t *testing.T) {
797800
}
798801
mdnsA := mdns.NewMdnsService(hostA, mdnsServiceName, notifeeA)
799802
require.NoError(t, mdnsA.Start())
800-
defer mdnsA.Close()
801803

802804
notifeeB := &mdnsNotifee{
803805
h: hostB,
@@ -806,8 +808,16 @@ func TestIntegration_MDNSDiscoveryWithPSKAndRelay(t *testing.T) {
806808
}
807809
mdnsB := mdns.NewMdnsService(hostB, mdnsServiceName, notifeeB)
808810
require.NoError(t, mdnsB.Start())
809-
defer mdnsB.Close()
810811

812+
defer func() {
813+
mdnsA.Close()
814+
mdnsB.Close()
815+
816+
time.Sleep(100 * time.Millisecond)
817+
818+
hostA.Close()
819+
hostB.Close()
820+
}()
811821
t.Log("Waiting for mDNS discovery with PSK+Relay...")
812822
select {
813823
case pid := <-notifeeA.connected:

database/node-repo.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
// slash is required because of: invalid datastore key: NODES:/peers/keys/AASAQAISEAXNRKHMX2O3AA26JM7NGIWUPOGIITJ2UHHXGX4OWIEKPNAW6YCSK/priv
4545
const (
4646
requiredPrefixSlash = "/"
47+
nodesPrefix = "NODES"
4748

4849
ErrNilNodeRepo = local_store.DBError("node repo is nil")
4950
)
@@ -71,7 +72,8 @@ type NodeRepo struct {
7172
BootstrapSelfHashHex string
7273
}
7374

74-
func NewNodeRepo(db NodeStorer, prefix string) *NodeRepo {
75+
func NewNodeRepo(db NodeStorer) *NodeRepo {
76+
prefix := nodesPrefix
7577
if !strings.HasPrefix(prefix, requiredPrefixSlash) {
7678
prefix = requiredPrefixSlash + prefix
7779
}
@@ -83,10 +85,6 @@ func NewNodeRepo(db NodeStorer, prefix string) *NodeRepo {
8385
return nr
8486
}
8587

86-
func (d *NodeRepo) Prefix() string {
87-
return d.prefix
88-
}
89-
9088
func (d *NodeRepo) Put(ctx context.Context, key datastore.Key, value []byte) error {
9189
if d == nil || d.db == nil {
9290
return ErrNilNodeRepo
@@ -345,9 +343,7 @@ func (d *NodeRepo) query(tx *local_store.Txn, q datastore.Query) (_ datastore.Re
345343
opt := local_store.DefaultIteratorOptions
346344
opt.PrefetchValues = !q.KeysOnly
347345

348-
key := strings.TrimPrefix(q.Prefix, "/")
349-
prefix := local_store.NewPrefixBuilder(d.prefix).AddRootID(key).Build().Bytes()
350-
opt.Prefix = prefix
346+
opt.Prefix = d.storageQueryPrefix(q.Prefix)
351347

352348
// Handle ordering
353349
if len(q.Orders) > 0 {
@@ -405,7 +401,7 @@ func (d *NodeRepo) query(tx *local_store.Txn, q datastore.Query) (_ datastore.Re
405401
matches := true
406402
check := func(value []byte) error {
407403
e := datastore.DsEntry{
408-
Key: string(item.Key()),
404+
Key: d.resultKeyFromStorageKey(string(item.Key())),
409405
Value: value,
410406
Size: int(item.ValueSize()),
411407
}
@@ -443,7 +439,7 @@ func (d *NodeRepo) query(tx *local_store.Txn, q datastore.Query) (_ datastore.Re
443439
return
444440
}
445441
item := it.Item()
446-
e := datastore.DsEntry{Key: string(item.Key())}
442+
e := datastore.DsEntry{Key: d.resultKeyFromStorageKey(string(item.Key()))}
447443

448444
var result datastore.Result
449445
if !q.KeysOnly {
@@ -481,6 +477,30 @@ func (d *NodeRepo) query(tx *local_store.Txn, q datastore.Query) (_ datastore.Re
481477
return results, nil
482478
}
483479

480+
func (d *NodeRepo) storageQueryPrefix(queryPrefix string) []byte {
481+
prefix := strings.TrimSuffix(datastore.NewKey(queryPrefix).String(), requiredPrefixSlash)
482+
base := strings.TrimSuffix(d.prefix, requiredPrefixSlash)
483+
484+
if prefix == "" {
485+
return []byte(base + requiredPrefixSlash)
486+
}
487+
488+
return []byte(base + prefix + requiredPrefixSlash)
489+
}
490+
491+
func (d *NodeRepo) resultKeyFromStorageKey(storageKey string) string {
492+
if storageKey == d.prefix {
493+
return requiredPrefixSlash
494+
}
495+
496+
trimPrefix := d.prefix + requiredPrefixSlash
497+
if strings.HasPrefix(storageKey, trimPrefix) { //nolint:modernize
498+
return requiredPrefixSlash + strings.TrimPrefix(storageKey, trimPrefix)
499+
}
500+
501+
return storageKey
502+
}
503+
484504
func filter(filters []datastore.Filter, entry datastore.DsEntry) bool {
485505
for _, f := range filters {
486506
if !f.Filter(entry) {

database/node-repo_test.go

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *NodeRepoTestSuite) SetupSuite() {
5959
auth := NewAuthRepo(s.db)
6060
s.Require().NoError(auth.Authenticate("test", "test"))
6161

62-
s.repo = NewNodeRepo(s.db, "test")
62+
s.repo = NewNodeRepo(s.db)
6363

6464
}
6565

@@ -112,12 +112,12 @@ func (s *NodeRepoTestSuite) TestDiskUsage() {
112112
}
113113

114114
func (s *NodeRepoTestSuite) TestQuerySimple() {
115-
key := datastore.NewKey("query/key")
115+
key := datastore.NewKey("querysimple/key/item")
116116
val := []byte("qval")
117117
err := s.repo.Put(s.ctx, key, val)
118118
s.Require().NoError(err)
119119

120-
q := query.Query{Prefix: "query/key"}
120+
q := query.Query{Prefix: "querysimple/key"}
121121
results, err := s.repo.Query(s.ctx, q)
122122
s.Require().NoError(err)
123123
s.Require().NotNil(results)
@@ -130,12 +130,64 @@ func (s *NodeRepoTestSuite) TestQuerySimple() {
130130
if r.Error != nil {
131131
continue
132132
}
133+
s.Equal("/querysimple/key/item", r.Entry.Key)
133134
found = true
134135
break
135136
}
136137
s.True(found)
137138
}
138139

140+
func (s *NodeRepoTestSuite) TestQueryEmptyPrefix() {
141+
key := datastore.NewKey("all/key")
142+
val := []byte("all")
143+
err := s.repo.Put(s.ctx, key, val)
144+
s.Require().NoError(err)
145+
146+
results, err := s.repo.Query(s.ctx, query.Query{})
147+
s.Require().NoError(err)
148+
s.Require().NotNil(results)
149+
150+
defer func() {
151+
_ = results.Close()
152+
}()
153+
154+
var found bool
155+
for r := range results.Next() {
156+
if r.Error != nil {
157+
continue
158+
}
159+
if r.Entry.Key == "/all/key" {
160+
found = true
161+
break
162+
}
163+
}
164+
165+
s.True(found)
166+
}
167+
168+
func (s *NodeRepoTestSuite) TestQueryPrefixDoesNotMatchSiblingKeys() {
169+
err := s.repo.Put(s.ctx, datastore.NewKey("query/key/child"), []byte("child"))
170+
s.Require().NoError(err)
171+
err = s.repo.Put(s.ctx, datastore.NewKey("query/key2"), []byte("sibling"))
172+
s.Require().NoError(err)
173+
174+
results, err := s.repo.Query(s.ctx, query.Query{Prefix: "query/key"})
175+
s.Require().NoError(err)
176+
s.Require().NotNil(results)
177+
defer func() {
178+
_ = results.Close()
179+
}()
180+
181+
keys := make([]string, 0)
182+
for r := range results.Next() {
183+
s.Require().NoError(r.Error)
184+
keys = append(keys, r.Entry.Key)
185+
}
186+
187+
s.Contains(keys, "/query/key/child")
188+
s.NotContains(keys, "/query/key2")
189+
}
190+
139191
func TestNodeRepoTestSuite(t *testing.T) {
140192
defer goleak.VerifyNone(t)
141193

0 commit comments

Comments
 (0)