Skip to content

Commit

Permalink
fixed conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
yashsinghcodes committed Feb 4, 2025
2 parents 8b95450 + 8fd4bc5 commit 22b9936
Show file tree
Hide file tree
Showing 9 changed files with 1,630 additions and 1,281 deletions.
11 changes: 6 additions & 5 deletions app_upload/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module stitcher

go 1.22.0
go 1.22.7

toolchain go1.22.10

replace github.com/shuffle/shuffle-shared => ../
Expand Down Expand Up @@ -97,13 +98,13 @@ require (
go.opentelemetry.io/otel/trace v1.33.0 // indirect
go.opentelemetry.io/proto/otlp v1.4.0 // indirect
go4.org v0.0.0-20201209231011-d4a079459e60 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
Expand Down
16 changes: 8 additions & 8 deletions app_upload/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -411,8 +411,8 @@ golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -455,14 +455,14 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
4 changes: 2 additions & 2 deletions app_upload/stitcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,8 @@ func main() {
bucketName = os.Args[5]
}

appname := "shuffle-tools"
appversion := "1.2.0"
appname := "shuffle-ai"
appversion := "1.0.0"
err := deployConfigToBackend(appfolder, appname, appversion)
if err != nil {
log.Printf("[WARNING] Failed uploading config: %s", err)
Expand Down
6 changes: 6 additions & 0 deletions blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ func GetAppCategories() []AppCategory {
Icon: "network",
ActionLabels: []string{"Get Rules", "Allow IP", "Block IP"},
},
AppCategory{
Name: "AI",
Color: "#FFC107",
Icon: "AI",
ActionLabels: []string{"Answer Question", "Run Action"},
},
AppCategory{
Name: "Other",
Color: "#FFC107",
Expand Down
2 changes: 1 addition & 1 deletion codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -3643,7 +3643,7 @@ func HandlePut(swagger *openapi3.Swagger, api WorkflowApp, extraParameters []Wor
}

func GetAppRequirements() string {
return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\nshufflepy==0.0.7\nshuffle-sdk==0.0.8"
return "requests==2.32.3\nurllib3==2.3.0\nliquidpy==0.8.2\nMarkupSafe==3.0.2\nflask[async]==3.1.0\npython-dateutil==2.9.0.post0\nPyJWT==2.10.1\nshufflepy==0.0.7\nshuffle-sdk==0.0.10\n"
}

// Removes JSON values from the input
Expand Down
115 changes: 103 additions & 12 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"os"
"strconv"

//"strconv"
//"encoding/binary"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -1583,6 +1581,9 @@ func GetExecutionVariables(ctx context.Context, executionId string) (string, int
}

func getExecutionFileValue(ctx context.Context, workflowExecution WorkflowExecution, action ActionResult) (string, error) {
projectName := os.Getenv("SHUFFLE_GCEPROJECT")
bucketName := project.BucketName

fullParsedPath := fmt.Sprintf("large_executions/%s/%s_%s", workflowExecution.ExecutionOrg, workflowExecution.ExecutionId, action.Action.ID)

cacheKey := fmt.Sprintf("%s_%s_action_replace", workflowExecution.ExecutionId, action.Action.ID)
Expand All @@ -1594,11 +1595,24 @@ func getExecutionFileValue(ctx context.Context, workflowExecution WorkflowExecut
}
}

bucket := project.StorageClient.Bucket(project.BucketName)
bucket := project.StorageClient.Bucket(bucketName)
obj := bucket.Object(fullParsedPath)
fileReader, err := obj.NewReader(ctx)
if err != nil {
return "", err
log.Printf("[ERROR] Failed reading file from bucket %s: %s. Will try with alternative solution.", bucketName, err)

if projectName != "shuffler" {
bucketName = fmt.Sprintf("%s.appspot.com", projectName)
bucket = project.StorageClient.Bucket(bucketName)
obj = bucket.Object(fullParsedPath)
fileReader, err = obj.NewReader(ctx)
if err != nil {
log.Printf("[ERROR] Failed reading file again from bucket %s: %s", bucketName, err)
return "", err
}
} else {
return "", err
}
}

data, err := ioutil.ReadAll(fileReader)
Expand Down Expand Up @@ -2066,6 +2080,8 @@ func GetWorkflowExecution(ctx context.Context, id string) (*WorkflowExecution, e
if err == nil {
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &workflowExecution)


if err == nil || len(workflowExecution.ExecutionId) > 0 {
//log.Printf("[DEBUG] Checking individual execution cache with %d results", len(workflowExecution.Results))
if strings.Contains(workflowExecution.ExecutionArgument, "Result too large to handle") {
Expand Down Expand Up @@ -3266,12 +3282,22 @@ func GetWorkflow(ctx context.Context, id string) (*Workflow, error) {
}
}

log.Printf("[INFO] Reverting to revision triggers for workflow %s from 0 triggers to %d triggers", workflow.ID, len(revisions[0].Triggers))
//log.Printf("[INFO] Reverting to revision triggers for workflow %s from 0 triggers to %d triggers", workflow.ID, len(revisions[0].Triggers))
workflow.Triggers = revisions[0].Triggers
}
}
}

healthWorkflow, _, err := GetStaticWorkflowHealth(ctx, *workflow)
if err != nil {
if !strings.Contains(err.Error(), "Org ID not set") {
log.Printf("[ERROR] Failed getting static workflow health for workflow %s: %s (2)", workflow.ID, err)
}

} else {
workflow = &healthWorkflow
}

return workflow, nil
}
} else {
Expand Down Expand Up @@ -3357,14 +3383,24 @@ func GetWorkflow(ctx context.Context, id string) (*Workflow, error) {
}
}

