Skip to content

Commit

Permalink
feat: support stateful computing for external function (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Oct 16, 2024
1 parent 3759584 commit 656d2d7
Show file tree
Hide file tree
Showing 26 changed files with 1,441 additions and 241 deletions.
98 changes: 69 additions & 29 deletions clients/gofs/gofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package gofs

import (
"context"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -66,9 +67,9 @@ func NewFSClient() FSClient {

type moduleWrapper struct {
*fsClient
processFunc func([]byte) []byte // Only for Function
executeFunc func() error
initFunc func() error
processFunc func(context.Context, []byte) []byte // Only for Function
executeFunc func(context.Context) error
initFunc func(context.Context) error
registerErr error
}

Expand All @@ -90,39 +91,39 @@ func (c *fsClient) Register(module string, wrapper *moduleWrapper) FSClient {
return c
}

func Function[I any, O any](process func(*I) *O) *moduleWrapper {
processFunc := func(payload []byte) []byte {
func Function[I any, O any](process func(context.Context, *I) *O) *moduleWrapper {
processFunc := func(ctx context.Context, payload []byte) []byte {
input := new(I)
err := json.Unmarshal(payload, input)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload)
}
output := process(input)
output := process(ctx, input)
outputPayload, _ := json.Marshal(output)
return outputPayload
}
m := &moduleWrapper{}
m.initFunc = func() error {
m.initFunc = func(ctx context.Context) error {
outputSchema, err := avroschema.Reflect(new(O))
if err != nil {
return err
}
err = m.rpc.RegisterSchema(outputSchema)
err = m.rpc.RegisterSchema(ctx, outputSchema)
if err != nil {
return fmt.Errorf("failed to register schema: %w", err)
}
return nil
}
m.executeFunc = func() error {
m.executeFunc = func(ctx context.Context) error {
for {
inputPayload, err := m.rpc.Read()
inputPayload, err := m.rpc.Read(ctx)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err)
time.Sleep(3 * time.Second)
continue
}
outputPayload := processFunc(inputPayload)
err = m.rpc.Write(outputPayload)
outputPayload := processFunc(ctx, inputPayload)
err = m.rpc.Write(ctx, outputPayload)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to write: %s\n", err)
}
Expand All @@ -132,65 +133,98 @@ func Function[I any, O any](process func(*I) *O) *moduleWrapper {
return m
}

func Source[O any](process func(emit func(*O) error)) *moduleWrapper {
func Source[O any](process func(ctx context.Context, emit func(context.Context, *O) error)) *moduleWrapper {
m := &moduleWrapper{}
emit := func(event *O) error {
emit := func(ctx context.Context, event *O) error {
outputPayload, _ := json.Marshal(event)
return m.rpc.Write(outputPayload)
return m.rpc.Write(ctx, outputPayload)
}
m.initFunc = func() error {
m.initFunc = func(ctx context.Context) error {
outputSchema, err := avroschema.Reflect(new(O))
if err != nil {
return err
}
err = m.rpc.RegisterSchema(outputSchema)
err = m.rpc.RegisterSchema(ctx, outputSchema)
if err != nil {
return fmt.Errorf("failed to register schema: %w", err)
}
return nil
}
m.executeFunc = func() error {
process(emit)
m.executeFunc = func(ctx context.Context) error {
process(ctx, emit)
return nil
}
return m
}

func Sink[I any](process func(*I)) *moduleWrapper {
processFunc := func(payload []byte) {
func Sink[I any](process func(context.Context, *I)) *moduleWrapper {
processFunc := func(ctx context.Context, payload []byte) {
input := new(I)
err := json.Unmarshal(payload, input)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to parse JSON: %s %s\n", err, payload)
}
process(input)
process(ctx, input)
}
m := &moduleWrapper{}
m.initFunc = func() error {
m.initFunc = func(ctx context.Context) error {
inputSchema, err := avroschema.Reflect(new(I))
if err != nil {
return err
}
err = m.rpc.RegisterSchema(inputSchema)
err = m.rpc.RegisterSchema(ctx, inputSchema)
if err != nil {
return fmt.Errorf("failed to register schema: %w", err)
}
return nil
}
m.executeFunc = func() error {
m.executeFunc = func(ctx context.Context) error {
for {
inputPayload, err := m.rpc.Read()
inputPayload, err := m.rpc.Read(ctx)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to read: %s\n", err)
time.Sleep(3 * time.Second)
continue
}
processFunc(inputPayload)
processFunc(ctx, inputPayload)
}
}
return m
}

func Custom(init func(ctx context.Context) error, execute func(ctx context.Context) error) *moduleWrapper {
return &moduleWrapper{
initFunc: init,
executeFunc: execute,
}
}

type FunctionContext struct {
c *fsClient
}

func (c *FunctionContext) GetState(ctx context.Context, key string) ([]byte, error) {
return c.c.rpc.GetState(ctx, key)
}

func (c *FunctionContext) PutState(ctx context.Context, key string, value []byte) error {
return c.c.rpc.PutState(ctx, key, value)
}

func (c *FunctionContext) Write(ctx context.Context, payload []byte) error {
return c.c.rpc.Write(ctx, payload)
}

func (c *FunctionContext) Read(ctx context.Context) ([]byte, error) {
return c.c.rpc.Read(ctx)
}

type funcCtxKey struct{}

func GetFunctionContext(ctx context.Context) *FunctionContext {
return ctx.Value(funcCtxKey{}).(*FunctionContext)
}

func (c *fsClient) Run() error {
if c.err != nil {
return c.err
Expand All @@ -203,13 +237,19 @@ func (c *fsClient) Run() error {
c.state = StateRunning
c.registerMu.Unlock()

funcName := os.Getenv(FSFunctionName)
if funcName == "" {
return fmt.Errorf("%s is not set", FSFunctionName)
}
funcCtx := &FunctionContext{c: c}
if c.rpc == nil {
rpc, err := newFSRPCClient()
if err != nil {
return err
}
c.rpc = rpc
}
ctx := c.rpc.GetContext(context.WithValue(context.Background(), funcCtxKey{}, funcCtx), funcName)
module := os.Getenv(FSModuleName)
if module == "" {
module = DefaultModule
Expand All @@ -219,15 +259,15 @@ func (c *fsClient) Run() error {
return fmt.Errorf("module %s not found", module)
}
m.fsClient = c
err := m.initFunc()
err := m.initFunc(ctx)
if err != nil {
return err
}
c.rpc.loadModule(m)
if c.rpc.skipExecuting() {
return nil
}
return m.executeFunc()
return m.executeFunc(ctx)
}

func (c *fsClient) Error() string {
Expand Down
53 changes: 40 additions & 13 deletions clients/gofs/gofs_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"os"
"strings"

"github.com/functionstream/function-stream/fs/runtime/external/model"
"google.golang.org/grpc"
Expand All @@ -31,7 +32,6 @@ import (
)

type fsRPCClient struct {
ctx context.Context
grpcCli model.FunctionClient
}

Expand All @@ -40,10 +40,6 @@ func newFSRPCClient() (*fsRPCClient, error) {
if socketPath == "" {
return nil, fmt.Errorf("%s is not set", FSSocketPath)
}
funcName := os.Getenv(FSFunctionName)
if funcName == "" {
return nil, fmt.Errorf("%s is not set", FSFunctionName)
}

serviceConfig := `{
"methodConfig": [{
Expand All @@ -66,37 +62,68 @@ func newFSRPCClient() (*fsRPCClient, error) {
return nil, err
}
client := model.NewFunctionClient(conn)
return &fsRPCClient{grpcCli: client}, nil
}

func (c *fsRPCClient) GetContext(parent context.Context, funcName string) context.Context {
md := metadata.New(map[string]string{
"name": funcName,
})
ctx := metadata.NewOutgoingContext(context.Background(), md)
return &fsRPCClient{grpcCli: client, ctx: ctx}, nil
return metadata.NewOutgoingContext(parent, md)
}

func (c *fsRPCClient) RegisterSchema(schema string) error {
_, err := c.grpcCli.RegisterSchema(c.ctx, &model.RegisterSchemaRequest{Schema: schema})
func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error {
_, err := c.grpcCli.RegisterSchema(ctx, &model.RegisterSchemaRequest{Schema: schema})
if err != nil {
return fmt.Errorf("failed to register schema: %w", err)
}
return nil
}

func (c *fsRPCClient) Write(payload []byte) error {
_, err := c.grpcCli.Write(c.ctx, &model.Event{Payload: payload})
func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error {
_, err := c.grpcCli.Write(ctx, &model.Event{Payload: payload})
if err != nil {
return fmt.Errorf("failed to write: %w", err)
}
return nil
}

func (c *fsRPCClient) Read() ([]byte, error) {
res, err := c.grpcCli.Read(c.ctx, &model.ReadRequest{})
func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) {
res, err := c.grpcCli.Read(ctx, &model.ReadRequest{})
if err != nil {
return nil, fmt.Errorf("failed to read: %w", err)
}
return res.Payload, nil
}

func (c *fsRPCClient) PutState(ctx context.Context, key string, value []byte) error {
_, err := c.grpcCli.PutState(ctx, &model.PutStateRequest{Key: key, Value: value})
if err != nil {
return err
}
return nil
}

func (c *fsRPCClient) GetState(ctx context.Context, key string) ([]byte, error) {
res, err := c.grpcCli.GetState(ctx, &model.GetStateRequest{Key: key})
if err != nil {
return nil, err
}
return res.Value, nil
}

func (c *fsRPCClient) ListStates(ctx context.Context, path string) ([]string, error) {
path = strings.TrimSuffix(path, "/")
startInclusive := path + "/"
endExclusive := path + "//"
res, err := c.grpcCli.ListStates(ctx, &model.ListStatesRequest{StartInclusive: startInclusive,
EndExclusive: endExclusive})
if err != nil {
return nil, err
}
return res.Keys, nil
}

func (c *fsRPCClient) loadModule(_ *moduleWrapper) {
// no-op
}
Expand Down
25 changes: 19 additions & 6 deletions clients/gofs/gofs_wasmfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package gofs

import "C"
import (
"context"
"fmt"
"os"
"syscall"
Expand All @@ -41,7 +42,7 @@ func process() {
if runningModule == nil {
panic("no module loaded")
}
err := runningModule.executeFunc()
err := runningModule.executeFunc(context.Background())
if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
}
Expand All @@ -54,35 +55,47 @@ func newFSRPCClient() (*fsRPCClient, error) {
return &fsRPCClient{}, nil
}

func (c *fsRPCClient) RegisterSchema(schema string) error {
func (c *fsRPCClient) GetContext(parent context.Context, funcName string) context.Context {
return context.Background()
}

func (c *fsRPCClient) RegisterSchema(ctx context.Context, schema string) error {
_, err := syscall.Write(registerSchemaFd, []byte(schema))
if err != nil {
return fmt.Errorf("failed to register schema: %w", err)
}
return nil
}

func (c *fsRPCClient) Write(payload []byte) error {
func (c *fsRPCClient) Write(ctx context.Context, payload []byte) error {
panic("rpc write not implemented")
}

func (c *fsRPCClient) Read() ([]byte, error) {
func (c *fsRPCClient) Read(ctx context.Context) ([]byte, error) {
panic("rpc read not implemented")
}

func (c *fsRPCClient) GetState(ctx context.Context, key string) ([]byte, error) {
panic("rpc get state not implemented")
}

func (c *fsRPCClient) PutState(ctx context.Context, key string, value []byte) error {
panic("rpc put state not implemented")
}

func (c *fsRPCClient) loadModule(m *moduleWrapper) {
if m.processFunc == nil {
panic("only function module is supported for the wasm runtime")
}
m.executeFunc = func() error {
m.executeFunc = func(ctx context.Context) error {
var stat syscall.Stat_t
syscall.Fstat(processFd, &stat)
payload := make([]byte, stat.Size)
_, err := syscall.Read(processFd, payload)
if err != nil {
return fmt.Errorf("failed to read: %w", err)
}
outputPayload := m.processFunc(payload)
outputPayload := m.processFunc(ctx, payload)
_, err = syscall.Write(processFd, outputPayload)
if err != nil {
return fmt.Errorf("failed to write: %w", err)
Expand Down
Loading

0 comments on commit 656d2d7

Please sign in to comment.