Skip to content

Commit 0c3ceef

Browse files
committed
fix: mark message as found
1 parent a9d0eaf commit 0c3ceef

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

cmd/storemsgcounter/execute.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -177,13 +177,13 @@ func Execute(ctx context.Context, options Options) error {
177177
runIdLogger := logger.With(zap.String("syncRunId", runId))
178178
runIdLogger.Info("rechecking missing messages status")
179179

180-
err := application.checkMissingMessageStatus(ctx, runId, runIdLogger)
180+
err := application.checkMissingMessageStatus(ctx, storenodeIDs, runId, runIdLogger)
181181
if err != nil {
182182
logger.Error("could not recheck the status of missing messages", zap.Error(err))
183183
return
184184
}
185185

186-
err = application.countMissingMessages()
186+
err = application.countMissingMessages(storenodeIDs)
187187
if err != nil {
188188
logger.Error("could not count missing messages", zap.Error(err))
189189
return
@@ -338,7 +338,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
338338
return nil
339339
}
340340

341-
func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error {
341+
func (app *Application) checkMissingMessageStatus(ctx context.Context, storenodes []peer.ID, runId string, logger *zap.Logger) error {
342342
now := app.node.Timesource().Now()
343343

344344
// Get all messages whose status is missing or does not exist, and the column found_on_recheck is false
@@ -349,7 +349,8 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str
349349
}
350350

351351
wg := sync.WaitGroup{}
352-
for storenodeID, messageHashes := range missingMessages {
352+
353+
for _, storenodeID := range storenodes {
353354
wg.Add(1)
354355
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
355356
defer wg.Done()
@@ -369,15 +370,14 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, runId str
369370

370371
app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages))
371372

372-
}(storenodeID, messageHashes)
373+
}(storenodeID, missingMessages[storenodeID])
373374
}
374-
375375
wg.Wait()
376376

377377
return nil
378378
}
379379

380-
func (app *Application) countMissingMessages() error {
380+
func (app *Application) countMissingMessages(storenodes []peer.ID) error {
381381

382382
// not including last two hours in now to let sync work
383383
now := app.node.Timesource().Now().Add(-2 * time.Hour)
@@ -396,8 +396,8 @@ func (app *Application) countMissingMessages() error {
396396
if err != nil {
397397
return err
398398
}
399-
for storenode, cnt := range results {
400-
app.metrics.RecordMissingMessagesLastWeek(storenode, cnt)
399+
for _, storenodeID := range storenodes {
400+
app.metrics.RecordMissingMessagesLastWeek(storenodeID, results[storenodeID])
401401
}
402402
return nil
403403
}
@@ -539,6 +539,10 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
539539
var result *store.Result
540540
var err error
541541

542+
if len(messageHashes) == 0 {
543+
return
544+
}
545+
542546
peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)
543547

544548
queryLogger := logger.With(zap.Stringer("storenode", peerID))

internal/persistence/database.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
240240
}
241241

242242
func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) {
243-
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID)
243+
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND fundOnRecheck = false", from.UnixNano(), to.UnixNano(), clusterID)
244244
if err != nil {
245245
return nil, err
246246
}
@@ -307,6 +307,10 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
307307
}
308308

309309
func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error {
310+
if len(messageHashes) == 0 {
311+
return nil
312+
}
313+
310314
query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN ("
311315
for i := range messageHashes {
312316
if i > 0 {
@@ -343,7 +347,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err
343347
}
344348

345349
func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) {
346-
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
350+
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
347351
if err != nil {
348352
return nil, err
349353
}

0 commit comments

Comments
 (0)