-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathutils.py
319 lines (263 loc) · 10.4 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
"""
Module containing useful functions for validating a facility's in-lined and
many to many attributes before they are saved
"""
import json
import uuid
from django.utils import timezone
from common.models import Contact, ContactType
from common.serializers import ContactSerializer
from facilities.models import (
FacilityContact,
Service,
FacilityDepartment,
Facility,
FacilityOfficer,
JobTitle,
Officer,
OfficerContact,
FacilityUnit
)
# Module-level accumulator for serializer errors hit while saving in-lined
# records. The helpers in this module only append to it; nothing visible here
# clears it, so callers are presumably responsible for reading and resetting
# it -- TODO confirm.
inlining_errors = []
def _is_valid_uuid(value):
    """
    Tell whether ``value`` can be interpreted as a UUID.

    Returns ``True`` for ``uuid.UUID`` instances and for strings that
    ``uuid.UUID`` is able to parse. Returns ``False`` for everything else,
    including ``None`` and non-string values such as integers or lists.
    Never raises.
    """
    if isinstance(value, uuid.UUID):
        return True
    try:
        uuid.UUID(value)
    except (ValueError, TypeError, AttributeError):
        # ValueError: badly formed hex string.
        # TypeError: ``None`` (no usable constructor argument).
        # AttributeError: non-string input; ``uuid.UUID`` calls string
        # methods on its first argument.
        return False
    return True
def _validate_services(services):
    """
    Check that each entry of ``services`` points at an existing ``Service``.

    Every entry is expected to be a dict with a ``service`` key holding the
    service id. Validation stops at the first entry whose id is not a well
    formed UUID; ids that are well formed but cannot be looked up are
    reported and validation continues with the next entry.

    Returns the list of error messages (empty when everything is valid).
    """
    problems = []
    for entry in services:
        service_id = entry.get('service', None)
        if not _is_valid_uuid(service_id):
            # A malformed id aborts the whole validation run.
            problems.append("Service has a badly formed uuid")
            return problems
        try:
            Service.objects.get(id=service_id)
        except (ValueError, TypeError, KeyError, Service.DoesNotExist):
            message = "service with id {} not found".format(service_id)
            problems.append(message)
    return problems
def _validate_units(units):
    """
    Check that each entry of ``units`` points at an existing
    ``FacilityDepartment``.

    Every entry is expected to be a dict with a ``unit`` key holding the
    department id. Validation stops at the first entry whose id is not a well
    formed UUID; well formed ids with no matching department are reported
    and validation continues with the next entry.

    Returns the list of error messages (empty when everything is valid).
    """
    problems = []
    for entry in units:
        department_id = entry.get('unit', None)
        if not _is_valid_uuid(department_id):
            # A malformed id aborts the whole validation run.
            problems.append("Please provide a proper facility department")
            return problems
        try:
            FacilityDepartment.objects.get(id=department_id)
        except FacilityDepartment.DoesNotExist:
            problems.append(
                "The facility department provided does not exist")
    return problems
def _validate_contacts(contacts):
    """
    Validate the in-lined contacts of a facility.

    Every entry is expected to be a dict with a ``contact_type`` key (the id
    of a ``ContactType``) and a ``contact`` key (the contact value itself).
    All entries are checked; validation does not stop at the first problem.

    Returns the list of error messages (empty when everything is valid).
    """
    errors = []
    for contact in contacts:
        contact_type_id = contact.get('contact_type', None)
        if not _is_valid_uuid(contact_type_id):
            # Do not query with an id that is already known to be malformed
            # (consistent with the service and unit validators): the lookup
            # could only add a second, misleading error or blow up.
            errors.append("Contact has a badly formed uuid")
        else:
            try:
                ContactType.objects.get(id=contact_type_id)
            except ContactType.DoesNotExist:
                # Report the offending id rather than the whole contact dict.
                errors.append(
                    "Contact type with the id {} was not found".format(
                        contact_type_id))
            except (KeyError, ValueError, TypeError):
                errors.append("Key contact type is missing")
        if contact.get('contact') is None:
            errors.append("The contact field is missing")
    return errors
