Skip to content

Commit

Permalink
Initial mqtt calls for endpoints (#403)
Browse files Browse the repository at this point in the history
* feat(ms2/engines): initial mqtt endpoints

* feat(ms2/engines): endpoints for engines

* feat(ms2/engines): mqtt requests

* feat(ms2/admin): engine view

* refactor(ms2/mqtt)

* fix(ms2/engine-endpoints): import

* fix(ms2/admin-engines): make page dynamic

* feat(ms2/admin): added link to engines in sidebar

* chore: format

* fix(ms2/env-vars): mqtt env vars shouldn't be required yet

* fix: possible undefined

* fix: engine name key

* fix(ms2/build): unset env-var

* fix(ms2/mqtt): correct engine id key

* admin-dashboard/engines: better error message

* fix: import
  • Loading branch information
FelipeTrost authored Nov 28, 2024
1 parent 34ee1e8 commit cb7885c
Show file tree
Hide file tree
Showing 12 changed files with 497 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ClockCircleFilled } from '@ant-design/icons';
import React from 'react';
import { statusToType } from './instance-helpers';
import { convertISODurationToMiliseconds, getMetaDataFromElement } from '@proceed/bpmn-helper';
import { generateRequestUrl } from '@/lib/engines/endpoints';
import { endpointBuilder } from '@/lib/engines/endpoint';
import { DisplayTable, RelevantInstanceInfo } from './instance-info-panel';

function transformMilisecondsToTimeFormat(milliseconds: number | undefined) {
Expand Down Expand Up @@ -47,10 +47,11 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
}}
>
<Image
src={generateRequestUrl(
{ id: '', ip: 'localhost', port: 33029 },
`/resources/process/${info.process.definitionId}/images/${metaData.overviewImage}`,
)}
// TODO: use engine endpoint to get the image
src={endpointBuilder('get', '/resources/process/:definitionId/images/:fileName', {
definitionId: info.process.definitionId,
fileName: metaData.overviewImage,
})}
/>
</div>,
]);
Expand Down
57 changes: 57 additions & 0 deletions src/management-system-v2/app/admin/engines/engines-table.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use client';

import { Tag } from 'antd';
import { useState } from 'react';
import { type TableEngine } from './page';
import ElementList from '@/components/item-list-view';
import Bar from '@/components/bar';
import useFuzySearch from '@/lib/useFuzySearch';

export default function EnginesTable({ engines }: { engines: TableEngine[] }) {
const { filteredData, searchQuery, setSearchQuery } = useFuzySearch({
data: engines,
keys: ['name'],
highlightedKeys: ['name'],
transformData: (matches) => matches.map((match) => match.item),
});

const [selectedEngines, setSelectedEngines] = useState<typeof filteredData>([]);

return (
<>
<Bar
searchProps={{
value: searchQuery,
onChange: (e) => setSearchQuery(e.target.value),
onPressEnter: (e) => setSearchQuery(e.currentTarget.value),
placeholder: 'Search spaces ...',
}}
/>

<ElementList
data={filteredData}
elementSelection={{
selectedElements: selectedEngines,
setSelectionElements: setSelectedEngines,
}}
columns={[
{
title: 'Engine ID',
dataIndex: 'name',
render: (_, engine) => engine.name.highlighted,
},
{
title: 'Status',
dataIndex: 'owner',
sorter: (a, b) => +a.running - +b.running,
render: (_, engine) => (
<Tag color={engine.running ? 'success' : 'error'}>
{engine.running ? 'Online' : 'Offline'}
</Tag>
),
},
]}
/>
</>
);
}
44 changes: 44 additions & 0 deletions src/management-system-v2/app/admin/engines/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { getCurrentUser } from '@/components/auth';
import Content from '@/components/content';
import { getEngines } from '@/lib/engines/mqtt-endpoints';
import { Result, Skeleton } from 'antd';
import { notFound, redirect } from 'next/navigation';
import { Suspense } from 'react';
import { getSystemAdminByUserId } from '@/lib/data/DTOs';
import EnginesTable from './engines-table';
import { env } from '@/lib/env-vars';

export type TableEngine = Awaited<ReturnType<typeof getEngines>>[number] & { name: string };

async function Engines() {
const user = await getCurrentUser();
if (!user.session) redirect('/');
const adminData = getSystemAdminByUserId(user.userId);
if (!adminData) redirect('/');

try {
const engines = (await getEngines()).map((e) => ({ ...e, name: e.id }));

return <EnginesTable engines={engines} />;
} catch (e) {
console.error(e);
return <Result status="500" title="Error" subTitle="Couldn't fetch engines" />;
}
}

