Alexwang/write compression #49156

Closed
16 commits
14 changes: 14 additions & 0 deletions bazel/BUILD.zstd
@@ -0,0 +1,14 @@
load("@rules_foreign_cc//foreign_cc:defs.bzl", "make")
load("@com_github_ray_project_ray//bazel:ray.bzl", "filter_files_with_suffix")

filegroup(
name = "all",
srcs = glob(["**"]),
)

make(
name = "libzstd",
lib_source = ":all",
args = ["ZSTD_NO_ASM=1"],
visibility = ["//visibility:public"],
)
8 changes: 8 additions & 0 deletions bazel/ray_deps_setup.bzl
@@ -372,3 +372,11 @@ def ray_deps_setup():
sha256 = "2db82d1e7119df3e71b7640219b6dfe84789bc0537983c3b7ac4f7189aecfeaa",
strip_prefix = "jemalloc-5.3.0",
)

http_archive(
name = "zstd",
urls = ["https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz"],
build_file = "@com_github_ray_project_ray//bazel:BUILD.zstd",
sha256 = "8c29e06cf42aacc1eafc4077ae2ec6c6fcb96a626157e0593d5e82a34fd403c1",
strip_prefix = "zstd-1.5.6",
)
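For orientation (not part of the diff): the new `@zstd` archive gives Ray's C++ build a zstd library target. The codec itself round-trips like this, shown here with the third-party Python `zstandard` bindings purely as an illustration:

```python
import zstandard  # third-party bindings, assumed here for illustration only

data = b"compressible payload " * 1024

# ZSTD_NO_ASM in the Bazel make() args only disables the hand-written
# assembly kernels for portability; compression behavior is unchanged.
compressed = zstandard.ZstdCompressor().compress(data)
assert zstandard.ZstdDecompressor().decompress(compressed) == data
print(f"{len(data)} -> {len(compressed)} bytes")
```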
20 changes: 18 additions & 2 deletions ci/docker/manylinux.Dockerfile
@@ -1,7 +1,7 @@
# syntax=docker/dockerfile:1.3-labs

ARG HOSTTYPE
FROM quay.io/pypa/manylinux2014_${HOSTTYPE}:2023-11-13-f6b0c51
FROM quay.io/pypa/manylinux2014_${HOSTTYPE}:2024-07-01-8dac23b

ARG BUILDKITE_BAZEL_CACHE_URL

@@ -10,4 +10,20 @@ ENV RAY_INSTALL_JAVA=1
ENV BUILDKITE_BAZEL_CACHE_URL=$BUILDKITE_BAZEL_CACHE_URL

COPY ci/build/build-manylinux-forge.sh /tmp/build-manylinux-forge.sh
RUN /tmp/build-manylinux-forge.sh

RUN <<EOF
#!/bin/bash

# CentOS 7 is EOL and is no longer available from the usual mirrors, so switch
# to https://vault.centos.org
sed -i 's/enabled=1/enabled=0/g' /etc/yum/pluginconf.d/fastestmirror.conf
sed -i 's/^mirrorlist/#mirrorlist/g' /etc/yum.repos.d/*.repo
sed -i 's;^.*baseurl=http://mirror;baseurl=https://vault;g' /etc/yum.repos.d/*.repo
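# aarch64 packages for CentOS 7 live under /altarch/ on vault.centos.org.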
if [ "${HOSTTYPE}" == "aarch64" ]; then
sed -i 's;/centos/7/;/altarch/7/;g' /etc/yum.repos.d/*.repo
fi

/tmp/build-manylinux-forge.sh

EOF

2 changes: 1 addition & 1 deletion ci/docker/manylinux.aarch64.wanda.yaml
@@ -1,6 +1,6 @@
name: "manylinux-aarch64"
froms:
- quay.io/pypa/manylinux2014_aarch64:2023-11-13-f6b0c51
- quay.io/pypa/manylinux2014_aarch64:2024-07-01-8dac23b
srcs:
- ci/build/build-manylinux-forge.sh
build_args:
2 changes: 1 addition & 1 deletion ci/docker/manylinux.wanda.yaml
@@ -1,6 +1,6 @@
name: "manylinux"
froms:
- quay.io/pypa/manylinux2014_x86_64:2023-11-13-f6b0c51
- quay.io/pypa/manylinux2014_x86_64:2024-07-01-8dac23b
srcs:
- ci/build/build-manylinux-forge.sh
build_args:
4 changes: 2 additions & 2 deletions dashboard/client/src/common/ProfilingLink.tsx
@@ -312,7 +312,7 @@ export const MemoryProfilingButton = ({
if (!pid || !ip) {
return <div></div>;
}
const profilerUrl = `/memory_profile?pid=${pid}&ip=${ip}`;
const profilerUrl = `memory_profile?pid=${pid}&ip=${ip}`;

return <ProfilerButton profilerUrl={profilerUrl} type={type} />;
};
@@ -325,7 +325,7 @@ export const TaskMemoryProfilingButton = ({
if (!taskId) {
return null;
}
const profilerUrl = `/memory_profile?task_id=${taskId}&attempt_number=${attemptNumber}&node_id=${nodeId}`;
const profilerUrl = `memory_profile?task_id=${taskId}&attempt_number=${attemptNumber}&node_id=${nodeId}`;

return <ProfilerButton profilerUrl={profilerUrl} />;
};
30 changes: 29 additions & 1 deletion dashboard/modules/metrics/dashboards/data_dashboard_panels.py
@@ -388,6 +388,34 @@
fill=0,
stack=True,
),
Panel(
id=38,
title="In-Task Backpressure Time",
description="Time spent within a running task in backpressure.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_in_task_backpressure_time{{{global_filters}}}) by (dataset, operator)",
legend="In-Task Backpressure Time: {{dataset}}, {{operator}}",
)
],
fill=0,
stack=True,
),
Panel(
id=39,
title="Task CPU Time",
description="Time spent using CPU within a running task.",
unit="seconds",
targets=[
Target(
expr="sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)",
legend="Task CPU Time: {{dataset}}, {{operator}}",
)
],
fill=0,
stack=True,
),
# Ray Data Metrics (Object Store Memory)
Panel(
id=13,
@@ -548,4 +576,4 @@
panels=DATA_GRAFANA_PANELS,
standard_global_filters=['dataset=~"$DatasetID"', 'SessionName=~"$SessionName"'],
base_json_file_name="data_grafana_dashboard_base.json",
)
)
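A note on the `{{{global_filters}}}` placeholder in the panel expressions above: the doubled braces emit literal `{}` for the PromQL label selector, while the inner field is substituted, presumably via Python's `str.format` elsewhere in the dashboard code. A quick sketch using the standard filters from this file:

```python
expr = "sum(ray_data_task_cpu_time{{{global_filters}}}) by (dataset, operator)"
filters = 'dataset=~"$DatasetID", SessionName=~"$SessionName"'

# {{ and }} render as literal braces; {global_filters} is substituted.
print(expr.format(global_filters=filters))
# -> sum(ray_data_task_cpu_time{dataset=~"$DatasetID", SessionName=~"$SessionName"}) by (dataset, operator)
```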
3 changes: 3 additions & 0 deletions dashboard/modules/reporter/profile_manager.py
@@ -353,6 +353,9 @@ async def detach_profiler(
cmd.append("--verbose")
cmd.append(str(pid))

if await _can_passwordless_sudo():
cmd = ["sudo", "-n"] + cmd

process = await asyncio.create_subprocess_exec(
*cmd,
stdout=subprocess.PIPE,
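`_can_passwordless_sudo()` is not shown in this diff. A plausible sketch of such a probe (an assumption, not the PR's code): `sudo -n` exits non-zero immediately instead of prompting when a password would be required.

```python
import asyncio

async def can_passwordless_sudo() -> bool:
    # Hypothetical probe: `sudo -n true` fails fast if a password is needed.
    proc = await asyncio.create_subprocess_exec(
        "sudo", "-n", "true",
        stdout=asyncio.subprocess.DEVNULL,
        stderr=asyncio.subprocess.DEVNULL,
    )
    return await proc.wait() == 0
```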
21 changes: 20 additions & 1 deletion python/ray/_raylet.pyx
@@ -2228,7 +2228,26 @@ cdef execute_task_with_cancellation_handler(

if omp_num_threads_overriden:
# Reset the OMP_NUM_THREADS environ if it was set.
os.environ.pop("OMP_NUM_THREADS", None)
try:
os.environ.pop("OMP_NUM_THREADS", None)
except KeyError:
# os.environ has undefined behavior when multiple threads modify the
# mapping simultaneously. Specifically, a race can occur when two
# threads pop the same key at the same time: one deletes it
# successfully while the other gets a KeyError. If that happens, we
# just make sure the env var is really gone and move on.
# Related issue: https://github.com/python/cpython/issues/120513
logger.error(
"KeyError occurred when popping OMP_NUM_THREADS from "
"os.environ; a race condition may have happened. "
"Checking that the env var is already cleaned up and moving on."
)
if os.environ.get("OMP_NUM_THREADS", None) is not None:
logger.error(
"OMP_NUM_THREADS still remains in os.environ after popping it."
)

if execution_info.max_calls != 0:
# Reset the state of the worker for the next task to execute.
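The same defensive pattern as a standalone sketch (hypothetical helper name; the real change above also logs the occurrence):

```python
import os

def pop_env_var(name: str) -> None:
    # Even os.environ.pop(key, default) can raise KeyError if another
    # thread deletes the key between the lookup and the deletion.
    try:
        os.environ.pop(name, None)
    except KeyError:
        pass  # Another thread already removed it; nothing left to do.
```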
24 changes: 17 additions & 7 deletions python/ray/air/_internal/util.py
@@ -1,9 +1,10 @@
import os
import socket
from contextlib import closing
import copy
import logging
import os
import queue
import socket
import threading
from contextlib import closing
from typing import Optional

import numpy as np
@@ -35,7 +36,13 @@ class StartTraceback(Exception):


def skip_exceptions(exc: Optional[Exception]) -> Exception:
"""Skip all contained `StartTracebacks` to reduce traceback output"""
"""Skip all contained `StartTracebacks` to reduce traceback output.

Returns a shallow copy of the exception with all `StartTracebacks` removed.

If the RAY_AIR_FULL_TRACEBACKS environment variable is set,
the original exception (not a copy) is returned.
"""
should_not_shorten = bool(int(os.environ.get("RAY_AIR_FULL_TRACEBACKS", "0")))

if should_not_shorten:
@@ -45,12 +52,15 @@ def skip_exceptions(exc: Optional[Exception]) -> Exception:
# If this is a StartTraceback, skip
return skip_exceptions(exc.__cause__)

# Else, make sure nested exceptions are properly skipped
# Perform a shallow copy to prevent recursive __cause__/__context__.
new_exc = copy.copy(exc).with_traceback(exc.__traceback__)

# Make sure nested exceptions are properly skipped.
cause = getattr(exc, "__cause__", None)
if cause:
exc.__cause__ = skip_exceptions(cause)
new_exc.__cause__ = skip_exceptions(cause)

return exc
return new_exc


def exception_cause(exc: Optional[Exception]) -> Optional[Exception]:
43 changes: 42 additions & 1 deletion python/ray/air/tests/test_tracebacks.py
@@ -1,8 +1,10 @@
import pytest

import ray
from ray import cloudpickle
from tblib import pickling_support
from ray.train import ScalingConfig
from ray.air._internal.util import StartTraceback, skip_exceptions
from ray.air._internal.util import StartTraceback, skip_exceptions, exception_cause
from ray.train.data_parallel_trainer import DataParallelTrainer

from ray.tune import Tuner
@@ -47,6 +49,45 @@ def test_short_traceback(levels):
assert i == levels - start_traceback + 1


def test_recursion():
"""Test that the skipped exception does not point to the original exception."""
root_exception = None

with pytest.raises(StartTraceback) as exc_info:
try:
raise Exception("Root Exception")
except Exception as e:
root_exception = e
raise StartTraceback from root_exception

assert root_exception, "Root exception was not captured."

start_traceback = exc_info.value
skipped_exception = skip_exceptions(start_traceback)

assert (
root_exception != skipped_exception
), "Skipped exception points to the original exception."


def test_tblib():
"""Test that tblib does not cause a maximum recursion error."""

with pytest.raises(Exception) as exc_info:
try:
try:
raise Exception("Root Exception")
except Exception as root_exception:
raise StartTraceback from root_exception
except Exception as start_traceback:
raise skip_exceptions(start_traceback) from exception_cause(start_traceback)

pickling_support.install()
reraised_exception = exc_info.value
# This should not raise a RecursionError/PicklingError.
cloudpickle.dumps(reraised_exception)


def test_traceback_tuner(ray_start_2_cpus):
"""Ensure that the Tuner's stack trace is not too long."""

@@ -208,6 +208,22 @@ class OpRuntimeMetrics:
"metrics_group": "tasks",
},
)
task_cpu_time: float = field(
default=0,
metadata={
"description": "Time actively using CPU within tasks",
"metrics_group": "tasks",
"map_only": True,
},
)
in_task_backpressure_time: float = field(
default=0,
metadata={
"description": "Time spent waiting idly on generator outputs to be yielded within tasks",
"metrics_group": "tasks",
"map_only": True,
},
)

# === Object store memory metrics ===
obj_store_mem_internal_inqueue_blocks: int = field(
@@ -273,7 +289,6 @@ class OpRuntimeMetrics:
"metrics_group": "object_store_memory",
},
)

# === Miscellaneous metrics ===
# Use "metrics_group: "misc" in the metadata for new metrics in this section.

@@ -485,6 +500,9 @@ def on_task_output_generated(self, task_index: int, output: RefBundle):
for block_ref, meta in output.blocks:
assert meta.exec_stats and meta.exec_stats.wall_time_s
self.block_generation_time += meta.exec_stats.wall_time_s
if meta.exec_stats.backpressure_time:
self.in_task_backpressure_time += meta.exec_stats.backpressure_time
self.task_cpu_time += meta.exec_stats.cpu_time_s
assert meta.num_rows is not None
self.rows_task_outputs_generated += meta.num_rows
trace_allocation(block_ref, "operator_output")
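For intuition on the two counters added above: wall time includes waiting, while CPU time counts only active execution. A minimal way to sample both around a block of work (an illustration; `BlockExecStats` may measure them differently):

```python
import time

wall_start = time.perf_counter()
cpu_start = time.process_time()

sum(i * i for i in range(1_000_000))  # stand-in for the task's real work

wall_time_s = time.perf_counter() - wall_start
cpu_time_s = time.process_time() - cpu_start  # excludes time spent waiting
print(f"wall={wall_time_s:.4f}s cpu={cpu_time_s:.4f}s")
```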
@@ -6,6 +6,7 @@
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Set, Union

import ray
import time
from ray import ObjectRef
from ray._raylet import ObjectRefGenerator
from ray.data._internal.compute import (
@@ -422,9 +423,11 @@ def _map_task(
m_out.exec_stats = stats.build()
m_out.exec_stats.udf_time_s = map_transformer.udf_time()
m_out.exec_stats.task_idx = ctx.task_idx
finish_time = time.perf_counter()
yield b_out
yield m_out
stats = BlockExecStats.builder()
stats.prev_map_task_finish_time = finish_time


class _BlockRefBundler:
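The `prev_map_task_finish_time` bookkeeping above lets the next block's stats capture how long the task sat suspended at `yield` waiting on its consumer, which is what the new in-task backpressure metric reports. A standalone sketch of the idea (not Ray's implementation):

```python
import time

def produce_blocks(n):
    """Measure time spent suspended at `yield`, waiting on the consumer."""
    prev_finish = None
    for i in range(n):
        start = time.perf_counter()
        if prev_finish is not None:
            # The gap between finishing one block and starting the next
            # was spent suspended at `yield`: in-task backpressure.
            print(f"backpressure: {start - prev_finish:.4f}s")
        block = [i] * 1000  # stand-in for real block production
        prev_finish = time.perf_counter()
        yield block

for _ in produce_blocks(3):
    time.sleep(0.1)  # a slow consumer shows up as ~0.1s of backpressure
```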