From 42ce12512b411a521b35df04856d246866cbeacc Mon Sep 17 00:00:00 2001 From: lalitdeore Date: Tue, 4 Feb 2025 13:10:57 +0530 Subject: [PATCH] fix - user present in org validation before goroutine in workflow run search --- shared.go | 78 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/shared.go b/shared.go index eacb4dd..0405fed 100755 --- a/shared.go +++ b/shared.go @@ -7938,14 +7938,14 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) { if workflow.OrgId == "" { workflow.OrgId = user.ActiveOrg.Id } - - workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow) + + workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow) if err != nil { log.Printf("[ERROR] Failed getting static workflow health for %s: %s", workflow.ID, err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting static workflow health: %s"}`, err.Error()))) return - } + } // Nodechecks foundNodes := []string{} @@ -17577,7 +17577,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by app := WorkflowApp{} if strings.ToLower(appId) == "http" { // Find the app and the ID for it - apps, err := FindWorkflowAppByName(ctx, "http") + apps, err := FindWorkflowAppByName(ctx, "http") if err != nil { log.Printf("[WARNING] Failed to find HTTP app in single action execution: %s", err) return workflowExecution, err @@ -17596,7 +17596,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by if len(latestVersion) == 0 { latestVersion = innerApp.AppVersion - app = innerApp + app = innerApp continue } @@ -17729,10 +17729,10 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by // Fallback to latest created /* - if latestTimestamp < auth.Created { - latestTimestamp = auth.Created - action.AuthenticationId = auth.Id - } + if latestTimestamp < auth.Created { + latestTimestamp = auth.Created + action.AuthenticationId = auth.Id + } */ // If valid, just choose it @@ -17804,7 +17804,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by } } - action.AppID = appId + action.AppID = appId workflow := Workflow{ Actions: []Action{ action, @@ -28498,39 +28498,66 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { suborgs = suborgs[:50] } - type batchResult struct { - runs []WorkflowExecution - err error + type validationResult struct { + org Org + valid bool } - resultChan := make(chan batchResult, len(suborgs)) + resultChan := make(chan validationResult, len(suborgs)) var wg sync.WaitGroup - // Process each suborg concurrently + // Validate suborgs concurrently for _, suborg := range suborgs { wg.Add(1) go func(suborg Org) { defer wg.Done() - // Check if user is present in this suborg userPresentInSuborg := false for _, orgId := range user.Orgs { if orgId == suborg.Id || user.SupportAccess == true { userPresentInSuborg = true break } - } - // Skip this suborg if user is not present - if !userPresentInSuborg { - resultChan <- batchResult{runs: []WorkflowExecution{}} - return + resultChan <- validationResult{ + org: suborg, + valid: userPresentInSuborg, } + }(suborg) + } + + // Close channel when all validations complete + go func() { + wg.Wait() + close(resultChan) + }() + + // Collect valid suborgs + validSuborgs := []Org{} + for result := range resultChan { + if result.valid { + validSuborgs = append(validSuborgs, result.org) + } + } + + type batchResult struct { + runs []WorkflowExecution + err error + } + + runsChan := make(chan batchResult, len(validSuborgs)) + wg = sync.WaitGroup{} + + // Process each valid suborg concurrently + for _, suborg := range validSuborgs { + wg.Add(1) + go func(suborg Org) { + defer wg.Done() runs, _, err := GetWorkflowRunsBySearch(ctx, suborg.Id, search) if err != nil { - resultChan <- batchResult{ + runsChan <- batchResult{ err: fmt.Errorf("failed getting workflow runs for suborg %s: %v", suborg.Id, err), } return @@ -28539,7 +28566,6 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { // Filter runs and add suborg details parsedRuns := []WorkflowExecution{} for _, run := range runs { - // Add suborg details to the execution run.Org = OrgMini{ Id: suborg.Id, Name: suborg.Name, @@ -28550,19 +28576,19 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { parsedRuns = append(parsedRuns, run) } - resultChan <- batchResult{runs: parsedRuns} + runsChan <- batchResult{runs: parsedRuns} }(suborg) } // Close channel when all goroutines complete go func() { wg.Wait() - close(resultChan) + close(runsChan) }() // Collect results from all suborgs suborgRuns := []WorkflowExecution{} - for result := range resultChan { + for result := range runsChan { if result.err != nil { log.Printf("[WARNING] %v", result.err) continue