Skip to content

Commit

Permalink
1.28 plat 7834 live clipping (#588)
Browse files Browse the repository at this point in the history
* Add live Recorder support

* Add live Recorder support

* Adding flavors to data file

* Add task status updates

* remove filter by prefix and change operatio order in recording manager

* LiveController code for live-clipping

* Add catch and change enum

* Add validation check and protect against file descriptor leaks

* Add support for multi clips in the live entry playlist

* Move get absolute time function from playlist to playlistGenerator

* move validation check to recording manager
  • Loading branch information
david-winder-kaltura authored Apr 29, 2018
1 parent 008206c commit 73b914a
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 32 deletions.
8 changes: 8 additions & 0 deletions common/PersistenceFormat.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ class PersistenceFormat {
return 'playlist.json';
}

getStampFileName() {
return 'stamp';
}

getDataFileName() {
return 'data.json';
}

getMP4FileNamefromInfo(chunkPath) {
return chunkPath.replace('.ts', '.mp4');
}
Expand Down
2 changes: 2 additions & 0 deletions common/config/config.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"flavorFailPollingInterval" : 10000,
"backendPollingInterval" : 5000,
"wowzaPollingInterval" : 5000,
"tasksPollingIntervalInSeconds" : 1800,
"adapterDiagnosticsIntervalInSec" : 20000,
"defaultDvrWindowInSec" : 7200,
"minimalDvrWindowInSec" : 900,
Expand Down Expand Up @@ -121,5 +122,6 @@
"recordingMaxAllowedMp4FilesQueueSize" : 100
},
"explicitLivePushNotificationTemplateName": "EXPLICIT_LIVE_PUSH_NOTIFICATIONS",
"newClippingTaskPushNotificationTemplateName": "CLIPPING_TASK_ENTRY_SERVER_NODE_CREATED_NOTIFICATIONS",
"recordingNotStartedMaxQueueSize": "11"
}
48 changes: 48 additions & 0 deletions lib/BackendClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,47 @@ var BackendClient = (function(){
return getLiveEntryServerNodes(entryId, filter);
};

BackendClient.getEntryServerNode = function(entryServerNodeId) {
let deferred = Q.defer();
return createSession()
.then(function() {
client.entryServerNode.get(function (result, err, headers) {
printAPIResponse(result, headers);
if (!err && result && result.objectType !== "KalturaAPIException") { //Use to indicate success operation
logger.info("[%s] entry server node successfully retrieved [%j]", entryServerNodeId, result);
}
else
{
logger.error(util.format("[%s] Failed to retrieve entry server node: %s %j", entryServerNodeId, ErrorUtils.error2string(err), result));
result = null;
}
deferred.resolve(result);
}, entryServerNodeId);
return deferred.promise;
});
};

BackendClient.updateEntryServerNodeStatus = function(entryServerNodeId, newStatus) {
let deferred = Q.defer();
return createSession()
.then(function() {
client.entryServerNode.updateStatus(function (result, err, headers) {
printAPIResponse(result, headers);
if (!err && result && result.objectType !== "KalturaAPIException") { //Use to indicate success operation
logger.info("[%s] entry server node successfully updated [%j]", entryServerNodeId, result);
deferred.resolve(result);
}
else
{
let msg = util.format("[%s] Failed to update Status of entry server node: %s %j", entryServerNodeId, ErrorUtils.error2string(err), result);
logger.error(msg);
deferred.reject(msg);
}
}, entryServerNodeId, newStatus);
return deferred.promise;
});
};

BackendClient.getLiveEntriesForMediaServer = function() {
return createSession()
.then(function() {
Expand All @@ -584,6 +625,13 @@ var BackendClient = (function(){
});
};

BackendClient.getMediaServerId = function() {
return createSession()
.then(function () {
return getMediaServerIdPromise();
});
};

BackendClient.getFlavorsMultiStream = function(entryObject, flavors) {
logger.debug("[%s] Entered getFlavorsMultiStream. Creating session and retrieving live flavors extra params", entryObject.entryId);
return createSession()
Expand Down
78 changes: 78 additions & 0 deletions lib/TaskManager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Created by david.winder on 04/15/2018.
*/

const util = require('util');
const BackendClient = require('./BackendClientFactory').getBackendClient();
const KalturaTypes = require('./kaltura-client-lib/KalturaTypes');
const kalturaVO = require('./kaltura-client-lib/KalturaVO');
const logger = require('../common/logger').getLogger('TaskManager');
const config = require('../common/Configuration');
const pushManager = require('./PushManager').PushManager;

const interval = config.get('tasksPollingIntervalInSeconds');
const partnerId = config.get('backendClient').partnerId;

class TaskManager {

constructor()
{
this.callbackMap = {};
this.mediaServerIdPromise = BackendClient.getMediaServerId();
this.mediaServerIdPromise.then(serverNodeId => {
setTimeout(()=>{this._updateList(serverNodeId);}, interval * 1000);
});
}

register(entryServerNodeType, templateName, callback)
{
this.callbackMap[entryServerNodeType] = callback;
this.mediaServerIdPromise.then(serverNodeId => {
logger.debug("Register to Task with for " + templateName + " [" + entryServerNodeType + "] for partner: " + partnerId);
pushManager.registerObjectToPush(partnerId, templateName, templateName, {'serverNodeId': serverNodeId}, (...args) => this._pushMessageReceived(...args));
pushManager.setObjectEnabled(templateName, true);
});
}

_execIfCan(task)
{
if (this.callbackMap[task.serverType])
return this.callbackMap[task.serverType](task);
logger.info('Task with ID [' + task.id + '] cannot be execute - No callback in map');
}

_pushMessageReceived(msg)
{
if (!Array.isArray(msg) || msg.length < 1)
return logger.error('Got un-valid massage from push server: ' + msg);

let task = msg[0];
BackendClient.getEntryServerNode(task.id).then(entryServerNode => {
if (entryServerNode && entryServerNode.status == KalturaTypes.KalturaEntryServerNodeStatus.TASK_PENDING)
return this._execIfCan(task);
logger.info('Task with ID [' + task.id + '] should no executed');
});
}

static _getEntryServerNodesList(serverNodeId, interval)
{
let filter = new kalturaVO.KalturaEntryServerNodeFilter();
filter.serverNodeIdEqual = serverNodeId;
filter.statusIn = KalturaTypes.KalturaEntryServerNodeStatus.TASK_PENDING;
filter.createdAtLessThanOrEqual = - interval;
return BackendClient.getLiveEntryServerNodes(null, filter);
}

_updateList(serverNodeId)
{
TaskManager._getEntryServerNodesList(serverNodeId, interval).then(entryServerNodes => {
for (var i = 0; i < entryServerNodes.length; i++)
this._execIfCan(entryServerNodes[i]);
}).finally(() => {
setTimeout(()=>{this._updateList(serverNodeId);}, interval * 1000);
});
}

}

module.exports.TaskManager = new TaskManager();
17 changes: 17 additions & 0 deletions lib/kaltura-client-lib/KalturaServices.js
Original file line number Diff line number Diff line change
Expand Up @@ -2396,6 +2396,7 @@ KalturaEmailIngestionProfileService.prototype.addMediaEntry = function(callback,
*Class definition for the Kaltura service: entryServerNode.
* The available service actions:
* @action update .
* @action updateStatus Update response profile status by id.
* @action list .
* @action get .
*/
Expand All @@ -2422,6 +2423,22 @@ KalturaEntryServerNodeService.prototype.update = function(callback, id, entrySer
this.client.doQueue(callback);
}
};

/**
* .
* @param id int
* @param status int (optional, enum: KalturaEntryServerNodeStatus).
* @return KalturaEntryServerNode.
*/
KalturaEntryServerNodeService.prototype.updateStatus = function(callback, id, entryServerNodeStatus){
var kparams = {};
this.client.addParam(kparams, 'id', id);
this.client.addParam(kparams, 'status', entryServerNodeStatus);
this.client.queueServiceActionCall('entryservernode', 'updateStatus', kparams);
if (!this.client.isMultiRequest()){
this.client.doQueue(callback);
}
};
/**
* .
* @param filter KalturaEntryServerNodeFilter (optional, default: null).
Expand Down
10 changes: 8 additions & 2 deletions lib/kaltura-client-lib/KalturaTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,12 @@ STOPPED : 0,
PLAYABLE : 1,
BROADCASTING : 2,
AUTHENTICATED : 3,
MARKED_FOR_DELETION : 4
MARKED_FOR_DELETION : 4,
TASK_PENDING : 5,
TASK_QUEUED : 6,
TASK_PROCESSING : 7,
TASK_UPLOADING : 8,
TASK_UPLOADING : 9,
};

var KalturaEventNotificationTemplateStatus = module.exports.KalturaEventNotificationTemplateStatus = {
Expand Down Expand Up @@ -732,7 +737,7 @@ PDF : 1,

var KalturaRecordStatus = module.exports.KalturaRecordStatus = {
DISABLED : 0,
APPENDED : 1,
APPENDED : 1,
PER_SESSION : 2,
};

Expand Down Expand Up @@ -2422,6 +2427,7 @@ UPDATED_AT_DESC : '-updatedAt',
var KalturaEntryServerNodeType = module.exports.KalturaEntryServerNodeType = {
LIVE_PRIMARY : '0',
LIVE_BACKUP : '1',
LIVE_CLIPPING_TASK : '2',
};

var KalturaEntryStatus = module.exports.KalturaEntryStatus = {
Expand Down
13 changes: 13 additions & 0 deletions lib/playlistGenerator/PlaylistGenerator.js
Original file line number Diff line number Diff line change
Expand Up @@ -688,4 +688,17 @@ PlaylistGenerator.prototype.sortFlavors = function(defaultAudioLang)

}

PlaylistGenerator.getAbsoluteTimeFromRelative = function(playlist, relativeTime)
{
let totalPlaylistDuration = playlist.durations.reduce((a, b) => a + b, 0);
//setting valid time
relativeTime = max(relativeTime ,0);
relativeTime = min(relativeTime, totalPlaylistDuration)
let i = 0;
while (relativeTime > playlist.durations[i])
relativeTime -= playlist.durations[i++]; //search for the first clip that fits
return playlist.clipTimes[i] + relativeTime;
}


module.exports = PlaylistGenerator;
60 changes: 57 additions & 3 deletions lib/recording/RecordingManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ const kalturaTypes = require('../kaltura-client-lib/KalturaTypes');
const unknownEntry = "UNKNOWN";
const backendClient = require('../BackendClientFactory.js').getBackendClient();
const maxAllowedMp4FilesQueueSize = config.get("recording").recordingMaxAllowedMp4FilesQueueSize;
const TaskManager = require('../TaskManager').TaskManager;
const completedRecordingFolderPath = configRecording.completedRecordingFolderPath;
const clippingTaskTemplateName = config.get('newClippingTaskPushNotificationTemplateName');

class RecordingManager {
//todo what to do if we stop stream and start, but other machine has start to get it
Expand All @@ -40,13 +43,14 @@ class RecordingManager {

const recordingTimeInterval = configRecording.recordingTimeIntervalInSec * 1000;
this.completedRecordingFolderPath = configRecording.completedRecordingFolderPath;
//veryfiy that recordingMaxDurationInHours is no longer that
//verify that recordingMaxDurationInHours is no longer that
this.recordingMaxDurationInMs = config.get("recording").recordingMaxDurationInHours * 60 * 60 * 1000;
this.recordingMinDurationInMS = config.get("recording").recordingMinDurationInSeconds * 1000;
this.recordingList = {};

logger.info("Initializing recording manager");

TaskManager.register(kalturaTypes.KalturaEntryServerNodeType.LIVE_CLIPPING_TASK, clippingTaskTemplateName, this.handleNewLiveClippingTask);

setInterval(()=> {
this.handleRecordingEntries()
}, recordingTimeInterval, this);
Expand Down Expand Up @@ -449,7 +453,7 @@ class RecordingManager {

handleMaxDuration(entryId) {
let handleMaxDurationPromise = null;
let recordingEntrySession = this.recordingList[entryId]
let recordingEntrySession = this.recordingList[entryId];
if (!recordingEntrySession) {
logger.info(`[${entryId}] Entry does not exist in recording list. Maybe entry has passed the limit of max duration`);
return Q.resolve();
Expand Down Expand Up @@ -480,6 +484,56 @@ class RecordingManager {
});
}

handleNewLiveClippingTask(task)
{
logger.debug("TASK - in the recordingManager call back for task: " + util.inspect(task));
let playListPath = path.join(persistenceFormat.getEntryBasePath(task.entryId), persistenceFormat.getMasterManifestName());
let newPath = persistenceFormat.getRecordingSessionPath(task.entryId, null, task.clippedEntryId);
let newPathInLive = persistenceFormat.getEntryBasePath(task.clippedEntryId);
let duration = task.clipAttributes.duration;
let strDuration = duration.toString();
let flavors = null;

return qio.makeTree(newPath).then(() => {
return qio.read(playListPath);
}).then((data) => {
let playList = JSON.parse(data);
playList.clipFrom = PlaylistGenerator.getAbsoluteTimeFromRelative(playList, task.clipAttributes.offset);
playList.clipTo = PlaylistGenerator.getAbsoluteTimeFromRelative(playList, task.clipAttributes.offset + duration);
if (playList.clipFrom >= playList.clipTo)
{
logger.error("VALIDATION ERROR: can not set playlist attribute: clipFrom: [" +playList.clipFrom + "] and clipTo: [" + playList.clipTo + "]");
backendClient.updateEntryServerNodeStatus(task.id, kalturaTypes.KalturaEntryServerNodeStatus.ERROR);
throw new Error("Error in setting playlist attribute for task id: " + task.id);
}
flavors = playList.sequences.map(elem => elem.id).join(',');
let newPlayListPath = path.join(newPath,persistenceFormat.getMasterManifestName());
logger.debug("Saving new playlist with clip Attributes to: " + newPlayListPath);
return qio.write(newPlayListPath, JSON.stringify(playList));
}).then(() => {
let stampPath = path.join(newPath,persistenceFormat.getStampFileName());
logger.debug(`Creating Stamp file with [${strDuration}] on [${stampPath}]`);
return qio.write(stampPath, strDuration);
}).then(() => {
let dataFilePath = path.join(newPath,persistenceFormat.getDataFileName());
let dataStr = JSON.stringify({"taskId": task.id, "taskType": task.serverType, "flavors": flavors});
logger.debug(`Creating Data file with [${dataStr}] on [${dataFilePath}]`);
return qio.write(dataFilePath, dataStr);
}).then(() => {
logger.debug(`Creating softLink from [${newPathInLive}] to [${newPath}]`);
return qio.symbolicLink(newPathInLive, newPath, 'directory');
}).then(() => {
let destFilePath = path.join(completedRecordingFolderPath, task.liveEntryId + '_' + task.clippedEntryId + '_' + strDuration);
logger.debug(`Creating softLink (for liveRecorder) from [${destFilePath}] to [${newPath}]`);
return qio.symbolicLink(destFilePath, newPath, 'directory')
}).then(() => {
logger.debug("Updating status of: " + task.id + " to QUEUE");
return backendClient.updateEntryServerNodeStatus(task.id, kalturaTypes.KalturaEntryServerNodeStatus.TASK_QUEUED);
}).catch((err) => {
logger.error(`[${task.id}] failed processing task. Error [${ErrorUtils.error2string(err)}]`);
});
}

}

if (!configRecording.enable){
Expand Down
3 changes: 2 additions & 1 deletion liveRecorder/BackendClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ def set_recorded_content_local(self, partner_id, entry_id, output_file, duration
resource.keepOriginalFile = False
self.set_recorded_content(entry_id, resource, duration, partner_id, recorded_id, flavor_id)


def update_task_entryServerNode_status(self, entry_server_node_id, new_status):
return self.handle_request(self.partner_id, 'entryServerNode', 'updateStatus', entry_server_node_id, new_status)



6 changes: 3 additions & 3 deletions liveRecorder/Config/configMapping.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ admin_secret = @KALTURA_PARTNER_ADMIN_SECRET@
partner_id = @KALTURA_PARTNER_ID@
mode = @VOD_UPLOAD_MODE@
token_key = @LIVE_PACKAGER_TOKEN@
cron_job_log_file_name = "@LOGS_BASE_PATH@/recording_cron.log"
log_file_name = "@LOGS_BASE_PATH@/liveRecorder.log"
recover_log_file_name = "@LOGS_BASE_PATH@/recorder.log"
cron_job_log_file_name = @LOGS_BASE_PATH@/recording_cron.log
log_file_name = @LOGS_BASE_PATH@/liveRecorder.log
recover_log_file_name = @LOGS_BASE_PATH@/recorder.log
api_service_url = @KALTURA_SERVICE_URL@
Loading

0 comments on commit 73b914a

Please sign in to comment.