def inject_audit_fields(dict_a, validated_data):
    """
    Copy the audit fields of ``validated_data`` into ``dict_a``.

    ``created_by`` and ``updated_by`` are read with item access, so a missing
    key raises ``KeyError`` before ``dict_a`` is touched. ``created`` and
    ``updated`` fall back to ``timezone.now()`` when they are absent or
    falsy.

    ``dict_a`` is updated in place and the same object is returned.
    """
    # All four values are evaluated before the update runs, so a failure
    # leaves ``dict_a`` unmodified.
    dict_a.update(
        created_by=validated_data['created_by'],
        updated_by=validated_data['updated_by'],
        created=validated_data.get('created') or timezone.now(),
        updated=validated_data.get('updated') or timezone.now(),
    )
    return dict_a
def create_contact(contact_data, validated_data):
    """
    Fetch the ``Contact`` whose value matches ``contact_data["contact"]`` or
    create it.

    When no such contact exists, the audit fields from ``validated_data``
    are injected into ``contact_data`` (in place) and the result is run
    through ``ContactSerializer``.

    Returns the existing or newly saved contact. When the serializer rejects
    the data, its errors are appended (JSON encoded) to ``inlining_errors``
    and ``None`` is returned.
    """
    try:
        return Contact.objects.get(contact=contact_data["contact"])
    except Contact.DoesNotExist:
        payload = inject_audit_fields(contact_data, validated_data)
        serializer = ContactSerializer(data=payload)
        if serializer.is_valid():
            return serializer.save()
        inlining_errors.append(json.dumps(serializer.errors))
        return None
def create_facility_contacts(instance, contact_data, validated_data):
    """
    Link the contact described by ``contact_data`` to the facility
    ``instance``.

    The contact itself is fetched or created through ``create_contact``. A
    ``FacilityContact`` row is then created unless one already exists for
    that contact and facility. Serializer errors are appended to
    ``inlining_errors``; nothing is returned.
    """
    # Imported locally, presumably to avoid a circular import with
    # facilities.serializers -- TODO confirm.
    from facilities.serializers import FacilityContactSerializer

    contact = create_contact(contact_data, validated_data)
    if contact is None:
        # create_contact could not save the contact and has already recorded
        # the reason in inlining_errors; there is nothing to link.
        return

    facility_contact_data = inject_audit_fields(
        {"contact": contact.id, "facility": instance.id}, validated_data)
    try:
        FacilityContact.objects.get(
            contact_id=facility_contact_data.get('contact'),
            facility_id=facility_contact_data.get('facility')
        )
    except FacilityContact.DoesNotExist:
        fac_contact = FacilityContactSerializer(data=facility_contact_data)
        if fac_contact.is_valid():
            fac_contact.save()
        else:
            # NOTE(review): unlike the other helpers in this module the
            # errors are appended without json.dumps; kept as is because
            # callers may rely on it.
            inlining_errors.append(fac_contact.errors)
def create_facility_units(instance, unit_data, validated_data):
    """
    Create a ``FacilityUnit`` for the facility ``instance`` from
    ``unit_data`` unless an identical one already exists.

    ``unit_data`` is modified in place: the ``department_name`` and
    ``regulating_body_name`` keys are dropped, ``facility`` is set to the
    instance id and the audit fields from ``validated_data`` are injected.

    Returns the saved unit when one is created. Returns ``None`` when a
    matching unit already exists or when the serializer rejects the data, in
    which case its errors are appended (JSON encoded) to
    ``inlining_errors``.
    """
    from facilities.serializers import FacilityUnitSerializer

    for key in ('department_name', 'regulating_body_name'):
        unit_data.pop(key, None)
    unit_data['facility'] = instance.id
    unit_data = inject_audit_fields(unit_data, validated_data)

    # NOTE(review): the lookup below also filters on the injected audit
    # fields (including the created/updated timestamps), so it looks like it
    # can only match a row with identical timestamps -- confirm this is the
    # intended duplicate check.
    try:
        FacilityUnit.objects.get(**unit_data)
    except FacilityUnit.DoesNotExist:
        serializer = FacilityUnitSerializer(data=unit_data)
        if serializer.is_valid():
            return serializer.save()
        inlining_errors.append(json.dumps(serializer.errors))
    return None
