From 1bdf680dcc97ab1cb9252f280cbd7e87fcc0d386 Mon Sep 17 00:00:00 2001
From: Nathan Pierce
Date: Thu, 5 Dec 2024 11:15:07 -0600
Subject: [PATCH] CLI tests + fixes for how the process handles failures

---
 README.md                              |   4 +
 VERSION                                |   2 +-
 internal/anka/cli.go                   | 111 ++++++++---
 internal/anka/vm.go                    |   7 +-
 internal/config/config.go              |  31 ++--
 internal/database/database.go          |  18 +-
 internal/github/client.go              |  43 +++--
 internal/logging/logging.go            |  17 +-
 internal/metrics/aggregator.go         |   6 +-
 internal/metrics/metrics.go            |  81 ++++++---
 internal/run/run.go                    |  23 ++-
 main.go                                |  71 ++++++--
 plugins/handlers/github/github.go      | 243 ++++++++++++++++---------
 plugins/receivers/github/github.go     |  93 ++++++----
 tests/cli-test-capacity.yml            |  22 +++
 tests/cli-test-empty.yml               |   0
 tests/cli-test-no-db.yml               |  23 +++
 tests/cli-test-no-log-directory.yml    |   7 +
 tests/cli-test-no-plugin-name.yml      |  16 ++
 tests/cli-test-no-plugins.yml          |   2 +
 tests/cli-test-non-existent-plugin.yml |  22 +++
 tests/cli-test-start-stop.yml          |  24 +++
 tests/cli-test.bash                    | 148 +++++++++++++++
 trigger-test-runs.bash                 |   2 +-
 24 files changed, 769 insertions(+), 247 deletions(-)
 create mode 100644 tests/cli-test-capacity.yml
 create mode 100644 tests/cli-test-empty.yml
 create mode 100644 tests/cli-test-no-db.yml
 create mode 100644 tests/cli-test-no-log-directory.yml
 create mode 100644 tests/cli-test-no-plugin-name.yml
 create mode 100644 tests/cli-test-no-plugins.yml
 create mode 100644 tests/cli-test-non-existent-plugin.yml
 create mode 100644 tests/cli-test-start-stop.yml
 create mode 100755 tests/cli-test.bash

diff --git a/README.md b/README.md
index 5abd1b8..bafa2f8 100644
--- a/README.md
+++ b/README.md
@@ -535,6 +535,10 @@ For example, the `github` plugin will update the metrics for the plugin it is ru
 But metrics.UpdateService can also update things like `LastSuccess`, and `LastFailure`.
 See `metrics.UpdateService` for more information.
 
+## FAQs
+
+- Can I guarantee that the logs for Anklet will contain the `anklet (and all plugins) shut down` message?
+  - No. An error raised outside of a plugin is not guaranteed to trigger a graceful shutdown, so the message may never be written.
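The through-line of the patch is replacing `panic` calls with returned errors so a failing plugin can be reported and shut down cleanly (the FAQ above exists because that guarantee only holds for errors raised inside plugins). A minimal sketch of that pattern, using simplified stand-in types rather than the real anklet definitions:

```go
// Sketch of the error-propagation pattern this patch applies across the
// internal packages: context getters that previously panicked now return
// (value, error), and callers hand the error back up instead of crashing.
// The context key and function bodies here are simplified stand-ins.
package example

import (
	"context"
	"fmt"
	"log/slog"
)

type contextKey string

// After the patch, a missing context value is an ordinary error for the
// caller to handle (compare internal/logging.GetLoggerFromContext below).
func GetLoggerFromContext(ctx context.Context) (*slog.Logger, error) {
	logger, ok := ctx.Value(contextKey("logger")).(*slog.Logger)
	if !ok {
		return nil, fmt.Errorf("GetLoggerFromContext failed")
	}
	return logger, nil
}

// Callers check the error and return it, so main.go can decide how to
// shut down instead of the process dying mid-run.
func doWork(ctx context.Context) error {
	logger, err := GetLoggerFromContext(ctx)
	if err != nil {
		return err
	}
	logger.DebugContext(ctx, "doing work")
	return nil
}
```

Callers such as `cli.Execute`, `run.Plugin`, and the plugin `Run` functions in the hunks below follow the same shape: fetch from context, check the error, and hand it back up to `main.go`.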
## Copyright diff --git a/VERSION b/VERSION index 2bd77c7..2774f85 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.9.4 \ No newline at end of file +0.10.0 \ No newline at end of file diff --git a/internal/anka/cli.go b/internal/anka/cli.go index d6ef3ea..d015338 100644 --- a/internal/anka/cli.go +++ b/internal/anka/cli.go @@ -35,12 +35,12 @@ type Cli struct { RegistryPullMutex sync.Mutex } -func GetAnkaCLIFromContext(pluginCtx context.Context) *Cli { +func GetAnkaCLIFromContext(pluginCtx context.Context) (*Cli, error) { ankaCLI, ok := pluginCtx.Value(config.ContextKey("ankacli")).(*Cli) if !ok { - panic("function GetAnkaCLIFromContext failed") + return nil, fmt.Errorf("GetAnkaCLIFromContext failed") } - return ankaCLI + return ankaCLI, nil } func NewCLI(pluginCtx context.Context) (*Cli, error) { @@ -82,7 +82,10 @@ func NewCLI(pluginCtx context.Context) (*Cli, error) { } func (cli *Cli) Execute(pluginCtx context.Context, args ...string) ([]byte, int, error) { - logger := logging.GetLoggerFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return nil, 0, err + } if args[2] != "list" { // hide spammy list command logger.DebugContext(pluginCtx, "executing", "command", strings.Join(args, " ")) } @@ -157,12 +160,18 @@ func (cli *Cli) ExecuteAndParseJsonOnError(pluginCtx context.Context, args ...st } func (cli *Cli) AnkaRun(pluginCtx context.Context, args ...string) error { - vm := GetAnkaVmFromContext(pluginCtx) + vm, err := GetAnkaVmFromContext(pluginCtx) + if err != nil { + return err + } runOutput, exitCode, err := cli.Execute(pluginCtx, "anka", "-j", "run", vm.Name, "bash", "-c", strings.Join(args, " ")) if exitCode != 0 || err != nil { return fmt.Errorf("command execution failed with code %d: %s %s", exitCode, string(runOutput), err) } - logger := logging.GetLoggerFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return err + } logger.DebugContext(pluginCtx, "command executed successfully", "stdout", string(runOutput)) return nil } @@ -171,8 +180,14 @@ func (cli *Cli) AnkaRegistryPull(workerCtx context.Context, pluginCtx context.Co if pluginCtx.Err() != nil { return nil, fmt.Errorf("context canceled before AnkaRegistryPull") } - logger := logging.GetLoggerFromContext(pluginCtx) - ctxPlugin := config.GetPluginFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return nil, err + } + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return nil, err + } var registryExtra []string if ctxPlugin.RegistryURL != "" { registryExtra = []string{"--remote", ctxPlugin.RegistryURL} @@ -187,7 +202,10 @@ func (cli *Cli) AnkaRegistryPull(workerCtx context.Context, pluginCtx context.Co } logger.DebugContext(pluginCtx, "pulling template to host") - metricsData := metrics.GetMetricsDataFromContext(workerCtx) + metricsData, err := metrics.GetMetricsDataFromContext(workerCtx) + if err != nil { + return nil, err + } defer metricsData.SetStatus(pluginCtx, logger, "running") @@ -205,7 +223,10 @@ func (cli *Cli) AnkaRegistryPull(workerCtx context.Context, pluginCtx context.Co } func (cli *Cli) AnkaDelete(workerCtx context.Context, pluginCtx context.Context, vm *VM) error { - logger := logging.GetLoggerFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return err + } deleteOutput, err := cli.ExecuteParseJson(pluginCtx, "anka", "-j", "delete", "--yes", vm.Name) if err != nil { 
logger.ErrorContext(pluginCtx, "error executing anka delete", "err", err) @@ -214,7 +235,10 @@ func (cli *Cli) AnkaDelete(workerCtx context.Context, pluginCtx context.Context, } logger.DebugContext(pluginCtx, "successfully deleted vm", "std", deleteOutput.Message) // decrement total running VMs - metricsData := metrics.GetMetricsDataFromContext(workerCtx) + metricsData, err := metrics.GetMetricsDataFromContext(workerCtx) + if err != nil { + return err + } metricsData.DecrementTotalRunningVMs() return nil } @@ -223,7 +247,10 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex if pluginCtx.Err() != nil { return pluginCtx, nil, fmt.Errorf("context canceled before ObtainAnkaVMAndName") } - logger := logging.GetLoggerFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return pluginCtx, nil, err + } vmID, err := uuid.NewRandom() if err != nil { logger.ErrorContext(pluginCtx, "error creating uuid for vm name", "err", err) @@ -245,7 +272,10 @@ func (cli *Cli) ObtainAnkaVM(workerCtx context.Context, pluginCtx context.Contex return pluginCtx, vm, err } // increment total running VMs - metricsData := metrics.GetMetricsDataFromContext(workerCtx) + metricsData, err := metrics.GetMetricsDataFromContext(workerCtx) + if err != nil { + return pluginCtx, vm, err + } metricsData.IncrementTotalRunningVMs() return pluginCtx, vm, nil } @@ -254,8 +284,14 @@ func (cli *Cli) AnkaClone(pluginCtx context.Context, template string) error { if pluginCtx.Err() != nil { return fmt.Errorf("context canceled before AnkaClone") } - logger := logging.GetLoggerFromContext(pluginCtx) - vm := GetAnkaVmFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return err + } + vm, err := GetAnkaVmFromContext(pluginCtx) + if err != nil { + return err + } cloneOutput, err := cli.ExecuteParseJson(pluginCtx, "anka", "-j", "clone", template, vm.Name) if err != nil { return err @@ -271,8 +307,14 @@ func (cli *Cli) AnkaStart(pluginCtx context.Context) error { if pluginCtx.Err() != nil { return fmt.Errorf("context canceled before AnkaStart") } - logger := logging.GetLoggerFromContext(pluginCtx) - vm := GetAnkaVmFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return err + } + vm, err := GetAnkaVmFromContext(pluginCtx) + if err != nil { + return err + } startOutput, err := cli.ExecuteParseJson(pluginCtx, "anka", "-j", "start", vm.Name) if err != nil { return err @@ -288,8 +330,14 @@ func (cli *Cli) AnkaCopy(pluginCtx context.Context, filesToCopyIn ...string) err if pluginCtx.Err() != nil { return fmt.Errorf("context canceled before AnkaCopy") } - logger := logging.GetLoggerFromContext(pluginCtx) - vm := GetAnkaVmFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return err + } + vm, err := GetAnkaVmFromContext(pluginCtx) + if err != nil { + return err + } for _, hostLevelFile := range filesToCopyIn { // handle symlinks realPath, err := filepath.EvalSymlinks(hostLevelFile) @@ -312,8 +360,14 @@ func (cli *Cli) AnkaCopy(pluginCtx context.Context, filesToCopyIn ...string) err } func HostHasVmCapacity(pluginCtx context.Context) bool { - logger := logging.GetLoggerFromContext(pluginCtx) - ankaCLI := GetAnkaCLIFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return false + } + ankaCLI, err := GetAnkaCLIFromContext(pluginCtx) + if err != nil { + return false + } // check if there 
are already two VMS running or not runningVMsList, err := ankaCLI.ExecuteParseJson(pluginCtx, "anka", "-j", "list", "-r") if err != nil { @@ -339,9 +393,18 @@ func HostHasVmCapacity(pluginCtx context.Context) bool { } func (cli *Cli) EnsureVMTemplateExists(workerCtx context.Context, pluginCtx context.Context, targetTemplate string, targetTag string) (error, error) { - logger := logging.GetLoggerFromContext(pluginCtx) - ankaCLI := GetAnkaCLIFromContext(pluginCtx) - globals := config.GetGlobalsFromContext(pluginCtx) + logger, err := logging.GetLoggerFromContext(pluginCtx) + if err != nil { + return nil, err + } + ankaCLI, err := GetAnkaCLIFromContext(pluginCtx) + if err != nil { + return nil, err + } + globals, err := config.GetGlobalsFromContext(pluginCtx) + if err != nil { + return nil, err + } pullTemplate := false list, err := ankaCLI.ExecuteParseJson(pluginCtx, "anka", "-j", "list", targetTemplate) if err != nil { diff --git a/internal/anka/vm.go b/internal/anka/vm.go index af3d10d..75ccd40 100644 --- a/internal/anka/vm.go +++ b/internal/anka/vm.go @@ -2,6 +2,7 @@ package anka import ( "context" + "fmt" "github.com/veertuinc/anklet/internal/config" ) @@ -10,10 +11,10 @@ type VM struct { Name string } -func GetAnkaVmFromContext(ctx context.Context) *VM { +func GetAnkaVmFromContext(ctx context.Context) (*VM, error) { ankaVm, ok := ctx.Value(config.ContextKey("ankavm")).(*VM) if !ok { - panic("function GetAnkaVmFromContext failed") + return nil, fmt.Errorf("GetAnkaVmFromContext failed") } - return ankaVm + return ankaVm, nil } diff --git a/internal/config/config.go b/internal/config/config.go index 0066e58..75869ca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + "fmt" "os" "strconv" "strings" @@ -191,12 +192,12 @@ func LoadInEnvs(config Config) (Config, error) { return config, nil } -func GetPluginFromContext(ctx context.Context) Plugin { +func GetPluginFromContext(ctx context.Context) (Plugin, error) { plugin, ok := ctx.Value(ContextKey("plugin")).(Plugin) if !ok { - panic("GetPluginFromContext failed") + return Plugin{}, fmt.Errorf("GetPluginFromContext failed") } - return plugin + return plugin, nil } type Globals struct { @@ -206,34 +207,34 @@ type Globals struct { DebugEnabled bool } -func GetGlobalsFromContext(ctx context.Context) Globals { +func GetGlobalsFromContext(ctx context.Context) (Globals, error) { globals, ok := ctx.Value(ContextKey("globals")).(Globals) if !ok { - panic("GetGlobalsFromContext failed") + return Globals{}, fmt.Errorf("GetGlobalsFromContext failed") } - return globals + return globals, nil } -func GetLoadedConfigFromContext(ctx context.Context) *Config { +func GetLoadedConfigFromContext(ctx context.Context) (*Config, error) { config, ok := ctx.Value(ContextKey("config")).(*Config) if !ok { - panic("GetLoadedConfigFromContext failed") + return nil, fmt.Errorf("GetLoadedConfigFromContext failed") } - return config + return config, nil } -func GetIsRepoSetFromContext(ctx context.Context) bool { +func GetIsRepoSetFromContext(ctx context.Context) (bool, error) { isRepoSet, ok := ctx.Value(ContextKey("isRepoSet")).(bool) if !ok { - panic("GetIsRepoSetFromContext failed") + return false, fmt.Errorf("GetIsRepoSetFromContext failed") } - return isRepoSet + return isRepoSet, nil } -func GetConfigFileNameFromContext(ctx context.Context) string { +func GetConfigFileNameFromContext(ctx context.Context) (string, error) { configFileName, ok := ctx.Value(ContextKey("configFileName")).(string) if !ok { - 
panic("GetConfigFileNameFromContext failed") + return "", fmt.Errorf("GetConfigFileNameFromContext failed") } - return configFileName + return configFileName, nil } diff --git a/internal/database/database.go b/internal/database/database.go index e751f5e..b63e12f 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -57,28 +57,32 @@ func GetDatabaseFromContext(ctx context.Context) (*Database, error) { return database, nil } -func UpdateUniqueRunKey(ctx context.Context, key string) context.Context { +func UpdateUniqueRunKey(ctx context.Context, key string) (context.Context, error) { database, ok := ctx.Value(config.ContextKey("database")).(Database) if !ok { - panic("database not found in context") + return ctx, fmt.Errorf("database not found in context") } database.UniqueRunKey = key ctx = logging.AppendCtx(ctx, slog.String("uniqueRunKey", key)) - return context.WithValue(ctx, config.ContextKey("database"), database) + return context.WithValue(ctx, config.ContextKey("database"), database), nil } -func RemoveUniqueKeyFromDB(ctx context.Context) { +func RemoveUniqueKeyFromDB(ctx context.Context) (context.Context, error) { database, ok := ctx.Value(config.ContextKey("database")).(Database) if !ok { - panic("database not found in context") + return ctx, fmt.Errorf("database not found in context") + } + logging, err := logging.GetLoggerFromContext(ctx) + if err != nil { + return ctx, err } - logging := logging.GetLoggerFromContext(ctx) // we don't use ctx for the database deletion so we avoid getting the cancelled context state, which fails when Del runs deletion, err := database.Client.Del(context.Background(), database.UniqueRunKey).Result() if err != nil { - panic(err) + return nil, err } logging.DebugContext(ctx, fmt.Sprintf("removal of unique key %s from database returned %d (1 is success, 0 failed)", database.UniqueRunKey, deletion)) + return ctx, nil } func CheckIfKeyExists(ctx context.Context, key string) (bool, error) { diff --git a/internal/github/client.go b/internal/github/client.go index 9fda64b..0ead128 100644 --- a/internal/github/client.go +++ b/internal/github/client.go @@ -2,6 +2,7 @@ package github import ( "context" + "fmt" "log/slog" "net/http" "os" @@ -26,28 +27,28 @@ func NewGitHubClientWrapper(client *github.Client) *GitHubClientWrapper { } } -func GetGitHubClientFromContext(ctx context.Context) *github.Client { +func GetGitHubClientFromContext(ctx context.Context) (*github.Client, error) { wrapper, ok := ctx.Value(config.ContextKey("githubwrapperclient")).(*GitHubClientWrapper) if !ok { - panic("GetGitHubClientFromContext failed") + return nil, fmt.Errorf("GetGitHubClientFromContext failed") } - return wrapper.client + return wrapper.client, nil } -func GetRateLimitWaiterClientFromContext(ctx context.Context) *http.Client { +func GetRateLimitWaiterClientFromContext(ctx context.Context) (*http.Client, error) { rateLimiter, ok := ctx.Value(config.ContextKey("rateLimiter")).(*http.Client) if rateLimiter != nil && !ok { - panic("GetRateLimitWaiterClientFromContext failed") + return nil, fmt.Errorf("GetRateLimitWaiterClientFromContext failed") } - return rateLimiter + return rateLimiter, nil } -func GetHttpTransportFromContext(ctx context.Context) *http.Transport { +func GetHttpTransportFromContext(ctx context.Context) (*http.Transport, error) { httpTransport, ok := ctx.Value(config.ContextKey("httpTransport")).(*http.Transport) if httpTransport != nil && !ok { - panic("GetHttpTransportFromContext failed") + return nil, 
fmt.Errorf("GetHttpTransportFromContext failed") } - return httpTransport + return httpTransport, nil } func AuthenticateAndReturnGitHubClient( @@ -62,9 +63,15 @@ func AuthenticateAndReturnGitHubClient( var client *github.Client var err error var rateLimiter *http.Client - rateLimiter = GetRateLimitWaiterClientFromContext(ctx) + rateLimiter, err = GetRateLimitWaiterClientFromContext(ctx) + if err != nil { + return nil, err + } var httpTransport *http.Transport - httpTransport = GetHttpTransportFromContext(ctx) + httpTransport, err = GetHttpTransportFromContext(ctx) + if err != nil { + return nil, err + } if httpTransport == nil { httpTransport = http.DefaultTransport.(*http.Transport) } @@ -85,9 +92,9 @@ func AuthenticateAndReturnGitHubClient( itr, err := ghinstallation.New(httpTransport, appID, installationID, privateKeyBytes) if err != nil { if strings.Contains(err.Error(), "invalid key") { - panic("error creating github app installation token: " + err.Error() + " (does the key exist on the filesystem?)") + return nil, fmt.Errorf("error creating github app installation token: %s (does the key exist on the filesystem?)", err.Error()) } else { - panic("error creating github app installation token: " + err.Error()) + return nil, fmt.Errorf("error creating github app installation token: %s", err.Error()) } } rateLimiter.Transport = itr @@ -111,8 +118,14 @@ func ExecuteGitHubClientFunction[T any](pluginCtx context.Context, logger *slog. if response.Rate.Remaining <= 10 { // handle primary rate limiting sleepDuration := time.Until(response.Rate.Reset.Time) + time.Second // Adding a second to ensure we're past the reset time logger.WarnContext(innerPluginCtx, "GitHub API rate limit exceeded, sleeping until reset") - metricsData := metrics.GetMetricsDataFromContext(pluginCtx) - ctxPlugin := config.GetPluginFromContext(pluginCtx) + metricsData, err := metrics.GetMetricsDataFromContext(pluginCtx) + if err != nil { + return innerPluginCtx, nil, nil, err + } + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return innerPluginCtx, nil, nil, err + } metricsData.UpdatePlugin(pluginCtx, logger, metrics.PluginBase{ Name: ctxPlugin.Name, Status: "limit_paused", diff --git a/internal/logging/logging.go b/internal/logging/logging.go index e6489d8..1cad53f 100644 --- a/internal/logging/logging.go +++ b/internal/logging/logging.go @@ -2,6 +2,7 @@ package logging import ( "context" + "fmt" "log/slog" "os" "strings" @@ -100,22 +101,28 @@ func AppendCtx(parent context.Context, attr slog.Attr) context.Context { } func Panic(workerCtx context.Context, pluginCtx context.Context, errorMessage string) { - logger := GetLoggerFromContext(pluginCtx) + logger, err := GetLoggerFromContext(pluginCtx) + if err != nil { + panic(err) + } logger.ErrorContext(pluginCtx, errorMessage) panic(errorMessage) } func DevContext(pluginCtx context.Context, errorMessage string) { if strings.ToUpper(os.Getenv("LOG_LEVEL")) == "DEV" { - logger := GetLoggerFromContext(pluginCtx) + logger, err := GetLoggerFromContext(pluginCtx) + if err != nil { + panic(err) + } logger.DebugContext(pluginCtx, errorMessage) } } -func GetLoggerFromContext(ctx context.Context) *slog.Logger { +func GetLoggerFromContext(ctx context.Context) (*slog.Logger, error) { logger, ok := ctx.Value(config.ContextKey("logger")).(*slog.Logger) if !ok { - panic("GetLoggerFromContext failed") + return nil, fmt.Errorf("GetLoggerFromContext failed") } - return logger + return logger, nil } diff --git a/internal/metrics/aggregator.go 
b/internal/metrics/aggregator.go index b41c42e..6ff5f5a 100644 --- a/internal/metrics/aggregator.go +++ b/internal/metrics/aggregator.go @@ -22,7 +22,11 @@ func (s *Server) StartAggregatorServer(workerCtx context.Context, logger *slog.L logger.ErrorContext(workerCtx, "error getting database client from context", "error", err) return } - loadedConfig := config.GetLoadedConfigFromContext(workerCtx) + loadedConfig, err := config.GetLoadedConfigFromContext(workerCtx) + if err != nil { + logger.ErrorContext(workerCtx, "error getting loaded config from context", "error", err) + return + } if r.URL.Query().Get("format") == "json" { s.handleAggregatorJsonMetrics(workerCtx, logger, databaseContainer, loadedConfig)(w, r) } else if r.URL.Query().Get("format") == "prometheus" { diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 34d8487..867141c 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -61,7 +61,7 @@ type MetricsDataLock struct { MetricsData } -func (m *MetricsDataLock) AddPlugin(plugin interface{}) { +func (m *MetricsDataLock) AddPlugin(plugin interface{}) error { m.Lock() defer m.Unlock() var pluginName string @@ -71,7 +71,7 @@ func (m *MetricsDataLock) AddPlugin(plugin interface{}) { case Plugin: pluginName = pluginTyped.PluginBase.Name default: - panic("unable to get plugin name") + return fmt.Errorf("unable to get plugin name") } for _, plugin := range m.Plugins { var name string @@ -81,13 +81,14 @@ func (m *MetricsDataLock) AddPlugin(plugin interface{}) { case Plugin: name = pluginTyped.PluginBase.Name default: - panic("unable to get plugin name") + return fmt.Errorf("unable to get plugin name") } if name == pluginName { // already exists, don't do anything - return + return nil } } m.Plugins = append(m.Plugins, plugin) + return nil } func (m *MetricsDataLock) IncrementTotalRunningVMs() { @@ -116,12 +117,12 @@ func (m *MetricsDataLock) IncrementTotalFailedRunsSinceStart() { m.TotalFailedRunsSinceStart++ } -func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface{}) interface{} { +func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface{}) (interface{}, error) { switch currentServiceTyped := currentService.(type) { case Plugin: updated, ok := updatedPlugin.(Plugin) if !ok { - panic("unable to convert updatedPlugin to Plugin") + return nil, fmt.Errorf("unable to convert updatedPlugin to Plugin") } if updated.PluginName != "" { currentServiceTyped.PluginName = updated.PluginName @@ -144,11 +145,11 @@ func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface if updated.LastFailedRunJobUrl != "" { currentServiceTyped.LastFailedRunJobUrl = updated.LastFailedRunJobUrl } - return currentServiceTyped + return currentServiceTyped, nil case PluginBase: updated, ok := updatedPlugin.(PluginBase) if !ok { - panic("unable to convert updatedPlugin to PluginBase") + return nil, fmt.Errorf("unable to convert updatedPlugin to PluginBase") } if updated.PluginName != "" { currentServiceTyped.PluginName = updated.PluginName @@ -159,9 +160,9 @@ func CompareAndUpdateMetrics(currentService interface{}, updatedPlugin interface } currentServiceTyped.Status = updated.Status } - return currentServiceTyped + return currentServiceTyped, nil default: - panic("unable to convert currentService to Plugin or PluginBase") + return nil, fmt.Errorf("unable to convert currentService to Plugin or PluginBase") } } @@ -201,16 +202,25 @@ func UpdateSystemMetrics(pluginCtx context.Context, logger 
*slog.Logger, metrics metricsData.HostDiskUsedBytes = uint64(diskStat.Used) } -func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) { - ctxPlugin := config.GetPluginFromContext(pluginCtx) - metricsData := GetMetricsDataFromContext(workerCtx) +func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error { + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return err + } + metricsData, err := GetMetricsDataFromContext(workerCtx) + if err != nil { + return err + } switch updatedPlugin.(type) { case Plugin: for i, currentPluginMetrics := range metricsData.Plugins { switch fullCurrentPluginMetrics := currentPluginMetrics.(type) { case Plugin: if fullCurrentPluginMetrics.PluginBase.Name == ctxPlugin.Name { - newPlugin := CompareAndUpdateMetrics(currentPluginMetrics, updatedPlugin) + newPlugin, err := CompareAndUpdateMetrics(currentPluginMetrics, updatedPlugin) + if err != nil { + return err + } metricsData.Plugins[i] = newPlugin } } @@ -220,28 +230,36 @@ func UpdatePlugin(workerCtx context.Context, pluginCtx context.Context, logger * switch fullCurrentPluginMetrics := currentPluginMetrics.(type) { case PluginBase: if fullCurrentPluginMetrics.Name == ctxPlugin.Name { - newPlugin := CompareAndUpdateMetrics(currentPluginMetrics, updatedPlugin) + newPlugin, err := CompareAndUpdateMetrics(currentPluginMetrics, updatedPlugin) + if err != nil { + return err + } metricsData.Plugins[i] = newPlugin } } } } + return nil } -func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) { +func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.Logger, updatedPlugin interface{}) error { m.Lock() defer m.Unlock() var name string switch fullUpdatedPlugin := updatedPlugin.(type) { case Plugin: if fullUpdatedPlugin.Name == "" { - panic("updatePlugin.Name is required") + return fmt.Errorf("updatePlugin.Name is required") } for i, plugin := range m.Plugins { switch typedPlugin := plugin.(type) { case Plugin: if fullUpdatedPlugin.Name == typedPlugin.Name { - m.Plugins[i] = CompareAndUpdateMetrics(typedPlugin, updatedPlugin) + newPlugin, err := CompareAndUpdateMetrics(typedPlugin, updatedPlugin) + if err != nil { + return err + } + m.Plugins[i] = newPlugin } } } @@ -251,17 +269,25 @@ func (m *MetricsDataLock) UpdatePlugin(pluginCtx context.Context, logger *slog.L switch typedPlugin := plugin.(type) { case PluginBase: if name == typedPlugin.Name { - m.Plugins[i] = CompareAndUpdateMetrics(typedPlugin, updatedPlugin) + newPlugin, err := CompareAndUpdateMetrics(typedPlugin, updatedPlugin) + if err != nil { + return err + } + m.Plugins[i] = newPlugin } } } } + return nil } -func (m *MetricsDataLock) SetStatus(pluginCtx context.Context, logger *slog.Logger, status string) { +func (m *MetricsDataLock) SetStatus(pluginCtx context.Context, logger *slog.Logger, status string) error { m.Lock() defer m.Unlock() - ctxPlugin := config.GetPluginFromContext(pluginCtx) + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return err + } for i, plugin := range m.Plugins { switch typedPlugin := plugin.(type) { case Plugin: @@ -276,6 +302,7 @@ func (m *MetricsDataLock) SetStatus(pluginCtx context.Context, logger *slog.Logg } } } + return nil } // NewServer creates a new instance of Server @@ -289,7 +316,11 @@ func NewServer(port string) *Server { func (s *Server) Start(parentCtx 
context.Context, logger *slog.Logger, soloReceiver bool) { http.HandleFunc("/metrics/v1", func(w http.ResponseWriter, r *http.Request) { // update system metrics each call - metricsData := GetMetricsDataFromContext(parentCtx) + metricsData, err := GetMetricsDataFromContext(parentCtx) + if err != nil { + http.Error(w, "failed to get metrics data", http.StatusInternalServerError) + return + } UpdateSystemMetrics(parentCtx, logger, metricsData) // if r.URL.Query().Get("format") == "json" { @@ -523,10 +554,10 @@ func (s *Server) handlePrometheusMetrics(ctx context.Context, soloReceiver bool) } } -func GetMetricsDataFromContext(ctx context.Context) *MetricsDataLock { +func GetMetricsDataFromContext(ctx context.Context) (*MetricsDataLock, error) { metricsData, ok := ctx.Value(config.ContextKey("metrics")).(*MetricsDataLock) if !ok { - panic("GetHttpTransportFromContext failed") + return nil, fmt.Errorf("GetMetricsDataFromContext failed") } - return metricsData + return metricsData, nil } diff --git a/internal/run/run.go b/internal/run/run.go index ab224fd..ac1685f 100644 --- a/internal/run/run.go +++ b/internal/run/run.go @@ -2,6 +2,7 @@ package run import ( "context" + "fmt" "log/slog" "github.com/veertuinc/anklet/internal/config" @@ -18,19 +19,22 @@ func Plugin( logger *slog.Logger, firstPluginStarted chan bool, metricsData *metrics.MetricsDataLock, -) { - ctxPlugin := config.GetPluginFromContext(pluginCtx) +) (context.Context, error) { + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return pluginCtx, err + } // fmt.Printf("%+v\n", service) pluginCtx = logging.AppendCtx(pluginCtx, slog.String("plugin", ctxPlugin.Plugin)) if ctxPlugin.Plugin == "" { - panic("plugin is not set in yaml:plugins:" + ctxPlugin.Name + ":plugin") + return pluginCtx, fmt.Errorf("plugin is not set in yaml:plugins:%s:plugin", ctxPlugin.Name) } if ctxPlugin.Plugin == "github" { // for { select { case <-pluginCtx.Done(): pluginCancel() - return + return pluginCtx, nil default: // notify the main thread that the service has started select { @@ -38,14 +42,19 @@ func Plugin( default: close(firstPluginStarted) } - github.Run(workerCtx, pluginCtx, pluginCancel, logger, metricsData) + var updatedPluginCtx context.Context + updatedPluginCtx, err = github.Run(workerCtx, pluginCtx, pluginCancel, logger, metricsData) + if err != nil { + return updatedPluginCtx, err + } // metricsData.SetStatus(pluginCtx, logger, "idle") - return // pass back to the main thread/loop + return updatedPluginCtx, nil // pass back to the main thread/loop } // } } else if ctxPlugin.Plugin == "github_receiver" { github_receiver.Run(workerCtx, pluginCtx, pluginCancel, logger, firstPluginStarted, metricsData) } else { - panic("plugin not found: " + ctxPlugin.Plugin) + return pluginCtx, fmt.Errorf("plugin not found: %s", ctxPlugin.Plugin) } + return pluginCtx, nil } diff --git a/main.go b/main.go index 8c4678a..0843b86 100644 --- a/main.go +++ b/main.go @@ -71,7 +71,8 @@ func main() { homeDir, err := os.UserHomeDir() if err != nil { - panic(err) + logger.ErrorContext(parentCtx, "unable to get user home directory", "error", err) + os.Exit(1) } var configPath string @@ -93,11 +94,12 @@ func main() { loadedConfig, err := config.LoadConfig(configPath) if err != nil { logger.ErrorContext(parentCtx, "unable to load config.yml (is it in the work_dir, or are you using an absolute path?)", "error", err) - panic(err) + os.Exit(1) } loadedConfig, err = config.LoadInEnvs(loadedConfig) if err != nil { - panic(err) + logger.ErrorContext(parentCtx, "unable to 
load config.yml from environment variables", "error", err) + os.Exit(1) } logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) @@ -110,9 +112,13 @@ func main() { parentCtx = context.WithValue(parentCtx, config.ContextKey("suffix"), suffix) if loadedConfig.Log.FileDir != "" { + if !strings.HasSuffix(loadedConfig.Log.FileDir, "/") { + loadedConfig.Log.FileDir += "/" + } logger, fileLocation, err := logging.UpdateLoggerToFile(logger, loadedConfig.Log.FileDir, suffix) if err != nil { - logger.ErrorContext(parentCtx, "error updating logger to file", "error", err) + fmt.Printf("{\"time\":\"%s\",\"level\":\"ERROR\",\"msg\":\"%s\"}\n", time.Now().Format(time.RFC3339), err) + os.Exit(1) } logger.InfoContext(parentCtx, "writing logs to file", slog.String("fileLocation", fileLocation)) logger.InfoContext(parentCtx, "loaded config", slog.Any("config", loadedConfig)) @@ -191,7 +197,7 @@ func main() { rateLimiter, err := github_ratelimit.NewRateLimitWaiterClient(httpTransport) if err != nil { logger.ErrorContext(parentCtx, "error creating github_ratelimit.NewRateLimitWaiterClient", "err", err) - return + os.Exit(1) } parentCtx = context.WithValue(parentCtx, config.ContextKey("rateLimiter"), rateLimiter) } @@ -221,7 +227,11 @@ func main() { } func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config.Config, sigChan chan os.Signal) { - globals := config.GetGlobalsFromContext(parentCtx) + globals, err := config.GetGlobalsFromContext(parentCtx) + if err != nil { + logger.ErrorContext(parentCtx, "unable to get globals from context", "error", err) + os.Exit(1) + } toRunOnce := globals.RunOnce workerCtx, workerCancel := context.WithCancel(parentCtx) suffix := parentCtx.Value(config.ContextKey("suffix")).(string) @@ -283,7 +293,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. ln, err := net.Listen("tcp", ":"+metricsPort) if err != nil { logger.ErrorContext(workerCtx, "metrics port already in use", "port", metricsPort, "error", err) - panic(fmt.Sprintf("metrics port %s is already in use", metricsPort)) + os.Exit(1) } ln.Close() metricsService := metrics.NewServer(metricsPort) @@ -304,7 +314,8 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. Database: databaseDatabase, }) if err != nil { - panic(fmt.Sprintf("unable to access database: %v", err)) + logger.ErrorContext(workerCtx, "unable to access database", "error", err) + os.Exit(1) } workerCtx = context.WithValue(workerCtx, config.ContextKey("database"), databaseContainer) go metricsService.StartAggregatorServer(workerCtx, logger, false) @@ -316,7 +327,7 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. pluginCtx, pluginCancel := context.WithCancel(workerCtx) // Inherit from parent context pluginCtx = logging.AppendCtx(pluginCtx, slog.String("metrics_url", metricsURL)) // check if valid URL - _, err := url.Parse(metricsURL) + _, err = url.Parse(metricsURL) if err != nil { logger.ErrorContext(pluginCtx, "invalid URL", "error", err) pluginCancel() @@ -376,9 +387,14 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. 
pluginCtx, pluginCancel := context.WithCancel(workerCtx) // Inherit from parent context if plugin.Name == "" { - panic("name is required for plugins") + logger.ErrorContext(pluginCtx, "name is required for plugins") + pluginCancel() + workerCancel() + return } + pluginCtx = logging.AppendCtx(pluginCtx, slog.String("pluginName", plugin.Name)) + if plugin.Repo == "" { logger.InfoContext(pluginCtx, "no repo set for plugin; assuming it's an organization level plugin") pluginCtx = context.WithValue(pluginCtx, config.ContextKey("isRepoSet"), false) @@ -393,8 +409,8 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. plugin.PrivateKey = loadedConfig.GlobalPrivateKey } + // keep this here or the changes to plugin don't get set in the pluginCtx pluginCtx = context.WithValue(pluginCtx, config.ContextKey("plugin"), plugin) - pluginCtx = logging.AppendCtx(pluginCtx, slog.String("pluginName", plugin.Name)) if !strings.Contains(plugin.Plugin, "_receiver") { logging.DevContext(pluginCtx, "plugin is not a receiver; loading the anka CLI") @@ -425,9 +441,10 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. Database: databaseDatabase, }) if err != nil { - panic("unable to access database: " + err.Error()) + logger.ErrorContext(pluginCtx, "unable to access database", "error", err) + pluginCancel() + return } - logger.InfoContext(pluginCtx, "connected to database", slog.Any("database", databaseClient)) pluginCtx = context.WithValue(pluginCtx, config.ContextKey("database"), databaseClient) logging.DevContext(pluginCtx, "connected to database") } @@ -444,13 +461,35 @@ func worker(parentCtx context.Context, logger *slog.Logger, loadedConfig config. return default: // logging.DevContext(pluginCtx, "plugin for loop::default") - run.Plugin(workerCtx, pluginCtx, pluginCancel, logger, firstPluginStarted, metricsData) + updatedPluginCtx, err := run.Plugin( + workerCtx, + pluginCtx, + pluginCancel, + logger, + firstPluginStarted, + metricsData, + ) + if err != nil { + logger.ErrorContext(updatedPluginCtx, "error running plugin", "error", err) + pluginCancel() + // Send SIGQUIT to the main pid + p, err := os.FindProcess(os.Getpid()) + if err != nil { + logger.ErrorContext(updatedPluginCtx, "error finding process", "error", err) + } else { + err = p.Signal(syscall.SIGQUIT) + if err != nil { + logger.ErrorContext(updatedPluginCtx, "error sending SIGQUIT signal", "error", err) + } + } + return + } if workerCtx.Err() != nil || toRunOnce == "true" { pluginCancel() - logger.WarnContext(pluginCtx, shutDownMessage) + logger.WarnContext(updatedPluginCtx, shutDownMessage) return } - metricsData.SetStatus(pluginCtx, logger, "idle") + metricsData.SetStatus(updatedPluginCtx, logger, "idle") select { case <-time.After(time.Duration(plugin.SleepInterval) * time.Second): case <-pluginCtx.Done(): diff --git a/plugins/handlers/github/github.go b/plugins/handlers/github/github.go index 48ff223..2c225e7 100644 --- a/plugins/handlers/github/github.go +++ b/plugins/handlers/github/github.go @@ -121,18 +121,25 @@ func extractLabelValue(labels []string, prefix string) string { } func sendCancelWorkflowRun(pluginCtx context.Context, logger *slog.Logger, workflow WorkflowRunJobDetail) error { - githubClient := internalGithub.GetGitHubClientFromContext(pluginCtx) - ctxPlugin := config.GetPluginFromContext(pluginCtx) + githubClient, err := internalGithub.GetGitHubClientFromContext(pluginCtx) + if err != nil { + return err + } + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) 
+ if err != nil { + return err + } cancelSent := false for { - pluginCtx, workflowRun, _, err := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { + newPluginCtx, workflowRun, _, err := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.WorkflowRun, *github.Response, error) { workflowRun, resp, err := githubClient.Actions.GetWorkflowRunByID(context.Background(), ctxPlugin.Owner, workflow.Repo, workflow.RunID) return workflowRun, resp, err }) if err != nil { - logger.ErrorContext(pluginCtx, "error getting workflow run by ID", "err", err) + logger.ErrorContext(newPluginCtx, "error getting workflow run by ID", "err", err) return err } + pluginCtx = newPluginCtx if *workflowRun.Status == "completed" || (workflowRun.Conclusion != nil && *workflowRun.Conclusion == "cancelled") || cancelSent { @@ -140,15 +147,16 @@ func sendCancelWorkflowRun(pluginCtx context.Context, logger *slog.Logger, workf } else { logger.WarnContext(pluginCtx, "workflow run is still active... waiting for cancellation so we can clean up...", "workflow_run_id", workflow.RunID) if !cancelSent { // this has to happen here so that it doesn't error with "409 Cannot cancel a workflow run that is completed. " if the job is already cancelled - pluginCtx, cancelResponse, _, cancelErr := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { + newPluginCtx, cancelResponse, _, cancelErr := internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Response, *github.Response, error) { resp, err := githubClient.Actions.CancelWorkflowRunByID(context.Background(), ctxPlugin.Owner, workflow.Repo, workflow.RunID) return resp, nil, err }) // don't use cancelResponse.Response.StatusCode or else it'll error with SIGSEV if cancelErr != nil && !strings.Contains(cancelErr.Error(), "try again later") { - logger.ErrorContext(pluginCtx, "error executing githubClient.Actions.CancelWorkflowRunByID", "err", cancelErr, "response", cancelResponse) + logger.ErrorContext(newPluginCtx, "error executing githubClient.Actions.CancelWorkflowRunByID", "err", cancelErr, "response", cancelResponse) return cancelErr } + pluginCtx = newPluginCtx cancelSent = true logger.WarnContext(pluginCtx, "sent cancel workflow run", "workflow_run_id", workflow.RunID) } @@ -168,11 +176,15 @@ func CheckForCompletedJobs( runOnce bool, failureChannel chan bool, ) { - ctxPlugin := config.GetPluginFromContext(pluginCtx) + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + logger.ErrorContext(pluginCtx, "error getting plugin from context", "err", err) + os.Exit(1) + } databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) if err != nil { logger.ErrorContext(pluginCtx, "error getting database from context", "err", err) - logging.Panic(workerCtx, pluginCtx, "error getting database from context") + os.Exit(1) } defer func() { if checkForCompletedJobsMu != nil { @@ -204,7 +216,7 @@ func CheckForCompletedJobs( case <-completedJobChannel: return case <-pluginCtx.Done(): - logger.WarnContext(pluginCtx, "CheckForCompletedJobs"+ctxPlugin.Name+" pluginCtx.Done()") + logging.DevContext(pluginCtx, "CheckForCompletedJobs"+ctxPlugin.Name+" pluginCtx.Done()") return default: } @@ -309,7 +321,11 @@ func cleanup( cleanupMu.Lock() // create an idependent copy of the pluginCtx so we can do cleanup even if pluginCtx got "context canceled" cleanupContext := context.Background() - ctxPlugin := 
config.GetPluginFromContext(pluginCtx) + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + logger.ErrorContext(pluginCtx, "error getting plugin from context", "err", err) + return + } returnToMainQueue, ok := workerCtx.Value(config.ContextKey("returnToMainQueue")).(chan bool) if !ok { logger.ErrorContext(pluginCtx, "error getting returnToMainQueue from context") @@ -385,7 +401,11 @@ func cleanup( logger.ErrorContext(pluginCtx, "error unmarshalling payload to webhook.WorkflowJobPayload", "err", err) return } - ankaCLI := anka.GetAnkaCLIFromContext(pluginCtx) + ankaCLI, err := anka.GetAnkaCLIFromContext(pluginCtx) + if err != nil { + logger.ErrorContext(pluginCtx, "error getting ankaCLI from context", "err", err) + return + } ankaCLI.AnkaDelete(workerCtx, pluginCtx, &vm) databaseContainer.Client.Del(cleanupContext, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name+"/cleaning") continue // required to keep processing tasks in the db list @@ -437,9 +457,15 @@ func Run( pluginCancel context.CancelFunc, logger *slog.Logger, metricsData *metrics.MetricsDataLock, -) { - ctxPlugin := config.GetPluginFromContext(pluginCtx) - isRepoSet := config.GetIsRepoSetFromContext(pluginCtx) +) (context.Context, error) { + ctxPlugin, err := config.GetPluginFromContext(pluginCtx) + if err != nil { + return pluginCtx, err + } + isRepoSet, err := config.GetIsRepoSetFromContext(pluginCtx) + if err != nil { + return pluginCtx, err + } metricsData.AddPlugin(metrics.Plugin{ PluginBase: &metrics.PluginBase{ @@ -452,18 +478,26 @@ func Run( }, }) - logger.InfoContext(pluginCtx, "checking for jobs....") - - configFileName := config.GetConfigFileNameFromContext(pluginCtx) + configFileName, err := config.GetConfigFileNameFromContext(pluginCtx) + if err != nil { + return pluginCtx, err + } if ctxPlugin.Token == "" && ctxPlugin.PrivateKey == "" { - logging.Panic(workerCtx, pluginCtx, "token or private_key are not set at global level or in "+configFileName+":plugins:"+ctxPlugin.Name+"") + return pluginCtx, fmt.Errorf("token or private_key are not set at global level or in " + configFileName + ":plugins:" + ctxPlugin.Name + "") } if ctxPlugin.PrivateKey != "" && (ctxPlugin.AppID == 0 || ctxPlugin.InstallationID == 0) { - logging.Panic(workerCtx, pluginCtx, "private_key, app_id, and installation_id must all be set in "+configFileName+":plugins:"+ctxPlugin.Name+"") + return pluginCtx, fmt.Errorf("private_key, app_id, and installation_id must all be set in " + configFileName + ":plugins:" + ctxPlugin.Name + "") + } + if strings.HasPrefix(ctxPlugin.PrivateKey, "~/") { + homeDir, err := os.UserHomeDir() + if err != nil { + return pluginCtx, fmt.Errorf("unable to get user home directory: " + err.Error()) + } + ctxPlugin.PrivateKey = filepath.Join(homeDir, ctxPlugin.PrivateKey[2:]) } if ctxPlugin.Owner == "" { - logging.Panic(workerCtx, pluginCtx, "owner is not set in "+configFileName+":plugins:"+ctxPlugin.Name+"") + return pluginCtx, fmt.Errorf("owner is not set in " + configFileName + ":plugins:" + ctxPlugin.Name + "") } // if ctxPlugin.Repo == "" { // logging.Panic(workerCtx, pluginCtx, "repo is not set in anklet.yaml:plugins:"+ctxPlugin.Name+":repo") @@ -471,12 +505,12 @@ func Run( hostHasVmCapacity := anka.HostHasVmCapacity(pluginCtx) if !hostHasVmCapacity { - logger.DebugContext(pluginCtx, "host does not have vm capacity") - return + // logger.DebugContext(pluginCtx, "host does not have vm capacity") + return pluginCtx, fmt.Errorf("host does not have vm capacity") } var githubClient 
*github.Client - githubClient, err := internalGithub.AuthenticateAndReturnGitHubClient( + githubClient, err = internalGithub.AuthenticateAndReturnGitHubClient( pluginCtx, logger, ctxPlugin.PrivateKey, @@ -485,8 +519,8 @@ func Run( ctxPlugin.Token, ) if err != nil { - logger.ErrorContext(pluginCtx, "error authenticating github client", "err", err) - return + // logger.ErrorContext(pluginCtx, "error authenticating github client", "err", err) + return pluginCtx, fmt.Errorf("error authenticating github client: %s", err.Error()) } githubWrapperClient := internalGithub.NewGitHubClientWrapper(githubClient) pluginCtx = context.WithValue(pluginCtx, config.ContextKey("githubwrapperclient"), githubWrapperClient) @@ -511,8 +545,8 @@ func Run( databaseContainer, err := database.GetDatabaseFromContext(pluginCtx) if err != nil { - logger.ErrorContext(pluginCtx, "error getting database from context", "err", err) - return + // logger.ErrorContext(pluginCtx, "error getting database from context", "err", err) + return pluginCtx, fmt.Errorf("error getting database from context: %s", err.Error()) } defer func() { @@ -543,31 +577,33 @@ func Run( case <-completedJobChannel: logger.InfoContext(pluginCtx, "completed job found at start") completedJobChannel <- github.WorkflowJobEvent{} - return + return pluginCtx, nil case <-pluginCtx.Done(): logger.WarnContext(pluginCtx, "context canceled before completed job found") - return + return pluginCtx, nil default: } + logger.InfoContext(pluginCtx, "checking for jobs....") + var wrappedPayloadJSON string // allow picking up where we left off wrappedPayloadJSON, err = databaseContainer.Client.LIndex(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, -1).Result() if err != nil && err != redis.Nil { - logger.ErrorContext(pluginCtx, "error getting last object from anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "err", err) - return + // logger.ErrorContext(pluginCtx, "error getting last object from anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, "err", err) + return pluginCtx, fmt.Errorf("error getting last object from anklet/jobs/github/queued/%s/%s: %s", ctxPlugin.Owner, ctxPlugin.Name, err.Error()) } if wrappedPayloadJSON == "" { // if we haven't done anything before, get something from the main queue eldestQueuedJob, err := databaseContainer.Client.LPop(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner).Result() if err == redis.Nil { logger.DebugContext(pluginCtx, "no queued jobs found") completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine - return + return pluginCtx, nil } if err != nil { logger.ErrorContext(pluginCtx, "error getting queued jobs", "err", err) metricsData.IncrementTotalFailedRunsSinceStart() - return + return pluginCtx, fmt.Errorf("error getting queued jobs: %s", err.Error()) } databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, eldestQueuedJob) wrappedPayloadJSON = eldestQueuedJob @@ -575,8 +611,8 @@ func Run( queuedJob, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](wrappedPayloadJSON) if err != nil || typeErr != nil { - logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err) - return + // logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err) + return pluginCtx, fmt.Errorf("error unmarshalling job: %v (type error: %v)", err, typeErr) } if !isRepoSet { pluginCtx = logging.AppendCtx(pluginCtx, slog.String("repo", *queuedJob.Repo.Name)) } @@ -586,7 +622,6 @@ 
pluginCtx = logging.AppendCtx(pluginCtx, slog.Int64("workflowJobRunID", *queuedJob.WorkflowJob.RunID)) pluginCtx = logging.AppendCtx(pluginCtx, slog.String("workflowName", *queuedJob.WorkflowJob.WorkflowName)) pluginCtx = logging.AppendCtx(pluginCtx, slog.String("jobURL", *queuedJob.WorkflowJob.HTMLURL)) - logger.DebugContext(pluginCtx, "queued job found", "queuedJob", queuedJob.Action) // check if the job is already completed, so we don't orphan if there is @@ -596,10 +631,10 @@ func Run( case <-completedJobChannel: logger.InfoContext(pluginCtx, "completed job found by CheckForCompletedJobs") completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine - return + return pluginCtx, nil case <-pluginCtx.Done(): logger.WarnContext(pluginCtx, "context canceled before completed job found") - return + return pluginCtx, nil default: } @@ -613,8 +648,8 @@ func Run( // pluginCtx = logging.AppendCtx(pluginCtx, slog.String("uniqueID", uniqueID)) ankaTemplate := extractLabelValue(queuedJob.WorkflowJob.Labels, "anka-template:") if ankaTemplate == "" { - logger.WarnContext(pluginCtx, "warning: unable to find Anka Template specified in labels - skipping") - return + // logger.WarnContext(pluginCtx, "warning: unable to find Anka Template specified in labels - skipping") + return pluginCtx, fmt.Errorf("warning: unable to find Anka Template specified in labels - skipping") } pluginCtx = logging.AppendCtx(pluginCtx, slog.String("ankaTemplate", ankaTemplate)) ankaTemplateTag := extractLabelValue(queuedJob.WorkflowJob.Labels, "anka-template-tag:") @@ -637,7 +672,10 @@ func Run( } // get anka CLI - ankaCLI := anka.GetAnkaCLIFromContext(pluginCtx) + ankaCLI, err := anka.GetAnkaCLIFromContext(pluginCtx) + if err != nil { + return pluginCtx, err + } logger.InfoContext(pluginCtx, "handling anka workflow run job") metricsData.SetStatus(pluginCtx, logger, "running") @@ -648,9 +686,10 @@ func Run( //TODO: be able to interrupt this noTemplateTagExistsError, returnToQueueError := ankaCLI.EnsureVMTemplateExists(workerCtx, pluginCtx, workflowJob.AnkaTemplate, workflowJob.AnkaTemplateTag) if returnToQueueError != nil { + // DO NOT RETURN AN ERROR TO MAIN. It will cause the other job on this node to be cancelled. 
logger.WarnContext(pluginCtx, "problem ensuring vm template exists on host", "err", returnToQueueError) failureChannel <- true // return to queue so another node can pick it up - return + return pluginCtx, nil } if noTemplateTagExistsError != nil { logger.ErrorContext(pluginCtx, "error ensuring vm template exists on host", "err", noTemplateTagExistsError) @@ -663,9 +702,9 @@ func Run( } if pluginCtx.Err() != nil { - logger.WarnContext(pluginCtx, "context canceled during vm template check") + // logger.WarnContext(pluginCtx, "context canceled during vm template check") failureChannel <- true - return + return pluginCtx, fmt.Errorf("context canceled during vm template check") } if !skipPrep { @@ -686,60 +725,64 @@ func Run( }) } if err != nil { - logger.ErrorContext(pluginCtx, "error creating registration token", "err", err, "response", response) + logger.DebugContext(pluginCtx, "error creating registration token", "err", err, "response", response) metricsData.IncrementTotalFailedRunsSinceStart() failureChannel <- true - return + return pluginCtx, fmt.Errorf("error creating registration token: %s", err.Error()) } if *runnerRegistration.Token == "" { - logger.ErrorContext(pluginCtx, "registration token is empty; something wrong with github or your service token", "response", response) + logger.DebugContext(pluginCtx, "registration token is empty; something wrong with github or your service token", "response", response) failureChannel <- true - return + return pluginCtx, fmt.Errorf("registration token is empty; something wrong with github or your service token") } if pluginCtx.Err() != nil { - logger.WarnContext(pluginCtx, "context canceled before ObtainAnkaVM") + // logger.WarnContext(pluginCtx, "context canceled before ObtainAnkaVM") failureChannel <- true - return + return pluginCtx, fmt.Errorf("context canceled before ObtainAnkaVM") } // Obtain Anka VM (and name) - pluginCtx, vm, err := ankaCLI.ObtainAnkaVM(workerCtx, pluginCtx, workflowJob.AnkaTemplate) + newPluginCtx, vm, err := ankaCLI.ObtainAnkaVM(workerCtx, pluginCtx, workflowJob.AnkaTemplate) wrappedVM := map[string]interface{}{ "type": "anka.VM", "payload": vm, } wrappedVmJSON, wrappedVmErr := json.Marshal(wrappedVM) if wrappedVmErr != nil { - logger.ErrorContext(pluginCtx, "error marshalling vm to json", "err", wrappedVmErr) + // logger.ErrorContext(pluginCtx, "error marshalling vm to json", "err", wrappedVmErr) ankaCLI.AnkaDelete(workerCtx, pluginCtx, vm) failureChannel <- true - return + return newPluginCtx, fmt.Errorf("error marshalling vm to json: %s", wrappedVmErr.Error()) } - pluginCtx = logging.AppendCtx(pluginCtx, slog.String("vmName", vm.Name)) // TODO: THIS ISN"T WORKING + newPluginCtx = logging.AppendCtx(newPluginCtx, slog.String("vmName", vm.Name)) // TODO: THIS ISN"T WORKING + pluginCtx = newPluginCtx dbErr := databaseContainer.Client.RPush(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"/"+ctxPlugin.Name, wrappedVmJSON).Err() if dbErr != nil { - logger.ErrorContext(pluginCtx, "error pushing vm data to database", "err", dbErr) + // logger.ErrorContext(pluginCtx, "error pushing vm data to database", "err", dbErr) failureChannel <- true - return + return newPluginCtx, fmt.Errorf("error pushing vm data to database: %s", dbErr.Error()) } if err != nil { // this is thrown, for example, when there is no capacity on the host // we must be sure to create the DB entry so cleanup happens properly - logger.ErrorContext(pluginCtx, "error obtaining anka vm", "err", err) + // logger.ErrorContext(pluginCtx, "error obtaining 
anka vm", "err", err) failureChannel <- true - return + return pluginCtx, fmt.Errorf("error obtaining anka vm: %s", err.Error()) } if pluginCtx.Err() != nil { - logger.WarnContext(pluginCtx, "context canceled after ObtainAnkaVM") + // logger.WarnContext(pluginCtx, "context canceled after ObtainAnkaVM") failureChannel <- true - return + return pluginCtx, fmt.Errorf("context canceled after ObtainAnkaVM") } // Install runner - globals := config.GetGlobalsFromContext(pluginCtx) + globals, err := config.GetGlobalsFromContext(pluginCtx) + if err != nil { + return pluginCtx, err + } installRunnerPath := filepath.Join(globals.PluginsPath, "handlers", "github", "install-runner.bash") registerRunnerPath := filepath.Join(globals.PluginsPath, "handlers", "github", "register-runner.bash") startRunnerPath := filepath.Join(globals.PluginsPath, "handlers", "github", "start-runner.bash") @@ -747,13 +790,13 @@ func Run( _, registerRunnerErr := os.Stat(registerRunnerPath) _, startRunnerErr := os.Stat(startRunnerPath) if installRunnerErr != nil || registerRunnerErr != nil || startRunnerErr != nil { - logger.ErrorContext(pluginCtx, "must include install-runner.bash, register-runner.bash, and start-runner.bash in "+globals.PluginsPath+"/handlers/github/", "err", err) + // logger.ErrorContext(pluginCtx, "must include install-runner.bash, register-runner.bash, and start-runner.bash in "+globals.PluginsPath+"/handlers/github/", "err", err) err := sendCancelWorkflowRun(pluginCtx, logger, workflowJob) if err != nil { logger.ErrorContext(pluginCtx, "error sending cancel workflow run", "err", err) } metricsData.IncrementTotalFailedRunsSinceStart() - return + return pluginCtx, fmt.Errorf("must include install-runner.bash, register-runner.bash, and start-runner.bash in " + globals.PluginsPath + "/handlers/github/") } // Copy runner scripts to VM @@ -764,19 +807,19 @@ func Run( startRunnerPath, ) if err != nil { - logger.ErrorContext(pluginCtx, "error executing anka copy", "err", err) + // logger.ErrorContext(pluginCtx, "error executing anka copy", "err", err) metricsData.IncrementTotalFailedRunsSinceStart() failureChannel <- true - return + return pluginCtx, fmt.Errorf("error executing anka copy: %s", err.Error()) } select { case <-completedJobChannel: - logger.InfoContext(pluginCtx, "completed job found before installing runner") + logger.WarnContext(pluginCtx, "completed job found before installing runner") completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine - return + return pluginCtx, nil case <-pluginCtx.Done(): logger.WarnContext(pluginCtx, "context canceled before install runner") - return + return pluginCtx, nil default: } @@ -784,19 +827,19 @@ func Run( logger.DebugContext(pluginCtx, "installing github runner inside of vm") installRunnerErr = ankaCLI.AnkaRun(pluginCtx, "./install-runner.bash") if installRunnerErr != nil { - logger.ErrorContext(pluginCtx, "error executing install-runner.bash", "err", installRunnerErr) + // logger.ErrorContext(pluginCtx, "error executing install-runner.bash", "err", installRunnerErr) failureChannel <- true - return + return pluginCtx, fmt.Errorf("error executing install-runner.bash: %s", installRunnerErr.Error()) } // Register runner select { case <-completedJobChannel: logger.InfoContext(pluginCtx, "completed job found before registering runner") completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine - return + return pluginCtx, nil case 
 		case <-pluginCtx.Done():
 			logger.WarnContext(pluginCtx, "context canceled before register runner")
-			return
+			return pluginCtx, nil
 		default:
 		}
 		logger.DebugContext(pluginCtx, "registering github runner inside of vm")
@@ -805,9 +848,9 @@ func Run(
 			vm.Name, *runnerRegistration.Token, repositoryURL, strings.Join(workflowJob.Labels, ","), ctxPlugin.RunnerGroup,
 		)
 		if registerRunnerErr != nil {
-			logger.ErrorContext(pluginCtx, "error executing register-runner.bash", "err", registerRunnerErr)
+			// logger.ErrorContext(pluginCtx, "error executing register-runner.bash", "err", registerRunnerErr)
 			failureChannel <- true
-			return
+			return pluginCtx, fmt.Errorf("error executing register-runner.bash: %s", registerRunnerErr.Error())
 		}
 		defer removeSelfHostedRunner(pluginCtx, *vm, &workflowJob)
 		// Start runner
@@ -815,30 +858,30 @@ func Run(
 		case <-completedJobChannel:
 			logger.InfoContext(pluginCtx, "completed job found before starting runner")
 			completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine
-			return
+			return pluginCtx, nil
 		case <-pluginCtx.Done():
 			logger.WarnContext(pluginCtx, "context canceled before start runner")
-			return
+			return pluginCtx, nil
 		default:
 		}
 		logger.DebugContext(pluginCtx, "starting github runner inside of vm")
 		startRunnerErr = ankaCLI.AnkaRun(pluginCtx, "./start-runner.bash")
 		if startRunnerErr != nil {
-			logger.ErrorContext(pluginCtx, "error executing start-runner.bash", "err", startRunnerErr)
+			// logger.ErrorContext(pluginCtx, "error executing start-runner.bash", "err", startRunnerErr)
 			failureChannel <- true
-			return
+			return pluginCtx, fmt.Errorf("error executing start-runner.bash: %s", startRunnerErr.Error())
 		}
+
 		select {
 		case <-completedJobChannel:
 			logger.InfoContext(pluginCtx, "completed job found before jobCompleted checks")
 			completedJobChannel <- github.WorkflowJobEvent{} // send true to the channel to stop the check for completed jobs goroutine
-			return
+			return pluginCtx, nil
 		case <-pluginCtx.Done():
 			logger.WarnContext(pluginCtx, "context canceled before jobCompleted checks")
-			return
+			return pluginCtx, nil
 		default:
 		}
-
 	} // skipPrep

 	logger.InfoContext(pluginCtx, "watching for job completion")
@@ -855,7 +898,10 @@ func Run(
 				"conclusion", *completedJobEvent.WorkflowJob.Conclusion,
 			)
 			if *completedJobEvent.WorkflowJob.Conclusion == "success" {
-				metricsData := metrics.GetMetricsDataFromContext(workerCtx)
+				metricsData, err := metrics.GetMetricsDataFromContext(workerCtx)
+				if err != nil {
+					return pluginCtx, err
+				}
 				metricsData.IncrementTotalSuccessfulRunsSinceStart()
 				metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{
 					PluginBase: &metrics.PluginBase{
@@ -865,7 +911,10 @@ func Run(
 					LastSuccessfulRunJobUrl: *completedJobEvent.WorkflowJob.URL,
 				})
 			} else if *completedJobEvent.WorkflowJob.Conclusion == "failure" {
-				metricsData := metrics.GetMetricsDataFromContext(workerCtx)
+				metricsData, err := metrics.GetMetricsDataFromContext(workerCtx)
+				if err != nil {
+					return pluginCtx, err
+				}
 				metricsData.IncrementTotalFailedRunsSinceStart()
 				metricsData.UpdatePlugin(pluginCtx, logger, metrics.Plugin{
 					PluginBase: &metrics.PluginBase{
@@ -879,14 +928,14 @@ func Run(
 			} else if logCounter%2 == 0 {
 				if pluginCtx.Err() != nil {
 					logger.WarnContext(pluginCtx, "context canceled during job status check")
-					return
+					return pluginCtx, nil
 				}
 			}
 			completedJobChannel <- github.WorkflowJobEvent{} // so cleanup can also see it as completed
-			return
+			return pluginCtx, nil
 		case <-pluginCtx.Done():
 			logger.WarnContext(pluginCtx, "context canceled while watching for job completion")
-			return
+			return pluginCtx, nil
 		default:
 			time.Sleep(10 * time.Second)
 			if logCounter%2 == 0 {
@@ -941,13 +990,25 @@ func removeSelfHostedRunner(
 	vm anka.VM,
 	workflow *WorkflowRunJobDetail,
 ) {
-	logger := logging.GetLoggerFromContext(pluginCtx)
-	ctxPlugin := config.GetPluginFromContext(pluginCtx)
-	githubClient := internalGithub.GetGitHubClientFromContext(pluginCtx)
-	isRepoSet := config.GetIsRepoSetFromContext(pluginCtx)
+	var err error
 	var runnersList *github.Runners
 	var response *github.Response
-	var err error
+	logger, err := logging.GetLoggerFromContext(pluginCtx)
+	if err != nil {
+		fmt.Printf("{\"time\": \"%s\", \"function\": \"removeSelfHostedRunner\", \"error\": \"%s\"}\n", time.Now().Format(time.RFC3339), err.Error())
+	}
+	ctxPlugin, err := config.GetPluginFromContext(pluginCtx)
+	if err != nil {
+		logger.ErrorContext(pluginCtx, "error getting plugin from context", "err", err)
+	}
+	githubClient, err := internalGithub.GetGitHubClientFromContext(pluginCtx)
+	if err != nil {
+		logger.ErrorContext(pluginCtx, "error getting github client from context", "err", err)
+	}
+	isRepoSet, err := config.GetIsRepoSetFromContext(pluginCtx)
+	if err != nil {
+		logger.ErrorContext(pluginCtx, "error getting isRepoSet from context", "err", err)
+	}
 	if workflow.Conclusion == "failure" {
 		if isRepoSet {
 			pluginCtx, runnersList, response, err = internalGithub.ExecuteGitHubClientFunction(pluginCtx, logger, func() (*github.Runners, *github.Response, error) {
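The handler changes above replace bare `return` statements (which relied on logging alone) with a `(context.Context, error)` return, so failures now travel back to whatever started the plugin. The caller-side wiring lives in `main.go`/`run.go` and is not shown in this excerpt; the sketch below is only an illustration of the new contract under that assumption, with `runPlugin` standing in for a plugin's `Run` function rather than being real anklet code.

```go
package main

import (
	"context"
	"errors"
	"log/slog"
	"os"
)

// runPlugin stands in for a plugin Run function under the new contract:
// it returns the (possibly updated) plugin context plus an error instead
// of logging and bailing out on its own.
func runPlugin(pluginCtx context.Context) (context.Context, error) {
	return pluginCtx, errors.New("error obtaining anka vm: no host capacity")
}

func main() {
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
	pluginCtx := context.Background()

	// The caller decides how to surface the failure: log it, update
	// metrics, requeue the job, or shut the worker down gracefully.
	pluginCtx, err := runPlugin(pluginCtx)
	if err != nil {
		logger.ErrorContext(pluginCtx, "plugin run failed", "err", err)
	}
}
```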
diff --git a/plugins/receivers/github/github.go b/plugins/receivers/github/github.go
index 0d9822b..dab0b79 100644
--- a/plugins/receivers/github/github.go
+++ b/plugins/receivers/github/github.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"log/slog"
 	"net/http"
+	"os"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
@@ -82,9 +84,15 @@ func Run(
 	logger *slog.Logger,
 	firstPluginStarted chan bool,
 	metricsData *metrics.MetricsDataLock,
-) {
-	ctxPlugin := config.GetPluginFromContext(pluginCtx)
-	isRepoSet := config.GetIsRepoSetFromContext(pluginCtx)
+) (context.Context, error) {
+	ctxPlugin, err := config.GetPluginFromContext(pluginCtx)
+	if err != nil {
+		return pluginCtx, err
+	}
+	isRepoSet, err := config.GetIsRepoSetFromContext(pluginCtx)
+	if err != nil {
+		return pluginCtx, err
+	}
 	metricsData.AddPlugin(
 		metrics.PluginBase{
 			Name: ctxPlugin.Name,
@@ -96,20 +104,30 @@ func Run(
 		},
 	)

-	configFileName := config.GetConfigFileNameFromContext(pluginCtx)
+	configFileName, err := config.GetConfigFileNameFromContext(pluginCtx)
+	if err != nil {
+		return pluginCtx, err
+	}
 	if ctxPlugin.Token == "" && ctxPlugin.PrivateKey == "" {
-		logging.Panic(workerCtx, pluginCtx, "token or private_key are not set at global level or in "+configFileName+":plugins:"+ctxPlugin.Name)
+		return pluginCtx, fmt.Errorf("token or private_key are not set at global level or in " + configFileName + ":plugins:" + ctxPlugin.Name)
+	}
+	if strings.HasPrefix(ctxPlugin.PrivateKey, "~/") {
+		homeDir, err := os.UserHomeDir()
+		if err != nil {
+			return pluginCtx, fmt.Errorf("unable to get user home directory: " + err.Error())
+		}
+		ctxPlugin.PrivateKey = filepath.Join(homeDir, ctxPlugin.PrivateKey[2:])
 	}
 	if ctxPlugin.Owner == "" {
-		logging.Panic(workerCtx, pluginCtx, "owner is not set in "+configFileName+":plugins:"+ctxPlugin.Name)
+		return pluginCtx, fmt.Errorf("owner is not set in " + configFileName + ":plugins:" + ctxPlugin.Name)
 	}
 	if ctxPlugin.Secret == "" {
-		logging.Panic(workerCtx, pluginCtx, "secret is not set in "+configFileName+":plugins:"+ctxPlugin.Name)
+		return pluginCtx, fmt.Errorf("secret is not set in " + configFileName + ":plugins:" + ctxPlugin.Name)
 	}
 	databaseContainer, err := database.GetDatabaseFromContext(pluginCtx)
 	if err != nil {
-		logging.Panic(pluginCtx, pluginCtx, "error getting database client from context: "+err.Error())
+		return pluginCtx, fmt.Errorf("error getting database client from context: " + err.Error())
 	}

 	var githubClient *github.Client
@@ -122,8 +140,8 @@ func Run(
 		ctxPlugin.Token,
 	)
 	if err != nil {
-		logger.ErrorContext(pluginCtx, "error authenticating github client", "err", err)
-		return
+		// logger.ErrorContext(pluginCtx, "error authenticating github client", "err", err)
+		return pluginCtx, fmt.Errorf("error authenticating github client: " + err.Error())
 	}

 	server := &http.Server{Addr: ":" + ctxPlugin.Port}
@@ -134,7 +152,8 @@ func Run(
 	http.HandleFunc("/jobs/v1/receiver", func(w http.ResponseWriter, r *http.Request) {
 		databaseContainer, err := database.GetDatabaseFromContext(pluginCtx)
 		if err != nil {
-			logging.Panic(pluginCtx, pluginCtx, "error getting database client from context: "+err.Error())
+			logger.ErrorContext(pluginCtx, "error getting database client from context", "error", err)
+			return
 		}
 		payload, err := github.ValidatePayload(r, []byte(ctxPlugin.Secret))
 		if err != nil {
@@ -453,8 +472,8 @@ func Run(
 			})
 		}
 		if err != nil {
-			logger.ErrorContext(pluginCtx, "error listing hooks", "err", err)
-			return
+			// logger.ErrorContext(pluginCtx, "error listing hooks", "err", err)
+			return pluginCtx, fmt.Errorf("error listing hooks: %s", err.Error())
 		}

 		for _, hookDelivery := range *hookDeliveries {
@@ -494,29 +513,29 @@ func Run(
 		// // get all keys from database for the main queue and service queues as well as completed
 		queuedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/queued/"+ctxPlugin.Owner+"*").Result()
 		if err != nil {
-			logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
-			return
+			// logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
+			return pluginCtx, fmt.Errorf("error getting list of keys: %s", err.Error())
 		}
 		var allQueuedJobs = make(map[string][]string)
 		for _, key := range queuedKeys {
 			queuedJobs, err := databaseContainer.Client.LRange(pluginCtx, key, 0, -1).Result()
 			if err != nil {
-				logger.ErrorContext(pluginCtx, "error getting list of queued jobs for key: "+key, "err", err)
-				return
+				// logger.ErrorContext(pluginCtx, "error getting list of queued jobs for key: "+key, "err", err)
+				return pluginCtx, fmt.Errorf("error getting list of queued jobs for key: %s", err.Error())
 			}
 			allQueuedJobs[key] = queuedJobs
 		}
 		completedKeys, err := databaseContainer.Client.Keys(pluginCtx, "anklet/jobs/github/completed"+ctxPlugin.Owner+"*").Result()
 		if err != nil {
-			logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
-			return
+			// logger.ErrorContext(pluginCtx, "error getting list of keys", "err", err)
+			return pluginCtx, fmt.Errorf("error getting list of keys: %s", err.Error())
 		}
 		var allCompletedJobs = make(map[string][]string)
 		for _, key := range completedKeys {
 			completedJobs, err := databaseContainer.Client.LRange(pluginCtx, key, 0, -1).Result()
 			if err != nil {
-				logger.ErrorContext(pluginCtx, "error getting list of queued jobs for key: "+key, "err", err)
-				return
+				// logger.ErrorContext(pluginCtx, "error getting list of queued jobs for key: "+key, "err", err)
+				return pluginCtx, fmt.Errorf("error getting list of queued jobs for key: %s", err.Error())
 			}
 			allCompletedJobs[key] = completedJobs
 		}
@@ -545,14 +564,14 @@ MainLoop:
 				})
 			}
 			if err != nil {
-				logger.ErrorContext(pluginCtx, "error listing hooks", "err", err)
-				return
+				// logger.ErrorContext(pluginCtx, "error listing hooks", "err", err)
+				return pluginCtx, fmt.Errorf("error listing hooks: %s", err.Error())
 			}
 			var workflowJobEvent github.WorkflowJobEvent
 			err = json.Unmarshal(*gottenHookDelivery.Request.RawPayload, &workflowJobEvent)
 			if err != nil {
-				logger.ErrorContext(pluginCtx, "error unmarshalling hook request raw payload to HookResponse", "err", err)
-				return
+				// logger.ErrorContext(pluginCtx, "error unmarshalling hook request raw payload to HookResponse", "err", err)
+				return pluginCtx, fmt.Errorf("error unmarshalling hook request raw payload to HookResponse: %s", err.Error())
 			}

 			inQueued := false
@@ -580,8 +599,8 @@ MainLoop:
 					}
 					wrappedPayload, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](queuedJob)
 					if err != nil {
-						logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err)
-						return
+						// logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err)
+						return pluginCtx, fmt.Errorf("error unmarshalling job: %s", err.Error())
 					}
 					if typeErr != nil { // not the type we want
 						continue
@@ -607,8 +626,8 @@ MainLoop:
 				for index, completedJob := range completedJobs {
 					wrappedPayload, err, typeErr := database.UnwrapPayload[github.WorkflowJobEvent](completedJob)
 					if err != nil {
-						logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err)
-						return
+						// logger.ErrorContext(pluginCtx, "error unmarshalling job", "err", err)
+						return pluginCtx, fmt.Errorf("error unmarshalling job: %s", err.Error())
 					}
 					if typeErr != nil { // not the type we want
 						continue
@@ -632,8 +651,8 @@ MainLoop:
 			if inCompleted && !inQueued {
 				_, err = databaseContainer.Client.LRem(pluginCtx, inCompletedListKey, 1, allCompletedJobs[inCompletedListKey][inCompletedIndex]).Result()
 				if err != nil {
-					logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Owner, "err", err, "completedJob", allCompletedJobs[inCompletedListKey][inCompletedIndex])
-					return
+					// logger.ErrorContext(pluginCtx, "error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Owner, "err", err, "completedJob", allCompletedJobs[inCompletedListKey][inCompletedIndex])
+					return pluginCtx, fmt.Errorf("error removing completedJob from anklet/jobs/github/completed/"+ctxPlugin.Owner+": %s", err.Error())
 				}
 				continue
 			}
@@ -665,14 +684,14 @@ MainLoop:
 					})
 				}
 				if err != nil {
-					logger.ErrorContext(pluginCtx, "error listing hooks", "err", err)
-					return
+					// logger.ErrorContext(pluginCtx, "error listing hooks", "err", err)
+					return pluginCtx, fmt.Errorf("error listing hooks: %s", err.Error())
 				}
 				var otherWorkflowJobEvent github.WorkflowJobEvent
 				err = json.Unmarshal(*otherGottenHookDelivery.Request.RawPayload, &otherWorkflowJobEvent)
 				if err != nil {
-					logger.ErrorContext(pluginCtx, "error unmarshalling hook request raw payload to HookResponse", "err", err)
-					return
+					// logger.ErrorContext(pluginCtx, "error unmarshalling hook request raw payload to HookResponse", "err", err)
+					return pluginCtx, fmt.Errorf("error unmarshalling hook request raw payload to HookResponse: %s", err.Error())
 				}
 				if *workflowJobEvent.WorkflowJob.ID == *otherWorkflowJobEvent.WorkflowJob.ID {
 					continue MainLoop
@@ -735,6 +754,8 @@ MainLoop:
 	<-pluginCtx.Done()
 	logger.InfoContext(pluginCtx, "shutting down receiver")
 	if err := server.Shutdown(pluginCtx); err != nil {
-		logger.ErrorContext(pluginCtx, "receiver shutdown error", "error", err)
+		// logger.ErrorContext(pluginCtx, "receiver shutdown error", "error", err)
+		return pluginCtx, fmt.Errorf("receiver shutdown error: %s", err.Error())
 	}
+	return pluginCtx, nil
 }
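One detail from the receiver diff above worth noting before the new test configs: `private_key` values that start with `~/` (the form the test configs below use for `global_private_key`) are now expanded against the current user's home directory before being read. The sketch below is a standalone illustration of that expansion only; the helper name `expandHome` is hypothetical and not part of anklet.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// expandHome resolves a leading "~/" against the current user's home
// directory, mirroring the private_key handling the receiver now performs
// for values such as "~/anklet-private-key.pem".
func expandHome(path string) (string, error) {
	if !strings.HasPrefix(path, "~/") {
		return path, nil
	}
	homeDir, err := os.UserHomeDir()
	if err != nil {
		return "", fmt.Errorf("unable to get user home directory: %w", err)
	}
	return filepath.Join(homeDir, path[2:]), nil
}

func main() {
	expanded, err := expandHome("~/anklet-private-key.pem")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(expanded)
}
```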
"receiver shutdown error", "error", err) + return pluginCtx, fmt.Errorf("receiver shutdown error: %s", err.Error()) } + return pluginCtx, nil } diff --git a/tests/cli-test-capacity.yml b/tests/cli-test-capacity.yml new file mode 100644 index 0000000..c7c7718 --- /dev/null +++ b/tests/cli-test-capacity.yml @@ -0,0 +1,22 @@ +--- +global_database_url: 127.0.0.1 +global_database_port: 6379 +global_database_user: +global_database_password: +global_database_database: 0 +global_private_key: ~/anklet-private-key.pem +plugins: + - name: RUNNER1 + plugin: github + app_id: 949431 + installation_id: 52970581 + owner: mycompanyone + registry_url: http://anka.registry:8089 + sleep_interval: 5 + - name: RUNNER2 + plugin: github2 + app_id: 949431 + installation_id: 52970581 + owner: mycompanytwo + registry_url: http://anka.registry:8089 + sleep_interval: 10 diff --git a/tests/cli-test-empty.yml b/tests/cli-test-empty.yml new file mode 100644 index 0000000..e69de29 diff --git a/tests/cli-test-no-db.yml b/tests/cli-test-no-db.yml new file mode 100644 index 0000000..741a2a2 --- /dev/null +++ b/tests/cli-test-no-db.yml @@ -0,0 +1,23 @@ +--- +global_private_key: ~/anklet-private-key.pem +plugins: + - name: RUNNER1 + plugin: github + app_id: 949431 + installation_id: 52970581 + owner: mycompanyone + registry_url: http://anka.registry:8089 + sleep_interval: 5 + - name: RUNNER2 + plugin: github + app_id: 949431 + installation_id: 52970581 + owner: mycompanytwo + registry_url: http://anka.registry:8089 + sleep_interval: 10 + database: + url: 127.0.0.1 + port: 6379 + user: "" + password: "" + database: 0 diff --git a/tests/cli-test-no-log-directory.yml b/tests/cli-test-no-log-directory.yml new file mode 100644 index 0000000..ed56eae --- /dev/null +++ b/tests/cli-test-no-log-directory.yml @@ -0,0 +1,7 @@ +--- +work_dir: /tmp/ +pid_file_dir: /tmp/ +log: + # if file_dir is not set, it will be set to current directory you execute anklet in + file_dir: /tmp/doesntexist +plugins: \ No newline at end of file diff --git a/tests/cli-test-no-plugin-name.yml b/tests/cli-test-no-plugin-name.yml new file mode 100644 index 0000000..0d0aa77 --- /dev/null +++ b/tests/cli-test-no-plugin-name.yml @@ -0,0 +1,16 @@ +--- +global_private_key: ~/anklet-private-key.pem +plugins: + - plugin: plugin1 + app_id: 949431 + installation_id: 52970581 + owner: mycompanyone + registry_url: http://anka.registry:8089 + sleep_interval: 5 + - name: RUNNER2 + plugin: plugin2 + app_id: 949431 + installation_id: 52970581 + owner: mycompanytwo + registry_url: http://anka.registry:8089 + sleep_interval: 10 diff --git a/tests/cli-test-no-plugins.yml b/tests/cli-test-no-plugins.yml new file mode 100644 index 0000000..9112006 --- /dev/null +++ b/tests/cli-test-no-plugins.yml @@ -0,0 +1,2 @@ +--- +plugins: diff --git a/tests/cli-test-non-existent-plugin.yml b/tests/cli-test-non-existent-plugin.yml new file mode 100644 index 0000000..c7c7718 --- /dev/null +++ b/tests/cli-test-non-existent-plugin.yml @@ -0,0 +1,22 @@ +--- +global_database_url: 127.0.0.1 +global_database_port: 6379 +global_database_user: +global_database_password: +global_database_database: 0 +global_private_key: ~/anklet-private-key.pem +plugins: + - name: RUNNER1 + plugin: github + app_id: 949431 + installation_id: 52970581 + owner: mycompanyone + registry_url: http://anka.registry:8089 + sleep_interval: 5 + - name: RUNNER2 + plugin: github2 + app_id: 949431 + installation_id: 52970581 + owner: mycompanytwo + registry_url: http://anka.registry:8089 + sleep_interval: 10 diff --git 
a/tests/cli-test-start-stop.yml b/tests/cli-test-start-stop.yml new file mode 100644 index 0000000..f9f316d --- /dev/null +++ b/tests/cli-test-start-stop.yml @@ -0,0 +1,24 @@ +--- +work_dir: /tmp/ +pid_file_dir: /tmp/ +global_database_url: 127.0.0.1 +global_database_port: 6379 +global_database_user: +global_database_password: +global_database_database: 0 +global_private_key: ~/anklet-private-key.pem +plugins: + - name: RUNNER1 + plugin: github + app_id: 949431 + installation_id: 52970581 + owner: veertuinc + registry_url: http://anka.registry:8089 + sleep_interval: 5 + - name: RUNNER2 + plugin: github + app_id: 949431 + installation_id: 52970581 + owner: veertuinc + registry_url: http://anka.registry:8089 + sleep_interval: 5 \ No newline at end of file diff --git a/tests/cli-test.bash b/tests/cli-test.bash new file mode 100755 index 0000000..6e1616c --- /dev/null +++ b/tests/cli-test.bash @@ -0,0 +1,148 @@ +#!/usr/bin/env bash +set -eo pipefail +TESTS_DIR=$(cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd) +cd $TESTS_DIR/.. # make sure we're in the root + +TEST_VM="15.1-arm64" +TEST_LOG="/tmp/test-log" + +if ! anka version &> /dev/null; then + echo "ERROR: Anka CLI not found" + exit 1 +fi +if ! anka show $TEST_VM &> /dev/null; then + echo "ERROR: VM $TEST_VM not found" + exit 1 +fi + +cleanup() { + pwd + echo "] Cleaning up..." + rm -rf dist + rm -f ~/.config/anklet/config.yml + anka delete --yes "${TEST_VM}-1" &> /dev/null || true + anka delete --yes "${TEST_VM}-2" &> /dev/null || true +} +trap cleanup EXIT + +log_contains() { + if grep "$1" $TEST_LOG &> /dev/null; then + echo " - SUCCESS: Log contains '$1'" + else + echo " - ERROR: Log does not contain '$1'" + exit 1 + fi +} + +log_does_not_contain() { + if ! grep "$1" $TEST_LOG &> /dev/null; then + echo " - SUCCESS: Log does not contain '$1'" + else + echo " - ERROR: Log contains '$1'" + exit 1 + fi +} + +run_test() { + TEST_YML=$1 + TEST_NAME=$(basename $TEST_YML | cut -d. -f1) + TESTS="" + while IFS= read -r line; do + TESTS+="$line"$'\n' + done + echo "]] Running ${TEST_NAME}" + ln -s ${TESTS_DIR}/$TEST_YML ~/.config/anklet/config.yml + if [[ $2 =~ ^[0-9]+$ ]]; then + SLEEP_SECONDS=$2 + $BINARY > $TEST_LOG 2>&1 & + BINARY_PID=$! + sleep $SLEEP_SECONDS + kill -SIGQUIT $BINARY_PID + wait $BINARY_PID + else + $BINARY > $TEST_LOG 2>&1 || true + fi + eval "${TESTS}" + rm -f ~/.config/anklet/config.yml +} + +echo "] Building binary..." +goreleaser --snapshot --clean + +# find the binary +BINARY="$(ls -1 dist/anklet_*_$(uname | tr A-Z a-z)_$(arch))" +echo "] Using binary: $BINARY" + +run_test cli-test-empty.yml < /dev/null +anka start "${TEST_VM}-1" &> /dev/null +anka clone "${TEST_VM}" "${TEST_VM}-2" &> /dev/null +anka start "${TEST_VM}-2" &> /dev/null +run_test cli-test-capacity.yml <