diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py index 6de5c424..a0cb5698 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py @@ -24,6 +24,7 @@ ElasticsearchSettings as SyncElasticsearchSettings, ) from stac_fastapi.types.errors import ConflictError, NotFoundError +from stac_fastapi.types.links import resolve_links from stac_fastapi.types.stac import ( Collection, Item, @@ -764,11 +765,8 @@ async def merge_patch_item( base_url: (str): The base URL used for constructing URLs for the item. refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. - Raises: - ConflictError: If the item already exists in the database. - Returns: - None + patched item. """ operations = merge_to_operations(item) @@ -797,11 +795,8 @@ async def json_patch_item( base_url (str): The base URL used for constructing URLs for the item. refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. - Raises: - ConflictError: If the item already exists in the database. - Returns: - None + patched item. """ new_item_id = None new_collection_id = None @@ -1000,21 +995,18 @@ async def merge_patch_collection( collection: PartialCollection, base_url: str, refresh: bool = True, - ) -> Item: + ) -> Collection: """Database logic for merge patching a collection following RF7396. Args: - collection_id(str): Collection that item belongs to. - item_id(str): Id of item to be patched. - item (PartialItem): The partial item to be updated. - base_url: (str): The base URL used for constructing URLs for the item. + collection_id(str): Id of collection to be patched. + collection (PartialCollection): The partial collection to be updated. + base_url: (str): The base URL used for constructing links. refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. - Raises: - ConflictError: If the item already exists in the database. Returns: - None + patched collection. """ operations = merge_to_operations(collection) @@ -1031,21 +1023,17 @@ async def json_patch_collection( operations: List[PatchOperation], base_url: str, refresh: bool = True, - ) -> Item: - """Database logic for json patching an item following RF6902. + ) -> Collection: + """Database logic for json patching a collection following RF6902. Args: - collection_id(str): Collection that item belongs to. - item_id(str): Id of item to be patched. + collection_id(str): Id of collection to be patched. operations (list): List of operations to run. - base_url (str): The base URL used for constructing URLs for the item. + base_url (str): The base URL used for constructing links. refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. - Raises: - ConflictError: If the item already exists in the database. - Returns: - None + patched collection. """ new_collection_id = None script_operations = [] @@ -1075,6 +1063,7 @@ async def json_patch_collection( if new_collection_id: collection["id"] = new_collection_id + collection["links"] = resolve_links([], base_url) await self.update_collection( collection_id=collection_id, collection=collection, refresh=False ) diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py index 841d5e27..b45370fd 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py @@ -1,4 +1,5 @@ """Database logic.""" + import asyncio import logging import os @@ -14,13 +15,25 @@ from stac_fastapi.core import serializers from stac_fastapi.core.extensions import filter -from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon +from stac_fastapi.core.utilities import ( + MAX_LIMIT, + bbox2polygon, + merge_to_operations, + operations_to_script, +) from stac_fastapi.opensearch.config import ( AsyncOpensearchSettings as AsyncSearchSettings, ) from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings from stac_fastapi.types.errors import ConflictError, NotFoundError -from stac_fastapi.types.stac import Collection, Item +from stac_fastapi.types.links import resolve_links +from stac_fastapi.types.stac import ( + Collection, + Item, + PartialCollection, + PartialItem, + PatchOperation, +) logger = logging.getLogger(__name__) @@ -767,6 +780,123 @@ async def create_item(self, item: Item, refresh: bool = False): f"Item {item_id} in collection {collection_id} already exists" ) + async def merge_patch_item( + self, + collection_id: str, + item_id: str, + item: PartialItem, + base_url: str, + refresh: bool = True, + ) -> Item: + """Database logic for merge patching an item following RF7396. + + Args: + collection_id(str): Collection that item belongs to. + item_id(str): Id of item to be patched. + item (PartialItem): The partial item to be updated. + base_url: (str): The base URL used for constructing URLs for the item. + refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. + + Returns: + patched item. + """ + operations = merge_to_operations(item) + + return await self.json_patch_item( + collection_id=collection_id, + item_id=item_id, + operations=operations, + base_url=base_url, + refresh=refresh, + ) + + async def json_patch_item( + self, + collection_id: str, + item_id: str, + operations: List[PatchOperation], + base_url: str, + refresh: bool = True, + ) -> Item: + """Database logic for json patching an item following RF6902. + + Args: + collection_id(str): Collection that item belongs to. + item_id(str): Id of item to be patched. + operations (list): List of operations to run. + base_url (str): The base URL used for constructing URLs for the item. + refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. + + Returns: + patched item. + """ + new_item_id = None + new_collection_id = None + script_operations = [] + + for operation in operations: + if operation["op"] in ["add", "replace"]: + if ( + operation["path"] == "collection" + and collection_id != operation["value"] + ): + await self.check_collection_exists(collection_id=operation["value"]) + new_collection_id = operation["value"] + + if operation["path"] == "id" and item_id != operation["value"]: + new_item_id = operation["value"] + + else: + script_operations.append(operation) + + script = operations_to_script(script_operations) + + if not new_collection_id and not new_item_id: + await self.client.update( + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), + script=script, + refresh=refresh, + ) + + if new_collection_id: + await self.client.reindex( + body={ + "dest": {"index": f"{ITEMS_INDEX_PREFIX}{operation['value']}"}, + "source": { + "index": f"{ITEMS_INDEX_PREFIX}{collection_id}", + "query": {"term": {"id": {"value": item_id}}}, + }, + "script": { + "lang": "painless", + "source": ( + f"""ctx._id = ctx._id.replace('{collection_id}', '{operation["value"]}');""" + f"""ctx._source.collection = '{operation["value"]}';""" + + script + ), + }, + }, + wait_for_completion=True, + refresh=False, + ) + + item = await self.get_one_item(collection_id, item_id) + + if new_item_id: + item["id"] = new_item_id + item = await self.prep_create_item(item=item, base_url=base_url) + await self.create_item(item=item, refresh=False) + + if new_item_id or new_collection_id: + + await self.delete_item( + item_id=item_id, + collection_id=collection_id, + refresh=refresh, + ) + + return item + async def delete_item( self, item_id: str, collection_id: str, refresh: bool = False ): @@ -891,6 +1021,87 @@ async def update_collection( refresh=refresh, ) + async def merge_patch_collection( + self, + collection_id: str, + collection: PartialCollection, + base_url: str, + refresh: bool = True, + ) -> Collection: + """Database logic for merge patching a collection following RF7396. + + Args: + collection_id(str): Id of collection to be patched. + collection (PartialCollection): The partial collection to be updated. + base_url: (str): The base URL used for constructing links. + refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. + + + Returns: + patched collection. + """ + operations = merge_to_operations(collection) + + return await self.json_patch_collection( + collection_id=collection_id, + operations=operations, + base_url=base_url, + refresh=refresh, + ) + + async def json_patch_collection( + self, + collection_id: str, + operations: List[PatchOperation], + base_url: str, + refresh: bool = True, + ) -> Collection: + """Database logic for json patching a collection following RF6902. + + Args: + collection_id(str): Id of collection to be patched. + operations (list): List of operations to run. + base_url (str): The base URL used for constructing links. + refresh (bool, optional): Refresh the index after performing the operation. Defaults to True. + + Returns: + patched collection. + """ + new_collection_id = None + script_operations = [] + + for operation in operations: + if ( + operation["op"] in ["add", "replace"] + and operation["path"] == "collection" + and collection_id != operation["value"] + ): + new_collection_id = operation["value"] + + else: + script_operations.append(operation) + + script = operations_to_script(script_operations) + + if not new_collection_id: + await self.client.update( + index=COLLECTIONS_INDEX, + id=collection_id, + script=script, + refresh=refresh, + ) + + collection = await self.find_collection(collection_id) + + if new_collection_id: + collection["id"] = new_collection_id + collection["links"] = resolve_links([], base_url) + await self.update_collection( + collection_id=collection_id, collection=collection, refresh=False + ) + + return collection + async def delete_collection(self, collection_id: str, refresh: bool = False): """Delete a collection from the database.