Skip to content

Commit

Permalink
feat: improve runtime (#185)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Aug 25, 2024
1 parent 81df826 commit 94e81ae
Show file tree
Hide file tree
Showing 36 changed files with 1,235 additions and 1,982 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bench.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ jobs:
- uses: acifani/setup-tinygo@v2
with:
tinygo-version: '0.30.0'
- run: docker-compose -f ./tests/docker-compose.yaml up -d
- run: docker compose -f ./tests/docker-compose.yaml up -d
- run: make build-all
- name: Wait for Pulsar service
run: until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
- run: make bench
- name: Collect Docker Compose logs
if: failure()
run: docker-compose -f ./tests/docker-compose.yaml logs || true
run: docker compose -f ./tests/docker-compose.yaml logs || true
- name: Upload artifacts
uses: actions/upload-artifact@v2
with:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ jobs:
version: v1.55.2
args: --timeout=10m
skip-pkg-cache: true
- run: docker-compose -f ./tests/docker-compose.yaml up -d
- run: docker compose -f ./tests/docker-compose.yaml up -d
- run: make build-all
- name: Wait for Pulsar service
run: until curl http://localhost:8080/metrics > /dev/null 2>&1 ; do sleep 1; done
- run: make test
- name: Collect Docker Compose logs
if: failure()
run: docker-compose -f ./tests/docker-compose.yaml logs || true
run: docker compose -f ./tests/docker-compose.yaml logs || true
- name: Collect nohup logs
if: failure()
run: cat nohup.out || true
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ build:

build-example:
tinygo build -o bin/example_basic.wasm -target=wasi ./examples/basic
go build -o bin/example_external_function ./examples/basic

run-example-external-functions:
FS_SOCKET_PATH=/tmp/fs.sock FS_FUNCTION_NAME=fs/external-function ./bin/example_external_function

lint:
golangci-lint run
Expand Down Expand Up @@ -45,7 +49,7 @@ gen-rest-client:
mkdir -p $(ADMIN_CLIENT_DIR)
openapi-generator generate -i ./apidocs.json -g go -o $(ADMIN_CLIENT_DIR) \
--git-user-id functionstream \
--git-repo-id functionstream/$(ADMIN_CLIENT_DIR) \
--git-repo-id function-stream/$(ADMIN_CLIENT_DIR) \
--package-name adminclient \
--global-property apiDocs,apis,models,supportingFiles
rm -r $(addprefix $(ADMIN_CLIENT_DIR)/, $(FILES_TO_REMOVE))
Expand Down
36 changes: 20 additions & 16 deletions clients/gofs/gofs.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build wasi
// +build wasi

/*
* Copyright 2024 Function Stream Org.
*
Expand All @@ -20,29 +23,22 @@ import "C"
import (
"encoding/json"
"fmt"
"io"
"os"

. "github.com/functionstream/function-stream/common/wasm_utils"
"github.com/wirelessr/avroschema"
"os"
"syscall"
)

var processFile *os.File
var processFd int
var registerSchemaFd int

func init() {
processFile, _ = os.Open("/process")
processFd, _ = syscall.Open("/process", syscall.O_RDWR, 0)
registerSchemaFd, _ = syscall.Open("/registerSchema", syscall.O_RDWR, 0)
}

var processFunc func([]byte) []byte

//go:wasmimport fs registerSchema
func registerSchema(inputSchemaPtrSize, outputSchemaPtrSize uint64)

func Register[I any, O any](process func(*I) *O) error {
inputSchema, err := avroschema.Reflect(new(I))
if err != nil {
return err
}
outputSchema, err := avroschema.Reflect(new(O))
if err != nil {
return err
Expand All @@ -52,18 +48,26 @@ func Register[I any, O any](process func(*I) *O) error {
err = json.Unmarshal(payload, input)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s", err, payload)
return nil
}
output := process(input)
outputPayload, _ := json.Marshal(output)
return outputPayload
}
registerSchema(PtrSize(StringToPtr(inputSchema)), PtrSize(StringToPtr(outputSchema)))
syscall.Write(registerSchemaFd, []byte(outputSchema))
return nil
}

//export process
func process() {
payload, _ := io.ReadAll(processFile)
var stat syscall.Stat_t
syscall.Fstat(processFd, &stat)
payload := make([]byte, stat.Size)
_, _ = syscall.Read(processFd, payload)
outputPayload := processFunc(payload)
_, _ = processFile.Write(outputPayload)
_, _ = syscall.Write(processFd, outputPayload)
}

func Run() {
// Leave it empty
}
112 changes: 112 additions & 0 deletions clients/gofs/gofs_socket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//go:build !wasi

package gofs

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/functionstream/function-stream/fs/runtime/external/model"
"github.com/wirelessr/avroschema"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)

var client model.FunctionClient
var ctx = context.Background()

var processFunc func([]byte) []byte

const (
FSSocketPath = "FS_SOCKET_PATH"
FSFunctionName = "FS_FUNCTION_NAME"
)

func check() error {
if client == nil {
socketPath := os.Getenv(FSSocketPath)
if socketPath == "" {
return fmt.Errorf("%s is not set", FSSocketPath)
}
funcName := os.Getenv(FSFunctionName)
if funcName == "" {
return fmt.Errorf("%s is not set", FSFunctionName)
}

serviceConfig := `{
"methodConfig": [{
"name": [{"service": "*"}],
"retryPolicy": {
"maxAttempts": 30,
"initialBackoff": "0.1s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
}
}]
}`
conn, err := grpc.NewClient(
"unix:"+socketPath,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(serviceConfig),
)
if err != nil {
panic(err)
}
client = model.NewFunctionClient(conn)
md := metadata.New(map[string]string{
"name": funcName,
})
ctx = metadata.NewOutgoingContext(ctx, md)
}
return nil
}

func Register[I any, O any](process func(*I) *O) error {
if err := check(); err != nil {
return err
}
outputSchema, err := avroschema.Reflect(new(O))
if err != nil {
return err
}
processFunc = func(payload []byte) []byte {
input := new(I)
err = json.Unmarshal(payload, input)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload)
}
output := process(input)
outputPayload, _ := json.Marshal(output)
return outputPayload
}
_, err = client.RegisterSchema(ctx, &model.RegisterSchemaRequest{Schema: outputSchema})
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to register schema: %s\n", err)
panic(err)
}
return nil
}

func Run() {
if err := check(); err != nil {
panic(err)
}
for {
res, err := client.Read(ctx, &model.ReadRequest{})
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err)
time.Sleep(3 * time.Second)
continue
}
outputPayload := processFunc(res.Payload)
_, err = client.Write(ctx, &model.Event{Payload: outputPayload})
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err)
}
}
}
21 changes: 7 additions & 14 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,11 @@

package common

// ConfigMap is a custom type that represents a map where keys are strings and values are of any type.
// Since Viper is not case-sensitive, we use '-' to separate words in all field names in the config map.
// This convention helps in maintaining consistency across different configurations and makes them easier to read.
//
// For example:
// - `socket-path` refers to the path of the socket.
// - `pulsar-url` refers to the URL of the Pulsar service.
type ConfigMap map[string]interface{}

// MergeConfig merges multiple ConfigMap into one
func MergeConfig(configs ...*ConfigMap) *ConfigMap {
result := ConfigMap{}
for _, config := range configs {
if config == nil {
continue
}
for k, v := range *config {
result[k] = v
}
}
return &result
}
3 changes: 2 additions & 1 deletion common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const (
MemoryTubeType = "memory"
HttpTubeType = "http"

WASMRuntime = "wasm"
WASMRuntime = "wasm"
ExternalRuntime = "external"

RuntimeArchiveConfigKey = "archive"

Expand Down
19 changes: 19 additions & 0 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,22 @@ func (l *logCounter) LogValue() slog.Value {
func LogCounter() slog.LogValuer {
return &logCounter{}
}

type NamespacedName struct {
namespace string
name string
}

func (n NamespacedName) String() string {
if n.namespace == "" {
return n.name
}
return n.namespace + "/" + n.name
}

func GetNamespacedName(namespace, name string) NamespacedName {
return NamespacedName{
namespace: namespace,
name: name,
}
}
5 changes: 4 additions & 1 deletion conf/function-stream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ queue:
pulsar_url: "pulsar://localhost:6650"
tube-config:
pulsar:
pulsar_url: "pulsar://localhost:6650"
pulsar_url: "pulsar://localhost:6650"
runtime-config:
external:
socket-path: /tmp/fs.sock
11 changes: 7 additions & 4 deletions examples/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
package main

import (
"fmt"
"github.com/functionstream/function-stream/clients/gofs"
"os"
"log/slog"
)

func main() {
_, _ = fmt.Fprintln(os.Stderr, "Hello from Go!")
slog.Info("Hello from Go wasm!")
gofs.Run()
}

type Person struct {
Expand All @@ -33,7 +33,10 @@ type Person struct {
}

func init() {
_ = gofs.Register(myProcess)
err := gofs.Register(myProcess)
if err != nil {
slog.Error(err.Error())
}
}

func myProcess(person *Person) *Person {
Expand Down
3 changes: 3 additions & 0 deletions fs/api/func_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package api

import "github.com/functionstream/function-stream/fs/contube"

type FunctionContext interface {
PutState(key string, value []byte) error
GetState(key string) ([]byte, error)
Write(record contube.Record) error
}
11 changes: 11 additions & 0 deletions fs/contube/contube.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ func NewRecordImpl(payload []byte, ackFunc func()) *RecordImpl {
}
}

func NewStructRecord(payload any, ackFunc func()) (*RecordImpl, error) {
data, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return &RecordImpl{
payload: data,
commitFunc: ackFunc,
}, nil
}

func NewSchemaRecordImpl(payload []byte, schema string, ackFunc func()) *RecordImpl {
return &RecordImpl{
payload: payload,
Expand Down
Loading

0 comments on commit 94e81ae

Please sign in to comment.