Skip to content

Commit 16822cb

Browse files
committed
feat: poc to integrate mix into waku and use lightpush to demonstrate
1 parent 091024b commit 16822cb

File tree

7 files changed

+242
-2
lines changed

7 files changed

+242
-2
lines changed

.gitmodules

+4
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,7 @@
194194
url = https://github.com/waku-org/waku-rlnv2-contract.git
195195
ignore = untracked
196196
branch = master
197+
[submodule "vendor/mix"]
198+
path = vendor/mix
199+
url = https://github.com/vacp2p/mix/
200+
branch = mix-waku-integ

examples/lightpush_publisher_mix.nim

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import
2+
std/[tables, times, sequtils],
3+
stew/byteutils,
4+
stew/shims/net,
5+
chronicles,
6+
results,
7+
chronos,
8+
confutils,
9+
libp2p/crypto/crypto,
10+
eth/keys,
11+
eth/p2p/discoveryv5/enr
12+
13+
import ../vendor/mix/src/entry_connection,
14+
../vendor/mix/src/protocol
15+
16+
import
17+
waku/[
18+
common/logging,
19+
node/peer_manager,
20+
waku_core,
21+
waku_core/codecs,
22+
waku_node,
23+
waku_enr,
24+
discovery/waku_discv5,
25+
factory/builder,
26+
waku_lightpush/client
27+
]
28+
29+
proc now*(): Timestamp =
30+
getNanosecondTime(getTime().toUnixFloat())
31+
32+
# careful if running pub and sub in the same machine
33+
const wakuPort = 60000
34+
35+
const clusterId = 2
36+
const shardId = @[0'u16]
37+
38+
const
39+
LightpushPeer =
40+
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"
41+
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0")
42+
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
43+
44+
proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
45+
# use notice to filter all waku messaging
46+
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
47+
48+
notice "starting publisher", wakuPort = wakuPort
49+
50+
let
51+
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
52+
ip = parseIpAddress("0.0.0.0")
53+
flags = CapabilitiesBitfield.init(relay = true)
54+
55+
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
56+
error "Relay shards initialization failed", error = error
57+
quit(QuitFailure)
58+
59+
var enrBuilder = EnrBuilder.init(nodeKey)
60+
enrBuilder.withWakuRelaySharding(relayShards).expect(
61+
"Building ENR with relay sharding failed"
62+
)
63+
64+
let recordRes = enrBuilder.build()
65+
let record =
66+
if recordRes.isErr():
67+
error "failed to create enr record", error = recordRes.error
68+
quit(QuitFailure)
69+
else:
70+
recordRes.get()
71+
72+
var builder = WakuNodeBuilder.init()
73+
builder.withNodeKey(nodeKey)
74+
builder.withRecord(record)
75+
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
76+
let node = builder.build().tryGet()
77+
78+
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
79+
node.mountLightPushClient()
80+
(
81+
await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a")
82+
).isOkOr:
83+
error "failed to mount waku mix protocol: ", error = $error
84+
return
85+
86+
let destPeerId = PeerId.init("16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o").valueOr:
87+
error "Failed to initialize PeerId", err = error
88+
return
89+
90+
let conn = MixEntryConnection.newConn(
91+
"/ip4/127.0.0.1/tcp/60001",
92+
destPeerId,
93+
ProtocolType.fromString(WakuLightPushCodec),
94+
node.mix)
95+
96+
await node.start()
97+
node.peerManager.start()
98+
99+
notice "publisher service started"
100+
while true:
101+
let text = "hi there i'm a publisher using mix"
102+
let message = WakuMessage(
103+
payload: toBytes(text), # content of the message
104+
contentTopic: LightpushContentTopic, # content topic to publish to
105+
ephemeral: true, # tell store nodes to not store it
106+
timestamp: now(),
107+
) # current timestamp
108+
109+
let res =
110+
await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn)
111+
112+
if res.isOk:
113+
notice "published message",
114+
text = text,
115+
timestamp = message.timestamp,
116+
psTopic = LightpushPubsubTopic,
117+
contentTopic = LightpushContentTopic
118+
else:
119+
error "failed to publish message", error = res.error
120+
121+
await sleepAsync(5000)
122+
break
123+
124+
when isMainModule:
125+
let rng = crypto.newRng()
126+
asyncSpawn setupAndPublish(rng)
127+
runForever()

