Skip to content

Commit

Permalink
feat: Atlas Dashboard Support (#1066)
Browse files Browse the repository at this point in the history
- metadata: extended atlas proxy with dashboard support
- databuilder: extended atlas search extractor with dashboard support
  • Loading branch information
mgorsk1 authored May 17, 2021
1 parent 32499e6 commit 14f2632
Show file tree
Hide file tree
Showing 11 changed files with 916 additions and 245 deletions.
2 changes: 1 addition & 1 deletion .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ labelPRBasedOnFilePath:
- search/search_service/models/**/*

category:proxy:
- metadata/metadata_serivce/proxy/**/*
- metadata/metadata_service/proxy/**/*
- search/search_service/proxy/**/*

firstPRWelcomeComment: >
Expand Down
78 changes: 78 additions & 0 deletions databuilder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,84 @@ job = DefaultJob(
job.launch()
```

#### [AtlasSearchDataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/atlas_search_data_extractor.py "AtlasSearchDataExtractor")
An extractor that extracts data from Atlas and shapes it into documents that can be indexed for the Elasticsearch Search Proxy.
```python
entity_type = 'Table'
extracted_search_data_path = f'/tmp/{entity_type.lower()}_search_data.json'
process_pool_size = 5

# atlas config
atlas_url = 'localhost'
atlas_port = 21000
atlas_protocol = 'http'
atlas_verify_ssl = False
atlas_username = 'admin'
atlas_password = 'admin'
atlas_search_chunk_size = 200
atlas_details_chunk_size = 10

# elastic config
es = Elasticsearch([
{'host': 'localhost'},
])

elasticsearch_client = es
elasticsearch_new_index_key = f'{entity_type.lower()}-' + str(uuid.uuid4())
elasticsearch_new_index_key_type = '_doc'
elasticsearch_index_alias = f'{entity_type.lower()}_search_index'

job_config = ConfigFactory.from_dict({
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY):
atlas_url,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY):
atlas_port,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY):
atlas_protocol,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY):
atlas_verify_ssl,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY):
atlas_username,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY):
atlas_password,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_SEARCH_CHUNK_SIZE_KEY):
atlas_search_chunk_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_DETAILS_CHUNK_SIZE_KEY):
atlas_details_chunk_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.PROCESS_POOL_SIZE_KEY):
process_pool_size,
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ENTITY_TYPE_KEY):
entity_type,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY):
'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY):
extracted_search_data_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY):
'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY):
elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY):
elasticsearch_new_index_key,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY):
elasticsearch_new_index_key_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY):
elasticsearch_index_alias
})

if __name__ == "__main__":
task = DefaultTask(extractor=AtlasSearchDataExtractor(),
transformer=NoopTransformer(),
loader=FSElasticsearchJSONLoader())

job = DefaultJob(conf=job_config,
task=task,
publisher=ElasticsearchPublisher())

