Skip to content

Commit

Permalink
feat: Add caching
Browse files Browse the repository at this point in the history
- Fixed code lints
- Refactored the code into packages
- Add caching
- Add a helm chart to deploy to k8s
- Fixes #7

Signed-off-by: Luis Davim <[email protected]>
  • Loading branch information
luisdavim committed Jul 27, 2023
1 parent 7914025 commit d1a6b3b
Show file tree
Hide file tree
Showing 23 changed files with 1,280 additions and 329 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
aws-s3-reverse-proxy
./aws-s3-reverse-proxy
aws-s3-reverse-proxy.tar
config
test.txt
cache.d/
271 changes: 271 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package cache

// This file was initially taken from https://github.com/hauke96/tiny-http-proxy/blob/master/cache.go
// some extra functionality and fixes have been added:
// Allow invalidating cache items
// Expiring cached items based on a global TTL

import (
"bufio"
"bytes"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"io"
"io/fs"
"os"
"path/filepath"
"sync"
"time"

log "github.com/sirupsen/logrus"
)

// Cache is a file-backed cache whose entries may additionally be held in
// memory. Entries are addressed by the hex-encoded SHA-256 of their key (see
// calcHash), which is also the name of the backing file inside folder.
type Cache struct {
// folder is the directory that holds one file per cached entry.
folder string
// hash is initialised by CreateCache with a SHA-256 hasher.
// NOTE(review): no method visible in this file reads it; calcHash uses
// sha256.Sum256 directly — confirm whether it is still needed.
hash hash.Hash
// knownValues maps an entry's hash to its in-memory content. A nil value
// means the entry is known but its content has to be read from the file.
knownValues map[string][]byte
// busyValues maps an entry's hash to the lock held while that entry is
// being fetched and stored (see Has and release).
busyValues map[string]*sync.Mutex
// mutex guards knownValues and busyValues.
mutex *sync.Mutex
// maxSize is the largest content length, in MiB, that Put keeps in memory.
maxSize int64
// ttl is both the sweep interval and the maximum file age used for expiry.
ttl time.Duration
}

// Options holds the settings CreateCache uses to build a Cache.
type Options struct {
// Path is the directory in which cached entries are stored as files.
Path string
// MaxSize is the largest content length, in MiB, that is also kept in memory.
MaxSize int64
// TTL is the age (by file modification time) after which cached files are
// removed; it is also used as the interval between expiry sweeps.
TTL time.Duration
}

// CreateCache builds a Cache rooted at opts.Path.
//
// If the folder does not exist it is created, including any missing parent
// directories. Any other error while reading the folder (for example
// insufficient permissions, or the path being a regular file) is returned
// instead of being mistaken for a missing folder.
//
// Every regular entry already present in the folder is registered as a known
// cache item whose content is loaded lazily from disk on first access.
//
// When opts.TTL is positive, a background goroutine wakes up every TTL and
// removes files whose modification time is older than TTL. A zero or negative
// TTL disables expiry (time.NewTicker would panic on such a value).
// NOTE: the goroutine runs for the lifetime of the process; there is no way
// to stop it through the current API.
func CreateCache(opts Options) (*Cache, error) {
	fileInfos, err := os.ReadDir(opts.Path)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, fmt.Errorf("reading cache folder '%s': %w", opts.Path, err)
		}
		log.Warnf("Cannot open cache folder '%s': %s", opts.Path, err)
		log.Infof("Create cache folder '%s'", opts.Path)
		if err := os.MkdirAll(opts.Path, os.ModePerm); err != nil {
			return nil, err
		}
	}

	// Go through every file and save its name in the map. The content of the
	// file is loaded when needed. This makes sure that we don't have to read
	// the directory content each time the user wants data that's not yet loaded.
	values := make(map[string][]byte, len(fileInfos))
	for _, info := range fileInfos {
		if !info.IsDir() {
			values[info.Name()] = nil
		}
	}

	c := &Cache{
		folder:      opts.Path,
		hash:        sha256.New(),
		knownValues: values,
		busyValues:  make(map[string]*sync.Mutex),
		mutex:       &sync.Mutex{},
		maxSize:     opts.MaxSize,
		ttl:         opts.TTL,
	}

	// time.NewTicker panics for non-positive durations, so expiry is only
	// started when a usable TTL was configured.
	if c.ttl > 0 {
		go func() {
			ticker := time.NewTicker(c.ttl)
			defer ticker.Stop()
			for range ticker.C {
				files, err := c.findFilesOlderThanTTL()
				if err != nil {
					log.Warnf("Cannot list expired cache items in '%s': %s", c.folder, err)
					continue
				}
				for _, file := range files {
					// File names are the hash values of the cached keys.
					if err := c.deleteFromHash(file.Name()); err != nil {
						log.Warnf("Cannot expire cache item '%s': %s", file.Name(), err)
					}
				}
			}
		}()
	}

	return c, nil
}

