Skip to content

Commit

Permalink
Fixed most of the normal suborg workflow distribution bugs for apps &…
Browse files Browse the repository at this point in the history
… triggers
  • Loading branch information
frikky committed Feb 6, 2025
1 parent 5150485 commit 384acea
Show file tree
Hide file tree
Showing 3 changed files with 442 additions and 90 deletions.
122 changes: 122 additions & 0 deletions cloudSync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,3 +1422,125 @@ func ActivateWorkflowApp(resp http.ResponseWriter, request *http.Request) {
resp.WriteHeader(200)
resp.Write([]byte(`{"success": true}`))
}

// For replicating HTTP request from schedule user
func HandleSuborgScheduleRun(request *http.Request, workflow *Workflow) {
ctx := context.Background()
if len(workflow.SuborgDistribution) == 0 {
log.Printf("[WARNING] No suborgs to run for workflow %s", workflow.ID)
return
}

// Finding first one.
originalTriggerId := ""
for _, trigger := range workflow.Triggers {
if trigger.TriggerType == "SCHEDULE" {
originalTriggerId = trigger.ID
break
}
}

if len(originalTriggerId) == 0 {
return
}

// 1. Get child workflows of workflow
// 2. Map to the right ones
childWorkflows, err := ListChildWorkflows(ctx, workflow.ID)
if err != nil {
log.Printf("[ERROR] Failed getting child workflows for parent workflow %s: %s", workflow.ID, err)
return
}

client := http.Client{}
for _, childWorkflow := range childWorkflows {
if childWorkflow.ID == workflow.ID {
continue
}

if childWorkflow.OrgId == workflow.OrgId {
continue
}

// Check if the OrgId is still in the workflow.Sub
found := false
for _, suborg := range workflow.SuborgDistribution {
if childWorkflow.OrgId == suborg {
found = true
break
}
}

if !found {
continue
}

// Ensuring the trigger still exists in the child
found = false
for _, trigger := range childWorkflow.Triggers {
if trigger.ReplacementForTrigger == originalTriggerId {
found = true
break
}
}

if !found {
continue
}

log.Printf("[DEBUG] Should be running %s schedule suborg workflows", childWorkflow.ID)
go func(client http.Client, request *http.Request, childWorkflow Workflow) {
baseurl := "https://shuffler.io"
if os.Getenv("BASE_URL") != "" {
baseurl = os.Getenv("BASE_URL")
}

if os.Getenv("SHUFFLE_CLOUDRUN_URL") != "" {
baseurl = os.Getenv("SHUFFLE_CLOUDRUN_URL")
}

body, err := ioutil.ReadAll(request.Body)
if err != nil {
log.Printf("[ERROR] Failed reading body from schedule request: %s", err)
return
}

request.Body = io.NopCloser(bytes.NewBuffer(body))
formattedUrl := fmt.Sprintf("%s/api/v1/workflows/%s/run", baseurl, childWorkflow.ID)
req, err := http.NewRequest(
"POST",
formattedUrl,
bytes.NewBuffer(body),
)

if err != nil {
log.Printf("[WARNING] Failed mapping child workflow schedule: %s", err)
return
}

for key, value := range request.Header {
req.Header.Set(key, value[0])
}

newresp, err := client.Do(req)
if err != nil {
log.Printf("[ERROR] Failed running child workflow schedule: %s", err)
return
}

defer newresp.Body.Close()
if newresp.StatusCode == 200 {
log.Printf("[DEBUG] Started suborg workflow from schedule. Parent: %s. Child: %s", childWorkflow.ParentWorkflowId, childWorkflow.ID)
} else {
respBody, err := ioutil.ReadAll(newresp.Body)
if err != nil {
log.Printf("[ERROR] Failed to read body from failed newresp")
return
}

log.Printf("[ERROR] Failed to start suborg workflow from schedule with status %d. Parent: %s. Child: %s. Raw Body: %s", newresp.StatusCode, childWorkflow.ParentWorkflowId, childWorkflow.ID, string(respBody))
}

}(client, request, childWorkflow)
}
}
2 changes: 1 addition & 1 deletion db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6301,7 +6301,7 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
_, err := it.Next(&innerApp)
cnt += 1
if cnt > maxAmount {
log.Printf("[ERROR] Maximum try exceeded for workfloapp (2)")
log.Printf("[ERROR] Maximum try exceeded for workflowapp (2)")
break
}

Expand Down
Loading

0 comments on commit 384acea

Please sign in to comment.