diff --git a/common/PersistenceFormat.js b/common/PersistenceFormat.js index cb080893..3751f54b 100644 --- a/common/PersistenceFormat.js +++ b/common/PersistenceFormat.js @@ -42,6 +42,14 @@ class PersistenceFormat { return 'playlist.json'; } + getStampFileName() { + return 'stamp'; + } + + getDataFileName() { + return 'data.json'; + } + getMP4FileNamefromInfo(chunkPath) { return chunkPath.replace('.ts', '.mp4'); } diff --git a/common/config/config.json.template b/common/config/config.json.template index 94e000d5..b08bea84 100644 --- a/common/config/config.json.template +++ b/common/config/config.json.template @@ -9,6 +9,7 @@ "flavorFailPollingInterval" : 10000, "backendPollingInterval" : 5000, "wowzaPollingInterval" : 5000, + "tasksPollingIntervalInSeconds" : 1800, "adapterDiagnosticsIntervalInSec" : 20000, "defaultDvrWindowInSec" : 7200, "minimalDvrWindowInSec" : 900, @@ -121,5 +122,6 @@ "recordingMaxAllowedMp4FilesQueueSize" : 100 }, "explicitLivePushNotificationTemplateName": "EXPLICIT_LIVE_PUSH_NOTIFICATIONS", + "newClippingTaskPushNotificationTemplateName": "CLIPPING_TASK_ENTRY_SERVER_NODE_CREATED_NOTIFICATIONS", "recordingNotStartedMaxQueueSize": "11" } \ No newline at end of file diff --git a/lib/BackendClient.js b/lib/BackendClient.js index 889d2e8d..cf4d2aa7 100644 --- a/lib/BackendClient.js +++ b/lib/BackendClient.js @@ -568,6 +568,47 @@ var BackendClient = (function(){ return getLiveEntryServerNodes(entryId, filter); }; + BackendClient.getEntryServerNode = function(entryServerNodeId) { + let deferred = Q.defer(); + return createSession() + .then(function() { + client.entryServerNode.get(function (result, err, headers) { + printAPIResponse(result, headers); + if (!err && result && result.objectType !== "KalturaAPIException") { //Use to indicate success operation + logger.info("[%s] entry server node successfully retrieved [%j]", entryServerNodeId, result); + } + else + { + logger.error(util.format("[%s] Failed to retrieve entry server node: %s %j", entryServerNodeId, ErrorUtils.error2string(err), result)); + result = null; + } + deferred.resolve(result); + }, entryServerNodeId); + return deferred.promise; + }); + }; + + BackendClient.updateEntryServerNodeStatus = function(entryServerNodeId, newStatus) { + let deferred = Q.defer(); + return createSession() + .then(function() { + client.entryServerNode.updateStatus(function (result, err, headers) { + printAPIResponse(result, headers); + if (!err && result && result.objectType !== "KalturaAPIException") { //Use to indicate success operation + logger.info("[%s] entry server node successfully updated [%j]", entryServerNodeId, result); + deferred.resolve(result); + } + else + { + let msg = util.format("[%s] Failed to update Status of entry server node: %s %j", entryServerNodeId, ErrorUtils.error2string(err), result); + logger.error(msg); + deferred.reject(msg); + } + }, entryServerNodeId, newStatus); + return deferred.promise; + }); + }; + BackendClient.getLiveEntriesForMediaServer = function() { return createSession() .then(function() { @@ -584,6 +625,13 @@ var BackendClient = (function(){ }); }; + BackendClient.getMediaServerId = function() { + return createSession() + .then(function () { + return getMediaServerIdPromise(); + }); + }; + BackendClient.getFlavorsMultiStream = function(entryObject, flavors) { logger.debug("[%s] Entered getFlavorsMultiStream. Creating session and retrieving live flavors extra params", entryObject.entryId); return createSession() diff --git a/lib/TaskManager.js b/lib/TaskManager.js new file mode 100644 index 00000000..ac829836 --- /dev/null +++ b/lib/TaskManager.js @@ -0,0 +1,78 @@ +/** + * Created by david.winder on 04/15/2018. + */ + +const util = require('util'); +const BackendClient = require('./BackendClientFactory').getBackendClient(); +const KalturaTypes = require('./kaltura-client-lib/KalturaTypes'); +const kalturaVO = require('./kaltura-client-lib/KalturaVO'); +const logger = require('../common/logger').getLogger('TaskManager'); +const config = require('../common/Configuration'); +const pushManager = require('./PushManager').PushManager; + +const interval = config.get('tasksPollingIntervalInSeconds'); +const partnerId = config.get('backendClient').partnerId; + +class TaskManager { + + constructor() + { + this.callbackMap = {}; + this.mediaServerIdPromise = BackendClient.getMediaServerId(); + this.mediaServerIdPromise.then(serverNodeId => { + setTimeout(()=>{this._updateList(serverNodeId);}, interval * 1000); + }); + } + + register(entryServerNodeType, templateName, callback) + { + this.callbackMap[entryServerNodeType] = callback; + this.mediaServerIdPromise.then(serverNodeId => { + logger.debug("Register to Task with for " + templateName + " [" + entryServerNodeType + "] for partner: " + partnerId); + pushManager.registerObjectToPush(partnerId, templateName, templateName, {'serverNodeId': serverNodeId}, (...args) => this._pushMessageReceived(...args)); + pushManager.setObjectEnabled(templateName, true); + }); + } + + _execIfCan(task) + { + if (this.callbackMap[task.serverType]) + return this.callbackMap[task.serverType](task); + logger.info('Task with ID [' + task.id + '] cannot be execute - No callback in map'); + } + + _pushMessageReceived(msg) + { + if (!Array.isArray(msg) || msg.length < 1) + return logger.error('Got un-valid massage from push server: ' + msg); + + let task = msg[0]; + BackendClient.getEntryServerNode(task.id).then(entryServerNode => { + if (entryServerNode && entryServerNode.status == KalturaTypes.KalturaEntryServerNodeStatus.TASK_PENDING) + return this._execIfCan(task); + logger.info('Task with ID [' + task.id + '] should no executed'); + }); + } + + static _getEntryServerNodesList(serverNodeId, interval) + { + let filter = new kalturaVO.KalturaEntryServerNodeFilter(); + filter.serverNodeIdEqual = serverNodeId; + filter.statusIn = KalturaTypes.KalturaEntryServerNodeStatus.TASK_PENDING; + filter.createdAtLessThanOrEqual = - interval; + return BackendClient.getLiveEntryServerNodes(null, filter); + } + + _updateList(serverNodeId) + { + TaskManager._getEntryServerNodesList(serverNodeId, interval).then(entryServerNodes => { + for (var i = 0; i < entryServerNodes.length; i++) + this._execIfCan(entryServerNodes[i]); + }).finally(() => { + setTimeout(()=>{this._updateList(serverNodeId);}, interval * 1000); + }); + } + +} + +module.exports.TaskManager = new TaskManager(); \ No newline at end of file diff --git a/lib/kaltura-client-lib/KalturaServices.js b/lib/kaltura-client-lib/KalturaServices.js index 19c65040..ce2d4f54 100644 --- a/lib/kaltura-client-lib/KalturaServices.js +++ b/lib/kaltura-client-lib/KalturaServices.js @@ -2396,6 +2396,7 @@ KalturaEmailIngestionProfileService.prototype.addMediaEntry = function(callback, *Class definition for the Kaltura service: entryServerNode. * The available service actions: * @action update . + * @action updateStatus Update response profile status by id. * @action list . * @action get . */ @@ -2422,6 +2423,22 @@ KalturaEntryServerNodeService.prototype.update = function(callback, id, entrySer this.client.doQueue(callback); } }; + +/** + * . + * @param id int + * @param status int (optional, enum: KalturaEntryServerNodeStatus). + * @return KalturaEntryServerNode. + */ +KalturaEntryServerNodeService.prototype.updateStatus = function(callback, id, entryServerNodeStatus){ + var kparams = {}; + this.client.addParam(kparams, 'id', id); + this.client.addParam(kparams, 'status', entryServerNodeStatus); + this.client.queueServiceActionCall('entryservernode', 'updateStatus', kparams); + if (!this.client.isMultiRequest()){ + this.client.doQueue(callback); + } +}; /** * . * @param filter KalturaEntryServerNodeFilter (optional, default: null). diff --git a/lib/kaltura-client-lib/KalturaTypes.js b/lib/kaltura-client-lib/KalturaTypes.js index a632408e..b8a1e6ae 100644 --- a/lib/kaltura-client-lib/KalturaTypes.js +++ b/lib/kaltura-client-lib/KalturaTypes.js @@ -400,7 +400,12 @@ STOPPED : 0, PLAYABLE : 1, BROADCASTING : 2, AUTHENTICATED : 3, -MARKED_FOR_DELETION : 4 +MARKED_FOR_DELETION : 4, +TASK_PENDING : 5, +TASK_QUEUED : 6, +TASK_PROCESSING : 7, +TASK_UPLOADING : 8, +TASK_UPLOADING : 9, }; var KalturaEventNotificationTemplateStatus = module.exports.KalturaEventNotificationTemplateStatus = { @@ -732,7 +737,7 @@ PDF : 1, var KalturaRecordStatus = module.exports.KalturaRecordStatus = { DISABLED : 0, -APPENDED : 1, +APPENDED : 1, PER_SESSION : 2, }; @@ -2422,6 +2427,7 @@ UPDATED_AT_DESC : '-updatedAt', var KalturaEntryServerNodeType = module.exports.KalturaEntryServerNodeType = { LIVE_PRIMARY : '0', LIVE_BACKUP : '1', +LIVE_CLIPPING_TASK : '2', }; var KalturaEntryStatus = module.exports.KalturaEntryStatus = { diff --git a/lib/playlistGenerator/PlaylistGenerator.js b/lib/playlistGenerator/PlaylistGenerator.js index fa006d65..5977e809 100644 --- a/lib/playlistGenerator/PlaylistGenerator.js +++ b/lib/playlistGenerator/PlaylistGenerator.js @@ -688,4 +688,17 @@ PlaylistGenerator.prototype.sortFlavors = function(defaultAudioLang) } +PlaylistGenerator.getAbsoluteTimeFromRelative = function(playlist, relativeTime) +{ + let totalPlaylistDuration = playlist.durations.reduce((a, b) => a + b, 0); + //setting valid time + relativeTime = max(relativeTime ,0); + relativeTime = min(relativeTime, totalPlaylistDuration) + let i = 0; + while (relativeTime > playlist.durations[i]) + relativeTime -= playlist.durations[i++]; //search for the first clip that fits + return playlist.clipTimes[i] + relativeTime; +} + + module.exports = PlaylistGenerator; diff --git a/lib/recording/RecordingManager.js b/lib/recording/RecordingManager.js index b9c225a3..82d92499 100644 --- a/lib/recording/RecordingManager.js +++ b/lib/recording/RecordingManager.js @@ -20,6 +20,9 @@ const kalturaTypes = require('../kaltura-client-lib/KalturaTypes'); const unknownEntry = "UNKNOWN"; const backendClient = require('../BackendClientFactory.js').getBackendClient(); const maxAllowedMp4FilesQueueSize = config.get("recording").recordingMaxAllowedMp4FilesQueueSize; +const TaskManager = require('../TaskManager').TaskManager; +const completedRecordingFolderPath = configRecording.completedRecordingFolderPath; +const clippingTaskTemplateName = config.get('newClippingTaskPushNotificationTemplateName'); class RecordingManager { //todo what to do if we stop stream and start, but other machine has start to get it @@ -40,13 +43,14 @@ class RecordingManager { const recordingTimeInterval = configRecording.recordingTimeIntervalInSec * 1000; this.completedRecordingFolderPath = configRecording.completedRecordingFolderPath; - //veryfiy that recordingMaxDurationInHours is no longer that + //verify that recordingMaxDurationInHours is no longer that this.recordingMaxDurationInMs = config.get("recording").recordingMaxDurationInHours * 60 * 60 * 1000; this.recordingMinDurationInMS = config.get("recording").recordingMinDurationInSeconds * 1000; this.recordingList = {}; logger.info("Initializing recording manager"); - + TaskManager.register(kalturaTypes.KalturaEntryServerNodeType.LIVE_CLIPPING_TASK, clippingTaskTemplateName, this.handleNewLiveClippingTask); + setInterval(()=> { this.handleRecordingEntries() }, recordingTimeInterval, this); @@ -449,7 +453,7 @@ class RecordingManager { handleMaxDuration(entryId) { let handleMaxDurationPromise = null; - let recordingEntrySession = this.recordingList[entryId] + let recordingEntrySession = this.recordingList[entryId]; if (!recordingEntrySession) { logger.info(`[${entryId}] Entry does not exist in recording list. Maybe entry has passed the limit of max duration`); return Q.resolve(); @@ -480,6 +484,56 @@ class RecordingManager { }); } + handleNewLiveClippingTask(task) + { + logger.debug("TASK - in the recordingManager call back for task: " + util.inspect(task)); + let playListPath = path.join(persistenceFormat.getEntryBasePath(task.entryId), persistenceFormat.getMasterManifestName()); + let newPath = persistenceFormat.getRecordingSessionPath(task.entryId, null, task.clippedEntryId); + let newPathInLive = persistenceFormat.getEntryBasePath(task.clippedEntryId); + let duration = task.clipAttributes.duration; + let strDuration = duration.toString(); + let flavors = null; + + return qio.makeTree(newPath).then(() => { + return qio.read(playListPath); + }).then((data) => { + let playList = JSON.parse(data); + playList.clipFrom = PlaylistGenerator.getAbsoluteTimeFromRelative(playList, task.clipAttributes.offset); + playList.clipTo = PlaylistGenerator.getAbsoluteTimeFromRelative(playList, task.clipAttributes.offset + duration); + if (playList.clipFrom >= playList.clipTo) + { + logger.error("VALIDATION ERROR: can not set playlist attribute: clipFrom: [" +playList.clipFrom + "] and clipTo: [" + playList.clipTo + "]"); + backendClient.updateEntryServerNodeStatus(task.id, kalturaTypes.KalturaEntryServerNodeStatus.ERROR); + throw new Error("Error in setting playlist attribute for task id: " + task.id); + } + flavors = playList.sequences.map(elem => elem.id).join(','); + let newPlayListPath = path.join(newPath,persistenceFormat.getMasterManifestName()); + logger.debug("Saving new playlist with clip Attributes to: " + newPlayListPath); + return qio.write(newPlayListPath, JSON.stringify(playList)); + }).then(() => { + let stampPath = path.join(newPath,persistenceFormat.getStampFileName()); + logger.debug(`Creating Stamp file with [${strDuration}] on [${stampPath}]`); + return qio.write(stampPath, strDuration); + }).then(() => { + let dataFilePath = path.join(newPath,persistenceFormat.getDataFileName()); + let dataStr = JSON.stringify({"taskId": task.id, "taskType": task.serverType, "flavors": flavors}); + logger.debug(`Creating Data file with [${dataStr}] on [${dataFilePath}]`); + return qio.write(dataFilePath, dataStr); + }).then(() => { + logger.debug(`Creating softLink from [${newPathInLive}] to [${newPath}]`); + return qio.symbolicLink(newPathInLive, newPath, 'directory'); + }).then(() => { + let destFilePath = path.join(completedRecordingFolderPath, task.liveEntryId + '_' + task.clippedEntryId + '_' + strDuration); + logger.debug(`Creating softLink (for liveRecorder) from [${destFilePath}] to [${newPath}]`); + return qio.symbolicLink(destFilePath, newPath, 'directory') + }).then(() => { + logger.debug("Updating status of: " + task.id + " to QUEUE"); + return backendClient.updateEntryServerNodeStatus(task.id, kalturaTypes.KalturaEntryServerNodeStatus.TASK_QUEUED); + }).catch((err) => { + logger.error(`[${task.id}] failed processing task. Error [${ErrorUtils.error2string(err)}]`); + }); + } + } if (!configRecording.enable){ diff --git a/liveRecorder/BackendClient.py b/liveRecorder/BackendClient.py index f8f0180e..1a47f82f 100644 --- a/liveRecorder/BackendClient.py +++ b/liveRecorder/BackendClient.py @@ -155,7 +155,8 @@ def set_recorded_content_local(self, partner_id, entry_id, output_file, duration resource.keepOriginalFile = False self.set_recorded_content(entry_id, resource, duration, partner_id, recorded_id, flavor_id) - + def update_task_entryServerNode_status(self, entry_server_node_id, new_status): + return self.handle_request(self.partner_id, 'entryServerNode', 'updateStatus', entry_server_node_id, new_status) diff --git a/liveRecorder/Config/configMapping.ini.template b/liveRecorder/Config/configMapping.ini.template index 006b6f96..e66a9199 100644 --- a/liveRecorder/Config/configMapping.ini.template +++ b/liveRecorder/Config/configMapping.ini.template @@ -4,7 +4,7 @@ admin_secret = @KALTURA_PARTNER_ADMIN_SECRET@ partner_id = @KALTURA_PARTNER_ID@ mode = @VOD_UPLOAD_MODE@ token_key = @LIVE_PACKAGER_TOKEN@ -cron_job_log_file_name = "@LOGS_BASE_PATH@/recording_cron.log" -log_file_name = "@LOGS_BASE_PATH@/liveRecorder.log" -recover_log_file_name = "@LOGS_BASE_PATH@/recorder.log" +cron_job_log_file_name = @LOGS_BASE_PATH@/recording_cron.log +log_file_name = @LOGS_BASE_PATH@/liveRecorder.log +recover_log_file_name = @LOGS_BASE_PATH@/recorder.log api_service_url = @KALTURA_SERVICE_URL@ \ No newline at end of file diff --git a/liveRecorder/KalturaClient/Plugins/Core.py b/liveRecorder/KalturaClient/Plugins/Core.py index 5b5cd42e..b2e0422f 100644 --- a/liveRecorder/KalturaClient/Plugins/Core.py +++ b/liveRecorder/KalturaClient/Plugins/Core.py @@ -346,10 +346,18 @@ def getValue(self): # @package Kaltura # @subpackage Client class KalturaEntryServerNodeStatus(object): + ERROR = -1 STOPPED = 0 PLAYABLE = 1 BROADCASTING = 2 AUTHENTICATED = 3 + MARKED_FOR_DELETION = 4 + TASK_PENDING = 5 + TASK_QUEUED = 6 + TASK_PROCESSING = 7 + TASK_UPLOADING = 8 + TASK_FINISHED = 9 + def __init__(self, value): self.value = value @@ -2507,6 +2515,7 @@ def getValue(self): class KalturaEntryServerNodeType(object): LIVE_PRIMARY = "0" LIVE_BACKUP = "1" + LIVE_CLIPPING_TASK = "2" def __init__(self, value): self.value = value @@ -36118,6 +36127,41 @@ def getLastUpdatedCategoryCreatedAt(self): def setLastUpdatedCategoryCreatedAt(self, newLastUpdatedCategoryCreatedAt): self.lastUpdatedCategoryCreatedAt = newLastUpdatedCategoryCreatedAt +# @package Kaltura +# @subpackage Client +class KalturaTaskEntryServerNode(KalturaEntryServerNode): + def __init__(self, + id=NotImplemented, + entryId=NotImplemented, + serverNodeId=NotImplemented, + partnerId=NotImplemented, + createdAt=NotImplemented, + updatedAt=NotImplemented, + status=NotImplemented, + serverType=NotImplemented): + KalturaEntryServerNode.__init__(self, + id, + entryId, + serverNodeId, + partnerId, + createdAt, + updatedAt, + status, + serverType) + + + PROPERTY_LOADERS = { + } + + def fromXml(self, node): + KalturaEntryServerNode.fromXml(self, node) + self.fromXmlImpl(node, KalturaTaskEntryServerNode.PROPERTY_LOADERS) + + def toParams(self): + kparams = KalturaEntryServerNode.toParams(self) + kparams.put("objectType", "KalturaTaskEntryServerNode") + return kparams + # @package Kaltura # @subpackage Client @@ -39510,6 +39554,76 @@ def getStatusIn(self): def setStatusIn(self, newStatusIn): self.statusIn = newStatusIn +# @package Kaltura +# @subpackage Client +class KalturaClippingTaskEntryServerNode(KalturaTaskEntryServerNode): + def __init__(self, + id=NotImplemented, + entryId=NotImplemented, + serverNodeId=NotImplemented, + partnerId=NotImplemented, + createdAt=NotImplemented, + updatedAt=NotImplemented, + status=NotImplemented, + serverType=NotImplemented, + clipAttributes=NotImplemented, + clippedEntryId=NotImplemented, + liveEntryId=NotImplemented): + KalturaTaskEntryServerNode.__init__(self, + id, + entryId, + serverNodeId, + partnerId, + createdAt, + updatedAt, + status, + serverType) + + # @var KalturaClipAttributes + self.clipAttributes = clipAttributes + + # @var string + self.clippedEntryId = clippedEntryId + + # @var string + self.liveEntryId = liveEntryId + + + PROPERTY_LOADERS = { + 'clipAttributes': (KalturaObjectFactory.create, KalturaClipAttributes), + 'clippedEntryId': getXmlNodeText, + 'liveEntryId': getXmlNodeText, + } + + def fromXml(self, node): + KalturaTaskEntryServerNode.fromXml(self, node) + self.fromXmlImpl(node, KalturaClippingTaskEntryServerNode.PROPERTY_LOADERS) + + def toParams(self): + kparams = KalturaTaskEntryServerNode.toParams(self) + kparams.put("objectType", "KalturaClippingTaskEntryServerNode") + kparams.addObjectIfDefined("clipAttributes", self.clipAttributes) + kparams.addStringIfDefined("clippedEntryId", self.clippedEntryId) + kparams.addStringIfDefined("liveEntryId", self.liveEntryId) + return kparams + + def getClipAttributes(self): + return self.clipAttributes + + def setClipAttributes(self, newClipAttributes): + self.clipAttributes = newClipAttributes + + def getClippedEntryId(self): + return self.clippedEntryId + + def setClippedEntryId(self, newClippedEntryId): + self.clippedEntryId = newClippedEntryId + + def getLiveEntryId(self): + return self.liveEntryId + + def setLiveEntryId(self, newLiveEntryId): + self.liveEntryId = newLiveEntryId # @package Kaltura # @subpackage Client @@ -55287,6 +55401,16 @@ def get(self, id): resultNode = self.client.doQueue() return KalturaObjectFactory.create(resultNode, KalturaEntryServerNode) + def updateStatus(self, id, status): + kparams = KalturaParams() + kparams.addStringIfDefined("id", id) + kparams.addIntIfDefined("status", status); + self.client.queueServiceActionCall("entryservernode", "updateStatus", KalturaEntryServerNode, kparams) + if self.client.isMultiRequest(): + return self.client.getMultiRequestResult() + resultNode = self.client.doQueue() + return (KalturaObjectFactory.create(resultNode[0], KalturaEntryServerNode),resultNode[1]) + def validateRegisteredEntryServerNode(self, id): """Validates server node still registered on entry""" @@ -59330,6 +59454,8 @@ def getTypes(self): 'KalturaStringValue': KalturaStringValue, 'KalturaEntryReplacementOptions': KalturaEntryReplacementOptions, 'KalturaEntryServerNode': KalturaEntryServerNode, + 'KalturaTaskEntryServerNode': KalturaTaskEntryServerNode, + 'KalturaClippingTaskEntryServerNode': KalturaClippingTaskEntryServerNode, 'KalturaObjectIdentifier': KalturaObjectIdentifier, 'KalturaExtendingItemMrssParameter': KalturaExtendingItemMrssParameter, 'KalturaPlayableEntry': KalturaPlayableEntry, diff --git a/liveRecorder/Tasks/ConcatinationTask.py b/liveRecorder/Tasks/ConcatinationTask.py index 5bdfe280..94883895 100644 --- a/liveRecorder/Tasks/ConcatinationTask.py +++ b/liveRecorder/Tasks/ConcatinationTask.py @@ -13,6 +13,7 @@ from Logger.LoggerDecorator import log_subprocess_output from TaskBase import TaskBase from datetime import datetime +from KalturaClient.Plugins.Core import KalturaEntryReplacementStatus,KalturaEntryServerNodeStatus # todo add timeout, and use m3u8 insted of regex @@ -31,7 +32,6 @@ def __init__(self, param, logger_info): TaskBase.__init__(self, param, logger_info) concat_task_processing_dir = os.path.join(self.base_directory, self.__class__.__name__, 'processing') self.recording_path = os.path.join(concat_task_processing_dir, self.entry_directory) - self.stamp_full_path = os.path.join(self.recording_path, 'stamp') self.token_url = self.token_url_template.format(self.recorded_id) self.nginx_url = "http://" + self.token_url + "t/{0}" self.flavor_pattern = 'index-s(?P\d+)' @@ -109,33 +109,31 @@ def parse_m3u8(m3u8): matches = re.findall(regex, m3u8, re.MULTILINE) return matches - def get_flavor_id(self, url_postfix, single_flavor): - if single_flavor: - flavors_dirs = filter(os.path.isdir, - [os.path.join(self.recording_path, f) for f in os.listdir(self.recording_path)]) - flavor_id = flavors_dirs[0].rsplit('/', 1)[-1] - else: - result = re.search(self.flavor_pattern, url_postfix) - if not result: - error = "Error running concat task, failed to parse flavor from url: [%s]", obj.url - self.logger.error(error) - raise ValueError(error) - flavor_id = result.group('flavor') - - return flavor_id + def get_flavor_id(self, url_postfix): + result = re.search(self.flavor_pattern, url_postfix) + if result: + return result.group('flavor') + flavors_dirs = filter(os.path.isdir, [os.path.join(self.recording_path, f) for f in os.listdir(self.recording_path)]) + if flavors_dirs: + return flavors_dirs[0].rsplit('/', 1)[-1] + data = self.get_data() + if data and data["flavors"]: + return data["flavors"].split(',')[0] + return None def run(self): - + self.update_status(KalturaEntryServerNodeStatus.TASK_PROCESSING) command = self.ts_to_mp4_convertor + ' ' token = self.tokenize_url(self.token_url) self.url_base_entry = self.nginx_url.format(token) self.url_master = os.path.join(self.url_base_entry, 'master.m3u8') flavors_list = self.extract_flavor_dict() - single_flavor = len(flavors_list) == 1 for obj in flavors_list: url_postfix = obj.url.rsplit('/', 1)[1] - flavor_id = self.get_flavor_id(url_postfix, single_flavor) + flavor_id = self.get_flavor_id(url_postfix) + if flavor_id is None: + raise ValueError('Could not find flavor ID for {}'.format(obj.url)) ts_output_filename = self.get_output_filename(flavor_id) output_full_path = os.path.join(self.recording_path, ts_output_filename) mp4_full_path = output_full_path.replace('.ts', '.mp4') diff --git a/liveRecorder/Tasks/TaskBase.py b/liveRecorder/Tasks/TaskBase.py index ff3e490d..234c9012 100644 --- a/liveRecorder/Tasks/TaskBase.py +++ b/liveRecorder/Tasks/TaskBase.py @@ -4,6 +4,9 @@ from socket import gethostname from Config.config import get_config from RecordingException import UnequallStampException +from BackendClient import * +from KalturaClient.Plugins.Core import KalturaEntryServerNodeStatus + class TaskBase(object): @@ -25,6 +28,24 @@ def check_stamp(self): stamp) raise UnequallStampException(msg) + + def get_data(self): + try: + with open(self.data_full_path) as data_file: + data = json.load(data_file) + return data + except: + self.logger.debug("Error in loading the data.json file") + return None + + def update_status(self, new_status): + data = self.get_data() + if data and data["taskId"]: + id = data["taskId"] + self.logger.debug("Updating taskId: [{}] with new status: [{}]".format(id, new_status)) + self.backend_client.update_task_entryServerNode_status(id, new_status) + + def __init__(self, param, logger_info): self.duration = param['duration'] self.recorded_id = param['recorded_id'] @@ -37,6 +58,8 @@ def __init__(self, param, logger_info): self.recording_path = os.path.join(self.base_directory, self.__class__.__name__, 'processing', self.entry_directory) self.stamp_full_path = os.path.join(self.recording_path, 'stamp') + self.data_full_path = os.path.join(self.recording_path, 'data.json') + self.backend_client = BackendClient(self.entry_id + '-' + self.recorded_id) __metaclass__ = abc.ABCMeta diff --git a/liveRecorder/Tasks/TaskRunner.py b/liveRecorder/Tasks/TaskRunner.py index 59de2d94..5362fb8a 100644 --- a/liveRecorder/Tasks/TaskRunner.py +++ b/liveRecorder/Tasks/TaskRunner.py @@ -10,6 +10,7 @@ import time, threading import Queue as Q from RecordingException import UnequallStampException +from KalturaClient.Plugins.Core import KalturaEntryServerNodeStatus import schedule # Currently not support multiple machine pulling from one incoming dir. @@ -130,7 +131,7 @@ def move_and_add_to_queue(self, src_dir, queue_name): except Exception as e: self.logger.error("[%s-%s] Error while try to add task:%s \n %s", param['entry_id'], param['recorded_id'], str(e), traceback.format_exc()) - def move_to_incoming_dir(self, src_dir, dst_dir): + def move_to_dir(self, src_dir, dst_dir): file_list = os.listdir(src_dir) for path in file_list: @@ -179,6 +180,7 @@ def work(self, index): self.logger.fatal("[%s] Job %s on entry %s has no more retries or failed to get it, move entry to " "failed task directory ", logger_info, self.task_name, task_parameter['directory']) self.safe_move(src, self.error_directory) + job.update_status(KalturaEntryServerNodeStatus.ERROR) except Exception as e: self.logger.fatal("[%s] Failed to handle failure task %s \n %s", logger_info, str(e) , traceback.format_exc()) @@ -219,7 +221,7 @@ def failed_task_handler(self): def start(self): try: self.logger.info("Starting %d workers", self.number_of_processes) - self.move_to_incoming_dir(self.working_directory, self.input_directory) + self.move_to_dir(self.working_directory, self.input_directory) self.add_new_task_handler() self.failed_task_handler() workers = [Process(target=self.work, args=(i,)) for i in xrange(1, self.number_of_processes+1)] diff --git a/liveRecorder/Tasks/UploadTask.py b/liveRecorder/Tasks/UploadTask.py index 14070cb7..2419aebf 100644 --- a/liveRecorder/Tasks/UploadTask.py +++ b/liveRecorder/Tasks/UploadTask.py @@ -5,12 +5,13 @@ from TaskBase import TaskBase from ThreadWorkers import ThreadWorkers from KalturaUploadSession import KalturaUploadSession -from KalturaClient.Plugins.Core import KalturaEntryReplacementStatus +from KalturaClient.Plugins.Core import KalturaEntryReplacementStatus,KalturaEntryServerNodeStatus from KalturaClient.Base import KalturaException import glob import re + class UploadTask(TaskBase): # Global scope #global backend_client @@ -19,8 +20,6 @@ class UploadTask(TaskBase): def __init__(self, param, logger_info): TaskBase.__init__(self, param, logger_info) - session_id = self.entry_id + '-' + self.recorded_id - self.backend_client = BackendClient(session_id) mp4_filename_pattern = param['directory'] + '_f*_out.mp4' self.mp4_files_list = glob.glob1(self.recording_path, mp4_filename_pattern) self.mp4_filename_pattern = "[0,1]_.+_[0,1]_.+_\d+(.\d+)?_f(?P\d+)_out[.]mp4" @@ -95,6 +94,7 @@ def append_recording_handler(self, file_full_path, flavor_id, is_first_flavor): str(float(self.duration)/1000), self.recorded_id, flavor_id) def run(self): + self.update_status(KalturaEntryServerNodeStatus.TASK_UPLOADING) try: mode = get_config('mode') is_first_flavor = True @@ -129,6 +129,7 @@ def run(self): raise err else: self.logger.warn('there were no mp4 files to upload. check {}'.format(self.recording_path)) + self.update_status(KalturaEntryServerNodeStatus.TASK_FINISHED) except KalturaException as e: self.logger.error('failed to upload VOD with error {}, exception details: {}'.format(e.code, e.message)) if e.code == 'KALTURA_RECORDING_DISABLED':