-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy path__init__.py
887 lines (749 loc) · 30 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
import ast
import importlib.resources as pkg_resources
import json
import os
import re
import time
import warnings
from dataclasses import dataclass, field
from typing import List, Tuple, Union

import boto3

from mojap_metadata.converters import (
    BaseConverter,
    _flatten_and_convert_complex_data_type,
)
from mojap_metadata.converters.glue_converter import specs
from mojap_metadata.metadata.metadata import (
    Metadata,
    _metadata_complex_dtype_names,
    _unpack_complex_data_type,
)
# Format generictype: (glue_type, is_fully_supported)
# If not fully supported we decide on best option
# if glue_type is Null then we have no way to safely
# convert it
_default_type_converter = {
    "null": (None, False),
    "bool": ("boolean", True),
    "bool_": ("boolean", True),
    # signed integers map exactly onto Glue's integer family; unsigned
    # integers are widened to the next signed type (uint64 has no safe
    # mapping at all, hence None)
    "int8": ("tinyint", True),
    "int16": ("smallint", True),
    "int32": ("int", True),
    "int64": ("bigint", True),
    "uint8": ("smallint", False),
    "uint16": ("int", False),
    "uint32": ("bigint", False),
    "uint64": (None, False),
    "decimal128": ("decimal", True),
    # half precision is widened to Glue's float but flagged as approximate
    "float16": ("float", False),
    "float32": ("float", True),
    "float64": ("double", True),
    # time-of-day types have no safe Glue equivalent (mapped to None)
    "time32": (None, False),
    "time32(s)": (None, False),
    "time32(ms)": (None, False),
    "time64(us)": (None, False),
    "time64(ns)": (None, False),
    "date32": ("date", True),
    "date64": ("date", True),
    # every timestamp resolution collapses to the single Glue timestamp type
    "timestamp(s)": ("timestamp", True),
    "timestamp(ms)": ("timestamp", True),
    "timestamp(us)": ("timestamp", True),
    "timestamp(ns)": ("timestamp", True),
    "string": ("string", True),
    "large_string": ("string", True),
    "utf8": ("string", True),
    "large_utf8": ("string", True),
    "binary": ("binary", True),
    "large_binary": ("binary", True),
    "list": ("array", True),
    "list_": ("array", True),
    "large_list": ("array", True),
    "struct": ("struct", True),
}
_glue_to_mojap_type_converter = {
"boolean": ("bool", True),
"tinyint": ("int8", True),
"smallint": ("int16", False),
"int": ("int32", False),
"integer": ("int32", False),
"bigint": ("int64", False),
"double": ("float64", True),
"float": ("float32", False),
"decimal": ("decima128", True),
"char": ("string", True),
"varchar": ("string", True),
"string": ("string", True),
"binary": ("binary", False),
"date": ("date64", False),
"timestamp": ("timestamp(s)", False),
"array": ("large_list", False),
"struct": ("struct", False),
}
# The below properties are set by Glue Crawler in the Glue Data Catalog
# or have special uses in AWS. We do not want to overwrite these table properties.
# This list should be reviewed against:
# https://docs.aws.amazon.com/glue/latest/dg/table-properties-crawler.html and
# https://docs.aws.amazon.com/athena/latest/ug/alter-table-set-tblproperties.html.
_glue_table_properties_aws = [
"recordCount",
"skip.header.line.count",
"CrawlerSchemaSerializerVersion",
"classification",
"CrawlerSchemaDeserializerVersion",
"sizeKey",
"averageRecordSize",
"compressionType",
"typeOfData",
"objectCount",
"aws:RawTableLastAltered",
"ViewOriginalText",
"ViewExpandedText",
"ExternalTable:S3Location",
"ExternalTable:FileFormat",
"aws:RawType",
"aws:RawColumnComment",
"aws:RawTableComment",
"has_encrypted_data",
"orc.compress",
"parquet.compression",
"write.compression",
"compression_level",
"projection.*",
"skip.header.line.count",
"storage.location.template",
]
@dataclass
class CsvOptions:
    """
    Specific options for CSV spec
    """

    # which Hive serde the spec should use: "lazy" (LazySimpleSerDe) or
    # "open" (OpenCSVSerde); normally switched via
    # GlueConverterOptions.set_csv_serde
    serde: str = "lazy"
    # when True the generated spec sets skip.header.line.count = "1"
    skip_header: bool = False
    # field separator written into the serde parameters
    sep: str = ","
    # quote character (only written into the spec for the open serde)
    quote_char: str = '"'
    # escape character written into the serde parameters
    escape_char: str = "\\"
    # whether the underlying data files are compressed
    compressed: bool = False


