Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create documentation for running non-python #23989

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion docs/docs-beta/docs/guides/external-systems/non-python.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,57 @@
---
title: "Running non-Python languages"
sidebar_position: 60
---
---

Dagster is written in Python, but that doesn't mean it's limited to running Python to materialize assets. With Pipes, you can run code in other languages and send information back to Dagster. This guide covers how to run JavaScript with Dagster using Pipes, however, the same principle will apply to other languages.

cmpadden marked this conversation as resolved.
Show resolved Hide resolved
<details>
<summary>Prerequisites</summary>

- Familiarity with [Assets](/concepts/assets)
- A basic understanding of JavaScript and Node.js
</details>

# Define a JavaScript-based asset

We will demonstrate running JavaScript code using Dagster Pipes. In this example, the `train_model` function loads a CSV and trains a sequential model using the Tensorflow library.

<CodeExample filePath="guides/automation/pipes-contrived-javascript.py" language="javascript" title="A simple Tensorflow function." />

With an `@asset` definition in Dagster, you can now invoke your JavaScript function from Dagster.

<CodeExample filePath="guides/automation/pipes-asset.py" language="python" title="Asset using Dagster Pipes." />

If the command passed to Dagster Pipes (`node tensorflow/main.js`) exits successfully, then an asset materialization result will be created implicitly for the asset defined here. Additionally, the stdout/stderr will be collected into the asset logs. Dagster Pipes supports passing parameters into Pipes and allowing Pipes processes to more explicitly define the asset materialization event.

# Add JavaScript utility functions to interface with Dagster Pipes

cmpadden marked this conversation as resolved.
Show resolved Hide resolved
Dagster Pipes follows a similar design to Unix pipes, hence the name. The `PipesSubprocessClient` is responsible for running external processes and setting up input/output files. The asset defined here is materialized using the `PipesSubprocessClient` running a Node.js file containing the `train_model` function.

The `PipesSubprocessClient` calls the child process with two environment variables defined, each containing a path to a file. One for input and one for output.
- DAGSTER_PIPES_CONTEXT: Input context
- DAGSTER_PIPES_MESSAGES: Output context
cmpadden marked this conversation as resolved.
Show resolved Hide resolved

<CodeExample filePath="guides/automation/pipes-javascript-utility.py" language="javascript" title="Utility functions to interface with Dagster Pipes." />

Both environment variables are base64, zip compressed JSON objects with a "path" key. These functions decode these environment variables and access the files to hook up our JavaScript function to Dagster.

# Create a JavaScript interface for Dagster to invoke

With the utility functions to decode the Dagster Pipes environment variables, we can send additional parameters into the JavaScript process and output additional information into the asset materializations. The `run_operation` function creates an interface between Dagster Pipes and the JavaScript file to do just that.

<CodeExample filePath="guides/automation/pipes-full-featured-javascript.js" language="javascript" title="Adding a new JavaScript entrypoint for Dagster Pipes." />

`run_operation` looks for the `operation_name` and `config` in the Dagster Pipes context. The `operation_name` is the function to run, while `config` is the parameter to said function.

`run_operation` expects the function it runs will return a model. From the model, `run_operation` accesses the loss function and adds it to the `extras` in the explicit asset materialization it writes to the messages.

# Call the JavaScript interface using Dagster Pipes

Lastly, we can update the `@asset` definition to pass in these additional parameters.

<CodeExample filePath="guides/automation/pipes-asset-with-context.py" language="python" title="Asset using Dagster Pipes." />

# What's next?

