Skip to content

Commit 2f70a2d

Browse files
committed
provider, node, net, io, sdk/node, examples/go/broadcast, examples/go/clock: wrap around io.PipeReader and io.PipeWriter to emit io.EOF instead of io.ErrClosedPipe, remove BindFunc in favor of strings representative of tcp addresses to listen to peers from, update readme with change in go sdk, add ProvidersFor(services ...string) to grab all available providers for a set of services in Go and NodeJS, add chatroom broadcast example for chatting across nodes using ProvidersFor()
1 parent 229e01f commit 2f70a2d

File tree

11 files changed

+343
-140
lines changed

11 files changed

+343
-140
lines changed

README.md

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,9 @@ type Node struct {
285285
// by calling `flatend.GenerateSecretKey()`.
286286
SecretKey kademlia.PrivateKey
287287

288-
// A list of addresses and ports assembled using:
289-
// 1. flatend.BindAny() (bind to all hosts and any available port)
290-
// 2. flatend.BindTCP(string) (binds to a [host]:[port])
291-
// 3. flatend.BindTCPv4(string) (binds to an [IPv4 host]:[port])
292-
// 4. flatend.BindTCPv6(string) (binds to an [IPv6 host]:[port])
293-
// which your Flatend node will listen for other nodes from.
294-
BindAddrs []BindFunc
288+
// A list of IPv4/IPv6 addresses and ports assembled as [host]:[port] which
289+
// your Flatend node will listen for other nodes from.
290+
BindAddrs []string
295291

296292
// A mapping of service names to their respective handlers.
297293
Services map[string]Handler

examples/go/broadcast/README.md

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
# broadcast
2+
3+
A demo of broadcasting a message from one node to several nodes providing the service named `'chat'`.
4+
5+
```
6+
[terminal 1] $ go run main.go -l :9000
7+
2020/06/27 02:37:52 Listening for Flatend nodes on '[::]:9000'.
8+
2020/06/27 02:37:53 0.0.0.0:9001 has connected. Services: [chat]
9+
2020/06/27 02:37:56 0.0.0.0:9002 has connected. Services: [chat]
10+
hello
11+
Got 'world' from 0.0.0.0:9001!
12+
Got 'test' from 0.0.0.0:9002!
13+
2020/06/27 02:39:09 0.0.0.0:9002 has disconnected from you. Services: [chat]
14+
2020/06/27 02:39:10 0.0.0.0:9001 has disconnected from you. Services: [chat]
15+
16+
[terminal 2] $ go run main.go -l :9001 :9000
17+
2020/06/27 02:37:53 Listening for Flatend nodes on '[::]:9001'.
18+
2020/06/27 02:37:53 You are now connected to 0.0.0.0:9000. Services: [chat]
19+
2020/06/27 02:37:53 Re-probed 0.0.0.0:9000. Services: [chat]
20+
2020/06/27 02:37:53 Discovered 0 peer(s).
21+
2020/06/27 02:37:56 0.0.0.0:9002 has connected. Services: [chat]
22+
Got 'hello' from 0.0.0.0:9000!
23+
world
24+
Got 'test' from 0.0.0.0:9002!
25+
2020/06/27 02:39:09 0.0.0.0:9002 has disconnected from you. Services: [chat]
26+
27+
[terminal 3] $ go run main.go -l :9002 :9000
28+
2020/06/27 02:37:56 Listening for Flatend nodes on '[::]:9002'.
29+
2020/06/27 02:37:56 You are now connected to 0.0.0.0:9000. Services: [chat]
30+
2020/06/27 02:37:56 Re-probed 0.0.0.0:9000. Services: [chat]
31+
2020/06/27 02:37:56 You are now connected to 0.0.0.0:9001. Services: [chat]
32+
2020/06/27 02:37:56 Discovered 1 peer(s).
33+
Got 'hello' from 0.0.0.0:9000!
34+
Got 'world' from 0.0.0.0:9001!
35+
```
36+
37+
```go
38+
package main
39+
40+
import (
41+
"bufio"
42+
"bytes"
43+
"flag"
44+
"fmt"
45+
"github.com/lithdew/flatend"
46+
"io/ioutil"
47+
"os"
48+
)
49+
50+
func check(err error) {
51+
if err != nil {
52+
panic(err)
53+
}
54+
}
55+
56+
func main() {
57+
var listenAddr string
58+
flag.StringVar(&listenAddr, "l", ":9000", "address to listen for peers on")
59+
flag.Parse()
60+
61+
node := &flatend.Node{
62+
PublicAddr: listenAddr,
63+
BindAddrs: []string{listenAddr},
64+
SecretKey: flatend.GenerateSecretKey(),
65+
Services: map[string]flatend.Handler{
66+
"chat": func(ctx *flatend.Context) {
67+
buf, err := ioutil.ReadAll(ctx.Body)
68+
if err != nil {
69+
return
70+
}
71+
fmt.Printf("Got '%s' from %s:%d!\n", string(buf), ctx.ID.Host.String(), ctx.ID.Port)
72+
},
73+
},
74+
}
75+
defer node.Shutdown()
76+
77+
check(node.Start(flag.Args()...))
78+
79+
br := bufio.NewReader(os.Stdin)
80+
for {
81+
line, _, err := br.ReadLine()
82+
if err != nil {
83+
break
84+
}
85+
86+
line = bytes.TrimSpace(line)
87+
if len(line) == 0 {
88+
continue
89+
}
90+
91+
providers := node.ProvidersFor("chat")
92+
for _, provider := range providers {
93+
_, err := provider.Push([]string{"chat"}, nil, ioutil.NopCloser(bytes.NewReader(line)))
94+
if err != nil {
95+
fmt.Printf("Unable to broadcast to %s: %s\n", provider.Addr(), err)
96+
}
97+
}
98+
}
99+
}
100+
```

examples/go/broadcast/main.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"flag"
7+
"fmt"
8+
"github.com/lithdew/flatend"
9+
"io/ioutil"
10+
"os"
11+
)
12+
13+
func check(err error) {
14+
if err != nil {
15+
panic(err)
16+
}
17+
}
18+
19+
func main() {
20+
var listenAddr string
21+
flag.StringVar(&listenAddr, "l", ":9000", "address to listen for peers on")
22+
flag.Parse()
23+
24+
node := &flatend.Node{
25+
PublicAddr: listenAddr,
26+
BindAddrs: []string{listenAddr},
27+
SecretKey: flatend.GenerateSecretKey(),
28+
Services: map[string]flatend.Handler{
29+
"chat": func(ctx *flatend.Context) {
30+
buf, err := ioutil.ReadAll(ctx.Body)
31+
if err != nil {
32+
return
33+
}
34+
fmt.Printf("Got '%s' from %s:%d!\n", string(buf), ctx.ID.Host.String(), ctx.ID.Port)
35+
},
36+
},
37+
}
38+
defer node.Shutdown()
39+
40+
check(node.Start(flag.Args()...))
41+
42+
br := bufio.NewReader(os.Stdin)
43+
for {
44+
line, _, err := br.ReadLine()
45+
if err != nil {
46+
break
47+
}
48+
49+
line = bytes.TrimSpace(line)
50+
if len(line) == 0 {
51+
continue
52+
}
53+
54+
providers := node.ProvidersFor("chat")
55+
for _, provider := range providers {
56+
_, err := provider.Push([]string{"chat"}, nil, ioutil.NopCloser(bytes.NewReader(line)))
57+
if err != nil {
58+
fmt.Printf("Unable to broadcast to %s: %s\n", provider.Addr(), err)
59+
}
60+
}
61+
}
62+
}

