From c081a07341460071a695ffa00637ee597f96b092 Mon Sep 17 00:00:00 2001 From: kalturaguy Date: Wed, 20 Nov 2019 11:34:55 +0200 Subject: [PATCH] SUP-18799 (#614) * add support for https * PLAT-9684 * SUP-18799 * fixes * code review * fixes * a * fix bug --- lib/kaltura-client-lib/KalturaClientBase.js | 5 ++- liveRecorder/Tasks/ConcatinationTask.py | 17 +++++++- liveRecorder/Tasks/KalturaUploadSession.py | 4 +- liveRecorder/Tasks/TaskBase.py | 15 ++++++- liveRecorder/Tasks/UploadTask.py | 43 +++++++++++++-------- 5 files changed, 61 insertions(+), 23 deletions(-) diff --git a/lib/kaltura-client-lib/KalturaClientBase.js b/lib/kaltura-client-lib/KalturaClientBase.js index d6f8eabf..6a402ca4 100644 --- a/lib/kaltura-client-lib/KalturaClientBase.js +++ b/lib/kaltura-client-lib/KalturaClientBase.js @@ -29,6 +29,7 @@ var crypto = require('crypto'); var http = require('http'); +var https = require('https'); var path = require('path'); var url = require('url'); var fs = require('fs'); @@ -373,7 +374,8 @@ KalturaClientBase.prototype.encodeFile = function(boundary, type, name, filename KalturaClientBase.prototype.sendRequestHelper = function (options, body, requestIndex, onCompleteCallback, timeout) { var This = this; - var request = http.request(options, function(response) { + let httpModule= options.protocol.startsWith("https:") ? https : http; + var request = httpModule.request(options, function(response) { response.setEncoding('utf8'); var data = ''; @@ -447,6 +449,7 @@ KalturaClientBase.prototype.doHttpRequest = function (callCompletedCallback, req this.log('Request [' + requestIndex + ']: ' + debugUrl); var options = { + protocol: urlInfo.protocol, host : urlInfo.host, path : urlInfo.path, method : 'POST' diff --git a/liveRecorder/Tasks/ConcatinationTask.py b/liveRecorder/Tasks/ConcatinationTask.py index 559ad04e..70aacf05 100644 --- a/liveRecorder/Tasks/ConcatinationTask.py +++ b/liveRecorder/Tasks/ConcatinationTask.py @@ -7,6 +7,9 @@ import urllib2 import m3u8 + +Flavor = collections.namedtuple('Flavor', 'url language bandwidth') + from Iso639Wrapper import Iso639Wrapper from Config.config import get_config @@ -17,7 +20,6 @@ # todo add timeout, and use m3u8 insted of regex -Flavor = collections.namedtuple('Flavor', 'url language') class ConcatenationTask(TaskBase): @@ -61,6 +63,7 @@ def extract_flavor_dict(self): for element in m3u8_obj.playlists: flavors_list.append(Flavor( url=element.absolute_uri, + bandwidth=element.stream_info.bandwidth, language='und' )) @@ -71,6 +74,7 @@ def extract_flavor_dict(self): language = self.iso639_wrapper.convert_language_to_iso639_3(unicode(language)) flavors_list.append(Flavor( url=element.absolute_uri, + bandwidth=element.stream_info.bandwidth, language=language )) return flavors_list @@ -133,6 +137,7 @@ def run(self): self.url_base_entry = self.nginx_url.format(token) self.url_master = os.path.join(self.url_base_entry, 'master.m3u8') flavors_list = self.extract_flavor_dict() + flavors_list.sort(key=lambda flavor: flavor.bandwidth, reverse=True) for obj in flavors_list: url_postfix = obj.url.rsplit('/', 1)[1] @@ -145,13 +150,21 @@ def run(self): command = command + ' ' + output_full_path + ' ' + mp4_full_path + ' ' + obj.language if os.path.isfile(output_full_path): self.logger.warn("file [%s] already exist", output_full_path) + + if self.entry_config["upload_only_source"]: + break + continue playlist = self.download_file(obj.url) self.logger.debug("load recording manifest : \n %s ", playlist) chunks = m3u8.loads(playlist).files self.download_chunks_and_concat(chunks, output_full_path) self.logger.info("Successfully concat %d files into %s", len(chunks), output_full_path) - self.convert_ts_to_mp4(command) + if self.entry_config["upload_only_source"]: + break + + if self.entry_config["should_convert_to_mp4"]: + self.convert_ts_to_mp4(command) def convert_ts_to_mp4(self, command): diff --git a/liveRecorder/Tasks/KalturaUploadSession.py b/liveRecorder/Tasks/KalturaUploadSession.py index d9278f9c..6d0c4363 100644 --- a/liveRecorder/Tasks/KalturaUploadSession.py +++ b/liveRecorder/Tasks/KalturaUploadSession.py @@ -2,13 +2,13 @@ from Config.config import get_config class KalturaUploadSession: - def __init__(self, file_name, file_size, chunks_to_upload, entry_id, recorded_id, backend_client, logger, infile): + def __init__(self, file_name, file_size, chunks_to_upload, entry_id, parnter_id, recorded_id, backend_client, logger, infile): self.infile = infile self.file_name = file_name self.logger = logger self.file_size = file_size self.chunks_to_upload = chunks_to_upload - self.partner_id = backend_client.get_live_entry(entry_id).partnerId + self.partner_id = parnter_id self.recorded_id = recorded_id self.entry_id = entry_id self.backend_client = backend_client diff --git a/liveRecorder/Tasks/TaskBase.py b/liveRecorder/Tasks/TaskBase.py index d8c6e302..1814510c 100644 --- a/liveRecorder/Tasks/TaskBase.py +++ b/liveRecorder/Tasks/TaskBase.py @@ -6,7 +6,7 @@ from RecordingException import UnequallStampException from BackendClient import * from KalturaClient.Plugins.Core import KalturaEntryServerNodeStatus - +import json class TaskBase(object): @@ -44,7 +44,6 @@ def update_status(self, new_status): self.logger.debug("Updating taskId: [{}] with new status: [{}]".format(id, new_status)) self.backend_client.update_task_entryServerNode_status(id, new_status) - def __init__(self, param, logger_info): self.duration = param['duration'] self.recorded_id = param['recorded_id'] @@ -60,6 +59,18 @@ def __init__(self, param, logger_info): self.data_full_path = os.path.join(self.recording_path, 'data.json') self.data = self.get_data() self.backend_client = BackendClient(self.entry_id + '-' + self.recorded_id) + self.live_entry = self.backend_client.get_live_entry(self.entry_id) + self.recorded_entry = self.backend_client.get_recorded_entry(self.live_entry.partnerId, self.recorded_id) + self.entry_config = { + "upload_only_source": False, + "should_convert_to_mp4": True, + } + + if self.live_entry.conversionProfileId != self.recorded_entry.conversionProfileId: + self.entry_config["upload_only_source"] = True + self.entry_config["should_convert_to_mp4"] = False + + self.logger.info("Entry config for {}: {} {} {} {}".format(self.entry_id, self.entry_config,self.live_entry.conversionProfileId, self.recorded_entry.conversionProfileId,self.recorded_id)) __metaclass__ = abc.ABCMeta diff --git a/liveRecorder/Tasks/UploadTask.py b/liveRecorder/Tasks/UploadTask.py index 2419aebf..fc4270ff 100644 --- a/liveRecorder/Tasks/UploadTask.py +++ b/liveRecorder/Tasks/UploadTask.py @@ -18,11 +18,18 @@ class UploadTask(TaskBase): upload_token_buffer_size = get_config('upload_token_buffer_size_mb', 'int') * 1000000 # buffer is in MB + def __init__(self, param, logger_info): TaskBase.__init__(self, param, logger_info) - mp4_filename_pattern = param['directory'] + '_f*_out.mp4' - self.mp4_files_list = glob.glob1(self.recording_path, mp4_filename_pattern) - self.mp4_filename_pattern = "[0,1]_.+_[0,1]_.+_\d+(.\d+)?_f(?P\d+)_out[.]mp4" + + file_extention = "mp4" + if not self.entry_config.get('should_convert_to_mp4', True): + file_extention = "ts" + + glob_pattern = param['directory'] + '_f*_out.' + file_extention + self.filename_pattern = "[0,1]_.+_[0,1]_.+_\d+(.\d+)?_f(?P\d+)_out[.]" + file_extention + + self.flavors_files_list = glob.glob1(self.recording_path, glob_pattern) def get_chunks_to_upload(self, file_size): @@ -39,7 +46,8 @@ def upload_file(self, file_name, flavor_id, is_first_flavor): with io.open(file_name, 'rb') as infile: upload_session = KalturaUploadSession(file_name, file_size, chunks_to_upload, self.entry_id, - self.recorded_id, self.backend_client, self.logger, infile) + self.live_entry.partnerId, self.recorded_id, self.backend_client, + self.logger, infile) if chunks_to_upload > 2: chunk = upload_session.get_next_chunk() if chunk is not None: @@ -79,17 +87,20 @@ def upload_file(self, file_name, flavor_id, is_first_flavor): def check_replacement_status(self, partner_id): self.logger.debug("About to check replacement status for [%s]", self.recorded_id) - recorded_obj = self.backend_client.get_recorded_entry(partner_id, self.recorded_id) - self.logger.debug("Got replacement Status: %s", recorded_obj.replacementStatus.value) - if recorded_obj.replacementStatus.value != KalturaEntryReplacementStatus.NONE: + self.logger.debug("Got replacement Status: %s", self.recorded_entry.replacementStatus.value) + if self.recorded_entry.replacementStatus.value != KalturaEntryReplacementStatus.NONE: self.logger.info("entry %s has replacementStatus %s, calling cancel_replace", self.recorded_id, - recorded_obj.replacementStatus) + self.recorded_entry.replacementStatus) self.backend_client.cancel_replace(partner_id, self.recorded_id) def append_recording_handler(self, file_full_path, flavor_id, is_first_flavor): - partner_id = self.backend_client.get_live_entry(self.entry_id).partnerId + partner_id = self.live_entry.partnerId if is_first_flavor: self.check_replacement_status(partner_id) + + if not self.entry_config.get('should_convert_to_mp4', True): + flavor_id = None + self.backend_client.set_recorded_content_local(partner_id, self.entry_id, file_full_path, str(float(self.duration)/1000), self.recorded_id, flavor_id) @@ -100,15 +111,15 @@ def run(self): is_first_flavor = True count_uploaded_mp4 = 0 code = '' - for mp4 in self.mp4_files_list: + for flavor_file_name in self.flavors_files_list: try: - result = re.search(self.mp4_filename_pattern, mp4) + result = re.search(self.filename_pattern, flavor_file_name) if not result or not result.group('flavor_id'): - error = "Error running upload task, failed to parse flavor id from filename: [{0}]".format(mp4) + error = "Error running upload task, failed to parse flavor id from filename: [{0}]".format(flavor_file_name) self.logger.error(error) raise ValueError(error) flavor_id = result.group('flavor_id') - file_full_path = os.path.join(self.recording_path, mp4) + file_full_path = os.path.join(self.recording_path, flavor_file_name) if mode == 'remote': self.upload_file(file_full_path, flavor_id, is_first_flavor) if mode == 'local': @@ -118,12 +129,12 @@ def run(self): except KalturaException as e: code = e.code if e.code == 'FLAVOR_PARAMS_ID_NOT_FOUND': - self.logger.warn('{}, failed to upload {}, flavor id {}'.format(e.message, mp4, flavor_id)) + self.logger.warn('{}, failed to upload {}, flavor id {}'.format(e.message, flavor_file_name, flavor_id)) else: raise e if count_uploaded_mp4 == 0: - if len(self.mp4_files_list) > 0: - mp4_files = str(self.mp4_files_list) + if len(self.flavors_files_list) > 0: + mp4_files = str(self.flavors_files_list) err = Exception('failed to upload any of {} check log errors'.format(mp4_files)) err.code = code raise err