export default function EnginesPage() {
if (!env.NEXT_PUBLIC_ENABLE_EXECUTION) return notFound();

if (!env.MQTT_SERVER_ADDRESS)
return <Result status="500" title="Error" subTitle="No MQTT server address configured" />;

return (
<Content title="Engines">
<Suspense fallback={<Skeleton active />}>
<Engines />
</Suspense>
</Content>
);
}

export const dynamic = 'force-dynamic';
7 changes: 7 additions & 0 deletions src/management-system-v2/app/admin/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Link from 'next/link';
import Layout from '@/app/(dashboard)/[environmentId]/layout-client';
import { AreaChartOutlined, AppstoreOutlined, FileOutlined } from '@ant-design/icons';
import { MdOutlineComputer } from 'react-icons/md';
import { FaUsers } from 'react-icons/fa';
import { RiAdminFill } from 'react-icons/ri';

Expand Down Expand Up @@ -45,6 +46,12 @@ export default function AdminLayout({ children }: { children: React.ReactNode })
label: <Link href="/admin/systemadmins">Manage admins</Link>,
icon: <RiAdminFill />,
},

{
key: 'engines',
label: <Link href="/admin/engines">Engines</Link>,
icon: <MdOutlineComputer />,
},
],
},
]}
Expand Down
2 changes: 1 addition & 1 deletion src/management-system-v2/lib/engines/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
// @ts-ignore
// import decider from '@proceed/decider';
import { Machine, getMachines } from './machines';
import * as endpoints from './endpoints';
import * as endpoints from './http-endpoints';
import { prepareExport } from '../process-export/export-preparation';
import { Prettify } from '../typescript-utils';

Expand Down
30 changes: 30 additions & 0 deletions src/management-system-v2/lib/engines/endpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
type EndpointSchema = typeof import('./endpoints.json');
type Endpoints = EndpointSchema;
type Methods = 'get' | 'post' | 'put' | 'delete';

type GetParamsFromString<
Str extends string,
Count extends unknown[] = [],
> = Str extends `${infer Start}:${string}/${infer Rest}`
? Str extends `${Start}:${infer Param}/${Rest}`
? GetParamsFromString<Rest, [...Count, Param]>
: Count
: Str extends `${string}:${infer End}`
? [...Count, End]
: Count;

type EndpointArgsArray<ParamsArray extends string[]> = ParamsArray extends []
? []
: [Record<ParamsArray[number], string>];
type EndpointArgs<Endpoint extends string> = EndpointArgsArray<GetParamsFromString<Endpoint>>;

type AvailableEndpoints<Method extends Methods> = keyof Endpoints[Method] extends string
? keyof Endpoints[Method]
: never;
export function endpointBuilder<Method extends Methods, Url extends AvailableEndpoints<Method>>(
_: Method,
endpoint: Url,
...options: EndpointArgs<Url>
) {
return endpoint.replace(/:([^/]+)/g, (_, capture_group) => options[0]?.[capture_group] || '');
}
67 changes: 67 additions & 0 deletions src/management-system-v2/lib/engines/endpoints.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"get": {
"/machine/:properties": {
"params": true
},
"/machine/": {},
"/capabilities/": {},
"/configuration/": {},
"/configuration/:key": {},
"/logging": {},
"/logging/status": {},
"/logging/standard": {},
"/logging/process": {},
"/logging/process/:definitionId": {},
"/logging/process/:definitionId/instance/:instanceId": {},
"/monitoring/": {},
"/tasklist/api/": {},
"/tasklist/api/userTask": {},
"/configuration/api/config": {},
"/logging/api/log": {},
"/": {},
"/process/": {},
"/process/:definitionId": {},
"/process/:definitionId/versions": {},
"/process/:definitionId/versions/:version": {},
"/process/:definitionId/instance": {},
"/process/:definitionId/instance/:instanceID": {},
"/process/:definitionId/user-tasks/:fileName": {},
"/process/:definitionId/user-tasks": {},
"/status/": {},
"/resources/process/:definitionId/images/:fileName": {},
"/resources/process/:definitionId/images/": {}
},
"post": {
"/capabilities/execute": {},
"/capabilities/return": {},
"/evaluation/": {},
"/tasklist/api/userTask": {},
"/configuration/api/config": {},
"/process/": {},
"/process/:definitionId/versions/:version/instance": {},
"/process/:definitionId/instance/:instanceId/tokens": {},
"/process/:definitionId/instance/:instanceId/variables": {},
"/process/:definitionId/versions/:version/instance/migration": {}
},
"put": {
"/configuration/": {},
"/tasklist/api/variable": {},
"/tasklist/api/milestone": {},
"/process/:definitionId/instance/:instanceID": {},
"/process/:definitionId/instance/:instanceID/instanceState": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId/currentFlowNodeState": {},
"/process/:definitionId/user-tasks/:fileName": {},
"/resources/process/:definitionId/images/:fileName": {}
},
"delete": {
"/configuration/": {},
"/logging": {},
"/logging/standard": {},
"/logging/process": {},
"/logging/process/:definitionId": {},
"/logging/process/:definitionId/instance/:instanceId": {},
"/process/:definitionId": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId": {}
}
}
126 changes: 126 additions & 0 deletions src/management-system-v2/lib/engines/mqtt-endpoints.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import mqtt from 'mqtt';
import { env } from '@/lib/env-vars';

