Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] gh-1734 #1741

Merged
merged 3 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import logging
from datetime import datetime
from typing import List

from sqlalchemy.sql import and_

from dataall.base.db import exceptions
from dataall.core.activity.db.activity_models import Activity
from dataall.modules.s3_datasets.db.dataset_models import (
DatasetTableColumn,
DatasetTable,
S3Dataset,
DatasetTableDataFilter,
)
from dataall.base.utils import json_utils
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem
from dataall.modules.shares_base.services.shares_enums import ShareItemStatus

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,11 +66,33 @@ def get_dataset_table_by_uri(session, table_uri):
return table

@staticmethod
def update_existing_tables_status(existing_tables, glue_tables):
def update_existing_tables_status(existing_tables, glue_tables, session):
for existing_table in existing_tables:
if existing_table.GlueTableName not in [t['Name'] for t in glue_tables]:
existing_table.LastGlueTableStatus = 'Deleted'
logger.info(f'Existing Table {existing_table.GlueTableName} status set to Deleted from Glue')
# Once the table item is deleted from glue and no longer part of the dataset
# Find out where this item is used in shares and delete all the share items.
share_item_status_filter = [ShareItemStatus.Share_Succeeded.value]
share_object_items: List[ShareObjectItem] = (
ShareObjectRepository.list_share_object_items_for_item_with_status(
session=session, item_uri=existing_table.tableUri, status=share_item_status_filter
)
)
logger.info(
f'Found {len(share_object_items)} share objects where the table {existing_table.tableUri} is present as a share item in state: {share_item_status_filter}. Deleting those share items'
)
for share_object_item in share_object_items:
activity = Activity(
action='SHARE_OBJECT_ITEM:DELETE',
label='SHARE_OBJECT_ITEM:DELETE',
owner='dataall-automation',
summary=f'dataall-automation deleted share object: {share_object_item.itemName} with uri: {share_object_item.itemUri} since the glue table associated was deleted from source glue db',
targetUri=share_object_item.itemUri,
targetType='share_object_item',
)
session.add(activity)
session.delete(share_object_item)
elif (
existing_table.GlueTableName in [t['Name'] for t in glue_tables]
and existing_table.LastGlueTableStatus == 'Deleted'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ def sync_existing_tables(session, uri, glue_tables=None):
existing_tables = DatasetTableRepository.find_dataset_tables(session, uri)
existing_table_names = [e.GlueTableName for e in existing_tables]
existing_dataset_tables_map = {t.GlueTableName: t for t in existing_tables}

DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables)
DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables, session)
log.info(f'existing_tables={glue_tables}')
for table in glue_tables:
if table['Name'] not in existing_table_names:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
from dataall.modules.notifications.db.notification_models import Notification
from dataall.modules.s3_datasets.db.dataset_models import S3Dataset
TejasRGitHub marked this conversation as resolved.
Show resolved Hide resolved
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
PrincipalType,
)
from dataall.modules.shares_base.services.shares_enums import ShareItemHealthStatus, PrincipalType, ShareableType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -108,6 +106,14 @@ def get_share_item_by_uri(session, uri):
def get_share_item_details(session, share_type_model, item_uri):
return session.query(share_type_model).get(item_uri)

TejasRGitHub marked this conversation as resolved.
Show resolved Hide resolved
@staticmethod
def get_shares_for_principal_and_database(session, principal, database):
    """Build a query for ShareObjects tied to a given Glue database and principal.

    Joins ShareObject to S3Dataset on datasetUri, then restricts to datasets
    whose GlueDatabaseName equals *database* and shares whose
    principalIAMRoleName equals *principal*.

    Returns the unexecuted SQLAlchemy query so callers can iterate or refine it.
    """
    shares_joined_to_datasets = session.query(ShareObject).join(
        S3Dataset, S3Dataset.datasetUri == ShareObject.datasetUri
    )
    return shares_joined_to_datasets.filter(
        and_(
            S3Dataset.GlueDatabaseName == database,
            ShareObject.principalIAMRoleName == principal,
        )
    )

@staticmethod
def remove_share_object_item(session, share_item):
session.delete(share_item)
Expand Down Expand Up @@ -411,6 +417,8 @@ def list_shareable_items_of_type(session, share, type, share_type_model, share_t
)
if status:
query = query.filter(ShareObjectItem.status.in_(status))
if type == ShareableType.Table:
query = query.filter(share_type_model.LastGlueTableStatus == 'InSync')
TejasRGitHub marked this conversation as resolved.
Show resolved Hide resolved
return query

@staticmethod
Expand Down Expand Up @@ -455,6 +463,14 @@ def list_active_share_object_for_dataset(session, dataset_uri):
)
return share_objects

@staticmethod
def list_share_object_items_for_item_with_status(session, item_uri: str, status: List[str]):
    """Fetch every ShareObjectItem for *item_uri* whose status is one of *status*.

    Unlike the query-builder helpers in this repository, this executes the
    query immediately and returns a list of ShareObjectItem rows.
    """
    matching_items = session.query(ShareObjectItem).filter(
        ShareObjectItem.status.in_(status),
        ShareObjectItem.itemUri == item_uri,
    )
    return matching_items.all()

@staticmethod
def fetch_submitted_shares_with_notifications(session):
"""
Expand Down