Skip to content

Commit

Permalink
add minio/s3 usage for file storage
Browse files Browse the repository at this point in the history
  • Loading branch information
patrit committed Dec 3, 2023
1 parent fa7b39f commit a07ffc6
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 20 deletions.
6 changes: 5 additions & 1 deletion .env.template
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
OPENAI_API_KEY=<your_key>
OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"
PGVECTOR_DB="postgresql+psycopg2://postgres:postgres@localhost:5432/db"
PGVECTOR_DB="postgresql+psycopg2://postgres:postgres@localhost:5432/db"
S3_BUCKET=texttitan
S3_URL=localhost:9000
S3_ACCESS_KEY=<s3_username>
S3_SECRET_KEY=<s3_passwd>
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ db
.vscode/
.env
.venv/
minio/
!minio/.keepme
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,27 @@ services:
- db:/var/lib/postgresql/data
jaeger:
image: jaegertracing/all-in-one:1.51
restart: always
ports:
- "16686:16686"
- "4317:4317"
environment:
- LOG_LEVEL=debug
minio:
image: quay.io/minio/minio:RELEASE.2023-11-11T08-14-41Z
restart: always
ports:
- "9000:9000"
- "9090:9090"
command: server /data --console-address ":9090"
environment:
- "MINIO_ROOT_USER=root"
- "MINIO_ROOT_PASSWORD=password"
volumes:
- minio:/data

volumes:
db:
driver: local
minio:
driver: local
Empty file added minio/.keepme
Empty file.
86 changes: 85 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

72 changes: 54 additions & 18 deletions privatellm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import tempfile
from datetime import timedelta
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
Expand All @@ -14,7 +15,7 @@
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse
from fastapi.responses import RedirectResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from langchain.agents import create_sql_agent, initialize_agent
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
Expand Down Expand Up @@ -47,8 +48,10 @@
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import Tool
from langchain.vectorstores.pgvector import PGVector
from miniopy_async import Minio
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.aiohttp_client import AioHttpClientInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
Expand Down Expand Up @@ -79,6 +82,13 @@

# Postgres connection string
CONNECTION_STRING = os.getenv("PGVECTOR_DB", "")
# S3 settinga
S3_BUCKET = os.getenv("S3_BUCKET", "texttitan")
S3_URL = os.getenv("S3_URL", "localhost:9000")
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY", "")
S3_SECRET_KEY = os.getenv("S3_SECRET_KEY", "")

s3client = Minio("localhost:9000", access_key=S3_ACCESS_KEY, secret_key=S3_SECRET_KEY, secure=False)


class ModelEnum(Enum):
Expand Down Expand Up @@ -112,6 +122,7 @@ class DocumentTypeEnum(Enum):
Psycopg2Instrumentor().instrument(enable_commenter=True, commenter_options={})
LoggingInstrumentor().instrument(set_logging_format=True)
HTTPXClientInstrumentor().instrument()
AioHttpClientInstrumentor().instrument()
RequestsInstrumentor().instrument()
resource = Resource(attributes={"service.name": "texttitan"})
provider = TracerProvider(resource=resource)
Expand Down Expand Up @@ -143,7 +154,7 @@ def load_single_document(file_path: Path) -> list[Document]:
ext = file_path.suffix.lower()
if ext in LOADER_MAPPING:
loader_class, loader_args = LOADER_MAPPING[ext]
loader: UnstructuredBaseLoader = loader_class(file_path, **loader_args)
loader: UnstructuredBaseLoader = loader_class(str(file_path), **loader_args)
return loader.load()

raise ValueError(f"Unsupported file extension '{ext}'")
Expand Down Expand Up @@ -198,32 +209,50 @@ async def update_embedding(documents: list[Document], username: str) -> None:


