diff --git a/README.md b/README.md
index 827c1e3f..74165fab 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,7 @@ The scheduler tries to schedule similar builds on the same machine, to benefit f
 * [Docker jobs](#docker-jobs)
 * [OBuilder jobs](#obuilder-jobs)
 * [Admin](#admin)
+  * [Fair queuing](#fair-queuing)
 * [API](#api)
 * [Security model](#security-model)
 * [Prometheus metrics](#prometheus-metrics)
@@ -236,6 +237,44 @@ may take a while. The `restart` command waits until the worker has reconnected b
 You can also give the name of a worker as an extra argument to update just that worker.
 
+### Fair queuing
+
+Some clients may submit many jobs in batches.
+In this case, we probably want other clients' jobs to enter the queue ahead of the batch, even if they are submitted afterwards.
+
+To handle this, each client (separately for each pool) can be configured with a "rate",
+which is the rate at which they can expect to use the cluster (the number of jobs they can expect to have running at once).
+If the cluster has free capacity then this has no effect; all jobs will be run.
+However, when jobs have to queue, the scheduler uses this information to schedule them so that they run
+roughly when they would have run if the client had been using the cluster at the expected rate
+(there is a short sketch of this calculation at the end of this section).
+
+For example:
+
+1. The admin sets Alice's rate to 2 and Bob's rate to 1 (the default), using `ocluster-admin set-rate`.
+2. Bob submits 10 jobs, each estimated to run for 1 minute.
+3. Because Bob's rate is 1, the cluster assigns these jobs "fair start times" of now, +1m, +2m, +3m, etc.
+4. The cluster will start as many of these jobs as it has capacity for. If its capacity is 3, Bob's first three jobs will start.
+5. Alice submits two jobs, also taking one minute each.
+   The cluster assigns these jobs fair start times of now and now+30s
+   (at her rate of 2, each one-minute job advances her next start time by 30s).
+   These will be the next two jobs to run, because their start times are before all of Bob's jobs.
+
+In the `ocluster-admin show` output, you will see these values.
+For example:
+
+```
+...
+queue: (backlog) [bob:job8@10m alice:job1@27s]
+clients: alice(5)+2m bob(3)
+```
+
+This means that:
+
+- There are two jobs on the backlog: Alice's `job1` (fair start time 27s from now), and Bob's `job8`
+  (fair start time 10m from now). Alice's job will go first, because it has the lower start time.
+- Alice has a rate of 5 jobs (5 job-seconds per second) and her next job will have a fair start time
+  of 2 minutes from now (because she has already submitted enough work to fill her share of the cluster until then).
+- Bob has a rate of 3 and no penalty. His next job will get a fair start time of now.
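+
+The fair start time comes from a small per-client calculation: a job's start
+time is `max now next_start`, and scheduling it pushes the client's
+`next_start` forward by `cost / rate`. A minimal OCaml sketch of that logic
+(the names here are illustrative; the real version is `schedule` in
+`scheduler/pool.ml`):
+
+```ocaml
+type client = {
+  rate : float;               (* expected number of parallel jobs *)
+  mutable next_start : float; (* fair start time for this client's next job *)
+}
+
+(* Assign a fair start time to a job estimated to take [cost] seconds. *)
+let fair_start_time client ~now ~cost =
+  let start = max now client.next_start in
+  client.next_start <- start +. (cost /. client.rate);
+  start
+```
+
+A client submitting within its rate therefore always gets a fair start time of
+now, while a client that has queued a burst of work sees its later jobs pushed
+progressively further into the future.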
+ ## API diff --git a/api/pool_admin.ml b/api/pool_admin.ml index 624e5634..c0d7cf60 100644 --- a/api/pool_admin.ml +++ b/api/pool_admin.ml @@ -6,7 +6,7 @@ type worker_info = { active : bool; } -let local ~show ~workers ~worker ~set_active ~update = +let local ~show ~workers ~worker ~set_active ~update ~set_rate = let module X = Raw.Service.PoolAdmin in X.local @@ object inherit X.service @@ -58,6 +58,15 @@ let local ~show ~workers ~worker ~set_active ~update = let name = Params.worker_get params in release_param_caps (); update name + + method set_rate_impl params release_param_caps = + let open X.SetRate in + let client_id = Params.id_get params in + let rate = Params.rate_get params in + release_param_caps (); + match set_rate ~client_id rate with + | Ok () -> Service.return_empty () + | Error `No_such_user -> Service.fail "No such user" end module X = Raw.Client.PoolAdmin @@ -97,3 +106,10 @@ let update t worker = let request, params = Capability.Request.create Params.init_pointer in Params.worker_set params worker; Capability.call_for_unit t method_id request + +let set_rate t ~client_id rate = + let open X.SetRate in + let request, params = Capability.Request.create Params.init_pointer in + Params.id_set params client_id; + Params.rate_set params rate; + Capability.call_for_unit_exn t method_id request diff --git a/api/registration.ml b/api/registration.ml index 6c47b043..46d3d24f 100644 --- a/api/registration.ml +++ b/api/registration.ml @@ -8,11 +8,7 @@ let local ~register = method register_impl params release_param_caps = let open X.Register in let name = Params.name_get params in - let capacity = - let x = Params.capacity_get_int_exn params in - if x > 0 then x - else 32 (* Old workers don't report their capacity. *) - in + let capacity = Params.capacity_get_int_exn params in let worker = Params.worker_get params in release_param_caps (); match worker with diff --git a/api/schema.capnp b/api/schema.capnp index 1b68efa9..89cf872d 100644 --- a/api/schema.capnp +++ b/api/schema.capnp @@ -139,6 +139,9 @@ interface PoolAdmin { update @4 (worker :Text) -> (); # Drain worker, ask it to restart with the latest version, and return when it comes back. + + setRate @5 (id :Text, rate :Float64) -> (); + # Set the expected share of the pool for this client. } interface Admin { diff --git a/bin/admin.ml b/bin/admin.ml index 3893f75f..4f457c63 100644 --- a/bin/admin.ml +++ b/bin/admin.ml @@ -36,6 +36,11 @@ let list_clients cap_path = | [] -> Fmt.epr "No clients.@." 
| clients -> List.iter print_endline clients +let set_rate cap_path pool_id client_id rate = + run cap_path @@ fun admin_service -> + let pool = Cluster_api.Admin.pool admin_service pool_id in + Cluster_api.Pool_admin.set_rate pool ~client_id rate + let show cap_path pool = run cap_path @@ fun admin_service -> match pool with @@ -140,8 +145,8 @@ let connect_addr = ~docv:"ADDR" [] -let client_id = - Arg.pos 1 Arg.(some string) None @@ +let client_id ~pos = + Arg.pos pos Arg.(some string) None @@ Arg.info ~doc:"Unique name or ID for the client" ~docv:"ID" @@ -154,6 +159,13 @@ let pool_pos = ~docv:"POOL" [] +let rate ~pos = + Arg.pos pos Arg.(some float) None @@ + Arg.info + ~doc:"Number of parallel jobs" + ~docv:"RATE" + [] + let worker = Arg.value @@ Arg.pos 2 Arg.(some string) None @@ @@ -171,12 +183,12 @@ let all = let add_client = let doc = "Create a new client endpoint for submitting jobs" in - Term.(const add_client $ connect_addr $ Arg.required client_id), + Term.(const add_client $ connect_addr $ Arg.required (client_id ~pos:1)), Term.info "add-client" ~doc let remove_client = let doc = "Unregister a client." in - Term.(const remove_client $ connect_addr $ Arg.required client_id), + Term.(const remove_client $ connect_addr $ Arg.required (client_id ~pos:1)), Term.info "remove-client" ~doc let list_clients = @@ -184,6 +196,11 @@ let list_clients = Term.(const list_clients $ connect_addr), Term.info "list-clients" ~doc +let set_rate = + let doc = "Set expected number of parallel jobs for a pool/client combination" in + Term.(const set_rate $ connect_addr $ Arg.required pool_pos $ Arg.required (client_id ~pos:2) $ Arg.required (rate ~pos:3)), + Term.info "set-rate" ~doc + let show = let doc = "Show information about a service, pool or worker" in Term.(const show $ connect_addr $ Arg.value pool_pos), @@ -204,7 +221,7 @@ let update = Term.(const update $ connect_addr $ Arg.required pool_pos $ worker), Term.info "update" ~doc -let cmds = [add_client; remove_client; list_clients; show; pause; unpause; update] +let cmds = [add_client; remove_client; list_clients; set_rate; show; pause; unpause; update] let default_cmd = let doc = "a command-line admin client for the build-scheduler" in diff --git a/dune-project b/dune-project index 381408ec..1a9fcc90 100644 --- a/dune-project +++ b/dune-project @@ -37,6 +37,7 @@ cohttp-lwt-unix sqlite3 obuilder + psq (mirage-crypto (>= 0.8.5)) (ocaml (>= 4.10.0)) (current_ocluster (and (= :version) :with-test)) diff --git a/ocluster.opam b/ocluster.opam index 2cce4c0a..6ebccf8b 100644 --- a/ocluster.opam +++ b/ocluster.opam @@ -21,6 +21,7 @@ depends: [ "cohttp-lwt-unix" "sqlite3" "obuilder" + "psq" "mirage-crypto" {>= "0.8.5"} "ocaml" {>= "4.10.0"} "current_ocluster" {= version & with-test} diff --git a/scheduler/cluster_scheduler.ml b/scheduler/cluster_scheduler.ml index 0dc7bd12..32e76ccf 100644 --- a/scheduler/cluster_scheduler.ml +++ b/scheduler/cluster_scheduler.ml @@ -7,6 +7,21 @@ let () = Sqlite_loader.Ty.register "ocluster-client" Client let restart_timeout = 600.0 (* Maximum time to wait for a worker to reconnect after it disconnects. 
*) +module Metrics = struct + open Prometheus + + let namespace = "ocluster" + let subsystem = "scheduler" + + let priority ~urgent = + if urgent then "high" else "low" + + let client_queued_jobs = + let help = "Items currently queued" in + let f = Gauge.v_labels ~label_names:["client"; "pool"; "priority"] ~help ~namespace ~subsystem "client_submitted_jobs" in + fun ~client_id ~pool ~urgent -> Gauge.labels f [client_id; pool; priority ~urgent] +end + module Item = struct type t = { descr : Cluster_api.Queue.job_desc; @@ -32,7 +47,7 @@ module Item = struct end module Pool_api = struct - module Pool = Pool.Make(Item) + module Pool = Pool.Make(Item)(Unix) type t = { pool : Pool.t; @@ -46,13 +61,19 @@ module Pool_api = struct let cond = Lwt_condition.create () in { pool; workers; cond } - let submit t ~urgent ~client_id (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t = + let submit client ~urgent (descr : Cluster_api.Queue.job_desc) : Cluster_api.Ticket.t = let job, set_job = Capability.promise () in - Log.info (fun f -> f "Received new job request from %S (urgent=%b)" client_id urgent); + Log.info (fun f -> f "Received new job request from %S (urgent=%b)" (Pool.Client.client_id client) urgent); let item = { Item.descr; set_job } in - let ticket = Pool.submit ~urgent t.pool item in + let ticket = Pool.Client.submit ~urgent client item in + let queued_jobs = Metrics.client_queued_jobs ~client_id:(Pool.Client.client_id client) ~pool:(Pool.Client.pool_id client) ~urgent in + Prometheus.Gauge.inc_one queued_jobs; + Lwt.async (fun () -> + Capability.wait_until_settled job >|= fun () -> + Prometheus.Gauge.dec_one queued_jobs + ); let cancel () = - match Pool.cancel ticket with + match Pool.Client.cancel client ticket with | Ok () -> Capability.resolve_exn set_job (Capnp_rpc.Exception.v "Ticket cancelled"); Lwt_result.return () @@ -60,7 +81,7 @@ module Pool_api = struct Cluster_api.Job.cancel job in let release () = - match Pool.cancel ticket with + match Pool.Client.cancel client ticket with | Ok () -> Capability.resolve_exn set_job (Capnp_rpc.Exception.v "Ticket released (cancelled)") | Error `Not_queued -> () in @@ -103,7 +124,7 @@ module Pool_api = struct Capability.inc_ref w; Some w - let admin_service t = + let admin_service ~validate_client t = let show () = Fmt.to_to_string Pool.show t.pool in let workers () = Pool.connected_workers t.pool @@ -142,7 +163,17 @@ module Pool_api = struct in Lwt.pick [ aux (); timeout ] in - Cluster_api.Pool_admin.local ~show ~workers ~worker:(worker t) ~set_active ~update + let set_rate ~client_id rate = + if validate_client client_id then ( + let client = Pool.client t.pool ~client_id in + Pool.Client.set_rate client rate; + Ok () + ) else Error `No_such_user + in + Cluster_api.Pool_admin.local ~show ~workers ~worker:(worker t) ~set_active ~update ~set_rate + + let remove_client t ~client_id = + Pool.remove_client t.pool ~client_id end type t = { @@ -155,16 +186,27 @@ let registration_services t = let pp_pool_name f (name, _) = Fmt.string f name let submission_service ~validate ~sturdy_ref t client_id = + let pools = Hashtbl.create 3 in + let get_client pool_id = + match Hashtbl.find_opt pools pool_id with + | Some c -> Ok c + | None -> + match String.Map.find_opt pool_id t.pools with + | None -> + let msg = Fmt.strf "Pool ID %S not one of @[{%a}@]" pool_id (String.Map.pp ~sep:Fmt.comma pp_pool_name) t.pools in + Error (Capnp_rpc.Exception.v msg) + | Some pool -> + let client = Pool_api.Pool.client pool.pool ~client_id in + Hashtbl.add pools pool_id 
client; + Ok client + in let submit ~pool ~urgent descr = match validate () with | false -> Capability.broken (Capnp_rpc.Exception.v "Access has been revoked") | true -> - match String.Map.find_opt pool t.pools with - | None -> - let msg = Fmt.strf "Pool ID %S not one of @[{%a}@]" pool (String.Map.pp ~sep:Fmt.comma pp_pool_name) t.pools in - Capability.broken (Capnp_rpc.Exception.v msg) - | Some pool -> - Pool_api.submit ~urgent ~client_id pool descr + match get_client pool with + | Ok client -> Pool_api.submit ~urgent client descr + | Error ex -> Capability.broken ex in Cluster_api.Submission.local ~submit ~sturdy_ref @@ -176,7 +218,13 @@ let admin_service ~loader ~restore t = let pool name = match String.Map.find_opt name t.pools with | None -> Capability.broken (Capnp_rpc.Exception.v "No such pool") - | Some pool_api -> Pool_api.admin_service pool_api + | Some pool_api -> + let validate_client id = + match Sqlite_loader.lookup_by_descr loader (Client, id) with + | [] -> false + | _ -> true + in + Pool_api.admin_service ~validate_client pool_api in let add_client name = let descr = (Client, name) in @@ -193,6 +241,7 @@ let admin_service ~loader ~restore t = match Sqlite_loader.lookup_by_descr loader descr with | [digest] -> Sqlite_loader.remove loader digest; + t.pools |> String.Map.iter (fun _ -> Pool_api.remove_client ~client_id:name); Log.info (fun f -> f "Removed endpoint for client %S" name); Lwt_result.return () | [] -> Lwt_result.fail (`Capnp (Capnp_rpc.Error.exn "Unknown client %S" name)) diff --git a/scheduler/dune b/scheduler/dune index e97b2799..a3240286 100644 --- a/scheduler/dune +++ b/scheduler/dune @@ -1,3 +1,3 @@ (library (name cluster_scheduler) - (libraries ocluster-api logs capnp-rpc-lwt capnp-rpc-net lwt-dllist prometheus db lwt.unix)) + (libraries ocluster-api logs capnp-rpc-lwt capnp-rpc-net lwt-dllist prometheus db lwt.unix psq)) diff --git a/scheduler/pool.ml b/scheduler/pool.ml index 68a3bf35..e13d74f6 100644 --- a/scheduler/pool.ml +++ b/scheduler/pool.ml @@ -35,13 +35,21 @@ module Metrics = struct let workers_paused = let help = "Number of workers set to inactive" in Gauge.v_label ~label_name:"pool" ~help ~namespace ~subsystem "workers_paused" + + let priority ~urgent = + if urgent then "high" else "low" end +module Client_map = Map.Make(String) + module Dao = struct type t = { query_cache : Sqlite3.stmt; mark_cached : Sqlite3.stmt; dump_cache : Sqlite3.stmt; + get_rate : Sqlite3.stmt; + set_rate : Sqlite3.stmt; + remove_user : Sqlite3.stmt; } let dump f (db, pool) = @@ -69,6 +77,14 @@ module Dao = struct let mark_cached t ~pool ~hint ~worker = Db.exec t.mark_cached Sqlite3.Data.[ TEXT pool; TEXT hint; TEXT worker ] + let get_rate t ~pool ~client_id = + Db.query_some t.get_rate Sqlite3.Data.[ TEXT pool; TEXT client_id ] + |> Option.map (function + | Sqlite3.Data.[ FLOAT rate ] -> rate + | row -> Fmt.failwith "Bad row from DB: %a" Db.dump_row row + ) + |> Option.value ~default:1.0 + let init db = Sqlite3.exec db "CREATE TABLE IF NOT EXISTS cached ( \ pool TEXT NOT NULL, \ @@ -76,18 +92,39 @@ module Dao = struct worker TEXT NOT NULL, \ created DATETIME NOT NULL, \ PRIMARY KEY (pool, cache_hint, worker))" |> Db.or_fail ~cmd:"create table"; + Sqlite3.exec db "CREATE TABLE IF NOT EXISTS pool_user_rate ( \ + pool TEXT NOT NULL, \ + user TEXT NOT NULL, \ + rate REAL NOT NULL, \ + PRIMARY KEY (pool, user))" |> Db.or_fail ~cmd:"create pool_user_rate table"; let query_cache = Sqlite3.prepare db "SELECT worker FROM cached WHERE pool = ? AND cache_hint = ? 
ORDER BY worker" in let mark_cached = Sqlite3.prepare db "INSERT OR REPLACE INTO cached (pool, cache_hint, worker, created) VALUES (?, ?, ?, date('now'))" in let dump_cache = Sqlite3.prepare db "SELECT DISTINCT cache_hint FROM cached WHERE pool = ? ORDER BY cache_hint" in - { query_cache; mark_cached; dump_cache } + let set_rate = Sqlite3.prepare db "INSERT OR REPLACE INTO pool_user_rate (pool, user, rate) VALUES (?, ?, ?)" in + let get_rate = Sqlite3.prepare db "SELECT rate FROM pool_user_rate WHERE pool = ? AND user = ?" in + let remove_user = Sqlite3.prepare db "DELETE FROM pool_user_rate WHERE pool = ? AND user = ?" in + { query_cache; mark_cached; dump_cache; set_rate; get_rate; remove_user } end -module Make (Item : S.ITEM) = struct +let (let<>) x f = + if x = 0 then f () + else x + +let pp_rough_duration f x = + if x < 120.0 then + Fmt.pf f "%.0fs" x + else + Fmt.pf f "%.0fm" (x /. 60.) + +module Make (Item : S.ITEM)(Time : S.TIME) = struct module Worker_map = Astring.String.Map type ticket = { + key : < >; item : Item.t; + fair_start_time : float; urgent : bool; + client_id : string; mutable cost : int; (* Estimated cost (set when added to worker queue) *) mutable cancel : (unit -> unit) option; (* None if not in a queue *) } @@ -102,73 +139,78 @@ module Make (Item : S.ITEM) = struct assert (ticket.cancel <> None); ticket.cancel <- None - let cancel ticket = - match ticket.cancel with - | None -> Error `Not_queued - | Some fn -> - Log.info (fun f -> f "Cancel %a" pp_ticket ticket); - ticket.cancel <- None; - fn (); - Ok () - module Backlog = struct + module Key = struct + type t = < > + + let compare = compare + end + + module Ticket = struct + type t = ticket + + let compare a b = + let<> () = compare b.urgent a.urgent in + compare a.fair_start_time b.fair_start_time + + let pp_ticket ~now f ticket = + Fmt.pf f "%s:%a@@%a" + ticket.client_id + Item.pp ticket.item + pp_rough_duration (ticket.fair_start_time -. 
now) + + let pp ~now f ticket = + let urgent = if ticket.urgent then "+urgent" else "" in + if ticket.cost >= 0 then + Fmt.pf f "%a(%d%s)" (pp_ticket ~now) ticket ticket.cost urgent + else + Fmt.pf f "%a%s" (pp_ticket ~now) ticket urgent + end + + module Q = Psq.Make(Key)(Ticket) + type t = { queue : string; (* For metric reports *) - high : ticket Lwt_dllist.t; - low : ticket Lwt_dllist.t; + mutable psq : Q.t; } - let choose_queue ~ticket ~pool t = - let queue, priority = - match ticket.urgent with - | true -> t.high, "high" - | false -> t.low, "low" - in - queue, Prometheus.Gauge.labels Metrics.queue_length [t.queue; pool; priority] - - let cancel ?(on_cancel=ignore) ~metric ~pool node () = - Lwt_dllist.remove node; + let cancel t ?(on_cancel=ignore) ~metric ~pool key () = + t.psq <- Q.remove key t.psq; Prometheus.Counter.inc_one (Metrics.jobs_cancelled pool); Prometheus.Gauge.dec_one metric; on_cancel () let enqueue ?on_cancel ~pool ticket t = - let queue, metric = choose_queue ~ticket ~pool t in - let node = Lwt_dllist.add_l ticket queue in - set_cancel ticket (cancel ?on_cancel ~metric ~pool node); - Prometheus.Gauge.inc_one metric - - let push_back ~pool ticket t = - let queue, metric = choose_queue ~ticket ~pool t in - let node = Lwt_dllist.add_r ticket queue in - set_cancel ticket (cancel ~metric ~pool node); + let metric = Prometheus.Gauge.labels Metrics.queue_length [t.queue; pool; Metrics.priority ~urgent:ticket.urgent] in + t.psq <- Q.add ticket.key ticket t.psq; + set_cancel ticket (cancel t ?on_cancel ~metric ~pool ticket.key); Prometheus.Gauge.inc_one metric let dequeue_opt ~pool t = - let take ticket = + match Q.pop t.psq with + | None -> None + | Some ((_key, ticket), q) -> + t.psq <- q; let priority = if ticket.urgent then "high" else "low" in Prometheus.Gauge.dec_one (Prometheus.Gauge.labels Metrics.queue_length [t.queue; pool; priority]); clear_cancel ticket; Some ticket - in - match Lwt_dllist.take_opt_r t.high with - | Some ticket -> take ticket - | None -> - match Lwt_dllist.take_opt_r t.low with - | Some ticket -> take ticket - | None -> None - let length t = - Lwt_dllist.length t.low + Lwt_dllist.length t.high + let length t = Q.size t.psq let is_empty t = length t = 0 + let dump f t = + let items = Q.to_priority_list t.psq |> List.rev in + let now = Time.gettimeofday () in + Fmt.pf f "[@[%a@]]" + (Fmt.(list ~sep:sp) (Fmt.using snd (Ticket.pp ~now))) items + let create ~queue () = { queue; - low = Lwt_dllist.create (); - high = Lwt_dllist.create (); + psq = Q.empty; } end @@ -180,7 +222,9 @@ module Make (Item : S.ITEM) = struct | `Ready of worker Lwt_dllist.t (* No work is available. *) ]; mutable workers : worker Worker_map.t; (* Connected workers *) + mutable clients : client_info Client_map.t; mutable cluster_capacity : float; + mutable pending_cached : (Item.cache_hint, int) Hashtbl.t; } and worker = { parent : t; name : string; @@ -190,6 +234,10 @@ module Make (Item : S.ITEM) = struct | `Finished ]; mutable workload : int; (* Total cost of items in worker's queue. *) mutable shutdown : bool; (* Worker is paused because it is shutting down. 
*) + } and client_info = { + id : string; + mutable next_fair_start_time : float; + mutable finished : bool; } let enqueue_node item queue metric = @@ -210,6 +258,16 @@ module Make (Item : S.ITEM) = struct Option.iter (fun ticket -> worker.workload <- worker.workload - ticket.cost) ticket; ticket + let dec_pending_count t ticket = + let hint = Item.cache_hint ticket.item in + if (hint :> string) <> "" then ( + let pending_count = Hashtbl.find t.pending_cached hint in + if pending_count > 1 then + Hashtbl.replace t.pending_cached hint (pending_count - 1) + else + Hashtbl.remove t.pending_cached hint + ) + (* Return the worker in [workers] with the lowest workload. *) let best_worker ~max_workload t workers = let rec aux ~best = function @@ -270,6 +328,7 @@ module Make (Item : S.ITEM) = struct Log.info (fun f -> f "%S takes %a from its local queue" worker.name Item.pp item); mark_cached ticket.item worker; Prometheus.Counter.inc_one (Metrics.jobs_accepted t.pool); + dec_pending_count t ticket; Lwt_result.return ticket.item | None -> (* Try the global queue instead. *) @@ -296,6 +355,7 @@ module Make (Item : S.ITEM) = struct Log.info (fun f -> f "%S takes %a from the main queue" worker.name Item.pp item); mark_cached item worker; Prometheus.Counter.inc_one (Metrics.jobs_accepted t.pool); + dec_pending_count t ticket; Lwt_result.return item ) in @@ -303,21 +363,14 @@ module Make (Item : S.ITEM) = struct (* Worker is leaving and system is backlogged. Move the worker's items to the backlog. *) let rec push_back worker worker_q q = - let ticket = - match Lwt_dllist.take_opt_l worker_q.Backlog.low with - | Some x -> Some x - | None -> Lwt_dllist.take_opt_l worker_q.Backlog.high - in - match ticket with + match Backlog.dequeue_opt ~pool:worker.parent.pool worker_q with + | None -> () | Some ticket -> Log.info (fun f -> f "Pushing %a back on to the main queue" pp_ticket ticket); - let priority = if ticket.urgent then "high" else "low" in - Prometheus.Gauge.dec_one @@ Prometheus.Gauge.labels Metrics.queue_length [worker_q.queue; worker.parent.pool; priority]; worker.workload <- worker.workload - ticket.cost; - clear_cancel ticket; - Backlog.push_back ~pool:worker.parent.pool ticket q; + ticket.cost <- -1; + Backlog.enqueue ~pool:worker.parent.pool ticket q; push_back worker worker_q q - | None -> () let register t ~name ~capacity = if Worker_map.mem name t.workers then Error `Name_taken @@ -366,17 +419,12 @@ module Make (Item : S.ITEM) = struct Lwt_condition.broadcast cond () ) - let submit ~urgent t item = - Prometheus.Counter.inc_one (Metrics.jobs_submitted t.pool); - let ticket = { item; urgent; cancel = None; cost = -1 } in - add t ticket; - ticket - let add_items t worker_q worker = let rec aux () = match dequeue_opt worker_q worker with | None -> () | Some ticket -> + ticket.cost <- -1; add t ticket; aux () in @@ -451,8 +499,10 @@ module Make (Item : S.ITEM) = struct pool = name; db; main = `Backlog (Backlog.create ~queue:Metrics.incoming_queue ()); + clients = Client_map.empty; workers = Worker_map.empty; cluster_capacity = 0.0; + pending_cached = Hashtbl.create 1024; } let dump_queue ?(sep=Fmt.sp) pp f q = @@ -467,18 +517,11 @@ module Make (Item : S.ITEM) = struct let pp_worker f worker = Fmt.string f worker.name - let pp_cost_item f ticket = - let urgent = if ticket.urgent then "+urgent" else "" in - Fmt.pf f "%a(%d%s)" pp_ticket ticket ticket.cost urgent - let pp_state f = function | { state = `Finished; _ } -> Fmt.string f "(finished)" | { shutdown = true; _ } -> Fmt.string f "(shutting 
down)" | { state = `Inactive _; _ } -> Fmt.string f "(inactive)" - | { state = `Running (q, _); _} -> - Fmt.pf f "%a : %a" - (dump_queue pp_cost_item) q.low - (dump_queue pp_cost_item) q.high + | { state = `Running (q, _); _} -> Backlog.dump f q let dump_workers f x = let pp_item f (id, w) = @@ -488,22 +531,118 @@ module Make (Item : S.ITEM) = struct let dump_main f = function | `Backlog (q : Backlog.t) -> - Fmt.pf f "(backlog) %a : %a" - (dump_queue pp_ticket) q.low - (dump_queue pp_ticket) q.high + Fmt.pf f "(backlog) %a" Backlog.dump q | `Ready q -> Fmt.pf f "(ready) %a" (dump_queue pp_worker) q - let show f {pool = _; db = _; main; workers; cluster_capacity } = - Fmt.pf f "@[capacity: %.0f@,queue: @[%a@]@,@[registered:%a@]@]@." - cluster_capacity - dump_main main - dump_workers workers + let dump_client t ~now f (_, { id; next_fair_start_time; finished }) = + let delay = next_fair_start_time -. now in + if finished then Fmt.pf f "%s:FINISHED" id + else ( + let rate = Dao.get_rate ~pool:t.pool ~client_id:id t.db in + let pp_delay f x = + if delay > 0.0 then Fmt.pf f "+%a" pp_rough_duration x + in + Fmt.pf f "%s(%.0f)%a" id rate pp_delay delay + ) + + let dump_clients t f clients = + let now = Time.gettimeofday () in + Fmt.(seq ~sep:sp (dump_client t ~now)) f (Client_map.to_seq clients) - let dump f {pool; db; main; workers; cluster_capacity } = - Fmt.pf f "@[capacity: %.0f@,queue: @[%a@]@,@[registered:%a@]@,cached: @[%a@]@]@." + let pp_common f ({pool = _; db = _; main; clients; workers; cluster_capacity; pending_cached = _} as t) = + Fmt.pf f "capacity: %.0f@,queue: @[%a@]@,@[registered:%a@]@,clients: @[%a@]" cluster_capacity dump_main main dump_workers workers - Dao.dump (db, pool) + (dump_clients t) clients + + let show f t = + Fmt.pf f "@[%a@]@." pp_common t + + let dump f t = + Fmt.pf f "@[%a@,cached: @[%a@]@]@." + pp_common t + Dao.dump (t.db, t.pool) + + module Client = struct + type nonrec t = { + parent : t; + info : client_info; + } + + let set_rate (t:t) rate = + let pool = t.parent in + assert (rate > 0.0); + Db.exec pool.db.set_rate Sqlite3.Data.[ TEXT pool.pool; TEXT t.info.id; FLOAT rate ] + + let get_rate (t:t) = + let pool = t.parent in + Dao.get_rate ~pool:pool.pool ~client_id:t.info.id pool.db + + let schedule t cost = + let rate = get_rate t in + let start = max (Time.gettimeofday ()) t.info.next_fair_start_time in + t.info.next_fair_start_time <- start +. (cost /. 
rate); + start + + let submit ~urgent (t:t) item = + assert (not t.info.finished); + let pool = t.parent in + let cost = + let costs = Item.cost_estimate item in + let hint = Item.cache_hint item in + if (hint :> string) = "" then costs.non_cached + else ( + let pending_count = Hashtbl.find_opt pool.pending_cached hint |> Option.value ~default:0 in + Hashtbl.replace pool.pending_cached hint (pending_count + 1); + if pending_count > 0 || Dao.query_cache pool.db ~pool:pool.pool ~hint:(hint :> string) <> [] then costs.cached + else costs.non_cached + ) + in + let fair_start_time = schedule t (float cost) in + Prometheus.Counter.inc_one (Metrics.jobs_submitted pool.pool); + let key = object end in + let ticket = { key; item; client_id = t.info.id; fair_start_time; urgent; cancel = None; cost = -1 } in + add pool ticket; + ticket + + let cancel (t:t) ticket = + assert (not t.info.finished); + match ticket.cancel with + | None -> Error `Not_queued + | Some fn -> + Log.info (fun f -> f "Cancel %a" pp_ticket ticket); + dec_pending_count t.parent ticket; + ticket.cancel <- None; + fn (); + Ok () + + let client_id t = t.info.id + let pool_id t = t.parent.pool + + let v parent info = { parent; info } + end + + let client t ~client_id = + let info = + match Client_map.find_opt client_id t.clients with + | Some c -> c + | None -> + let info = { + id = client_id; + next_fair_start_time = Time.gettimeofday (); + finished = false; + } in + t.clients <- Client_map.add client_id info t.clients; + info + in + Client.v t info + + let remove_client t ~client_id = + Db.exec t.db.remove_user Sqlite3.Data.[ TEXT t.pool; TEXT client_id ]; + Client_map.find_opt client_id t.clients + |> Option.iter @@ fun client -> + client.finished <- true; + t.clients <- Client_map.remove client_id t.clients end diff --git a/scheduler/pool.mli b/scheduler/pool.mli index 97b9f6ff..517b7d7f 100644 --- a/scheduler/pool.mli +++ b/scheduler/pool.mli @@ -5,7 +5,7 @@ module Dao : sig (** Ensure the required tables are created. *) end -module Make (Item : S.ITEM) : sig +module Make (Item : S.ITEM) (Time : S.TIME) : sig type t (** A pool of workers and queued jobs. *) @@ -15,6 +15,30 @@ module Make (Item : S.ITEM) : sig type worker (** A connected worker. *) + module Client : sig + type t + (** A connected client. *) + + val submit : urgent:bool -> t -> Item.t -> ticket + (** [submit ~urgent t item] adds [item] to the incoming queue. + [urgent] items will be processed before non-urgent ones. *) + + val cancel : t -> ticket -> (unit, [> `Not_queued ]) result + (** [cancel t ticket] discards the item from the queue. *) + + val set_rate : t -> float -> unit + (** [set_rate t rate] sets the maximum number of jobs that the client can expect + to run at once. Clients can submit more jobs than this, and make use of any + spare capacity. However, this will determine what happens when multiple clients + want to use the extra capacity. *) + + val get_rate : t -> float + (** [get_rate t] is the rate previously set by [set_rate] (or [1.0] if never set). *) + + val client_id : t -> string + val pool_id : t -> string + end + val create : name:string -> db:Dao.t -> t (** [create ~name ~db] is a pool that reports metrics tagged with [name] and stores cache information in [db]. *) @@ -23,12 +47,15 @@ module Make (Item : S.ITEM) : sig (** [register t ~name ~capacity] returns a queue for worker [name]. @param capacity Worker's capacity (max number of parallel jobs). 
*) - val submit : urgent:bool -> t -> Item.t -> ticket - (** [submit ~urgent t item] adds [item] to the incoming queue. - [urgent] items will be processed before non-urgent ones. *) + val client : t -> client_id:string -> Client.t + (** [client t ~client_id] is a client value, which can be used to submit jobs. + These jobs will be scheduled alongside the jobs of other clients, so that + one client does not starve the others. + @param [client_id] Used for logging and reporting. *) - val cancel : ticket -> (unit, [> `Not_queued ]) result - (** [cancel ticket] discards the item from the queue. *) + val remove_client : t -> client_id:string -> unit + (** [remove_client t ~client_id] deletes all information about [client_id], if any. + Call this on all pools when deleting a user. *) val pop : worker -> (Item.t, [> `Finished]) Lwt_result.t (** [pop worker] gets the next item for [worker]. *) diff --git a/scheduler/s.ml b/scheduler/s.ml index a4095f3c..59cdff4d 100644 --- a/scheduler/s.ml +++ b/scheduler/s.ml @@ -16,3 +16,7 @@ module type ITEM = sig val pp : t Fmt.t (** For debugging. *) end + +module type TIME = sig + val gettimeofday : unit -> float +end diff --git a/test/test_scheduling.ml b/test/test_scheduling.ml index 387d7746..6d99abf5 100644 --- a/test/test_scheduling.ml +++ b/test/test_scheduling.ml @@ -10,14 +10,23 @@ module Item = struct let cache_hint t = t.cache_hint - let cost_estimate _ = Cluster_scheduler.S.{ cached = 1; non_cached = 5 } + let cost_estimate _ = Cluster_scheduler.S.{ cached = 2; non_cached = 10 } let pp f t = Fmt.string f t.job end let job ?(cache_hint="") job = { Item.job; cache_hint } -module Pool = Cluster_scheduler.Pool.Make(Item) +module Fake_time = struct + let now = ref 1.0 + + let gettimeofday () = !now + + let advance x = + now := !now +. float x +end + +module Pool = Cluster_scheduler.Pool.Make(Item)(Fake_time) let job_state x = match Lwt.state x with @@ -45,8 +54,8 @@ let with_test_db fn = (fun () -> fn (Cluster_scheduler.Pool.Dao.init db)) (fun () -> if Sqlite3.db_close db then Lwt.return_unit else failwith "close: DB busy!") -let submit ~urgent pool job = - let (_ : Pool.ticket) = Pool.submit ~urgent pool job in +let submit ~urgent client job = + let (_ : Pool.ticket) = Pool.Client.submit ~urgent client job in () (* Assign three jobs to two workers. 
*) @@ -55,13 +64,14 @@ let simple () = let pool = Pool.create ~db ~name:"simple" in let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in + let user = Pool.client pool ~client_id:"u1" in Pool.set_active w1 true; Pool.set_active w2 true; let w1a = Pool.pop w1 in let w2a = Pool.pop w2 in - submit pool ~urgent:false @@ job "job1"; - submit pool ~urgent:false @@ job "job2"; - submit pool ~urgent:false @@ job "job3"; + submit user ~urgent:false @@ job "job1"; + submit user ~urgent:false @@ job "job2"; + submit user ~urgent:false @@ job "job3"; Lwt.pause () >>= fun () -> Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job2") (job_state w2a); @@ -82,28 +92,33 @@ let cached_scheduling () = capacity: 2\n\ queue: (ready) [worker-2 worker-1]\n\ registered:\n\ - \ worker-1 (0): [] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (0): []\n\ + \ worker-2 (0): []\n\ + clients: \n\ cached: \n" (Fmt.to_to_string Pool.dump pool); - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job2" ~cache_hint:"b"; - submit pool ~urgent:false @@ job "job3" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job4" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job5" ~cache_hint:"c"; + let user = Pool.client pool ~client_id:"u1" in + Pool.Client.set_rate user 2.0; + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"b"; + submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job4" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job5" ~cache_hint:"c"; Alcotest.(check string) "Jobs queued" "\ capacity: 2\n\ - queue: (backlog) [job5 job4 job3] : []\n\ + queue: (backlog) [u1:job5@12s u1:job4@11s u1:job3@10s]\n\ registered:\n\ - \ worker-1 (5): [job1(5)] : []\n\ - \ worker-2 (5): [job2(5)] : []\n\ + \ worker-1 (10): [u1:job1@0s(10)]\n\ + \ worker-2 (10): [u1:job2@5s(10)]\n\ + clients: u1(2)+17s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Lwt.pause () >>= fun () -> Alcotest.(check string) "Jobs started" "\ capacity: 2\n\ - queue: (backlog) [job5 job4 job3] : []\n\ + queue: (backlog) [u1:job5@12s u1:job4@11s u1:job3@10s]\n\ registered:\n\ - \ worker-1 (0): [] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (0): []\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+17s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job2") (job_state w2a); @@ -111,10 +126,11 @@ let cached_scheduling () = let w2b = Pool.pop w2 in Alcotest.(check string) "Jobs 3 and 4 assigned to worker-1" "\ capacity: 2\n\ - queue: (backlog) [] : []\n\ + queue: (backlog) []\n\ registered:\n\ - \ worker-1 (2): [job4(1) job3(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (4): [u1:job4@11s(2) u1:job3@10s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+17s\n\ cached: a: [worker-1], b: [worker-2], c: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Alcotest.(check pop_result) "Worker 2 / job 2" (Ok "job5") (job_state w2b); (* Worker 1 leaves. Its two queued jobs get reassigned. 
*) @@ -125,18 +141,21 @@ let cached_scheduling () = Lwt.pause () >>= fun () -> Alcotest.(check string) "Worker-1's jobs reassigned" "\ capacity: 1\n\ - queue: (backlog) [job4] : []\n\ + queue: (backlog) [u1:job4@11s]\n\ registered:\n\ - \ worker-2 (0): [] : []\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+17s\n\ cached: a: [worker-1; worker-2], b: [worker-2], c: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Alcotest.(check pop_result) "Worker 2 / job 3" (Ok "job3") (job_state w2c); let w2d = Pool.pop w2 in Alcotest.(check pop_result) "Worker 2 / job 4" (Ok "job4") (job_state w2d); + Fake_time.advance 20; Pool.release w2; Alcotest.(check string) "Idle" "\ capacity: 0\n\ - queue: (backlog) [] : []\n\ + queue: (backlog) []\n\ registered:\n\ + clients: u1(2)\n\ cached: a: [worker-1; worker-2], b: [worker-2], c: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Lwt.return_unit @@ -148,23 +167,27 @@ let unbalanced () = let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; Pool.set_active w2 true; - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job2" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job3" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job4" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job5" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job6" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job7" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job8" ~cache_hint:"a"; + let user = Pool.client pool ~client_id:"u1" in + Pool.Client.set_rate user 2.0; + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job4" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job5" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job6" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job7" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job8" ~cache_hint:"a"; let _ = Pool.pop w1 in let w2a = Pool.pop w2 in Lwt.pause () >>= fun () -> Alcotest.(check string) "Worker-2 got jobs eventually" "\ capacity: 2\n\ - queue: (backlog) [] : []\n\ + queue: (backlog) []\n\ registered:\n\ - \ worker-1 (6): [job7(1) job6(1) job5(1) job4(1) job3(1) job2(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (12): [u1:job7@10s(2) u1:job6@9s(2) u1:job5@8s(2) u1:job4@7s(2)\n\ + \ u1:job3@6s(2) u1:job2@5s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+12s\n\ cached: a: [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job8") (job_state w2a); Pool.release w1; @@ -174,8 +197,9 @@ let unbalanced () = let no_workers () = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"no_workers" in - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job2" ~cache_hint:"a"; + let user = Pool.client pool ~client_id:"u1" in + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let _ = Pool.pop w1 in @@ -183,8 +207,9 @@ let no_workers () = Lwt.pause () >>= fun () -> Alcotest.(check string) "Worker-1 gone" "\ capacity: 0\n\ - queue: (backlog) [job2] : []\n\ + queue: (backlog) [u1:job2@10s]\n\ registered:\n\ + clients: u1(1)+12s\n\ cached: a: [worker-1]\n" (Fmt.to_to_string 
Pool.dump pool); let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; @@ -193,15 +218,19 @@ let no_workers () = (* We remember cached locations across restarts. *) let persist () = with_test_db @@ fun db -> - let pool = Pool.create ~db ~name:"persist" in - (* worker-1 handles job1 *) - let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in - Pool.set_active w1 true; - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; - let _ = Pool.pop w1 in - Pool.release w1; + begin + let pool = Pool.create ~db ~name:"persist" in + (* worker-1 handles job1 *) + let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in + Pool.set_active w1 true; + let user = Pool.client pool ~client_id:"u1" in + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; + let _ = Pool.pop w1 in + Pool.release w1; + end; (* Create a new instance of the scheduler with the same db. *) let pool = Pool.create ~db ~name:"persist" in + let user = Pool.client pool ~client_id:"u1" in (* Worker 2 registers first, and so would normally get the first job: *) let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in Pool.set_active w2 true; @@ -209,7 +238,7 @@ let persist () = let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let w1a = Pool.pop w1 in - submit pool ~urgent:false @@ job "job2" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; Lwt.pause () >>= fun () -> Alcotest.(check pop_result) "Worker 1 gets the job" (Ok "job2") (job_state w1a); Alcotest.(check pop_result) "Worker 2 doesn't" (Error "pending") (job_state w2a); @@ -222,10 +251,11 @@ let persist () = let urgent () = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"urgent" in - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; - submit pool ~urgent:true @@ job "job2" ~cache_hint:"a"; - submit pool ~urgent:true @@ job "job3" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job4" ~cache_hint:"b"; + let user = Pool.client pool ~client_id:"u1" in + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; + submit user ~urgent:true @@ job "job2" ~cache_hint:"a"; + submit user ~urgent:true @@ job "job3" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job4" ~cache_hint:"b"; let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let w1a = Pool.pop w1 in @@ -239,21 +269,23 @@ let urgent () = Pool.release w1; Alcotest.(check string) "Worker-1 gone" "\ capacity: 1\n\ - queue: (backlog) [job1] : [job3]\n\ + queue: (backlog) [u1:job1@0s u1:job3@12s+urgent]\n\ registered:\n\ - \ worker-2 (0): [] : []\n\ + \ worker-2 (0): []\n\ + clients: u1(1)+24s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); (* Urgent job 5 goes ahead of non-urgent job 1, but behind the existing urgent job 3. *) - submit pool ~urgent:true @@ job "job5" ~cache_hint:"b"; + submit user ~urgent:true @@ job "job5" ~cache_hint:"b"; flush_queue w2 ~expect:["job3"; "job5"; "job1"] (* Urgent jobs go first on worker queues too. 
*) let urgent_worker () = with_test_db @@ fun db -> let pool = Pool.create ~db ~name:"urgent-worker" in - submit pool ~urgent:true @@ job "job1" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job2" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job3" ~cache_hint:"b"; + let user = Pool.client pool ~client_id:"u1" in + submit user ~urgent:true @@ job "job1" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job3" ~cache_hint:"b"; let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let w1a = Pool.pop w1 in @@ -266,22 +298,25 @@ let urgent_worker () = Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "job3") (job_state w2a); Alcotest.(check string) "Worker-1 queue has job 2 queued" "\ capacity: 2\n\ - queue: (backlog) [] : []\n\ + queue: (backlog) []\n\ registered:\n\ - \ worker-1 (1): [job2(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (2): [u1:job2@10s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(1)+22s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - submit pool ~urgent:true @@ job "job4" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job5" ~cache_hint:"b"; - submit pool ~urgent:true @@ job "job6" ~cache_hint:"a"; + Pool.Client.set_rate user 2.0; + submit user ~urgent:true @@ job "job4" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job5" ~cache_hint:"b"; + submit user ~urgent:true @@ job "job6" ~cache_hint:"a"; let w2b = Pool.pop w2 in Alcotest.(check pop_result) "Worker 2 / job 2" (Ok "job5") (job_state w2b); Alcotest.(check string) "Worker-1 gets job4 first" "\ capacity: 2\n\ - queue: (backlog) [] : []\n\ + queue: (backlog) []\n\ registered:\n\ - \ worker-1 (3): [job2(1)] : [job6(1+urgent) job4(1+urgent)]\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (6): [u1:job2@10s(2) u1:job6@24s(2+urgent) u1:job4@22s(2+urgent)]\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+25s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); flush_queue w1 ~expect:["job4"; "job6"; "job2"] @@ -292,8 +327,9 @@ let inactive () = let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in Pool.set_active w1 true; let w1a = Pool.pop w1 in - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job2" ~cache_hint:"a"; + let user = Pool.client pool ~client_id:"u1" in + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in Pool.set_active w2 true; let w2a = Pool.pop w2 in @@ -302,8 +338,9 @@ let inactive () = capacity: 2\n\ queue: (ready) [worker-2]\n\ registered:\n\ - \ worker-1 (1): [job2(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (2): [u1:job2@10s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(1)+12s\n\ cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); (* Deactivate worker-1. Its job is reassigned. 
*) Pool.set_active w1 false; @@ -312,20 +349,23 @@ let inactive () = queue: (ready) []\n\ registered:\n\ \ worker-1 (0): (inactive)\n\ - \ worker-2 (5): [job2(5)] : []\n\ + \ worker-2 (10): [u1:job2@10s(10)]\n\ + clients: u1(1)+12s\n\ cached: a: [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); Lwt.pause () >>= fun () -> Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "job1") (job_state w1a); Alcotest.(check pop_result) "Worker 2 / job 2" (Ok "job2") (job_state w2a); - submit pool ~urgent:false @@ job "job3" ~cache_hint:"a"; + Pool.Client.set_rate user 2.0; + submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; (* Deactivate worker-2. *) Pool.set_active w2 false; Alcotest.(check string) "Job unassigned" "\ capacity: 2\n\ - queue: (backlog) [job3] : []\n\ + queue: (backlog) [u1:job3@12s]\n\ registered:\n\ \ worker-1 (0): (inactive)\n\ \ worker-2 (0): (inactive)\n\ + clients: u1(2)+13s\n\ cached: a: [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); Pool.set_active w2 true; Pool.release w1; @@ -341,41 +381,47 @@ let cancel_worker_queue () = Pool.set_active w2 true; let w1a = Pool.pop w1 in let w2a = Pool.pop w2 in - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; + let user = Pool.client pool ~client_id:"u1" in + Pool.Client.set_rate user 2.0; + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; Lwt.pause () >>= fun () -> - let j2 = Pool.submit pool ~urgent:false @@ job "job2" ~cache_hint:"a" in - let j3 = Pool.submit pool ~urgent:false @@ job "job3" ~cache_hint:"a" in - let j4 = Pool.submit pool ~urgent:false @@ job "job4" ~cache_hint:"b" in - Pool.cancel j4 |> Alcotest.(check (result pass reject)) "job4 cancelled" (Ok ()); + let j2 = Pool.Client.submit user ~urgent:false @@ job "job2" ~cache_hint:"a" in + let j3 = Pool.Client.submit user ~urgent:false @@ job "job3" ~cache_hint:"a" in + let j4 = Pool.Client.submit user ~urgent:false @@ job "job4" ~cache_hint:"b" in + Pool.Client.cancel user j4 |> Alcotest.(check (result pass reject)) "job4 cancelled" (Ok ()); Alcotest.(check string) "Jobs assigned" "\ capacity: 2\n\ queue: (ready) []\n\ registered:\n\ - \ worker-1 (2): [job3(1) job2(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (4): [u1:job3@6s(2) u1:job2@5s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+12s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Pool.cancel j2 |> Alcotest.(check (result pass reject)) "job2 cancelled" (Ok ()); + Pool.Client.cancel user j2 |> Alcotest.(check (result pass reject)) "job2 cancelled" (Ok ()); Alcotest.(check string) "Job2 cancelled" "\ capacity: 2\n\ queue: (ready) []\n\ registered:\n\ - \ worker-1 (1): [job3(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (2): [u1:job3@6s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+12s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Pool.release w2; Pool.set_active w1 false; Alcotest.(check string) "Job3 pushed back" "\ capacity: 1\n\ - queue: (backlog) [job3] : []\n\ + queue: (backlog) [u1:job3@6s]\n\ registered:\n\ \ worker-1 (0): (inactive)\n\ + clients: u1(2)+12s\n\ cached: a: [worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); - Pool.cancel j3 |> Alcotest.(check (result pass reject)) "job3 cancelled" (Ok ()); + Pool.Client.cancel user j3 |> Alcotest.(check (result pass reject)) "job3 cancelled" (Ok ()); Pool.release w1; Alcotest.(check string) "Job3 cancelled" "\ capacity: 0\n\ - queue: (backlog) [] : []\n\ + queue: (backlog) []\n\ registered:\n\ + clients: u1(2)+12s\n\ cached: a: 
[worker-1], b: [worker-2]\n" (Fmt.to_to_string Pool.dump pool); Alcotest.(check pop_result) "Finish worker-1" (Ok "job1") (job_state w1a); Alcotest.(check pop_result) "Finish worker-2" (Error "pending") (job_state w2a); @@ -390,33 +436,112 @@ let push_back () = Pool.set_active w1 true; let _w1a = Pool.pop w1 in let _w2a = Pool.pop w2 in - submit pool ~urgent:false @@ job "job1" ~cache_hint:"a"; + let user = Pool.client pool ~client_id:"u1" in + Pool.Client.set_rate user 2.0; + submit user ~urgent:false @@ job "job1" ~cache_hint:"a"; Lwt.pause () >>= fun () -> - submit pool ~urgent:false @@ job "job2" ~cache_hint:"a"; - submit pool ~urgent:false @@ job "job3" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job2" ~cache_hint:"a"; + submit user ~urgent:false @@ job "job3" ~cache_hint:"a"; Pool.set_active w2 true; Lwt.pause () >>= fun () -> Alcotest.(check string) "Jobs assigned" "\ capacity: 2\n\ queue: (ready) [worker-2]\n\ registered:\n\ - \ worker-1 (2): [job3(1) job2(1)] : []\n\ - \ worker-2 (0): [] : []\n\ + \ worker-1 (4): [u1:job3@6s(2) u1:job2@5s(2)]\n\ + \ worker-2 (0): []\n\ + clients: u1(2)+7s\n\ cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); Pool.release w2; Pool.set_active w1 false; Alcotest.(check string) "Jobs pushed back" "\ capacity: 1\n\ - queue: (backlog) [job3 job2] : []\n\ + queue: (backlog) [u1:job3@6s u1:job2@5s]\n\ registered:\n\ \ worker-1 (0): (inactive)\n\ + clients: u1(2)+7s\n\ cached: a: [worker-1]\n" (Fmt.to_to_string Pool.dump pool); Pool.set_active w1 true; flush_queue w1 ~expect:["job2"; "job3"] +(* Two clients share the cluster. *) +let fairness () = + with_test_db @@ fun db -> + let pool = Pool.create ~db ~name:"fairness" in + let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in + let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in + let alice = Pool.client pool ~client_id:"alice" in + let bob = Pool.client pool ~client_id:"bob" in + Pool.Client.set_rate alice 2.0; + Pool.Client.set_rate bob 2.0; + Pool.set_active w1 true; + Pool.set_active w2 true; + let w1a = Pool.pop w1 in + let w2a = Pool.pop w2 in + submit alice ~urgent:false @@ job "a1"; + submit alice ~urgent:false @@ job "a2"; + submit alice ~urgent:false @@ job "a3"; + (* Alice's jobs a1 and a2 have already started running on the two machines, + and a3 is queued. Bob now submits some jobs. *) + submit bob ~urgent:false @@ job "b1"; + submit bob ~urgent:false @@ job "b2"; + submit bob ~urgent:false @@ job "b3"; + Lwt.pause () >>= fun () -> + Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "a1") (job_state w1a); + Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "a2") (job_state w2a); + Alcotest.(check string) "Bob's jobs aren't all last" "\ + capacity: 2\n\ + queue: (backlog) [bob:b3@10s alice:a3@10s bob:b2@5s bob:b1@0s]\n\ + registered:\n\ + \ worker-1 (0): []\n\ + \ worker-2 (0): []\n\ + clients: alice(2)+15s bob(2)+15s\n\ + cached: : [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); + Pool.release w2; + flush_queue w1 ~expect:["b1"; "b2"; "a3"; "b3"] + +(* Two clients with different rates share the cluster. 
*) +let fairness_rates () = + with_test_db @@ fun db -> + let pool = Pool.create ~db ~name:"fairness_rates" in + let w1 = Pool.register pool ~name:"worker-1" ~capacity:1 |> Result.get_ok in + let w2 = Pool.register pool ~name:"worker-2" ~capacity:1 |> Result.get_ok in + let alice = Pool.client pool ~client_id:"alice" in + let bob = Pool.client pool ~client_id:"bob" in + Pool.Client.set_rate alice 5.0; + Pool.Client.set_rate bob 1.0; + Pool.set_active w1 true; + Pool.set_active w2 true; + let w1a = Pool.pop w1 in + let w2a = Pool.pop w2 in + (* Alice submits 30s worth of work, but using 5 machines expects to take 6s. *) + submit alice ~urgent:false @@ job "a1"; + submit alice ~urgent:false @@ job "a2"; + submit alice ~urgent:false @@ job "a3"; + (* Alice's jobs a1 and a2 have already started running on the two machines, + and a3 is queued. Bob now submits some jobs. It's the same amount of work, + but with the lower rate, Bob expects his jobs to take 30s. *) + submit bob ~urgent:false @@ job "b1"; + submit bob ~urgent:false @@ job "b2"; + submit bob ~urgent:false @@ job "b3"; + Lwt.pause () >>= fun () -> + Alcotest.(check pop_result) "Worker 1 / job 1" (Ok "a1") (job_state w1a); + Alcotest.(check pop_result) "Worker 2 / job 1" (Ok "a2") (job_state w2a); + Alcotest.(check string) "Bob's jobs aren't all last" "\ + capacity: 2\n\ + queue: (backlog) [bob:b3@20s bob:b2@10s alice:a3@4s bob:b1@0s]\n\ + registered:\n\ + \ worker-1 (0): []\n\ + \ worker-2 (0): []\n\ + clients: alice(5)+6s bob(1)+30s\n\ + cached: : [worker-1; worker-2]\n" (Fmt.to_to_string Pool.dump pool); + Pool.release w2; + flush_queue w1 ~expect:["b1"; "a3"; "b2"; "b3"] + let test_case name fn = Alcotest_lwt.test_case name `Quick @@ fun _ () -> Lwt_unix.yield () >>= fun () -> (* Ensure we're inside the Lwt mainloop. Lwt.pause behaves strangely otherwise. *) + Fake_time.now := 1.0; fn () >>= fun () -> Lwt.pause () >|= fun () -> Prometheus.CollectorRegistry.(collect default) @@ -444,4 +569,6 @@ let suite = [ test_case "inactive" inactive; test_case "cancel_worker_queue" cancel_worker_queue; test_case "push_back" push_back; + test_case "fairness" fairness; + test_case "fairness_rates" fairness_rates; ]