Skip to content

Commit

Permalink
SUP-18799 (#614)
Browse files Browse the repository at this point in the history
* add support for https

* PLAT-9684

* SUP-18799

* fixes

* code review

* fixes

* a

* fix bug
  • Loading branch information
kalturaguy authored Nov 20, 2019
1 parent ea89164 commit c081a07
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 23 deletions.
5 changes: 4 additions & 1 deletion lib/kaltura-client-lib/KalturaClientBase.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

var crypto = require('crypto');
var http = require('http');
var https = require('https');
var path = require('path');
var url = require('url');
var fs = require('fs');
Expand Down Expand Up @@ -373,7 +374,8 @@ KalturaClientBase.prototype.encodeFile = function(boundary, type, name, filename

KalturaClientBase.prototype.sendRequestHelper = function (options, body, requestIndex, onCompleteCallback, timeout) {
var This = this;
var request = http.request(options, function(response) {
let httpModule= options.protocol.startsWith("https:") ? https : http;
var request = httpModule.request(options, function(response) {
response.setEncoding('utf8');

var data = '';
Expand Down Expand Up @@ -447,6 +449,7 @@ KalturaClientBase.prototype.doHttpRequest = function (callCompletedCallback, req
this.log('Request [' + requestIndex + ']: ' + debugUrl);

var options = {
protocol: urlInfo.protocol,
host : urlInfo.host,
path : urlInfo.path,
method : 'POST'
Expand Down
17 changes: 15 additions & 2 deletions liveRecorder/Tasks/ConcatinationTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import urllib2

import m3u8

Flavor = collections.namedtuple('Flavor', 'url language bandwidth')

from Iso639Wrapper import Iso639Wrapper

from Config.config import get_config
Expand All @@ -17,7 +20,6 @@

# todo add timeout, and use m3u8 insted of regex

Flavor = collections.namedtuple('Flavor', 'url language')


class ConcatenationTask(TaskBase):
Expand Down Expand Up @@ -61,6 +63,7 @@ def extract_flavor_dict(self):
for element in m3u8_obj.playlists:
flavors_list.append(Flavor(
url=element.absolute_uri,
bandwidth=element.stream_info.bandwidth,
language='und'
))

Expand All @@ -71,6 +74,7 @@ def extract_flavor_dict(self):
language = self.iso639_wrapper.convert_language_to_iso639_3(unicode(language))
flavors_list.append(Flavor(
url=element.absolute_uri,
bandwidth=element.stream_info.bandwidth,
language=language
))
return flavors_list
Expand Down Expand Up @@ -133,6 +137,7 @@ def run(self):
self.url_base_entry = self.nginx_url.format(token)
self.url_master = os.path.join(self.url_base_entry, 'master.m3u8')
flavors_list = self.extract_flavor_dict()
flavors_list.sort(key=lambda flavor: flavor.bandwidth, reverse=True)

for obj in flavors_list:
url_postfix = obj.url.rsplit('/', 1)[1]
Expand All @@ -145,13 +150,21 @@ def run(self):
command = command + ' ' + output_full_path + ' ' + mp4_full_path + ' ' + obj.language
if os.path.isfile(output_full_path):
self.logger.warn("file [%s] already exist", output_full_path)

if self.entry_config["upload_only_source"]:
break

continue
playlist = self.download_file(obj.url)
self.logger.debug("load recording manifest : \n %s ", playlist)
chunks = m3u8.loads(playlist).files
self.download_chunks_and_concat(chunks, output_full_path)
self.logger.info("Successfully concat %d files into %s", len(chunks), output_full_path)
self.convert_ts_to_mp4(command)
if self.entry_config["upload_only_source"]:
break

if self.entry_config["should_convert_to_mp4"]:
self.convert_ts_to_mp4(command)

def convert_ts_to_mp4(self, command):

Expand Down
4 changes: 2 additions & 2 deletions liveRecorder/Tasks/KalturaUploadSession.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from Config.config import get_config

class KalturaUploadSession:
def __init__(self, file_name, file_size, chunks_to_upload, entry_id, recorded_id, backend_client, logger, infile):
def __init__(self, file_name, file_size, chunks_to_upload, entry_id, parnter_id, recorded_id, backend_client, logger, infile):
self.infile = infile
self.file_name = file_name
self.logger = logger
self.file_size = file_size
self.chunks_to_upload = chunks_to_upload
self.partner_id = backend_client.get_live_entry(entry_id).partnerId
self.partner_id = parnter_id
self.recorded_id = recorded_id
self.entry_id = entry_id
self.backend_client = backend_client
Expand Down
15 changes: 13 additions & 2 deletions liveRecorder/Tasks/TaskBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from RecordingException import UnequallStampException
from BackendClient import *
from KalturaClient.Plugins.Core import KalturaEntryServerNodeStatus

import json


class TaskBase(object):
Expand Down Expand Up @@ -44,7 +44,6 @@ def update_status(self, new_status):
self.logger.debug("Updating taskId: [{}] with new status: [{}]".format(id, new_status))
self.backend_client.update_task_entryServerNode_status(id, new_status)


def __init__(self, param, logger_info):
self.duration = param['duration']
self.recorded_id = param['recorded_id']
Expand All @@ -60,6 +59,18 @@ def __init__(self, param, logger_info):
self.data_full_path = os.path.join(self.recording_path, 'data.json')
self.data = self.get_data()
self.backend_client = BackendClient(self.entry_id + '-' + self.recorded_id)
self.live_entry = self.backend_client.get_live_entry(self.entry_id)
self.recorded_entry = self.backend_client.get_recorded_entry(self.live_entry.partnerId, self.recorded_id)
self.entry_config = {
"upload_only_source": False,
"should_convert_to_mp4": True,
}

if self.live_entry.conversionProfileId != self.recorded_entry.conversionProfileId:
self.entry_config["upload_only_source"] = True
self.entry_config["should_convert_to_mp4"] = False

self.logger.info("Entry config for {}: {} {} {} {}".format(self.entry_id, self.entry_config,self.live_entry.conversionProfileId, self.recorded_entry.conversionProfileId,self.recorded_id))


__metaclass__ = abc.ABCMeta
Expand Down
43 changes: 27 additions & 16 deletions liveRecorder/Tasks/UploadTask.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@ class UploadTask(TaskBase):

upload_token_buffer_size = get_config('upload_token_buffer_size_mb', 'int') * 1000000 # buffer is in MB


def __init__(self, param, logger_info):
TaskBase.__init__(self, param, logger_info)
mp4_filename_pattern = param['directory'] + '_f*_out.mp4'
self.mp4_files_list = glob.glob1(self.recording_path, mp4_filename_pattern)
self.mp4_filename_pattern = "[0,1]_.+_[0,1]_.+_\d+(.\d+)?_f(?P<flavor_id>\d+)_out[.]mp4"

file_extention = "mp4"
if not self.entry_config.get('should_convert_to_mp4', True):
file_extention = "ts"

glob_pattern = param['directory'] + '_f*_out.' + file_extention
self.filename_pattern = "[0,1]_.+_[0,1]_.+_\d+(.\d+)?_f(?P<flavor_id>\d+)_out[.]" + file_extention

self.flavors_files_list = glob.glob1(self.recording_path, glob_pattern)


def get_chunks_to_upload(self, file_size):
Expand All @@ -39,7 +46,8 @@ def upload_file(self, file_name, flavor_id, is_first_flavor):
with io.open(file_name, 'rb') as infile:

upload_session = KalturaUploadSession(file_name, file_size, chunks_to_upload, self.entry_id,
self.recorded_id, self.backend_client, self.logger, infile)
self.live_entry.partnerId, self.recorded_id, self.backend_client,
self.logger, infile)
if chunks_to_upload > 2:
chunk = upload_session.get_next_chunk()
if chunk is not None:
Expand Down Expand Up @@ -79,17 +87,20 @@ def upload_file(self, file_name, flavor_id, is_first_flavor):

def check_replacement_status(self, partner_id):
self.logger.debug("About to check replacement status for [%s]", self.recorded_id)
recorded_obj = self.backend_client.get_recorded_entry(partner_id, self.recorded_id)
self.logger.debug("Got replacement Status: %s", recorded_obj.replacementStatus.value)
if recorded_obj.replacementStatus.value != KalturaEntryReplacementStatus.NONE:
self.logger.debug("Got replacement Status: %s", self.recorded_entry.replacementStatus.value)
if self.recorded_entry.replacementStatus.value != KalturaEntryReplacementStatus.NONE:
self.logger.info("entry %s has replacementStatus %s, calling cancel_replace", self.recorded_id,
recorded_obj.replacementStatus)
self.recorded_entry.replacementStatus)
self.backend_client.cancel_replace(partner_id, self.recorded_id)

def append_recording_handler(self, file_full_path, flavor_id, is_first_flavor):
partner_id = self.backend_client.get_live_entry(self.entry_id).partnerId
partner_id = self.live_entry.partnerId
if is_first_flavor:
self.check_replacement_status(partner_id)

if not self.entry_config.get('should_convert_to_mp4', True):
flavor_id = None

self.backend_client.set_recorded_content_local(partner_id, self.entry_id, file_full_path,
str(float(self.duration)/1000), self.recorded_id, flavor_id)

Expand All @@ -100,15 +111,15 @@ def run(self):
is_first_flavor = True
count_uploaded_mp4 = 0
code = ''
for mp4 in self.mp4_files_list:
for flavor_file_name in self.flavors_files_list:
try:
result = re.search(self.mp4_filename_pattern, mp4)
result = re.search(self.filename_pattern, flavor_file_name)
if not result or not result.group('flavor_id'):
error = "Error running upload task, failed to parse flavor id from filename: [{0}]".format(mp4)
error = "Error running upload task, failed to parse flavor id from filename: [{0}]".format(flavor_file_name)
self.logger.error(error)
raise ValueError(error)
flavor_id = result.group('flavor_id')
file_full_path = os.path.join(self.recording_path, mp4)
file_full_path = os.path.join(self.recording_path, flavor_file_name)
if mode == 'remote':
self.upload_file(file_full_path, flavor_id, is_first_flavor)
if mode == 'local':
Expand All @@ -118,12 +129,12 @@ def run(self):
except KalturaException as e:
code = e.code
if e.code == 'FLAVOR_PARAMS_ID_NOT_FOUND':
self.logger.warn('{}, failed to upload {}, flavor id {}'.format(e.message, mp4, flavor_id))
self.logger.warn('{}, failed to upload {}, flavor id {}'.format(e.message, flavor_file_name, flavor_id))
else:
raise e
if count_uploaded_mp4 == 0:
if len(self.mp4_files_list) > 0:
mp4_files = str(self.mp4_files_list)
if len(self.flavors_files_list) > 0:
mp4_files = str(self.flavors_files_list)
err = Exception('failed to upload any of {} check log errors'.format(mp4_files))
err.code = code
raise err
Expand Down

0 comments on commit c081a07

Please sign in to comment.