Skip to content

Commit

Permalink
Simple Message Broker (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSC208 authored Nov 7, 2024
1 parent 06e2feb commit 6ec5f27
Show file tree
Hide file tree
Showing 64 changed files with 15,136 additions and 306 deletions.
32 changes: 32 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,38 @@
"DRAFT_CONFIG": "tests/blueprint/node_3.yaml"
},
},
{
"name": "Catalyst",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "services/core/catalyst/main.go",
"cwd": "${workspaceFolder}",
"debugAdapter": "dlv-dap",
"env": {
"DRAFT_CONFIG": "tests/catalyst/config.yaml"
},
},
{
"name": "dctl - Produce",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "tools/dctl/main.go",
"cwd": "${workspaceFolder}",
"debugAdapter": "dlv-dap",
"args": ["broker", "produce"],
},
{
"name": "dctl - Consume",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "tools/dctl/main.go",
"cwd": "${workspaceFolder}",
"debugAdapter": "dlv-dap",
"args": ["broker", "consume"],
},
{
"name": "Fuse",
"type": "go",
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Draft is a framework for easily building reliable, efficient, and scalable distr

Blueprint is a core service that provides both a service registry and a generic key/value store for dynamic service configuration. All processes in a Draft cluster register themselves with Blueprint at startup so that Blueprint can manage all service configuration and provide a single view into the status of the cluster.

### Catalyst
Catalyst is a core service that acts as the primary message broker, and actor system. Services, and clients can `Produce` and `Consume` [CloudEvent](https://cloudevents.io/) messages. Currently a simple `Broadcast` message delivery is implemented so each `Consumer` of the same channel will receive the message at the same time.

### Fuse

Fuse is a core service that enabled routing between Draft processes as well as ingress into the cluster. It manages an installation of [Envoy](https://www.envoyproxy.io/) to route requests from clients to services.
Expand Down
10 changes: 5 additions & 5 deletions services/core/blueprint/web-client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion services/core/blueprint/web-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"@mui/material": "^5.14.20",
"@reduxjs/toolkit": "^2.0.1",
"antd": "^5.21.5",
"api": "https://gitpkg.vercel.app/steady-bytes/draft/api?main",
"api": "https://gitpkg.vercel.app/steady-bytes/draft/api?api/v0.6.0",
"localforage": "^1.10.0",
"match-sorter": "^6.3.1",
"react": "^18.2.0",
Expand Down
1 change: 1 addition & 0 deletions services/core/catalyst/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Catalyst
97 changes: 97 additions & 0 deletions services/core/catalyst/broker/atomicMap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package broker

import (
"encoding/base64"
"sync"

"connectrpc.com/connect"
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
)

type (
atomicMap struct {
mu sync.RWMutex
// Store the routine client connection
m map[string][]*connect.ServerStream[acv1.ConsumeResponse]
// yield the client connection to a thread, and then send events to it
n map[string]chan *acv1.CloudEvent
}
)

func newAtomicMap() *atomicMap {
return &atomicMap{
mu: sync.RWMutex{},
m: make(map[string][]*connect.ServerStream[acv1.ConsumeResponse]),
n: make(map[string]chan *acv1.CloudEvent),
}
}

// hash to calculate the same key for two strings
func (am *atomicMap) hash(msgKindName string) string {
bs := []byte(msgKindName)
return base64.StdEncoding.EncodeToString(bs)
}

func (am *atomicMap) Insert(key string, resStream *connect.ServerStream[acv1.ConsumeResponse]) {
// TODO: Figure out how to start with a read lock?
am.mu.RLock()
defer am.mu.RUnlock()
list, ok := am.m[key]
if !ok {
am.mu.RUnlock()
am.mu.Lock()
defer am.mu.Unlock()
var list []*connect.ServerStream[acv1.ConsumeResponse]
list = append(list, resStream)
am.m[key] = list
} else {
list = append(list, resStream)
am.m[key] = list
}
}

func (am *atomicMap) Broker(key string, resStream *connect.ServerStream[acv1.ConsumeResponse]) {
am.mu.RLock()
ch, found := am.n[key]
if !found {
// create the channel to add to map
ch := make(chan *acv1.CloudEvent)
// store channel in map for future connections
am.mu.RUnlock()
am.mu.Lock()
am.n[key] = ch
am.mu.Unlock()
// now start a new routine and keep it open as long as the `ch` channel has connected clients
go am.send(ch, resStream)

return
} else {
// the channel is already made and shared with other consumers, and producers so we can just use `ch`
go am.send(ch, resStream)
am.mu.RUnlock()
}
}

func (am *atomicMap) send(ch chan *acv1.CloudEvent, stream *connect.ServerStream[acv1.ConsumeResponse]) {
// when the channel receives a message send to the stream the client is holding onto
for {
m := <-ch
msg := &acv1.ConsumeResponse{
Message: m,
}
stream.Send(msg)
}
}

func (am *atomicMap) Broadcast(key string, msg *acv1.CloudEvent) {
ch, ok := am.n[key]
if ok {
ch <- msg
} else {
// we don't have any consumers that will listen to the message so as of right now
// the message is not worth sending

// TODO: We might consider a dead letter queue
return
}
}
35 changes: 35 additions & 0 deletions services/core/catalyst/broker/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package broker

import (
"context"

"connectrpc.com/connect"
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
)

type (
Consumer interface {
Consume(ctx context.Context, msg *acv1.CloudEvent, stream *connect.ServerStream[acv1.ConsumeResponse]) error
}

consumer struct {
consumerRegistrationChan chan register
}
)

func NewConsumer(consumerRegistrationChan chan register) Consumer {
return &consumer{
consumerRegistrationChan: consumerRegistrationChan,
}

}

func (c *consumer) Consume(ctx context.Context, msg *acv1.CloudEvent, stream *connect.ServerStream[acv1.ConsumeResponse]) error {
// fling the consumer stream into the controller
c.consumerRegistrationChan <- register{
CloudEvent: msg,
ServerStream: stream,
}

return nil
}
108 changes: 108 additions & 0 deletions services/core/catalyst/broker/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package broker

import (
"connectrpc.com/connect"
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
"github.com/steady-bytes/draft/pkg/chassis"
)

type (
Controller interface {
Consumer
Producer
}

controller struct {
Producer
Consumer

logger chassis.Logger

state *atomicMap
}

register struct {
*acv1.CloudEvent
*connect.ServerStream[acv1.ConsumeResponse]
}
)

func NewController(logger chassis.Logger) Controller {
var (
producerMsgChan = make(chan *acv1.CloudEvent)
consumerRegistrationChan = make(chan register)
)

ctr := &controller{
NewProducer(producerMsgChan),
NewConsumer(consumerRegistrationChan),
logger,
newAtomicMap(),
}

// TODO: This could contain more configuration. Like maybe reading the number
// of cpu cores to spread the works over?

go ctr.produce(producerMsgChan)
go ctr.consume(consumerRegistrationChan)

return ctr
}

const (
LOG_KEY_TO_CH = "key to channel"
)

func (c *controller) produce(producerMsgChan chan *acv1.CloudEvent) {
for {
msg := <-producerMsgChan
c.logger.WithField("msg: ", msg).Info("produce massage received")

// make hash of <domain><msg.Type.String>
key := c.state.hash(string(msg.ProtoReflect().Descriptor().FullName()))

// do I save to blueprint?
// - default config is to be durable
// - the producer can also add configuration to say not to store

// send the received `Message` to all `Consumers` for the same key
c.logger.WithField("key", key).Info(LOG_KEY_TO_CH)
c.state.Broadcast(key, msg)
}
}

// consume - Will create a hash of the message domain, and typeUrl then save the msg.ServerStream to `atomicMap.m`
// that can be used to `Broadcast` messages to when a message is produced. Con's to this approach are a `RWMutex`
// has to be used to `Broadcast` the message so the connected stream.
// func (c *controller) consume(reg chan register) {
// for {
// msg := <-reg
// fmt.Print("Receive a request to setup a consumer", msg)

// // make hash of <domain><msg.Type.String>
// key := c.hash(msg.GetDomain(), msg.GetKind().GetTypeUrl())

// // use hash as key if the hash does not exist, then create a slice of connections
// // and append the connection to the slice
// c.state.Insert(key, msg.ServerStream)
// }
// }

// consume - Will create a hash of the message domain, and typeUrl to use as a key to a tx, and rx sides of a channel
// the `tx` or transmitter will be used when a producer produces an event to send the event to each client that is consuming
// events of the domain, and typeUrl.
func (c *controller) consume(registerChan chan register) {
for {
// create a shared channel that will receive any kind of message of that domain, and typeUrl
// add the receiver to a go routine that will keep the `ServerStream` open and send any messages
// received up to the client connect.
msg := <-registerChan
c.logger.WithField("msg", msg).Info("consume channel registration")

key := c.state.hash(string(msg.ProtoReflect().Descriptor().FullName()))

// key := c.state.hash(msg.GetDomain(), msg.GetKind().GetTypeUrl())
c.logger.WithField("key", key).Info(LOG_KEY_TO_CH)
c.state.Broker(key, msg.ServerStream)
}
}
47 changes: 47 additions & 0 deletions services/core/catalyst/broker/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package broker

import (
"context"
"errors"
"fmt"
"io"

acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"

"connectrpc.com/connect"
)

type (
Producer interface {
Produce(ctx context.Context, inputStream *connect.BidiStream[acv1.ProduceRequest, acv1.ProduceResponse]) error
}

producer struct {
producerChan chan *acv1.CloudEvent
}
)

func NewProducer(produceChan chan *acv1.CloudEvent) Producer {
return &producer{
producerChan: produceChan,
}
}

// Accepts an incomming bidirectional stream to keep open and push incomming
// messages into the broker when a message is `produce`'ed
func (p *producer) Produce(ctx context.Context, inputStream *connect.BidiStream[acv1.ProduceRequest, acv1.ProduceResponse]) error {
for {
if err := ctx.Err(); err != nil {
return err
}

request, err := inputStream.Receive()
if err != nil && errors.Is(err, io.EOF) {
return nil
} else if err != nil {
return fmt.Errorf("receive request: %w", err)
}

p.producerChan <- request.GetMessage()
}
}
Loading

0 comments on commit 6ec5f27

Please sign in to comment.