diff --git a/docs/docs/icechunk-python/configuration.md b/docs/docs/icechunk-python/configuration.md index 0be341df..d037aa8f 100644 --- a/docs/docs/icechunk-python/configuration.md +++ b/docs/docs/icechunk-python/configuration.md @@ -22,10 +22,6 @@ It allows you to configure the following parameters: The threshold for when to inline a chunk into a manifest instead of storing it as a separate object in the storage backend. -### [`unsafe_overwrite_refs`](./reference.md#icechunk.RepositoryConfig.unsafe_overwrite_refs) - -Whether to allow overwriting references in the repository. - ### [`get_partial_values_concurrency`](./reference.md#icechunk.RepositoryConfig.get_partial_values_concurrency) The number of concurrent requests to make when getting partial values from storage. diff --git a/icechunk-python/python/icechunk/_icechunk_python.pyi b/icechunk-python/python/icechunk/_icechunk_python.pyi index 0db86fc5..75a8f4f6 100644 --- a/icechunk-python/python/icechunk/_icechunk_python.pyi +++ b/icechunk-python/python/icechunk/_icechunk_python.pyi @@ -570,7 +570,6 @@ class RepositoryConfig: def __init__( self, inline_chunk_threshold_bytes: int | None = None, - unsafe_overwrite_refs: bool | None = None, get_partial_values_concurrency: int | None = None, compression: CompressionConfig | None = None, caching: CachingConfig | None = None, @@ -585,8 +584,6 @@ class RepositoryConfig: ---------- inline_chunk_threshold_bytes: int | None The maximum size of a chunk that will be stored inline in the repository. - unsafe_overwrite_refs: bool | None - Whether to allow overwriting references in the repository. get_partial_values_concurrency: int | None The number of concurrent requests to make when getting partial values from storage. compression: CompressionConfig | None @@ -618,28 +615,6 @@ class RepositoryConfig: """ ... @property - def unsafe_overwrite_refs(self) -> bool | None: - """ - Whether to allow overwriting references in the repository. - - Returns - ------- - bool | None - Whether to allow overwriting references in the repository. - """ - ... - @unsafe_overwrite_refs.setter - def unsafe_overwrite_refs(self, value: bool | None) -> None: - """ - Set whether to allow overwriting references in the repository. - - Parameters - ---------- - value: bool | None - Whether to allow overwriting references in the repository. - """ - ... - @property def get_partial_values_concurrency(self) -> int | None: """ The number of concurrent requests to make when getting partial values from storage. diff --git a/icechunk-python/src/config.rs b/icechunk-python/src/config.rs index 05133db8..016ff0df 100644 --- a/icechunk-python/src/config.rs +++ b/icechunk-python/src/config.rs @@ -969,8 +969,6 @@ pub struct PyRepositoryConfig { #[pyo3(get, set)] pub inline_chunk_threshold_bytes: Option, #[pyo3(get, set)] - pub unsafe_overwrite_refs: Option, - #[pyo3(get, set)] pub get_partial_values_concurrency: Option, #[pyo3(get, set)] pub compression: Option>, @@ -996,7 +994,6 @@ impl From<&PyRepositoryConfig> for RepositoryConfig { fn from(value: &PyRepositoryConfig) -> Self { Python::with_gil(|py| Self { inline_chunk_threshold_bytes: value.inline_chunk_threshold_bytes, - unsafe_overwrite_refs: value.unsafe_overwrite_refs, get_partial_values_concurrency: value.get_partial_values_concurrency, compression: value.compression.as_ref().map(|c| (&*c.borrow(py)).into()), caching: value.caching.as_ref().map(|c| (&*c.borrow(py)).into()), @@ -1014,7 +1011,6 @@ impl From for PyRepositoryConfig { #[allow(clippy::expect_used)] Python::with_gil(|py| Self { inline_chunk_threshold_bytes: value.inline_chunk_threshold_bytes, - unsafe_overwrite_refs: value.unsafe_overwrite_refs, get_partial_values_concurrency: value.get_partial_values_concurrency, compression: value.compression.map(|c| { Py::new(py, Into::::into(c)) @@ -1049,11 +1045,10 @@ impl PyRepositoryConfig { } #[new] - #[pyo3(signature = (inline_chunk_threshold_bytes = None, unsafe_overwrite_refs = None, get_partial_values_concurrency = None, compression = None, caching = None, storage = None, virtual_chunk_containers = None, manifest = None))] + #[pyo3(signature = (inline_chunk_threshold_bytes = None, get_partial_values_concurrency = None, compression = None, caching = None, storage = None, virtual_chunk_containers = None, manifest = None))] #[allow(clippy::too_many_arguments)] pub fn new( inline_chunk_threshold_bytes: Option, - unsafe_overwrite_refs: Option, get_partial_values_concurrency: Option, compression: Option>, caching: Option>, @@ -1063,7 +1058,6 @@ impl PyRepositoryConfig { ) -> Self { Self { inline_chunk_threshold_bytes, - unsafe_overwrite_refs, get_partial_values_concurrency, compression, caching, @@ -1129,9 +1123,8 @@ impl PyRepositoryConfig { })); // TODO: virtual chunk containers format!( - r#"RepositoryConfig(inline_chunk_threshold_bytes={inl}, unsafe_overwrite_refs={uns}, get_partial_values_concurrency={partial}, compression={comp}, caching={caching}, storage={storage}, manifest={manifest})"#, + r#"RepositoryConfig(inline_chunk_threshold_bytes={inl}, get_partial_values_concurrency={partial}, compression={comp}, caching={caching}, storage={storage}, manifest={manifest})"#, inl = format_option_to_string(self.inline_chunk_threshold_bytes), - uns = format_option(self.unsafe_overwrite_refs.map(format_bool)), partial = format_option_to_string(self.get_partial_values_concurrency), comp = comp, caching = caching, diff --git a/icechunk-python/src/store.rs b/icechunk-python/src/store.rs index 93833f8b..de50a63d 100644 --- a/icechunk-python/src/store.rs +++ b/icechunk-python/src/store.rs @@ -8,6 +8,7 @@ use icechunk::{ manifest::{Checksum, SecondsSinceEpoch, VirtualChunkLocation, VirtualChunkRef}, ChunkLength, ChunkOffset, }, + storage::ETag, store::{StoreError, StoreErrorKind}, Store, }; @@ -37,7 +38,7 @@ enum ChecksumArgument { impl From for Checksum { fn from(value: ChecksumArgument) -> Self { match value { - ChecksumArgument::String(etag) => Checksum::ETag(etag), + ChecksumArgument::String(etag) => Checksum::ETag(ETag(etag)), ChecksumArgument::Datetime(date_time) => { Checksum::LastModified(SecondsSinceEpoch(date_time.timestamp() as u32)) } diff --git a/icechunk-python/tests/data/test-repo/chunks/0DX66KVWRNQGEEWC8QN0 b/icechunk-python/tests/data/test-repo/chunks/09HEW2P03CSMHFAZY7DG similarity index 100% rename from icechunk-python/tests/data/test-repo/chunks/0DX66KVWRNQGEEWC8QN0 rename to icechunk-python/tests/data/test-repo/chunks/09HEW2P03CSMHFAZY7DG diff --git a/icechunk-python/tests/data/test-repo/chunks/20BVFJQXWR84R1F1Q58G b/icechunk-python/tests/data/test-repo/chunks/52H0E4NSPN8SVRK9EVGG similarity index 100% rename from icechunk-python/tests/data/test-repo/chunks/20BVFJQXWR84R1F1Q58G rename to icechunk-python/tests/data/test-repo/chunks/52H0E4NSPN8SVRK9EVGG diff --git a/icechunk-python/tests/data/test-repo/chunks/AN2V69NWNGNCW91839Q0 b/icechunk-python/tests/data/test-repo/chunks/AN2V69NWNGNCW91839Q0 deleted file mode 100644 index 8666a286..00000000 Binary files a/icechunk-python/tests/data/test-repo/chunks/AN2V69NWNGNCW91839Q0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/chunks/5PCMGAGQ1FC1GKKHZTK0 b/icechunk-python/tests/data/test-repo/chunks/DWQ75SDC624XF9H326RG similarity index 100% rename from icechunk-python/tests/data/test-repo/chunks/5PCMGAGQ1FC1GKKHZTK0 rename to icechunk-python/tests/data/test-repo/chunks/DWQ75SDC624XF9H326RG diff --git a/icechunk-python/tests/data/test-repo/chunks/HTKY8SN65KBX42E4V9FG b/icechunk-python/tests/data/test-repo/chunks/HTKY8SN65KBX42E4V9FG deleted file mode 100644 index 8666a286..00000000 Binary files a/icechunk-python/tests/data/test-repo/chunks/HTKY8SN65KBX42E4V9FG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/chunks/MDYP3ZEV630YPNCEENC0 b/icechunk-python/tests/data/test-repo/chunks/MDYP3ZEV630YPNCEENC0 deleted file mode 100644 index 8666a286..00000000 Binary files a/icechunk-python/tests/data/test-repo/chunks/MDYP3ZEV630YPNCEENC0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/chunks/9HZ3J7FEC9CCEB6GEKVG b/icechunk-python/tests/data/test-repo/chunks/RW938N1KP2R4BHMW62QG similarity index 100% rename from icechunk-python/tests/data/test-repo/chunks/9HZ3J7FEC9CCEB6GEKVG rename to icechunk-python/tests/data/test-repo/chunks/RW938N1KP2R4BHMW62QG diff --git a/icechunk-python/tests/data/test-repo/chunks/Y9DTSVWNZV9HKT2R17T0 b/icechunk-python/tests/data/test-repo/chunks/Y9DTSVWNZV9HKT2R17T0 deleted file mode 100644 index 8666a286..00000000 Binary files a/icechunk-python/tests/data/test-repo/chunks/Y9DTSVWNZV9HKT2R17T0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/config.yaml b/icechunk-python/tests/data/test-repo/config.yaml index 65e4dd1e..80e47459 100644 --- a/icechunk-python/tests/data/test-repo/config.yaml +++ b/icechunk-python/tests/data/test-repo/config.yaml @@ -1,5 +1,4 @@ inline_chunk_threshold_bytes: 12 -unsafe_overwrite_refs: null get_partial_values_concurrency: null compression: null caching: null @@ -13,6 +12,10 @@ virtual_chunk_containers: endpoint_url: https://fly.storage.tigris.dev anonymous: false allow_http: false + az: + name: az + url_prefix: az + store: !azure {} file: name: file url_prefix: file @@ -21,10 +24,6 @@ virtual_chunk_containers: name: gcs url_prefix: gcs store: !gcs {} - az: - name: az - url_prefix: az - store: !azure {} s3: name: s3 url_prefix: s3:// diff --git a/icechunk-python/tests/data/test-repo/manifests/0GQQ44D2837GGMHY81CG b/icechunk-python/tests/data/test-repo/manifests/0GQQ44D2837GGMHY81CG deleted file mode 100644 index 5dff37e3..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/0GQQ44D2837GGMHY81CG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/3C9WRKTE3PNDSNYBKD60 b/icechunk-python/tests/data/test-repo/manifests/3C9WRKTE3PNDSNYBKD60 deleted file mode 100644 index a7da5f1c..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/3C9WRKTE3PNDSNYBKD60 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/73Q2CY1JSN768PFJS2M0 b/icechunk-python/tests/data/test-repo/manifests/73Q2CY1JSN768PFJS2M0 deleted file mode 100644 index 7b1848c2..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/73Q2CY1JSN768PFJS2M0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/8WT6R2E6WVC9GJ7BS6GG b/icechunk-python/tests/data/test-repo/manifests/8WT6R2E6WVC9GJ7BS6GG deleted file mode 100644 index 1d3cb4ab..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/8WT6R2E6WVC9GJ7BS6GG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/C38XX4Z2517M93GQ5MA0 b/icechunk-python/tests/data/test-repo/manifests/C38XX4Z2517M93GQ5MA0 deleted file mode 100644 index 98ce9238..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/C38XX4Z2517M93GQ5MA0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/CMYVHDWMSTG9R25780YG b/icechunk-python/tests/data/test-repo/manifests/CMYVHDWMSTG9R25780YG new file mode 100644 index 00000000..1cd56eb3 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/manifests/CMYVHDWMSTG9R25780YG differ diff --git a/icechunk-python/tests/data/test-repo/manifests/G3W2W8V6ZG09J6C21WE0 b/icechunk-python/tests/data/test-repo/manifests/G3W2W8V6ZG09J6C21WE0 new file mode 100644 index 00000000..a107d98a Binary files /dev/null and b/icechunk-python/tests/data/test-repo/manifests/G3W2W8V6ZG09J6C21WE0 differ diff --git a/icechunk-python/tests/data/test-repo/manifests/G94WC9CN23R53A63CRXG b/icechunk-python/tests/data/test-repo/manifests/G94WC9CN23R53A63CRXG deleted file mode 100644 index 7727c935..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/G94WC9CN23R53A63CRXG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/MWE7J4Y1V04W0DCXB8Z0 b/icechunk-python/tests/data/test-repo/manifests/MWE7J4Y1V04W0DCXB8Z0 deleted file mode 100644 index fdfbb82d..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/MWE7J4Y1V04W0DCXB8Z0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/manifests/Q04J7QW5RQ8D17TPA10G b/icechunk-python/tests/data/test-repo/manifests/Q04J7QW5RQ8D17TPA10G new file mode 100644 index 00000000..d3ea1cd3 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/manifests/Q04J7QW5RQ8D17TPA10G differ diff --git a/icechunk-python/tests/data/test-repo/manifests/SHTEAP8C784YMZSJKBM0 b/icechunk-python/tests/data/test-repo/manifests/SHTEAP8C784YMZSJKBM0 new file mode 100644 index 00000000..28a7e014 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/manifests/SHTEAP8C784YMZSJKBM0 differ diff --git a/icechunk-python/tests/data/test-repo/manifests/T9PRDPYDRCEHC2GAVR8G b/icechunk-python/tests/data/test-repo/manifests/T9PRDPYDRCEHC2GAVR8G deleted file mode 100644 index 9962c589..00000000 Binary files a/icechunk-python/tests/data/test-repo/manifests/T9PRDPYDRCEHC2GAVR8G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZW.json b/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZW.json deleted file mode 100644 index c7dc109c..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZW.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"HNG82GMS51ECXFXFCYJG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZX.json b/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZX.json deleted file mode 100644 index 0fc7cd40..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZX.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"GNFK0SSWD5B8CVA53XEG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZY.json b/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZY.json deleted file mode 100644 index cf192bde..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZY.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"3EKE17N8YF5ZK5NRMZJ0"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZZ.json b/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZZ.json deleted file mode 100644 index 346eeb78..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.main/ZZZZZZZZ.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"R7F1RJHPZ428N4AK19K0"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.main/ref.json b/icechunk-python/tests/data/test-repo/refs/branch.main/ref.json new file mode 100644 index 00000000..8fa8aba9 --- /dev/null +++ b/icechunk-python/tests/data/test-repo/refs/branch.main/ref.json @@ -0,0 +1 @@ +{"snapshot":"NXH3M0HJ7EEJ0699DPP0"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZX.json b/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZX.json deleted file mode 100644 index a52be478..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZX.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"TNE0TX645A2G7VTXFA1G"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZY.json b/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZY.json deleted file mode 100644 index d63e54c4..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZY.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"394QWZDXAY74HP6Q8P3G"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZZ.json b/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZZ.json deleted file mode 100644 index c7dc109c..00000000 --- a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ZZZZZZZZ.json +++ /dev/null @@ -1 +0,0 @@ -{"snapshot":"HNG82GMS51ECXFXFCYJG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ref.json b/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ref.json new file mode 100644 index 00000000..a4b2ffa4 --- /dev/null +++ b/icechunk-python/tests/data/test-repo/refs/branch.my-branch/ref.json @@ -0,0 +1 @@ +{"snapshot":"XDZ162T1TYBEJMK99NPG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/tag.deleted/ref.json b/icechunk-python/tests/data/test-repo/refs/tag.deleted/ref.json index d63e54c4..b84c0bfc 100644 --- a/icechunk-python/tests/data/test-repo/refs/tag.deleted/ref.json +++ b/icechunk-python/tests/data/test-repo/refs/tag.deleted/ref.json @@ -1 +1 @@ -{"snapshot":"394QWZDXAY74HP6Q8P3G"} \ No newline at end of file +{"snapshot":"4QF8JA0YPDN51MHSSYVG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/tag.it also works!/ref.json b/icechunk-python/tests/data/test-repo/refs/tag.it also works!/ref.json index a52be478..a4b2ffa4 100644 --- a/icechunk-python/tests/data/test-repo/refs/tag.it also works!/ref.json +++ b/icechunk-python/tests/data/test-repo/refs/tag.it also works!/ref.json @@ -1 +1 @@ -{"snapshot":"TNE0TX645A2G7VTXFA1G"} \ No newline at end of file +{"snapshot":"XDZ162T1TYBEJMK99NPG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/refs/tag.it works!/ref.json b/icechunk-python/tests/data/test-repo/refs/tag.it works!/ref.json index d63e54c4..b84c0bfc 100644 --- a/icechunk-python/tests/data/test-repo/refs/tag.it works!/ref.json +++ b/icechunk-python/tests/data/test-repo/refs/tag.it works!/ref.json @@ -1 +1 @@ -{"snapshot":"394QWZDXAY74HP6Q8P3G"} \ No newline at end of file +{"snapshot":"4QF8JA0YPDN51MHSSYVG"} \ No newline at end of file diff --git a/icechunk-python/tests/data/test-repo/snapshots/394QWZDXAY74HP6Q8P3G b/icechunk-python/tests/data/test-repo/snapshots/394QWZDXAY74HP6Q8P3G deleted file mode 100644 index 3b09dece..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/394QWZDXAY74HP6Q8P3G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/3EKE17N8YF5ZK5NRMZJ0 b/icechunk-python/tests/data/test-repo/snapshots/3EKE17N8YF5ZK5NRMZJ0 deleted file mode 100644 index 7171f403..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/3EKE17N8YF5ZK5NRMZJ0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/4QF8JA0YPDN51MHSSYVG b/icechunk-python/tests/data/test-repo/snapshots/4QF8JA0YPDN51MHSSYVG new file mode 100644 index 00000000..14bba3bf Binary files /dev/null and b/icechunk-python/tests/data/test-repo/snapshots/4QF8JA0YPDN51MHSSYVG differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/6Q9GDTXKF17BGQVSQZFG b/icechunk-python/tests/data/test-repo/snapshots/6Q9GDTXKF17BGQVSQZFG deleted file mode 100644 index b049d7ac..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/6Q9GDTXKF17BGQVSQZFG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/7XAF0Q905SH4938DN9CG b/icechunk-python/tests/data/test-repo/snapshots/7XAF0Q905SH4938DN9CG new file mode 100644 index 00000000..d5dc1dfb Binary files /dev/null and b/icechunk-python/tests/data/test-repo/snapshots/7XAF0Q905SH4938DN9CG differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/949AXZ49X764TMDC6D4G b/icechunk-python/tests/data/test-repo/snapshots/949AXZ49X764TMDC6D4G deleted file mode 100644 index 391bc09f..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/949AXZ49X764TMDC6D4G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/A2RD2Y65PR6D3B6BR1K0 b/icechunk-python/tests/data/test-repo/snapshots/A2RD2Y65PR6D3B6BR1K0 deleted file mode 100644 index b8eebc1c..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/A2RD2Y65PR6D3B6BR1K0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/GC4YVH5SKBPEZCENYQE0 b/icechunk-python/tests/data/test-repo/snapshots/GC4YVH5SKBPEZCENYQE0 new file mode 100644 index 00000000..53a1a52d Binary files /dev/null and b/icechunk-python/tests/data/test-repo/snapshots/GC4YVH5SKBPEZCENYQE0 differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/GNFK0SSWD5B8CVA53XEG b/icechunk-python/tests/data/test-repo/snapshots/GNFK0SSWD5B8CVA53XEG deleted file mode 100644 index 93decc5f..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/GNFK0SSWD5B8CVA53XEG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/HNG82GMS51ECXFXFCYJG b/icechunk-python/tests/data/test-repo/snapshots/HNG82GMS51ECXFXFCYJG deleted file mode 100644 index bd82739d..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/HNG82GMS51ECXFXFCYJG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/K1BMYVG1HNVTNV1FSBH0 b/icechunk-python/tests/data/test-repo/snapshots/K1BMYVG1HNVTNV1FSBH0 deleted file mode 100644 index bfcd527f..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/K1BMYVG1HNVTNV1FSBH0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/NXH3M0HJ7EEJ0699DPP0 b/icechunk-python/tests/data/test-repo/snapshots/NXH3M0HJ7EEJ0699DPP0 new file mode 100644 index 00000000..7921d022 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/snapshots/NXH3M0HJ7EEJ0699DPP0 differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/P874YS3J196959RDHX7G b/icechunk-python/tests/data/test-repo/snapshots/P874YS3J196959RDHX7G new file mode 100644 index 00000000..5af21bb1 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/snapshots/P874YS3J196959RDHX7G differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/R7F1RJHPZ428N4AK19K0 b/icechunk-python/tests/data/test-repo/snapshots/R7F1RJHPZ428N4AK19K0 deleted file mode 100644 index 493bb362..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/R7F1RJHPZ428N4AK19K0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/RPA0WQCNM2N9HBBRHJQ0 b/icechunk-python/tests/data/test-repo/snapshots/RPA0WQCNM2N9HBBRHJQ0 deleted file mode 100644 index 574db73f..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/RPA0WQCNM2N9HBBRHJQ0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/SNF98D1SK7NWD5KQJM20 b/icechunk-python/tests/data/test-repo/snapshots/SNF98D1SK7NWD5KQJM20 deleted file mode 100644 index fcfa0b68..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/SNF98D1SK7NWD5KQJM20 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/TNE0TX645A2G7VTXFA1G b/icechunk-python/tests/data/test-repo/snapshots/TNE0TX645A2G7VTXFA1G deleted file mode 100644 index 77790ad2..00000000 Binary files a/icechunk-python/tests/data/test-repo/snapshots/TNE0TX645A2G7VTXFA1G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/snapshots/XDZ162T1TYBEJMK99NPG b/icechunk-python/tests/data/test-repo/snapshots/XDZ162T1TYBEJMK99NPG new file mode 100644 index 00000000..b4162290 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/snapshots/XDZ162T1TYBEJMK99NPG differ diff --git a/icechunk-python/tests/data/test-repo/transactions/394QWZDXAY74HP6Q8P3G b/icechunk-python/tests/data/test-repo/transactions/394QWZDXAY74HP6Q8P3G deleted file mode 100644 index 0b4e36ae..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/394QWZDXAY74HP6Q8P3G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/3EKE17N8YF5ZK5NRMZJ0 b/icechunk-python/tests/data/test-repo/transactions/3EKE17N8YF5ZK5NRMZJ0 deleted file mode 100644 index 4716947c..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/3EKE17N8YF5ZK5NRMZJ0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/4QF8JA0YPDN51MHSSYVG b/icechunk-python/tests/data/test-repo/transactions/4QF8JA0YPDN51MHSSYVG new file mode 100644 index 00000000..b92c29da Binary files /dev/null and b/icechunk-python/tests/data/test-repo/transactions/4QF8JA0YPDN51MHSSYVG differ diff --git a/icechunk-python/tests/data/test-repo/transactions/7XAF0Q905SH4938DN9CG b/icechunk-python/tests/data/test-repo/transactions/7XAF0Q905SH4938DN9CG new file mode 100644 index 00000000..c3df8b96 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/transactions/7XAF0Q905SH4938DN9CG differ diff --git a/icechunk-python/tests/data/test-repo/transactions/949AXZ49X764TMDC6D4G b/icechunk-python/tests/data/test-repo/transactions/949AXZ49X764TMDC6D4G deleted file mode 100644 index 31d4665c..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/949AXZ49X764TMDC6D4G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/A2RD2Y65PR6D3B6BR1K0 b/icechunk-python/tests/data/test-repo/transactions/A2RD2Y65PR6D3B6BR1K0 deleted file mode 100644 index 55b10334..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/A2RD2Y65PR6D3B6BR1K0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/GC4YVH5SKBPEZCENYQE0 b/icechunk-python/tests/data/test-repo/transactions/GC4YVH5SKBPEZCENYQE0 new file mode 100644 index 00000000..efdbb166 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/transactions/GC4YVH5SKBPEZCENYQE0 differ diff --git a/icechunk-python/tests/data/test-repo/transactions/GNFK0SSWD5B8CVA53XEG b/icechunk-python/tests/data/test-repo/transactions/GNFK0SSWD5B8CVA53XEG deleted file mode 100644 index 29b3ccfc..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/GNFK0SSWD5B8CVA53XEG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/HNG82GMS51ECXFXFCYJG b/icechunk-python/tests/data/test-repo/transactions/HNG82GMS51ECXFXFCYJG deleted file mode 100644 index 632c49e7..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/HNG82GMS51ECXFXFCYJG and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/K1BMYVG1HNVTNV1FSBH0 b/icechunk-python/tests/data/test-repo/transactions/K1BMYVG1HNVTNV1FSBH0 deleted file mode 100644 index 4490f2cb..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/K1BMYVG1HNVTNV1FSBH0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/NXH3M0HJ7EEJ0699DPP0 b/icechunk-python/tests/data/test-repo/transactions/NXH3M0HJ7EEJ0699DPP0 new file mode 100644 index 00000000..12daffcb Binary files /dev/null and b/icechunk-python/tests/data/test-repo/transactions/NXH3M0HJ7EEJ0699DPP0 differ diff --git a/icechunk-python/tests/data/test-repo/transactions/RPA0WQCNM2N9HBBRHJQ0 b/icechunk-python/tests/data/test-repo/transactions/RPA0WQCNM2N9HBBRHJQ0 deleted file mode 100644 index 29bcdf8c..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/RPA0WQCNM2N9HBBRHJQ0 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/SNF98D1SK7NWD5KQJM20 b/icechunk-python/tests/data/test-repo/transactions/SNF98D1SK7NWD5KQJM20 deleted file mode 100644 index 271c8d02..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/SNF98D1SK7NWD5KQJM20 and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/TNE0TX645A2G7VTXFA1G b/icechunk-python/tests/data/test-repo/transactions/TNE0TX645A2G7VTXFA1G deleted file mode 100644 index cd830e5b..00000000 Binary files a/icechunk-python/tests/data/test-repo/transactions/TNE0TX645A2G7VTXFA1G and /dev/null differ diff --git a/icechunk-python/tests/data/test-repo/transactions/XDZ162T1TYBEJMK99NPG b/icechunk-python/tests/data/test-repo/transactions/XDZ162T1TYBEJMK99NPG new file mode 100644 index 00000000..bcd46039 Binary files /dev/null and b/icechunk-python/tests/data/test-repo/transactions/XDZ162T1TYBEJMK99NPG differ diff --git a/icechunk-python/tests/test_config.py b/icechunk-python/tests/test_config.py index 6fb9fccf..df509752 100644 --- a/icechunk-python/tests/test_config.py +++ b/icechunk-python/tests/test_config.py @@ -127,7 +127,7 @@ def test_virtual_chunk_containers() -> None: container = icechunk.VirtualChunkContainer("custom", "s3://", store_config) config.set_virtual_chunk_container(container) assert re.match( - r"RepositoryConfig\(inline_chunk_threshold_bytes=None, unsafe_overwrite_refs=None, get_partial_values_concurrency=None, compression=None, caching=None, storage=None, manifest=.*\)", + r"RepositoryConfig\(inline_chunk_threshold_bytes=None, get_partial_values_concurrency=None, compression=None, caching=None, storage=None, manifest=.*\)", repr(config), ) assert config.virtual_chunk_containers @@ -158,11 +158,9 @@ def test_can_change_deep_config_values() -> None: ) config = icechunk.RepositoryConfig( inline_chunk_threshold_bytes=11, - unsafe_overwrite_refs=False, compression=icechunk.CompressionConfig(level=0), ) config.inline_chunk_threshold_bytes = 5 - config.unsafe_overwrite_refs = True config.get_partial_values_concurrency = 42 config.compression = icechunk.CompressionConfig(level=8) config.compression.level = 2 @@ -180,7 +178,7 @@ def test_can_change_deep_config_values() -> None: ) assert re.match( - r"RepositoryConfig\(inline_chunk_threshold_bytes=5, unsafe_overwrite_refs=True, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\)\), manifest=.*\)", + r"RepositoryConfig\(inline_chunk_threshold_bytes=5, get_partial_values_concurrency=42, compression=CompressionConfig\(algorithm=None, level=2\), caching=CachingConfig\(num_snapshot_nodes=None, num_chunk_refs=8, num_transaction_changes=None, num_bytes_attributes=None, num_bytes_chunks=None\), storage=StorageSettings\(concurrency=StorageConcurrencySettings\(max_concurrent_requests_for_object=5, ideal_concurrent_request_size=1000000\)\), manifest=.*\)", repr(config), ) repo = icechunk.Repository.open( diff --git a/icechunk/examples/multithreaded_store.rs b/icechunk/examples/multithreaded_store.rs index d64a7c03..1e77e9e8 100644 --- a/icechunk/examples/multithreaded_store.rs +++ b/icechunk/examples/multithreaded_store.rs @@ -13,7 +13,6 @@ async fn main() -> Result<(), Box> { let storage = new_in_memory_storage().await?; let config = RepositoryConfig { inline_chunk_threshold_bytes: Some(128), - unsafe_overwrite_refs: Some(true), ..Default::default() }; let repo = Repository::create(Some(config), storage, HashMap::new()).await?; diff --git a/icechunk/src/config.rs b/icechunk/src/config.rs index bbb9b391..a026d24b 100644 --- a/icechunk/src/config.rs +++ b/icechunk/src/config.rs @@ -205,11 +205,6 @@ impl ManifestConfig { pub struct RepositoryConfig { /// Chunks smaller than this will be stored inline in the manifst pub inline_chunk_threshold_bytes: Option, - /// Unsafely overwrite refs on write. This is not recommended, users should only use it at their - /// own risk in object stores for which we don't support write-object-if-not-exists. There is - /// the possibility of race conditions if this variable is set to true and there are concurrent - /// commit attempts. - pub unsafe_overwrite_refs: Option, /// Concurrency used by the get_partial_values operation to fetch different keys in parallel pub get_partial_values_concurrency: Option, @@ -236,9 +231,6 @@ impl RepositoryConfig { pub fn inline_chunk_threshold_bytes(&self) -> u16 { self.inline_chunk_threshold_bytes.unwrap_or(512) } - pub fn unsafe_overwrite_refs(&self) -> bool { - self.unsafe_overwrite_refs.unwrap_or(false) - } pub fn get_partial_values_concurrency(&self) -> u16 { self.get_partial_values_concurrency.unwrap_or(10) } @@ -268,9 +260,6 @@ impl RepositoryConfig { inline_chunk_threshold_bytes: other .inline_chunk_threshold_bytes .or(self.inline_chunk_threshold_bytes), - unsafe_overwrite_refs: other - .unsafe_overwrite_refs - .or(self.unsafe_overwrite_refs), get_partial_values_concurrency: other .get_partial_values_concurrency .or(self.get_partial_values_concurrency), diff --git a/icechunk/src/format/manifest.rs b/icechunk/src/format/manifest.rs index 8e92cd34..bcf665f1 100644 --- a/icechunk/src/format/manifest.rs +++ b/icechunk/src/format/manifest.rs @@ -366,7 +366,7 @@ fn ref_to_payload( fn checksum(payload: &gen::ChunkRef<'_>) -> Option { if let Some(etag) = payload.checksum_etag() { - Some(Checksum::ETag(etag.to_string())) + Some(Checksum::ETag(ETag(etag.to_string()))) } else if payload.checksum_last_modified() > 0 { Some(Checksum::LastModified(SecondsSinceEpoch(payload.checksum_last_modified()))) } else { @@ -398,7 +398,7 @@ fn mk_chunk_ref<'bldr>( Some(cs) => match cs { Checksum::LastModified(_) => None, Checksum::ETag(etag) => { - Some(builder.create_string(etag.as_str())) + Some(builder.create_string(etag.0.as_str())) } }, None => None, diff --git a/icechunk/src/ops/gc.rs b/icechunk/src/ops/gc.rs index 5af763ee..73bbc422 100644 --- a/icechunk/src/ops/gc.rs +++ b/icechunk/src/ops/gc.rs @@ -527,7 +527,7 @@ pub async fn expire( ExpireRefResult::RefIsExpired => match &r { Ref::Tag(name) => { if expired_tags == ExpiredRefAction::Delete { - delete_tag(storage, storage_settings, name.as_str(), false) + delete_tag(storage, storage_settings, name.as_str()) .await .map_err(GCError::Ref)?; result.deleted_refs.insert(r); diff --git a/icechunk/src/refs.rs b/icechunk/src/refs.rs index f6902d35..e1309374 100644 --- a/icechunk/src/refs.rs +++ b/icechunk/src/refs.rs @@ -8,7 +8,7 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::{ stream::{FuturesOrdered, FuturesUnordered}, - FutureExt, Stream, StreamExt, TryStreamExt, + FutureExt, StreamExt, }; use itertools::Itertools; use serde::{Deserialize, Serialize}; @@ -19,22 +19,10 @@ use tracing::instrument; use crate::{ error::ICError, format::SnapshotId, - storage::{self, GetRefResult, StorageErrorKind, WriteRefResult}, + storage::{self, GetRefResult, StorageErrorKind, VersionInfo, WriteRefResult}, Storage, StorageError, }; -fn crock_encode_int(n: u64) -> String { - // skip the first 3 bytes (zeroes) - base32::encode(base32::Alphabet::Crockford, &n.to_be_bytes()[3..=7]) -} - -fn crock_decode_int(data: &str) -> Option { - // re insert the first 3 bytes removed during encoding - let mut bytes = vec![0, 0, 0]; - bytes.extend(base32::decode(base32::Alphabet::Crockford, data)?); - Some(u64::from_be_bytes(bytes.as_slice().try_into().ok()?)) -} - #[derive(Debug, Error)] pub enum RefErrorKind { #[error(transparent)] @@ -49,9 +37,6 @@ pub enum RefErrorKind { #[error("invalid ref name `{0}`")] InvalidRefName(String), - #[error("invalid branch version `{0}`")] - InvalidBranchVersion(String), - #[error("tag already exists, tags are immutable: `{0}`")] TagAlreadyExists(String), @@ -128,35 +113,6 @@ impl Ref { } } -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct BranchVersion(pub u64); - -impl BranchVersion { - const MAX_VERSION_NUMBER: u64 = 1099511627775; - - fn decode(version: &str) -> RefResult { - let n = crock_decode_int(version) - .ok_or(RefErrorKind::InvalidBranchVersion(version.to_string()))?; - Ok(BranchVersion(BranchVersion::MAX_VERSION_NUMBER - n)) - } - - fn encode(&self) -> String { - crock_encode_int(BranchVersion::MAX_VERSION_NUMBER - self.0) - } - - fn to_path(&self, branch_name: &str) -> RefResult { - branch_key(branch_name, self.encode().as_str()) - } - - fn initial() -> Self { - Self(0) - } - - fn inc(&self) -> Self { - Self(self.0 + 1) - } -} - #[serde_as] #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct RefData { @@ -164,7 +120,7 @@ pub struct RefData { pub snapshot: SnapshotId, } -const TAG_KEY_NAME: &str = "ref.json"; +const REF_KEY_NAME: &str = "ref.json"; const TAG_DELETE_MARKER_KEY_NAME: &str = "ref.json.deleted"; fn tag_key(tag_name: &str) -> RefResult { @@ -172,7 +128,7 @@ fn tag_key(tag_name: &str) -> RefResult { return Err(RefErrorKind::InvalidRefName(tag_name.to_string()).into()); } - Ok(format!("tag.{}/{}", tag_name, TAG_KEY_NAME)) + Ok(format!("tag.{}/{}", tag_name, REF_KEY_NAME)) } fn tag_delete_marker_key(tag_name: &str) -> RefResult { @@ -183,24 +139,19 @@ fn tag_delete_marker_key(tag_name: &str) -> RefResult { Ok(format!("tag.{}/{}", tag_name, TAG_DELETE_MARKER_KEY_NAME)) } -fn branch_root(branch_name: &str) -> RefResult { +fn branch_key(branch_name: &str) -> RefResult { if branch_name.contains('/') { return Err(RefErrorKind::InvalidRefName(branch_name.to_string()).into()); } - Ok(format!("branch.{}", branch_name)) + Ok(format!("branch.{}/{}", branch_name, REF_KEY_NAME)) } -fn branch_key(branch_name: &str, version_id: &str) -> RefResult { - branch_root(branch_name).map(|root| format!("{}/{}.json", root, version_id)) -} - -#[instrument(skip(storage, storage_settings, overwrite_refs))] +#[instrument(skip(storage, storage_settings))] pub async fn create_tag( storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, name: &str, snapshot: SnapshotId, - overwrite_refs: bool, ) -> RefResult<()> { let key = tag_key(name)?; let data = RefData { snapshot }; @@ -209,8 +160,8 @@ pub async fn create_tag( .write_ref( storage_settings, key.as_str(), - overwrite_refs, Bytes::copy_from_slice(&content), + &VersionInfo::for_creation(), ) .await { @@ -223,51 +174,47 @@ pub async fn create_tag( } #[async_recursion] -#[instrument(skip(storage, storage_settings, overwrite_refs))] +#[instrument(skip(storage, storage_settings))] pub async fn update_branch( storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, name: &str, new_snapshot: SnapshotId, current_snapshot: Option<&SnapshotId>, - overwrite_refs: bool, -) -> RefResult { - let last_version = last_branch_version(storage, storage_settings, name).await; - let last_ref_data = match last_version { - Ok(version) => fetch_branch(storage, storage_settings, name, &version) - .await - .map(|d| Some((version, d))), - Err(RefError { kind: RefErrorKind::RefNotFound(_), .. }) => Ok(None), - Err(err) => Err(err), - }?; - let last_snapshot = last_ref_data.as_ref().map(|d| &d.1.snapshot); - if last_snapshot != current_snapshot { +) -> RefResult<()> { + let (ref_data, version) = match fetch_branch(storage, storage_settings, name).await { + Ok((ref_data, version)) => (Some(ref_data), version), + Err(RefError { kind: RefErrorKind::RefNotFound(..), .. }) => { + (None, VersionInfo::for_creation()) + } + Err(err) => { + return Err(err); + } + }; + + if ref_data.as_ref().map(|rd| &rd.snapshot) != current_snapshot { return Err(RefErrorKind::Conflict { expected_parent: current_snapshot.cloned(), - actual_parent: last_snapshot.cloned(), + actual_parent: ref_data.map(|rd| rd.snapshot), } .into()); } - let new_version = match last_ref_data { - Some((version, _)) => version.inc(), - None => BranchVersion::initial(), - }; - let key = new_version.to_path(name)?; + let key = branch_key(name)?; let data = RefData { snapshot: new_snapshot }; let content = serde_json::to_vec(&data)?; match storage .write_ref( storage_settings, key.as_str(), - overwrite_refs, Bytes::copy_from_slice(&content), + &version, ) .await { - Ok(WriteRefResult::Written) => Ok(new_version), + Ok(WriteRefResult::Written) => Ok(()), Ok(WriteRefResult::WontOverwrite) => { - // If the branch version already exists, an update happened since we checked + // If the already exists, an update happened since we checked // we can just try again and the conflict will be reported update_branch( storage, @@ -275,7 +222,6 @@ pub async fn update_branch( name, data.snapshot, current_snapshot, - overwrite_refs, ) .await } @@ -345,56 +291,25 @@ pub async fn list_branches( Ok(branches) } -async fn branch_history<'a>( - storage: &'a (dyn Storage + Send + Sync), - storage_settings: &storage::Settings, - branch: &str, -) -> RefResult> + 'a> { - let key = branch_root(branch)?; - let all = storage.ref_versions(storage_settings, key.as_str()).await?; - Ok(all.map_err(|e| e.into()).and_then(move |version_id| async move { - let version = version_id - .strip_suffix(".json") - .ok_or(RefErrorKind::InvalidRefName(version_id.clone()))?; - BranchVersion::decode(version) - })) -} - -async fn last_branch_version( - storage: &(dyn Storage + Send + Sync), - storage_settings: &storage::Settings, - branch: &str, -) -> RefResult { - // TODO! optimize - let mut all = Box::pin(branch_history(storage, storage_settings, branch).await?); - all.try_next().await?.ok_or(RefErrorKind::RefNotFound(branch.to_string()).into()) -} - #[instrument(skip(storage, storage_settings))] pub async fn delete_branch( storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, branch: &str, ) -> RefResult<()> { - let key = branch_root(branch)?; - let key_ref = key.as_str(); - let refs = storage - .ref_versions(storage_settings, key_ref) - .await? - .filter_map(|v| async move { - v.ok().map(|v| format!("{}/{}", key_ref, v).as_str().to_string()) - }) - .boxed(); - storage.delete_refs(storage_settings, refs).await?; + // we make sure the branch exists + _ = fetch_branch_tip(storage, storage_settings, branch).await?; + + let key = branch_key(branch)?; + storage.delete_refs(storage_settings, futures::stream::iter([key]).boxed()).await?; Ok(()) } -#[instrument(skip(storage, storage_settings, overwrite_refs))] +#[instrument(skip(storage, storage_settings))] pub async fn delete_tag( storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, tag: &str, - overwrite_refs: bool, ) -> RefResult<()> { // we make sure the tag exists _ = fetch_tag(storage, storage_settings, tag).await?; @@ -405,8 +320,8 @@ pub async fn delete_tag( .write_ref( storage_settings, key.as_str(), - overwrite_refs, Bytes::from_static(&[]), + &VersionInfo::for_creation(), ) .await { @@ -429,7 +344,7 @@ pub async fn fetch_tag( let fut1: Pin>>> = async move { match storage.get_ref(storage_settings, ref_path.as_str()).await { - Ok(GetRefResult::Found { bytes }) => Ok(bytes), + Ok(GetRefResult::Found { bytes, .. }) => Ok(bytes), Ok(GetRefResult::NotFound) => { Err(RefErrorKind::RefNotFound(name.to_string()).into()) } @@ -472,11 +387,13 @@ async fn fetch_branch( storage: &(dyn Storage + Send + Sync), storage_settings: &storage::Settings, name: &str, - version: &BranchVersion, -) -> RefResult { - let path = version.to_path(name)?; - match storage.get_ref(storage_settings, path.as_str()).await { - Ok(GetRefResult::Found { bytes }) => Ok(serde_json::from_slice(bytes.as_ref())?), +) -> RefResult<(RefData, VersionInfo)> { + let ref_key = branch_key(name)?; + match storage.get_ref(storage_settings, ref_key.as_str()).await { + Ok(GetRefResult::Found { bytes, version }) => { + let data = serde_json::from_slice(bytes.as_ref())?; + Ok((data, version)) + } Ok(GetRefResult::NotFound) => { Err(RefErrorKind::RefNotFound(name.to_string()).into()) } @@ -490,30 +407,13 @@ pub async fn fetch_branch_tip( storage_settings: &storage::Settings, name: &str, ) -> RefResult { - let version = last_branch_version(storage, storage_settings, name).await?; - fetch_branch(storage, storage_settings, name, &version).await -} - -#[instrument(skip(storage, storage_settings))] -pub async fn fetch_ref( - storage: &(dyn Storage + Send + Sync), - storage_settings: &storage::Settings, - ref_name: &str, -) -> RefResult<(Ref, RefData)> { - match fetch_tag(storage, storage_settings, ref_name).await { - Ok(from_ref) => Ok((Ref::Tag(ref_name.to_string()), from_ref)), - Err(RefError { kind: RefErrorKind::RefNotFound(_), .. }) => { - let data = fetch_branch_tip(storage, storage_settings, ref_name).await?; - Ok((Ref::Branch(ref_name.to_string()), data)) - } - Err(err) => Err(err), - } + Ok(fetch_branch(storage, storage_settings, name).await?.0) } #[cfg(test)] #[allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)] mod tests { - use std::{iter::once, sync::Arc}; + use std::sync::Arc; use futures::Future; use pretty_assertions::assert_eq; @@ -523,31 +423,6 @@ mod tests { use super::*; - #[tokio::test] - async fn test_branch_version_encoding() -> Result<(), Box> { - let targets = (0..10u64).chain(once(BranchVersion::MAX_VERSION_NUMBER)); - let encodings = [ - "ZZZZZZZZ", "ZZZZZZZY", "ZZZZZZZX", "ZZZZZZZW", "ZZZZZZZV", - // no U - "ZZZZZZZT", "ZZZZZZZS", "ZZZZZZZR", "ZZZZZZZQ", "ZZZZZZZP", - ]; - - for n in targets { - let encoded = BranchVersion(n).encode(); - - if n < 100 { - assert_eq!(encoded, encodings[n as usize]); - } - if n == BranchVersion::MAX_VERSION_NUMBER { - assert_eq!(encoded, "00000000"); - } - - let round = BranchVersion::decode(encoded.as_str())?; - assert_eq!(round, BranchVersion(n)); - } - Ok(()) - } - /// Execute the passed block with all test implementations of Storage. /// /// Currently this function executes against the in-memory and local filesystem object_store @@ -560,6 +435,7 @@ mod tests { mut f: F, ) -> ((Arc, R), (Arc, R, TempDir)) { let mem_storage = new_in_memory_storage().await.unwrap(); + println!("Using mem storage"); let res1 = f(Arc::clone(&mem_storage) as Arc).await; let dir = tempdir().expect("cannot create temp dir"); @@ -567,6 +443,7 @@ mod tests { .await .expect("Cannot create local Storage"); + println!("Using local file system storage"); let res2 = f(Arc::clone(&local_storage) as Arc).await; ((mem_storage, res1), (local_storage, res2, dir)) } @@ -579,28 +456,18 @@ mod tests { let s2 = SnapshotId::random(); let res = fetch_tag(storage.as_ref(), &storage_settings, "tag1").await; - assert!(matches!(res, Err(RefError{kind: RefErrorKind::RefNotFound(name),..}) if name == *"tag1")); + assert!(matches!(res, Err(RefError{kind: RefErrorKind::RefNotFound(name),..}) if name == "tag1")); assert_eq!(list_refs(storage.as_ref(), &storage_settings).await?, BTreeSet::new()); - create_tag(storage.as_ref(), &storage_settings, "tag1", s1.clone(), false).await?; - create_tag(storage.as_ref(), &storage_settings, "tag2", s2.clone(), false).await?; + create_tag(storage.as_ref(), &storage_settings, "tag1", s1.clone()).await?; + create_tag(storage.as_ref(), &storage_settings, "tag2", s2.clone()).await?; let res = fetch_tag(storage.as_ref(), &storage_settings, "tag1").await?; assert_eq!(res.snapshot, s1); - assert_eq!( - fetch_tag(storage.as_ref(), &storage_settings, "tag1").await?, - fetch_ref(storage.as_ref(), &storage_settings, "tag1").await?.1 - ); - let res = fetch_tag(storage.as_ref(), &storage_settings, "tag2").await?; assert_eq!(res.snapshot, s2); - assert_eq!( - fetch_tag(storage.as_ref(), &storage_settings, "tag2").await?, - fetch_ref(storage.as_ref(), &storage_settings, "tag2").await?.1 - ); - assert_eq!( list_refs(storage.as_ref(), &storage_settings).await?, BTreeSet::from([Ref::Tag("tag1".to_string()), Ref::Tag("tag2".to_string())]) @@ -608,8 +475,8 @@ mod tests { // attempts to recreate a tag fail assert!(matches!( - create_tag(storage.as_ref(), &storage_settings, "tag1", s1.clone(), false).await, - Err(RefError{kind: RefErrorKind::TagAlreadyExists(name), ..}) if name == *"tag1" + create_tag(storage.as_ref(), &storage_settings, "tag1", s1.clone()).await, + Err(RefError{kind: RefErrorKind::TagAlreadyExists(name), ..}) if name == "tag1" )); assert_eq!( list_refs(storage.as_ref(), &storage_settings).await?, @@ -618,7 +485,7 @@ mod tests { // attempting to create a branch that doesn't exist, with a fake parent let res = - update_branch(storage.as_ref(), &storage_settings, "branch0", s1.clone(), Some(&s2), false) + update_branch(storage.as_ref(), &storage_settings, "branch0", s1.clone(), Some(&s2)) .await; assert!(res.is_err()); assert_eq!( @@ -627,34 +494,21 @@ mod tests { ); // create a branch successfully - update_branch(storage.as_ref(), &storage_settings, "branch1", s1.clone(), None, false).await?; + update_branch(storage.as_ref(), &storage_settings, "branch1", s1.clone(), None).await?; + assert_eq!( - branch_history(storage.as_ref(), &storage_settings, "branch1") - .await? - .try_collect::>() - .await?, - vec![BranchVersion(0)] - ); - assert_eq!( - last_branch_version(storage.as_ref(), &storage_settings, "branch1").await?, - BranchVersion(0) - ); - assert_eq!( - fetch_branch(storage.as_ref(), &storage_settings, "branch1", &BranchVersion(0)).await?, + fetch_branch_tip(storage.as_ref(), &storage_settings, "branch1").await?, RefData { snapshot: s1.clone() } ); - assert_eq!( - fetch_branch(storage.as_ref(), &storage_settings, "branch1", &BranchVersion(0)).await?, - fetch_ref(storage.as_ref(), &storage_settings, "branch1").await?.1 - ); + assert_eq!( list_refs(storage.as_ref(), &storage_settings).await?, BTreeSet::from([ Ref::Branch("branch1".to_string()), - Ref::Tag("tag1".to_string()), - Ref::Tag("tag2".to_string()) + Ref::Tag("tag1".to_string()), + Ref::Tag("tag2".to_string()) ]) ); @@ -664,36 +518,18 @@ mod tests { "branch1", s2.clone(), Some(&s1.clone()), - false, ) .await?; assert_eq!( - branch_history(storage.as_ref(), &storage_settings, "branch1") - .await? - .try_collect::>() - .await?, - vec![BranchVersion(1), BranchVersion(0)] - ); - assert_eq!( - last_branch_version(storage.as_ref(), &storage_settings, "branch1").await?, - BranchVersion(1) - ); - - assert_eq!( - fetch_branch(storage.as_ref(), &storage_settings, "branch1", &BranchVersion(1)).await?, + fetch_branch_tip(storage.as_ref(), &storage_settings, "branch1").await?, RefData { snapshot: s2.clone() } ); - assert_eq!( - fetch_branch(storage.as_ref(), &storage_settings, "branch1", &BranchVersion(1)).await?, - fetch_ref(storage.as_ref(), &storage_settings, "branch1").await?.1 - ); - let sid = SnapshotId::random(); // update a branch with the wrong parent let res = - update_branch(storage.as_ref(), &storage_settings, "branch1", sid.clone(), Some(&s1), false) + update_branch(storage.as_ref(), &storage_settings, "branch1", sid.clone(), Some(&s1)) .await; assert!(matches!(res, Err(RefError{kind: RefErrorKind::Conflict { expected_parent, actual_parent }, ..}) @@ -701,35 +537,19 @@ mod tests { )); // update the branch again but now with the right parent - update_branch(storage.as_ref(), &storage_settings, "branch1", sid.clone(), Some(&s2), false) + update_branch(storage.as_ref(), &storage_settings, "branch1", sid.clone(), Some(&s2)) .await?; assert_eq!( - branch_history(storage.as_ref(), &storage_settings, "branch1") - .await? - .try_collect::>() - .await?, - vec![BranchVersion(2), BranchVersion(1), BranchVersion(0)] - ); - assert_eq!( - last_branch_version(storage.as_ref(), &storage_settings, "branch1").await?, - BranchVersion(2) + fetch_branch_tip(storage.as_ref(), &storage_settings, "branch1").await?, + RefData { snapshot: sid.clone() } ); - assert_eq!( - fetch_branch(storage.as_ref(), &storage_settings, "branch1", &BranchVersion(2)).await?, - fetch_ref(storage.as_ref(), &storage_settings, "branch1").await?.1 - ); - - assert_eq!( - fetch_ref(storage.as_ref(), &storage_settings, "branch1").await?, - (Ref::Branch("branch1".to_string()), RefData { snapshot: sid.clone() }) - ); // delete a branch delete_branch(storage.as_ref(), &storage_settings, "branch1").await?; assert!(matches!( - fetch_ref(storage.as_ref(), &storage_settings, "branch1").await, + fetch_branch_tip(storage.as_ref(), &storage_settings, "branch1").await, Err(RefError{kind: RefErrorKind::RefNotFound(name),..}) if name == "branch1" )); @@ -750,13 +570,13 @@ mod tests { let storage_settings = storage.default_settings(); let s1 = SnapshotId::random(); let s2 = SnapshotId::random(); - create_tag(storage.as_ref(), &storage_settings, "tag1", s1, false).await?; + create_tag(storage.as_ref(), &storage_settings, "tag1", s1).await?; // we can delete tags - delete_tag(storage.as_ref(), &storage_settings, "tag1", false).await?; + delete_tag(storage.as_ref(), &storage_settings, "tag1").await?; // cannot delete twice - assert!(delete_tag(storage.as_ref(), &storage_settings, "tag1", false) + assert!(delete_tag(storage.as_ref(), &storage_settings, "tag1") .await .is_err()); @@ -765,7 +585,6 @@ mod tests { storage.as_ref(), &storage_settings, "doesnt_exist", - false ) .await .is_err()); @@ -776,14 +595,13 @@ mod tests { &storage_settings, "tag1", s2.clone(), - false ) .await, Err(RefError{kind: RefErrorKind::TagAlreadyExists(name),..}) if name == "tag1"); assert!(list_tags(storage.as_ref(), &storage_settings).await?.is_empty()); // can create different tag - create_tag(storage.as_ref(), &storage_settings, "tag2", s2, false).await?; + create_tag(storage.as_ref(), &storage_settings, "tag2", s2).await?; // listing doesn't include deleted tags assert_eq!( diff --git a/icechunk/src/repository.rs b/icechunk/src/repository.rs index 86b35e9c..44ab628e 100644 --- a/icechunk/src/repository.rs +++ b/icechunk/src/repository.rs @@ -6,6 +6,7 @@ use std::{ }; use bytes::Bytes; +use err_into::ErrorInto as _; use futures::{ stream::{FuturesOrdered, FuturesUnordered}, Stream, StreamExt, TryStreamExt, @@ -28,11 +29,10 @@ use crate::{ }, refs::{ create_tag, delete_branch, delete_tag, fetch_branch_tip, fetch_tag, - list_branches, list_tags, update_branch, BranchVersion, Ref, RefError, - RefErrorKind, + list_branches, list_tags, update_branch, Ref, RefError, RefErrorKind, }, session::{Session, SessionErrorKind, SessionResult}, - storage::{self, ETag, FetchConfigResult, StorageErrorKind, UpdateConfigResult}, + storage::{self, FetchConfigResult, StorageErrorKind, UpdateConfigResult}, virtual_chunks::{ContainerName, VirtualChunkResolver}, Storage, StorageError, }; @@ -125,7 +125,7 @@ pub type RepositoryResult = Result; pub struct Repository { config: RepositoryConfig, storage_settings: storage::Settings, - config_etag: Option, + config_version: storage::VersionInfo, storage: Arc, asset_manager: Arc, virtual_resolver: Arc, @@ -147,7 +147,6 @@ impl Repository { let config = config.map(|c| RepositoryConfig::default().merge(c)).unwrap_or_default(); let compression = config.compression().level(); - let overwrite_refs = config.unsafe_overwrite_refs(); let storage_c = Arc::clone(&storage); let storage_settings = config.storage().cloned().unwrap_or_else(|| storage.default_settings()); @@ -174,7 +173,6 @@ impl Repository { Ref::DEFAULT_BRANCH, new_snapshot.id().clone(), None, - overwrite_refs, ) .await?; Ok::<(), RepositoryError>(()) @@ -187,23 +185,26 @@ impl Repository { let handle2 = tokio::spawn( async move { if has_overriden_config { - let etag = - Repository::store_config(storage_c.as_ref(), &config_c, None) - .await?; - Ok::<_, RepositoryError>(Some(etag)) + let version = Repository::store_config( + storage_c.as_ref(), + &config_c, + &storage::VersionInfo::for_creation(), + ) + .await?; + Ok::<_, RepositoryError>(version) } else { - Ok(None) + Ok(storage::VersionInfo::for_creation()) } } .in_current_span(), ); handle1.await??; - let config_etag = handle2.await??; + let config_version = handle2.await??; debug_assert!(Self::exists(storage.as_ref()).await.unwrap_or(false)); - Self::new(config, config_etag, storage, virtual_chunk_credentials) + Self::new(config, config_version, storage, virtual_chunk_credentials) } #[instrument(skip_all)] @@ -233,17 +234,22 @@ impl Repository { #[allow(clippy::expect_used)] handle2.await.expect("Error checking if repo exists")?; #[allow(clippy::expect_used)] - if let Some((default_config, config_etag)) = + if let Some((default_config, config_version)) = handle1.await.expect("Error fetching repo config")? { // Merge the given config with the defaults let config = config.map(|c| default_config.merge(c)).unwrap_or(default_config); - Self::new(config, Some(config_etag), storage, virtual_chunk_credentials) + Self::new(config, config_version, storage, virtual_chunk_credentials) } else { let config = config.unwrap_or_default(); - Self::new(config, None, storage, virtual_chunk_credentials) + Self::new( + config, + storage::VersionInfo::for_creation(), + storage, + virtual_chunk_credentials, + ) } } @@ -261,7 +267,7 @@ impl Repository { fn new( config: RepositoryConfig, - config_etag: Option, + config_version: storage::VersionInfo, storage: Arc, virtual_chunk_credentials: HashMap, ) -> RepositoryResult { @@ -282,7 +288,7 @@ impl Repository { )); Ok(Self { config, - config_etag, + config_version, storage, storage_settings, virtual_resolver, @@ -316,7 +322,7 @@ impl Repository { Self::new( config, - self.config_etag.clone(), + self.config_version.clone(), Arc::clone(&self.storage), virtual_chunk_credentials .unwrap_or_else(|| self.virtual_chunk_credentials.clone()), @@ -326,22 +332,22 @@ impl Repository { #[instrument(skip_all)] pub async fn fetch_config( storage: &(dyn Storage + Send + Sync), - ) -> RepositoryResult> { + ) -> RepositoryResult> { match storage.fetch_config(&storage.default_settings()).await? { - FetchConfigResult::Found { bytes, etag } => { + FetchConfigResult::Found { bytes, version } => { let config = serde_yaml_ng::from_slice(&bytes)?; - Ok(Some((config, etag))) + Ok(Some((config, version))) } FetchConfigResult::NotFound => Ok(None), } } #[instrument(skip_all)] - pub async fn save_config(&self) -> RepositoryResult { + pub async fn save_config(&self) -> RepositoryResult { Repository::store_config( self.storage().as_ref(), self.config(), - self.config_etag.as_ref(), + &self.config_version, ) .await } @@ -350,18 +356,14 @@ impl Repository { pub(crate) async fn store_config( storage: &(dyn Storage + Send + Sync), config: &RepositoryConfig, - config_etag: Option<&ETag>, - ) -> RepositoryResult { + previous_version: &storage::VersionInfo, + ) -> RepositoryResult { let bytes = Bytes::from(serde_yaml_ng::to_string(config)?); match storage - .update_config( - &storage.default_settings(), - bytes, - config_etag.map(|e| e.as_str()), - ) + .update_config(&storage.default_settings(), bytes, previous_version) .await? { - UpdateConfigResult::Updated { new_etag } => Ok(new_etag), + UpdateConfigResult::Updated { new_version } => Ok(new_version), UpdateConfigResult::NotOnLatestVersion => { Err(RepositoryErrorKind::ConfigWasUpdated.into()) } @@ -426,30 +428,23 @@ impl Repository { &self, branch_name: &str, snapshot_id: &SnapshotId, - ) -> RepositoryResult { + ) -> RepositoryResult<()> { // TODO: The parent snapshot should exist? - let version = match update_branch( + update_branch( self.storage.as_ref(), &self.storage_settings, branch_name, snapshot_id.clone(), None, - self.config().unsafe_overwrite_refs(), ) .await - { - Ok(branch_version) => Ok::<_, RepositoryError>(branch_version), - Err(RefError { + .map_err(|e| match e { + RefError { kind: RefErrorKind::Conflict { expected_parent, actual_parent }, .. - }) => { - Err(RepositoryErrorKind::Conflict { expected_parent, actual_parent } - .into()) - } - Err(err) => Err(err.into()), - }?; - - Ok(version) + } => RepositoryErrorKind::Conflict { expected_parent, actual_parent }.into(), + err => err.into(), + }) } /// List all branches in the repository. @@ -477,7 +472,7 @@ impl Repository { &self, branch: &str, snapshot_id: &SnapshotId, - ) -> RepositoryResult { + ) -> RepositoryResult<()> { raise_if_invalid_snapshot_id( self.storage.as_ref(), &self.storage_settings, @@ -485,17 +480,15 @@ impl Repository { ) .await?; let branch_tip = self.lookup_branch(branch).await?; - let version = update_branch( + update_branch( self.storage.as_ref(), &self.storage_settings, branch, snapshot_id.clone(), Some(&branch_tip), - self.config().unsafe_overwrite_refs(), ) - .await?; - - Ok(version) + .await + .err_into() } /// Delete a branch from the repository. @@ -516,13 +509,7 @@ impl Repository { /// chunks or snapshots associated with the tag. #[instrument(skip(self))] pub async fn delete_tag(&self, tag: &str) -> RepositoryResult<()> { - Ok(delete_tag( - self.storage.as_ref(), - &self.storage_settings, - tag, - self.config().unsafe_overwrite_refs(), - ) - .await?) + Ok(delete_tag(self.storage.as_ref(), &self.storage_settings, tag).await?) } /// Create a new tag in the repository at the given snapshot id @@ -537,7 +524,6 @@ impl Repository { &self.storage_settings, tag_name, snapshot_id.clone(), - self.config().unsafe_overwrite_refs(), ) .await?; Ok(()) @@ -851,8 +837,8 @@ mod tests { // it inits with the default config assert_eq!(repo.config(), &RepositoryConfig::default()); // updating the persistent config create a new file with default values - let etag = repo.save_config().await?; - assert_ne!(etag, ""); + let version = repo.save_config().await?; + assert_ne!(version, storage::VersionInfo::for_creation()); assert_eq!( Repository::fetch_config(storage.as_ref()).await?.unwrap().0, RepositoryConfig::default() @@ -872,8 +858,8 @@ mod tests { assert_eq!(repo.config().inline_chunk_threshold_bytes(), 42); // update the persistent config - let etag = repo.save_config().await?; - assert_ne!(etag, ""); + let version = repo.save_config().await?; + assert_ne!(version, storage::VersionInfo::for_creation()); assert_eq!( Repository::fetch_config(storage.as_ref()) .await? @@ -915,8 +901,8 @@ mod tests { assert_eq!(repo.config().caching().num_chunk_refs(), 100); // and we can save the merge - let etag = repo.save_config().await?; - assert_ne!(etag, ""); + let version = repo.save_config().await?; + assert_ne!(version, storage::VersionInfo::for_creation()); assert_eq!( &Repository::fetch_config(storage.as_ref()).await?.unwrap().0, repo.config() diff --git a/icechunk/src/session.rs b/icechunk/src/session.rs index 88cba53a..93988195 100644 --- a/icechunk/src/session.rs +++ b/icechunk/src/session.rs @@ -812,7 +812,6 @@ impl Session { let id = match current { Err(RefError { kind: RefErrorKind::RefNotFound(_), .. }) => { do_commit( - &self.config, self.storage.as_ref(), Arc::clone(&self.asset_manager), self.storage_settings.as_ref(), @@ -835,7 +834,6 @@ impl Session { .into()) } else { do_commit( - &self.config, self.storage.as_ref(), Arc::clone(&self.asset_manager), self.storage_settings.as_ref(), @@ -1624,7 +1622,6 @@ async fn flush( #[allow(clippy::too_many_arguments)] async fn do_commit( - config: &RepositoryConfig, storage: &(dyn Storage + Send + Sync), asset_manager: Arc, storage_settings: &storage::Settings, @@ -1647,7 +1644,6 @@ async fn do_commit( branch_name, new_snapshot.clone(), Some(&parent_snapshot), - config.unsafe_overwrite_refs(), ) .await { @@ -1747,7 +1743,7 @@ mod tests { detector::ConflictDetector, }, format::manifest::ManifestExtents, - refs::{fetch_ref, Ref}, + refs::{fetch_tag, Ref}, repository::VersionInfo, storage::new_in_memory_storage, strategies::{ @@ -2021,11 +2017,14 @@ mod tests { "main", snapshot.id().clone(), None, - true, ) .await?; - Repository::store_config(storage.as_ref(), &RepositoryConfig::default(), None) - .await?; + Repository::store_config( + storage.as_ref(), + &RepositoryConfig::default(), + &storage::VersionInfo::for_creation(), + ) + .await?; let repo = Repository::open(None, storage, HashMap::new()).await?; let mut ds = repo.writable_session("main").await?; @@ -2864,14 +2863,12 @@ mod tests { let new_snapshot_id = ds.commit("first commit", None).await?; assert_eq!( new_snapshot_id, - fetch_ref(storage.as_ref(), &storage_settings, "main").await?.1.snapshot + fetch_branch_tip(storage.as_ref(), &storage_settings, "main").await?.snapshot ); assert_eq!(&new_snapshot_id, ds.snapshot_id()); repo.create_tag("v1", &new_snapshot_id).await?; - let (ref_name, ref_data) = - fetch_ref(storage.as_ref(), &storage_settings, "v1").await?; - assert_eq!(ref_name, Ref::Tag("v1".to_string())); + let ref_data = fetch_tag(storage.as_ref(), &storage_settings, "v1").await?; assert_eq!(new_snapshot_id, ref_data.snapshot); assert!(matches!( @@ -2906,9 +2903,9 @@ mod tests { ) .await?; let new_snapshot_id = ds.commit("second commit", None).await?; - let (ref_name, ref_data) = - fetch_ref(storage.as_ref(), &storage_settings, Ref::DEFAULT_BRANCH).await?; - assert_eq!(ref_name, Ref::Branch("main".to_string())); + let ref_data = + fetch_branch_tip(storage.as_ref(), &storage_settings, Ref::DEFAULT_BRANCH) + .await?; assert_eq!(new_snapshot_id, ref_data.snapshot); let parents = repo @@ -3416,7 +3413,6 @@ mod tests { "main", conflicting_snap.clone(), Some(¤t_snap), - false, ) .await?; Ok(()) diff --git a/icechunk/src/storage/logging.rs b/icechunk/src/storage/logging.rs index 983629d0..89aae1f1 100644 --- a/icechunk/src/storage/logging.rs +++ b/icechunk/src/storage/logging.rs @@ -12,7 +12,7 @@ use tokio::io::AsyncRead; use super::{ FetchConfigResult, GetRefResult, ListInfo, Reader, Settings, Storage, StorageError, - StorageResult, UpdateConfigResult, WriteRefResult, + StorageResult, UpdateConfigResult, VersionInfo, WriteRefResult, }; use crate::{ format::{ChunkId, ChunkOffset, ManifestId, SnapshotId}, @@ -56,9 +56,9 @@ impl Storage for LoggingStorage { &self, settings: &Settings, config: Bytes, - etag: Option<&str>, + previous_version: &VersionInfo, ) -> StorageResult { - self.backend.update_config(settings, config, etag).await + self.backend.update_config(settings, config, previous_version).await } async fn fetch_snapshot( @@ -178,18 +178,10 @@ impl Storage for LoggingStorage { &self, settings: &Settings, ref_key: &str, - overwrite_refs: bool, bytes: Bytes, + previous_version: &VersionInfo, ) -> StorageResult { - self.backend.write_ref(settings, ref_key, overwrite_refs, bytes).await - } - - async fn ref_versions( - &self, - settings: &Settings, - ref_name: &str, - ) -> StorageResult>> { - self.backend.ref_versions(settings, ref_name).await + self.backend.write_ref(settings, ref_key, bytes, previous_version).await } async fn list_objects<'a>( diff --git a/icechunk/src/storage/mod.rs b/icechunk/src/storage/mod.rs index 1c4b3125..09cffbd7 100644 --- a/icechunk/src/storage/mod.rs +++ b/icechunk/src/storage/mod.rs @@ -104,7 +104,38 @@ const REF_PREFIX: &str = "refs"; const TRANSACTION_PREFIX: &str = "transactions/"; const CONFIG_PATH: &str = "config.yaml"; -pub type ETag = String; +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Hash, PartialOrd, Ord)] +pub struct ETag(pub String); +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] +pub struct Generation(pub String); + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)] +pub struct VersionInfo { + pub etag: Option, + pub generation: Option, +} + +impl VersionInfo { + pub fn for_creation() -> Self { + Self { etag: None, generation: None } + } + + pub fn from_etag_only(etag: String) -> Self { + Self { etag: Some(ETag(etag)), generation: None } + } + + pub fn is_create(&self) -> bool { + self.etag.is_none() && self.generation.is_none() + } + + pub fn etag(&self) -> Option<&String> { + self.etag.as_ref().map(|e| &e.0) + } + + pub fn generation(&self) -> Option<&String> { + self.generation.as_ref().map(|e| &e.0) + } +} #[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)] pub struct ConcurrencySettings { @@ -197,19 +228,19 @@ impl Reader { #[derive(Debug, Clone, PartialEq, Eq)] pub enum FetchConfigResult { - Found { bytes: Bytes, etag: ETag }, + Found { bytes: Bytes, version: VersionInfo }, NotFound, } #[derive(Debug, Clone, PartialEq, Eq)] pub enum UpdateConfigResult { - Updated { new_etag: ETag }, + Updated { new_version: VersionInfo }, NotOnLatestVersion, } #[derive(Debug, Clone, PartialEq, Eq)] pub enum GetRefResult { - Found { bytes: Bytes }, + Found { bytes: Bytes, version: VersionInfo }, NotFound, } @@ -235,7 +266,7 @@ pub trait Storage: fmt::Debug + private::Sealed + Sync + Send { &self, settings: &Settings, config: Bytes, - etag: Option<&str>, + previous_version: &VersionInfo, ) -> StorageResult; async fn fetch_snapshot( &self, @@ -304,17 +335,12 @@ pub trait Storage: fmt::Debug + private::Sealed + Sync + Send { ref_key: &str, ) -> StorageResult; async fn ref_names(&self, settings: &Settings) -> StorageResult>; - async fn ref_versions( - &self, - settings: &Settings, - ref_name: &str, - ) -> StorageResult>>; async fn write_ref( &self, settings: &Settings, ref_key: &str, - overwrite_refs: bool, bytes: Bytes, + previous_version: &VersionInfo, ) -> StorageResult; async fn list_objects<'a>( diff --git a/icechunk/src/storage/object_store.rs b/icechunk/src/storage/object_store.rs index e3192bdc..38ee9f41 100644 --- a/icechunk/src/storage/object_store.rs +++ b/icechunk/src/storage/object_store.rs @@ -46,10 +46,10 @@ use tokio_util::compat::FuturesAsyncReadCompatExt; use tracing::instrument; use super::{ - ConcurrencySettings, FetchConfigResult, GetRefResult, ListInfo, Reader, Settings, - Storage, StorageError, StorageErrorKind, StorageResult, UpdateConfigResult, - WriteRefResult, CHUNK_PREFIX, CONFIG_PATH, MANIFEST_PREFIX, REF_PREFIX, - SNAPSHOT_PREFIX, TRANSACTION_PREFIX, + ConcurrencySettings, ETag, FetchConfigResult, Generation, GetRefResult, ListInfo, + Reader, Settings, Storage, StorageError, StorageErrorKind, StorageResult, + UpdateConfigResult, VersionInfo, WriteRefResult, CHUNK_PREFIX, CONFIG_PATH, + MANIFEST_PREFIX, REF_PREFIX, SNAPSHOT_PREFIX, TRANSACTION_PREFIX, }; #[derive(Debug, Serialize, Deserialize)] @@ -163,6 +163,11 @@ impl ObjectStorage { self.backend.supports_metadata() } + /// We need this because object_store's local file implementation doesn't support it + pub fn supports_conditional_put_updates(&self) -> bool { + self.backend.supports_conditional_put_updates() + } + /// Return all keys in the store /// /// Intended for testing and debugging purposes only. @@ -219,27 +224,6 @@ impl ObjectStorage { ObjectPath::from(format!("{}/{}/{}", self.backend.prefix(), REF_PREFIX, ref_key)) } - async fn do_ref_versions(&self, ref_name: &str) -> BoxStream> { - let prefix = self.ref_key(ref_name); - self.get_client() - .await - .list(Some(prefix.clone()).as_ref()) - .map_err(|e| e.into()) - .and_then(move |meta| { - ready( - self.drop_prefix(&prefix, &meta.location) - .map(|path| path.to_string()) - .ok_or( - StorageErrorKind::Other( - "Bug in ref prefix logic".to_string(), - ) - .into(), - ), - ) - }) - .boxed() - } - async fn delete_batch( &self, prefix: &str, @@ -279,6 +263,27 @@ impl ObjectStorage { Attributes::new() } } + + fn get_ref_name(&self, prefix: &ObjectPath, meta: &ObjectMeta) -> Option { + let relative_key = self.drop_prefix(prefix, &meta.location)?; + let parent = relative_key.parts().next()?; + Some(parent.as_ref().to_string()) + } + + fn get_put_mode(&self, previous_version: &VersionInfo) -> PutMode { + let degrade_to_overwrite = + !previous_version.is_create() && !self.supports_conditional_put_updates(); + if degrade_to_overwrite { + PutMode::Overwrite + } else if previous_version.is_create() { + PutMode::Create + } else { + PutMode::Update(UpdateVersion { + e_tag: previous_version.etag().cloned(), + version: previous_version.generation().cloned(), + }) + } + } } impl private::Sealed for ObjectStorage {} @@ -300,12 +305,14 @@ impl Storage for ObjectStorage { let response = self.get_client().await.get(&path).await; match response { - Ok(result) => match result.meta.e_tag.clone() { - Some(etag) => { - Ok(FetchConfigResult::Found { bytes: result.bytes().await?, etag }) - } - None => Ok(FetchConfigResult::NotFound), - }, + Ok(result) => { + let version = VersionInfo { + etag: result.meta.e_tag.as_ref().cloned().map(ETag), + generation: result.meta.version.as_ref().cloned().map(Generation), + }; + + Ok(FetchConfigResult::Found { bytes: result.bytes().await?, version }) + } Err(object_store::Error::NotFound { .. }) => Ok(FetchConfigResult::NotFound), Err(err) => Err(err.into()), } @@ -315,7 +322,7 @@ impl Storage for ObjectStorage { &self, _settings: &Settings, config: Bytes, - etag: Option<&str>, + previous_version: &VersionInfo, ) -> StorageResult { let path = self.get_config_path(); let attributes = if self.supports_metadata() { @@ -327,23 +334,17 @@ impl Storage for ObjectStorage { Attributes::new() }; - let mode = if let Some(etag) = etag { - PutMode::Update(UpdateVersion { - e_tag: Some(etag.to_string()), - version: None, - }) - } else { - PutMode::Create - }; + let mode = self.get_put_mode(previous_version); let options = PutOptions { mode, attributes, ..PutOptions::default() }; let res = self.get_client().await.put_opts(&path, config.into(), options).await; match res { Ok(res) => { - let new_etag = res.e_tag.ok_or(StorageErrorKind::Other( - "Config object should have an etag".to_string(), - ))?; - Ok(UpdateConfigResult::Updated { new_etag }) + let new_version = VersionInfo { + etag: res.e_tag.map(ETag), + generation: res.version.map(Generation), + }; + Ok(UpdateConfigResult::Updated { new_version }) } Err(object_store::Error::Precondition { .. }) => { Ok(UpdateConfigResult::NotOnLatestVersion) @@ -479,7 +480,14 @@ impl Storage for ObjectStorage { ) -> StorageResult { let key = self.ref_key(ref_key); match self.get_client().await.get(&key).await { - Ok(res) => Ok(GetRefResult::Found { bytes: res.bytes().await? }), + Ok(res) => { + let etag = res.meta.e_tag.clone().map(ETag); + let generation = res.meta.version.clone().map(Generation); + Ok(GetRefResult::Found { + bytes: res.bytes().await?, + version: VersionInfo { etag, generation }, + }) + } Err(object_store::Error::NotFound { .. }) => Ok(GetRefResult::NotFound), Err(err) => Err(err.into()), } @@ -487,42 +495,15 @@ impl Storage for ObjectStorage { #[instrument(skip(self, _settings))] async fn ref_names(&self, _settings: &Settings) -> StorageResult> { - // FIXME: i don't think object_store's implementation of list_with_delimiter is any good - // we need to test if it even works beyond 1k refs let prefix = self.ref_key(""); Ok(self .get_client() .await - .list_with_delimiter(Some(prefix.clone()).as_ref()) - .await? - .common_prefixes - .iter() - .filter_map(|path| { - self.drop_prefix(&prefix, path).map(|path| path.to_string()) - }) - .collect()) - } - - #[instrument(skip(self, _settings))] - async fn ref_versions( - &self, - _settings: &Settings, - ref_name: &str, - ) -> StorageResult>> { - let res = self.do_ref_versions(ref_name).await; - if self.artificially_sort_refs_in_mem() { - #[allow(clippy::expect_used)] - // This branch is used for local tests, not in production. We don't expect the size of - // these streams to be large, so we can collect in memory and fail early if there is an - // error - let mut all = - res.try_collect::>().await.expect("Error fetching ref versions"); - all.sort(); - Ok(futures::stream::iter(all.into_iter().map(Ok)).boxed()) - } else { - Ok(res) - } + .list(Some(prefix.clone()).as_ref()) + .try_filter_map(|meta| ready(Ok(self.get_ref_name(&prefix, &meta)))) + .try_collect() + .await?) } #[instrument(skip(self, _settings, bytes))] @@ -530,11 +511,11 @@ impl Storage for ObjectStorage { &self, _settings: &Settings, ref_key: &str, - overwrite_refs: bool, bytes: Bytes, + previous_version: &VersionInfo, ) -> StorageResult { let key = self.ref_key(ref_key); - let mode = if overwrite_refs { PutMode::Overwrite } else { PutMode::Create }; + let mode = self.get_put_mode(previous_version); let opts = PutOptions { mode, ..PutOptions::default() }; match self @@ -544,7 +525,8 @@ impl Storage for ObjectStorage { .await { Ok(_) => Ok(WriteRefResult::Written), - Err(object_store::Error::AlreadyExists { .. }) => { + Err(object_store::Error::Precondition { .. }) + | Err(object_store::Error::AlreadyExists { .. }) => { Ok(WriteRefResult::WontOverwrite) } Err(err) => Err(err.into()), @@ -668,6 +650,11 @@ pub trait ObjectStoreBackend: Debug + Sync + Send { true } + /// We need this because object_store's local file implementation doesn't support it + fn supports_conditional_put_updates(&self) -> bool { + true + } + fn default_settings(&self) -> Settings { Settings::default() } @@ -734,6 +721,10 @@ impl ObjectStoreBackend for LocalFileSystemObjectStoreBackend { false } + fn supports_conditional_put_updates(&self) -> bool { + false + } + fn default_settings(&self) -> Settings { Settings { concurrency: Some(ConcurrencySettings { diff --git a/icechunk/src/storage/s3.rs b/icechunk/src/storage/s3.rs index e67fd7a3..be105a21 100644 --- a/icechunk/src/storage/s3.rs +++ b/icechunk/src/storage/s3.rs @@ -13,7 +13,6 @@ use crate::{ format::{ChunkId, ChunkOffset, FileTypeTag, ManifestId, ObjectId, SnapshotId}, private, Storage, StorageError, }; -use async_stream::try_stream; use async_trait::async_trait; use aws_config::{ meta::region::RegionProviderChain, retry::ProvideErrorKind, AppName, BehaviorVersion, @@ -41,8 +40,8 @@ use tracing::instrument; use super::{ FetchConfigResult, GetRefResult, ListInfo, Reader, Settings, StorageErrorKind, - StorageResult, UpdateConfigResult, WriteRefResult, CHUNK_PREFIX, CONFIG_PATH, - MANIFEST_PREFIX, REF_PREFIX, SNAPSHOT_PREFIX, TRANSACTION_PREFIX, + StorageResult, UpdateConfigResult, VersionInfo, WriteRefResult, CHUNK_PREFIX, + CONFIG_PATH, MANIFEST_PREFIX, REF_PREFIX, SNAPSHOT_PREFIX, TRANSACTION_PREFIX, }; #[derive(Debug, Serialize, Deserialize)] @@ -248,6 +247,14 @@ impl S3Storage { Ok(res.deleted().len()) } + + fn get_ref_name<'a>(&self, key: Option<&'a str>) -> Option<&'a str> { + let key = key?; + let prefix = self.ref_key("").ok()?; + let relative_key = key.strip_prefix(&prefix)?; + let ref_name = relative_key.split('/').next()?; + Some(ref_name) + } } pub fn range_to_header(range: &Range) -> String { @@ -278,7 +285,7 @@ impl Storage for S3Storage { Ok(output) => match output.e_tag { Some(etag) => Ok(FetchConfigResult::Found { bytes: output.body.collect().await?.into_bytes(), - etag, + version: VersionInfo::from_etag_only(etag), }), None => Ok(FetchConfigResult::NotFound), }, @@ -294,7 +301,7 @@ impl Storage for S3Storage { &self, _settings: &Settings, config: Bytes, - etag: Option<&str>, + previous_version: &VersionInfo, ) -> StorageResult { let key = self.get_config_path()?; let mut req = self @@ -306,7 +313,7 @@ impl Storage for S3Storage { .content_type("application/yaml") .body(config.into()); - if let Some(etag) = etag { + if let Some(etag) = previous_version.etag() { req = req.if_match(etag) } else { req = req.if_none_match("*") @@ -322,7 +329,8 @@ impl Storage for S3Storage { "Config object should have an etag".to_string(), ))? .to_string(); - Ok(UpdateConfigResult::Updated { new_etag }) + let new_version = VersionInfo::from_etag_only(new_etag); + Ok(UpdateConfigResult::Updated { new_version }) } // minio returns this Err(SdkError::ServiceError(err)) => { @@ -471,7 +479,11 @@ impl Storage for S3Storage { match res { Ok(res) => { let bytes = res.body.collect().await?.into_bytes(); - Ok(GetRefResult::Found { bytes }) + if let Some(version) = res.e_tag.map(VersionInfo::from_etag_only) { + Ok(GetRefResult::Found { bytes, version }) + } else { + Ok(GetRefResult::NotFound) + } } Err(err) if err @@ -494,21 +506,16 @@ impl Storage for S3Storage { .list_objects_v2() .bucket(self.bucket.clone()) .prefix(prefix.clone()) - .delimiter("/") .into_paginator() .send(); let mut res = Vec::new(); while let Some(page) = paginator.try_next().await? { - for common_prefix in page.common_prefixes() { - if let Some(key) = common_prefix - .prefix() - .as_ref() - .and_then(|key| key.strip_prefix(prefix.as_str())) - .and_then(|key| key.strip_suffix('/')) - { - res.push(key.to_string()); + for obj in page.contents.unwrap_or_else(Vec::new) { + let name = self.get_ref_name(obj.key()); + if let Some(name) = name { + res.push(name.to_string()); } } } @@ -516,42 +523,13 @@ impl Storage for S3Storage { Ok(res) } - #[instrument(skip(self, _settings))] - async fn ref_versions( - &self, - _settings: &Settings, - ref_name: &str, - ) -> StorageResult>> { - let prefix = self.ref_key(ref_name)?; - let mut paginator = self - .get_client() - .await - .list_objects_v2() - .bucket(self.bucket.clone()) - .prefix(prefix.clone()) - .into_paginator() - .send(); - - let prefix = prefix + "/"; - let stream = try_stream! { - while let Some(page) = paginator.try_next().await? { - for object in page.contents() { - if let Some(key) = object.key.as_ref().and_then(|key| key.strip_prefix(prefix.as_str())) { - yield key.to_string() - } - } - } - }; - Ok(stream.boxed()) - } - #[instrument(skip(self, _settings, bytes))] async fn write_ref( &self, _settings: &Settings, ref_key: &str, - overwrite_refs: bool, bytes: Bytes, + previous_version: &VersionInfo, ) -> StorageResult { let key = self.ref_key(ref_key)?; let mut builder = self @@ -561,8 +539,10 @@ impl Storage for S3Storage { .bucket(self.bucket.clone()) .key(key.clone()); - if !overwrite_refs { - builder = builder.if_none_match("*") + if let Some(etag) = previous_version.etag() { + builder = builder.if_match(etag); + } else { + builder = builder.if_none_match("*"); } let res = builder.body(bytes.into()).send().await; diff --git a/icechunk/src/virtual_chunks.rs b/icechunk/src/virtual_chunks.rs index ae8bc70a..465ea42b 100644 --- a/icechunk/src/virtual_chunks.rs +++ b/icechunk/src/virtual_chunks.rs @@ -325,7 +325,7 @@ impl S3Fetcher { ) } Some(Checksum::ETag(etag)) => { - b = b.if_match(etag); + b = b.if_match(&etag.0); } None => {} }; @@ -459,7 +459,7 @@ impl ChunkFetcher for LocalFSFetcher { .expect("Bad last modified field in virtual chunk reference"); options.if_unmodified_since = Some(d); } - Some(Checksum::ETag(etag)) => options.if_match = Some(etag.clone()), + Some(Checksum::ETag(etag)) => options.if_match = Some(etag.0.clone()), None => {} } diff --git a/icechunk/tests/test_gc.rs b/icechunk/tests/test_gc.rs index f64d08fc..59183e55 100644 --- a/icechunk/tests/test_gc.rs +++ b/icechunk/tests/test_gc.rs @@ -58,7 +58,6 @@ pub async fn test_gc() -> Result<(), Box> { let repo = Repository::create( Some(RepositoryConfig { inline_chunk_threshold_bytes: Some(0), - unsafe_overwrite_refs: Some(true), ..Default::default() }), Arc::clone(&storage), @@ -127,7 +126,6 @@ pub async fn test_gc() -> Result<(), Box> { "main", first_snap_id, Some(&second_snap_id), - false, ) .await?; diff --git a/icechunk/tests/test_storage.rs b/icechunk/tests/test_storage.rs index 802a7bad..43765fa7 100644 --- a/icechunk/tests/test_storage.rs +++ b/icechunk/tests/test_storage.rs @@ -8,18 +8,20 @@ use bytes::Bytes; use icechunk::{ config::{S3Credentials, S3Options, S3StaticCredentials}, format::{ChunkId, ManifestId, SnapshotId}, + new_local_filesystem_storage, refs::{ create_tag, fetch_branch_tip, fetch_tag, list_refs, update_branch, Ref, RefError, RefErrorKind, }, storage::{ new_in_memory_storage, new_s3_storage, FetchConfigResult, StorageResult, - UpdateConfigResult, + UpdateConfigResult, VersionInfo, }, ObjectStorage, Storage, }; use object_store::azure::AzureConfigKey; use pretty_assertions::{assert_eq, assert_ne}; +use tempfile::tempdir; use tokio::io::AsyncReadExt; #[allow(clippy::expect_used)] @@ -89,9 +91,10 @@ async fn mk_azure_blob_storage( Ok(storage) } +#[allow(clippy::expect_used)] async fn with_storage(f: F) -> Result<(), Box> where - F: Fn(Arc) -> Fut, + F: Fn(&'static str, Arc) -> Fut, Fut: Future>>, { let prefix = format!("{:?}", ChunkId::random()); @@ -100,16 +103,27 @@ where let s2 = new_in_memory_storage().await.unwrap(); let s3 = mk_s3_object_store_storage(format!("{prefix}2").as_str()).await?; let s4 = mk_azure_blob_storage(prefix.as_str()).await?; - f(s1).await?; - f(s2).await?; - f(s3).await?; - f(s4).await?; + let dir = tempdir().expect("cannot create temp dir"); + let s5 = new_local_filesystem_storage(dir.path()) + .await + .expect("Cannot create local Storage"); + + println!("Using in memory storage"); + f("in_memory", s2).await?; + println!("Using local filesystem storage"); + f("local_filesystem", s5).await?; + println!("Using s3 native storage"); + f("s3_native", s1).await?; + println!("Using s3 object_store storage"); + f("s3_object_store", s3).await?; + println!("Using azure_blob storage"); + f("azure_blob", s4).await?; Ok(()) } #[tokio::test] pub async fn test_snapshot_write_read() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = SnapshotId::random(); let bytes: [u8; 1024] = core::array::from_fn(|_| rand::random()); @@ -133,7 +147,7 @@ pub async fn test_snapshot_write_read() -> Result<(), Box #[tokio::test] pub async fn test_manifest_write_read() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = ManifestId::random(); let bytes: [u8; 1024] = core::array::from_fn(|_| rand::random()); @@ -165,7 +179,7 @@ pub async fn test_manifest_write_read() -> Result<(), Box #[tokio::test] pub async fn test_chunk_write_read() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = ChunkId::random(); let bytes = Bytes::from_static(b"hello"); @@ -181,11 +195,10 @@ pub async fn test_chunk_write_read() -> Result<(), Box> { #[tokio::test] pub async fn test_tag_write_get() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = SnapshotId::random(); - create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone(), false) - .await?; + create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone()).await?; let back = fetch_tag(storage.as_ref(), &storage_settings, "mytag").await?; assert_eq!(id, back.snapshot); Ok(()) @@ -196,10 +209,10 @@ pub async fn test_tag_write_get() -> Result<(), Box> { #[tokio::test] pub async fn test_fetch_non_existing_tag() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = SnapshotId::random(); - create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone(), false) + create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone()) .await?; let back = @@ -213,14 +226,14 @@ pub async fn test_fetch_non_existing_tag() -> Result<(), Box Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = SnapshotId::random(); - create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone(), false) + create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone()) .await?; let res = - create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone(), false) + create_tag(storage.as_ref(), &storage_settings, "mytag", id.clone()) .await; assert!(matches!(res, Err(RefError{kind: RefErrorKind::TagAlreadyExists(r), ..}) if r == "mytag")); Ok(()) @@ -231,20 +244,18 @@ pub async fn test_create_existing_tag() -> Result<(), Box #[tokio::test] pub async fn test_branch_initialization() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = SnapshotId::random(); - let res = update_branch( + update_branch( storage.as_ref(), &storage_settings, "some-branch", id.clone(), None, - false, ) .await?; - assert_eq!(res.0, 0); let res = fetch_branch_tip(storage.as_ref(), &storage_settings, "some-branch").await?; @@ -258,7 +269,7 @@ pub async fn test_branch_initialization() -> Result<(), Box Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id = SnapshotId::random(); update_branch( @@ -267,7 +278,6 @@ pub async fn test_fetch_non_existing_branch() -> Result<(), Box Result<(), Box Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id1 = SnapshotId::random(); let id2 = SnapshotId::random(); let id3 = SnapshotId::random(); - let res = update_branch( + update_branch( storage.as_ref(), &storage_settings, "some-branch", id1.clone(), None, - false, ) .await?; - assert_eq!(res.0, 0); - let res = update_branch( + update_branch( storage.as_ref(), &storage_settings, "some-branch", id2.clone(), Some(&id1), - false, ) .await?; - assert_eq!(res.0, 1); - let res = update_branch( + update_branch( storage.as_ref(), &storage_settings, "some-branch", id3.clone(), Some(&id2), - false, ) .await?; - assert_eq!(res.0, 2); let res = fetch_branch_tip(storage.as_ref(), &storage_settings, "some-branch").await?; @@ -336,56 +340,27 @@ pub async fn test_branch_update() -> Result<(), Box> { #[tokio::test] pub async fn test_ref_names() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let id1 = SnapshotId::random(); let id2 = SnapshotId::random(); - update_branch( - storage.as_ref(), - &storage_settings, - "main", - id1.clone(), - None, - false, - ) - .await?; + update_branch(storage.as_ref(), &storage_settings, "main", id1.clone(), None) + .await?; update_branch( storage.as_ref(), &storage_settings, "main", id2.clone(), Some(&id1), - false, ) .await?; - update_branch( - storage.as_ref(), - &storage_settings, - "foo", - id1.clone(), - None, - false, - ) - .await?; - update_branch( - storage.as_ref(), - &storage_settings, - "bar", - id1.clone(), - None, - false, - ) - .await?; - create_tag(storage.as_ref(), &storage_settings, "my-tag", id1.clone(), false) + update_branch(storage.as_ref(), &storage_settings, "foo", id1.clone(), None) + .await?; + update_branch(storage.as_ref(), &storage_settings, "bar", id1.clone(), None) + .await?; + create_tag(storage.as_ref(), &storage_settings, "my-tag", id1.clone()).await?; + create_tag(storage.as_ref(), &storage_settings, "my-other-tag", id1.clone()) .await?; - create_tag( - storage.as_ref(), - &storage_settings, - "my-other-tag", - id1.clone(), - false, - ) - .await?; let res: HashSet<_> = HashSet::from_iter(list_refs(storage.as_ref(), &storage_settings).await?); @@ -408,17 +383,17 @@ pub async fn test_ref_names() -> Result<(), Box> { #[tokio::test] #[allow(clippy::panic)] pub async fn test_write_config_on_empty() -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); let config = Bytes::copy_from_slice(b"hello"); - let etag = match storage.update_config(&storage_settings, config.clone(), None).await? { - UpdateConfigResult::Updated { new_etag } => new_etag, + let version = match storage.update_config(&storage_settings, config.clone(), &VersionInfo::for_creation()).await? { + UpdateConfigResult::Updated { new_version } => new_version, UpdateConfigResult::NotOnLatestVersion => panic!(), }; - assert_ne!(etag, ""); + assert_ne!(version, VersionInfo::for_creation()); let res = storage.fetch_config(&storage_settings, ).await?; assert!( - matches!(res, FetchConfigResult::Found{bytes, etag: actual_etag} if actual_etag == etag && bytes == config ) + matches!(res, FetchConfigResult::Found{bytes, version: actual_version} if actual_version == version && bytes == config ) ); Ok(()) }).await?; @@ -428,21 +403,21 @@ pub async fn test_write_config_on_empty() -> Result<(), Box Result<(), Box> { - with_storage(|storage| async move { + with_storage(|_, storage| async move { let storage_settings = storage.default_settings(); - let first_etag = match storage.update_config(&storage_settings, Bytes::copy_from_slice(b"hello"), None).await? { - UpdateConfigResult::Updated { new_etag } => new_etag, + let first_version = match storage.update_config(&storage_settings, Bytes::copy_from_slice(b"hello"), &VersionInfo::for_creation()).await? { + UpdateConfigResult::Updated { new_version } => new_version, _ => panic!(), }; let config = Bytes::copy_from_slice(b"bye"); - let second_etag = match storage.update_config(&storage_settings, config.clone(), Some(first_etag.as_str())).await? { - UpdateConfigResult::Updated { new_etag } => new_etag, + let second_version = match storage.update_config(&storage_settings, config.clone(), &first_version).await? { + UpdateConfigResult::Updated { new_version } => new_version, _ => panic!(), }; - assert_ne!(second_etag, first_etag); + assert_ne!(second_version, first_version); let res = storage.fetch_config(&storage_settings, ).await?; assert!( - matches!(res, FetchConfigResult::Found{bytes, etag: actual_etag} if actual_etag == second_etag && bytes == config ) + matches!(res, FetchConfigResult::Found{bytes, version: actual_version} if actual_version == second_version && bytes == config ) ); Ok(()) }).await?; @@ -450,48 +425,63 @@ pub async fn test_write_config_on_existing() -> Result<(), Box Result<(), Box> { // FIXME: this test fails in MiniIO but seems to work on S3 #[allow(clippy::unwrap_used)] let storage = new_in_memory_storage().await.unwrap(); let storage_settings = storage.default_settings(); - let etag = storage + let version = storage .update_config( &storage_settings, Bytes::copy_from_slice(b"hello"), - Some("00000000000000000000000000000000"), + &VersionInfo::from_etag_only("00000000000000000000000000000000".to_string()), ) .await; - assert!(matches!(etag, Ok(UpdateConfigResult::NotOnLatestVersion))); + assert!(matches!(version, Ok(UpdateConfigResult::NotOnLatestVersion))); Ok(()) } #[tokio::test] #[allow(clippy::panic)] -pub async fn test_write_config_fails_on_bad_etag_when_existing( +pub async fn test_write_config_fails_on_bad_version_when_existing( ) -> Result<(), Box> { - with_storage(|storage| async move { + with_storage(|storage_type, storage| async move { let storage_settings = storage.default_settings(); let config = Bytes::copy_from_slice(b"hello"); - let etag = match storage.update_config(&storage_settings, config.clone(), None).await? { - UpdateConfigResult::Updated { new_etag } => new_etag, + let version = match storage.update_config(&storage_settings, config.clone(), &VersionInfo::for_creation()).await? { + UpdateConfigResult::Updated { new_version } => new_version, _ => panic!(), }; - let res = storage + let update_res = storage .update_config(&storage_settings, Bytes::copy_from_slice(b"bye"), - Some("00000000000000000000000000000000"), + &VersionInfo::from_etag_only("00000000000000000000000000000000".to_string()), ) .await?; - assert!(matches!(res, UpdateConfigResult::NotOnLatestVersion)); - - let res = storage.fetch_config(&storage_settings, ).await?; - assert!( - matches!(res, FetchConfigResult::Found{bytes, etag: actual_etag} if actual_etag == etag && bytes == config ) - ); - Ok(()) + if storage_type == "local_filesystem" { + // FIXME: local file system doesn't have conditional updates yet + assert!(matches!(update_res, UpdateConfigResult::Updated{..})); + + } else { + assert!(matches!(update_res, UpdateConfigResult::NotOnLatestVersion)); + } + + let fetch_res = storage.fetch_config(&storage_settings, ).await?; + if storage_type == "local_filesystem" { + // FIXME: local file system doesn't have conditional updates yet + assert!( + matches!(fetch_res, FetchConfigResult::Found{bytes, version: actual_version} + if actual_version != version && bytes == Bytes::copy_from_slice(b"bye")) + ); + } else { + assert!( + matches!(fetch_res, FetchConfigResult::Found{bytes, version: actual_version} + if actual_version == version && bytes == config ) + ); + } + Ok(()) }).await?; Ok(()) } diff --git a/icechunk/tests/test_virtual_refs.rs b/icechunk/tests/test_virtual_refs.rs index 03cfd665..9ab5153f 100644 --- a/icechunk/tests/test_virtual_refs.rs +++ b/icechunk/tests/test_virtual_refs.rs @@ -15,7 +15,7 @@ mod tests { repository::VersionInfo, session::{get_chunk, SessionErrorKind}, storage::{ - self, new_s3_storage, s3::mk_client, ConcurrencySettings, ObjectStorage, + self, new_s3_storage, s3::mk_client, ConcurrencySettings, ETag, ObjectStorage, }, store::{StoreError, StoreErrorKind}, virtual_chunks::VirtualChunkContainer, @@ -711,7 +711,7 @@ mod tests { ))?, offset: 1, length: 5, - checksum: Some(Checksum::ETag(String::from("invalid etag"))), + checksum: Some(Checksum::ETag(ETag(String::from("invalid etag")))), }; store.set_virtual_ref("array/c/0/0/2", ref1, false).await?; @@ -733,7 +733,7 @@ mod tests { )?, offset: 22306, length: 288, - checksum: Some(Checksum::ETag(String::from("invalid etag"))), + checksum: Some(Checksum::ETag(ETag(String::from("invalid etag")))), }; store.set_virtual_ref("array/c/1/1/1", public_ref, false).await?;