diff --git a/docs/hamilton-ui/ui.rst b/docs/hamilton-ui/ui.rst index 2bb168fcb..c1db0bed0 100644 --- a/docs/hamilton-ui/ui.rst +++ b/docs/hamilton-ui/ui.rst @@ -123,6 +123,12 @@ More extensive self-hosting documentation is in the works, e.g. Snowflake, Datab chart contribution! +Running on Snowflake +--------------------- +You can run the Hamilton UI on Snowflake Container Services. For a detailed guide, see the blog post +`Observability of Python code and application logic with Hamilton UI on Snowflake Container Services `_ by +`Greg Kantyka `_ and the `Hamilton Snowflake Example `_. + ----------- Get started ----------- diff --git a/examples/snowflake/hamilton_ui/Dockerfile b/examples/snowflake/hamilton_ui/Dockerfile new file mode 100644 index 000000000..0433ae7a8 --- /dev/null +++ b/examples/snowflake/hamilton_ui/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.13.1-slim + +RUN apt update +RUN apt upgrade sqlite3 -y +RUN pip install "sf-hamilton[ui,sdk]" +RUN pip install flask + +ENV HAMILTON_ALLOWED_HOSTS=".snowflakecomputing.app" +ENV SERVER_PORT=8001 + +COPY pipeline_endpoint.py /pipeline_endpoint.py +COPY my_functions.py /my_functions.py + +ENTRYPOINT /bin/bash -c "(hamilton ui --base-dir /hamilton-basedir &) && python /pipeline_endpoint.py" diff --git a/examples/snowflake/hamilton_ui/README.md b/examples/snowflake/hamilton_ui/README.md new file mode 100644 index 000000000..9f5fc7174 --- /dev/null +++ b/examples/snowflake/hamilton_ui/README.md @@ -0,0 +1,16 @@ +# Running the Hamilton & the Hamilton UI in Snowflake + +This example is code for the ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by +[Greg Kantyka](https://medium.com/@pkantyka). + +Here we show the code required to be packaged up for use on Snowflake: + +1. Docker file that runs the Hamilton UI and a flask endpoint to exercise Hamilton code +2. my_functions.py - the Hamilton code that is exercised by the flask endpoint +3. pipeline_endpoint.py - the flask endpoint that exercises the Hamilton code + +To run see: + - snowflake.sql that contains all the SQL to create the necessary objects in Snowflake and exercise things. + +For more details see ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by +[Greg Kantyka](https://medium.com/@pkantyka). diff --git a/examples/snowflake/hamilton_ui/my_functions.py b/examples/snowflake/hamilton_ui/my_functions.py new file mode 100644 index 000000000..387d0b9fc --- /dev/null +++ b/examples/snowflake/hamilton_ui/my_functions.py @@ -0,0 +1,21 @@ +import pandas as pd + + +def spend_mean(spend: pd.Series) -> float: + """Shows function creating a scalar. In this case it computes the mean of the entire column.""" + return spend.mean() + + +def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series: + """Shows function that takes a scalar. In this case to zero mean spend.""" + return spend - spend_mean + + +def spend_std_dev(spend: pd.Series) -> float: + """Function that computes the standard deviation of the spend column.""" + return spend.std() + + +def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series: + """Function showing one way to make spend have zero mean and unit variance.""" + return spend_zero_mean / spend_std_dev diff --git a/examples/snowflake/hamilton_ui/pipeline_endpoint.py b/examples/snowflake/hamilton_ui/pipeline_endpoint.py new file mode 100644 index 000000000..7a787c8d2 --- /dev/null +++ b/examples/snowflake/hamilton_ui/pipeline_endpoint.py @@ -0,0 +1,118 @@ +""" +This module: +- Defines a flask app that listens for POST requests on /echo +- the /echo command is invoked from a Snowflake SQL query +- the /echo command calls a function get_response that is defined in this module +- get_response uses Hamilton to execute a pipeline defined in my_functions.py +- my_functions.py contains the functions that are used in the pipeline +- the pipeline is executed with the input data from the Snowflake query +- the output of the pipeline is returned to Snowflake +- the Hamilton UI tracker is used to track the execution of the pipeline +""" + +import logging +import os +import sys + +import pandas as pd +from flask import Flask, make_response, request + +from hamilton import registry + +registry.disable_autoload() +import my_functions # we import the module here! + +from hamilton import driver +from hamilton_sdk import adapters + +# WRAPPER CODE FOR SNOWFLAKE FUNCTION ###### + +SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0") +SERVICE_PORT = os.getenv("SERVER_PORT", 8080) +CHARACTER_NAME = os.getenv("CHARACTER_NAME", "I") + + +def get_logger(logger_name): + logger = logging.getLogger(logger_name) + logger.setLevel(logging.DEBUG) + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.DEBUG) + handler.setFormatter(logging.Formatter("%(name)s [%(asctime)s] [%(levelname)s] %(message)s")) + logger.addHandler(handler) + return logger + + +logger = get_logger("echo-service") + +app = Flask(__name__) + + +@app.get("/healthcheck") +def readiness_probe(): + return "OK" + + +@app.post("/echo") +def echo(): + """This is the endpoint that Snowflake will call to run Hamilton code.""" + message = request.json + logger.debug(f"Received request: {message}") + + if message is None or not message["data"]: + logger.info("Received empty message") + return {} + + input_rows = message["data"] + logger.info(f"Received {len(input_rows)} rows") + + output_rows = [[row[0], get_response(row[1], row[2], row[3], row[4])] for row in input_rows] + logger.info(f"Produced {len(output_rows)} rows") + + response = make_response({"data": output_rows}) + response.headers["Content-type"] = "application/json" + logger.debug(f"Sending response: {response.json}") + return response + + +# END OF WRAPPER CODE FOR SNOWFLAKE FUNCTION ###### + + +def get_response(prj_id, spend, signups, output_columns): + """The function that is called from SQL on Snowflake.""" + tracker = adapters.HamiltonTracker( + project_id=prj_id, + username="admin", + dag_name="MYDAG", + tags={"environment": "R&D", "team": "MY_TEAM", "version": "Beta"}, + ) + input_columns = { + "signups": pd.Series(spend), + "spend": pd.Series(signups), + } + dr = ( + driver.Builder() + .with_config({}) # we don't have any configuration or invariant data for this example. + .with_modules( + my_functions + ) # we need to tell hamilton where to load function definitions from + .with_adapters(tracker) # we add the Hamilton UI tracker + .build() + ) + + df = dr.execute(output_columns, inputs=input_columns) + + serializable_df = {} + + for key, value in df.items(): + if isinstance(value, pd.Series): + # Convert Series to dict (or .tolist() for just values) + serializable_df[key] = {str(k): v for k, v in value.to_dict().items()} + else: + # Pass other values as is + serializable_df[key] = value + + return serializable_df + + +if __name__ == "__main__": + app.run(host=SERVICE_HOST, port=SERVICE_PORT) diff --git a/examples/snowflake/hamilton_ui/requirements.txt b/examples/snowflake/hamilton_ui/requirements.txt new file mode 100644 index 000000000..7e573a31a --- /dev/null +++ b/examples/snowflake/hamilton_ui/requirements.txt @@ -0,0 +1,2 @@ +flask +sf-hamilton[ui,sdk] diff --git a/examples/snowflake/hamilton_ui/snowflake.sql b/examples/snowflake/hamilton_ui/snowflake.sql new file mode 100644 index 000000000..7cad26748 --- /dev/null +++ b/examples/snowflake/hamilton_ui/snowflake.sql @@ -0,0 +1,94 @@ +-- For more details visit: +-- https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635 + +CREATE SERVICE public.hamilton_ui +IN COMPUTE POOL TEST_POOL +FROM SPECIFICATION $$ +spec: + containers: + - name: hamiltonui + image: ////snowflake-hamilton-ui + volumeMounts: + - name: hamilton-basedir + mountPath: /hamilton-basedir + endpoints: + - name: entrypoint + port: 8001 + - name: hamilton + port: 8241 + public: true + volumes: + - name: hamilton-basedir + source: "@..hamilton_base" + $$ +QUERY_WAREHOUSE = +; + +CALL SYSTEM$GET_SERVICE_STATUS('..hamilton_ui'); + +CALL SYSTEM$GET_SERVICE_LOGS('..hamilton_ui', '0', 'hammiltonui', 1000); + +SHOW ENDPOINTS IN SERVICE public.hamilton_ui; + +CREATE OR REPLACE FUNCTION public.hamilton_pipeline (prj_id number, signups variant, spend variant, output_columns variant) + RETURNS VARIANT + SERVICE=public.hamilton_ui + ENDPOINT=entrypoint + AS '/echo'; + + +SELECT + public.hamilton_pipeline ( + 1, + [1, 10, 50, 100, 200, 400], + [10, 10, 20, 40, 40, 50], + [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ] + ) as data; + +WITH input_data AS ( + SELECT + public.hamilton_pipeline ( + 1, + [1, 10, 50, 100, 200, 400], + [10, 10, 20, 40, 40, 50], + [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ] + ) as data +), +flattened AS ( + SELECT + key AS metric_key, + value AS metric_value + FROM + input_data + left join LATERAL FLATTEN(input_data.data) +) +SELECT + * +FROM + flattened f; + +WITH input_data AS ( + SELECT + public.hamilton_pipeline ( + 1, + [1, 10, 50, 100, 200, 400], + [10, 10, 20, 40, 40, 50], + [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ] + ) as data +), +flattened AS ( + SELECT + key AS metric_key, + value AS metric_value + FROM + input_data + left join LATERAL FLATTEN(input_data.data) +) +SELECT + f2.key, + f2.value +FROM + flattened f + left join lateral flatten(metric_value) f2 +where + metric_key = 'spend_zero_mean_unit_variance';