Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 1.5.8 #1559

Merged
merged 1 commit into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions src/apps/answers/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2181,6 +2181,13 @@ async def copy_applet_files(self, applet_id: uuid.UUID):
logger.info("Copy applet files...")
files = await self._get_applet_files_list(self.answer_session_source, self.storage_source, applet_id)

size_source = sum([f["Size"] for f in files])
logger.info(f"Total size on source: {size_source}")

files_target = await self._get_applet_files_list(self.answer_session_target, self.storage_target, applet_id)
size_target = sum([f["Size"] for f in files_target])
logger.info(f"Total size on target: {size_target}")

tasks = []
# copy files concurrently
for file in files:
Expand All @@ -2196,22 +2203,26 @@ async def copy_applet_files(self, applet_id: uuid.UUID):
logger.info(f"Processed [{i} / {total}] {int(i / total * 100)}%")
logger.info("Copy applet files done")

size_source = sum([f["Size"] for f in files])
logger.info(f"Total size on source: {size_source}")

files_target = await self._get_applet_files_list(self.answer_session_target, self.storage_target, applet_id)
size_target = sum([f["Size"] for f in files_target])
logger.info(f"Total size on source: {size_source}")
logger.info(f"Total size on target: {size_target}")
if size_source != size_target:
logger.error(f"!!!Applet '{applet_id}' size doesn't match!!!")

async def transfer(self, applet_id: uuid.UUID):
async def transfer(self, applet_id: uuid.UUID, *, copy_db: bool = True, copy_files: bool = True):
applet = await AppletsCRUD(self.session).get_by_id(applet_id)
logger.info(f"Move answers for applet '{applet.display_name}'({applet.id})")

async with atomic(self.answer_session_target):
await self.copy_answers(applet.id)
async with atomic(self.answer_session_target):
await self.copy_answer_items(applet.id)
if copy_db:
async with atomic(self.answer_session_target):
await self.copy_answers(applet.id)
async with atomic(self.answer_session_target):
await self.copy_answer_items(applet.id)
else:
logger.info("Skip copying database")

await self.copy_applet_files(applet_id)
if copy_files:
await self.copy_applet_files(applet_id)
else:
logger.info("Skip copying files")
140 changes: 139 additions & 1 deletion src/apps/workspaces/commands/arbitrary_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine

from apps.file.storage import select_storage
from apps.answers.service import AnswerTransferService
from apps.file.storage import create_client, select_storage
from apps.users.cruds.user import UsersCRUD
from apps.users.errors import UserIsDeletedError, UserNotFound
from apps.workspaces.constants import StorageType
from apps.workspaces.domain.workspace import (
WorkspaceArbitrary,
WorkSpaceArbitraryConsoleOutput,
WorkspaceArbitraryCreate,
WorkspaceArbitraryFields,
)
from apps.workspaces.errors import ArbitraryServerSettingsError, WorkspaceNotFoundError
from apps.workspaces.service.workspace import WorkspaceService
from config import settings
from infrastructure.commands.utils import coro
from infrastructure.database import atomic, session_manager

