Skip to content

Commit

Permalink
Add kafka messages consumer to Framework (#2624)
Browse files Browse the repository at this point in the history
* Add kafka messages consumer to Framework

* Fix imports

* Fix proto imports

* Add proper test

* Add license and small refactor
  • Loading branch information
martinboulais authored Feb 11, 2025
1 parent f445a1d commit 8532349
Show file tree
Hide file tree
Showing 10 changed files with 549 additions and 2 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/proto-sync.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ jobs:
cp aliecs/core/protos/o2control.proto Control/protobuf
cp aliecs/common/protos/events.proto Control/protobuf/protos
cp aliecs/common/protos/common.proto Control/protobuf/protos/protos
cp aliecs/common/protos/events.proto Framework/Backend/protos
cp aliecs/common/protos/common.proto Framework/Backend/protos
cp aliecs/apricot/protos/apricot.proto Control/protobuf/o2apricot.proto
rm -rf aliecs
- name: Check if there are any differences and create PR
Expand Down
7 changes: 7 additions & 0 deletions Framework/Backend/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const {
} = require('./errors/updateAndSendExpressResponseFromNativeError.js');
const { Logger } = require('./log/Logger');

const { getWebUiProtoIncludeDir } = require('./protobuf/getWebUiProtoIncludeDir');
const { AliEcsEventMessagesConsumer } = require('./kafka/AliEcsEventMessagesConsumer.js');

exports.ConsulService = ConsulService;

exports.HttpServer = HttpServer;
Expand Down Expand Up @@ -85,3 +88,7 @@ exports.GrpcErrorCodes = GrpcErrorCodes;
exports.grpcErrorToNativeError = grpcErrorToNativeError;

exports.updateAndSendExpressResponseFromNativeError = updateAndSendExpressResponseFromNativeError;

exports.getWebUiProtoIncludeDir = getWebUiProtoIncludeDir;

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;
63 changes: 63 additions & 0 deletions Framework/Backend/kafka/AliEcsEventMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* @license
* Copyright CERN and copyright holders of ALICE O2. This software is
* distributed under the terms of the GNU General Public License v3 (GPL
* Version 3), copied verbatim in the file "COPYING".
*
* See http://alice-o2.web.cern.ch/license for full licensing information.
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

const protobuf = require('protobufjs');
const path = require('node:path');
const { KafkaMessagesConsumer } = require('./KafkaMessagesConsumer.js');
const { getWebUiProtoIncludeDir } = require('../protobuf/getWebUiProtoIncludeDir.js');

// Customize protobuf loader to set the import directory, protobuf do not allow to do so...
const root = new protobuf.Root();
root.resolvePath = (origin, target) => {
if (path.isAbsolute(target)) {
return target;
}

return path.join(getWebUiProtoIncludeDir(), target);
};

root.loadSync(path.resolve(__dirname, '../protobuf/protos/events.proto'));
const EventMessage = root.lookupType('events.Event');

/**
* @callback MessageReceivedCallback
* @param {EventMessage} message received message
* @return {Promise<void>}
*/

/**
* Consumer that consume ECS event messages and pass them to previously-registered listeners
*/
class AliEcsEventMessagesConsumer extends KafkaMessagesConsumer {
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
*/
constructor(kafkaClient, groupId, topics) {
super(kafkaClient, groupId, topics, EventMessage);
}

/**
* @inheritDoc
*/
getLoggerLabel() {
return 'ALI-ECS-EVENT-CONSUMER';
}
}

exports.AliEcsEventMessagesConsumer = AliEcsEventMessagesConsumer;

exports.EventMessage = EventMessage;
110 changes: 110 additions & 0 deletions Framework/Backend/kafka/KafkaMessagesConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

const { LogManager } = require('../log/LogManager.js');

/**
* Generic Kafka Message consumer extracting objects according to a protobuf definition
* @template T extends import('protobufjs').Type
*/
class KafkaMessagesConsumer {
/**
* Constructor
*
* @param {import('kafkajs').Kafka} kafkaClient configured kafka client
* @param {string} groupId the group id to use for the kafka consumer
* @param {string[]} topics the list of topics to consume
* @param {import('protobufjs').Type} protoType the type definition of the handled message
*/
constructor(kafkaClient, groupId, topics, protoType) {
this.consumer = kafkaClient.consumer({ groupId });
this._topics = topics;
this._protoType = protoType;

/**
* @type {MessageReceivedCallback[]}
* @private
*/
this._listeners = [];

this._logger = LogManager.getLogger(this.getLoggerLabel());
}

/**
* Register a listener to listen on event message being received
*
* Listeners are called all at once, not waiting for completion before calling the next ones, only errors are caught and logged
*
* @param {MessageReceivedCallback} listener the listener to register
* @return {void}
*/
onMessageReceived(listener) {
this._listeners.push(listener);
}

/**
* Start the kafka consumer
*
* @return {Promise<void>} Resolves once the consumer started to consume messages
*/
async start() {
this._logger.infoMessage(`Started to listen on kafka topic ${this._topics}`);
await this.consumer.connect();
await this.consumer.subscribe({ topics: this._topics });
await this.consumer.run({
eachMessage: async ({ message, topic }) => {
const error = this._protoType.verify(message.value);
if (error) {
this._logger.errorMessage(`Received an invalid message on "${topic}" ${error}`);
return;
}
this._logger.debugMessage(`Received message on ${topic}`);

try {
await this._handleEvent(this._protoType.toObject(
this._protoType.decode(message.value),
{ enums: String },
));
} catch (error) {
this._logger.errorMessage(`Failed to convert message to object on topic ${topic}: ${error}`);
}
},
});
}

/**
* Call every registered listeners by passing the given message to it
*
* @param {T} message the message to pass to listeners
* @return {void}
*/
async _handleEvent(message) {
for (const listener of this._listeners) {
listener(message).catch((error) => {
this._logger.errorMessage(`An error occurred when handling event: ${error.message}\n${error.stack}`);
});
}
}

/**
* Return the label to be used by the logger
*
* @return {string} the logger label
*/
getLoggerLabel() {
return 'EVENT-CONSUMER';
}
}

exports.KafkaMessagesConsumer = KafkaMessagesConsumer;
15 changes: 15 additions & 0 deletions Framework/Backend/protobuf/getWebUiProtoIncludeDir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

exports.getWebUiProtoIncludeDir = () => __dirname;
48 changes: 48 additions & 0 deletions Framework/Backend/protobuf/protos/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2024 CERN and copyright holders of ALICE O².
* Author: Teo Mrnjavac <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/


syntax = "proto3";

package common;
option java_package = "ch.cern.alice.o2.control.common";
option go_package = "github.com/AliceO2Group/Control/common/protos;pb";

//////////////// Common types ///////////////

message User {
// The unique CERN identifier of this user.
optional int32 externalId = 1;
// The unique identifier of this entity.
optional int32 id = 2;
// Name of the user.
string name = 3;
}

message WorkflowTemplateInfo {
string name = 1;
string description = 2;
string path = 3;
bool public = 4; // whether the environment is public or not
}
Loading

0 comments on commit 8532349

Please sign in to comment.