Skip to content

Commit

Permalink
feat(websocket): add http handler controller
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Stewart <[email protected]>
  • Loading branch information
paralin committed Jun 21, 2024
1 parent b4ba3d1 commit a5f272d
Show file tree
Hide file tree
Showing 7 changed files with 1,253 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
udptpt "github.com/aperturerobotics/bifrost/transport/udp"
"github.com/aperturerobotics/bifrost/transport/webrtc"
wtpt "github.com/aperturerobotics/bifrost/transport/websocket"
wtpt_http "github.com/aperturerobotics/bifrost/transport/websocket/http"
"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/controller/resolver/static"
cbc "github.com/aperturerobotics/controllerbus/core"
Expand Down Expand Up @@ -67,6 +68,7 @@ func AddFactories(b bus.Bus, sr *static.Resolver) {
sr.AddFactory(udptpt.NewFactory(b))
// websocket transport
sr.AddFactory(wtpt.NewFactory(b))
sr.AddFactory(wtpt_http.NewFactory(b))
// webrtc transport
sr.AddFactory(webrtc.NewFactory(b))

Expand Down
66 changes: 66 additions & 0 deletions transport/websocket/http/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package websocket_http

import (
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/transport"
"github.com/aperturerobotics/bifrost/util/confparse"
"github.com/aperturerobotics/controllerbus/config"
"github.com/pkg/errors"
)

// ConfigID is the string used to identify this config object.
const ConfigID = ControllerID

// Validate validates the configuration.
// This is a cursory validation to see if the values "look correct."
func (c *Config) Validate() error {
if _, err := c.ParseTransportPeerID(); err != nil {
return errors.Wrap(err, "transport_peer_id")
}

return nil
}

// GetConfigID returns the unique string for this configuration type.
// This string is stored with the encoded config.
func (c *Config) GetConfigID() string {
return ConfigID
}

// EqualsConfig checks if the other config is equal.
func (c *Config) EqualsConfig(c2 config.Config) bool {
return config.EqualsConfig(c, c2)
}

// ParseTransportPeerID parses the node peer ID if it is not empty.
func (c *Config) ParseTransportPeerID() (peer.ID, error) {
return confparse.ParsePeerID(c.GetTransportPeerId())
}

// SetTransportPeerId sets the node peer ID field.
func (c *Config) SetTransportPeerId(peerID string) {
c.TransportPeerId = peerID
}

// GetDebugVals returns the directive arguments as key/value pairs.
// This should be something like param1="test", param2="test".
// This is not necessarily unique, and is primarily intended for display.
func (c *Config) GetDebugVals() config.DebugValues {
vals := make(config.DebugValues)
if tp := c.GetTransportPeerId(); tp != "" {
vals["peer-id"] = []string{tp}
}
if hp := c.GetHttpPatterns(); len(hp) != 0 {
vals["http-patterns"] = hp
}
if hp := c.GetPeerHttpPatterns(); len(hp) != 0 {
vals["peer-http-patterns"] = hp
}
return vals
}

// _ is a type assertion
var _ transport.Config = ((*Config)(nil))

// _ is a type assertion
var _ config.Debuggable = ((*Config)(nil))
56 changes: 56 additions & 0 deletions transport/websocket/http/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package websocket_http

import (
"context"

"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/config"
"github.com/aperturerobotics/controllerbus/controller"
"github.com/blang/semver"
)

// Factory constructs a WebSocket transport.
type Factory struct {
// bus is the controller bus
bus bus.Bus
}

// NewFactory builds a transport factory.
func NewFactory(bus bus.Bus) *Factory {
return &Factory{bus: bus}
}

// GetConfigID returns the configuration ID for the controller.
func (t *Factory) GetConfigID() string {
return ConfigID
}

// GetControllerID returns the unique ID for the controller.
func (t *Factory) GetControllerID() string {
return ControllerID
}

// ConstructConfig constructs an instance of the controller configuration.
func (t *Factory) ConstructConfig() config.Config {
return &Config{}
}

// Construct constructs the associated controller given configuration.
func (t *Factory) Construct(
ctx context.Context,
conf config.Config,
opts controller.ConstructOpts,
) (controller.Controller, error) {
le := opts.GetLogger()
cc := conf.(*Config)

return NewWebSocketHttp(le, t.bus, cc)
}

// GetVersion returns the version of this controller.
func (t *Factory) GetVersion() semver.Version {
return Version
}

// _ is a type assertion
var _ controller.Factory = ((*Factory)(nil))
155 changes: 155 additions & 0 deletions transport/websocket/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package websocket_http

import (
"context"
"net/http"

bifrost_http "github.com/aperturerobotics/bifrost/http"
"github.com/aperturerobotics/bifrost/transport"
transport_controller "github.com/aperturerobotics/bifrost/transport/controller"
"github.com/aperturerobotics/bifrost/transport/websocket"
"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/controller"
"github.com/aperturerobotics/controllerbus/directive"
"github.com/blang/semver"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/sirupsen/logrus"
)

// ControllerID is the WebSocket HTTP handler controller ID.
const ControllerID = "bifrost/websocket/http"

// Version is the version of the implementation.
var Version = semver.MustParse("0.0.1")

// NewWebSocketHttp builds a new WebSocket http handler controller.
type WebSocketHttp struct {
// Controller is the transport controller
*transport_controller.Controller
// mux is the serve mux
mux *http.ServeMux
}

// NewWebSocketHttp builds a new WebSocket http handler controller.
func NewWebSocketHttp(le *logrus.Entry, b bus.Bus, conf *Config) (*WebSocketHttp, error) {
peerID, err := conf.ParseTransportPeerID()
if err != nil {
return nil, err
}

mux := http.NewServeMux()
ctrl := &WebSocketHttp{mux: mux}

// Ensure no duplicate patterns (causes a panic in net/http)
seenPatterns := make(map[string]struct{}, len(conf.GetHttpPatterns())+len(conf.GetPeerHttpPatterns()))
checkPattern := func(httpPattern string) bool {
if len(httpPattern) == 0 {
return false
}

if _, ok := seenPatterns[httpPattern]; ok {
le.
WithField("http-pattern", httpPattern).
Warn("ignoring duplicate http pattern")
return false
}

seenPatterns[httpPattern] = struct{}{}
return true
}

for _, httpPattern := range conf.GetHttpPatterns() {
if checkPattern(httpPattern) {
mux.HandleFunc(httpPattern, ctrl.ServeWebSocketHTTP)
}
}

for _, peerHttpPattern := range conf.GetPeerHttpPatterns() {
if checkPattern(peerHttpPattern) {
mux.HandleFunc(peerHttpPattern, ctrl.ServePeerHTTP)
}
}

ctrl.Controller = transport_controller.NewController(
le,
b,
controller.NewInfo(ControllerID, Version, "bifrost websocket http handler"),
peerID,
func(
ctx context.Context,
le *logrus.Entry,
pkey crypto.PrivKey,
handler transport.TransportHandler,
) (transport.Transport, error) {
return websocket.NewWebSocket(
ctx,
le,
&websocket.Config{
TransportPeerId: peerID.String(),
Quic: conf.GetQuic(),
Dialers: conf.GetDialers(),
},
pkey,
handler,
)
},
)

return ctrl, nil
}

// ServeWebSocketHTTP serves the WebSocket on the HTTP response.
func (t *WebSocketHttp) ServeWebSocketHTTP(rw http.ResponseWriter, req *http.Request) {
// wait for the transport to be ready
tpt, err := t.GetTransport(req.Context())
if err != nil {
// This must be a context canceled error.
rw.WriteHeader(500)
return
}

// Call ServeHTTP
tpt.(*websocket.WebSocket).ServeHTTP(rw, req)
}

// ServePeerHTTP serves the peer ID as a string on the HTTP response.
func (t *WebSocketHttp) ServePeerHTTP(rw http.ResponseWriter, req *http.Request) {
// wait for the transport to be ready
tpt, err := t.GetTransport(req.Context())
if err != nil {
// This must be a context canceled error.
rw.WriteHeader(500)
return
}

rw.WriteHeader(200)
_, _ = rw.Write([]byte(tpt.GetPeerID().String()))
}

// HandleDirective asks if the handler can resolve the directive.
// If it can, it returns resolver(s). If not, returns nil.
// It is safe to add a reference to the directive during this call.
// The passed context is canceled when the directive instance expires.
// NOTE: the passed context is not canceled when the handler is removed.
func (t *WebSocketHttp) HandleDirective(ctx context.Context, di directive.Instance) ([]directive.Resolver, error) {
switch dir := di.GetDirective().(type) {
case bifrost_http.LookupHTTPHandler:
return t.ResolveLookupHTTPHandler(ctx, dir)
}

return t.Controller.HandleDirective(ctx, di)
}

// ResolveLookupHTTPHandler resolves the LookupHTTPHandler directive conditionally.
// Returns nil, nil if no handlers matched.
func (t *WebSocketHttp) ResolveLookupHTTPHandler(ctx context.Context, dir bifrost_http.LookupHTTPHandler) ([]directive.Resolver, error) {
handler, _ := bifrost_http.MatchServeMuxPattern(t.mux, dir)
if handler == nil {
return nil, nil
}

return directive.R(bifrost_http.NewLookupHTTPHandlerResolver(handler), nil)
}

// _ is a type assertion.
var _ transport.Controller = ((*WebSocketHttp)(nil))
Loading

0 comments on commit a5f272d

Please sign in to comment.