Skip to content

Commit 19785be

Browse files
authored
update gozmq example (#225)
* update gozmq example Signed-off-by: vsoch <[email protected]>
1 parent 07c330c commit 19785be

File tree

7 files changed

+428
-148
lines changed

7 files changed

+428
-148
lines changed

examples/distributed/gozmq/README.md

Lines changed: 61 additions & 66 deletions
Large diffs are not rendered by default.

examples/distributed/gozmq/entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ rank=${FLUX_TASK_RANK}
66
# Get the host name
77
host=$(hostname)
88
echo "Hello I'm host $host"
9-
go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank}
9+
go run /code/main.go run --size ${workers} --prefix flux-sample --suffix "flux-service.default.svc.cluster.local" --port 5555 --rank ${rank} --raw
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
kind: Cluster
2+
apiVersion: kind.x-k8s.io/v1alpha4
3+
nodes:
4+
- role: control-plane
5+
kubeadmConfigPatches:
6+
- |
7+
kind: InitConfiguration
8+
nodeRegistration:
9+
kubeletExtraArgs:
10+
node-labels: "ingress-ready=true"
11+
- role: worker
12+
- role: worker
13+
- role: worker
14+
- role: worker

examples/distributed/gozmq/main.go.txt

Lines changed: 179 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -3,53 +3,188 @@ package main
33
import (
44
"log"
55
"os"
6-
"strings"
6+
"sync"
77

88
"github.com/akamensky/argparse"
99
zmq "github.com/pebbe/zmq4"
1010

1111
"fmt"
12-
"math/rand"
1312
"time"
1413
)
1514

