Skip to content

Commit

Permalink
Merge branch 'main' into met-806-worker-pooling
Browse files Browse the repository at this point in the history
  • Loading branch information
Natoandro committed Jan 20, 2025
2 parents 4f120b4 + 68cfb17 commit b548328
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 11 deletions.
6 changes: 3 additions & 3 deletions src/typegate/src/runtimes/substantial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ export class SubstantialRuntime extends Runtime {
const queue = "default";

const agentConfig = {
pollIntervalSec: typegate.config.base.substantial_poll_interval_sec,
leaseLifespanSec: typegate.config.base.substantial_lease_lifespan_sec,
maxAcquirePerTick: typegate.config.base.substantial_max_acquire_per_tick,
pollIntervalSec: typegate.config.base.substantial_poll_interval_sec!,
leaseLifespanSec: typegate.config.base.substantial_lease_lifespan_sec!,
maxAcquirePerTick: typegate.config.base.substantial_max_acquire_per_tick!,
} satisfies AgentConfig;

const agent = new Agent(backend, queue, agentConfig);
Expand Down
23 changes: 20 additions & 3 deletions src/typegate/src/runtimes/substantial/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ export class Agent {
}

for (const workflow of this.workflows) {
const requests = replayRequests.filter(
({ run_id }) => getTaskNameFromId(run_id) == workflow.name,
);
const requests = this.#selectReplayRequestsFor(workflow.name, replayRequests);

while (requests.length > 0) {
// this.logger.warn(`Run workflow ${JSON.stringify(next)}`);
Expand All @@ -166,6 +164,25 @@ export class Agent {
}
}

#selectReplayRequestsFor(workflowName: string, runsInScope: Array<NextRun>) {
const runsToDo = [];
for (const run of runsInScope) {
try {
if (getTaskNameFromId(run.run_id) == workflowName) {
runsToDo.push(run);
}
} catch(err) {
this.logger.warn(`Bad runId ${run.run_id}`);
this.logger.error(err);

// TODO:
// Force remove?
}
}

return runsToDo;
}

async #tryAcquireNextRun() {
const activeRunIds = await Meta.substantial.agentActiveLeases({
backend: this.backend,
Expand Down
4 changes: 2 additions & 2 deletions src/typegate/src/typegraph/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type * as ast from "graphql/ast";
import { Kind } from "graphql";
import type { DenoRuntime } from "../runtimes/deno/deno.ts";
import type { Runtime } from "../runtimes/Runtime.ts";
import { ensure, ensureNonNullable } from "../utils.ts";
import { deepClone, ensure, ensureNonNullable } from "../utils.ts";
import { typegraph_validate } from "native";
import Chance from "chance";
import {
Expand Down Expand Up @@ -229,7 +229,7 @@ export class TypeGraph implements AsyncDisposable {
typegraph,
typegraphName,
materializers,
args: (runtime as any)?.data ?? {},
args: deepClone((runtime as any)?.data ?? {}),
secretManager,
});
}),
Expand Down
2 changes: 2 additions & 0 deletions src/typegate/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,5 @@ export function collectFieldNames(tg: TypeGraph, typeIdx: number) {

export const sleep = (ms: number) =>
new Promise((resolve) => setTimeout(resolve, ms));

export const deepClone = <T>(clonable: T): T => JSON.parse(JSON.stringify(clonable)) as T;
12 changes: 12 additions & 0 deletions tests/sync/scripts/workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

interface Context {
kwargs: {
name: string;
};
}

export function sayHello(ctx: Context) {
return `Hello ${ctx.kwargs.name}`;
}
9 changes: 8 additions & 1 deletion tests/sync/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@

from typegraph import t, typegraph, Policy, Graph
from typegraph.runtimes.deno import DenoRuntime
from typegraph.runtimes.substantial import Backend, SubstantialRuntime, WorkflowFile


@typegraph()
def sync(g: Graph):
deno = DenoRuntime()
backend = Backend.redis("SUB_REDIS")

file = WorkflowFile.deno(file="scripts/workflow.ts").import_(["sayHello"]).build()

sub = SubstantialRuntime(backend, [file])
public = Policy.public()

g.expose(
Expand All @@ -17,5 +23,6 @@ def sync(g: Graph):
name="hello",
module="scripts/hello.ts",
secrets=["ULTRA_SECRET"],
).with_policy(public)
).with_policy(public),
helloWorkflow=sub.start(t.struct({"name": t.string()})),
)
6 changes: 4 additions & 2 deletions tests/sync/sync_force_remove_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ Meta.test(
const _engine = await t.engine("sync/sync.py", {
secrets: {
ULTRA_SECRET:
"if_you_can_read_me_on_an_ERROR_there_is_a_bug",
"if_you_can_read_me_on_an_ERROR_there_is_a_bug",
SUB_REDIS:
"redis://:password@localhost:6380/0",
},
});

const s3 = new S3Client(syncConfig.s3);
const initialObjects = await listObjects(s3, syncConfig.s3Bucket);
assertEquals(initialObjects?.length, 3);
assertEquals(initialObjects?.length, 4);

const gateNoRemove = await spawnGate(syncConfig);
const namesNoRemove = gateNoRemove.register.list().map(({ name }) =>
Expand Down

0 comments on commit b548328

Please sign in to comment.