Expand Down Expand Up @@ -297,3 +300,138 @@ async def ping(owner_email: str = typer.Argument(..., help="Workspace owner emai
except httpx.HTTPError as e:
error_msg("File upload error")
error(str(e))


def _s_lower(s: str | None) -> str | None:
return s.lower() if s else s


def _is_db_same(source_arb_data: WorkspaceArbitrary | None, target_arb_data: WorkspaceArbitrary | None) -> bool:
checks = [source_arb_data is None, target_arb_data is None]
if all(checks):
return True
if any(checks):
return False
assert source_arb_data
assert target_arb_data
return _s_lower(source_arb_data.database_uri) == _s_lower(target_arb_data.database_uri)


def _is_bucket_same(source_arb_data: WorkspaceArbitrary | None, target_arb_data: WorkspaceArbitrary | None) -> bool:
checks = [source_arb_data is None, target_arb_data is None]
if all(checks):
return True
if any(checks):
return False

to_check = (
"storage_type",
"storage_url",
"storage_access_key",
"storage_secret_key",
"storage_region",
"storage_bucket",
)

for attr in to_check:
if _s_lower(getattr(source_arb_data, attr)) != _s_lower(getattr(target_arb_data, attr)):
return False

return True


@app.command(
short_help=(
"Copy answers (DB records, files) from source arbitrary (or internal) to target arbitrary (or internal) server."
"WARNING: ensure source and target are different!"
)
)
@coro
async def copy_applet_answers(
applet_ids: list[uuid.UUID] = typer.Argument(..., help="A list of Applet IDs for data copying."),
source_owner_email: Optional[str] = typer.Option(
None,
"--src-owner-email",
"-s",
help="Source workspace owner email. Internal server will be used by default.",
),
target_owner_email: Optional[str] = typer.Option(
None,
"--tgt-owner-email",
"-t",
help="Target workspace owner email. Internal server will be used by default.",
),
skip_db: bool = typer.Option(
False,
is_flag=True,
help="Skip DB records copying.",
),
skip_files: bool = typer.Option(
False,
is_flag=True,
help="Skip files copying.",
),
) -> None:
if not source_owner_email and not target_owner_email:
error("Source or target should be set")
if skip_db and skip_files:
error("Nothing to copy: DB and files skipped")

source_arb_data = None
target_arb_data = None

session_maker = session_manager.get_session()
async with session_maker() as session:
user_crud = UsersCRUD(session)

if source_owner_email:
try:
source_owner = await user_crud.get_by_email(source_owner_email)
source_arb_data = await WorkspaceService(
session, source_owner.id
).get_arbitrary_info_by_owner_id_if_use_arbitrary(source_owner.id, in_use_only=False)
except (UserNotFound, UserIsDeletedError):
error(f"User with email {source_owner_email} not found")
except WorkspaceNotFoundError as e:
error(str(e))
if target_owner_email:
try:
target_owner = await user_crud.get_by_email(target_owner_email)
target_arb_data = await WorkspaceService(
session, target_owner.id
).get_arbitrary_info_by_owner_id_if_use_arbitrary(target_owner.id, in_use_only=False)
except (UserNotFound, UserIsDeletedError):
error(f"User with email {target_owner} not found")
except WorkspaceNotFoundError as e:
error(str(e))

if source_arb_data is None and target_arb_data is None:
error("No arbitrary data found")

copy_db = False if skip_db else not _is_db_same(source_arb_data, target_arb_data)
copy_files = False if skip_files else not _is_bucket_same(source_arb_data, target_arb_data)

source_db_uri = source_arb_data.database_uri if source_arb_data else settings.database.url
target_db_uri = target_arb_data.database_uri if target_arb_data else settings.database.url
async with session_manager.get_session(source_db_uri)() as source_session:
await AnswerTransferService.check_db(source_session)
async with session_manager.get_session(target_db_uri)() as target_session:
await AnswerTransferService.check_db(target_session)

source_bucket = create_client(source_arb_data)
try:
await source_bucket.check()
except Exception as e:
error_msg(str(e))
raise
target_bucket = create_client(target_arb_data)
try:
await target_bucket.check()
except Exception as e:
error_msg(str(e))
raise

service = AnswerTransferService(session, source_session, target_session, source_bucket, target_bucket)

for applet_id in applet_ids:
await service.transfer(applet_id, copy_db=copy_db, copy_files=copy_files)
9 changes: 9 additions & 0 deletions src/infrastructure/utility/cdn_arbitrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import BinaryIO

import boto3
import botocore
from azure.storage.blob import BlobSasPermissions, BlobServiceClient, generate_blob_sas

from infrastructure.utility.cdn_client import CDNClient
Expand All @@ -11,11 +12,15 @@

class ArbitraryS3CdnClient(CDNClient):
def configure_client(self, config: CdnConfig):
client_config = botocore.config.Config(
max_pool_connections=25,
)
return boto3.client(
"s3",
aws_access_key_id=self.config.access_key,
aws_secret_access_key=self.config.secret_key,
region_name=self.config.region,
config=client_config,
)


Expand All @@ -28,12 +33,16 @@ def generate_private_url(self, key):
return f"gs://{self.config.bucket}/{key}"

def configure_client(self, config):
client_config = botocore.config.Config(
max_pool_connections=25,
)
return boto3.client(
"s3",
aws_access_key_id=self.config.access_key,
aws_secret_access_key=self.config.secret_key,
region_name=self.config.region,
endpoint_url=self.endpoint_url,
config=client_config,
)


Expand Down
5 changes: 5 additions & 0 deletions src/infrastructure/utility/cdn_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import BinaryIO

import boto3
import botocore
import httpx
from botocore.exceptions import ClientError, EndpointConnectionError

Expand Down Expand Up @@ -40,6 +41,9 @@ def generate_private_url(self, key):

def configure_client(self, config):
assert config, "set CDN"
client_config = botocore.config.Config(
max_pool_connections=25,
)

if config.access_key and config.secret_key:
return boto3.client(
Expand All @@ -48,6 +52,7 @@ def configure_client(self, config):
region_name=config.region,
aws_access_key_id=config.access_key,
aws_secret_access_key=config.secret_key,
config=client_config,
)
try:
return boto3.client("s3", region_name=config.region)
Expand Down
Loading