Commit

Adding patch to opensearch backend.
rhysrevans3 committed Aug 29, 2024
1 parent e0bd94f commit 01c1563
Showing 2 changed files with 227 additions and 27 deletions.
@@ -24,6 +24,7 @@
ElasticsearchSettings as SyncElasticsearchSettings,
)
from stac_fastapi.types.errors import ConflictError, NotFoundError
from stac_fastapi.types.links import resolve_links
from stac_fastapi.types.stac import (
Collection,
Item,
@@ -764,11 +765,8 @@ async def merge_patch_item(
base_url: (str): The base URL used for constructing URLs for the item.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Raises:
ConflictError: If the item already exists in the database.
Returns:
None
patched item.
"""
operations = merge_to_operations(item)

@@ -797,11 +795,8 @@ async def json_patch_item(
base_url (str): The base URL used for constructing URLs for the item.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Raises:
ConflictError: If the item already exists in the database.
Returns:
None
patched item.
"""
new_item_id = None
new_collection_id = None
@@ -1000,21 +995,18 @@ async def merge_patch_collection(
collection: PartialCollection,
base_url: str,
refresh: bool = True,
) -> Item:
) -> Collection:
"""Database logic for merge patching a collection following RF7396.
Args:
collection_id(str): Collection that item belongs to.
item_id(str): Id of item to be patched.
item (PartialItem): The partial item to be updated.
base_url: (str): The base URL used for constructing URLs for the item.
collection_id(str): Id of collection to be patched.
collection (PartialCollection): The partial collection to be updated.
base_url (str): The base URL used for constructing links.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Raises:
ConflictError: If the item already exists in the database.
Returns:
None
patched collection.
"""
operations = merge_to_operations(collection)

@@ -1031,21 +1023,17 @@ async def json_patch_collection(
operations: List[PatchOperation],
base_url: str,
refresh: bool = True,
) -> Item:
"""Database logic for json patching an item following RF6902.
) -> Collection:
"""Database logic for json patching a collection following RF6902.
Args:
collection_id(str): Collection that item belongs to.
item_id(str): Id of item to be patched.
collection_id(str): Id of collection to be patched.
operations (list): List of operations to run.
base_url (str): The base URL used for constructing URLs for the item.
base_url (str): The base URL used for constructing links.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Raises:
ConflictError: If the item already exists in the database.
Returns:
None
patched collection.
"""
new_collection_id = None
script_operations = []
@@ -1075,6 +1063,7 @@ async def json_patch_collection(

if new_collection_id:
collection["id"] = new_collection_id
collection["links"] = resolve_links([], base_url)
await self.update_collection(
collection_id=collection_id, collection=collection, refresh=False
)
215 changes: 213 additions & 2 deletions stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
@@ -1,4 +1,5 @@
"""Database logic."""

import asyncio
import logging
import os
@@ -14,13 +15,25 @@

from stac_fastapi.core import serializers
from stac_fastapi.core.extensions import filter
from stac_fastapi.core.utilities import MAX_LIMIT, bbox2polygon
from stac_fastapi.core.utilities import (
MAX_LIMIT,
bbox2polygon,
merge_to_operations,
operations_to_script,
)
from stac_fastapi.opensearch.config import (
AsyncOpensearchSettings as AsyncSearchSettings,
)
from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings
from stac_fastapi.types.errors import ConflictError, NotFoundError
from stac_fastapi.types.stac import Collection, Item
from stac_fastapi.types.links import resolve_links
from stac_fastapi.types.stac import (
Collection,
Item,
PartialCollection,
PartialItem,
PatchOperation,
)

logger = logging.getLogger(__name__)

@@ -767,6 +780,123 @@ async def create_item(self, item: Item, refresh: bool = False):
f"Item {item_id} in collection {collection_id} already exists"
)

async def merge_patch_item(
self,
collection_id: str,
item_id: str,
item: PartialItem,
base_url: str,
refresh: bool = True,
) -> Item:
"""Database logic for merge patching an item following RF7396.
Args:
collection_id(str): Collection that item belongs to.
item_id(str): Id of item to be patched.
item (PartialItem): The partial item to be updated.
base_url: (str): The base URL used for constructing URLs for the item.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Returns:
patched item.
"""
operations = merge_to_operations(item)

return await self.json_patch_item(
collection_id=collection_id,
item_id=item_id,
operations=operations,
base_url=base_url,
refresh=refresh,
)
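
# Illustrative usage sketch (hypothetical names, not taken from this commit): "db" is
# assumed to be an instance of this module's database-logic class. Under RFC 7396,
# non-null values in the partial item add or replace fields and nulls remove them, so
# merge_to_operations should turn this partial item into the equivalent JSON Patch
# operations before json_patch_item applies them.
async def _example_merge_patch_item(db):
    return await db.merge_patch_item(
        collection_id="sentinel-2-l2a",
        item_id="S2B_example_item",
        item={"properties": {"gsd": 10, "eo:cloud_cover": None}},
        base_url="https://stac.example.com/",
    )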

async def json_patch_item(
self,
collection_id: str,
item_id: str,
operations: List[PatchOperation],
base_url: str,
refresh: bool = True,
) -> Item:
"""Database logic for json patching an item following RF6902.
Args:
collection_id(str): Collection that item belongs to.
item_id(str): Id of item to be patched.
operations (list): List of operations to run.
base_url (str): The base URL used for constructing URLs for the item.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Returns:
patched item.
"""
new_item_id = None
new_collection_id = None
script_operations = []

for operation in operations:
if operation["op"] in ["add", "replace"]:
if (
operation["path"] == "collection"
and collection_id != operation["value"]
):
await self.check_collection_exists(collection_id=operation["value"])
new_collection_id = operation["value"]

if operation["path"] == "id" and item_id != operation["value"]:
new_item_id = operation["value"]

else:
script_operations.append(operation)

script = operations_to_script(script_operations)

if not new_collection_id and not new_item_id:
await self.client.update(
index=index_by_collection_id(collection_id),
id=mk_item_id(item_id, collection_id),
script=script,
refresh=refresh,
)

if new_collection_id:
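# Moving the item to another collection: reindex copies the document into the target
# collection's items index, rewriting its _id and collection field (and applying any
# remaining script operations) in a single painless script.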
await self.client.reindex(
body={
"dest": {"index": f"{ITEMS_INDEX_PREFIX}{new_collection_id}"},
"source": {
"index": f"{ITEMS_INDEX_PREFIX}{collection_id}",
"query": {"term": {"id": {"value": item_id}}},
},
"script": {
"lang": "painless",
"source": (
f"""ctx._id = ctx._id.replace('{collection_id}', '{new_collection_id}');"""
f"""ctx._source.collection = '{new_collection_id}';"""
+ script
),
},
},
wait_for_completion=True,
refresh=False,
)

item = await self.get_one_item(collection_id, item_id)

if new_item_id:
item["id"] = new_item_id
item = await self.prep_create_item(item=item, base_url=base_url)
await self.create_item(item=item, refresh=False)

if new_item_id or new_collection_id:

await self.delete_item(
item_id=item_id,
collection_id=collection_id,
refresh=refresh,
)

return item
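
# Illustrative usage sketch (hypothetical names, not taken from this commit). The
# "collection" replace matches the literal path comparison above and triggers the
# reindex-and-delete move; the second operation is compiled into the painless update
# script. The dotted nested-path syntax is an assumption about operations_to_script.
async def _example_move_item(db):
    return await db.json_patch_item(
        collection_id="landsat-c2l2",
        item_id="LC08_example_scene",
        operations=[
            {"op": "replace", "path": "collection", "value": "landsat-archive"},
            {"op": "replace", "path": "properties.gsd", "value": 30},
        ],
        base_url="https://stac.example.com/",
    )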

async def delete_item(
self, item_id: str, collection_id: str, refresh: bool = False
):
@@ -891,6 +1021,87 @@ async def update_collection(
refresh=refresh,
)

async def merge_patch_collection(
self,
collection_id: str,
collection: PartialCollection,
base_url: str,
refresh: bool = True,
) -> Collection:
"""Database logic for merge patching a collection following RF7396.
Args:
collection_id(str): Id of collection to be patched.
collection (PartialCollection): The partial collection to be updated.
base_url: (str): The base URL used for constructing links.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Returns:
patched collection.
"""
operations = merge_to_operations(collection)

return await self.json_patch_collection(
collection_id=collection_id,
operations=operations,
base_url=base_url,
refresh=refresh,
)
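
# Illustrative usage sketch (hypothetical names, not taken from this commit): an RFC 7396
# merge document that retitles the collection and removes its "keywords" field (null means
# remove) before delegating to json_patch_collection.
async def _example_merge_patch_collection(db):
    return await db.merge_patch_collection(
        collection_id="sentinel-2-l2a",
        collection={"title": "Sentinel-2 Level-2A COGs", "keywords": None},
        base_url="https://stac.example.com/",
    )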

async def json_patch_collection(
self,
collection_id: str,
operations: List[PatchOperation],
base_url: str,
refresh: bool = True,
) -> Collection:
"""Database logic for json patching a collection following RF6902.
Args:
collection_id(str): Id of collection to be patched.
operations (list): List of operations to run.
base_url (str): The base URL used for constructing links.
refresh (bool, optional): Refresh the index after performing the operation. Defaults to True.
Returns:
patched collection.
"""
new_collection_id = None
script_operations = []

for operation in operations:
if (
operation["op"] in ["add", "replace"]
and operation["path"] == "collection"
and collection_id != operation["value"]
):
new_collection_id = operation["value"]

else:
script_operations.append(operation)

script = operations_to_script(script_operations)

if not new_collection_id:
await self.client.update(
index=COLLECTIONS_INDEX,
id=collection_id,
script=script,
refresh=refresh,
)

collection = await self.find_collection(collection_id)

if new_collection_id:
collection["id"] = new_collection_id
collection["links"] = resolve_links([], base_url)
await self.update_collection(
collection_id=collection_id, collection=collection, refresh=False
)

return collection
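
# Illustrative usage sketch (hypothetical names, not taken from this commit). Neither
# operation targets the "collection" path, so both are compiled by operations_to_script
# and applied in place to the document in COLLECTIONS_INDEX. The dotted nested-path
# syntax is an assumption about operations_to_script.
async def _example_patch_collection(db):
    return await db.json_patch_collection(
        collection_id="sentinel-2-l2a",
        operations=[
            {"op": "replace", "path": "title", "value": "Sentinel-2 Level-2A COGs"},
            {"op": "remove", "path": "summaries.gsd"},
        ],
        base_url="https://stac.example.com/",
    )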

async def delete_collection(self, collection_id: str, refresh: bool = False):
"""Delete a collection from the database.
