Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial mqtt calls for endpoints #403

Merged
merged 21 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ClockCircleFilled } from '@ant-design/icons';
import React from 'react';
import { statusToType } from './instance-helpers';
import { convertISODurationToMiliseconds, getMetaDataFromElement } from '@proceed/bpmn-helper';
import { generateRequestUrl } from '@/lib/engines/endpoints';
import { endpointBuilder } from '@/lib/engines/endpoint';
import { DisplayTable, RelevantInstanceInfo } from './instance-info-panel';

function transformMilisecondsToTimeFormat(milliseconds: number | undefined) {
Expand Down Expand Up @@ -47,10 +47,11 @@ export function ElementStatus({ info }: { info: RelevantInstanceInfo }) {
}}
>
<Image
src={generateRequestUrl(
{ id: '', ip: 'localhost', port: 33029 },
`/resources/process/${info.process.definitionId}/images/${metaData.overviewImage}`,
)}
// TODO: use engine endpoint to get the image
src={endpointBuilder('get', '/resources/process/:definitionId/images/:fileName', {
definitionId: info.process.definitionId,
fileName: metaData.overviewImage,
})}
/>
</div>,
]);
Expand Down
57 changes: 57 additions & 0 deletions src/management-system-v2/app/admin/engines/engines-table.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use client';

import { Tag } from 'antd';
import { useState } from 'react';
import { type TableEngine } from './page';
import ElementList from '@/components/item-list-view';
import Bar from '@/components/bar';
import useFuzySearch from '@/lib/useFuzySearch';

export default function EnginesTable({ engines }: { engines: TableEngine[] }) {
const { filteredData, searchQuery, setSearchQuery } = useFuzySearch({
data: engines,
keys: ['name'],
highlightedKeys: ['name'],
transformData: (matches) => matches.map((match) => match.item),
});

const [selectedEngines, setSelectedEngines] = useState<typeof filteredData>([]);

return (
<>
<Bar
searchProps={{
value: searchQuery,
onChange: (e) => setSearchQuery(e.target.value),
onPressEnter: (e) => setSearchQuery(e.currentTarget.value),
placeholder: 'Search spaces ...',
}}
/>

<ElementList
data={filteredData}
elementSelection={{
selectedElements: selectedEngines,
setSelectionElements: setSelectedEngines,
}}
columns={[
{
title: 'Engine ID',
dataIndex: 'name',
render: (_, engine) => engine.name.highlighted,
},
{
title: 'Status',
dataIndex: 'owner',
sorter: (a, b) => +a.running - +b.running,
render: (_, engine) => (
<Tag color={engine.running ? 'success' : 'error'}>
{engine.running ? 'Online' : 'Offline'}
</Tag>
),
},
]}
/>
</>
);
}
44 changes: 44 additions & 0 deletions src/management-system-v2/app/admin/engines/page.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { getCurrentUser } from '@/components/auth';
import Content from '@/components/content';
import { getEngines } from '@/lib/engines/mqtt-endpoints';
import { Result, Skeleton } from 'antd';
import { notFound, redirect } from 'next/navigation';
import { Suspense } from 'react';
import { getSystemAdminByUserId } from '@/lib/data/DTOs';
import EnginesTable from './engines-table';
import { env } from '@/lib/env-vars';

export type TableEngine = Awaited<ReturnType<typeof getEngines>>[number] & { name: string };

async function Engines() {
const user = await getCurrentUser();
if (!user.session) redirect('/');
const adminData = getSystemAdminByUserId(user.userId);
if (!adminData) redirect('/');

try {
const engines = (await getEngines()).map((e) => ({ ...e, name: e.id }));

return <EnginesTable engines={engines} />;
} catch (e) {
console.error(e);
return <Result status="500" title="Error" subTitle="Couldn't fetch engines" />;
}
}

export default function EnginesPage() {
if (!env.NEXT_PUBLIC_ENABLE_EXECUTION) return notFound();

if (!env.MQTT_SERVER_ADDRESS)
return <Result status="500" title="Error" subTitle="No MQTT server address configured" />;

return (
<Content title="Engines">
<Suspense fallback={<Skeleton active />}>
<Engines />
</Suspense>
</Content>
);
}

