-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Fixed code lints - Refactored the code to use `httputil.ReverseProxy` with a director function - Add caching - Fixes #7 Signed-off-by: Luis Davim <[email protected]>
- Loading branch information
Showing
11 changed files
with
699 additions
and
312 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,3 +2,4 @@ aws-s3-reverse-proxy | |
aws-s3-reverse-proxy.tar | ||
config | ||
test.txt | ||
cache.d/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,271 @@ | ||
package cache | ||
|
||
// This file was initially taken from https://github.com/hauke96/tiny-http-proxy/blob/master/cache.go | ||
// some extra functionality and fixes have been added: | ||
// Allow invalidating cahce items | ||
// Expiring cached items based on a global TTL | ||
|
||
import ( | ||
"bufio" | ||
"bytes" | ||
"crypto/sha256" | ||
"encoding/hex" | ||
"fmt" | ||
"hash" | ||
"io" | ||
"io/fs" | ||
"os" | ||
"path/filepath" | ||
"sync" | ||
"time" | ||
|
||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
type Cache struct { | ||
folder string | ||
hash hash.Hash | ||
knownValues map[string][]byte | ||
busyValues map[string]*sync.Mutex | ||
mutex *sync.Mutex | ||
maxSize int64 | ||
ttl time.Duration | ||
} | ||
|
||
type Options struct { | ||
Path string | ||
MaxSize int64 | ||
TTL time.Duration | ||
} | ||
|
||
func CreateCache(opts Options) (*Cache, error) { | ||
fileInfos, err := os.ReadDir(opts.Path) | ||
if err != nil { | ||
log.Warnf("Cannot open cache folder '%s': %s", opts.Path, err) | ||
log.Infof("Create cache folder '%s'", opts.Path) | ||
if err := os.Mkdir(opts.Path, os.ModePerm); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
values := make(map[string][]byte, 0) | ||
busy := make(map[string]*sync.Mutex, 0) | ||
|
||
// Go through every file an save its name in the map. The content of the file | ||
// is loaded when needed. This makes sure that we don't have to read | ||
// the directory content each time the user wants data that's not yet loaded. | ||
for _, info := range fileInfos { | ||
if !info.IsDir() { | ||
values[info.Name()] = nil | ||
} | ||
} | ||
|
||
hash := sha256.New() | ||
|
||
mutex := &sync.Mutex{} | ||
|
||
c := &Cache{ | ||
folder: opts.Path, | ||
hash: hash, | ||
knownValues: values, | ||
busyValues: busy, | ||
mutex: mutex, | ||
maxSize: opts.MaxSize, | ||
ttl: opts.TTL, | ||
} | ||
|
||
go func() { | ||
ticker := time.NewTicker(c.ttl) | ||
defer ticker.Stop() | ||
for range ticker.C { | ||
files, err := c.findFilesOlderThanTTL() | ||
if err != nil { | ||
continue | ||
} | ||
for _, file := range files { | ||
err := c.deleteFromHash(file.Name()) | ||
if err != nil { | ||
continue | ||
} | ||
} | ||
} | ||
}() | ||
|
||
return c, nil | ||
} | ||
|
||
// Returns true if the resource is found, and false otherwise. If the | ||
// resource is busy, this method will hang until the resource is free. If | ||
// the resource is not found, a lock indicating that the resource is busy will | ||
// be returned. Once the resource Has been put into cache the busy lock *must* | ||
// be unlocked to allow others to access the newly cached resource | ||
func (c *Cache) Has(key string) (*sync.Mutex, bool) { | ||
hashValue := calcHash(key) | ||
|
||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
|
||
// If the resource is busy, wait for it to be free. This is the case if | ||
// the resource is currently being cached as a result of another request. | ||
// Also, release the lock on the cache to allow other readers while waiting | ||
if lock, busy := c.busyValues[hashValue]; busy { | ||
c.mutex.Unlock() | ||
lock.Lock() | ||
// just waiting in case lock was previously acquired | ||
lock.Unlock() | ||
c.mutex.Lock() | ||
} | ||
|
||
// If a resource is in the shared cache, it can't be reserved. One can simply | ||
// access it directly from the cache | ||
if _, found := c.knownValues[hashValue]; found { | ||
return nil, true | ||
} | ||
|
||
// The resource is not in the cache, mark the resource as busy until it has | ||
// been cached successfully. Unlocking lock is required! | ||
lock := new(sync.Mutex) | ||
lock.Lock() | ||
c.busyValues[hashValue] = lock | ||
return lock, false | ||
} | ||
|
||
func (c *Cache) Get(key string) (*io.Reader, error) { | ||
var response io.Reader | ||
hashValue := calcHash(key) | ||
|
||
// Try to get content. Error if not found. | ||
c.mutex.Lock() | ||
content, ok := c.knownValues[hashValue] | ||
c.mutex.Unlock() | ||
if !ok && len(content) > 0 { | ||
log.Debugf("Cache doesn't know key '%s'", hashValue) | ||
return nil, fmt.Errorf("key '%s' is not known to cache", hashValue) | ||
} | ||
|
||
log.Debugf("Cache has key '%s'", hashValue) | ||
|
||
// Key is known, but not loaded into RAM | ||
if content == nil { | ||
log.Debugf("Cache item '%s' known but is not stored in memory. Using file.", hashValue) | ||
|
||
file, err := os.Open(filepath.Join(c.folder, hashValue)) | ||
if err != nil { | ||
log.Errorf("Error reading cached file '%s': %s", hashValue, err) | ||
// forget the cached item | ||
_ = c.deleteFromHash(hashValue) | ||
return nil, err | ||
} | ||
|
||
response = file | ||
|
||
log.Debugf("Create reader from file %s", hashValue) | ||
} else { // Key is known and data is already loaded to RAM | ||
response = bytes.NewReader(content) | ||
log.Debugf("Create reader from %d byte large cache content", len(content)) | ||
} | ||
|
||
return &response, nil | ||
} | ||
|
||
func (c *Cache) Delete(key string) error { | ||
return c.deleteFromHash(calcHash(key)) | ||
} | ||
|
||
func (c *Cache) deleteFromHash(hashValue string) error { | ||
c.mutex.Lock() | ||
defer c.mutex.Unlock() | ||
|
||
// If the resource is busy, wait for it to be free. This is the case if | ||
// the resource is currently being cached as a result of another request. | ||
// Also, release the lock on the cache to allow other readers while waiting | ||
if lock, busy := c.busyValues[hashValue]; busy { | ||
c.mutex.Unlock() | ||
lock.Lock() | ||
// just waiting in case lock was previously acquired | ||
lock.Unlock() | ||
c.mutex.Lock() | ||
} | ||
|
||
delete(c.busyValues, hashValue) | ||
delete(c.knownValues, hashValue) | ||
|
||
return os.Remove(filepath.Join(c.folder, hashValue)) | ||
} | ||
|
||
func (c *Cache) findFilesOlderThanTTL() ([]fs.DirEntry, error) { | ||
var files []fs.DirEntry | ||
tmpfiles, err := os.ReadDir(c.folder) | ||
if err != nil { | ||
return files, err | ||
} | ||
|
||
for _, file := range tmpfiles { | ||
if file.Type().IsRegular() { | ||
info, err := file.Info() | ||
if err != nil { | ||
return files, err | ||
} | ||
if time.Since(info.ModTime()) > c.ttl { | ||
files = append(files, file) | ||
} | ||
} | ||
} | ||
return files, err | ||
} | ||
|
||
// release is an internal method which atomically caches an item and unmarks | ||
// the item as busy, if it was busy before. The busy lock *must* be unlocked | ||
// elsewhere! | ||
func (c *Cache) release(hashValue string, content []byte) { | ||
c.mutex.Lock() | ||
delete(c.busyValues, hashValue) | ||
c.knownValues[hashValue] = content | ||
c.mutex.Unlock() | ||
} | ||
|
||
func (c *Cache) Put(key string, content *io.Reader, contentLength int64) error { | ||
hashValue := calcHash(key) | ||
|
||
// Small enough to put it into the in-memory cache | ||
if contentLength <= c.maxSize*1024*1024 { | ||
buffer := &bytes.Buffer{} | ||
_, err := io.Copy(buffer, *content) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
defer c.release(hashValue, buffer.Bytes()) | ||
log.Debugf("Added %s into in-memory cache", hashValue) | ||
|
||
err = os.WriteFile(filepath.Join(c.folder, hashValue), buffer.Bytes(), 0644) | ||
if err != nil { | ||
return err | ||
} | ||
log.Debugf("Wrote content of entry %s into file", hashValue) | ||
} else { // Too large for in-memory cache, just write to file | ||
defer c.release(hashValue, nil) | ||
log.Debugf("Added nil-entry for %s into in-memory cache", hashValue) | ||
|
||
file, err := os.Create(filepath.Join(c.folder, hashValue)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
writer := bufio.NewWriter(file) | ||
_, err = io.Copy(writer, *content) | ||
if err != nil { | ||
return err | ||
} | ||
log.Debugf("Wrote content of entry %s into file", hashValue) | ||
} | ||
|
||
log.Debugf("Cache wrote content into '%s'", hashValue) | ||
|
||
return nil | ||
} | ||
|
||
func calcHash(data string) string { | ||
sha := sha256.Sum256([]byte(data)) | ||
return hex.EncodeToString(sha[:]) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,32 @@ | ||
module github.com/Kriechi/aws-s3-reverse-proxy | ||
|
||
go 1.20 | ||
|
||
require ( | ||
github.com/aws/aws-sdk-go v1.38.25 | ||
github.com/prometheus/client_golang v1.11.1 | ||
github.com/sirupsen/logrus v1.8.1 | ||
github.com/stretchr/testify v1.7.0 | ||
gopkg.in/alecthomas/kingpin.v2 v2.2.6 | ||
github.com/alecthomas/kingpin/v2 v2.3.2 | ||
github.com/aws/aws-sdk-go v1.44.308 | ||
github.com/prometheus/client_golang v1.16.0 | ||
github.com/sirupsen/logrus v1.9.3 | ||
github.com/stretchr/testify v1.8.4 | ||
k8s.io/utils v0.0.0-20230726121419-3b25d923346b | ||
) | ||
|
||
go 1.16 | ||
require ( | ||
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect | ||
github.com/beorn7/perks v1.0.1 // indirect | ||
github.com/cespare/xxhash/v2 v2.2.0 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/golang/protobuf v1.5.3 // indirect | ||
github.com/jmespath/go-jmespath v0.4.0 // indirect | ||
github.com/kr/text v0.2.0 // indirect | ||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
github.com/prometheus/client_model v0.3.0 // indirect | ||
github.com/prometheus/common v0.42.0 // indirect | ||
github.com/prometheus/procfs v0.10.1 // indirect | ||
github.com/rogpeppe/go-internal v1.11.0 // indirect | ||
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect | ||
golang.org/x/sys v0.10.0 // indirect | ||
google.golang.org/protobuf v1.30.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) |
Oops, something went wrong.