Skip to content

Commit

Permalink
Split http adapter logic out of the broker (#29)
Browse files Browse the repository at this point in the history
Items left before this can be moved out of draft state:

- [x] `engine.go`: Determine the best way to pass along the context
around the target adapter to the HTTP server. Since it's already being
looked up in this code we shouldn't need to replicate the same code in
the HTTP server, instead we should probably just pass along the context
we found.
- [x] Code to be deleted out of the broker
- [x] `sub_mgr.go`: Is the implementation I wrote the best method to
connect a request out to the HTTP server?
- [x] `http server, http_client.go`: I need to get some context about
the target adapter for the request in the `SendEvent()` function. I need
to know the host, method, headers, ect...
- [x] `http server, http_client.go`: When sending the response back -
Will need to determine which is the correct timestamp to send back
- [x] Testing a full round trip -- Currently the code will, Request from
cURL -> http server -> broker -> quickstart component -> broker -> http
server (Dies)

Problem:

1. The broker was responsible for executing HTTP requests that originate
from components running the Kit SDK
2. Executing HTTP requests without a "/" in it was erroring out

Solution:

1. Copying the HTTP client code out of the broker and into the HTTP
server running in the cluster
2. Adding in a "/" if the host if empty

Testing:
Verified that running the changes allowed me to execute a full round
trip and receive a response:
```
{
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept-Encoding": "gzip", 
    "Host": "httpbin.org", 
    "User-Agent": "Go-http-client/2.0", 
    "X-Amzn-Trace-Id": "Root=1-66663c6c-61460ac946f419545031d61b"
  }, 
  "json": null, 
  "method": "GET", 
  "origin": "98.225.110.56", 
  "url": "https://httpbin.org/anything"
}
```
  • Loading branch information
BryanFauble authored Jun 26, 2024
1 parent a854bd4 commit 2ef325d
Show file tree
Hide file tree
Showing 14 changed files with 213 additions and 87 deletions.
2 changes: 1 addition & 1 deletion api/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Adapter interface {

GetComponentType() api.ComponentType
Validate(data *api.Data) api.Problems
Resolve(data *api.Data) error
Resolve(data *api.Data) (any, error)
}

// +kubebuilder:object:generate=false
Expand Down
13 changes: 7 additions & 6 deletions api/kubernetes/v1alpha1/http_adapter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,26 @@ func (a *HTTPAdapter) Validate(data *api.Data) api.Problems {
return problems
}

func (a *HTTPAdapter) Resolve(data *api.Data) error {
func (a *HTTPAdapter) Resolve(data *api.Data) (any, error) {
tpl := a.getTemplate()

url, err := tpl.URL.Resolve(data, true)
if err != nil {
return err
return nil, err
}

headers := make(map[string]string, len(tpl.Headers))
for k, v := range tpl.Headers {
if headers[k], err = v.Resolve(data, true); err != nil {
return err
return nil, err
}
}
copy := a.Spec.DeepCopy()

a.Spec.URL = url
a.Spec.Headers = headers
copy.URL = url
copy.Headers = headers

return nil
return copy, nil
}

func (a *HTTPAdapter) getTemplate() *HTTPAdapterTemplate {
Expand Down
1 change: 1 addition & 0 deletions api/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ const (
ValKeyStatusCode = "statusCode"
ValKeyURL = "url"
ValKeyVaultURL = "vaultURL"
ValKeySpec = "spec"
)

// Headers and query params.
Expand Down
48 changes: 25 additions & 23 deletions components/broker/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type broker struct {
grpcSrv *GRPCServer

natsClient *NATSClient
httpClient *HTTPClient
k8sClient client.Client

healthSrv *brktel.HealthServer
Expand Down Expand Up @@ -114,7 +113,6 @@ func New() Engine {
}
brk.grpcSrv = NewGRPCServer(brk)
brk.natsClient = NewNATSClient(brk)
brk.httpClient = NewHTTPClient(brk)

return brk
}
Expand Down Expand Up @@ -388,28 +386,23 @@ func (brk *broker) routeEvent(ctx *BrokerEventContext) (err error) {
ctx.Log.Debugf("matched event to target '%s'", ctx.Event.Target.GroupKey())

sendSpan := routeSpan.StartChildSpan("Send Event")
if ctx.TargetAdapter != nil {
// TODO move http client to adapter
err = brk.httpClient.SendEvent(ctx)

} else {
sub, found := brk.subMgr.Subscription(ctx.Event.Target)
switch {
case found:
// Found component subscribed via gRPC.
sendSpan.Name = "Send gRPC event"
ctx.Log.Debug("subscription found, sending event with gRPC")
err = sub.SendEvent(ctx)

case ctx.Receiver != ReceiverNATS && ctx.Event.Target.BrokerId != brk.comp.Id:
// Component not found locally, send via NATS.
sendSpan.Name = "Send NATS event"
ctx.Log.Debug("subscription not found, sending event with nats")
err = brk.natsClient.Publish(ctx.Event.Target.Subject(), ctx.Event)
sub, found := brk.subMgr.Subscription(ctx.Event.Target)
switch {
case found:
// Found component subscribed via gRPC.
sendSpan.Name = "Send gRPC event"
ctx.Log.Debug("subscription found, sending event with gRPC")
err = sub.SendEvent(ctx)

default:
err = core.ErrComponentGone()
}
case ctx.Receiver != ReceiverNATS && ctx.Event.Target.BrokerId != brk.comp.Id:
// Component not found locally, send via NATS.
sendSpan.Name = "Send NATS event"
ctx.Log.Debug("subscription not found, sending event with nats")
err = brk.natsClient.Publish(ctx.Event.Target.Subject(), ctx.Event)

default:
err = core.ErrComponentGone()
}
sendSpan.End(err)

Expand Down Expand Up @@ -480,9 +473,18 @@ func (brk *broker) findTarget(ctx *BrokerEventContext) (err error) {
if ctx.TargetAdapter, err = brk.store.Adapter(ctx, ctx.Event.Target.Name, typ); err != nil {
return err
}
if err := ctx.TargetAdapter.Resolve(ctx.Data); err != nil {

spec, err := ctx.TargetAdapter.Resolve(ctx.Data)
if err != nil {
return err
}
ctx.Event.SetSpec(spec)

sub, found := brk.subMgr.Adapter(ctx.TargetAdapter.GetComponentType())
if found && sub != nil && sub.IsActive() {
ctx.Event.Target.Hash = sub.ShortHash()
ctx.Event.Target.Name = sub.Name()
}

return nil
}
Expand Down
49 changes: 38 additions & 11 deletions components/broker/engine/sub_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type SubscriptionMgr interface {
Create(ctx context.Context, cfg *SubscriptionConf) (ReplicaSubscription, GroupSubscription, error)
Subscription(comp *core.Component) (Subscription, bool)
Close()
Adapter(componentType api.ComponentType) (GroupSubscription, bool)
}

type Subscription interface {
Expand All @@ -33,6 +34,8 @@ type Subscription interface {

type GroupSubscription interface {
Subscription
ShortHash() string
Name() string
}

type ReplicaSubscription interface {
Expand All @@ -53,17 +56,20 @@ type SubscriptionConf struct {
}

type subscriptionMgr struct {
subMap map[string]*subscription
grpMap map[string]*groupSubscription
subMap map[string]*subscription
adapterMap map[api.ComponentType]*groupSubscription
grpMap map[string]*groupSubscription

mutex sync.RWMutex

log *logkf.Logger
}

type groupSubscription struct {
subMap map[string]bool
sendCh chan *evtRespCh
shortHash string
name string
subMap map[string]bool
sendCh chan *evtRespCh

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -94,12 +100,18 @@ type sendResp struct {

func NewManager() SubscriptionMgr {
return &subscriptionMgr{
subMap: make(map[string]*subscription),
grpMap: make(map[string]*groupSubscription),
log: logkf.Global,
subMap: make(map[string]*subscription),
adapterMap: make(map[api.ComponentType]*groupSubscription),
grpMap: make(map[string]*groupSubscription),
log: logkf.Global,
}
}

func (mgr *subscriptionMgr) Adapter(componentType api.ComponentType) (GroupSubscription, bool) {
mapValue := mgr.adapterMap[componentType]
return mapValue, mapValue != nil
}

func (mgr *subscriptionMgr) Create(ctx context.Context, cfg *SubscriptionConf) (ReplicaSubscription, GroupSubscription, error) {
switch {
case cfg.Component == nil:
Expand All @@ -126,10 +138,12 @@ func (mgr *subscriptionMgr) Create(ctx context.Context, cfg *SubscriptionConf) (
if !found {
ctx, cancel := context.WithCancel(context.Background())
s = &groupSubscription{
subMap: make(map[string]bool),
sendCh: make(chan *evtRespCh),
ctx: ctx,
cancel: cancel,
shortHash: cfg.Component.ShortHash(),
name: cfg.Component.Name,
subMap: make(map[string]bool),
sendCh: make(chan *evtRespCh),
ctx: ctx,
cancel: cancel,
}
mgr.grpMap[cfg.Component.GroupKey()] = s
}
Expand All @@ -150,6 +164,11 @@ func (mgr *subscriptionMgr) Create(ctx context.Context, cfg *SubscriptionConf) (
if grpSub != nil {
sub.sendCh = grpSub.sendCh
go sub.processSendChan()

componentType := api.ComponentType(cfg.Component.Type)
if componentType.IsAdapter() {
mgr.adapterMap[componentType] = grpSub
}
}
mgr.subMap[cfg.Component.Id] = sub

Expand Down Expand Up @@ -242,6 +261,14 @@ func (mgr *subscriptionMgr) cancel(sub *subscription, err error) {
delete(mgr.subMap, sub.comp.Id)
}

func (sub *groupSubscription) ShortHash() string {
return sub.shortHash
}

func (sub *groupSubscription) Name() string {
return sub.name
}

func (grp *groupSubscription) SendEvent(evt *BrokerEventContext) error {
respCh := make(chan *sendResp)
grp.sendCh <- &evtRespCh{mEvt: evt, respCh: respCh}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//
// SPDX-License-Identifier: MPL-2.0

package engine
package adapter

import (
"context"
Expand All @@ -21,8 +21,8 @@ import (

"github.com/xigxog/kubefox/api"
"github.com/xigxog/kubefox/api/kubernetes/v1alpha1"
"github.com/xigxog/kubefox/components/broker/config"
"github.com/xigxog/kubefox/core"
"github.com/xigxog/kubefox/grpc"
"github.com/xigxog/kubefox/logkf"
)

Expand All @@ -32,12 +32,12 @@ type HTTPClient struct {
secureTransport *http.Transport
insecureTransport *http.Transport

brk Broker
brk *grpc.Client

log *logkf.Logger
}

func NewHTTPClient(brk Broker) *HTTPClient {
func NewHTTPClient(brk *grpc.Client) *HTTPClient {
clients := make(map[string]*http.Client, 7)

// TODO support live refresh of root ca
Expand Down Expand Up @@ -93,18 +93,17 @@ func NewHTTPClient(brk Broker) *HTTPClient {
}
}

func (c *HTTPClient) SendEvent(req *BrokerEventContext) error {
if req.TargetAdapter == nil {
return core.ErrInvalid(fmt.Errorf("adapter is missing"))
}
adapter, ok := req.TargetAdapter.(*v1alpha1.HTTPAdapter)
if !ok {
return core.ErrInvalid(fmt.Errorf("adapter is not HTTPAdapter"))
}

ctx, cancel := context.WithTimeout(context.Background(), req.TTL())
func (c *HTTPClient) SendEvent(req *grpc.ComponentEvent) error {
ctx, cancel := context.WithTimeout(context.Background(), req.Event.TTL())
log := c.log.WithEvent(req.Event)

adapter := &v1alpha1.HTTPAdapter{}

if err := req.Event.Spec(&adapter.Spec); err != nil {
cancel()
return core.ErrInvalid(fmt.Errorf("error parsing adapter spec: %v", err))
}

httpReq, err := req.Event.HTTPRequest(ctx)
if err != nil {
cancel()
Expand All @@ -115,6 +114,9 @@ func (c *HTTPClient) SendEvent(req *BrokerEventContext) error {
return core.ErrInvalid(fmt.Errorf("error parsing adapter url: %v", err))

} else {
if adapterURL.Path == "" {
adapterURL.Path = "/"
}
adapterURL = adapterURL.JoinPath(httpReq.URL.EscapedPath())

httpReq.Host = adapterURL.Host
Expand Down Expand Up @@ -146,9 +148,9 @@ func (c *HTTPClient) SendEvent(req *BrokerEventContext) error {
comp := core.NewPlatformComponent(
api.ComponentTypeHTTPAdapter,
req.Event.Target.Name,
c.brk.Component().Hash,
c.brk.Component.Hash,
)
comp.Id, comp.BrokerId = c.brk.Component().Id, c.brk.Component().BrokerId
comp.Id, comp.BrokerId = c.brk.Component.Id, c.brk.Component.BrokerId

resp := core.NewResp(core.EventOpts{
Parent: req.Event,
Expand All @@ -163,7 +165,7 @@ func (c *HTTPClient) SendEvent(req *BrokerEventContext) error {
if httpResp, err := c.adapterClient(adapter).Do(httpReq); err != nil {
reqErr = core.ErrUnexpected(fmt.Errorf("http request failed: %v", err))
} else {
reqErr = resp.SetHTTPResponse(httpResp, config.MaxEventSize)
reqErr = resp.SetHTTPResponse(httpResp, MaxEventSize)
}
if reqErr != nil {
if !errors.Is(reqErr, &core.Err{}) {
Expand All @@ -172,10 +174,10 @@ func (c *HTTPClient) SendEvent(req *BrokerEventContext) error {
resp.Type = string(api.EventTypeError)
resp.SetJSON(reqErr)

log.Debug(err)
log.Debug(reqErr)
}

c.brk.RecvEvent(resp, ReceiverHTTPClient)
c.brk.SendResp(resp, req.ReceivedAt)
}()

return nil
Expand Down
Loading

0 comments on commit 2ef325d

Please sign in to comment.