export const dynamic = 'force-dynamic';
7 changes: 7 additions & 0 deletions src/management-system-v2/app/admin/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Link from 'next/link';
import Layout from '@/app/(dashboard)/[environmentId]/layout-client';
import { AreaChartOutlined, AppstoreOutlined, FileOutlined } from '@ant-design/icons';
import { MdOutlineComputer } from 'react-icons/md';
import { FaUsers } from 'react-icons/fa';
import { RiAdminFill } from 'react-icons/ri';

Expand Down Expand Up @@ -45,6 +46,12 @@ export default function AdminLayout({ children }: { children: React.ReactNode })
label: <Link href="/admin/systemadmins">Manage admins</Link>,
icon: <RiAdminFill />,
},

{
key: 'engines',
label: <Link href="/admin/engines">Engines</Link>,
icon: <MdOutlineComputer />,
},
],
},
]}
Expand Down
2 changes: 1 addition & 1 deletion src/management-system-v2/lib/engines/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
// @ts-ignore
// import decider from '@proceed/decider';
import { Machine, getMachines } from './machines';
import * as endpoints from './endpoints';
import * as endpoints from './http-endpoints';
import { prepareExport } from '../process-export/export-preparation';
import { Prettify } from '../typescript-utils';

Expand Down
30 changes: 30 additions & 0 deletions src/management-system-v2/lib/engines/endpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
type EndpointSchema = typeof import('./endpoints.json');
type Endpoints = EndpointSchema;
type Methods = 'get' | 'post' | 'put' | 'delete';

type GetParamsFromString<
Copy link
Contributor

@jjoderis jjoderis Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would maybe add one or two comments to this file to explain what exactly these types do.
And is "Count" actually a fitting name? I looks like it is a list of all arguments that are behind a ":" in the string.

Str extends string,
Count extends unknown[] = [],
> = Str extends `${infer Start}:${string}/${infer Rest}`
? Str extends `${Start}:${infer Param}/${Rest}`
? GetParamsFromString<Rest, [...Count, Param]>
: Count
: Str extends `${string}:${infer End}`
? [...Count, End]
: Count;

type EndpointArgsArray<ParamsArray extends string[]> = ParamsArray extends []
? []
: [Record<ParamsArray[number], string>];
type EndpointArgs<Endpoint extends string> = EndpointArgsArray<GetParamsFromString<Endpoint>>;

type AvailableEndpoints<Method extends Methods> = keyof Endpoints[Method] extends string
? keyof Endpoints[Method]
: never;
export function endpointBuilder<Method extends Methods, Url extends AvailableEndpoints<Method>>(
_: Method,
endpoint: Url,
...options: EndpointArgs<Url>
) {
return endpoint.replace(/:([^/]+)/g, (_, capture_group) => options[0]?.[capture_group] || '');
}
67 changes: 67 additions & 0 deletions src/management-system-v2/lib/engines/endpoints.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"get": {
"/machine/:properties": {
"params": true
},
"/machine/": {},
"/capabilities/": {},
"/configuration/": {},
"/configuration/:key": {},
"/logging": {},
"/logging/status": {},
"/logging/standard": {},
"/logging/process": {},
"/logging/process/:definitionId": {},
"/logging/process/:definitionId/instance/:instanceId": {},
"/monitoring/": {},
"/tasklist/api/": {},
"/tasklist/api/userTask": {},
"/configuration/api/config": {},
"/logging/api/log": {},
"/": {},
"/process/": {},
"/process/:definitionId": {},
"/process/:definitionId/versions": {},
"/process/:definitionId/versions/:version": {},
"/process/:definitionId/instance": {},
"/process/:definitionId/instance/:instanceID": {},
"/process/:definitionId/user-tasks/:fileName": {},
"/process/:definitionId/user-tasks": {},
"/status/": {},
"/resources/process/:definitionId/images/:fileName": {},
"/resources/process/:definitionId/images/": {}
},
"post": {
"/capabilities/execute": {},
"/capabilities/return": {},
"/evaluation/": {},
"/tasklist/api/userTask": {},
"/configuration/api/config": {},
"/process/": {},
"/process/:definitionId/versions/:version/instance": {},
"/process/:definitionId/instance/:instanceId/tokens": {},
"/process/:definitionId/instance/:instanceId/variables": {},
"/process/:definitionId/versions/:version/instance/migration": {}
},
"put": {
"/configuration/": {},
"/tasklist/api/variable": {},
"/tasklist/api/milestone": {},
"/process/:definitionId/instance/:instanceID": {},
"/process/:definitionId/instance/:instanceID/instanceState": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId/currentFlowNodeState": {},
"/process/:definitionId/user-tasks/:fileName": {},
"/resources/process/:definitionId/images/:fileName": {}
},
"delete": {
"/configuration/": {},
"/logging": {},
"/logging/standard": {},
"/logging/process": {},
"/logging/process/:definitionId": {},
"/logging/process/:definitionId/instance/:instanceId": {},
"/process/:definitionId": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId": {}
}
}
126 changes: 126 additions & 0 deletions src/management-system-v2/lib/engines/mqtt-endpoints.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import mqtt from 'mqtt';
import { env } from '@/lib/env-vars';