const mqttTimeout = 1000;

const mqttCredentials = {
password: env.MQTT_PASSWORD,
username: env.MQTT_USERNAME,
};

const baseTopicPrefix = env.MQTT_BASETOPIC ? env.MQTT_BASETOPIC + '/' : '';

export function getClient(options?: mqtt.IClientOptions): Promise<mqtt.MqttClient> {
const address = env.MQTT_SERVER_ADDRESS || '';

return new Promise((res, rej) => {
const client = mqtt.connect(address, {
...mqttCredentials,
...options,
});
client.on('connect', () => res(client));
client.on('error', (err) => rej(err));
});
}

function subscribeToTopic(client: mqtt.MqttClient, topic: string) {
return new Promise<void>((res, rej) => {
setTimeout(rej, mqttTimeout); // Timeout if the subscription takes too long
client.subscribe(topic, (err) => {
if (err) rej(err);
res();
});
});
}

function getEnginePrefix(engineId: string) {
return `${baseTopicPrefix}proceed-pms/engine/${engineId}`;
}

export async function getEngines() {
const client = await getClient({
connectTimeout: mqttTimeout,
});

const engines: { id: string; running: boolean; version: string }[] = [];

await subscribeToTopic(client, `${getEnginePrefix('+')}/status`);

// All retained messages are sent at once
// The broker should bundle them in one tcp packet,
// after it is parsed all messages are in the queue, and handled before close
// is handled, as the packets where pushed to the queue before the close event was emitted.
// This is of course subject to the implementation of the broker,
// however for a small amount of engines it should be fine.
await new Promise<void>((res) => {
setTimeout(res, mqttTimeout); // Timeout in case we receive no messages

client.on('message', (topic, message) => {
const match = topic.match(new RegExp(`^${getEnginePrefix('')}([^\/]+)\/status`));
if (match) {
const id = match[1];
const status = JSON.parse(message.toString());
engines.push({ id, ...status });
res();
}
});
});

await client.endAsync();

return engines;
}

const requestClient = getClient();

export async function mqttRequest(
engineId: string,
url: string,
message: {
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
body: Record<string, any>;
query: Record<string, any>;
page?: number;
},
) {
const client = await requestClient;

const requestId = crypto.randomUUID();
const requestTopic = getEnginePrefix(engineId) + '/api' + url;
await subscribeToTopic(client, requestTopic);

// handler for the response
let res: (res: any) => void, rej: (Err: any) => void;
const receivedAnswer = new Promise<any>((_res, _rej) => {
res = _res;
rej = _rej;
});
function handler(topic: string, _message: any) {
const message = JSON.parse(_message.toString());
if (topic !== requestTopic) return;
if (
!message ||
typeof message !== 'object' ||
!('type' in message) ||
message.type !== 'response' ||
!('id' in message) ||
message.id !== requestId
)
return;

res(JSON.parse(message.body));
}
client.on('message', handler);

// send request
client.publish(requestTopic, JSON.stringify({ ...message, type: 'request', id: requestId }));

// await for response or timeout
setTimeout(rej!, mqttTimeout);
const response = await receivedAnswer;

// cleanup
client.removeListener('message', handler);

return response;
}
5 changes: 5 additions & 0 deletions src/management-system-v2/lib/env-vars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ const environmentVariables = {
}
})
.optional(),

MQTT_SERVER_ADDRESS: z.string().url().optional(),
MQTT_USERNAME: z.string().optional(),
MQTT_PASSWORD: z.string().optional(),
MQTT_BASETOPIC: z.string().optional(),
},
production: {
NEXTAUTH_SECRET: z.string(),
Expand Down
Loading

0 comments on commit cb7885c

Please sign in to comment.