// Has reports whether the resource identified by key is already cached.
//
// It returns (nil, true) when the key's hash is present in knownValues.
// Otherwise it creates a new mutex, locks it, records it in busyValues under
// the key's hash and returns (lock, false); the caller *must* unlock that
// mutex once the resource has been put into the cache (or fetching failed) so
// that other callers waiting on it can proceed.
//
// If the resource is marked busy when Has is called, the call blocks until
// the busy lock can be acquired.
//
// NOTE(review): the busy check is performed once, not in a loop. If the
// resource is still not in knownValues after the wait, every waiting caller
// appears to register its own busy lock, overwriting the previous one —
// confirm whether that is acceptable.
func (c *Cache) Has(key string) (*sync.Mutex, bool) {
hashValue := calcHash(key)

c.mutex.Lock()
// This deferred unlock pairs with whichever c.mutex.Lock() ran last: the one
// above, or the re-lock at the end of the busy wait below.
defer c.mutex.Unlock()

// If the resource is busy, wait for it to be free. This is the case if
// the resource is currently being cached as a result of another request.
// The cache mutex is released while waiting so other callers are not blocked.
if lock, busy := c.busyValues[hashValue]; busy {
c.mutex.Unlock()
lock.Lock()
// The busy lock is only acquired to wait for its holder; release it at once.
lock.Unlock()
c.mutex.Lock()
}

// If a resource is in the shared cache, it can't be reserved. One can simply
// access it directly from the cache.
if _, found := c.knownValues[hashValue]; found {
return nil, true
}

// The resource is not in the cache: mark it as busy until it has been cached
// successfully. The returned lock is handed out already locked; unlocking it
// is the caller's responsibility.
lock := new(sync.Mutex)
lock.Lock()
c.busyValues[hashValue] = lock
return lock, false
}

// Get returns a reader over the cached content stored under key.
//
// An error is returned when the key is not known to the cache, or when the
// backing file of a known entry cannot be opened; in the latter case the
// entry is also forgotten so that it can be fetched again.
//
// For entries held in memory the reader is a *bytes.Reader. For entries that
// only live on disk the reader is the opened *os.File.
// NOTE: Get does not close that file; a caller that wants to release the
// descriptor has to type-assert the reader to io.Closer.
func (c *Cache) Get(key string) (*io.Reader, error) {
	hashValue := calcHash(key)

	// Try to get content. Error if not found.
	c.mutex.Lock()
	content, ok := c.knownValues[hashValue]
	c.mutex.Unlock()
	// The original guard was `!ok && len(content) > 0`, which can never be
	// true because a missing map entry yields a nil slice. Unknown keys
	// therefore fell through to the file path below.
	if !ok {
		log.Debugf("Cache doesn't know key '%s'", hashValue)
		return nil, fmt.Errorf("key '%s' is not known to cache", hashValue)
	}

	log.Debugf("Cache has key '%s'", hashValue)

	var response io.Reader
	if content == nil {
		// Key is known, but not loaded into RAM
		log.Debugf("Cache item '%s' known but is not stored in memory. Using file.", hashValue)

		file, err := os.Open(filepath.Join(c.folder, hashValue))
		if err != nil {
			log.Errorf("Error reading cached file '%s': %s", hashValue, err)
			// Forget the cached item. The removal error is deliberately
			// ignored: the file is already known to be unreadable and the
			// open error is the one worth reporting.
			_ = c.deleteFromHash(hashValue)
			return nil, err
		}

		response = file
		log.Debugf("Create reader from file %s", hashValue)
	} else {
		// Key is known and data is already loaded to RAM
		response = bytes.NewReader(content)
		log.Debugf("Create reader from %d byte large cache content", len(content))
	}

	return &response, nil
}

