Skip to content

Commit

Permalink
Add support for Amazon S3 bucket storage backend.
Browse files Browse the repository at this point in the history
This adds support for using an Amazon S3 bucket as a storage backend via
the `AmazonS3Store` module. It is implemented only for the `Lwt` concurrency
library at this time, since the underlying AWS-S3 library does not yet
support `Eio`.
  • Loading branch information
zoj613 committed Nov 18, 2024
1 parent c9fa270 commit cb91a0c
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
run: |
opam install --deps-only --with-test --with-doc --yes zarr
opam install bytesrw conf-zlib conf-zstd --yes
opam install lwt --yes
opam install lwt aws-s3-lwt --yes
opam exec -- dune build zarr zarr-sync zarr-lwt
- name: setup ocaml-5-specific
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
(and (>= 4.14.0)))
(zarr (= :version))
(lwt (>= 2.5.1))
(aws-s3-lwt (>= 4.8.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt" {>= "2.5.1"}
"aws-s3-lwt" {>= "4.8.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"ppx_deriving" {with-test}
Expand Down
1 change: 1 addition & 0 deletions zarr-lwt/src/dune
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
(public_name zarr-lwt)
(libraries
zarr
aws-s3-lwt
lwt
lwt.unix)
(ocamlopt_flags
Expand Down
181 changes: 181 additions & 0 deletions zarr-lwt/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,184 @@ module FilesystemStore = struct

include Zarr.Storage.Make(IO)
end

module AmazonS3Store = struct
(* Exceptions raised by [fold_result] when an S3 request returns an error
   other than "not found". Each one mirrors the S3 error constructor of
   the same (or similar) name. *)

(* The S3 client reported [Failed e]; carries the underlying exception. *)
exception Failed of exn
(* The S3 client reported [Throttled]. *)
exception Throttled
(* The S3 client reported [Forbidden]. *)
exception Forbidden
(* The S3 client reported [Unknown (i, s)]; the pair is forwarded as-is.
   NOTE(review): presumably a status code and a message -- confirm
   against the aws-s3 documentation. *)
exception Unknown of int * string
(* The S3 client reported [Redirect _]; the redirect target is dropped. *)
exception Redirected

module S3 = Aws_s3_lwt.S3
module Credentials = Aws_s3_lwt.Credentials

open Deferred.Syntax

(* Placeholder [S3.content] value used as the "not found" fallback when
   unwrapping the result of a [head] request: zero size, zero timestamp,
   empty key and etag, and no metadata headers. *)
let empty_content () : S3.content =
  let open S3 in
  { key = ""
  ; etag = ""
  ; size = 0
  ; last_modified = 0.
  ; meta_headers = None
  ; storage_class = Standard }

(* "Not found" fallback for listing requests: a page with no entries and
   no continuation. *)
let empty_Ls () =
  let finished = S3.Ls.Done in
  [], finished

(* Projects the object key out of a listing entry. *)
let content_key (c : S3.content) = c.key

(* [raise_not_found k] is a thunk that raises [Zarr.Storage.Key_not_found k]
   when forced; intended as the [~not_found] handler of [fold_result]. *)
let raise_not_found k =
  fun () -> raise (Zarr.Storage.Key_not_found k)

(* [fold_result ~not_found res] unwraps the outcome of an S3 request.

   - [Ok v] yields [v].
   - [Error Not_found] yields [not_found ()], letting each caller decide
     whether a missing key is an error or has a sensible default.
   - Every other S3 error is re-raised as the corresponding exception
     declared at the top of this module.

   The error constructors below are resolved against [S3.error] through
   the type annotation; the names on the right of [raise] are this
   module's exceptions. *)
let fold_result :
  not_found:(unit -> 'b) -> ('a, S3.error) result -> 'b
  = fun ~not_found res ->
  match res with
  | Ok payload -> payload
  | Error Not_found -> not_found ()
  | Error Failed exn -> raise (Failed exn)
  | Error Unknown (code, msg) -> raise (Unknown (code, msg))
  | Error Redirect _ -> raise Redirected
  | Error Forbidden -> raise Forbidden
  | Error Throttled -> raise Throttled

module IO = struct
module Deferred = Deferred

(* Store handle: everything needed to address one bucket. The three fields
   are passed as [~bucket], [~credentials] and [~endpoint] to every S3
   call made by the functions of this module. *)
type t =
(* Name of the bucket requests are issued against. *)
{bucket : string
(* Credentials used to authenticate requests. *)
;cred : Credentials.t
(* Endpoint requests are sent to; [with_open] builds it from the region,
   scheme and inet arguments. *)
;endpoint : Aws_s3.Region.endpoint}

(* [size t key] is the size reported by a [head] request for [key].
   A key that does not exist yields [0] (via [empty_content]); any other
   S3 error is raised as described in [fold_result]. *)
let size t key =
  let {bucket; cred = credentials; endpoint} = t in
  let* res = S3.head ~bucket ~credentials ~endpoint ~key () in
  let S3.{size; _} = fold_result ~not_found:empty_content res in
  Deferred.return size

(* [is_member t key] is [true] iff a [head] request for [key] succeeds.

   Membership is decided from the request outcome itself rather than
   from the object's size: [size] reports 0 both for a missing key and
   for an existing empty object, so a size test would wrongly report
   zero-byte objects as absent.

   S3 errors other than "not found" are raised as described in
   [fold_result]. *)
let is_member t key =
  let {bucket; cred = credentials; endpoint} = t in
  let+ res = S3.head ~bucket ~credentials ~endpoint ~key () in
  (* Collapse the payload to [true]; only [Not_found] maps to [false]. *)
  fold_result ~not_found:(Fun.const false) (Result.map (Fun.const true) res)

(* [get t key] fetches the full contents stored under [key].
   Raises [Zarr.Storage.Key_not_found key] when the key does not exist;
   other S3 errors are raised as described in [fold_result]. *)
let get t key =
  let {bucket; cred = credentials; endpoint} = t in
  let* res = S3.get ~bucket ~credentials ~endpoint ~key () in
  res
  |> fold_result ~not_found:(raise_not_found key)
  |> Deferred.return

(* [get_partial_values t key ranges] issues one ranged [get] per
   [(offset, length option)] pair and returns the pieces as a list.

   A [None] length requests everything from [offset] onwards; [Some l]
   requests the inclusive byte range [offset .. offset + l - 1].
   Raises [Zarr.Storage.Key_not_found key] when the key does not exist. *)
let get_partial_values t key ranges =
  let {bucket; cred = credentials; endpoint} = t in
  let fetch (ofs, len) =
    (* S3 ranges are inclusive, hence the [- 1]. *)
    let last = Option.map (fun l -> ofs + l - 1) len in
    let range : S3.range = {first = Some ofs; last} in
    let+ res = S3.get ~bucket ~credentials ~endpoint ~range ~key () in
    [fold_result ~not_found:(raise_not_found key) res]
  in
  Deferred.concat_map fetch ranges

(* [set t key data] uploads [data] under [key] with a [put] request.
   The value returned by a successful [put] is discarded; a "not found"
   response is tolerated, and other S3 errors are raised as described in
   [fold_result]. *)
let set t key data =
  let {bucket; cred = credentials; endpoint} = t in
  let* res = S3.put ~bucket ~credentials ~endpoint ~data ~key () in
  ignore (fold_result ~not_found:(Fun.const String.empty) res : string);
  Deferred.return_unit

(* [set_partial_values t key ?append rsv] updates the value stored under
   [key] with the [(offset, bytes)] pairs in [rsv] and writes the result
   back with [set]. The whole object is downloaded and re-uploaded.

   - When [append] is [true], or the current value is empty (including
     the case where [size] reports 0 for [key]), the payloads are
     concatenated after the current value in list order and the offsets
     are ignored.
   - Otherwise each payload overwrites the current value starting at its
     offset, in list order.

   Raises [Invalid_argument] (from [Bytes.blit_string]) if a payload does
   not fit inside the current value at its offset.

   The overwrite path works on a private [Bytes] copy instead of casting
   the fetched string with [Bytes.unsafe_of_string], so an immutable
   string is never mutated in place. The append path uses a single
   [String.concat] instead of repeated [^]. *)
let set_partial_values t key ?(append=false) rsv =
  let* size = size t key in
  let* ov = match size with
    | 0 -> Deferred.return String.empty
    | _ -> get t key
  in
  let nv =
    if append || String.equal ov String.empty then
      String.concat String.empty (ov :: List.map snd rsv)
    else begin
      let buf = Bytes.of_string ov in
      List.iter
        (fun (ofs, v) -> Bytes.blit_string v 0 buf ofs (String.length v))
        rsv;
      (* [buf] is uniquely owned and never touched again, so the
         zero-copy conversion is safe. *)
      Bytes.unsafe_to_string buf
    end
  in
  set t key nv

(* [erase t key] issues a [delete] request for [key]. Deleting a key that
   does not exist is a no-op; other S3 errors are raised as described in
   [fold_result]. *)
let erase t key =
  let {bucket; cred = credentials; endpoint} = t in
  let+ res = S3.delete ~bucket ~credentials ~endpoint ~key () in
  fold_result ~not_found:(fun () -> ()) res

(* [delete_keys t cont] walks the remaining pages of a listing, erasing
   every entry of each page before requesting the next one. *)
let rec delete_keys : t -> S3.Ls.cont -> unit Deferred.t = fun t -> function
  | Done -> Deferred.return_unit
  | More next ->
    let* page = next () in
    delete_page t page

(* Erases all entries of one listing page, then continues with whatever
   follows it. A "not found" page is treated as empty and final. *)
and delete_page t page =
  let entries, cont = fold_result ~not_found:empty_Ls page in
  let* () = Deferred.iter (delete_content t) entries in
  delete_keys t cont

(* Erases the object a listing entry refers to. *)
and delete_content t (c : S3.content) = erase t c.key

(* [erase_prefix t prefix] lists the bucket with [~prefix] and erases
   every key returned, page by page. *)
and erase_prefix t prefix =
  let {bucket; cred = credentials; endpoint} = t in
  let* page = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
  delete_page t page

(* [list t] returns every key in the bucket, following listing
   continuations until the listing is exhausted. Keys are returned in the
   order the listing produced them. *)
let rec list t =
  let {bucket; cred = credentials; endpoint} = t in
  let* res = S3.ls ~bucket ~credentials ~endpoint () in
  let xs, rest = fold_result ~not_found:empty_Ls res in
  accumulate_keys (List.rev_map content_key xs) rest

(* [accumulate_keys acc cont] collects the keys of the remaining pages.

   [acc] holds the keys gathered so far in REVERSE order: each new key is
   consed onto the front and the list is reversed once at the end. This
   keeps the total work linear in the number of keys, where appending
   with [@] on every page would be quadratic. *)
and accumulate_keys :
  string list -> S3.Ls.cont -> string list Deferred.t
  = fun acc -> function
  | Done -> Deferred.return (List.rev acc)
  | More k ->
    let* res = k () in
    let xs, rest = fold_result ~not_found:empty_Ls res in
    let acc' = List.fold_left (fun a c -> content_key c :: a) acc xs in
    accumulate_keys acc' rest

(* String sets, used to deduplicate child prefixes in [list_dir]. *)
module S = Set.Make(String)

(* [partition_add_keys prefix acc cont] folds [add ~prefix] over every
   entry of the remaining listing pages, threading the
   [(keys, child prefixes)] accumulator through. *)
let rec partition_add_keys :
  string -> string list * S.t -> S3.Ls.cont -> (string list * S.t) Deferred.t
  = fun prefix acc -> function
  | Done -> Deferred.return acc
  | More next ->
    let* page = next () in
    let entries, cont = fold_result ~not_found:empty_Ls page in
    partition_add_keys prefix (List.fold_left (add ~prefix) acc entries) cont

(* Classifies one listing entry relative to [prefix]. If the key has a
   '/' at or after position [String.length prefix], the key truncated
   just after that first '/' is recorded as a child prefix. Otherwise the
   key itself is recorded as a direct member. *)
and add ~prefix (keys, dirs) (c : S3.content) =
  let start = String.length prefix in
  if String.contains_from c.key start '/' then
    let stop = 1 + String.index_from c.key start '/' in
    keys, S.add (String.sub c.key 0 stop) dirs
  else
    c.key :: keys, dirs

(* [list_dir t prefix] lists the bucket with [~prefix] and splits the
   result into the keys directly under [prefix] and the distinct child
   prefixes (each ending in '/'), the latter in [S.elements] order. *)
and list_dir t prefix =
  let {bucket; cred = credentials; endpoint} = t in
  let* page = S3.ls ~bucket ~credentials ~endpoint ~prefix () in
  let entries, cont = fold_result ~not_found:empty_Ls page in
  let first = List.fold_left (add ~prefix) ([], S.empty) entries in
  let+ keys, dirs = partition_add_keys prefix first cont in
  keys, S.elements dirs

(* [rename t prefix new_prefix] moves every key starting with [prefix] to
   the same key with [prefix] replaced by [new_prefix].

   It proceeds in three phases: download the contents of all matching
   keys, upload them under their new names, then erase the old keys.
   The matching keys are found by listing the whole bucket and filtering
   on [prefix]. *)
let rec rename t prefix new_prefix =
  let* all_keys = list t in
  let sources = List.filter (String.starts_with ~prefix) all_keys in
  let* moved =
    Deferred.fold_left (rename_and_add ~t ~prefix ~new_prefix) [] sources
  in
  let* () = Deferred.iter (fun (dst, payload) -> set t dst payload) moved in
  Deferred.iter (erase t) sources

(* Fetches the contents of [k] and conses [(renamed key, contents)] onto
   [acc]. The renamed key is [new_prefix] followed by whatever comes
   after the first [String.length prefix] characters of [k]. *)
and rename_and_add ~t ~prefix ~new_prefix acc k =
  let plen = String.length prefix in
  let suffix = String.sub k plen (String.length k - plen) in
  let dst = new_prefix ^ suffix in
  let+ payload = get t k in
  (dst, payload) :: acc
end

(* [with_open ?scheme ?inet ~region ~bucket ~profile f] builds a store
   handle for [bucket] and passes it to [f].

   Credentials are obtained with [Credentials.Helper.get_credentials
   ~profile ()]; if that returns an error, the carried exception is
   raised. The endpoint is derived from [region], [scheme] (default
   [`Http]) and [inet] (default [`V4]). *)
let with_open ?(scheme=`Http) ?(inet=`V4) ~region ~bucket ~profile f =
  let* res = Credentials.Helper.get_credentials ~profile () in
  let cred =
    match res with
    | Ok c -> c
    | Error e -> raise e
  in
  let endpoint = Aws_s3.Region.endpoint ~inet ~scheme region in
  f {IO.bucket; cred; endpoint}

(* Derive the store operations from the primitive [IO] module above by
   applying the [Zarr.Storage.Make] functor and re-exporting its result. *)
include Zarr.Storage.Make(IO)
end
22 changes: 21 additions & 1 deletion zarr-lwt/src/storage.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(** An Lwt-aware in-memory storage backend for Zarr v3 hierarchy. *)
module MemoryStore : sig include Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t end
module MemoryStore : Zarr.Memory.S with type 'a Deferred.t = 'a Lwt.t

(** An Lwt-aware Zip file storage backend for a Zarr v3 hierarchy. *)
module ZipStore : Zarr.Zip.S with type 'a Deferred.t = 'a Lwt.t
Expand All @@ -18,3 +18,23 @@ module FilesystemStore : sig
@raise Failure if [dir] is not a Zarr store path. *)
end

(** An Lwt-aware Amazon S3 bucket storage backend for a Zarr V3 hierarchy. *)
module AmazonS3Store : sig
(** Raised when the S3 client reports a failure; carries the underlying
exception. *)
exception Failed of exn
(** Raised when the S3 client reports that the request was throttled. *)
exception Throttled
(** Raised when the S3 client reports that access was forbidden. *)
exception Forbidden
(** Raised when the S3 client reports an error it does not classify; the
integer and string it returned are forwarded unchanged. *)
exception Unknown of int * string
(** Raised when the S3 client reports a redirect. *)
exception Redirected

include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t

(** [with_open ?scheme ?inet ~region ~bucket ~profile f] obtains
credentials for [profile], builds the endpoint for [region] using
[scheme] (default [`Http]) and [inet] (default [`V4]), and applies [f]
to a store handle for [bucket].
If credentials cannot be obtained, the exception returned by the
credentials helper is raised. *)
val with_open :
?scheme:[ `Http | `Https ] ->
?inet:[ `V4 | `V6 ] ->
region:Aws_s3.Region.t ->
bucket:string ->
profile:string ->
(t -> 'a Lwt.t) ->
'a Lwt.t
end

0 comments on commit cb91a0c

Please sign in to comment.