Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added customizable locks #4

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 76 additions & 40 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"github.com/zeriontech/sidecache/pkg/lock"
"io"
"io/ioutil"
"net/http"
Expand All @@ -17,6 +16,8 @@ import (
"strings"
"time"

"github.com/zeriontech/sidecache/pkg/lock"

"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zeriontech/sidecache/pkg/cache"
"go.uber.org/zap"
Expand Down Expand Up @@ -143,50 +144,50 @@ func (server CacheServer) CacheHandler(w http.ResponseWriter, r *http.Request) {
}
}()

path := strings.Split(r.URL.Path, "/")
key := "lock:" + path[1]
if !UseLock {
serve(server, w, r)
return
}

key := GetLockKey(server.Logger, r.URL)
resultKey := server.HashURL(server.ReorderQueryString(r.URL))

if UseLock {
attempt := 0
for {
// check the cache
server.Logger.Info("checking the cache", zap.String("resultKey", resultKey), zap.Int("attempt", attempt+1))
if cachedDataBytes := server.CheckCache(resultKey); cachedDataBytes != nil {
serveFromCache(cachedDataBytes, server, w, r)
return
}
attempt := 0
for {
// check the cache
server.Logger.Info("checking the cache", zap.String("resultKey", resultKey), zap.Int("attempt", attempt+1))
if cachedDataBytes := server.CheckCache(resultKey); cachedDataBytes != nil {
serveFromCache(cachedDataBytes, server, w, r)
return
}

// try to acquire the lock
server.Logger.Info("acquiring the lock", zap.String("key", key))
if err := server.LockMgr.Acquire(key, LockTtl); err == nil {
server.Logger.Info("lock acquired", zap.String("key", key))
defer func() {
// release the lock
if err := server.LockMgr.Release(key); err != nil {
server.Logger.Error("could not unlock the lock", zap.Error(err))
}
}()
serve(server, w, r)
return
} else {
server.Logger.Error("lock is locked", zap.Error(err))
}
// try to acquire the lock
server.Logger.Info("acquiring the lock", zap.String("key", key))
if err := server.LockMgr.Acquire(key, LockTtl); err == nil {
server.Logger.Info("lock acquired", zap.String("key", key))
defer func() {
// release the lock
if err := server.LockMgr.Release(key); err != nil {
server.Logger.Error("could not unlock the lock", zap.Error(err))
}
}()
serve(server, w, r)
return
} else {
server.Logger.Error("lock is locked", zap.Error(err))
}

// wait a bit
backoff := server.GetBackoff(attempt)
if backoff >= LockTtl {
// failed to acquire the lock for too long
server.Logger.Error("failed to acquire the lock", zap.String("url", r.URL.String()))
w.WriteHeader(http.StatusGatewayTimeout)
return
}
server.Logger.Info("sleeping", zap.String("key", key), zap.Duration("backoff", backoff))
time.Sleep(backoff)
attempt++
// wait a bit
backoff := server.GetBackoff(attempt)
if backoff >= LockTtl {
// failed to acquire the lock for too long
server.Logger.Error("failed to acquire the lock", zap.String("url", r.URL.String()))
w.WriteHeader(http.StatusGatewayTimeout)
return
}
} else {
serve(server, w, r)
server.Logger.Info("sleeping", zap.String("key", key), zap.Duration("backoff", backoff))
time.Sleep(backoff)
attempt++
}
}

Expand Down Expand Up @@ -257,3 +258,38 @@ func (server CacheServer) GetBackoff(attempt int) time.Duration {
return 500 * time.Millisecond
}
}

func GetLockKey(logger *zap.Logger, u *url.URL) string {
var key string
switch LockLocation {
case LocationPath:
path := strings.Split(u.Path, "/")
if LockIndex >= len(path) {
logger.Error(
"cannot parse lock key, index is out of bounds",
zap.String("url", u.String()),
)
return ""
}
key = path[LockIndex]
case LocationQuery:
values, ok := u.Query()[LockKey]
if !ok || len(values) == 0 {
logger.Error(
"cannot parse lock key, query param is missing",
zap.String("url", u.String()),
)
return ""
}
key = values[0]
for _, v := range values {
if v < key {
key = v
}
}
default:
logger.Error("lock is enabled but location is unspecified")
return ""
}
return "lock:" + key
}
73 changes: 73 additions & 0 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package server_test

import (
"net/url"
"testing"

"github.com/zeriontech/sidecache/pkg/server"
"go.uber.org/zap"
)

func TestGetLockKey(t *testing.T) {
logger, err := zap.NewDevelopment()
if err != nil {
t.FailNow()
}
testData := []struct {
name string
urlStr string
expected string
lockLocation server.Location
lockIndex int
lockKey string
}{
{
"path-1",
"http://localhost:1234/0x2d03f2b283fc90da454383afa8080293c8336448/info/?address=0x2d03f2b283fc90da454383afa8080293c8336448",
"lock:0x2d03f2b283fc90da454383afa8080293c8336448",
server.LocationPath,
1,
"",
},
{
"query-address",
"http://localhost:1234/api/v1/actions/?address=0x49131d39ead64a9e4912e641a6bd5fa7ae452f3f&currency=usd&limit=500&offset=0&search_query=receive",
"lock:0x49131d39ead64a9e4912e641a6bd5fa7ae452f3f",
server.LocationQuery,
0,
"address",
},
{
"query-multiple-uses-min",
"http://localhost:1234/api/v1/actions/?address=0x49131d39ead64a9e4912e641a6bd5fa7ae452f3f&address=0x123&address=0x456",
"lock:0x123",
server.LocationQuery,
0,
"address",
},
}
for _, d := range testData {
t.Run(d.name, func(t *testing.T) {
u, err := url.Parse(d.urlStr)
if err != nil {
logger.Error("could not parse test url")
t.FailNow()
}

// setup
server.LockLocation = d.lockLocation
server.LockIndex = d.lockIndex
server.LockKey = d.lockKey

key := server.GetLockKey(logger, u)
if key != d.expected {
logger.Error(
"key mismatch",
zap.String("expected", d.expected),
zap.String("actual", key),
)
t.Fail()
}
})
}
}
28 changes: 27 additions & 1 deletion pkg/server/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,32 @@ import (
var (
CacheTtl, _ = time.ParseDuration(os.Getenv("CACHE_TTL"))
LockTtl, _ = time.ParseDuration(os.Getenv("LOCK_TTL"))
UseLock = strings.ToLower(os.Getenv("USE_LOCK")) == "true"
ProjectName = os.Getenv("PROJECT_NAME")

UseLock = strings.ToLower(os.Getenv("USE_LOCK")) == "true"
LockLocation Location = NewLocation(os.Getenv("LOCK_LOCATION"))
LockIndex int = 1 // only for path location
LockKey string = os.Getenv("LOCK_KEY") // only for query location
)

type Location string

var (
LocationUnspecified Location = ""
LocationPath Location = "path"
LocationQuery Location = "query"
)

func NewLocation(s string) Location {
var l Location
l.FromString(s)
return l
}

func (l *Location) FromString(s string) {
if strings.ToLower(s) == "query" {
*l = LocationQuery
} else {
*l = LocationPath
}
}