16-
func workerTask(toHost, fromHost string) {
15+
16+
// ElapsedTime holds a start, end and elapsed time
17+
type ElapsedTime struct {
18+
StartTime time.Time
19+
EndTime time.Time
20+
}
21+
22+
func (e *ElapsedTime) Start() {
23+
e.StartTime = time.Now()
24+
}
25+
func (e *ElapsedTime) Stop() {
26+
e.EndTime = time.Now()
27+
}
28+
29+
func (e *ElapsedTime) Elapsed() time.Duration {
30+
return e.EndTime.Sub(e.StartTime)
31+
}
32+
33+
// brokerTask is receiving work (client or DEALER calls)
34+
// and responding.
35+
func brokerTask(
36+
broker *zmq.Socket,
37+
measurements int,
38+
size int,
39+
) {
40+
41+
// The total number of expected interactions we should have is
42+
// the number of other workers * measurements
43+
expected := measurements * (size - 1)
44+
count := 0
45+
46+
// Keep going until we hit expected
47+
for count < expected {
48+
identity, err := broker.Recv(0)
49+
if err != nil {
50+
log.Fatalf("Error", err)
51+
}
52+
53+
// Send back to the specific identity asking for more
54+
// We check that the identity we receive at the worker is the one we sent
55+
broker.Send(identity, zmq.SNDMORE)
56+
57+
// This is the envelope delimiter
58+
// If you look at the string it is empty
59+
broker.Recv(0)
60+
61+
// This is the response from the worker
62+
fromIdentity, err := broker.Recv(0)
63+
if fromIdentity != identity {
64+
log.Fatalf("[broker] received message expecting %s got %s\n", identity, fromIdentity)
65+
}
66+
if err != nil {
67+
log.Fatalf("Error broker receiving message", err)
68+
}
69+
70+
// This is completing the round trip, it tells the worker to start
71+
// the next loop and that this message round is finished (I think)
72+
broker.Send("", zmq.SNDMORE)
73+
broker.Send(fromIdentity, 0)
74+
count += 1
75+
}
76+
}
77+
78+
// workerTask SENDS the message and responds
79+
// raw indicates showing raw results instead of a mean
80+
func workerTask(
81+
fromHost, toHost string,
82+
rank int,
83+
raw bool,
84+
wg *sync.WaitGroup,
85+
measurements int,
86+
) {
87+
88+
// Dealer sockets are the clients
1789
worker, err := zmq.NewSocket(zmq.DEALER)
1890
if err != nil {
1991
log.Fatalf("Error", err)
2092
}
2193
defer worker.Close()
22-
set_id(worker) // Set a printable identity
94+
defer wg.Done()
95+
96+
// Set a printable identity and set for times
97+
// This is a lookup of point to point send times
98+
identity := setIdentifier(worker, rank)
2399
worker.Connect(fmt.Sprintf("tcp://%s", toHost))
24100

25-
total := 0
26-
for {
101+
// The client (dealer) is sending and receiving,
102+
// so we keep track of round trip here.
103+
// I think if we time the broker, the broker can store
104+
// messages in memory so the times are too fast.
105+
// Each rank (fromHost ) keeps track of the times from itself
106+
// to one other host (toHost)
107+
times := []time.Duration{}
108+
109+
// Take m measurements
110+
for m := 0; m < measurements; m++ {
111+
112+
// This is a request for work - I think it would
113+
// encompass two messages
114+
_, err := worker.Send("", zmq.SNDMORE)
115+
if err != nil {
116+
log.Fatalf("Error Send More", err)
117+
}
118+
27119
// Tell the broker we're ready for work
28-
worker.Send("", zmq.SNDMORE)
29-
worker.Send("Ready to serve!", 0)
30-
31-
// Get workload from broker, until finished
32-
worker.Recv(0) // Envelope delimiter
33-
workload, _ := worker.Recv(0)
34-
if workload == "Done" {
35-
fmt.Printf("Completed: from %s to %s: %d tasks\n", fromHost, toHost, total)
36-
break
120+
t := ElapsedTime{}
121+
t.Start()
122+
123+
// We send back the worker rank (based on identity) to check
124+
// against the original identity sent to
125+
_, err = worker.Send(identity, 0)
126+
if err != nil {
127+
log.Fatalf("Error Send Message", err)
128+
}
129+
130+
_, err = worker.Recv(0)
131+
if err != nil {
132+
log.Fatalf("Error Receiving Envelope", err)
37133
}
38-
total++
134+
receivedMessage, err := worker.Recv(0)
39135

40-
// Do some random work
41-
time.Sleep(time.Duration(rand.Intn(500)+1) * time.Millisecond)
136+
// This is thd end of the round trip
137+
t.Stop()
138+
139+
if err != nil {
140+
log.Fatalf("Error", err)
141+
}
142+
143+
times = append(times, t.Elapsed())
144+
if receivedMessage != identity {
145+
log.Fatalf("[worker] received message expecting %s got %s\n", identity, receivedMessage)
146+
}
147+
}
148+
if raw {
149+
fmt.Printf(" ⭐️ Times for %d messages %s to %s: %s\n", measurements, fromHost, toHost, times)
150+
} else {
151+
meanTime := calculateMean(times)
152+
fmt.Printf(" ⭐️ Mean times for %d messages %s to %s: %s\n", measurements, fromHost, toHost, meanTime)
42153
}
43154
}
44155

156+
// calculateMean calculates the mean duration
157+
func calculateMean(times []time.Duration) time.Duration {
158+
total := time.Duration(0) * time.Nanosecond
159+
for _, t := range times {
160+
total += t
161+
}
162+
return (total / time.Duration(len(times)) * time.Nanosecond)
163+
}
164+
165+
// getIdentifier for a rank
166+
func getIdentifier(rank int) string {
167+
return fmt.Sprintf("rank-%d", rank)
168+
}
169+
170+
// setIdentifier for a rank
171+
// These need to be predictable between nodes
172+
func setIdentifier(soc *zmq.Socket, rank int) string {
173+
identity := getIdentifier(rank)
174+
soc.SetIdentity(identity)
175+
return identity
176+
}
177+
45178
func main() {
46179

47180
parser := argparse.NewParser("gozmq", "Playing with ZeroMQ in Go")
48181
runCmd := parser.NewCommand("run", "Run the example")
49182
prefix := runCmd.String("p", "prefix", &argparse.Options{Help: "Hostname prefix (e.g., flux-sample)"})
50183
size := runCmd.Int("s", "size", &argparse.Options{Help: "Number of hosts (count starts at 0)"})
51184
rank := runCmd.Int("r", "rank", &argparse.Options{Help: "Rank of this host"})
52-
tasks := runCmd.Int("t", "tasks", &argparse.Options{Help: "Number of tasks (workers) per node", Default: 1})
185+
186+
// This should only be set to 1 for this example
187+
raw := runCmd.Flag("", "raw", &argparse.Options{Help: "Output raw times instead of mean", Default: false})
53188
measurements := runCmd.Int("m", "measurements", &argparse.Options{Help: "Number of measurements to take (to average over)", Default: 10})
54189
suffix := runCmd.String("", "suffix", &argparse.Options{Help: "Hostname suffix (e.g. .flux-service.default.svc.cluster.local)"})
55190
port := runCmd.String("", "port", &argparse.Options{Help: "Port to use", Default: "5671"})
@@ -65,6 +200,9 @@ func main() {
65200
// Start the broker on the host
66201
thisHost := fmt.Sprintf("%s-%d.%s:%s", *prefix, *rank, *suffix, *port)
67202

203+
// This is the broker that will be a router on the rank it is running on
204+
// We will ask the worker for a message, and then keep track of the
205+
// round trip time
68206
broker, err := zmq.NewSocket(zmq.ROUTER)
69207
if err != nil {
70208
log.Fatalf("Error", err)
@@ -74,74 +212,37 @@ func main() {
74212
brokerHost := fmt.Sprintf("tcp://*:%s", *port)
75213
broker.Bind(brokerHost)
76214

77-
// Run a client task for each host
215+
// This will ensure the clients finish, and brokers as well
216+
var wg sync.WaitGroup
217+
218+
// Step 1: launch all the worker tasks!
219+
// We run a client task (worker) to send a message to every other host
220+
// The workers are going to be the main driver to run some number of measurements
78221
for i := 0; i < *size; i++ {
79222

80-
// Don't send to self?
223+
// Don't send to self
81224
if i == *rank {
225+
//row[i+1] = fmt.Sprintf("0")
82226
continue
83227
}
84228

85229
host := fmt.Sprintf("%s-%d.%s:%s", *prefix, i, *suffix, *port)
86230

87-
// Note that we can run more than one worker task here,
88-
// I'm choosing one to mimic(?) point to point (maybe)?
89-
for w := 0; w < *tasks; w++ {
90-
go workerTask(host, thisHost)
91-
}
92-
93-
// Keep matrix of elapsed times
94-
times := make([]time.Duration, *measurements)
95-
for m := 0; m < *measurements; m++ {
96-
97-
// Next message gives us least recently used worker
98-
identity, err := broker.Recv(0)
99-
if err != nil {
100-
log.Fatalf("Error", err)
101-
}
102-
start := time.Now()
103-
broker.Send(identity, zmq.SNDMORE)
104-
105-
// This is the envelope delimiter
106-
broker.Recv(0)
107-
108-
// This is the response from the worker
109-
// This is the round trip time
110-
broker.Recv(0)
111-
end := time.Now()
112-
elapsed := end.Sub(start)
113-
114-
// Add the entry to our matrix
115-
times[m] = elapsed
116-
broker.Send("", zmq.SNDMORE)
117-
118-
// Workers need to keep going until experiment done
119-
broker.Send("Keep going", 0)
120-
}
121-
122-
// Tell the worker it's done
123-
toHostPrefix := strings.Split(host, ".")
124-
fromHostPrefix := strings.Split(thisHost, ".")
125-
fmt.Printf(" ⭐️ Times for %s to %s: %s\n", fromHostPrefix[0], toHostPrefix[0], times)
231+
// We should only have one worker here for a point to point test
232+
// This worker is from thisHost TO the other rank, which should
233+
// also be running a broker. It will perform some number of
234+
// tasks until it receives a Done message
235+
wg.Add(1)
236+
go workerTask(thisHost, host, i, *raw, &wg, *measurements)
126237
}
127238

128-
// Give some time for everyone to finish
129-
time.Sleep(time.Second * 10)
130-
broker.Send("Done", 0)
131-
}
132-
}
239+
// Step 2: Kick off workers to receive them. Keep going
240+
// until both it's received all the expected pings (from other workers)
241+
// AND our own workers are done.
242+
go brokerTask(broker, *measurements, *size)
133243

134-
// calculateMean calculates the mean duration
135-
// TODO get this working, units are weird
136-
func calculateMean(times []time.Duration) time.Duration {
137-
total := time.Duration(0)
138-
for _, t := range times {
139-
total += t
244+
// Wait for all workers to finish, and then for all brokers
245+
// to have the number of interactions they expect
246+
wg.Wait()
140247
}
141-
return (total / time.Duration(len(times))) * time.Nanosecond
142-
}
143-
144-
func set_id(soc *zmq.Socket) {
145-
identity := fmt.Sprintf("%04X-%04X", rand.Intn(0x10000), rand.Intn(0x10000))
146-
soc.SetIdentity(identity)
147-
}
248+
}

0 commit comments

Comments
 (0)