* Experiment Manager v1: requirements.txt added; openai migrated for summarization.py; draft experiment manager; README updated and completed (gif added); image artifacts components; docstrings added; entrypoint added; refactor to hamilton.plugins; add GraphConstructionHook to `__init__.py` with the other public hooks; update the ExperimentTracker hook accordingly
* A few minor fixes/improvements to the Experimentation Manager (#681):
  1. Generalizes the experimentation directory (no longer relies on `.dat` or `.db`)
  2. Changes the sort order for displaying experiments (most recent on top)
  3. Changes the `path` result to work for more than just parquet, in line with our standardization
* Adds instructions for running the example from the example directory

Co-authored-by: zilto <tjean@DESKTOP-V6JDCS2>
Co-authored-by: Elijah ben Izzy <[email protected]>
1 parent aa8af83 · commit 9b80a05 · Showing 15 changed files with 1,170 additions and 0 deletions.
**requirements.txt**

```
fastapi
fastui
openai
pypdf
sf-hamilton[visualization]
tiktoken
```
**summarization.py**

```python
import os
import tempfile
from typing import Generator, Union

import tiktoken
from openai import OpenAI
from pypdf import PdfReader

from hamilton.htypes import Collect, Parallelizable


def openai_client() -> OpenAI:
    return OpenAI(api_key=os.environ["OPENAI_API_KEY"])


def raw_text(pdf_source: Union[str, bytes, tempfile.SpooledTemporaryFile]) -> str:
    """Takes a filepath to a PDF and returns a string of the PDF's contents
    :param pdf_source: the path, or the temporary file, to the PDF.
    :return: the text of the PDF.
    """
    reader = PdfReader(pdf_source)
    _pdf_text = ""
    for page_number, page in enumerate(reader.pages, start=1):
        _pdf_text += page.extract_text() + f"\nPage Number: {page_number}"
    return _pdf_text


def tokenizer(tokenizer_encoding: str = "cl100k_base") -> tiktoken.core.Encoding:
    """Get OpenAI tokenizer"""
    return tiktoken.get_encoding(tokenizer_encoding)


def _create_chunks(
    text: str, tokenizer: tiktoken.core.Encoding, max_length: int
) -> Generator[list[int], None, None]:
    """Yield successive chunks of roughly `max_length` tokens from the provided text,
    preferring to end each chunk at the end of a sentence.
    """
    tokens = tokenizer.encode(text)
    i = 0
    while i < len(tokens):
        # Find the nearest end of sentence within a range of 0.5 * max_length
        # and 1.5 * max_length tokens
        j = min(i + int(1.5 * max_length), len(tokens))
        while j > i + int(0.5 * max_length):
            # Decode the tokens and check for a full stop or newline
            chunk = tokenizer.decode(tokens[i:j])
            if chunk.endswith(".") or chunk.endswith("\n"):
                break
            j -= 1
        # If no end of sentence was found, fall back to `max_length` tokens
        if j == i + int(0.5 * max_length):
            j = min(i + max_length, len(tokens))
        yield tokens[i:j]
        i = j


def chunked_text(
    raw_text: str, tokenizer: tiktoken.core.Encoding, max_token_length: int = 800
) -> list[str]:
    """Tokenize text; create chunks of size `max_token_length`;
    for each chunk, convert tokens back to a text string
    """
    _encoded_chunks = _create_chunks(raw_text, tokenizer, max_token_length)
    _decoded_chunks = [tokenizer.decode(chunk) for chunk in _encoded_chunks]
    return _decoded_chunks


def chunk_to_summarize(chunked_text: list[str]) -> Parallelizable[str]:
    """Iterate over chunks that didn't have a stored summary"""
    for chunk in chunked_text:
        yield chunk


def _summarize_text__openai(openai_client: OpenAI, prompt: str, openai_gpt_model: str) -> str:
    """Use the OpenAI chat API to ask a model to summarize content contained in a prompt"""
    response = openai_client.chat.completions.create(
        model=openai_gpt_model, messages=[{"role": "user", "content": prompt}], temperature=0
    )
    return response.choices[0].message.content


def prompt_to_summarize_chunk() -> str:
    """Base prompt for summarizing a chunk of text"""
    return "Extract key points with reasoning into a bulleted format.\n\nContent:{content}"


def chunk_summary(
    openai_client: OpenAI,
    chunk_to_summarize: str,
    prompt_to_summarize_chunk: str,
    openai_gpt_model: str,
) -> str:
    """Fill the base prompt with a chunk's content and summarize it"""
    filled_prompt = prompt_to_summarize_chunk.format(content=chunk_to_summarize)
    return _summarize_text__openai(openai_client, filled_prompt, openai_gpt_model)


def prompt_to_reduce_summaries() -> str:
    """Prompt for a "reduce" operation to summarize a list of summaries into a single text"""
    return """Write a summary from this collection of key points.
First answer the question in two sentences. Then, highlight the core argument, conclusions and evidence.
User query: {query}
The summary should be structured in bulleted lists following the headings Answer, Core Argument, Evidence, and Conclusions.
Key points:\n{chunks_summary}\nSummary:\n"""


def chunk_summary_collection(chunk_summary: Collect[str]) -> list[str]:
    """Collect chunks for which a summary was just computed"""
    return chunk_summary


def final_summary(
    openai_client: OpenAI,
    query: str,
    chunk_summary_collection: list[str],
    prompt_to_reduce_summaries: str,
    openai_gpt_model: str,
) -> str:
    """Concatenate the list of chunk summaries into a single text, fill the prompt template,
    and use OpenAI to reduce the content into a single summary
    """
    concatenated_summaries = " ".join(chunk_summary_collection)
    filled_prompt = prompt_to_reduce_summaries.format(
        query=query, chunks_summary=concatenated_summaries
    )
    return _summarize_text__openai(openai_client, filled_prompt, openai_gpt_model)


if __name__ == "__main__":
    import summarization

    from hamilton import driver

    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(summarization)
        .build()
    )
    dr.display_all_functions("./docs/summary", {"view": False, "format": "png"}, orient="TB")

    inputs = dict(
        pdf_source="./data/hamilton_paper.pdf",
        openai_gpt_model="gpt-3.5-turbo-0613",
        query="What are the main benefits of this tool?",
    )

    results = dr.execute(["final_summary"], inputs=inputs)
```
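For intuition on the chunking bounds, here is a quick sanity-check sketch (not part of the commit; it assumes `summarization.py` is importable and that tiktoken's `cl100k_base` encoding is available locally):

```python
# hypothetical sanity check of the sentence-boundary chunking, not part of the commit
import tiktoken

from summarization import chunked_text

enc = tiktoken.get_encoding("cl100k_base")
text = "This is a short sentence. " * 200

# with max_token_length=20, each chunk should end at a period and fall
# roughly between 10 (0.5 * 20) and 30 (1.5 * 20) tokens
chunks = chunked_text(text, enc, max_token_length=20)
for chunk in chunks[:3]:
    print(len(enc.encode(chunk)), repr(chunk[-20:]))
```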
**README.md**

# Hamilton Experiment Manager

Add a hook to your Hamilton Driver to log runs and visualize artifacts and metadata! The server is built using FastAPI + FastUI, making it easy to integrate the server within your app or extend the UI.

<p align="center">
  <img src="./showcase.gif" height=600 width=auto/>
</p>

## Features
- 📝 Track run metadata (config, inputs, code version, etc.)
- 📦 Generate directories to store your run artifacts
- 📡 Launch a local server to view and explore your experiments

## Installation
Use `pip install sf-hamilton[experiments]` to install both the hook and the server with their dependencies.

## How to use the ExperimentTracker hook
The `ExperimentTracker` hook can be added to your Hamilton Driver definition to automatically log metadata about the run and materialized results.

1. Create the `ExperimentTracker` hook object:
   - `experiment_name`: name to organize related runs. It is used to create directories and is displayed in the UI.
   - `base_directory`: path where the metadata cache and subdirectories to store artifacts will be created. Default is `./experiments`.
2. Create the Hamilton Driver and pass the `ExperimentTracker` to the `with_adapters()` method.
3. Define materializers for each artifact you care about. The UI provides rich support for `parquet`, `csv`, and `json`.
   - ⚠ Make sure to use relative paths (ones that don't start with `/`) for artifacts to be stored in run directories.
4. Call `dr.materialize()` to launch a run for which metadata and artifacts will be tracked.
5. (Optional) Use `dr.visualize_materialization()` to visualize the dataflow, setting `output_file_path` to the run directory `tracker_hook.run_directory`.

### Running the example

To run the example from the example directory, do the following:

```bash
cd examples/experiment_management
pip install -r requirements.txt  # or use your favorite env manager
python run.py
h_experiments  # initialize/run the server
```

Then navigate to `http://localhost:8123` to view the experiment manager! From there, you'll want to integrate it into your own workflow.

### Integrating your own
```python
from hamilton import driver
from hamilton.io.materialization import to
from hamilton.plugins import h_experiments

import my_functions  # <- your Hamilton module


# 1. create the hook
tracker_hook = h_experiments.ExperimentTracker(
    experiment_name="hello-world",
    base_directory="/path/to/experiments",
)

# 2. create driver with modules and hook
dr = (
    driver.Builder()
    .with_modules(my_functions)
    .with_adapters(tracker_hook)
    .build()
)

# 3. define materializers (absolute or relative path)
materializers = [
    # notice the relative paths (don't start with "/")
    to.json(
        id="model_performance__json",
        dependencies=["model_performance"],
        path="./model_performance.json",
    ),
    to.parquet(
        id="training_data__parquet",
        dependencies=["training_data"],
        path="./training_data.parquet",
    ),
]

# 4. launch run using `.materialize()`
dr.materialize(*materializers)

# 5. (optional) visualize materialization and store the figure
# under the `tracker_hook.run_directory` path
dr.visualize_materialization(
    *materializers,
    output_file_path=f"{tracker_hook.run_directory}/dag",
)
```

## How to use the experiment server
The experiment server is a local FastAPI server that reads the run metadata cache and mounts the `base_directory` to view and explore results. The frontend uses FastUI to create a React interface from Python.

### Start the FastAPI server
```
h_experiments
```

You should see in the terminal:
```
INFO: Started server process [24113]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8123 (Press CTRL+C to quit)
```

### Set the experiments directory
```
h_experiments /path/to/base_directory
```

You can use an absolute or relative path. Default is `./experiments`.

### Set host and port
```
h_experiments --host $HOST --port $PORT
```
Defaults are `127.0.0.1` and `8123`.

## What's next?
Let us know how you find the experiment manager and which features you'd like to see! This project is still early/experimental, and there are several interesting avenues:
- Materialize artifacts to cloud storage
- User interface to view node-level code diffs
- Performance profiling of runs
- User interface to launch runs

Given this is a FastAPI server, you can easily extend it yourself and mount it as a subroute for your own application; a rough sketch follows below.
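The mounting itself is standard FastAPI sub-application composition. A minimal sketch, assuming the plugin exposes its FastAPI instance as an importable `app` object (the import path and attribute name below are assumptions, not a documented API; check the `hamilton.plugins` source for the real ones):

```python
# hypothetical: embed the experiment server under a subroute of your own app.
# `experiments_app` is an assumed name for the plugin's FastAPI instance.
from fastapi import FastAPI

from hamilton.plugins.h_experiments import app as experiments_app  # assumed path

app = FastAPI()

# Starlette/FastAPI sub-application mounting: requests under /experiments/*
# are handled by the experiment server.
app.mount("/experiments", experiments_app)
```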
**Hamilton dataflow module for the experiment management example** (exact filename not shown in this view)

```python
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.figure import Figure
from sklearn.base import BaseEstimator, clone
from sklearn.datasets import load_diabetes
from sklearn.decomposition import PCA
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import KFold

from hamilton.function_modifiers import config, extract_fields


@extract_fields(dict(X_raw=np.ndarray, y=np.ndarray))
def load_data() -> dict:
    """Load the diabetes regression dataset as raw features and targets"""
    X_raw, y = load_diabetes(return_X_y=True)
    return dict(X_raw=X_raw, y=y)


def splits(X_raw: np.ndarray, n_splits: int = 3) -> list[tuple]:
    """Create k-fold train/eval index splits"""
    fold = KFold(n_splits=n_splits)
    return [(train_idx, eval_idx) for train_idx, eval_idx in fold.split(X_raw)]


@config.when_not_in(preprocess=["pca"])
def X__base(X_raw: np.ndarray) -> np.ndarray:
    """Pass features through unchanged when no preprocessing is configured"""
    return X_raw


@config.when(preprocess="pca")
def X__pca(X_raw: np.ndarray, n_components: int = 5) -> np.ndarray:
    """Reduce features with PCA when `preprocess="pca"` is configured"""
    pca = PCA(n_components=n_components)
    return pca.fit_transform(X_raw)


@config.when(model="linear")
def base_model__linear() -> BaseEstimator:
    return LinearRegression()


@config.when(model="random_forest")
def base_model__random_forest() -> BaseEstimator:
    return RandomForestRegressor()


@config.when(model="boosting")
def base_model__boosting() -> BaseEstimator:
    return HistGradientBoostingRegressor()


@extract_fields(
    dict(
        y_pred=np.ndarray,
        cv_scores=list,
    )
)
def cross_validation(
    X: np.ndarray,
    y: np.ndarray,
    base_model: BaseEstimator,
    splits: list[tuple],
) -> dict:
    """Fit a clone of the base model on each fold; collect out-of-fold
    predictions and per-fold mean squared errors
    """
    cv_scores = []
    all_pred = np.zeros(y.shape[0])
    for train_idx, eval_idx in splits:
        model = clone(base_model)

        X_train, y_train = X[train_idx], y[train_idx]
        X_eval, y_eval = X[eval_idx], y[eval_idx]

        model.fit(X_train, y_train)

        y_eval_pred = model.predict(X_eval)
        all_pred[eval_idx] = y_eval_pred

        cv_score = mean_squared_error(y_eval, y_eval_pred)
        cv_scores.append(cv_score)

    return dict(y_pred=all_pred, cv_scores=cv_scores)


def trained_model(
    base_model: BaseEstimator,
    X: np.ndarray,
    y: np.ndarray,
) -> BaseEstimator:
    """Fit the configured model on the full dataset"""
    base_model.fit(X, y)
    return base_model


def prediction_df(y: np.ndarray, y_pred: np.ndarray) -> pd.DataFrame:
    """Tabulate true vs. out-of-fold predicted values"""
    return pd.DataFrame.from_dict(dict(y_true=y, y_pred=y_pred), orient="columns")


def prediction_plot(y: np.ndarray, y_pred: np.ndarray) -> Figure:
    """Scatter plot of true vs. predicted values"""
    fig, ax = plt.subplots()
    ax.scatter(y, y_pred)
    ax.set_xlabel("True")
    ax.set_ylabel("Predicted")

    return fig
```
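Since this module's nodes are gated by `@config.when`, a driver needs a config to resolve them. A minimal sketch, assuming the module above is saved as `my_functions.py` (the module name is an assumption; the config keys and node names come from the code above):

```python
# hypothetical driver setup for the module above; `my_functions` is an
# assumed module name, so substitute however the file is actually saved.
from hamilton import driver

import my_functions

dr = (
    driver.Builder()
    .with_modules(my_functions)
    # resolves base_model__random_forest and X__pca via @config.when
    .with_config(dict(model="random_forest", preprocess="pca"))
    .build()
)
results = dr.execute(["cv_scores", "prediction_df"])
print(results["cv_scores"])
```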
**requirements.txt** (for `examples/experiment_management`, per the README above)

```
fastparquet
matplotlib
numpy
pandas
pyarrow
scikit-learn
sf-hamilton[experiments,visualization]
```