log.Printf("[INFO] Reverting to revision triggers for workflow %s from 0 triggers to %d triggers", workflow.ID, len(revisions[0].Triggers))
//log.Printf("[INFO] Reverting to revision triggers for workflow %s from 0 triggers to %d triggers", workflow.ID, len(revisions[0].Triggers))
workflow.Triggers = revisions[0].Triggers
}
}
}

newWorkflow := FixWorkflowPosition(ctx, *workflow)
workflow = &newWorkflow

healthWorkflow, _, err := GetStaticWorkflowHealth(ctx, *workflow)
if err != nil {
if !strings.Contains(err.Error(), "Org ID not set") {
log.Printf("[ERROR] Failed getting static workflow health for workflow %s: %s (2)", workflow.ID, err)
}
} else {
workflow = &healthWorkflow
}

if project.CacheDb && workflow.ID != "" {
//log.Printf("[DEBUG] Setting cache for workflow %s", cacheKey)
data, err := json.Marshal(workflow)
Expand Down Expand Up @@ -4962,12 +4998,12 @@ func GetOpenApiDatastore(ctx context.Context, id string) (ParsedOpenApi, error)
err := project.Dbclient.Get(ctx, key, api)
//if (err != nil || len(api.Body) == 0) && !strings.Contains(fmt.Sprintf("%s", err), "no such") {
if err != nil || len(api.Body) == 0 {
log.Printf("Some API issue: %s", err)

if strings.Contains(fmt.Sprintf("%s", err), "cannot load field") {
return *api, nil
}

log.Printf("[ERROR] Some OpenAPI refissue for ID '%s': %s", id, err)

//project.BucketName := project.BucketName
fullParsedPath := fmt.Sprintf("extra_specs/%s/openapi.json", id)
//gs://shuffler.appspot.com/extra_specs/0373ed696a3a2cba0a2b6838068f2b80
Expand Down Expand Up @@ -5166,6 +5202,20 @@ func FindWorkflowAppByName(ctx context.Context, appName string) ([]WorkflowApp,
var apps []WorkflowApp

nameKey := "workflowapp"
cacheKey := fmt.Sprintf("%s_appname_%s", appName)
if project.CacheDb {
cache, err := GetCache(ctx, cacheKey)
if err == nil {
cacheData := []byte(cache.([]uint8))
err = json.Unmarshal(cacheData, &apps)
if err == nil {
return apps, nil
}
} else {
//log.Printf("[DEBUG] Failed getting cache for user: %s", err)
}
}

if project.DbType == "opensearch" {
var buf bytes.Buffer
query := map[string]interface{}{
Expand Down Expand Up @@ -5234,14 +5284,27 @@ func FindWorkflowAppByName(ctx context.Context, appName string) ([]WorkflowApp,
}
} else {
//log.Printf("Looking for name %s in %s", appName, nameKey)
q := datastore.NewQuery(nameKey).Filter("name =", appName)
q := datastore.NewQuery(nameKey).Filter("Name =", appName).Limit(6)
_, err := project.Dbclient.GetAll(ctx, q, &apps)
if err != nil && len(apps) == 0 {
log.Printf("[WARNING] Failed getting apps for name: %s", appName)
return apps, err
}
}

if project.CacheDb {
data, err := json.Marshal(apps)
if err != nil {
log.Printf("[WARNING] Failed marshalling apps for appname %s: %s", appName, err)
return apps, nil
}

err = SetCache(ctx, cacheKey, data, 1440)
if err != nil {
log.Printf("[WARNING] Failed updating cache: %s", err)
}
}

log.Printf("[INFO] Found %d apps for name %s in db-connector", len(apps), appName)
return apps, nil
}
Expand Down Expand Up @@ -6216,13 +6279,26 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
if user.ActiveOrg.Id != "" {
query := datastore.NewQuery(nameKey).Filter("reference_org =", user.ActiveOrg.Id).Limit(queryLimit)
//log.Printf("[INFO] Before ref org search. Org: %s\n\n", user.ActiveOrg.Id)
maxAmount := 100
cnt := 0
for {
it := project.Dbclient.Run(ctx, query)
if cnt > maxAmount {
log.Printf("[ERROR] Maximum try exceeded for workflowapp (1)")
break
}

for {
innerApp := WorkflowApp{}
_, err := it.Next(&innerApp)
cnt += 1
if cnt > maxAmount {
log.Printf("[ERROR] Maximum try exceeded for workfloapp (2)")
break
}

if err != nil {
//log.Printf("[INFO] Failed fetching results: %v", err)
if strings.Contains(fmt.Sprintf("%s", err), "cannot load field") {
//log.Printf("[ERROR] Error in reference_org app load of %s (%s): %s.", innerApp.Name, innerApp.ID, err)
} else {
Expand Down Expand Up @@ -7804,7 +7880,7 @@ func ListChildWorkflows(ctx context.Context, originalId string) ([]Workflow, err
}
}

log.Printf("[AUDIT] Getting workflow children for workflow %s.", originalId)
//log.Printf("[AUDIT] Getting workflow children for workflow %s.", originalId)
if project.DbType == "opensearch" {
var buf bytes.Buffer
query := map[string]interface{}{
Expand Down Expand Up @@ -7985,7 +8061,7 @@ func ListWorkflowRevisions(ctx context.Context, originalId string, amount int) (
}
}

log.Printf("[AUDIT] Getting workflow revisions for workflow %s.", originalId)
//log.Printf("[AUDIT] Getting workflow revisions for workflow %s.", originalId)
if project.DbType == "opensearch" {
var buf bytes.Buffer
query := map[string]interface{}{
Expand Down Expand Up @@ -8251,8 +8327,11 @@ func SetWorkflowRevision(ctx context.Context, workflow Workflow) error {
DeleteCache(ctx, fmt.Sprintf("%s_%s", nameKey, workflow.ID))

// For workflow revision backups
DeleteCache(ctx, fmt.Sprintf("%s_%s_1", nameKey, workflow.ID))
go DeleteCache(ctx, fmt.Sprintf("%s_%s_1", nameKey, workflow.ID))
go DeleteCache(ctx, fmt.Sprintf("%s_%s_200", nameKey, workflow.ID))
// Actively used keys
DeleteCache(ctx, fmt.Sprintf("%s_%s_2", nameKey, workflow.ID))
DeleteCache(ctx, fmt.Sprintf("%s_%s_50", nameKey, workflow.ID))
}

return nil
Expand Down Expand Up @@ -11004,16 +11083,28 @@ func GetAllWorkflowExecutionsV2(ctx context.Context, workflowId string, amount i
// Create a timeout to prevent the query from taking more than 5 seconds total

cursorStr := ""
maxAmount := 100
cnt := 0
for {
it := project.Dbclient.Run(ctx, query)
if cnt > maxAmount {
log.Printf("[ERROR] Error getting workflow execution (4): reached maximum retries")
break
}

breakOuter := false
for {
innerWorkflow := WorkflowExecution{}
_, err := it.Next(&innerWorkflow)
if cnt > maxAmount {
log.Printf("[ERROR] Error getting workflow execution (3): reached maximum retries")
break
}

if err != nil {
if strings.Contains(err.Error(), "context deadline exceeded") {
log.Printf("[WARNING] Error getting workflow executions (1): %s", err)
cnt += 1
breakOuter = true
} else {
if strings.Contains(err.Error(), `cannot load field`) {
Expand Down
Loading

0 comments on commit 22b9936

Please sign in to comment.