Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message producer #125

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions _examples/queues/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
build
node_modules
.wrangler
12 changes: 12 additions & 0 deletions _examples/queues/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
.PHONY: dev
dev:
npx wrangler dev --port 8787

.PHONY: build
build:
go run ../../cmd/workers-assets-gen
tinygo build -o ./build/app.wasm -target wasm -no-debug ./...

.PHONY: deploy
deploy:
npx wrangler deploy
38 changes: 38 additions & 0 deletions _examples/queues/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# queues

An example of using Cloudflare Workers that interact with [Cloudflare Queues](https://developers.cloudflare.com/queues/).

## Running

### Requirements

This project requires these tools to be installed globally.

* wrangler
* tinygo

### Supported commands

```
make dev # run dev server
make build # build Go Wasm binary
make deploy # deploy worker
```

### Interacting with the local queue

1. Start the dev server.
```sh
make dev
```

2. Send a message to the queue.
```sh
curl -v -X POST http://localhost:8787/ -d '{"message": "Hello, World!"}' -H "Content-Type: application/json"
```

3. Observe the response and server logs

4. You can pass `text/plain` content type to write queue message as the string or omit the `Content-Type` header to write queue message as
byte array.

7 changes: 7 additions & 0 deletions _examples/queues/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/syumai/workers/_examples/queues

go 1.22.8

require github.com/syumai/workers v0.0.0

replace github.com/syumai/workers => ../../
Empty file added _examples/queues/go.sum
Empty file.
105 changes: 105 additions & 0 deletions _examples/queues/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package main

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"

"github.com/syumai/workers"
"github.com/syumai/workers/cloudflare/queues"
)

const queueName = "QUEUE"

func handleErr(w http.ResponseWriter, msg string, err error) {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(msg))
}

func main() {
http.HandleFunc("/", handleProduce)
workers.Serve(nil)
}
func handleProduce(w http.ResponseWriter, req *http.Request) {
if req.URL.Path != "/" {
w.WriteHeader(http.StatusNotFound)
return
}

if req.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

defer req.Body.Close()

q, err := queues.NewProducer(queueName)
if err != nil {
handleErr(w, "failed to init queue", err)
}

contentType := req.Header.Get("Content-Type")
switch contentType {
case "text/plain":
log.Println("Handling text content type")
err = produceText(q, req)
case "application/json":
log.Println("Handling json content type")
err = produceJson(q, req)
default:
log.Println("Handling bytes content type")
err = produceBytes(q, req)
}

if err != nil {
handleErr(w, "failed to handle request", err)
return
}

w.WriteHeader(http.StatusOK)
w.Write([]byte("message sent\n"))
}

func produceText(q *queues.Producer, req *http.Request) error {
content, err := io.ReadAll(req.Body)
if err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}
if len(content) == 0 {
return fmt.Errorf("empty request body")
}

// text content type supports string and []byte messages
if err := q.Send(content, queues.WithContentType(queues.QueueContentTypeText)); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

return nil
}

func produceJson(q *queues.Producer, req *http.Request) error {
var data any
if err := json.NewDecoder(req.Body).Decode(&data); err != nil {
return fmt.Errorf("failed to read request body: %w", err)
}

// json content type is default and therefore can be omitted
// json content type supports messages of types that can be serialized to json
if err := q.Send(data); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

return nil
}

func produceBytes(q *queues.Producer, req *http.Request) error {
// bytes content type support messages of type []byte, string, and io.Reader
if err := q.Send(req.Body, queues.WithContentType(queues.QueueContentTypeBytes)); err != nil {
return fmt.Errorf("failed to send message: %w", err)
}

return nil
}
13 changes: 13 additions & 0 deletions _examples/queues/wrangler.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name = "queues-producer"
main = "./build/worker.mjs"
compatibility_date = "2022-05-13"
compatibility_flags = [
"streams_enable_constructors"
]

[[queues.producers]]
queue = "my-queue"
binding = "QUEUE"

[build]
command = "make build"
82 changes: 82 additions & 0 deletions cloudflare/queues/content_type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package queues

import (
"fmt"
"io"
"syscall/js"

"github.com/syumai/workers/internal/jsutil"
)

// QueueContentType represents the content type of a message produced to a queue.
// This information mostly affects how the message body is represented in the Cloudflare UI and is NOT
// propagated to the consumer side.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#queuescontenttype
type QueueContentType string

const (
// QueueContentTypeJSON is the default content type for the produced queue message.
// The message body is NOT being marshaled before sending and is passed to js.ValueOf directly.
// Make sure the body is serializable to JSON.
// - https://pkg.go.dev/syscall/js#ValueOf
QueueContentTypeJSON QueueContentType = "json"

// QueueContentTypeV8 is currently treated the same as QueueContentTypeJSON.
QueueContentTypeV8 QueueContentType = "v8"

// QueueContentTypeText is used to send a message as a string.
// Supported body types are string, []byte and io.Reader.
QueueContentTypeText QueueContentType = "text"

// QueueContentTypeBytes is used to send a message as a byte array.
// Supported body types are string, []byte, and io.Reader.
QueueContentTypeBytes QueueContentType = "bytes"
)

func (o QueueContentType) mapValue(val any) (js.Value, error) {
switch o {
case QueueContentTypeText:
switch v := val.(type) {
case string:
return js.ValueOf(v), nil
case []byte:
return js.ValueOf(string(v)), nil
case io.Reader:
b, err := io.ReadAll(v)
if err != nil {
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
}
return js.ValueOf(string(b)), nil
default:
return js.Undefined(), fmt.Errorf("invalid value type for text content type: %T", val)
}

case QueueContentTypeBytes:
var b []byte
switch v := val.(type) {
case string:
b = []byte(v)
case []byte:
b = v
case io.Reader:
var err error
b, err = io.ReadAll(v)
if err != nil {
return js.Undefined(), fmt.Errorf("failed to read bytes from reader: %w", err)
}
default:
return js.Undefined(), fmt.Errorf("invalid value type for bytes content type: %T", val)
}

ua := jsutil.NewUint8Array(len(b))
js.CopyBytesToJS(ua, b)
// accortind to docs, "bytes" type requires an ArrayBuffer to be sent, however practical experience shows that ArrayBufferView should
// be used instead and with Uint8Array.buffer as a value, the send simply fails
return ua, nil

case QueueContentTypeJSON, QueueContentTypeV8:
return js.ValueOf(val), nil
}

return js.Undefined(), fmt.Errorf("unknown content type: %s", o)
}
Loading
Loading