Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 34 additions & 23 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ proc rowCallbackImpl(

outRows.add((msgHash, pubSubTopic, wakuMessage))

const DefaultDatabasePutRetryInterval = timer.milliseconds(200)
const DefaultDatabasePutRetryCount = 5

method put*(
s: PostgresDriver,
messageHash: WakuMessageHash,
Expand All @@ -312,29 +315,37 @@ method put*(
## until we completely remove the store/archive-v2 logic
let fakeId = "0"

(
## Add the row to the messages table
await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[
fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
meta,
],
@[
int32(fakeId.len),
int32(messageHash.len),
int32(pubsubTopic.len),
int32(contentTopic.len),
int32(payload.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)
).isOkOr:
return err("could not put msg in messages table: " & $error)
# Briefly retry puts to avoid failing permanently on temporary failures such as
# partition being created in parallel to row insert.
for i in 1 .. DefaultDatabasePutRetryCount:
(
await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[
fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp,
meta,
],
@[
int32(fakeId.len),
int32(messageHash.len),
int32(pubsubTopic.len),
int32(contentTopic.len),
int32(payload.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[
int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)
],
)
).isOkOr:
if i == DefaultDatabasePutRetryCount:
return err("could not put msg in messages table: " & $error)
await sleepAsync(DefaultDatabasePutRetryInterval)
continue
break

## Now add the row to messages_lookup
let ret = await s.writeConnPool.runStmt(
Expand Down
Loading