Skip to content

Commit

Permalink
feat: setup listener on input bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
birme committed Feb 24, 2025
1 parent b10ce9c commit cf3a388
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Static, Type } from '@sinclair/typebox';
import { FastifyPluginCallback } from 'fastify';
import { Context } from '@osaas/client-core';
import { apiPipeline } from './api_pipeline';
import { onFileOnInput } from './orchestrator/events';

const HelloWorld = Type.String({
description: 'The magical words!'
Expand Down Expand Up @@ -67,7 +68,7 @@ export default (opts: ApiOptions) => {

api.register(healthcheck, { title: opts.title });
// register other API routes here
api.register(apiPipeline, { name: 'tvplus', ctx: opts.ctx });
api.register(apiPipeline, { name: 'tvplus', ctx: opts.ctx, onFileOnInput });

return api;
};
16 changes: 15 additions & 1 deletion src/api_pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { Context } from '@osaas/client-core';
import { createVodPipeline, removeVodPipeline } from '@osaas/client-transcode';
import { Type } from '@sinclair/typebox';
import { FastifyPluginCallback } from 'fastify';
import { setupListener } from './storage/listener';

export interface PipelineOptions {
name: string;
ctx: Context;
onFileOnInput: (r: any, pipeline: any) => Promise<void>;
}

export const apiPipeline: FastifyPluginCallback<PipelineOptions> = (
Expand All @@ -26,9 +28,21 @@ export const apiPipeline: FastifyPluginCallback<PipelineOptions> = (
},
async (request, reply) => {
try {
await createVodPipeline(opts.name, opts.ctx, {
const pipeline = await createVodPipeline(opts.name, opts.ctx, {
createInputBucket: true
});
if (pipeline.inputStorage) {
setupListener(
pipeline.inputStorage.name,
new URL(pipeline.inputStorage.endpoint),
pipeline.inputStorage.accessKeyId,
pipeline.inputStorage.secretAccessKey,
pipeline,
opts.onFileOnInput
);
} else {
throw new Error('Pipeline has no input storage');
}
reply.status(201).send({ message: 'Pipeline created' });
} catch (err: any) {
reply.status(500).send({ message: err.message });
Expand Down
3 changes: 3 additions & 0 deletions src/orchestrator/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export async function onFileOnInput(rec: any, pipeline: any) {
console.log('Received file on input', rec);
}
28 changes: 28 additions & 0 deletions src/storage/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import * as Minio from 'minio';

export async function setupListener(
bucketName: string,
endpoint: URL,
accessKeyId: string,
secretAccessKey: string,
pipeline: any,
onNotification: (r: any, pipeline: any) => Promise<void>
) {
const client = new Minio.Client({
endPoint: endpoint.hostname,
accessKey: accessKeyId,
secretKey: secretAccessKey,
useSSL: endpoint.protocol === 'https:'
});
const poller = client.listenBucketNotification(bucketName, '', '.mp4', [
's3:ObjectCreated:*'
]);
if (!poller) {
console.error('Failed to setup listener for bucket notifications');
}
console.log('Listening for notifications');
poller.on('notification', async (record) => {
await onNotification(record, pipeline);
poller.stop();
});
}

0 comments on commit cf3a388

Please sign in to comment.