Skip to content

Commit

Permalink
add rate limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Richman committed Jul 31, 2021
1 parent efb0326 commit 36823a3
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 180 deletions.
244 changes: 144 additions & 100 deletions download.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,38 @@ open Core
open Async
open Common

module Urls = struct
let base_url = "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"
let forecast_dir = sprintf !"%s/gfs.%{Forecast_time#yyyymmdd_slash_hh}/atmos/" base_url
module Base_url = struct
type t =
| Nomads
| Aws_mirror

let to_string =
function
| Nomads -> "https://nomads.ncep.noaa.gov/pub/data/nccf/com/gfs/prod"
| Aws_mirror -> "https://noaa-gfs-bdp-pds.s3.amazonaws.com"

let throttle =
let nomads =
Limiter_async.Throttle.create_exn
~concurrent_jobs_target:5
~continue_on_error:false
~burst_size:10
~sustained_rate_per_sec:0.9
()
in
let aws_mirror =
Limiter_async.Throttle.create_exn
~concurrent_jobs_target:5
~continue_on_error:false
()
in
function
| Nomads -> nomads
| Aws_mirror -> aws_mirror
end

module Local_paths = struct
let forecast_dir = sprintf !"gfs.%{Forecast_time#yyyymmdd_slash_hh}/atmos/"

let grib_file fcst_time levels hour =
let fcst_hr = Forecast_time.hour_int fcst_time in
Expand All @@ -17,55 +46,83 @@ module Urls = struct
^ sprintf "gfs.t%02iz.pgrb2%s.0p50.f%03i" fcst_hr maybe_b (Hour.to_int hour)
;;

let index_file fcst_time levels hour = grib_file fcst_time levels hour ^ ".idx"
let index_file fcst_time levels hour = index_file fcst_time levels hour
let grib_file fcst_time levels hour = grib_file fcst_time levels hour
let index_file fcst_time levels hour =
grib_file fcst_time levels hour ^ ".idx"
;;
end

let throttled_get =
let throttle = Throttle.create ~continue_on_error:true ~max_concurrent_jobs:5 in
fun uri ~interrupt ~range ->
Throttle.enqueue throttle (fun () -> Http.get uri ~interrupt ~range)
;;

let interrupted_error = return (Or_error.error_string "interrupted")

let with_retries ~name ~f ~attempt_timeout ~interrupt =
let throttled_get_with_retries
~name
~attempt_timeout
~interrupt
~base_url
~local_path
~range
~parse
=
let throttle = Base_url.throttle base_url in
let uri = sprintf "%s/%s" (Base_url.to_string base_url) local_path in
let next_backoff x =
let open Time.Span in
if x < of_sec 100. then scale x 2. else x
in
let rec loop ~backoff =
let interrupt_this = Ivar.create () in
let res = f ~interrupt:(Ivar.read interrupt_this) () in
let%bind res =
let result : _ Deferred.t =
Limiter_async.Throttle.enqueue'
throttle
(fun () ->
(* Only start the timeout once we're inside the throttle. We don't
need to wait for [Interrupted] here, because the outer [choose] will, and kill
the request if it fires. *)
let got : _ Deferred.t =
Http.get
uri
~range
~interrupt:(Ivar.read interrupt_this)
in
choose
[ choice got (fun res -> `Get_result res)
; choice (Clock.after attempt_timeout) (fun () -> `Timeout)
])
()
in
let%bind result =
choose
[ choice res (fun res -> `Res res)
; choice (Clock.after attempt_timeout) (fun () -> `Timeout)
[ choice result (fun x -> `Throttle_result x)
; choice interrupt (fun () -> `Interrupted)
]
in
Ivar.fill interrupt_this ();
let retry () =
match%bind
choose
[ choice (Clock.after backoff) (fun () -> `Ready)
; choice interrupt (fun () -> `Interrupted)
]
with
| `Ready -> loop ~backoff:(next_backoff backoff)
| `Interrupted -> interrupted_error
in
match res with
| `Res (Ok res) ->
Log.Global.debug "%s OK" name;
return (Ok res)
| `Res (Error err) ->
Log.Global.debug !"%s %{Error#mach} (backoff %{Time.Span})" name err backoff;
retry ()
| `Timeout ->
Log.Global.debug !"%s Timeout (backoff %{Time.Span})" name backoff;
retry ()
(match result with
| `Interrupted -> interrupted_error
| `Throttle_result Aborted ->
return (Or_error.error_string "throttle aborted")
| `Throttle_result (Raised exn) -> raise exn
| `Throttle_result (Ok (`Get_result (Error err))) ->
Log.Global.debug !"%s get error: %{Error#mach} (backoff %{Time.Span})" name err backoff;
retry ~backoff
| `Throttle_result (Ok `Timeout) ->
Log.Global.debug !"%s Timeout (backoff %{Time.Span})" name backoff;
retry ~backoff
| `Throttle_result (Ok (`Get_result (Ok res))) ->
(match parse res with
| Error err ->
Log.Global.debug !"%s parse error: %{Error#mach} (backoff %{Time.Span})" name err backoff;
retry ~backoff
| Ok res ->
Log.Global.debug "%s OK" name;
return (Ok res)))
and retry ~backoff =
match%bind
choose
[ choice (Clock.after backoff) (fun () -> `Ready)
; choice interrupt (fun () -> `Interrupted)
]
with
| `Ready -> loop ~backoff:(next_backoff backoff)
| `Interrupted -> interrupted_error
in
loop ~backoff:(Time.Span.of_sec 5.)
Expand Down Expand Up @@ -118,8 +175,8 @@ let filter_messages_and_assert_all_present =
else Ok messages)
;;

