Skip to content
This repository has been archived by the owner on Mar 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1 from peopledatalabs/chrispyduck/error-handling
Browse files Browse the repository at this point in the history
improve error handling, adjust buffer size
  • Loading branch information
chrispyduck authored Oct 3, 2023
2 parents 933b7b3 + 4f90888 commit 0b2cc6e
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 22 deletions.
8 changes: 5 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Config struct {
Target Resource
Silent bool
TTL bool
MaxBuf int
}

// exit will exit and print the usage.
Expand All @@ -36,7 +37,7 @@ func exit(e error) {

// validate makes sure from and to are Redis URIs or file paths,
// and generates the final Config.
func validate(from, to string, silent, ttl bool) (Config, error) {
func validate(from, to string, silent, ttl bool, maxBuf int) (Config, error) {
cfg := Config{
Source: Resource{
URI: from,
Expand All @@ -46,6 +47,7 @@ func validate(from, to string, silent, ttl bool) (Config, error) {
},
Silent: silent,
TTL: ttl,
MaxBuf: maxBuf,
}

if strings.HasPrefix(from, "redis://") {
Expand Down Expand Up @@ -76,10 +78,10 @@ func Parse() Config {
to := flag.String("to", "", example)
silent := flag.Bool("silent", false, "optional, no verbose output")
ttl := flag.Bool("ttl", false, "optional, enable ttl sync")

maxBuf := flag.Int("buffer", 20*1024*1024, "the size of the buffer used when reading the file, uint:byte")
flag.Parse()

cfg, err := validate(*from, *to, *silent, *ttl)
cfg, err := validate(*from, *to, *silent, *ttl, *maxBuf)
if err != nil {
// we exit here instead of returning so that we can show
// the usage examples in case of an error.
Expand Down
18 changes: 9 additions & 9 deletions pkg/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type File struct {
Bus message.Bus
Silent bool
TTL bool
MaxBuf int
}

// splitCross is a double-cross (✝✝) custom Scanner Split.
Expand All @@ -38,12 +39,13 @@ func splitCross(data []byte, atEOF bool) (advance int, token []byte, err error)
}

// New creates the File struct, to be used for reading/writing.
func New(path string, bus message.Bus, silent, ttl bool) *File {
func New(path string, bus message.Bus, silent, ttl bool, maxBuf int) *File {
return &File{
Path: path,
Bus: bus,
Silent: silent,
TTL: ttl,
MaxBuf: maxBuf,
}
}

Expand All @@ -61,18 +63,16 @@ func (f *File) Read(ctx context.Context) error {

d, err := os.Open(f.Path)
if err != nil {
return err
return fmt.Errorf("error opening file %s: %W", f.Path, err)
}
defer d.Close()

// Scan file, split by double-cross separator
scanner := bufio.NewScanner(d)
buf := make([]byte, 0, bufio.MaxScanTokenSize)
scanner.Buffer(buf, f.MaxBuf)
scanner.Split(splitCross)

// set buffer max size to 2MB, initial size to 128k
buf := make([]byte, 0, 1024*1024)
scanner.Buffer(buf, 20*1024*1024)

// Scan line by line
// file protocol is key✝✝value✝✝ttl✝✝
for scanner.Scan() {
Expand All @@ -94,7 +94,7 @@ func (f *File) Read(ctx context.Context) error {
}

if err := scanner.Err(); err != nil {
return err
return fmt.Errorf("error reading from file: %W", err)
}

return nil
Expand All @@ -104,7 +104,7 @@ func (f *File) Read(ctx context.Context) error {
func (f *File) Write(ctx context.Context) error {
d, err := os.Create(f.Path)
if err != nil {
return err
return fmt.Errorf("error creating file %s: %W", f.Path, err)
}
defer d.Close()

Expand All @@ -129,7 +129,7 @@ func (f *File) Write(ctx context.Context) error {
}
_, err := w.WriteString(p.Key + "✝✝" + p.Value + "✝✝" + p.TTL + "✝✝")
if err != nil {
return err
return fmt.Errorf("error writing key '%s' to file with size %d: %W", p.Key, len(p.Value), err)
}
fmt.Printf("file: write %s => ttl=%s, size=%d\n", p.Key, p.TTL, len(p.Value))
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *Redis) maybeTTL(key string) (string, error) {
// Try getting key TTL.
err := r.Pool.Do(radix.Cmd(&ttl, "PTTL", key))
if err != nil {
return ttl, err
return ttl, fmt.Errorf("error calling PTTL for key '%s': %W", key, err)
}

// When key has no expire PTTL returns "-1".
Expand Down Expand Up @@ -81,18 +81,22 @@ func (r *Redis) Read(ctx context.Context) error {
for scanner.Next(&key) {
err := r.Pool.Do(radix.Cmd(&value, "DUMP", key))
if err != nil {
return err
return fmt.Errorf("error reading key '%s' from redis: %W", key, err)
}

ttl, err = r.maybeTTL(key)
if err != nil {
return err
return fmt.Errorf("error syncing ttl for key '%s': %W", key, err)
}

select {
case <-ctx.Done():
fmt.Println("redis: done reading")
return ctx.Err()
err := ctx.Err()
if err != nil {
return fmt.Errorf("error reading from redis: %W", err)
}
return nil
case r.Bus <- message.Payload{Key: key, Value: value, TTL: ttl}:
fmt.Printf("redis: DUMP %s => ttl=%s, size=%d\n", key, ttl, len(value))
}
Expand All @@ -109,7 +113,11 @@ func (r *Redis) Write(ctx context.Context) error {
// Exit early if context done.
case <-ctx.Done():
fmt.Println("redis: done writing")
return ctx.Err()
err := ctx.Err()
if err != nil {
return fmt.Errorf("error writing to redis: %W", err)
}
return nil
// Get Messages from Bus
case p, ok := <-r.Bus:
// if channel closed, set to nil, break loop
Expand All @@ -130,7 +138,7 @@ func (r *Redis) Write(ctx context.Context) error {

err = r.Pool.Do(radix.Cmd(nil, "RESTORE", p.Key, p.TTL, p.Value, "REPLACE"))
if err != nil {
return err
return fmt.Errorf("error restoring key '%s': %W", p.Key, err)
}

fmt.Printf("redis: RESTORE %s ttl=%s \n", p.Key, p.TTL)
Expand Down
8 changes: 4 additions & 4 deletions pkg/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Run(cfg config.Config) {
if cfg.Source.IsRedis {
db, err := radix.NewPool("tcp", cfg.Source.URI, 1)
if err != nil {
exit(err)
exit(fmt.Errorf("error creating new redis pool for %s: %W", cfg.Source.URI, err))
}

source := redis.New(db, ch, cfg.Silent, cfg.TTL)
Expand All @@ -49,7 +49,7 @@ func Run(cfg config.Config) {
return source.Read(gctx)
})
} else {
source := file.New(cfg.Source.URI, ch, cfg.Silent, cfg.TTL)
source := file.New(cfg.Source.URI, ch, cfg.Silent, cfg.TTL, cfg.MaxBuf)

g.Go(func() error {
return source.Read(gctx)
Expand All @@ -60,7 +60,7 @@ func Run(cfg config.Config) {
if cfg.Target.IsRedis {
db, err := radix.NewPool("tcp", cfg.Target.URI, 1)
if err != nil {
exit(err)
exit(fmt.Errorf("error creating new redis pool for %s: %W", cfg.Target.URI, err))
}

target := redis.New(db, ch, cfg.Silent, cfg.TTL)
Expand All @@ -70,7 +70,7 @@ func Run(cfg config.Config) {
return target.Write(gctx)
})
} else {
target := file.New(cfg.Target.URI, ch, cfg.Silent, cfg.TTL)
target := file.New(cfg.Target.URI, ch, cfg.Silent, cfg.TTL, cfg.MaxBuf)

g.Go(func() error {
defer cancel()
Expand Down

0 comments on commit 0b2cc6e

Please sign in to comment.