Skip to content

Commit

Permalink
feat: Add caching
Browse files Browse the repository at this point in the history
- Fixed code lints
- Refactored the code to use `httputil.ReverseProxy` with a director function
- Add caching
- Fixes #7

Signed-off-by: Luis Davim <[email protected]>
  • Loading branch information
luisdavim committed Jul 27, 2023
1 parent 7914025 commit 5a0f60d
Show file tree
Hide file tree
Showing 11 changed files with 698 additions and 311 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ aws-s3-reverse-proxy
aws-s3-reverse-proxy.tar
config
test.txt
cache.d/
271 changes: 271 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package cache

// This file was initially taken from https://github.com/hauke96/tiny-http-proxy/blob/master/cache.go
// some extra functionality has been added:
// Allow invalidating cahce items
// expiring cached items based on a global TTL

import (
"bufio"
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"io"
"io/fs"
"os"
"path/filepath"
"sync"
"time"

log "github.com/sirupsen/logrus"
)

type Cache struct {
folder string
hash hash.Hash
knownValues map[string][]byte
busyValues map[string]*sync.Mutex
mutex *sync.Mutex
maxSize int64
ttl time.Duration
}

type Options struct {
Path string
MaxSize int64
TTL time.Duration
}

func CreateCache(opts Options) (*Cache, error) {
fileInfos, err := os.ReadDir(opts.Path)
if err != nil {
log.Warnf("Cannot open cache folder '%s': %s", opts.Path, err)
log.Infof("Create cache folder '%s'", opts.Path)
if err := os.Mkdir(opts.Path, os.ModePerm); err != nil {
return nil, err
}
}

values := make(map[string][]byte, 0)
busy := make(map[string]*sync.Mutex, 0)

// Go through every file an save its name in the map. The content of the file
// is loaded when needed. This makes sure that we don't have to read
// the directory content each time the user wants data that's not yet loaded.
for _, info := range fileInfos {
if !info.IsDir() {
values[info.Name()] = nil
}
}

hash := sha256.New()

mutex := &sync.Mutex{}

c := &Cache{
folder: opts.Path,
hash: hash,
knownValues: values,
busyValues: busy,
mutex: mutex,
maxSize: opts.MaxSize,
ttl: opts.TTL,
}

go func() {
ticker := time.NewTicker(c.ttl)
defer ticker.Stop()
for range ticker.C {
files, err := c.findFilesOlderThanTTL()
if err != nil {
continue
}
for _, file := range files {
err := c.deleteFromHash(file.Name())
if err != nil {
continue
}
}
}
}()

return c, nil
}

// Returns true if the resource is found, and false otherwise. If the
// resource is busy, this method will hang until the resource is free. If
// the resource is not found, a lock indicating that the resource is busy will
// be returned. Once the resource Has been put into cache the busy lock *must*
// be unlocked to allow others to access the newly cached resource
func (c *Cache) Has(key string) (*sync.Mutex, bool) {
hashValue := calcHash(key)

c.mutex.Lock()
defer c.mutex.Unlock()

// If the resource is busy, wait for it to be free. This is the case if
// the resource is currently being cached as a result of another request.
// Also, release the lock on the cache to allow other readers while waiting
if lock, busy := c.busyValues[hashValue]; busy {
c.mutex.Unlock()
lock.Lock()
// just waiting in case lock was previously acquired
lock.Unlock()
c.mutex.Lock()
}

// If a resource is in the shared cache, it can't be reserved. One can simply
// access it directly from the cache
if _, found := c.knownValues[hashValue]; found {
return nil, true
}

// The resource is not in the cache, mark the resource as busy until it has
// been cached successfully. Unlocking lock is required!
lock := new(sync.Mutex)
lock.Lock()
c.busyValues[hashValue] = lock
return lock, false
}

