Skip to content

Commit

Permalink
fix - user present in org validation before goroutine in workflow run…
Browse files Browse the repository at this point in the history
… search
  • Loading branch information
LalitDeore committed Feb 4, 2025
1 parent 1ba6c5f commit 42ce125
Showing 1 changed file with 52 additions and 26 deletions.
78 changes: 52 additions & 26 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -7938,14 +7938,14 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
if workflow.OrgId == "" {
workflow.OrgId = user.ActiveOrg.Id
}
workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow)

workflow, allNodes, err := GetStaticWorkflowHealth(ctx, workflow)
if err != nil {
log.Printf("[ERROR] Failed getting static workflow health for %s: %s", workflow.ID, err)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed getting static workflow health: %s"}`, err.Error())))
return
}
}

// Nodechecks
foundNodes := []string{}
Expand Down Expand Up @@ -17577,7 +17577,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by
app := WorkflowApp{}
if strings.ToLower(appId) == "http" {
// Find the app and the ID for it
apps, err := FindWorkflowAppByName(ctx, "http")
apps, err := FindWorkflowAppByName(ctx, "http")
if err != nil {
log.Printf("[WARNING] Failed to find HTTP app in single action execution: %s", err)
return workflowExecution, err
Expand All @@ -17596,7 +17596,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by

if len(latestVersion) == 0 {
latestVersion = innerApp.AppVersion
app = innerApp
app = innerApp
continue
}

Expand Down Expand Up @@ -17729,10 +17729,10 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by

// Fallback to latest created
/*
if latestTimestamp < auth.Created {
latestTimestamp = auth.Created
action.AuthenticationId = auth.Id
}
if latestTimestamp < auth.Created {
latestTimestamp = auth.Created
action.AuthenticationId = auth.Id
}
*/

// If valid, just choose it
Expand Down Expand Up @@ -17804,7 +17804,7 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by
}
}

action.AppID = appId
action.AppID = appId
workflow := Workflow{
Actions: []Action{
action,
Expand Down Expand Up @@ -28498,39 +28498,66 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) {
suborgs = suborgs[:50]
}

type batchResult struct {
runs []WorkflowExecution
err error
type validationResult struct {
org Org
valid bool
}

resultChan := make(chan batchResult, len(suborgs))
resultChan := make(chan validationResult, len(suborgs))
var wg sync.WaitGroup

// Process each suborg concurrently
// Validate suborgs concurrently
for _, suborg := range suborgs {
wg.Add(1)
go func(suborg Org) {
defer wg.Done()

// Check if user is present in this suborg
userPresentInSuborg := false
for _, orgId := range user.Orgs {
if orgId == suborg.Id || user.SupportAccess == true {
userPresentInSuborg = true
break
}

}

// Skip this suborg if user is not present
if !userPresentInSuborg {
resultChan <- batchResult{runs: []WorkflowExecution{}}
return
resultChan <- validationResult{
org: suborg,
valid: userPresentInSuborg,
}
}(suborg)
}

// Close channel when all validations complete
go func() {
wg.Wait()
close(resultChan)
}()

// Collect valid suborgs
validSuborgs := []Org{}
for result := range resultChan {
if result.valid {
validSuborgs = append(validSuborgs, result.org)
}
}

type batchResult struct {
runs []WorkflowExecution
err error
}

runsChan := make(chan batchResult, len(validSuborgs))
wg = sync.WaitGroup{}

// Process each valid suborg concurrently
for _, suborg := range validSuborgs {
wg.Add(1)
go func(suborg Org) {
defer wg.Done()

runs, _, err := GetWorkflowRunsBySearch(ctx, suborg.Id, search)
if err != nil {
resultChan <- batchResult{
runsChan <- batchResult{
err: fmt.Errorf("failed getting workflow runs for suborg %s: %v", suborg.Id, err),
}
return
Expand All @@ -28539,7 +28566,6 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) {
// Filter runs and add suborg details
parsedRuns := []WorkflowExecution{}
for _, run := range runs {
// Add suborg details to the execution
run.Org = OrgMini{
Id: suborg.Id,
Name: suborg.Name,
Expand All @@ -28550,19 +28576,19 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) {
parsedRuns = append(parsedRuns, run)
}

resultChan <- batchResult{runs: parsedRuns}
runsChan <- batchResult{runs: parsedRuns}
}(suborg)
}

// Close channel when all goroutines complete
go func() {
wg.Wait()
close(resultChan)
close(runsChan)
}()

// Collect results from all suborgs
suborgRuns := []WorkflowExecution{}
for result := range resultChan {
for result := range runsChan {
if result.err != nil {
log.Printf("[WARNING] %v", result.err)
continue
Expand Down

0 comments on commit 42ce125

Please sign in to comment.