From c3daac74b361eb22c028fed9212abb8cd25db8bd Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Mon, 27 Jan 2025 15:55:39 -0500 Subject: [PATCH 1/5] RSDK-9818: Annotate gRPC requests from modules to the viam-server with module names. Pass a VIAM_MODULE_NAME env variable to module processes. --- grpc/interceptors.go | 58 +++++++++++++++++++++++++ module/modmanager/manager.go | 1 + module/module.go | 21 ++++++++- module/testmodule/main.go | 21 ++++++++- robot/client/client.go | 5 +++ robot/client/client_options.go | 8 ++++ robot/impl/local_robot_test.go | 79 ++++++++++++++++++++++++++++++++++ robot/web/web.go | 4 ++ 8 files changed, 194 insertions(+), 3 deletions(-) diff --git a/grpc/interceptors.go b/grpc/interceptors.go index 7c89f88e0f7..5134f47b88e 100644 --- a/grpc/interceptors.go +++ b/grpc/interceptors.go @@ -5,6 +5,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // DefaultMethodTimeout is the default context timeout for all inbound gRPC @@ -43,3 +44,60 @@ func EnsureTimeoutUnaryClientInterceptor( return invoker(ctx, method, req, reply, cc, opts...) } + +// The following code is for appending/extracting grpc metadata regarding module names/origins via +// contexts. +type modNameKeyType int + +const modNameKeyID = modNameKeyType(iota) + +// GetName returns the debug log key included when enabling the context for debug logging. +func GetModuleName(ctx context.Context) string { + valI := ctx.Value(modNameKeyID) + if val, ok := valI.(string); ok { + return val + } + + return "" +} + +const modNameMetadataKey = "modName" + +type ModInterceptors struct { + ModName string +} + +// UnaryClientInterceptor adds debug directives from the current context (if any) to the +// outgoing request's metadata. +func (mc *ModInterceptors) UnaryClientInterceptor( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + ctx = metadata.AppendToOutgoingContext(ctx, modNameMetadataKey, mc.ModName) + return invoker(ctx, method, req, reply, cc, opts...) +} + +// UnaryServerInterceptor checks the incoming RPC metadata for a distributed tracing directive and +// attaches any information to a debug context. +func ModNameUnaryServerInterceptor( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, +) (interface{}, error) { + meta, ok := metadata.FromIncomingContext(ctx) + if !ok { + return handler(ctx, req) + } + + values := meta.Get(modNameMetadataKey) + if len(values) == 1 { + ctx = context.WithValue(ctx, modNameKeyID, values[0]) + } + + return handler(ctx, req) +} diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index f5b05478f6e..3fcc756609d 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -1408,6 +1408,7 @@ func getFullEnvironment( environment := map[string]string{ "VIAM_HOME": viamHomeDir, "VIAM_MODULE_DATA": dataDir, + "VIAM_MODULE_NAME": cfg.Name, } if cfg.Type == config.ModuleTypeRegistry { environment["VIAM_MODULE_ID"] = cfg.ModuleID diff --git a/module/module.go b/module/module.go index 877741ec0da..027aaece83a 100644 --- a/module/module.go +++ b/module/module.go @@ -175,6 +175,9 @@ type peerResourceState struct { // Module represents an external resource module that services components/services. type Module struct { + // The name of the module as per the robot config. + name string + shutdownCtx context.Context shutdownFn context.CancelFunc parent *client.RobotClient @@ -219,7 +222,12 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod } cancelCtx, cancel := context.WithCancel(context.Background()) + + // If the env variable does not exist, the empty string is returned. + modName, _ := os.LookupEnv("VIAM_MODULE_NAME") + m := &Module{ + name: modName, shutdownCtx: cancelCtx, shutdownFn: cancel, logger: logger, @@ -369,7 +377,18 @@ func (m *Module) connectParent(ctx context.Context) error { clientLogger := logging.NewLogger("networking.module-connection") clientLogger.SetLevel(m.logger.GetLevel()) // TODO(PRODUCT-343): add session support to modules - rc, err := client.New(ctx, fullAddr, clientLogger, client.WithDisableSessions()) + + connectOptions := []client.RobotClientOption{ + client.WithDisableSessions(), + } + + // Modules compiled against newer SDKs may be running against older `viam-server`s that do not + // provide the module name as an env variable. + if m.name != "" { + connectOptions = append(connectOptions, client.WithModName(m.name)) + } + + rc, err := client.New(ctx, fullAddr, m.logger, connectOptions...) if err != nil { return err } diff --git a/module/testmodule/main.go b/module/testmodule/main.go index c6abc022b24..39fabfedc7a 100644 --- a/module/testmodule/main.go +++ b/module/testmodule/main.go @@ -13,6 +13,7 @@ import ( "go.viam.com/rdk/components/generic" "go.viam.com/rdk/components/motor" + "go.viam.com/rdk/components/sensor" "go.viam.com/rdk/logging" "go.viam.com/rdk/module" "go.viam.com/rdk/resource" @@ -106,9 +107,21 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err func newHelper( ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger, ) (resource.Resource, error) { + var dependsOnSensor sensor.Sensor + for _, resObj := range deps { + if resSensor, ok := resObj.(sensor.Sensor); ok { + dependsOnSensor = resSensor + } + } + + if len(deps) > 0 && dependsOnSensor == nil { + panic("bad") + } + return &helper{ - Named: conf.ResourceName().AsNamed(), - logger: logger, + Named: conf.ResourceName().AsNamed(), + logger: logger, + dependsOnSensor: dependsOnSensor, }, nil } @@ -117,6 +130,7 @@ type helper struct { resource.TriviallyCloseable logger logging.Logger numReconfigurations int + dependsOnSensor sensor.Sensor } // DoCommand looks up the "real" command from the map it's passed. @@ -191,6 +205,9 @@ func (h *helper) DoCommand(ctx context.Context, req map[string]interface{}) (map return map[string]any{}, nil case "get_num_reconfigurations": return map[string]any{"num_reconfigurations": h.numReconfigurations}, nil + case "do_readings_on_dep": + h.dependsOnSensor.Readings(ctx, nil) + return nil, nil default: return nil, fmt.Errorf("unknown command string %s", cmd) } diff --git a/robot/client/client.go b/robot/client/client.go index a3967f954bc..c0c951fe830 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -290,6 +290,11 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible rpc.WithStreamClientInterceptor(streamClientInterceptor()), ) + if rOpts.modName != "" { + inter := &grpc.ModInterceptors{ModName: rOpts.modName} + rc.dialOptions = append(rc.dialOptions, rpc.WithUnaryClientInterceptor(inter.UnaryClientInterceptor)) + } + if err := rc.Connect(ctx); err != nil { return nil, err } diff --git a/robot/client/client_options.go b/robot/client/client_options.go index f05dee4ccdc..a3949a83153 100644 --- a/robot/client/client_options.go +++ b/robot/client/client_options.go @@ -32,6 +32,8 @@ type robotClientOpts struct { // controls whether or not sessions are disabled. disableSessions bool + + modName string } // RobotClientOption configures how we set up the connection. @@ -56,6 +58,12 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption { } } +func WithModName(modName string) RobotClientOption { + return newFuncRobotClientOption(func(o *robotClientOpts) { + o.modName = modName + }) +} + // WithRefreshEvery returns a RobotClientOption for how often to refresh the status/parts of the // robot. func WithRefreshEvery(refreshEvery time.Duration) RobotClientOption { diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index 0d56947e3a6..cc99b50fae9 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -46,6 +46,7 @@ import ( "go.viam.com/rdk/config" "go.viam.com/rdk/examples/customresources/apis/gizmoapi" "go.viam.com/rdk/examples/customresources/apis/summationapi" + "go.viam.com/rdk/grpc" rgrpc "go.viam.com/rdk/grpc" internalcloud "go.viam.com/rdk/internal/cloud" "go.viam.com/rdk/logging" @@ -4534,3 +4535,81 @@ func TestRemovingOfflineRemotes(t *testing.T) { cancelReconfig() wg.Wait() } + +// TestModuleNamePassing asserts that module names are passed from viam-server -> module +// properly. Such that incoming requests from module -> viam-server identify themselves. And can be +// observed on contexts via `grpc.GetModuleName(ctx)`. +func TestModuleNamePassing(t *testing.T) { + logger := logging.NewTestLogger(t) + + ctx := context.Background() + + // We will inject a `ReadingsFunc` handler. The request should come from the `testmodule` and + // the interceptors should pass along a module name. Which will get captured in the + // `moduleNameCh` that the end of the test will assert on. + // + // The channel must be buffered to such that the `ReadingsFunc` returns without waiting on a + // reader of the channel. + moduleNameCh := make(chan string, 1) + callbackSensor := &inject.Sensor{ + ReadingsFunc: func(ctx context.Context, extra map[string]any) (map[string]any, error) { + moduleNameCh <- grpc.GetModuleName(ctx) + return map[string]any{ + "reading": 42, + }, nil + }, + CloseFunc: func(ctx context.Context) error { + return nil + }, + } + + // The resource registry is a global. We must use unique model names to avoid unexpected + // collisions. + callbackModelName := resource.DefaultModelFamily.WithModel(utils.RandomAlphaString(8)) + resource.RegisterComponent( + sensor.API, + callbackModelName, + resource.Registration[sensor.Sensor, resource.NoNativeConfig]{Constructor: func( + ctx context.Context, + deps resource.Dependencies, + conf resource.Config, + logger logging.Logger, + ) (sensor.Sensor, error) { + // Be lazy -- just return an a singleton object. + return callbackSensor, nil + }}) + + const moduleName = "fancy_module_name" + localRobot := setupLocalRobot(t, ctx, &config.Config{ + Modules: []config.Module{ + { + Name: moduleName, + ExePath: rtestutils.BuildTempModule(t, "module/testmodule"), + Type: config.ModuleTypeLocal, + }, + }, + Components: []resource.Config{ + // We will invoke a special `DoCommand` on `modularComp`. It will expect its `DependsOn: + // "foo"` to be a sensor. And call the `Readings` API on that sensor. + { + Name: "modularComp", + API: generic.API, + Model: resource.NewModel("rdk", "test", "helper"), + DependsOn: []string{"foo"}, + }, + // `foo` will be a sensor that we've instrumented with the injected `ReadingsFunc`. + { + Name: "foo", + API: sensor.API, + Model: callbackModelName, + }, + }, + }, logger) + + res, err := localRobot.ResourceByName(generic.Named("modularComp")) + test.That(t, err, test.ShouldBeNil) + + _, err = res.DoCommand(ctx, map[string]interface{}{"command": "do_readings_on_dep"}) + test.That(t, err, test.ShouldBeNil) + test.That(t, <-moduleNameCh, test.ShouldEqual, moduleName) +} diff --git a/robot/web/web.go b/robot/web/web.go index dbf5ae4a713..51d43de7870 100644 --- a/robot/web/web.go +++ b/robot/web/web.go @@ -194,6 +194,10 @@ func (svc *webService) StartModule(ctx context.Context) error { unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor) + // Attach the module name (as defined by the robot config) to the handler context. Can be + // accessed via `grpc.GetModuleName`. + unaryInterceptors = append(unaryInterceptors, grpc.ModNameUnaryServerInterceptor) + opManager := svc.r.OperationManager() unaryInterceptors = append(unaryInterceptors, opManager.UnaryServerInterceptor, logging.UnaryServerInterceptor) From 5c3446a109931dcc913f0e41aa9d657d0cc8aae6 Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Mon, 27 Jan 2025 20:42:01 -0500 Subject: [PATCH 2/5] lint --- grpc/interceptors.go | 12 +++++++----- module/testmodule/main.go | 4 ++-- robot/client/client_options.go | 2 ++ robot/impl/local_robot_test.go | 5 ++--- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/grpc/interceptors.go b/grpc/interceptors.go index 5134f47b88e..3f23cfc0bd7 100644 --- a/grpc/interceptors.go +++ b/grpc/interceptors.go @@ -51,7 +51,8 @@ type modNameKeyType int const modNameKeyID = modNameKeyType(iota) -// GetName returns the debug log key included when enabling the context for debug logging. +// GetModuleName returns the module name (if any) the request came from. The module name will match +// a string from the robot config. func GetModuleName(ctx context.Context) string { valI := ctx.Value(modNameKeyID) if val, ok := valI.(string); ok { @@ -63,12 +64,13 @@ func GetModuleName(ctx context.Context) string { const modNameMetadataKey = "modName" +// ModInterceptors takes a user input `ModName` and exposes an interceptor method that will attach +// it to outgoing gRPC requests. type ModInterceptors struct { ModName string } -// UnaryClientInterceptor adds debug directives from the current context (if any) to the -// outgoing request's metadata. +// UnaryClientInterceptor adds a module name to any outgoing unary gRPC request. func (mc *ModInterceptors) UnaryClientInterceptor( ctx context.Context, method string, @@ -81,8 +83,8 @@ func (mc *ModInterceptors) UnaryClientInterceptor( return invoker(ctx, method, req, reply, cc, opts...) } -// UnaryServerInterceptor checks the incoming RPC metadata for a distributed tracing directive and -// attaches any information to a debug context. +// ModNameUnaryServerInterceptor checks the incoming RPC metadata for a module name and attaches any +// information to a context that can be retrieved with `GetModuleName`. func ModNameUnaryServerInterceptor( ctx context.Context, req interface{}, diff --git a/module/testmodule/main.go b/module/testmodule/main.go index 39fabfedc7a..a73483753eb 100644 --- a/module/testmodule/main.go +++ b/module/testmodule/main.go @@ -206,8 +206,8 @@ func (h *helper) DoCommand(ctx context.Context, req map[string]interface{}) (map case "get_num_reconfigurations": return map[string]any{"num_reconfigurations": h.numReconfigurations}, nil case "do_readings_on_dep": - h.dependsOnSensor.Readings(ctx, nil) - return nil, nil + _, err := h.dependsOnSensor.Readings(ctx, nil) + return nil, err default: return nil, fmt.Errorf("unknown command string %s", cmd) } diff --git a/robot/client/client_options.go b/robot/client/client_options.go index a3949a83153..a90de5d90e7 100644 --- a/robot/client/client_options.go +++ b/robot/client/client_options.go @@ -58,6 +58,8 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption { } } +// WithModName attaches a unary interceptor that attaches the module name for each outgoing gRPC +// request. func WithModName(modName string) RobotClientOption { return newFuncRobotClientOption(func(o *robotClientOpts) { o.modName = modName diff --git a/robot/impl/local_robot_test.go b/robot/impl/local_robot_test.go index cc99b50fae9..fbfc2fb4cd7 100644 --- a/robot/impl/local_robot_test.go +++ b/robot/impl/local_robot_test.go @@ -46,7 +46,6 @@ import ( "go.viam.com/rdk/config" "go.viam.com/rdk/examples/customresources/apis/gizmoapi" "go.viam.com/rdk/examples/customresources/apis/summationapi" - "go.viam.com/rdk/grpc" rgrpc "go.viam.com/rdk/grpc" internalcloud "go.viam.com/rdk/internal/cloud" "go.viam.com/rdk/logging" @@ -4538,7 +4537,7 @@ func TestRemovingOfflineRemotes(t *testing.T) { // TestModuleNamePassing asserts that module names are passed from viam-server -> module // properly. Such that incoming requests from module -> viam-server identify themselves. And can be -// observed on contexts via `grpc.GetModuleName(ctx)`. +// observed on contexts via `[r]grpc.GetModuleName(ctx)`. func TestModuleNamePassing(t *testing.T) { logger := logging.NewTestLogger(t) @@ -4553,7 +4552,7 @@ func TestModuleNamePassing(t *testing.T) { moduleNameCh := make(chan string, 1) callbackSensor := &inject.Sensor{ ReadingsFunc: func(ctx context.Context, extra map[string]any) (map[string]any, error) { - moduleNameCh <- grpc.GetModuleName(ctx) + moduleNameCh <- rgrpc.GetModuleName(ctx) return map[string]any{ "reading": 42, }, nil From bfda26d91ec069f3ff33d56d55b60dc2fa024a25 Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Tue, 28 Jan 2025 11:29:32 -0500 Subject: [PATCH 3/5] comments --- module/module.go | 3 ++- module/testmodule/main.go | 6 ++---- robot/client/client_options.go | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/module/module.go b/module/module.go index 027aaece83a..958393df2eb 100644 --- a/module/module.go +++ b/module/module.go @@ -175,7 +175,8 @@ type peerResourceState struct { // Module represents an external resource module that services components/services. type Module struct { - // The name of the module as per the robot config. + // The name of the module as per the robot config. This value is communicated via the + // `VIAM_MODULE_NAME` env var. name string shutdownCtx context.Context diff --git a/module/testmodule/main.go b/module/testmodule/main.go index a73483753eb..c28409ba571 100644 --- a/module/testmodule/main.go +++ b/module/testmodule/main.go @@ -108,10 +108,8 @@ func newHelper( ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger, ) (resource.Resource, error) { var dependsOnSensor sensor.Sensor - for _, resObj := range deps { - if resSensor, ok := resObj.(sensor.Sensor); ok { - dependsOnSensor = resSensor - } + if len(conf.DependsOn) > 0 { + dependsOnSensor, err := sensor.FromDependencies(deps, conf.DependsOn[0]) } if len(deps) > 0 && dependsOnSensor == nil { diff --git a/robot/client/client_options.go b/robot/client/client_options.go index a90de5d90e7..4e7a11d79bf 100644 --- a/robot/client/client_options.go +++ b/robot/client/client_options.go @@ -59,7 +59,7 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption { } // WithModName attaches a unary interceptor that attaches the module name for each outgoing gRPC -// request. +// request. Should only be used in Viam module library code. func WithModName(modName string) RobotClientOption { return newFuncRobotClientOption(func(o *robotClientOpts) { o.modName = modName From c897bd8c861ff796a3f3052c5c108da242c1c53c Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Tue, 28 Jan 2025 12:45:08 -0500 Subject: [PATCH 4/5] lint --- module/testmodule/main.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/module/testmodule/main.go b/module/testmodule/main.go index c28409ba571..0c30aa9458d 100644 --- a/module/testmodule/main.go +++ b/module/testmodule/main.go @@ -108,12 +108,16 @@ func newHelper( ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger, ) (resource.Resource, error) { var dependsOnSensor sensor.Sensor + var err error if len(conf.DependsOn) > 0 { - dependsOnSensor, err := sensor.FromDependencies(deps, conf.DependsOn[0]) + dependsOnSensor, err = sensor.FromDependencies(deps, conf.DependsOn[0]) + if err != nil { + return nil, err + } } if len(deps) > 0 && dependsOnSensor == nil { - panic("bad") + return nil, fmt.Errorf("Sensor not found in deps: %v", deps) } return &helper{ From 8c0a9dbb23ad18a0e07e795d0f85c3bf0f2b9fa4 Mon Sep 17 00:00:00 2001 From: Dan Gottlieb Date: Tue, 28 Jan 2025 12:57:07 -0500 Subject: [PATCH 5/5] lint --- module/testmodule/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/module/testmodule/main.go b/module/testmodule/main.go index 0c30aa9458d..4e04901bcf3 100644 --- a/module/testmodule/main.go +++ b/module/testmodule/main.go @@ -117,7 +117,7 @@ func newHelper( } if len(deps) > 0 && dependsOnSensor == nil { - return nil, fmt.Errorf("Sensor not found in deps: %v", deps) + return nil, fmt.Errorf("sensor not found in deps: %v", deps) } return &helper{