diff --git a/Project.toml b/Project.toml index bf81bc7..005e3f9 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "AWSClusterManagers" uuid = "69f5933c-162f-5d2f-a6d8-da72a0bfad91" authors = ["Invenia Technical Computing"] -version = "1.1.4" +version = "1.1.5" [deps] AWSBatch = "dcae83d4-2881-5875-9d49-e5534165e9c0" diff --git a/README.md b/README.md index 7d33b09..b5fd82e 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,7 @@ AWSClusterManagers [![CI](https://github.com/JuliaCloud/AWSClusterManagers.jl/workflows/CI/badge.svg)](https://github.com/JuliaCloud/AWSClusterManagers.jl/actions?query=workflow%3ACI) [![Bors enabled](https://bors.tech/images/badge_small.svg)](https://app.bors.tech/repositories/32323) +[![Code Style: Blue](https://img.shields.io/badge/code%20style-blue-4495d1.svg)](https://github.com/invenia/BlueStyle) [![codecov](https://codecov.io/gh/JuliaCloud/AWSClusterManagers.jl/branch/master/graph/badge.svg)](https://codecov.io/gh/JuliaCloud/AWSClusterManagers.jl) [![Stable Documentation](https://img.shields.io/badge/docs-stable-blue.svg)](https://juliacloud.github.io/AWSClusterManagers.jl/stable) diff --git a/TODO.md b/TODO.md deleted file mode 100644 index 7b06c6c..0000000 --- a/TODO.md +++ /dev/null @@ -1,42 +0,0 @@ -# TODO - -The following is a list of items that are a mostly nice-to-haves. - -## Bridge networking support - -In order to work with bridge networking in ECS we need to use the ECS API more. The basics of how we could support bridged networking is: - -1. Launch the worker tasks -2. Use the task IDs from the output of the launch and use `describe-tasks` to determine the host port used (when ephemeral is used) -3. Determine the address of the containers instance (container-id to instance-id to IP address) -4. Manager connects to workers using the information gathered - -Additionally we probably want to be able to determine if a task is using "bridge" or "host" networking. We can easily determine which "networkMode" was used via `describe-task-definition`. Note, this may not be enough as the "networkMode" may be overridden. In that case we would have to check the task itself. - - -## Task introspection - -Since tasks launch other tasks it would be good to have some kind of introspection available within a task to determine its own definition name. Note that [container agent introspection](http://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-agent-introspection.html) does exist but it deals with the container instance and not the containers themselves. - -## TLS Sockets - -Ideally we should be using a secure channel to talk betweek the ECS tasks. - -## Review output from runtasks - -The run tasks AWS command contains a section in the output for failures. We should probably -parse the output and ensure that their are no failed tasks. - -``` -{ - "failures": [ - { - "reason": "RESOURCE:MEMORY", - "arn": "arn:aws:ecs:us-east-1:292522074875:container-instance/d6e98fba-83fe-4e52-9920-8d2bb8d5ff75" - } - ], - "tasks": [] -} -``` - -One complication with failures from run-task is that we are already waiting for a set number of workers to contact us. One alternative is to only start listening to the number of workers that stated they were launching. Unfortunately since we cannot launch all worker at once we could end up being too slow to listen if all workers are listened to at the end. Probably the solution to this is to start listening to workers as we confirm they should be coming up. 
\ No newline at end of file diff --git a/src/AWSClusterManagers.jl b/src/AWSClusterManagers.jl index 655a950..e37418e 100644 --- a/src/AWSClusterManagers.jl +++ b/src/AWSClusterManagers.jl @@ -14,7 +14,7 @@ const LOGGER = getlogger(@__MODULE__) function __init__() # https://invenia.github.io/Memento.jl/latest/faq/pkg-usage/ - Memento.register(LOGGER) + return Memento.register(LOGGER) end include("socket.jl") diff --git a/src/batch.jl b/src/batch.jl index 41a34ba..4830ba8 100644 --- a/src/batch.jl +++ b/src/batch.jl @@ -70,7 +70,8 @@ struct AWSBatchManager <: ContainerManager max_ip::IPv4=ip"255.255.255.255", ) min_workers >= 0 || throw(ArgumentError("min workers must be non-negative")) - min_workers <= max_workers || throw(ArgumentError("min workers exceeds max workers")) + min_workers <= max_workers || + throw(ArgumentError("min workers exceeds max workers")) # Default the queue to using the WORKER_JOB_QUEUE environmental variable. if isempty(queue) @@ -79,7 +80,18 @@ struct AWSBatchManager <: ContainerManager region = isempty(region) ? "us-east-1" : region - new(min_workers, max_workers, definition, name, queue, memory, region, Second(timeout), min_ip, max_ip) + return new( + min_workers, + max_workers, + definition, + name, + queue, + memory, + region, + Second(timeout), + min_ip, + max_ip, + ) end end @@ -95,7 +107,7 @@ function AWSBatchManager( min_ip::IPv4=ip"0.0.0.0", max_ip::IPv4=ip"255.255.255.255", ) - AWSBatchManager( + return AWSBatchManager( min_workers, max_workers, definition, @@ -105,16 +117,16 @@ function AWSBatchManager( region, timeout, min_ip, - max_ip + max_ip, ) end function AWSBatchManager(workers::UnitRange{<:Integer}; kwargs...) - AWSBatchManager(first(workers), last(workers); kwargs...) + return AWSBatchManager(first(workers), last(workers); kwargs...) end function AWSBatchManager(workers::Integer; kwargs...) - AWSBatchManager(workers, workers; kwargs...) + return AWSBatchManager(workers, workers; kwargs...) end launch_timeout(mgr::AWSBatchManager) = mgr.timeout @@ -143,32 +155,37 @@ function spawn_containers(mgr::AWSBatchManager, override_cmd::Cmd) max_compute = @mock max_vcpus(queue) if min_workers > max_compute - error(string( - "Unable to launch the minimum number of workers ($min_workers) as the ", - "minimum exceeds the max VCPUs available ($max_compute).", - )) + error( + string( + "Unable to launch the minimum number of workers ($min_workers) as the ", + "minimum exceeds the max VCPUs available ($max_compute).", + ), + ) elseif max_workers > max_compute # Note: In addition to warning the user about the VCPU cap we could also reduce # the number of workers we request. Unfortunately since we don't know how many jobs # are currently running or how long they will take we'll leave `max_workers` # untouched. - warn(LOGGER, string( - "Due to the max VCPU limit ($max_compute) most likely only a partial amount ", - "of the requested workers ($max_workers) will be spawned.", - )) + warn( + LOGGER, + string( + "Due to the max VCPU limit ($max_compute) most likely only a partial amount ", + "of the requested workers ($max_workers) will be spawned.", + ), + ) end # Since each batch worker can only use one cpu we override the vcpus to one.
- job = @mock run_batch( - name = mgr.job_name, - definition = mgr.job_definition, - queue = mgr.job_queue, - region = mgr.region, - vcpus = 1, - memory = mgr.job_memory, - cmd = override_cmd, - num_jobs = max_workers, - allow_job_registration = false, + job = @mock run_batch(; + name=mgr.job_name, + definition=mgr.job_definition, + queue=mgr.job_queue, + region=mgr.region, + vcpus=1, + memory=mgr.job_memory, + cmd=override_cmd, + num_jobs=max_workers, + allow_job_registration=false, ) if max_workers > 1 diff --git a/src/batch_node.jl b/src/batch_node.jl index f7f3e98..2801579 100644 --- a/src/batch_node.jl +++ b/src/batch_node.jl @@ -17,7 +17,9 @@ struct AWSBatchNodeManager <: ContainerManager function AWSBatchNodeManager(; timeout::Period=AWS_BATCH_NODE_TIMEOUT) if !haskey(ENV, "AWS_BATCH_JOB_ID") || !haskey(ENV, "AWS_BATCH_JOB_MAIN_NODE_INDEX") - error("Unable to use $AWSBatchNodeManager outside of a running AWS Batch multi-node parallel job") + error( + "Unable to use $AWSBatchNodeManager outside of a running AWS Batch multi-node parallel job", + ) end info(LOGGER, "AWS Batch Job ID: $(ENV["AWS_BATCH_JOB_ID"])") @@ -33,7 +35,9 @@ struct AWSBatchNodeManager <: ContainerManager end end -function Distributed.launch(manager::AWSBatchNodeManager, params::Dict, launched::Array, c::Condition) +function Distributed.launch( + manager::AWSBatchNodeManager, params::Dict, launched::Array, c::Condition +) num_workers = manager.num_workers connected_workers = 0 @@ -69,10 +73,7 @@ function Distributed.launch(manager::AWSBatchNodeManager, params::Dict, launched # address and port. config = WorkerConfig() config.io = sock - config.userdata = (; - :job_id => job_id, - :node_index => node_index, - ) + config.userdata = (; :job_id => job_id, :node_index => node_index) push!(workers, config) end @@ -86,11 +87,14 @@ function Distributed.launch(manager::AWSBatchNodeManager, params::Dict, launched # # Note: Julia worker numbers will not match up to the node index of the worker. # Primarily this is due to the worker numbers being 1-indexed while nodes are 0-indexed. - append!(launched, sort!(workers, by=w -> w.userdata.node_index)) + append!(launched, sort!(workers; by=w -> w.userdata.node_index)) notify(c) if connected_workers < num_workers - warn(LOGGER, "Only $connected_workers of the $num_workers workers job have reported in") + warn( + LOGGER, + "Only $connected_workers of the $num_workers workers job have reported in", + ) else debug(LOGGER, "All workers have successfully reported in") end @@ -98,7 +102,9 @@ end function start_batch_node_worker() if !haskey(ENV, "AWS_BATCH_JOB_ID") || !haskey(ENV, "AWS_BATCH_JOB_NODE_INDEX") - error("Unable to start a worker outside of a running AWS Batch multi-node parallel job") + error( + "Unable to start a worker outside of a running AWS Batch multi-node parallel job", + ) end info(LOGGER, "AWS Batch Job ID: $(ENV["AWS_BATCH_JOB_ID"])") @@ -151,8 +157,8 @@ function start_batch_node_worker() # Establish a connection to the manager. If the manager is slow to startup the worker # will attempt to connect for ~2 minutes. 
manager_connect = retry( - () -> connect(manager_ip, AWS_BATCH_JOB_NODE_PORT), - delays=ExponentialBackOff(n=8, max_delay=30), + () -> connect(manager_ip, AWS_BATCH_JOB_NODE_PORT); + delays=ExponentialBackOff(; n=8, max_delay=30), check=(s, e) -> e isa Base.IOError, ) sock = manager_connect() diff --git a/src/container.jl b/src/container.jl index a4f50f1..c21a1be 100644 --- a/src/container.jl +++ b/src/container.jl @@ -34,14 +34,15 @@ The minimum and maximum number of workers wanted by the manager. """ desired_workers(::ContainerManager) -function Distributed.launch(manager::ContainerManager, params::Dict, launched::Array, c::Condition) +function Distributed.launch( + manager::ContainerManager, params::Dict, launched::Array, c::Condition +) min_workers, max_workers = desired_workers(manager) num_workers = 0 # Determine the IP address of the current host within the specified range ips = filter!(getipaddrs()) do ip - typeof(ip) === typeof(manager.min_ip) && - manager.min_ip <= ip <= manager.max_ip + typeof(ip) === typeof(manager.min_ip) && manager.min_ip <= ip <= manager.max_ip end valid_ip = first(ips) @@ -108,7 +109,9 @@ function Distributed.launch(manager::ContainerManager, params::Dict, launched::A end end -function Distributed.manage(manager::ContainerManager, id::Integer, config::WorkerConfig, op::Symbol) +function Distributed.manage( + manager::ContainerManager, id::Integer, config::WorkerConfig, op::Symbol +) # Note: Terminating the TCP connection from the master to the worker will cause the # worker to shutdown automatically. end diff --git a/src/docker.jl b/src/docker.jl index 99d0eef..dd4c7fb 100644 --- a/src/docker.jl +++ b/src/docker.jl @@ -57,29 +57,25 @@ struct DockerManager <: ContainerManager image = @mock image_id() end - new(num_workers, image, Second(timeout), min_ip, max_ip) + return new(num_workers, image, Second(timeout), min_ip, max_ip) end end function DockerManager( num_workers::Integer; image::AbstractString="", - timeout::Union{Real, Period}=DOCKER_TIMEOUT, + timeout::Union{Real,Period}=DOCKER_TIMEOUT, min_ip::IPv4=ip"0.0.0.0", max_ip::IPv4=ip"255.255.255.255", ) - DockerManager(num_workers, image, timeout, min_ip, max_ip) + return DockerManager(num_workers, image, timeout, min_ip, max_ip) end launch_timeout(mgr::DockerManager) = mgr.timeout desired_workers(mgr::DockerManager) = mgr.num_workers, mgr.num_workers function Base.:(==)(a::DockerManager, b::DockerManager) - return ( - a.num_workers == b.num_workers && - a.image == b.image && - a.timeout == b.timeout - ) + return (a.num_workers == b.num_workers && a.image == b.image && a.timeout == b.timeout) end function spawn_containers(mgr::DockerManager, override_cmd::Cmd) @@ -88,7 +84,7 @@ function spawn_containers(mgr::DockerManager, override_cmd::Cmd) cmd = `$cmd $override_cmd` # Docker only allow us to spawn a job at a time - for id in 1:mgr.num_workers + for id in 1:(mgr.num_workers) container_id = @mock read(cmd, String) notice(LOGGER, "Spawning container: $container_id") end diff --git a/src/socket.jl b/src/socket.jl index c4023eb..bfcc9ab 100644 --- a/src/socket.jl +++ b/src/socket.jl @@ -1,7 +1,7 @@ using Sockets: IPAddr, IPv4, IPv6 # Copy of `Sockets._sizeof_uv_interface_address` -const _sizeof_uv_interface_address = ccall(:jl_uv_sizeof_interface_address,Int32,()) +const _sizeof_uv_interface_address = ccall(:jl_uv_sizeof_interface_address, Int32, ()) # https://github.com/JuliaLang/julia/pull/30349 if VERSION < v"1.2.0-DEV.56" @@ -12,18 +12,31 @@ if VERSION < v"1.2.0-DEV.56" addr_ref = 
Ref{Ptr{UInt8}}(C_NULL) count_ref = Ref{Int32}(1) lo_present = false - err = ccall(:jl_uv_interface_addresses, Int32, (Ref{Ptr{UInt8}}, Ref{Int32}), addr_ref, count_ref) + err = ccall( + :jl_uv_interface_addresses, + Int32, + (Ref{Ptr{UInt8}}, Ref{Int32}), + addr_ref, + count_ref, + ) uv_error("getlocalip", err) addr, count = addr_ref[], count_ref[] - for i = 0:(count-1) - current_addr = addr + i*_sizeof_uv_interface_address - if 1 == ccall(:jl_uv_interface_address_is_internal, Int32, (Ptr{UInt8},), current_addr) + for i in 0:(count - 1) + current_addr = addr + i * _sizeof_uv_interface_address + if 1 == ccall( + :jl_uv_interface_address_is_internal, Int32, (Ptr{UInt8},), current_addr + ) lo_present = true continue end - sockaddr = ccall(:jl_uv_interface_address_sockaddr, Ptr{Cvoid}, (Ptr{UInt8},), current_addr) + sockaddr = ccall( + :jl_uv_interface_address_sockaddr, Ptr{Cvoid}, (Ptr{UInt8},), current_addr + ) if ccall(:jl_sockaddr_in_is_ip4, Int32, (Ptr{Cvoid},), sockaddr) == 1 - push!(addresses, IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), sockaddr)))) + push!( + addresses, + IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), sockaddr))), + ) end end ccall(:uv_free_interface_addresses, Cvoid, (Ptr{UInt8}, Int32), addr, count) @@ -33,7 +46,6 @@ else using Sockets: getipaddrs end - """ is_link_local(ip::IPv4) -> Bool @@ -42,7 +54,6 @@ Determine if the IP address is within the [link-local address] """ is_link_local(ip::IPv4) = ip"169.254.0.0" <= ip <= ip"169.254.255.255" - # Julia structure mirroring `uv_interface_address_t` # http://docs.libuv.org/en/v1.x/misc.html#c.uv_interface_address_t struct InterfaceAddress{T<:IPAddr} @@ -64,20 +75,33 @@ function get_interface_addrs() addr_ref = Ref{Ptr{UInt8}}(C_NULL) count_ref = Ref{Int32}(1) lo_present = false - err = ccall(:jl_uv_interface_addresses, Int32, (Ref{Ptr{UInt8}}, Ref{Int32}), addr_ref, count_ref) + err = ccall( + :jl_uv_interface_addresses, + Int32, + (Ref{Ptr{UInt8}}, Ref{Int32}), + addr_ref, + count_ref, + ) Base.uv_error("getlocalip", err) addr, count = addr_ref[], count_ref[] - for i = 0:(count - 1) + for i in 0:(count - 1) current_addr = addr + i * _sizeof_uv_interface_address # Note: Extracting interface name without a proper accessor name = unsafe_string(unsafe_load(Ptr{Cstring}(current_addr))) - is_internal = ccall(:jl_uv_interface_address_is_internal, Int32, (Ptr{UInt8},), current_addr) == 1 - sockaddr = ccall(:jl_uv_interface_address_sockaddr, Ptr{Cvoid}, (Ptr{UInt8},), current_addr) + is_internal = + ccall( + :jl_uv_interface_address_is_internal, Int32, (Ptr{UInt8},), current_addr + ) == 1 + sockaddr = ccall( + :jl_uv_interface_address_sockaddr, Ptr{Cvoid}, (Ptr{UInt8},), current_addr + ) ip = if ccall(_jl_sockaddr_is_ip4, Int32, (Ptr{Cvoid},), sockaddr) == 1 IPv4(ntoh(ccall(:jl_sockaddr_host4, UInt32, (Ptr{Cvoid},), sockaddr))) elseif ccall(_jl_sockaddr_is_ip6, Int32, (Ptr{Cvoid},), sockaddr) == 1 addr6 = Ref{UInt128}() - scope_id = ccall(:jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ref{UInt128},), sockaddr, addr6) + scope_id = ccall( + :jl_sockaddr_host6, UInt32, (Ptr{Cvoid}, Ref{UInt128}), sockaddr, addr6 + ) IPv6(ntoh(addr6[])) end push!(addresses, InterfaceAddress(name, is_internal, ip)) diff --git a/test/batch.jl b/test/batch.jl index 1fb2846..12533cd 100644 --- a/test/batch.jl +++ b/test/batch.jl @@ -38,7 +38,7 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( @testset "keywords" begin mgr = AWSBatchManager( 3, - 4, + 4; definition="keyword-def", name="keyword-name", 
queue="keyword-queue", @@ -97,7 +97,7 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( @test mgr.job_queue == "worker" # Use the queue passed in - mgr = AWSBatchManager(3, queue="special") + mgr = AWSBatchManager(3; queue="special") @test mgr.job_queue == "special" end end @@ -121,7 +121,7 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( @patch AWSBatch.max_vcpus(::JobQueue) = 1 @patch function AWSBatch.run_batch(; kwargs...) @async run(kwargs[:cmd]) - BatchJob("00000000-0000-0000-0000-000000000001") + return BatchJob("00000000-0000-0000-0000-000000000001") end ] @@ -130,7 +130,7 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( init_procs = procs() # Add a single AWSBatchManager worker added_procs = @test_log LOGGER "notice" BATCH_SPAWN_REGEX begin - addprocs(AWSBatchManager(1)) + addprocs(AWSBatchManager(1)) end # Check that the workers are available @test length(added_procs) == 1 @@ -148,7 +148,7 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( @patch AWSBatch.max_vcpus(::JobQueue) = 1 @patch function AWSBatch.run_batch(; kwargs...) # Avoiding spawning a worker process - BatchJob("00000000-0000-0000-0000-000000000002") + return BatchJob("00000000-0000-0000-0000-000000000002") end ] @@ -162,25 +162,31 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( @testset "VCPU limit" begin @testset "minimum exceeds" begin patches = [ - @patch AWSBatch.JobQueue(queue::AbstractString) = JobQueue(mock_queue_arn) + @patch function AWSBatch.JobQueue(queue::AbstractString) + return JobQueue(mock_queue_arn) + end @patch AWSBatch.max_vcpus(::JobQueue) = 3 ] apply(patches) do - @test_throws TaskFailedException addprocs(AWSBatchManager(4, timeout=Second(5))) + @test_throws TaskFailedException addprocs( + AWSBatchManager(4; timeout=Second(5)) + ) @test nprocs() == 1 end end @testset "maximum exceeds" begin patches = [ - @patch AWSBatch.JobQueue(queue::AbstractString) = JobQueue(mock_queue_arn) + @patch function AWSBatch.JobQueue(queue::AbstractString) + return JobQueue(mock_queue_arn) + end @patch AWSBatch.max_vcpus(::JobQueue) = 1 @patch function AWSBatch.run_batch(; kwargs...) 
for _ in 1:kwargs[:num_jobs] @async run(kwargs[:cmd]) end - BatchJob("00000000-0000-0000-0000-000000000004") + return BatchJob("00000000-0000-0000-0000-000000000004") end ] msg = string( @@ -189,7 +195,7 @@ const BATCH_SPAWN_REGEX = r"Spawning (array )?job: (?[0-9a-f\-]+)(?(1) \(n=( ) apply(patches) do added_procs = @test_log LOGGER "warn" msg begin - addprocs(AWSBatchManager(0:2, timeout=Second(5))) + addprocs(AWSBatchManager(0:2; timeout=Second(5))) end @test length(added_procs) > 0 rmprocs(added_procs; waitfor=5.0) diff --git a/test/batch_node_online.jl b/test/batch_node_online.jl index 901d686..94546d5 100644 --- a/test/batch_node_online.jl +++ b/test/batch_node_online.jl @@ -49,10 +49,8 @@ function batch_node_job_definition(; "jobRoleArn" => job_role_arn, "vcpus" => 1, "memory" => 1024, # MiB - "command" => [ - "julia", "-e", manager_code, - ], - ) + "command" => ["julia", "-e", manager_code], + ), ), Dict( "targetNodes" => "1:", @@ -66,22 +64,19 @@ function batch_node_job_definition(; "-c", "julia $bind_to -e $(bash_quote(worker_code))", ], - ) - ) - ] + ), + ), + ], ), # Retrying to handle EC2 instance failures and internal AWS issues: # https://docs.aws.amazon.com/batch/latest/userguide/job_retries.html - "retryStrategy" => Dict( - "attempts" => 3, - ), + "retryStrategy" => Dict("attempts" => 3), ) end # Use single quotes so that no shell interpolation occurs. bash_quote(str::AbstractString) = string("'", replace(str, "'" => "'\\''"), "'") - # AWS Batch parallel multi-node jobs will only run on on-demand clusters. When running # on spot the jobs will remain stuck in the RUNNABLE state let ce = describe_compute_environment(STACK["ComputeEnvironmentArn"]) @@ -89,16 +84,14 @@ let ce = describe_compute_environment(STACK["ComputeEnvironmentArn"]) error( "Aborting as compute environment $(STACK["ComputeEnvironmentArn"]) is not " * "using on-demand instances which are required for AWS Batch multi-node " * - "parallel jobs." + "parallel jobs.", ) end end - const BATCH_NODE_INDEX_REGEX = r"Worker job (?\d+): (?\d+)" const BATCH_NODE_JOB_DEF = register_job_definition(batch_node_job_definition()) # ARN - # Spawn all of the AWS Batch jobs at once in order to make online tests run faster. Each # job spawned below has a corresponding testset. 
Ideally, the job spawning would be # contained within the testset but unfortunately that doesn't seem possible as `@sync` and @@ -106,9 +99,8 @@ const BATCH_NODE_JOB_DEF = register_job_definition(batch_node_job_definition()) const BATCH_NODE_JOBS = Dict{String,BatchJob}() let job_name = "test-worker-spawn-success" - BATCH_NODE_JOBS[job_name] = submit_job( - job_name=job_name, - job_definition=BATCH_NODE_JOB_DEF, + BATCH_NODE_JOBS[job_name] = submit_job(; + job_name=job_name, job_definition=BATCH_NODE_JOB_DEF ) end @@ -118,19 +110,13 @@ let job_name = "test-worker-spawn-failure" "nodePropertyOverrides" => [ Dict( "targetNodes" => "1:", - "containerOverrides" => Dict( - "command" => ["bash", "-c", "exit 0"], - ) - ) - ] - + "containerOverrides" => Dict("command" => ["bash", "-c", "exit 0"]), + ), + ], ) - - BATCH_NODE_JOBS[job_name] = submit_job( - job_name=job_name, - job_definition=BATCH_NODE_JOB_DEF, - node_overrides=overrides, + BATCH_NODE_JOBS[job_name] = submit_job(; + job_name=job_name, job_definition=BATCH_NODE_JOB_DEF, node_overrides=overrides ) end @@ -152,17 +138,15 @@ let job_name = "test-worker-link-local" catch e # Prevents the job from failing so we can retry AWS errors showerror(stderr, e, catch_backtrace()) end - """ - ] - ) - ) - ] + """, + ], + ), + ), + ], ) - BATCH_NODE_JOBS[job_name] = submit_job( - job_name=job_name, - job_definition=BATCH_NODE_JOB_DEF, - node_overrides=overrides, + BATCH_NODE_JOBS[job_name] = submit_job(; + job_name=job_name, job_definition=BATCH_NODE_JOB_DEF, node_overrides=overrides ) end @@ -188,16 +172,14 @@ let job_name = "test-worker-link-local-bind-to" "bash", "-c", "julia $bind_to -e $(bash_quote(worker_code))", - ] - ) - ) - ] + ], + ), + ), + ], ) - BATCH_NODE_JOBS[job_name] = submit_job( - job_name=job_name, - job_definition=BATCH_NODE_JOB_DEF, - node_overrides=overrides, + BATCH_NODE_JOBS[job_name] = submit_job(; + job_name=job_name, job_definition=BATCH_NODE_JOB_DEF, node_overrides=overrides ) end @@ -221,17 +203,14 @@ let job_name = "test-slow-manager" "nodePropertyOverrides" => [ Dict( "targetNodes" => "0", - "containerOverrides" => Dict( - "command" => ["julia", "-e", manager_code] - ) - ) - ] + "containerOverrides" => + Dict("command" => ["julia", "-e", manager_code]), + ), + ], ) - BATCH_NODE_JOBS[job_name] = submit_job( - job_name=job_name, - job_definition=BATCH_NODE_JOB_DEF, - node_overrides=overrides, + BATCH_NODE_JOBS[job_name] = submit_job(; + job_name=job_name, job_definition=BATCH_NODE_JOB_DEF, node_overrides=overrides ) end @@ -315,11 +294,8 @@ let job_name = "test-worker-timeout" "nodePropertyOverrides" => [ Dict( "targetNodes" => "0", - "containerOverrides" => Dict( - "command" => [ - "julia", "-e", manager_code, - ], - ) + "containerOverrides" => + Dict("command" => ["julia", "-e", manager_code]), ), Dict( "targetNodes" => "1:", @@ -328,20 +304,17 @@ "bash", "-c", "julia $bind_to -e $(bash_quote(worker_code))", - ] - ) - ) - ] + ], + ), + ), + ], ) - BATCH_NODE_JOBS[job_name] = submit_job( - job_name=job_name, - job_definition=BATCH_NODE_JOB_DEF, - node_overrides=overrides, + BATCH_NODE_JOBS[job_name] = submit_job(; + job_name=job_name, job_definition=BATCH_NODE_JOB_DEF, node_overrides=overrides ) end - @testset "AWSBatchNodeManager (online)" begin # Note: Alternatively we could test report via Mocking but since the function is only # used for online testing and this particular test doesn't require an additional AWS diff --git a/test/batch_online.jl b/test/batch_online.jl index
442a8c2..43b73db 100644 --- a/test/batch_online.jl +++ b/test/batch_online.jl @@ -3,7 +3,6 @@ # but are usually instant and instances take around 4 minutes before they are ready. const TIMEOUT = Minute(15) - # Scrapes the log output to determine the worker job IDs as stated by the manager function scrape_worker_job_ids(output::AbstractString) m = match(BATCH_SPAWN_REGEX, output) @@ -22,7 +21,12 @@ function scrape_worker_job_ids(output::AbstractString) end end -function run_batch_job(image_name::AbstractString, num_workers::Integer; timeout::Period=TIMEOUT, should_fail::Bool=false) +function run_batch_job( + image_name::AbstractString, + num_workers::Integer; + timeout::Period=TIMEOUT, + should_fail::Bool=false, +) # TODO: Use AWS Batch job parameters to avoid re-registering the job timeout_secs = Dates.value(Second(timeout)) @@ -61,21 +65,21 @@ function run_batch_job(image_name::AbstractString, num_workers::Integer; timeout # - 64 workers with a manager with 1024 MB of memory info(LOGGER, "Submitting AWS Batch job with $num_workers workers") job = run_batch(; - name = STACK["JobName"] * "-n$num_workers", - queue = STACK["ManagerJobQueueArn"], - definition = STACK["JobDefinitionName"], - image = image_name, - role = STACK["JobRoleArn"], - vcpus = 1, - memory = 2048, - cmd = Cmd(["julia", "-e", code]), + name=STACK["JobName"] * "-n$num_workers", + queue=STACK["ManagerJobQueueArn"], + definition=STACK["JobDefinitionName"], + image=image_name, + role=STACK["JobRoleArn"], + vcpus=1, + memory=2048, + cmd=Cmd(["julia", "-e", code]), ) # If no compute environment resources are available it could take around # 5 minutes before the manager job is running info(LOGGER, "Waiting for AWS Batch manager job $(job.id) to run (~5 minutes)") start_time = time() - @test wait(state -> state < AWSBatch.RUNNING, job, timeout=timeout_secs) == true + @test wait(state -> state < AWSBatch.RUNNING, job; timeout=timeout_secs) == true info(LOGGER, "Manager spawning duration: $(time_str(time() - start_time))") # Once the manager job is running it will spawn additional AWS Batch jobs as @@ -87,9 +91,10 @@ function run_batch_job(image_name::AbstractString, num_workers::Integer; timeout info(LOGGER, "Waiting for AWS Batch workers and manager job to complete (~5 minutes)") start_time = time() if should_fail - @test wait(job, [AWSBatch.FAILED], [AWSBatch.SUCCEEDED], timeout=timeout_secs) == true + @test wait(job, [AWSBatch.FAILED], [AWSBatch.SUCCEEDED]; timeout=timeout_secs) == + true else - @test wait(job, [AWSBatch.SUCCEEDED], timeout=timeout_secs) == true + @test wait(job, [AWSBatch.SUCCEEDED]; timeout=timeout_secs) == true end info(LOGGER, "Worker spawning duration: $(time_str(time() - start_time))") @@ -106,7 +111,9 @@ function run_batch_job(image_name::AbstractString, num_workers::Integer; timeout log_wait_start = time() while true events = log_events(job) - if events !== nothing && !isempty(events) && any(e -> e.message == "Manager Complete", events) + if events !== nothing && + !isempty(events) && + any(e -> e.message == "Manager Complete", events) break elseif time() - log_wait_start > 60 error("CloudWatch logs have not completed ingestion within 1 minute") @@ -118,7 +125,6 @@ function run_batch_job(image_name::AbstractString, num_workers::Integer; timeout return job end - @testset "AWSBatchManager (online)" begin # Note: Start with the largest number of workers so the remaining tests don't have # to wait for the cluster to scale up on subsequent tests. 
@@ -136,8 +142,12 @@ end # Spawned are the AWS Batch job IDs reported upon job submission at launch # while reported is the self-reported job ID of each worker. spawned_jobs = scrape_worker_job_ids(output) - reported_jobs = [m[1] for m in eachmatch(r"Worker job \d+: ([0-9a-f\-]+(?:\:\d+)?)", output)] - reported_containers = [m[1] for m in eachmatch(r"Worker container \d+: ([0-9a-f]*)", output)] + reported_jobs = [ + m[1] for m in eachmatch(r"Worker job \d+: ([0-9a-f\-]+(?:\:\d+)?)", output) + ] + reported_containers = [ + m[1] for m in eachmatch(r"Worker container \d+: ([0-9a-f]*)", output) + ] @test num_procs == num_workers + 1 if num_workers > 0 diff --git a/test/docker.jl b/test/docker.jl index 9ab102e..7836e0b 100644 --- a/test/docker.jl +++ b/test/docker.jl @@ -31,7 +31,7 @@ const DOCKER_SPAWN_REGEX = r"^Spawning container: [0-9a-z]{12}$" @testset "keywords" begin mgr = DockerManager( - 3, + 3; image=mock_image, timeout=Minute(12), min_ip=ip"3.0.0.0", @@ -97,7 +97,7 @@ const DOCKER_SPAWN_REGEX = r"^Spawning container: [0-9a-z]{12}$" init_procs = procs() # Add a single AWSBatchManager worker added_procs = @test_log LOGGER "notice" DOCKER_SPAWN_REGEX begin - addprocs(DockerManager(1, mock_image)) + addprocs(DockerManager(1, mock_image)) end # Check that the workers are available @test length(added_procs) == 1 diff --git a/test/docker_online.jl b/test/docker_online.jl index 4889683..2b8c7a0 100644 --- a/test/docker_online.jl +++ b/test/docker_online.jl @@ -6,12 +6,13 @@ end @testset "container_id" begin - container_id = read(``` - docker run - -i $TEST_IMAGE - julia -e "using AWSClusterManagers: container_id; print(container_id())" - ```, - String + container_id = read( + ``` +docker run +-i $TEST_IMAGE +julia -e "using AWSClusterManagers: container_id; print(container_id())" +```, + String, ) test_result = @test !isempty(container_id) @@ -25,13 +26,14 @@ end @testset "image_id" begin - image_id = read(``` - docker run - -v /var/run/docker.sock:/var/run/docker.sock - -i $TEST_IMAGE - julia -e "using AWSClusterManagers: image_id; print(image_id())" - ```, - String + image_id = read( + ``` +docker run +-v /var/run/docker.sock:/var/run/docker.sock +-i $TEST_IMAGE +julia -e "using AWSClusterManagers: image_id; print(image_id())" +```, + String, ) @test !isempty(image_id) @@ -61,14 +63,15 @@ """ # Run the code in a docker container, but replace the newlines with semi-colons. - output = read(``` - docker run - --network=host - -v /var/run/docker.sock:/var/run/docker.sock - -i $TEST_IMAGE - julia -e $(replace(code, r"\n+" => "; ")) - ```, - String + output = read( + ``` + docker run + --network=host + -v /var/run/docker.sock:/var/run/docker.sock + -i $TEST_IMAGE + julia -e $(replace(code, r"\n+" => "; ")) + ```, + String, ) m = match(r"(?<=NumProcs: )\d+", output) @@ -76,9 +79,15 @@ # Spawned is the list container IDs reported by the manager upon launch while # reported is the self-reported container ID of each worker. 
- spawned_containers = map(m -> m.match, eachmatch(r"(?<=Spawning container: )[0-9a-f\-]+", output)) - reported_containers = map(m -> m.match, eachmatch(r"(?<=Worker container \d: )[0-9a-f\-]+", output)) - reported_images = map(m -> m.match, eachmatch(r"(?<=Worker image \d: )[0-9a-f\-]+", output)) + spawned_containers = map( + m -> m.match, eachmatch(r"(?<=Spawning container: )[0-9a-f\-]+", output) + ) + reported_containers = map( + m -> m.match, eachmatch(r"(?<=Worker container \d: )[0-9a-f\-]+", output) + ) + reported_images = map( + m -> m.match, eachmatch(r"(?<=Worker image \d: )[0-9a-f\-]+", output) + ) @test num_procs == num_workers + 1 @test length(reported_containers) == num_workers diff --git a/test/runtests.jl b/test/runtests.jl index 5f39746..afc74a7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -19,12 +19,11 @@ using Test Mocking.activate() const LOGGER = Memento.config!("info"; fmt="[{date} | {level} | {name}]: {msg}") - # https://github.com/JuliaLang/julia/pull/32814 +# https://github.com/JuliaLang/julia/pull/32814 if VERSION < v"1.3.0-alpha.110" const TaskFailedException = ErrorException end - const PKG_DIR = abspath(@__DIR__, "..") # Enables the running of the "docker" and "batch" online tests. e.g ONLINE=docker,batch @@ -33,7 +32,8 @@ const ONLINE = split(strip(get(ENV, "ONLINE", "")), r"\s*,\s*"; keepempty=false) # Run the tests on a stack created with the "test/batch.yml" CloudFormation template const STACK_NAME = get(ENV, "STACK_NAME", "") const STACK = !isempty(STACK_NAME) ? stack_output(STACK_NAME) : Dict() -const ECR = !isempty(STACK) ? first(split(STACK["EcrUri"], ':')) : "aws-cluster-managers-test" +const ECR = + !isempty(STACK) ? first(split(STACK["EcrUri"], ':')) : "aws-cluster-managers-test" const GIT_DIR = joinpath(@__DIR__, "..", ".git") const REV = if isdir(GIT_DIR) diff --git a/test/utils.jl b/test/utils.jl index f65e513..e979685 100644 --- a/test/utils.jl +++ b/test/utils.jl @@ -14,7 +14,7 @@ function report(io::IO, job::BatchJob) println(io) log_str = log_messages(job) - !isempty(log_str) && println(io, '\n', log_str) + return !isempty(log_str) && println(io, '\n', log_str) end report(job::BatchJob) = sprint(report, job) @@ -58,9 +58,7 @@ function submit_job(; retry_strategy::Dict=Dict(), ) options = Dict{String,Any}( - "jobName" => job_name, - "jobDefinition" => job_definition, - "jobQueue" => job_queue, + "jobName" => job_name, "jobDefinition" => job_definition, "jobQueue" => job_queue ) if !isempty(node_overrides) @@ -86,7 +84,8 @@ function describe_compute_environment(compute_environment::AbstractString) # --query computeEnvironments[0] # ``` output = AWSCore.Services.batch( - "POST", "/v1/describecomputeenvironments", + "POST", + "/v1/describecomputeenvironments", Dict("computeEnvironments" => [compute_environment]), ) @@ -104,7 +103,7 @@ function wait_finish(job::BatchJob; timeout::Period=Minute(20)) info(LOGGER, "Waiting for AWS Batch job to finish (~5 minutes)") # TODO: Disable logging from wait? Or at least emit timestamps - wait(job, [AWSBatch.FAILED, AWSBatch.SUCCEEDED], timeout=timeout_secs) # TODO: Support timeout as Period + wait(job, [AWSBatch.FAILED, AWSBatch.SUCCEEDED]; timeout=timeout_secs) # TODO: Support timeout as Period duration = job_duration(job) runtime = job_runtime(job) info(LOGGER, "Job duration: $(time_str(duration)), Job runtime: $(time_str(runtime))")