-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkpgen.py
377 lines (347 loc) · 12.4 KB
/
kpgen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Final version
import random as ra
import csv
import sys
try:
from libs.kirjasto import gen_id, gen_timestamp, add_check_digit
except ImportError:
print('Error: kirjasto.py missing, please consult Fingrid Datahub test team to get new one!')
exit()
try:
from libs.fconfig import jakeluverkkoyhtio, MGA, dealers, id_range, limit
except ImportError:
print('Error: fconfig.py missing, please consult Fingrid Datahub test team to get new one!')
exit()
# Selections shared by the functions in this file. The keys written below are
# 'DSO', 'MGA', 'type', 'remote', 'method' and 'address'.
storage = {}
def random_supplier():  # DDQ
    """
    Pick one entry of the configured dealers at random.

    Input: None
    Output: one element of dealers (a String according to the original notes)
    """
    chosen = ra.choice(dealers)
    return chosen
def random_address(path='libs/osoitteet.txt'):
    """
    Return one randomly chosen non-blank line of the address file.

    Input: path of the address file (optional, defaults to libs/osoitteet.txt)
    Output: String (the line exactly as read, line ending included)

    Raises FileNotFoundError when the file does not exist and IndexError
    when it contains no non-blank line.
    """
    # The context manager closes the handle; the old code left it open.
    with open(path, 'r', encoding='latin-1') as address_file:
        # Blank lines cannot be split into address fields, so skip them.
        candidates = [line for line in address_file if line.strip()]
    return ra.choice(candidates)
def selector():
    """
    Ask the user which configured DSO to use and remember the choice.

    Every entry of jakeluverkkoyhtio is listed with a running number and
    the user is prompted until a number inside the list is given. The
    chosen entry is stored in storage['DSO'] and returned. When the
    configuration is empty or contains an empty string, an error is
    printed and the program exits.

    Input: None
    Output: the chosen entry of jakeluverkkoyhtio
    """
    if not jakeluverkkoyhtio:
        # A falsy configuration value normally has no first element; the
        # lookup is kept so unusual falsy containers behave as before.
        try:
            storage['DSO'] = jakeluverkkoyhtio[0]
            return jakeluverkkoyhtio[0]
        except (LookupError, TypeError):
            print('Error: please add at least one DSO to fconfig.py!')
            sys.exit()

    for count, DSO_id in enumerate(jakeluverkkoyhtio, 1):
        if DSO_id == '':
            print('Error: please add at least one DSO to fconfig.py!')
            sys.exit()
        print(str(count), '-', DSO_id)

    total = len(jakeluverkkoyhtio)
    ans = 0
    while not ans:
        try:
            # int() raises ValueError for anything that is not a number, so
            # one handler covers both bad text and out-of-range answers.
            ans = int(input("Select DSO (1-{}): ".format(total)))
            if not 1 <= ans <= total:
                raise ValueError
        except ValueError:
            ans = 0
            print("Select between 1 - {}".format(total))
    storage['DSO'] = jakeluverkkoyhtio[ans - 1]
    return jakeluverkkoyhtio[ans - 1]
def input_mga():
    """
    Ask the user which configured MGA to use and store it in storage['MGA'].

    Every entry of MGA is listed with a running number and the user is
    prompted until a number inside the list is given. When the
    configuration is empty or contains an empty string, an error is
    printed and the program exits.

    Input: None
    Output: None
    """
    if not MGA:
        # A falsy configuration value normally has no first element; the
        # lookup is kept so unusual falsy containers behave as before.
        try:
            storage['MGA'] = MGA[0]
        except (LookupError, TypeError):
            print('Error: please add at least one MGA to fconfig.py!')
            sys.exit()
        return

    for count, MGA_id in enumerate(MGA, 1):
        if MGA_id == '':
            print('Error: please add at least one MGA to fconfig.py!')
            sys.exit()
        print(str(count), '-', MGA_id)

    total = len(MGA)
    ans = 0
    while not ans:
        try:
            # int() raises ValueError for non-numeric text; the range check
            # reuses the same handler.
            ans = int(input("Select MGA (1-{}): ".format(total)))
            if not 1 <= ans <= total:
                raise ValueError
        except ValueError:
            ans = 0
            print("Select between 1 - {}".format(total))
    storage['MGA'] = MGA[ans - 1]
