Skip to content

Commit 4beee48

Browse files
authored
Merge pull request #133 from syumai/split-producer-send-message-funcs
split queues.Producer's Send funcs and BatchMessage constructors
2 parents 605532b + 15f2b81 commit 4beee48

File tree

11 files changed

+210
-532
lines changed

11 files changed

+210
-532
lines changed

_examples/queues/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.PHONY: dev
22
dev:
3-
npx wrangler dev --port 8787
3+
wrangler dev
44

55
.PHONY: build
66
build:
@@ -9,4 +9,4 @@ build:
99

1010
.PHONY: deploy
1111
deploy:
12-
npx wrangler deploy
12+
wrangler deploy

_examples/queues/main.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ func main() {
2323
http.HandleFunc("/", handleProduce)
2424
workers.Serve(nil)
2525
}
26+
2627
func handleProduce(w http.ResponseWriter, req *http.Request) {
2728
if req.URL.Path != "/" {
2829
w.WriteHeader(http.StatusNotFound)
@@ -48,7 +49,7 @@ func handleProduce(w http.ResponseWriter, req *http.Request) {
4849
err = produceText(q, req)
4950
case "application/json":
5051
log.Println("Handling json content type")
51-
err = produceJson(q, req)
52+
err = produceJSON(q, req)
5253
default:
5354
log.Println("Handling bytes content type")
5455
err = produceBytes(q, req)
@@ -68,38 +69,33 @@ func produceText(q *queues.Producer, req *http.Request) error {
6869
if err != nil {
6970
return fmt.Errorf("failed to read request body: %w", err)
7071
}
71-
if len(content) == 0 {
72-
return fmt.Errorf("empty request body")
73-
}
74-
75-
// text content type supports string and []byte messages
76-
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
72+
// text content type supports string
73+
if err := q.SendText(string(content)); err != nil {
7774
return fmt.Errorf("failed to send message: %w", err)
7875
}
79-
8076
return nil
8177
}
8278

83-
func produceJson(q *queues.Producer, req *http.Request) error {
79+
func produceJSON(q *queues.Producer, req *http.Request) error {
8480
var data any
8581
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
8682
return fmt.Errorf("failed to read request body: %w", err)
8783
}
88-
89-
// json content type is default and therefore can be omitted
9084
// json content type supports messages of types that can be serialized to json
91-
if err := q.Send(data); err != nil {
85+
if err := q.SendJSON(data); err != nil {
9286
return fmt.Errorf("failed to send message: %w", err)
9387
}
94-
9588
return nil
9689
}
9790

9891
func produceBytes(q *queues.Producer, req *http.Request) error {
99-
// bytes content type support messages of type []byte, string, and io.Reader
100-
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil {
92+
content, err := io.ReadAll(req.Body)
93+
if err != nil {
94+
return fmt.Errorf("failed to read request body: %w", err)
95+
}
96+
// bytes content type support messages of type []byte
97+
if err := q.SendBytes(content); err != nil {
10198
return fmt.Errorf("failed to send message: %w", err)
10299
}
103-
104100
return nil
105101
}

cloudflare/queues/batchmessage.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package queues
2+
3+
import (
4+
"syscall/js"
5+
6+
"github.com/syumai/workers/internal/jsutil"
7+
)
8+
9+
type BatchMessage struct {
10+
body js.Value
11+
options *sendOptions
12+
}
13+
14+
// NewTextBatchMessage creates a single text message to be batched before sending to a queue.
15+
func NewTextBatchMessage(content string, opts ...SendOption) *BatchMessage {
16+
return newBatchMessage(js.ValueOf(content), contentTypeText, opts...)
17+
}
18+
19+
// NewBytesBatchMessage creates a single byte array message to be batched before sending to a queue.
20+
func NewBytesBatchMessage(content []byte, opts ...SendOption) *BatchMessage {
21+
return newBatchMessage(js.ValueOf(content), contentTypeBytes, opts...)
22+
}
23+
24+
// NewJSONBatchMessage creates a single JSON message to be batched before sending to a queue.
25+
func NewJSONBatchMessage(content any, opts ...SendOption) *BatchMessage {
26+
return newBatchMessage(js.ValueOf(content), contentTypeJSON, opts...)
27+
}
28+
29+
// NewV8BatchMessage creates a single raw JS value message to be batched before sending to a queue.
30+
func NewV8BatchMessage(content js.Value, opts ...SendOption) *BatchMessage {
31+
return newBatchMessage(content, contentTypeV8, opts...)
32+
}
33+
34+
// newBatchMessage creates a single message to be batched before sending to a queue.
35+
func newBatchMessage(body js.Value, contentType contentType, opts ...SendOption) *BatchMessage {
36+
options := sendOptions{
37+
ContentType: contentType,
38+
}
39+
for _, opt := range opts {
40+
opt(&options)
41+
}
42+
return &BatchMessage{body: body, options: &options}
43+
}
44+
45+
func (m *BatchMessage) toJS() js.Value {
46+
obj := jsutil.NewObject()
47+
obj.Set("body", m.body)
48+
obj.Set("options", m.options.toJS())
49+
return obj
50+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package queues
2+
3+
import (
4+
"syscall/js"
5+
"time"
6+
7+
"github.com/syumai/workers/internal/jsutil"
8+
)
9+
10+
type batchSendOptions struct {
11+
// DelaySeconds - The number of seconds to delay the message.
12+
// Default is 0
13+
DelaySeconds int
14+
}
15+
16+
func (o *batchSendOptions) toJS() js.Value {
17+
if o == nil {
18+
return js.Undefined()
19+
}
20+
21+
obj := jsutil.NewObject()
22+
if o.DelaySeconds != 0 {
23+
obj.Set("delaySeconds", o.DelaySeconds)
24+
}
25+
26+
return obj
27+
}
28+
29+
type BatchSendOption func(*batchSendOptions)
30+
31+
// WithBatchDelaySeconds changes the number of seconds to delay the message.
32+
func WithBatchDelaySeconds(d time.Duration) BatchSendOption {
33+
return func(o *batchSendOptions) {
34+
o.DelaySeconds = int(d.Seconds())
35+
}
36+
}

cloudflare/queues/content_type.go

Lines changed: 0 additions & 82 deletions
This file was deleted.

0 commit comments

Comments
 (0)