feat: reuse documents/embeddings for files with the same sha256sum (+fix pgx bug) #464

Merged
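Core idea of this change: before running the full ingestion flow, the file content is hashed with SHA-256; if an already-ingested file with the same checksum exists in a dataset whose embedding model matches the configured one, its documents and embeddings are copied instead of re-chunking and re-embedding. A minimal self-contained sketch of that decision, using toy types rather than the repository's API:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// ingest uses a plain map as a stand-in for the Index/Vectorstore lookup
// done in knowledge/pkg/datastore/ingest.go: checksum -> previously computed chunks.
func ingest(content []byte, store map[string][]string) []string {
	sum := sha256.Sum256(content)
	checksum := hex.EncodeToString(sum[:]) // same value as fmt.Sprintf("%x", sum)

	if docs, ok := store[checksum]; ok {
		fmt.Println("reusing", len(docs), "chunks for", checksum[:8])
		return docs // skip chunking and embedding entirely
	}
	docs := []string{"chunk-1", "chunk-2"} // placeholder for the real ingestion flow
	store[checksum] = docs
	return docs
}

func main() {
	store := map[string][]string{}
	ingest([]byte("same bytes"), store) // first ingestion: computes chunks
	ingest([]byte("same bytes"), store) // second ingestion: reused by checksum
}
```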
1 change: 1 addition & 0 deletions knowledge/pkg/client/client.go
@@ -18,6 +18,7 @@ type SharedIngestionOpts struct {
IsDuplicateFuncName string
Metadata map[string]string
ReuseEmbeddings bool
ReuseFiles bool
}

type IngestPathsOpts struct {
2 changes: 2 additions & 0 deletions knowledge/pkg/client/standalone.go
@@ -113,6 +113,7 @@ func (c *StandaloneClient) IngestFromWorkspace(ctx context.Context, datasetID st
ExtraMetadata: meta,
IngestionFlows: opts.IngestionFlows,
ReuseEmbeddings: opts.ReuseEmbeddings,
ReuseFiles: opts.ReuseFiles,
}

_, err = c.Ingest(log.ToCtx(ctx, log.FromCtx(ctx).With("filepath", file).With("absolute_path", iopts.FileMetadata.AbsolutePath)), datasetID, finfo.Name, fileContent, iopts)
@@ -165,6 +166,7 @@ func (c *StandaloneClient) IngestPaths(ctx context.Context, datasetID string, op
IsDuplicateFuncName: opts.IsDuplicateFuncName,
ExtraMetadata: extraMetadata,
ReuseEmbeddings: opts.ReuseEmbeddings,
ReuseFiles: opts.ReuseFiles,
}

if opts != nil {
1 change: 1 addition & 0 deletions knowledge/pkg/cmd/askdir.go
@@ -47,6 +47,7 @@ func (s *ClientAskDir) Run(cmd *cobra.Command, args []string) error {
SharedIngestionOpts: client.SharedIngestionOpts{
IsDuplicateFuncName: s.DeduplicationFuncName,
ReuseEmbeddings: true,
ReuseFiles: true,
},
IgnoreExtensions: strings.Split(s.IgnoreExtensions, ","),
Concurrency: s.Concurrency,
1 change: 1 addition & 0 deletions knowledge/pkg/cmd/ingest.go
@@ -103,6 +103,7 @@ func (s *ClientIngest) run(ctx context.Context, filePath string) error {
IsDuplicateFuncName: s.DeduplicationFuncName,
Metadata: metadata,
ReuseEmbeddings: true,
ReuseFiles: true,
},
IgnoreExtensions: strings.Split(s.IgnoreExtensions, ","),
Concurrency: s.Concurrency,
104 changes: 95 additions & 9 deletions knowledge/pkg/datastore/ingest.go
@@ -3,6 +3,7 @@ package datastore
import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"log/slog"
"os"
@@ -29,6 +30,7 @@ type IngestOpts struct {
IngestionFlows []flows.IngestionFlow
ExtraMetadata map[string]any
ReuseEmbeddings bool
ReuseFiles bool
}

// Ingest loads a document from a reader and adds it to the dataset.
@@ -160,25 +162,105 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, filename strin
return nil, fmt.Errorf("%w (file %q)", &documentloader.UnsupportedFileTypeError{FileType: filetype}, opts.FileMetadata.AbsolutePath)
}

start := time.Now()
checksum := sha256.Sum256(content)
slog.Debug("File checksum calculated", "size", len(content), "duration", time.Since(start))
opts.FileMetadata.Checksum = fmt.Sprintf("%x", checksum)

// Mandatory Transformation: Add filename to metadata -> append extraMetadata, but do not override filename or absPath
metadata := map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath, "fileSize": opts.FileMetadata.Size, "embeddingModel": s.EmbeddingModelProvider.EmbeddingModelName()}
metadata := map[string]any{"filename": filename, "absPath": opts.FileMetadata.AbsolutePath, "fileSize": opts.FileMetadata.Size, "embeddingModel": s.EmbeddingModelProvider.EmbeddingModelName(), "fileChecksum": fmt.Sprintf("%x", checksum)}
for k, v := range opts.ExtraMetadata {
if _, ok := metadata[k]; !ok {
metadata[k] = v
}
}
em := &transformers.ExtraMetadata{Metadata: metadata}
ingestionFlow.Transformations = append(ingestionFlow.Transformations, em)

docs, err := ingestionFlow.Run(ctx, bytes.NewReader(content), filename)
if err != nil {
statusLog.With("status", "failed").Error("Ingestion Flow failed", "error", err)
return nil, fmt.Errorf("ingestion flow failed for file %q: %w", filename, err)
// Reuse existing file if possible
// TODO: this should honor textsplitter and loading settings somehow to allow for changing them and not use the existing embeddings and documents data
var docs []vs.Document
if opts.ReuseFiles {
slog.Info("Checking if existing file can be reused", "checksum", opts.FileMetadata.Checksum)

fs, err := s.Index.FindFilesByMetadata(ctx, "", types.FileMetadata{Checksum: opts.FileMetadata.Checksum}, false)
if err != nil {
slog.Debug("Failed to find file with checksum", "error", err, "checksum", metadata["checksum"])
} else if len(fs) > 0 {
fileLoop:
for _, f := range fs {

// check if the dataset embeddings config matches - if not, we don't have to fetch the documents for this file
ds, err := s.GetDataset(ctx, f.Dataset, nil)
if err != nil || ds == nil {
slog.Debug("Failed to get dataset", "error", err)
continue
}
if ds.EmbeddingsProviderConfig == nil {
slog.Debug("Dataset has no embeddings provider config")
continue
}
dsEmbeddingProvider, err := embeddings.ProviderFromConfig(*ds.EmbeddingsProviderConfig)
if err != nil {
slog.Debug("Failed to get embeddings model provider", "error", err)
continue
}

if dsEmbeddingProvider.EmbeddingModelName() != s.EmbeddingModelProvider.EmbeddingModelName() {
slog.Debug("Embeddings model mismatch", "dataset", ds.ID, "attached", dsEmbeddingProvider.EmbeddingModelName(), "configured", s.EmbeddingModelProvider.EmbeddingModelName())
continue
}

nf, err := s.FindFile(ctx, f)
if err != nil || nf == nil {
slog.Debug("Failed to get file", "error", err)
continue
}
f = *nf
slog.Info("Found existing file that could be reused", "fileID", f.ID, "checksum", opts.FileMetadata.Checksum, "documents", len(f.Documents), "dataset", f.Dataset)

docs = make([]vs.Document, len(f.Documents))
for i, existingDoc := range f.Documents {
document, err := s.Vectorstore.GetDocument(ctx, existingDoc.ID, f.Dataset)
if err != nil {
// At this point we have to do all or none, as a new ingestion process could generate different chunks, e.g. due to different tokenization in a VLM processing step
slog.Info("Failed to get document, aborting file embedding reuse process", "error", err, "docID", existingDoc.ID)
continue fileLoop
}
docs[i] = vs.Document{
ID: uuid.NewString(), // new UUID for the document to avoid collisions with reused docs
Metadata: document.Metadata, // some keys will be overridden in the transformers
Content: document.Content,
Embedding: document.Embedding,
}
}

slog.Info("Reused existing file", "fileID", f.ID, "checksum", opts.FileMetadata.Checksum, "documents", len(docs), "dataset", f.Dataset)
break
}
}
}

em := &transformers.ExtraMetadata{Metadata: metadata}
ingestionFlow.Transformations = append(ingestionFlow.Transformations, em)

// Only run ingestion flow if we're not re-using the details of an existing file and its documents
if len(docs) == 0 {
statusLog.With("status", "skipped").Info("Ingested document", "num_documents", 0)
return nil, nil
docs, err = ingestionFlow.Run(ctx, bytes.NewReader(content), filename)
if err != nil {
statusLog.With("status", "failed").Error("Ingestion Flow failed", "error", err)
return nil, fmt.Errorf("ingestion flow failed for file %q: %w", filename, err)
}

if len(docs) == 0 {
statusLog.With("status", "skipped").Info("Ingested document", "num_documents", 0)
return nil, nil
}
} else {
// We reused documents, so we only need to run the transformers on them, e.g. to add the metadata
docs, err = ingestionFlow.RunTransformers(ctx, docs, statusLog)
if err != nil {
statusLog.With("status", "failed").Error("Failed to run transformers on reused documents", "error", err)
return nil, fmt.Errorf("failed to run transformers on reused documents: %w", err)
}
}

// Sort documents
@@ -203,6 +285,9 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, filename strin
if opts.ReuseEmbeddings {
slog.Debug("Checking if existing embeddings can be reused", "count", len(docs))
for i, doc := range docs {
if len(doc.Embedding) > 0 {
continue
}
existingDocs, err := s.Vectorstore.GetDocuments(ctx, "", nil, []cg.WhereDocument{
{
Operator: cg.WhereDocumentOperatorEquals,
Expand Down Expand Up @@ -287,6 +372,7 @@ func (s *Datastore) Ingest(ctx context.Context, datasetID string, filename strin
dbFile.FileMetadata.AbsolutePath = opts.FileMetadata.AbsolutePath
dbFile.FileMetadata.Size = opts.FileMetadata.Size
dbFile.FileMetadata.ModifiedAt = opts.FileMetadata.ModifiedAt
dbFile.FileMetadata.Checksum = opts.FileMetadata.Checksum
}

iLog := statusLog.With("component", "index")
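Callers opt in through the shared ingestion options. A hedged sketch of the option wiring only; the field names are taken from the hunks above, while the surrounding client setup and the exact IngestPaths signature are assumed:

```go
opts := client.IngestPathsOpts{
	SharedIngestionOpts: client.SharedIngestionOpts{
		ReuseEmbeddings: true, // per-chunk reuse, matched by identical chunk content
		ReuseFiles:      true, // whole-file reuse, matched by sha256 checksum
	},
}
// opts is then passed to StandaloneClient.IngestPaths / IngestFromWorkspace,
// which forward ReuseFiles down to datastore.IngestOpts as shown above.
```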
3 changes: 3 additions & 0 deletions knowledge/pkg/datastore/transformers/metadata.go
@@ -17,6 +17,9 @@ type ExtraMetadata struct {
func (e *ExtraMetadata) Transform(_ context.Context, docs []vs.Document) ([]vs.Document, error) {
for i, doc := range docs {
metadata := doc.Metadata
if metadata == nil {
metadata = make(map[string]any)
}
for k, v := range e.Metadata {
metadata[k] = v
}
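The nil check matters because documents reused from the vector store may carry no metadata map at all, and assigning into a nil Go map panics. A standalone illustration:

```go
package main

import "fmt"

func main() {
	var m map[string]any // nil map: reads are fine, writes panic

	// m["filename"] = "a.pdf" // would panic: assignment to entry in nil map

	if m == nil {
		m = make(map[string]any)
	}
	m["filename"] = "a.pdf" // safe after the guard
	fmt.Println(m)
}
```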
18 changes: 15 additions & 3 deletions knowledge/pkg/flows/flows.go
@@ -168,22 +168,34 @@ func (f *IngestionFlow) Run(ctx context.Context, reader io.Reader, filename stri
/*
* Transform documents
*/
transformerLog := phaseLog.With("stage", "transformer").With(slog.Int("num_documents", len(docs))).With(slog.Int("num_transformers", len(f.Transformations)))
docs, err = f.RunTransformers(ctx, docs, phaseLog)
if err != nil {
return nil, err
}

return f.AddDocIDs(docs), nil
}

func (f *IngestionFlow) RunTransformers(ctx context.Context, docs []vs.Document, log *slog.Logger) ([]vs.Document, error) {
var err error
transformerLog := log.With("stage", "transformer").With(slog.Int("num_documents", len(docs))).With(slog.Int("num_transformers", len(f.Transformations)))
transformerLog.With("status", "starting").Info("Starting document transformers")
docs, err = f.Transform(ctx, docs)
if err != nil {
transformerLog.With("progress", "failed").Error("Failed to transform documents", "error", err)
return nil, fmt.Errorf("failed to transform documents: %w", err)
}
transformerLog.With("status", "completed").Info("Transformed documents", "new_num_documents", len(docs))
return docs, nil
}

func (f *IngestionFlow) AddDocIDs(docs []vs.Document) []vs.Document {
for i, doc := range docs {
if doc.ID == "" {
docs[i].ID = uuid.NewString()
}
}

return docs, nil
return docs
}

type RetrievalFlow struct {
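Extracting RunTransformers out of Run is what lets the reuse path in ingest.go re-apply only the transformation stage (metadata stamping, etc.) to documents whose content and embeddings were copied from an existing file. A self-contained toy mirroring that split; the types are illustrative, not the repository's:

```go
package main

import (
	"context"
	"fmt"
)

type Document struct{ Content string }

// Flow mirrors IngestionFlow: Run would do load + split + transform,
// while RunTransformers exposes just the final stage for reused documents.
type Flow struct {
	Transformations []func([]Document) []Document
}

func (f *Flow) RunTransformers(_ context.Context, docs []Document) ([]Document, error) {
	for _, t := range f.Transformations {
		docs = t(docs)
	}
	return docs, nil
}

func main() {
	f := &Flow{Transformations: []func([]Document) []Document{
		func(ds []Document) []Document {
			for i := range ds {
				ds[i].Content += " [metadata stamped]"
			}
			return ds
		},
	}}
	reused := []Document{{Content: "chunk copied from an existing file"}}
	out, _ := f.RunTransformers(context.Background(), reused)
	fmt.Println(out[0].Content) // transformed without re-loading or re-splitting
}
```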
1 change: 1 addition & 0 deletions knowledge/pkg/index/index.go
@@ -26,6 +26,7 @@ type Index interface {
DeleteFile(ctx context.Context, datasetID, fileID string) error
FindFile(ctx context.Context, searchFile types.File) (*types.File, error)
FindFileByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) (*types.File, error)
FindFilesByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) ([]types.File, error)

// Advanced File Operations
PruneFiles(ctx context.Context, datasetID string, pathPrefix string, keep []string) ([]types.File, error)
4 changes: 4 additions & 0 deletions knowledge/pkg/index/postgres/postgres.go
@@ -87,6 +87,10 @@ func (i *Index) FindFileByMetadata(ctx context.Context, dataset string, metadata
return i.DB.FindFileByMetadata(ctx, dataset, metadata, includeDocuments)
}

func (i *Index) FindFilesByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) ([]types.File, error) {
return i.DB.FindFilesByMetadata(ctx, dataset, metadata, includeDocuments, false)
}

func (i *Index) GetDocumentByID(ctx context.Context, documentID string) (*types.Document, error) {
return i.DB.GetDocument(ctx, documentID)
}
4 changes: 4 additions & 0 deletions knowledge/pkg/index/sqlite/sqlite.go
@@ -171,6 +171,10 @@ func (i *Index) FindFileByMetadata(ctx context.Context, dataset string, metadata
return i.DB.FindFileByMetadata(ctx, dataset, metadata, includeDocuments)
}

func (i *Index) FindFilesByMetadata(ctx context.Context, dataset string, metadata types.FileMetadata, includeDocuments bool) ([]types.File, error) {
return i.DB.FindFilesByMetadata(ctx, dataset, metadata, includeDocuments, false)
}

func (i *Index) GetDocumentByID(ctx context.Context, documentID string) (*types.Document, error) {
return i.DB.GetDocument(ctx, documentID)
}
1 change: 1 addition & 0 deletions knowledge/pkg/index/types/models.go
@@ -36,6 +36,7 @@ type FileMetadata struct {
AbsolutePath string `json:"absolute_path"`
Size int64 `json:"size"`
ModifiedAt time.Time `json:"modified_at"`
Checksum string `json:"checksum"`
}

type Document struct {
36 changes: 31 additions & 5 deletions knowledge/pkg/index/types/types.go
@@ -185,12 +185,25 @@ func (db *DB) FindFile(ctx context.Context, searchFile File) (*File, error) {
}

func (db *DB) FindFileByMetadata(ctx context.Context, dataset string, metadata FileMetadata, includeDocuments bool) (*File, error) {
var file File
fs, err := db.FindFilesByMetadata(ctx, dataset, metadata, includeDocuments, true)
if err != nil {
return nil, err
}
if len(fs) == 0 {
return nil, ErrDBFileNotFound
}
return &fs[0], nil
}

func (db *DB) FindFilesByMetadata(ctx context.Context, dataset string, metadata FileMetadata, includeDocuments bool, firstOnly bool) ([]File, error) {
tx := db.WithContext(ctx)
if includeDocuments {
tx = tx.Preload("Documents")
}
tx = tx.Where("dataset = ?", dataset)

if dataset != "" {
tx = tx.Where("dataset = ?", dataset)
}

if metadata.Name != "" {
tx = tx.Where("name = ?", metadata.Name)
@@ -204,13 +217,26 @@ func (db *DB) FindFileByMetadata(ctx context.Context, dataset string, metadata F
if !metadata.ModifiedAt.IsZero() {
tx = tx.Where("modified_at = ?", metadata.ModifiedAt)
}
if metadata.Checksum != "" {
tx = tx.Where("checksum = ?", metadata.Checksum)
}

if firstOnly {
var file File
err := tx.First(&file).Error
if err != nil {
return nil, ErrDBFileNotFound
}
return []File{file}, nil
}

err := tx.First(&file).Error
var files []File
err := tx.Find(&files).Error
if err != nil {
return nil, ErrDBFileNotFound
return nil, err
}
return files, nil

return &file, nil
}

func (db *DB) GetDocument(ctx context.Context, documentID string) (*Document, error) {
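The firstOnly flag preserves the old single-file semantics (tx.First plus the ErrDBFileNotFound mapping) for FindFileByMetadata, while the plural path returns every match and, when dataset is empty, searches across datasets, which the checksum lookup in ingest.go relies on. Call shapes only; the surrounding setup is assumed:

```go
// Old behavior, now delegated: first match in one dataset, or ErrDBFileNotFound.
f, err := db.FindFileByMetadata(ctx, "my-dataset", types.FileMetadata{Name: "a.pdf"}, false)

// New plural form: all matches; an empty dataset means "any dataset".
fs, err := db.FindFilesByMetadata(ctx, "", types.FileMetadata{Checksum: checksum}, false, false)
```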
27 changes: 24 additions & 3 deletions knowledge/pkg/vectorstore/chromem/chromem.go
@@ -233,6 +233,26 @@ func (s *ChromemStore) ExportCollectionsToFile(ctx context.Context, path string,
return s.db.ExportToFile(path, false, "", collections...)
}

func (s *ChromemStore) GetDocument(ctx context.Context, documentID, collection string) (vs.Document, error) {
col := s.db.GetCollection(collection, s.embeddingFunc)
if col == nil {
return vs.Document{}, fmt.Errorf("%w: %q", errors.ErrCollectionNotFound, collection)
}

doc, err := col.GetByID(ctx, documentID)
if err != nil {
return vs.Document{}, err
}

return vs.Document{
ID: doc.ID,
Metadata: convertStringMapToAnyMap(doc.Metadata),
Content: doc.Content,
Embedding: doc.Embedding,
}, nil
}

func (s *ChromemStore) GetDocuments(ctx context.Context, collection string, where map[string]string, whereDocument []chromem.WhereDocument) ([]vs.Document, error) {
col := s.db.GetCollection(collection, s.embeddingFunc)
if col == nil {
@@ -247,9 +267,10 @@ func (s *ChromemStore) GetDocuments(ctx context.Context, collection string, wher
var docs []vs.Document
for _, doc := range cdocs {
docs = append(docs, vs.Document{
ID: doc.ID,
Metadata: convertStringMapToAnyMap(doc.Metadata),
Content: doc.Content,
ID: doc.ID,
Metadata: convertStringMapToAnyMap(doc.Metadata),
Content: doc.Content,
Embedding: doc.Embedding,
})
}

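The new GetDocument accessor, and the Embedding field now populated by GetDocuments, are what allow ingest.go to copy stored vectors instead of calling the embedding model again. A sketch of the copy, mirroring the ingest.go hunk above; the surrounding types and error handling are assumed:

```go
doc, err := store.GetDocument(ctx, existingDocID, datasetID)
if err != nil {
	return err // all-or-none: fall back to a full ingestion run
}
reused := vs.Document{
	ID:        uuid.NewString(), // fresh ID so the copy doesn't collide
	Metadata:  doc.Metadata,     // selected keys get overridden by transformers
	Content:   doc.Content,
	Embedding: doc.Embedding,    // stored vector reused; no re-embedding call
}
```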