examples/go/clock/README.md

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func clock(ctx *flatend.Context) {
8383
ours := latest.Format(time.Stamp)
8484

8585
timestamp, err := ioutil.ReadAll(ctx.Body)
86-
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
86+
if err != nil {
8787
return
8888
}
8989

@@ -136,9 +136,7 @@ func main() {
136136
fmt.Printf("[%d] Asked someone for their current time. Ours is '%s'.\n", i, timestamp)
137137

138138
res, err := ioutil.ReadAll(stream.Reader)
139-
if !errors.Is(err, io.ErrClosedPipe) {
140-
check(err)
141-
}
139+
check(err)
142140

143141
fmt.Printf("[%d] Got a response! Their current time is: '%s'\n", i, string(res))
144142
}

examples/go/clock/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func clock(ctx *flatend.Context) {
2525
ours := latest.Format(time.Stamp)
2626

2727
timestamp, err := ioutil.ReadAll(ctx.Body)
28-
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
28+
if err != nil {
2929
return
3030
}
3131

io.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package flatend
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
)
8+
9+
type pipeReader struct {
10+
*io.PipeReader
11+
}
12+
13+
func (r *pipeReader) Read(buf []byte) (n int, err error) {
14+
n, err = r.PipeReader.Read(buf)
15+
if err != nil && errors.Is(err, io.ErrClosedPipe) {
16+
err = io.EOF
17+
}
18+
return n, err
19+
}
20+
21+
type pipeWriter struct {
22+
*io.PipeWriter
23+
}
24+
25+
func (w *pipeWriter) Write(buf []byte) (n int, err error) {
26+
n, err = w.PipeWriter.Write(buf)
27+
if err != nil && errors.Is(err, io.ErrClosedPipe) {
28+
err = fmt.Errorf("%s: %w", err, io.EOF)
29+
}
30+
return n, err
31+
}
32+
33+
// createWrappedPipe wraps around a reader/writer pair from io.Pipe() such that all
34+
// errors reported by such reader/writer pair that comprise of io.ErrClosedPipe
35+
// will be wrapped with io.EOF.
36+
func createWrappedPipe() (*pipeReader, *pipeWriter) {
37+
r, w := io.Pipe()
38+
pr := &pipeReader{PipeReader: r}
39+
pw := &pipeWriter{PipeWriter: w}
40+
return pr, pw
41+
}

net.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,11 @@ import (
44
"github.com/lithdew/kademlia"
55
"github.com/lithdew/monte"
66
"io"
7-
"net"
87
"sync"
98
)
109

1110
const ChunkSize = 2048
1211

13-
type BindFunc func() (net.Listener, error)
14-
15-
func BindAny() BindFunc {
16-
return func() (net.Listener, error) { return net.Listen("tcp", ":0") }
17-
}
18-
19-
func BindTCP(addr string) BindFunc {
20-
return func() (net.Listener, error) { return net.Listen("tcp", addr) }
21-
}
22-
23-
func BindTCPv4(addr string) BindFunc {
24-
return func() (net.Listener, error) { return net.Listen("tcp4", addr) }
25-
}
26-
27-
func BindTCPv6(addr string) BindFunc {
28-
return func() (net.Listener, error) { return net.Listen("tcp6", addr) }
29-
}
30-
3112
var _ io.Writer = (*Context)(nil)
3213

3314
type Context struct {

0 commit comments

Comments
 (0)