Skip to content

Commit 896f1d3

Browse files
committed
working lightpush request with mix example with static nodes
1 parent c319ff9 commit 896f1d3

File tree

6 files changed

+146
-4
lines changed

6 files changed

+146
-4
lines changed

examples/lightpush_publisher_mix.nim

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import
2+
std/[tables, times, sequtils],
3+
stew/byteutils,
4+
stew/shims/net,
5+
chronicles,
6+
results,
7+
chronos,
8+
confutils,
9+
libp2p/crypto/crypto,
10+
eth/keys,
11+
eth/p2p/discoveryv5/enr
12+
13+
import ../vendor/mix/src/entry_connection,
14+
../vendor/mix/src/protocol
15+
16+
import
17+
waku/[
18+
common/logging,
19+
node/peer_manager,
20+
waku_core,
21+
waku_core/codecs,
22+
waku_node,
23+
waku_enr,
24+
discovery/waku_discv5,
25+
factory/builder,
26+
waku_lightpush/client
27+
]
28+
29+
proc now*(): Timestamp =
30+
getNanosecondTime(getTime().toUnixFloat())
31+
32+
# careful if running pub and sub in the same machine
33+
const wakuPort = 60000
34+
35+
const clusterId = 2
36+
const shardId = @[0'u16]
37+
38+
const
39+
LightpushPeer =
40+
"/ip4/127.0.0.1/tcp/60005/p2p/16Uiu2HAmRhxmCHBYdXt1RibXrjAUNJbduAhzaTHwFCZT4qWnqZAu"
41+
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/2/0")
42+
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-mix-example/proto")
43+
44+
proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
45+
# use notice to filter all waku messaging
46+
setupLog(logging.LogLevel.DEBUG, logging.LogFormat.TEXT)
47+
48+
notice "starting publisher", wakuPort = wakuPort
49+
50+
let
51+
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
52+
ip = parseIpAddress("0.0.0.0")
53+
flags = CapabilitiesBitfield.init(relay = true)
54+
55+
let relayShards = RelayShards.init(clusterId, shardId).valueOr:
56+
error "Relay shards initialization failed", error = error
57+
quit(QuitFailure)
58+
59+
var enrBuilder = EnrBuilder.init(nodeKey)
60+
enrBuilder.withWakuRelaySharding(relayShards).expect(
61+
"Building ENR with relay sharding failed"
62+
)
63+
64+
let recordRes = enrBuilder.build()
65+
let record =
66+
if recordRes.isErr():
67+
error "failed to create enr record", error = recordRes.error
68+
quit(QuitFailure)
69+
else:
70+
recordRes.get()
71+
72+
var builder = WakuNodeBuilder.init()
73+
builder.withNodeKey(nodeKey)
74+
builder.withRecord(record)
75+
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
76+
let node = builder.build().tryGet()
77+
78+
node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
79+
node.mountLightPushClient()
80+
(
81+
await node.mountMix("401dd1eb5582f6dc9488d424aa26ed1092becefcf8543172e6d92c17ed07265a")
82+
).isOkOr:
83+
error "failed to mount waku mix protocol: ", error = $error
84+
return
85+
86+
let conn = MixEntryConnection.newConn(
87+
"/ip4/127.0.0.1/tcp/60001/p2p/16Uiu2HAmPiEs2ozjjJF2iN2Pe2FYeMC9w4caRHKYdLdAfjgbWM6o",
88+
ProtocolType.fromString(WakuLightPushCodec),
89+
node.mix)
90+
91+
await node.start()
92+
node.peerManager.start()
93+
94+
notice "publisher service started"
95+
while true:
96+
let text = "hi there i'm a publisher"
97+
let message = WakuMessage(
98+
payload: toBytes(text), # content of the message
99+
contentTopic: LightpushContentTopic, # content topic to publish to
100+
ephemeral: true, # tell store nodes to not store it
101+
timestamp: now(),
102+
) # current timestamp
103+
104+
let res =
105+
await node.wakuLightpushClient.publishWithConn(LightpushPubsubTopic, message, conn)
106+
107+
if res.isOk:
108+
notice "published message",
109+
text = text,
110+
timestamp = message.timestamp,
111+
psTopic = LightpushPubsubTopic,
112+
contentTopic = LightpushContentTopic
113+
else:
114+
error "failed to publish message", error = res.error
115+
116+
await sleepAsync(5000)
117+
break
118+
119+
when isMainModule:
120+
let rng = crypto.newRng()
121+
asyncSpawn setupAndPublish(rng)
122+
runForever()

vendor/nim-libp2p

waku.nimble

+1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ task example2, "Build Waku examples":
132132
buildBinary "subscriber", "examples/"
133133
buildBinary "filter_subscriber", "examples/"
134134
buildBinary "lightpush_publisher", "examples/"
135+
buildBinary "lightpush_publisher_mix", "examples/"
135136

136137
task chat2, "Build example Waku chat usage":
137138
# NOTE For debugging, set debug level. For chat usage we want minimal log

waku/node/waku_node.nim

+2-1
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,10 @@ proc mountMix*(node: WakuNode, mixPrivKey: string): Future[Result[void, string]]
253253

254254
let localaddrStr = node.announcedAddresses[0].toString().valueOr:
255255
return err("Failed to convert multiaddress to string.")
256+
info "local addr", localaddr = localaddrStr
256257

257258
let localMixNodeInfo = initMixNodeInfo(
258-
localaddrStr, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
259+
localaddrStr & "/p2p/" & $node.peerId, mixPubKey, mixKey, node.switch.peerInfo.publicKey.skkey,
259260
node.switch.peerInfo.privateKey.skkey,
260261
)
261262

waku/waku_lightpush/client.nim

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{.push raises: [].}
22

33
import std/options, results, chronicles, chronos, metrics, bearssl/rand, stew/byteutils
4-
import libp2p/peerid
4+
import libp2p/peerid, libp2p/stream/connection
55
import
66
../waku_core/peers,
77
../node/peer_manager,
@@ -107,3 +107,21 @@ proc publishToAny*(
107107
obs.onMessagePublished(pubSubTopic, message)
108108

109109
return ok()
110+
111+
112+
proc publishWithConn*(
113+
wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage, conn: Connection
114+
): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
115+
## This proc is similar to the publish one but in this case
116+
## we use existing connection to publish.
117+
118+
info "publishWithConn", msg_hash = computeMessageHash(pubsubTopic, message).to0xHex
119+
120+
let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
121+
let rpc = PushRPC(requestId: generateRequestId(wl.rng), request: some(pushRequest))
122+
await conn.writeLP(rpc.encode().buffer)
123+
124+
for obs in wl.publishObservers:
125+
obs.onMessagePublished(pubSubTopic, message)
126+
127+
return ok()

0 commit comments

Comments
 (0)