Skip to content

Commit

Permalink
feat!: refactor ConnectionManager
Browse files Browse the repository at this point in the history
Signed-off-by: Curtish <[email protected]>
  • Loading branch information
curtis-h committed Feb 27, 2025
1 parent fc9b17b commit 5a90c84
Show file tree
Hide file tree
Showing 39 changed files with 907 additions and 1,325 deletions.
16 changes: 4 additions & 12 deletions integration-tests/node/assertions.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ const runTests = (describe, test, assert, SDK) => {

// misc modules
assert("ApiImpl" in SDK);
assert("BasicMediatorHandler" in SDK);
assert("ConnectionsManager" in SDK);
assert("PublicMediatorStore" in SDK);
// assert("BasicMediatorHandler" in SDK);
// assert("ConnectionsManager" in SDK);
// assert("PublicMediatorStore" in SDK);

// credentials
assert("AnonCredsCredential" in SDK);
Expand Down Expand Up @@ -160,22 +160,14 @@ const runTests = (describe, test, assert, SDK) => {
const pluto = new SDK.Pluto(store, apollo);
const did = SDK.Domain.DID.from("did:peer:2.Ez6LSghwSE437wnDE1pt3X6hVDUQzSjsHzinpX3XFvMjRAm7y.Vz6Mkhh1e5CEYYq6JBUcTZ6Cp2ranCWRrv7Yax3Le4N59R6dd.SeyJ0IjoiZG0iLCJzIjp7InVyaSI6Imh0dHA6Ly8xOTIuMTY4LjEuNDQ6ODA4MCIsImEiOlsiZGlkY29tbS92MiJdfX0.SeyJ0IjoiZG0iLCJzIjp7InVyaSI6IndzOi8vMTkyLjE2OC4xLjQ0OjgwODAvd3MiLCJhIjpbImRpZGNvbW0vdjIiXX19");
const agent = SDK.Agent.initialize({
mediatorDID: did,
// mediatorDID: did,
apollo,
castor,
pluto,
mercury,
seed,
});

// hack to avoid mediation startup
agent.connectionManager.cancellable = { cancel: () => { } };
agent.mediationHandler.mediator = {
hostDID: did,
mediatorDID: did,
routingDID: did,
};

await agent.start();
assert(agent.state === "running");
await agent.stop();
Expand Down
29 changes: 15 additions & 14 deletions src/edge-agent/Agent.MessageEvents.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import {
AgentMessageEvents as AgentMessageEventsClass,
EventCallback,
ListenerKey,
} from "./types";
import { expect, notNil } from "../utils";
import { EventCallback, ListenerKey } from "./types";

/**
* An extension for the Edge agent that gives it capability of
Expand All @@ -11,9 +8,9 @@ import {
*
* @export
* @class AgentMessageEvents
* @typedef {AgentMessageEvents}
* @typedef {EventsManager}
*/
export class AgentMessageEvents implements AgentMessageEventsClass {
export class EventsManager {
private events: Map<ListenerKey, Set<EventCallback>> = new Map();

/**
Expand All @@ -29,8 +26,8 @@ export class AgentMessageEvents implements AgentMessageEventsClass {
if (!this.events.has(eventName)) {
this.events.set(eventName, new Set());
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const callbacks = this.events.get(eventName)!;

const callbacks = expect(this.events.get(eventName));
callbacks.add(callback);
return callbacks.size - 1;
}
Expand All @@ -45,8 +42,10 @@ export class AgentMessageEvents implements AgentMessageEventsClass {
*/
public removeListener(eventName: ListenerKey, callback: EventCallback): void {
const callbacks = this.events.get(eventName);
if (!callbacks) return;
callbacks.delete(callback);

if (notNil(callbacks)) {
callbacks.delete(callback);
}
}

/**
Expand All @@ -58,9 +57,11 @@ export class AgentMessageEvents implements AgentMessageEventsClass {
*/
public emit(eventName: ListenerKey, data: any): void {
const callbacks = this.events.get(eventName);
if (!callbacks) return;
for (const callback of callbacks) {
callback(data);

if (notNil(callbacks)) {
for (const callback of callbacks) {
callback(data);
}
}
}
}
48 changes: 48 additions & 0 deletions src/edge-agent/connections/Connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as Domain from "../../domain";
import { DIDCommContext } from "../didcomm/Context";

/**
* Define the structure of a Connection
*/
export interface Connection {
/**
* unique identifer for the connection
*/
uri: string;
/**
* current condition of the connection
*/
state: Connection.State;
// ? convert Message to Protocol
/**
* handle delivering a Message to the connected entity
*/
send: (message: Domain.Message, ctx: DIDCommContext) => Promise<Domain.Message | undefined>;
/**
* called when a Message is received from this connection
*/
receive: (message: any, ctx: DIDCommContext) => Promise<void>;
/**
* handle any desired teardown
*/
close?: () => Promise<void>;
}

export namespace Connection {
export enum State {
// no interactions
UNKNOWN = 0,
// newly created but not negotiated
NEW = 1,
// request has been sent
REQUESTED = 2,
// request has been denied
DENIED = 3,
// request has been granted
GRANTED = 4,
// closed
CLOSED = 5,
// communication failing
BROKEN = 6,
}
}
86 changes: 86 additions & 0 deletions src/edge-agent/connections/ConnectionsManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { Nil, isString } from "../../utils";
import { Connection } from "./Connection";
import { MediatorConnection } from "./didcomm";

/**
* ConnectionsManager is responsible for handling Connections and Mediators
*
* Mediators are a Connection where we periodically fetch messages from
*
* @class ConnectionsManager
* @typedef {ConnectionsManager}
*/
export class ConnectionsManager {
private readonly connections: Connection[] = [];
private readonly mediators = new Set<string>();

// ?? tmp hack around only one mediator
get mediator(): MediatorConnection | Nil {
const mediator: string = this.mediators.values().next().value;
const connection = this.find(mediator);

if (connection instanceof MediatorConnection) {
return connection;
}

return null;
}

/**
* close all active connections
*/
async stop() {
for (const connection of this.connections) {
await connection.close?.();
}
}

/**
* add a Connection
*
* @async
* @param {DIDPair} paired
* @returns {Promise<void>}
*/
add<T extends Connection>(connection: T): void {
this.connections.push(connection);
}

/**
* add a Connection and mark it as a Mediator
* @param mediator
*/
addMediator<T extends Connection>(mediator: T): void {
this.add(mediator);
this.mediators.add(mediator.uri);
}

/**
* Remove a Connection
* this but just means the connection will be removed from the current storage
*
* @async
* @param {DIDPair} pair
* @returns {Promise<void>}
*/
async remove(connection: Connection | string): Promise<void> {
const uri = isString(connection) ? connection : connection.uri;
const index = this.connections.findIndex(x => x.uri === uri);

if (index !== -1) {
this.mediators.delete(uri);
this.connections.splice(index, 1);
}
}

/**
* Search for a Connection by it's unique identifier
*
* @param uri
* @returns
*/
find(uri: string): Connection | undefined {
const connection = this.connections.find(x => x.uri === uri);
return connection;
}
}
38 changes: 38 additions & 0 deletions src/edge-agent/connections/JobManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { CancellableTask } from "../helpers/Task";

export class JobManager {
/**
* An array with cancellable tasks, mainly used to store one or multiple didcomm
* connections in storage implementation at the same time. All of them can be cancelled
* despite they run asyncronously when the Edge agent stops
*
* @public
* @type {CancellableTask<any>[]}
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
public cancellables: CancellableTask<any>[] = [];
/**
* Cancellable task used to listen for new messages, stopping the Agent should also stop this
* from running and destroy the instance of the task until agent is started again
*
* @public
* @type {?CancellableTask<void>}
*/
public fetchMessages?: CancellableTask<void>;


/**
* Stops all jobs
*/
stop(): void {
this.fetchMessages?.cancel();
this.fetchMessages = undefined;

while (this.cancellables.length > 0) {
const [task] = this.cancellables.splice(0, 1);
if (task) {
task.cancel();
}
}
}
}
55 changes: 55 additions & 0 deletions src/edge-agent/connections/didcomm/DIDCommConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import * as Domain from "../../../domain";
import { Ctor, Task, notNil } from "../../../utils";
import { ProtocolType } from "../../protocols/ProtocolTypes";
import { Connection } from "../Connection";
import { MediateDeny } from "../didcomm/MediateDeny";
import { MediateGrant } from "../didcomm/MediateGrant";
import { PickupDelivery } from "../didcomm/PickupDelivery";
import { PickupStatus } from "../didcomm/PickupStatus";
import { ProblemReport } from "../didcomm/ProblemReport";

export class DIDCommConnection implements Connection {
public readonly type = "DIDComm";
public readonly tasks = new Map<string, Ctor<Task<any>>>();
public state = Connection.State.NEW;

constructor(
public readonly uri: string,
public readonly host: string,
public readonly alias?: string
) {
this.tasks
.set(ProtocolType.DidcommMediationGrant, MediateGrant)
.set(ProtocolType.DidcommMediationDeny, MediateDeny)
.set(ProtocolType.PickupStatus, PickupStatus)
.set(ProtocolType.PickupDelivery, PickupDelivery)
.set(ProtocolType.ProblemReporting, ProblemReport);
}

async send(msg: Domain.Message, ctx: Task.Context) {
msg.direction = Domain.MessageDirection.SENT;
// filter which messages we want stored
const ignorePluto = [ProtocolType.PickupRequest, ProtocolType.DidcommMediationKeysUpdate];
const shouldStore = ignorePluto.every(x => x !== msg.piuri);

if (shouldStore) {
await ctx.Pluto.storeMessage(msg);
}

const response = await ctx.Mercury.sendMessageParseMessage(msg);

return this.receive(response, ctx);
}

async receive(msg: Domain.Message | undefined, ctx: Task.Context) {
// find the relevant task - enable handling of unmatched
const taskCtor = this.tasks.get(msg?.piuri ?? "unknown");

if (notNil(taskCtor)) {
const result = await ctx.run(new taskCtor({ message: msg }));
return result;
}

return undefined;
}
}
24 changes: 24 additions & 0 deletions src/edge-agent/connections/didcomm/MediateDeny.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import * as Domain from "../../../domain";
import { Task, expect } from "../../../utils";
import { DIDCommContext } from "../../didcomm/Context";
import { Connection } from "../Connection";

/**
* Mediation Denied
* gracefully handle denial
*/

interface Args {
message: Domain.Message;
}

export class MediateDeny extends Task<void, Args> {
async run(ctx: DIDCommContext) {
ctx.logger.warn(`Mediation denied for: ${this.args.message.from?.toString()}`);
ctx.logger.debug(`Mediate-Deny message:`, this.args.message);

const uri = expect(this.args.message.from);
const connection = expect(ctx.Connections.find(uri.toString()));
connection.state = Connection.State.DENIED;
}
}
34 changes: 34 additions & 0 deletions src/edge-agent/connections/didcomm/MediateGrant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import * as Domain from "../../../domain";
import { Task, expect } from "../../../utils";
import { DIDCommContext } from "../../didcomm/Context";
import { MediationGrant } from "../../protocols/mediation/MediationGrant";
import { Connection } from "../Connection";

/**
* Mediation Granted
* add a new mediator connection
* and store for future use
*/

interface Args {
message: Domain.Message;
}

export class MediateGrant extends Task<void, Args> {
async run(ctx: DIDCommContext) {
const grantMessage = MediationGrant.fromMessage(this.args.message);
const uri = expect(this.args.message.from);
const msgTo = expect(this.args.message.to);
const connection = expect(ctx.Connections.find(uri.toString()));
const mediator: Domain.Mediator = {
hostDID: Domain.DID.from(msgTo),
mediatorDID: Domain.DID.from(uri),
routingDID: Domain.DID.from(grantMessage.body.routing_did),
};

connection.state = Connection.State.GRANTED;
await ctx.Pluto.storeMediator(mediator);

ctx.logger.info(`Mediation Granted: ${uri}`);
}
}
Loading

0 comments on commit 5a90c84

Please sign in to comment.