Skip to content

Commit

Permalink
Avoid need to pass ftdc into webSvc
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb committed Jan 31, 2025
1 parent df14b2b commit bafbe9a
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 83 deletions.
3 changes: 2 additions & 1 deletion robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type localRobot struct {
configRevisionMu sync.RWMutex

// internal services that are in the graph but we also hold onto
webSvc web.Service
webSvc *web.WebService
frameSvc framesystem.Service

// map keyed by Module.Name. This is necessary to get the package manager to use a new folder
Expand Down Expand Up @@ -455,6 +455,7 @@ func newWithResources(
// we assume these never appear in our configs and as such will not be removed from the
// resource graph
r.webSvc = web.New(r, logger, rOpts.webOptions...)
r.ftdc.Add("web", r.webSvc.RequestCounters())
r.frameSvc, err = framesystem.New(ctx, resource.Dependencies{}, logger)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions robot/web/graphviz_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ import (
"time"

"github.com/goccy/go-graphviz"

"go.viam.com/rdk/robot"
)

func (svc *webService) handleVisualizeResourceGraph(w http.ResponseWriter, r *http.Request) {
func (svc *WebService) handleVisualizeResourceGraph(w http.ResponseWriter, r *http.Request) {
localRobot, isLocal := svc.r.(robot.LocalRobot)
if !isLocal {
return
Expand Down
80 changes: 52 additions & 28 deletions robot/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.viam.com/rdk/robot"
grpcserver "go.viam.com/rdk/robot/server"
weboptions "go.viam.com/rdk/robot/web/options"
webstream "go.viam.com/rdk/robot/web/stream"
rutils "go.viam.com/rdk/utils"
)

Expand All @@ -58,6 +59,28 @@ var API = resource.APINamespaceRDKInternal.WithServiceType(SubtypeName)
// InternalServiceName is used to refer to/depend on this service internally.
var InternalServiceName = resource.NewName(API, "builtin")

type WebService struct {
resource.Named

mu sync.Mutex
r robot.Robot
rpcServer rpc.Server
modServer rpc.Server
streamServer *webstream.Server
services map[resource.API]resource.APIResourceCollection[resource.Resource]
opts options
addr string
modAddr string
logger logging.Logger
cancelCtx context.Context
cancelFunc func()
isRunning bool
webWorkers sync.WaitGroup
modWorkers sync.WaitGroup

counter *RequestCounters
}

// A Service controls the web server for a robot.
type Service interface {
resource.Resource
Expand Down Expand Up @@ -85,12 +108,12 @@ var internalWebServiceName = resource.NewName(
"builtin",
)

func (svc *webService) Name() resource.Name {
func (svc *WebService) Name() resource.Name {
return internalWebServiceName
}

// Start starts the web server, will return an error if server is already up.
func (svc *webService) Start(ctx context.Context, o weboptions.Options) error {
func (svc *WebService) Start(ctx context.Context, o weboptions.Options) error {
svc.mu.Lock()
defer svc.mu.Unlock()
if svc.isRunning {
Expand Down Expand Up @@ -140,21 +163,21 @@ func RunWebWithConfig(ctx context.Context, r robot.LocalRobot, cfg *config.Confi
}

// Address returns the address the service is listening on.
func (svc *webService) Address() string {
func (svc *WebService) Address() string {
svc.mu.Lock()
defer svc.mu.Unlock()
return svc.addr
}

// ModuleAddress returns the unix socket path the module server is listening on.
func (svc *webService) ModuleAddress() string {
func (svc *WebService) ModuleAddress() string {
svc.mu.Lock()
defer svc.mu.Unlock()
return svc.modAddr
}

// StartModule starts the grpc module server.
func (svc *webService) StartModule(ctx context.Context) error {
func (svc *WebService) StartModule(ctx context.Context) error {
svc.mu.Lock()
defer svc.mu.Unlock()
if svc.modServer != nil {
Expand Down Expand Up @@ -193,6 +216,7 @@ func (svc *webService) StartModule(ctx context.Context) error {
)

unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor)
unaryInterceptors = append(unaryInterceptors, svc.counter.UnaryInterceptor)

// Attach the module name (as defined by the robot config) to the handler context. Can be
// accessed via `grpc.GetModuleName`.
Expand Down Expand Up @@ -232,7 +256,7 @@ func (svc *webService) StartModule(ctx context.Context) error {
return nil
}

func (svc *webService) refreshResources() error {
func (svc *WebService) refreshResources() error {
resources := make(map[resource.Name]resource.Resource)
for _, name := range svc.r.ResourceNames() {
resource, err := svc.r.ResourceByName(name)
Expand All @@ -246,7 +270,7 @@ func (svc *webService) refreshResources() error {

// updateResources gets every existing resource on the robot's resource graph and updates ResourceAPICollection object
// with the correct resources, include deleting ones which have been removed from the resource graph.
func (svc *webService) updateResources(resources map[resource.Name]resource.Resource) error {
func (svc *WebService) updateResources(resources map[resource.Name]resource.Resource) error {
groupedResources := make(map[resource.API]map[resource.Name]resource.Resource)
for n, v := range resources {
r, ok := groupedResources[n.API]
Expand Down Expand Up @@ -302,13 +326,13 @@ func (svc *webService) updateResources(resources map[resource.Name]resource.Reso
}

// Stop stops the main web service prior to actually closing (it leaves the module server running.)
func (svc *webService) Stop() {
func (svc *WebService) Stop() {
svc.mu.Lock()
defer svc.mu.Unlock()
svc.stopWeb()
}

func (svc *webService) stopWeb() {
func (svc *WebService) stopWeb() {
if svc.cancelFunc != nil {
svc.cancelFunc()
}
Expand All @@ -317,7 +341,7 @@ func (svc *webService) stopWeb() {
}

// Close closes a webService via calls to its Cancel func.
func (svc *webService) Close(ctx context.Context) error {
func (svc *WebService) Close(ctx context.Context) error {
svc.mu.Lock()
defer svc.mu.Unlock()
svc.stopWeb()
Expand All @@ -331,7 +355,7 @@ func (svc *webService) Close(ctx context.Context) error {

// runWeb takes the given robot and options and runs the web server. This function will
// block until the context is done.
func (svc *webService) runWeb(ctx context.Context, options weboptions.Options) (err error) {
func (svc *WebService) runWeb(ctx context.Context, options weboptions.Options) (err error) {
if options.Network.BindAddress != "" && options.Network.Listener != nil {
return errors.New("may only set one of network bind address or listener")
}
Expand Down Expand Up @@ -480,11 +504,11 @@ type Namer interface {
GetName() string
}

type methodCounter struct {
type RequestCounters struct {
counts sync.Map
}

func (mc *methodCounter) UnaryInterceptor(ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler) (resp any, err error) {
func (mc *RequestCounters) UnaryInterceptor(ctx context.Context, req any, info *googlegrpc.UnaryServerInfo, handler googlegrpc.UnaryHandler) (resp any, err error) {
// Handle `info.FullMethod` values such as:
// - `/viam.component.motor.v1.MotorService/IsMoving`
// - `/viam.robot.v1.RobotService/SendSessionHeartbeat`
Expand Down Expand Up @@ -514,19 +538,18 @@ func (mc *methodCounter) UnaryInterceptor(ctx context.Context, req any, info *go
return handler(ctx, req)
}

func (mc *methodCounter) Stats() any {
func (mc *RequestCounters) Stats() any {
ret := make(map[string]int64)
mc.counts.Range(func(key, value any) bool {
ret[key.(string)] = value.(*atomic.Int64).Load()
return true
})
fmt.Println("Returning:", ret)

return ret
}

// Initialize RPC Server options.
func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options weboptions.Options) ([]rpc.ServerOption, error) {
func (svc *WebService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options weboptions.Options) ([]rpc.ServerOption, error) {
hosts := options.GetHosts(listenerTCPAddr)

webrtcOptions := rpc.WebRTCServerOptions{
Expand Down Expand Up @@ -557,11 +580,8 @@ func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options webo
}

var unaryInterceptors []googlegrpc.UnaryServerInterceptor

unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor)
counter := &methodCounter{}
svc.ftdc.Add("web", counter)
unaryInterceptors = append(unaryInterceptors, counter.UnaryInterceptor)
unaryInterceptors = append(unaryInterceptors, svc.counter.UnaryInterceptor)

if options.Debug {
rpcOpts = append(rpcOpts, rpc.WithDebug())
Expand Down Expand Up @@ -619,7 +639,7 @@ func (svc *webService) initRPCOptions(listenerTCPAddr *net.TCPAddr, options webo
}

// Initialize authentication handler options.
func (svc *webService) initAuthHandlers(listenerTCPAddr *net.TCPAddr, options weboptions.Options) ([]rpc.ServerOption, error) {
func (svc *WebService) initAuthHandlers(listenerTCPAddr *net.TCPAddr, options weboptions.Options) ([]rpc.ServerOption, error) {
rpcOpts := []rpc.ServerOption{}

if options.Managed && len(options.Auth.Handlers) == 1 {
Expand Down Expand Up @@ -698,7 +718,7 @@ func (svc *webService) initAuthHandlers(listenerTCPAddr *net.TCPAddr, options we
}

// Register every API resource grpc service here.
func (svc *webService) initAPIResourceCollections(ctx context.Context, mod bool) error {
func (svc *WebService) initAPIResourceCollections(ctx context.Context, mod bool) error {
// TODO (RSDK-144): only register necessary services
apiRegs := resource.RegisteredAPIs()
for s, rs := range apiRegs {
Expand All @@ -720,7 +740,7 @@ func (svc *webService) initAPIResourceCollections(ctx context.Context, mod bool)
}

// Initialize HTTP server.
func (svc *webService) initHTTPServer(listenerTCPAddr *net.TCPAddr, options weboptions.Options) (*http.Server, error) {
func (svc *WebService) initHTTPServer(listenerTCPAddr *net.TCPAddr, options weboptions.Options) (*http.Server, error) {
mux := svc.initMux(options)

httpServer, err := utils.NewPossiblySecureHTTPServer(mux, utils.HTTPServerOptions{
Expand All @@ -737,7 +757,7 @@ func (svc *webService) initHTTPServer(listenerTCPAddr *net.TCPAddr, options webo
}

// Initialize multiplexer between http handlers.
func (svc *webService) initMux(options weboptions.Options) *goji.Mux {
func (svc *WebService) initMux(options weboptions.Options) *goji.Mux {
mux := goji.NewMux()
// Note: used by viam-agent for health checks
mux.HandleFunc(pat.New("/"), func(w http.ResponseWriter, _ *http.Request) {
Expand Down Expand Up @@ -794,7 +814,7 @@ func (svc *webService) initMux(options weboptions.Options) *goji.Mux {
// It is invoked instead of returning the "unimplemented" gRPC error whenever a request is received for
// an unregistered service or method. These method could be registered on a remote viam-server or a module server
// so this handler will attempt to route the request to the correct next node in the chain.
func (svc *webService) foreignServiceHandler(srv interface{}, stream googlegrpc.ServerStream) error {
func (svc *WebService) foreignServiceHandler(srv interface{}, stream googlegrpc.ServerStream) error {
// method will be in the form of PackageName.ServiceName/MethodName
method, ok := googlegrpc.MethodFromServerStream(stream)
if !ok {
Expand Down Expand Up @@ -978,13 +998,17 @@ type stats struct {
}

// Stats returns ftdc data on behalf of the rpcServer and other web services.
func (svc *webService) Stats() any {
func (svc *WebService) Stats() any {
// RSDK-9369: It's not ideal to block in `Stats`. But we don't today expect this to be
// problematic, and alternatives are more complex/expensive.
svc.mu.Lock()
defer svc.mu.Unlock()

return stats{svc.rpcServer.Stats()}
return stats{RPCServer: svc.rpcServer.Stats()}
}

func (svc *WebService) RequestCounters() *RequestCounters {
return svc.counter
}

// RestartStatusResponse is the JSON response of the `restart_status` HTTP
Expand All @@ -996,7 +1020,7 @@ type RestartStatusResponse struct {
}

// Handles the `/restart_status` endpoint.
func (svc *webService) handleRestartStatus(w http.ResponseWriter, r *http.Request) {
func (svc *WebService) handleRestartStatus(w http.ResponseWriter, r *http.Request) {
localRobot, isLocal := svc.r.(robot.LocalRobot)
if !isLocal {
return
Expand Down
34 changes: 7 additions & 27 deletions robot/web/web_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ import (
"bytes"
"context"
"net/http"
"sync"

"github.com/pkg/errors"
streampb "go.viam.com/api/stream/v1"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
Expand All @@ -20,45 +18,27 @@ import (
)

// New returns a new web service for the given robot.
func New(r robot.Robot, logger logging.Logger, opts ...Option) Service {
func New(r robot.Robot, logger logging.Logger, opts ...Option) *WebService {
var wOpts options
for _, opt := range opts {
opt.apply(&wOpts)
}
webSvc := &webService{
webSvc := &WebService{
Named: InternalServiceName.AsNamed(),
r: r,
logger: logger,
rpcServer: nil,
streamServer: nil,
services: map[resource.API]resource.APIResourceCollection[resource.Resource]{},
opts: wOpts,
counter: &RequestCounters{},
}
return webSvc
}

type webService struct {
resource.Named

mu sync.Mutex
r robot.Robot
rpcServer rpc.Server
modServer rpc.Server
streamServer *webstream.Server
services map[resource.API]resource.APIResourceCollection[resource.Resource]
opts options
addr string
modAddr string
logger logging.Logger
cancelCtx context.Context
cancelFunc func()
isRunning bool
webWorkers sync.WaitGroup
modWorkers sync.WaitGroup
return webSvc
}

// Reconfigure pulls resources and updates the stream server audio and video streams with the new resources.
func (svc *webService) Reconfigure(ctx context.Context, deps resource.Dependencies, _ resource.Config) error {
func (svc *WebService) Reconfigure(ctx context.Context, deps resource.Dependencies, _ resource.Config) error {
svc.mu.Lock()
defer svc.mu.Unlock()
if err := svc.updateResources(deps); err != nil {
Expand All @@ -70,13 +50,13 @@ func (svc *webService) Reconfigure(ctx context.Context, deps resource.Dependenci
return svc.streamServer.AddNewStreams(svc.cancelCtx)
}

func (svc *webService) closeStreamServer() {
func (svc *WebService) closeStreamServer() {
if err := svc.streamServer.Close(); err != nil {
svc.logger.Errorw("error closing stream server", "error", err)
}
}

func (svc *webService) initStreamServer(ctx context.Context) error {
func (svc *WebService) initStreamServer(ctx context.Context) error {
// Check to make sure stream config option is set in the webservice.
var streamConfig gostream.StreamConfig
if svc.opts.streamConfig != nil {
Expand Down
Loading

0 comments on commit bafbe9a

Please sign in to comment.