diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml
index 9a50e28f..c8afdcc7 100644
--- a/.github/workflows/cicd.yml
+++ b/.github/workflows/cicd.yml
@@ -46,6 +46,22 @@ jobs:
          ES_JAVA_OPTS: -Xms512m -Xmx1g
        ports:
          - 9400:9400
+
+      opensearch_2_11:
+        image: opensearchproject/opensearch:2.11.1
+        env:
+          cluster.name: stac-cluster
+          node.name: os01
+          network.host: 0.0.0.0
+          transport.host: 0.0.0.0
+          discovery.type: single-node
+          http.port: 9202
+          http.cors.enabled: true
+          plugins.security.disabled: true
+          plugins.security.ssl.http.enabled: true
+          OPENSEARCH_JAVA_OPTS: -Xms512m -Xmx512m
+        ports:
+          - 9202:9202
    strategy:
      matrix:
        python-version: [ "3.8", "3.9", "3.10", "3.11"]
@@ -90,4 +106,15 @@ jobs:
          ES_PORT: 9400
          ES_HOST: 172.17.0.1
          ES_USE_SSL: false
-          ES_VERIFY_CERTS: false
\ No newline at end of file
+          ES_VERIFY_CERTS: false
+
+      - name: Run test suite against OpenSearch 2.11.1
+        run: |
+          cd stac_fastapi/elasticsearch && pipenv run pytest -svvv
+        env:
+          ENVIRONMENT: testing
+          ES_PORT: 9202
+          ES_HOST: 172.17.0.1
+          ES_USE_SSL: false
+          ES_VERIFY_CERTS: false
+          BACKEND: opensearch
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b552a17f..0df5a582 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 ### Added
 
 - Advanced comparison (LIKE, IN, BETWEEN) operators to the Filter extension [#178](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/178)
+- Collection update endpoint no longer deletes all sub-items [#177](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/177)
+- OpenSearch 2.11.1 support [#188](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/188)
 
 ### Changed
 
diff --git a/Makefile b/Makefile
index ce2609bc..068c86d1 100644
--- a/Makefile
+++ b/Makefile
@@ -1,20 +1,32 @@
 #!make
 APP_HOST ?= 0.0.0.0
-APP_PORT ?= 8080
+ES_APP_PORT ?= 8080
 EXTERNAL_APP_PORT ?= ${APP_PORT}
-APP_PORT ?= 8080
+ES_APP_PORT ?= 8080
 ES_HOST ?= docker.for.mac.localhost
 ES_PORT ?= 9200
 
+OS_APP_PORT ?= 8082
+OS_HOST ?= docker.for.mac.localhost
+OS_PORT ?= 9202
+
 run_es = docker-compose \
 	run \
-	-p ${EXTERNAL_APP_PORT}:${APP_PORT} \
+	-p ${EXTERNAL_APP_PORT}:${ES_APP_PORT} \
 	-e PY_IGNORE_IMPORTMISMATCH=1 \
 	-e APP_HOST=${APP_HOST} \
-	-e APP_PORT=${APP_PORT} \
+	-e APP_PORT=${ES_APP_PORT} \
 	app-elasticsearch
 
+run_os = docker-compose \
+	run \
+	-p ${EXTERNAL_APP_PORT}:${OS_APP_PORT} \
+	-e PY_IGNORE_IMPORTMISMATCH=1 \
+	-e APP_HOST=${APP_HOST} \
+	-e APP_PORT=${OS_APP_PORT} \
+	app-opensearch
+
 .PHONY: image-deploy
 image-deploy:
 	docker build -f Dockerfile.deploy -t stac-fastapi-elasticsearch:latest .
@@ -40,15 +52,32 @@ docker-run: image-dev
 docker-shell:
 	$(run_es) /bin/bash
 
+.PHONY: test-elasticsearch
+test-elasticsearch:
+	-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest'
+	docker-compose down
+
+.PHONY: test-opensearch
+test-opensearch:
+	-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest'
+	docker-compose down
+
 .PHONY: test
 test:
 	-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest'
 	docker-compose down
 
-.PHONY: run-database
-run-database:
+	-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest'
+	docker-compose down
+
+.PHONY: run-database-es
+run-database-es:
 	docker-compose run --rm elasticsearch
 
+.PHONY: run-database-os
+run-database-os:
+	docker-compose run --rm opensearch
+
 .PHONY: pybase-install
 pybase-install:
 	pip install wheel && \
diff --git a/README.md b/README.md
index 5b4b7685..9ae86aae 100644
--- a/README.md
+++ b/README.md
@@ -42,13 +42,15 @@ docker-compose build
 docker-compose up
 ```
 
-By default, docker-compose uses Elasticsearch 8.x. However, most recent 7.x versions should also work.
+By default, docker-compose uses Elasticsearch 8.x and OpenSearch 2.11.1.
 If you wish to use a different version, put the following in a file named `.env` in the same directory you run docker-compose from:
 
 ```shell
 ELASTICSEARCH_VERSION=7.17.1
+OPENSEARCH_VERSION=2.11.0
 ```
+The most recent Elasticsearch 7.x versions should also work. See the [opensearch-py docs](https://github.com/opensearch-project/opensearch-py/blob/main/COMPATIBILITY.md) for compatibility information.
 
 To create a new Collection:
 
@@ -78,7 +80,18 @@ curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token"
 ```shell
 make test
 ```
-
+Test against OpenSearch only
+
+```shell
+make test-opensearch
+```
+
+Test against Elasticsearch only
+
+```shell
+make test-elasticsearch
+```
+
 ## Ingest sample data
 
 ```shell
diff --git a/data_loader/data_loader.py b/data_loader/data_loader.py
index b8d9f93e..c438811d 100644
--- a/data_loader/data_loader.py
+++ b/data_loader/data_loader.py
@@ -1,12 +1,25 @@
 """Database ingestion script."""
 import json
 import os
+import sys
 
 import click
 import requests
 
+if len(sys.argv) != 2:
+    print("Usage: python data_loader.py <backend>")
+    sys.exit(1)
+
 DATA_DIR = os.path.join(os.path.dirname(__file__), "setup_data/")
-STAC_API_BASE_URL = "http://localhost:8080"
+
+backend = sys.argv[1].lower()
+if backend == "opensearch":
+    STAC_API_BASE_URL = "http://localhost:8082"
+elif backend == "elasticsearch":
+    STAC_API_BASE_URL = "http://localhost:8080"
+else:
+    print("Invalid backend tag. Enter either 'opensearch' or 'elasticsearch'.")
+    sys.exit(1)
 
 
 def load_data(filename):
diff --git a/docker-compose.yml b/docker-compose.yml
index 03698654..916b9e82 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -3,7 +3,7 @@ version: '3.9'
 services:
   app-elasticsearch:
     container_name: stac-fastapi-es
-    image: stac-utils/stac-fastapi
+    image: stac-utils/stac-fastapi-es
     restart: always
     build:
       context: .
@@ -18,6 +18,7 @@ services:
       - ES_PORT=9200
       - ES_USE_SSL=false
       - ES_VERIFY_CERTS=false
+      - BACKEND=elasticsearch
     ports:
       - "8080:8080"
     volumes:
@@ -29,6 +30,35 @@ services:
     command:
       bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app"
 
+  app-opensearch:
+    container_name: stac-fastapi-os
+    image: stac-utils/stac-fastapi-os
+    restart: always
+    build:
+      context: .
+      dockerfile: Dockerfile.dev
+    environment:
+      - APP_HOST=0.0.0.0
+      - APP_PORT=8082
+      - RELOAD=true
+      - ENVIRONMENT=local
+      - WEB_CONCURRENCY=10
+      - ES_HOST=opensearch
+      - ES_PORT=9202
+      - ES_USE_SSL=false
+      - ES_VERIFY_CERTS=false
+      - BACKEND=opensearch
+    ports:
+      - "8082:8082"
+    volumes:
+      - ./stac_fastapi:/app/stac_fastapi
+      - ./scripts:/app/scripts
+      - ./osdata:/usr/share/opensearch/data
+    depends_on:
+      - opensearch
+    command:
+      bash -c "./scripts/wait-for-it-es.sh os-container:9202 && python -m stac_fastapi.elasticsearch.app"
+
   elasticsearch:
     container_name: es-container
     image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0}
@@ -40,3 +70,17 @@ services:
       - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots
     ports:
       - "9200:9200"
+
+  opensearch:
+    container_name: os-container
+    image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1}
+    hostname: opensearch
+    environment:
+      - discovery.type=single-node
+      - plugins.security.disabled=true
+      - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m
+    volumes:
+      - ./opensearch/config/opensearch.yml:/usr/share/opensearch/config/opensearch.yml
+      - ./opensearch/snapshots:/usr/share/opensearch/snapshots
+    ports:
+      - "9202:9202"
diff --git a/opensearch/config/opensearch.yml b/opensearch/config/opensearch.yml
new file mode 100644
index 00000000..5e44b259
--- /dev/null
+++ b/opensearch/config/opensearch.yml
@@ -0,0 +1,19 @@
+## Cluster Settings
+cluster.name: stac-cluster
+node.name: os01
+network.host: 0.0.0.0
+transport.host: 0.0.0.0
+discovery.type: single-node
+http.port: 9202
+http.cors.enabled: true
+http.cors.allow-headers: X-Requested-With,Content-Type,Content-Length,Accept,Authorization
+
+path:
+  repo:
+    - /usr/share/opensearch/snapshots
+
+# Security
+plugins.security.disabled: true
+plugins.security.ssl.http.enabled: true
+
+node.max_local_storage_nodes: 3
diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py
index 3106c512..a34c5e7c 100644
--- a/stac_fastapi/elasticsearch/setup.py
+++ b/stac_fastapi/elasticsearch/setup.py
@@ -15,6 +15,8 @@
     "stac-fastapi.extensions==2.4.9",
     "elasticsearch[async]==8.11.0",
     "elasticsearch-dsl==8.11.0",
+    "opensearch-py==2.4.2",
+    "opensearch-py[async]==2.4.2",
     "pystac[validation]",
     "uvicorn",
     "orjson",
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
index 8adcece4..570edcc5 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
@@ -1,14 +1,26 @@
 """FastAPI application."""
+import os
+
 from stac_fastapi.api.app import StacApi
 from stac_fastapi.api.models import create_get_request_model, create_post_request_model
-from stac_fastapi.elasticsearch.config import ElasticsearchSettings
 from stac_fastapi.elasticsearch.core import (
     BulkTransactionsClient,
     CoreClient,
     EsAsyncBaseFiltersClient,
     TransactionsClient,
 )
-from stac_fastapi.elasticsearch.database_logic import create_collection_index
+
+if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
+    from stac_fastapi.elasticsearch.config.config_opensearch import SearchSettings
+    from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import (
+        create_collection_index,
+    )
+else:
+    from stac_fastapi.elasticsearch.config.config_elasticsearch import SearchSettings
+    from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import (
+        create_collection_index,
+    )
+
 from stac_fastapi.elasticsearch.extensions import QueryExtension
 from stac_fastapi.elasticsearch.session import Session
 from stac_fastapi.extensions.core import (
@@ -21,7 +33,7 @@
 )
 from stac_fastapi.extensions.third_party import BulkTransactionExtension
 
-settings = ElasticsearchSettings()
+settings = SearchSettings()
 session = Session.create_from_settings(settings)
 
 filter_extension = FilterExtension(client=EsAsyncBaseFiltersClient())
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py
new file mode 100644
index 00000000..77cfd61d
--- /dev/null
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py
@@ -0,0 +1 @@
+"""client config implementations."""
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py
similarity index 96%
rename from stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
rename to stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py
index 10cf95e9..903fdc3b 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py
@@ -54,7 +54,7 @@ def _es_config() -> Dict[str, Any]:
 _forbidden_fields: Set[str] = {"type"}
 
 
-class ElasticsearchSettings(ApiSettings):
+class SearchSettings(ApiSettings):
     """API settings."""
 
     # Fields which are defined by STAC but not included in the database model
@@ -67,7 +67,7 @@ def create_client(self):
         return Elasticsearch(**_es_config())
 
 
-class AsyncElasticsearchSettings(ApiSettings):
+class AsyncSearchSettings(ApiSettings):
     """API settings."""
 
     # Fields which are defined by STAC but not included in the database model
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py
new file mode 100644
index 00000000..6ea49008
--- /dev/null
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py
@@ -0,0 +1,78 @@
+"""API configuration."""
+import os
+import ssl
+from typing import Any, Dict, Set
+
+from opensearchpy import AsyncOpenSearch, OpenSearch
+
+from stac_fastapi.types.config import ApiSettings
+
+
+def _es_config() -> Dict[str, Any]:
+    # Determine the scheme (http or https)
+    use_ssl = os.getenv("ES_USE_SSL", "true").lower() == "true"
+    scheme = "https" if use_ssl else "http"
+
+    # Configure the hosts parameter with the correct scheme
+    hosts = [f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"]
+
+    # Initialize the configuration dictionary
+    config = {
+        "hosts": hosts,
+        "headers": {"accept": "application/json", "Content-Type": "application/json"},
+    }
+
+    # Explicitly exclude SSL settings when not using SSL
+    if not use_ssl:
+        return config
+
+    # Include SSL settings if using https
+    config["ssl_version"] = ssl.TLSVersion.TLSv1_3  # type: ignore
+    config["verify_certs"] = os.getenv("ES_VERIFY_CERTS", "true").lower() != "false"  # type: ignore
+
+    # Include CA Certificates if verifying certs
+    if config["verify_certs"]:
+        config["ca_certs"] = os.getenv(
+            "CURL_CA_BUNDLE", "/etc/ssl/certs/ca-certificates.crt"
+        )
+
+    # Handle authentication
+    if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")):
+        config["http_auth"] = (u, p)
+
+    if api_key := os.getenv("ES_API_KEY"):
+        if isinstance(config["headers"], dict):
+            config["headers"] = {**config["headers"], "x-api-key": api_key}
+        else:
+            config["headers"] = {"x-api-key": api_key}
+
+    return config
+
+
+_forbidden_fields: Set[str] = {"type"}
+
+
+class SearchSettings(ApiSettings):
+    """API settings."""
+
+    # Fields which are defined by STAC but not included in the database model
+    forbidden_fields: Set[str] = _forbidden_fields
+    indexed_fields: Set[str] = {"datetime"}
+
+    @property
+    def create_client(self):
+        """Create opensearch client."""
+        return OpenSearch(**_es_config())
+
+
+class AsyncSearchSettings(ApiSettings):
+    """API settings."""
+
+    # Fields which are defined by STAC but not included in the database model
+    forbidden_fields: Set[str] = _forbidden_fields
+    indexed_fields: Set[str] = {"datetime"}
+
+    @property
+    def create_client(self):
+        """Create async opensearch client."""
+        return AsyncOpenSearch(**_es_config())
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
index a78d4707..ed6d7da9 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
@@ -1,5 +1,6 @@
 """Item crud client."""
 import logging
+import os
 import re
 from base64 import urlsafe_b64encode
 from datetime import datetime as datetime_type
@@ -18,9 +19,18 @@
 from stac_pydantic.links import Relations
 from stac_pydantic.shared import MimeTypes
 
+if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
+    from stac_fastapi.elasticsearch.config.config_opensearch import SearchSettings
+    from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import (
+        DatabaseLogic,
+    )
+else:
+    from stac_fastapi.elasticsearch.config.config_elasticsearch import SearchSettings
+    from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import (
+        DatabaseLogic,
+    )
+
 from stac_fastapi.elasticsearch import serializers
-from stac_fastapi.elasticsearch.config import ElasticsearchSettings
-from stac_fastapi.elasticsearch.database_logic import DatabaseLogic
 from stac_fastapi.elasticsearch.models.links import PagingLinks
 from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer
 from stac_fastapi.elasticsearch.session import Session
@@ -728,7 +738,7 @@ class BulkTransactionsClient(BaseBulkTransactionsClient):
 
     def __attrs_post_init__(self):
         """Create es engine."""
-        settings = ElasticsearchSettings()
+        settings = SearchSettings()
         self.client = settings.create_client
 
     def preprocess_item(
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py
new file mode 100644
index 00000000..2a8962ca
--- /dev/null
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py
@@ -0,0 +1 @@
+"""database logic implementations."""
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py
similarity index 96%
rename from stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
rename to stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py
index 8b8911d1..44104151 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py
@@ -10,11 +10,12 @@
 from elasticsearch import exceptions, helpers  # type: ignore
 
 from stac_fastapi.elasticsearch import serializers
-from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
-from stac_fastapi.elasticsearch.config import (
-    ElasticsearchSettings as SyncElasticsearchSettings,
+from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncSearchSettings
+from stac_fastapi.elasticsearch.config.config_elasticsearch import (
+    SearchSettings as SyncSearchSettings,
 )
 from stac_fastapi.elasticsearch.extensions import filter
+from stac_fastapi.elasticsearch.utilities import bbox2polygon
 from stac_fastapi.types.errors import ConflictError, NotFoundError
 from stac_fastapi.types.stac import Collection, Item
 
@@ -177,7 +178,7 @@ async def create_collection_index() -> None:
         None
 
     """
-    client = AsyncElasticsearchSettings().create_client
+    client = AsyncSearchSettings().create_client
 
     await client.options(ignore_status=400).indices.create(
         index=f"{COLLECTIONS_INDEX}-000001",
@@ -198,7 +199,7 @@ async def create_item_index(collection_id: str):
         None
 
     """
-    client = AsyncElasticsearchSettings().create_client
+    client = AsyncSearchSettings().create_client
     index_name = index_by_collection_id(collection_id)
 
     await client.options(ignore_status=400).indices.create(
@@ -216,7 +217,7 @@ async def delete_item_index(collection_id: str):
     Args:
         collection_id (str): The ID of the collection whose items index will be deleted.
     """
-    client = AsyncElasticsearchSettings().create_client
+    client = AsyncSearchSettings().create_client
 
     name = index_by_collection_id(collection_id)
     resolved = await client.indices.resolve_index(name=name)
@@ -229,21 +230,6 @@ async def delete_item_index(collection_id: str):
     await client.close()
 
 
-def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
-    """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon.
-
-    Args:
-        b0 (float): The x-coordinate of the lower-left corner of the bounding box.
-        b1 (float): The y-coordinate of the lower-left corner of the bounding box.
-        b2 (float): The x-coordinate of the upper-right corner of the bounding box.
-        b3 (float): The y-coordinate of the upper-right corner of the bounding box.
-
-    Returns:
-        List[List[List[float]]]: A polygon represented as a list of lists of coordinates.
-    """
-    return [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]]
-
-
 def mk_item_id(item_id: str, collection_id: str):
     """Create the document id for an Item in Elasticsearch.
@@ -293,8 +279,8 @@ class Geometry(Protocol):  # noqa
 class DatabaseLogic:
     """Database logic."""
 
-    client = AsyncElasticsearchSettings().create_client
-    sync_client = SyncElasticsearchSettings().create_client
+    client = AsyncSearchSettings().create_client
+    sync_client = SyncSearchSettings().create_client
 
     item_serializer: Type[serializers.ItemSerializer] = attr.ib(
         default=serializers.ItemSerializer
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py
new file mode 100644
index 00000000..b5ba29db
--- /dev/null
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py
@@ -0,0 +1,908 @@
+"""Database logic."""
+import asyncio
+import logging
+import os
+from base64 import urlsafe_b64decode, urlsafe_b64encode
+from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union
+
+import attr
+from opensearchpy import exceptions, helpers
+from opensearchpy.exceptions import TransportError
+from opensearchpy.helpers.query import Q
+from opensearchpy.helpers.search import Search
+
+from stac_fastapi.elasticsearch import serializers
+from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings
+from stac_fastapi.elasticsearch.config.config_opensearch import (
+    SearchSettings as SyncSearchSettings,
+)
+from stac_fastapi.elasticsearch.extensions import filter
+from stac_fastapi.elasticsearch.utilities import bbox2polygon
+from stac_fastapi.types.errors import ConflictError, NotFoundError
+from stac_fastapi.types.stac import Collection, Item
+
+logger = logging.getLogger(__name__)
+
+NumType = Union[float, int]
+
+COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections")
+ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_")
+ES_INDEX_NAME_UNSUPPORTED_CHARS = {
+    "\\",
+    "/",
+    "*",
+    "?",
+    '"',
+    "<",
+    ">",
+    "|",
+    " ",
+    ",",
+    "#",
+    ":",
+}
+
+ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*"
+
+DEFAULT_SORT = {
+    "properties.datetime": {"order": "desc"},
+    "id": {"order": "desc"},
+    "collection": {"order": "desc"},
+}
+
+ES_ITEMS_SETTINGS = {
+    "index": {
+        "sort.field": list(DEFAULT_SORT.keys()),
+        "sort.order": [v["order"] for v in DEFAULT_SORT.values()],
+    }
+}
+
+ES_MAPPINGS_DYNAMIC_TEMPLATES = [
+    # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md
+    {
+        "descriptions": {
+            "match_mapping_type": "string",
+            "match": "description",
+            "mapping": {"type": "text"},
+        }
+    },
+    {
+        "titles": {
+            "match_mapping_type": "string",
+            "match": "title",
+            "mapping": {"type": "text"},
+        }
+    },
+    # Projection Extension https://github.com/stac-extensions/projection
+    {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}},
+    {
+        "proj_projjson": {
+            "match": "proj:projjson",
+            "mapping": {"type": "object", "enabled": False},
+        }
+    },
+    {
+        "proj_centroid": {
+            "match": "proj:centroid",
+            "mapping": {"type": "geo_point"},
+        }
+    },
+    {
+        "proj_geometry": {
+            "match": "proj:geometry",
+            "mapping": {"type": "object", "enabled": False},
+        }
+    },
+    {
+        "no_index_href": {
+            "match": "href",
+            "mapping": {"type": "text", "index": False},
+        }
+    },
+    # Default all other strings not otherwise specified to keyword
+    {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}},
+    {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}},
+]
+
+ES_ITEMS_MAPPINGS = {
+    "numeric_detection": False,
+    "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES,
+    "properties": {
+        "id": {"type": "keyword"},
+        "collection": {"type": "keyword"},
+        "geometry": {"type": "geo_shape"},
+        "assets": {"type": "object", "enabled": False},
+        "links": {"type": "object", "enabled": False},
+        "properties": {
+            "type": "object",
+            "properties": {
+                # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md
+                "datetime": {"type": "date"},
+                "start_datetime": {"type": "date"},
+                "end_datetime": {"type": "date"},
+                "created": {"type": "date"},
+                "updated": {"type": "date"},
+                # Satellite Extension https://github.com/stac-extensions/sat
+                "sat:absolute_orbit": {"type": "integer"},
+                "sat:relative_orbit": {"type": "integer"},
+            },
+        },
+    },
+}
+
+ES_COLLECTIONS_MAPPINGS = {
+    "numeric_detection": False,
+    "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES,
+    "properties": {
+        "extent.spatial.bbox": {"type": "long"},
+        "extent.temporal.interval": {"type": "date"},
+        "providers": {"type": "object", "enabled": False},
+        "links": {"type": "object", "enabled": False},
+        "item_assets": {"type": "object", "enabled": False},
+    },
+}
+
+
+def index_by_collection_id(collection_id: str) -> str:
+    """
+    Translate a collection id into an OpenSearch index name.
+
+    Args:
+        collection_id (str): The collection id to translate into an index name.
+
+    Returns:
+        str: The index name derived from the collection id.
+    """
+    return f"{ITEMS_INDEX_PREFIX}{''.join(c for c in collection_id.lower() if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS)}"
+
+
+def indices(collection_ids: Optional[List[str]]) -> str:
+    """
+    Get a comma-separated string of index names for a given list of collection ids.
+
+    Args:
+        collection_ids: A list of collection ids.
+
+    Returns:
+        A string of comma-separated index names. If `collection_ids` is None, returns the default indices.
+    """
+    if collection_ids is None:
+        return ITEM_INDICES
+    else:
+        return ",".join([index_by_collection_id(c) for c in collection_ids])
+
+
+async def create_collection_index() -> None:
+    """
+    Create the index for a Collection.
+
+    Returns:
+        None
+
+    """
+    client = AsyncSearchSettings().create_client
+
+    search_body = {
+        "mappings": ES_COLLECTIONS_MAPPINGS,
+        "aliases": {COLLECTIONS_INDEX: {}},
+    }
+
+    index = f"{COLLECTIONS_INDEX}-000001"
+
+    try:
+        await client.indices.create(index=index, body=search_body)
+    except TransportError as e:
+        if e.status_code == 400:
+            pass  # Ignore 400 status codes
+        else:
+            raise e
+
+    await client.close()
+
+
+async def create_item_index(collection_id: str):
+    """
+    Create the index for Items.
+
+    Args:
+        collection_id (str): Collection identifier.
+
+    Returns:
+        None
+
+    """
+    client = AsyncSearchSettings().create_client
+    index_name = index_by_collection_id(collection_id)
+    search_body = {
+        "aliases": {index_name: {}},
+        "mappings": ES_ITEMS_MAPPINGS,
+        "settings": ES_ITEMS_SETTINGS,
+    }
+
+    try:
+        await client.indices.create(index=f"{index_name}-000001", body=search_body)
+    except TransportError as e:
+        if e.status_code == 400:
+            pass  # Ignore 400 status codes
+        else:
+            raise e
+
+    await client.close()
+
+
+async def delete_item_index(collection_id: str):
+    """Delete the index for items in a collection.
+
+    Args:
+        collection_id (str): The ID of the collection whose items index will be deleted.
+    """
+    client = AsyncSearchSettings().create_client
+
+    name = index_by_collection_id(collection_id)
+    resolved = await client.indices.resolve_index(name=name)
+    if "aliases" in resolved and resolved["aliases"]:
+        [alias] = resolved["aliases"]
+        await client.indices.delete_alias(index=alias["indices"], name=alias["name"])
+        await client.indices.delete(index=alias["indices"])
+    else:
+        await client.indices.delete(index=name)
+    await client.close()
+
+
+def mk_item_id(item_id: str, collection_id: str):
+    """Create the document id for an Item in OpenSearch.
+
+    Args:
+        item_id (str): The id of the Item.
+        collection_id (str): The id of the Collection that the Item belongs to.
+
+    Returns:
+        str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character.
+    """
+    return f"{item_id}|{collection_id}"
+
+
+def mk_actions(collection_id: str, processed_items: List[Item]):
+    """Create OpenSearch bulk actions for a list of processed items.
+
+    Args:
+        collection_id (str): The identifier for the collection the items belong to.
+        processed_items (List[Item]): The list of processed items to be bulk indexed.
+
+    Returns:
+        List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed,
+        each action being a dictionary with the following keys:
+        - `_index`: the index to store the document in.
+        - `_id`: the document's identifier.
+        - `_source`: the source of the document.
+    """
+    return [
+        {
+            "_index": index_by_collection_id(collection_id),
+            "_id": mk_item_id(item["id"], item["collection"]),
+            "_source": item,
+        }
+        for item in processed_items
+    ]
+
+
+# stac_pydantic classes extend _GeometryBase, which doesn't have a type field,
+# So create our own Protocol for typing
+# Union[ Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon, GeometryCollection]
+class Geometry(Protocol):  # noqa
+    type: str
+    coordinates: Any
+
+
+@attr.s
+class DatabaseLogic:
+    """Database logic."""
+
+    client = AsyncSearchSettings().create_client
+    sync_client = SyncSearchSettings().create_client
+
+    item_serializer: Type[serializers.ItemSerializer] = attr.ib(
+        default=serializers.ItemSerializer
+    )
+    collection_serializer: Type[serializers.CollectionSerializer] = attr.ib(
+        default=serializers.CollectionSerializer
+    )
+
+    """CORE LOGIC"""
+
+    async def get_all_collections(
+        self,
+        token: Optional[str],
+        limit: int,
+    ) -> Iterable[Dict[str, Any]]:
+        """Retrieve a list of all collections from the database.
+
+        Args:
+            token (Optional[str]): The token used to return the next set of results.
+            limit (int): Number of results to return
+
+        Returns:
+            collections (Iterable[Dict[str, Any]]): A list of dictionaries containing the source data for each collection.
+
+        Notes:
+            The collections are retrieved from the OpenSearch database using the `client.search` method,
+            with the `COLLECTIONS_INDEX` as the target index and `size=limit` to retrieve records.
+            The result is a generator of dictionaries containing the source data for each collection.
+        """
+        search_body: Dict[str, Any] = {}
+        if token:
+            search_after = urlsafe_b64decode(token.encode()).decode().split(",")
+            search_body["search_after"] = search_after
+
+        search_body["sort"] = {"id": {"order": "asc"}}
+
+        collections = await self.client.search(
+            index=COLLECTIONS_INDEX, body=search_body, size=limit
+        )
+        hits = collections["hits"]["hits"]
+        return hits
+
+    async def get_one_item(self, collection_id: str, item_id: str) -> Dict:
+        """Retrieve a single item from the database.
+
+        Args:
+            collection_id (str): The id of the Collection that the Item belongs to.
+            item_id (str): The id of the Item.
+
+        Returns:
+            item (Dict): A dictionary containing the source data for the Item.
+
+        Raises:
+            NotFoundError: If the specified Item does not exist in the Collection.
+
+        Notes:
+            The Item is retrieved from the OpenSearch database using the `client.get` method,
+            with the index for the Collection as the target index and the combined `mk_item_id` as the document id.
+        """
+        try:
+            item = await self.client.get(
+                index=index_by_collection_id(collection_id),
+                id=mk_item_id(item_id, collection_id),
+            )
+        except exceptions.NotFoundError:
+            raise NotFoundError(
+                f"Item {item_id} does not exist in Collection {collection_id}"
+            )
+        return item["_source"]
+
+    @staticmethod
+    def make_search():
+        """Database logic to create a Search instance."""
+        return Search().sort(*DEFAULT_SORT)
+
+    @staticmethod
+    def apply_ids_filter(search: Search, item_ids: List[str]):
+        """Database logic to search a list of STAC item ids."""
+        return search.filter("terms", id=item_ids)
+
+    @staticmethod
+    def apply_collections_filter(search: Search, collection_ids: List[str]):
+        """Database logic to search a list of STAC collection ids."""
+        return search.filter("terms", collection=collection_ids)
+
+    @staticmethod
+    def apply_datetime_filter(search: Search, datetime_search):
+        """Apply a filter to search based on datetime field.
+
+        Args:
+            search (Search): The search object to filter.
+            datetime_search (dict): The datetime filter criteria.
+
+        Returns:
+            Search: The filtered search object.
+        """
+        if "eq" in datetime_search:
+            search = search.filter(
+                "term", **{"properties__datetime": datetime_search["eq"]}
+            )
+        else:
+            search = search.filter(
+                "range", properties__datetime={"lte": datetime_search["lte"]}
+            )
+            search = search.filter(
+                "range", properties__datetime={"gte": datetime_search["gte"]}
+            )
+        return search
+
+    @staticmethod
+    def apply_bbox_filter(search: Search, bbox: List):
+        """Filter search results based on bounding box.
+
+        Args:
+            search (Search): The search object to apply the filter to.
+            bbox (List): The bounding box coordinates, represented as a list of four values [minx, miny, maxx, maxy].
+
+        Returns:
+            search (Search): The search object with the bounding box filter applied.
+
+        Notes:
+            The bounding box is transformed into a polygon using the `bbox2polygon` function and
+            a geo_shape filter is added to the search object, set to intersect with the specified polygon.
+        """
+        return search.filter(
+            Q(
+                {
+                    "geo_shape": {
+                        "geometry": {
+                            "shape": {
+                                "type": "polygon",
+                                "coordinates": bbox2polygon(*bbox),
+                            },
+                            "relation": "intersects",
+                        }
+                    }
+                }
+            )
+        )
+
+    @staticmethod
+    def apply_intersects_filter(
+        search: Search,
+        intersects: Geometry,
+    ):
+        """Filter search results based on intersecting geometry.
+
+        Args:
+            search (Search): The search object to apply the filter to.
+            intersects (Geometry): The intersecting geometry, represented as a GeoJSON-like object.
+
+        Returns:
+            search (Search): The search object with the intersecting geometry filter applied.
+
+        Notes:
+            A geo_shape filter is added to the search object, set to intersect with the specified geometry.
+        """
+        return search.filter(
+            Q(
+                {
+                    "geo_shape": {
+                        "geometry": {
+                            "shape": {
+                                "type": intersects.type.lower(),
+                                "coordinates": intersects.coordinates,
+                            },
+                            "relation": "intersects",
+                        }
+                    }
+                }
+            )
+        )
+
+    @staticmethod
+    def apply_stacql_filter(search: Search, op: str, field: str, value: float):
+        """Filter search results based on a comparison between a field and a value.
+
+        Args:
+            search (Search): The search object to apply the filter to.
+            op (str): The comparison operator to use. Can be 'eq' (equal), 'gt' (greater than), 'gte' (greater than or equal),
+                'lt' (less than), or 'lte' (less than or equal).
+            field (str): The field to perform the comparison on.
+            value (float): The value to compare the field against.
+
+        Returns:
+            search (Search): The search object with the specified filter applied.
+        """
+        if op != "eq":
+            key_filter = {field: {f"{op}": value}}
+            search = search.filter(Q("range", **key_filter))
+        else:
+            search = search.filter("term", **{field: value})
+
+        return search
+
+    @staticmethod
+    def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]):
+        """Database logic to perform query for search endpoint."""
+        if _filter is not None:
+            search = search.filter(filter.Clause.parse_obj(_filter).to_es())
+        return search
+
+    @staticmethod
+    def populate_sort(sortby: List) -> Optional[Dict[str, Dict[str, str]]]:
+        """Database logic to sort search instance."""
+        if sortby:
+            return {s.field: {"order": s.direction} for s in sortby}
+        else:
+            return None
+
+    async def execute_search(
+        self,
+        search: Search,
+        limit: int,
+        token: Optional[str],
+        sort: Optional[Dict[str, Dict[str, str]]],
+        collection_ids: Optional[List[str]],
+        ignore_unavailable: bool = True,
+    ) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]:
+        """Execute a search query with limit and other optional parameters.
+
+        Args:
+            search (Search): The search query to be executed.
+            limit (int): The maximum number of results to be returned.
+            token (Optional[str]): The token used to return the next set of results.
+            sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted.
+            collection_ids (Optional[List[str]]): The collection ids to search.
+            ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True.
+
+        Returns:
+            Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]: A tuple containing:
+                - An iterable of search results, where each result is a dictionary with keys and values representing the
+                  fields and values of each document.
+                - The total number of results (if the count could be computed), or None if the count could not be
+                  computed.
+                - The token to be used to retrieve the next set of results, or None if there are no more results.
+
+        Raises:
+            NotFoundError: If the collections specified in `collection_ids` do not exist.
+        """
+        search_body: Dict[str, Any] = {}
+        query = search.query.to_dict() if search.query else None
+        if query:
+            search_body["query"] = query
+        if token:
+            search_after = urlsafe_b64decode(token.encode()).decode().split(",")
+            search_body["search_after"] = search_after
+        search_body["sort"] = sort if sort else DEFAULT_SORT
+
+        index_param = indices(collection_ids)
+
+        search_task = asyncio.create_task(
+            self.client.search(
+                index=index_param,
+                ignore_unavailable=ignore_unavailable,
+                body=search_body,
+                size=limit,
+            )
+        )
+
+        count_task = asyncio.create_task(
+            self.client.count(
+                index=index_param,
+                ignore_unavailable=ignore_unavailable,
+                body=search.to_dict(count=True),
+            )
+        )
+
+        try:
+            es_response = await search_task
+        except exceptions.NotFoundError:
+            raise NotFoundError(f"Collections '{collection_ids}' do not exist")
+
+        hits = es_response["hits"]["hits"]
+        items = (hit["_source"] for hit in hits)
+
+        next_token = None
+        if hits and (sort_array := hits[-1].get("sort")):
+            next_token = urlsafe_b64encode(
+                ",".join([str(x) for x in sort_array]).encode()
+            ).decode()
+
+        # (1) count should not block returning results, so don't wait for it to be done
+        # (2) don't cancel the task so that it will populate the ES cache for subsequent counts
+        maybe_count = None
+        if count_task.done():
+            try:
+                maybe_count = count_task.result().get("count")
+            except Exception as e:
+                logger.error(f"Count task failed: {e}")
+
+        return items, maybe_count, next_token
+
+    """ TRANSACTION LOGIC """
+
+    async def check_collection_exists(self, collection_id: str):
+        """Database logic to check if a collection exists."""
+        if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
+            raise NotFoundError(f"Collection {collection_id} does not exist")
+
+    async def prep_create_item(
+        self, item: Item, base_url: str, exist_ok: bool = False
+    ) -> Item:
+        """
+        Preps an item for insertion into the database.
+
+        Args:
+            item (Item): The item to be prepped for insertion.
+            base_url (str): The base URL used to create the item's self URL.
+            exist_ok (bool): Indicates whether the item can exist already.
+
+        Returns:
+            Item: The prepped item.
+
+        Raises:
+            ConflictError: If the item already exists in the database.
+
+        """
+        await self.check_collection_exists(collection_id=item["collection"])
+
+        if not exist_ok and await self.client.exists(
+            index=index_by_collection_id(item["collection"]),
+            id=mk_item_id(item["id"], item["collection"]),
+        ):
+            raise ConflictError(
+                f"Item {item['id']} in collection {item['collection']} already exists"
+            )
+
+        return self.item_serializer.stac_to_db(item, base_url)
+
+    def sync_prep_create_item(
+        self, item: Item, base_url: str, exist_ok: bool = False
+    ) -> Item:
+        """
+        Prepare an item for insertion into the database.
+
+        This method performs pre-insertion preparation on the given `item`,
+        such as checking if the collection the item belongs to exists,
+        and optionally verifying that an item with the same ID does not already exist in the database.
+
+        Args:
+            item (Item): The item to be inserted into the database.
+            base_url (str): The base URL used for constructing URLs for the item.
+            exist_ok (bool): Indicates whether the item can exist already.
+
+        Returns:
+            Item: The item after preparation is done.
+
+        Raises:
+            NotFoundError: If the collection that the item belongs to does not exist in the database.
+            ConflictError: If an item with the same ID already exists in the collection.
+        """
+        item_id = item["id"]
+        collection_id = item["collection"]
+        if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id):
+            raise NotFoundError(f"Collection {collection_id} does not exist")
+
+        if not exist_ok and self.sync_client.exists(
+            index=index_by_collection_id(collection_id),
+            id=mk_item_id(item_id, collection_id),
+        ):
+            raise ConflictError(
+                f"Item {item_id} in collection {collection_id} already exists"
+            )
+
+        return self.item_serializer.stac_to_db(item, base_url)
+
+    async def create_item(self, item: Item, refresh: bool = False):
+        """Database logic for creating one item.
+
+        Args:
+            item (Item): The item to be created.
+            refresh (bool, optional): Refresh the index after performing the operation. Defaults to False.
+
+        Raises:
+            ConflictError: If the item already exists in the database.
+
+        Returns:
+            None
+        """
+        # todo: check if collection exists, but cache
+        item_id = item["id"]
+        collection_id = item["collection"]
+        es_resp = await self.client.index(
+            index=index_by_collection_id(collection_id),
+            id=mk_item_id(item_id, collection_id),
+            body=item,
+            refresh=refresh,
+        )
+
+        if (meta := es_resp.get("meta")) and meta.get("status") == 409:
+            raise ConflictError(
+                f"Item {item_id} in collection {collection_id} already exists"
+            )
+
+    async def delete_item(
+        self, item_id: str, collection_id: str, refresh: bool = False
+    ):
+        """Delete a single item from the database.
+
+        Args:
+            item_id (str): The id of the Item to be deleted.
+            collection_id (str): The id of the Collection that the Item belongs to.
+            refresh (bool, optional): Whether to refresh the index after the deletion. Default is False.
+
+        Raises:
+            NotFoundError: If the Item does not exist in the database.
+        """
+        try:
+            await self.client.delete(
+                index=index_by_collection_id(collection_id),
+                id=mk_item_id(item_id, collection_id),
+                refresh=refresh,
+            )
+        except exceptions.NotFoundError:
+            raise NotFoundError(
+                f"Item {item_id} in collection {collection_id} not found"
+            )
+
+    async def create_collection(self, collection: Collection, refresh: bool = False):
+        """Create a single collection in the database.
+
+        Args:
+            collection (Collection): The Collection object to be created.
+            refresh (bool, optional): Whether to refresh the index after the creation. Default is False.
+
+        Raises:
+            ConflictError: If a Collection with the same id already exists in the database.
+
+        Notes:
+            A new index is created for the items in the Collection using the `create_item_index` function.
+        """
+        collection_id = collection["id"]
+
+        if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
+            raise ConflictError(f"Collection {collection_id} already exists")
+
+        await self.client.index(
+            index=COLLECTIONS_INDEX,
+            id=collection_id,
+            body=collection,
+            refresh=refresh,
+        )
+
+        await create_item_index(collection_id)
+
+    async def find_collection(self, collection_id: str) -> Collection:
+        """Find and return a collection from the database.
+
+        Args:
+            self: The instance of the object calling this function.
+            collection_id (str): The ID of the collection to be found.
+
+        Returns:
+            Collection: The found collection, represented as a `Collection` object.
+
+        Raises:
+            NotFoundError: If the collection with the given `collection_id` is not found in the database.
+
+        Notes:
+            This function searches for a collection in the database using the specified `collection_id` and returns the found
+            collection as a `Collection` object. If the collection is not found, a `NotFoundError` is raised.
+        """
+        try:
+            collection = await self.client.get(
+                index=COLLECTIONS_INDEX, id=collection_id
+            )
+        except exceptions.NotFoundError:
+            raise NotFoundError(f"Collection {collection_id} not found")
+
+        return collection["_source"]
+
+    async def update_collection(
+        self, collection_id: str, collection: Collection, refresh: bool = False
+    ):
+        """Update a collection from the database.
+
+        Args:
+            self: The instance of the object calling this function.
+            collection_id (str): The ID of the collection to be updated.
+            collection (Collection): The Collection object to be used for the update.
+
+        Raises:
+            NotFoundError: If the collection with the given `collection_id` is not
+            found in the database.
+
+        Notes:
+            This function updates the collection in the database using the specified
+            `collection_id` and with the collection specified in the `Collection` object.
+            If the collection is not found, a `NotFoundError` is raised.
+        """
+        await self.find_collection(collection_id=collection_id)
+
+        if collection_id != collection["id"]:
+            await self.create_collection(collection, refresh=refresh)
+
+            await self.client.reindex(
+                body={
+                    "dest": {"index": f"{ITEMS_INDEX_PREFIX}{collection['id']}"},
+                    "source": {"index": f"{ITEMS_INDEX_PREFIX}{collection_id}"},
+                    "script": {
+                        "lang": "painless",
+                        "source": f"""ctx._id = ctx._id.replace('{collection_id}', '{collection["id"]}'); ctx._source.collection = '{collection["id"]}' ;""",
+                    },
+                },
+                wait_for_completion=True,
+                refresh=refresh,
+            )
+
+            await self.delete_collection(collection_id)
+
+        else:
+            await self.client.index(
+                index=COLLECTIONS_INDEX,
+                id=collection_id,
+                body=collection,
+                refresh=refresh,
+            )
+
+    async def delete_collection(self, collection_id: str, refresh: bool = False):
+        """Delete a collection from the database.
+
+        Parameters:
+            self: The instance of the object calling this function.
+            collection_id (str): The ID of the collection to be deleted.
+            refresh (bool): Whether to refresh the index after the deletion (default: False).
+
+        Raises:
+            NotFoundError: If the collection with the given `collection_id` is not found in the database.
+
+        Notes:
+            This function first verifies that the collection with the specified `collection_id` exists in the database, and then
+            deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this
+            function also calls `delete_item_index` to delete the index for the items in the collection.
+        """
+        await self.find_collection(collection_id=collection_id)
+        await self.client.delete(
+            index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh
+        )
+        await delete_item_index(collection_id)
+
+    async def bulk_async(
+        self, collection_id: str, processed_items: List[Item], refresh: bool = False
+    ) -> None:
+        """Perform a bulk insert of items into the database asynchronously.
+
+        Args:
+            self: The instance of the object calling this function.
+            collection_id (str): The ID of the collection to which the items belong.
+            processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
+            refresh (bool): Whether to refresh the index after the bulk insert (default: False).
+
+        Notes:
+            This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
+            insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. The
+            `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the
+            index is refreshed after the bulk insert. The function does not return any value.
+        """
+        await helpers.async_bulk(
+            self.client,
+            mk_actions(collection_id, processed_items),
+            refresh=refresh,
+            raise_on_error=False,
+        )
+
+    def bulk_sync(
+        self, collection_id: str, processed_items: List[Item], refresh: bool = False
+    ) -> None:
+        """Perform a bulk insert of items into the database synchronously.
+
+        Args:
+            self: The instance of the object calling this function.
+            collection_id (str): The ID of the collection to which the items belong.
+            processed_items (List[Item]): A list of `Item` objects to be inserted into the database.
+            refresh (bool): Whether to refresh the index after the bulk insert (default: False).
+
+        Notes:
+            This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The
+            insert is performed synchronously and blocking, meaning that the function does not return until the insert has
+            completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to
+            True, the index is refreshed after the bulk insert. The function does not return any value.
+        """
+        helpers.bulk(
+            self.sync_client,
+            mk_actions(collection_id, processed_items),
+            refresh=refresh,
+            raise_on_error=False,
+        )
+
+    # DANGER
+    async def delete_items(self) -> None:
+        """Danger. this is only for tests."""
+        await self.client.delete_by_query(
+            index=ITEM_INDICES,
+            body={"query": {"match_all": {}}},
+            wait_for_completion=True,
+        )
+
+    # DANGER
+    async def delete_collections(self) -> None:
+        """Danger. this is only for tests."""
+        await self.client.delete_by_query(
+            index=COLLECTIONS_INDEX,
+            body={"query": {"match_all": {}}},
+            wait_for_completion=True,
+        )
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py
new file mode 100644
index 00000000..b5dac390
--- /dev/null
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py
@@ -0,0 +1,21 @@
+"""Module for geospatial processing functions.
+
+This module contains functions for transforming geospatial coordinates,
+such as converting bounding boxes to polygon representations.
+"""
+from typing import List
+
+
+def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]:
+    """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon.
+
+    Args:
+        b0 (float): The x-coordinate of the lower-left corner of the bounding box.
+        b1 (float): The y-coordinate of the lower-left corner of the bounding box.
+        b2 (float): The x-coordinate of the upper-right corner of the bounding box.
+        b3 (float): The y-coordinate of the upper-right corner of the bounding box.
+
+    Returns:
+        List[List[List[float]]]: A polygon represented as a list of lists of coordinates.
+    """
+    return [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]]
diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py
index 13956329..cfe6194b 100644
--- a/stac_fastapi/elasticsearch/tests/conftest.py
+++ b/stac_fastapi/elasticsearch/tests/conftest.py
@@ -10,13 +10,25 @@
 
 from stac_fastapi.api.app import StacApi
 from stac_fastapi.api.models import create_get_request_model, create_post_request_model
-from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings
+
+if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch":
+    from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings
+    from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import (
+        create_collection_index,
+    )
+else:
+    from stac_fastapi.elasticsearch.config.config_elasticsearch import (
+        AsyncSearchSettings,
+    )
+    from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import (
+        create_collection_index,
+    )
+
 from stac_fastapi.elasticsearch.core import (
     BulkTransactionsClient,
     CoreClient,
     TransactionsClient,
 )
-from stac_fastapi.elasticsearch.database_logic import create_collection_index
 from stac_fastapi.elasticsearch.extensions import QueryExtension
 from stac_fastapi.extensions.core import (  # FieldsExtension,
     ContextExtension,
@@ -54,7 +66,7 @@ def __init__(
         self.query_params = query_params
 
 
-class TestSettings(AsyncElasticsearchSettings):
+class TestSettings(AsyncSearchSettings):
     class Config:
         env_file = ".env.test"
 
@@ -161,7 +173,7 @@ def bulk_txn_client():
 
 @pytest_asyncio.fixture(scope="session")
 async def app():
-    settings = AsyncElasticsearchSettings()
+    settings = AsyncSearchSettings()
 
     extensions = [
         TransactionExtension(
            client=TransactionsClient(session=None), settings=settings