diff --git a/backend/pkg/sqlmanager/shared/types.go b/backend/pkg/sqlmanager/shared/types.go index b6769fd5e6..6a923197aa 100644 --- a/backend/pkg/sqlmanager/shared/types.go +++ b/backend/pkg/sqlmanager/shared/types.go @@ -174,3 +174,10 @@ type InitSchemaStatements struct { Label string Statements []string } + +type SelectQuery struct { + Query string + + // If true, this query could return rows that violate foreign key constraints + IsNotForeignKeySafeSubset bool +} diff --git a/internal/benthos/benthos-builder/builders/benthos-builder_test.go b/internal/benthos/benthos-builder/builders/benthos-builder_test.go index 94a41a00d5..061789f3bd 100644 --- a/internal/benthos/benthos-builder/builders/benthos-builder_test.go +++ b/internal/benthos/benthos-builder/builders/benthos-builder_test.go @@ -14,7 +14,6 @@ import ( bb_internal "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/internal" "github.com/nucleuscloud/neosync/internal/gotypeutil" "github.com/nucleuscloud/neosync/internal/testutil" - querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -89,8 +88,8 @@ func Test_ProcessorConfigEmpty(t *testing.T) { "name": &mgmtv1alpha1.JobMappingTransformer{}, }, } - queryMap := map[string]map[tabledependency.RunType]*querybuilder.SelectQuery{ - "public.users": {tabledependency.RunTypeInsert: &querybuilder.SelectQuery{Query: ""}}, + queryMap := map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery{ + "public.users": {tabledependency.RunTypeInsert: &sqlmanager_shared.SelectQuery{Query: ""}}, } runconfigs := []*tabledependency.RunConfig{ tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id", "name"}, []string{"id", "name"}, []*tabledependency.DependsOn{}, nil, false), @@ -186,8 +185,8 @@ func Test_ProcessorConfigEmptyJavascript(t *testing.T) { tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id", "name"}, []string{"id", "name"}, []*tabledependency.DependsOn{}, nil, false), } - queryMap := map[string]map[tabledependency.RunType]*querybuilder.SelectQuery{ - "public.users": {tabledependency.RunTypeInsert: &querybuilder.SelectQuery{Query: ""}}, + queryMap := map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery{ + "public.users": {tabledependency.RunTypeInsert: &sqlmanager_shared.SelectQuery{Query: ""}}, } logger := testutil.GetTestLogger(t) connectionId := uuid.NewString() diff --git a/internal/benthos/benthos-builder/builders/sql.go b/internal/benthos/benthos-builder/builders/sql.go index 7524542c0e..4d47fb5510 100644 --- a/internal/benthos/benthos-builder/builders/sql.go +++ b/internal/benthos/benthos-builder/builders/sql.go @@ -18,7 +18,6 @@ import ( bb_shared "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/shared" connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager" neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos" - querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2" "github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared" ) @@ -166,7 +165,7 @@ func buildBenthosSqlSourceConfigResponses( groupedTableMapping map[string]*tableMapping, runconfigs []*tabledependency.RunConfig, dsnConnectionId string, - tableRunTypeQueryMap map[string]map[tabledependency.RunType]*querybuilder.SelectQuery, + tableRunTypeQueryMap map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery, groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow, tableDependencies map[string][]*sqlmanager_shared.ForeignConstraint, colTransformerMap map[string]map[string]*mgmtv1alpha1.JobMappingTransformer, diff --git a/internal/benthos/benthos-builder/shared/types.go b/internal/benthos/benthos-builder/shared/types.go index 67d21a5f3d..5c18070ab9 100644 --- a/internal/benthos/benthos-builder/shared/types.go +++ b/internal/benthos/benthos-builder/shared/types.go @@ -5,7 +5,6 @@ import ( sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared" tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency" - querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2" ) // Holds the environment variable name and the connection id that should replace it at runtime when the Sync activity is launched @@ -28,7 +27,7 @@ type SelectQueryMapBuilder interface { runConfigs []*tabledependency.RunConfig, subsetByForeignKeyConstraints bool, groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow, - ) (map[string]map[tabledependency.RunType]*querybuilder.SelectQuery, error) + ) (map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery, error) } func WithEnvInterpolation(input string) string { diff --git a/worker/pkg/query-builder2/subset.go b/worker/pkg/query-builder2/subset.go index 57301f556c..84c0425141 100644 --- a/worker/pkg/query-builder2/subset.go +++ b/worker/pkg/query-builder2/subset.go @@ -13,7 +13,7 @@ func BuildSelectQueryMap( runConfigs []*tabledependency.RunConfig, subsetByForeignKeyConstraints bool, groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow, -) (map[string]map[tabledependency.RunType]*SelectQuery, error) { +) (map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery, error) { tableDependencies := map[string]*TableConstraints{} for _, rc := range runConfigs { if rc.RunType() != tabledependency.RunTypeInsert { @@ -56,17 +56,17 @@ func BuildSelectQueryMap( qb.AddWhereCondition(schema, table, qualifiedWhereCaluse) } - querymap := map[string]map[tabledependency.RunType]*SelectQuery{} + querymap := map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery{} for _, cfg := range runConfigs { if _, ok := querymap[cfg.Table()]; !ok { - querymap[cfg.Table()] = map[tabledependency.RunType]*SelectQuery{} + querymap[cfg.Table()] = map[tabledependency.RunType]*sqlmanager_shared.SelectQuery{} } schema, table := splitTable(cfg.Table()) query, _, isNotForeignKeySafe, err := qb.BuildQuery(schema, table) if err != nil { return nil, err } - querymap[cfg.Table()][cfg.RunType()] = &SelectQuery{ + querymap[cfg.Table()][cfg.RunType()] = &sqlmanager_shared.SelectQuery{ Query: query, IsNotForeignKeySafeSubset: isNotForeignKeySafe, } diff --git a/worker/pkg/query-builder2/wrapper.go b/worker/pkg/query-builder2/wrapper.go index 259cd30c0f..296ebc23c2 100644 --- a/worker/pkg/query-builder2/wrapper.go +++ b/worker/pkg/query-builder2/wrapper.go @@ -8,20 +8,13 @@ import ( // QueryMapBuilderWrapper implements the SelectQueryMapBuilder interface type QueryMapBuilderWrapper struct{} -type SelectQuery struct { - Query string - - // If true, this query could return rows that violate foreign key constraints - IsNotForeignKeySafeSubset bool -} - // BuildSelectQueryMap wraps the original BuildSelectQueryMap function func (w *QueryMapBuilderWrapper) BuildSelectQueryMap( driver string, runConfigs []*tabledependency.RunConfig, subsetByForeignKeyConstraints bool, groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow, -) (map[string]map[tabledependency.RunType]*SelectQuery, error) { +) (map[string]map[tabledependency.RunType]*sqlmanager_shared.SelectQuery, error) { return BuildSelectQueryMap( driver, runConfigs,