Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

namespace scoped web sockets #1419

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ $(eval $(call makemock, internal/operations, Manager, operat
$(eval $(call makemock, internal/multiparty, Manager, multipartymocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
$(eval $(call makemock, internal/events/websockets, WebSocketsNamespaced, websocketsmocks))

firefly-nocgo: ${GOFILES}
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=$(DATE) -X main.buildVersion=$(BUILD_VERSION) -X 'github.com/hyperledger/firefly/cmd.BuildVersionOverride=$(BUILD_VERSION)' -X 'github.com/hyperledger/firefly/cmd.BuildDate=$(DATE)' -X 'github.com/hyperledger/firefly/cmd.BuildCommit=$(GIT_REF)'" -tags=prod -tags=prod -v
Expand Down
20 changes: 20 additions & 0 deletions internal/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ func (as *apiServer) createMuxRouter(ctx context.Context, mgr namespace.Manager)
ws.(*websockets.WebSockets).SetAuthorizer(mgr)
r.HandleFunc(`/ws`, ws.(*websockets.WebSockets).ServeHTTP)

// namespace scoped web sockets
r.HandleFunc("/api/v1/namespaces/{ns}/ws", hf.APIWrapper(getNamespacedWebSocketHandler(ws.(*websockets.WebSockets), mgr)))

uiPath := config.GetString(coreconfig.UIPath)
if uiPath != "" && config.GetBool(coreconfig.UIEnabled) {
r.PathPrefix(`/ui`).Handler(newStaticHandler(uiPath, "index.html", `/ui`))
Expand All @@ -394,6 +397,23 @@ func (as *apiServer) createMuxRouter(ctx context.Context, mgr namespace.Manager)
return r
}

func getNamespacedWebSocketHandler(ws websockets.WebSocketsNamespaced, mgr namespace.Manager) ffapi.HandlerFunction {
return func(res http.ResponseWriter, req *http.Request) (status int, err error) {

vars := mux.Vars(req)
namespace := vars["ns"]
or, err := mgr.Orchestrator(req.Context(), namespace, false)
if err != nil || or == nil {
return 404, i18n.NewError(req.Context(), coremsgs.Msg404NotFound)
}

ws.ServeHTTPNamespaced(namespace, res, req)

return 200, nil
}

}

func (as *apiServer) notFoundHandler(res http.ResponseWriter, req *http.Request) (status int, err error) {
res.Header().Add("Content-Type", "application/json")
return 404, i18n.NewError(req.Context(), coremsgs.Msg404NotFound)
Expand Down
36 changes: 36 additions & 0 deletions internal/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/hyperledger/firefly/mocks/namespacemocks"
"github.com/hyperledger/firefly/mocks/orchestratormocks"
"github.com/hyperledger/firefly/mocks/spieventsmocks"
"github.com/hyperledger/firefly/mocks/websocketsmocks"
"github.com/hyperledger/firefly/pkg/core"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -513,3 +515,37 @@ func TestGetOrchestratorMissingTag(t *testing.T) {
_, err := getOrchestrator(context.Background(), &namespacemocks.Manager{}, "", nil)
assert.Regexp(t, "FF10437", err)
}

func TestGetNamespacedWebSocketHandler(t *testing.T) {
mgr, _, _ := newTestServer()
mwsns := &websocketsmocks.WebSocketsNamespaced{}
mwsns.On("ServeHTTPNamespaced", "ns1", mock.Anything, mock.Anything).Return()

var b bytes.Buffer
req := httptest.NewRequest("GET", "/api/v1/namespaces/ns1/ws", &b)
req = mux.SetURLVars(req, map[string]string{"ns": "ns1"})
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

handler := getNamespacedWebSocketHandler(mwsns, mgr)
status, err := handler(res, req)
assert.NoError(t, err)
assert.Equal(t, 200, status)
}

func TestGetNamespacedWebSocketHandlerUnknownNamespace(t *testing.T) {
mgr, _, _ := newTestServer()
mwsns := &websocketsmocks.WebSocketsNamespaced{}

mgr.On("Orchestrator", mock.Anything, "unknown", false).Return(nil, errors.New("unknown namespace")).Maybe()
var b bytes.Buffer
req := httptest.NewRequest("GET", "/api/v1/namespaces/unknown/ws", &b)
req = mux.SetURLVars(req, map[string]string{"ns": "unknown"})
req.Header.Set("Content-Type", "application/json; charset=utf-8")
res := httptest.NewRecorder()

handler := getNamespacedWebSocketHandler(mwsns, mgr)
status, err := handler(res, req)
assert.Error(t, err)
assert.Equal(t, 404, status)
}
1 change: 1 addition & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,5 @@ var (
MsgTokensRESTErrConflict = ffe("FF10459", "Conflict from tokens service: %s", 409)
MsgBatchWithDataNotSupported = ffe("FF10460", "Provided subscription '%s' enables batching and withData which is not supported", 400)
MsgBatchDeliveryNotSupported = ffe("FF10461", "Batch delivery not supported by transport '%s'", 400)
MsgWSWrongNamespace = ffe("FF10462", "Websocket request received on a namespace scoped connection but the provided namespace does not match")
)
69 changes: 50 additions & 19 deletions internal/events/websockets/websocket_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,25 @@ type websocketStartedSub struct {
}

type websocketConnection struct {
ctx context.Context
ws *WebSockets
wsConn *websocket.Conn
cancelCtx func()
connID string
sendMessages chan interface{}
senderDone chan struct{}
receiverDone chan struct{}
autoAck bool
started []*websocketStartedSub
inflight []*core.EventDeliveryResponse
mux sync.Mutex
closed bool
remoteAddr string
userAgent string
header http.Header
auth core.Authorizer
ctx context.Context
ws *WebSockets
wsConn *websocket.Conn
cancelCtx func()
connID string
sendMessages chan interface{}
senderDone chan struct{}
receiverDone chan struct{}
autoAck bool
started []*websocketStartedSub
inflight []*core.EventDeliveryResponse
mux sync.Mutex
closed bool
remoteAddr string
userAgent string
header http.Header
auth core.Authorizer
namespaceScoped bool // if true then any request to listen is asserted to be in the context of namespace
namespace string
}

func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn, req *http.Request, auth core.Authorizer) *websocketConnection {
Expand All @@ -80,6 +82,18 @@ func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn,
return wc
}

func (wc *websocketConnection) assertNamespace(namespace string) (string, error) {

if wc.namespaceScoped {
if namespace == "" {
namespace = wc.namespace
} else if namespace != wc.namespace {
return "", i18n.NewError(wc.ctx, coremsgs.MsgWSWrongNamespace)
}
}
return namespace, nil
}

// processAutoStart gives a helper to specify query parameters to auto-start your subscription
func (wc *websocketConnection) processAutoStart(req *http.Request) {
query := req.URL.Query()
Expand All @@ -88,12 +102,18 @@ func (wc *websocketConnection) processAutoStart(req *http.Request) {
_, hasName := query["name"]
autoAck, hasAutoack := req.URL.Query()["autoack"]
isAutoack := hasAutoack && (len(autoAck) == 0 || autoAck[0] != "false")
namespace, err := wc.assertNamespace(query.Get("namespace"))
if err != nil {
wc.protocolError(err)
return
}

if hasEphemeral || hasName {
filter := core.NewSubscriptionFilterFromQuery(query)
err := wc.handleStart(&core.WSStart{
AutoAck: &isAutoack,
Ephemeral: isEphemeral,
Namespace: query.Get("namespace"),
Namespace: namespace,
Name: query.Get("name"),
Filter: filter,
})
Expand Down Expand Up @@ -157,7 +177,10 @@ func (wc *websocketConnection) receiveLoop() {
var msg core.WSStart
err = json.Unmarshal(msgData, &msg)
if err == nil {
err = wc.authorizeMessage(msg.Namespace)
msg.Namespace, err = wc.assertNamespace(msg.Namespace)
if err == nil {
err = wc.authorizeMessage(msg.Namespace)
}
if err == nil {
err = wc.handleStart(&msg)
}
Expand Down Expand Up @@ -251,6 +274,14 @@ func (wc *websocketConnection) restartForNamespace(ns string, startTime time.Tim
}

func (wc *websocketConnection) handleStart(start *core.WSStart) (err error) {
// this will very likely already be checked before we get here but
// it doesn't do any harm to do a final assertion just in case it hasn't been done yet

start.Namespace, err = wc.assertNamespace(start.Namespace)
if err != nil {
return err
}

wc.mux.Lock()
if start.AutoAck != nil {
if *start.AutoAck != wc.autoAck && len(wc.started) > 0 {
Expand Down
23 changes: 23 additions & 0 deletions internal/events/websockets/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"github.com/hyperledger/firefly/pkg/events"
)

type WebSocketsNamespaced interface {
ServeHTTPNamespaced(namespace string, res http.ResponseWriter, req *http.Request)
}

type WebSockets struct {
ctx context.Context
capabilities *events.Capabilities
Expand Down Expand Up @@ -122,6 +126,25 @@ func (ws *WebSockets) ServeHTTP(res http.ResponseWriter, req *http.Request) {
wc.processAutoStart(req)
}

func (ws *WebSockets) ServeHTTPNamespaced(namespace string, res http.ResponseWriter, req *http.Request) {

wsConn, err := ws.upgrader.Upgrade(res, req, nil)
if err != nil {
log.L(ws.ctx).Errorf("WebSocket upgrade failed: %s", err)
return
}

ws.connMux.Lock()
wc := newConnection(ws.ctx, ws, wsConn, req, ws.auth)
wc.namespaceScoped = true
wc.namespace = namespace
ws.connections[wc.connID] = wc
ws.connMux.Unlock()

wc.processAutoStart(req)

}

func (ws *WebSockets) ack(connID string, inflight *core.EventDeliveryResponse) {
if cb, ok := ws.callbacks.handlers[inflight.Subscription.Namespace]; ok {
cb.DeliveryResponse(connID, inflight)
Expand Down
Loading