def ap_type():
    """
    Ask for the accounting point type and store its code in storage['type'].

    The stored value is 'AG01' (Non-production) or 'AG02' (Production).
    The prompt repeats until a number inside the menu is entered.

    Input: None
    Output: None
    """
    types = [('Non-production', 'AG01'), ('Production', 'AG02')]
    for count, aptype in enumerate(types, 1):
        print(str(count), '-', aptype[0])

    total = len(types)
    ans = 0
    while not ans:
        try:
            ans = int(input("Select accounting point type (1-{}): ".format(total)))
            # Without this check a too-large answer raised IndexError and a
            # negative one silently selected an entry from the end.
            if not 1 <= ans <= total:
                raise ValueError
        except ValueError:
            ans = 0
            print("Select between 1 - {}".format(total))
    storage['type'] = types[ans - 1][1]
def remote_readable():
    """
    Ask for the remote-readable mode and store it in storage['remote'].

    The stored value is '1' (Readable) or '0' (Non-readable). The prompt
    repeats until a number inside the menu is entered.

    Input: None
    Output: None
    """
    readable = [('Readable', '1'), ('Non-readable', '0')]
    for count, modes in enumerate(readable, 1):
        print(str(count), '-', modes[0])

    total = len(readable)
    ans = 0
    while not ans:
        try:
            ans = int(input("Select accounting point mode (1-{}): ".format(total)))
            # Without this check a too-large answer raised IndexError and a
            # negative one silently selected an entry from the end.
            if not 1 <= ans <= total:
                raise ValueError
        except ValueError:
            ans = 0
            print("Select between 1 - {}".format(total))
    storage['remote'] = readable[ans - 1][1]
def metering_method():
    """
    Ask for the metering method and store its code in storage['method'].

    The stored value is 'E13', 'E14' or 'E16'. The prompt repeats until a
    number inside the menu is entered.

    Input: None
    Output: None
    """
    methods = [('Continunous metering', 'E13'), ('Reading metering', 'E14'), ('Unmetered', 'E16')]
    for count, method in enumerate(methods, 1):
        print(str(count), '-', method[0])

    total = len(methods)
    ans = 0
    while not ans:
        try:
            ans = int(input("Select accounting point metering method (1-{}): ".format(total)))
            # Without this check a too-large answer raised IndexError and a
            # negative one silently selected an entry from the end.
            if not 1 <= ans <= total:
                raise ValueError
        except ValueError:
            ans = 0
            print("Select between 1 - {}".format(total))
    storage['method'] = methods[ans - 1][1]
def csv_filler():
    """
    Supply the supplier value for one CSV row.

    Input: None
    Output: whatever random_supplier() returns
    """
    supplier = random_supplier()
    return supplier
def get_input():
    """
    Prompt until the user enters an amount between 1 and limit.

    Input: None
    Output: integer
    """
    while True:
        try:
            amount = int(input("Amount of accounting points: "))
            if amount <= 0 or amount > limit:
                raise ValueError
        except ValueError:
            # Covers non-numeric text as well as out-of-range numbers.
            print("Value should be between 1 - {}".format(limit))
        except (SyntaxError, NameError):
            print("Only numbers accepted.")
        else:
            return amount
