Skip to content

Commit f42c109

Browse files
authored
Merge pull request #50 from jackhftang/master
added tryReceiveAsync, receiveAllAsync
2 parents 4cc1ca3 + 0438d9d commit f42c109

File tree

2 files changed

+98
-13
lines changed

2 files changed

+98
-13
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import std/asyncdispatch
2+
import std/sequtils
3+
import std/strutils
4+
import ../zmq
5+
6+
const N_WORKER = 3
7+
const host = "tcp://localhost:5572"
8+
9+
proc worker(): Future[void] {.async.} =
10+
let socket = zmq.connect(host, DEALER)
11+
socket.sendAll("READY")
12+
var isRunning = true
13+
while isRunning:
14+
let multiparts = await socket.receiveAllAsync()
15+
echo "worker receive ", multiparts
16+
let command = multiparts[0]
17+
case command:
18+
of "JOB":
19+
let x = multiparts[1]
20+
let y = parseInt(x)*2
21+
socket.sendAll("DONE", $x, $y)
22+
of "KILL":
23+
isRunning = false
24+
socket.sendAll("END")
25+
26+
proc router(): Future[void] {.async.} =
27+
let socket = zmq.listen(host, ROUTER)
28+
var jobs = toSeq(1..10)
29+
var nWorker = 0
30+
var isRunning = true
31+
while isRunning:
32+
let multiparts = await socket.receiveAllAsync()
33+
echo "router receive ", multiparts
34+
let workerId = multiparts[0]
35+
let command = multiparts[1]
36+
case command:
37+
of "READY":
38+
nWorker += 1
39+
if jobs.len > 0:
40+
socket.sendAll(workerId, "JOB", $jobs.pop())
41+
else:
42+
socket.sendAll(workerId, "KILL")
43+
of "END":
44+
nWorker -= 1
45+
if nWorker == 0:
46+
# stop router if no workers
47+
isRunning = false
48+
of "DONE":
49+
let x = multiparts[2]
50+
let y = multiparts[3]
51+
assert parseInt(x)*2 == parseInt(y)
52+
if jobs.len > 0:
53+
socket.sendAll(workerId, "JOB", $jobs.pop())
54+
else:
55+
socket.sendAll(workerId, "KILL")
56+
else:
57+
raise newException(CatchableError, "unknown command")
58+
59+
when isMainModule:
60+
echo "ex11_async_router_dealer.nim"
61+
asyncCheck router()
62+
for i in 1..N_WORKER:
63+
asyncCheck worker()
64+
while hasPendingOperations():
65+
poll()

zmq/asynczmq.nim

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,7 @@ proc pollAsync*(poller: AsyncZPoller, timeout: int = 2) : Future[int] =
9090

9191
result.complete(r)
9292

93-
proc receiveAsync*(conn: ZConnection): Future[string] =
94-
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
95-
## `receiveAsync()` allows other async tasks to run in those cases.
96-
##
97-
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
98-
##
99-
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
100-
let fut = newFuture[string]("receiveAsync")
101-
result = fut
102-
let sock = conn.socket
103-
93+
template receiveAsyncCallbackTemplate(fut: Future, sock: ZSocket, recv, cb) =
10494
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
10595
# the cb should work on the low level socket and not the ZConnection object
10696
result = true
@@ -116,15 +106,45 @@ proc receiveAsync*(conn: ZConnection): Future[string] =
116106
else:
117107
# ready to read
118108
unregister(fd)
119-
fut.complete sock.receive(DONTWAIT)
109+
fut.complete recv(sock, DONTWAIT)
120110
except:
121111
unregister(fd)
122112
fut.fail getCurrentException()
123113

114+
proc receiveAsync*(conn: ZConnection): Future[string] =
115+
## Similar to `receive()`, but `receiveAsync()` allows other async tasks to run.
116+
## `receiveAsync()` allows other async tasks to run in those cases.
117+
##
118+
## This will not work in some case because it depends on ZMQ_FD which is not necessarily the 'true' FD of the socket
119+
##
120+
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
121+
let fut = newFuture[string]("receiveAsync")
122+
result = fut
123+
receiveAsyncCallbackTemplate(fut, conn.socket, receive, cb)
124124
let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD
125125
register(fd)
126126
discard cb(fd)
127127

128+
proc tryReceiveAsync*(conn: ZConnection): Future[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]] =
129+
## Async version of `tryReceive()`
130+
let fut = newFuture[tuple[msgAvailable: bool, moreAvailable: bool, msg: string]]("tryReceiveAsync")
131+
result = fut
132+
receiveAsyncCallbackTemplate(fut, conn.socket, tryReceive, cb)
133+
let fd = getsockopt[cint](conn, ZSockOptions.FD).AsyncFD
134+
register(fd)
135+
discard cb(fd)
136+
137+
proc receiveAllAsync*(conn: ZConnection): Future[seq[string]] {.async.} =
138+
## async version for `receiveAll()`
139+
var expectMessage = true
140+
while expectMessage:
141+
let (msgAvailable, moreAvailable, msg) = await tryReceiveAsync(conn)
142+
if msgAvailable:
143+
result.add msg
144+
expectMessage = moreAvailable
145+
else:
146+
expectMessage = false
147+
128148
proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWAIT): Future[void] =
129149
## `send()` is blocking for some connection types (e.g. PUSH, DEALER).
130150
## `sendAsync()` allows other async tasks to run in those cases.
@@ -134,11 +154,11 @@ proc sendAsync*(conn: ZConnection, msg: string, flags: ZSendRecvOptions = DONTWA
134154
## See https://github.com/zeromq/libzmq/issues/2941 and https://github.com/zeromq/pyzmq/issues/1411
135155
let fut = newFuture[void]("sendAsync")
136156
result = fut
137-
let sock = conn.socket
138157

139158
let status = getsockopt[cint](conn, ZSockOptions.EVENTS)
140159
if (status and ZMQ_POLLOUT) == 0:
141160
# wait until queue available
161+
let sock = conn.socket
142162
proc cb(fd: AsyncFD): bool {.closure, gcsafe.} =
143163
result = true
144164

0 commit comments

Comments
 (0)