Skip to content

Commit

Permalink
[BUGFIX] gh-1734 (#1741)
Browse files Browse the repository at this point in the history
### Feature or Bugfix
- Bugfix


### Detail

- After tables are synced, if any tables have been deleted from the Glue DB,
delete the share items that reference those tables in existing shares and add
an activity record for each deletion

### Relates
- #1734

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
TejasRGitHub authored Feb 3, 2025
1 parent d6265d1 commit 6b4d239
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import logging
from datetime import datetime
from typing import List

from sqlalchemy.sql import and_

from dataall.base.db import exceptions
from dataall.core.activity.db.activity_models import Activity
from dataall.modules.s3_datasets.db.dataset_models import (
DatasetTableColumn,
DatasetTable,
S3Dataset,
DatasetTableDataFilter,
)
from dataall.base.utils import json_utils
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem
from dataall.modules.shares_base.services.shares_enums import ShareItemStatus

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,11 +66,33 @@ def get_dataset_table_by_uri(session, table_uri):
return table

@staticmethod
def update_existing_tables_status(existing_tables, glue_tables):
def update_existing_tables_status(existing_tables, glue_tables, session):
for existing_table in existing_tables:
if existing_table.GlueTableName not in [t['Name'] for t in glue_tables]:
existing_table.LastGlueTableStatus = 'Deleted'
logger.info(f'Existing Table {existing_table.GlueTableName} status set to Deleted from Glue')
# Once the table item is deleted from glue and no longer part of the dataset
# Find out where this item is used in shares and delete all the share items.
share_item_status_filter = [ShareItemStatus.Share_Succeeded.value]
share_object_items: List[ShareObjectItem] = (
ShareObjectRepository.list_share_object_items_for_item_with_status(
session=session, item_uri=existing_table.tableUri, status=share_item_status_filter
)
)
logger.info(
f'Found {len(share_object_items)} share objects where the table {existing_table.tableUri} is present as a share item in state: {share_item_status_filter}. Deleting those share items'
)
for share_object_item in share_object_items:
activity = Activity(
action='SHARE_OBJECT_ITEM:DELETE',
label='SHARE_OBJECT_ITEM:DELETE',
owner='dataall-automation',
summary=f'dataall-automation deleted share object: {share_object_item.itemName} with uri: {share_object_item.itemUri} since the glue table associated was deleted from source glue db',
targetUri=share_object_item.itemUri,
targetType='share_object_item',
)
session.add(activity)
session.delete(share_object_item)
elif (
existing_table.GlueTableName in [t['Name'] for t in glue_tables]
and existing_table.LastGlueTableStatus == 'Deleted'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ def sync_existing_tables(session, uri, glue_tables=None):
existing_tables = DatasetTableRepository.find_dataset_tables(session, uri)
existing_table_names = [e.GlueTableName for e in existing_tables]
existing_dataset_tables_map = {t.GlueTableName: t for t in existing_tables}

DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables)
DatasetTableRepository.update_existing_tables_status(existing_tables, glue_tables, session)
log.info(f'existing_tables={glue_tables}')
for table in glue_tables:
if table['Name'] not in existing_table_names:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
from dataall.modules.datasets_base.db.dataset_repositories import DatasetBaseRepository
from dataall.modules.notifications.db.notification_models import Notification
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.shares_base.services.shares_enums import (
ShareItemHealthStatus,
PrincipalType,
)
from dataall.modules.shares_base.services.shares_enums import ShareItemHealthStatus, PrincipalType, ShareableType

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -411,6 +408,8 @@ def list_shareable_items_of_type(session, share, type, share_type_model, share_t
)
if status:
query = query.filter(ShareObjectItem.status.in_(status))
if type == ShareableType.Table:
query = query.filter(share_type_model.LastGlueTableStatus == 'InSync')
return query

@staticmethod
Expand Down Expand Up @@ -455,6 +454,14 @@ def list_active_share_object_for_dataset(session, dataset_uri):
)
return share_objects

@staticmethod
def list_share_object_items_for_item_with_status(session, item_uri: str, status: List[str]):
    """Fetch every share object item that references *item_uri* and is in one of *status*.

    :param session: SQLAlchemy session used to run the query
    :param item_uri: URI of the shared item (e.g. a dataset table) to look up
    :param status: list of ShareItemStatus string values to match against
    :return: list of matching ShareObjectItem rows
    """
    query = session.query(ShareObjectItem)
    query = query.filter(ShareObjectItem.itemUri == item_uri)
    query = query.filter(ShareObjectItem.status.in_(status))
    return query.all()

@staticmethod
def fetch_submitted_shares_with_notifications(session):
"""
Expand Down

0 comments on commit 6b4d239

Please sign in to comment.