def apoint_gen(jvy=None, kp_lkm=None, mga=None, aptype=None, remote=None, method=None):
    """
    Build the list of accounting point identifiers.

    Any argument that is missing (falsy) is asked from the user through
    the matching interactive function; those functions record the answer
    in storage. Each identifier is the first 8 characters of the DSO
    followed by a running number zero-padded to 9 characters, converted
    to int and passed through add_check_digit.

    Input: DSO, amount, MGA, type, remote mode, metering method
           (all optional, default: ask the user)
    Output: list of add_check_digit results (list of strings according
            to the original notes)
    """
    prefix = jvy[:8] if jvy else selector()[:8]

    if kp_lkm:
        # Command-line value: reject text and non-positive numbers here,
        # otherwise int() raises an unhandled ValueError or an empty list
        # is returned to a caller that indexes its first element.
        try:
            amount = int(kp_lkm)
        except ValueError:
            amount = 0
        if amount <= 0:
            print('Error: number of accounting points must be a positive integer')
            sys.exit()
        # NOTE(review): the configured limit is only enforced by the
        # interactive prompt, not for command-line values - confirm intent.
    else:
        amount = get_input()

    if not mga:
        input_mga()
    if not aptype:
        ap_type()
    if not remote:
        remote_readable()
    if not method:
        metering_method()

    if id_range:  # fixed start value from the config file
        if id_range > 90000000:
            print('Error: id_range maximum value is 90000000')
            print('Please fix value in fconfig.py')
            sys.exit()
        first_id = id_range
    else:
        first_id = ra.randint(1, 90000000)

    # Fixed-length identifier; the check digit is added by add_check_digit.
    return [add_check_digit(int(str(prefix) + str(number).zfill(9)))
            for number in range(first_id, first_id + amount)]
def produce_xml(prefix, apoint):
    """
    Write xml/apoint_<apoint>.xml from libs/xml_template.xml.

    The template is filled with the values held in storage ('DSO', 'MGA',
    'address', 'remote', 'method', 'type'), a generated id and timestamps,
    and a random street number between 1 and 100. The address is drawn
    once with random_address() and then kept in storage['address'], so
    later calls reuse it. The xml directory is created when missing.
    If either input file is missing, an error naming that file is printed
    and the program exits.

    Input: prefix (unused, kept for callers), accounting point identifier
    Output: file
    """
    from xml.etree import ElementTree as et
    import os

    header_ns = './/{urn:fi:Datahub:mif:common:HDR_Header:elements:v1}'
    event_ns = './/{urn:fi:Datahub:mif:masterdata:E58_MasterDataMPEvent:elements:v1}'

    if 'address' not in storage:
        try:
            storage['address'] = random_address().split(',')
        except FileNotFoundError:
            print("osoitteet.txt missing from libs directory.")
            sys.exit()

    os.makedirs('xml', exist_ok=True)

    # Handled separately so the message names the file that is really missing.
    try:
        tree = et.parse('libs/xml_template.xml')
    except FileNotFoundError:
        print("xml_template.xml missing from libs directory.")
        sys.exit()

    tree.find(header_ns + 'Identification').text = gen_id(True)
    tree.find(header_ns + 'PhysicalSenderEnergyParty')[0].text = storage['DSO']
    tree.find(header_ns + 'JuridicalSenderEnergyParty')[0].text = storage['DSO']
    tree.find(header_ns + 'Creation').text = gen_timestamp()  # Now
    tree.find(event_ns + 'StartOfOccurrence').text = gen_timestamp('True')  # Last midnight

    location = tree.find(event_ns + 'MeteringPointUsedDomainLocation')
    location[0].text = apoint
    tree.find(event_ns + 'MeteringGridAreaUsedDomainLocation')[0].text = storage['MGA']

    address = tree.find(event_ns + 'MeteringPointAddress')
    address[1].text = storage['address'][1]  # streetname
    address[2].text = str(ra.randint(1, 100))
    address[5].text = storage['address'][0]  # zip code
    address[6].text = storage['address'][2].strip()  # City

    characteristic = tree.find(event_ns + 'MPDetailMeteringPointCharacteristic')
    characteristic[0].text = storage['remote']
    characteristic[1].text = storage['method']
    location[2].text = storage['type']

    tree.write('xml/apoint_{}.xml'.format(apoint))
