Skip to content

Commit

Permalink
Replace Argo exit hook step with new callback app (#677)
Browse files Browse the repository at this point in the history
* gh-671 Callback app for Argo
* gh-671 Update CI scripts
* gh-671 Fix license header
* gh-671 Fix unit test

Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp authored Feb 24, 2023
1 parent d36b557 commit 4c8aba3
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 96 deletions.
4 changes: 4 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.


# Docker-Compose
deploy/

.github/
.docs/
.demos/
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ jobs:
image: ghcr.io/project-monai/monai-deploy-workflow-manager
- dockerfile: TaskManager.Dockerfile
image: ghcr.io/project-monai/monai-deploy-task-manager
- dockerfile: CallbackApp.Dockerfile
image: ghcr.io/project-monai/monai-deploy-task-manager-callback
outputs:
semVer: ${{ steps.gitversion.outputs.semVer }}
majorMinorPatch: ${{ steps.gitversion.outputs.majorMinorPatch }}
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ jobs:
image: ghcr.io/project-monai/monai-deploy-workflow-manager
- dockerfile: TaskManager.Dockerfile
image: ghcr.io/project-monai/monai-deploy-task-manager
env:
- dockerfile: CallbackApp.Dockerfile
image: ghcr.io/project-monai/monai-deploy-task-manager-callback
env:
REGISTRY: ghcr.io
permissions:
packages: write
Expand Down
20 changes: 20 additions & 0 deletions CallbackApp.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright 2023 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.10-slim-buster

WORKDIR /app
COPY src/TaskManager/CallbackApp/app.py ./
COPY src/TaskManager/CallbackApp/requirements.txt ./

RUN pip install -r requirements.txt

CMD ["/app/app.py"]
12 changes: 12 additions & 0 deletions src/TaskManager/CallbackApp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright 2023 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

.env/
53 changes: 53 additions & 0 deletions src/TaskManager/CallbackApp/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python

# Copyright 2023 MONAI Consortium
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import pika
import uuid


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", type=str, required=True)
parser.add_argument("--username", type=str, required=True)
parser.add_argument("--password", type=str, required=True)
parser.add_argument("--vhost", type=str, required=True)
parser.add_argument("--exchange", type=str, required=True)
parser.add_argument("--topic", type=str, required=True)
parser.add_argument("--correlationId", type=str, required=True)
parser.add_argument("--message", type=str, required=True)
parser.add_argument("--secure", default=False, type=bool)
args = parser.parse_args()

print(f"[Correlation ID={args.correlationId}] Sending message to {args.host} at exchange={args.exchange}, topic={args.topic}...")
print(f"[Correlation ID={args.correlationId}] Message={args.message}...")

credentials = pika.PlainCredentials(args.username, args.password)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=args.host, virtual_host=args.vhost, credentials=credentials))
channel = connection.channel()

properties = pika.BasicProperties(
content_type="application/json",
message_id=str(uuid.uuid4()),
app_id="Task Manager Callback",
correlation_id=args.correlationId,
delivery_mode=2,
type=args.topic)

channel.basic_publish(exchange=args.exchange, routing_key=args.topic, body=args.message, properties=properties)
connection.close()
print('Message sent.')

