diff --git a/v3io_gputils/mpijob.py b/v3io_gputils/mpijob.py index f74959f..e59c87b 100644 --- a/v3io_gputils/mpijob.py +++ b/v3io_gputils/mpijob.py @@ -5,38 +5,70 @@ from kubernetes import client, config from kubernetes.client.rest import ApiException -_mpijob_template = { - 'apiVersion': 'kubeflow.org/v1alpha1', - 'kind': 'MPIJob', - 'metadata': { - 'name': '', - 'namespace': 'default-tenant' - }, - 'spec': { - 'replicas': 1, - 'template': { - 'spec': { - 'containers': [{ - 'image': 'iguaziodocker/horovod:0.1.1', - 'name': '', - 'command': [], - 'volumeMounts': [{'name': 'v3io', 'mountPath': '/User'}], - 'workingDir': '/User', - 'securityContext': { - 'capabilities': {'add': ['IPC_LOCK']}}, - 'resources': { - 'limits': {'nvidia.com/gpu': 1}}}], - 'volumes': [{ - 'name': 'v3io', - 'flexVolume': { - 'driver': 'v3io/fuse', - 'options': { - 'container': 'users', - 'subPath': '', - 'accessKey': '', - } + +class MPIJobPodTemplateType(object): + launcher = 'Launcher' + worker = 'Worker' + + @staticmethod + def all(): + return [ + MPIJobPodTemplateType.launcher, + MPIJobPodTemplateType.worker + ] + + +_mpijob_launcher_pod_template = { + 'spec': { + 'containers': [ + { + 'image': 'iguaziodocker/horovod:0.1.1', + 'name': '', + 'command': [], + 'workingDir': '/User', + 'securityContext': { + 'capabilities': {'add': ['IPC_LOCK']} + } + } + ], + 'volumes': [{ + 'name': 'v3io', + 'flexVolume': { + 'driver': 'v3io/fuse', + 'options': { + 'container': 'users', + 'subPath': '', + 'accessKey': '', + } + }}] + }, +} + +_mpijob_worker_pod_template = { + 'spec': { + 'containers': [ + { + 'image': 'iguaziodocker/horovod:0.1.1', + 'name': '', + 'command': [], + 'volumeMounts': [{'name': 'v3io', 'mountPath': '/User'}], + 'workingDir': '/User', + 'securityContext': { + 'capabilities': {'add': ['IPC_LOCK']}}, + 'resources': { + 'limits': {'nvidia.com/gpu': 1}}}], + 'volumes': [{ + 'name': 'v3io', + 'flexVolume': { + 'driver': 'v3io/fuse', + 'options': { + 'container': 'users', + 'subPath': '', + 'accessKey': '', + } }}] - }}}} + }, +} class MpiJob: @@ -53,7 +85,7 @@ class MpiJob: """ group = 'kubeflow.org' - version = 'v1alpha1' + version = 'v1' plural = 'mpijobs' def __init__(self, name, image=None, command=None, @@ -61,29 +93,61 @@ def __init__(self, name, image=None, command=None, self.api_instance = None self.name = name self.namespace = namespace - self._struct = deepcopy(_mpijob_template) - self._struct['metadata'] = {'name': name, 'namespace': namespace} - self._update_container('name', name) - if image: - self._update_container('image', image) - if command: - self._update_container('command', ['mpirun','python'] + command) - if replicas: - self._struct['spec']['replicas'] = replicas - self._update_access_token(environ.get('V3IO_ACCESS_KEY','')) - self._update_running_user(environ.get('V3IO_USERNAME','')) - - def _update_container(self, key, value): - self._struct['spec']['template']['spec']['containers'][0][key] = value - def _update_access_token(self, token): - self._struct['spec']['template']['spec']['volumes'][0]['flexVolume']['options']['accessKey'] = token + self._pod_templates = { + MPIJobPodTemplateType.launcher: deepcopy(_mpijob_launcher_pod_template), + MPIJobPodTemplateType.worker: deepcopy(_mpijob_worker_pod_template) + } - def _update_running_user(self, username): - self._struct['spec']['template']['spec']['volumes'][0]['flexVolume']['options']['subPath'] = '/' + username + self._update_container('name', name, MPIJobPodTemplateType.all()) + if image: + self._update_container('image', image, MPIJobPodTemplateType.all()) + if command: + self._update_container('command', ['mpirun', 'python'] + command, [MPIJobPodTemplateType.worker]) + + self._update_access_token(environ.get('V3IO_ACCESS_KEY', ''), MPIJobPodTemplateType.all()) + self._update_running_user(environ.get('V3IO_USERNAME', ''), MPIJobPodTemplateType.all()) + + self._struct = self._generate_mpi_job_template(name, namespace, replicas) + + def _generate_mpi_job_template(self, name, namespace, worker_replicas): + return { + 'apiVersion': 'kubeflow.org/v1', + 'kind': 'MPIJob', + 'metadata': {'name': name, 'namespace': namespace}, + 'spec': { + 'slotsPerWorker': 1, + 'mpiReplicaSpecs': { + 'Launcher': { + 'template': self._pod_templates[MPIJobPodTemplateType.launcher] + }, + 'Worker': { + 'replicas': worker_replicas, + 'template': self._pod_templates[MPIJobPodTemplateType.worker] + }, + }, + }, + } + + def _update_container(self, key, value, template_types): + for template_type in template_types: + self._pod_templates[template_type]['spec']['containers'][0][key] = value + + def _update_access_token(self, token, template_types): + for template_type in template_types: + self._pod_templates[template_type]['spec']['volumes'][0]['flexVolume']['options']['accessKey'] = token + + def _update_running_user(self, username, template_types): + for template_type in template_types: + self._pod_templates[template_type]['spec']['volumes'][0]['flexVolume']['options']['subPath'] = \ + '/' + username + + def _update_volumes(self, volumes, template_types): + for template_type in template_types: + self._pod_templates[template_type]['volumes'] = volumes def volume(self, mount='/User', volpath='~/', access_key=''): - self._update_container('volumeMounts', [{'name': 'v3io', 'mountPath': mount}]) + self._update_container('volumeMounts', [{'name': 'v3io', 'mountPath': mount}], MPIJobPodTemplateType.all()) if volpath.startswith('~/'): v3io_home = environ.get('V3IO_HOME', '') @@ -101,26 +165,31 @@ def volume(self, mount='/User', volpath='~/', access_key=''): } }} - self._struct['spec']['template']['spec']['volumes'] = [vol] + self._update_volumes([vol], MPIJobPodTemplateType.all()) return self def gpus(self, num, gpu_type='nvidia.com/gpu'): - self._update_container('resources', {'limits' : {gpu_type: num}}) + self._update_container('resources', {'limits': {gpu_type: num}}, [MPIJobPodTemplateType.worker]) return self def replicas(self, replicas_num): - self._struct['spec']['replicas'] = replicas_num + self._struct['spec']['mpiReplicaSpecs'][MPIJobPodTemplateType.worker]['replicas'] = replicas_num return self def working_dir(self, working_dir): - self._update_container('workingDir', working_dir) + self._update_container('workingDir', working_dir, MPIJobPodTemplateType.all()) return self def to_dict(self): return self._struct def to_yaml(self): - return yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False) + + # use safe dumper so yaml.dump will print full objects instead of pointer addresses + noalias_dumper = yaml.dumper.SafeDumper + noalias_dumper.ignore_aliases = lambda _self, data: True + + return yaml.dump(self.to_dict(), default_flow_style=False, sort_keys=False, Dumper=noalias_dumper) def submit(self): config.load_incluster_config() @@ -143,6 +212,7 @@ def delete(self): except ApiException as e: print("Exception when calling CustomObjectsApi->delete_namespaced_custom_object: %s\\n" % e) + def split_path(mntpath=''): if mntpath[0] == '/': mntpath = mntpath[1:]