
Commit

feat: reuse documents/embeddings for files with the same sha256sum (+ fix pgx bug) (#464)
iwilltry42 authored Mar 7, 2025
1 parent cebcaa9 commit a6d62fe
Showing 17 changed files with 276 additions and 26 deletions.
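
The core of the change sits in knowledge/pkg/datastore/ingest.go: every ingested file is now fingerprinted with SHA-256, and that fingerprint doubles as a lookup key for previously ingested files whose documents and embeddings can be copied instead of recomputed. A minimal, standalone sketch of the fingerprinting step (simplified from the diff below):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// checksumFor mirrors the commit's fingerprinting: the hex-encoded SHA-256
// digest of the raw file content becomes the reuse lookup key.
func checksumFor(content []byte) string {
	sum := sha256.Sum256(content) // returns a [32]byte array
	return fmt.Sprintf("%x", sum)
}

func main() {
	fmt.Println(checksumFor([]byte("hello")))
	// 2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
}
```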
1 change: 1 addition & 0 deletions knowledge/pkg/client/client.go
@@ -18,6 +18,7 @@ type SharedIngestionOpts struct {
 	IsDuplicateFuncName string
 	Metadata            map[string]string
 	ReuseEmbeddings     bool
+	ReuseFiles          bool
 }
 
 type IngestPathsOpts struct {
2 changes: 2 additions & 0 deletions knowledge/pkg/client/standalone.go
@@ -113,6 +113,7 @@ func (c *StandaloneClient) IngestFromWorkspace(ctx context.Context, datasetID st
 		ExtraMetadata:   meta,
 		IngestionFlows:  opts.IngestionFlows,
 		ReuseEmbeddings: opts.ReuseEmbeddings,
+		ReuseFiles:      opts.ReuseFiles,
 	}
 
 	_, err = c.Ingest(log.ToCtx(ctx, log.FromCtx(ctx).With("filepath", file).With("absolute_path", iopts.FileMetadata.AbsolutePath)), datasetID, finfo.Name, fileContent, iopts)
@@ -165,6 +166,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
 		IsDuplicateFuncName: opts.IsDuplicateFuncName,
 		ExtraMetadata:       extraMetadata,
 		ReuseEmbeddings:     opts.ReuseEmbeddings,
+		ReuseFiles:          opts.ReuseFiles,
 	}
 
 	if opts != nil {
1 change: 1 addition & 0 deletions knowledge/pkg/cmd/askdir.go
@@ -47,6 +47,7 @@ func (s *ClientAskDir) Run(cmd *cobra.Command, args []string) error {
 		SharedIngestionOpts: client.SharedIngestionOpts{
 			IsDuplicateFuncName: s.DeduplicationFuncName,
 			ReuseEmbeddings:     true,
+			ReuseFiles:          true,
 		},
 		IgnoreExtensions: strings.Split(s.IgnoreExtensions, ","),
 		Concurrency:      s.Concurrency,
1 change: 1 addition & 0 deletions knowledge/pkg/cmd/ingest.go
@@ -103,6 +103,7 @@ func (s *ClientIngest) run(ctx context.Context, filePath string) error {
 			IsDuplicateFuncName: s.DeduplicationFuncName,
 			Metadata:            metadata,
 			ReuseEmbeddings:     true,
+			ReuseFiles:          true,
 		},
 		IgnoreExtensions: strings.Split(s.IgnoreExtensions, ","),
 		Concurrency:      s.Concurrency,
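
Both CLI entry points (askdir and ingest) opt in unconditionally. A caller using the client package directly would set the same pair of flags; a hedged sketch (assuming the opts struct is passed by pointer, as the `opts != nil` check in standalone.go suggests):

```go
opts := &client.IngestPathsOpts{
	SharedIngestionOpts: client.SharedIngestionOpts{
		ReuseEmbeddings: true, // reuse per-document embeddings when content matches
		ReuseFiles:      true, // new: reuse whole files keyed by sha256 checksum
	},
}
// Hypothetical call site; the exact IngestPaths signature is not shown in this diff:
// _, err := c.IngestPaths(ctx, datasetID, opts, paths...)
```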
104 changes: 95 additions & 9 deletions knowledge/pkg/datastore/ingest.go
@@ -3,6 +3,7 @@ package datastore
 import (
 	"bytes"
 	"context"
+	"crypto/sha256"
 	"fmt"
 	"log/slog"
 	"os"
@@ -29,6 +30,7 @@ type IngestOpts struct {
 	IngestionFlows  []flows.IngestionFlow
 	ExtraMetadata   map[string]any
 	ReuseEmbeddings bool
+	ReuseFiles      bool
 }
 
 // Ingest loads a document from a reader and adds it to the dataset.
@@ -160,25 +162,105 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, filename strin
 		return nil, fmt.Errorf("%w (file %q)", &documentloader.UnsupportedFileTypeError{FileType: filetype}, opts.FileMetadata.AbsolutePath)
 	}
 
+	start := time.Now()
+	checksum := sha256.Sum256(content)
+	slog.Debug("File checksum calculated", "size", len(content), "duration", time.Since(start))
+	opts.FileMetadata.Checksum = fmt.Sprintf("%x", checksum)
+
 	// Mandatory Transformation: Add filename to metadata -> append extraMetadata, but do not override filename or absPath
-	metadata := map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath, "fileSize": opts.FileMetadata.Size, "embeddingModel": s.EmbeddingModelProvider.EmbeddingModelName()}
+	metadata := map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath, "fileSize": opts.FileMetadata.Size, "embeddingModel": s.EmbeddingModelProvider.EmbeddingModelName(), "fileChecksum": fmt.Sprintf("%x", checksum)}
 	for k, v := range opts.ExtraMetadata {
 		if _, ok := metadata[k]; !ok {
 			metadata[k] = v
 		}
 	}
-	em := &transformers.ExtraMetadata{Metadata: metadata}
-	ingestionFlow.Transformations = append(ingestionFlow.Transformations, em)
 
-	docs, err := ingestionFlow.Run(ctx, bytes.NewReader(content), filename)
-	if err != nil {
-		statusLog.With("status", "failed").Error("Ingestion Flow failed", "error", err)
-		return nil, fmt.Errorf("ingestion flow failed for file %q: %w", filename, err)
-	}
+	// Reuse existing file if possible
+	// TODO: this should honor textsplitter and loading settings somehow to allow for changing them and not use the existing embeddings and documents data
+	var docs []vs.Document
+	if opts.ReuseFiles {
+		slog.Info("Checking if existing file can be reused", "checksum", opts.FileMetadata.Checksum)
+
+		fs, err := s.Index.FindFilesByMetadata(ctx, "", types.FileMetadata{Checksum: opts.FileMetadata.Checksum}, false)
+		if err != nil {
+			slog.Debug("Failed to find file with checksum", "error", err, "checksum", metadata["checksum"])
+		} else if len(fs) > 0 {
+		fileLoop:
+			for _, f := range fs {
+				// check if the dataset embeddingsconfig matches - if not, we don't have to fetch the documents for this file
+				ds, err := s.GetDataset(ctx, f.Dataset, nil)
+				if err != nil || ds == nil {
+					slog.Debug("Failed to get dataset", "error", err)
+					continue
+				}
+				if ds.EmbeddingsProviderConfig == nil {
+					slog.Debug("Dataset has no embeddings provider config")
+					continue
+				}
+				dsEmbeddingProvider, err := embeddings.ProviderFromConfig(*ds.EmbeddingsProviderConfig)
+				if err != nil {
+					slog.Debug("Failed to get embeddings model provider", "error", err)
+					continue
+				}
+
+				if dsEmbeddingProvider.EmbeddingModelName() != s.EmbeddingModelProvider.EmbeddingModelName() {
+					slog.Debug("Embeddings model mismatch", "dataset", ds.ID, "attached", dsEmbeddingProvider.EmbeddingModelName(), "configured", s.EmbeddingModelProvider.EmbeddingModelName())
+					continue
+				}
+
+				nf, err := s.FindFile(ctx, f)
+				if err != nil || nf == nil {
+					slog.Debug("Failed to get file", "error", err)
+					continue
+				}
+				f = *nf
+				slog.Info("Found existing file that could be reused", "fileID", f.ID, "checksum", opts.FileMetadata.Checksum, "documents", len(f.Documents), "dataset", f.Dataset)
+
+				docs = make([]vs.Document, len(f.Documents))
+				for i, existingDoc := range f.Documents {
+					document, err := s.Vectorstore.GetDocument(ctx, existingDoc.ID, f.Dataset)
+					if err != nil {
+						// At this point we have to do all or none, as a new ingestion process could generate different chunks, e.g. due to different tokenization in a VLM processing step
+						slog.Info("Failed to get document, aborting file embedding reuse process", "error", err, "docID", existingDoc.ID)
+						continue fileLoop
+					}
+					docs[i] = vs.Document{
+						ID:        uuid.NewString(),  // new UUID for the document to avoid collisions with reused docs
+						Metadata:  document.Metadata, // some keys will be overridden in the transformers
+						Content:   document.Content,
+						Embedding: document.Embedding,
+					}
+				}
+
+				slog.Info("Reused existing file", "fileID", f.ID, "checksum", opts.FileMetadata.Checksum, "documents", len(docs), "dataset", f.Dataset)
+				break
+			}
+		}
+	}
+
+	em := &transformers.ExtraMetadata{Metadata: metadata}
+	ingestionFlow.Transformations = append(ingestionFlow.Transformations, em)
 
+	// Only run ingestion flow if we're not re-using the details of an existing file and its documents
 	if len(docs) == 0 {
-		statusLog.With("status", "skipped").Info("Ingested document", "num_documents", 0)
-		return nil, nil
+		docs, err = ingestionFlow.Run(ctx, bytes.NewReader(content), filename)
+		if err != nil {
+			statusLog.With("status", "failed").Error("Ingestion Flow failed", "error", err)
+			return nil, fmt.Errorf("ingestion flow failed for file %q: %w", filename, err)
+		}
+
+		if len(docs) == 0 {
+			statusLog.With("status", "skipped").Info("Ingested document", "num_documents", 0)
+			return nil, nil
+		}
+	} else {
+		// We reused documents, so we only need to run the transformers on them, e.g. to add the metadata
+		docs, err = ingestionFlow.RunTransformers(ctx, docs, statusLog)
+		if err != nil {
+			statusLog.With("status", "failed").Error("Failed to run transformers on reused documents", "error", err)
			return nil, fmt.Errorf("failed to run transformers on reused documents: %w", err)
+		}
 	}
 
 	// Sort documents
@@ -203,6 +285,9 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, filename strin
 	if opts.ReuseEmbeddings {
 		slog.Debug("Checking if existing embeddings can be reused", "count", len(docs))
 		for i, doc := range docs {
+			if len(doc.Embedding) > 0 {
+				continue
+			}
 			existingDocs, err := s.Vectorstore.GetDocuments(ctx, "", nil, []cg.WhereDocument{
 				{
 					Operator: cg.WhereDocumentOperatorEquals,
@@ -287,6 +372,7 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, filename strin
 		dbFile.FileMetadata.AbsolutePath = opts.FileMetadata.AbsolutePath
 		dbFile.FileMetadata.Size = opts.FileMetadata.Size
 		dbFile.FileMetadata.ModifiedAt = opts.FileMetadata.ModifiedAt
+		dbFile.FileMetadata.Checksum = opts.FileMetadata.Checksum
 	}
 
 	iLog := statusLog.With("component", "index")
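
Two details in the reuse block above are easy to miss: `continue fileLoop` is a labeled continue, so a single failed document lookup abandons the entire candidate file (partial reuse could mix incompatible chunkings), and every reused document gets a fresh UUID so it cannot collide with the rows it was copied from. A self-contained illustration of the labeled-continue pattern, with hypothetical data:

```go
package main

import "fmt"

func main() {
	// A candidate file is reusable only if every one of its chunks resolves.
	candidates := []struct {
		name   string
		chunks []string
	}{
		{"fileA", []string{"chunk1", ""}},       // one missing chunk poisons the whole candidate
		{"fileB", []string{"chunk1", "chunk2"}}, // fully resolvable
	}

fileLoop:
	for _, cand := range candidates {
		for _, c := range cand.chunks {
			if c == "" {
				continue fileLoop // all-or-nothing: skip this candidate entirely
			}
		}
		fmt.Println("reusing", cand.name) // prints: reusing fileB
		break
	}
}
```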
3 changes: 3 additions & 0 deletions knowledge/pkg/datastore/transformers/metadata.go
@@ -17,6 +17,9 @@ type ExtraMetadata struct {
 func (e *ExtraMetadata) Transform(_ context.Context, docs []vs.Document) ([]vs.Document, error) {
 	for i, doc := range docs {
 		metadata := doc.Metadata
+		if metadata == nil {
+			metadata = make(map[string]any)
+		}
 		for k, v := range e.Metadata {
 			metadata[k] = v
 		}
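
The guard exists because writing to a nil map panics at runtime in Go, and a `vs.Document` that reaches the transformer without prior metadata (plausible for documents assembled on the reuse path) would otherwise crash the flow. A minimal reproduction of the failure mode the fix prevents:

```go
package main

func main() {
	var metadata map[string]any // zero value: nil
	// metadata["filename"] = "a.txt" // would panic: assignment to entry in nil map

	if metadata == nil {
		metadata = make(map[string]any)
	}
	metadata["filename"] = "a.txt" // safe after the guard
}
```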
18 changes: 15 additions & 3 deletions knowledge/pkg/flows/flows.go
@@ -168,22 +168,34 @@ func (f *IngestionFlow) Run(ctx context.Context, reader io.Reader, filename stri
 	/*
 	 * Transform documents
 	 */
-	transformerLog := phaseLog.With("stage", "transformer").With(slog.Int("num_documents", len(docs))).With(slog.Int("num_transformers", len(f.Transformations)))
+	docs, err = f.RunTransformers(ctx, docs, phaseLog)
+	if err != nil {
+		return nil, err
+	}
+
+	return f.AddDocIDs(docs), nil
+}
+
+func (f *IngestionFlow) RunTransformers(ctx context.Context, docs []vs.Document, log *slog.Logger) ([]vs.Document, error) {
+	var err error
+	transformerLog := log.With("stage", "transformer").With(slog.Int("num_documents", len(docs))).With(slog.Int("num_transformers", len(f.Transformations)))
 	transformerLog.With("status", "starting").Info("Starting document transformers")
 	docs, err = f.Transform(ctx, docs)
 	if err != nil {
 		transformerLog.With("progress", "failed").Error("Failed to transform documents", "error", err)
 		return nil, fmt.Errorf("failed to transform documents: %w", err)
 	}
 	transformerLog.With("status", "completed").Info("Transformed documents", "new_num_documents", len(docs))
+	return docs, nil
+}
 
+func (f *IngestionFlow) AddDocIDs(docs []vs.Document) []vs.Document {
 	for i, doc := range docs {
 		if doc.ID == "" {
 			docs[i].ID = uuid.NewString()
 		}
 	}
 
-	return docs, nil
+	return docs
 }
 
 type RetrievalFlow struct {
1 change: 1 addition & 0 deletions knowledge/pkg/index/index.go
@@ -26,6 +26,7 @@ type Index interface {
 	DeleteFile(ctx context.Context, datasetID, fileID string) error
 	FindFile(ctx context.Context, searchFile types.File) (*types.File, error)
 	FindFileByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) (*types.File, error)
+	FindFilesByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) ([]types.File, error)
 
 	// Advanced File Operations
 	PruneFiles(ctx context.Context, datasetID string, pathPrefix string, keep []string) ([]types.File, error)
4 changes: 4 additions & 0 deletions knowledge/pkg/index/postgres/postgres.go
@@ -87,6 +87,10 @@ func (i *Index) FindFileByMetadata(ctx context.Context, dataset string, metadata
 	return i.DB.FindFileByMetadata(ctx, dataset, metadata, includeDocuments)
 }
 
+func (i *Index) FindFilesByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) ([]types.File, error) {
+	return i.DB.FindFilesByMetadata(ctx, dataset, metadata, includeDocuments, false)
+}
+
 func (i *Index) GetDocumentByID(ctx context.Context, documentID string) (*types.Document, error) {
 	return i.DB.GetDocument(ctx, documentID)
 }
4 changes: 4 additions & 0 deletions knowledge/pkg/index/sqlite/sqlite.go
@@ -171,6 +171,10 @@ func (i *Index) FindFileByMetadata(ctx context.Context, dataset string, metadata
 	return i.DB.FindFileByMetadata(ctx, dataset, metadata, includeDocuments)
 }
 
+func (i *Index) FindFilesByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) ([]types.File, error) {
+	return i.DB.FindFilesByMetadata(ctx, dataset, metadata, includeDocuments, false)
+}
+
 func (i *Index) GetDocumentByID(ctx context.Context, documentID string) (*types.Document, error) {
 	return i.DB.GetDocument(ctx, documentID)
 }
1 change: 1 addition & 0 deletions knowledge/pkg/index/types/models.go
@@ -36,6 +36,7 @@ type FileMetadata struct {
 	AbsolutePath string    `json:"absolute_path"`
 	Size         int64     `json:"size"`
 	ModifiedAt   time.Time `json:"modified_at"`
+	Checksum     string    `json:"checksum"`
 }
 
 type Document struct {
36 changes: 31 additions & 5 deletions knowledge/pkg/index/types/types.go
@@ -185,12 +185,25 @@ func (db *DB) FindFile(ctx context.Context, searchFile File) (*File, error) {
 }
 
 func (db *DB) FindFileByMetadata(ctx context.Context, dataset string, metadata FileMetadata, includeDocuments bool) (*File, error) {
-	var file File
+	fs, err := db.FindFilesByMetadata(ctx, dataset, metadata, includeDocuments, true)
+	if err != nil {
+		return nil, err
+	}
+	if len(fs) == 0 {
+		return nil, ErrDBFileNotFound
+	}
+	return &fs[0], nil
+}
+
+func (db *DB) FindFilesByMetadata(ctx context.Context, dataset string, metadata FileMetadata, includeDocuments bool, firstOnly bool) ([]File, error) {
 	tx := db.WithContext(ctx)
 	if includeDocuments {
 		tx = tx.Preload("Documents")
 	}
-	tx = tx.Where("dataset = ?", dataset)
+
+	if dataset != "" {
+		tx = tx.Where("dataset = ?", dataset)
+	}
 
 	if metadata.Name != "" {
 		tx = tx.Where("name = ?", metadata.Name)
@@ -204,13 +217,26 @@ func (db *DB) FindFileByMetadata(ctx context.Context, dataset string, metadata F
 	if !metadata.ModifiedAt.IsZero() {
 		tx = tx.Where("modified_at = ?", metadata.ModifiedAt)
 	}
+	if metadata.Checksum != "" {
+		tx = tx.Where("checksum = ?", metadata.Checksum)
+	}
+
+	if firstOnly {
+		var file File
+		err := tx.First(&file).Error
+		if err != nil {
+			return nil, ErrDBFileNotFound
+		}
+		return []File{file}, nil
+	}
 
-	err := tx.First(&file).Error
+	var files []File
+	err := tx.Find(&files).Error
 	if err != nil {
-		return nil, ErrDBFileNotFound
+		return nil, err
 	}
+	return files, nil
 
-	return &file, nil
 }
 
 func (db *DB) GetDocument(ctx context.Context, documentID string) (*Document, error) {
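
Passing an empty dataset name now means "search across all datasets", which is exactly how the reuse path in knowledge/pkg/datastore/ingest.go calls it. That call, annotated (a fragment, not a standalone program):

```go
// Empty dataset string: skip the dataset filter and match files from every
// dataset whose stored checksum equals the one just computed.
// includeDocuments=false: document rows are loaded later via FindFile.
fs, err := s.Index.FindFilesByMetadata(ctx, "", types.FileMetadata{Checksum: opts.FileMetadata.Checksum}, false)
if err != nil {
	// a failed lookup only disables reuse; ingestion falls through to a normal run
}
```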
27 changes: 24 additions & 3 deletions knowledge/pkg/vectorstore/chromem/chromem.go
@@ -233,6 +233,26 @@ func (s *ChromemStore) ExportCollectionsToFile(ctx context.Context, path string,
 	return s.db.ExportToFile(path, false, "", collections...)
 }
 
+func (s *ChromemStore) GetDocument(ctx context.Context, documentID, collection string) (vs.Document, error) {
+	col := s.db.GetCollection(collection, s.embeddingFunc)
+	if col == nil {
+		return vs.Document{}, fmt.Errorf("%w: %q", errors.ErrCollectionNotFound, collection)
+	}
+
+	doc, err := col.GetByID(ctx, documentID)
+	if err != nil {
+		return vs.Document{}, err
+	}
+
+	return vs.Document{
+		ID:        doc.ID,
+		Metadata:  convertStringMapToAnyMap(doc.Metadata),
+		Content:   doc.Content,
+		Embedding: doc.Embedding,
+	}, nil
+}
+
 func (s *ChromemStore) GetDocuments(ctx context.Context, collection string, where map[string]string, whereDocument []chromem.WhereDocument) ([]vs.Document, error) {
 	col := s.db.GetCollection(collection, s.embeddingFunc)
 	if col == nil {
@@ -247,9 +267,10 @@ func (s *ChromemStore) GetDocuments(ctx context.Context, collection string, wher
 	var docs []vs.Document
 	for _, doc := range cdocs {
 		docs = append(docs, vs.Document{
-			ID:       doc.ID,
-			Metadata: convertStringMapToAnyMap(doc.Metadata),
-			Content:  doc.Content,
+			ID:        doc.ID,
+			Metadata:  convertStringMapToAnyMap(doc.Metadata),
+			Content:   doc.Content,
+			Embedding: doc.Embedding,
 		})
 	}
 
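
The new singular `GetDocument`, plus the `Embedding` field now populated by `GetDocuments`, is what lets the datastore copy stored vectors instead of re-embedding. A hedged usage sketch (`store` is assumed to be an already-constructed `*ChromemStore`; the variable names are illustrative):

```go
doc, err := store.GetDocument(ctx, existingDocID, datasetID)
if err != nil {
	return err // missing document: the caller falls back to a full ingestion run
}
// The returned document carries its stored vector, so the ReuseEmbeddings
// loop in ingest.go can skip it via its len(doc.Embedding) > 0 check.
_ = doc.Embedding
```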