if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions src/TaskManager/CallbackApp/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pika==1.3.1
14 changes: 2 additions & 12 deletions src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -534,15 +534,6 @@ private async Task AddExitHookTemplate(Workflow workflow, CancellationToken canc
Name = Strings.ExitHookTemplateName,
Steps = new List<ParallelSteps>()
{
new ParallelSteps()
{
new WorkflowStep()
{
Name = Strings.ExitHookTemplateGenerateTemplateName,
Template = Strings.ExitHookTemplateGenerateTemplateName,
}
},

new ParallelSteps()
{
new WorkflowStep()
Expand All @@ -559,8 +550,7 @@ private async Task AddExitHookTemplate(Workflow workflow, CancellationToken canc
var artifact = await CreateArtifact(temporaryStore, cancellationToken).ConfigureAwait(false);

var exitHookTemplate = new ExitHookTemplate(_options.Value, Event);
workflow.Spec.Templates.Add(exitHookTemplate.GenerateMessageTemplate(artifact));
workflow.Spec.Templates.Add(exitHookTemplate.GenerateSendTemplate(artifact));
workflow.Spec.Templates.Add(exitHookTemplate.GenerateCallbackMessageTemplate(artifact));
}

private async Task<WorkflowTemplate> LoadWorkflowTemplate(string workflowTemplateName)
Expand Down
89 changes: 15 additions & 74 deletions src/TaskManager/Plug-ins/Argo/ExitHookTemplate.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,22 +53,19 @@ public ExitHookTemplate(WorkflowManagerOptions options, TaskDispatchEvent taskDi
_messageFileName = $"{_messageId}.json";
}

public Template2 GenerateMessageTemplate(S3Artifact2 artifact)
public Template2 GenerateCallbackMessageTemplate(S3Artifact2 artifact)
{
var taskUpdateEvent = GenerateTaskCallbackEvent();
var taskUpdateEventJson = JsonConvert.SerializeObject(taskUpdateEvent);
var message = GenerateTaskCallbackMessage();
var messageJson = JsonConvert.SerializeObject(JsonConvert.SerializeObject(message)); // serialize the message 2x

return new Template2()
{
Name = Strings.ExitHookTemplateGenerateTemplateName,
Name = Strings.ExitHookTemplateSendTemplateName,
Inputs = new Inputs
{
Parameters = new List<Parameter>()
{
new Parameter { Name = Strings.ExitHookParameterEvent, Value = taskUpdateEventJson },
new Parameter { Name = Strings.ExitHookParameterMessage, Value = messageJson }
}
},
Container = new Container2
Expand All @@ -82,8 +79,18 @@ public Template2 GenerateMessageTemplate(S3Artifact2 artifact)
{"memory", _options.TaskManager.ArgoPluginArguments.MessageGeneratorContainerMemoryLimit}
}
},
Command = new List<string> { "/bin/sh", "-c" },
Args = new List<string> { $"echo \"{{{{inputs.parameters.message}}}}\" > {Strings.ExitHookOutputPath}{_messageFileName}; cat {Strings.ExitHookOutputPath}{_messageFileName};" }
Command = new List<string> { "python" },
Args = new List<string> {
"/app/app.py",
"--host", _messagingEndpoint,
"--username", _messagingUsername,
"--password", _messagingPassword,
"--vhost", _messagingVhost,
"--exchange", _messagingExchange,
"--topic", _messagingTopic,
"--correlationId", _taskDispatchEvent.CorrelationId,
"--message", "{{inputs.parameters.event}}"
}
},
PodSpecPatch = "{\"initContainers\":[{\"name\":\"init\",\"resources\":{\"limits\":{\"cpu\":\"" + _options.TaskManager.ArgoPluginArguments.InitContainerCpuLimit + "\",\"memory\": \"" + _options.TaskManager.ArgoPluginArguments.InitContainerMemoryLimit + "\"},\"requests\":{\"cpu\":\"0\",\"memory\":\"0Mi\"}}}],\"containers\":[{\"name\":\"wait\",\"resources\":{\"limits\":{\"cpu\":\"" + _options.TaskManager.ArgoPluginArguments.WaitContainerCpuLimit + "\",\"memory\":\"" + _options.TaskManager.ArgoPluginArguments.WaitContainerMemoryLimit + "\"},\"requests\":{\"cpu\":\"0\",\"memory\":\"0Mi\"}}}]}",
Outputs = new Outputs
Expand Down Expand Up @@ -115,71 +122,5 @@ private TaskCallbackEvent GenerateTaskCallbackEvent() =>
Identity = "{{workflow.name}}",
Outputs = _taskDispatchEvent.Outputs ?? new List<Messaging.Common.Storage>()
};

private object GenerateTaskCallbackMessage() =>
new
{
ContentType = Strings.ContentTypeJson,
CorrelationID = _taskDispatchEvent.CorrelationId,
MessageID = _messageId.ToString(),
Type = nameof(TaskCallbackEvent),
AppID = Strings.ApplicationId,
Exchange = _messagingExchange,
RoutingKey = _messagingTopic,
DeliveryMode = 2,
Body = "{{=sprig.b64enc(inputs.parameters.event)}}"
};

public Template2 GenerateSendTemplate(S3Artifact2 artifact)
{
var copyOfArtifact = new S3Artifact2
{
AccessKeySecret = artifact.AccessKeySecret,
Bucket = artifact.Bucket,
Endpoint = artifact.Endpoint,
Insecure = artifact.Insecure,
Key = $"{artifact.Key}/{_messageFileName}",
SecretKeySecret = artifact.SecretKeySecret,
};

return new Template2()
{
Name = Strings.ExitHookTemplateSendTemplateName,
Inputs = new Inputs
{
Artifacts = new List<Artifact>()
{
new Artifact
{
Name = "message",
Path = $"{Strings.ExitHookOutputPath}{_messageFileName}",
Archive = new ArchiveStrategy
{
None = new NoneStrategy()
},
S3 = copyOfArtifact
}
}
},
Container = new Container2
{
Image = _options.TaskManager.ArgoExitHookSendMessageContainerImage,
Resources = new ResourceRequirements
{
Limits = new Dictionary<string, string>{ {"cpu", _options.TaskManager.ArgoPluginArguments.MessageSenderContainerCpuLimit}, {"memory", _options.TaskManager.ArgoPluginArguments.MessageSenderContainerMemoryLimit} }
},
Command = new List<string> { "/rabtap" },
Args = new List<string> {
"pub",
$"--uri=amqp://{_messagingUsername}:{_messagingPassword}@{_messagingEndpoint}/{_messagingVhost}",
"--format=json",
$"{Strings.ExitHookOutputPath}{_messageFileName}",
"--delay=0s",
"--confirms",
"--mandatory" }
},
PodSpecPatch = "{\"initContainers\":[{\"name\":\"init\",\"resources\":{\"limits\":{\"cpu\":\"" + _options.TaskManager.ArgoPluginArguments.InitContainerCpuLimit + "\",\"memory\": \"" + _options.TaskManager.ArgoPluginArguments.InitContainerMemoryLimit + "\"},\"requests\":{\"cpu\":\"0\",\"memory\":\"0Mi\"}}}],\"containers\":[{\"name\":\"wait\",\"resources\":{\"limits\":{\"cpu\":\"" + _options.TaskManager.ArgoPluginArguments.WaitContainerCpuLimit + "\",\"memory\":\"" + _options.TaskManager.ArgoPluginArguments.WaitContainerMemoryLimit + "\"},\"requests\":{\"cpu\":\"0\",\"memory\":\"0Mi\"}}}]}",
};
}
}
}
6 changes: 2 additions & 4 deletions src/TaskManager/Plug-ins/Argo/Strings.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,11 +32,9 @@ public static class Strings

