Skip to content

Commit

Permalink
Merge pull request #1419 from kaleido-io/websocket-namespaced
Browse files Browse the repository at this point in the history
namespace scoped web sockets
  • Loading branch information
nguyer authored Oct 20, 2023
2 parents 13bf4e1 + f6cbfe1 commit f92b532
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 21 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ $(eval $(call makemock, internal/operations, Manager, operat
$(eval $(call makemock, internal/multiparty, Manager, multipartymocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
$(eval $(call makemock, internal/events/websockets, WebSocketsNamespaced, websocketsmocks))

firefly-nocgo: ${GOFILES}
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=$(DATE) -X main.buildVersion=$(BUILD_VERSION) -X 'github.com/hyperledger/firefly/cmd.BuildVersionOverride=$(BUILD_VERSION)' -X 'github.com/hyperledger/firefly/cmd.BuildDate=$(DATE)' -X 'github.com/hyperledger/firefly/cmd.BuildCommit=$(GIT_REF)'" -tags=prod -tags=prod -v
Expand Down
20 changes: 20 additions & 0 deletions internal/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ func (as *apiServer) createMuxRouter(ctx context.Context, mgr namespace.Manager)
ws.(*websockets.WebSockets).SetAuthorizer(mgr)
r.HandleFunc(`/ws`, ws.(*websockets.WebSockets).ServeHTTP)

// namespace scoped web sockets
r.HandleFunc("/api/v1/namespaces/{ns}/ws", hf.APIWrapper(getNamespacedWebSocketHandler(ws.(*websockets.WebSockets), mgr)))

uiPath := config.GetString(coreconfig.UIPath)
if uiPath != "" && config.GetBool(coreconfig.UIEnabled) {
r.PathPrefix(`/ui`).Handler(newStaticHandler(uiPath, "index.html", `/ui`))
Expand All @@ -394,6 +397,23 @@ func (as *apiServer) createMuxRouter(ctx context.Context, mgr namespace.Manager)
return r
}

func getNamespacedWebSocketHandler(ws websockets.WebSocketsNamespaced, mgr namespace.Manager) ffapi.HandlerFunction {
return func(res http.ResponseWriter, req *http.Request) (status int, err error) {

vars := mux.Vars(req)
namespace := vars["ns"]
or, err := mgr.Orchestrator(req.Context(), namespace, false)
if err != nil || or == nil {
return 404, i18n.NewError(req.Context(), coremsgs.Msg404NotFound)
}

ws.ServeHTTPNamespaced(namespace, res, req)

return 200, nil
}

}

func (as *apiServer) notFoundHandler(res http.ResponseWriter, req *http.Request) (status int, err error) {
res.Header().Add("Content-Type", "application/json")
return 404, i18n.NewError(req.Context(), coremsgs.Msg404NotFound)
Expand Down
36 changes: 36 additions & 0 deletions internal/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/hyperledger/firefly/mocks/namespacemocks"
"github.com/hyperledger/firefly/mocks/orchestratormocks"
"github.com/hyperledger/firefly/mocks/spieventsmocks"
"github.com/hyperledger/firefly/mocks/websocketsmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -513,3 +515,37 @@ func TestGetOrchestratorMissingTag(t *testing.T) {
_, err := getOrchestrator(context.Background(), &namespacemocks.Manager{}, "", nil)
assert.Regexp(t, "FF10437", err)
}

func TestGetNamespacedWebSocketHandler(t *testing.T) {
mgr, _, _ := newTestServer()
mwsns := &websocketsmocks.WebSocketsNamespaced{}
mwsns.On("ServeHTTPNamespaced", "ns1", mock.Anything, mock.Anything).Return()

var b bytes.Buffer
req := httptest.NewRequest("GET", "/api/v1/namespaces/ns1/ws", &b)
req = mux.SetURLVars(req, map[string]string{"ns": "ns1"})
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

handler := getNamespacedWebSocketHandler(mwsns, mgr)
status, err := handler(res, req)
assert.NoError(t, err)
assert.Equal(t, 200, status)
}

func TestGetNamespacedWebSocketHandlerUnknownNamespace(t *testing.T) {
mgr, _, _ := newTestServer()
mwsns := &websocketsmocks.WebSocketsNamespaced{}

mgr.On("Orchestrator", mock.Anything, "unknown", false).Return(nil, errors.New("unknown namespace")).Maybe()
var b bytes.Buffer
req := httptest.NewRequest("GET", "/api/v1/namespaces/unknown/ws", &b)
req = mux.SetURLVars(req, map[string]string{"ns": "unknown"})
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

handler := getNamespacedWebSocketHandler(mwsns, mgr)
status, err := handler(res, req)
assert.Error(t, err)
assert.Equal(t, 404, status)
}
1 change: 1 addition & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,5 @@ var (
MsgTokensRESTErrConflict = ffe("FF10459", "Conflict from tokens service: %s", 409)
MsgBatchWithDataNotSupported = ffe("FF10460", "Provided subscription '%s' enables batching and withData which is not supported", 400)
MsgBatchDeliveryNotSupported = ffe("FF10461", "Batch delivery not supported by transport '%s'", 400)
MsgWSWrongNamespace = ffe("FF10462", "Websocket request received on a namespace scoped connection but the provided namespace does not match")
)
69 changes: 50 additions & 19 deletions internal/events/websockets/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,25 @@ type websocketStartedSub struct {
}

type websocketConnection struct {
ctx context.Context
ws *WebSockets
wsConn *websocket.Conn
cancelCtx func()
connID string
sendMessages chan interface{}
senderDone chan struct{}
receiverDone chan struct{}
autoAck bool
started []*websocketStartedSub
inflight []*core.EventDeliveryResponse
mux sync.Mutex
closed bool
remoteAddr string
userAgent string
header http.Header
auth core.Authorizer
ctx context.Context
ws *WebSockets
wsConn *websocket.Conn
cancelCtx func()
connID string
sendMessages chan interface{}
senderDone chan struct{}
receiverDone chan struct{}
autoAck bool
started []*websocketStartedSub
inflight []*core.EventDeliveryResponse
mux sync.Mutex
closed bool
remoteAddr string
userAgent string
header http.Header
auth core.Authorizer
namespaceScoped bool // if true then any request to listen is asserted to be in the context of namespace
namespace string
}

func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn, req *http.Request, auth core.Authorizer) *websocketConnection {
Expand All @@ -80,6 +82,18 @@ func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn,
return wc
}

func (wc *websocketConnection) assertNamespace(namespace string) (string, error) {

if wc.namespaceScoped {
if namespace == "" {
namespace = wc.namespace
} else if namespace != wc.namespace {
return "", i18n.NewError(wc.ctx, coremsgs.MsgWSWrongNamespace)
}
}
return namespace, nil
}

// processAutoStart gives a helper to specify query parameters to auto-start your subscription
func (wc *websocketConnection) processAutoStart(req *http.Request) {
query := req.URL.Query()
Expand All @@ -88,12 +102,18 @@ func (wc *websocketConnection) processAutoStart(req *http.Request) {
_, hasName := query["name"]
autoAck, hasAutoack := req.URL.Query()["autoack"]
isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false")
namespace, err := wc.assertNamespace(query.Get("namespace"))
if err != nil {
wc.protocolError(err)
return
}

if hasEphemeral || hasName {
filter := core.NewSubscriptionFilterFromQuery(query)
err := wc.handleStart(&core.WSStart{
AutoAck: &isAutoack,
Ephemeral: isEphemeral,
Namespace: query.Get("namespace"),
Namespace: namespace,
Name: query.Get("name"),
Filter: filter,
})
Expand Down Expand Up @@ -157,7 +177,10 @@ func (wc *websocketConnection) receiveLoop() {
var msg core.WSStart
err = json.Unmarshal(msgData, &msg)
if err == nil {
err = wc.authorizeMessage(msg.Namespace)
msg.Namespace, err = wc.assertNamespace(msg.Namespace)
if err == nil {
err = wc.authorizeMessage(msg.Namespace)
}
if err == nil {
err = wc.handleStart(&msg)
}
Expand Down Expand Up @@ -251,6 +274,14 @@ func (wc *websocketConnection) restartForNamespace(ns string, startTime time.Tim
}

func (wc *websocketConnection) handleStart(start *core.WSStart) (err error) {
// this will very likely already be checked before we get here but
// it doesn't do any harm to do a final assertion just in case it hasn't been done yet

start.Namespace, err = wc.assertNamespace(start.Namespace)
if err != nil {
return err
}

wc.mux.Lock()
if start.AutoAck != nil {
if *start.AutoAck != wc.autoAck && len(wc.started) > 0 {
Expand Down
23 changes: 23 additions & 0 deletions internal/events/websockets/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"github.com/hyperledger/firefly/pkg/events"
)

type WebSocketsNamespaced interface {
ServeHTTPNamespaced(namespace string, res http.ResponseWriter, req *http.Request)
}

type WebSockets struct {
ctx context.Context
capabilities *events.Capabilities
Expand Down Expand Up @@ -122,6 +126,25 @@ func (ws *WebSockets) ServeHTTP(res http.ResponseWriter, req *http.Request) {
wc.processAutoStart(req)
}

func (ws *WebSockets) ServeHTTPNamespaced(namespace string, res http.ResponseWriter, req *http.Request) {

wsConn, err := ws.upgrader.Upgrade(res, req, nil)
if err != nil {
log.L(ws.ctx).Errorf("WebSocket upgrade failed: %s", err)
return
}

ws.connMux.Lock()
wc := newConnection(ws.ctx, ws, wsConn, req, ws.auth)
wc.namespaceScoped = true
wc.namespace = namespace
ws.connections[wc.connID] = wc
ws.connMux.Unlock()

wc.processAutoStart(req)

}

func (ws *WebSockets) ack(connID string, inflight *core.EventDeliveryResponse) {
if cb, ok := ws.callbacks.handlers[inflight.Subscription.Namespace]; ok {
cb.DeliveryResponse(connID, inflight)
Expand Down
Loading

0 comments on commit f92b532

Please sign in to comment.