[K8s] Custom Image Support for Kubernetes Instances #2729

Merged

Changes from all commits (47 commits)
079caff
testing images
landscapepainter Oct 8, 2023
0912fab
nit
landscapepainter Oct 18, 2023
2f08ea2
update
landscapepainter Oct 22, 2023
115840d
custom image support
landscapepainter Oct 23, 2023
18efb2b
check root privilege and installation of sudo
landscapepainter Oct 28, 2023
ff5ec12
Update ssh_user with user from custom image
landscapepainter Oct 29, 2023
e5603ed
support custom image from task yaml
landscapepainter Oct 29, 2023
faaa157
nit
landscapepainter Oct 29, 2023
5ff053d
nit
landscapepainter Oct 29, 2023
bb67a82
nit
landscapepainter Oct 30, 2023
5a15234
check if default user can run 'sudo'
landscapepainter Oct 31, 2023
656aa3a
nit
landscapepainter Nov 1, 2023
00f3db1
nit
landscapepainter Nov 1, 2023
ea66277
nit
landscapepainter Nov 3, 2023
13298ca
updated to support 'docker:' for k8s custom image
landscapepainter Nov 5, 2023
8892352
fix conda init issue
landscapepainter Nov 5, 2023
5410dc6
nit
landscapepainter Nov 6, 2023
2e01767
install rsync from create_node()
landscapepainter Nov 6, 2023
acc039a
format
landscapepainter Nov 6, 2023
f7ca4cf
Merge branch 'master' into k8s_custom_image
landscapepainter Nov 21, 2023
beeab4e
factor out function for running cmd on pods
landscapepainter Nov 21, 2023
d9f2573
nit
landscapepainter Nov 22, 2023
9d7232b
add 'curl' in setup_commands
landscapepainter Nov 22, 2023
eca405c
update Dockerfile_k8s
landscapepainter Nov 23, 2023
d800011
lint
landscapepainter Dec 11, 2023
35bed91
nit
landscapepainter Jan 19, 2024
a2fe540
fix jump pod user issue and openssh install
landscapepainter Jan 19, 2024
5837c35
nit
landscapepainter Jan 19, 2024
d75a225
nit naming and comment update
landscapepainter Jan 19, 2024
cbce4cf
nit
landscapepainter Jan 20, 2024
7e5fc60
Merge branch 'master' of https://github.com/skypilot-org/skypilot int…
romilbhardwaj Jan 20, 2024
b7c69bb
Merge remote-tracking branch 'landscapepainter/k8s_custom_image' into…
romilbhardwaj Jan 20, 2024
3e82711
fixes
romilbhardwaj Jan 21, 2024
b6037b2
Add support for pod_override in config
romilbhardwaj Jan 21, 2024
8e8aa6b
lint
romilbhardwaj Jan 21, 2024
16a1775
logging and apt fixes
romilbhardwaj Jan 21, 2024
f38402b
apt fixes
romilbhardwaj Jan 21, 2024
2545ac1
multinode fixes
romilbhardwaj Jan 22, 2024
efdd07d
multinode fixes
romilbhardwaj Jan 23, 2024
e3acc67
Merge branch 'master' into k8s_custom_image
landscapepainter Jan 30, 2024
3d0ad19
Merge branch 'k8s_custom_image' of https://github.com/landscapepainte…
landscapepainter Jan 30, 2024
2101751
nit
landscapepainter Jan 30, 2024
1474598
nit
landscapepainter Jan 30, 2024
0eed02e
nit
landscapepainter Jan 30, 2024
fe394ee
nit
landscapepainter Jan 30, 2024
3a40507
nit
landscapepainter Jan 30, 2024
2b842b9
rebase k8s ray yaml
landscapepainter Jan 31, 2024
176 changes: 153 additions & 23 deletions sky/skylet/providers/kubernetes/node_provider.py
@@ -1,5 +1,7 @@
import copy
import logging
import os
import re
import time
from typing import Dict
from uuid import uuid4
@@ -10,9 +12,13 @@
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME
from ray.autoscaler.tags import TAG_RAY_NODE_KIND

from sky import exceptions
from sky.adaptors import kubernetes
from sky.backends import backend_utils
from sky.provision.kubernetes import utils as kubernetes_utils
from sky.skylet import constants
from sky.skylet.providers.kubernetes import config
from sky.utils import cluster_yaml_utils
from sky.utils import common_utils

logger = logging.getLogger(__name__)
@@ -73,6 +79,20 @@ def to_label_selector(tags):
return label_selector


def run_command_on_pods(node_name, node_namespace, command):
cmd_output = kubernetes.stream()(
kubernetes.core_api().connect_get_namespaced_pod_exec,
node_name,
node_namespace,
command=command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=kubernetes.API_TIMEOUT)
return cmd_output
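The helper above factors out the exec-into-pod pattern used throughout this file. For reference, a minimal standalone sketch of the same pattern against the official kubernetes Python client looks like the following (the PR goes through SkyPilot's sky.adaptors.kubernetes wrappers instead; the pod and namespace names are hypothetical):

from kubernetes import client, config
from kubernetes.stream import stream

def exec_in_pod(name: str, namespace: str, command: list) -> str:
    # Use load_incluster_config() instead when running inside a pod.
    config.load_kube_config()
    api = client.CoreV1Api()
    # stream() upgrades the request to a WebSocket and returns the
    # command's combined stdout/stderr output as a string.
    return stream(api.connect_get_namespaced_pod_exec,
                  name,
                  namespace,
                  command=command,
                  stderr=True,
                  stdin=False,
                  stdout=True,
                  tty=False)

# e.g. exec_in_pod('my-pod', 'default', ['/bin/sh', '-c', 'whoami'])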


class KubernetesNodeProvider(NodeProvider):

def __init__(self, provider_config, cluster_name):
@@ -186,6 +206,23 @@ def set_node_tags(self, node_ids, tags):
# One more try
self._set_node_tags(node_ids, tags)

def _recover_cluster_yaml_path(self, cluster_name_with_hash: str) -> str:
# 'cluster_name_with_hash' combines the cluster name and hash value,
# separated by a hyphen. By using 'slice_length', we remove the hash
# (and its preceding hyphen) to retrieve the original cluster name.
slice_length = -(common_utils.USER_HASH_LENGTH_IN_CLUSTER_NAME + 1)
cluster_name = cluster_name_with_hash[:slice_length]
cluster_yaml_path = (os.path.join(
os.path.expanduser(backend_utils.SKY_USER_FILE_PATH),
f'{cluster_name}.yml'))
# Check if cluster_yaml_path exists. If not, we are running on
# the master node in a multi-node setup, in which case we must use the
# default ~/.sky/sky_ray.yml path.
if not os.path.exists(cluster_yaml_path):
cluster_yaml_path = os.path.expanduser(
cluster_yaml_utils.SKY_CLUSTER_YAML_REMOTE_PATH)
return cluster_yaml_path
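The slice arithmetic above can be illustrated with a small self-contained sketch; the 4-character hash length is an assumption for illustration (the real value comes from common_utils.USER_HASH_LENGTH_IN_CLUSTER_NAME):

USER_HASH_LENGTH_IN_CLUSTER_NAME = 4  # assumed value for illustration

def strip_user_hash(cluster_name_with_hash: str) -> str:
    # Drop the trailing '-<hash>' suffix: the hash plus its preceding hyphen.
    slice_length = -(USER_HASH_LENGTH_IN_CLUSTER_NAME + 1)
    return cluster_name_with_hash[:slice_length]

assert strip_user_hash('my-cluster-2ea4') == 'my-cluster'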

def _set_node_tags(self, node_id, tags):
pod = kubernetes.core_api().read_namespaced_pod(node_id, self.namespace)
pod.metadata.labels.update(tags)
@@ -264,7 +301,7 @@ def _raise_pod_scheduling_errors(self, new_nodes):
f'Details: \'{event_message}\' ')
raise config.KubernetesError(f'{timeout_err_msg}')

def _wait_for_pods_to_schedule(self, new_nodes):
def _wait_for_pods_to_schedule(self, new_nodes_with_jump_pod):
"""Wait for all pods to be scheduled.

Wait for all pods including jump pod to be scheduled, and if it
@@ -275,7 +312,7 @@ def _wait_for_pods_to_schedule(self, new_nodes):
start_time = time.time()
while time.time() - start_time < self.timeout:
all_pods_scheduled = True
for node in new_nodes:
for node in new_nodes_with_jump_pod:
# Iterate over each pod to check their status
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)
@@ -292,7 +329,7 @@ def _wait_for_pods_to_schedule(self, new_nodes):

# Handle pod scheduling errors
try:
self._raise_pod_scheduling_errors(new_nodes)
self._raise_pod_scheduling_errors(new_nodes_with_jump_pod)
except config.KubernetesError:
raise
except Exception as e:
@@ -301,7 +338,7 @@ def _wait_for_pods_to_schedule(self, new_nodes):
'for pod scheduling failure. '
f'Error: {common_utils.format_exception(e)}') from None

def _wait_for_pods_to_run(self, new_nodes):
def _wait_for_pods_to_run(self, new_nodes_with_jump_pod):
"""Wait for pods and their containers to be ready.

Pods may be pulling images or may be in the process of container
@@ -310,7 +347,7 @@ def _wait_for_pods_to_run(self, new_nodes):
while True:
all_pods_running = True
# Iterate over each pod to check their status
for node in new_nodes:
for node in new_nodes_with_jump_pod:
pod = kubernetes.core_api().read_namespaced_pod(
node.metadata.name, self.namespace)

@@ -345,6 +382,60 @@
break
time.sleep(1)
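The two waiting methods above poll the Kubernetes API until every pod, including the jump pod, is scheduled and its containers are running. A minimal sketch of that polling pattern, using the official client directly rather than SkyPilot's adaptors:

import time
from kubernetes import client, config

def wait_for_pods_running(pod_names, namespace, poll_interval=1):
    config.load_kube_config()
    api = client.CoreV1Api()
    while True:
        pods = [api.read_namespaced_pod(n, namespace) for n in pod_names]
        # A pod counts as ready once its phase is 'Running' and every one
        # of its containers reports a `running` state.
        if all(p.status.phase == 'Running' and
               all(c.state.running is not None
                   for c in (p.status.container_statuses or []))
               for p in pods):
            return
        time.sleep(poll_interval)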

def _check_user_privilege(self, new_nodes):
# Check whether the default user in the image has sufficient
# privileges to set up the Kubernetes instance pods.
check_k8s_user_sudo_cmd = [
'/bin/sh',
'-c',
(
'if [ $(id -u) -eq 0 ]; then'
# If the user is root, make 'sudo' a no-op alias so SkyPilot setup
# commands that invoke sudo still work
' echo \'alias sudo=""\' >> ~/.bashrc; '
'else '
' if command -v sudo >/dev/null 2>&1; then '
' timeout 2 sudo -l >/dev/null 2>&1 || '
f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
' else '
f' ( echo {exceptions.INSUFFICIENT_PRIVILEGES_CODE!r}; ); '
' fi; '
'fi')
]

for new_node in new_nodes:
privilege_check = run_command_on_pods(new_node.metadata.name,
self.namespace,
check_k8s_user_sudo_cmd)
if privilege_check == str(exceptions.INSUFFICIENT_PRIVILEGES_CODE):
raise config.KubernetesError(
'Insufficient system privileges detected. '
'Ensure that the default user in the image has root access, '
'or that "sudo" is installed and the user is included in '
'the sudoers file.')

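The privilege probe can also be exercised outside Kubernetes. Below is a hedged sketch that runs the same shell logic against the local machine via subprocess; the sentinel value is assumed for illustration (the real one is exceptions.INSUFFICIENT_PRIVILEGES_CODE):

import subprocess

INSUFFICIENT_PRIVILEGES_CODE = '52'  # assumed sentinel; see sky/exceptions.py

check_cmd = (
    'if [ $(id -u) -eq 0 ]; then '
    '  echo root; '
    'elif command -v sudo >/dev/null 2>&1; then '
    # `sudo -l` exits non-zero when the user has no sudoers entry; the
    # 2-second timeout guards against an interactive password prompt.
    f'  timeout 2 sudo -l >/dev/null 2>&1 || echo {INSUFFICIENT_PRIVILEGES_CODE}; '
    'else '
    f'  echo {INSUFFICIENT_PRIVILEGES_CODE}; '
    'fi')

out = subprocess.run(['/bin/sh', '-c', check_cmd],
                     capture_output=True, text=True).stdout.strip()
if out == INSUFFICIENT_PRIVILEGES_CODE:
    raise RuntimeError('Default user lacks root or sudo privileges.')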
def _setup_ssh_in_pods(self, new_nodes):
# Set up SSH in the pod instances. SSH is already set up for the
# jump pod, so this does not need to run there.
set_k8s_ssh_cmd = [
'/bin/sh', '-c',
('prefix_cmd() { if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; }; '
'export DEBIAN_FRONTEND=noninteractive;'
'$(prefix_cmd) apt-get update;'
'$(prefix_cmd) apt install openssh-server rsync -y; '
'$(prefix_cmd) mkdir -p /var/run/sshd; '
'$(prefix_cmd) sed -i "s/PermitRootLogin prohibit-password/PermitRootLogin yes/" /etc/ssh/sshd_config; '
'$(prefix_cmd) sed "s@session\\s*required\\s*pam_loginuid.so@session optional pam_loginuid.so@g" -i /etc/pam.d/sshd; '
'cd /etc/ssh/ && $(prefix_cmd) ssh-keygen -A; '
'$(prefix_cmd) mkdir -p ~/.ssh; '
'$(prefix_cmd) cp /etc/secret-volume/ssh-publickey ~/.ssh/authorized_keys; '
'$(prefix_cmd) service ssh restart')
]

# TODO(romilb): We need logging here and to surface errors.
for new_node in new_nodes:
run_command_on_pods(new_node.metadata.name, self.namespace,
set_k8s_ssh_cmd)
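The prefix_cmd idiom above is what lets one script serve both root and non-root images: it expands to "sudo" for non-root users and to the empty string for root (whose image may not even ship sudo). A condensed sketch of just that idiom, with the package names taken from the diff; everything else here is illustrative:

setup_script = (
    'prefix_cmd() { if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; }; '
    'export DEBIAN_FRONTEND=noninteractive; '
    '$(prefix_cmd) apt-get update; '
    '$(prefix_cmd) apt-get install -y openssh-server rsync')
ssh_setup_cmd = ['/bin/sh', '-c', setup_script]
# Each new pod then receives this via:
#   run_command_on_pods(pod_name, namespace, ssh_setup_cmd)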

def _set_env_vars_in_pods(self, new_nodes):
"""Set environment variables in pods.

@@ -363,22 +454,33 @@ def _set_env_vars_in_pods(self, new_nodes):
"""
set_k8s_env_var_cmd = [
'/bin/sh', '-c',
('printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
('prefix_cmd() { if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; } && '
'printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' > ~/k8s_env_var.sh && '
'mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh || '
'sudo mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
'$(prefix_cmd) mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
]

for new_node in new_nodes:
kubernetes.stream()(
kubernetes.core_api().connect_get_namespaced_pod_exec,
new_node.metadata.name,
self.namespace,
command=set_k8s_env_var_cmd,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_request_timeout=kubernetes.API_TIMEOUT)
run_command_on_pods(new_node.metadata.name, self.namespace,
set_k8s_env_var_cmd)
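The awk pipeline above snapshots the container's environment into /etc/profile.d/ so that SSH login shells, which do not inherit the pod spec's environment, re-export the same variables. A standalone sketch of that command string (a minimal sketch, mirroring the diff):

set_env_cmd = [
    '/bin/sh', '-c',
    # \047 is the octal escape for a single quote in the awk program, so
    # each environment entry becomes a line of the form: export VAR='value'
    ('prefix_cmd() { if [ $(id -u) -ne 0 ]; then echo "sudo"; else echo ""; fi; } && '
     'printenv | awk -F "=" \'{print "export " $1 "=\\047" $2 "\\047"}\' '
     '> ~/k8s_env_var.sh && '
     '$(prefix_cmd) mv ~/k8s_env_var.sh /etc/profile.d/k8s_env_var.sh')
]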

def _update_ssh_user_config(self, new_nodes, cluster_name_with_hash):
get_k8s_ssh_user_cmd = ['/bin/sh', '-c', ('echo $(whoami)')]
for new_node in new_nodes:
ssh_user = run_command_on_pods(new_node.metadata.name,
self.namespace, get_k8s_ssh_user_cmd)

cluster_yaml_path = self._recover_cluster_yaml_path(
cluster_name_with_hash)
with open(cluster_yaml_path, 'r') as f:
content = f.read()

# Replace the default SSH user name with the actual user name.
# This picks up the user name from the user's custom image, if one is used.
content = re.sub(r'ssh_user: \w+', f'ssh_user: {ssh_user}', content)

with open(cluster_yaml_path, 'w') as f:
f.write(content)
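For reference, the YAML rewrite reduces to a small standalone function using the same regex as the diff; the path and user name in the usage line are hypothetical:

import re

def update_ssh_user(cluster_yaml_path: str, ssh_user: str) -> None:
    with open(cluster_yaml_path, 'r') as f:
        content = f.read()
    # Swap whatever default user the YAML was generated with for the user
    # actually baked into the custom image.
    content = re.sub(r'ssh_user: \w+', f'ssh_user: {ssh_user}', content)
    with open(cluster_yaml_path, 'w') as f:
        f.write(content)

# e.g. update_ssh_user('/home/me/.sky/generated/my-cluster.yml', 'ubuntu')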

def create_node(self, node_config, tags, count):
conf = copy.deepcopy(node_config)
@@ -423,17 +525,33 @@ def create_node(self, node_config, tags, count):
# Adding the jump pod to the new_nodes list as well so it can be
# checked if it's scheduled and running along with other pod instances.
ssh_jump_pod_name = conf['metadata']['labels']['skypilot-ssh-jump']
new_nodes_with_jump_pod = new_nodes[:]
jump_pod = kubernetes.core_api().read_namespaced_pod(
ssh_jump_pod_name, self.namespace)
new_nodes.append(jump_pod)
new_nodes_with_jump_pod.append(jump_pod)
node_names = [node.metadata.name for node in new_nodes_with_jump_pod]

# Wait until the pods are scheduled, surfacing the cause if an
# error occurs
self._wait_for_pods_to_schedule(new_nodes)
logger.info(config.log_prefix +
f'Waiting for pods to schedule. Pods: {node_names}')
self._wait_for_pods_to_schedule(new_nodes_with_jump_pod)
# Wait until the pods and their containers are up and running, and
# fail early if there is an error
self._wait_for_pods_to_run(new_nodes)
logger.info(config.log_prefix +
f'Waiting for pods to run. Pods: {node_names}')
self._wait_for_pods_to_run(new_nodes_with_jump_pod)
logger.info(config.log_prefix +
'Checking if the user in the image has sufficient privileges.')
self._check_user_privilege(new_nodes)
logger.info(config.log_prefix + 'Setting up SSH in pods.')
self._setup_ssh_in_pods(new_nodes)
logger.info(config.log_prefix +
'Setting up environment variables in pods.')
self._set_env_vars_in_pods(new_nodes)
Collaborator: Do we need to set up envs for the jump pod?

landscapepainter (Author) replied on Jan 26, 2024: The envs are necessary for pods to interact with GPUs, so they are only needed for the instance pods, not for the jump pod.
cluster_name_with_hash = conf['metadata']['labels']['skypilot-cluster']
logger.info(config.log_prefix + 'Fetching and updating ssh username.')
self._update_ssh_user_config(new_nodes, cluster_name_with_hash)
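Taken together, create_node() now runs a fixed post-creation pipeline whose order matters: pods must be running before any exec-based step, the privilege check fails fast before SSH setup, and the ssh_user rewrite comes last so it reflects the image's real user. A condensed sketch of that sequence (the method names match the diff; the grouping into a helper is hypothetical, as the PR inlines these calls in create_node()):

def _post_create_setup(self, new_nodes, new_nodes_with_jump_pod, conf):
    self._wait_for_pods_to_schedule(new_nodes_with_jump_pod)  # includes jump pod
    self._wait_for_pods_to_run(new_nodes_with_jump_pod)
    self._check_user_privilege(new_nodes)   # fail fast on unusable images
    self._setup_ssh_in_pods(new_nodes)      # the jump pod already has SSH
    self._set_env_vars_in_pods(new_nodes)   # GPU envs only needed on instance pods
    cluster_name_with_hash = conf['metadata']['labels']['skypilot-cluster']
    self._update_ssh_user_config(new_nodes, cluster_name_with_hash)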

def terminate_node(self, node_id):
logger.info(config.log_prefix + 'calling delete_namespaced_pod')
@@ -480,7 +598,7 @@ def get_command_runner(self,
log_prefix,
node_id,
auth_config,
cluster_name,
cluster_name_with_hash,
process_runner,
use_internal_ip,
docker_config=None):
@@ -492,20 +610,32 @@
node_id(str): the node ID.
auth_config(dict): the authentication configs from the autoscaler
yaml file.
cluster_name(str): the name of the cluster.
cluster_name_with_hash(str): the name of the cluster and hash value,
separated by a hyphen.
process_runner(module): the module to use to run the commands
in the CommandRunner. E.g., subprocess.
use_internal_ip(bool): whether the node_id belongs to an internal ip
or external ip.
docker_config(dict): If set, the docker information of the docker
container that commands should be run on.
"""
# For custom images, the username might differ across images.
# The 'ssh_user' is updated in place in the YAML at the end of the
# 'create_node()' process in _update_ssh_user_config.
# Since the node provider is initialized with stale auth information,
# we need to reload the updated user from YAML.
cluster_yaml_path = self._recover_cluster_yaml_path(
cluster_name_with_hash)
ssh_credentials = backend_utils.ssh_credential_from_yaml(
cluster_yaml_path)
auth_config['ssh_user'] = ssh_credentials['ssh_user']

common_args = {
'log_prefix': log_prefix,
'node_id': node_id,
'provider': self,
'auth_config': auth_config,
'cluster_name': cluster_name,
'cluster_name': cluster_name_with_hash,
'process_runner': process_runner,
'use_internal_ip': use_internal_ip,
}