339 changes: 339 additions & 0 deletions src/reliability.nim
@@ -0,0 +1,339 @@
import std/[times, locks, tables, sets, sequtils]
import chronos, results, chronicles
import ./[message, protobuf, reliability_utils, rolling_bloom_filter]

proc newReliabilityManager*(
channelId: SdsChannelID, config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =
Collaborator comment:

I would recommend creating a new reliability_manager.nim module and moving all of the ReliabilityManager funcs/procs currently defined in reliability_utils.nim into that new module.

Also, for ref object types (heap allocated and GC'ed), the idiomatic way to create them is through a new proc. I think it is fine to return a Result, but it would be better to rename the proc to new. See the following as an example: https://github.com/waku-org/nwaku/blob/addce8dc3338ea1ca7d6ce1dd525f6f50ccbf467/waku/node/delivery_monitor/recv_monitor.nim#L153-L157
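
A minimal sketch of the suggested new-style constructor, as a hedged illustration of the review suggestion rather than existing API; it mirrors the fields used in the constructor below and keeps the Result return:

proc new*(
    T: type ReliabilityManager,
    channelId: SdsChannelID,
    config: ReliabilityConfig = defaultConfig(),
): Result[ReliabilityManager, ReliabilityError] =
  ## Idiomatic `new` constructor for the ref object, per the review suggestion.
  ## Exception handling around filter creation is elided for brevity.
  if channelId.len == 0:
    return err(ReliabilityError.reInvalidArgument)
  let rm = ReliabilityManager(
    lamportTimestamp: 0,
    messageHistory: @[],
    bloomFilter:
      newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate),
    outgoingBuffer: @[],
    incomingBuffer: @[],
    channelId: channelId,
    config: config,
  )
  initLock(rm.lock)
  ok(rm)

# A call site would then read: let rm = ?ReliabilityManager.new(channelId)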

## Creates a new ReliabilityManager with the specified channel ID and configuration.
##
## Parameters:
## - channelId: A unique identifier for the communication channel.
## - config: Configuration options for the ReliabilityManager. If not provided, default configuration is used.
##
## Returns:
## A Result containing either a new ReliabilityManager instance or an error.
if channelId.len == 0:
return err(ReliabilityError.reInvalidArgument)
Contributor comment:

This would depend on how channelId is set elsewhere, but could this rather be an Option[SdsChannelID] which you test with isSome()?
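
For reference, a small sketch of the Option-based variant the comment suggests (hypothetical; it assumes callers pass none(SdsChannelID) when no channel is available and requires std/options):

import std/options

proc newReliabilityManager*(
    channelId: Option[SdsChannelID], config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =
  if channelId.isNone():
    return err(ReliabilityError.reInvalidArgument)
  let id = channelId.get()
  # ... construct the ReliabilityManager with `id` exactly as below ...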


try:
let bloomFilter =
newRollingBloomFilter(config.bloomFilterCapacity, config.bloomFilterErrorRate)

let rm = ReliabilityManager(
lamportTimestamp: 0,
messageHistory: @[],
bloomFilter: bloomFilter,
outgoingBuffer: @[],
incomingBuffer: @[],
channelId: channelId,
config: config,
)
initLock(rm.lock)
return ok(rm)
except Exception:
error "Failed to create ReliabilityManager", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reOutOfMemory)

proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} =
var i = 0
while i < rm.outgoingBuffer.len:
var acknowledged = false
let outMsg = rm.outgoingBuffer[i]

Contributor comment:

To me this block of code would read a bit easier if you introduced an isAcknowledged function that tests the ack status of each message in the outgoingBuffer.

Assuming you parse the rollingBloomFilter once before entering the while loop, it could look something like:

Suggested change
if outMsg.isAcknowledged(msg.causalHistory, rollingBloomFilter):
if not rm.onMessageSent.isNil():
rm.onMessageSent(outMsg.message.messageId)

etc.
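
Building on that suggestion, a minimal sketch of such a helper, assuming the peer's bloom filter has already been deserialized once before the while loop (the Option parameter and the std/options import are assumptions of this sketch):

proc isAcknowledged(
    unackMsg: UnacknowledgedMessage,
    causalHistory: seq[SdsMessageID],
    peerFilter: Option[RollingBloomFilter],
): bool =
  ## True if the peer has acknowledged this message, either explicitly via its
  ## causal history or probabilistically via its bloom filter.
  if unackMsg.message.messageId in causalHistory:
    return true
  peerFilter.isSome() and peerFilter.get().contains(unackMsg.message.messageId)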

# Check if message is in causal history
for msgID in msg.causalHistory:
Contributor comment:

I think you can use in or contains here to replace the block:

if outMsg.message.messageId in msg.causalHistory:

if outMsg.message.messageId == msgID:
acknowledged = true
break

# Check bloom filter if not already acknowledged
if not acknowledged and msg.bloomFilter.len > 0:
let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk():
Collaborator comment:

It is better to use the valueOr approach.

Suggested change
let bfResult = deserializeBloomFilter(msg.bloomFilter)
if bfResult.isOk():
let bloomFilter = deserializeBloomFilter(msg.bloomFilter).valueOr:
error "Failed to deserialize bloom filter", error = $error
let rbf = RollingBloomFilter(
filter: bloomFilter,
...

var rbf = RollingBloomFilter(
Collaborator comment:

We should avoid creating instances of RollingBloomFilter outside the rolling_bloom_filter.nim module.
Then, we need a special init proc, which will be in charge of creating such objects from a seq[byte] bloom filter.

filter: bfResult.get(),
Contributor comment:

Why parse the RollingBloomFilter for each message being read in the outgoingBuffer loop? I think you can parse and create the bloom filter once before entering the while loop?

capacity: bfResult.get().capacity,
minCapacity: (
bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0
).int,
maxCapacity: (
bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0
).int,
messages: @[],
)
if rbf.contains(outMsg.message.messageId):
acknowledged = true
else:
error "Failed to deserialize bloom filter", error = bfResult.error

if acknowledged:
if not rm.onMessageSent.isNil():
rm.onMessageSent(outMsg.message.messageId)
rm.outgoingBuffer.delete(i)
Contributor comment:

I don't think it's good practice to remove items from the buffer you are iterating over. I would suggest building a list of items that should be cleaned and then deleting them all at once after exiting the iteration loop. @Ivansete-status any suggestions here on general best practice in Nim?

Collaborator comment:

Yes, I think the approach you suggest is better, @jm-clius.
I'd use something like: let newFilteredSeq = sequence.filterIt( ...)

else:
inc i
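
Putting the suggestions above together (parse the peer's filter once, avoid constructing RollingBloomFilter outside rolling_bloom_filter.nim, and filter rather than delete while iterating), a hedged sketch; RollingBloomFilter.init taking the serialized bytes and the isAcknowledged helper sketched earlier are assumptions, and std/options would be needed in addition to the imports above:

proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} =
  # Deserialize the peer's bloom filter once, before the loop, via an assumed
  # init proc living in rolling_bloom_filter.nim.
  var peerFilter = none(RollingBloomFilter)
  if msg.bloomFilter.len > 0:
    let rbfRes = RollingBloomFilter.init(msg.bloomFilter)  # assumed init proc
    if rbfRes.isOk():
      peerFilter = some(rbfRes.get())
    else:
      error "Failed to deserialize bloom filter", error = $rbfRes.error

  # Collect acknowledged IDs in a single pass, then rebuild the buffer once.
  var ackedIds = initHashSet[SdsMessageID]()
  for unackMsg in rm.outgoingBuffer:
    if unackMsg.isAcknowledged(msg.causalHistory, peerFilter):
      if not rm.onMessageSent.isNil():
        rm.onMessageSent(unackMsg.message.messageId)
      ackedIds.incl(unackMsg.message.messageId)

  rm.outgoingBuffer =
    rm.outgoingBuffer.filterIt(it.message.messageId notin ackedIds)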

proc wrapOutgoingMessage*(
rm: ReliabilityManager, message: seq[byte], messageId: SdsMessageID
): Result[seq[byte], ReliabilityError] =
## Wraps an outgoing message with reliability metadata.
##
## Parameters:
## - message: The content of the message to be sent.
## - messageId: Unique identifier for the message
##
## Returns:
## A Result containing either wrapped message bytes or an error.
if message.len == 0:
return err(ReliabilityError.reInvalidArgument)
if message.len > MaxMessageSize:
return err(ReliabilityError.reMessageTooLarge)

withLock rm.lock:
try:
rm.updateLamportTimestamp(getTime().toUnix)

let bfResult = serializeBloomFilter(rm.bloomFilter.filter)
if bfResult.isErr:
error "Failed to serialize bloom filter"
return err(ReliabilityError.reSerializationError)

let msg = SdsMessage(
messageId: messageId,
lamportTimestamp: rm.lamportTimestamp,
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory),
channelId: rm.channelId,
content: message,
bloomFilter: bfResult.get(),
)

# Add to outgoing buffer
rm.outgoingBuffer.add(
UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
)

# Add to causal history and bloom filter
rm.bloomFilter.add(msg.messageId)
rm.addToHistory(msg.messageId)

return serializeMessage(msg)
except Exception:
error "Failed to wrap message", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reSerializationError)

proc processIncomingBuffer(rm: ReliabilityManager) {.gcsafe.} =
withLock rm.lock:
if rm.incomingBuffer.len == 0:
return

# Create dependency map
var dependencies = initTable[SdsMessageID, seq[SdsMessageID]]()
var readyToProcess: seq[SdsMessageID] = @[]
var processed = initHashSet[SdsMessageID]()

# Build dependency graph and find initially ready messages
for msg in rm.incomingBuffer:
var hasMissingDeps = false
for depId in msg.causalHistory:
if not rm.bloomFilter.contains(depId):
Contributor comment:

Note that I didn't initially consider the bloomFilter a good source for checking the dependencies of received messages, as we already have a more reliable source (local history itself). Wouldn't it be possible to have a dependency-check callback, perhaps implemented by the application, that takes a list of dependencies as an argument and returns the sublist of resolved dependencies after checking local history?

Contributor comment:

As a first step, we can check the rm.messageHistory and only call the application-level callback if we require longer-term history. The application can also at that point try to retrieve still unresolved dependencies from Store nodes.

hasMissingDeps = true
if depId notin dependencies:
dependencies[depId] = @[]
dependencies[depId].add(msg.messageId)

if not hasMissingDeps:
readyToProcess.add(msg.messageId)

while readyToProcess.len > 0:
let msgId = readyToProcess.pop()
if msgId in processed:
Contributor comment:

Struggling a bit to see how the msgId could already be processed at this point. :)

continue

# Process this message
for msg in rm.incomingBuffer:
if msg.messageId == msgId:
Contributor comment:

It seems strange that you have to iterate through the incomingBuffer again just to find this message, that you previously iterated over. Why not simply add the full message to the temporary readyToProcess buffer?

Either way, it might be useful to consider using a template like anyIt when iterating through a container to find a specific item: https://nim-lang.org/docs/sequtils.html#anyIt.t%2Cuntyped%2Cuntyped

rm.addToHistory(msg.messageId)
if not rm.onMessageReady.isNil():
rm.onMessageReady(msg.messageId)
processed.incl(msgId)

# Add any dependent messages that might now be ready
if msgId in dependencies:
readyToProcess.add(dependencies[msgId])
break

rm.incomingBuffer = rm.incomingBuffer.filterIt(it.messageId notin processed)
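
Along the lines of the comments above, a hedged sketch of a dependency check that consults local messageHistory first and falls back to an assumed application callback (checkDependencies is hypothetical, not an existing field):

proc resolveMissingDeps(
    rm: ReliabilityManager, deps: seq[SdsMessageID]
): seq[SdsMessageID] =
  ## Returns the dependencies that remain unresolved.
  # Local history is authoritative and has no false positives.
  var unresolved = deps.filterIt(it notin rm.messageHistory)
  # Only ask the application (e.g. Store queries) for what history cannot answer.
  if unresolved.len > 0 and not rm.checkDependencies.isNil():
    let resolved = rm.checkDependencies(unresolved)
    unresolved = unresolved.filterIt(it notin resolved)
  unresolved

Storing the full SdsMessage in readyToProcess, as also suggested above, would then avoid re-scanning incomingBuffer for each ready ID.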

proc unwrapReceivedMessage*(
rm: ReliabilityManager, message: seq[byte]
): Result[tuple[message: seq[byte], missingDeps: seq[SdsMessageID]], ReliabilityError] =
## Unwraps a received message and processes its reliability metadata.
##
## Parameters:
## - message: The received message bytes
##
## Returns:
## A Result containing either tuple of (processed message, missing dependencies) or an error.
try:
let msg = deserializeMessage(message).valueOr:
return err(ReliabilityError.reDeserializationError)

if rm.bloomFilter.contains(msg.messageId):
return ok((msg.content, @[]))
Contributor comment:

Presumably this is a duplicate check? I wouldn't rely on bloomFilter unnecessarily here, as false positives could mean that we accidentally believe that we've seen this message before. I'd only use messageHistory here.


rm.bloomFilter.add(msg.messageId)

# Update Lamport timestamp
rm.updateLamportTimestamp(msg.lamportTimestamp)

# Review ACK status for outgoing messages
rm.reviewAckStatus(msg)

var missingDeps: seq[SdsMessageID] = @[]
for depId in msg.causalHistory:
if not rm.bloomFilter.contains(depId):
Contributor comment:

See point elsewhere. I don't think we should use the bloomFilter (where false positives are possible) if we have access to reliable information in local messageHistory.

missingDeps.add(depId)

if missingDeps.len == 0:
# Check if any dependencies are still in incoming buffer
var depsInBuffer = false
for bufferedMsg in rm.incomingBuffer:
if bufferedMsg.messageId in msg.causalHistory:
depsInBuffer = true
break

if depsInBuffer:
rm.incomingBuffer.add(msg)
Contributor comment:

I don't think this step is necessary if we're going to add the message to the incomingBuffer in any case. Presumably, you'd like to avoid the onMissingDependencies call? What is expected to happen within the onMissingDependencies call?

else:
# All dependencies met, add to history
rm.addToHistory(msg.messageId)
rm.processIncomingBuffer()
if not rm.onMessageReady.isNil():
rm.onMessageReady(msg.messageId)
else:
rm.incomingBuffer.add(msg)
if not rm.onMissingDependencies.isNil():
rm.onMissingDependencies(msg.messageId, missingDeps)

return ok((msg.content, missingDeps))
except Exception:
error "Failed to unwrap message", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reDeserializationError)
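
Reflecting the comments above, a hedged sketch of a history-based duplicate check (assuming messageHistory holds the IDs of locally delivered messages; the dependency scan could likewise reuse the resolveMissingDeps sketch shown after processIncomingBuffer):

proc isDuplicate(rm: ReliabilityManager, id: SdsMessageID): bool =
  ## Local history has no false positives, unlike the bloom filter, so a new
  ## message cannot be mistaken for one we have already delivered.
  id in rm.messageHistory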

proc markDependenciesMet*(
rm: ReliabilityManager, messageIds: seq[SdsMessageID]
): Result[void, ReliabilityError] =
## Marks the specified message dependencies as met.
##
## Parameters:
## - messageIds: A sequence of message IDs to mark as met.
##
## Returns:
## A Result indicating success or an error.
try:
# Add all messageIds to bloom filter
for msgId in messageIds:
if not rm.bloomFilter.contains(msgId):
rm.bloomFilter.add(msgId)
Contributor comment (on lines +246 to +248):

Thinking that if we simplify the incomingBuffer to be a map of messageId to a seq[SdsMessageID] of missing dependencies (we can cache the full message contents elsewhere), we could directly mark these dependencies as met by removing them from the missingDependencies seq of each message in the incoming buffer. Processing the incoming buffer then becomes simply finding and processing the messages for which we now have an empty missingDependencies seq. WDYT?

# rm.addToHistory(msgId) -- not needed as this proc usually called when msg in long-term storage of application?

rm.processIncomingBuffer()
return ok()
except Exception:
error "Failed to mark dependencies as met", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reInternalError)
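
A hedged sketch of the map-based incoming buffer described in the comment above (the field shape and helper name are assumptions, and full message contents would be cached elsewhere as the comment notes):

# Assumed: incomingBuffer: Table[SdsMessageID, seq[SdsMessageID]]
# mapping each buffered message ID to its still-missing dependencies.
proc markDependencyMet(rm: ReliabilityManager, depId: SdsMessageID) =
  var ready: seq[SdsMessageID] = @[]
  for msgId, missing in rm.incomingBuffer.mpairs:
    missing.keepItIf(it != depId)      # remove the dependency that is now met
    if missing.len == 0:
      ready.add(msgId)                 # nothing left missing for this message
  for msgId in ready:
    rm.incomingBuffer.del(msgId)
    if not rm.onMessageReady.isNil():
      rm.onMessageReady(msgId)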

proc setCallbacks*(
rm: ReliabilityManager,
onMessageReady: proc(messageId: SdsMessageID) {.gcsafe.},
onMessageSent: proc(messageId: SdsMessageID) {.gcsafe.},
onMissingDependencies:
proc(messageId: SdsMessageID, missingDeps: seq[SdsMessageID]) {.gcsafe.},
onPeriodicSync: PeriodicSyncCallback = nil,
) =
## Sets the callback functions for various events in the ReliabilityManager.
##
## Parameters:
## - onMessageReady: Callback function called when a message is ready to be processed.
## - onMessageSent: Callback function called when a message is confirmed as sent.
## - onMissingDependencies: Callback function called when a message has missing dependencies.
## - onPeriodicSync: Callback function called to notify about periodic sync
withLock rm.lock:
rm.onMessageReady = onMessageReady
rm.onMessageSent = onMessageSent
rm.onMissingDependencies = onMissingDependencies
rm.onPeriodicSync = onPeriodicSync

proc checkUnacknowledgedMessages(rm: ReliabilityManager) {.gcsafe.} =
## Checks and processes unacknowledged messages in the outgoing buffer.
withLock rm.lock:
let now = getTime()
var newOutgoingBuffer: seq[UnacknowledgedMessage] = @[]

for unackMsg in rm.outgoingBuffer:
let elapsed = now - unackMsg.sendTime
if elapsed > rm.config.resendInterval:
# Time to attempt resend
if unackMsg.resendAttempts < rm.config.maxResendAttempts:
var updatedMsg = unackMsg
updatedMsg.resendAttempts += 1
updatedMsg.sendTime = now
newOutgoingBuffer.add(updatedMsg)
else:
if not rm.onMessageSent.isNil():
rm.onMessageSent(unackMsg.message.messageId)
else:
newOutgoingBuffer.add(unackMsg)

rm.outgoingBuffer = newOutgoingBuffer

proc periodicBufferSweep(
rm: ReliabilityManager
) {.async: (raises: [CancelledError]), gcsafe.} =
## Periodically sweeps the buffer to clean up and check unacknowledged messages.
while true:
try:
rm.checkUnacknowledgedMessages()
rm.cleanBloomFilter()
except Exception:
error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg()

await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))

proc periodicSyncMessage(
rm: ReliabilityManager
) {.async: (raises: [CancelledError]), gcsafe.} =
## Periodically notifies to send a sync message to maintain connectivity.
while true:
try:
if not rm.onPeriodicSync.isNil():
rm.onPeriodicSync()
except Exception:
error "Error in periodic sync", msg = getCurrentExceptionMsg()
await sleepAsync(chronos.seconds(rm.config.syncMessageInterval.inSeconds))

proc startPeriodicTasks*(rm: ReliabilityManager) =
## Starts the periodic tasks for buffer sweeping and sync message sending.
##
## This procedure should be called after creating a ReliabilityManager to enable automatic maintenance.
asyncSpawn rm.periodicBufferSweep()
asyncSpawn rm.periodicSyncMessage()

proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] =
## Resets the ReliabilityManager to its initial state.
##
## This procedure clears all buffers and resets the Lamport timestamp.
withLock rm.lock:
try:
rm.lamportTimestamp = 0
rm.messageHistory.setLen(0)
rm.outgoingBuffer.setLen(0)
rm.incomingBuffer.setLen(0)
rm.bloomFilter = newRollingBloomFilter(
rm.config.bloomFilterCapacity, rm.config.bloomFilterErrorRate
)
return ok()
except Exception:
error "Failed to reset ReliabilityManager", msg = getCurrentExceptionMsg()
return err(ReliabilityError.reInternalError)