88	"cosmossdk.io/math" 
99	"github.com/cosmos/cosmos-sdk/types" 
1010	"github.com/sentinel-official/sentinel-go-sdk/libs/cron" 
11+ 	logger "github.com/sentinel-official/sentinel-go-sdk/libs/log" 
1112	"github.com/sentinel-official/sentinelhub/v12/types/v1" 
1213
1314	"github.com/sentinel-official/sentinel-dvpnx/core" 
@@ -25,6 +26,8 @@ const (
2526// This worker retrieves session data from the database, validates it against the blockchain, 
2627// and broadcasts any updates as transactions. 
2728func  NewSessionUsageSyncWithBlockchainWorker (c  * core.Context , interval  time.Duration ) cron.Worker  {
29+ 	log  :=  logger .With ("module" , "workers" , "name" , NameSessionUsageSyncWithBlockchain )
30+ 
2831	handlerFunc  :=  func (ctx  context.Context ) error  {
2932		// Retrieve session records from the database. 
3033		query  :=  map [string ]interface {}{
@@ -44,14 +47,25 @@ func NewSessionUsageSyncWithBlockchainWorker(c *core.Context, interval time.Dura
4447				return  fmt .Errorf ("querying session %d from blockchain: %w" , item .GetID (), err )
4548			}
4649			if  session  ==  nil  {
50+ 				log .Debug ("Skipping session" ,
51+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "nil session" ,
52+ 				)
4753				continue 
4854			}
4955			if  session .GetUploadBytes ().Equal (item .GetRxBytes ()) {
56+ 				log .Debug ("Skipping session" ,
57+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "already up-to-date" ,
58+ 				)
5059				continue 
5160			}
5261
5362			// Generate an update message for the session. 
5463			msg  :=  item .MsgUpdateSessionRequest ()
64+ 			log .Debug ("Adding session to update list" ,
65+ 				"id" , item .GetID (), "peer_id" , item .GetPeerID (), "download_bytes" , msg .DownloadBytes ,
66+ 				"duration" , msg .Duration , "upload_bytes" , msg .UploadBytes ,
67+ 			)
68+ 
5569			msgs  =  append (msgs , msg )
5670		}
5771
@@ -73,6 +87,8 @@ func NewSessionUsageSyncWithBlockchainWorker(c *core.Context, interval time.Dura
7387// NewSessionUsageSyncWithDatabaseWorker creates a worker that updates session usage in the database. 
7488// This worker fetches usage data from the peer service and updates the corresponding database records. 
7589func  NewSessionUsageSyncWithDatabaseWorker (c  * core.Context , interval  time.Duration ) cron.Worker  {
90+ 	log  :=  logger .With ("module" , "workers" , "name" , NameSessionUsageSyncWithDatabase )
91+ 
7692	handlerFunc  :=  func (ctx  context.Context ) error  {
7793		// Fetch peer usage statistics from the service. 
7894		items , err  :=  c .Service ().PeerStatistics ()
@@ -81,14 +97,22 @@ func NewSessionUsageSyncWithDatabaseWorker(c *core.Context, interval time.Durati
8197		}
8298
8399		// Update the database with the fetched statistics. 
84- 		for  id , item  :=  range  items  {
100+ 		for  peerID , item  :=  range  items  {
101+ 			if  time .Since (item .UpdatedAt ) >  interval  {
102+ 				log .Debug ("Skipping session" ,
103+ 					"id" , 0 , "peer_id" , peerID , "cause" , "already up-to-date" ,
104+ 					"updated_at" , item .UpdatedAt ,
105+ 				)
106+ 				continue 
107+ 			}
108+ 
85109			// Convert usage statistics to strings for database storage. 
86110			rxBytes  :=  math .NewInt (item .RxBytes ).String ()
87111			txBytes  :=  math .NewInt (item .TxBytes ).String ()
88112
89113			// Define query to find the session by peer id. 
90114			query  :=  map [string ]interface {}{
91- 				"peer_id" : id ,
115+ 				"peer_id" : peerID ,
92116			}
93117
94118			// Define updates to apply to the session record. 
@@ -97,9 +121,11 @@ func NewSessionUsageSyncWithDatabaseWorker(c *core.Context, interval time.Durati
97121				"tx_bytes" : txBytes ,
98122			}
99123
100- 			// Update the session in the database. 
124+ 			log .Debug ("Updating session in database" ,
125+ 				"id" , 0 , "peer_id" , peerID , "rx_bytes" , rxBytes , "tx_bytes" , txBytes ,
126+ 			)
101127			if  _ , err  :=  operations .SessionFindOneAndUpdate (c .Database (), query , updates ); err  !=  nil  {
102- 				return  fmt .Errorf ("updating session for peer %q in database: %w" , id , err )
128+ 				return  fmt .Errorf ("updating session for peer %q in database: %w" , peerID , err )
103129			}
104130		}
105131
@@ -115,6 +141,8 @@ func NewSessionUsageSyncWithDatabaseWorker(c *core.Context, interval time.Durati
115141// NewSessionUsageValidateWorker creates a worker that validates session usage limits and removes peers if necessary. 
116142// This worker checks if sessions exceed their maximum byte or duration limits and removes peers accordingly. 
117143func  NewSessionUsageValidateWorker (c  * core.Context , interval  time.Duration ) cron.Worker  {
144+ 	log  :=  logger .With ("module" , "workers" , "name" , NameSessionUsageValidate )
145+ 
118146	handlerFunc  :=  func (ctx  context.Context ) error  {
119147		// Retrieve session records from the database. 
120148		query  :=  map [string ]interface {}{
@@ -133,22 +161,35 @@ func NewSessionUsageValidateWorker(c *core.Context, interval time.Duration) cron
133161			// Check if the session exceeds the maximum allowed bytes. 
134162			maxBytes  :=  item .GetMaxBytes ()
135163			if  ! maxBytes .IsZero () &&  item .GetTotalBytes ().GTE (maxBytes ) {
164+ 				log .Debug ("Marking peer for removing from service" ,
165+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "exceeds max bytes" ,
166+ 					"total_bytes" , item .GetTotalBytes (), "max_bytes" , item .GetMaxBytes (),
167+ 				)
136168				removePeer  =  true 
137169			}
138170
139171			// Check if the session exceeds the maximum allowed duration. 
140172			maxDuration  :=  item .GetMaxDuration ()
141173			if  maxDuration  !=  0  &&  item .GetDuration () >=  maxDuration  {
174+ 				log .Debug ("Marking peer for removing from service" ,
175+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "exceeds max duration" ,
176+ 					"duration" , item .GetDuration (), "max_duration" , maxDuration ,
177+ 				)
142178				removePeer  =  true 
143179			}
144180
145181			// Ensure that only sessions of the current service type are validated. 
146182			if  item .GetServiceType () !=  c .Service ().Type () {
183+ 				log .Debug ("Skipping peer" ,
184+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "invalid service type" ,
185+ 					"got" , item .GetServiceType (), "expected" , c .Service ().Type (),
186+ 				)
147187				removePeer  =  false 
148188			}
149189
150190			// If the session exceeded any limits, remove the associated peer. 
151191			if  removePeer  {
192+ 				log .Info ("Removing peer from service" , "id" , item .GetID (), "peer_id" , item .GetPeerID ())
152193				if  err  :=  c .RemovePeerIfExists (ctx , item .GetPeerID ()); err  !=  nil  {
153194					return  fmt .Errorf ("removing peer %q for session %d from service: %w" , item .GetPeerID (), item .GetID (), err )
154195				}
@@ -167,6 +208,8 @@ func NewSessionUsageValidateWorker(c *core.Context, interval time.Duration) cron
167208// NewSessionValidateWorker creates a worker that validates session status and removes peers if necessary. 
168209// This worker ensures sessions are active and consistent between the database and blockchain. 
169210func  NewSessionValidateWorker (c  * core.Context , interval  time.Duration ) cron.Worker  {
211+ 	log  :=  logger .With ("module" , "workers" , "name" , NameSessionValidate )
212+ 
170213	handlerFunc  :=  func (ctx  context.Context ) error  {
171214		// Retrieve session records from the database. 
172215		query  :=  map [string ]interface {}{
@@ -189,20 +232,32 @@ func NewSessionValidateWorker(c *core.Context, interval time.Duration) cron.Work
189232
190233			// Remove peer if the session is missing on the blockchain. 
191234			if  session  ==  nil  {
235+ 				log .Debug ("Marking peer for removing from service" ,
236+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "nil session" ,
237+ 				)
192238				removePeer  =  true 
193239			}
194240			// Remove peer if the session status is not active. 
195241			if  session  !=  nil  &&  ! session .GetStatus ().Equal (v1 .StatusActive ) {
242+ 				log .Debug ("Marking peer for removing from service" ,
243+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "invalid session status" ,
244+ 					"got" , session .GetStatus (), "expected" , v1 .StatusActive ,
245+ 				)
196246				removePeer  =  true 
197247			}
198248
199249			// Ensure that only sessions of the current service type are validated. 
200250			if  item .GetServiceType () !=  c .Service ().Type () {
251+ 				log .Debug ("Skipping peer" ,
252+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "invalid service type" ,
253+ 					"got" , item .GetServiceType (), "expected" , c .Service ().Type (),
254+ 				)
201255				removePeer  =  false 
202256			}
203257
204258			// Remove the associated peer if validation fails. 
205259			if  removePeer  {
260+ 				log .Info ("Removing peer from service" , "id" , item .GetID (), "peer_id" , item .GetPeerID ())
206261				if  err  :=  c .RemovePeerIfExists (ctx , item .GetPeerID ()); err  !=  nil  {
207262					return  fmt .Errorf ("removing peer %q for session %d from service: %w" , item .GetPeerID (), item .GetID (), err )
208263				}
@@ -212,6 +267,9 @@ func NewSessionValidateWorker(c *core.Context, interval time.Duration) cron.Work
212267
213268			// Delete session if the session is missing on the blockchain. 
214269			if  session  ==  nil  {
270+ 				log .Debug ("Marking session for deleting from database" ,
271+ 					"id" , item .GetID (), "peer_id" , item .GetPeerID (), "cause" , "nil session" ,
272+ 				)
215273				deleteSession  =  true 
216274			}
217275
@@ -221,6 +279,7 @@ func NewSessionValidateWorker(c *core.Context, interval time.Duration) cron.Work
221279					"id" : item .GetID (),
222280				}
223281
282+ 				log .Info ("Deleting session from database" , "id" , item .GetID (), "peer_id" , item .GetPeerID ())
224283				if  _ , err  :=  operations .SessionFindOneAndDelete (c .Database (), query ); err  !=  nil  {
225284					return  fmt .Errorf ("deleting session %d from database: %w" , item .GetID (), err )
226285				}
0 commit comments