let get_index ~interrupt fcst_time level_set hour =
with_retries
let get_index ~interrupt base_url fcst_time level_set hour =
throttled_get_with_retries
~name:
(sprintf
!"Download index %{Forecast_time#yyyymmddhh} %{Level_set} %{Hour}"
Expand All @@ -128,71 +185,58 @@ let get_index ~interrupt fcst_time level_set hour =
hour)
~interrupt
~attempt_timeout:(Time.Span.of_sec 10.)
~f:(fun ~interrupt () ->
match%bind
throttled_get
(Urls.index_file fcst_time level_set hour)
~range:(`all_with_max_len (128 * 1024))
~interrupt
with
| Error _ as error -> return error
| Ok bigstring ->
(match Grib_index.parse (Bigstring.to_string bigstring) with
| Error _ as error -> return error
| Ok messages ->
let checked_messages =
filter_messages_and_assert_all_present
messages
~level_set
~expect_fcst_time:fcst_time
~expect_hour:hour
in
return checked_messages))
~base_url
~local_path:(Local_paths.index_file fcst_time level_set hour)
~range:(`all_with_max_len (128 * 1024))
~parse:(fun bigstring ->
match Grib_index.parse (Bigstring.to_string bigstring) with
| Error _ as error -> error
| Ok messages ->
filter_messages_and_assert_all_present
messages
~level_set
~expect_fcst_time:fcst_time
~expect_hour:hour)
;;

let get_message ~interrupt (msg : Grib_index.message) =
with_retries
let get_message ~interrupt base_url (msg : Grib_index.message) =
throttled_get_with_retries
~name:("Download message " ^ Grib_index.message_to_string msg)
~interrupt
~attempt_timeout:(Time.Span.of_sec 60.)
~f:(fun ~interrupt () ->
match%bind
throttled_get
(Urls.grib_file msg.fcst_time (Level.level_set msg.level) msg.hour)
~range:(`exactly_pos_len (msg.offset, msg.length))
~interrupt
with
| Error _ as error -> return error
| Ok bigstring ->
(match Grib_message.of_bigstring bigstring with
| Error _ as error -> return error
| Ok message ->
let matches =
[%compare.equal: Variable.t Or_error.t]
(Grib_message.variable message)
(Ok msg.variable)
&& [%compare.equal: Hour.t Or_error.t]
(Grib_message.hour message)
(Ok msg.hour)
&& [%compare.equal: Level.t Or_error.t]
(Grib_message.level message)
(Ok msg.level)
&& [%compare.equal: Layout.t Or_error.t]
(Grib_message.layout message)
(Ok Half_deg)
in
if matches
then return (Ok message)
else return (Or_error.errorf "GRIB message contents did not match index")))
~base_url
~local_path:(Local_paths.grib_file msg.fcst_time (Level.level_set msg.level) msg.hour)
~range:(`exactly_pos_len (msg.offset, msg.length))
~parse:(fun bigstring ->
match Grib_message.of_bigstring bigstring with
| Error _ as error -> error
| Ok message ->
let matches =
[%compare.equal: Variable.t Or_error.t]
(Grib_message.variable message)
(Ok msg.variable)
&& [%compare.equal: Hour.t Or_error.t]
(Grib_message.hour message)
(Ok msg.hour)
&& [%compare.equal: Level.t Or_error.t]
(Grib_message.level message)
(Ok msg.level)
&& [%compare.equal: Layout.t Or_error.t]
(Grib_message.layout message)
(Ok Half_deg)
in
if matches
then Ok message
else Or_error.errorf "GRIB message contents did not match index")
;;

let download_to_temp_filename ~interrupt ~temp_filename fcst_time =
let download_to_temp_filename ~interrupt ~temp_filename base_url fcst_time =
Log.Global.debug !"Begin download of %{Forecast_time#yyyymmddhh}" fcst_time;
match%bind Dataset_file.create ~filename:temp_filename RW with
| Error _ as error -> return error
| Ok ds ->
let message_job ~interrupt msg =
match%bind get_message ~interrupt msg with
match%bind get_message ~interrupt base_url msg with
| Error _ as error -> return error
| Ok grib ->
let slice = Dataset_file.slice ds msg.hour msg.level msg.variable in
Expand All @@ -208,7 +252,7 @@ let download_to_temp_filename ~interrupt ~temp_filename fcst_time =
let interrupt_this = Ivar.create () in
upon interrupt (fun () -> Ivar.fill_if_empty interrupt_this ());
let interrupt = Ivar.read interrupt_this in
match%bind get_index ~interrupt fcst_time level_set hour with
match%bind get_index ~interrupt base_url fcst_time level_set hour with
| Error _ as error -> return error
| Ok messages ->
let last, rest =
Expand All @@ -226,11 +270,11 @@ let download_to_temp_filename ~interrupt ~temp_filename fcst_time =
(* becomes determined as soon as _any_ job fails, since we want to eagerly kill
the others at that point *)
Deferred.create (fun ivar ->
List.iter rest_results ~f:(fun res ->
upon res (fun res ->
match res with
| Ok _ -> ()
| Error e -> Ivar.fill_if_empty ivar e)))
List.iter rest_results ~f:(fun res ->
upon res (fun res ->
match res with
| Ok _ -> ()
| Error e -> Ivar.fill_if_empty ivar e)))
in
let%bind final_result =
choose
Expand Down Expand Up @@ -268,14 +312,14 @@ let download_to_temp_filename ~interrupt ~temp_filename fcst_time =
| Ok () -> Dataset_file.msync ds)
;;