waku.nimble

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ task example2, "Build Waku examples":
132132
buildBinary "subscriber", "examples/"
133133
buildBinary "filter_subscriber", "examples/"
134134
buildBinary "lightpush_publisher", "examples/"
135+
buildBinary "lightpush_publisher_mix", "examples/"
135136

136137
task chat2, "Build example Waku chat usage":
137138
# NOTE For debugging, set debug level. For chat usage we want minimal log

waku/factory/external_config.nim

+10
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,16 @@ with the drawback of consuming some more bandwidth.""",
663663
name: "rendezvous"
664664
.}: bool
665665

666+
#Mix config
667+
mixkey* {.desc: "ED25519 private key as 64 char hex string.", name: "mixkey".}:
668+
Option[string]
669+
#TODO: Temp config for simulations.Ideally need to get this info from bootstrap ENRs
670+
#[ mixBootstrapNodes* {.
671+
desc:
672+
"Text-encoded data for mix bootstrap node. Encoded in the format Multiaddress:libp2pPubKey:MixPubKey. Argument may be repeated.",
673+
name: "mix-bootstrap-node"
674+
.}: seq[string] ]#
675+
666676
## websocket config
667677
websocketSupport* {.
668678
desc: "Enable websocket: true|false",

waku/factory/node_factory.nim

+11
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,17 @@ proc setupProtocols(
422422
return
423423
err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error)
424424

425+
#mount mix
426+
let mixPrivKey:string =
427+
if conf.mixkey.isSome():
428+
conf.mixkey.get()
429+
else:
430+
error "missing mix key"
431+
return err("missing mix key")
432+
(
433+
await node.mountMix(mixPrivKey)
434+
).isOkOr:
435+
return err("failed to mount waku mix protocol: " & $error)
425436
return ok()
426437

427438
## Start node

waku/node/waku_node.nim

+70-1
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import
99
stew/byteutils,
1010
eth/keys,
1111
nimcrypto,
12+
nimcrypto/utils as ncrutils,
1213
bearssl/rand,
1314
eth/p2p/discoveryv5/enr,
1415
libp2p/crypto/crypto,
16+
libp2p/crypto/curve25519,
1517
libp2p/protocols/ping,
1618
libp2p/protocols/pubsub/gossipsub,
1719
libp2p/protocols/pubsub/rpc/messages,
@@ -20,7 +22,13 @@ import
2022
libp2p/builders,
2123
libp2p/transports/transport,
2224
libp2p/transports/tcptransport,
23-
libp2p/transports/wstransport
25+
libp2p/transports/wstransport,
26+
../../vendor/mix/src/mix_node,
27+
../../vendor/mix/src/mix_protocol,
28+
../../vendor/mix/src/curve25519,
29+
../../vendor/mix/src/protocol
30+
31+
2432
import
2533
../waku_core,
2634
../waku_core/topics/sharding,
@@ -119,6 +127,8 @@ type
119127
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
120128
contentTopicHandlers: Table[ContentTopic, TopicHandler]
121129
rateLimitSettings*: ProtocolRateLimitSettings
130+
mix*: MixProtocol
131+
mixbootNodes*: Table[PeerId, MixPubInfo]
122132

123133
proc new*(
124134
T: type WakuNode,
@@ -204,6 +214,65 @@ proc mountSharding*(
204214
node.wakuSharding = Sharding(clusterId: clusterId, shardCountGenZero: shardCount)
205215
return ok()
206216

217+
proc getBootStrapMixNodes(node: WakuNode, exceptPeerID: PeerId): Table[PeerId, MixPubInfo] =
218+
var mixNodes = initTable[PeerId, MixPubInfo]()
219+
# MixNode Multiaddrs and PublicKeys:
220+
let bootNodesMultiaddrs = ["/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
221+
"/ip4/127.0.0.1/tcp/60002/p2p/16Uiu2HAmLtKaFaSWDohToWhWUZFLtqzYZGPFuXwKrojFVF6az5UF",
222+
"/ip4/127.0.0.1/tcp/60003/p2p/16Uiu2HAmTEDHwAziWUSz6ZE23h5vxG2o4Nn7GazhMor4bVuMXTrA",
223+
"/ip4/127.0.0.1/tcp/60004/p2p/16Uiu2HAmPwRKZajXtfb1Qsv45VVfRZgK3ENdfmnqzSrVm3BczF6f",
224+
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu",
225+
]
226+
let bootNodesMixPubKeys = ["9d09ce624f76e8f606265edb9cca2b7de9b41772a6d784bddaf92ffa8fba7d2c",
227+
"9231e86da6432502900a84f867004ce78632ab52cd8e30b1ec322cd795710c2a",
228+
"275cd6889e1f29ca48e5b9edb800d1a94f49f13d393a0ecf1a07af753506de6c",
229+
"e0ed594a8d506681be075e8e23723478388fb182477f7a469309a25e7076fc18",
230+
"8fd7a1a7c19b403d231452a9b1ea40eb1cc76f455d918ef8980e7685f9eeeb1f"
231+
]
232+
for index, mixNodeMultiaddr in bootNodesMultiaddrs:
233+
let peerIdRes = getPeerIdFromMultiAddr(mixNodeMultiaddr)
234+
if peerIdRes.isErr:
235+
error "Failed to get peer id from multiaddress: " , error = peerIdRes.error
236+
let peerId = peerIdRes.get()
237+
if peerID == exceptPeerID:
238+
continue
239+
let mixNodePubInfo = createMixPubInfo(mixNodeMultiaddr, intoCurve25519Key(ncrutils.fromHex(bootNodesMixPubKeys[index])))
240+
241+
mixNodes[peerId] = mixNodePubInfo
242+
info "using mix bootstrap nodes ", bootNodes = mixNodes
243+
return mixNodes
244+
245+
246+
# Mix Protocol
247+
proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]] {.async.} =
248+
info "mounting mix protocol", nodeId = node.info #TODO log the config used
249+
info "mixPrivKey", mixPrivKey = mixPrivKey
250+
251+
let mixKey = intoCurve25519Key(ncrutils.fromHex(mixPrivKey))
252+
let mixPubKey = public(mixKey)
253+
254+
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
255+
return err("Failed to convert multiaddress to string.")
256+
info "local addr", localaddr = localaddrStr
257+
258+
let localMixNodeInfo = initMixNodeInfo(
259+
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
260+
node.switch.peerInfo.privateKey.skkey,
261+
)
262+
263+
let protoRes = MixProtocol.initMix(localMixNodeInfo, node.switch, node.getBootStrapMixNodes(node.peerId))
264+
if protoRes.isErr:
265+
error "Mix protocol initialization failed", err = protoRes.error
266+
return
267+
node.mix = protoRes.value
268+
269+
let catchRes = catch:
270+
node.switch.mount(node.mix)
271+
if catchRes.isErr():
272+
return err(catchRes.error.msg)
273+
274+
return ok()
275+
207276
## Waku Sync
208277

209278
proc mountStoreSync*(

waku/waku_lightpush/client.nim

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{.push raises: [].}
22

33
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
4-
import libp2p/peerid
4+
import libp2p/peerid, libp2p/stream/connection
55
import
66
../waku_core/peers,
77
../node/peer_manager,
@@ -107,3 +107,21 @@ proc publishToAny*(
107107
obs.onMessagePublished(pubSubTopic, message)
108108

109109
return ok()
110+
111+
112+
proc publishWithConn*(
113+
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, conn: Connection
114+
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
115+
## This proc is similar to the publish one but in this case
116+
## we use existing connection to publish.
117+
118+
info "publishWithConn", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
119+
120+
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
121+
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(pushRequest))
122+
await conn.writeLP(rpc.encode().buffer)
123+
124+
for obs in wl.publishObservers:
125+
obs.onMessagePublished(pubSubTopic, message)
126+
127+
return ok()

0 commit comments

Comments
 (0)