Skip to content

Commit a4c9002

Browse files
examples: Add gRPC streaming example (#339)
* examples: Add streaming example * Update examples/streaming/README.md Co-authored-by: Sarah French <15078782+SarahFrench@users.noreply.github.com> * Update examples/streaming/README.md Co-authored-by: Sarah French <15078782+SarahFrench@users.noreply.github.com> * Update examples/streaming/README.md Co-authored-by: Sarah French <15078782+SarahFrench@users.noreply.github.com> * fix typo and format markdown * return error from Close methods * update Readme * update copyright headers & regen protobufs --------- Co-authored-by: Sarah French <15078782+SarahFrench@users.noreply.github.com>
1 parent 0ac49e2 commit a4c9002

11 files changed

Lines changed: 1156 additions & 0 deletions

File tree

examples/streaming/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
myfile

examples/streaming/README.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# gRPC streaming Example
2+
3+
This example builds a plugin & client which can stream a large amount of data
4+
between them while staying below reasonable message size limits of the gRPC
5+
protocol.
6+
7+
> Note: [hashicorp/go-plugin sets an upper limit on message size](https://github.com/hashicorp/go-plugin/blob/d0d30899ca2d91b0869cb73db95afca180e769cf/grpc_client.go#L39-L41). At time of writing, that value is `math.MaxInt32` bytes, or approximately 2GB.
8+
9+
## To execute
10+
11+
Build the plugin
12+
13+
```
14+
go build -o ./plugin/streamer ./plugin
15+
```
16+
17+
Launch the client:
18+
19+
```
20+
go run main.go myfile
21+
```
22+
23+
The client will first write data to the streamer plugin, and then the client will read that
24+
data back from the plugin. The plugin writes the data it receives in a file called `myfile`,
25+
due to the argument passed to the client above.
26+
27+
## To re-generate protobuf definitions
28+
29+
Install protobuf tooling
30+
31+
```
32+
brew install protobuf@29
33+
```
34+
35+
```
36+
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.6
37+
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.3.0
38+
```
39+
40+
generate files
41+
42+
```
43+
cd proto
44+
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative streamer.proto
45+
```

examples/streaming/main.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright IBM Corp. 2016, 2025
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"log"
10+
"os"
11+
"os/exec"
12+
13+
"github.com/hashicorp/go-hclog"
14+
"github.com/hashicorp/go-plugin"
15+
"github.com/hashicorp/go-plugin/examples/streaming/shared"
16+
"google.golang.org/grpc"
17+
)
18+
19+
func main() {
20+
if len(os.Args) != 2 {
21+
log.Fatal("expected path to file as an argument")
22+
}
23+
path := os.Args[1]
24+
25+
logger := hclog.New(&hclog.LoggerOptions{
26+
Level: hclog.Trace,
27+
Output: os.Stderr,
28+
JSONFormat: true,
29+
})
30+
31+
msgSizeLimit := 1000
32+
chunkSize := 10
33+
34+
client := plugin.NewClient(&plugin.ClientConfig{
35+
HandshakeConfig: plugin.HandshakeConfig{
36+
ProtocolVersion: 1,
37+
MagicCookieKey: "BASIC_PLUGIN",
38+
MagicCookieValue: "hello",
39+
},
40+
Plugins: map[string]plugin.Plugin{
41+
"streamer": &shared.StreamerPlugin{},
42+
},
43+
Cmd: exec.Command("./plugin/streamer"),
44+
AllowedProtocols: []plugin.Protocol{
45+
plugin.ProtocolGRPC,
46+
},
47+
Logger: logger,
48+
GRPCDialOptions: []grpc.DialOption{
49+
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSizeLimit)),
50+
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(msgSizeLimit)),
51+
},
52+
})
53+
defer client.Kill()
54+
55+
logger.Debug("launching a client")
56+
57+
rpcClient, err := client.Client()
58+
if err != nil {
59+
log.Fatal(err)
60+
}
61+
62+
raw, err := rpcClient.Dispense("streamer")
63+
if err != nil {
64+
log.Fatal(err)
65+
}
66+
67+
ctx := context.Background()
68+
69+
streamer := raw.(shared.Streamer)
70+
err = streamer.Configure(ctx, path, int64(chunkSize))
71+
if err != nil {
72+
log.Fatal(err)
73+
}
74+
75+
err = streamer.Write(ctx, []byte("Lorem ipsum dolor sit amet"))
76+
if err != nil {
77+
log.Fatal(err)
78+
}
79+
80+
logger.Debug("writing finished")
81+
82+
b, err := streamer.Read(ctx)
83+
if err != nil {
84+
log.Fatal(err)
85+
}
86+
logger.Debug(fmt.Sprintf("received %d bytes", len(b)), "bytes", string(b))
87+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
streamer
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright IBM Corp. 2016, 2025
2+
// SPDX-License-Identifier: MPL-2.0
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"errors"
9+
"io"
10+
"os"
11+
12+
"github.com/hashicorp/go-hclog"
13+
"github.com/hashicorp/go-plugin"
14+
"github.com/hashicorp/go-plugin/examples/streaming/shared"
15+
)
16+
17+
type FileStreamer struct {
18+
logger hclog.Logger
19+
path string
20+
}
21+
22+
func (fs *FileStreamer) Configure(ctx context.Context, path string, _ int64) error {
23+
fs.path = path
24+
return nil
25+
}
26+
27+
func (fs *FileStreamer) Read(ctx context.Context) ([]byte, error) {
28+
fs.logger.Debug("FileStreamer: Read", "path", fs.path)
29+
f, err := os.OpenFile(fs.path, os.O_RDONLY, 0644)
30+
if err != nil {
31+
return nil, err
32+
}
33+
defer func() {
34+
cErr := f.Close()
35+
err = errors.Join(err, cErr)
36+
}()
37+
return io.ReadAll(f)
38+
}
39+
40+
func (fs *FileStreamer) Write(ctx context.Context, b []byte) error {
41+
fs.logger.Debug("FileStreamer: Write", "path", fs.path)
42+
f, err := os.OpenFile(fs.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
43+
if err != nil {
44+
return err
45+
}
46+
defer func() {
47+
cErr := f.Close()
48+
err = errors.Join(err, cErr)
49+
}()
50+
51+
n, err := f.Write(b)
52+
if err != nil {
53+
return err
54+
}
55+
fs.logger.Debug("FileStreamer: Write finished", "bytes written", n)
56+
return nil
57+
}
58+
59+
var handshakeConfig = plugin.HandshakeConfig{
60+
ProtocolVersion: 1,
61+
MagicCookieKey: "BASIC_PLUGIN",
62+
MagicCookieValue: "hello",
63+
}
64+
65+
func main() {
66+
logger := hclog.New(&hclog.LoggerOptions{
67+
Level: hclog.Trace,
68+
Output: os.Stderr,
69+
JSONFormat: true,
70+
})
71+
72+
streamer := &FileStreamer{
73+
logger: logger,
74+
}
75+
var pluginMap = map[string]plugin.Plugin{
76+
"streamer": &shared.StreamerPlugin{
77+
Impl: streamer,
78+
},
79+
}
80+
81+
logger.Debug("plugin launched, about to be served")
82+
83+
plugin.Serve(&plugin.ServeConfig{
84+
HandshakeConfig: handshakeConfig,
85+
Plugins: pluginMap,
86+
GRPCServer: plugin.DefaultGRPCServer,
87+
Logger: logger,
88+
})
89+
}

0 commit comments

Comments
 (0)