Skip to content

Commit

Permalink
fix: adding in update logic
Browse files Browse the repository at this point in the history
  • Loading branch information
0x0elliot committed Jan 9, 2025
1 parent 51cac7b commit c485370
Showing 1 changed file with 163 additions and 145 deletions.
308 changes: 163 additions & 145 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -6133,6 +6133,150 @@ func diffWorkflowWrapper(parentWorkflow Workflow) Workflow {
return parentWorkflow
}

func subflowPropagationWrapper(parentWorkflow Workflow, childWorkflow Workflow, parentTrigger Trigger) Trigger {
// remember: when this function is used, the parent trigger is passed to
// create the new child trigger.
trigger := parentTrigger

for paramIndex, param := range trigger.Parameters {
// since this is an added subflow, the workflow being referred
// is most likely not already distributed. let's do that.
if param.Name == "workflow" {
parentSubflowPointedId := param.Value

if len(parentSubflowPointedId) == 0 {
continue
}

ctx := context.Background()

// propagate reference workflow to child org
// check first if the workflow has been propagated before
// to the suborg (ParentWorkflowId is this workflow's ID)
childOrg, err := GetOrg(ctx, childWorkflow.OrgId)
if err != nil {
log.Printf("[WARNING] Failed getting org: %s", err)
continue
}

user := User{
Role: "admin",
ActiveOrg: OrgMini{
Id: childOrg.Id,
},
}

childOrgWorkflows, err := GetAllWorkflowsByQuery(ctx, user, 250, "")
if err != nil {
log.Printf("[WARNING] Failed getting org workflows: %s", err)
continue
}

propagatedEarlier := false
alreadyPropagatedSubflow := ""

for _, workflow := range childOrgWorkflows {
// this means that the subflow has been propagated to
// child workflow already. no need to complicate things further.
if workflow.ParentWorkflowId == parentSubflowPointedId {
propagatedEarlier = true
alreadyPropagatedSubflow = workflow.ID
break
}
}

if propagatedEarlier {
// just make sure that it now points to that workflow
trigger.Parameters[paramIndex].Value = alreadyPropagatedSubflow

// get workflow
workflow, err := GetWorkflow(ctx, alreadyPropagatedSubflow)
if err != nil {
log.Printf("[WARNING] Failed getting propagated subflow: %s", err)
continue
}

startNodeIndexToOverwrite := -1
currentStartNode := ""

// taking the right startnode is important
for startNodeIndex, startNode := range trigger.Parameters {
if startNode.Name == "startnode" {
startNodeIndexToOverwrite = startNodeIndex
currentStartNode = startNode.Value
}
}

if len(currentStartNode) == 0 {
continue
}

for _, action := range workflow.Actions {
if action.ID == currentStartNode {
trigger.Parameters[startNodeIndexToOverwrite].Value = action.ID
break
}
}

continue
}

parentSubflowPointed, err := GetWorkflow(ctx, parentSubflowPointedId)
if err != nil {
log.Printf("[WARNING] Failed getting parent subflow: %s", err)
continue
}

parentSubflowPointed.SuborgDistribution = append(parentSubflowPointed.SuborgDistribution, childWorkflow.OrgId)

err = SetWorkflow(ctx, *parentSubflowPointed, parentSubflowPointedId)
if err != nil {
log.Printf("[WARNING] Failed setting parent subflow: %s", err)
continue
}

propagatedSubflow, err := GenerateWorkflowFromParent(ctx, *parentSubflowPointed, parentSubflowPointed.OrgId, childWorkflow.OrgId)
if err != nil {
log.Printf("[WARNING] Failed to generate child workflow %s (%s) for %s (%s): %s", childWorkflow.Name, childWorkflow.ID, parentWorkflow.Name, parentWorkflow.ID, err)
} else {
log.Printf("[INFO] Generated child workflow %s (%s) for %s (%s)", childWorkflow.Name, childWorkflow.ID, parentWorkflow.Name, parentWorkflow.ID)

trigger.Parameters[paramIndex].Value = propagatedSubflow.ID
}

startnode := ""
startNodeParamIndex := -1

// now handle startnode
for startNodeParamIndex_, param_ := range trigger.Parameters {
if param_.Name == "startnode" {
startnode = param_.Value
startNodeParamIndex = startNodeParamIndex_
}
}

if len(startnode) == 0 {
continue
}

// actions are always startnodes
// find the equivalent of the startnode in the new workflow
for _, action := range propagatedSubflow.Actions {
if action.ID == startnode {
trigger.Parameters[startNodeParamIndex].Value = action.ID
break
}
}

} else if param.Name != "startnode" && param.Name != "startnode" {
// just use it
trigger.Parameters[paramIndex].Value = param.Value
}
}

return trigger
}

