Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nim_chat_poc.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ requires "confutils"
requires "eth"
requires "regex"
requires "web3"
requires "https://github.com/jazzz/nim-sds#exports"
2 changes: 1 addition & 1 deletion protos/private_v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ message Placeholder {
message PrivateV1Frame {
string conversation_id = 1;
bytes sender = 2;

int64 timestamp = 3; // Sender reported timestamp
oneof frame_type {
common_frames.ContentFrame content = 10;
Placeholder placeholder = 11;
Expand Down
35 changes: 29 additions & 6 deletions src/chat_sdk/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import # Foreign
chronicles,
chronos,
sds,
sequtils,
std/tables,
std/sequtils,
strformat,
strutils,
tables
tables,
types

import #local
conversation_store,
Expand All @@ -34,8 +36,10 @@ logScope:
#################################################

type
MessageCallback[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.}
NewConvoCallback = proc(conversation: Conversation): Future[void] {.async.}
MessageCallback*[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.}
NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.}
DeliveryAckCallback* = proc(conversation: Conversation,
msgId: MessageId): Future[void] {.async.}


type KeyEntry* = object
Expand All @@ -53,6 +57,7 @@ type Client* = ref object

newMessageCallbacks: seq[MessageCallback[ContentFrame]]
newConvoCallbacks: seq[NewConvoCallback]
deliveryAckCallbacks: seq[DeliveryAckCallback]

#################################################
# Constructors
Expand All @@ -64,6 +69,8 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError,
try:
let waku = initWakuClient(cfg)

let rm = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"SDS InitializationError")

var q = QueueRef(queue: newAsyncQueue[ChatPayload](10))
var c = Client(ident: createIdentity(name),
Expand Down Expand Up @@ -130,6 +137,14 @@ proc notifyNewConversation(client: Client, convo: Conversation) =
for cb in client.newConvoCallbacks:
discard cb(convo)

proc onDeliveryAck*(client: Client, callback: DeliveryAckCallback) =
client.deliveryAckCallbacks.add(callback)

proc notifyDeliveryAck(client: Client, convo: Conversation,
messageId: MessageId) =
for cb in client.deliveryAckCallbacks:
discard cb(convo, messageId)

#################################################
# Functional
#################################################
Expand All @@ -144,7 +159,7 @@ proc createIntroBundle*(self: var Client): IntroBundle =
self.keyStore[ephemeralKey.getPublicKey().bytes().bytesToHex()] = KeyEntry(
keyType: "ephemeral",
privateKey: ephemeralKey,
timestamp: getTimestamp()
timestamp: getCurrentTimestamp()
)

result = IntroBundle(
Expand Down Expand Up @@ -189,11 +204,19 @@ proc newPrivateConversation*(client: Client,
participant: @(destPubkey.bytes()),
participantEphemeralId: introBundle.ephemeralId,
discriminator: "test"
)
)



let env = wrapEnv(encrypt(InboxV1Frame(invitePrivateV1: invite,
recipient: "")), convoId)

let convo = initPrivateV1(client.identity(), destPubkey, "default")
let deliveryAckCb = proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} =
client.notifyDeliveryAck(conversation, msgId)

let convo = initPrivateV1(client.identity(), destPubkey, "default", deliveryAckCb)
client.addConversation(convo)

# TODO: Subscribe to new content topic
Expand Down
3 changes: 3 additions & 0 deletions src/chat_sdk/conversation_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import ./conversations/convo_type
import crypto
import identity
import proto_types
import types

type ConvoId = string

Expand All @@ -16,3 +17,5 @@ type

proc notifyNewMessage(self: Self, convo: Conversation,
content: ContentFrame)
proc notifyDeliveryAck(self: Self, convo: Conversation,
msgId: MessageId)
84 changes: 76 additions & 8 deletions src/chat_sdk/conversations/private_v1.nim
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@


import blake2
import chronicles
import chronos
import sds
import std/[sequtils, strutils, strformat]
import std/algorithm
import sugar
import tables

import ../conversation_store
import ../crypto
Expand All @@ -13,15 +15,16 @@ import ../delivery/waku_client
import ../[
identity,
proto_types,
types,
utils
]
import convo_type



type
PrivateV1* = ref object of Conversation
# Placeholder for PrivateV1 conversation type
sdsClient: ReliabilityManager
owner: Identity
topic: string
participants: seq[PublicKey]
Expand All @@ -48,27 +51,81 @@ proc derive_topic(participants: seq[PublicKey], discriminator: string): string =
## Derives a topic from the participants' public keys.
return "/convo/private/" & getConvoIdRaw(participants, discriminator)

proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string =
let s = fmt"{self.getConvoId()}|{msgBytes}"
result = getBlake2b(s, 16, "")


proc encrypt*(convo: PrivateV1, frame: PrivateV1Frame): EncryptedPayload =
result = EncryptedPayload(plaintext: Plaintext(payload: encode(frame)))

proc decrypt*(convo: PrivateV1, enc: EncryptedPayload): PrivateV1Frame =
result = decode(enc.plaintext.payload, PrivateV1Frame).get()


proc wireCallbacks(convo: PrivateV1, deliveryAckCb: proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} = nil) =
## Accepts lambdas/functions to be called from Reliability Manager callbacks.
let funcMsg = proc(messageId: SdsMessageID,
channelId: SdsChannelID) {.gcsafe.} =
debug "sds message ready", messageId = messageId,
channelId = channelId

