Fix s3 tests
mawandm committed Aug 16, 2024
1 parent d073e53 commit 9c9d3b9
Showing 1 changed file with 187 additions and 0 deletions.
187 changes: 187 additions & 0 deletions nesis/api/tests/core/document_loaders/test_s3.py
@@ -13,6 +13,7 @@
import nesis.api.core.services as services
import nesis.api.core.document_loaders.s3 as s3
from nesis.api import tests
from nesis.api.core.document_loaders.stores import SqlDocumentStore
from nesis.api.core.models import DBSession
from nesis.api.core.models import initialize_engine
from nesis.api.core.models.entities import (
@@ -331,3 +332,189 @@ def test_unsync_s3_documents(
    )
    documents = session.query(Document).all()
    assert len(documents) == 0


@mock.patch("nesis.api.core.document_loaders.s3.boto3.client")
def test_extract_documents(
    client: mock.MagicMock, cache: mock.MagicMock, session: Session
) -> None:
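    """
    Test that documents listed in an s3 bucket are extracted and persisted
    to the configured destination sql store when the datasource runs in
    extract mode
    """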
    destination_sql_url = tests.config["database"]["url"]
    data = {
        "name": "s3 documents",
        "engine": "s3",
        "connection": {
            "endpoint": "https://s3.endpoint",
            "access_key": "",
            "secret_key": "",
            "dataobjects": "buckets",
            "mode": "extract",
            "destination": {
                "sql": {"url": destination_sql_url},
            },
        },
    }

    datasource = Datasource(
        name=data["name"],
        connection=data["connection"],
        source_type=DatasourceType.S3,
        status=DatasourceStatus.ONLINE,
    )

    session.add(datasource)
    session.commit()

    http_client = mock.MagicMock()
    http_client.upload.return_value = json.dumps({})
    s3_client = mock.MagicMock()

    client.return_value = s3_client
    paginator = mock.MagicMock()
    paginator.paginate.return_value = [
        {
            "KeyCount": 1,
            "Contents": [
                {
                    "Key": "image.jpg",
                    "LastModified": strptime("2023-07-18 06:40:07"),
                    "ETag": '"d41d8cd98f00b204e9800998ecf8427e"',
                    "Size": 0,
                    "StorageClass": "STANDARD",
                    "Owner": {
                        "DisplayName": "webfile",
                        "ID": "75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a",
                    },
                }
            ],
        }
    ]
    s3_client.get_paginator.return_value = paginator

    ingestor = s3.Processor(
        config=tests.config,
        http_client=http_client,
        cache_client=cache,
        datasource=datasource,
    )

    extract_store = SqlDocumentStore(
        url=data["connection"]["destination"]["sql"]["url"]
    )

    with Session(extract_store._engine) as session:
        initial_count = len(
            session.query(ingestor._extract_runner._extraction_store.Store)
            .filter()
            .all()
        )

    ingestor.run(
        metadata={"datasource": "documents"},
    )

    _, upload_kwargs = http_client.upload.call_args_list[0]
    url = upload_kwargs["url"]
    metadata = upload_kwargs["metadata"]
    field = upload_kwargs["field"]

    assert url == "http://localhost:8080/v1/extractions/text"
    assert field == "file"
    ut.TestCase().assertDictEqual(
        metadata,
        {
            "datasource": "documents",
            "file_name": "buckets/image.jpg",
            "self_link": "https://s3.endpoint/buckets/image.jpg",
        },
    )

    with Session(extract_store._engine) as session:
        all_documents = (
            session.query(ingestor._extract_runner._extraction_store.Store)
            .filter()
            .all()
        )
        assert len(all_documents) == initial_count + 1
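

# For reference, the paginator mock above mirrors the page shape returned by a
# real boto3 list_objects_v2 listing. A minimal sketch of the live call the
# mock stands in for; the endpoint and bucket values are taken from the test
# data and are illustrative only, not part of the commit:
def _list_bucket_sketch() -> None:
    import boto3

    s3_live = boto3.client("s3", endpoint_url="https://s3.endpoint")
    paginator = s3_live.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket="buckets"):
        # Pages without matching keys omit "Contents" entirely.
        for entry in page.get("Contents", []):
            print(entry["Key"], entry["LastModified"], entry["Size"])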


@mock.patch("nesis.api.core.document_loaders.s3.boto3.client")
def test_unextract_documents(
    client: mock.MagicMock, cache: mock.MagicMock, session: Session
) -> None:
"""
Test deleting of s3 documents from the rag engine if they have been deleted from the s3 bucket
"""
    destination_sql_url = tests.config["database"]["url"]
    data = {
        "name": "s3 documents",
        "engine": "s3",
        "connection": {
            "endpoint": "https://s3.endpoint",
            "access_key": "",
            "secret_key": "",
            "dataobjects": "buckets",
            "mode": "extract",
            "destination": {
                "sql": {"url": destination_sql_url},
            },
        },
    }
    datasource = Datasource(
        name=data["name"],
        connection=data["connection"],
        source_type=DatasourceType.MINIO,
        status=DatasourceStatus.ONLINE,
    )

    session.add(datasource)
    session.commit()

    http_client = mock.MagicMock()
    s3_client = mock.MagicMock()

    client.return_value = s3_client
    s3_client.head_object.side_effect = Exception("HeadObject Not Found")

    minio_ingestor = s3.Processor(
        config=tests.config,
        http_client=http_client,
        cache_client=cache,
        datasource=datasource,
    )

    extract_store = SqlDocumentStore(
        url=data["connection"]["destination"]["sql"]["url"]
    )

    with Session(extract_store._engine) as session:
        session.query(minio_ingestor._extract_runner._extraction_store.Store).delete()
        document = minio_ingestor._extract_runner._extraction_store.Store()
        document.base_uri = data["connection"]["endpoint"]
        document.uuid = str(uuid.uuid4())
        document.filename = "invalid.pdf"
        document.extract_metadata = {"data": [{"doc_id": str(uuid.uuid4())}]}
        document.store_metadata = {
            "bucket_name": "some-bucket",
            "object_name": "file/path.pdf",
        }
        document.last_modified = datetime.datetime.utcnow()

        session.add(document)
        session.commit()

        initial_count = len(
            session.query(minio_ingestor._extract_runner._extraction_store.Store)
            .filter()
            .all()
        )

    minio_ingestor.run(
        metadata={"datasource": "documents"},
    )

    with Session(extract_store._engine) as session:
        documents = session.query(
            minio_ingestor._extract_runner._extraction_store.Store
        ).all()
        assert len(documents) == initial_count - 1
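

# Both tests above rely on cache and session pytest fixtures supplied by the
# suite's conftest. A minimal sketch of what those fixtures could look like,
# inferred from the imports at the top of this file; the project's actual
# fixtures may differ:
import pytest


@pytest.fixture
def cache() -> mock.MagicMock:
    # The tests never assert against the cache, so a bare mock is enough.
    return mock.MagicMock()


@pytest.fixture
def session():
    # Assumes initialize_engine wires up the engine from tests.config and
    # DBSession hands out sessions bound to it.
    initialize_engine(tests.config)
    db_session = DBSession()
    yield db_session
    db_session.close()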