public const string ExitHook = "exit";
public const string ExitHookTemplateName = "exit-message-template";
public const string ExitHookTemplateGenerateTemplateName = "generate-message";
public const string ExitHookTemplateSendTemplateName = "send-message";
public const string ExitHookParameterEvent = "event";
public const string ExitHookParameterMessage = "message";
public const string ExitHookGenerateMessageContainerImage = "alpine:latest";
public const string ExitHookGenerateMessageContainerImage = "monai-deploy-task-manager-callback:1.0";
public const string ExitHookOutputArtifactName = "output";
#pragma warning disable S5443 // public directory /tmp/ is used in Docker container.
public const string ExitHookOutputPath = "/tmp/";
Expand Down
7 changes: 2 additions & 5 deletions tests/UnitTests/TaskManager.Argo.Tests/ArgoPluginTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -683,11 +683,8 @@ public async Task ArgoPlugin_Adds_Container_Resource_Restrictions_Based_On_Confi
var result = await runner.ExecuteTask(CancellationToken.None).ConfigureAwait(false);

Assert.Equal(TaskExecutionStatus.Accepted, result.Status);
Assert.Equal(_messageGeneratorContainerCpuLimit, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateGenerateTemplateName).Container.Resources.Limits["cpu"]);
Assert.Equal(_messageGeneratorContainerMemoryLimit, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateGenerateTemplateName).Container.Resources.Limits["memory"]);
Assert.Equal(expectedPodSpecPatch, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateGenerateTemplateName).PodSpecPatch);
Assert.Equal(_messageSenderContainerCpuLimit, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateSendTemplateName).Container.Resources.Limits["cpu"]);
Assert.Equal(_messageSenderContainerMemoryLimit, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateSendTemplateName).Container.Resources.Limits["memory"]);
Assert.Equal(_messageGeneratorContainerCpuLimit, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateSendTemplateName).Container.Resources.Limits["cpu"]);
Assert.Equal(_messageGeneratorContainerMemoryLimit, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateSendTemplateName).Container.Resources.Limits["memory"]);
Assert.Equal(expectedPodSpecPatch, _submittedArgoTemplate?.Spec.Templates.FirstOrDefault(p => p.Name == Strings.ExitHookTemplateSendTemplateName).PodSpecPatch);
}

Expand Down

0 comments on commit 4c8aba3

Please sign in to comment.