Skip to content

Commit 84c36f7

Browse files
committed
fix: max allowance check session validate worker
1 parent e8b794b commit 84c36f7

File tree

5 files changed

+69
-52
lines changed

5 files changed

+69
-52
lines changed

cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ node initialization, ensuring privacy, performance, and ease of use.`,
7373
)
7474

7575
// Persistent flags
76-
rootCmd.PersistentFlags().String("home", homeDir, "Home directory for application config and data")
76+
rootCmd.PersistentFlags().String("home", homeDir, "home directory for application config and data")
7777

7878
return rootCmd
7979
}

config/node.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,14 @@ func DefaultNodeConfig() NodeConfig {
240240
APIPort: fmt.Sprintf("%d", utils.RandomPort()),
241241
GigabytePrices: "0.01;0;udvpn",
242242
HourlyPrices: "0.02;0;udvpn",
243-
IntervalBestRPCAddr: "5m",
244-
IntervalGeoIPLocation: "6h",
245-
IntervalSessionUsageSyncWithBlockchain: "2h",
246-
IntervalSessionUsageSyncWithDatabase: "5s",
247-
IntervalSessionUsageValidate: "5s",
248-
IntervalSessionValidate: "1m",
249-
IntervalSpeedtest: "168h",
250-
IntervalStatusUpdate: "1h",
243+
IntervalBestRPCAddr: (5 * time.Minute).String(),
244+
IntervalGeoIPLocation: (6 * time.Hour).String(),
245+
IntervalSessionUsageSyncWithBlockchain: (2*time.Hour - 5*time.Minute).String(),
246+
IntervalSessionUsageSyncWithDatabase: (3 * time.Second).String(),
247+
IntervalSessionUsageValidate: (3 * time.Second).String(),
248+
IntervalSessionValidate: (5 * time.Minute).String(),
249+
IntervalSpeedtest: (7 * 24 * time.Hour).String(),
250+
IntervalStatusUpdate: (1*time.Hour - 5*time.Minute).String(),
251251
Moniker: randMoniker(),
252252
RemoteAddrs: []string{},
253253
Type: randServiceType().String(),

node/node.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package node
22

33
import (
4-
"context"
4+
gocontext "context"
55
"errors"
66
"fmt"
77
"path/filepath"
@@ -11,7 +11,7 @@ import (
1111
"github.com/sentinel-official/sentinel-go-sdk/libs/cron"
1212
"github.com/sentinel-official/sentinel-go-sdk/libs/log"
1313

14-
nodecontext "github.com/sentinel-official/dvpn-node/context"
14+
"github.com/sentinel-official/dvpn-node/context"
1515
"github.com/sentinel-official/dvpn-node/utils"
1616
)
1717

@@ -61,8 +61,8 @@ func (n *Node) Scheduler() *cron.Scheduler {
6161
}
6262

6363
// Register registers the node on the network if not already registered.
64-
func (n *Node) Register(c *nodecontext.Context) error {
65-
node, err := c.Client().Node(context.TODO(), c.AccAddr().Bytes())
64+
func (n *Node) Register(c *context.Context) error {
65+
node, err := c.Client().Node(gocontext.TODO(), c.NodeAddr())
6666
if err != nil {
6767
return fmt.Errorf("failed to query node: %w", err)
6868
}
@@ -74,14 +74,14 @@ func (n *Node) Register(c *nodecontext.Context) error {
7474

7575
// Prepare a message to register the node.
7676
msg := v3.NewMsgRegisterNodeRequest(
77-
c.AccAddr().Bytes(),
77+
c.AccAddr(),
7878
c.GigabytePrices(),
7979
c.HourlyPrices(),
8080
c.RemoteAddrs()[0],
8181
)
8282

8383
// Broadcast the registration transaction.
84-
res, err := c.BroadcastTx(context.TODO(), msg)
84+
res, err := c.BroadcastTx(gocontext.TODO(), msg)
8585
if err != nil {
8686
return fmt.Errorf("failed to broadcast register node tx: %w", err)
8787
}
@@ -94,19 +94,19 @@ func (n *Node) Register(c *nodecontext.Context) error {
9494
}
9595

9696
// UpdateDetails updates the node's pricing and address details on the network.
97-
func (n *Node) UpdateDetails(c *nodecontext.Context) error {
97+
func (n *Node) UpdateDetails(c *context.Context) error {
9898
log.Info("Updating node details...")
9999

100100
// Prepare a message to update the node's details.
101101
msg := v3.NewMsgUpdateNodeDetailsRequest(
102-
c.AccAddr().Bytes(),
102+
c.NodeAddr(),
103103
c.GigabytePrices(),
104104
c.HourlyPrices(),
105105
c.RemoteAddrs()[0],
106106
)
107107

108108
// Broadcast the update transaction.
109-
res, err := c.BroadcastTx(context.TODO(), msg)
109+
res, err := c.BroadcastTx(gocontext.TODO(), msg)
110110
if err != nil {
111111
return fmt.Errorf("failed to broadcast update node details tx: %w", err)
112112
}
@@ -119,12 +119,12 @@ func (n *Node) UpdateDetails(c *nodecontext.Context) error {
119119
}
120120

121121
// Start initializes the Node's services, scheduler, and HTTPS server.
122-
func (n *Node) Start(c *nodecontext.Context, errChan chan error) error {
122+
func (n *Node) Start(c *context.Context, errChan chan error) error {
123123
log.Info("Starting node...")
124124

125125
go func() {
126126
// Bring up the service by running pre-defined tasks.
127-
if err := c.Service().Up(context.TODO()); err != nil {
127+
if err := c.Service().Up(gocontext.TODO()); err != nil {
128128
errChan <- fmt.Errorf("failed to run service up task: %w", err)
129129
return
130130
}
@@ -166,11 +166,11 @@ func (n *Node) Start(c *nodecontext.Context, errChan chan error) error {
166166
}
167167

168168
// Stop gracefully stops the Node's operations.
169-
func (n *Node) Stop(c *nodecontext.Context) error {
169+
func (n *Node) Stop(c *context.Context) error {
170170
if err := c.Service().PreDown(); err != nil {
171171
return fmt.Errorf("failed to run service pre-down task: %w", err)
172172
}
173-
if err := c.Service().Down(context.TODO()); err != nil {
173+
if err := c.Service().Down(gocontext.TODO()); err != nil {
174174
return fmt.Errorf("failed to run service down task: %w", err)
175175
}
176176
if err := c.Service().PostDown(); err != nil {

workers/node.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package workers
22

33
import (
4-
"context"
4+
gocontext "context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -10,14 +11,14 @@ import (
1011
"github.com/sentinel-official/sentinel-go-sdk/libs/cron"
1112
logger "github.com/sentinel-official/sentinel-go-sdk/libs/log"
1213

13-
nodecontext "github.com/sentinel-official/dvpn-node/context"
14+
"github.com/sentinel-official/dvpn-node/context"
1415
)
1516

1617
const nameNodeStatusUpdate = "node_status_update"
1718

1819
// NewNodeStatusUpdateWorker creates a worker to periodically update the node's status to active on the blockchain.
1920
// This worker broadcasts a transaction to mark the node as active at regular intervals.
20-
func NewNodeStatusUpdateWorker(c *nodecontext.Context, interval time.Duration) cron.Worker {
21+
func NewNodeStatusUpdateWorker(c *context.Context, interval time.Duration) cron.Worker {
2122
log := logger.With("name", nameNodeStatusUpdate)
2223

2324
// Handler function that updates the node's status to active.
@@ -31,12 +32,13 @@ func NewNodeStatusUpdateWorker(c *nodecontext.Context, interval time.Duration) c
3132
)
3233

3334
// Broadcast the transaction message to the blockchain.
34-
res, err := c.BroadcastTx(context.TODO(), msg)
35+
res, err := c.BroadcastTx(gocontext.TODO(), msg)
3536
if err != nil {
3637
return fmt.Errorf("failed to broadcast update node status tx: %w", err)
3738
}
3839
if !res.TxResult.IsOK() {
39-
return fmt.Errorf("update node status tx failed with code %d: %s", res.TxResult.Code, res.TxResult.Log)
40+
err := errors.New(res.TxResult.Log)
41+
return fmt.Errorf("update node status tx failed with code %d: %w", res.TxResult.Code, err)
4042
}
4143

4244
return nil

workers/session.go

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package workers
22

33
import (
4-
"context"
4+
gocontext "context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -11,7 +12,7 @@ import (
1112
"github.com/sentinel-official/sentinel-go-sdk/libs/cron"
1213
logger "github.com/sentinel-official/sentinel-go-sdk/libs/log"
1314

14-
nodecontext "github.com/sentinel-official/dvpn-node/context"
15+
"github.com/sentinel-official/dvpn-node/context"
1516
"github.com/sentinel-official/dvpn-node/database/operations"
1617
)
1718

@@ -25,7 +26,7 @@ const (
2526
// NewSessionUsageSyncWithBlockchainWorker creates a worker that synchronizes session usage with the blockchain.
2627
// This worker retrieves session data from the database, validates it against the blockchain,
2728
// and broadcasts any updates as transactions.
28-
func NewSessionUsageSyncWithBlockchainWorker(c *nodecontext.Context, interval time.Duration) cron.Worker {
29+
func NewSessionUsageSyncWithBlockchainWorker(c *context.Context, interval time.Duration) cron.Worker {
2930
log := logger.With("name", nameSessionUsageSyncWithBlockchain)
3031

3132
handlerFunc := func() error {
@@ -34,7 +35,7 @@ func NewSessionUsageSyncWithBlockchainWorker(c *nodecontext.Context, interval ti
3435
// Retrieve session records from the database.
3536
items, err := operations.SessionFind(c.Database(), nil)
3637
if err != nil {
37-
return fmt.Errorf("failed to retrieve sessions from database: %w", err)
38+
return fmt.Errorf("failed to retrieve sessions from the database: %w", err)
3839
}
3940
if len(items) == 0 {
4041
return nil
@@ -43,9 +44,9 @@ func NewSessionUsageSyncWithBlockchainWorker(c *nodecontext.Context, interval ti
4344
var msgs []types.Msg
4445
// Iterate over sessions and prepare messages for updates.
4546
for _, item := range items {
46-
session, err := c.Client().Session(context.TODO(), item.GetID())
47+
session, err := c.Client().Session(gocontext.TODO(), item.GetID())
4748
if err != nil {
48-
return fmt.Errorf("failed to query session: %w", err)
49+
return fmt.Errorf("failed to query session from the blockchain: %w", err)
4950
}
5051

5152
if session != nil {
@@ -56,12 +57,13 @@ func NewSessionUsageSyncWithBlockchainWorker(c *nodecontext.Context, interval ti
5657
}
5758

5859
// Broadcast the prepared messages as a transaction.
59-
res, err := c.BroadcastTx(context.TODO(), msgs...)
60+
res, err := c.BroadcastTx(gocontext.TODO(), msgs...)
6061
if err != nil {
61-
return fmt.Errorf("failed to broadcast update session details tx: %w", err)
62+
return fmt.Errorf("failed to broadcast update session tx: %w", err)
6263
}
6364
if !res.TxResult.IsOK() {
64-
return fmt.Errorf("update session details tx failed with code %d: %s", res.TxResult.Code, res.TxResult.Log)
65+
err := errors.New(res.TxResult.Log)
66+
return fmt.Errorf("update session tx failed with code %d: %w", res.TxResult.Code, err)
6567
}
6668

6769
return nil
@@ -83,13 +85,14 @@ func NewSessionUsageSyncWithBlockchainWorker(c *nodecontext.Context, interval ti
8385

8486
// NewSessionUsageSyncWithDatabaseWorker creates a worker that updates session usage in the database.
8587
// This worker fetches usage data from the peer service and updates the corresponding database records.
86-
func NewSessionUsageSyncWithDatabaseWorker(c *nodecontext.Context, interval time.Duration) cron.Worker {
88+
func NewSessionUsageSyncWithDatabaseWorker(c *context.Context, interval time.Duration) cron.Worker {
8789
log := logger.With("name", nameSessionUsageSyncWithDatabase)
8890

8991
handlerFunc := func() error {
9092
log.Info("Running scheduler worker")
9193

92-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
94+
// Create a context with a timeout to fetch peer statistics.
95+
ctx, cancel := gocontext.WithTimeout(gocontext.TODO(), 5*time.Second)
9396
defer cancel()
9497

9598
// Fetch peer usage statistics from the service.
@@ -103,19 +106,24 @@ func NewSessionUsageSyncWithDatabaseWorker(c *nodecontext.Context, interval time
103106

104107
// Update the database with the fetched statistics.
105108
for _, item := range items {
109+
// Convert usage statistics to strings for database storage.
106110
downloadBytes := math.NewInt(item.DownloadBytes).String()
107111
uploadBytes := math.NewInt(item.UploadBytes).String()
108112

113+
// Define query to find the session by peer key.
109114
query := map[string]interface{}{
110115
"peer_key": item.Key,
111116
}
117+
118+
// Define updates to apply to the session record.
112119
updates := map[string]interface{}{
113120
"download_bytes": downloadBytes,
114121
"upload_bytes": uploadBytes,
115122
}
116123

124+
// Update the session in the database.
117125
if _, err := operations.SessionFindOneAndUpdate(c.Database(), query, updates); err != nil {
118-
return fmt.Errorf("failed to update session: %w", err)
126+
return fmt.Errorf("failed to update session with peer key %s: %w", item.Key, err)
119127
}
120128
}
121129

@@ -138,7 +146,7 @@ func NewSessionUsageSyncWithDatabaseWorker(c *nodecontext.Context, interval time
138146

139147
// NewSessionUsageValidateWorker creates a worker that validates session usage limits and removes peers if necessary.
140148
// This worker checks if sessions exceed their maximum byte or duration limits and removes peers accordingly.
141-
func NewSessionUsageValidateWorker(c *nodecontext.Context, interval time.Duration) cron.Worker {
149+
func NewSessionUsageValidateWorker(c *context.Context, interval time.Duration) cron.Worker {
142150
log := logger.With("name", nameSessionUsageValidate)
143151

144152
handlerFunc := func() error {
@@ -147,7 +155,7 @@ func NewSessionUsageValidateWorker(c *nodecontext.Context, interval time.Duratio
147155
// Retrieve session records from the database.
148156
items, err := operations.SessionFind(c.Database(), nil)
149157
if err != nil {
150-
return fmt.Errorf("failed to retrieve sessions from database: %w", err)
158+
return fmt.Errorf("failed to retrieve sessions from the database: %w", err)
151159
}
152160
if len(items) == 0 {
153161
return nil
@@ -157,16 +165,22 @@ func NewSessionUsageValidateWorker(c *nodecontext.Context, interval time.Duratio
157165
for _, item := range items {
158166
removePeer := false
159167

160-
if item.GetBytes().GTE(item.GetMaxBytes()) {
168+
// Check if the session exceeds the maximum allowed bytes.
169+
maxBytes := item.GetMaxBytes()
170+
if !maxBytes.IsZero() && item.GetBytes().GTE(maxBytes) {
161171
removePeer = true
162172
}
163-
if item.GetDuration() >= item.GetMaxDuration() {
173+
174+
// Check if the session exceeds the maximum allowed duration.
175+
maxDuration := item.GetMaxDuration()
176+
if maxDuration != 0 && item.GetDuration() >= maxDuration {
164177
removePeer = true
165178
}
166179

180+
// Remove the peer if validation fails.
167181
if removePeer {
168-
if err := c.RemovePeerIfExistsForKey(context.TODO(), item.PeerKey); err != nil {
169-
return fmt.Errorf("failed to remove peer: %w", err)
182+
if err := c.RemovePeerIfExistsForKey(gocontext.TODO(), item.PeerKey); err != nil {
183+
return fmt.Errorf("failed to remove peer with key %s: %w", item.PeerKey, err)
170184
}
171185
}
172186
}
@@ -190,7 +204,7 @@ func NewSessionUsageValidateWorker(c *nodecontext.Context, interval time.Duratio
190204

191205
// NewSessionValidateWorker creates a worker that validates session status and removes peers if necessary.
192206
// This worker ensures sessions are active and consistent between the database and blockchain.
193-
func NewSessionValidateWorker(c *nodecontext.Context, interval time.Duration) cron.Worker {
207+
func NewSessionValidateWorker(c *context.Context, interval time.Duration) cron.Worker {
194208
log := logger.With("name", nameSessionValidate)
195209

196210
handlerFunc := func() error {
@@ -199,23 +213,23 @@ func NewSessionValidateWorker(c *nodecontext.Context, interval time.Duration) cr
199213
// Retrieve session records from the database.
200214
items, err := operations.SessionFind(c.Database(), nil)
201215
if err != nil {
202-
return fmt.Errorf("failed to retrieve sessions from database: %w", err)
216+
return fmt.Errorf("failed to retrieve sessions from the database: %w", err)
203217
}
204218
if len(items) == 0 {
205219
return nil
206220
}
207221

208222
// Validate session status and consistency.
209223
for _, item := range items {
210-
session, err := c.Client().Session(context.TODO(), item.GetID())
224+
session, err := c.Client().Session(gocontext.TODO(), item.GetID())
211225
if err != nil {
212-
return fmt.Errorf("failed to query session: %w", err)
226+
return fmt.Errorf("failed to query session from the blockchain: %w", err)
213227
}
214228

215229
// Remove peers if sessions are inactive or missing on the blockchain.
216230
if session == nil || !session.GetStatus().Equal(v1.StatusActive) {
217-
if err := c.RemovePeerIfExistsForKey(context.TODO(), item.PeerKey); err != nil {
218-
return fmt.Errorf("failed to remove peer: %w", err)
231+
if err := c.RemovePeerIfExistsForKey(gocontext.TODO(), item.PeerKey); err != nil {
232+
return fmt.Errorf("failed to remove peer with key %s: %w", item.PeerKey, err)
219233
}
220234
}
221235

@@ -224,8 +238,9 @@ func NewSessionValidateWorker(c *nodecontext.Context, interval time.Duration) cr
224238
query := map[string]interface{}{
225239
"id": item.ID,
226240
}
241+
227242
if _, err := operations.SessionFindOneAndDelete(c.Database(), query); err != nil {
228-
return fmt.Errorf("failed to delete session: %w", err)
243+
return fmt.Errorf("failed to delete session %d: %w", item.ID, err)
229244
}
230245
}
231246
}

0 commit comments

Comments
 (0)