BlueStyle #23

Merged
merged 1 commit on Jun 3, 2021
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "AWSClusterManagers"
uuid = "69f5933c-162f-5d2f-a6d8-da72a0bfad91"
authors = ["Invenia Technical Computing"]
version = "1.1.4"
version = "1.1.5"

[deps]
AWSBatch = "dcae83d4-2881-5875-9d49-e5534165e9c0"
1 change: 1 addition & 0 deletions README.md
@@ -3,6 +3,7 @@ AWSClusterManagers

 [![CI](https://github.com/JuliaCloud/AWSClusterManagers.jl/workflows/CI/badge.svg)](https://github.com/JuliaCloud/AWSClusterManagers.jl/actions?query=workflow%3ACI)
 [![Bors enabled](https://bors.tech/images/badge_small.svg)](https://app.bors.tech/repositories/32323)
+[![Code Style: Blue](https://img.shields.io/badge/code%20style-blue-4495d1.svg)](https://github.com/invenia/BlueStyle)
 [![codecov](https://codecov.io/gh/JuliaCloud/AWSClusterManagers.jl/branch/master/graph/badge.svg)](https://codecov.io/gh/JuliaCloud/AWSClusterManagers.jl)
 [![Stable Documentation](https://img.shields.io/badge/docs-stable-blue.svg)](https://juliacloud.github.io/AWSClusterManagers.jl/stable)

42 changes: 0 additions & 42 deletions TODO.md

This file was deleted.

2 changes: 1 addition & 1 deletion src/AWSClusterManagers.jl
@@ -14,7 +14,7 @@ const LOGGER = getlogger(@__MODULE__)

 function __init__()
     # https://invenia.github.io/Memento.jl/latest/faq/pkg-usage/
-    Memento.register(LOGGER)
+    return Memento.register(LOGGER)
 end
 
 include("socket.jl")
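
The only change in this file is BlueStyle's explicit-return rule: every function should end with an explicit `return`, even when the returned value is incidental. A minimal sketch of the rule outside this package (the module and `setup!` function below are illustrative stand-ins, not AWSClusterManagers code):

    # Hypothetical example of BlueStyle's explicit-return rule. `setup!`
    # stands in for side-effecting initialization such as Memento.register.
    module ReturnStyleExample

    setup!() = nothing

    function __init__()
        # The last expression would be returned implicitly anyway;
        # BlueStyle asks that the intent be spelled out.
        return setup!()
    end

    end # module
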
65 changes: 41 additions & 24 deletions src/batch.jl
@@ -70,7 +70,8 @@ struct AWSBatchManager <: ContainerManager
max_ip::IPv4=ip"255.255.255.255",
)
min_workers >= 0 || throw(ArgumentError("min workers must be non-negative"))
min_workers <= max_workers || throw(ArgumentError("min workers exceeds max workers"))
min_workers <= max_workers ||
throw(ArgumentError("min workers exceeds max workers"))

# Default the queue to using the WORKER_JOB_QUEUE environmental variable.
if isempty(queue)
@@ -79,7 +80,18 @@

         region = isempty(region) ? "us-east-1" : region
 
-        new(min_workers, max_workers, definition, name, queue, memory, region, Second(timeout), min_ip, max_ip)
+        return new(
+            min_workers,
+            max_workers,
+            definition,
+            name,
+            queue,
+            memory,
+            region,
+            Second(timeout),
+            min_ip,
+            max_ip,
+        )
     end
 end

@@ -95,7 +107,7 @@ function AWSBatchManager(
min_ip::IPv4=ip"0.0.0.0",
max_ip::IPv4=ip"255.255.255.255",
)
AWSBatchManager(
return AWSBatchManager(
min_workers,
max_workers,
definition,
@@ -105,16 +117,16 @@ region,
         region,
         timeout,
         min_ip,
-        max_ip
+        max_ip,
     )
 end

 function AWSBatchManager(workers::UnitRange{<:Integer}; kwargs...)
-    AWSBatchManager(first(workers), last(workers); kwargs...)
+    return AWSBatchManager(first(workers), last(workers); kwargs...)
 end
 
 function AWSBatchManager(workers::Integer; kwargs...)
-    AWSBatchManager(workers, workers; kwargs...)
+    return AWSBatchManager(workers, workers; kwargs...)
 end
 
 launch_timeout(mgr::AWSBatchManager) = mgr.timeout
@@ -143,32 +155,37 @@ function spawn_containers(mgr::AWSBatchManager, override_cmd::Cmd)
     max_compute = @mock max_vcpus(queue)
 
     if min_workers > max_compute
-        error(string(
-            "Unable to launch the minimum number of workers ($min_workers) as the ",
-            "minimum exceeds the max VCPUs available ($max_compute).",
-        ))
+        error(
+            string(
+                "Unable to launch the minimum number of workers ($min_workers) as the ",
+                "minimum exceeds the max VCPUs available ($max_compute).",
+            ),
+        )
     elseif max_workers > max_compute
         # Note: In addition to warning the user about the VCPU cap we could also also reduce
         # the number of worker we request. Unfortunately since we don't know how many jobs
        # are currently running or how long they will take we'll leave `max_workers`
        # untouched.
-        warn(LOGGER, string(
-            "Due to the max VCPU limit ($max_compute) most likely only a partial amount ",
-            "of the requested workers ($max_workers) will be spawned.",
-        ))
+        warn(
+            LOGGER,
+            string(
+                "Due to the max VCPU limit ($max_compute) most likely only a partial amount ",
+                "of the requested workers ($max_workers) will be spawned.",
+            ),
+        )
     end
 
     # Since each batch worker can only use one cpu we override the vcpus to one.
-    job = @mock run_batch(
-        name = mgr.job_name,
-        definition = mgr.job_definition,
-        queue = mgr.job_queue,
-        region = mgr.region,
-        vcpus = 1,
-        memory = mgr.job_memory,
-        cmd = override_cmd,
-        num_jobs = max_workers,
-        allow_job_registration = false,
+    job = @mock run_batch(;
+        name=mgr.job_name,
+        definition=mgr.job_definition,
+        queue=mgr.job_queue,
+        region=mgr.region,
+        vcpus=1,
+        memory=mgr.job_memory,
+        cmd=override_cmd,
+        num_jobs=max_workers,
+        allow_job_registration=false,
     )
 
     if max_workers > 1
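
Most of the churn in this file comes from two BlueStyle conventions: wrapped argument lists get one argument per line with a trailing comma, and keyword arguments follow an explicit `;` with no spaces around `=`. A minimal sketch of the call-site convention (`submit_job` below is hypothetical, not the package's `run_batch`):

    # Hypothetical function, used only to show BlueStyle keyword formatting.
    function submit_job(; name::String, queue::String="default", vcpus::Int=1)
        return "submitted $name to $queue with $vcpus vCPU(s)"
    end

    # Explicit `;` before the keywords, `=` without surrounding spaces, one
    # argument per line, and a trailing comma when the call is wrapped.
    msg = submit_job(;
        name="worker",
        queue="batch-queue",
        vcpus=1,
    )
    println(msg)
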
28 changes: 17 additions & 11 deletions src/batch_node.jl
@@ -17,7 +17,9 @@ struct AWSBatchNodeManager <: ContainerManager

     function AWSBatchNodeManager(; timeout::Period=AWS_BATCH_NODE_TIMEOUT)
         if !haskey(ENV, "AWS_BATCH_JOB_ID") || !haskey(ENV, "AWS_BATCH_JOB_MAIN_NODE_INDEX")
-            error("Unable to use $AWSBatchNodeManager outside of a running AWS Batch multi-node parallel job")
+            error(
+                "Unable to use $AWSBatchNodeManager outside of a running AWS Batch multi-node parallel job",
+            )
         end
 
         info(LOGGER, "AWS Batch Job ID: $(ENV["AWS_BATCH_JOB_ID"])")
@@ -33,7 +35,9 @@ struct AWSBatchNodeManager <: ContainerManager
     end
 end
 
-function Distributed.launch(manager::AWSBatchNodeManager, params::Dict, launched::Array, c::Condition)
+function Distributed.launch(
+    manager::AWSBatchNodeManager, params::Dict, launched::Array, c::Condition
+)
     num_workers = manager.num_workers
     connected_workers = 0

@@ -69,10 +73,7 @@ function Distributed.launch(manager::AWSBatchNodeManager, params::Dict, launched
         # address and port.
         config = WorkerConfig()
         config.io = sock
-        config.userdata = (;
-            :job_id => job_id,
-            :node_index => node_index,
-        )
+        config.userdata = (; :job_id => job_id, :node_index => node_index)
 
         push!(workers, config)
     end
@@ -86,19 +87,24 @@
     #
     # Note: Julia worker numbers will not match up to the node index of the worker.
     # Primarily this is due to the worker numbers being 1-indexed while nodes are 0-indexed.
-    append!(launched, sort!(workers, by=w -> w.userdata.node_index))
+    append!(launched, sort!(workers; by=w -> w.userdata.node_index))
     notify(c)
 
     if connected_workers < num_workers
-        warn(LOGGER, "Only $connected_workers of the $num_workers workers job have reported in")
+        warn(
+            LOGGER,
+            "Only $connected_workers of the $num_workers workers job have reported in",
+        )
     else
         debug(LOGGER, "All workers have successfully reported in")
     end
 end
 
 function start_batch_node_worker()
     if !haskey(ENV, "AWS_BATCH_JOB_ID") || !haskey(ENV, "AWS_BATCH_JOB_NODE_INDEX")
-        error("Unable to start a worker outside of a running AWS Batch multi-node parallel job")
+        error(
+            "Unable to start a worker outside of a running AWS Batch multi-node parallel job",
+        )
     end
 
     info(LOGGER, "AWS Batch Job ID: $(ENV["AWS_BATCH_JOB_ID"])")
@@ -151,8 +157,8 @@ function start_batch_node_worker()
     # Establish a connection to the manager. If the manager is slow to startup the worker
     # will attempt to connect for ~2 minutes.
     manager_connect = retry(
-        () -> connect(manager_ip, AWS_BATCH_JOB_NODE_PORT),
-        delays=ExponentialBackOff(n=8, max_delay=30),
+        () -> connect(manager_ip, AWS_BATCH_JOB_NODE_PORT);
+        delays=ExponentialBackOff(; n=8, max_delay=30),
         check=(s, e) -> e isa Base.IOError,
     )
     sock = manager_connect()
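
The `retry` change applies the same `;` convention to higher-order Base functions: the positional closure is separated from the keywords with `;`, and `ExponentialBackOff` likewise takes a leading `;`. A runnable sketch under the assumption of a flaky operation (`flaky` below is illustrative; the real code dials the manager's TCP port):

    # Sketch of Base.retry/ExponentialBackOff formatted per BlueStyle.
    # `flaky` fails twice, then succeeds, standing in for the TCP connect.
    attempts = Ref(0)
    flaky() = (attempts[] += 1) < 3 ? error("not yet") : "connected"

    connect_with_retry = retry(
        flaky;
        delays=ExponentialBackOff(; n=8, max_delay=30.0),
        check=(state, err) -> err isa ErrorException,
    )
    println(connect_with_retry())  # succeeds on the third attempt
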
11 changes: 7 additions & 4 deletions src/container.jl
@@ -34,14 +34,15 @@ The minimum and maximum number of workers wanted by the manager.
"""
desired_workers(::ContainerManager)

function Distributed.launch(manager::ContainerManager, params::Dict, launched::Array, c::Condition)
function Distributed.launch(
manager::ContainerManager, params::Dict, launched::Array, c::Condition
)
min_workers, max_workers = desired_workers(manager)
num_workers = 0

# Determine the IP address of the current host within the specified range
ips = filter!(getipaddrs()) do ip
typeof(ip) === typeof(manager.min_ip) &&
manager.min_ip <= ip <= manager.max_ip
typeof(ip) === typeof(manager.min_ip) && manager.min_ip <= ip <= manager.max_ip
end
valid_ip = first(ips)

@@ -108,7 +109,9 @@ function Distributed.launch(manager::ContainerManager, params::Dict, launched::A
     end
 end
 
-function Distributed.manage(manager::ContainerManager, id::Integer, config::WorkerConfig, op::Symbol)
+function Distributed.manage(
+    manager::ContainerManager, id::Integer, config::WorkerConfig, op::Symbol
+)
     # Note: Terminating the TCP connection from the master to the worker will cause the
     # worker to shutdown automatically.
 end
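
The signature changes here show BlueStyle's long-line rule: when a method signature exceeds the line limit, its arguments move to an indented continuation line and the closing parenthesis returns to the signature's own indentation. A sketch with a placeholder type (`ToyManager` is hypothetical, not the package's `ContainerManager`):

    # Hypothetical illustration of BlueStyle signature wrapping.
    struct ToyManager end

    function launch_workers(
        manager::ToyManager, params::Dict, launched::Vector, c::Condition
    )
        # The body is untouched by the formatting rule.
        return length(launched)
    end

    launch_workers(ToyManager(), Dict(), [], Condition())  # returns 0
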
14 changes: 5 additions & 9 deletions src/docker.jl
@@ -57,29 +57,25 @@ struct DockerManager <: ContainerManager
             image = @mock image_id()
         end
 
-        new(num_workers, image, Second(timeout), min_ip, max_ip)
+        return new(num_workers, image, Second(timeout), min_ip, max_ip)
     end
 end
 
 function DockerManager(
     num_workers::Integer;
     image::AbstractString="",
-    timeout::Union{Real, Period}=DOCKER_TIMEOUT,
+    timeout::Union{Real,Period}=DOCKER_TIMEOUT,
     min_ip::IPv4=ip"0.0.0.0",
     max_ip::IPv4=ip"255.255.255.255",
 )
-    DockerManager(num_workers, image, timeout, min_ip, max_ip)
+    return DockerManager(num_workers, image, timeout, min_ip, max_ip)
 end
 
 launch_timeout(mgr::DockerManager) = mgr.timeout
 desired_workers(mgr::DockerManager) = mgr.num_workers, mgr.num_workers
 
 function Base.:(==)(a::DockerManager, b::DockerManager)
-    return (
-        a.num_workers == b.num_workers &&
-        a.image == b.image &&
-        a.timeout == b.timeout
-    )
+    return (a.num_workers == b.num_workers && a.image == b.image && a.timeout == b.timeout)
 end
 
 function spawn_containers(mgr::DockerManager, override_cmd::Cmd)
@@ -88,7 +84,7 @@ function spawn_containers(mgr::DockerManager, override_cmd::Cmd)
     cmd = `$cmd $override_cmd`
 
     # Docker only allow us to spawn a job at a time
-    for id in 1:mgr.num_workers
+    for id in 1:(mgr.num_workers)
         container_id = @mock read(cmd, String)
         notice(LOGGER, "Spawning container: $container_id")
     end
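
The loop rewrite to `1:(mgr.num_workers)` reflects the formatter's habit of parenthesizing non-literal range endpoints so the bound reads unambiguously. A small sketch (the `Pool` struct is hypothetical, not package code):

    # Hypothetical illustration: parenthesize a non-literal range endpoint.
    struct Pool
        num_workers::Int
    end

    pool = Pool(2)
    for id in 1:(pool.num_workers)
        println("spawning container $id")
    end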