From aab30ee226d8cfa683472581f39956c8a6aff1d9 Mon Sep 17 00:00:00 2001 From: Alex Angelini Date: Thu, 1 Aug 2024 14:34:48 -0400 Subject: [PATCH] Add bytes scanner --- internal/db/content.go | 56 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/internal/db/content.go b/internal/db/content.go index e596245..dfa3c11 100644 --- a/internal/db/content.go +++ b/internal/db/content.go @@ -7,6 +7,7 @@ import ( "io" "os" "strconv" + "sync" "github.com/dgraph-io/ristretto" "github.com/jackc/pgx/v5" @@ -137,9 +138,51 @@ func (c *ContentDecoder) Decode(encoded EncodedContent) (DecodedContent, error) return output, nil } +type BytesScanner struct { + buf bytes.Buffer +} + +func (b *BytesScanner) ScanBytes(src []byte) error { + _, err := b.buf.Write(src) + return err +} + +func (b *BytesScanner) Len() int { + return b.buf.Len() +} + +func (b *BytesScanner) Bytes() []byte { + return b.buf.Bytes() +} + +func (b *BytesScanner) Reset() { + b.buf.Reset() +} + +type BytesScannerPool struct { + pool sync.Pool +} + +func NewBytesScannerPool() *BytesScannerPool { + return &BytesScannerPool{ + pool: sync.Pool{ + New: func() any { + return &BytesScanner{} + }, + }, + } +} + +func (b *BytesScannerPool) GetScanner() *BytesScanner { + scanner := b.pool.Get().(*BytesScanner) + scanner.Reset() + return scanner +} + type ContentLookup struct { cache *ristretto.Cache decoders *puddle.Pool[*ContentDecoder] + scanners *BytesScannerPool } func NewContentLookup() (*ContentLookup, error) { @@ -173,6 +216,7 @@ func NewContentLookup() (*ContentLookup, error) { return &ContentLookup{ cache: cache, decoders: decoders, + scanners: NewBytesScannerPool(), }, nil } @@ -190,13 +234,13 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m value, found := cl.cache.Get(hash.Hex()) if found { if isEncoded { - decoded, err := decoder.Value().Decode(value.(EncodedContent)) + decoded, err := decoder.Value().Decode(value.(*BytesScanner).Bytes()) if err != nil { return nil, fmt.Errorf("cannot decode value from cache %v: %w", hash.Hex(), err) } contents[hash] = decoded } else { - contents[hash] = value.(DecodedContent) + contents[hash] = value.(*BytesScanner).Bytes() } } else { notFound = append(notFound, hash) @@ -215,7 +259,7 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m for rows.Next() { var hash Hash - var value []byte + value := cl.scanners.GetScanner() err = rows.Scan(&hash.H1, &hash.H2, &value) if err != nil { @@ -223,16 +267,16 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m } // This is a content addressable cache, any cached value will never be updated - cl.cache.Set(hash.Hex(), value, int64(len(value))) + cl.cache.Set(hash.Hex(), value, int64(value.Len())) if hashesToLookup[hash] { - decoded, err := decoder.Value().Decode(value) + decoded, err := decoder.Value().Decode(value.Bytes()) if err != nil { return nil, fmt.Errorf("cannot decode value from content table %v: %w", hash.Hex(), err) } contents[hash] = decoded } else { - contents[hash] = value + contents[hash] = value.Bytes() } }