diff --git a/shared.go b/shared.go index 8b37433..3fd7939 100755 --- a/shared.go +++ b/shared.go @@ -7938,14 +7938,14 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) { if workflow.OrgId == "" { workflow.OrgId = user.ActiveOrg.Id } - - workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow) + + workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow) if err != nil { log.Printf("[ERROR] Failed getting static workflow health for %s: %s", workflow.ID, err) resp.WriteHeader(401) resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting static workflow health: %s"}`, err.Error()))) return - } + } // Nodechecks foundNodes := []string{} @@ -17580,7 +17580,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by app := WorkflowApp{} if strings.ToLower(appId) == "http" { // Find the app and the ID for it - apps, err := FindWorkflowAppByName(ctx, "http") + apps, err := FindWorkflowAppByName(ctx, "http") if err != nil { log.Printf("[WARNING] Failed to find HTTP app in single action execution: %s", err) return workflowExecution, err @@ -17599,7 +17599,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by if len(latestVersion) == 0 { latestVersion = innerApp.AppVersion - app = innerApp + app = innerApp continue } @@ -17732,10 +17732,10 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by // Fallback to latest created /* - if latestTimestamp < auth.Created { - latestTimestamp = auth.Created - action.AuthenticationId = auth.Id - } + if latestTimestamp < auth.Created { + latestTimestamp = auth.Created + action.AuthenticationId = auth.Id + } */ // If valid, just choose it @@ -17807,7 +17807,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by } } - action.AppID = appId + action.AppID = appId workflow := Workflow{ Actions: []Action{ action, @@ -28487,6 +28487,130 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) { Cursor: cursor, } + //Get workflow run for all subgs of current org where the user is a member + if search.SuborgRuns == true { + suborgs, err := GetAllChildOrgs(ctx, user.ActiveOrg.Id) + if err != nil { + log.Printf("[WARNING] Failed getting suborgs for org %s: %s", user.ActiveOrg.Id, err) + resp.WriteHeader(400) + resp.Write([]byte(`{"success": false}`)) + return + } + + // Limit to max 50 suborgs + if len(suborgs) > 50 { + suborgs = suborgs[:50] + } + + type validationResult struct { + org Org + valid bool + } + + resultChan := make(chan validationResult, len(suborgs)) + var wg sync.WaitGroup + + // Validate suborgs concurrently + for _, suborg := range suborgs { + wg.Add(1) + go func(suborg Org) { + defer wg.Done() + + userPresentInSuborg := false + for _, orgId := range user.Orgs { + if orgId == suborg.Id || user.SupportAccess == true { + userPresentInSuborg = true + break + } + } + + resultChan <- validationResult{ + org: suborg, + valid: userPresentInSuborg, + } + }(suborg) + } + + // Close channel when all validations complete + go func() { + wg.Wait() + close(resultChan) + }() + + // Collect valid suborgs + validSuborgs := []Org{} + for result := range resultChan { + if result.valid { + validSuborgs = append(validSuborgs, result.org) + } + } + + type batchResult struct { + runs []WorkflowExecution + err error + } + + runsChan := make(chan batchResult, len(validSuborgs)) + wg = sync.WaitGroup{} + + // Process each valid suborg concurrently + for _, suborg := range validSuborgs { + wg.Add(1) + go func(suborg Org) { + defer wg.Done() + + runs, _, err := GetWorkflowRunsBySearch(ctx, suborg.Id, search) + if err != nil { + runsChan <- batchResult{ + err: fmt.Errorf("failed getting workflow runs for suborg %s: %v", suborg.Id, err), + } + return + } + + // Filter runs and add suborg details + parsedRuns := []WorkflowExecution{} + for _, run := range runs { + run.Org = OrgMini{ + Id: suborg.Id, + Name: suborg.Name, + Image: suborg.Image, + CreatorOrg: suborg.CreatorOrg, + RegionUrl: suborg.RegionUrl, + } + parsedRuns = append(parsedRuns, run) + } + + runsChan <- batchResult{runs: parsedRuns} + }(suborg) + } + + // Close channel when all goroutines complete + go func() { + wg.Wait() + close(runsChan) + }() + + // Collect results from all suborgs + suborgRuns := []WorkflowExecution{} + for result := range runsChan { + if result.err != nil { + log.Printf("[WARNING] %v", result.err) + continue + } + suborgRuns = append(suborgRuns, result.runs...) + } + + // Combine parent and suborg runs + allRuns := append(workflowSearchResult.Runs, suborgRuns...) + + // Sort by start time + sort.Slice(allRuns, func(i, j int) bool { + return allRuns[i].StartedAt > allRuns[j].StartedAt + }) + + workflowSearchResult.Runs = allRuns + } + respBody, err := json.Marshal(workflowSearchResult) if err != nil { log.Printf("[WARNING] Failed marshaling workflow runs: %s", err) diff --git a/structs.go b/structs.go index deb57f5..cb96a13 100755 --- a/structs.go +++ b/structs.go @@ -1133,8 +1133,9 @@ type WorkflowExecution struct { SubExecutionCount int64 `json:"sub_execution_count" yaml:"sub_execution_count"` // Max depth to execute subflows in infinite loops (10 by default) Priority int64 `json:"priority" datastore:"priority" yaml:"priority"` // Priority of the execution. Usually manual should be 10, and all other UNDER that. - NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"` - Authgroup string `json:"authgroup" datastore:"authgroup"` + NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"` + Authgroup string `json:"authgroup" datastore:"authgroup"` + Org OrgMini `json:"org" datastore:"-"` } type Position struct { @@ -4027,7 +4028,8 @@ type WorkflowSearch struct { SearchFrom string `json:"start_time"` SearchUntil string `json:"end_time"` - IgnoreOrg bool `json:"ignore_org"` + IgnoreOrg bool `json:"ignore_org"` + SuborgRuns bool `json:"suborg_runs" default:"false"` } type WorkflowSearchResult struct {