Skip to content

Commit

Permalink
remove progress interface completely (#5426)
Browse files Browse the repository at this point in the history
* remove progress interface

* remove progress interface
  • Loading branch information
k-anshul committed Aug 9, 2024
1 parent 6b076d2 commit ca4f66d
Show file tree
Hide file tree
Showing 11 changed files with 7 additions and 32 deletions.
4 changes: 2 additions & 2 deletions runtime/drivers/duckdb/transporter_duckDB_to_duckDB_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestDuckDBToDuckDBTransfer(t *testing.T) {
tr := NewDuckDBToDuckDB(olap, zap.NewNop())

// transfer once
err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{})
require.NoError(t, err)

rows, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM test"})
Expand All @@ -52,7 +52,7 @@ func TestDuckDBToDuckDBTransfer(t *testing.T) {
require.NoError(t, rows.Close())

// transfer again
err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
err = tr.Transfer(context.Background(), map[string]any{"sql": "SELECT * FROM foo", "db": filepath.Join(tempDir, "tranfser.db")}, map[string]any{"table": "test"}, &drivers.TransferOptions{})
require.NoError(t, err)

rows, err = olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT COUNT(*) FROM test"})
Expand Down
2 changes: 0 additions & 2 deletions runtime/drivers/duckdb/transporter_filestore_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ func (t *fileStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps ma
if !sizeWithinStorageLimits(t.to, size) {
return drivers.ErrStorageLimitExceeded
}
opts.Progress.Target(size, drivers.ProgressUnitByte)

var format string
if srcCfg.Format != "" {
Expand All @@ -70,6 +69,5 @@ func (t *fileStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps ma
if err != nil {
return err
}
opts.Progress.Observe(size, drivers.ProgressUnitByte)
return nil
}
1 change: 0 additions & 1 deletion runtime/drivers/duckdb/transporter_motherduck_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func NewMotherduckToDuckDB(from drivers.Handle, to drivers.OLAPStore, logger *za
}
}

// TODO: should it run count from user_query to set target in progress ?
func (t *motherduckToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps map[string]any, opts *drivers.TransferOptions) error {
srcConfig := &mdSrcProps{}
err := mapstructure.WeakDecode(srcProps, srcConfig)
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/duckdb/transporter_mysql_to_duckDB_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func allMySQLDataTypesTest(t *testing.T, db *sql.DB, dsn string) {
olap, _ := to.AsOLAP("")

tr := NewSQLStoreToDuckDB(sqlStore, olap, zap.NewNop())
err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_data_types_table;"}, map[string]any{"table": "sink"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_data_types_table;"}, map[string]any{"table": "sink"}, &drivers.TransferOptions{})
require.NoError(t, err)
res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
require.NoError(t, err)
Expand Down
7 changes: 2 additions & 5 deletions runtime/drivers/duckdb/transporter_objectStore_to_duckDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ func (t *objectStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps

// if sql is specified use ast rewrite to fill in the downloaded files
if srcCfg.SQL != "" {
return t.ingestDuckDBSQL(ctx, srcCfg.SQL, iterator, srcCfg, sinkCfg, opts)
return t.ingestDuckDBSQL(ctx, srcCfg.SQL, iterator, srcCfg, sinkCfg)
}

opts.Progress.Target(size, drivers.ProgressUnitByte)
appendToTable := false
var format string
if srcCfg.Format != "" {
Expand Down Expand Up @@ -114,7 +113,6 @@ func (t *objectStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps

size := fileSize(files)
t.logger.Debug("ingested files", zap.Strings("files", files), zap.Int64("bytes_ingested", size), zap.Duration("duration", time.Since(st)), observability.ZapCtx(ctx))
opts.Progress.Observe(size, drivers.ProgressUnitByte)
appendToTable = true
}
// convert to enum
Expand All @@ -125,7 +123,7 @@ func (t *objectStoreToDuckDB) Transfer(ctx context.Context, srcProps, sinkProps
return nil
}

func (t *objectStoreToDuckDB) ingestDuckDBSQL(ctx context.Context, originalSQL string, iterator drivers.FileIterator, srcCfg *fileSourceProperties, dbSink *sinkProperties, opts *drivers.TransferOptions) error {
func (t *objectStoreToDuckDB) ingestDuckDBSQL(ctx context.Context, originalSQL string, iterator drivers.FileIterator, srcCfg *fileSourceProperties, dbSink *sinkProperties) error {
ast, err := duckdbsql.Parse(originalSQL)
if err != nil {
return err
Expand Down Expand Up @@ -178,7 +176,6 @@ func (t *objectStoreToDuckDB) ingestDuckDBSQL(ctx context.Context, originalSQL s

size := fileSize(files)
t.logger.Debug("ingested files", zap.Strings("files", files), zap.Int64("bytes_ingested", size), zap.Duration("duration", time.Since(st)), observability.ZapCtx(ctx))
opts.Progress.Observe(size, drivers.ProgressUnitByte)
appendToTable = true
}
// convert to enum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func allDataTypesTest(t *testing.T, db *sql.DB, dbURL string) {
olap, _ := to.AsOLAP("")

tr := NewSQLStoreToDuckDB(sqlStore, olap, zap.NewNop())
err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_datatypes;"}, map[string]any{"table": "sink"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
err = tr.Transfer(ctx, map[string]any{"sql": "select * from all_datatypes;"}, map[string]any{"table": "sink"}, &drivers.TransferOptions{})
require.NoError(t, err)
res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "select count(*) from sink"})
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Test_sqliteToDuckDB_Transfer(t *testing.T) {
logger: zap.NewNop(),
}
query := fmt.Sprintf("SELECT * FROM sqlite_scan('%s', 't');", dbPath)
err = tr.Transfer(context.Background(), map[string]any{"sql": query}, map[string]any{"table": "test"}, &drivers.TransferOptions{Progress: drivers.NoOpProgress{}})
err = tr.Transfer(context.Background(), map[string]any{"sql": query}, map[string]any{"table": "test"}, &drivers.TransferOptions{})
require.NoError(t, err)

res, err := olap.Execute(context.Background(), &drivers.Statement{Query: "SELECT count(*) from test"})
Expand Down
1 change: 0 additions & 1 deletion runtime/drivers/duckdb/transporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,6 @@ func runOLAPStore(t *testing.T) drivers.OLAPStore {
func mockTransferOptions() *drivers.TransferOptions {
return &drivers.TransferOptions{
AllowHostAccess: true,
Progress: drivers.NoOpProgress{},
AcquireConnector: func(name string) (drivers.Handle, func(), error) {
return nil, nil, fmt.Errorf("not found")
},
Expand Down
2 changes: 0 additions & 2 deletions runtime/drivers/snowflake/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ type fileIterator struct {
conn *sql.Conn
rows sqld.Rows
batches []*sf.ArrowBatch
progress drivers.Progress
limitInBytes int64
logger *zap.Logger
tempDir string
Expand Down Expand Up @@ -247,7 +246,6 @@ func (f *fileIterator) Next() ([]string, error) {
if err != nil {
return nil, err
}
f.progress.Observe(1, drivers.ProgressUnitFile)
f.logger.Debug("size of file", zap.String("size", datasize.ByteSize(fileInfo.Size()).HumanReadable()), observability.ZapCtx(f.ctx))
return []string{fw.Name()}, nil
}
Expand Down
15 changes: 0 additions & 15 deletions runtime/drivers/transporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,9 @@ type Transporter interface {
type TransferOptions struct {
AllowHostAccess bool
RepoRoot string
Progress Progress
AcquireConnector func(string) (Handle, func(), error)
}

// Progress is an interface for communicating progress info
type Progress interface {
Target(val int64, unit ProgressUnit)
// Observe is used by caller to provide incremental updates
Observe(val int64, unit ProgressUnit)
}

type NoOpProgress struct{}

func (n NoOpProgress) Target(val int64, unit ProgressUnit) {}
func (n NoOpProgress) Observe(val int64, unit ProgressUnit) {}

var _ Progress = NoOpProgress{}

type ProgressUnit int

const (
Expand Down
1 change: 0 additions & 1 deletion runtime/reconcilers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ func (r *SourceReconciler) ingestSource(ctx context.Context, self *runtimev1.Res
AcquireConnector: func(name string) (drivers.Handle, func(), error) {
return r.C.AcquireConn(ctx, name)
},
Progress: drivers.NoOpProgress{},
}

transferStart := time.Now()
Expand Down

1 comment on commit ca4f66d

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 Published on https://ui.rilldata.com as production
🚀 Deployed on https://66b61c76c4712d7448c907d8--rill-ui.netlify.app

Please sign in to comment.