-
Notifications
You must be signed in to change notification settings - Fork 131
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds code for snowflake container example running the UI (#1257)
* Adds code for snowflake container example running the UI Note: this is a toy example. For real production needs, you'd need to modify a few things: 1. use PostgreSQL instead of SQLite, or implement a django-snowflake connection. 2. likely not run the Hamilton code within the flask container; instead package it up properly and define a UDF or UDTF. 3. could use snowpark dataframes or have the Hamilton code do other things, e.g. LLM calls. See https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635 for the write-up.
- Loading branch information
Showing
7 changed files
with
271 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Toy image: runs the Hamilton UI and the Flask endpoint in one container.
FROM python:3.13.1-slim

# Refresh the package index and upgrade in the same layer, so a cached
# `apt update` layer is never paired with a later upgrade step.
RUN apt update && apt upgrade sqlite3 -y
RUN pip install "sf-hamilton[ui,sdk]"
RUN pip install flask

ENV HAMILTON_ALLOWED_HOSTS=".snowflakecomputing.app"
# Read by pipeline_endpoint.py to choose the Flask listening port.
ENV SERVER_PORT=8001

COPY pipeline_endpoint.py /pipeline_endpoint.py
COPY my_functions.py /my_functions.py

# Exec form: avoids the implicit `/bin/sh -c` wrapper that the shell form adds
# around this bash invocation. The Hamilton UI is started in the background,
# then the Flask endpoint runs in the foreground.
ENTRYPOINT ["/bin/bash", "-c", "(hamilton ui --base-dir /hamilton-basedir &) && python /pipeline_endpoint.py"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Running Hamilton & the Hamilton UI in Snowflake | ||
|
||
This example is code for the ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by | ||
[Greg Kantyka](https://medium.com/@pkantyka). | ||
|
||
Here we show the code required to be packaged up for use on Snowflake: | ||
|
||
1. Dockerfile that runs the Hamilton UI and a flask endpoint to exercise Hamilton code | ||
2. my_functions.py - the Hamilton code that is exercised by the flask endpoint | ||
3. pipeline_endpoint.py - the flask endpoint that exercises the Hamilton code | ||
|
||
To run see: | ||
- snowflake.sql that contains all the SQL to create the necessary objects in Snowflake and exercise things. | ||
|
||
For more details see ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by | ||
[Greg Kantyka](https://medium.com/@pkantyka). |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import pandas as pd | ||
|
||
|
||
def spend_mean(spend: pd.Series) -> float:
    """Example of a node that produces a scalar: the mean of the whole ``spend`` column."""
    average = spend.mean()
    return average
|
||
|
||
def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Example of a node that consumes a scalar: ``spend`` shifted by ``spend_mean``."""
    centered = spend - spend_mean
    return centered
|
||
|
||
def spend_std_dev(spend: pd.Series) -> float:
    """Return the standard deviation of the ``spend`` column, as computed by ``Series.std``."""
    deviation = spend.std()
    return deviation
|
||
|
||
def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
    """Scale the zero-mean spend by its standard deviation to get unit variance."""
    standardized = spend_zero_mean / spend_std_dev
    return standardized
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
""" | ||
This module: | ||
- Defines a flask app that listens for POST requests on /echo | ||
- the /echo command is invoked from a Snowflake SQL query | ||
- the /echo command calls a function get_response that is defined in this module | ||
- get_response uses Hamilton to execute a pipeline defined in my_functions.py | ||
- my_functions.py contains the functions that are used in the pipeline | ||
- the pipeline is executed with the input data from the Snowflake query | ||
- the output of the pipeline is returned to Snowflake | ||
- the Hamilton UI tracker is used to track the execution of the pipeline | ||
""" | ||
|
||
import logging | ||
import os | ||
import sys | ||
|
||
import pandas as pd | ||
from flask import Flask, make_response, request | ||
|
||
from hamilton import registry | ||
|
||
registry.disable_autoload() | ||
import my_functions # we import the module here! | ||
|
||
from hamilton import driver | ||
from hamilton_sdk import adapters | ||
|
||
# WRAPPER CODE FOR SNOWFLAKE FUNCTION ###### | ||
|
||
# Runtime configuration, read from the environment with defaults for local runs.
SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
# os.getenv returns a str whenever the variable is set, so convert explicitly;
# otherwise the port is an int by default but a str once SERVER_PORT is exported.
SERVICE_PORT = int(os.getenv("SERVER_PORT", "8080"))
# NOTE(review): not referenced anywhere else in the visible code.
CHARACTER_NAME = os.getenv("CHARACTER_NAME", "I")
|
||
|
||
def get_logger(logger_name):
    """Return a DEBUG-level logger that writes formatted records to stdout.

    Args:
        logger_name: Name passed to ``logging.getLogger``.

    Returns:
        The named ``logging.Logger``. Calling this function repeatedly with the
        same name attaches the stdout handler only once.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object for a given name, so adding a
    # handler unconditionally would emit every record once per call.
    already_attached = any(
        isinstance(existing, logging.StreamHandler)
        and getattr(existing, "stream", None) is sys.stdout
        for existing in logger.handlers
    )
    if not already_attached:
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(
            logging.Formatter("%(name)s [%(asctime)s] [%(levelname)s] %(message)s")
        )
        logger.addHandler(handler)
    return logger
|
||
|
||
# Module-level singletons shared by the route handlers below.
app = Flask(__name__)
logger = get_logger("echo-service")
|
||
|
||
@app.get("/healthcheck")
def readiness_probe():
    """Answer GET /healthcheck with a plain ``OK`` body."""
    status_body = "OK"
    return status_body
|
||
|
||
@app.post("/echo")
def echo():
    """This is the endpoint that Snowflake will call to run Hamilton code.

    Reads a JSON body with a ``"data"`` list of rows. For each row, element 0
    is echoed back unchanged and elements 1-4 are passed positionally to
    ``get_response``.

    Returns:
        ``{}`` when the body has no rows; otherwise a JSON response of the form
        ``{"data": [[row[0], <get_response result>], ...]}``.
    """
    message = request.json
    logger.debug(f"Received request: {message}")

    # Use .get() rather than ["data"]: a JSON body without a "data" key (or one
    # that is not an object at all) is treated as empty instead of raising.
    input_rows = message.get("data") if isinstance(message, dict) else None
    if not input_rows:
        logger.info("Received empty message")
        return {}

    logger.info(f"Received {len(input_rows)} rows")

    output_rows = [[row[0], get_response(row[1], row[2], row[3], row[4])] for row in input_rows]
    logger.info(f"Produced {len(output_rows)} rows")

    response = make_response({"data": output_rows})
    response.headers["Content-type"] = "application/json"
    logger.debug(f"Sending response: {response.json}")
    return response
|
||
|
||
# END OF WRAPPER CODE FOR SNOWFLAKE FUNCTION ###### | ||
|
||
|
||
def get_response(prj_id, spend, signups, output_columns):
    """Execute the ``my_functions`` Hamilton DAG for one row and return its outputs.

    NOTE(review): the ``spend`` and ``signups`` parameter names are crossed
    against the input keys built below (``spend`` feeds the "signups" input and
    ``signups`` feeds "spend"). The /echo handler passes the SQL arguments
    positionally, and the SQL function declares them as (prj_id, signups,
    spend, output_columns), so the two crossovers cancel for that caller. A
    keyword-argument caller would get swapped data; confirm before renaming.

    Args:
        prj_id: Project id given to the Hamilton UI tracker.
        spend: Values wrapped in a Series and supplied as the "signups" input.
        signups: Values wrapped in a Series and supplied as the "spend" input.
        output_columns: Names of the outputs requested from the driver.

    Returns:
        A dict keyed by output name. ``pd.Series`` outputs become dicts with
        stringified index keys; every other value is passed through unchanged.
    """
    ui_tracker = adapters.HamiltonTracker(
        project_id=prj_id,
        username="admin",
        dag_name="MYDAG",
        tags={"environment": "R&D", "team": "MY_TEAM", "version": "Beta"},
    )
    hamilton_inputs = {
        "signups": pd.Series(spend),
        "spend": pd.Series(signups),
    }

    builder = driver.Builder()
    # There is no configuration or invariant data in this example.
    builder = builder.with_config({})
    # Tell Hamilton which module holds the function definitions.
    builder = builder.with_modules(my_functions)
    # Attach the Hamilton UI tracker.
    builder = builder.with_adapters(ui_tracker)
    dr = builder.build()

    results = dr.execute(output_columns, inputs=hamilton_inputs)

    # Series are turned into plain dicts with string keys; anything else is
    # returned as is.
    return {
        name: (
            {str(index): item for index, item in output.to_dict().items()}
            if isinstance(output, pd.Series)
            else output
        )
        for name, output in results.items()
    }
|
||
|
||
# Start the Flask development server only when the module is run as a script.
if __name__ == "__main__":
    app.run(port=SERVICE_PORT, host=SERVICE_HOST)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
flask | ||
sf-hamilton[ui,sdk] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
-- For more details visit: | ||
-- https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635 | ||
|
||
-- Create the container service that runs the Hamilton UI (port 8241, public)
-- and the Flask endpoint (port 8001) from the image built for this example.
CREATE SERVICE public.hamilton_ui
IN COMPUTE POOL TEST_POOL
FROM SPECIFICATION $$
spec:
  containers:
  - name: hamiltonui
    image: <account-url-registry-host>/<db-name>/<schema-name>/<repo-name>/snowflake-hamilton-ui
    volumeMounts:
    - name: hamilton-basedir
      mountPath: /hamilton-basedir
  endpoints:
  - name: entrypoint
    port: 8001
  - name: hamilton
    port: 8241
    public: true
  volumes:
  - name: hamilton-basedir
    source: "@<db-name>.<schema-name>.hamilton_base"
$$
QUERY_WAREHOUSE = <warehouse-name>
;
|
||
-- Check the service status.
CALL SYSTEM$GET_SERVICE_STATUS('<db-name>.<schema-name>.hamilton_ui');

-- Fetch the last 1000 log lines of instance 0. The container name must match
-- the one declared in the service specification (hamiltonui).
CALL SYSTEM$GET_SERVICE_LOGS('<db-name>.<schema-name>.hamilton_ui', '0', 'hamiltonui', 1000);

-- List the service endpoints.
SHOW ENDPOINTS IN SERVICE public.hamilton_ui;
|
||
-- Service function bound to the `entrypoint` endpoint of public.hamilton_ui;
-- calls are sent to the container's /echo route.
CREATE OR REPLACE FUNCTION public.hamilton_pipeline (prj_id number, signups variant, spend variant, output_columns variant)
RETURNS VARIANT
SERVICE=public.hamilton_ui
ENDPOINT=entrypoint
AS '/echo';
|
||
|
||
-- Call the pipeline once and return the raw VARIANT result.
SELECT
  public.hamilton_pipeline (
    1,
    [1, 10, 50, 100, 200, 400],
    [10, 10, 20, 40, 40, 50],
    [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
  ) as data;
|
||
-- Call the pipeline, then flatten the returned VARIANT into one
-- (metric_key, metric_value) row per top-level key.
WITH input_data AS (
  SELECT
    public.hamilton_pipeline (
      1,
      [1, 10, 50, 100, 200, 400],
      [10, 10, 20, 40, 40, 50],
      [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
    ) as data
),
flattened AS (
  SELECT
    key AS metric_key,
    value AS metric_value
  FROM
    input_data
    left join LATERAL FLATTEN(input_data.data)
)
SELECT
  *
FROM
  flattened f;
|
||
-- Same as the previous query, but flattens a second time to list the
-- key/value pairs inside the 'spend_zero_mean_unit_variance' metric.
WITH input_data AS (
  SELECT
    public.hamilton_pipeline (
      1,
      [1, 10, 50, 100, 200, 400],
      [10, 10, 20, 40, 40, 50],
      [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
    ) as data
),
flattened AS (
  SELECT
    key AS metric_key,
    value AS metric_value
  FROM
    input_data
    left join LATERAL FLATTEN(input_data.data)
)
SELECT
  f2.key,
  f2.value
FROM
  flattened f
  left join lateral flatten(metric_value) f2
where
  metric_key = 'spend_zero_mean_unit_variance';