Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds missing targets dependency files as inputs for lage-service #847

Merged
merged 6 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions change/change-59938af0-4161-4e30-a935-aef8e2327605.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"type": "patch",
"comment": "adding missing inputs from dependencies",
"packageName": "@lage-run/cli",
"email": "[email protected]",
"dependentChangeType": "patch"
}
]
}
2 changes: 1 addition & 1 deletion packages/cli/src/commands/exec/executeRemotely.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async function executeOnServer(args: string[], client: LageClient, logger: Logge
task,
taskArgs,
});
logger.info(`Task ${response.packageName} ${response.task} exited with code ${response.exitCode} `);
logger.info(`Task ${response.packageName} ${response.task} exited with code ${response.exitCode}`);
return response;
} catch (error) {
if (error instanceof ConnectError) {
Expand Down
45 changes: 39 additions & 6 deletions packages/cli/src/commands/server/lageService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type ConfigOptions, getConfig, getConcurrency, getMaxWorkersPerTask } from "@lage-run/config";
import type { Logger } from "@lage-run/logger";
import type { ILageService } from "@lage-run/rpc";
import { getTargetId, type TargetGraph } from "@lage-run/target-graph";
import { getStartTargetId, getTargetId, type TargetGraph } from "@lage-run/target-graph";
import { type DependencyMap, getPackageInfos, getWorkspaceRoot } from "workspace-tools";
import { createTargetGraph } from "../run/createTargetGraph.js";
import { type Readable } from "stream";
Expand Down Expand Up @@ -88,6 +88,7 @@ async function createInitializedPromise({ cwd, logger, serverControls, nodeArg,

const filteredPipeline = filterPipelineDefinitions(targetGraph.targets.values(), config.pipeline);

logger.info("Initializing Pool");
const pool = new AggregatedPool({
logger,
maxWorkersByGroup: new Map([...getMaxWorkersPerTask(filteredPipeline, maxWorkers)]),
Expand Down Expand Up @@ -122,6 +123,7 @@ async function createInitializedPromise({ cwd, logger, serverControls, nodeArg,
serverControls.countdownToShutdown();
});

logger.info("done initializing");
return { config, targetGraph, packageTree, dependencyMap, root, pool };
}

Expand Down Expand Up @@ -215,7 +217,36 @@ export async function createLageService({
? glob(config.cacheOptions?.environmentGlob, { cwd: root, gitignore: true })
: ["lage.config.js"];

const inputs = (getInputFiles(target, dependencyMap, packageTree) ?? []).concat(globalInputs);
const inputsSet = new Set<string>(getInputFiles(target, dependencyMap, packageTree) ?? []);

for (const globalInput of globalInputs) {
inputsSet.add(globalInput);
}

for (const dependency of target.dependencies) {
if (dependency === getStartTargetId()) {
continue;
}

const depTarget = targetGraph.targets.get(dependency)!;
const depInputs = getInputFiles(depTarget, dependencyMap, packageTree);
if (depInputs) {
depInputs.forEach((file) => inputsSet.add(file));
}
}

const inputs = Array.from(inputsSet);

let results: {
packageName?: string;
task: string;
exitCode: number;
inputs: string[];
outputs: string[];
stdout: string;
stderr: string;
id: string;
};

try {
await pool.exec(
Expand Down Expand Up @@ -258,11 +289,10 @@ export async function createLageService({

const outputs = getOutputFiles(root, target, config.cacheOptions?.outputGlob, packageTree);

return {
results = {
packageName: request.packageName,
task: request.task,
exitCode: 0,
hash: "",
inputs,
outputs,
stdout: writableStdout.toString(),
Expand All @@ -272,18 +302,21 @@ export async function createLageService({
} catch (e) {
const outputs = getOutputFiles(root, target, config.cacheOptions?.outputGlob, packageTree);

return {
results = {
packageName: request.packageName,
task: request.task,
exitCode: 1,
hash: "",
inputs,
outputs,
stdout: "",
stderr: e instanceof Error ? e.toString() : "",
id,
};
}

logger.info(`${request.packageName}#${request.task} results: \n${JSON.stringify(results, null, 2)}\n------`, results);

return results;
},
};
}
110 changes: 99 additions & 11 deletions packages/e2e-tests/src/lageserver.test.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
import path from "path";
import { Monorepo } from "./mock/monorepo.js";
import { parseNdJson } from "./parseNdJson.js";
import fs from "fs";

describe("lageserver", () => {
let repo: Monorepo | undefined;

afterEach(async () => {
await repo?.cleanup();
repo = undefined;
});

it("connects to a running server", async () => {
repo = new Monorepo("basics");
const repo = new Monorepo("basics");

repo.init();
repo.addPackage("a", ["b"]);
repo.addPackage("b");

repo.install();

const serverProcess = repo.runServer();
const serverProcess = repo.runServer(["build"]);
await new Promise((resolve) => setTimeout(resolve, 2000));

const results = repo.run("lage", ["exec", "--server", "--tasks", "build", "--", "a", "build"]);
const output = results.stdout + results.stderr;

const jsonOutput = parseNdJson(output);

serverProcess.kill();

expect(jsonOutput.find((entry) => entry.data?.target?.id === "a#build" && entry.msg === "Finished")).toBeTruthy();
expect(jsonOutput.find((entry) => entry.msg === "Task a build exited with code 0")).toBeTruthy();
await repo.cleanup();
});

it("launches a background server", async () => {
repo = new Monorepo("basics");
const repo = new Monorepo("basics");

repo.init();
repo.addPackage("a", ["b"]);
Expand All @@ -50,5 +47,96 @@ describe("lageserver", () => {
} catch (e) {
// ignore if cannot kill this
}

await repo.cleanup();
});

it("reports inputs for targets and their dependencies' files", async () => {
const repo = new Monorepo("basics");

repo.init();

repo.addPackage("a", ["b"]);
repo.addPackage("b");

repo.install();

repo.commitFiles({
"packages/a/src/index.ts": "console.log('a');",
"packages/a/alt/extra.ts": "console.log('a');",
"packages/b/alt/index.ts": "console.log('b');",
"packages/b/src/extra.ts": "console.log('b');",
});

repo.setLageConfig(
`module.exports = {
pipeline: {
"a#build": {
inputs: ["src/**"],
dependsOn: ["^build"],
},

"b#build": {
inputs: ["alt/**"],
dependsOn: ["^build"],
},
},
};`
);

const results = repo.run("lage", [
"exec",
"b",
"build",
"--tasks",
"build",
"--server",
"localhost:5111",
"--timeout",
"2",
"--reporter",
"json",
]);

const output = results.stdout + results.stderr;
const jsonOutput = parseNdJson(output);
const started = jsonOutput.find((entry) => entry.data?.pid && entry.msg === "Server started");
expect(started?.data.pid).not.toBeUndefined();

repo.run("lage", ["exec", "a", "build", "--tasks", "build", "--server", "localhost:5111", "--timeout", "2", "--reporter", "json"]);

try {
process.kill(parseInt(started?.data.pid));
} catch (e) {
// ignore if cannot kill this
}

const serverLogs = fs.readFileSync(path.join(repo.root, "node_modules/.cache/lage/server.log"), "utf-8");

const lines = serverLogs.split("\n");
let aResults: any;
let bResults: any;

lines.forEach((line, index) => {
if (line.includes("a#build results:")) {
// scan the next few lines until we see a "}", and then parse the JSON
const endToken = "}";
const endTokenLine = lines.findIndex((line, i) => i > index && line.startsWith(endToken));
aResults = JSON.parse(lines.slice(index + 1, endTokenLine + 1).join("\n"));
}

if (line.includes("b#build results:")) {
const endToken = "}";
const endTokenLine = lines.findIndex((line, i) => i > index && line.startsWith(endToken));
bResults = JSON.parse(lines.slice(index + 1, endTokenLine + 1).join("\n"));
}
});

expect(aResults.inputs.find((input) => input === "packages/a/src/index.ts")).toBeTruthy();
expect(aResults.inputs.find((input) => input === "packages/a/alt/extra.ts")).toBeUndefined();
expect(bResults.inputs.find((input) => input === "packages/b/src/extra.ts")).toBeUndefined();
expect(bResults.inputs.find((input) => input === "packages/b/alt/index.ts")).toBeTruthy();

await repo.cleanup();
}, 20000);
});
17 changes: 13 additions & 4 deletions packages/e2e-tests/src/mock/monorepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ export class Monorepo {
}

fs.cpSync(path.resolve(__dirname, "..", "..", "yarn"), path.dirname(this.yarnPath), { recursive: true });
execa.sync(`"${process.execPath}"`, [`"${this.yarnPath}"`, "install"], { cwd: this.root, shell: true });
execa.sync(`"${process.execPath}"`, [`"${this.yarnPath}"`, "install", "--no-immutable"], { cwd: this.root, shell: true });
}

generateRepoFiles() {
this.commitFiles({
".yarnrc": `yarn-path "${this.yarnPath}"`,
".yarnrc.yml": `yarnPath: "${this.yarnPath.replace(/\\/g, "/")}"\ncacheFolder: "${this.root.replace(
/\\/g,
"/"
)}/.yarn/cache"\nnodeLinker: node-modules`,
"package.json": {
name: this.name.replace(/ /g, "-"),
version: "0.1.0",
Expand Down Expand Up @@ -154,12 +157,18 @@ export class Monorepo {
});
}

runServer() {
return execa.default(process.execPath, [path.join(this.root, "node_modules/lage/dist/lage-server.js")], {
runServer(tasks: string[]) {
const cp = execa.default(process.execPath, [path.join(this.root, "node_modules/lage/dist/lage-server.js"), "--tasks", ...tasks], {
cwd: this.root,
detached: true,
stdio: "ignore",
});

if (cp && !cp.pid) {
throw new Error("Failed to start server");
}

return cp;
}

async cleanup() {
Expand Down
Loading