@dataclass
class JsonOptions:
    """Specific options for JSON spec"""

    # which JSON serde the spec should use: "hive" or "openx"; normally
    # switched via GlueConverterOptions.set_json_serde
    serde: str = "hive"
    # whether the underlying data files are compressed
    compressed: bool = False


@dataclass
class ParquetOptions:
    """Specific options for Parquet spec"""

    # compression:str = "SNAPPY"
    # whether the underlying data files are compressed
    compressed: bool = True


# Any of the per-format option classes accepted by the spec helpers below
SpecOptions = Union[CsvOptions, JsonOptions, ParquetOptions]
@dataclass
class GlueConverterOptions:
    """
    Options Class for the GlueConverter

    default_db_name (str):
        Default database name to default to when defining which database
        the table belongs to. Used when calling `GlueConverter.generate_from_meta`
        method and no database_name is specified.
    default_db_base_path (str):
        Default s3 base path to default to when defining where the table exists
        in S3. Used when calling `GlueConverter.generate_from_meta` method and no
        table_location is specified. When no table_location is specified, the
        output DDL will define the table location as
        <default_db_base_path>/<table_name>/.
    ignore_warnings (bool, default=False):
        If converter should not warn users of imperfect type conversions.
    csv (CsvOptions):
        Options for CSV table specs; switch serde with set_csv_serde.
    json (JsonOptions):
        Options for JSON table specs; switch serde with set_json_serde.
    parquet (ParquetOptions):
        Options for PARQUET table specs.
    """

    default_db_name: str = None
    default_db_base_path: str = None
    ignore_warnings: bool = False
    # These were previously plain class attributes (csv = CsvOptions()), so a
    # single mutable options instance was shared by *every*
    # GlueConverterOptions object - calling set_csv_serde on one instance
    # silently changed all of them. Declared as proper dataclass fields with
    # default factories so each instance gets its own copies. They are placed
    # after the original fields so positional callers are unaffected.
    csv: CsvOptions = field(default_factory=CsvOptions)
    json: JsonOptions = field(default_factory=JsonOptions)
    parquet: ParquetOptions = field(default_factory=ParquetOptions)

    def set_csv_serde(self, serde_name: str):
        """Choose the CSV serde, one of "lazy" or "open".

        Raises:
            ValueError: if serde_name is not an allowed serde.
        """
        allowed_serdes = ["lazy", "open"]
        if serde_name not in allowed_serdes:
            err_msg = (
                f"Input serde_name must be one of {allowed_serdes} "
                f"but got {serde_name}."
            )
            raise ValueError(err_msg)
        else:
            self.csv.serde = serde_name

    def set_json_serde(self, serde_name: str):
        """Choose the JSON serde, one of "hive" or "openx".

        Raises:
            ValueError: if serde_name is not an allowed serde.
        """
        allowed_serdes = ["hive", "openx"]
        if serde_name not in allowed_serdes:
            err_msg = (
                f"Input serde_name must be one of {allowed_serdes} "
                f"but got {serde_name}."
            )
            raise ValueError(err_msg)
        else:
            self.json.serde = serde_name
