From ee70c46c1e09d0d834dd53db8532df85a4dee735 Mon Sep 17 00:00:00 2001 From: "mike.art" Date: Fri, 11 Oct 2024 11:41:24 +0200 Subject: [PATCH 1/3] Add message producer --- _examples/queues/.gitignore | 3 + _examples/queues/Makefile | 12 ++++ _examples/queues/go.mod | 7 ++ _examples/queues/go.sum | 0 _examples/queues/main.go | 105 +++++++++++++++++++++++++++++ _examples/queues/wrangler.toml | 13 ++++ cloudflare/queues/content_type.go | 58 ++++++++++++++++ cloudflare/queues/producer.go | 105 +++++++++++++++++++++++++++++ cloudflare/queues/producer_opts.go | 51 ++++++++++++++ internal/jsutil/jsutil.go | 6 +- 10 files changed, 359 insertions(+), 1 deletion(-) create mode 100644 _examples/queues/.gitignore create mode 100644 _examples/queues/Makefile create mode 100644 _examples/queues/go.mod create mode 100644 _examples/queues/go.sum create mode 100644 _examples/queues/main.go create mode 100644 _examples/queues/wrangler.toml create mode 100644 cloudflare/queues/content_type.go create mode 100644 cloudflare/queues/producer.go create mode 100644 cloudflare/queues/producer_opts.go diff --git a/_examples/queues/.gitignore b/_examples/queues/.gitignore new file mode 100644 index 0000000..aee7b7e --- /dev/null +++ b/_examples/queues/.gitignore @@ -0,0 +1,3 @@ +build +node_modules +.wrangler diff --git a/_examples/queues/Makefile b/_examples/queues/Makefile new file mode 100644 index 0000000..db68197 --- /dev/null +++ b/_examples/queues/Makefile @@ -0,0 +1,12 @@ +.PHONY: dev +dev: + npx wrangler dev --port 8787 + +.PHONY: build +build: + go run ../../cmd/workers-assets-gen + tinygo build -o ./build/app.wasm -target wasm -no-debug ./... + +.PHONY: deploy +deploy: + npx wrangler deploy diff --git a/_examples/queues/go.mod b/_examples/queues/go.mod new file mode 100644 index 0000000..779f83b --- /dev/null +++ b/_examples/queues/go.mod @@ -0,0 +1,7 @@ +module github.com/syumai/workers/_examples/queues + +go 1.22.8 + +require github.com/syumai/workers v0.0.0 + +replace github.com/syumai/workers => ../../ diff --git a/_examples/queues/go.sum b/_examples/queues/go.sum new file mode 100644 index 0000000..e69de29 diff --git a/_examples/queues/main.go b/_examples/queues/main.go new file mode 100644 index 0000000..c899b15 --- /dev/null +++ b/_examples/queues/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + + "github.com/syumai/workers" + "github.com/syumai/workers/cloudflare/queues" +) + +const queueName = "QUEUE" + +func handleErr(w http.ResponseWriter, msg string, err error) { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(msg)) +} + +func main() { + http.HandleFunc("/", handleProduce) + workers.Serve(nil) +} +func handleProduce(w http.ResponseWriter, req *http.Request) { + if req.URL.Path != "/" { + w.WriteHeader(http.StatusNotFound) + return + } + + if req.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + defer req.Body.Close() + + q, err := queues.NewProducer(queueName) + if err != nil { + handleErr(w, "failed to init queue", err) + } + + contentType := req.Header.Get("Content-Type") + switch contentType { + case "text/plain": + log.Println("Handling text content type") + err = produceText(q, req) + case "application/json": + log.Println("Handling json content type") + err = produceJson(q, req) + default: + log.Println("Handling bytes content type") + err = produceBytes(q, req) + } + + if err != nil { + handleErr(w, "failed to handle request", err) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("message sent\n")) +} + +func produceText(q *queues.Producer, req *http.Request) error { + content, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + if len(content) == 0 { + return fmt.Errorf("empty request body") + } + + // text content type supports string and []byte messages + if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} + +func produceJson(q *queues.Producer, req *http.Request) error { + var data any + if err := json.NewDecoder(req.Body).Decode(&data); err != nil { + return fmt.Errorf("failed to read request body: %w", err) + } + + // json content type is default and therefore can be omitted + // json content type supports messages of types that can be serialized to json + if err := q.Send(data); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} + +func produceBytes(q *queues.Producer, req *http.Request) error { + // bytes content type support messages of type []byte, string, and io.Reader + if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} diff --git a/_examples/queues/wrangler.toml b/_examples/queues/wrangler.toml new file mode 100644 index 0000000..0965832 --- /dev/null +++ b/_examples/queues/wrangler.toml @@ -0,0 +1,13 @@ +name = "queues-producer" +main = "./build/worker.mjs" +compatibility_date = "2022-05-13" +compatibility_flags = [ + "streams_enable_constructors" +] + +[[queues.producers]] +queue = "my-queue" +binding = "QUEUE" + +[build] +command = "make build" diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/content_type.go new file mode 100644 index 0000000..63cc82b --- /dev/null +++ b/cloudflare/queues/content_type.go @@ -0,0 +1,58 @@ +package queues + +import ( + "fmt" + "io" + "syscall/js" + + "github.com/syumai/workers/internal/jsutil" +) + +type QueueContentType string + +const ( + QueueContentTypeJSON QueueContentType = "json" + QueueContentTypeText QueueContentType = "text" + QueueContentTypeBytes QueueContentType = "bytes" + QueueContentTypeV8 QueueContentType = "v8" +) + +func (o QueueContentType) mapValue(val any) (js.Value, error) { + switch o { + case QueueContentTypeText: + switch v := val.(type) { + case string: + return js.ValueOf(v), nil + case []byte: + return js.ValueOf(string(v)), nil + default: + return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val) + } + + case QueueContentTypeBytes: + var b []byte + switch v := val.(type) { + case string: + b = []byte(v) + case []byte: + b = v + case io.Reader: + var err error + b, err = io.ReadAll(v) + if err != nil { + return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) + } + default: + return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val) + } + + ua := jsutil.NewUint8Array(len(b)) + js.CopyBytesToJS(ua, b) + return ua.Get("buffer"), nil + + case QueueContentTypeJSON, QueueContentTypeV8: + return js.ValueOf(val), nil + } + + return js.Undefined(), fmt.Errorf("unknown content type: %s", o) +} diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go new file mode 100644 index 0000000..972408c --- /dev/null +++ b/cloudflare/queues/producer.go @@ -0,0 +1,105 @@ +package queues + +import ( + "errors" + "fmt" + "syscall/js" + + "github.com/syumai/workers/cloudflare/internal/cfruntimecontext" + "github.com/syumai/workers/internal/jsutil" +) + +type BatchMessage struct { + body any + options *sendOptions +} + +func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { + options := defaultSendOptions() + for _, opt := range opts { + opt(options) + } + return &BatchMessage{body: body, options: options} +} + +func (m *BatchMessage) toJS() (js.Value, error) { + if m == nil { + return js.Undefined(), errors.New("message is nil") + } + + jsValue, err := m.options.ContentType.mapValue(m.body) + if err != nil { + return js.Undefined(), err + } + + obj := jsutil.NewObject() + obj.Set("body", jsValue) + obj.Set("options", m.options.toJS()) + + return obj, nil +} + +type Producer struct { + // queue - Objects that Queue API belongs to. Default is Global + queue js.Value +} + +func NewProducer(queueName string) (*Producer, error) { + inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName) + if inst.IsUndefined() { + return nil, fmt.Errorf("%s is undefined", queueName) + } + return &Producer{queue: inst}, nil +} + +func (p *Producer) Send(content any, opts ...SendOption) error { + if p.queue.IsUndefined() { + return errors.New("queue object not found") + } + + options := defaultSendOptions() + for _, opt := range opts { + opt(options) + } + + jsValue, err := options.ContentType.mapValue(content) + if err != nil { + return err + } + + prom := p.queue.Call("send", jsValue, options.toJS()) + _, err = jsutil.AwaitPromise(prom) + return err +} + +func (p *Producer) SendBatch(messages []*BatchMessage) error { + if p.queue.IsUndefined() { + return errors.New("queue object not found") + } + + if len(messages) == 0 { + return nil + } + + jsArray := jsutil.NewArray(len(messages)) + for i, message := range messages { + jsValue, err := message.toJS() + if err != nil { + return fmt.Errorf("failed to convert message %d to JS: %w", i, err) + } + jsArray.SetIndex(i, jsValue) + } + + prom := p.queue.Call("sendBatch", jsArray) + _, err := jsutil.AwaitPromise(prom) + return err +} + +func (p *Producer) SendJsonBatch(messages ...any) error { + batch := make([]*BatchMessage, len(messages)) + for i, message := range messages { + batch[i] = NewBatchMessage(message) + } + + return p.SendBatch(batch) +} diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/producer_opts.go new file mode 100644 index 0000000..f681874 --- /dev/null +++ b/cloudflare/queues/producer_opts.go @@ -0,0 +1,51 @@ +package queues + +import ( + "syscall/js" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +type sendOptions struct { + // ContentType - Content type of the message + // Default is "json" + ContentType QueueContentType + + // DelaySeconds - The number of seconds to delay the message. + // Default is 0 + DelaySeconds int +} + +func defaultSendOptions() *sendOptions { + return &sendOptions{ + ContentType: QueueContentTypeJSON, + } +} + +func (o *sendOptions) toJS() js.Value { + obj := jsutil.NewObject() + obj.Set("contentType", string(o.ContentType)) + + if o.DelaySeconds != 0 { + obj.Set("delaySeconds", o.DelaySeconds) + } + + return obj +} + +type SendOption func(*sendOptions) + +// WithContentType changes the content type of the message. +func WithContentType(contentType QueueContentType) SendOption { + return func(o *sendOptions) { + o.ContentType = contentType + } +} + +// WithDelay changes the number of seconds to delay the message. +func (q *Producer) WithDelay(d time.Duration) SendOption { + return func(o *sendOptions) { + o.DelaySeconds = int(d.Seconds()) + } +} diff --git a/internal/jsutil/jsutil.go b/internal/jsutil/jsutil.go index ececd04..5210d86 100644 --- a/internal/jsutil/jsutil.go +++ b/internal/jsutil/jsutil.go @@ -26,6 +26,10 @@ func NewObject() js.Value { return ObjectClass.New() } +func NewArray(size int) js.Value { + return ArrayClass.New(size) +} + func NewUint8Array(size int) js.Value { return Uint8ArrayClass.New(size) } @@ -89,7 +93,7 @@ func StrRecordToMap(v js.Value) map[string]string { return result } -// MaybeString returns string value of given JavaScript value or returns nil if the value is undefined. +// MaybeString returns string value of given JavaScript value or returns "" if the value is undefined. func MaybeString(v js.Value) string { if v.IsUndefined() { return "" From bdeff99c27d70cb2942baad61a2434bc3dfb8954 Mon Sep 17 00:00:00 2001 From: "mike.art" Date: Mon, 14 Oct 2024 10:34:44 +0200 Subject: [PATCH 2/3] Fix "bytes" value mapping --- cloudflare/queues/content_type.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/content_type.go index 63cc82b..f68102a 100644 --- a/cloudflare/queues/content_type.go +++ b/cloudflare/queues/content_type.go @@ -48,7 +48,9 @@ func (o QueueContentType) mapValue(val any) (js.Value, error) { ua := jsutil.NewUint8Array(len(b)) js.CopyBytesToJS(ua, b) - return ua.Get("buffer"), nil + // accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should + // be used instead and with Uint8Array.buffer as a value, the send simply fails + return ua, nil case QueueContentTypeJSON, QueueContentTypeV8: return js.ValueOf(val), nil From ae2bcf1937f997f9ce4c1f704a893e018c16f4a5 Mon Sep 17 00:00:00 2001 From: "mike.art" Date: Mon, 14 Oct 2024 22:52:44 +0200 Subject: [PATCH 3/3] Add tests and docs --- _examples/queues/README.md | 38 +++++ cloudflare/queues/content_type.go | 28 +++- cloudflare/queues/content_type_test.go | 205 ++++++++++++++++++++++++ cloudflare/queues/producer.go | 32 ++-- cloudflare/queues/producer_opts.go | 30 +++- cloudflare/queues/producer_test.go | 211 +++++++++++++++++++++++++ 6 files changed, 529 insertions(+), 15 deletions(-) create mode 100644 _examples/queues/README.md create mode 100644 cloudflare/queues/content_type_test.go create mode 100644 cloudflare/queues/producer_test.go diff --git a/_examples/queues/README.md b/_examples/queues/README.md new file mode 100644 index 0000000..cfb6ef9 --- /dev/null +++ b/_examples/queues/README.md @@ -0,0 +1,38 @@ +# queues + +An example of using Cloudflare Workers that interact with [Cloudflare Queues](https://developers.cloudflare.com/queues/). + +## Running + +### Requirements + +This project requires these tools to be installed globally. + +* wrangler +* tinygo + +### Supported commands + +``` +make dev # run dev server +make build # build Go Wasm binary +make deploy # deploy worker +``` + +### Interacting with the local queue + +1. Start the dev server. +```sh +make dev +``` + +2. Send a message to the queue. +```sh +curl -v -X POST http://localhost:8787/ -d '{"message": "Hello, World!"}' -H "Content-Type: application/json" +``` + +3. Observe the response and server logs + +4. You can pass `text/plain` content type to write queue message as the string or omit the `Content-Type` header to write queue message as +byte array. + diff --git a/cloudflare/queues/content_type.go b/cloudflare/queues/content_type.go index f68102a..b0db97a 100644 --- a/cloudflare/queues/content_type.go +++ b/cloudflare/queues/content_type.go @@ -8,13 +8,29 @@ import ( "github.com/syumai/workers/internal/jsutil" ) +// QueueContentType represents the content type of a message produced to a queue. +// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT +// propagated to the consumer side. +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype type QueueContentType string const ( - QueueContentTypeJSON QueueContentType = "json" - QueueContentTypeText QueueContentType = "text" + // QueueContentTypeJSON is the default content type for the produced queue message. + // The message body is NOT being marshaled before sending and is passed to js.ValueOf directly. + // Make sure the body is serializable to JSON. + // - https://pkg.go.dev/syscall/js#ValueOf + QueueContentTypeJSON QueueContentType = "json" + + // QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON. + QueueContentTypeV8 QueueContentType = "v8" + + // QueueContentTypeText is used to send a message as a string. + // Supported body types are string, []byte and io.Reader. + QueueContentTypeText QueueContentType = "text" + + // QueueContentTypeBytes is used to send a message as a byte array. + // Supported body types are string, []byte, and io.Reader. QueueContentTypeBytes QueueContentType = "bytes" - QueueContentTypeV8 QueueContentType = "v8" ) func (o QueueContentType) mapValue(val any) (js.Value, error) { @@ -25,6 +41,12 @@ func (o QueueContentType) mapValue(val any) (js.Value, error) { return js.ValueOf(v), nil case []byte: return js.ValueOf(string(v)), nil + case io.Reader: + b, err := io.ReadAll(v) + if err != nil { + return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err) + } + return js.ValueOf(string(b)), nil default: return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val) } diff --git a/cloudflare/queues/content_type_test.go b/cloudflare/queues/content_type_test.go new file mode 100644 index 0000000..5305c97 --- /dev/null +++ b/cloudflare/queues/content_type_test.go @@ -0,0 +1,205 @@ +package queues + +import ( + "bytes" + "syscall/js" + "testing" + + "github.com/syumai/workers/internal/jsutil" +) + +func TestContentType_mapValue(t *testing.T) { + tests := []struct { + name string + contentType QueueContentType + val any + want js.Value + wantErr bool + }{ + { + name: "string as text", + contentType: QueueContentTypeText, + val: "hello", + want: js.ValueOf("hello"), + }, + { + name: "[]byte as text", + contentType: QueueContentTypeText, + val: []byte("hello"), + want: js.ValueOf("hello"), + }, + { + name: "io.Reader as text", + contentType: QueueContentTypeText, + val: bytes.NewBufferString("hello"), + want: js.ValueOf("hello"), + }, + { + name: "number as text", + contentType: QueueContentTypeText, + val: 42, + want: js.Undefined(), + wantErr: true, + }, + { + name: "function as text", + contentType: QueueContentTypeText, + val: func() {}, + want: js.Undefined(), + wantErr: true, + }, + + { + name: "string as json", + contentType: QueueContentTypeJSON, + val: "hello", + want: js.ValueOf("hello"), + }, + { + name: "number as json", + contentType: QueueContentTypeJSON, + val: 42, + want: js.ValueOf(42), + }, + { + name: "bool as json", + contentType: QueueContentTypeJSON, + val: true, + want: js.ValueOf(true), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := tt.contentType.mapValue(tt.val) + if (err != nil) != tt.wantErr { + t.Fatalf("%s.mapValue() error = %v, wantErr %v", tt.contentType, err, tt.wantErr) + } + if got.String() != tt.want.String() { + t.Errorf("%s.mapValue() = %v, want %v", tt.contentType, got, tt.want) + } + }) + } +} + +func TestContentType_mapValue_bytes(t *testing.T) { + jsOf := func(b []byte) js.Value { + ua := jsutil.NewUint8Array(len(b)) + js.CopyBytesToJS(ua, b) + return ua + } + + tests := []struct { + name string + val any + want js.Value + }{ + { + name: "[]byte as bytes", + val: []byte("hello"), + want: jsOf([]byte("hello")), + }, + { + name: "string as bytes", + val: "hello", + want: jsOf([]byte("hello"))}, + { + name: "io.Reader as bytes", + val: bytes.NewBufferString("hello"), + want: jsOf([]byte("hello")), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := QueueContentTypeBytes.mapValue(tt.val) + if err != nil { + t.Fatalf("%s.mapValue() got error = %v", QueueContentTypeBytes, err) + } + if got.Type() != tt.want.Type() { + t.Errorf("%s.mapValue() = type %v, want type %v", QueueContentTypeBytes, got, tt.want) + } + if got.String() != tt.want.String() { + t.Errorf("%s.mapValue() = %v, want %v", QueueContentTypeBytes, got, tt.want) + } + }) + } +} + +func TestContentType_mapValue_map(t *testing.T) { + val := map[string]interface{}{ + "Name": "Alice", + "Age": 42, + } + + tests := []struct { + name string + contentType QueueContentType + }{ + { + name: "json", + contentType: QueueContentTypeJSON, + }, + { + name: "v8", + contentType: QueueContentTypeV8, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + got, err := tt.contentType.mapValue(val) + if err != nil { + t.Fatalf("QueueContentTypeJSON.mapValue() got error = %v", err) + } + if got.Type() != js.TypeObject { + t.Errorf("QueueContentTypeJSON.mapValue() = type %v, want type %v", got, js.TypeObject) + } + if got.Get("Name").String() != "Alice" { + t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Name").String(), "Alice") + } + if got.Get("Age").Int() != 42 { + t.Errorf("QueueContentTypeJSON.mapValue() = %v, want %v", got.Get("Age").Int(), 42) + } + }) + } +} + +type User struct { + Name string +} + +func TestContentType_mapValue_unsupported_types(t *testing.T) { + t.Run("struct as json", func(t *testing.T) { + defer func() { + if p := recover(); p == nil { + t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") + } + }() + + val := User{Name: "Alice"} + _, _ = QueueContentTypeJSON.mapValue(val) + }) + + t.Run("slice of structs as json", func(t *testing.T) { + defer func() { + if p := recover(); p == nil { + t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") + } + }() + + val := User{Name: "Alice"} + _, _ = QueueContentTypeJSON.mapValue([]User{val}) + }) + + t.Run("slice of bytes as json", func(t *testing.T) { + defer func() { + if p := recover(); p == nil { + t.Fatalf("QueueContentTypeJSON.mapValue() did not panic") + } + }() + + _, _ = QueueContentTypeJSON.mapValue([]byte("hello")) + }) +} diff --git a/cloudflare/queues/producer.go b/cloudflare/queues/producer.go index 972408c..6fa47eb 100644 --- a/cloudflare/queues/producer.go +++ b/cloudflare/queues/producer.go @@ -14,6 +14,7 @@ type BatchMessage struct { options *sendOptions } +// NewBatchMessage creates a single message to be batched before sending to a queue. func NewBatchMessage(body any, opts ...SendOption) *BatchMessage { options := defaultSendOptions() for _, opt := range opts { @@ -44,6 +45,10 @@ type Producer struct { queue js.Value } +// NewProducer creates a new Producer object to send messages to a queue. +// queueName is the name of the queue environment var to send messages to. +// In Cloudflare API documentation, this object represents the Queue. +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer func NewProducer(queueName string) (*Producer, error) { inst := cfruntimecontext.MustGetRuntimeContextEnv().Get(queueName) if inst.IsUndefined() { @@ -52,6 +57,11 @@ func NewProducer(queueName string) (*Producer, error) { return &Producer{queue: inst}, nil } +// Send sends a single message to a queue. This function allows setting send options for the message. +// If no options are provided, the default options are used (QueueContentTypeJSON and no delay). +// +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#producer +// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuesendoptions func (p *Producer) Send(content any, opts ...SendOption) error { if p.queue.IsUndefined() { return errors.New("queue object not found") @@ -72,7 +82,8 @@ func (p *Producer) Send(content any, opts ...SendOption) error { return err } -func (p *Producer) SendBatch(messages []*BatchMessage) error { +// SendBatch sends multiple messages to a queue. This function allows setting options for each message. +func (p *Producer) SendBatch(messages []*BatchMessage, opts ...BatchSendOption) error { if p.queue.IsUndefined() { return errors.New("queue object not found") } @@ -81,6 +92,14 @@ func (p *Producer) SendBatch(messages []*BatchMessage) error { return nil } + var options *batchSendOptions + if len(opts) > 0 { + options = &batchSendOptions{} + for _, opt := range opts { + opt(options) + } + } + jsArray := jsutil.NewArray(len(messages)) for i, message := range messages { jsValue, err := message.toJS() @@ -90,16 +109,7 @@ func (p *Producer) SendBatch(messages []*BatchMessage) error { jsArray.SetIndex(i, jsValue) } - prom := p.queue.Call("sendBatch", jsArray) + prom := p.queue.Call("sendBatch", jsArray, options.toJS()) _, err := jsutil.AwaitPromise(prom) return err } - -func (p *Producer) SendJsonBatch(messages ...any) error { - batch := make([]*BatchMessage, len(messages)) - for i, message := range messages { - batch[i] = NewBatchMessage(message) - } - - return p.SendBatch(batch) -} diff --git a/cloudflare/queues/producer_opts.go b/cloudflare/queues/producer_opts.go index f681874..5d345ae 100644 --- a/cloudflare/queues/producer_opts.go +++ b/cloudflare/queues/producer_opts.go @@ -44,8 +44,36 @@ func WithContentType(contentType QueueContentType) SendOption { } // WithDelay changes the number of seconds to delay the message. -func (q *Producer) WithDelay(d time.Duration) SendOption { +func WithDelay(d time.Duration) SendOption { return func(o *sendOptions) { o.DelaySeconds = int(d.Seconds()) } } + +type batchSendOptions struct { + // DelaySeconds - The number of seconds to delay the message. + // Default is 0 + DelaySeconds int +} + +func (o *batchSendOptions) toJS() js.Value { + if o == nil { + return js.Undefined() + } + + obj := jsutil.NewObject() + if o.DelaySeconds != 0 { + obj.Set("delaySeconds", o.DelaySeconds) + } + + return obj +} + +type BatchSendOption func(*batchSendOptions) + +// WithBatchDelay changes the number of seconds to delay the message. +func WithBatchDelay(d time.Duration) BatchSendOption { + return func(o *batchSendOptions) { + o.DelaySeconds = int(d.Seconds()) + } +} diff --git a/cloudflare/queues/producer_test.go b/cloudflare/queues/producer_test.go new file mode 100644 index 0000000..fc00a93 --- /dev/null +++ b/cloudflare/queues/producer_test.go @@ -0,0 +1,211 @@ +package queues + +import ( + "errors" + "fmt" + "syscall/js" + "testing" + "time" + + "github.com/syumai/workers/internal/jsutil" +) + +func validatingProducer(t *testing.T, validateFn func(message js.Value, options js.Value) error) *Producer { + sendFn := js.FuncOf(func(this js.Value, args []js.Value) interface{} { + sendArg := args[0] // this should be batch (in case of SendBatch) or a single message (in case of Send) + var options js.Value + if len(args) > 1 { + options = args[1] + } + return jsutil.NewPromise(js.FuncOf(func(this js.Value, args []js.Value) interface{} { + resolve := args[0] + go func() { + if err := validateFn(sendArg, options); err != nil { + // must be non-fatal to avoid a deadlock + t.Errorf("validation failed: %v", err) + } + resolve.Invoke(js.Undefined()) + }() + return js.Undefined() + })) + }) + + queue := jsutil.NewObject() + queue.Set("send", sendFn) + queue.Set("sendBatch", sendFn) + + return &Producer{queue: queue} +} + +func TestSend(t *testing.T) { + t.Run("text content type", func(t *testing.T) { + validation := func(message js.Value, options js.Value) error { + if message.Type() != js.TypeString { + return errors.New("message body must be a string") + } + if message.String() != "hello" { + return errors.New("message body must be 'hello'") + } + if options.Get("contentType").String() != "text" { + return errors.New("content type must be text") + } + return nil + } + + producer := validatingProducer(t, validation) + err := producer.Send("hello", WithContentType(QueueContentTypeText)) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + }) + + t.Run("json content type", func(t *testing.T) { + validation := func(message js.Value, options js.Value) error { + if message.Type() != js.TypeString { + return errors.New("message body must be a string") + } + if message.String() != "hello" { + return errors.New("message body must be 'hello'") + } + if options.Get("contentType").String() != "json" { + return errors.New("content type must be json") + } + return nil + } + + producer := validatingProducer(t, validation) + err := producer.Send("hello", WithContentType(QueueContentTypeJSON)) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + }) +} + +func TestSend_ContentTypeOption(t *testing.T) { + tests := []struct { + name string + options []SendOption + expectedContentType string + expectedDelaySec int + wantErr bool + }{ + { + name: "text", + options: []SendOption{WithContentType(QueueContentTypeText)}, + expectedContentType: "text", + }, + { + name: "json", + options: []SendOption{WithContentType(QueueContentTypeJSON)}, + expectedContentType: "json", + }, + { + name: "default", + options: nil, + expectedContentType: "json", + }, + { + name: "v8", + options: []SendOption{WithContentType(QueueContentTypeV8)}, + expectedContentType: "v8", + }, + { + name: "bytes", + options: []SendOption{WithContentType(QueueContentTypeBytes)}, + expectedContentType: "bytes", + }, + + { + name: "delay", + options: []SendOption{WithDelay(5 * time.Second)}, + expectedDelaySec: 5, + expectedContentType: "json", + }, + + { + name: "invalid content type", + options: []SendOption{WithContentType("invalid")}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + validation := func(message js.Value, options js.Value) error { + gotCT := options.Get("contentType").String() + if gotCT != string(tt.expectedContentType) { + return fmt.Errorf("expected content type %q, got %q", tt.expectedContentType, gotCT) + } + gotDelaySec := jsutil.MaybeInt(options.Get("delaySeconds")) + if gotDelaySec != tt.expectedDelaySec { + return fmt.Errorf("expected delay %d, got %d", tt.expectedDelaySec, gotDelaySec) + } + return nil + } + + producer := validatingProducer(t, validation) + err := producer.Send("hello", tt.options...) + if (err != nil) != tt.wantErr { + t.Fatalf("expected error: %t, got %v", tt.wantErr, err) + } + }) + } +} + +func TestSendBatch_Defaults(t *testing.T) { + validation := func(batch js.Value, options js.Value) error { + if batch.Type() != js.TypeObject { + return errors.New("message batch must be an object (array)") + } + if batch.Length() != 2 { + return fmt.Errorf("expected 2 messages, got %d", batch.Length()) + } + first := batch.Index(0) + if first.Get("body").String() != "hello" { + return fmt.Errorf("first message body must be 'hello', was %s", first.Get("body")) + } + if first.Get("options").Get("contentType").String() != "json" { + return fmt.Errorf("first message content type must be json, was %s", first.Get("options").Get("contentType")) + } + + second := batch.Index(1) + if second.Get("body").String() != "world" { + return fmt.Errorf("second message body must be 'world', was %s", second.Get("body")) + } + if second.Get("options").Get("contentType").String() != "text" { + return fmt.Errorf("second message content type must be text, was %s", second.Get("options").Get("contentType")) + } + + return nil + } + + var batch []*BatchMessage = []*BatchMessage{ + NewBatchMessage("hello"), + NewBatchMessage("world", WithContentType(QueueContentTypeText)), + } + + producer := validatingProducer(t, validation) + err := producer.SendBatch(batch) + if err != nil { + t.Fatalf("SendBatch failed: %v", err) + } +} + +func TestSendBatch_Options(t *testing.T) { + validation := func(_ js.Value, options js.Value) error { + if options.Get("delaySeconds").Int() != 5 { + return fmt.Errorf("expected delay 5, got %d", options.Get("delaySeconds").Int()) + } + return nil + } + + var batch []*BatchMessage = []*BatchMessage{ + NewBatchMessage("hello"), + } + + producer := validatingProducer(t, validation) + err := producer.SendBatch(batch, WithBatchDelay(5*time.Second)) + if err != nil { + t.Fatalf("SendBatch failed: %v", err) + } +}