let download ~interrupt ?directory forecast_time =
let download ~interrupt ?directory base_url forecast_time =
let make_filename ?prefix () =
Dataset_file.Filename.one ?directory ?prefix forecast_time
in
let temp_filename = make_filename ~prefix:Dataset_file.Filename.downloader_prefix () in
let final_filename = make_filename () in
Log.Global.debug "Temp filename will be %s" temp_filename;
match%bind download_to_temp_filename ~interrupt ~temp_filename forecast_time with
match%bind download_to_temp_filename ~interrupt ~temp_filename base_url forecast_time with
| Ok () ->
Log.Global.debug "Renaming %s -> %s" temp_filename final_filename;
Monitor.try_with_or_error (fun () -> Sys.rename temp_filename final_filename)
Expand Down
9 changes: 9 additions & 0 deletions download.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,29 @@ open Core
open Async
open Common

module Base_url : sig
type t =
| Nomads
| Aws_mirror
end

val get_index
: interrupt:unit Deferred.t
-> Base_url.t
-> Forecast_time.t
-> Level_set.t
-> Hour.t
-> Grib_index.message list Or_error.t Deferred.t

val get_message
: interrupt:unit Deferred.t
-> Base_url.t
-> Grib_index.message
-> Grib_message.t Or_error.t Deferred.t

val download
: interrupt:unit Deferred.t
-> ?directory:string
-> Base_url.t
-> Forecast_time.t
-> unit Or_error.t Deferred.t
1 change: 1 addition & 0 deletions dune
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
async
curl
ctypes.foreign
async_kernel.limiter_async
)
(link_flags (-cclib -leccodes))
(preprocess (pps ppx_jane))
Expand Down
10 changes: 10 additions & 0 deletions http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ end = struct
| In
| Out
| Inout
[@@deriving sexp_of]

type ctl_action =
| Add of in_and_or_out
| Del
| Mod of in_and_or_out
[@@deriving sexp_of]

type t = int

Expand Down Expand Up @@ -141,6 +143,14 @@ end = struct
~events
~data:(Core.Unix.File_descr.to_int fd)
with
| exception Unix.Unix_error (code, fn_name, str) ->
let extra_info = sprintf !"%{sexp:ctl_action} %{Core.Unix.File_descr}" change fd in
let str =
match str with
| "" -> extra_info
| str -> sprintf "%s %s" str extra_info
in
raise (Unix.Unix_error (code, fn_name, str))
| 0 -> ()
| _ -> failwith "epoll_ctl unexpected return value"
;;
Expand Down
Loading

0 comments on commit 36823a3

Please sign in to comment.