Skip to content

Commit

Permalink
Test moving the backfill API to NATS request-reply
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Mar 14, 2024
1 parent 5de04eb commit 1f75480
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 21 deletions.
5 changes: 4 additions & 1 deletion cmd/dendrite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ func main() {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
routers := httputil.NewRouters()

caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics)
natsInstance := jetstream.NATSInstance{}
_, _ = natsInstance.Prepare(processCtx, &cfg.Global.JetStream)

caches := caching.NewRistrettoCache(cfg.Global.Cache.EstimatedMaxSize, cfg.Global.Cache.MaxAge, caching.EnableMetrics)

rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics)
fsAPI := federationapi.NewInternalAPI(
processCtx, cfg, cm, &natsInstance, federationClient, rsAPI, caches, nil, false,
Expand Down
29 changes: 29 additions & 0 deletions roomserver/external/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package external

import (
"context"

"github.com/neilalexander/harmony/roomserver/api"
"github.com/neilalexander/harmony/setup/jetstream"
)

type RoomserverAPIClient struct {
nats *jetstream.NATSInstance
api.RoomserverInternalAPI
}

func NewRoomserverAPIClient(intapi api.RoomserverInternalAPI, nats *jetstream.NATSInstance) *RoomserverAPIClient {
c := &RoomserverAPIClient{
RoomserverInternalAPI: intapi,
nats: nats,
}
return c
}

func (c *RoomserverAPIClient) PPerformBackfill(
ctx context.Context,
req *api.PerformBackfillRequest,
res *api.PerformBackfillResponse,
) error {
return jetstream.CallAPI(c.nats, "Roomserver", "QueryServerJoinedToRoom", req, res)
}
32 changes: 32 additions & 0 deletions roomserver/external/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package external

import (
"context"

"github.com/neilalexander/harmony/roomserver/api"
"github.com/neilalexander/harmony/setup/jetstream"
)

type RoomserverAPIServer struct {
nats *jetstream.NATSInstance
api.RoomserverInternalAPI
}

func NewRoomserverAPIServer(intapi api.RoomserverInternalAPI, nats *jetstream.NATSInstance) (*RoomserverAPIServer, error) {
s := &RoomserverAPIServer{
RoomserverInternalAPI: intapi,
nats: nats,
}
ctx := context.TODO()

if err := jetstream.ListenAPI(
s.nats, "Roomserver", "PerformBackfill",
func(req *api.PerformBackfillRequest, res *api.PerformBackfillResponse) error {
return s.RoomserverInternalAPI.PerformBackfill(ctx, req, res)
},
); err != nil {
return nil, err
}

return s, nil
}
13 changes: 10 additions & 3 deletions roomserver/roomserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/neilalexander/harmony/roomserver/api"
"github.com/neilalexander/harmony/roomserver/external"
"github.com/neilalexander/harmony/roomserver/internal"
"github.com/neilalexander/harmony/roomserver/storage"
)
Expand All @@ -44,9 +45,15 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}

js, nc := natsInstance.Prepare(processContext, &cfg.Global.JetStream)

return internal.NewRoomserverAPI(
js, nc := natsInstance.JetStream, natsInstance.Conn
intapi := internal.NewRoomserverAPI(
processContext, cfg, roomserverDB, js, nc, caches, enableMetrics,
)

if _, err := external.NewRoomserverAPIServer(intapi, natsInstance); err != nil {
logrus.WithError(err).Warn("Setting up roomserver NATS responder failed")
return external.NewRoomserverAPIClient(intapi, natsInstance)
}

return intapi
}
70 changes: 53 additions & 17 deletions setup/jetstream/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package jetstream

import (
"crypto/tls"
"encoding/json"
"fmt"
"reflect"
"strings"
Expand All @@ -19,9 +20,9 @@ import (
)

type NATSInstance struct {
*natsserver.Server
nc *natsclient.Conn
js natsclient.JetStreamContext
srv *natsserver.Server
Conn *natsclient.Conn
JetStream natsclient.JetStreamContext
}

var natsLock sync.Mutex
Expand All @@ -40,7 +41,7 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
if len(cfg.Addresses) != 0 {
return setupNATS(process, cfg, nil)
}
if s.Server == nil {
if s.srv == nil {
var err error
opts := &natsserver.Options{
ServerName: "monolith",
Expand All @@ -52,38 +53,38 @@ func (s *NATSInstance) Prepare(process *process.ProcessContext, cfg *config.JetS
NoSigs: true,
NoLog: cfg.NoLog,
}
s.Server, err = natsserver.NewServer(opts)
s.srv, err = natsserver.NewServer(opts)
if err != nil {
panic(err)
}
if !cfg.NoLog {
s.SetLogger(NewLogAdapter(), opts.Debug, opts.Trace)
s.srv.SetLogger(NewLogAdapter(), opts.Debug, opts.Trace)
}
go func() {
process.ComponentStarted()
s.Start()
s.srv.Start()
}()
if !s.srv.ReadyForConnections(time.Second * 60) {
logrus.Fatalln("NATS did not start in time")
}
go func() {
<-process.WaitForShutdown()
s.Shutdown()
s.WaitForShutdown()
s.srv.Shutdown()
s.srv.WaitForShutdown()
process.ComponentFinished()
}()
}
if !s.ReadyForConnections(time.Second * 60) {
logrus.Fatalln("NATS did not start in time")
}
// reuse existing connections
if s.nc != nil {
return s.js, s.nc
if s.Conn != nil {
return s.JetStream, s.Conn
}
nc, err := natsclient.Connect("", natsclient.InProcessServer(s))
nc, err := natsclient.Connect("", natsclient.InProcessServer(s.srv))
if err != nil {
logrus.Fatalln("Failed to create NATS client")
}
js, _ := setupNATS(process, cfg, nc)
s.js = js
s.nc = nc
s.JetStream = js
s.Conn = nc
return js, nc
}

Expand Down Expand Up @@ -236,3 +237,38 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc

return s, nc
}

func CallAPI[req, res any](s *NATSInstance, component, endpoint string, rq req, rs res) error {
subj := fmt.Sprintf("API.%s.%s", component, endpoint)
j, err := json.Marshal(rq)
if err != nil {
return err
}
resp, err := s.Conn.Request(subj, j, time.Second*5)
if err != nil {
return err
}
return json.Unmarshal(resp.Data, rs)
}

func ListenAPI[req, res any](s *NATSInstance, component, endpoint string, fn func(req req, res res) error) error {
subj := fmt.Sprintf("API.%s.%s", component, endpoint)
_, err := s.Conn.Subscribe(subj, func(msg *natsclient.Msg) {
var req req
var res res
if err := json.Unmarshal(msg.Data, &req); err != nil {
return
}
if err := fn(req, res); err != nil {
return
}
j, err := json.Marshal(res)
if err != nil {
return
}
if err := msg.Respond(j); err != nil {
return
}
})
return err
}

0 comments on commit 1f75480

Please sign in to comment.