Skip to content

Commit

Permalink
Use left joins in subset queries for nullable foreign keys (#3034)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Dec 11, 2024
1 parent 3d9afc7 commit f3d58a8
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 202 deletions.
8 changes: 5 additions & 3 deletions backend/pkg/table-dependency/table-dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@ type DependsOn struct {
}

type ForeignKey struct {
Columns []string
// ReferenceSchema string need to split out schema and table
Columns []string
NotNullable []bool
// ReferenceSchema string TODO: need to split out schema and table
ReferenceTable string
ReferenceColumns []string
}

type RunConfig struct {
table string // schema.table
table string // schema.table TODO: should use sqlmanager_shared.SchemaTable
selectColumns []string
insertColumns []string
dependsOn []*DependsOn // this should be a list of config names like "table.insert", rename to dependsOnConfigs
Expand Down Expand Up @@ -210,6 +211,7 @@ func GetRunConfigs(
// by checking insert columns, we can skip foreign keys that are not needed for the insert
if slices.Contains(config.insertColumns, col) {
foreignKey.Columns = append(foreignKey.Columns, col)
foreignKey.NotNullable = append(foreignKey.NotNullable, fk.NotNullable[idx])
foreignKey.ReferenceColumns = append(foreignKey.ReferenceColumns, fk.ForeignKey.Columns[idx])
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
bb_internal "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/internal"
"github.com/nucleuscloud/neosync/internal/gotypeutil"
"github.com/nucleuscloud/neosync/internal/testutil"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2"
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -88,8 +89,8 @@ func Test_ProcessorConfigEmpty(t *testing.T) {
"name": &mgmtv1alpha1.JobMappingTransformer{},
},
}
queryMap := map[string]map[tabledependency.RunType]string{
"public.users": {tabledependency.RunTypeInsert: ""},
queryMap := map[string]map[tabledependency.RunType]*querybuilder.SelectQuery{
"public.users": {tabledependency.RunTypeInsert: &querybuilder.SelectQuery{Query: ""}},
}
runconfigs := []*tabledependency.RunConfig{
tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id", "name"}, []string{"id", "name"}, []*tabledependency.DependsOn{}, nil, false),
Expand Down Expand Up @@ -185,8 +186,8 @@ func Test_ProcessorConfigEmptyJavascript(t *testing.T) {
tabledependency.NewRunConfig("public.users", tabledependency.RunTypeInsert, []string{"id"}, nil, []string{"id", "name"}, []string{"id", "name"}, []*tabledependency.DependsOn{}, nil, false),
}

queryMap := map[string]map[tabledependency.RunType]string{
"public.users": {tabledependency.RunTypeInsert: ""},
queryMap := map[string]map[tabledependency.RunType]*querybuilder.SelectQuery{
"public.users": {tabledependency.RunTypeInsert: &querybuilder.SelectQuery{Query: ""}},
}
logger := testutil.GetTestLogger(t)
connectionId := uuid.NewString()
Expand Down
38 changes: 27 additions & 11 deletions internal/benthos/benthos-builder/builders/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
bb_shared "github.com/nucleuscloud/neosync/internal/benthos/benthos-builder/shared"
connectionmanager "github.com/nucleuscloud/neosync/internal/connection-manager"
neosync_benthos "github.com/nucleuscloud/neosync/worker/pkg/benthos"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2"
"github.com/nucleuscloud/neosync/worker/pkg/workflows/datasync/activities/shared"
)

Expand All @@ -35,7 +36,8 @@ type sqlSyncBuilder struct {
colTransformerMap map[string]map[string]*mgmtv1alpha1.JobMappingTransformer // schema.table -> column -> transformer
sqlSourceSchemaColumnInfoMap map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow // schema.table -> column -> column info struct
// merged source and destination schema. with preference given to destination schema
mergedSchemaColumnMap map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow // schema.table -> column -> column info struct
mergedSchemaColumnMap map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow // schema.table -> column -> column info struct
isNotForeignKeySafeSubsetMap map[string]map[tabledependency.RunType]bool // schema.table -> run type -> true if the query could return rows that violate foreign key constraints
}

type SqlSyncOption func(*SqlSyncOptions)
Expand Down Expand Up @@ -63,12 +65,13 @@ func NewSqlSyncBuilder(
opt(options)
}
return &sqlSyncBuilder{
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
redisConfig: redisConfig,
driver: databaseDriver,
selectQueryBuilder: selectQueryBuilder,
options: options,
transformerclient: transformerclient,
sqlmanagerclient: sqlmanagerclient,
redisConfig: redisConfig,
driver: databaseDriver,
selectQueryBuilder: selectQueryBuilder,
options: options,
isNotForeignKeySafeSubsetMap: map[string]map[tabledependency.RunType]bool{},
}
}

Expand Down Expand Up @@ -140,6 +143,7 @@ func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_inte
if err != nil {
return nil, err
}

primaryKeyToForeignKeysMap := getPrimaryKeyDependencyMap(filteredForeignKeysMap)
b.primaryKeyToForeignKeysMap = primaryKeyToForeignKeysMap

Expand All @@ -148,6 +152,15 @@ func (b *sqlSyncBuilder) BuildSourceConfigs(ctx context.Context, params *bb_inte
return nil, fmt.Errorf("unable to build select queries: %w", err)
}

// build map of table to runType to isNotForeignKeySafeSubset
// used in destination config to determine if foreign key violations should be skipped
for table, querymap := range tableRunTypeQueryMap {
b.isNotForeignKeySafeSubsetMap[table] = make(map[tabledependency.RunType]bool)
for runtype, q := range querymap {
b.isNotForeignKeySafeSubsetMap[table][runtype] = q.IsNotForeignKeySafeSubset
}
}

configs, err := buildBenthosSqlSourceConfigResponses(logger, ctx, b.transformerclient, groupedTableMapping, runConfigs, sourceConnection.Id, tableRunTypeQueryMap, groupedColumnInfo, filteredForeignKeysMap, colTransformerMap, job.Id, params.WorkflowId, b.redisConfig, primaryKeyToForeignKeysMap)
if err != nil {
return nil, fmt.Errorf("unable to build benthos sql source config responses: %w", err)
Expand All @@ -172,7 +185,7 @@ func buildBenthosSqlSourceConfigResponses(
groupedTableMapping map[string]*tableMapping,
runconfigs []*tabledependency.RunConfig,
dsnConnectionId string,
tableRunTypeQueryMap map[string]map[tabledependency.RunType]string,
tableRunTypeQueryMap map[string]map[tabledependency.RunType]*querybuilder.SelectQuery,
groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
tableDependencies map[string][]*sqlmanager_shared.ForeignConstraint,
colTransformerMap map[string]map[string]*mgmtv1alpha1.JobMappingTransformer,
Expand Down Expand Up @@ -201,7 +214,7 @@ func buildBenthosSqlSourceConfigResponses(
PooledSqlRaw: &neosync_benthos.InputPooledSqlRaw{
ConnectionId: dsnConnectionId,

Query: query,
Query: query.Query,
},
},
},
Expand Down Expand Up @@ -322,6 +335,9 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
return nil, fmt.Errorf("unable to parse destination options: %w", err)
}

// skip foreign key violations when the destination option requests it, or when the subset query could return rows that violate foreign key constraints
skipForeignKeyViolations := destOpts.SkipForeignKeyViolations || b.isNotForeignKeySafeSubsetMap[tableKey][benthosConfig.RunType]

config.BenthosDsns = append(config.BenthosDsns, &bb_shared.BenthosDsn{ConnectionId: params.DestConnection.Id})
if benthosConfig.RunType == tabledependency.RunTypeUpdate {
args := benthosConfig.Columns
Expand All @@ -335,7 +351,7 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
Schema: benthosConfig.TableSchema,
Table: benthosConfig.TableName,
Columns: benthosConfig.Columns,
SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations,
SkipForeignKeyViolations: skipForeignKeyViolations,
MaxInFlight: int(destOpts.MaxInFlight),
WhereColumns: benthosConfig.PrimaryKeys,
ArgsMapping: buildPlainInsertArgs(args),
Expand Down Expand Up @@ -419,7 +435,7 @@ func (b *sqlSyncBuilder) BuildDestinationConfig(ctx context.Context, params *bb_
ColumnsDataTypes: columnTypes,
ColumnDefaultProperties: columnDefaultProperties,
OnConflictDoNothing: destOpts.OnConflictDoNothing,
SkipForeignKeyViolations: destOpts.SkipForeignKeyViolations,
SkipForeignKeyViolations: skipForeignKeyViolations,
RawInsertMode: b.options.rawInsertMode,
TruncateOnRetry: destOpts.Truncate,
ArgsMapping: buildPlainInsertArgs(benthosConfig.Columns),
Expand Down
3 changes: 2 additions & 1 deletion internal/benthos/benthos-builder/shared/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

sqlmanager_shared "github.com/nucleuscloud/neosync/backend/pkg/sqlmanager/shared"
tabledependency "github.com/nucleuscloud/neosync/backend/pkg/table-dependency"
querybuilder "github.com/nucleuscloud/neosync/worker/pkg/query-builder2"
)

// Holds the environment variable name and the connection id that should replace it at runtime when the Sync activity is launched
Expand All @@ -27,7 +28,7 @@ type SelectQueryMapBuilder interface {
runConfigs []*tabledependency.RunConfig,
subsetByForeignKeyConstraints bool,
groupedColumnInfo map[string]map[string]*sqlmanager_shared.DatabaseSchemaRow,
) (map[string]map[tabledependency.RunType]string, error)
) (map[string]map[tabledependency.RunType]*querybuilder.SelectQuery, error)
}

func WithEnvInterpolation(input string) string {
Expand Down
Loading

0 comments on commit f3d58a8

Please sign in to comment.