def create_facility_services(instance, service_data, validated_data):
    """
    Save a facility service for the facility ``instance``.

    ``service_data`` is modified in place: ``facility`` is set to the
    instance id and the audit fields from ``validated_data`` are injected.
    The result is run through ``FacilityServiceSerializer``; on failure the
    serializer errors are appended (JSON encoded) to ``inlining_errors``.

    Nothing is returned.
    """
    from facilities.serializers import FacilityServiceSerializer

    service_data['facility'] = instance.id
    payload = inject_audit_fields(service_data, validated_data)
    serializer = FacilityServiceSerializer(data=payload)
    if serializer.is_valid():
        serializer.save()
    else:
        inlining_errors.append(json.dumps(serializer.errors))
class CreateFacilityOfficerMixin(object):
    """Mixin to create facility officers."""

    def __init__(self, *args, **kwargs):
        """
        Store the optional ``user`` keyword argument (used as the creating
        and updating user of every record) and forward everything else to
        the next class in the MRO.
        """
        self.user = kwargs.pop('user', None)
        super(CreateFacilityOfficerMixin, self).__init__(*args, **kwargs)

    def _validate_required_fields(self, data):
        """
        Ensure that the officer's facility, name, title and registration
        number are provided.

        Returns a dict mapping field names to a list of messages; the dict is
        empty when nothing is missing. Note that a missing ``reg_no`` is
        reported under the ``registration_number`` key.
        """
        errs = {}
        if data.get('facility_id', None) is None:
            errs["facility_id"] = ["Facility is required"]
        if data.get('name', None) is None:
            errs["name"] = ["Name is Required"]
        if data.get('title', None) is None:
            errs["title"] = ["Job title is required"]
        if data.get('reg_no', None) is None:
            errs["registration_number"] = ["Registration Number is required"]
        return errs

    def _validate_facility(self, data):
        """
        Confirm that the provided facility exists.

        Returns an error dict keyed by ``facility_id`` when the lookup finds
        nothing, ``None`` otherwise.
        """
        try:
            Facility.objects.get(id=data.get('facility_id', None))
        except Facility.DoesNotExist:
            error_message = {
                "facility_id": ["Facility provided does not exist"]
            }
            return error_message

    def _validate_job_titles(self, data):
        """
        Confirm that the provided job title exists.

        Returns an error dict keyed by ``title`` when the lookup finds
        nothing, ``None`` otherwise.
        """
        try:
            JobTitle.objects.get(id=data.get('title', None))
        except JobTitle.DoesNotExist:
            error_message = {
                "title": ["Job title provided does not exist"]
            }
            return error_message

    def data_is_valid(self, data):
        """
        Run all validators against ``data``.

        Returns ``True`` when no validator reports a problem, otherwise the
        list of non-empty error dicts. Callers must therefore compare the
        result with ``is True`` rather than rely on truthiness.
        """
        errors = [
            self._validate_required_fields(data),
            self._validate_facility(data),
            self._validate_job_titles(data)
        ]
        # Validators signal success with an empty dict or None.
        errors = [error for error in errors if error]
        if errors:
            return errors
        return True

    def _inject_creating_user(self, attributes_dict):
        """
        Set ``created_by`` and ``updated_by`` on ``attributes_dict`` to the
        mixin's user. The dict is modified in place and returned.
        """
        attributes_dict['created_by'] = self.user
        attributes_dict['updated_by'] = self.user
        return attributes_dict

    def _create_contacts(self, data):
        """
        Fetch or create a ``Contact`` for every entry of ``data['contacts']``.

        Each entry is expected to carry a ``type`` (``ContactType`` id) and a
        ``contact`` value. ``ContactType.DoesNotExist`` propagates when the
        type id is unknown.

        Returns the list of contacts, in input order.
        """
        contacts = data.get('contacts', [])
        created_contacts = []
        for contact in contacts:
            contact_type = ContactType.objects.get(id=contact.get('type'))
            contact_dict = {
                "contact_type": contact_type,
                "contact": contact.get('contact')
            }
            try:
                created_contacts.append(Contact.objects.get(**contact_dict))
            except Contact.DoesNotExist:
                # The audit fields only need to be injected once.
                contact_dict = self._inject_creating_user(contact_dict)
                created_contacts.append(
                    Contact.objects.create(**contact_dict))
        return created_contacts

    def _create_facility_officer(self, data):
        """
        Create or update the officer described by ``data`` and link the
        officer to the facility and to the supplied contacts.

        ``data`` must contain ``facility_id``, ``title`` and ``name``;
        ``id_no``, ``reg_no`` and ``contacts`` are optional. An officer with
        the same registration number is reused, with name and job title
        updated; otherwise a new officer is created.

        Returns the ``FacilityOfficer`` linking the officer to the facility.
        """
        facility = Facility.objects.get(id=data['facility_id'])
        job_title = JobTitle.objects.get(id=data['title'])

        id_no = data.get('id_no', None)
        reg_no = data.get('reg_no', None)
        officer_dict = self._inject_creating_user({
            "name": data['name'],
            "job_title": job_title,
            # Empty values are stored as None.
            "id_number": id_no or None,
            "registration_number": reg_no or None,
        })

        try:
            officer = Officer.objects.get(registration_number=reg_no)
        except Officer.DoesNotExist:
            officer = Officer.objects.create(**officer_dict)
        else:
            officer.job_title = job_title
            officer.name = data['name']
            officer.save()

        # An existing officer is not necessarily linked to this facility yet,
        # so the link is looked up and created independently of whether the
        # officer itself was found.
        try:
            facility_officer = FacilityOfficer.objects.get(
                facility=facility, officer=officer)
        except FacilityOfficer.DoesNotExist:
            facility_officer_dict = self._inject_creating_user({
                "facility": facility,
                "officer": officer
            })
            facility_officer = FacilityOfficer.objects.create(
                **facility_officer_dict)

        # link the officer to the contacts
        created_contacts = self._create_contacts(data)
        for contact in created_contacts:
            contact_dict = {
                "officer": officer,
                "contact": contact
            }
            try:
                OfficerContact.objects.get(**contact_dict)
            except OfficerContact.DoesNotExist:
                contact_dict = self._inject_creating_user(contact_dict)
                OfficerContact.objects.create(**contact_dict)
        return facility_officer

    def create_officer(self, data):
        """
        Validate ``data`` and create the facility officer.

        Returns a dict with a boolean ``created`` flag and a ``detail`` entry
        holding either the validation errors or the serialized facility
        officer.
        """
        # Imported locally, presumably to avoid a circular import with
        # facilities.serializers -- TODO confirm.
        from facilities.serializers import FacilityOfficerSerializer

        valid_data = self.data_is_valid(data)
        if valid_data is not True:
            return {
                "created": False,
                "detail": valid_data
            }

        facility_officer = self._create_facility_officer(data)
        serialized_officer = FacilityOfficerSerializer(facility_officer).data
        return {
            "created": True,
            "detail": serialized_officer
        }
def _officer_data_is_valid(officer_data):
    """
    Validate ``officer_data`` with a throwaway ``CreateFacilityOfficerMixin``.

    Returns ``True`` when the data is valid, otherwise whatever
    ``data_is_valid`` reports (the list of error dicts).
    """
    validator = CreateFacilityOfficerMixin()
    return validator.data_is_valid(officer_data)
def _create_officer(officer_data, user):
    """
    Create the facility officer described by ``officer_data`` on behalf of
    ``user``.

    The data is passed straight to ``_create_facility_officer`` without being
    validated here, and the created record is not returned.
    """
    CreateFacilityOfficerMixin(user=user)._create_facility_officer(
        officer_data)