class GlueConverter(BaseConverter):
    """Converts mojap Metadata objects into Glue API table-input dicts."""

    def __init__(self, options: GlueConverterOptions = None):
        """
        Converts metadata objects to a Hive DDL.

        options (GlueConverterOptions, optional): See ?GlueConverterOptions
        for more details. If not set a default GlueConverterOptions is set to
        the options parameter.

        Example:
            from mojap_metadata.converters.glue_converter import (
                GlueConverter,
                GlueConverterOptions,
            )
            options = GlueConverterOptions()
            options.set_csv_serde("open")
            gc = GlueConverter(options)
            metadata = Metadata.from_json("my-table-metadata.json")
            ddl = gc.generate_from_meta(metadata)  # get Glue/Hive DDL
        """
        if options is None:
            options = GlueConverterOptions()
        super().__init__(options)
        # expose the module-level lookup on the instance so the conversion
        # table could be overridden per-converter if ever needed
        self._default_type_converter = _default_type_converter

    def warn_conversion(self, coltype, converted_type, is_fully_supported):
        """Raise if a type has no Glue equivalent; warn if only approximate.

        Args:
            coltype: original mojap column type name (used in messages).
            converted_type: the Glue type it mapped to, or None.
            is_fully_supported: False when the mapping is only best-effort.

        Raises:
            ValueError: when converted_type is None (no safe Glue mapping).
        """
        if converted_type is None:
            raise ValueError(
                f"{coltype} has no equivalent in Athena/Glue " "so cannot be converted"
            )
        if not self.options.ignore_warnings and not is_fully_supported:
            w = (
                f"{coltype} is not fully supported by Athena using best "
                "representation. To supress these warnings set this converters "
                "options.ignore_warnings = True"
            )
            warnings.warn(w)

    def convert_col_type(self, coltype: str) -> str:
        """Converts our metadata types to Athena/Glue versions

        Complex (nested) types are unpacked and each basic type inside is
        converted via convert_basic_col_type.

        Args:
            coltype (str): str representation of our metadata column types

        Returns:
            str: String representation of athena column type version of `coltype`
        """
        data_type = _unpack_complex_data_type(coltype)
        return _flatten_and_convert_complex_data_type(
            data_type, self.convert_basic_col_type, field_sep=","
        )

    def convert_basic_col_type(self, coltype: str) -> str:
        """Converts our metadata types (non complex ones)
        to etl-manager metadata. Used with the _flatten_and_convert_complex_data_type
        and convert_col_type functions.

        Args:
            coltype (str): str representation of our metadata column types

        Returns:
            str: String representation of athena column type version of `coltype`
        """
        if coltype.startswith("decimal128"):
            # carry the precision/scale through, stripping spaces:
            # "decimal128(10, 2)" -> "decimal(10,2)"
            t, is_supported = self._default_type_converter.get("decimal128")
            brackets = coltype.split("(")[1].split(")")[0]
            brackets = brackets.replace(" ", "")
            t = f"{t}({brackets})"
        elif coltype.startswith("binary"):
            # drop any length qualifier, e.g. "binary(16)" -> "binary"
            coltype_ = coltype.split("(", 1)[0]
            t, is_supported = self._default_type_converter.get(coltype_, (None, None))
        else:
            t, is_supported = self._default_type_converter.get(coltype, (None, None))
        self.warn_conversion(coltype, t, is_supported)
        return t

    def convert_columns(self, metadata: Metadata) -> Tuple[List, List]:
        """Split metadata columns into Glue column dicts and partition dicts.

        Args:
            metadata (Metadata): metadata object whose columns are converted.

        Returns:
            Tuple[List, List]: (columns, partitions); each entry is a dict of
            {"Name", "Type"} plus "Comment" when the column has a description.
        """
        cols = []
        partitions = []
        for c in metadata.columns:
            if c["name"] in metadata.partitions:
                partitions.append(
                    {"Name": c["name"], "Type": self.convert_col_type(c["type"])}
                )
                if "description" in c:
                    partitions[-1]["Comment"] = c["description"]
            else:
                cols.append(
                    {"Name": c["name"], "Type": self.convert_col_type(c["type"])}
                )
                if "description" in c:
                    cols[-1]["Comment"] = c["description"]
        return cols, partitions

    def generate_from_meta(
        self,
        metadata: Metadata,
        database_name: str = None,
        table_location: str = None,
    ) -> dict:
        """Generates the Hive DDL from our metadata.

        Args:
            metadata (Metadata): metadata object from the Metadata class
            database_name (str, optional): database name needed for table DDL.
                If `None` this function will look to the options.default_db_name
                attribute to find a name. Defaults to None.
            table_location (str, optional): S3 location of where table is stored
                needed for table DDL. If `None` this function will look to the
                options.default_db_base_path attribute to find a name. Defaults
                to None.

        Raises:
            ValueError: If database_name and table_location are not set (and there
                are no default options set)

        Returns:
            dict: for Glue API
        """
        # set database_name to metadata.database_name if none
        database_name = database_name if database_name else metadata.database_name
        # do the same with table_location
        table_location = table_location if table_location else metadata.table_location
        ff = metadata.file_format
        if ff.startswith("csv"):
            opts = self.options.csv
        elif ff.startswith("json"):
            opts = self.options.json
        elif ff.startswith("parquet"):
            opts = self.options.parquet
        else:
            raise ValueError(
                f"No ddl template for type: {ff} in options "
                "(only supports formats starting with csv, json or parquet)"
            )
        if not database_name:
            if self.options.default_db_name:
                database_name = self.options.default_db_name
            else:
                error_msg = (
                    "Either set database_name in the function "
                    "or set a default_db_name in the GlueConverter.options"
                )
                raise ValueError(error_msg)
        if not table_location:
            if self.options.default_db_base_path:
                table_location = os.path.join(
                    self.options.default_db_base_path, f"{metadata.name}"
                )
            else:
                error_msg = (
                    "Either set table_location in the function "
                    "or set a database_base_path in the "
                    "GlueConverter.options.database_base_path"
                )
                raise ValueError(error_msg)
        table_cols, partition_cols = self.convert_columns(metadata)
        spec = generate_spec_from_template(
            database_name=database_name,
            table_name=metadata.name,
            location=table_location,
            spec_opts=opts,
            table_desc=metadata.description,
            columns=table_cols,
            partitions=partition_cols,
        )
        # fold primary_key / glue_table_properties from the metadata into
        # the spec's Parameters before it is handed to the Glue API
        updated_spec = _update_table_parameters(
            metadata=metadata,
            table_input=spec,
        )
        return updated_spec
