Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions reliability.nimble

This file was deleted.

3 changes: 3 additions & 0 deletions src/sds.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import ./sds/[message, protobuf, reliability, reliability_utils]

export message, protobuf, reliability, reliability_utils
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/protobuf.nim → src/sds/protobuf.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import libp2p/protobuf/minprotobuf
import std/options
import endians
import ../src/[message, protobufutil, bloom, reliability_utils]
import ./[message, protobufutil, bloom, reliability_utils]

proc encode*(msg: SdsMessage): ProtoBuffer =
var pb = initProtoBuffer()
Expand Down
File renamed without changes.
35 changes: 22 additions & 13 deletions src/reliability.nim → src/sds/reliability.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import std/[times, locks, tables, sets, options]
import chronos, results, chronicles
import ./[message, protobuf, reliability_utils, rolling_bloom_filter]

export message, reliability_utils, protobuf

proc newReliabilityManager*(
config: ReliabilityConfig = defaultConfig()
): Result[ReliabilityManager, ReliabilityError] =
Expand Down Expand Up @@ -47,12 +49,12 @@ proc reviewAckStatus(rm: ReliabilityManager, msg: SdsMessage) {.gcsafe.} =
capacity: bfResult.get().capacity,
minCapacity: (
bfResult.get().capacity.float * (100 - CapacityFlexPercent).float / 100.0
).int,
maxCapacity: (
bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0
).int,
messages: @[],
)
).int,
maxCapacity: (
bfResult.get().capacity.float * (100 + CapacityFlexPercent).float / 100.0
).int,
messages: @[],
)
)
else:
error "Failed to deserialize bloom filter", error = bfResult.error
Expand Down Expand Up @@ -112,14 +114,16 @@ proc wrapOutgoingMessage*(
let msg = SdsMessage(
messageId: messageId,
lamportTimestamp: channel.lamportTimestamp,
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory, channelId),
causalHistory: rm.getRecentSdsMessageIDs(rm.config.maxCausalHistory,
channelId),
channelId: channelId,
content: message,
bloomFilter: bfResult.get(),
)

channel.outgoingBuffer.add(
UnacknowledgedMessage(message: msg, sendTime: getTime(), resendAttempts: 0)
UnacknowledgedMessage(message: msg, sendTime: getTime(),
resendAttempts: 0)
)

# Add to causal history and bloom filter
Expand All @@ -132,7 +136,8 @@ proc wrapOutgoingMessage*(
channelId = channelId, msg = getCurrentExceptionMsg()
return err(ReliabilityError.reSerializationError)

proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gcsafe.} =
proc processIncomingBuffer(rm: ReliabilityManager,
channelId: SdsChannelID) {.gcsafe.} =
withLock rm.lock:
if channelId notin rm.channels:
error "Channel does not exist", channelId = channelId
Expand Down Expand Up @@ -176,7 +181,8 @@ proc processIncomingBuffer(rm: ReliabilityManager, channelId: SdsChannelID) {.gc
proc unwrapReceivedMessage*(
rm: ReliabilityManager, message: seq[byte]
): Result[
tuple[message: seq[byte], missingDeps: seq[SdsMessageID], channelId: SdsChannelID],
tuple[message: seq[byte], missingDeps: seq[SdsMessageID],
channelId: SdsChannelID],
ReliabilityError,
] =
## Unwraps a received message and processes its reliability metadata.
Expand Down Expand Up @@ -234,7 +240,8 @@ proc unwrapReceivedMessage*(
return err(ReliabilityError.reDeserializationError)

proc markDependenciesMet*(
rm: ReliabilityManager, messageIds: seq[SdsMessageID], channelId: SdsChannelID
rm: ReliabilityManager, messageIds: seq[SdsMessageID],
channelId: SdsChannelID
): Result[void, ReliabilityError] =
## Marks the specified message dependencies as met.
##
Expand Down Expand Up @@ -330,7 +337,8 @@ proc periodicBufferSweep(
except Exception:
error "Error in periodic buffer sweep", msg = getCurrentExceptionMsg()

await sleepAsync(chronos.milliseconds(rm.config.bufferSweepInterval.inMilliseconds))
await sleepAsync(chronos.milliseconds(
rm.config.bufferSweepInterval.inMilliseconds))

proc periodicSyncMessage(
rm: ReliabilityManager
Expand All @@ -351,7 +359,8 @@ proc startPeriodicTasks*(rm: ReliabilityManager) =
asyncSpawn rm.periodicBufferSweep()
asyncSpawn rm.periodicSyncMessage()

proc resetReliabilityManager*(rm: ReliabilityManager): Result[void, ReliabilityError] =
proc resetReliabilityManager*(rm: ReliabilityManager): Result[void,
ReliabilityError] =
## Resets the ReliabilityManager to its initial state.
##
## This procedure clears all buffers and resets the Lamport timestamp.
Expand Down
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/test_bloom.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest, results, strutils
import ../src/bloom
import ../src/sds/bloom
from random import rand, randomize

suite "bloom filter":
Expand Down
2 changes: 1 addition & 1 deletion tests/test_reliability.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest, results, chronos, std/[times, options, tables]
import ../src/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter]
import ../src/sds/[reliability, message, protobuf, reliability_utils, rolling_bloom_filter]

const testChannel = "testChannel"

Expand Down