Skip to content

Commit

Permalink
Merge pull request #145 from LalitDeore/distribution
Browse files Browse the repository at this point in the history
[feature] - search multitenant workflow run search in parent org
  • Loading branch information
frikky authored Feb 4, 2025
2 parents 00a4a99 + 42ce125 commit 942ed1d
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 13 deletions.
144 changes: 134 additions & 10 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -7938,14 +7938,14 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
if workflow.OrgId == "" {
workflow.OrgId = user.ActiveOrg.Id
}
workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow)

workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow)
if err != nil {
log.Printf("[ERROR] Failed getting static workflow health for %s: %s", workflow.ID, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting static workflow health: %s"}`, err.Error())))
return
}
}

// Nodechecks
foundNodes := []string{}
Expand Down Expand Up @@ -17580,7 +17580,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by
app := WorkflowApp{}
if strings.ToLower(appId) == "http" {
// Find the app and the ID for it
apps, err := FindWorkflowAppByName(ctx, "http")
apps, err := FindWorkflowAppByName(ctx, "http")
if err != nil {
log.Printf("[WARNING] Failed to find HTTP app in single action execution: %s", err)
return workflowExecution, err
Expand All @@ -17599,7 +17599,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by

if len(latestVersion) == 0 {
latestVersion = innerApp.AppVersion
app = innerApp
app = innerApp
continue
}

Expand Down Expand Up @@ -17732,10 +17732,10 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by

// Fallback to latest created
/*
if latestTimestamp < auth.Created {
latestTimestamp = auth.Created
action.AuthenticationId = auth.Id
}
if latestTimestamp < auth.Created {
latestTimestamp = auth.Created
action.AuthenticationId = auth.Id
}
*/

// If valid, just choose it
Expand Down Expand Up @@ -17807,7 +17807,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by
}
}

action.AppID = appId
action.AppID = appId
workflow := Workflow{
Actions: []Action{
action,
Expand Down Expand Up @@ -28487,6 +28487,130 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) {
Cursor: cursor,
}

//Get workflow run for all subgs of current org where the user is a member
if search.SuborgRuns == true {
suborgs, err := GetAllChildOrgs(ctx, user.ActiveOrg.Id)
if err != nil {
log.Printf("[WARNING] Failed getting suborgs for org %s: %s", user.ActiveOrg.Id, err)
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false}`))
return
}

// Limit to max 50 suborgs
if len(suborgs) > 50 {
suborgs = suborgs[:50]
}

type validationResult struct {
org Org
valid bool
}

resultChan := make(chan validationResult, len(suborgs))
var wg sync.WaitGroup

// Validate suborgs concurrently
for _, suborg := range suborgs {
wg.Add(1)
go func(suborg Org) {
defer wg.Done()

userPresentInSuborg := false
for _, orgId := range user.Orgs {
if orgId == suborg.Id || user.SupportAccess == true {
userPresentInSuborg = true
break
}
}

resultChan <- validationResult{
org: suborg,
valid: userPresentInSuborg,
}
}(suborg)
}

// Close channel when all validations complete
go func() {
wg.Wait()
close(resultChan)
}()

// Collect valid suborgs
validSuborgs := []Org{}
for result := range resultChan {
if result.valid {
validSuborgs = append(validSuborgs, result.org)
}
}

type batchResult struct {
runs []WorkflowExecution
err error
}

runsChan := make(chan batchResult, len(validSuborgs))
wg = sync.WaitGroup{}

// Process each valid suborg concurrently
for _, suborg := range validSuborgs {
wg.Add(1)
go func(suborg Org) {
defer wg.Done()

runs, _, err := GetWorkflowRunsBySearch(ctx, suborg.Id, search)
if err != nil {
runsChan <- batchResult{
err: fmt.Errorf("failed getting workflow runs for suborg %s: %v", suborg.Id, err),
}
return
}

// Filter runs and add suborg details
parsedRuns := []WorkflowExecution{}
for _, run := range runs {
run.Org = OrgMini{
Id: suborg.Id,
Name: suborg.Name,
Image: suborg.Image,
CreatorOrg: suborg.CreatorOrg,
RegionUrl: suborg.RegionUrl,
}
parsedRuns = append(parsedRuns, run)
}

runsChan <- batchResult{runs: parsedRuns}
}(suborg)
}

// Close channel when all goroutines complete
go func() {
wg.Wait()
close(runsChan)
}()

// Collect results from all suborgs
suborgRuns := []WorkflowExecution{}
for result := range runsChan {
if result.err != nil {
log.Printf("[WARNING] %v", result.err)
continue
}
suborgRuns = append(suborgRuns, result.runs...)
}

// Combine parent and suborg runs
allRuns := append(workflowSearchResult.Runs, suborgRuns...)

// Sort by start time
sort.Slice(allRuns, func(i, j int) bool {
return allRuns[i].StartedAt > allRuns[j].StartedAt
})

workflowSearchResult.Runs = allRuns
}

respBody, err := json.Marshal(workflowSearchResult)
if err != nil {
log.Printf("[WARNING] Failed marshaling workflow runs: %s", err)
Expand Down
8 changes: 5 additions & 3 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,9 @@ type WorkflowExecution struct {
SubExecutionCount int64 `json:"sub_execution_count" yaml:"sub_execution_count"` // Max depth to execute subflows in infinite loops (10 by default)
Priority int64 `json:"priority" datastore:"priority" yaml:"priority"` // Priority of the execution. Usually manual should be 10, and all other UNDER that.

NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"`
Authgroup string `json:"authgroup" datastore:"authgroup"`
NotificationsCreated int64 `json:"notifications_created" datastore:"notifications_created"`
Authgroup string `json:"authgroup" datastore:"authgroup"`
Org OrgMini `json:"org" datastore:"-"`
}

type Position struct {
Expand Down Expand Up @@ -4027,7 +4028,8 @@ type WorkflowSearch struct {
SearchFrom string `json:"start_time"`
SearchUntil string `json:"end_time"`

IgnoreOrg bool `json:"ignore_org"`
IgnoreOrg bool `json:"ignore_org"`
SuborgRuns bool `json:"suborg_runs" default:"false"`
}

type WorkflowSearchResult struct {
Expand Down

0 comments on commit 942ed1d

Please sign in to comment.