class GlueTable(BaseConverter):
    """Creates Glue tables from Metadata objects and reads Glue tables back
    into Metadata objects."""

    def __init__(self, glue_converter_options: GlueConverterOptions = None):
        super().__init__(None)
        self.gc = GlueConverter(glue_converter_options)

    def convert_basic_col_type(self, col_type: str):
        """Convert a non-complex Glue column type name to its mojap name."""
        if col_type.startswith("decimal"):
            # keep the bracketed precision/scale:
            # "decimal(10, 2)" -> "decimal128(10, 2)"
            regex = re.compile(r"decimal(\(\d+,( |)\d+\)|\(\d+\))")
            bracket_numbers = regex.match(col_type).groups()[0]
            return f"decimal128{bracket_numbers}"
        elif col_type.startswith(("char", "varchar")):
            # strip any length qualifier, e.g. "varchar(16)" -> "varchar"
            coltype_ = col_type.split("(", 1)[0]
            return _glue_to_mojap_type_converter[coltype_][0]
        else:
            return _glue_to_mojap_type_converter[col_type][0]

    def convert_col_type(self, col_type: str):
        """Convert a (possibly complex/nested) Glue column type to mojap."""
        data_type = _unpack_complex_data_type(col_type)
        return _flatten_and_convert_complex_data_type(
            data_type, self.convert_basic_col_type, field_sep=","
        )

    def convert_columns(self, columns: List[dict]) -> List[dict]:
        """Convert a list of Glue column dicts to mojap column dicts.

        Args:
            columns: Glue column dicts ({"Name", "Type", optional "Comment"}).

        Returns:
            List[dict]: mojap columns ({"name", "type", optional
            "description"}); warns when a Glue type has multiple possible
            mojap representations.
        """
        mojap_meta_cols = []
        for col in columns:
            mojap_col_type = self.convert_col_type(col["Type"])
            # normalise the glue type down to the bare name used as the key
            # into _glue_to_mojap_type_converter
            if mojap_col_type.startswith(_metadata_complex_dtype_names):
                normalised_col_type = col["Type"].split("<", 1)[0]
            else:
                normalised_col_type = col["Type"].split("(", 1)[0]
            full_support = _glue_to_mojap_type_converter.get(normalised_col_type)[1]
            if not full_support:
                warnings.warn(
                    f"type {col['Type']} not fully supported, "
                    "likely due to multiple conversion options"
                )
            # create the column in mojap meta format
            meta_col = {
                "name": col["Name"],
                "type": mojap_col_type,
            }
            if col.get("Comment"):
                meta_col["description"] = col.get("Comment")
            mojap_meta_cols.append(meta_col)
        return mojap_meta_cols

    def generate_from_meta(
        self,
        metadata: Union[Metadata, str, dict],
        database_name: str = None,
        table_location: str = None,
        run_msck_repair: bool = False,
    ):
        """
        Creates a glue table from metadata.

        Args:
            metadata: Metadata object, string path, or dictionary metadata.
            database_name (optional): name of the glue database the table is to be
                created in. can also be a property of the metadata.
            table_location (optional): the s3 location of the table. can also be a
                property of the metadata.
            run_msck_repair (optional): run msck repair table on the created table,
                should be set to True for tables with partitions.

        Raises:
            ValueError if run_msck_repair table is False, metadata has partitions,
            and options.ignore_warnings is set to False
        """
        # Coerce str/dict metadata into a Metadata object *before* reading
        # attributes off it. Previously metadata.database_name was accessed
        # first, which raised AttributeError whenever a path or dict was
        # passed, despite both being documented as valid inputs.
        metadata = Metadata.from_infer(metadata)
        # set database_name to metadata.database_name if none
        database_name = database_name if database_name else metadata.database_name
        # do the same with table_location
        table_location = table_location if table_location else metadata.table_location
        glue_client = boto3.client(
            "glue",
            region_name=os.getenv(
                "AWS_REGION", os.getenv("AWS_DEFAULT_REGION", "eu-west-1")
            ),
        )
        boto_dict = self.gc.generate_from_meta(
            metadata,
            database_name=database_name,
            table_location=table_location,
        )
        # create database if it doesn't exist
        _start_query_execution_and_wait(
            database_name, f"CREATE DATABASE IF NOT EXISTS {database_name};"
        )
        # delete table if it exists
        try:
            glue_client.delete_table(DatabaseName=database_name, Name=metadata.name)
        except glue_client.exceptions.EntityNotFoundException:
            pass
        glue_client.create_table(**boto_dict)
        # NOTE(review): assumes BaseConverter.__init__(None) still leaves a
        # usable self.options with an ignore_warnings attribute - confirm
        if (
            not run_msck_repair
            and metadata.partitions
            and not self.options.ignore_warnings
        ):
            w = (
                "metadata has partitions and run_msck_repair is set to false. "
                "To supress these warnings set this converters "
                "options.ignore_warnings = True"
            )
            warnings.warn(w)
        elif run_msck_repair:
            _start_query_execution_and_wait(
                database_name, f"msck repair table {database_name}.{metadata.name}"
            )

    def generate_to_meta(
        self,
        database: str,
        table: str,
        glue_table_properties: List[str] = None,
        get_primary_key: bool = False,
    ) -> Metadata:
        """
        Generates a Metadata object for a specified table from glue.

        Args:
            database (str): name of the glue database
            table (str): name of the table from the glue database
            glue_table_properties (List[str], optional): List of table properties
                to get from Glue Data Catalog if they exist. Set to "*" to
                retrieve all table properties. Defaults to None.
            get_primary_key (bool, optional): Set to True to update the
                primary_key value in the metadata with the primary_key table
                property from Glue Data Catalog if it exists. Defaults to False.

        Raises:
            TypeError: If primary key value is not of type list in the Glue Data
                Catalog.
            ValueError: If primary_key cannot be evaluated by ast.literal_eval.

        Returns:
            Metadata: The Metadata object for the Glue Table.
        """
        # get the table information
        glue_client = boto3.client("glue")
        resp = glue_client.get_table(DatabaseName=database, Name=table)
        # pull out just the columns
        columns = resp["Table"]["StorageDescriptor"]["Columns"]
        # convert the columns
        mojap_meta_cols = self.convert_columns(columns)
        # check for partitions
        partitions = resp["Table"].get("PartitionKeys")
        if partitions:
            # convert
            part_cols_full = self.convert_columns(partitions)
            # extend the mojap_meta_cols with the partiton cols
            mojap_meta_cols.extend(part_cols_full)
            part_cols = [p["name"] for p in part_cols_full]
            # make a metadata object
            meta = Metadata(name=table, columns=mojap_meta_cols, partitions=part_cols)
        else:
            meta = Metadata(name=table, columns=mojap_meta_cols)
        # get the file format if possible
        try:
            ff = resp["Table"]["StorageDescriptor"]["Parameters"].get("classification")
        except KeyError:
            warnings.warn("unable to parse file format, please manually set")
            ff = None
        if ff:
            meta.file_format = ff.lower()
        # update metadata dict with glue table properties from table parameters
        metadata_dict = _get_table_parameters(
            metadata=meta,
            table_resp=resp,
            glue_table_properties=glue_table_properties,
            get_primary_key=get_primary_key,
        )
        meta = Metadata.from_dict(metadata_dict)
        return meta