// Delete removes the entry stored under key from the cache, both from the
// in-memory maps and from disk. It returns the error reported when removing
// the backing file.
func (c *Cache) Delete(key string) error {
	hashValue := calcHash(key)
	return c.deleteFromHash(hashValue)
}

// deleteFromHash forgets the entry identified by hashValue (the hex hash of a
// key, which is also its file name) and removes its backing file from the
// cache folder.
//
// If the entry is currently marked busy, the call first waits until the busy
// lock can be acquired. The entry is then dropped from both busyValues and
// knownValues, and the result of os.Remove on the backing file is returned.
func (c *Cache) deleteFromHash(hashValue string) error {
c.mutex.Lock()
// This deferred unlock pairs with whichever c.mutex.Lock() ran last: the one
// above, or the re-lock at the end of the busy wait below.
defer c.mutex.Unlock()

// If the resource is busy, wait for it to be free. This is the case if
// the resource is currently being cached as a result of another request.
// The cache mutex is released while waiting so other callers are not blocked.
if lock, busy := c.busyValues[hashValue]; busy {
c.mutex.Unlock()
lock.Lock()
// The busy lock is only acquired to wait for its holder; release it at once.
lock.Unlock()
c.mutex.Lock()
}

delete(c.busyValues, hashValue)
delete(c.knownValues, hashValue)

// The map entries are removed even if deleting the file fails.
return os.Remove(filepath.Join(c.folder, hashValue))
}

// findFilesOlderThanTTL lists the regular files in the cache folder whose
// modification time lies further in the past than the configured TTL.
//
// If the folder cannot be read, a nil slice and the error are returned. If
// the file information of an entry cannot be obtained, the scan stops and the
// entries collected so far are returned together with that error.
func (c *Cache) findFilesOlderThanTTL() ([]fs.DirEntry, error) {
	entries, err := os.ReadDir(c.folder)
	if err != nil {
		return nil, err
	}

	var expired []fs.DirEntry
	for _, entry := range entries {
		// Directories, symlinks and other special entries are never expired.
		if !entry.Type().IsRegular() {
			continue
		}

		info, statErr := entry.Info()
		if statErr != nil {
			return expired, statErr
		}

		if age := time.Since(info.ModTime()); age > c.ttl {
			expired = append(expired, entry)
		}
	}

	return expired, nil
}

// release registers content as the cached value for hashValue and, in the
// same critical section, drops the busy marker for that entry if there is
// one. It does not touch the busy lock itself: whoever obtained that lock
// from Has still has to unlock it.
func (c *Cache) release(hashValue string, content []byte) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.knownValues[hashValue] = content
	delete(c.busyValues, hashValue)
}

