From 6b560c8d1d6182b6638d6c63c76c9937205be4b8 Mon Sep 17 00:00:00 2001
From: nate nowack
Date: Fri, 17 Jan 2025 15:27:49 -0600
Subject: [PATCH] revamp integration contributor guide and clean up other sections (#16757)

---
 docs/contribute/contribute-integrations.mdx |  74 +++++-
 docs/mint.json                              |   4 +-
 docs/snippets/installation.mdx              |  15 +-
 docs/v3/develop/index.mdx                   |   8 +-
 docs/v3/develop/interact-with-api.mdx       | 235 +++++++++++++++-----
 docs/v3/get-started/install.mdx             |  42 +++-
 6 files changed, 306 insertions(+), 72 deletions(-)

diff --git a/docs/contribute/contribute-integrations.mdx b/docs/contribute/contribute-integrations.mdx
index 9ee1c97a8569..9930cd676b64 100644
--- a/docs/contribute/contribute-integrations.mdx
+++ b/docs/contribute/contribute-integrations.mdx
@@ -1,20 +1,70 @@
 ---
-sidebarTitle: Contribute integrations
-title: Contribute integrations
+sidebarTitle: Contribute to integrations
+title: Contribute to integrations
 ---
 
-Prefect welcomes integration contributions.
-Create an integration package for community use by following the steps below.
+Prefect welcomes contributions to existing integrations.
 
-## Generate a project
+
+  Thinking about making your own integration? Feel free to [create a new discussion](https://github.com/PrefectHQ/prefect/discussions/new?category=ideas) to flesh out your idea with other contributors.
+
 
-To help you create an integration, use [this GitHub repository](https://github.com/PrefectHQ/prefect-collection-template#quickstart) to bootstrap an integration project.
-The repository uses [Cookiecutter](https://cookiecutter.readthedocs.io/) to generate a project structure.
-The template includes tools for testing, linting, docs, and building the package.
+## Contributing to existing integrations
 
-## List your project in the integrations catalog
+All integrations are hosted in the [Prefect GitHub repository](https://github.com/PrefectHQ/prefect) under `src/integrations`.
 
-To list your integration in the Prefect integrations catalog, submit a pull request to the [Prefect GitHub repository](https://github.com/PrefectHQ/prefect).
+To contribute to an existing integration, please follow these steps:
+
+
+
+    Fork the [Prefect GitHub repository](https://github.com/PrefectHQ/prefect)
+
+
+
+    ```bash
+    git clone https://github.com/your-username/prefect.git
+    ```
+
+
+
+    ```bash
+    git checkout -b my-new-branch
+    ```
+
+
+
+    Move to the integration directory and install the dependencies:
+    ```bash
+    cd src/integrations/my-integration
+    uv venv --python 3.12
+    source .venv/bin/activate
+    uv pip install -e ".[dev]"
+    ```
+
+
+
+    Make the necessary changes to the integration code.
+
+
+
+    If you're adding new functionality, please add tests.
+
+    You can run the tests with:
+    ```bash
+    pytest tests
+    ```
+
+
+
+    ```bash
+    git add .
+    git commit -m "My new integration"
+    git push origin my-new-branch
+    ```
+
+
+
+    Submit your pull request upstream through the GitHub interface.
+
+
+
 
-Add a file to the `docs/integrations/catalog` directory with details about your integration package.
-Use the `TEMPLATE.yaml` file in that folder as a guide.
\ No newline at end of file
diff --git a/docs/mint.json b/docs/mint.json
index 76f9942eec9f..f2dd4bd31503 100644
--- a/docs/mint.json
+++ b/docs/mint.json
@@ -116,8 +116,7 @@
             "v3/develop/pause-resume",
             "v3/develop/cancel",
             "v3/develop/inputs",
-            "v3/develop/runtime-context",
-            "v3/develop/interact-with-api"
+            "v3/develop/runtime-context"
           ],
           "version": "v3"
         },
@@ -148,6 +147,7 @@
             "v3/develop/settings-ref"
           ]
         },
+        "v3/develop/interact-with-api",
         "v3/develop/test-workflows"
       ],
       "version": "v3"
diff --git a/docs/snippets/installation.mdx b/docs/snippets/installation.mdx
index 8672f57c39d7..93746858cf8c 100644
--- a/docs/snippets/installation.mdx
+++ b/docs/snippets/installation.mdx
@@ -1,5 +1,14 @@
-To install Prefect with pip, run:
+To install Prefect with `pip`, run:
 
-```bash
+
+
+```bash pip
 pip install -U prefect
-```
\ No newline at end of file
+```
+
+```bash uv
+uv pip install -U prefect
+```
+
+
+
diff --git a/docs/v3/develop/index.mdx b/docs/v3/develop/index.mdx
index eafe9b9d1f1a..ebaa2e283dac 100644
--- a/docs/v3/develop/index.mdx
+++ b/docs/v3/develop/index.mdx
@@ -35,4 +35,10 @@ The **Manage concurrency** section explains how to speed up your workflows and l
 - [Limit concurrent task runs](/v3/develop/task-run-limits/) shows how to prevent too many tasks from running simultaneously.
 - [Apply concurrency and rate limits](/v3/develop/global-concurrency-limits/) demonstrates how to control concurrency and apply rate limits using Prefect's provided utilities.
 
-Finally, [Test workflows](/v3/develop/test-workflows/) discusses tools for testing workflows.
+
+Explore further customization beyond the core SDK with:
+
+- [Using client methods](/v3/develop/interact-with-api/)
+
+
+Finally, [testing workflows](/v3/develop/test-workflows/) includes strategies and utilities for testing workflows.
diff --git a/docs/v3/develop/interact-with-api.mdx b/docs/v3/develop/interact-with-api.mdx
index 5f939cf288e6..b37186ef3784 100644
--- a/docs/v3/develop/interact-with-api.mdx
+++ b/docs/v3/develop/interact-with-api.mdx
@@ -1,27 +1,54 @@
 ---
-title: Manage run metadata in Python
+title: Using prefect client methods
 description: Learn how to use the `PrefectClient` to interact with the API.
 ---
 
+## Overview
+
+
 The [`PrefectClient`](https://prefect-python-sdk-docs.netlify.app/prefect/client/)
-contains many methods that make it simpler to perform actions, such as:
+offers methods to simplify common operations against Prefect's REST API that may not be abstracted away by the SDK.
 
-- reschedule late flow runs
-- get the last `N` completed flow runs from a workspace
+For example, to [reschedule flow runs](/v3/develop/interact-with-api/#reschedule-late-flow-runs), one might use methods like:
+  - `read_flow_runs` with a `FlowRunFilter` to read certain flow runs
+  - `create_flow_run_from_deployment` to schedule new flow runs
+  - `delete_flow_run` to delete a very `Late` flow run
 
-The `PrefectClient` is an async context manager. Here's an example usage:
 
-```python
-from prefect import get_client
+
+### Getting a client
+
+By default, `get_client()` returns an asynchronous client to be used as a context manager, but you may also use a synchronous client.
+
+
+
+```python async
+from prefect import get_client
 
 async with get_client() as client:
     response = await client.hello()
     print(response.json()) # 👋
 ```
 
+You can also use a synchronous client:
+
+```python sync
+from prefect import get_client
+
+with get_client(sync_client=True) as client:
+    response = client.hello()
+    print(response.json()) # 👋
+```
+
+
+
 ## Examples
 
+These examples are meant to illustrate how one might develop their own utilities for interacting with the API.
+
+
+If you believe a client method is missing, or you'd like to see a specific pattern better represented in the SDK generally, please [open an issue](https://github.com/PrefectHQ/prefect/issues/new/choose).
+
 
 ### Reschedule late flow runs
 
 To bulk reschedule flow runs that are late, delete the late flow runs and create new ones in a
@@ -32,16 +59,64 @@ The following example reschedules the last three late flow runs of a deployment
 `healthcheck-storage-test` to run six hours later than their original expected start time.
 It also deletes any remaining late flow runs of that deployment.
 
+First, define the rescheduling function:
+
+```python
+async def reschedule_late_flow_runs(
+    deployment_name: str,
+    delay: timedelta,
+    most_recent_n: int,
+    delete_remaining: bool = True,
+    states: list[str] | None = None
+) -> list[FlowRun]:
+    states = states or ["Late"]
+
+    async with get_client() as client:
+        flow_runs = await client.read_flow_runs(
+            flow_run_filter=FlowRunFilter(
+                state=dict(name=dict(any_=states)),
+                expected_start_time=dict(before_=datetime.now(timezone.utc)),
+            ),
+            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
+            sort=FlowRunSort.START_TIME_DESC,
+            limit=most_recent_n if not delete_remaining else None
+        )
+
+        rescheduled_flow_runs: list[FlowRun] = []
+        for i, run in enumerate(flow_runs):
+            await client.delete_flow_run(flow_run_id=run.id)
+            if i < most_recent_n:
+                new_run = await client.create_flow_run_from_deployment(
+                    deployment_id=run.deployment_id,
+                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
+                )
+                rescheduled_flow_runs.append(new_run)
+
+        return rescheduled_flow_runs
+```
+
+Then use it to reschedule flows:
+
 ```python
+rescheduled_flow_runs = asyncio.run(
+    reschedule_late_flow_runs(
+        deployment_name="healthcheck-storage-test",
+        delay=timedelta(hours=6),
+        most_recent_n=3,
+    )
+)
+```
+
+
+
+```python reschedule_late_flows.py
+from __future__ import annotations
+
 import asyncio
 from datetime import datetime, timedelta, timezone
-from typing import Optional
-
 from prefect import get_client
-from prefect.client.schemas.filters import (
-    DeploymentFilter, FlowRunFilter
-)
+from prefect.client.schemas.filters import DeploymentFilter, FlowRunFilter
 from prefect.client.schemas.objects import FlowRun
 from prefect.client.schemas.sorting import FlowRunSort
 from prefect.states import Scheduled
 
@@ -51,22 +126,17 @@ async def reschedule_late_flow_runs(
     delay: timedelta,
     most_recent_n: int,
     delete_remaining: bool = True,
-    states: Optional[list[str]] = None
+    states: list[str] | None = None
 ) -> list[FlowRun]:
-    if not states:
-        states = ["Late"]
+    states = states or ["Late"]
 
     async with get_client() as client:
         flow_runs = await client.read_flow_runs(
             flow_run_filter=FlowRunFilter(
                 state=dict(name=dict(any_=states)),
-                expected_start_time=dict(
-                    before_=datetime.now(timezone.utc)
-                ),
-            ),
-            deployment_filter=DeploymentFilter(
-                name={'like_': deployment_name}
+                expected_start_time=dict(before_=datetime.now(timezone.utc)),
             ),
+            deployment_filter=DeploymentFilter(name={'like_': deployment_name}),
             sort=FlowRunSort.START_TIME_DESC,
             limit=most_recent_n if not delete_remaining else None
         )
@@ -75,15 +145,13 @@ async def reschedule_late_flow_runs(
             print(f"No flow runs found in states: {states!r}")
             return []
 
-        rescheduled_flow_runs = []
+        rescheduled_flow_runs: list[FlowRun] = []
         for i, run in enumerate(flow_runs):
             await client.delete_flow_run(flow_run_id=run.id)
             if i < most_recent_n:
                 new_run = await client.create_flow_run_from_deployment(
                     deployment_id=run.deployment_id,
-                    state=Scheduled(
-                        scheduled_time=run.expected_start_time + delay
-                    ),
+                    state=Scheduled(scheduled_time=run.expected_start_time + delay),
                 )
                 rescheduled_flow_runs.append(new_run)
 
@@ -100,16 +168,16 @@ if __name__ == "__main__":
     )
 
     print(f"Rescheduled {len(rescheduled_flow_runs)} flow runs")
-
-    assert all(
-        run.state.is_scheduled() for run in rescheduled_flow_runs
-    )
+
+    assert all(run.state.is_scheduled() for run in rescheduled_flow_runs)
     assert all(
         run.expected_start_time > datetime.now(timezone.utc)
         for run in rescheduled_flow_runs
     )
 ```
 
+
+
 ### Get the last `N` completed flow runs from your workspace
 
 To get the last `N` completed flow runs from your workspace, use `read_flow_runs` and `prefect.client.schemas`.
@@ -117,26 +185,48 @@ To get the last `N` completed flow runs from your workspace, use `read_flow_runs
 This example gets the last three completed flow runs from your workspace:
 
 ```python
+async def get_most_recent_flow_runs(
+    n: int,
+    states: list[str] | None = None
+) -> list[FlowRun]:
+    async with get_client() as client:
+        return await client.read_flow_runs(
+            flow_run_filter=FlowRunFilter(
+                state={'type': {'any_': states or ["COMPLETED"]}}
+            ),
+            sort=FlowRunSort.END_TIME_DESC,
+            limit=n,
+        )
+```
+
+Use it to get the last 3 completed runs:
+
+```python
+flow_runs: list[FlowRun] = asyncio.run(
+    get_most_recent_flow_runs(n=3)
+)
+```
+
+
+
+```python get_recent_flows.py
+from __future__ import annotations
+
 import asyncio
-from typing import Optional
 
 from prefect import get_client
 from prefect.client.schemas.filters import FlowRunFilter
 from prefect.client.schemas.objects import FlowRun
 from prefect.client.schemas.sorting import FlowRunSort
 
-
 async def get_most_recent_flow_runs(
-    n: int = 3,
-    states: Optional[list[str]] = None
-) -> list[FlowRun]:
-    if not states:
-        states = ["COMPLETED"]
-
+    n: int,
+    states: list[str] | None = None
+) -> list[FlowRun]:
     async with get_client() as client:
         return await client.read_flow_runs(
             flow_run_filter=FlowRunFilter(
-                state={'type': {'any_': states}}
+                state={'type': {'any_': states or ["COMPLETED"]}}
             ),
             sort=FlowRunSort.END_TIME_DESC,
             limit=n,
@@ -144,52 +234,89 @@ async def get_most_recent_flow_runs(
 
 
 if __name__ == "__main__":
-    last_3_flow_runs: list[FlowRun] = asyncio.run(
-        get_most_recent_flow_runs()
+    flow_runs: list[FlowRun] = asyncio.run(
+        get_most_recent_flow_runs(n=3)
     )
-    print(last_3_flow_runs)
+    assert len(flow_runs) == 3
 
     assert all(
-        run.state.is_completed() for run in last_3_flow_runs
+        run.state.is_completed() for run in flow_runs
     )
     assert (
-        end_times := [run.end_time for run in last_3_flow_runs]
+        end_times := [run.end_time for run in flow_runs]
     ) == sorted(end_times, reverse=True)
 ```
 
+
+
 Instead of the last three from the whole workspace, you can also use the `DeploymentFilter` to get the last three completed flow runs of a specific deployment.
 
 ### Transition all running flows to cancelled through the Client
 
 Use `get_client`to set multiple runs to a `Cancelled` state.
-The code below cancels all flow runs that are in `Pending`, `Running`, `Scheduled`, or `Late` states when the script is run.
+This example cancels all flow runs that are in `Pending`, `Running`, `Scheduled`, or `Late` states when the script is run.
 
 ```python
-import anyio
+async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
+    async with get_client() as client:
+        return await client.read_flow_runs(
+            flow_run_filter=FlowRunFilter(
+                state=FlowRunFilterState(
+                    name=FlowRunFilterStateName(any_=states)
+                )
+            )
+        )
+
+async def cancel_flow_runs(flow_runs: list[FlowRun]):
+    async with get_client() as client:
+        for flow_run in flow_runs:
+            state = flow_run.state.copy(
+                update={"name": "Cancelled", "type": StateType.CANCELLED}
+            )
+            await client.set_flow_run_state(flow_run.id, state, force=True)
+```
 
+Cancel all pending, running, scheduled or late flows:
+
+```python
+async def bulk_cancel_flow_runs():
+    states = ["Pending", "Running", "Scheduled", "Late"]
+    flow_runs = await list_flow_runs_with_states(states)
+
+    while flow_runs:
+        print(f"Cancelling {len(flow_runs)} flow runs")
+        await cancel_flow_runs(flow_runs)
+        flow_runs = await list_flow_runs_with_states(states)
+
+asyncio.run(bulk_cancel_flow_runs())
+```
+
+
+
+```python cancel_flows.py
+import asyncio
 
 from prefect import get_client
 from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
-from prefect.client.schemas.objects import StateType
+from prefect.client.schemas.objects import FlowRun, StateType
 
-async def list_flow_runs_with_states(states: list[str]):
+async def list_flow_runs_with_states(states: list[str]) -> list[FlowRun]:
     async with get_client() as client:
-        flow_runs = await client.read_flow_runs(
+        return await client.read_flow_runs(
             flow_run_filter=FlowRunFilter(
                 state=FlowRunFilterState(
                     name=FlowRunFilterStateName(any_=states)
                 )
             )
         )
-        return flow_runs
 
 
-async def cancel_flow_runs(flow_runs):
+async def cancel_flow_runs(flow_runs: list[FlowRun]):
     async with get_client() as client:
         for idx, flow_run in enumerate(flow_runs):
             print(f"[{idx + 1}] Cancelling flow run '{flow_run.name}' with ID '{flow_run.id}'")
-            state_updates = {}
+            state_updates: dict[str, str] = {}
             state_updates.setdefault("name", "Cancelled")
             state_updates.setdefault("type", StateType.CANCELLED)
             state = flow_run.state.copy(update=state_updates)
@@ -208,5 +335,7 @@ async def bulk_cancel_flow_runs():
 
 
 if __name__ == "__main__":
-    anyio.run(bulk_cancel_flow_runs)
-```
\ No newline at end of file
+    asyncio.run(bulk_cancel_flow_runs())
+```
+
+
diff --git a/docs/v3/get-started/install.mdx b/docs/v3/get-started/install.mdx
index a9aba320feb4..d81e9e748364 100644
--- a/docs/v3/get-started/install.mdx
+++ b/docs/v3/get-started/install.mdx
@@ -17,7 +17,7 @@ prefect version
 You should see output similar to:
 
 ```bash
-Version:             3.0.0
+Version:             3.1.10
 API version:         0.8.4
 Python version:      3.12.2
 Git commit:          d6bdb075
@@ -30,6 +30,46 @@ Server:
   SQLite version:    3.45.2
 ```
 
+
+
+
+### If you use `uv`
+
+start an `ipython` shell with python 3.12 and `prefect` installed:
+
+```bash
+uvx --python 3.12 --with prefect ipython
+```
+
+install prefect into a `uv` virtual environment:
+
+```bash
+uv venv --python 3.12 && source .venv/bin/activate
+uv pip install -U prefect
+```
+
+add prefect to an active project:
+
+```bash
+uv add prefect
+```
+
+run prefect server in an ephemeral python environment with `uvx`:
+
+```bash
+uvx prefect server start
+```
+
+### If you use `docker`
+
+run prefect server in a container port-forwarded to your local machine's 4200 port:
+
+```bash
+docker run -d -p 4200:4200 prefecthq/prefect:3-latest -- prefect server start --host 0.0.0.0
+```
+
+
+
 ## Windows installation
 
 You can install and run Prefect via Windows PowerShell, the Windows Command Prompt, or [conda](https://docs.conda.io/projects/conda/en/latest/user-guide/install/windows.html). After installation, you may need to manually add the Python local packages `Scripts` folder to your `Path` environment variable.
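
As a review aid for the `reschedule_late_flow_runs` example in `interact-with-api.mdx`, here is a minimal sketch of its selection logic using only the standard library. It is not part of the patch and makes no Prefect API calls: `LateRun` and `plan_reschedule` are hypothetical stand-ins, the sample data is made up, and the input list is assumed to arrive newest first, as `FlowRunSort.START_TIME_DESC` suggests. Every run in the list is marked for deletion, and only the first `most_recent_n` get a replacement start time shifted by `delay`.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class LateRun:
    """Hypothetical stand-in for a late flow run (not a Prefect type)."""

    id: str
    expected_start_time: datetime


def plan_reschedule(
    late_runs: list[LateRun],
    delay: timedelta,
    most_recent_n: int,
) -> tuple[list[str], list[datetime]]:
    """Return (ids to delete, new start times), mirroring the loop in the docs example."""
    to_delete: list[str] = []
    new_start_times: list[datetime] = []
    for i, run in enumerate(late_runs):
        # Every late run is deleted, as in the docs example.
        to_delete.append(run.id)
        # Only the first `most_recent_n` runs get a delayed replacement.
        if i < most_recent_n:
            new_start_times.append(run.expected_start_time + delay)
    return to_delete, new_start_times


base = datetime(2025, 1, 17, 12, 0, tzinfo=timezone.utc)
# Five late runs, newest first, one hour apart (made-up data for illustration).
runs = [
    LateRun(id=f"run-{i}", expected_start_time=base - timedelta(hours=i))
    for i in range(5)
]

deleted, rescheduled = plan_reschedule(runs, delay=timedelta(hours=6), most_recent_n=3)
print(len(deleted), len(rescheduled))
```

In the documented function, passing `delete_remaining=False` limits the query to `most_recent_n` runs, so in that case nothing beyond the rescheduled runs would be deleted; this sketch covers only the default path.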