def _update_table_parameters(
    metadata: Metadata,
    table_input: dict,
) -> dict:
    """Copy primary_key and glue_table_properties from metadata into the
    table input's Parameters.

    Any key that collides with a protected AWS/Glue property (those in
    _glue_table_properties_aws, plus "primary_key" itself) is skipped with a
    warning rather than written.

    Args:
        metadata (Metadata): Metadata object.
        table_input (dict): A dictionary defining the table metadata.

    Returns:
        dict: The same table_input, with its Parameters updated in place.
    """
    params = table_input["TableInput"]["Parameters"]
    name = table_input["TableInput"]["Name"]

    if metadata.primary_key:
        params["primary_key"] = str(metadata.primary_key)

    protected = _glue_table_properties_aws + ["primary_key"]
    custom_properties = metadata._data.get("glue_table_properties", {})
    for prop_key, prop_value in custom_properties.items():
        if prop_key not in protected:
            params[prop_key] = prop_value
            continue
        warnings.warn(
            f"The following property '{prop_key}' is protected and the key, "
            f"value pair '{prop_key}: {prop_value}' for table '{name}' will not be "
            "written from glue_table_properties to the table properties in "
            "Glue Data Catalog."
        )
    return table_input
def _get_table_parameters(
    metadata: Metadata,
    table_resp: dict,
    glue_table_properties: List[str] = None,
    get_primary_key: bool = False,
) -> dict:
    """Merge selected Glue Data Catalog table properties into the metadata.

    When get_primary_key is True and the catalog holds a "primary_key"
    property, it is parsed (must be a Python list literal) and written into
    the metadata. glue_table_properties selects which catalog properties to
    copy into metadata["glue_table_properties"]; pass "*" to copy them all.

    Args:
        metadata (Metadata): Metadata object.
        table_resp (dict): A dict containing the table definition for a
            specified table in Glue Data Catalog.
        glue_table_properties (List[str], optional): List of table properties
            to get from Glue Data Catalog if they exist. Set to "*" to retrieve
            all table properties. Defaults to None.
        get_primary_key (bool, optional): Set to True to update the primary_key
            value in the metadata from the Glue Data Catalog. Defaults to False.

    Raises:
        TypeError: If primary key value is not of type list in the Glue Data
            Catalog.
        ValueError: If primary_key cannot be evaluated by ast.literal_eval.

    Returns:
        dict: Updated metadata dictionary with glue table properties and
        primary key from the Glue Data Catalog.
    """
    metadata_dict = metadata.to_dict()
    catalog_params = table_resp["Table"]["Parameters"]
    available_keys = catalog_params.keys()
    table_name = table_resp["Table"]["Name"]

    if get_primary_key and "primary_key" in available_keys:
        raw_value = catalog_params["primary_key"]
        try:
            parsed = ast.literal_eval(raw_value)
            if not isinstance(parsed, list):
                # deliberately not caught below: a parseable non-list is a
                # type problem, not a parse problem
                raise TypeError(
                    "ast.literal_eval cannot evaluate the primary_key value "
                    f"'{raw_value}' type to list for table '{table_name}'. "
                    "Please check the Glue Data Catalog."
                )
        except (ValueError, SyntaxError):
            raise ValueError(
                "ast.literal_eval cannot evaluate the primary_key value "
                f"'{raw_value}' for table '{table_name}'. The primary_key "
                "value must be a Python literal structure: list. Please check "
                "the Glue Data Catalog."
            )
        else:
            metadata_dict["primary_key"] = parsed

    if glue_table_properties == "*":
        metadata_dict["glue_table_properties"] = catalog_params
    elif glue_table_properties:
        found = {}
        for prop in glue_table_properties:
            if prop in available_keys:
                found[prop] = catalog_params[prop]
            else:
                warnings.warn(
                    f"The property '{prop}' was not found in the table properties "
                    f"for the table '{table_name}' in the Glue Data Catalog."
                )
        metadata_dict["glue_table_properties"] = found
    return metadata_dict