func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
// Check if there is a difference in actions, and what they are
// Check if there is a difference in triggers, and what they are
Expand Down Expand Up @@ -6626,119 +6770,9 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
}
}
} else if trigger.TriggerType == "SUBFLOW" {
// // params: workflow, argument, user_apikey, startnode,
// // check_result and auth_override

for paramIndex, param := range trigger.Parameters {
// since this is an added subflow, the workflow being referred
// is most likely not already distributed. let's do that.
if param.Name == "workflow" {
parentSubflowPointedId := param.Value

if len(parentSubflowPointedId) == 0 {
continue
}

ctx := context.Background()

// propagate reference workflow to child org
// check first if the workflow has been propagated before
// to the suborg (ParentWorkflowId is this workflow's ID)
childOrg, err := GetOrg(ctx, childWorkflow.OrgId)
if err != nil {
log.Printf("[WARNING] Failed getting org: %s", err)
continue
}

user := User{
Role: "admin",
ActiveOrg: OrgMini{
Id: childOrg.Id,
},
}

childOrgWorkflows, err := GetAllWorkflowsByQuery(ctx, user, 250, "")
if err != nil {
log.Printf("[WARNING] Failed getting org workflows: %s", err)
continue
}

propagatedEarlier := false
alreadyPropagatedSubflow := ""

for _, workflow := range childOrgWorkflows {
if workflow.ParentWorkflowId == parentSubflowPointedId {
propagatedEarlier = true
alreadyPropagatedSubflow = workflow.ID
break
}
}

if propagatedEarlier {
// just make sure that it now points to that workflow
trigger.Parameters[paramIndex].Value = alreadyPropagatedSubflow
continue
}

parentSubflowPointed, err := GetWorkflow(ctx, parentSubflowPointedId)
if err != nil {
log.Printf("[WARNING] Failed getting parent subflow: %s", err)
continue
}

parentSubflowPointed.SuborgDistribution = append(parentSubflowPointed.SuborgDistribution, childWorkflow.OrgId)

err = SetWorkflow(ctx, *parentSubflowPointed, parentSubflowPointedId)
if err != nil {
log.Printf("[WARNING] Failed setting parent subflow: %s", err)
continue
}

propagatedSubflow, err := GenerateWorkflowFromParent(ctx, *parentSubflowPointed, parentSubflowPointed.OrgId, childWorkflow.OrgId)
if err != nil {
log.Printf("[WARNING] Failed to generate child workflow %s (%s) for %s (%s): %s", childWorkflow.Name, childWorkflow.ID, parentWorkflow.Name, parentWorkflow.ID, err)
} else {
log.Printf("[INFO] Generated child workflow %s (%s) for %s (%s)", childWorkflow.Name, childWorkflow.ID, parentWorkflow.Name, parentWorkflow.ID)

trigger.Parameters[paramIndex].Value = propagatedSubflow.ID
}

startnode := ""
startNodeParamIndex := -1

// now handle startnode
for startNodeParamIndex_, param_ := range trigger.Parameters {
if param_.Name == "startnode" {
startnode = param_.Value
startNodeParamIndex = startNodeParamIndex_
}
}

if len(startnode) == 0 {
continue
}

// find the equivalent of the startnode in the new workflow
for _, action := range propagatedSubflow.Actions {
if action.ID == startnode {
trigger.Parameters[startNodeParamIndex].Value = action.ID
break
}
}

// sometimes, it can happen that it's a replaced trigger
for _, trigger := range propagatedSubflow.Triggers {
if trigger.ReplacementForTrigger == startnode {
trigger.Parameters[startNodeParamIndex].Value = trigger.ID
break
}
}

} else if param.Name != "startnode" && param.Name != "startnode" {
// just use it
trigger.Parameters[paramIndex].Value = param.Value
}
}
// params: workflow, argument, user_apikey, startnode,
// check_result and auth_override
trigger = subflowPropagationWrapper(parentWorkflow, childWorkflow, trigger)
}

