Skip to content

Commit

Permalink
feat(cache): enhance cache strategy of transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
pirosiki197 committed Feb 1, 2025
1 parent 23c1efa commit 4f1538d
Show file tree
Hide file tree
Showing 11 changed files with 588 additions and 195 deletions.
56 changes: 53 additions & 3 deletions template/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,41 @@ import (
"database/sql/driver"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/motoki317/sc"
"github.com/traP-jp/isuc/domains"
)

type cacheWithInfo struct {
*sc.Cache[string, *cacheRows]
query string
info domains.CachePlanSelectQuery
uniqueOnly bool // if true, query is like "SELECT * FROM table WHERE pk = ?"
lastUpdate atomic.Int64 // time.Time.UnixNano()
lastUpdateByKey syncMap[int64]
}

func (c *cacheWithInfo) updateTx() {
c.lastUpdate.Store(time.Now().UnixNano())
}

func (c *cacheWithInfo) updateByKeyTx(key string) {
c.lastUpdateByKey.Store(key, time.Now().UnixNano())
}

func (c *cacheWithInfo) isNewerThan(key string, t int64) bool {
if c.lastUpdate.Load() > t {
return true
}
if update, ok := c.lastUpdateByKey.Load(key); ok && update > t {
return true
}
return false
}

type (
queryKey struct{}
stmtKey struct{}
Expand All @@ -18,7 +51,7 @@ type (
func ExportMetrics() string {
res := ""
for query, cache := range caches {
stats := cache.cache.Stats()
stats := cache.Stats()
progress := "["
for i := 0; i < 20; i++ {
if i < int(stats.HitRatio()*20) {
Expand All @@ -43,7 +76,7 @@ type CacheStats struct {
func ExportCacheStats() map[string]CacheStats {
res := make(map[string]CacheStats)
for query, cache := range caches {
stats := cache.cache.Stats()
stats := cache.Stats()
res[query] = CacheStats{
Query: query,
HitRatio: stats.HitRatio(),
Expand All @@ -56,7 +89,7 @@ func ExportCacheStats() map[string]CacheStats {

func PurgeAllCaches() {
for _, cache := range caches {
cache.cache.Purge()
cache.Purge()
}
}

Expand Down Expand Up @@ -109,3 +142,20 @@ func replaceFn(ctx context.Context, key string) (*cacheRows, error) {
}
return cacheRows.clone(), nil
}

type syncMap[T any] struct {
m sync.Map
}

func (m *syncMap[T]) Load(key string) (T, bool) {
var zero T
v, ok := m.m.Load(key)
if !ok {
return zero, false
}
return v.(T), true
}

func (m *syncMap[T]) Store(key string, value T) {
m.m.Store(key, value)
}
56 changes: 53 additions & 3 deletions template/cache.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,41 @@ import (
"database/sql/driver"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/motoki317/sc"
"github.com/traP-jp/isuc/domains"
)

type cacheWithInfo struct {
*sc.Cache[string, *cacheRows]
query string
info domains.CachePlanSelectQuery
uniqueOnly bool // if true, query is like "SELECT * FROM table WHERE pk = ?"
lastUpdate atomic.Int64 // time.Time.UnixNano()
lastUpdateByKey syncMap[int64]
}

func (c *cacheWithInfo) updateTx() {
c.lastUpdate.Store(time.Now().UnixNano())
}

func (c *cacheWithInfo) updateByKeyTx(key string) {
c.lastUpdateByKey.Store(key, time.Now().UnixNano())
}

func (c *cacheWithInfo) isNewerThan(key string, t int64) bool {
if c.lastUpdate.Load() > t {
return true
}
if update, ok := c.lastUpdateByKey.Load(key); ok && update > t {
return true
}
return false
}

type (
queryKey struct{}
stmtKey struct{}
Expand All @@ -18,7 +51,7 @@ type (
func ExportMetrics() string {
res := ""
for query, cache := range caches {
stats := cache.cache.Stats()
stats := cache.Stats()
progress := "["
for i := 0; i < 20; i++ {
if i < int(stats.HitRatio()*20) {
Expand All @@ -43,7 +76,7 @@ type CacheStats struct {
func ExportCacheStats() map[string]CacheStats {
res := make(map[string]CacheStats)
for query, cache := range caches {
stats := cache.cache.Stats()
stats := cache.Stats()
res[query] = CacheStats{
Query: query,
HitRatio: stats.HitRatio(),
Expand All @@ -56,7 +89,7 @@ func ExportCacheStats() map[string]CacheStats {

func PurgeAllCaches() {
for _, cache := range caches {
cache.cache.Purge()
cache.Purge()
}
}

Expand Down Expand Up @@ -109,3 +142,20 @@ func replaceFn(ctx context.Context, key string) (*cacheRows, error) {
}
return cacheRows.clone(), nil
}

type syncMap[T any] struct {
m sync.Map
}

func (m *syncMap[T]) Load(key string) (T, bool) {
var zero T
v, ok := m.m.Load(key)
if !ok {
return zero, false
}
return v.(T), true
}

func (m *syncMap[T]) Store(key string, value T) {
m.m.Store(key, value)
}
39 changes: 23 additions & 16 deletions template/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ func init() {

conditions := query.Select.Conditions
if isSingleUniqueCondition(conditions, query.Select.Table) {
caches[normalized] = cacheWithInfo{
caches[normalized] = &cacheWithInfo{
Cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
query: normalized,
info: *query.Select,
cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
uniqueOnly: true,
}
continue
}
caches[query.Query] = cacheWithInfo{
caches[query.Query] = &cacheWithInfo{
Cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
query: query.Query,
info: *query.Select,
cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
uniqueOnly: false,
}

Expand Down Expand Up @@ -105,7 +105,8 @@ var (
type cacheConn struct {
inner driver.Conn
tx bool
cleanUp []func()
txStart int64 // time.Time.UnixNano()
cleanUp cleanUpTask
}

func (c *cacheConn) Prepare(rawQuery string) (driver.Stmt, error) {
Expand Down Expand Up @@ -148,23 +149,26 @@ func (c *cacheConn) Begin() (driver.Tx, error) {
return nil, err
}
c.tx = true
c.txStart = time.Now().UnixNano()
return &cacheTx{conn: c, inner: inner}, nil
}

func (c *cacheConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
var inner driver.Tx
var err error
if i, ok := c.inner.(driver.ConnBeginTx); ok {
inner, err := i.BeginTx(ctx, opts)
inner, err = i.BeginTx(ctx, opts)
if err != nil {
return nil, err
}
} else {
inner, err = c.inner.Begin()
if err != nil {
return nil, err
}
c.tx = true
return &cacheTx{conn: c, inner: inner}, nil
}
inner, err := c.inner.Begin()
if err != nil {
return nil, err
}
c.tx = true
c.txStart = time.Now().UnixNano()
return &cacheTx{conn: c, inner: inner}, nil
}

Expand All @@ -185,18 +189,21 @@ type cacheTx struct {
func (t *cacheTx) Commit() error {
t.conn.tx = false
defer func() {
for _, c := range t.conn.cleanUp {
c()
for _, c := range t.conn.cleanUp.purge {
c.Purge()
}
for _, forget := range t.conn.cleanUp.forget {
forget.cache.Forget(forget.key)
}
t.conn.cleanUp = t.conn.cleanUp[:0]
t.conn.cleanUp.reset()
}()
return t.inner.Commit()
}

func (t *cacheTx) Rollback() error {
t.conn.tx = false
// no need to clean up
t.conn.cleanUp = nil
t.conn.cleanUp.reset()
return t.inner.Rollback()
}

Expand Down
39 changes: 23 additions & 16 deletions template/driver.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ func init() {

conditions := query.Select.Conditions
if isSingleUniqueCondition(conditions, query.Select.Table) {
caches[normalized] = cacheWithInfo{
caches[normalized] = &cacheWithInfo{
Cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
query: normalized,
info: *query.Select,
cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
uniqueOnly: true,
}
continue
}
caches[query.Query] = cacheWithInfo{
caches[query.Query] = &cacheWithInfo{
Cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
query: query.Query,
info: *query.Select,
cache: sc.NewMust(replaceFn, 10*time.Minute, 10*time.Minute),
uniqueOnly: false,
}

Expand Down Expand Up @@ -104,7 +104,8 @@ var (
type cacheConn struct {
inner driver.Conn
tx bool
cleanUp []func()
txStart int64 // time.Time.UnixNano()
cleanUp cleanUpTask
}

func (c *cacheConn) Prepare(rawQuery string) (driver.Stmt, error) {
Expand Down Expand Up @@ -147,23 +148,26 @@ func (c *cacheConn) Begin() (driver.Tx, error) {
return nil, err
}
c.tx = true
c.txStart = time.Now().UnixNano()
return &cacheTx{conn: c, inner: inner}, nil
}

func (c *cacheConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
var inner driver.Tx
var err error
if i, ok := c.inner.(driver.ConnBeginTx); ok {
inner, err := i.BeginTx(ctx, opts)
inner, err = i.BeginTx(ctx, opts)
if err != nil {
return nil, err
}
} else {
inner, err = c.inner.Begin()
if err != nil {
return nil, err
}
c.tx = true
return &cacheTx{conn: c, inner: inner}, nil
}
inner, err := c.inner.Begin()
if err != nil {
return nil, err
}
c.tx = true
c.txStart = time.Now().UnixNano()
return &cacheTx{conn: c, inner: inner}, nil
}

Expand All @@ -184,18 +188,21 @@ type cacheTx struct {
func (t *cacheTx) Commit() error {
t.conn.tx = false
defer func() {
for _, c := range t.conn.cleanUp {
c()
for _, c := range t.conn.cleanUp.purge {
c.Purge()
}
for _, forget := range t.conn.cleanUp.forget {
forget.cache.Forget(forget.key)
}
t.conn.cleanUp = t.conn.cleanUp[:0]
t.conn.cleanUp.reset()
}()
return t.inner.Commit()
}

func (t *cacheTx) Rollback() error {
t.conn.tx = false
// no need to clean up
t.conn.cleanUp = nil
t.conn.cleanUp.reset()
return t.inner.Rollback()
}

Expand Down
Loading

0 comments on commit 4f1538d

Please sign in to comment.