@app.get("/files/{userpath}")
async def get_files(userpath: str, username: str = Depends(authenticate_user)) -> str:
async def get_files(userpath: str, username: str = Depends(authenticate_user)) -> list[dict[str, Any]]:
# Get the file path and serve it
if userpath != username:
raise HTTPException(
status_code=403,
detail=f"Forbidden to access /files/{userpath}",
)
files = Path(f"files/{userpath}/").glob("*")
return str(files)
prefix = Path("files") / userpath
objects = await s3client.list_objects(S3_BUCKET, start_after=str(prefix), recursive=True)
return [
{
"object_name": obj.object_name,
"bucket_name": obj.bucket_name,
"content_type": obj.content_type,
"etag": obj.etag,
"last_modified": obj.last_modified,
"size": obj.size,
}
for obj in objects
]


@app.get("/files/{userpath}/{filename}")
async def get_file(userpath: str, filename: str, username: str = Depends(authenticate_user)) -> FileResponse:
async def get_file(userpath: str, filename: str, username: str = Depends(authenticate_user)) -> RedirectResponse:
# Get the file path and serve it
if userpath != username:
raise HTTPException(
status_code=403,
detail=f"Forbidden to access /files/{userpath}/{filename}",
)
file_path = Path("files") / userpath / filename
if not file_path.exists():
url = await s3client.get_presigned_url(
"GET",
S3_BUCKET,
str(file_path),
expires=timedelta(hours=1),
)

if url is None:
raise HTTPException(
status_code=404,
detail=f"No such file /files/{userpath}/{filename}",
)
return FileResponse(file_path)
return RedirectResponse(url)


@app.delete("/files/{userpath}/{filename}")
Expand All @@ -235,23 +264,24 @@ async def delete_file(userpath: str, filename: str, username: str = Depends(auth
detail=f"Forbidden to access /files/{userpath}/{filename}",
)
file_path = Path("files") / userpath / filename
if not file_path.exists():
objects = await s3client.list_objects(S3_BUCKET, prefix=str(file_path))
if not objects:
raise HTTPException(
status_code=404,
detail=f"No such file /files/{userpath}/{filename}",
detail=f"No such file /{file_path}",
)
engine = create_engine(CONNECTION_STRING)
with engine.connect() as session:
session.execute(
text(
"""delete from langchain_pg_embedding
where cmetadata ->> 'source' = ':path'
where cmetadata ->> 'source' = :path
and collection_id =
(select uuid from langchain_pg_collection where name = ':username');"""
(select uuid from langchain_pg_collection where name = :username);"""
).bindparams(path=str(file_path), username=username)
)
session.commit()
file_path.unlink(missing_ok=True)
await s3client.remove_object(S3_BUCKET, str(file_path))
return ""


Expand All @@ -263,12 +293,9 @@ async def upload_files(
) -> dict[str, list[Path]]:
filenames = []
dirname = Path("files") / username
if not dirname.exists():
dirname.mkdir()
for pdf_file in pdf_files:
fn = dirname / cast(str, pdf_file.filename)
with fn.open("wb") as f:
f.write(pdf_file.file.read())
await s3client.put_object(bucket_name=S3_BUCKET, object_name=str(fn), data=pdf_file.file, length=pdf_file.size)
filenames.append(fn)
await update_files(filenames, documenttype, username)
return {"uploaded_filenames": filenames}
Expand Down Expand Up @@ -629,5 +656,14 @@ async def parsing_llm(query: str, question: str) -> Any:
return await llm_chain.arun({"question": question, "summaries": query})


if __name__ == "__main__":
async def main() -> None:
bucket_exists = await s3client.bucket_exists(S3_BUCKET)
if not bucket_exists:
await s3client.make_bucket(S3_BUCKET)
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, workers=2) # noqa: S104


if __name__ == "__main__":
import asyncio

asyncio.run(main())
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ opentelemetry-instrumentation-requests = "*"
opentelemetry-instrumentation-psycopg2 = "*"
opentelemetry-instrumentation-logging = "*"
opentelemetry-instrumentation-httpx = "*"
opentelemetry-instrumentation-aiohttp-client = "*"
python-dotenv = "^1.0.0"
miniopy-async = "*"

[tool.poetry.dev-dependencies]
pytest = "*"
Expand Down

0 comments on commit a07ffc6

Please sign in to comment.