From 5bf905555b6dfd9d3120a4361a959adb64e6d19c Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Thu, 14 Nov 2024 23:40:00 +0000 Subject: [PATCH] remove unused internal package --- internal/walkdirs/flowmatic.go | 125 --------------------------------- internal/walkdirs/walkdirs.go | 68 ------------------ 2 files changed, 193 deletions(-) delete mode 100644 internal/walkdirs/flowmatic.go delete mode 100644 internal/walkdirs/walkdirs.go diff --git a/internal/walkdirs/flowmatic.go b/internal/walkdirs/flowmatic.go deleted file mode 100644 index 092d718a..00000000 --- a/internal/walkdirs/flowmatic.go +++ /dev/null @@ -1,125 +0,0 @@ -package walkdirs - -// Code in this file was adapted from Carl Johnson's "flowmatic" package, -// distibuted with the following license. - -// MIT License - -// Copyright (c) 2022 Carl Johnson - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -import ( - "runtime" - "sync" - - "github.com/carlmjohnson/deque" -) - -// Manager is a function that serially examines Task results to see if it produced any new Inputs. -// Returning false will halt the processing of future tasks. -type Manager[Input, Output any] func(Input, Output, error) (tasks []Input, ok bool) - -// Task is a function that can concurrently transform an input into an output. -type Task[Input, Output any] func(in Input) (out Output, err error) - -// // DoTasks does tasks using n concurrent workers (or GOMAXPROCS workers if n < -// 1) which produce output consumed by a serially run manager. The manager -// should return a slice of new task inputs based on prior task results, or -// return false to halt processing. If a task panics during execution, the panic -// will be caught and rethrown in the parent Goroutine. -// -// DoTailingTasks is the same as DoTasks except tasks in the task queue are -// evaluated in last in, first out order. -func DoTailingTasks[Input, Output any](n int, task Task[Input, Output], manager Manager[Input, Output], initial ...Input) { - in, out := start(n, task) - defer func() { - close(in) - // drain any waiting tasks - for range out { - } - }() - queue := deque.Of(initial...) - inflight := 0 - for inflight > 0 || queue.Len() > 0 { - inch := in - item, ok := queue.Tail() - if !ok { - inch = nil - } - select { - case inch <- item: - inflight++ - queue.PopTail() - case r := <-out: - inflight-- - if r.Panic != nil { - panic(r.Panic) - } - items, ok := manager(r.In, r.Out, r.Err) - if !ok { - return - } - queue.Append(items...) - } - } -} - -// result is the type returned by the output channel of Start. -type result[Input, Output any] struct { - In Input - Out Output - Err error - Panic any -} - -// start n workers (or GOMAXPROCS workers if n < 1) which consume -// the in channel, execute task, and send the Result on the out channel. -// Callers should close the in channel to stop the workers from waiting for tasks. -// The out channel will be closed once the last result has been sent. -func start[Input, Output any](n int, task Task[Input, Output]) (in chan<- Input, out <-chan result[Input, Output]) { - if n < 1 { - n = runtime.GOMAXPROCS(0) - } - inch := make(chan Input) - ouch := make(chan result[Input, Output], n) - var wg sync.WaitGroup - wg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer wg.Done() - defer func() { - pval := recover() - if pval == nil { - return - } - ouch <- result[Input, Output]{Panic: pval} - }() - for inval := range inch { - outval, err := task(inval) - ouch <- result[Input, Output]{inval, outval, err, nil} - } - }() - } - go func() { - wg.Wait() - close(ouch) - }() - return inch, ouch -} diff --git a/internal/walkdirs/walkdirs.go b/internal/walkdirs/walkdirs.go deleted file mode 100644 index dfd3bb55..00000000 --- a/internal/walkdirs/walkdirs.go +++ /dev/null @@ -1,68 +0,0 @@ -package walkdirs - -import ( - "context" - "errors" - "io/fs" - "path" - "runtime" -) - -type FS interface { - ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) -} - -type SkipFunc func(string) bool - -// ErrSkipDirs can be returned by a WalkDirsFunc to prevent WalkDirs from -// walking subdirectories. -var ErrSkipDirs = errors.New("skip subdirectories") - -// WalkDirsFunc is a function called for each directory by WalkDirs. If -// the function returns ErrSkipDirs, none of the directory's subdirectories -// are walked. -type WalkDirsFunc func(name string, entries []fs.DirEntry, err error) error - -// WalkDirs is a directory-oriented FS walker. It walks the FS starting at root, -// calling fn for each directory. If fn returns an error (other than -// ErrSkipDirs), the walk is canceled. WalkDirs reads directory entries in -// concurrent goroutines, the number of which is configurable. Each call to the -// WalkDirsFunc occurs from the same goroutine. The directory structure is -// walked depth-first order and in lexical order if concurrency is 1. -func WalkDirs(ctx context.Context, fsys FS, dir string, skipfn SkipFunc, fn WalkDirsFunc, gos int) error { - if gos < 1 { - gos = runtime.NumCPU() - } - readDirTask := func(dir string) ([]fs.DirEntry, error) { - return fsys.ReadDir(ctx, dir) - } - var walkErr error - // walkMgr is called for each directory and returns slice of paths to walk - walkMgr := func(dir string, entries []fs.DirEntry, err error) ([]string, bool) { - if fnErr := fn(dir, entries, err); fnErr != nil { - if errors.Is(fnErr, ErrSkipDirs) { - // don't add this directory's sub-directories - return nil, true - } - walkErr = fnErr - return nil, false - } - var subDirs []string // paths to continue search - // evaluate entries in reverse order so they are in lexical order for - // DoTailingTasks (which is LIFO). Note, - for i := len(entries); i > 0; i-- { - e := entries[i-1] - if !e.IsDir() { - continue - } - subdir := path.Join(dir, e.Name()) - if skipfn != nil && skipfn(subdir) { - continue - } - subDirs = append(subDirs, subdir) - } - return subDirs, true - } - DoTailingTasks(gos, readDirTask, walkMgr, dir) - return walkErr -}