Skip to content

Commit

Permalink
Adds missing targets dependency files as inputs for lage-service (#847)
Browse files Browse the repository at this point in the history
* adding missing inputs from dependencies

* Change files

* try updating yarn for better e2e tests

* allow for non-immutable builds for these fixtures

* fix up the yarnrc.yml to be legal yaml

* fixing tests
  • Loading branch information
kenotron authored Feb 7, 2025
1 parent b0ee668 commit 142b145
Show file tree
Hide file tree
Showing 6 changed files with 1,051 additions and 153,536 deletions.
11 changes: 11 additions & 0 deletions change/change-59938af0-4161-4e30-a935-aef8e2327605.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"type": "patch",
"comment": "adding missing inputs from dependencies",
"packageName": "@lage-run/cli",
"email": "[email protected]",
"dependentChangeType": "patch"
}
]
}
2 changes: 1 addition & 1 deletion packages/cli/src/commands/exec/executeRemotely.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async function executeOnServer(args: string[], client: LageClient, logger: Logge
task,
taskArgs,
});
logger.info(`Task ${response.packageName} ${response.task} exited with code ${response.exitCode} `);
logger.info(`Task ${response.packageName} ${response.task} exited with code ${response.exitCode}`);
return response;
} catch (error) {
if (error instanceof ConnectError) {
Expand Down
45 changes: 39 additions & 6 deletions packages/cli/src/commands/server/lageService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type ConfigOptions, getConfig, getConcurrency, getMaxWorkersPerTask } from "@lage-run/config";
import type { Logger } from "@lage-run/logger";
import type { ILageService } from "@lage-run/rpc";
import { getTargetId, type TargetGraph } from "@lage-run/target-graph";
import { getStartTargetId, getTargetId, type TargetGraph } from "@lage-run/target-graph";
import { type DependencyMap, getPackageInfos, getWorkspaceRoot } from "workspace-tools";
import { createTargetGraph } from "../run/createTargetGraph.js";
import { type Readable } from "stream";
Expand Down Expand Up @@ -88,6 +88,7 @@ async function createInitializedPromise({ cwd, logger, serverControls, nodeArg,

const filteredPipeline = filterPipelineDefinitions(targetGraph.targets.values(), config.pipeline);

logger.info("Initializing Pool");
const pool = new AggregatedPool({
logger,
maxWorkersByGroup: new Map([...getMaxWorkersPerTask(filteredPipeline, maxWorkers)]),
Expand Down Expand Up @@ -122,6 +123,7 @@ async function createInitializedPromise({ cwd, logger, serverControls, nodeArg,
serverControls.countdownToShutdown();
});

logger.info("done initializing");
return { config, targetGraph, packageTree, dependencyMap, root, pool };
}

Expand Down Expand Up @@ -215,7 +217,36 @@ export async function createLageService({
? glob(config.cacheOptions?.environmentGlob, { cwd: root, gitignore: true })
: ["lage.config.js"];

const inputs = (getInputFiles(target, dependencyMap, packageTree) ?? []).concat(globalInputs);
const inputsSet = new Set<string>(getInputFiles(target, dependencyMap, packageTree) ?? []);

for (const globalInput of globalInputs) {
inputsSet.add(globalInput);
}

for (const dependency of target.dependencies) {
if (dependency === getStartTargetId()) {
continue;
}

const depTarget = targetGraph.targets.get(dependency)!;
const depInputs = getInputFiles(depTarget, dependencyMap, packageTree);
if (depInputs) {
depInputs.forEach((file) => inputsSet.add(file));
}
}

const inputs = Array.from(inputsSet);

let results: {
packageName?: string;
task: string;
exitCode: number;
inputs: string[];
outputs: string[];
stdout: string;
stderr: string;
id: string;
};

try {
await pool.exec(
Expand Down Expand Up @@ -258,11 +289,10 @@ export async function createLageService({

const outputs = getOutputFiles(root, target, config.cacheOptions?.outputGlob, packageTree);

return {
results = {
packageName: request.packageName,
task: request.task,
exitCode: 0,
hash: "",
inputs,
outputs,
stdout: writableStdout.toString(),
Expand All @@ -272,18 +302,21 @@ export async function createLageService({
} catch (e) {
const outputs = getOutputFiles(root, target, config.cacheOptions?.outputGlob, packageTree);

return {
results = {
packageName: request.packageName,
task: request.task,
exitCode: 1,
hash: "",
inputs,
outputs,
stdout: "",
stderr: e instanceof Error ? e.toString() : "",
id,
};
}

logger.info(`${request.packageName}#${request.task} results: \n${JSON.stringify(results, null, 2)}\n------`, results);

return results;
},
};
}
110 changes: 99 additions & 11 deletions packages/e2e-tests/src/lageserver.test.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
import path from "path";
import { Monorepo } from "./mock/monorepo.js";
import { parseNdJson } from "./parseNdJson.js";
import fs from "fs";

describe("lageserver", () => {
let repo: Monorepo | undefined;

afterEach(async () => {
await repo?.cleanup();
repo = undefined;
});

it("connects to a running server", async () => {
repo = new Monorepo("basics");
const repo = new Monorepo("basics");

repo.init();
repo.addPackage("a", ["b"]);
repo.addPackage("b");

repo.install();

const serverProcess = repo.runServer();
const serverProcess = repo.runServer(["build"]);
await new Promise((resolve) => setTimeout(resolve, 2000));

const results = repo.run("lage", ["exec", "--server", "--tasks", "build", "--", "a", "build"]);
const output = results.stdout + results.stderr;

const jsonOutput = parseNdJson(output);

serverProcess.kill();

expect(jsonOutput.find((entry) => entry.data?.target?.id === "a#build" && entry.msg === "Finished")).toBeTruthy();
expect(jsonOutput.find((entry) => entry.msg === "Task a build exited with code 0")).toBeTruthy();
await repo.cleanup();
});

it("launches a background server", async () => {
repo = new Monorepo("basics");
const repo = new Monorepo("basics");

repo.init();
repo.addPackage("a", ["b"]);
Expand All @@ -50,5 +47,96 @@ describe("lageserver", () => {
} catch (e) {
// ignore if cannot kill this
}

await repo.cleanup();
});

it("reports inputs for targets and their dependencies' files", async () => {
const repo = new Monorepo("basics");

repo.init();

repo.addPackage("a", ["b"]);
repo.addPackage("b");

repo.install();

repo.commitFiles({
"packages/a/src/index.ts": "console.log('a');",
"packages/a/alt/extra.ts": "console.log('a');",
"packages/b/alt/index.ts": "console.log('b');",
"packages/b/src/extra.ts": "console.log('b');",
});

repo.setLageConfig(
`module.exports = {
pipeline: {
"a#build": {
inputs: ["src/**"],
dependsOn: ["^build"],
},
"b#build": {
inputs: ["alt/**"],
dependsOn: ["^build"],
},
},
};`
);

const results = repo.run("lage", [
"exec",
"b",
"build",
"--tasks",
"build",
"--server",
"localhost:5111",
"--timeout",
"2",
"--reporter",
"json",
]);

const output = results.stdout + results.stderr;
const jsonOutput = parseNdJson(output);
const started = jsonOutput.find((entry) => entry.data?.pid && entry.msg === "Server started");
expect(started?.data.pid).not.toBeUndefined();

repo.run("lage", ["exec", "a", "build", "--tasks", "build", "--server", "localhost:5111", "--timeout", "2", "--reporter", "json"]);

try {
process.kill(parseInt(started?.data.pid));
} catch (e) {
// ignore if cannot kill this
}

const serverLogs = fs.readFileSync(path.join(repo.root, "node_modules/.cache/lage/server.log"), "utf-8");

const lines = serverLogs.split("\n");
let aResults: any;
let bResults: any;

lines.forEach((line, index) => {
if (line.includes("a#build results:")) {
// scan the next few lines until we see a "}", and then parse the JSON
const endToken = "}";
const endTokenLine = lines.findIndex((line, i) => i > index && line.startsWith(endToken));
aResults = JSON.parse(lines.slice(index + 1, endTokenLine + 1).join("\n"));
}

if (line.includes("b#build results:")) {
const endToken = "}";
const endTokenLine = lines.findIndex((line, i) => i > index && line.startsWith(endToken));
bResults = JSON.parse(lines.slice(index + 1, endTokenLine + 1).join("\n"));
}
});

expect(aResults.inputs.find((input) => input === "packages/a/src/index.ts")).toBeTruthy();
expect(aResults.inputs.find((input) => input === "packages/a/alt/extra.ts")).toBeUndefined();
expect(bResults.inputs.find((input) => input === "packages/b/src/extra.ts")).toBeUndefined();
expect(bResults.inputs.find((input) => input === "packages/b/alt/index.ts")).toBeTruthy();

await repo.cleanup();
}, 20000);
});
17 changes: 13 additions & 4 deletions packages/e2e-tests/src/mock/monorepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@ export class Monorepo {
}

fs.cpSync(path.resolve(__dirname, "..", "..", "yarn"), path.dirname(this.yarnPath), { recursive: true });
execa.sync(`"${process.execPath}"`, [`"${this.yarnPath}"`, "install"], { cwd: this.root, shell: true });
execa.sync(`"${process.execPath}"`, [`"${this.yarnPath}"`, "install", "--no-immutable"], { cwd: this.root, shell: true });
}

generateRepoFiles() {
this.commitFiles({
".yarnrc": `yarn-path "${this.yarnPath}"`,
".yarnrc.yml": `yarnPath: "${this.yarnPath.replace(/\\/g, "/")}"\ncacheFolder: "${this.root.replace(
/\\/g,
"/"
)}/.yarn/cache"\nnodeLinker: node-modules`,
"package.json": {
name: this.name.replace(/ /g, "-"),
version: "0.1.0",
Expand Down Expand Up @@ -154,12 +157,18 @@ export class Monorepo {
});
}

runServer() {
return execa.default(process.execPath, [path.join(this.root, "node_modules/lage/dist/lage-server.js")], {
runServer(tasks: string[]) {
const cp = execa.default(process.execPath, [path.join(this.root, "node_modules/lage/dist/lage-server.js"), "--tasks", ...tasks], {
cwd: this.root,
detached: true,
stdio: "ignore",
});

if (cp && !cp.pid) {
throw new Error("Failed to start server");
}

return cp;
}

async cleanup() {
Expand Down
Loading

0 comments on commit 142b145

Please sign in to comment.