From 09186d8d8e04cefd0c47832ee487508e4245e75f Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Thu, 23 Jan 2025 17:53:10 -0500 Subject: [PATCH 01/52] barely stub out new connection wrapper --- grpc/app_conn.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 grpc/app_conn.go diff --git a/grpc/app_conn.go b/grpc/app_conn.go new file mode 100644 index 00000000000..9fde534dd78 --- /dev/null +++ b/grpc/app_conn.go @@ -0,0 +1,43 @@ +package grpc + +import ( + "context" + "net/url" + + "go.viam.com/rdk/logging" + "go.viam.com/utils/rpc" +) + +// spin off Goroutine that attempts to create connection +// routine should at first block for some time interval +// if connection is not created after initial timeout, no longer block +// however, continue re-attempting connection at other specified time interval +// once connection establishes, close off routine + +type AppConn struct { + ReconfigurableClientConn +} + +func CreateNewGRPCClient(ctx context.Context, cloudCfg *logging.CloudConfig, logger logging.Logger) (rpc.ClientConn, error) { + grpcURL, err := url.Parse(cloudCfg.AppAddress) + if err != nil { + return nil, err + } + + dialOpts := make([]rpc.DialOption, 0, 2) + // Only add credentials when secret is set. + if cloudCfg.Secret != "" { + dialOpts = append(dialOpts, rpc.WithEntityCredentials(cloudCfg.ID, + rpc.Credentials{ + Type: "robot-secret", + Payload: cloudCfg.Secret, + }, + )) + } + + if grpcURL.Scheme == "http" { + dialOpts = append(dialOpts, rpc.WithInsecure()) + } + + return rpc.DialDirectGRPC(ctx, grpcURL.Host, logger, dialOpts...) 
+} From e18211c8c374da08082d378e1d579aa21b2412d9 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Fri, 24 Jan 2025 17:53:46 -0500 Subject: [PATCH 02/52] checkpoint --- config/reader.go | 2 +- grpc/app_conn.go | 51 ++++++++++++++++++++++++++-------------- web/server/entrypoint.go | 7 +++++- 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/config/reader.go b/config/reader.go index 1898e41bd6d..7b4fc780173 100644 --- a/config/reader.go +++ b/config/reader.go @@ -181,7 +181,7 @@ func isLocationSecretsEqual(prevCloud, cloud *Cloud) bool { return true } -func getTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) { +func GetTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) { timeout := readTimeout // When environment indicates we are behind a proxy, bump timeout. Network // operations tend to take longer when behind a proxy. diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 9fde534dd78..12022c1b2a9 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -3,41 +3,58 @@ package grpc import ( "context" "net/url" + "time" + "github.com/pkg/errors" + + "go.viam.com/rdk/config" "go.viam.com/rdk/logging" + "go.viam.com/utils" "go.viam.com/utils/rpc" ) -// spin off Goroutine that attempts to create connection -// routine should at first block for some time interval -// if connection is not created after initial timeout, no longer block -// however, continue re-attempting connection at other specified time interval -// once connection establishes, close off routine - type AppConn struct { ReconfigurableClientConn } -func CreateNewGRPCClient(ctx context.Context, cloudCfg *logging.CloudConfig, logger logging.Logger) (rpc.ClientConn, error) { - grpcURL, err := url.Parse(cloudCfg.AppAddress) +func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { + grpcURL, err := url.Parse(cloud.AppAddress) if err != nil { return nil, err } + 
dialOpts := dialOpts(cloud) + + if grpcURL.Scheme == "http" { + dialOpts = append(dialOpts, rpc.WithInsecure()) + } + + appConn := &AppConn{} + + appConn.connMu.Lock() + defer appConn.connMu.Unlock() + appConn.conn, err = rpc.DialDirectGRPC(ctx, grpcURL.Host, logger, dialOpts...) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + // TODO(RSDK-8292): run background job to attempt connection + } else { + return nil, err + } + } + + return appConn, nil +} + +func dialOpts(cloud *config.Cloud) []rpc.DialOption { dialOpts := make([]rpc.DialOption, 0, 2) // Only add credentials when secret is set. - if cloudCfg.Secret != "" { - dialOpts = append(dialOpts, rpc.WithEntityCredentials(cloudCfg.ID, + if cloud.Secret != "" { + dialOpts = append(dialOpts, rpc.WithEntityCredentials(cloud.ID, rpc.Credentials{ Type: "robot-secret", - Payload: cloudCfg.Secret, + Payload: cloud.Secret, }, )) } - - if grpcURL.Scheme == "http" { - dialOpts = append(dialOpts, rpc.WithInsecure()) - } - - return rpc.DialDirectGRPC(ctx, grpcURL.Host, logger, dialOpts...) + return dialOpts } diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 070cd752ad8..003710b667c 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -24,6 +24,7 @@ import ( "go.viam.com/utils/rpc" "go.viam.com/rdk/config" + "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" "go.viam.com/rdk/resource" "go.viam.com/rdk/robot" @@ -31,6 +32,7 @@ import ( "go.viam.com/rdk/robot/web" weboptions "go.viam.com/rdk/robot/web/options" rutils "go.viam.com/rdk/utils" + "go.viam.com/rdk/utils/contextutils" ) var viamDotDir = filepath.Join(rutils.PlatformHomeDir(), ".viam") @@ -191,13 +193,16 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. 
if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { + ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cfgFromDisk.Cloud.ID) + appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger) // TODO(RSDK-8292): [q] what logger should I pass here? + ctxWithTimeoutCancel() netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, ID: cfgFromDisk.Cloud.ID, Secret: cfgFromDisk.Cloud.Secret, }, - nil, false, logger.Sublogger("networking").Sublogger("netlogger"), + appConn, false, logger.Sublogger("networking").Sublogger("netlogger"), ) if err != nil { return err From 1306520e79b10eb089157e959d13077b8cc03649 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 12:04:43 -0500 Subject: [PATCH 03/52] repeatedly retry connecting to App --- grpc/app_conn.go | 33 +++++++++++++++++++++++++++++---- web/server/entrypoint.go | 1 - 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 12022c1b2a9..013487ffc63 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -9,12 +9,16 @@ import ( "go.viam.com/rdk/config" "go.viam.com/rdk/logging" - "go.viam.com/utils" "go.viam.com/utils/rpc" ) type AppConn struct { ReconfigurableClientConn + + // Err stores the most recent error returned by the serialized dial attempts running in the background. It can also be used to tell + // whether dial attempts are currently happening; If err is a non-nil value, dial attempts have stopped. 
Accesses to Err should respect + // ReconfigurableClientConn.connMu + Err error } func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { @@ -31,13 +35,34 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) appConn := &AppConn{} - appConn.connMu.Lock() - defer appConn.connMu.Unlock() + // a lock is not necessary here because this call is blocking appConn.conn, err = rpc.DialDirectGRPC(ctx, grpcURL.Host, logger, dialOpts...) if err != nil { if errors.Is(err, context.DeadlineExceeded) { - // TODO(RSDK-8292): run background job to attempt connection + go func() { + for { + appConn.connMu.Lock() + defer appConn.connMu.Unlock() + + // TODO(RSDK-8292): [qu] should I use ctx instead of context.Background() + ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxWithTimeOutCancel() + + appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) 
+ if errors.Is(err, context.DeadlineExceeded) { + appConn.connMu.Unlock() + + // only dial again if previous attempt timed out + continue + } + + appConn.Err = err + + break + } + }() } else { + appConn.Err = err return nil, err } } diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 003710b667c..134ec7215ee 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -32,7 +32,6 @@ import ( "go.viam.com/rdk/robot/web" weboptions "go.viam.com/rdk/robot/web/options" rutils "go.viam.com/rdk/utils" - "go.viam.com/rdk/utils/contextutils" ) var viamDotDir = filepath.Join(rutils.PlatformHomeDir(), ".viam") From 5e5a791466cfc662bd4bbfd339fb823de2f07869 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 12:09:26 -0500 Subject: [PATCH 04/52] no longer store initial error --- grpc/app_conn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 013487ffc63..e64065dfa29 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -62,7 +62,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } }() } else { - appConn.Err = err return nil, err } } From 7543b726b86962e60ed4065f98f4cb879391bf67 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 12:16:26 -0500 Subject: [PATCH 05/52] lint --- config/reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/reader.go b/config/reader.go index 7b4fc780173..29ae031403f 100644 --- a/config/reader.go +++ b/config/reader.go @@ -257,7 +257,7 @@ func readFromCloud( if !cfg.Cloud.SignalingInsecure && (checkForNewCert || tls.certificate == "" || tls.privateKey == "") { logger.Debug("reading tlsCertificate from the cloud") - ctxWithTimeout, cancel := getTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID) + ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID) certData, err := readCertificateDataFromCloudGRPC(ctxWithTimeout, cloudCfg, logger) if err != nil { cancel() @@ 
-609,7 +609,7 @@ func processConfig(unprocessedConfig *Config, fromCloud bool, logger logging.Log func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCache bool, logger logging.Logger) (*Config, bool, error) { var cached bool - ctxWithTimeout, cancel := getTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID) + ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID) defer cancel() cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctxWithTimeout, cloudCfg, logger) From b333e4893bb634ecacac4000f4300e312c524fd5 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 13:23:44 -0500 Subject: [PATCH 06/52] lint again --- grpc/app_conn.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index e64065dfa29..a74b4138fae 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -42,11 +42,9 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) go func() { for { appConn.connMu.Lock() - defer appConn.connMu.Unlock() // TODO(RSDK-8292): [qu] should I use ctx instead of context.Background() ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxWithTimeOutCancel() appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) 
if errors.Is(err, context.DeadlineExceeded) { @@ -56,10 +54,14 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) continue } + ctxWithTimeOutCancel() + appConn.Err = err break } + + appConn.connMu.Unlock() }() } else { return nil, err From 8827cd4a98592041d28bd1bdfca990487c5e47e6 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 13:26:47 -0500 Subject: [PATCH 07/52] remove TODO comment --- grpc/app_conn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index a74b4138fae..2d53e2890f7 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -43,7 +43,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) for { appConn.connMu.Lock() - // TODO(RSDK-8292): [qu] should I use ctx instead of context.Background() ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(context.Background(), 5*time.Second) appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) From 67d8e559a884658fb47ca9677d4c65c71ada63a9 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 13:34:51 -0500 Subject: [PATCH 08/52] lint --- web/server/entrypoint.go | 1 + 1 file changed, 1 insertion(+) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 134ec7215ee..173b10abae8 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -195,6 +195,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cfgFromDisk.Cloud.ID) appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger) // TODO(RSDK-8292): [q] what logger should I pass here? 
ctxWithTimeoutCancel() + netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, From 13945878d8a4a187838594d37396d60fe437b86e Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 13:38:09 -0500 Subject: [PATCH 09/52] log error --- web/server/entrypoint.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 173b10abae8..efa576e4815 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -194,6 +194,9 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cfgFromDisk.Cloud.ID) appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger) // TODO(RSDK-8292): [q] what logger should I pass here? + if err != nil { + logger.Info("error establishing global App gRPC connection:", err) + } ctxWithTimeoutCancel() netAppender, err := logging.NewNetAppender( From 88e576b4706ccafb05e91f2e4d4a8f9e1396eaeb Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 13:47:17 -0500 Subject: [PATCH 10/52] return error --- web/server/entrypoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index efa576e4815..39bb57a8c7a 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -195,7 +195,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cfgFromDisk.Cloud.ID) appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger) // TODO(RSDK-8292): [q] what logger should I pass here? 
if err != nil { - logger.Info("error establishing global App gRPC connection:", err) + return err } ctxWithTimeoutCancel() From c9d46a9fa7a3a9714058a10c25b7e3996f1f5c33 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 13:58:31 -0500 Subject: [PATCH 11/52] lint --- config/reader.go | 2 ++ grpc/app_conn.go | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/config/reader.go b/config/reader.go index 29ae031403f..064491e99db 100644 --- a/config/reader.go +++ b/config/reader.go @@ -181,6 +181,8 @@ func isLocationSecretsEqual(prevCloud, cloud *Cloud) bool { return true } +// GetTimeoutCtx returns a context [and its cancel function] with a timeout value determined by whether we are behind a proxy and whether a +// cached config exists. func GetTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) { timeout := readTimeout // When environment indicates we are behind a proxy, bump timeout. Network diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 2d53e2890f7..8f5f64b23fd 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -6,12 +6,14 @@ import ( "time" "github.com/pkg/errors" + "go.viam.com/utils/rpc" "go.viam.com/rdk/config" "go.viam.com/rdk/logging" - "go.viam.com/utils/rpc" ) +// AppConn maintains an underlying client connection meant to be used globally to connect to App. The AppConn constructor repeatedly +// attempts to dial App until a connection is successfully established. type AppConn struct { ReconfigurableClientConn @@ -21,6 +23,10 @@ type AppConn struct { Err error } +// NewAppConn creates an AppConn instance with a gRPC client connection to App. An initial dial attempt blocks. If it errors, the error is +// returned. If it times out, an AppConn object will return with a nil underlying client connection. Serialized attempts at establishing a +// connection to App will continue to occur, however, in a background Goroutine. 
These attempts will continue until a connection is made or +// an error that is not a context.DeadlineExceeded occurs - in which case the resulting error will be stored in AppConn.Err. func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { grpcURL, err := url.Parse(cloud.AppAddress) if err != nil { From a974c16c63a319714bbd99dd3891cd7960748988 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:12:49 -0500 Subject: [PATCH 12/52] use sublogger --- web/server/entrypoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 39bb57a8c7a..78dfb9a2744 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -193,7 +193,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // This is to ensure we make our best effort to write logs for failures loading the remote config. if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cfgFromDisk.Cloud.ID) - appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger) // TODO(RSDK-8292): [q] what logger should I pass here? 
+ appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection")) if err != nil { return err } From 1fd26c1029a0a96c34247e737ed136ff359a6552 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:15:27 -0500 Subject: [PATCH 13/52] create context.Context within NewAppConn() --- grpc/app_conn.go | 5 ++++- web/server/entrypoint.go | 4 +--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 8f5f64b23fd..0a81ce31eb5 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -41,8 +41,11 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) appConn := &AppConn{} + ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cloud.ID) + defer ctxWithTimeoutCancel() + // a lock is not necessary here because this call is blocking - appConn.conn, err = rpc.DialDirectGRPC(ctx, grpcURL.Host, logger, dialOpts...) + appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) if err != nil { if errors.Is(err, context.DeadlineExceeded) { go func() { diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 78dfb9a2744..220f4edbf73 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -192,12 +192,10 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. 
if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { - ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cfgFromDisk.Cloud.ID) - appConn, err := grpc.NewAppConn(ctxWithTimeout, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection")) + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection")) if err != nil { return err } - ctxWithTimeoutCancel() netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ From 0511a68265f7c0706a17590f397e44bfe903c304 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:16:32 -0500 Subject: [PATCH 14/52] use context.Context passed in as function parameter in repeated dials to App --- grpc/app_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 0a81ce31eb5..f2ab1fc7eca 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -52,7 +52,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) for { appConn.connMu.Lock() - ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(context.Background(), 5*time.Second) + ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) if errors.Is(err, context.DeadlineExceeded) { From b87a1b5c003d5344ca54c8ca055a815a0b872115 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:21:24 -0500 Subject: [PATCH 15/52] no longer store error --- grpc/app_conn.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index f2ab1fc7eca..867a295720a 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -16,17 +16,12 @@ import ( // attempts to dial App until a connection is successfully established. 
type AppConn struct { ReconfigurableClientConn - - // Err stores the most recent error returned by the serialized dial attempts running in the background. It can also be used to tell - // whether dial attempts are currently happening; If err is a non-nil value, dial attempts have stopped. Accesses to Err should respect - // ReconfigurableClientConn.connMu - Err error } // NewAppConn creates an AppConn instance with a gRPC client connection to App. An initial dial attempt blocks. If it errors, the error is // returned. If it times out, an AppConn object will return with a nil underlying client connection. Serialized attempts at establishing a // connection to App will continue to occur, however, in a background Goroutine. These attempts will continue until a connection is made or -// an error that is not a context.DeadlineExceeded occurs - in which case the resulting error will be stored in AppConn.Err. +// an error that is not a context.DeadlineExceeded occurs. func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { grpcURL, err := url.Parse(cloud.AppAddress) if err != nil { @@ -64,8 +59,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) ctxWithTimeOutCancel() - appConn.Err = err - break } From 5c4f7d384597085bf637dcbaac057995ab211e7f Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:29:30 -0500 Subject: [PATCH 16/52] always dial again --- grpc/app_conn.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 867a295720a..faa584b31eb 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -50,10 +50,9 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) 
- if errors.Is(err, context.DeadlineExceeded) { + if err != nil { appConn.connMu.Unlock() - // only dial again if previous attempt timed out continue } From c4799cf1c346d0b1347eae3dcccfdbe51c4cb8c5 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:47:06 -0500 Subject: [PATCH 17/52] rename logger --- grpc/app_conn.go | 2 +- web/server/entrypoint.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index faa584b31eb..172e74f30b7 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -48,9 +48,9 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) appConn.connMu.Lock() ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) - appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) if err != nil { + logger.Debug("error while dialing App. Could not establish global, unified connection", err) appConn.connMu.Unlock() continue diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 220f4edbf73..e7741e6cca3 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -192,7 +192,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. 
if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection")) + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) if err != nil { return err } From 11aa86fc76c086e7a0329ee7366ed770f5ecd005 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 14:55:30 -0500 Subject: [PATCH 18/52] add TODO comment --- logging/net_appender.go | 1 + 1 file changed, 1 insertion(+) diff --git a/logging/net_appender.go b/logging/net_appender.go index b62179544e3..b7feda561bd 100644 --- a/logging/net_appender.go +++ b/logging/net_appender.go @@ -406,6 +406,7 @@ func (w *remoteLogWriterGRPC) write(ctx context.Context, logs []*commonpb.LogEnt return nil } +// TODO(RSDK-8292): [qu] do we need this anymore? func (w *remoteLogWriterGRPC) getOrCreateClient(ctx context.Context) (apppb.RobotServiceClient, error) { w.clientMutex.Lock() defer w.clientMutex.Unlock() From 92ceb64f9b30934bf64cf5269c691aa323bb31ee Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 15:31:45 -0500 Subject: [PATCH 19/52] use unified connection in RestartChecker --- web/server/entrypoint.go | 20 ++++++++++---------- web/server/restart_checker.go | 7 +------ 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index e7741e6cca3..70dbef9d951 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -189,14 +189,14 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) + if err != nil { + return err + } + // Start remote logging with config from disk. 
// This is to ensure we make our best effort to write logs for failures loading the remote config. if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) - if err != nil { - return err - } - netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, @@ -223,7 +223,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) } // Run the server with remote logging enabled. - err = server.runServer(ctx) + err = server.runServer(ctx, appConn) if err != nil { logger.Error("Fatal error running server, exiting now: ", err) } @@ -233,7 +233,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // runServer is an entry point to starting the web server after the local config is read. Once the local config // is read the logger may be initialized to remote log. This ensure we capture errors starting up the server and report to the cloud. 
-func (s *robotServer) runServer(ctx context.Context) error { +func (s *robotServer) runServer(ctx context.Context, conn rpc.ClientConn) error { initialReadCtx, cancel := context.WithTimeout(ctx, time.Second*5) cfg, err := config.Read(initialReadCtx, s.args.ConfigFile, s.logger) if err != nil { @@ -243,7 +243,7 @@ func (s *robotServer) runServer(ctx context.Context) error { cancel() config.UpdateFileConfigDebug(cfg.Debug) - err = s.serveWeb(ctx, cfg) + err = s.serveWeb(ctx, cfg, conn) if err != nil { s.logger.Errorw("error serving web", "error", err) } @@ -363,7 +363,7 @@ func (s *robotServer) configWatcher(ctx context.Context, currCfg *config.Config, } } -func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err error) { +func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc.ClientConn) (err error) { ctx, cancel := context.WithCancel(ctx) hungShutdownDeadline := 90 * time.Second @@ -463,7 +463,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) - restartCheck, err := newRestartChecker(ctx, cfg.Cloud, s.logger) + restartCheck, err := newRestartChecker(ctx, cfg.Cloud, s.logger, conn) if err != nil { if errors.Is(err, context.Canceled) { return diff --git a/web/server/restart_checker.go b/web/server/restart_checker.go index 2727edd5189..50251b9a35d 100644 --- a/web/server/restart_checker.go +++ b/web/server/restart_checker.go @@ -59,12 +59,7 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time. 
return res.MustRestart, restartInterval, nil } -func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger logging.Logger) (needsRestartChecker, error) { - client, err := config.CreateNewGRPCClient(ctx, cfg, logger) - if err != nil { - return nil, err - } - +func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) (needsRestartChecker, error) { return &needsRestartCheckerGRPC{ cfg: cfg, logger: logger, From a8a5af36cfc5e37641c4f1a8072fccf7ced610ad Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 15:46:58 -0500 Subject: [PATCH 20/52] remove context.Context --- web/server/entrypoint.go | 5 +---- web/server/restart_checker.go | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 70dbef9d951..5e87aaef176 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -463,11 +463,8 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) - restartCheck, err := newRestartChecker(ctx, cfg.Cloud, s.logger, conn) + restartCheck, err := newRestartChecker(cfg.Cloud, s.logger, conn) if err != nil { - if errors.Is(err, context.Canceled) { - return - } s.logger.Errorw("error creating restart checker", "error", err) panic(fmt.Sprintf("error creating restart checker: %v", err)) } diff --git a/web/server/restart_checker.go b/web/server/restart_checker.go index 50251b9a35d..088cd37eb60 100644 --- a/web/server/restart_checker.go +++ b/web/server/restart_checker.go @@ -59,7 +59,7 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time. 
return res.MustRestart, restartInterval, nil } -func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) (needsRestartChecker, error) { +func newRestartChecker(cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) (needsRestartChecker, error) { return &needsRestartCheckerGRPC{ cfg: cfg, logger: logger, From 1baabf6cc34bccc282b18dd4c7173c48433329d6 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 15:48:35 -0500 Subject: [PATCH 21/52] lint --- web/server/entrypoint.go | 6 +----- web/server/restart_checker.go | 4 ++-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 5e87aaef176..070160f469a 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -463,11 +463,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) - restartCheck, err := newRestartChecker(cfg.Cloud, s.logger, conn) - if err != nil { - s.logger.Errorw("error creating restart checker", "error", err) - panic(fmt.Sprintf("error creating restart checker: %v", err)) - } + restartCheck := newRestartChecker(cfg.Cloud, s.logger, conn) defer restartCheck.close() restartInterval := defaultNeedsRestartCheckInterval diff --git a/web/server/restart_checker.go b/web/server/restart_checker.go index 088cd37eb60..1875a4201a1 100644 --- a/web/server/restart_checker.go +++ b/web/server/restart_checker.go @@ -59,10 +59,10 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time. 
return res.MustRestart, restartInterval, nil } -func newRestartChecker(cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) (needsRestartChecker, error) { +func newRestartChecker(cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) needsRestartChecker { return &needsRestartCheckerGRPC{ cfg: cfg, logger: logger, client: client, - }, nil + } } From eb11c9ea1b09fbd5f33a07b27ec4de20a5731ef2 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 16:35:11 -0500 Subject: [PATCH 22/52] check parent context --- grpc/app_conn.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 172e74f30b7..ec700d942c0 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -47,6 +47,10 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) for { appConn.connMu.Lock() + if ctx.Err() != nil { + break + } + ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) if err != nil { From 6f3f163a28aa34322ad605ab7d1a091962989bc7 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Mon, 27 Jan 2025 16:40:51 -0500 Subject: [PATCH 23/52] check parent context --- grpc/app_conn.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index ec700d942c0..4cf16592ff9 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -45,14 +45,15 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) if errors.Is(err, context.DeadlineExceeded) { go func() { for { - appConn.connMu.Lock() - if ctx.Err() != nil { break } + appConn.connMu.Lock() + ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) + ctxWithTimeOutCancel() if err != nil { logger.Debug("error while dialing App. 
Could not establish global, unified connection", err) appConn.connMu.Unlock() @@ -60,8 +61,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) continue } - ctxWithTimeOutCancel() - break } From a3b995603a1f6612e3d108b30887b440a29a860d Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 13:32:17 -0500 Subject: [PATCH 24/52] restore web/server/restart_checker.go file --- web/server/restart_checker.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/web/server/restart_checker.go b/web/server/restart_checker.go index 1875a4201a1..2727edd5189 100644 --- a/web/server/restart_checker.go +++ b/web/server/restart_checker.go @@ -59,10 +59,15 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time. return res.MustRestart, restartInterval, nil } -func newRestartChecker(cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) needsRestartChecker { +func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger logging.Logger) (needsRestartChecker, error) { + client, err := config.CreateNewGRPCClient(ctx, cfg, logger) + if err != nil { + return nil, err + } + return &needsRestartCheckerGRPC{ cfg: cfg, logger: logger, client: client, - } + }, nil } From 6ecf9ef72500c111655f2632e6db9a2daa579ba7 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 13:36:33 -0500 Subject: [PATCH 25/52] revert to checkpoint --- web/server/entrypoint.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 070160f469a..e7741e6cca3 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -189,14 +189,14 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) - if err != nil { - return err - } - // 
Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) + if err != nil { + return err + } + netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, @@ -223,7 +223,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) } // Run the server with remote logging enabled. - err = server.runServer(ctx, appConn) + err = server.runServer(ctx) if err != nil { logger.Error("Fatal error running server, exiting now: ", err) } @@ -233,7 +233,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // runServer is an entry point to starting the web server after the local config is read. Once the local config // is read the logger may be initialized to remote log. This ensure we capture errors starting up the server and report to the cloud. 
-func (s *robotServer) runServer(ctx context.Context, conn rpc.ClientConn) error { +func (s *robotServer) runServer(ctx context.Context) error { initialReadCtx, cancel := context.WithTimeout(ctx, time.Second*5) cfg, err := config.Read(initialReadCtx, s.args.ConfigFile, s.logger) if err != nil { @@ -243,7 +243,7 @@ func (s *robotServer) runServer(ctx context.Context, conn rpc.ClientConn) error cancel() config.UpdateFileConfigDebug(cfg.Debug) - err = s.serveWeb(ctx, cfg, conn) + err = s.serveWeb(ctx, cfg) if err != nil { s.logger.Errorw("error serving web", "error", err) } @@ -363,7 +363,7 @@ func (s *robotServer) configWatcher(ctx context.Context, currCfg *config.Config, } } -func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc.ClientConn) (err error) { +func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err error) { ctx, cancel := context.WithCancel(ctx) hungShutdownDeadline := 90 * time.Second @@ -463,7 +463,14 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) - restartCheck := newRestartChecker(cfg.Cloud, s.logger, conn) + restartCheck, err := newRestartChecker(ctx, cfg.Cloud, s.logger) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + s.logger.Errorw("error creating restart checker", "error", err) + panic(fmt.Sprintf("error creating restart checker: %v", err)) + } defer restartCheck.close() restartInterval := defaultNeedsRestartCheckInterval From 6303b448614d6751f07cd992e25b4f9eff0926dc Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 14:56:39 -0500 Subject: [PATCH 26/52] create connection to App before checking cloud config --- web/server/entrypoint.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index e7741e6cca3..f4b94763a20 100644 --- 
a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -189,14 +189,14 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) + if err != nil { + return err + } + // Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) - if err != nil { - return err - } - netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, From c8c1b5de1ae686e922c31765160e73ca3dda1392 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 15:37:23 -0500 Subject: [PATCH 27/52] refactor error conditions --- grpc/app_conn.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 4cf16592ff9..de89d5af883 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -42,33 +42,33 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) // a lock is not necessary here because this call is blocking appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - go func() { - for { - if ctx.Err() != nil { - break - } + if !errors.Is(err, context.DeadlineExceeded) { + return nil, err + } - appConn.connMu.Lock() + go func() { + for { + if ctx.Err() != nil { + break + } - ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) - appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) 
- ctxWithTimeOutCancel() - if err != nil { - logger.Debug("error while dialing App. Could not establish global, unified connection", err) - appConn.connMu.Unlock() + appConn.connMu.Lock() - continue - } + ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) + appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) + ctxWithTimeOutCancel() + if err != nil { + logger.Debug("error while dialing App. Could not establish global, unified connection", err) + appConn.connMu.Unlock() - break + continue } - appConn.connMu.Unlock() - }() - } else { - return nil, err - } + break + } + + appConn.connMu.Unlock() + }() } return appConn, nil From 43d718f2f3de35dc9b33934d52727ec9fdcbc898 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 15:40:33 -0500 Subject: [PATCH 28/52] defer context cancel and eliminate race condition --- grpc/app_conn.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index de89d5af883..1effcdf8faf 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -49,14 +49,15 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) go func() { for { if ctx.Err() != nil { - break + return } + ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) + defer ctxWithTimeOutCancel() + appConn.connMu.Lock() - ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) - ctxWithTimeOutCancel() if err != nil { logger.Debug("error while dialing App. 
Could not establish global, unified connection", err) appConn.connMu.Unlock() @@ -64,10 +65,9 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) continue } - break + appConn.connMu.Unlock() + return } - - appConn.connMu.Unlock() }() } From 1963013310379eb6a3f5b7b9e496ceed44c6edf8 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 15:43:07 -0500 Subject: [PATCH 29/52] lint --- grpc/app_conn.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 1effcdf8faf..d024004fdfb 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -53,11 +53,11 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) - defer ctxWithTimeOutCancel() appConn.connMu.Lock() appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) + ctxWithTimeOutCancel() if err != nil { logger.Debug("error while dialing App. Could not establish global, unified connection", err) appConn.connMu.Unlock() @@ -66,6 +66,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } appConn.connMu.Unlock() + return } }() From 5e2b24b9864cc5b1febc267e250c83b3d9b77434 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 15:44:58 -0500 Subject: [PATCH 30/52] release lock sooner --- grpc/app_conn.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index d024004fdfb..70a989d9fec 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -57,16 +57,14 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) appConn.connMu.Lock() appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) + appConn.connMu.Unlock() ctxWithTimeOutCancel() if err != nil { logger.Debug("error while dialing App. 
Could not establish global, unified connection", err) - appConn.connMu.Unlock() continue } - appConn.connMu.Unlock() - return } }() From 59101decd16f7fd9c438ab616a2820e4d2768a4d Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 16:20:26 -0500 Subject: [PATCH 31/52] use stoppable workers --- grpc/app_conn.go | 56 +++++++++++++++++++++++++--------------- web/server/entrypoint.go | 10 +++---- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 70a989d9fec..7c230d35d6f 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pkg/errors" + "go.viam.com/utils" "go.viam.com/utils/rpc" "go.viam.com/rdk/config" @@ -16,6 +17,8 @@ import ( // attempts to dial App until a connection is successfully established. type AppConn struct { ReconfigurableClientConn + + dialer *utils.StoppableWorkers } // NewAppConn creates an AppConn instance with a gRPC client connection to App. An initial dial attempt blocks. If it errors, the error is @@ -41,38 +44,49 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) // a lock is not necessary here because this call is blocking appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) - if err != nil { - if !errors.Is(err, context.DeadlineExceeded) { - return nil, err - } + if err == nil { + return appConn, nil + } - go func() { - for { - if ctx.Err() != nil { - return - } + if !errors.Is(err, context.DeadlineExceeded) { + return nil, err + } - ctxWithTimeOut, ctxWithTimeOutCancel := context.WithTimeout(ctx, 5*time.Second) + appConn.dialer = utils.NewStoppableWorkers(ctx) + + appConn.dialer.Add(func(ctx context.Context) { + for { + if ctx.Err() != nil { + return + } - appConn.connMu.Lock() + ctxWithTimeout, ctxWithTimeoutCancel := context.WithTimeout(ctx, 5*time.Second) - appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeOut, grpcURL.Host, logger, dialOpts...) 
- appConn.connMu.Unlock() - ctxWithTimeOutCancel() - if err != nil { - logger.Debug("error while dialing App. Could not establish global, unified connection", err) + appConn.connMu.Lock() - continue - } + appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) + appConn.connMu.Unlock() + ctxWithTimeoutCancel() + if err != nil { + logger.Debug("error while dialing App. Could not establish global, unified connection", err) - return + continue } - }() - } + return + } + }) + + // if the initial dial attempt fails due to a time out, we return the connection-lacking `AppConn` and a nil `error` return appConn, nil } +func (ac *AppConn) Close() error { + ac.dialer.Stop() + + return ac.ReconfigurableClientConn.Close() +} + func dialOpts(cloud *config.Cloud) []rpc.DialOption { dialOpts := make([]rpc.DialOption, 0, 2) // Only add credentials when secret is set. diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index f4b94763a20..e7741e6cca3 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -189,14 +189,14 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) - if err != nil { - return err - } - // Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. 
if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) + if err != nil { + return err + } + netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, From e2a592e8a8b565bc1e7e13d844af8280da53932e Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 16:22:57 -0500 Subject: [PATCH 32/52] add comment to Close() --- grpc/app_conn.go | 1 + 1 file changed, 1 insertion(+) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 7c230d35d6f..16da657cabe 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -81,6 +81,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) return appConn, nil } +// Close calls `ReconfigurableClientConn.Close()` in addition to `Stop`ping `dialer` func (ac *AppConn) Close() error { ac.dialer.Stop() From c6269350b2f9481f970bcb99b5c9807e6f5ce468 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 16:31:12 -0500 Subject: [PATCH 33/52] lint --- grpc/app_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 16da657cabe..d1edf8fef7a 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -81,7 +81,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) return appConn, nil } -// Close calls `ReconfigurableClientConn.Close()` in addition to `Stop`ping `dialer` +// Close calls `ReconfigurableClientConn.Close()` in addition to `Stop`ping `dialer`. 
func (ac *AppConn) Close() error { ac.dialer.Stop() From 51a9b3d3ca84fd4c5a00a587699be0941d943304 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 16:40:35 -0500 Subject: [PATCH 34/52] make comments more concise --- grpc/app_conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index d1edf8fef7a..91ce95f911e 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -42,7 +42,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cloud.ID) defer ctxWithTimeoutCancel() - // a lock is not necessary here because this call is blocking + // lock not necessary here because call is blocking appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) if err == nil { return appConn, nil @@ -77,7 +77,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } }) - // if the initial dial attempt fails due to a time out, we return the connection-lacking `AppConn` and a nil `error` + // if initial dial attempt fails due to time out, return `AppConn` and nil `error` and no underlying connection return appConn, nil } From 2e9033b9f4bf7e2435f235958971beb1646ae555 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 16:56:09 -0500 Subject: [PATCH 35/52] only stop stoppable workers if non-nil --- grpc/app_conn.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 91ce95f911e..b033463025e 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -83,7 +83,9 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) // Close calls `ReconfigurableClientConn.Close()` in addition to `Stop`ping `dialer`. 
func (ac *AppConn) Close() error { - ac.dialer.Stop() + if ac.dialer != nil { + ac.dialer.Stop() + } return ac.ReconfigurableClientConn.Close() } From 31dee570dd75cb400ae6e16e93b937c189278e89 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 16:57:11 -0500 Subject: [PATCH 36/52] change spacing --- grpc/app_conn.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index b033463025e..3c9467560d7 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -61,9 +61,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } ctxWithTimeout, ctxWithTimeoutCancel := context.WithTimeout(ctx, 5*time.Second) - appConn.connMu.Lock() - appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) appConn.connMu.Unlock() ctxWithTimeoutCancel() From f1615a2491bb31e3c2eea988cec99cbcb39581fe Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 17:05:42 -0500 Subject: [PATCH 37/52] rephrase comments --- grpc/app_conn.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 3c9467560d7..95bddd87151 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -75,11 +75,11 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } }) - // if initial dial attempt fails due to time out, return `AppConn` and nil `error` and no underlying connection + // if initial dial attempt fails due to time out, return nil error return appConn, nil } -// Close calls `ReconfigurableClientConn.Close()` in addition to `Stop`ping `dialer`. 
+// Close attempts to close the underlying connection if there is one and stops background dialing attempts func (ac *AppConn) Close() error { if ac.dialer != nil { ac.dialer.Stop() From f035a593d36455a18162a1a09b5d7f2a61037ad4 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 17:09:13 -0500 Subject: [PATCH 38/52] lint --- grpc/app_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 95bddd87151..e7bc4830262 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -79,7 +79,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) return appConn, nil } -// Close attempts to close the underlying connection if there is one and stops background dialing attempts +// Close attempts to close the underlying connection if there is one and stops background dialing attempts. func (ac *AppConn) Close() error { if ac.dialer != nil { ac.dialer.Stop() From 93b73144ba6d0799778e912095ea82b4bc363d87 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Tue, 28 Jan 2025 17:30:13 -0500 Subject: [PATCH 39/52] create connection to app without checking conditions --- web/server/entrypoint.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index e7741e6cca3..f4b94763a20 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -189,14 +189,14 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) + if err != nil { + return err + } + // Start remote logging with config from disk. // This is to ensure we make our best effort to write logs for failures loading the remote config. 
if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") { - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) - if err != nil { - return err - } - netAppender, err := logging.NewNetAppender( &logging.CloudConfig{ AppAddress: cfgFromDisk.Cloud.AppAddress, From 48dc67bb01fd591ccb4389367a8a9e7eca86c3f0 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 09:53:10 -0500 Subject: [PATCH 40/52] check for non-nil cloud config before creating connection to App --- grpc/app_conn.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index e7bc4830262..e36611fcfc4 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -26,6 +26,12 @@ type AppConn struct { // connection to App will continue to occur, however, in a background Goroutine. These attempts will continue until a connection is made or // an error that is not a context.DeadlineExceeded occurs. 
func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { + appConn := &AppConn{} + + if cloud == nil { + return appConn, nil + } + grpcURL, err := url.Parse(cloud.AppAddress) if err != nil { return nil, err @@ -37,8 +43,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) dialOpts = append(dialOpts, rpc.WithInsecure()) } - appConn := &AppConn{} - ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cloud.ID) defer ctxWithTimeoutCancel() From f0c8f07ce328fa52cef6efecc0c85062b1d10c93 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 10:15:03 -0500 Subject: [PATCH 41/52] use global connection to App in restart checker --- web/server/entrypoint.go | 17 +++++------------ web/server/restart_checker.go | 9 ++------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index f4b94763a20..070160f469a 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -223,7 +223,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) } // Run the server with remote logging enabled. - err = server.runServer(ctx) + err = server.runServer(ctx, appConn) if err != nil { logger.Error("Fatal error running server, exiting now: ", err) } @@ -233,7 +233,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // runServer is an entry point to starting the web server after the local config is read. Once the local config // is read the logger may be initialized to remote log. This ensure we capture errors starting up the server and report to the cloud. 
-func (s *robotServer) runServer(ctx context.Context) error { +func (s *robotServer) runServer(ctx context.Context, conn rpc.ClientConn) error { initialReadCtx, cancel := context.WithTimeout(ctx, time.Second*5) cfg, err := config.Read(initialReadCtx, s.args.ConfigFile, s.logger) if err != nil { @@ -243,7 +243,7 @@ func (s *robotServer) runServer(ctx context.Context) error { cancel() config.UpdateFileConfigDebug(cfg.Debug) - err = s.serveWeb(ctx, cfg) + err = s.serveWeb(ctx, cfg, conn) if err != nil { s.logger.Errorw("error serving web", "error", err) } @@ -363,7 +363,7 @@ func (s *robotServer) configWatcher(ctx context.Context, currCfg *config.Config, } } -func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err error) { +func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc.ClientConn) (err error) { ctx, cancel := context.WithCancel(ctx) hungShutdownDeadline := 90 * time.Second @@ -463,14 +463,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) - restartCheck, err := newRestartChecker(ctx, cfg.Cloud, s.logger) - if err != nil { - if errors.Is(err, context.Canceled) { - return - } - s.logger.Errorw("error creating restart checker", "error", err) - panic(fmt.Sprintf("error creating restart checker: %v", err)) - } + restartCheck := newRestartChecker(cfg.Cloud, s.logger, conn) defer restartCheck.close() restartInterval := defaultNeedsRestartCheckInterval diff --git a/web/server/restart_checker.go b/web/server/restart_checker.go index 2727edd5189..1875a4201a1 100644 --- a/web/server/restart_checker.go +++ b/web/server/restart_checker.go @@ -59,15 +59,10 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time. 
return res.MustRestart, restartInterval, nil } -func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger logging.Logger) (needsRestartChecker, error) { - client, err := config.CreateNewGRPCClient(ctx, cfg, logger) - if err != nil { - return nil, err - } - +func newRestartChecker(cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) needsRestartChecker { return &needsRestartCheckerGRPC{ cfg: cfg, logger: logger, client: client, - }, nil + } } From 40e87cf60c1d495b1e1e589ce7f82b08592b6d5c Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 10:32:36 -0500 Subject: [PATCH 42/52] clean up code --- grpc/app_conn.go | 12 ++++++------ web/server/entrypoint.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index e36611fcfc4..2ecb2ea9889 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -13,7 +13,7 @@ import ( "go.viam.com/rdk/logging" ) -// AppConn maintains an underlying client connection meant to be used globally to connect to App. The AppConn constructor repeatedly +// AppConn maintains an underlying client connection meant to be used globally to connect to App. The `AppConn` constructor repeatedly // attempts to dial App until a connection is successfully established. type AppConn struct { ReconfigurableClientConn @@ -21,10 +21,10 @@ type AppConn struct { dialer *utils.StoppableWorkers } -// NewAppConn creates an AppConn instance with a gRPC client connection to App. An initial dial attempt blocks. If it errors, the error is -// returned. If it times out, an AppConn object will return with a nil underlying client connection. Serialized attempts at establishing a -// connection to App will continue to occur, however, in a background Goroutine. These attempts will continue until a connection is made or -// an error that is not a context.DeadlineExceeded occurs. +// NewAppConn creates an `AppConn` instance with a gRPC client connection to App. 
An initial dial attempt blocks. If it errors, the error +// is returned. If it times out, an `AppConn` object with a nil underlying client connection will return. Serialized attempts at establishing +// a connection to App will continue to occur, however, in a background Goroutine. These attempts will continue until a connection is made. +// If `cloud` is nil, an `AppConn` with a nil underlying connection will return, and the background dialer will not start. func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { appConn := &AppConn{} @@ -83,7 +83,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) return appConn, nil } -// Close attempts to close the underlying connection if there is one and stops background dialing attempts. +// Close attempts to close the underlying connection and stops background dialing attempts. func (ac *AppConn) Close() error { if ac.dialer != nil { ac.dialer.Stop() diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 070160f469a..4283370401d 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -189,7 +189,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } - appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("appconnection")) + appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection")) if err != nil { return err } From 3ab3a5386df697d76beb8b078a931a008585e57b Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 10:33:56 -0500 Subject: [PATCH 43/52] lint --- grpc/app_conn.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 2ecb2ea9889..fd6c89fd94d 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -22,9 +22,10 @@ type AppConn struct { } // NewAppConn creates an `AppConn` instance with a 
gRPC client connection to App. An initial dial attempt blocks. If it errors, the error -// is returned. If it times out, an `AppConn` object with a nil underlying client connection will return. Serialized attempts at establishing -// a connection to App will continue to occur, however, in a background Goroutine. These attempts will continue until a connection is made. -// If `cloud` is nil, an `AppConn` with a nil underlying connection will return, and the background dialer will not start. +// is returned. If it times out, an `AppConn` object with a nil underlying client connection will return. Serialized attempts at +// establishing a connection to App will continue to occur, however, in a background Goroutine. These attempts will continue until a +// connection is made. If `cloud` is nil, an `AppConn` with a nil underlying connection will return, and the background dialer will not +// start. func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) { appConn := &AppConn{} From ded9d43594384aa5c0ca2e20cffb10b37c9067e6 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 12:43:25 -0500 Subject: [PATCH 44/52] add to server connection to App --- web/server/entrypoint.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index 4283370401d..d923e9fba33 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -59,6 +59,7 @@ type robotServer struct { args Arguments logger logging.Logger registry *logging.Registry + conn rpc.ClientConn } func logViamEnvVariables(logger logging.Logger) { @@ -220,10 +221,11 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) logger: logger, args: argsParsed, registry: registry, + conn: appConn, } // Run the server with remote logging enabled. 
- err = server.runServer(ctx, appConn) + err = server.runServer(ctx) if err != nil { logger.Error("Fatal error running server, exiting now: ", err) } @@ -233,7 +235,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) // runServer is an entry point to starting the web server after the local config is read. Once the local config // is read the logger may be initialized to remote log. This ensure we capture errors starting up the server and report to the cloud. -func (s *robotServer) runServer(ctx context.Context, conn rpc.ClientConn) error { +func (s *robotServer) runServer(ctx context.Context) error { initialReadCtx, cancel := context.WithTimeout(ctx, time.Second*5) cfg, err := config.Read(initialReadCtx, s.args.ConfigFile, s.logger) if err != nil { @@ -243,7 +245,7 @@ func (s *robotServer) runServer(ctx context.Context, conn rpc.ClientConn) error cancel() config.UpdateFileConfigDebug(cfg.Debug) - err = s.serveWeb(ctx, cfg, conn) + err = s.serveWeb(ctx, cfg) if err != nil { s.logger.Errorw("error serving web", "error", err) } @@ -363,7 +365,7 @@ func (s *robotServer) configWatcher(ctx context.Context, currCfg *config.Config, } } -func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc.ClientConn) (err error) { +func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err error) { ctx, cancel := context.WithCancel(ctx) hungShutdownDeadline := 90 * time.Second @@ -463,7 +465,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config, conn rpc cloudRestartCheckerActive = make(chan struct{}) utils.PanicCapturingGo(func() { defer close(cloudRestartCheckerActive) - restartCheck := newRestartChecker(cfg.Cloud, s.logger, conn) + restartCheck := newRestartChecker(cfg.Cloud, s.logger, s.conn) defer restartCheck.close() restartInterval := defaultNeedsRestartCheckInterval From ff6953e8420926050772674b61a714f170e9c652 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 
12:44:56 -0500 Subject: [PATCH 45/52] no longer check if error when initially dialing is due to time out --- grpc/app_conn.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index fd6c89fd94d..a0aafbeaa26 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -53,10 +53,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) return appConn, nil } - if !errors.Is(err, context.DeadlineExceeded) { - return nil, err - } - appConn.dialer = utils.NewStoppableWorkers(ctx) appConn.dialer.Add(func(ctx context.Context) { From 3da6860f7a5c4a98d94f09a19943eea0955bfe7d Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 12:53:22 -0500 Subject: [PATCH 46/52] use Debugw() --- grpc/app_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index a0aafbeaa26..4c45ea06f5d 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -67,7 +67,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) appConn.connMu.Unlock() ctxWithTimeoutCancel() if err != nil { - logger.Debug("error while dialing App. Could not establish global, unified connection", err) + logger.Debugw("error while dialing App. 
Could not establish global, unified connection", "error", err) continue } From 8f67114d5270f20b0ea33d973652859e666fd2ad Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 13:30:44 -0500 Subject: [PATCH 47/52] lint --- grpc/app_conn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index 4c45ea06f5d..be76d4119f2 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -5,7 +5,6 @@ import ( "net/url" "time" - "github.com/pkg/errors" "go.viam.com/utils" "go.viam.com/utils/rpc" From 725d4d90017d8ed4924f485bc38484f2d654707f Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 13:42:51 -0500 Subject: [PATCH 48/52] comment that the connection to App can be nil --- web/server/entrypoint.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/web/server/entrypoint.go b/web/server/entrypoint.go index d923e9fba33..9461ba68137 100644 --- a/web/server/entrypoint.go +++ b/web/server/entrypoint.go @@ -190,6 +190,8 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error) defer exporter.Stop() } + // the underlying connection in `appConn` can be nil. 
In this case, a background Goroutine is kicked off to reattempt dials in a + // serialized manner appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection")) if err != nil { return err From 24ba32f1119c75f6b9895abcf81d14563b000180 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 13:50:26 -0500 Subject: [PATCH 49/52] only acquire lock if dial succeeds --- grpc/app_conn.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index be76d4119f2..eae399a91ed 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -61,9 +61,7 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } ctxWithTimeout, ctxWithTimeoutCancel := context.WithTimeout(ctx, 5*time.Second) - appConn.connMu.Lock() - appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) - appConn.connMu.Unlock() + conn, err := rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, dialOpts...) ctxWithTimeoutCancel() if err != nil { logger.Debugw("error while dialing App. 
Could not establish global, unified connection", "error", err) @@ -71,6 +69,10 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) continue } + appConn.connMu.Lock() + defer appConn.connMu.Unlock() + appConn.conn = conn + return } }) From 728d69277b039e65fe88bb635e1ca5d1b94df2dd Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 13:55:29 -0500 Subject: [PATCH 50/52] lint --- grpc/app_conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index eae399a91ed..d5f71a8ee13 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -70,8 +70,8 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } appConn.connMu.Lock() - defer appConn.connMu.Unlock() appConn.conn = conn + appConn.connMu.Unlock() return } }) From 78fa9245bfbf3d1e7e7c042d40cc562392155b5a Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 14:27:20 -0500 Subject: [PATCH 51/52] remove comment from final return statement in AppConn constructor --- grpc/app_conn.go | 1 - 1 file changed, 1 deletion(-) diff --git a/grpc/app_conn.go b/grpc/app_conn.go index d5f71a8ee13..1bff75c4da1 100644 --- a/grpc/app_conn.go +++ b/grpc/app_conn.go @@ -77,7 +77,6 @@ func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) } }) - // if initial dial attempt fails due to time out, return nil error return appConn, nil } From b477b5a5e383911cf99f8b06b3bac97d6fe57d89 Mon Sep 17 00:00:00 2001 From: Bashar Eid Date: Wed, 29 Jan 2025 14:27:46 -0500 Subject: [PATCH 52/52] remove TODO comment --- logging/net_appender.go | 1 - 1 file changed, 1 deletion(-) diff --git a/logging/net_appender.go b/logging/net_appender.go index b7feda561bd..b62179544e3 100644 --- a/logging/net_appender.go +++ b/logging/net_appender.go @@ -406,7 +406,6 @@ func (w *remoteLogWriterGRPC) write(ctx context.Context, logs []*commonpb.LogEnt return nil } -// TODO(RSDK-8292): [qu] do we need this anymore? 
func (w *remoteLogWriterGRPC) getOrCreateClient(ctx context.Context) (apppb.RobotServiceClient, error) { w.clientMutex.Lock() defer w.clientMutex.Unlock()