From 1ba6c5fc2aa3f52a6829e145fc65bd0a3f703cab Mon Sep 17 00:00:00 2001 From: lalitdeore Date: Mon, 3 Feb 2025 16:55:11 +0530 Subject: [PATCH 1/2] [feature] - search multitenant workflow run search in parent org --- shared.go | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ structs.go | 18 +++++----- 2 files changed, 108 insertions(+), 8 deletions(-) diff --git a/shared.go b/shared.go index 127f384..eacb4dd 100755 --- a/shared.go +++ b/shared.go @@ -28483,6 +28483,104 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { Cursor: cursor, } + //Get workflow run for all subgs of current org where the user is a member + if search.SuborgRuns == true { + suborgs, err := GetAllChildOrgs(ctx, user.ActiveOrg.Id) + if err != nil { + log.Printf("[WARNING] Failed getting suborgs for org %s: %s", user.ActiveOrg.Id, err) + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false}`)) + return + } + + // Limit to max 50 suborgs + if len(suborgs) > 50 { + suborgs = suborgs[:50] + } + + type batchResult struct { + runs []WorkflowExecution + err error + } + + resultChan := make(chan batchResult, len(suborgs)) + var wg sync.WaitGroup + + // Process each suborg concurrently + for _, suborg := range suborgs { + wg.Add(1) + go func(suborg Org) { + defer wg.Done() + + // Check if user is present in this suborg + userPresentInSuborg := false + for _, orgId := range user.Orgs { + if orgId == suborg.Id || user.SupportAccess == true { + userPresentInSuborg = true + break + } + + } + + // Skip this suborg if user is not present + if !userPresentInSuborg { + resultChan <- batchResult{runs: []WorkflowExecution{}} + return + } + + runs, _, err := GetWorkflowRunsBySearch(ctx, suborg.Id, search) + if err != nil { + resultChan <- batchResult{ + err: fmt.Errorf("failed getting workflow runs for suborg %s: %v", suborg.Id, err), + } + return + } + + // Filter runs and add suborg details + parsedRuns := []WorkflowExecution{} + for _, run := range runs { + // Add suborg details to the execution + run.Org = OrgMini{ + Id: suborg.Id, + Name: suborg.Name, + Image: suborg.Image, + CreatorOrg: suborg.CreatorOrg, + RegionUrl: suborg.RegionUrl, + } + parsedRuns = append(parsedRuns, run) + } + + resultChan <- batchResult{runs: parsedRuns} + }(suborg) + } + + // Close channel when all goroutines complete + go func() { + wg.Wait() + close(resultChan) + }() + + // Collect results from all suborgs + suborgRuns := []WorkflowExecution{} + for result := range resultChan { + if result.err != nil { + log.Printf("[WARNING] %v", result.err) + continue + } + suborgRuns = append(suborgRuns, result.runs...) + } + + // Combine parent and suborg runs + allRuns := append(workflowSearchResult.Runs, suborgRuns...) + + // Sort by start time + sort.Slice(allRuns, func(i, j int) bool { + return allRuns[i].StartedAt > allRuns[j].StartedAt + }) + + workflowSearchResult.Runs = allRuns + } + respBody, err := json.Marshal(workflowSearchResult) if err != nil { log.Printf("[WARNING] Failed marshaling workflow runs: %s", err) diff --git a/structs.go b/structs.go index 3084c51..e7290d5 100755 --- a/structs.go +++ b/structs.go @@ -1132,8 +1132,9 @@ type WorkflowExecution struct { SubExecutionCount int64 `json:"sub_execution_count" yaml:"sub_execution_count"` // Max depth to execute subflows in infinite loops (10 by default) Priority int64 `json:"priority" datastore:"priority" yaml:"priority"` // Priority of the execution. Usually manual should be 10, and all other UNDER that. - NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"` - Authgroup string `json:"authgroup" datastore:"authgroup"` + NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"` + Authgroup string `json:"authgroup" datastore:"authgroup"` + Org OrgMini `json:"org" datastore:"-"` } type Position struct { @@ -1523,11 +1524,11 @@ type ValidationProblem struct { } type TypeValidation struct { - Valid bool `json:"valid" datastore:"valid"` - ChangedAt int64 `json:"changed_at" datastore:"changed_at"` - LastValid int64 `json:"last_valid" datastore:"last_valid"` - ValidationRan bool `json:"validation_ran" datastore:"validation_ran"` - NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"` + Valid bool `json:"valid" datastore:"valid"` + ChangedAt int64 `json:"changed_at" datastore:"changed_at"` + LastValid int64 `json:"last_valid" datastore:"last_valid"` + ValidationRan bool `json:"validation_ran" datastore:"validation_ran"` + NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"` // For the last update, which did it WorkflowId string `json:"workflow_id" datastore:"workflow_id"` @@ -4023,7 +4024,8 @@ type WorkflowSearch struct { SearchFrom string `json:"start_time"` SearchUntil string `json:"end_time"` - IgnoreOrg bool `json:"ignore_org"` + IgnoreOrg bool `json:"ignore_org"` + SuborgRuns bool `json:"suborg_runs" default:"false"` } type WorkflowSearchResult struct { From 42ce12512b411a521b35df04856d246866cbeacc Mon Sep 17 00:00:00 2001 From: lalitdeore Date: Tue, 4 Feb 2025 13:10:57 +0530 Subject: [PATCH 2/2] fix - user present in org validation before goroutine in workflow run search --- shared.go | 78 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/shared.go b/shared.go index eacb4dd..0405fed 100755 --- a/shared.go +++ b/shared.go @@ -7938,14 +7938,14 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) { if workflow.OrgId == "" { workflow.OrgId = user.ActiveOrg.Id } - - workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow) + + workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow) if err != nil { log.Printf("[ERROR] Failed getting static workflow health for %s: %s", workflow.ID, err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting static workflow health: %s"}`, err.Error()))) return - } + } // Nodechecks foundNodes := []string{} @@ -17577,7 +17577,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by app := WorkflowApp{} if strings.ToLower(appId) == "http" { // Find the app and the ID for it - apps, err := FindWorkflowAppByName(ctx, "http") + apps, err := FindWorkflowAppByName(ctx, "http") if err != nil { log.Printf("[WARNING] Failed to find HTTP app in single action execution: %s", err) return workflowExecution, err @@ -17596,7 +17596,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by if len(latestVersion) == 0 { latestVersion = innerApp.AppVersion - app = innerApp + app = innerApp continue } @@ -17729,10 +17729,10 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by // Fallback to latest created /* - if latestTimestamp < auth.Created { - latestTimestamp = auth.Created - action.AuthenticationId = auth.Id - } + if latestTimestamp < auth.Created { + latestTimestamp = auth.Created + action.AuthenticationId = auth.Id + } */ // If valid, just choose it @@ -17804,7 +17804,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by } } - action.AppID = appId + action.AppID = appId workflow := Workflow{ Actions: []Action{ action, @@ -28498,39 +28498,66 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { suborgs = suborgs[:50] } - type batchResult struct { - runs []WorkflowExecution - err error + type validationResult struct { + org Org + valid bool } - resultChan := make(chan batchResult, len(suborgs)) + resultChan := make(chan validationResult, len(suborgs)) var wg sync.WaitGroup - // Process each suborg concurrently + // Validate suborgs concurrently for _, suborg := range suborgs { wg.Add(1) go func(suborg Org) { defer wg.Done() - // Check if user is present in this suborg userPresentInSuborg := false for _, orgId := range user.Orgs { if orgId == suborg.Id || user.SupportAccess == true { userPresentInSuborg = true break } - } - // Skip this suborg if user is not present - if !userPresentInSuborg { - resultChan <- batchResult{runs: []WorkflowExecution{}} - return + resultChan <- validationResult{ + org: suborg, + valid: userPresentInSuborg, } + }(suborg) + } + + // Close channel when all validations complete + go func() { + wg.Wait() + close(resultChan) + }() + + // Collect valid suborgs + validSuborgs := []Org{} + for result := range resultChan { + if result.valid { + validSuborgs = append(validSuborgs, result.org) + } + } + + type batchResult struct { + runs []WorkflowExecution + err error + } + + runsChan := make(chan batchResult, len(validSuborgs)) + wg = sync.WaitGroup{} + + // Process each valid suborg concurrently + for _, suborg := range validSuborgs { + wg.Add(1) + go func(suborg Org) { + defer wg.Done() runs, _, err := GetWorkflowRunsBySearch(ctx, suborg.Id, search) if err != nil { - resultChan <- batchResult{ + runsChan <- batchResult{ err: fmt.Errorf("failed getting workflow runs for suborg %s: %v", suborg.Id, err), } return @@ -28539,7 +28566,6 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { // Filter runs and add suborg details parsedRuns := []WorkflowExecution{} for _, run := range runs { - // Add suborg details to the execution run.Org = OrgMini{ Id: suborg.Id, Name: suborg.Name, @@ -28550,19 +28576,19 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { parsedRuns = append(parsedRuns, run) } - resultChan <- batchResult{runs: parsedRuns} + runsChan <- batchResult{runs: parsedRuns} }(suborg) } // Close channel when all goroutines complete go func() { wg.Wait() - close(resultChan) + close(runsChan) }() // Collect results from all suborgs suborgRuns := []WorkflowExecution{} - for result := range resultChan { + for result := range runsChan { if result.err != nil { log.Printf("[WARNING] %v", result.err) continue