From 9573cdd06dce41519394f0c46411671752f0497c Mon Sep 17 00:00:00 2001 From: David Winder Date: Mon, 17 Aug 2020 12:10:30 +0300 Subject: [PATCH] PLAT-11081: add upload folder target --- liveRecorder/Tasks/ConcatinationTask.py | 11 +++++++---- liveRecorder/Tasks/TaskBase.py | 15 +++++++++++---- liveRecorder/Tasks/UploadTask.py | 7 ++++--- 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/liveRecorder/Tasks/ConcatinationTask.py b/liveRecorder/Tasks/ConcatinationTask.py index 313866e8..761af87b 100644 --- a/liveRecorder/Tasks/ConcatinationTask.py +++ b/liveRecorder/Tasks/ConcatinationTask.py @@ -32,8 +32,6 @@ class ConcatenationTask(TaskBase): def __init__(self, param, logger_info): TaskBase.__init__(self, param, logger_info) - concat_task_processing_dir = os.path.join(self.base_directory, self.__class__.__name__, 'processing') - self.recording_path = os.path.join(concat_task_processing_dir, self.entry_directory) self.token_url = self.token_url_template.format(self.get_live_type(), self.recorded_id) self.nginx_url = "http://" + self.token_url + "t/{0}" self.flavor_pattern = 'index-s(?P\d+)' @@ -141,6 +139,11 @@ def run(self): flavors_list = self.extract_flavor_dict() flavors_list.sort(key=lambda flavor: flavor.bandwidth, reverse=True) + ts_recording_folder = self.recording_path_target + mp4_recording_folder = self.recording_path_target + if self.entry_config["should_convert_to_mp4"]: + ts_recording_folder = os.path.join(self.recording_path) + for obj in flavors_list: if obj.audio_language_track and self.entry_config["upload_only_source"]: continue @@ -150,8 +153,8 @@ def run(self): if flavor_id is None: raise ValueError('Could not find flavor ID for {}'.format(obj.url)) ts_output_filename = self.get_output_filename(flavor_id) - output_full_path = os.path.join(self.recording_path, ts_output_filename) - mp4_full_path = output_full_path.replace('.ts', '.mp4') + output_full_path = os.path.join(ts_recording_folder, ts_output_filename) + mp4_full_path = os.path.join(mp4_recording_folder, ts_output_filename).replace('.ts', '.mp4') command = command + ' ' + output_full_path + ' ' + mp4_full_path + ' ' + obj.language if os.path.isfile(output_full_path): self.logger.warn("file [%s] already exist", output_full_path) diff --git a/liveRecorder/Tasks/TaskBase.py b/liveRecorder/Tasks/TaskBase.py index 1814510c..a37ca3ef 100644 --- a/liveRecorder/Tasks/TaskBase.py +++ b/liveRecorder/Tasks/TaskBase.py @@ -13,6 +13,7 @@ class TaskBase(object): hostname = gethostname() base_directory = os.path.join(get_config('recording_base_dir'), hostname) + recording_target_base_dir = get_config('recording_target_base_dir') cron_jon_stamp = get_config('cron_jon_stamp') def check_stamp(self): with open(self.stamp_full_path, "r") as stamp_file: # w+ since we truncated the file @@ -47,14 +48,20 @@ def update_status(self, new_status): def __init__(self, param, logger_info): self.duration = param['duration'] self.recorded_id = param['recorded_id'] - self.entry_directory = param['directory'] + entry_directory = param['directory'] self.entry_id = param['entry_id'] # set job name as log header self.log_header = "{}_{}_{}".format(self.entry_id, self.recorded_id, self.duration) self.logger = logger_decorator(self.__class__.__name__, logger_info) - self.output_filename = self.entry_directory - self.recording_path = os.path.join(self.base_directory, self.__class__.__name__, 'processing', - self.entry_directory) + self.output_filename = entry_directory + self.recording_path = os.path.join(self.base_directory, self.__class__.__name__, 'processing', entry_directory) + + self.recording_path_target = self.recording_path + if self.recording_target_base_dir is not None: + self.recording_path_target = os.path.join(self.recording_target_base_dir, entry_directory) + if not os.path.exists(self.recording_path_target): + os.makedirs(self.recording_path_target) + self.stamp_full_path = os.path.join(self.recording_path, 'stamp') self.data_full_path = os.path.join(self.recording_path, 'data.json') self.data = self.get_data() diff --git a/liveRecorder/Tasks/UploadTask.py b/liveRecorder/Tasks/UploadTask.py index fc4270ff..d1a6570c 100644 --- a/liveRecorder/Tasks/UploadTask.py +++ b/liveRecorder/Tasks/UploadTask.py @@ -29,7 +29,7 @@ def __init__(self, param, logger_info): glob_pattern = param['directory'] + '_f*_out.' + file_extention self.filename_pattern = "[0,1]_.+_[0,1]_.+_\d+(.\d+)?_f(?P\d+)_out[.]" + file_extention - self.flavors_files_list = glob.glob1(self.recording_path, glob_pattern) + self.flavors_files_list = glob.glob1(self.recording_path_target, glob_pattern) def get_chunks_to_upload(self, file_size): @@ -119,7 +119,7 @@ def run(self): self.logger.error(error) raise ValueError(error) flavor_id = result.group('flavor_id') - file_full_path = os.path.join(self.recording_path, flavor_file_name) + file_full_path = os.path.join(self.recording_path_target, flavor_file_name) if mode == 'remote': self.upload_file(file_full_path, flavor_id, is_first_flavor) if mode == 'local': @@ -139,7 +139,8 @@ def run(self): err.code = code raise err else: - self.logger.warn('there were no mp4 files to upload. check {}'.format(self.recording_path)) + self.logger.warn('there were no mp4 files to upload. check {}'.format(self.recording_path_target)) + # delete self.recording_path_target folder self.update_status(KalturaEntryServerNodeStatus.TASK_FINISHED) except KalturaException as e: self.logger.error('failed to upload VOD with error {}, exception details: {}'.format(e.code, e.message))