job.launch()
```

#### [VerticaMetadataExtractor](https://github.com/amundsen-io/amundsendatabuilder/blob/master/databuilder/extractor/vertica_metadata_extractor.py "VerticaMetadataExtractor")
An extractor that extracts table and column metadata including database, schema, table name, column name and column datatype from a Vertica database.

Expand Down
81 changes: 56 additions & 25 deletions databuilder/databuilder/extractor/atlas_search_data_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,47 @@ def _filter_none(input_list: List) -> List:
return list(filter(None, input_list))

@staticmethod
def get_column_names(column_list: List) -> List:
def get_entity_names(entity_list: List) -> List:
return AtlasSearchDataExtractorHelpers._filter_none(
[c.get('attributes').get('name') for c in column_list if c.get('status').lower() == 'active'])
[e.get('attributes').get('name') for e in entity_list if e.get('status').lower() == 'active'])

@staticmethod
def get_column_descriptions(column_list: List) -> List:
def get_entity_descriptions(entity_list: List) -> List:
return AtlasSearchDataExtractorHelpers._filter_none(
[c.get('attributes').get('description') for c in column_list if c.get('status').lower() == 'active'])
[e.get('attributes', dict()).get('description') for e in entity_list
if e.get('status').lower() == 'active'])

@staticmethod
def get_badges_from_classifications(classifications: List) -> List:
    """Return the type names of all active classifications.

    A classification counts as active when its ``entityStatus`` value,
    compared case-insensitively, equals ``'active'``; a missing
    ``entityStatus`` is treated as an empty string and therefore skipped.
    Falsy ``typeName`` values are removed by ``_filter_none``.

    :param classifications: list of classification dicts
    :return: list of ``typeName`` values for the active classifications
    """
    type_names = []

    for classification in classifications:
        if classification.get('entityStatus', '').lower() != 'active':
            continue

        type_names.append(classification.get('typeName'))

    return AtlasSearchDataExtractorHelpers._filter_none(type_names)

@staticmethod
def get_tags_from_glossary_terms(meanings: List) -> List:
def get_display_text(meanings: List) -> List:
return AtlasSearchDataExtractorHelpers._filter_none(
[c.get('displayText') for c in meanings if c.get('entityStatus', '').lower() == 'active'])

@staticmethod
def get_last_successful_execution_timestamp(executions: List) -> int:
    """Return the most recent timestamp among successful, active executions.

    An execution is considered when its ``status`` (case-insensitive) is
    ``'active'`` and its ``attributes.state`` equals ``'succeeded'``.
    Falsy timestamps are removed by ``_filter_none`` before the maximum
    is taken.

    :param executions: list of execution entity dicts
    :return: the largest ``attributes.timestamp`` of the matching
        executions, or ``0`` when none qualifies
    """
    timestamps = []

    for execution in executions:
        if execution.get('status', '').lower() != 'active':
            continue

        if execution.get('attributes', dict()).get('state') != 'succeeded':
            continue

        timestamps.append(execution.get('attributes').get('timestamp'))

    candidates = AtlasSearchDataExtractorHelpers._filter_none(timestamps)

    # `default` covers the empty case that would otherwise raise ValueError.
    return max(candidates, default=0)

@staticmethod
def get_chart_names(queries: List) -> List[str]:
    """Collect the display texts of the charts attached to the given queries.

    Each query's ``relationshipAttributes.charts`` list (empty when either
    key is absent) is flattened into a single list, which is then passed to
    ``get_display_text`` to produce the result.

    :param queries: list of query entity dicts
    :return: whatever ``get_display_text`` yields for the flattened charts
    """
    all_charts = [
        chart
        for query in queries
        for chart in query.get('relationshipAttributes', dict()).get('charts', [])
    ]

    return AtlasSearchDataExtractorHelpers.get_display_text(all_charts)


class AtlasSearchDataExtractor(Extractor):
ATLAS_URL_CONFIG_KEY = 'atlas_url'
Expand Down Expand Up @@ -79,9 +101,6 @@ class AtlasSearchDataExtractor(Extractor):
ATLAS_MAX_RETRIES_KEY: 2,
PROCESS_POOL_SIZE_KEY: 10})

# @todo fill out below fields for TableESDocument
# tags: List[str],

# es_document field, atlas field path, modification function, default_value
FIELDS_MAPPING_SPEC: type_fields_mapping_spec = {
'Table': [
Expand All @@ -95,25 +114,49 @@ class AtlasSearchDataExtractor(Extractor):
('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
('unique_usage', 'attributes.uniqueUsage', lambda x: int(x), 1),
('column_names', 'relationshipAttributes.columns',
lambda x: AtlasSearchDataExtractorHelpers.get_column_names(x), []),
lambda x: AtlasSearchDataExtractorHelpers.get_entity_names(x), []),
('column_descriptions', 'relationshipAttributes.columns',
lambda x: AtlasSearchDataExtractorHelpers.get_column_descriptions(x), []),
lambda x: AtlasSearchDataExtractorHelpers.get_entity_descriptions(x), []),
('tags', 'relationshipAttributes.meanings',
lambda x: AtlasSearchDataExtractorHelpers.get_tags_from_glossary_terms(x), []),
lambda x: AtlasSearchDataExtractorHelpers.get_display_text(x), []),
('badges', 'classifications',
lambda x: AtlasSearchDataExtractorHelpers.get_badges_from_classifications(x), []),
('display_name', 'attributes.qualifiedName', lambda x: x.split('@')[0], None),
('schema_description', 'attributes.parameters.sourceDescription', None, None),
('programmatic_descriptions', 'attributes.parameters', lambda x: [str(s) for s in list(x.values())], {})
],
'Dashboard': [
('group_name', 'relationshipAttributes.group.attributes.name', None, None),
('name', 'attributes.name', None, None),
('description', 'attributes.description', None, None),
('total_usage', 'attributes.popularityScore', lambda x: int(x), 0),
('product', 'attributes.product', None, None),
('cluster', 'attributes.cluster', None, None),
('group_description', 'relationshipAttributes.group.attributes.description', None, None),
('query_names', 'relationshipAttributes.queries',
lambda x: AtlasSearchDataExtractorHelpers.get_entity_names(x), []),
('chart_names', 'relationshipAttributes.queries',
lambda x: AtlasSearchDataExtractorHelpers.get_chart_names(x), []),
('group_url', 'relationshipAttributes.group.attributes.url', None, None),
('url', 'attributes.url', None, None),
('uri', 'attributes.qualifiedName', None, None),
('last_successful_run_timestamp', 'relationshipAttributes.executions',
lambda x: AtlasSearchDataExtractorHelpers.get_last_successful_execution_timestamp(x), None),
('tags', 'relationshipAttributes.meanings',
lambda x: AtlasSearchDataExtractorHelpers.get_display_text(x), []),
('badges', 'classifications',
lambda x: AtlasSearchDataExtractorHelpers.get_badges_from_classifications(x), [])
]
}

ENTITY_MODEL_BY_TYPE = {
'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument'
'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument',
'Dashboard': 'databuilder.models.dashboard_elasticsearch_document.DashboardESDocument'
}

REQUIRED_RELATIONSHIPS_BY_TYPE = {
'Table': ['columns']
'Table': ['columns'],
'Dashboard': ['group', 'charts', 'executions', 'queries']
}

def init(self, conf: ConfigTree) -> None:
Expand All @@ -126,18 +169,6 @@ def init(self, conf: ConfigTree) -> None:
def entity_type(self) -> str:
return self.conf.get(AtlasSearchDataExtractor.ENTITY_TYPE_KEY)

@property
def basic_search_query(self) -> Dict:
query = {
'typeName': self.entity_type,
'excludeDeletedEntities': True,
'query': '*'
}

LOGGER.debug(f'Basic Search Query: {query}')

return query

@property
def dsl_search_query(self) -> Dict:
query = {
Expand Down
8 changes: 4 additions & 4 deletions databuilder/example/scripts/sample_atlas_search_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from databuilder.transformer.base_transformer import NoopTransformer

entity_type = 'Table'
extracted_search_data_path = '/tmp/search_data.json'
extracted_search_data_path = f'/tmp/{entity_type.lower()}_search_data.json'
process_pool_size = 5

# atlas config
Expand All @@ -33,9 +33,9 @@
])

elasticsearch_client = es
elasticsearch_new_index_key = 'tables-' + str(uuid.uuid4())
elasticsearch_new_index_key_type = 'table'
elasticsearch_index_alias = 'table_search_index'
elasticsearch_new_index_key = f'{entity_type.lower()}-' + str(uuid.uuid4())
elasticsearch_new_index_key_type = '_doc'
elasticsearch_index_alias = f'{entity_type.lower()}_search_index'

job_config = ConfigFactory.from_dict({
'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY):
Expand Down
1 change: 1 addition & 0 deletions docker-amundsen-atlas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ services:
- PROXY_PORT=21000
- PROXY_ENCRYPTED=False
- PROXY_CLIENT=ATLAS
- METADATA_SVC_CONFIG_MODULE_CLASS=metadata_service.config.AtlasConfig
amundsenfrontend:
build:
context: ./frontend
Expand Down
2 changes: 1 addition & 1 deletion metadata/docs/proxy/atlas/popular_tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The suggested formula to generate the popularity score is provided below and sho
Popularity score = number of distinct readers * log(total number of reads)
```

`Table` entity definition with `popularityScore` attribute [amundsenatlastypes==1.0.2](https://github.com/dwarszawski/amundsen-atlas-types/blob/master/amundsenatlastypes/schema/01_2_table_schema.json).
`Table` entity definition with `popularityScore` attribute [amundsenatlastypes==1.2.0](https://github.com/dwarszawski/amundsen-atlas-types/blob/master/amundsenatlastypes/schema/01_2_table_schema.json).

```json
{
Expand Down
2 changes: 1 addition & 1 deletion metadata/docs/proxy/atlas_proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Usage and Installation of `amundsenatlastypes` can be found [here](https://githu

Minimum Requirements:

- amundsenatlastypes==1.1.4
- amundsenatlastypes==1.2.0
- apache_atlas==0.0.11

### Configurations
Expand Down
Loading

0 comments on commit 14f2632

Please sign in to comment.