From here, this example could be extended to support many JavaScript defined operations and many JavaScript based assets using different pairings of operations and configurations.
6 changes: 3 additions & 3 deletions docs/docs-beta/sidebars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ const sidebars: SidebarsConfig = {
{
type: 'autogenerated',
dirName: 'about',
}
]
}
},
],
},
],
dagsterPlus: [
{
Expand Down
4 changes: 2 additions & 2 deletions docs/docs-beta/src/components/CodeExample.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ const CodeExample: React.FC<CodeExampleProps> = ({filePath, language, title}) =>
// Adjust the import path to start from the docs directory
import(`!!raw-loader!/../../examples/docs_beta_snippets/docs_beta_snippets/${filePath}`)
.then((module) => {
const lines = module.default.split('\n').map(line => {
return line.replaceAll(/#.*?noqa.*?$/g, "").trim();
const lines = module.default.split('\n').map((line) => {
return line.replaceAll(/#.*?noqa.*?$/g, '').trim();
});
const mainIndex = lines.findIndex((line) => line.trim().startsWith('if __name__ == '));
const strippedContent =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import dagster as dg


@dg.asset(
# add other useful metadata
compute_kind="javascript",
)
def my_asset(
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient,
):
"""Runs Javascript to generate an asset."""
return pipes_subprocess_client.run(
command=["node", "tensorflow/main.js"],
context=context.op_execution_context,
extras={
"operation_name": "train_model",
"config": {
"path_to_data": "file://../tensorflow/data/data.csv",
"data_config": { "hasHeaders": True },
"path_to_model": "file://../tensorflow/model"
}
},
).get_materialize_result()


defs = dg.Definitions(
assets=[my_asset],
resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import dagster as dg


@dg.asset(
# add other useful metadata
compute_kind="javascript",
)
def my_asset(
context: dg.AssetExecutionContext,
pipes_subprocess_client: dg.PipesSubprocessClient,
):
"""Runs Javascript to generate an asset."""
return pipes_subprocess_client.run(
command=["node", "tensorflow/main.js"],
context=context.op_execution_context,
extras={},
).get_materialize_result()


defs = dg.Definitions(
assets=[my_asset],
resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// "tensorflow/main.js"
import * as tf from '@tensorflow/tfjs';

const CONFIG = { /* ... */ };

async function train_model() {
const { path_to_data, data_config, path_to_model } = CONFIG;
const dataset = await tf.data.csv(path_to_data, data_config).map(({ xs, ys }) => {
return {
xs: tf.tensor2d(Object.values(xs), [Object.values(xs).length, 1]),
ys: tf.tensor2d(Object.values(ys), [Object.values(ys).length, 1])
};
})
.batch(100);

const model = tf.sequential()
model.add(tf.layers.dense({units: 1, inputShape: [1]}));
model.compile({loss: 'meanSquaredError', optimizer: 'sgd'});

await model.fitDataset(dataset, {epochs: 250})
await model.save(path_to_model);
model.summary();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
const ALL_OPERATIONS = {
train_model
}

async function run_operation() {
const { asset_keys, extras: { operation_name, config } } = await getPipesContext()
if (!(operation_name in ALL_OPERATIONS)) {
setPipesMessages({ error: `Operation ${operation_name} not found` });
return;
}
const operation = ALL_OPERATIONS[operation_name];
const model = await operation(config);
await setPipesMessages({
method: "report_asset_materialization",
params: {
asset_key: asset_keys[0],
data_version: null,
metadata: {
metrics: model.metrics ? { raw_value: model.metrics, type: "text" } : undefined,
loss: { raw_value: model.loss, type: "text" },
},
},
});
return 0;
}

run_operation().then((result) => {
process.exit(result);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import * as util from 'util';
import { promises as fs } from "fs";
import { inflate } from 'zlib';

const inflateAsync = util.promisify(inflate);

async function _decodeParam(value) {
if (!value) {
return null;
}
// Decode from base64 and unzip
const decoded = Buffer.from(value, "base64");
const decompressed = await inflateAsync(decoded);
// Deserialize to JSON
return JSON.parse(decompressed.toString("utf-8"));
}

async function getPipesContext() {
// get the env var value for where the pipes context is stored
const encodedPipesContextParam = process.env.DAGSTER_PIPES_CONTEXT;
// decove the value to get the input file path
const decodedPipesContextParam = await _decodeParam(encodedPipesContextParam);
if (!decodedPipesContextParam) {
return null;
}
return await fs.readFile(decodedPipesContextParam.path, "utf-8")
.then((data) => JSON.parse(data));
}

async function setPipesMessages(message) {
// get the env var value for where the pipes message is stored
const encodedPipesMessagesParam = process.env.DAGSTER_PIPES_MESSAGES;
// decode the value to get the output file path
const decodedPipesMessagesParam = await _decodeParam(encodedPipesMessagesParam);
if (!decodedPipesMessagesParam) {
return null;
}
const path = decodedPipesMessagesParam.path;
await fs.appendFile(path, JSON.stringify(message) + "\n");
}