diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 9b0e14c841..436dca62a2 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -291,6 +291,9 @@ proc rowCallbackImpl( outRows.add((msgHash, pubSubTopic, wakuMessage)) +const DefaultDatabasePutRetryInterval = timer.milliseconds(200) +const DefaultDatabasePutRetryCount = 5 + method put*( s: PostgresDriver, messageHash: WakuMessageHash, @@ -312,29 +315,37 @@ method put*( ## until we completely remove the store/archive-v2 logic let fakeId = "0" - ( - ## Add the row to the messages table - await s.writeConnPool.runStmt( - InsertRowStmtName, - InsertRowStmtDefinition, - @[ - fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, - meta, - ], - @[ - int32(fakeId.len), - int32(messageHash.len), - int32(pubsubTopic.len), - int32(contentTopic.len), - int32(payload.len), - int32(version.len), - int32(timestamp.len), - int32(meta.len), - ], - @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], - ) - ).isOkOr: - return err("could not put msg in messages table: " & $error) + # Briefly retry puts to avoid failing permanently on temporary failures such as + # partition being created in parallel to row insert. + for i in 1 .. DefaultDatabasePutRetryCount: + ( + await s.writeConnPool.runStmt( + InsertRowStmtName, + InsertRowStmtDefinition, + @[ + fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, + meta, + ], + @[ + int32(fakeId.len), + int32(messageHash.len), + int32(pubsubTopic.len), + int32(contentTopic.len), + int32(payload.len), + int32(version.len), + int32(timestamp.len), + int32(meta.len), + ], + @[ + int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0) + ], + ) + ).isOkOr: + if i == DefaultDatabasePutRetryCount: + return err("could not put msg in messages table: " & $error) + await sleepAsync(DefaultDatabasePutRetryInterval) + continue + break ## Now add the row to messages_lookup let ret = await s.writeConnPool.runStmt(