Skip to content

Commit

Permalink
Made it so wait for results does what it should for loops on cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Jan 25, 2024
1 parent b24cf6d commit 9c4e9db
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 44 deletions.
8 changes: 7 additions & 1 deletion db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,13 @@ func SetWorkflowExecution(ctx context.Context, workflowExecution WorkflowExecuti
key := datastore.NameKey(nameKey, strings.ToLower(workflowExecution.ExecutionId), nil)
if _, err := project.Dbclient.Put(ctx, key, &workflowExecution); err != nil {
log.Printf("[ERROR] Problem adding workflow_execution to datastore: %s", err)
if strings.Contains(fmt.Sprintf("%s", err), "context deadline exceeded") {
log.Printf("[ERROR] Context deadline exceeded. Retrying...")
ctx := context.Background()
if _, err := project.Dbclient.Put(ctx, key, &workflowExecution); err != nil {
log.Printf("[ERROR] Workflow execution Error number 1: %s", err)
}
}

// Has to do with certain data coming back in parameters where it shouldn't, causing saving to be impossible
if strings.Contains(fmt.Sprintf("%s", err), "contains an invalid nested") {
Expand Down Expand Up @@ -1418,7 +1425,6 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
if result.Status == "SUCCESS" {
result.Result = tmpResult.Result
}
//log.Printf("[DEBUG][%s] Getting '%s' result ", workflowExecution.ExecutionId, result.Action.AppName)
}

// Checks for subflows in waiting status
Expand Down
136 changes: 93 additions & 43 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -9986,43 +9986,81 @@ func updateExecutionParent(ctx context.Context, executionParent, returnValue, pa

isLooping := false
selectedTrigger := Trigger{}

// Validating parent node
checkResult := false

log.Printf("[DEBUG] Parent workflow triggers: %d. Parentnode: %s. Parentworkflow: %s", len(newExecution.Workflow.Triggers), parentNode, newExecution.Workflow.ID)


// Subflows and the like may not be in here anymore. Maybe they are in actions
for _, trigger := range newExecution.Workflow.Triggers {
if trigger.ID == parentNode {
selectedTrigger = trigger
for _, param := range trigger.Parameters {
if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
isLooping = true
break
}
if trigger.ID != parentNode {
continue
}

log.Printf("\n\n\nFound trigger: %s\n\n\n", trigger.ID)

selectedTrigger = trigger
for _, param := range trigger.Parameters {
if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
isLooping = true
}

break
// Check for if wait for results is set
if param.Name == "check_result" {
if param.Value == "true" {
checkResult = true
} else {
checkResult = false
}
}
}

break
}


// Because we changed out how we handle mid-flow triggers
if len(selectedTrigger.ID) == 0 {
for _, action := range newExecution.Workflow.Actions {
if action.ID == parentNode {
selectedTrigger = Trigger{
ID: action.ID,
Label: action.Label,
}
if action.ID != parentNode {
continue
}

foundResult.Action = action
selectedTrigger = Trigger{
ID: action.ID,
Label: action.Label,
}

for _, param := range action.Parameters {
if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
isLooping = true
break
}
foundResult.Action = action

for _, param := range action.Parameters {
if param.Name == "argument" && strings.Contains(param.Value, "$") && strings.Contains(param.Value, ".#") {
isLooping = true
}

break
// Check for if wait for results is set
if param.Name == "check_result" {
if param.Value == "true" {
checkResult = true
} else {
checkResult = false
}
}
}

break
}
}

// Checks if the variable is set properly
if !checkResult {
log.Printf("[DEBUG][%s] No check_result param found for subflow. Not mapping subflow result back to parent workflow. Trigger: %#v", subflowExecutionId, selectedTrigger.ID)

return nil
}

// IF the workflow is looping, the result is added in the backend to not
// cause consistency issues. This means the result will be sent back, and instead
// Added to the workflow result by the backend itself.
Expand Down Expand Up @@ -11524,6 +11562,10 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut
} else
*/
if len(workflowExecution.ExecutionParent) > 0 && len(workflowExecution.ExecutionSourceAuth) > 0 && len(workflowExecution.ExecutionSourceNode) > 0 {

// Check if source node has "Wait for Results" set to true


log.Printf("[DEBUG][%s] Found execution parent %s for workflow '%s' (%s)", workflowExecution.ExecutionId, workflowExecution.ExecutionParent, workflowExecution.Workflow.Name, workflowExecution.Workflow.ID)

err = updateExecutionParent(ctx, workflowExecution.ExecutionParent, valueToReturn, workflowExecution.ExecutionSourceAuth, workflowExecution.ExecutionSourceNode, workflowExecution.ExecutionId)
Expand Down Expand Up @@ -11623,7 +11665,7 @@ func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecut

// This is in case the list is not an actual list
if err != nil || len(subflowDataList) == 0 {
log.Printf("\n\nNOT sinkholed from subflow result: %s", err)
log.Printf("NOT sinkholed from subflow result: %s", err)
for resultIndex, result := range workflowExecution.Results {
if result.Action.ID == actionResult.Action.ID {
workflowExecution.Results[resultIndex] = actionResult
Expand Down Expand Up @@ -14106,29 +14148,29 @@ func ValidateNewWorkerExecution(ctx context.Context, body []byte) error {
}

for _, trigger := range baseExecution.Workflow.Triggers {
if trigger.ID == result.Action.ID {
//log.Printf("Found SUBFLOW id: %s", trigger.ID)

for _, param := range trigger.Parameters {
if param.Name == "check_result" && param.Value == "true" {
//log.Printf("Found check as true!")
if trigger.ID != result.Action.ID {
continue
}

var subflowData SubflowData
err = json.Unmarshal([]byte(result.Result), &subflowData)
if err != nil {
log.Printf("Failed unmarshal in subflow check for %s: %s", result.Result, err)
} else if len(subflowData.Result) == 0 {
log.Printf("There is no result yet. Don't save?")
} else {
//log.Printf("There is a result: %s", result.Result)
}
for _, param := range trigger.Parameters {
if param.Name == "check_result" && param.Value == "true" {
//log.Printf("Found check as true!")

break
var subflowData SubflowData
err = json.Unmarshal([]byte(result.Result), &subflowData)
if err != nil {
log.Printf("Failed unmarshal in subflow check for %s: %s", result.Result, err)
} else if len(subflowData.Result) == 0 {
log.Printf("There is no result yet. Don't save?")
} else {
//log.Printf("There is a result: %s", result.Result)
}
}

break
break
}
}

break
}
}
}
Expand Down Expand Up @@ -14297,7 +14339,7 @@ func RunFixParentWorkflowResult(ctx context.Context, execution WorkflowExecution

// FIXME: MAY cause transaction issues.
if updateIndex >= 0 && resultIndex >= 0 {
log.Printf("[DEBUG] Should update index %d in resultIndex %d with new result %s", updateIndex, resultIndex, execution.Result)
log.Printf("\n\n\n[DEBUG] Should update index %d in resultIndex %d with new result %s\n\n\n", updateIndex, resultIndex, execution.Result)

// Again, get the result, just in case, and update that exact value instantly
newParentExecution, err := GetWorkflowExecution(ctx, execution.ExecutionParent)
Expand Down Expand Up @@ -15360,14 +15402,14 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
if sourceAuthOk {
workflowExecution.ExecutionSourceAuth = sourceAuth[0]
} else {
log.Printf("[DEBUG] Did NOT get source workflow auth")
//log.Printf("[DEBUG] Did NOT get source workflow auth")
}

sourceNode, sourceNodeOk := request.URL.Query()["source_node"]
if sourceNodeOk {
workflowExecution.ExecutionSourceNode = sourceNode[0]
} else {
log.Printf("[DEBUG] Did NOT get source workflow node")
//log.Printf("[DEBUG] Did NOT get source workflow node")
}

//workflowExecution.ExecutionSource = "default"
Expand All @@ -15376,7 +15418,7 @@ func PrepareWorkflowExecution(ctx context.Context, workflow Workflow, request *h
//log.Printf("Got source workflow %s", sourceWorkflow)
workflowExecution.ExecutionSource = sourceWorkflow[0]
} else {
log.Printf("[DEBUG] Did NOT get source workflow (real). Not critical, as it can be overwritten with reference execution matching.")
//log.Printf("[DEBUG] Did NOT get source workflow (real). Not critical, as it can be overwritten with reference execution matching.")
}

sourceExecution, sourceExecutionOk := request.URL.Query()["source_execution"]
Expand Down Expand Up @@ -21565,6 +21607,14 @@ func parseSubflowResults(ctx context.Context, result ActionResult) (ActionResult
return result, false
}

for _, param := range result.Action.Parameters {
if param.Name == "check_result" {
if param.Value == "false" {
return result, false
}
}
}

//log.Printf("\n\n\n[DEBUG] Got parent subflow result \n\n\n")

newResults := []SubflowData{}
Expand Down

0 comments on commit 9c4e9db

Please sign in to comment.