def _get_base_table_spec(spec_name: str, serde_name: str = None) -> dict:
    """Gets a table spec (dict) for a specific name
    prefilled with standard properties / info for that
    specific spec.

    Args:
        spec_name (str): Name of the spec currently -
            'csv', 'json' or 'parquet'
        serde_name (str): Name of the specific serde -
            CSV: 'open' or 'lazy'
            JSON: 'hive' or 'openx'
            PARQUET: None

    Returns:
        dict: A base spec that can be used with boto to create a table.
        Once specific details from metadata are filled into it.
    """
    filename = (
        f"{serde_name}_{spec_name}_spec.json"
        if serde_name
        else f"{spec_name}_spec.json"
    )
    # open_text returns a file handle; close it deterministically with a
    # context manager (the original left it to garbage collection)
    with pkg_resources.open_text(specs, filename) as spec_file:
        return json.load(spec_file)
def _get_spec_and_serde_name_from_opts(spec_opts) -> Tuple[str, str]:
    """Derive (spec_name, serde_name) from an options instance.

    Args:
        spec_opts: a CsvOptions, JsonOptions or ParquetOptions instance.

    Raises:
        ValueError: if spec_opts is not one of the known options classes.

    Returns:
        Tuple[str, str]: the spec name ('csv', 'json' or 'parquet') and the
        serde name (None for parquet).
    """
    if isinstance(spec_opts, CsvOptions):
        return "csv", spec_opts.serde
    if isinstance(spec_opts, JsonOptions):
        return "json", spec_opts.serde
    if isinstance(spec_opts, ParquetOptions):
        return "parquet", None
    raise ValueError(
        f"expected opts to be of an options Type not {type(spec_opts)}"
    )