def output_apoint(jvy, kp_lkm, mga, aptype, remote, method):
    """
    Generate the accounting points and append them to kp.csv.

    When a DSO is given, every other value must be given too; when only
    an amount is given, the DSO is reported missing. In both cases a
    message is printed and the program exits. Otherwise the identifiers
    come from apoint_gen(); for each one an XML file is produced and one
    row is appended to kp.csv. The header row is written only when
    kp.csv did not exist before the call.

    Input: DSO, amount, MGA, type, remote mode, metering method
           (falsy values mean "not given")
    Output: file
    """
    import os.path

    if not jvy and kp_lkm:  # amount given without a DSO
        print('Accounting point missing.')
        sys.exit()
    if jvy:
        # With a DSO on the command line every other value is mandatory.
        required = (
            (kp_lkm, 'Amount missing.'),
            (mga, 'MGA missing.'),
            (aptype, 'Accountin point type missing.'),
            (remote, 'Remote readable status missing.'),
            (method, 'Read method missing.'),
        )
        for value, message in required:
            if not value:
                print(message)
                sys.exit()

    try:
        kplista = apoint_gen(jvy, kp_lkm, mga, aptype, remote, method)
        if not kplista:
            # Nothing to write; avoids an IndexError on kplista[0] below.
            print('Error: no accounting points were generated.')
            sys.exit()

        write_header = not os.path.exists('kp.csv')
        with open('kp.csv', 'a') as f:
            if write_header:
                f.write('Accounting point,Metering Area,Supplier,DSO,MGA,ZIP,Street,City,AP type,Remote readable,Metering method\n')
            prefix = kplista[0][:8]
            ma = str(prefix) + '00000000'  # Metering area
            for apoint in kplista:
                produce_xml(prefix, apoint)
                try:
                    supplier = csv_filler()
                except IndexError:
                    # An empty dealer list raises IndexError, not TypeError.
                    print('Error: please add at least one dealer to config file!')
                    sys.exit()
                f.write("{},{},{},{},{},{},{},{},{},{},{}\n".format(
                    apoint, ma, supplier,
                    storage['DSO'],
                    storage['MGA'],
                    storage['address'][0],
                    storage['address'][1],
                    storage['address'][2].strip(),
                    storage['type'],
                    storage['remote'],
                    storage['method']))
    except TypeError:
        print('Error: please add at least one dealer to config file!')
        sys.exit()
def main(argv):
    """
    Parse the command line and start the generation.

    Without arguments everything is asked interactively. With arguments
    the given values are passed on and also recorded in storage. Both the
    short options (-j -m -l -t -r -M) and the long forms declared to
    getopt (--jvy --mga --kp_lkm --aptype --remote --method) are handled.
    -h or an unknown option prints the usage text and exits.

    Input: list of command-line arguments without the program name
    Output: None
    """
    import getopt

    usage = ('kpgen.py -j <DSO> -m <MGA> -l <number of accounting points> '
             '-t type (AG01/AG02) -r remote readable (0/1) '
             '-M metering method (E13/E14/E16)')

    # Use the argv parameter rather than sys.argv so the function behaves
    # the same when called from other code.
    if not argv:
        output_apoint(None, None, None, None, None, None)
        return

    try:
        opts, args = getopt.getopt(
            argv, "hl:j:m:t:r:M:",
            ["kp_lkm=", "jvy=", "mga=", "aptype=", "remote=", "method="])
    except getopt.GetoptError:
        print('Usage:')
        print(usage)
        sys.exit()

    jvy = 0
    kp_lkm = 0
    mga = 0
    aptype = 0
    remote = 0
    method = 0
    for opt, arg in opts:
        # Real tuples: the old ('-l') was a plain string, so the long
        # options were accepted by getopt but never acted on.
        if opt in ('-l', '--kp_lkm'):
            kp_lkm = arg
        elif opt in ('-j', '--jvy'):
            jvy = arg
            storage['DSO'] = jvy
        elif opt in ('-m', '--mga'):
            mga = arg
            storage['MGA'] = mga
        elif opt in ('-t', '--aptype'):
            aptype = arg
            storage['type'] = aptype
        elif opt in ('-r', '--remote'):
            remote = arg
            storage['remote'] = remote
        elif opt in ('-M', '--method'):
            method = arg
            storage['method'] = method
        elif opt == '-h':
            print('Usage:')
            print(usage)
            sys.exit()
    output_apoint(jvy, kp_lkm, mga, aptype, remote, method)
if __name__ == "__main__":
    # Ctrl-C and end-of-input at a prompt both end the run with the same notice.
    try:
        main(sys.argv[1:])
    except (KeyboardInterrupt, EOFError):
        print("\n\nProgram cancelled by user!")
        exit()