Skip to content

Commit

Permalink
Merge branch 'main' of github.com:Shuffle/Shuffle-shared
Browse files Browse the repository at this point in the history
  • Loading branch information
0x0elliot committed Jan 24, 2025
2 parents e57c568 + 1adb68c commit 60e01b6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 43 deletions.
2 changes: 1 addition & 1 deletion codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3643,7 +3643,7 @@ func HandlePut(swagger *openapi3.Swagger, api WorkflowApp, extraParameters []Wor
}

func GetAppRequirements() string {
return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\n"
return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\nshufflepy==0.0.7\nshuffle-sdk==0.0.8"
}

// Removes JSON values from the input
Expand Down
31 changes: 30 additions & 1 deletion db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,8 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor
}

if !skipFinished {
log.Printf("[DEBUG][%s] Setting execution to finished because all results are in and it was still in EXECUTING mode. Should set subflow parent result as well (not implemented) - just returning for now for parent function to handle.", workflowExecution.ExecutionId)
// FIXME: Is this subflow result (not implemented) valid? I think it should have been added? Hmm.
//log.Printf("[DEBUG][%s] Setting execution to finished because all results are in and it was still in EXECUTING mode. Should set subflow parent result as well (not implemented) - just returning for now for parent function to handle.", workflowExecution.ExecutionId)
finalWorkflowExecution.Status = "FINISHED"
dbsave = true
if finalWorkflowExecution.CompletedAt == 0 {
Expand Down Expand Up @@ -3234,6 +3235,30 @@ func GetWorkflow(ctx context.Context, id string) (*Workflow, error) {
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &workflow)
if err == nil && workflow.ID != "" {
// Somehow this can happen. Reverting to LATEST revision
if len(workflow.Actions) > 0 && len(workflow.Triggers) == 0 {
revisions, err := ListWorkflowRevisions(ctx, workflow.ID, 2)
if err != nil {
log.Printf("[WARNING] Failed getting revisions during trigger load for workflow %s: %s", workflow.ID, err)
} else {
if len(revisions) > 0 {
for _, revision := range revisions {
if revision.ID != workflow.ID {
continue
}

if len(revision.Triggers) > 0 {
workflow.Triggers = revision.Triggers
break
}
}

log.Printf("[INFO] Reverting to revision triggers for workflow %s from 0 triggers to %d triggers", workflow.ID, len(revisions[0].Triggers))
workflow.Triggers = revisions[0].Triggers
}
}
}

return workflow, nil
}
} else {
Expand Down Expand Up @@ -8211,6 +8236,10 @@ func SetWorkflowRevision(ctx context.Context, workflow Workflow) error {
}

DeleteCache(ctx, fmt.Sprintf("%s_%s", nameKey, workflow.ID))

// For workflow revision backups
DeleteCache(ctx, fmt.Sprintf("%s_%s_1", nameKey, workflow.ID))
DeleteCache(ctx, fmt.Sprintf("%s_%s_2", nameKey, workflow.ID))
}

return nil
Expand Down
79 changes: 38 additions & 41 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -5040,10 +5040,16 @@ func GetGcpSchedule(ctx context.Context, id string) (*ScheduleOld, error) {
log.Printf("[ERROR] Client error: %s", err)
return schedule, err
}

location := "europe-west2"
if len(os.Getenv("SHUFFLE_GCEPROJECT")) > 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 {
if len(os.Getenv("SHUFFLE_GCE_LOCATION")) > 0 {
location = os.Getenv("SHUFFLE_GCE_LOCATION")
}

if len(os.Getenv("SHUFFLE_GCE_LOCATION")) == 0 && len(os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")) > 0 {
location = os.Getenv("SHUFFLE_GCEPROJECT_LOCATION")
}

req := &schedulerpb.GetJobRequest{
Name: fmt.Sprintf("projects/%s/locations/%s/jobs/schedule_%s", gceProject, location, id),
}
Expand Down Expand Up @@ -6520,7 +6526,7 @@ func diffWorkflows(oldWorkflow Workflow, parentWorkflow Workflow, update bool) {
// i think the following messes everything up with
// authenticationIDs. Let's just remove it for now.

// In case of replication,
// In case of replication,
parentWorkflow.Actions[actionIndex].AuthenticationId = ""

idFound := false
Expand Down Expand Up @@ -18491,45 +18497,10 @@ func PrepareSingleAction(ctx context.Context, user User, fileId string, body []b
return workflowExecution, err
}

/*
// FIXME: Is this required? I don't think so
if app.Authentication.Required && len(action.AuthenticationId) == 0 {

// Basic bypass check for valid headers just in case
authFound := false
for _, param := range action.Parameters {
if param.Name == "headers" || param.Name == "queries" || param.Name == "url" {
lowercased := strings.ToLower(param.Value)
if strings.Contains(lowercased, "auth") || strings.Contains(lowercased, "bearer") || strings.Contains(lowercased, "basic") || strings.Contains(lowercased, "api") {
authFound = true
break
}
}
}

if !authFound {
log.Printf("[WARNING] Tried to execute SINGLE %s WITHOUT auth (missing)", app.Name)

found := false
for _, param := range action.Parameters {
if param.Configuration {
found = true
break
}
}

if !found {
return workflowExecution, errors.New("You must authenticate this API first")
}
}
}
*/

// FIXME: We need to inject missing empty auth here
// FIXME: We need to inject missing empty auth here in some cases
// This is NOT a good solution, but a good bypass
if app.Authentication.Required {
if app.Authentication.Required && len(action.AuthenticationId) == 0 {
authFields := 0

foundFields := []string{}
for _, actionParam := range action.Parameters {
if actionParam.Configuration {
Expand Down Expand Up @@ -18568,6 +18539,32 @@ func PrepareSingleAction(ctx context.Context, user User, fileId string, body []b
})
}
}

auths, err := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id)
if err != nil {
log.Printf("[ERROR] Failed getting auth for single action: %s", err)
} else {
//latestTimestamp := int64(0)
for _, auth := range auths {
if auth.App.ID != fileId {
continue
}

// Fallback to latest created
/*
if latestTimestamp < auth.Created {
latestTimestamp = auth.Created
action.AuthenticationId = auth.Id
}
*/

// If valid, just choose it
if auth.Validation.Valid {
action.AuthenticationId = auth.Id
break
}
}
}
}

if runValidationAction {
Expand Down Expand Up @@ -25429,7 +25426,7 @@ func GetExternalClient(baseUrl string) *http.Client {

if len(os.Getenv("SHUFFLE_INTERNAL_NO_PROXY")) > 0 {
noProxy = os.Getenv("SHUFFLE_INTERNAL_NO_PROXY")
}
}

if len(os.Getenv("SHUFFLE_INTERNAL_NOPROXY")) > 0 {
noProxy = os.Getenv("SHUFFLE_INTERNAL_NOPROXY")
Expand All @@ -25438,7 +25435,7 @@ func GetExternalClient(baseUrl string) *http.Client {

// Manage noproxy
if len(noProxy) > 0 {
isNoProxy := isNoProxyHost(noProxy, parsedUrl.Host)
isNoProxy := isNoProxyHost(noProxy, parsedUrl.Host)
if isNoProxy {
log.Printf("[INFO] Skipping proxy for %s", parsedUrl)

Expand Down

0 comments on commit 60e01b6

Please sign in to comment.