func (c *Cache) Get(key string) (*io.Reader, error) {
var response io.Reader
hashValue := calcHash(key)

// Try to get content. Error if not found.
c.mutex.Lock()
content, ok := c.knownValues[hashValue]
c.mutex.Unlock()
if !ok && len(content) > 0 {
log.Debugf("Cache doesn't know key '%s'", hashValue)
return nil, fmt.Errorf("key '%s' is not known to cache", hashValue)
}

log.Debugf("Cache has key '%s'", hashValue)

// Key is known, but not loaded into RAM
if content == nil {
log.Debugf("Cache item '%s' known but is not stored in memory. Using file.", hashValue)

file, err := os.Open(filepath.Join(c.folder, hashValue))
if err != nil {
log.Errorf("Error reading cached file '%s': %s", hashValue, err)
// forget the cached item
_ = c.deleteFromHash(hashValue)
return nil, err
}

response = file

log.Debugf("Create reader from file %s", hashValue)
} else { // Key is known and data is already loaded to RAM
response = bytes.NewReader(content)
log.Debugf("Create reader from %d byte large cache content", len(content))
}

return &response, nil
}

func (c *Cache) Delete(key string) error {
return c.deleteFromHash(calcHash(key))
}

func (c *Cache) deleteFromHash(hashValue string) error {
c.mutex.Lock()
defer c.mutex.Unlock()

// If the resource is busy, wait for it to be free. This is the case if
// the resource is currently being cached as a result of another request.
// Also, release the lock on the cache to allow other readers while waiting
if lock, busy := c.busyValues[hashValue]; busy {
c.mutex.Unlock()
lock.Lock()
// just waiting in case lock was previously acquired
lock.Unlock()
c.mutex.Lock()
}

delete(c.busyValues, hashValue)
delete(c.knownValues, hashValue)

return os.Remove(filepath.Join(c.folder, hashValue))
}

func (c *Cache) findFilesOlderThanTTL() ([]fs.DirEntry, error) {
var files []fs.DirEntry
tmpfiles, err := os.ReadDir(c.folder)
if err != nil {
return files, err
}

for _, file := range tmpfiles {
if file.Type().IsRegular() {
info, err := file.Info()
if err != nil {
return files, err
}
if time.Since(info.ModTime()) > c.ttl {
files = append(files, file)
}
}
}
return files, err
}

// release is an internal method which atomically caches an item and unmarks
// the item as busy, if it was busy before. The busy lock *must* be unlocked
// elsewhere!
func (c *Cache) release(hashValue string, content []byte) {
c.mutex.Lock()
delete(c.busyValues, hashValue)
c.knownValues[hashValue] = content
c.mutex.Unlock()
}

func (c *Cache) Put(key string, content *io.Reader, contentLength int64) error {
hashValue := calcHash(key)

// Small enough to put it into the in-memory cache
if contentLength <= c.maxSize*1024*1024 {
buffer := &bytes.Buffer{}
_, err := io.Copy(buffer, *content)
if err != nil {
return err
}

defer c.release(hashValue, buffer.Bytes())
log.Debugf("Added %s into in-memory cache", hashValue)

err = os.WriteFile(filepath.Join(c.folder, hashValue), buffer.Bytes(), 0644)
if err != nil {
return err
}
log.Debugf("Wrote content of entry %s into file", hashValue)
} else { // Too large for in-memory cache, just write to file
defer c.release(hashValue, nil)
log.Debugf("Added nil-entry for %s into in-memory cache", hashValue)

file, err := os.Create(filepath.Join(c.folder, hashValue))
if err != nil {
return err
}

writer := bufio.NewWriter(file)
_, err = io.Copy(writer, *content)
if err != nil {
return err
}
log.Debugf("Wrote content of entry %s into file", hashValue)
}

log.Debugf("Cache wrote content into '%s'", hashValue)

return nil
}

func calcHash(data string) string {
sha := sha256.Sum256([]byte(data))
return hex.EncodeToString(sha[:])
}
33 changes: 27 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,32 @@
module github.com/Kriechi/aws-s3-reverse-proxy

go 1.20

require (
github.com/aws/aws-sdk-go v1.38.25
github.com/prometheus/client_golang v1.11.1
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
github.com/alecthomas/kingpin/v2 v2.3.2
github.com/aws/aws-sdk-go v1.44.308
github.com/prometheus/client_golang v1.16.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

go 1.16
require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
golang.org/x/sys v0.10.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 5a0f60d

Please sign in to comment.