const mqttTimeout = 1000;

const mqttCredentials = {
password: env.MQTT_PASSWORD,
username: env.MQTT_USERNAME,
};

const baseTopicPrefix = env.MQTT_BASETOPIC ? env.MQTT_BASETOPIC + '/' : '';

export function getClient(options?: mqtt.IClientOptions): Promise<mqtt.MqttClient> {
const address = env.MQTT_SERVER_ADDRESS || '';

return new Promise((res, rej) => {
const client = mqtt.connect(address, {
...mqttCredentials,
...options,
});
client.on('connect', () => res(client));
client.on('error', (err) => rej(err));
});
}

function subscribeToTopic(client: mqtt.MqttClient, topic: string) {
return new Promise<void>((res, rej) => {
setTimeout(rej, mqttTimeout); // Timeout if the subscription takes too long
client.subscribe(topic, (err) => {
if (err) rej(err);
res();
});
});
}

function getEnginePrefix(engineId: string) {
return `${baseTopicPrefix}proceed-pms/engine/${engineId}`;
}

export async function getEngines() {
const client = await getClient({
connectTimeout: mqttTimeout,
});

const engines: { id: string; running: boolean; version: string }[] = [];

await subscribeToTopic(client, `${getEnginePrefix('+')}/status`);

// All retained messages are sent at once
// The broker should bundle them in one tcp packet,
// after it is parsed all messages are in the queue, and handled before close
// is handled, as the packets where pushed to the queue before the close event was emitted.
// This is of course subject to the implementation of the broker,
// however for a small amount of engines it should be fine.
await new Promise<void>((res) => {
setTimeout(res, mqttTimeout); // Timeout in case we receive no messages

client.on('message', (topic, message) => {
const match = topic.match(new RegExp(`^${getEnginePrefix('')}([^\/]+)\/status`));
if (match) {
const id = match[1];
const status = JSON.parse(message.toString());
engines.push({ id, ...status });
res();
}
});
});

await client.endAsync();

return engines;
}

const requestClient = getClient();

export async function mqttRequest(
engineId: string,
url: string,
message: {
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
body: Record<string, any>;
query: Record<string, any>;
page?: number;
},
) {
const client = await requestClient;

const requestId = crypto.randomUUID();
const requestTopic = getEnginePrefix(engineId) + '/api' + url;
await subscribeToTopic(client, requestTopic);

// handler for the response
let res: (res: any) => void, rej: (Err: any) => void;
const receivedAnswer = new Promise<any>((_res, _rej) => {
res = _res;
rej = _rej;
});
function handler(topic: string, _message: any) {
const message = JSON.parse(_message.toString());
if (topic !== requestTopic) return;
if (
!message ||
typeof message !== 'object' ||
!('type' in message) ||
message.type !== 'response' ||
!('id' in message) ||
message.id !== requestId
)
return;

res(JSON.parse(message.body));
}
client.on('message', handler);

// send request
client.publish(requestTopic, JSON.stringify({ ...message, type: 'request', id: requestId }));

// await for response or timeout
setTimeout(rej!, mqttTimeout);
const response = await receivedAnswer;

// cleanup
client.removeListener('message', handler);

return response;
}
5 changes: 5 additions & 0 deletions src/management-system-v2/lib/env-vars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ const environmentVariables = {
}
})
.optional(),

MQTT_SERVER_ADDRESS: z.string().url().optional(),
MQTT_USERNAME: z.string().optional(),
MQTT_PASSWORD: z.string().optional(),
MQTT_BASETOPIC: z.string().optional(),
},
production: {
NEXTAUTH_SECRET: z.string(),
Expand Down
Loading