Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into releases/rc-trulens-1.2.9
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-chu committed Dec 3, 2024
2 parents eae7216 + a0fcfce commit 0614871
Show file tree
Hide file tree
Showing 15 changed files with 814 additions and 29 deletions.
213 changes: 213 additions & 0 deletions examples/experimental/pace_example.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# This notebook demonstrates how to use the Pace utility\n",
"\n",
"This utility can be used to limit the rate of API requests to external endpoints. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import asyncio\n",
"import random\n",
"import threading\n",
"import time\n",
"\n",
"from IPython import display\n",
"from trulens.core.utils.pace import Pace"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a pace instance with 2 second per period and 20 marks per second. The\n",
"# average of 20 marks per second will be maintained across any 2 second period\n",
"# but this makes it possible for an initial burst of 20 marks immediately. This\n",
"# is due to the assumption that there were no marks before the process started.\n",
"\n",
"# If seconds_per_period is increased, a larger burst of marks will be possible\n",
"# before the average marks per second since the start of the process stabilizes.\n",
"# A larger burst also means there will be a delay until the next period before\n",
"# marks can return again. A \"burstiness\" warning is issue the first time a delay\n",
"# longer than half of the seconds_per_period is encountered.\n",
"\n",
"p = Pace(seconds_per_period=2, marks_per_second=20)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# start time and counter\n",
"st = time.time()\n",
"count = 0\n",
"\n",
"while True:\n",
" # Mark and increment counter. Calls to mark will block to maintain pace.\n",
" p.mark()\n",
" count += 1\n",
"\n",
" et = time.time()\n",
" display.clear_output(wait=True)\n",
"\n",
" # Show stats of the marks rate since the start of this cell.\n",
" print(f\"\"\"\n",
"Elapsed time: {et - st}\n",
"Marks count: {count}\n",
"Marks per second: {count / (et - st)}\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pace across Threads\n",
"\n",
"The pacing should be maintained even if a single Pace instance is used across\n",
"multiple threads."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"num_threads = 10\n",
"count = 0\n",
"\n",
"\n",
"# Create a function to run in each thread and update the count for each mark:\n",
"def marker():\n",
" global count\n",
"\n",
" while True:\n",
" # Mark and increment counter. Calls to mark will block to maintain pace.\n",
" p.mark()\n",
" count += 1\n",
"\n",
" # Add a bit of sleep to simulate some work.\n",
" time.sleep(random.random() / 100.0)\n",
"\n",
"\n",
"# Start time.\n",
"st = time.time()\n",
"\n",
"# Start the threads.\n",
"for i in range(num_threads):\n",
" t = threading.Thread(target=marker)\n",
" t.start()\n",
"\n",
"while True:\n",
" # Report count stats every second.\n",
" time.sleep(1)\n",
"\n",
" display.clear_output(wait=True)\n",
"\n",
" et = time.time()\n",
"\n",
" # Show stats of the marks rate since the start of this cell.\n",
" print(f\"\"\"\n",
"Elapsed time: {et - st}\n",
"Marks count: {count}\n",
"Marks per second: {count / (et - st)}\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pace in Async Tasks\n",
"\n",
"Pace can also be maintained when using asynchronous tasks. For this, the `amark`\n",
"method must be used and awaited."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"num_tasks = 10\n",
"count = 0\n",
"\n",
"\n",
"# Create a function to run in each task and update the count for each mark:\n",
"async def async_marker():\n",
" global count\n",
"\n",
" while True:\n",
" # Mark and increment counter. Calls to amark will block to maintain pace.\n",
" await p.amark()\n",
" count += 1\n",
"\n",
" # Add a bit of sleep to simulate some work.\n",
" await asyncio.sleep(random.random() / 100.0)\n",
"\n",
"\n",
"# Start time.\n",
"st = time.time()\n",
"\n",
"loop = asyncio.get_event_loop()\n",
"\n",
"# Start the threads.\n",
"for i in range(num_tasks):\n",
" task = loop.create_task(async_marker())\n",
"\n",
"while True:\n",
" # Report count stats every second.\n",
"\n",
" await asyncio.sleep(1)\n",
"\n",
" display.clear_output(wait=True)\n",
"\n",
" et = time.time()\n",
"\n",
" # Show stats of the marks rate since the start of this cell.\n",
" print(f\"\"\"\n",
"max_marks: {p.max_marks}\n",
"mark_expirations: {p.mark_expirations}\n",
"len(mark_expirations): {len(p.mark_expirations)}\n",
"Elapsed time: {et - st}\n",
"Marks count: {count}\n",
"Marks per second: {count / (et - st)}\n",
"\"\"\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
44 changes: 43 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ def _validate_snowpark_session_with_connection_parameters(
"warehouse": snowpark_session.get_current_warehouse(),
"role": snowpark_session.get_current_role(),
}

for k, v in snowpark_session_connection_parameters.items():
if v and v.startswith('"') and v.endswith('"'):
snowpark_session_connection_parameters[k] = v.strip('"')

missing_snowpark_session_parameters = []
mismatched_parameters = []
for k, v in snowpark_session_connection_parameters.items():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"trulens-core",
"trulens-feedback",
"trulens-providers-cortex",
"trulens-semconv",
]

_TRULENS_EXTRA_STAGED_PACKAGES = [
Expand All @@ -33,6 +34,8 @@
"nest-asyncio",
"nltk",
"numpy",
"opentelemetry-api",
"opentelemetry-sdk",
"packaging",
"pandas",
"pip",
Expand Down
10 changes: 6 additions & 4 deletions src/core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,18 @@ alembic = "^1.8.1"
nest-asyncio = "^1.5"
python-dotenv = ">=0.21,<2.0"
importlib-resources = "^6.0"
trulens-semconv = { version = "^1.0.0", optional = true }
opentelemetry-api = { version = "^1.0.0", optional = true }
opentelemetry-sdk = { version = "^1.0.0", optional = true }

[tool.poetry.group.tqdm]
optional = true

[tool.poetry.group.tqdm.dependencies]
tqdm = { version = ">=4.2.0", optional = true }

[tool.poetry.group.otel]
[tool.poetry.group.dev]
optional = true

[tool.poetry.group.otel.dependencies]
opentelemetry-api = { version = ">=1", optional = true }
opentelemetry-sdk = { version = ">=1", optional = true }
[tool.poetry.group.dev.dependencies]
trulens-semconv = { path = "../semconv" }
Loading

0 comments on commit 0614871

Please sign in to comment.