// Put stores the data read from content under key.
//
// Content whose contentLength is at most maxSize MiB is read fully into
// memory, registered in the in-memory cache and additionally written to a
// file in the cache folder. Larger content is streamed straight to a file and
// registered with a nil in-memory value, so that Get serves it from disk.
//
// In every case the busy marker that Has created for the key is removed; the
// busy lock itself must still be unlocked by the caller.
//
// Errors:
//   - reading content fails: nothing is cached and the error is returned;
//   - writing the file of a small entry fails: the error is returned, the
//     content nevertheless stays available from memory;
//   - writing the file of a large entry fails: the partial file is removed,
//     the entry is not registered, and the error is returned.
func (c *Cache) Put(key string, content *io.Reader, contentLength int64) error {
	hashValue := calcHash(key)
	path := filepath.Join(c.folder, hashValue)

	// Small enough to put it into the in-memory cache
	if contentLength <= c.maxSize*1024*1024 {
		buffer := &bytes.Buffer{}
		if _, err := io.Copy(buffer, *content); err != nil {
			// Nothing was stored; only make sure the key does not stay
			// marked as busy.
			c.abandon(hashValue, false)
			return err
		}

		defer c.release(hashValue, buffer.Bytes())
		log.Debugf("Added %s into in-memory cache", hashValue)

		if err := os.WriteFile(path, buffer.Bytes(), 0644); err != nil {
			return err
		}
		log.Debugf("Wrote content of entry %s into file", hashValue)
		log.Debugf("Cache wrote content into '%s'", hashValue)
		return nil
	}

	// Too large for in-memory cache, just write to file
	if err := writeStreamToFile(path, *content); err != nil {
		// The file may be missing or truncated: do not advertise it as a
		// valid entry. Removing it is best effort, the write error is the
		// one that matters to the caller.
		c.abandon(hashValue, true)
		_ = os.Remove(path)
		return err
	}

	c.release(hashValue, nil)
	log.Debugf("Added nil-entry for %s into in-memory cache", hashValue)
	log.Debugf("Wrote content of entry %s into file", hashValue)
	log.Debugf("Cache wrote content into '%s'", hashValue)

	return nil
}

// abandon removes the busy marker of hashValue without registering any
// content. When forgetKnown is true a previously known entry is dropped too.
func (c *Cache) abandon(hashValue string, forgetKnown bool) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	delete(c.busyValues, hashValue)
	if forgetKnown {
		delete(c.knownValues, hashValue)
	}
}

// writeStreamToFile creates (or truncates) the file at path and copies src
// into it through a buffered writer, flushing the buffer and closing the file
// before returning.
func writeStreamToFile(path string, src io.Reader) (err error) {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		// A failing Close can mean lost data; report it unless an earlier
		// error is already being returned.
		if closeErr := file.Close(); err == nil {
			err = closeErr
		}
	}()

	writer := bufio.NewWriter(file)
	if _, err = io.Copy(writer, src); err != nil {
		return err
	}
	// Without the flush the tail of the data would remain in the buffer and
	// never reach the file.
	return writer.Flush()
}

// calcHash returns the lowercase hex-encoded SHA-256 digest of data. The
// result is used as the map key and file name of a cached entry.
func calcHash(data string) string {
	digest := sha256.New()
	// hash.Hash.Write never returns an error.
	digest.Write([]byte(data))
	return hex.EncodeToString(digest.Sum(nil))
}
23 changes: 23 additions & 0 deletions charts/aws-s3-reverse-proxy/.helmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
24 changes: 24 additions & 0 deletions charts/aws-s3-reverse-proxy/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: v2
name: aws-s3-reverse-proxy
description: A Helm chart for Kubernetes

# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application

# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.0

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "1.1.0"
22 changes: 22 additions & 0 deletions charts/aws-s3-reverse-proxy/templates/NOTES.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ .path }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "aws-s3-reverse-proxy.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch its status by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "aws-s3-reverse-proxy.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "aws-s3-reverse-proxy.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "aws-s3-reverse-proxy.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
export CONTAINER_PORT=$(kubectl get pod --namespace {{ .Release.Namespace }} $POD_NAME -o jsonpath="{.spec.containers[0].ports[0].containerPort}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:$CONTAINER_PORT
{{- end }}
Loading

0 comments on commit d1a6b3b

Please sign in to comment.