-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoperations.py
76 lines (58 loc) · 2.5 KB
/
operations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import os
import pandas as pd
import logging
from dataland.storage import storage, dataset_template
class Operation(object):
    """Base class for a single data-pipeline step.

    Subclasses implement :meth:`perform`, which runs the step.
    """

    def perform(self):
        """Run the operation. Subclasses must override this."""
        # NotImplemented is a comparison sentinel, not an exception; raising it
        # yields a TypeError. NotImplementedError is the abstract-method signal.
        raise NotImplementedError
class AppendOperation(Operation):
    """Append freshly produced records to an existing CSV dataset in storage.

    Subclasses set ``INPUT`` to the storage key of the dataset and implement
    :meth:`new_records`. :meth:`perform` pulls the dataset, checks that the
    new records have exactly the dataset's columns, appends them to the local
    file (without a header) and pushes the file back.
    """

    # Storage key of the dataset that records are appended to; set by subclasses.
    INPUT = ''
    # When True, new rows already present in the dataset (or repeated within
    # the new batch) are skipped. Requires the full dataset to be read into memory.
    IGNORE_DUPLICATES = False

    def perform(self):
        """Pull ``INPUT``, append :meth:`new_records` to it, and push it back.

        Raises:
            ValueError: if the columns of the new records differ from the
                dataset template's columns.
        """
        dataset = self.__class__.INPUT
        storage.pull(dataset)
        path = storage.local_path(dataset)
        template = dataset_template(path)
        columns = template.columns.tolist()

        new_records = self.new_records()
        # Explicit check rather than `assert`, which is stripped under `python -O`.
        if set(new_records.columns.values) != set(columns):
            raise ValueError('new_records do not match existing data template')
        # Rows are appended without a header, so they must follow the file's column order.
        new_records = new_records.reindex(columns=columns)

        if self.__class__.IGNORE_DUPLICATES:
            old_records = pd.read_csv(path)
            new_records = self._unseen_records(old_records, new_records)

        with open(path, 'a') as dataset_file:
            new_records.to_csv(dataset_file, index=False, header=False)
        logging.info('{} updated {} records to {}'.format(self.__class__.__name__, len(new_records), dataset))
        storage.push(dataset)

    @staticmethod
    def _unseen_records(old_records, new_records):
        """Return the rows of ``new_records`` that do not already appear in ``old_records``.

        Rows are compared by value across all columns. Repeats within
        ``new_records`` itself are collapsed to their first occurrence.
        The existing records are never returned, so appending the result does
        not duplicate the dataset.
        """
        # NOTE(review): old_records is re-read from CSV while new_records comes
        # from new_records(); if their dtypes differ (e.g. dates vs strings),
        # equal rows may not be recognised as duplicates — confirm.
        combined = pd.concat([old_records, new_records], ignore_index=True)
        is_repeat = combined.duplicated(keep='first').values
        # Only the tail of `combined` corresponds to new_records.
        keep = ~is_repeat[len(old_records):]
        return new_records[keep]

    def new_records(self):
        """Return a DataFrame of records to append to ``INPUT``.

        Must be implemented by subclasses. Its columns must match the
        dataset template's columns (in any order).
        """
        raise NotImplementedError
class TransformOperation(Operation):
    """Build one CSV dataset in storage from one or more input CSV datasets.

    Subclasses set ``INPUTS`` and ``OUTPUT`` and implement :meth:`transform`.
    :meth:`perform` pulls and reads every input, passes them to
    :meth:`transform`, writes the result to ``OUTPUT`` and pushes it.
    """

    # Maps the keyword-argument name given to transform() to an input's storage key.
    INPUTS = {}
    # Storage key the transformed dataframe is written to (overwritten each run).
    OUTPUT = ''

    def perform(self):
        """Pull every input, run :meth:`transform`, then write and push ``OUTPUT``."""
        input_dataframes = {}
        for name, key in self.__class__.INPUTS.items():
            storage.pull(key)
            input_dataframes[name] = pd.read_csv(storage.local_path(key))

        output_dataframe = self.transform(**input_dataframes)

        output = self.__class__.OUTPUT
        with open(storage.local_path(output), 'w') as output_file:
            output_dataframe.to_csv(output_file, index=False)
        logging.info('{} transformed {} records into {}'.format(self.__class__.__name__, len(output_dataframe), output))
        storage.push(output)

    def transform(self, input_dataframe):
        """Return the output dataframe computed from the input dataframes.

        :meth:`perform` calls this with one keyword argument per ``INPUTS``
        entry, named by that entry's key. Overriding methods should therefore
        name their parameters after the ``INPUTS`` keys. Must be implemented
        by subclasses.
        """
        raise NotImplementedError
class UpdateOperation(TransformOperation):
    """Rewrite a single CSV dataset in place.

    Subclasses set ``INPUT`` and implement :meth:`update`. :meth:`update`
    receives the current dataset as ``input_dataframe`` and returns its
    replacement, which is written back to ``INPUT``.
    """

    # Storage key of the dataset that is read and then overwritten.
    INPUT = ''

    def __init__(self, *args, **kwargs):
        cls = self.__class__
        # The dataset is written back to the same key it was read from.
        cls.OUTPUT = cls.INPUT
        # The inherited perform() only loads datasets listed in INPUTS. With the
        # inherited empty mapping, update() would be called without its
        # dataframe. So derive INPUTS from INPUT unless a subclass declared its
        # own. _AUTO_INPUTS marks a derived mapping, so a subclass that changes
        # INPUT after its parent was instantiated gets re-derived too.
        inputs = cls.INPUTS
        if not inputs or inputs is getattr(cls, '_AUTO_INPUTS', None):
            cls.INPUTS = cls._AUTO_INPUTS = {'input_dataframe': cls.INPUT}
        # Route the transform() call made by perform() to update().
        self.transform = self.update
        super(UpdateOperation, self).__init__(*args, **kwargs)

    def update(self, input_dataframe):
        """Return the updated version of ``input_dataframe``.

        Must be implemented by subclasses.
        """
        raise NotImplementedError