Skip to content

Commit 13ac082

Browse files
committed
test
test2 test test test test test test test test test test
1 parent a9d0eaf commit 13ac082

File tree

2 files changed

+41
-14
lines changed

2 files changed

+41
-14
lines changed

cmd/storemsgcounter/execute.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,15 @@ func Execute(ctx context.Context, options Options) error {
150150
missingMessagesTimer := time.NewTimer(0)
151151
defer missingMessagesTimer.Stop()
152152

153-
syncCheckTimer := time.NewTicker(30 * time.Minute)
153+
syncCheckTimer := time.NewTicker(1 * time.Minute)
154154
defer syncCheckTimer.Stop()
155155

156156
for {
157157
select {
158158
case <-ctx.Done():
159159
return nil
160160
case <-missingMessagesTimer.C:
161-
tmpUUID := uuid.New()
161+
/*tmpUUID := uuid.New()
162162
runId := hex.EncodeToString(tmpUUID[:])
163163
runIdLogger := logger.With(zap.String("runId", runId))
164164
@@ -169,21 +169,21 @@ func Execute(ctx context.Context, options Options) error {
169169
}
170170
runIdLogger.Info("verification complete")
171171
172-
missingMessagesTimer.Reset(timeInterval)
172+
missingMessagesTimer.Reset(timeInterval)*/
173173
case <-syncCheckTimer.C:
174174
go func() {
175175
tmpUUID := uuid.New()
176176
runId := hex.EncodeToString(tmpUUID[:])
177177
runIdLogger := logger.With(zap.String("syncRunId", runId))
178-
runIdLogger.Info("rechecking missing messages status")
178+
runIdLogger.Info("rechecking missing messages status", zap.Stringers("X", storenodeIDs))
179179

180-
err := application.checkMissingMessageStatus(ctx, runId, runIdLogger)
180+
err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger)
181181
if err != nil {
182182
logger.Error("could not recheck the status of missing messages", zap.Error(err))
183183
return
184184
}
185185

186-
err = application.countMissingMessages()
186+
err = application.countMissingMessages(storenodeIDs)
187187
if err != nil {
188188
logger.Error("could not count missing messages", zap.Error(err))
189189
return
@@ -338,29 +338,43 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
338338
return nil
339339
}
340340

341-
func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error {
341+
func (app *Application) checkMissingMessageStatus(ctx context.Context, storenodes []peer.ID, runId string, logger *zap.Logger) error {
342342
now := app.node.Timesource().Now()
343343

344344
// Get all messages whose status is missing or does not exist, and the column found_on_recheck is false
345345
// if found, set found_on_recheck to true
346346
missingMessages, err := app.db.GetMissingMessages(now.Add(-2*time.Hour), now.Add(-time.Hour), options.ClusterID)
347+
if err == nil {
348+
for k, t := range missingMessages {
349+
logger.Info("TTTTTX", zap.String("perID", k.String()), zap.Any("missingMessages", len(t)))
350+
351+
}
352+
}
347353
if err != nil {
354+
logger.Info("TTT2 error?", zap.Error(err))
348355
return err
349356
}
350357

351358
wg := sync.WaitGroup{}
352-
for storenodeID, messageHashes := range missingMessages {
359+
360+
logger.Info("VVVVV1", zap.Any("V1", storenodes))
361+
for _, storenodeID := range storenodes {
362+
logger.Info("VVVVV2")
363+
353364
wg.Add(1)
354365
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
355366
defer wg.Done()
356367

357368
foundMissingMessages := make(map[pb.MessageHash]struct{})
369+
logger.Info("CCCCC Verifying message existence")
358370
app.verifyMessageExistence(ctx, runId, peerID, messageHashes, func(result *store.Result) {
359371
for _, mkv := range result.Messages() {
360372
foundMissingMessages[mkv.WakuMessageHash()] = struct{}{}
361373
}
362374
}, logger)
363375

376+
logger.Info("DDD Marking as found", zap.Int("X", len(foundMissingMessages)))
377+
364378
err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages), options.ClusterID)
365379
if err != nil {
366380
logger.Error("could not mark messages as found", zap.Error(err))
@@ -369,15 +383,19 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str
369383

370384
app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages))
371385

372-
}(storenodeID, messageHashes)
386+
}(storenodeID, missingMessages[storenodeID])
387+
logger.Info("VVVVV4")
388+
373389
}
390+
logger.Info("VVVVV5")
374391

375392
wg.Wait()
393+
logger.Info("VVVVV6")
376394

377395
return nil
378396
}
379397

380-
func (app *Application) countMissingMessages() error {
398+
func (app *Application) countMissingMessages(storenodes []peer.ID) error {
381399

382400
// not including last two hours in now to let sync work
383401
now := app.node.Timesource().Now().Add(-2 * time.Hour)
@@ -396,8 +414,8 @@ func (app *Application) countMissingMessages() error {
396414
if err != nil {
397415
return err
398416
}
399-
for storenode, cnt := range results {
400-
app.metrics.RecordMissingMessagesLastWeek(storenode, cnt)
417+
for _, storenodeID := range storenodes {
418+
app.metrics.RecordMissingMessagesLastWeek(storenodeID, results[storenodeID])
401419
}
402420
return nil
403421
}
@@ -539,6 +557,10 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
539557
var result *store.Result
540558
var err error
541559

560+
if len(messageHashes) == 0 {
561+
return
562+
}
563+
542564
peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)
543565

544566
queryLogger := logger.With(zap.Stringer("storenode", peerID))

internal/persistence/database.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
240240
}
241241

242242
func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) {
243+
// And not found on rehcekc
243244
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID)
244245
if err != nil {
245246
return nil, err
@@ -307,6 +308,10 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
307308
}
308309

309310
func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error {
311+
if len(messageHashes) == 0 {
312+
return nil
313+
}
314+
310315
query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN ("
311316
for i := range messageHashes {
312317
if i > 0 {
@@ -315,7 +320,7 @@ func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.Message
315320
query += fmt.Sprintf("$%d", i+2)
316321
}
317322
query += ")"
318-
323+
d.log.Info("THE QUERY: ", zap.String("Q", query))
319324
args := []interface{}{clusterID}
320325
for _, messageHash := range messageHashes {
321326
args = append(args, messageHash)
@@ -343,7 +348,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err
343348
}
344349

345350
func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) {
346-
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
351+
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
347352
if err != nil {
348353
return nil, err
349354
}

0 commit comments

Comments
 (0)