let funcDeliveryAck = proc(messageId: SdsMessageID,
channelId: SdsChannelID) {.gcsafe.} =
debug "sds message ack", messageId = messageId,
channelId = channelId, cb = repr(deliveryAckCb)

if deliveryAckCb != nil:
asyncSpawn deliveryAckCb(convo, messageId)

let funcDroppedMsg = proc(messageId: SdsMessageID, missingDeps: seq[
SdsMessageID], channelId: SdsChannelID) {.gcsafe.} =
debug "sds message missing", messageId = messageId,
missingDeps = missingDeps, channelId = channelId

convo.sdsClient.setCallbacks(
funcMsg, funcDeliveryAck, funcDroppedMsg
)



proc initPrivateV1*(owner: Identity, participant: PublicKey,
discriminator: string = "default"): PrivateV1 =
discriminator: string = "default", deliveryAckCb: proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} = nil):
PrivateV1 =

var participants = @[owner.getPubkey(), participant];

return PrivateV1(
var rm = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"sds initialization: {repr(error)}")

result = PrivateV1(
sdsClient: rm,
owner: owner,
topic: derive_topic(participants, discriminator),
participants: participants,
discriminator: discriminator
)

result.wireCallbacks(deliveryAckCb)

result.sdsClient.ensureChannel(result.getConvoId()).isOkOr:
raise newException(ValueError, "bad sds channel")

proc sendFrame(self: PrivateV1, ds: WakuClient,
msg: PrivateV1Frame): Future[void]{.async.} =
let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg)))

let frameBytes = encode(msg)
let msgId = self.calcMsgId(frameBytes)
let sdsPayload = self.sdsClient.wrapOutgoingMessage(frameBytes, msgId,
self.getConvoId()).valueOr:
raise newException(ValueError, fmt"sds wrapOutgoingMessage failed: {repr(error)}")

let encryptedBytes = EncryptedPayload(plaintext: Plaintext(
payload: sdsPayload))

discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope(
self.getConvoId()))
Expand All @@ -81,8 +138,19 @@ proc handleFrame*[T: ConversationStore](convo: PrivateV1, client: T,
## Dispatcher for Incoming `PrivateV1Frames`.
## Calls further processing depending on the kind of frame.

let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result
let frame = convo.decrypt(enc) # TODO: handle result
let enc = decode(bytes, EncryptedPayload).valueOr:
raise newException(ValueError, fmt"Failed to decode EncryptedPayload: {repr(error)}")

# TODO: Decrypt the payload
let (frameData, missingDeps, channelId) = convo.sdsClient.unwrapReceivedMessage(
enc.plaintext.payload).valueOr:
raise newException(ValueError, fmt"Failed to unwrap SDS message:{repr(error)}")

debug "sds unwrap", convo = convo.id(), missingDeps = missingDeps,
channelId = channelId

let frame = decode(frameData, PrivateV1Frame).valueOr:
raise newException(ValueError, "Failed to decode SdsM: " & error)

if frame.sender == @(convo.owner.getPubkey().bytes()):
notice "Self Message", convo = convo.id()
Expand All @@ -102,7 +170,7 @@ method sendMessage*(convo: PrivateV1, ds: WakuClient,

try:
let frame = PrivateV1Frame(sender: @(convo.owner.getPubkey().bytes()),
content: content_frame)
timestamp: getCurrentTimestamp(), content: content_frame)

await convo.sendFrame(ds, frame)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion src/chat_sdk/delivery/waku_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode =
builder.withNetworkConfigurationDetails(ip, Port(cfg.port)).tryGet()
let node = builder.build().tryGet()

node.mountMetadata(cfg.clusterId, @[1'u16, 2'u16]).expect("failed to mount waku metadata protocol")
node.mountMetadata(cfg.clusterId, cfg.shardId).expect("failed to mount waku metadata protocol")

result = node

Expand Down
6 changes: 5 additions & 1 deletion src/chat_sdk/inbox.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ proc createPrivateV1FromInvite*[T: ConversationStore](client: T,
let destPubkey = loadPublicKeyFromBytes(invite.initiator).valueOr:
raise newException(ValueError, "Invalid public key in intro bundle.")

let convo = initPrivateV1(client.identity(), destPubkey, "default")
let deliveryAckCb = proc(
conversation: Conversation,
msgId: string): Future[void] {.async.} =
client.notifyDeliveryAck(conversation, msgId)

let convo = initPrivateV1(client.identity(), destPubkey, "default", deliveryAckCb)
notice "Creating PrivateV1 conversation", client = client.getId(),
topic = convo.getConvoId()
client.addConversation(convo)
Expand Down
3 changes: 3 additions & 0 deletions src/chat_sdk/types.nim
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
type ChatError* = string


type MessageId* = string
4 changes: 2 additions & 2 deletions src/chat_sdk/utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import crypto
import blake2
import strutils

proc getTimestamp*(): Timestamp =
proc getCurrentTimestamp*(): Timestamp =
result = waku_core.getNanosecondTime(getTime().toUnix())


proc hash_func*(s: string): string =
proc hash_func*(s: string | seq[byte]): string =
# This should be Blake2s but it does not exist so substituting with Blake2b
result = getBlake2b(s, 4, "")

Expand Down
Loading