def _convert_opts_into_dict(spec_opts: SpecOptions):
"""Takes the spec_opts and converts it to a dict
Just used to pass to Template.render().
Args:
spec_opts (SpecOptions): One of the SpecOptions
classes
Returns:
dict: A dict representation of the data class
"""
out_dict = {}
for k, _ in spec_opts.__annotations__:
out_dict[k] = getattr(spec_opts, k)
return out_dict
def _start_query_execution_and_wait(db: str, sql: str):
    """Run an Athena query in the primary workgroup and block until it ends.

    Args:
        db (str): database the query executes against.
        sql (str): the SQL statement to run.

    Raises:
        ValueError: if the query finishes in any state other than SUCCEEDED,
            with Athena's StateChangeReason as the message.
    """
    ath = boto3.client("athena")
    res = ath.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": db},
        WorkGroup="primary",
    )
    query_exec_id = res["QueryExecutionId"]
    # Poll until a terminal state. CANCELLED is included as terminal: the
    # original loop only recognised SUCCEEDED/FAILED and so would poll
    # forever if the query was cancelled.
    terminal_states = ("SUCCEEDED", "FAILED", "CANCELLED")
    while True:
        response = ath.get_query_execution(QueryExecutionId=query_exec_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in terminal_states:
            break
        time.sleep(0.25)
    if state != "SUCCEEDED":
        raise ValueError(response["QueryExecution"]["Status"].get("StateChangeReason"))
def generate_spec_from_template(
    database_name,
    table_name,
    location,
    spec_opts: SpecOptions,
    table_desc="",
    columns=None,
    partitions=None,
):
    """Fill a packaged base table spec with table-specific details.

    Args:
        database_name: Glue database the table belongs to.
        table_name: name of the table.
        location: S3 location of the table data.
        spec_opts (SpecOptions): CsvOptions / JsonOptions / ParquetOptions
            controlling format/serde specific parameters.
        table_desc: table description. Defaults to "".
        columns: list of Glue column dicts ({"Name", "Type", ...}).
            Defaults to an empty list.
        partitions: list of Glue partition-key column dicts. Defaults to an
            empty list.

    Returns:
        dict: {"DatabaseName": ..., "TableInput": ...} suitable for the Glue
        create_table API.
    """
    # The original used mutable default arguments (columns=[], partitions=[]),
    # which Python shares across calls; use None sentinels instead.
    columns = [] if columns is None else columns
    partitions = [] if partitions is None else partitions

    spec_name, serde_name = _get_spec_and_serde_name_from_opts(spec_opts)
    base_spec = _get_base_table_spec(spec_name, serde_name)
    base_spec["Name"] = table_name
    base_spec["Description"] = table_desc
    base_spec["StorageDescriptor"]["Columns"] = columns
    base_spec["PartitionKeys"] = partitions
    base_spec["StorageDescriptor"]["Location"] = location

    # Do general options
    base_spec["StorageDescriptor"]["Compressed"] = spec_opts.compressed

    # Do CSV options
    if spec_name == "csv":
        serde_params = base_spec["StorageDescriptor"]["SerdeInfo"]["Parameters"]
        # per-serde parameter names for each CSV option
        csv_param_lu = {
            "sep": {"lazy": "field.delim", "open": "separatorChar"},
            "quote_char": {"lazy": None, "open": "quoteChar"},
            "escape_char": {"lazy": "escape.delim", "open": "escapeChar"},
        }
        if spec_opts.skip_header:
            serde_params["skip.header.line.count"] = "1"
        if spec_opts.sep:
            serde_params[csv_param_lu["sep"][serde_name]] = spec_opts.sep
        if spec_opts.quote_char and serde_name != "lazy":
            # the lazy serde has no quote-char parameter
            serde_params["quoteChar"] = spec_opts.quote_char
        if spec_opts.escape_char:
            serde_params[csv_param_lu["escape_char"][serde_name]] = (
                spec_opts.escape_char
            )

    # Do JSON options
    if spec_name == "json":
        # tell the JSON serde which top-level paths map to the columns
        json_col_paths = ",".join([c["Name"] for c in columns])
        base_spec["StorageDescriptor"]["SerdeInfo"]["Parameters"]["paths"] = (
            json_col_paths
        )

    return {"DatabaseName": database_name, "TableInput": base_spec}