From 8f482dafdcb750b329f109e8e85ac42428ad5ab1 Mon Sep 17 00:00:00 2001 From: Frikky Date: Fri, 24 Jan 2025 02:21:04 +0100 Subject: [PATCH 1/4] Added automatic auth mapping of VALID auth in single actions --- codegen.go | 2 +- db-connector.go | 3 ++- shared.go | 71 +++++++++++++++++++++---------------------------- 3 files changed, 34 insertions(+), 42 deletions(-) diff --git a/codegen.go b/codegen.go index 93620e7..9e32686 100755 --- a/codegen.go +++ b/codegen.go @@ -3643,7 +3643,7 @@ func HandlePut(swagger *openapi3.Swagger, api WorkflowApp, extraParameters []Wor } func GetAppRequirements() string { - return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\n" + return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\nshufflepy==0.0.6\nshuffle-sdk==0.0.7" } // Removes JSON values from the input diff --git a/db-connector.go b/db-connector.go index b1e1cb9..321944f 100755 --- a/db-connector.go +++ b/db-connector.go @@ -1928,7 +1928,8 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor } if !skipFinished { - log.Printf("[DEBUG][%s] Setting execution to finished because all results are in and it was still in EXECUTING mode. Should set subflow parent result as well (not implemented) - just returning for now for parent function to handle.", workflowExecution.ExecutionId) + // FIXME: Is this subflow result (not implemented) valid? I think it should have been added? Hmm. + //log.Printf("[DEBUG][%s] Setting execution to finished because all results are in and it was still in EXECUTING mode. Should set subflow parent result as well (not implemented) - just returning for now for parent function to handle.", workflowExecution.ExecutionId) finalWorkflowExecution.Status = "FINISHED" dbsave = true if finalWorkflowExecution.CompletedAt == 0 { diff --git a/shared.go b/shared.go index e2395cc..c927a19 100755 --- a/shared.go +++ b/shared.go @@ -6520,7 +6520,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) { // i think the following messes everything up with // authenticationIDs. Let's just remove it for now. - // In case of replication, + // In case of replication, parentWorkflow.Actions[actionIndex].AuthenticationId = "" idFound := false @@ -18493,45 +18493,10 @@ func PrepareSingleAction(ctx context.Context, user User, fileId string, body []b return workflowExecution, err } - /* - // FIXME: Is this required? I don't think so - if app.Authentication.Required && len(action.AuthenticationId) == 0 { - - // Basic bypass check for valid headers just in case - authFound := false - for _, param := range action.Parameters { - if param.Name == "headers" || param.Name == "queries" || param.Name == "url" { - lowercased := strings.ToLower(param.Value) - if strings.Contains(lowercased, "auth") || strings.Contains(lowercased, "bearer") || strings.Contains(lowercased, "basic") || strings.Contains(lowercased, "api") { - authFound = true - break - } - } - } - - if !authFound { - log.Printf("[WARNING] Tried to execute SINGLE %s WITHOUT auth (missing)", app.Name) - - found := false - for _, param := range action.Parameters { - if param.Configuration { - found = true - break - } - } - - if !found { - return workflowExecution, errors.New("You must authenticate this API first") - } - } - } - */ - - // FIXME: We need to inject missing empty auth here + // FIXME: We need to inject missing empty auth here in some cases // This is NOT a good solution, but a good bypass - if app.Authentication.Required { + if app.Authentication.Required && len(action.AuthenticationId) == 0 { authFields := 0 - foundFields := []string{} for _, actionParam := range action.Parameters { if actionParam.Configuration { @@ -18570,6 +18535,32 @@ func PrepareSingleAction(ctx context.Context, user User, fileId string, body []b }) } } + + auths, err := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id) + if err != nil { + log.Printf("[ERROR] Failed getting auth for single action: %s", err) + } else { + latestTimestamp := int64(0) + for _, auth := range auths { + if auth.App.ID != fileId { + continue + } + + // Fallback to latest created + /* + if latestTimestamp < auth.Created { + latestTimestamp = auth.Created + action.AuthenticationId = auth.Id + } + */ + + // If valid, just choose it + if auth.Validation.Valid { + action.AuthenticationId = auth.Id + break + } + } + } } if runValidationAction { @@ -25431,7 +25422,7 @@ func GetExternalClient(baseUrl string) *http.Client { if len(os.Getenv("SHUFFLE_INTERNAL_NO_PROXY")) > 0 { noProxy = os.Getenv("SHUFFLE_INTERNAL_NO_PROXY") - } + } if len(os.Getenv("SHUFFLE_INTERNAL_NOPROXY")) > 0 { noProxy = os.Getenv("SHUFFLE_INTERNAL_NOPROXY") @@ -25440,7 +25431,7 @@ func GetExternalClient(baseUrl string) *http.Client { // Manage noproxy if len(noProxy) > 0 { - isNoProxy := isNoProxyHost(noProxy, parsedUrl.Host) + isNoProxy := isNoProxyHost(noProxy, parsedUrl.Host) if isNoProxy { log.Printf("[INFO] Skipping proxy for %s", parsedUrl) From 3ba5e4af80875708d7194d7e42ca076371c74778 Mon Sep 17 00:00:00 2001 From: Frikky Date: Fri, 24 Jan 2025 02:29:35 +0100 Subject: [PATCH 2/4] Bumped shufflepy and sdk to work with Singul --- codegen.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codegen.go b/codegen.go index 9e32686..8eb1b71 100755 --- a/codegen.go +++ b/codegen.go @@ -3643,7 +3643,7 @@ func HandlePut(swagger *openapi3.Swagger, api WorkflowApp, extraParameters []Wor } func GetAppRequirements() string { - return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\nshufflepy==0.0.6\nshuffle-sdk==0.0.7" + return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\nshufflepy==0.0.7\nshuffle-sdk==0.0.8" } // Removes JSON values from the input From 9e1e1eeb80cc9349721b6101c169ca7f26d13e58 Mon Sep 17 00:00:00 2001 From: Frikky Date: Fri, 24 Jan 2025 03:19:34 +0100 Subject: [PATCH 3/4] Validation of nw vs europe-west2 and the like for schedules --- shared.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/shared.go b/shared.go index c927a19..2d3e5d8 100755 --- a/shared.go +++ b/shared.go @@ -5040,10 +5040,16 @@ func GetGcpSchedule(ctx context.Context, id string) (*ScheduleOld, error) { log.Printf("[ERROR] Client error: %s", err) return schedule, err } + location := "europe-west2" - if len(os.Getenv("SHUFFLE_GCEPROJECT")) > 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 { + if len(os.Getenv("SHUFFLE_GCE_LOCATION")) > 0 { + location = os.Getenv("SHUFFLE_GCE_LOCATION") + } + + if len(os.Getenv("SHUFFLE_GCE_LOCATION")) == 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 { location = os.Getenv("SHUFFLE_GCEPROJECT_LOCATION") } + req := &schedulerpb.GetJobRequest{ Name: fmt.Sprintf("projects/%s/locations/%s/jobs/schedule_%s", gceProject, location, id), } @@ -18540,7 +18546,7 @@ func PrepareSingleAction(ctx context.Context, user User, fileId string, body []b if err != nil { log.Printf("[ERROR] Failed getting auth for single action: %s", err) } else { - latestTimestamp := int64(0) + //latestTimestamp := int64(0) for _, auth := range auths { if auth.App.ID != fileId { continue From 1adb68c948666f13e541d9bd5272e350126efd9f Mon Sep 17 00:00:00 2001 From: Frikky Date: Fri, 24 Jan 2025 13:37:16 +0100 Subject: [PATCH 4/4] Added reloading of the right workflow triggers in case they are missing in cache --- db-connector.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/db-connector.go b/db-connector.go index 321944f..d72a46e 100755 --- a/db-connector.go +++ b/db-connector.go @@ -3235,6 +3235,30 @@ func GetWorkflow(ctx context.Context, id string) (*Workflow, error) { cacheData := []byte(cache.([]uint8)) err = json.Unmarshal(cacheData, &workflow) if err == nil && workflow.ID != "" { + // Somehow this can happen. Reverting to LATEST revision + if len(workflow.Actions) > 0 && len(workflow.Triggers) == 0 { + revisions, err := ListWorkflowRevisions(ctx, workflow.ID, 2) + if err != nil { + log.Printf("[WARNING] Failed getting revisions during trigger load for workflow %s: %s", workflow.ID, err) + } else { + if len(revisions) > 0 { + for _, revision := range revisions { + if revision.ID != workflow.ID { + continue + } + + if len(revision.Triggers) > 0 { + workflow.Triggers = revision.Triggers + break + } + } + + log.Printf("[INFO] Reverting to revision triggers for workflow %s from 0 triggers to %d triggers", workflow.ID, len(revisions[0].Triggers)) + workflow.Triggers = revisions[0].Triggers + } + } + } + return workflow, nil } } else { @@ -8202,6 +8226,10 @@ func SetWorkflowRevision(ctx context.Context, workflow Workflow) error { } DeleteCache(ctx, fmt.Sprintf("%s_%s", nameKey, workflow.ID)) + + // For workflow revision backups + DeleteCache(ctx, fmt.Sprintf("%s_%s_1", nameKey, workflow.ID)) + DeleteCache(ctx, fmt.Sprintf("%s_%s_2", nameKey, workflow.ID)) } return nil