|
4 | 4 | "context" |
5 | 5 | "database/sql" |
6 | 6 | "encoding/hex" |
7 | | - "encoding/json" |
8 | 7 | "errors" |
9 | 8 | "fmt" |
10 | 9 | "net" |
@@ -285,10 +284,9 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn |
285 | 284 | var result *store.Result |
286 | 285 | var err error |
287 | 286 |
|
288 | | - logger.Info("retrieving message history for topic", zap.Stringer("storenode", node), zap.Int64("from", startTime.UnixNano()), zap.Int64("to", endTime.UnixNano())) |
289 | | - |
290 | 287 | queryLbl: |
291 | 288 | for i := 0; i < maxAttempts; i++ { |
| 289 | + logger.Info("retrieving message history for topic", zap.Stringer("storenode", node), zap.Int64("from", startTime.UnixNano()), zap.Int64("to", endTime.UnixNano()), zap.Int("attempt", i)) |
292 | 290 | result, err = wakuNode.Store().Query(ctx, store.FilterCriteria{ |
293 | 291 | ContentFilter: protocol.NewContentFilter(topic), |
294 | 292 | TimeStart: proto.Int64(startTime.UnixNano()), |
@@ -345,8 +343,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn |
345 | 343 |
|
346 | 344 | nextRetryLbl: |
347 | 345 | for i := 0; i < maxAttempts; i++ { |
348 | | - a, _ := json.Marshal(result.Response()) |
349 | | - logger.Info("EXECUTING NEXT!!!", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.String("RESPONSE", string(a))) |
| 346 | + logger.Info("EXECUTING NEXT!!!", zap.String("cursor", hex.EncodeToString(result.Cursor()))) |
350 | 347 | err = result.Next(ctx) |
351 | 348 | if err != nil { |
352 | 349 | logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err)) |
@@ -392,7 +389,8 @@ func verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, m |
392 | 389 |
|
393 | 390 | queryLbl: |
394 | 391 | for i := 0; i < maxAttempts; i++ { |
395 | | - result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID)) |
| 392 | + logger.Info("querying by hash", zap.Stringer("storenode", peerID), zap.Stringers("hashes", messageHashes), zap.Int("attempt", i)) |
| 393 | + result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100)) |
396 | 394 | if err != nil { |
397 | 395 | logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err)) |
398 | 396 | storeNodeFailure = true |
@@ -436,6 +434,7 @@ queryLbl: |
436 | 434 |
|
437 | 435 | nextRetryLbl: |
438 | 436 | for i := 0; i < maxAttempts; i++ { |
| 437 | + logger.Info("executing next while querying hashes", zap.Stringer("storenode", peerID), zap.Int("attempt", i)) |
439 | 438 | err = result.Next(ctx) |
440 | 439 | if err != nil { |
441 | 440 | logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err)) |
|
0 commit comments