for branchIndex, branch := range childWorkflow.Branches {
Expand Down Expand Up @@ -6795,16 +6829,14 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
childWorkflow.Triggers[index].Label = action.Label
childWorkflow.Triggers[index].Position = action.Position
childWorkflow.Triggers[index].AppVersion = action.AppVersion
childWorkflow.Triggers[index].IsStartNode = action.IsStartNode
break
} else if action.TriggerType == "SCHEDULE" {
// make sure to override: name, label, position,
// app_version, startnode and parameters
// app_version and parameters
childWorkflow.Triggers[index].Name = action.Name
childWorkflow.Triggers[index].Label = action.Label
childWorkflow.Triggers[index].Position = action.Position
childWorkflow.Triggers[index].AppVersion = action.AppVersion
childWorkflow.Triggers[index].IsStartNode = action.IsStartNode
// i don't want schedules to start or stop according to the parent workflow.
// thus, doing what i did here.
for paramIndex, param := range action.Parameters {
Expand All @@ -6818,6 +6850,21 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
}

break
} else if action.TriggerType == "SUBFLOW" {
// make sure to override: name, label, position,
// app_version, startnode and parameters
childWorkflow.Triggers[index].Name = action.Name
childWorkflow.Triggers[index].Label = action.Label
childWorkflow.Triggers[index].Position = action.Position
childWorkflow.Triggers[index].AppVersion = action.AppVersion

// essentially, now we try to verify:
// okay, new workflow? we see it's a subflow that's
// what changed? is it the workflow?

action = subflowPropagationWrapper(parentWorkflow, childWorkflow, action)
childWorkflow.Triggers[index].Parameters = action.Parameters
break
}

childWorkflow.Triggers[index] = action
Expand Down Expand Up @@ -9946,35 +9993,6 @@ func GenerateWorkflowFromParent(ctx context.Context, workflow Workflow, parentOr
newWf.Branches[branchIndex].DestinationID = newWf.Triggers[triggerIndex].ID
}
}
} else if newWf.Triggers[triggerIndex].TriggerType == "SUBFLOW" {
oldID := newWf.Triggers[triggerIndex].ID
newWf.Triggers[triggerIndex].ID = uuid.NewV4().String()

newWf.Triggers[triggerIndex].ReplacementForTrigger = oldID

// loop through workflow trigger parameters
for paramIndex, param := range newWf.Triggers[triggerIndex].Parameters {
if param.Name == "workflow" {
if len(param.Value) == 0 {
break
}

subflow, err := GetWorkflow(ctx, param.Value)
if err != nil {
log.Printf("[ERROR] Failed getting subflow %s: %s", param.Value, err)
break
}

// distribute the subflow further to this suborg, and then take the new id
childWorkflow, err := GenerateWorkflowFromParent(ctx, *subflow, parentOrgId, subOrgId)
if err != nil {
log.Printf("[ERROR] Failed generating subflow %s: %s", subflow.ID, err)
break
}

newWf.Triggers[triggerIndex].Parameters[paramIndex].Value = childWorkflow.ID
}
}
}
}

Expand Down

0 comments on commit c485370

Please sign in to comment.