Skip to content

Commit

Permalink
Fixed more workflow continue methods on the frontend and added backen…
Browse files Browse the repository at this point in the history
…d backups for it
  • Loading branch information
frikky committed Dec 2, 2023
1 parent 7189c31 commit cb13470
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 14 deletions.
48 changes: 37 additions & 11 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,6 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) Work
found := false
result := ActionResult{}


workflowExecution.Workflow.Actions[actionIndex].LargeImage = ""
workflowExecution.Workflow.Actions[actionIndex].SmallImage = ""

Expand Down Expand Up @@ -1388,6 +1387,11 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) Work
handled := []string{}
newResults := []ActionResult{}
for _, result := range workflowExecution.Results {
if result.Action.ID == "" && result.Action.Name == "" && result.Result == "" {
log.Printf("[DEBUG][%s] Removing empty result started at %d and finished at %d", workflowExecution.ExecutionId, result.StartedAt, result.CompletedAt)
continue
}

if ArrayContains(handled, result.Action.ID) {
continue
}
Expand All @@ -1397,8 +1401,13 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) Work
}

workflowExecution.Results = newResults
for varKey, variable := range workflowExecution.Workflow.ExecutionVariables {

// Sort results based on CompletedAt
sort.Slice(workflowExecution.Results, func(i, j int) bool {
return workflowExecution.Results[i].CompletedAt < workflowExecution.Results[j].CompletedAt
})

for varKey, variable := range workflowExecution.Workflow.ExecutionVariables {
for key, value := range lastexecVar {
if key == variable.Name {
workflowExecution.Workflow.ExecutionVariables[varKey].Value = value.Result
Expand All @@ -1408,12 +1417,12 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) Work
}

// Check for failures before setting to finished
// Update execution parent
if workflowExecution.Status == "EXECUTING" {
//&& workflowExecution.Workflow.Configuration.ExitOnError {

for _, result := range workflowExecution.Results {
if result.Status == "FAILURE" || result.Status == "ABORTED" {
log.Printf("[DEBUG][%s] Setting execution to aborted because of result %s (%s) with status '%s'", workflowExecution.ExecutionId, result.Action.Name, result.Action.ID, result.Status)
log.Printf("[DEBUG][%s] Setting execution to aborted because of result %s (%s) with status '%s'. Should update execution parent if it exists (not implemented).", workflowExecution.ExecutionId, result.Action.Name, result.Action.ID, result.Status)

workflowExecution.Status = "ABORTED"
if workflowExecution.CompletedAt == 0 {
Expand All @@ -1428,7 +1437,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) Work
// Check if finished too?
finalWorkflowExecution := SanitizeExecution(workflowExecution)
if workflowExecution.Status == "EXECUTING" && len(workflowExecution.Results) == len(workflowExecution.Workflow.Actions)+extra {
log.Printf("[DEBUG][%s] Setting execution to finished because all results are in and it was still in EXECUTING mode", workflowExecution.ExecutionId)
log.Printf("[DEBUG][%s] Setting execution to finished because all results are in and it was still in EXECUTING mode. Should set subflow parent result as well (not implemented).", workflowExecution.ExecutionId)

finalWorkflowExecution.Status = "FINISHED"
if finalWorkflowExecution.CompletedAt == 0 {
Expand Down Expand Up @@ -1534,7 +1543,7 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e
newexec := Fixexecution(ctx, *workflowExecution)
workflowExecution = &newexec

log.Printf("[DEBUG][%s] Returned execution from cache with %d results", id, len(workflowExecution.Results))
//log.Printf("[DEBUG][%s] Returned execution from cache with %d results", id, len(workflowExecution.Results))
return workflowExecution, nil
} else {
//log.Printf("[WARNING] Failed getting workflowexecution: %s", err)
Expand Down Expand Up @@ -10534,6 +10543,11 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
} else {
query := datastore.NewQuery(index).Filter("execution_org=", orgId).Order("-started_at").Limit(5)

// This is a trick for SupportAccess users
if len(orgId) == 0 {
query = datastore.NewQuery(index).Order("-started_at").Limit(5)
}

if len(search.WorkflowId) > 0 {
query = query.Filter("workflow_id =", search.WorkflowId)
}
Expand All @@ -10544,18 +10558,23 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS

// String to timestamp for search.SearchFrom (string)
startTimestamp, err := time.Parse(time.RFC3339, search.SearchFrom)
endTimestamp, enderr := time.Parse(time.RFC3339, search.SearchUntil)
if err != nil {
if len(search.SearchFrom) > 0 {
//log.Printf("[WARNING] Failed parsing start time: %s", err)

// If there is no endTimestamp
if enderr != nil {
// FIXME: Set 3 months back in time
}
}
} else {
// Make it into a number instead of a string
query = query.Filter("started_at >=", startTimestamp.Unix())
}

// String to timestamp for search.SearchUntil (string)
endTimestamp, err := time.Parse(time.RFC3339, search.SearchUntil)
if err != nil {
if enderr != nil {
if len(search.SearchFrom) > 0 {
//log.Printf("[WARNING] Failed parsing end time: %s", err)
}
Expand Down Expand Up @@ -10692,7 +10711,7 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS

removeIndexes := []int{}
for execIndex, execution := range executions {
if execution.ExecutionOrg != orgId {
if execution.ExecutionOrg != orgId && len(orgId) > 0 {
removeIndexes = append(removeIndexes, execIndex)
continue
}
Expand Down Expand Up @@ -10740,6 +10759,7 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
execution.Results = execution.Results[:1000]
}

/*
for resIndex, _ := range execution.Results {
if execIndex > len(executions) {
continue
Expand All @@ -10752,12 +10772,18 @@ func GetWorkflowRunsBySearch(ctx context.Context, orgId string, search WorkflowS
executions[execIndex].Results[resIndex].Action = Action{}
executions[execIndex].Results[resIndex].Result = ""
}
*/

// Set action in all execution results to empty

}

for _, removeIndex := range removeIndexes {
executions = append(executions[:removeIndex], executions[removeIndex+1:]...)
// Loop through removeIndexes backwards and remove them
for i := len(removeIndexes) - 1; i >= 0; i-- {
executions = append(executions[:removeIndexes[i]], executions[removeIndexes[i]+1:]...)
}


slice.Sort(executions[:], func(i, j int) bool {
return executions[i].StartedAt > executions[j].StartedAt
})
Expand Down
6 changes: 5 additions & 1 deletion notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,11 @@ func CreateOrgNotification(ctx context.Context, title, description, referenceUrl
// continue
//}

notification.Read = false
// Added ignore as someone could want to never see a specific alert again due to e.g. expecting a 404 on purpose
if notification.Ignored {
notification.Read = false
}

notification.Amount += 1
notification.ReferenceUrl = referenceUrl

Expand Down
21 changes: 19 additions & 2 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -10232,7 +10232,7 @@ func RunExecutionTranslation(ctx context.Context, actionResult ActionResult) {
// Updateparam is a check to see if the execution should be continuously validated
func ParsedExecutionResult(ctx context.Context, workflowExecution WorkflowExecution, actionResult ActionResult, updateParam bool, retries int64) (*WorkflowExecution, bool, error) {
var err error
if actionResult.Action.ID == "" {
if actionResult.Action.ID == "" && actionResult.Action.Name == "" {
// Can we find it based on label?

log.Printf("\n\n[ERROR][%s] Failed handling EMPTY action %#v (ParsedExecutionResult). Usually ONLY happens during worker run that sets everything?\n\n", workflowExecution.ExecutionId, actionResult)
Expand Down Expand Up @@ -20913,14 +20913,31 @@ func HandleWorkflowRunSearch(resp http.ResponseWriter, request *http.Request) {
}
}

runs, cursor, err := GetWorkflowRunsBySearch(ctx, user.ActiveOrg.Id, search)
chosenOrg := user.ActiveOrg.Id
if search.IgnoreOrg == true && user.SupportAccess {
chosenOrg = ""
}

runs, cursor, err := GetWorkflowRunsBySearch(ctx, chosenOrg, search)
if err != nil {
log.Printf("[WARNING] Failed getting workflow runs by search: %s", err)
resp.WriteHeader(400)
resp.Write([]byte(`{"success": false}`))
return
}

parsedRuns := []WorkflowExecution{}
for _, run := range runs {
if run.ExecutionOrg != user.ActiveOrg.Id {
if !user.SupportAccess{
continue
}
}

parsedRuns = append(parsedRuns, run)
}

runs = parsedRuns
workflowSearchResult := WorkflowSearchResult{
Success: true,
Runs: runs,
Expand Down
3 changes: 3 additions & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,7 @@ type Notification struct {
Read bool `json:"read" datastore:"read"`

ModifiedBy string `json:"modified_by" datastore:"modified_by"`
Ignored bool `json:"ignored" datastore:"ignored"`
}

type NotificationCached struct {
Expand Down Expand Up @@ -3631,6 +3632,8 @@ type WorkflowSearch struct {
Status string `json:"status"`
SearchFrom string `json:"start_time"`
SearchUntil string `json:"end_time"`

IgnoreOrg bool `json:"ignore_org"`
}

type WorkflowSearchResult struct {
Expand Down

0 comments on commit cb13470

Please sign in to comment.