diff --git a/.github/workflows/proto-sync.yml b/.github/workflows/proto-sync.yml index 6fc47c8a1..f53c78ab5 100644 --- a/.github/workflows/proto-sync.yml +++ b/.github/workflows/proto-sync.yml @@ -21,6 +21,8 @@ jobs: cp aliecs/core/protos/o2control.proto Control/protobuf cp aliecs/common/protos/events.proto Control/protobuf/protos cp aliecs/common/protos/common.proto Control/protobuf/protos/protos + cp aliecs/common/protos/events.proto Framework/Backend/protos + cp aliecs/common/protos/common.proto Framework/Backend/protos cp aliecs/apricot/protos/apricot.proto Control/protobuf/o2apricot.proto rm -rf aliecs - name: Check if there are any differences and create PR diff --git a/Framework/Backend/index.js b/Framework/Backend/index.js index 7585031f0..5fb1d4837 100644 --- a/Framework/Backend/index.js +++ b/Framework/Backend/index.js @@ -39,6 +39,9 @@ const { } = require('./errors/updateAndSendExpressResponseFromNativeError.js'); const { Logger } = require('./log/Logger'); +const { getWebUiProtoIncludeDir } = require('./protobuf/getWebUiProtoIncludeDir'); +const { AliEcsEventMessagesConsumer } = require('./kafka/AliEcsEventMessagesConsumer.js'); + exports.ConsulService = ConsulService; exports.HttpServer = HttpServer; @@ -85,3 +88,7 @@ exports.GrpcErrorCodes = GrpcErrorCodes; exports.grpcErrorToNativeError = grpcErrorToNativeError; exports.updateAndSendExpressResponseFromNativeError = updateAndSendExpressResponseFromNativeError; + +exports.getWebUiProtoIncludeDir = getWebUiProtoIncludeDir; + +exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer; diff --git a/Framework/Backend/kafka/AliEcsEventMessagesConsumer.js b/Framework/Backend/kafka/AliEcsEventMessagesConsumer.js new file mode 100644 index 000000000..356e938cc --- /dev/null +++ b/Framework/Backend/kafka/AliEcsEventMessagesConsumer.js @@ -0,0 +1,63 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +const protobuf = require('protobufjs'); +const path = require('node:path'); +const { KafkaMessagesConsumer } = require('./KafkaMessagesConsumer.js'); +const { getWebUiProtoIncludeDir } = require('../protobuf/getWebUiProtoIncludeDir.js'); + +// Customize protobuf loader to set the import directory, protobuf do not allow to do so... +const root = new protobuf.Root(); +root.resolvePath = (origin, target) => { + if (path.isAbsolute(target)) { + return target; + } + + return path.join(getWebUiProtoIncludeDir(), target); +}; + +root.loadSync(path.resolve(__dirname, '../protobuf/protos/events.proto')); +const EventMessage = root.lookupType('events.Event'); + +/** + * @callback MessageReceivedCallback + * @param {EventMessage} message received message + * @return {Promise} + */ + +/** + * Consumer that consume ECS event messages and pass them to previously-registered listeners + */ +class AliEcsEventMessagesConsumer extends KafkaMessagesConsumer { + /** + * Constructor + * + * @param {import('kafkajs').Kafka} kafkaClient configured kafka client + * @param {string} groupId the group id to use for the kafka consumer + * @param {string[]} topics the list of topics to consume + */ + constructor(kafkaClient, groupId, topics) { + super(kafkaClient, groupId, topics, EventMessage); + } + + /** + * @inheritDoc + */ + getLoggerLabel() { + return 'ALI-ECS-EVENT-CONSUMER'; + } +} + +exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer; + +exports.EventMessage = EventMessage; diff --git a/Framework/Backend/kafka/KafkaMessagesConsumer.js b/Framework/Backend/kafka/KafkaMessagesConsumer.js new file mode 100644 index 000000000..6d83ad22b --- /dev/null +++ b/Framework/Backend/kafka/KafkaMessagesConsumer.js @@ -0,0 +1,110 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +const { LogManager } = require('../log/LogManager.js'); + +/** + * Generic Kafka Message consumer extracting objects according to a protobuf definition + * @template T extends import('protobufjs').Type + */ +class KafkaMessagesConsumer { + /** + * Constructor + * + * @param {import('kafkajs').Kafka} kafkaClient configured kafka client + * @param {string} groupId the group id to use for the kafka consumer + * @param {string[]} topics the list of topics to consume + * @param {import('protobufjs').Type} protoType the type definition of the handled message + */ + constructor(kafkaClient, groupId, topics, protoType) { + this.consumer = kafkaClient.consumer({ groupId }); + this._topics = topics; + this._protoType = protoType; + + /** + * @type {MessageReceivedCallback[]} + * @private + */ + this._listeners = []; + + this._logger = LogManager.getLogger(this.getLoggerLabel()); + } + + /** + * Register a listener to listen on event message being received + * + * Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged + * + * @param {MessageReceivedCallback} listener the listener to register + * @return {void} + */ + onMessageReceived(listener) { + this._listeners.push(listener); + } + + /** + * Start the kafka consumer + * + * @return {Promise} Resolves once the consumer started to consume messages + */ + async start() { + this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`); + await this.consumer.connect(); + await this.consumer.subscribe({ topics: this._topics }); + await this.consumer.run({ + eachMessage: async ({ message, topic }) => { + const error = this._protoType.verify(message.value); + if (error) { + this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`); + return; + } + this._logger.debugMessage(`Received message on ${topic}`); + + try { + await this._handleEvent(this._protoType.toObject( + this._protoType.decode(message.value), + { enums: String }, + )); + } catch (error) { + this._logger.errorMessage(`Failed to convert message to object on topic ${topic}: ${error}`); + } + }, + }); + } + + /** + * Call every registered listeners by passing the given message to it + * + * @param {T} message the message to pass to listeners + * @return {void} + */ + async _handleEvent(message) { + for (const listener of this._listeners) { + listener(message).catch((error) => { + this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`); + }); + } + } + + /** + * Return the label to be used by the logger + * + * @return {string} the logger label + */ + getLoggerLabel() { + return 'EVENT-CONSUMER'; + } +} + +exports.KafkaMessagesConsumer = KafkaMessagesConsumer; diff --git a/Framework/Backend/protobuf/getWebUiProtoIncludeDir.js b/Framework/Backend/protobuf/getWebUiProtoIncludeDir.js new file mode 100644 index 000000000..724797f8b --- /dev/null +++ b/Framework/Backend/protobuf/getWebUiProtoIncludeDir.js @@ -0,0 +1,15 @@ +/** + * @license + * Copyright 2019-2020 CERN and copyright holders of ALICE O2. + * See http://alice-o2.web.cern.ch/copyright for details of the copyright holders. + * All rights not expressly granted are reserved. + * + * This software is distributed under the terms of the GNU General Public + * License v3 (GPL Version 3), copied verbatim in the file "COPYING". + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +exports.getWebUiProtoIncludeDir = () => __dirname; diff --git a/Framework/Backend/protobuf/protos/common.proto b/Framework/Backend/protobuf/protos/common.proto new file mode 100644 index 000000000..435790e40 --- /dev/null +++ b/Framework/Backend/protobuf/protos/common.proto @@ -0,0 +1,48 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2024 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + + +syntax = "proto3"; + +package common; +option java_package = "ch.cern.alice.o2.control.common"; +option go_package = "github.com/AliceO2Group/Control/common/protos;pb"; + +//////////////// Common types /////////////// + +message User { + // The unique CERN identifier of this user. + optional int32 externalId = 1; + // The unique identifier of this entity. + optional int32 id = 2; + // Name of the user. + string name = 3; +} + +message WorkflowTemplateInfo { + string name = 1; + string description = 2; + string path = 3; + bool public = 4; // whether the environment is public or not +} \ No newline at end of file diff --git a/Framework/Backend/protobuf/protos/events.proto b/Framework/Backend/protobuf/protos/events.proto new file mode 100644 index 000000000..cc6940a18 --- /dev/null +++ b/Framework/Backend/protobuf/protos/events.proto @@ -0,0 +1,148 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2024 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +syntax = "proto3"; + +package events; +option java_package = "ch.cern.alice.o2.control.events"; +option go_package = "github.com/AliceO2Group/Control/common/protos;pb"; + +import public "protos/common.proto"; + +//////////////// Common event messages /////////////// + +enum OpStatus { + NULL = 0; + STARTED = 1; + ONGOING = 2; + DONE_OK = 3; + DONE_ERROR = 4; + DONE_TIMEOUT = 5; +} + +message Ev_MetaEvent_MesosHeartbeat { +} + +message Ev_MetaEvent_CoreStart { + string frameworkId = 1; +} + +message Ev_MetaEvent_FrameworkEvent { + string frameworkId = 1; + string message = 2; +} + +message Ev_EnvironmentEvent { + string environmentId = 1; + string state = 2; + uint32 runNumber = 3; // only when the environment is in the running state + string error = 4; + string message = 5; // any additional message concerning the current state or transition + string transition = 6; + string transitionStep = 7; + OpStatus transitionStatus = 8; + map vars = 9; // consolidated environment variables at the root role of the environment + common.User lastRequestUser = 10; + common.WorkflowTemplateInfo workflowTemplateInfo = 11; +} + +message Traits { + string trigger = 1; + string await = 2; + string timeout = 3; + bool critical = 4; +} + +message Ev_TaskEvent { + string name = 1; // task name, based on the name of the task class + string taskid = 2; // task id, unique + string state = 3; // state machine state for this task + string status = 4; // active/inactive etc. + string hostname = 5; + string className = 6; // name of the task class from which this task was spawned + Traits traits = 7; + string environmentId = 8; + string path = 9; // path to the parent taskRole of this task within the environment +} + +message Ev_CallEvent { + string func = 1; // name of the function being called, within the workflow template context + OpStatus callStatus = 2; // progress or success/failure state of the call + string return = 3; // return value of the function + Traits traits = 4; + string output = 5; // any additional output of the function + string error = 6; // error value, if returned + string environmentId = 7; + string path = 8; // path to the parent callRole of this call within the environment +} + +message Ev_RoleEvent { + string name = 1; // role name + string status = 2; // active/inactive etc., derived from the state of child tasks, calls or other roles + string state = 3; // state machine state for this role + string rolePath = 4; // path to this role within the environment + string environmentId = 5; +} + +message Ev_IntegratedServiceEvent { + string name = 1; // name of the context, usually the path of the callRole that calls a given integrated service function e.g. readout-dataflow.dd-scheduler.terminate + string error = 2; // error message, if any + string operationName = 3; // name of the operation, usually the name of the integrated service function being called e.g. ddsched.PartitionTerminate()" + OpStatus operationStatus = 4; // progress or success/failure state of the operation + string operationStep = 5; // if the operation has substeps, this is the name of the current substep, like an API call or polling phase + OpStatus operationStepStatus = 6; // progress or success/failure state of the current substep + string environmentId = 7; + string payload = 8; // any additional payload, depending on the integrated service; there is no schema, it can even be the raw return structure of a remote API call +} + +message Ev_RunEvent { + string environmentId = 1; + uint32 runNumber = 2; + string state = 3; + string error = 4; + string transition = 5; + OpStatus transitionStatus = 6; + map vars = 7; + common.User lastRequestUser = 8; +} + +message Event { + int64 timestamp = 1; + int64 timestampNano = 2; + reserved 3 to 10; + reserved 17 to 100; + + oneof Payload { + Ev_EnvironmentEvent environmentEvent = 11; + Ev_TaskEvent taskEvent = 12; + Ev_RoleEvent roleEvent = 13; + Ev_CallEvent callEvent = 14; + Ev_IntegratedServiceEvent integratedServiceEvent = 15; + Ev_RunEvent runEvent = 16; + + Ev_MetaEvent_FrameworkEvent frameworkEvent = 101; + Ev_MetaEvent_MesosHeartbeat mesosHeartbeatEvent = 102; + Ev_MetaEvent_CoreStart coreStartEvent = 103; + } +} diff --git a/Framework/Backend/test/mocha-kafka.js b/Framework/Backend/test/mocha-kafka.js new file mode 100644 index 000000000..5dfae9994 --- /dev/null +++ b/Framework/Backend/test/mocha-kafka.js @@ -0,0 +1,69 @@ +const { AliEcsEventMessagesConsumer, EventMessage } = require('../kafka/AliEcsEventMessagesConsumer.js'); +const assert = require('node:assert'); +const { fromInt } = require('long'); + +const dummyKafkaClient = { + eachMessage: null, + + /** + * Dummy kafka client + */ + consumer() { + const client = this; + return { + + /** + * Dummy connect implementation + */ + connect() { + }, + + /** + * Dummy subscription implementation + */ + subscribe() { + }, + + /** + * Dummy run implementation + * @param configuration + */ + run(configuration) { + client.eachMessage = configuration.eachMessage; + }, + }; + }, + + /** + * Send a dummy message + * @param message + */ + sendDummyMessage(message) { + this.eachMessage({ message: { value: message }, topic: 'dummy-topic' }); + }, +}; + +describe('KAFKA CONSUMERS', () => { + it('should successfully create an AliECS event message consumer', async () => { + const messagesConsumer = new AliEcsEventMessagesConsumer( + dummyKafkaClient, + 'dummy-group-id', + ['dummy-topic'], + ); + let receviedMessages = null; + messagesConsumer.onMessageReceived((message) => { + receviedMessages = message; + }); + + await messagesConsumer.start(); + const dummyMessage = { timestamp: 123, timestampNano: 456, mesosHeartbeatEvent: {} }; + const message = EventMessage.encode(EventMessage.create(dummyMessage)).finish(); + dummyKafkaClient.sendDummyMessage(message); + + assert.deepStrictEqual({ + timestamp: fromInt(123, false), + timestampNano: fromInt(456, false), + mesosHeartbeatEvent: {}, + }, receviedMessages); + }); +}); diff --git a/Framework/package-lock.json b/Framework/package-lock.json index 5b6584091..7a5820940 100644 --- a/Framework/package-lock.json +++ b/Framework/package-lock.json @@ -16,6 +16,7 @@ "mithril": "1.1.7", "mysql": "^2.18.1", "openid-client": "^5.6.0", + "protobufjs": "^7.4.0", "winston": "3.17.0", "ws": "^8.18.0" }, @@ -997,6 +998,60 @@ "url": "https://opencollective.com/unts" } }, + "node_modules/@protobufjs/aspromise": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/aspromise/-/aspromise-1.1.2.tgz", + "integrity": "sha512-j+gKExEuLmKwvz3OgROXtrJ2UG2x8Ch2YZUxahh+s1F2HZ+wAceUNLkvy6zKCPVRkU++ZWQrdxsUeQXmcg4uoQ==" + }, + "node_modules/@protobufjs/base64": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/base64/-/base64-1.1.2.tgz", + "integrity": "sha512-AZkcAA5vnN/v4PDqKyMR5lx7hZttPDgClv83E//FMNhR2TMcLUhfRUBHCmSl0oi9zMgDDqRUJkSxO3wm85+XLg==" + }, + "node_modules/@protobufjs/codegen": { + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/@protobufjs/codegen/-/codegen-2.0.4.tgz", + "integrity": "sha512-YyFaikqM5sH0ziFZCN3xDC7zeGaB/d0IUb9CATugHWbd1FRFwWwt4ld4OYMPWu5a3Xe01mGAULCdqhMlPl29Jg==" + }, + "node_modules/@protobufjs/eventemitter": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/eventemitter/-/eventemitter-1.1.0.tgz", + "integrity": "sha512-j9ednRT81vYJ9OfVuXG6ERSTdEL1xVsNgqpkxMsbIabzSo3goCjDIveeGv5d03om39ML71RdmrGNjG5SReBP/Q==" + }, + "node_modules/@protobufjs/fetch": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/fetch/-/fetch-1.1.0.tgz", + "integrity": "sha512-lljVXpqXebpsijW71PZaCYeIcE5on1w5DlQy5WH6GLbFryLUrBD4932W/E2BSpfRJWseIL4v/KPgBFxDOIdKpQ==", + "dependencies": { + "@protobufjs/aspromise": "^1.1.1", + "@protobufjs/inquire": "^1.1.0" + } + }, + "node_modules/@protobufjs/float": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@protobufjs/float/-/float-1.0.2.tgz", + "integrity": "sha512-Ddb+kVXlXst9d+R9PfTIxh1EdNkgoRe5tOX6t01f1lYWOvJnSPDBlG241QLzcyPdoNTsblLUdujGSE4RzrTZGQ==" + }, + "node_modules/@protobufjs/inquire": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/inquire/-/inquire-1.1.0.tgz", + "integrity": "sha512-kdSefcPdruJiFMVSbn801t4vFK7KB/5gd2fYvrxhuJYg8ILrmn9SKSX2tZdV6V+ksulWqS7aXjBcRXl3wHoD9Q==" + }, + "node_modules/@protobufjs/path": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/@protobufjs/path/-/path-1.1.2.tgz", + "integrity": "sha512-6JOcJ5Tm08dOHAbdR3GrvP+yUUfkjG5ePsHYczMFLq3ZmMkAD98cDgcT2iA1lJ9NVwFd4tH/iSSoe44YWkltEA==" + }, + "node_modules/@protobufjs/pool": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/pool/-/pool-1.1.0.tgz", + "integrity": "sha512-0kELaGSIDBKvcgS4zkjz1PeddatrjYcmMWOlAuAPwAeccUrPHdUqo/J6LiymHHEiJT5NrF1UVwxY14f+fy4WQw==" + }, + "node_modules/@protobufjs/utf8": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@protobufjs/utf8/-/utf8-1.1.0.tgz", + "integrity": "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw==" + }, "node_modules/@puppeteer/browsers": { "version": "2.7.1", "resolved": "https://registry.npmjs.org/@puppeteer/browsers/-/browsers-2.7.1.tgz", @@ -3642,6 +3697,11 @@ "node": ">= 12.0.0" } }, + "node_modules/long": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", + "integrity": "sha512-lcHwpNoggQTObv5apGNCTdJrO69eHOZMi4BNC+rTLER8iHAqGrUVeLh/irVIM7zTw2bOXA8T6uNPeujwOLg/2Q==" + }, "node_modules/lru-cache": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-6.0.0.tgz", @@ -4649,6 +4709,29 @@ "node": ">= 8" } }, + "node_modules/protobufjs": { + "version": "7.4.0", + "resolved": "https://registry.npmjs.org/protobufjs/-/protobufjs-7.4.0.tgz", + "integrity": "sha512-mRUWCc3KUU4w1jU8sGxICXH/gNS94DvI1gxqDvBzhj1JpcsimQkYiOJfwsPUykUI5ZaspFbSgmBLER8IrQ3tqw==", + "hasInstallScript": true, + "dependencies": { + "@protobufjs/aspromise": "^1.1.2", + "@protobufjs/base64": "^1.1.2", + "@protobufjs/codegen": "^2.0.4", + "@protobufjs/eventemitter": "^1.1.0", + "@protobufjs/fetch": "^1.1.0", + "@protobufjs/float": "^1.0.2", + "@protobufjs/inquire": "^1.1.0", + "@protobufjs/path": "^1.1.2", + "@protobufjs/pool": "^1.1.0", + "@protobufjs/utf8": "^1.1.0", + "@types/node": ">=13.7.0", + "long": "^5.0.0" + }, + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/proxy-addr": { "version": "2.0.7", "resolved": "https://registry.npmjs.org/proxy-addr/-/proxy-addr-2.0.7.tgz", diff --git a/Framework/package.json b/Framework/package.json index 82312ae3d..f7068100f 100644 --- a/Framework/package.json +++ b/Framework/package.json @@ -39,7 +39,8 @@ "mysql": "^2.18.1", "openid-client": "^5.6.0", "winston": "3.17.0", - "ws": "^8.18.0" + "ws": "^8.18.0", + "protobufjs": "^7.4.0" }, "devDependencies": { "@eslint/js": "^9.20.0", @@ -52,7 +53,8 @@ "nyc": "^17.1.0", "puppeteer": "^24.2.0", "sinon": "19.0.2", - "supertest": "^7.0.0" + "supertest": "^7.0.0", + "long": "^5.2.3" }, "main": "Backend/index.js" }