Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into ms2/mqtt-in-memory-st…
Browse files Browse the repository at this point in the history
…orage
  • Loading branch information
FelipeTrost committed Nov 30, 2024
2 parents d2064dc + 3016db8 commit 10f4f5a
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 404 deletions.
3 changes: 2 additions & 1 deletion src/engine/native/node/native-config/src/config_default.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"maxProcessLogEntries": 500,
"maxProcessLogTables": 5,
"rotationInterval": 600,
"maxStandardLogEntries": 1000
"maxStandardLogEntries": 1000,
"mqttLevel": "info"
},
"processes": {
"acceptUserTasks": false,
Expand Down
64 changes: 64 additions & 0 deletions src/engine/universal/core/src/messaging-setup.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { version: proceedVersion } = require('../../../native/node/package.json');
const { logging } = require('@proceed/machine');

/**
* This file contains functionality that handles setup and interactions of the messaging interface with other modules of the engine
Expand Down Expand Up @@ -58,4 +59,67 @@ module.exports = {
}
}
},
async setupMonitoringAndLogging(messaging, configModule, machineModule, logger) {
let { serverAddress, baseTopic } = await configModule.readConfig('messaging');
if (!serverAddress) return;

if (baseTopic && !baseTopic.endsWith('/')) baseTopic += '/';
baseTopic += 'proceed-pms';

// Monitoring data
const [{ id: machineId }, loadInterval] = await Promise.all([
machineModule.getMachineInformation(['id']),
configModule.readConfig('engine.loadInterval'),
]);

setInterval(async () => {
try {
const machineData = await machineModule.getMachineInformation();
await messaging.publish(`${baseTopic}/engine/${machineId}/machine/monitoring`, machineData);
} catch (e) {
logger.error('Failed to publish monitoring data');
}
}, loadInterval * 1000);

// Logging data
const mqttLevel = await configModule.readConfig('logs.mqttLevel');
const orderedLevels = ['trace', 'debug', 'info', 'warn', 'error', 'fatal'];
const mqttLevelIdx = orderedLevels.indexOf(mqttLevel);
const logGuard = (level) => orderedLevels.indexOf(level) >= mqttLevelIdx;

logging.registerCallback(async (obj, log) => {
try {
if (!logGuard(log?.level)) return;

const sentMessages = [];
const print = `[${log.level.toUpperCase()}] ${log.moduleName}${obj.definitionId || ''} ${log.msg}`;

sentMessages.push(messaging.publish(`${baseTopic}/engine/${machineId}/logging`, print));

if (obj.definitionId) {
sentMessages.push(
messaging.publish(`${baseTopic}/engine/${machineId}/logging/process`, print),
);

if (log.instanceId) {
sentMessages.push(
messaging.publish(
`${baseTopic}/engine/process/${obj.definitionId}/instance/${log.instanceId}/logging`,
print,
),
);
}
} else {
sentMessages.push(
messaging.publish(`${baseTopic}/engine/${machineId}/logging/standard`, print),
);
}

await Promise.all(sentMessages);
} catch (e) {
// NOTE: using logger.error could cause an infinite loop if publish keeps failing
console.error(e);
}
});
},
};
3 changes: 2 additions & 1 deletion src/engine/universal/core/src/module.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const monitoring = require('@proceed/monitoring');
const management = require('./management.js');
const { setup5thIndustryEndpoints } = require('./engine/5thIndustry.js');
const { enableInterruptedInstanceRecovery } = require('../../../../../FeatureFlags.js');
const { setupMessaging } = require('./messaging-setup.js');
const { setupMessaging, setupMonitoringAndLogging } = require('./messaging-setup.js');
const { enableMessaging, enable5thIndustryIntegration } = require('../../../../../FeatureFlags.js');

const configObject = {
Expand Down Expand Up @@ -53,6 +53,7 @@ module.exports = {

if (enableMessaging) {
await setupMessaging(system.messaging, config, machineInformation, logger);
await setupMonitoringAndLogging(system.messaging, config, machineInformation, logger);
}

if (!options.silentMode) {
Expand Down
23 changes: 21 additions & 2 deletions src/engine/universal/machine/logging/logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const rotationUtils = require('./src/utils/logRotationUtils');
const startRotation = require('./src/rotation/rotation');
const routes = require('./src/routes/logRoutes');

/** @typedef {{moduleName:string; definitionId: string; consoleOnly: boolean;}} LoggerConfObject */

let singletonInstance;

/**
Expand All @@ -22,6 +24,7 @@ class Logging {
this.doneInitializing = undefined;
// See @proceed/system.console
console.constructor._setLoggingModule(this);
this.logCallbacks = new Set();
}

/**
Expand Down Expand Up @@ -65,13 +68,29 @@ class Logging {
routes(this);
}

/** @param {(obj: LoggerConfObject, log:any)=>void} callback */
registerCallback(callback) {
return this.logCallbacks.add(callback);
}

/** @param {(obj: LoggerConfObject, log:string)=>void} callback */
unregisterCallback(callback) {
return this.logCallbacks.delete(callback);
}

/**
* Factory method creating a logger
* @param {object} confObject An object containing all configuration parameters for the logger
* @param {LoggerConfObject } confObject An object containing all configuration parameters for the logger
* @returns a logger
*/
getLogger(confObject) {
const logger = loggerLoader(confObject, this.init.bind(this));
const logger = loggerLoader(confObject, this.init.bind(this), [
(msg) => {
for (const callback of this.logCallbacks) {
callback(confObject, msg);
}
},
]);
return logger;
}

Expand Down
11 changes: 7 additions & 4 deletions src/engine/universal/machine/logging/src/loggerHelpers/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ const writerLoader = require('./writers.js');
*
* Class for logger instances
* Instantiates a new logger
* @param {object} confObject The configuration for the logger
* @param {{moduleName:string; definitionId: string; consoleOnly: boolean;}} confObject The configuration for the logger
* @param {promise} loggingInitializedPromise a promise indicating that the logger has
* @param {((...args: any[])=>void)[]} [customWriters] Custom writer functions to be used by the logger
* finished being asynchronously initialized
*/
class Logger {
constructor(confObject, loggingInitializer) {
constructor(confObject, loggingInitializer, customWriters) {
this.instanceInitialized = false;
this.confObject = confObject;
this.loggingInitializer = loggingInitializer;
this.functionsForWriter = [];
this.functionsForWriter = customWriters ? [...customWriters] : [];
this.moduleName = confObject.moduleName;
}

Expand Down Expand Up @@ -184,7 +185,9 @@ Logger.initialization = new Promise((resolve) => {
/**
* @param {object} confObject The configuration for the logger
* @param {promise} loggingInitializedPromise a promise indicating that the logger has
* @param {((...args: any[])=>void)[]} [customWriters] Custom writer functions to be used by the logger
* finished being asynchronously initialized
* @returns a configured instance of the Logger class
*/
module.exports = (confObject, loggingInitializer) => new Logger(confObject, loggingInitializer);
module.exports = (confObject, loggingInitializer, customWriters) =>
new Logger(confObject, loggingInitializer, customWriters);
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { ReactNode } from 'react';
import { Alert, Checkbox, Image, Progress, ProgressProps, Space, Typography } from 'antd';
import { ClockCircleFilled } from '@ant-design/icons';
import React from 'react';
import { statusToType } from './instance-helpers';
import { convertISODurationToMiliseconds, getMetaDataFromElement } from '@proceed/bpmn-helper';
import { endpointBuilder } from '@/lib/engines/endpoint';
import { getPlanDelays, getTimeInfo, statusToType } from './instance-helpers';
import { getMetaDataFromElement } from '@proceed/bpmn-helper';
import { DisplayTable, RelevantInstanceInfo } from './instance-info-panel';
import { endpointBuilder } from '@/lib/engines/endpoint';

function transformMilisecondsToTimeFormat(milliseconds: number | undefined) {
function transformMillisecondsToTimeFormat(milliseconds: number | undefined) {
if (!milliseconds || milliseconds < 0 || milliseconds < 1000) return;

const days = Math.floor(milliseconds / (3600000 * 24));
Expand All @@ -21,7 +20,6 @@ function transformMilisecondsToTimeFormat(milliseconds: number | undefined) {
const seconds = Math.floor(milliseconds / 1000);
milliseconds -= seconds * 1000;

// Will display time in 10:30:23 format
return `${days} Days, ${hours}h, ${minutes}min, ${seconds}s`;
}

Expand All @@ -46,6 +44,7 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
marginTop: '1rem',
}}
>
{/** TODO: correct image url */}
<Image
// TODO: use engine endpoint to get the image
src={endpointBuilder('get', '/resources/process/:definitionId/images/:fileName', {
Expand Down Expand Up @@ -159,64 +158,14 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
statusEntries.push(['Documentation:', info.element.businessObject?.documentation?.[0]?.text]);

// Activity time calculation
let start: Date | undefined = undefined;
if (info.instance) {
if (isRootElement) start = new Date(info.instance.globalStartTime);
else if (logInfo) start = new Date(logInfo.startTime);
else if (token) start = new Date(token.currentFlowElementStartTime);
}

let end;
if (info.instance) {
if (isRootElement) {
const ended = info.instance.instanceState.every(
(state) =>
state !== 'RUNNING' &&
state !== 'READY' &&
state !== 'DEPLOYMENT-WAITING' &&
state !== 'PAUSING' &&
state !== 'PAUSED',
);

if (ended) {
const lastLog = info.instance.log[info.instance.log.length - 1];
if (lastLog) end = new Date(lastLog.endTime);
}
} else if (logInfo) {
end = new Date(logInfo.endTime);
}
}

let duration;
if (start && end) duration = end.getTime() - start.getTime();

const plan = {
end: metaData.timePlannedEnd ? new Date(metaData.timePlannedEnd) : undefined,
start: metaData.timePlannedOccurrence ? new Date(metaData.timePlannedOccurrence) : undefined,
duration: metaData.timePlannedDuration
? convertISODurationToMiliseconds(metaData.timePlannedDuration)
: undefined,
};

// The order in which missing times are derived from the others is irrelevant
// If there is only one -> not possible to derive the others
// If there are two -> derive the missing one (order doesn't matter)
// If there are three -> nothing to do

if (!plan.end && plan.start && plan.duration)
plan.end = new Date(plan.start.getTime() + plan.duration);

if (!plan.start && plan.end && plan.duration)
plan.start = new Date(plan.end.getTime() - plan.duration);

if (!plan.duration && plan.start && plan.end)
plan.duration = plan.end.getTime() - plan.start.getTime();
const { start, end, duration } = getTimeInfo({
element: info.element,
instance: info.instance,
logInfo,
token,
});

const delays = {
start: plan.start && start && start.getTime() - plan.start.getTime(),
end: plan.end && end && end.getTime() - plan.end.getTime(),
duration: plan.duration && duration && duration - plan.duration,
};
const { delays, plan } = getPlanDelays({ elementMetaData: metaData, start, end, duration });

// Activity time
statusEntries.push([
Expand All @@ -234,7 +183,7 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
<ClockCircleFilled style={{ fontSize: '1rem' }} />
<Typography.Text strong>Delay:</Typography.Text>
<Typography.Text type={delays.start && delays.start >= 1000 ? 'danger' : undefined}>
{transformMilisecondsToTimeFormat(delays.start)}
{transformMillisecondsToTimeFormat(delays.start)}
</Typography.Text>
</Space>,
]);
Expand All @@ -243,18 +192,18 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
<Space>
<ClockCircleFilled style={{ fontSize: '1rem' }} />
<Typography.Text strong>Duration:</Typography.Text>
<Typography.Text>{transformMilisecondsToTimeFormat(duration)}</Typography.Text>
<Typography.Text>{transformMillisecondsToTimeFormat(duration)}</Typography.Text>
</Space>,
<Space>
<ClockCircleFilled style={{ fontSize: '1rem' }} />
<Typography.Text strong>Planned Duration:</Typography.Text>
<Typography.Text>{transformMilisecondsToTimeFormat(plan.duration)}</Typography.Text>
<Typography.Text>{transformMillisecondsToTimeFormat(plan.duration)}</Typography.Text>
</Space>,
<Space>
<ClockCircleFilled style={{ fontSize: '1rem' }} />
<Typography.Text strong>Delay:</Typography.Text>
<Typography.Text type={delays.duration && delays.duration >= 1000 ? 'danger' : undefined}>
{delays.start ? transformMilisecondsToTimeFormat(delays.duration) : ''}
{transformMillisecondsToTimeFormat(delays.duration)}
</Typography.Text>
</Space>,
]);
Expand All @@ -274,7 +223,7 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
<ClockCircleFilled style={{ fontSize: '1rem' }} />
<Typography.Text strong>Delay:</Typography.Text>
<Typography.Text type={delays.end && delays.end >= 1000 ? 'danger' : undefined}>
{transformMilisecondsToTimeFormat(delays.end)}
{transformMillisecondsToTimeFormat(delays.end)}
</Typography.Text>
</Space>,
]);
Expand Down
Loading

0 comments on commit 10f4f5a

Please sign in to comment.