-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathasync_runner.py
168 lines (132 loc) · 5 KB
/
async_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
""" Main file with routines to run Listener
"""
import random
import threading
import time
from queue import Queue
from signal import SIGINT, signal
from pony.orm.dbapiprovider import DatabaseError
from ImHearing import audio, logger, post_recording, pre_recording, reader
from ImHearing.database import models
# Configurations Sections
GLOBAL_CONFIG, global_ret = reader.global_config()
AWS_CONFIG, aws_ret = reader.aws_config()
DB_CONFIG, db_ret = reader.db_config()
# Queue for Threads
task_queue = Queue(maxsize=1)
# Semaphore to indicate that Thread is uploading a file and cannot be killed
thread_uploading_archive = False
# Global list to Handle thread inside Signal Handler
thread_list = []
# Variable used to Stop the Thread
thread_run = True
if global_ret < 0 or aws_ret < 0 or db_ret < 0:
print("-- Some Error Found when Reading Config File --")
if global_ret < 0:
print(GLOBAL_CONFIG)
elif aws_ret < 0:
print(AWS_CONFIG)
elif db_ret < 0:
print(DB_CONFIG)
exit(-1)
# Defines the DB Connection to Pony
try:
db = models.define_db(
provider='sqlite',
filename=DB_CONFIG['db_path'],
create_db=True
)
except DatabaseError as e:
print("ERROR: {}".format(e))
print("-- Recreate the DB or Try some DB Recovery Utility --")
exit(-1)
def exit_handler(signal_received, frame):
"""
We need to keep the DB consistent in case of a CTRL+C. Also, we perform the
upload of remaining archives and a cleanup.
"""
my_logger = logger.get_logger("exit_handler", GLOBAL_CONFIG['log_file'])
my_logger.info("Terminating - Reveived Signal {}".format(signal_received))
# Stop Threads
if len(thread_list) > 0:
current_thread = thread_list.pop()
while thread_uploading_archive and current_thread.is_alive():
my_logger.info(
"Terminating - Waiting Thread to Upload".format(signal_received))
time.sleep(10)
if current_thread.is_alive():
my_logger.info(
"Terminating Thread".format(signal_received))
thread_run = False
# Archive
post_recording.archive_records(db, GLOBAL_CONFIG)
# --> Clean Up Routine Here
post_recording.remove_uploaded_records(db)
post_recording.remove_uploaded_archives(db)
exit(0)
def processing():
"""
This routine is the main consumer to the Queue. Every time an item is added
to the Queue, this routine consumes it.
"""
while True and thread_run:
if not task_queue.empty():
# Set the semaphore
thread_uploading_archive = True
# Archive, Upload & Remove Records & Archives
post_recording.archive_records(db, GLOBAL_CONFIG)
# Uploading check
up_arch = post_recording.upload_archive(db, AWS_CONFIG)
up_count = 0
while up_arch is False and up_count <= 10:
wait_time = random.randint(10, 90)
time.sleep(wait_time)
up_arch = post_recording.upload_archive(db, AWS_CONFIG)
up_count += 1
if up_count > 10:
exit(-1)
post_recording.remove_uploaded_archives(db)
post_recording.remove_uploaded_records(db)
# Removes task from queue after finishing all tasks and Release Sem
thread_uploading_archive = False
task_queue.get()
return
def main():
main_logger = logger.get_logger("runner", GLOBAL_CONFIG['log_file'])
while True:
if pre_recording.check_aws_budget(db, AWS_CONFIG) < 0:
main_logger.error(
" -- AWS Budget {} exceeded -- ".format(
AWS_CONFIG['budget_cost']))
exit(-1)
perform_cleanup_routines = False
if pre_recording.check_fs_usage(db, GLOBAL_CONFIG) < 0:
main_logger.warning(
" -- FS Usage Check, Exceeded {}MB -- ".format(
GLOBAL_CONFIG['storage_usage']))
perform_cleanup_routines = True
if pre_recording.check_record_count(db, GLOBAL_CONFIG) < 0:
main_logger.warning(
" -- Record Count Check, Exceeded {} --".format(
GLOBAL_CONFIG['records_count']))
perform_cleanup_routines = True
if perform_cleanup_routines and not task_queue.full():
main_logger.info(
" -- Starting CleanUP and Uploading Thread --"
)
task_id = str(random.randrange(0, 100000)).zfill(6)
task_queue.put(task_id)
main_logger.info(
" -- Queueing Task {} -- ".format(task_id)
)
record_obj = audio.start_recording(db, GLOBAL_CONFIG)
main_logger.info(
" -- Record {} Finished -- ".format(record_obj.path)
)
if __name__ == '__main__':
signal(SIGINT, exit_handler)
consumer_thread = threading.Thread(target=processing)
thread_list.append(consumer_thread)
# Start Threads
consumer_thread.start()
main()