From ff4a919fc61dd48b637a65d0d134cdc82d5f6381 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Mon, 20 Nov 2023 19:33:28 +0800 Subject: [PATCH 01/14] database logic to folder --- .../stac_fastapi/elasticsearch/app.py | 2 +- .../stac_fastapi/elasticsearch/core.py | 2 +- .../database_logic.py | 16 +--------------- stac_fastapi/elasticsearch/tests/conftest.py | 2 +- 4 files changed, 4 insertions(+), 18 deletions(-) rename stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/{ => database_elasticsearch}/database_logic.py (97%) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index e3c4bc64..7044be46 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -8,7 +8,7 @@ EsAsyncBaseFiltersClient, TransactionsClient, ) -from stac_fastapi.elasticsearch.database_logic import create_collection_index +from stac_fastapi.elasticsearch.database_elasticsearch.database_logic import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.core import ( diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 65e9a458..2dfbab20 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -20,7 +20,7 @@ from stac_fastapi.elasticsearch import serializers from stac_fastapi.elasticsearch.config import ElasticsearchSettings -from stac_fastapi.elasticsearch.database_logic import DatabaseLogic +from stac_fastapi.elasticsearch.database_elasticsearch.database_logic import DatabaseLogic from stac_fastapi.elasticsearch.models.links import PagingLinks from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer 
from stac_fastapi.elasticsearch.session import Session diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py similarity index 97% rename from stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py rename to stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py index 6b2cd433..f8db5226 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py @@ -17,6 +17,7 @@ from stac_fastapi.elasticsearch.extensions import filter from stac_fastapi.types.errors import ConflictError, NotFoundError from stac_fastapi.types.stac import Collection, Item +from stac_fastapi.elasticsearch.utilities import bbox2polygon logger = logging.getLogger(__name__) @@ -229,21 +230,6 @@ async def delete_item_index(collection_id: str): await client.close() -def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]: - """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon. - - Args: - b0 (float): The x-coordinate of the lower-left corner of the bounding box. - b1 (float): The y-coordinate of the lower-left corner of the bounding box. - b2 (float): The x-coordinate of the upper-right corner of the bounding box. - b3 (float): The y-coordinate of the upper-right corner of the bounding box. - - Returns: - List[List[List[float]]]: A polygon represented as a list of lists of coordinates. - """ - return [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]] - - def mk_item_id(item_id: str, collection_id: str): """Create the document id for an Item in Elasticsearch. 
diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index f4b49928..67d72135 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -16,7 +16,7 @@ CoreClient, TransactionsClient, ) -from stac_fastapi.elasticsearch.database_logic import create_collection_index +from stac_fastapi.elasticsearch.database_elasticsearch.database_logic import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.extensions.core import ( # FieldsExtension, ContextExtension, From a4f4cd41ad469de335ef001a4e8ec191561a831f Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Mon, 20 Nov 2023 19:33:43 +0800 Subject: [PATCH 02/14] add utilities --- .../stac_fastapi/elasticsearch/utilities.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py new file mode 100644 index 00000000..7be319a7 --- /dev/null +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py @@ -0,0 +1,15 @@ +from typing import List + +def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]: + """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon. + + Args: + b0 (float): The x-coordinate of the lower-left corner of the bounding box. + b1 (float): The y-coordinate of the lower-left corner of the bounding box. + b2 (float): The x-coordinate of the upper-right corner of the bounding box. + b3 (float): The y-coordinate of the upper-right corner of the bounding box. + + Returns: + List[List[List[float]]]: A polygon represented as a list of lists of coordinates. 
+ """ + return [[[b0, b1], [b2, b1], [b2, b3], [b0, b3], [b0, b1]]] From 45ce18b0b462d43e308b9445a5ba642f147ad0ae Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Mon, 20 Nov 2023 23:27:33 +0800 Subject: [PATCH 03/14] add opensearch to docker-compose --- docker-compose.yml | 40 ++++++++++++++++++++++++++++++++ opensearch/config/opensearch.yml | 35 ++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 opensearch/config/opensearch.yml diff --git a/docker-compose.yml b/docker-compose.yml index 1cad4dee..ac163674 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,6 +29,34 @@ services: command: bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app" + app-opensearch: + container_name: stac-fastapi-os + image: stac-utils/stac-fastapi + restart: always + build: + context: . + dockerfile: Dockerfile.dev + environment: + - APP_HOST=0.0.0.0 + - APP_PORT=8080 + - RELOAD=true + - ENVIRONMENT=local + - WEB_CONCURRENCY=10 + - OS_HOST=172.17.0.1 + - OS_PORT=9200 + - OS_USE_SSL=false + - OS_VERIFY_CERTS=false + ports: + - "8082:8080" + volumes: + - ./stac_fastapi:/app/stac_fastapi + - ./scripts:/app/scripts + - ./osdata:/usr/share/opensearch/data + depends_on: + - opensearch + command: + bash -c "./scripts/wait-for-it-os.sh os-container:9200 && python -m stac_fastapi.elasticsearch.app" + elasticsearch: container_name: es-container image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.10.4} @@ -39,3 +67,15 @@ services: - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots ports: - "9200:9200" + + opensearch: + container_name: os-container + image: opensearchproject/opensearch:latest + environment: + - "discovery.type=single-node" + - "plugins.security.disabled=true" + volumes: + - ./opensearch/config/opensearch.yml:/usr/share/opensearch/config/opensearch.yml + - ./opensearch/snapshots:/usr/share/opensearch/snapshots + ports: + - "9202:9200" diff --git 
a/opensearch/config/opensearch.yml b/opensearch/config/opensearch.yml new file mode 100644 index 00000000..dc69776a --- /dev/null +++ b/opensearch/config/opensearch.yml @@ -0,0 +1,35 @@ +## Cluster Settings +cluster.name: stac-cluster +node.name: es01 +network.host: 0.0.0.0 +transport.host: 0.0.0.0 +discovery.type: single-node +http.port: 9200 + +path: + repo: + - /usr/share/opensearch/snapshots + +######## Start OpenSearch Security Demo Configuration ######## +# WARNING: revise all the lines below before you go into production +plugins.security.ssl.transport.pemcert_filepath: esnode.pem +plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem +plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem +plugins.security.ssl.transport.enforce_hostname_verification: false +plugins.security.ssl.http.enabled: true +plugins.security.ssl.http.pemcert_filepath: esnode.pem +plugins.security.ssl.http.pemkey_filepath: esnode-key.pem +plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem +plugins.security.allow_unsafe_democertificates: true +plugins.security.allow_default_init_securityindex: true +plugins.security.authcz.admin_dn: + - CN=kirk,OU=client,O=client,L=test, C=de + +plugins.security.audit.type: internal_opensearch +plugins.security.enable_snapshot_restore_privilege: true +plugins.security.check_snapshot_restore_write_privileges: true +plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"] +plugins.security.system_indices.enabled: true +plugins.security.system_indices.indices: [".plugins-ml-config", ".plugins-ml-connector", ".plugins-ml-model-group", ".plugins-ml-model", ".plugins-ml-task", ".plugins-ml-conversation-meta", ".plugins-ml-conversation-interactions", ".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opensearch-notifications-*", 
".opensearch-notebooks", ".opensearch-observability", ".ql-datasources", ".opendistro-asynchronous-search-response*", ".replication-metadata-store", ".opensearch-knn-models", ".geospatial-ip2geo-data*"] +node.max_local_storage_nodes: 3 +######## End OpenSearch Security Demo Configuration ######## From 6a9b66601b880926c912f824a4e2e43f31fe2a26 Mon Sep 17 00:00:00 2001 From: jonhealy1 Date: Tue, 21 Nov 2023 12:08:52 +0800 Subject: [PATCH 04/14] update compose --- docker-compose.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index ac163674..e8058b45 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: '3.9' services: app-elasticsearch: container_name: stac-fastapi-es - image: stac-utils/stac-fastapi + image: stac-utils/stac-fastapi-es restart: always build: context: . @@ -31,7 +31,7 @@ services: app-opensearch: container_name: stac-fastapi-os - image: stac-utils/stac-fastapi + image: stac-utils/stac-fastapi-os restart: always build: context: . 
@@ -55,7 +55,7 @@ services: depends_on: - opensearch command: - bash -c "./scripts/wait-for-it-os.sh os-container:9200 && python -m stac_fastapi.elasticsearch.app" + bash -c "./scripts/wait-for-it-es.sh os-container:9200 && python -m stac_fastapi.elasticsearch.app" elasticsearch: container_name: es-container @@ -67,7 +67,7 @@ services: - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots ports: - "9200:9200" - + opensearch: container_name: os-container image: opensearchproject/opensearch:latest From ae9b38fecd9c3ca708f0c75c9cae3a9ce37b40e1 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 19 Jan 2024 19:57:38 -0500 Subject: [PATCH 05/14] arcitecture, requirements, opensearch config --- .gitignore | 3 + README.md | 4 +- docker-compose.yml | 76 +- opensearch/config/opensearch.yml | 2 +- stac_fastapi/elasticsearch/setup.py | 2 + .../stac_fastapi/elasticsearch/app.py | 4 +- .../elasticsearch/config/__init__.py | 1 + .../config_elasticsearch.py} | 0 .../elasticsearch/config/config_opensearch.py | 71 ++ .../stac_fastapi/elasticsearch/core.py | 4 +- .../elasticsearch/database_logic/__init__.py | 1 + .../database_logic_elasticsearch.py | 838 ++++++++++++++++++ .../database_logic_opensearch.py} | 4 +- stac_fastapi/elasticsearch/tests/conftest.py | 4 +- 14 files changed, 966 insertions(+), 48 deletions(-) create mode 100644 stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py rename stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/{config.py => config/config_elasticsearch.py} (100%) create mode 100644 stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py create mode 100644 stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py create mode 100644 stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py rename stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/{database_elasticsearch/database_logic.py => 
database_logic/database_logic_opensearch.py} (99%) diff --git a/.gitignore b/.gitignore index 81ab98af..5cce79de 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ elasticsearch/snapshots/ +# local testing +DEV.ipynb + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/README.md b/README.md index 5b4b7685..0078d2ef 100644 --- a/README.md +++ b/README.md @@ -42,13 +42,15 @@ docker-compose build docker-compose up ``` -By default, docker-compose uses Elasticsearch 8.x. However, most recent 7.x versions should also work. +By default, docker-compose uses Elasticsearch 8.x and OpenSearch 2.11.1. If you wish to use a different version, put the following in a file named `.env` in the same directory you run docker-compose from: ```shell ELASTICSEARCH_VERSION=7.17.1 +OPENSEARCH_VERSION=2.11.0 ``` +The most recent 7.x versions should also work. See the [opensearch-py docs](https://github.com/opensearch-project/opensearch-py/blob/main/COMPATIBILITY.md) for compatibility information. To create a new Collection: diff --git a/docker-compose.yml b/docker-compose.yml index 96b7f493..c181b096 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,33 +1,33 @@ version: '3.9' services: - app-elasticsearch: - container_name: stac-fastapi-es - image: stac-utils/stac-fastapi-es - restart: always - build: - context: . 
- dockerfile: Dockerfile.dev - environment: - - APP_HOST=0.0.0.0 - - APP_PORT=8080 - - RELOAD=true - - ENVIRONMENT=local - - WEB_CONCURRENCY=10 - - ES_HOST=172.17.0.1 - - ES_PORT=9200 - - ES_USE_SSL=false - - ES_VERIFY_CERTS=false - ports: - - "8080:8080" - volumes: - - ./stac_fastapi:/app/stac_fastapi - - ./scripts:/app/scripts - - ./esdata:/usr/share/elasticsearch/data - depends_on: - - elasticsearch - command: - bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app" + # app-elasticsearch: + # container_name: stac-fastapi-es + # image: stac-utils/stac-fastapi-es + # restart: always + # build: + # context: . + # dockerfile: Dockerfile.dev + # environment: + # - APP_HOST=0.0.0.0 + # - APP_PORT=8080 + # - RELOAD=true + # - ENVIRONMENT=local + # - WEB_CONCURRENCY=10 + # - ES_HOST=172.17.0.1 + # - ES_PORT=9200 + # - ES_USE_SSL=false + # - ES_VERIFY_CERTS=false + # ports: + # - "8080:8080" + # volumes: + # - ./stac_fastapi:/app/stac_fastapi + # - ./scripts:/app/scripts + # - ./esdata:/usr/share/elasticsearch/data + # depends_on: + # - elasticsearch + # command: + # bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app" app-opensearch: container_name: stac-fastapi-os @@ -57,20 +57,20 @@ services: command: bash -c "./scripts/wait-for-it-es.sh os-container:9200 && python -m stac_fastapi.elasticsearch.app" - elasticsearch: - container_name: es-container - image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0} - environment: - ES_JAVA_OPTS: -Xms512m -Xmx1g - volumes: - - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml - - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots - ports: - - "9200:9200" + # elasticsearch: + # container_name: es-container + # image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0} + # environment: + # ES_JAVA_OPTS: -Xms512m -Xmx1g + # volumes: + # 
- ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml + # - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots + # ports: + # - "9200:9200" opensearch: container_name: os-container - image: opensearchproject/opensearch:latest + image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1} environment: - "discovery.type=single-node" - "plugins.security.disabled=true" diff --git a/opensearch/config/opensearch.yml b/opensearch/config/opensearch.yml index dc69776a..71607e6c 100644 --- a/opensearch/config/opensearch.yml +++ b/opensearch/config/opensearch.yml @@ -1,6 +1,6 @@ ## Cluster Settings cluster.name: stac-cluster -node.name: es01 +node.name: os01 network.host: 0.0.0.0 transport.host: 0.0.0.0 discovery.type: single-node diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 3106c512..201b6dc8 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -15,6 +15,8 @@ "stac-fastapi.extensions==2.4.9", "elasticsearch[async]==8.11.0", "elasticsearch-dsl==8.11.0", + "opensearch-py==2.1.1", + "opensearch-py[async]==2.1.1", "pystac[validation]", "uvicorn", "orjson", diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index fabfbb4f..5a37e3b5 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -1,14 +1,14 @@ """FastAPI application.""" from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model -from stac_fastapi.elasticsearch.config import ElasticsearchSettings +from stac_fastapi.elasticsearch.config.config_elasticsearch import ElasticsearchSettings from stac_fastapi.elasticsearch.core import ( BulkTransactionsClient, CoreClient, EsAsyncBaseFiltersClient, TransactionsClient, ) -from 
stac_fastapi.elasticsearch.database_elasticsearch.database_logic import create_collection_index +from stac_fastapi.elasticsearch.database_elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.core import ( diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py new file mode 100644 index 00000000..77cfd61d --- /dev/null +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/__init__.py @@ -0,0 +1 @@ +"""client config implementations.""" diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py similarity index 100% rename from stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config.py rename to stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py new file mode 100644 index 00000000..3f8b88b8 --- /dev/null +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py @@ -0,0 +1,71 @@ +"""API configuration.""" +import os +import ssl +from typing import Any, Dict, Set + +from opensearchpy import AsyncOpenSearch, OpenSearch +from stac_fastapi.types.config import ApiSettings + + +def _es_config() -> Dict[str, Any]: + # Determine the scheme (http or https) + use_ssl = os.getenv("ES_USE_SSL", "true").lower() == "true" + scheme = "https" if use_ssl else "http" + + # Configure the hosts parameter with the correct scheme + hosts = [f"{scheme}://{os.getenv('ES_HOST')}:{os.getenv('ES_PORT')}"] + + # Initialize the configuration dictionary + 
config = { + "hosts": hosts, + "headers": {"accept": "application/vnd.elasticsearch+json; compatible-with=7"}, + } + + # Explicitly exclude SSL settings when not using SSL + if not use_ssl: + return config + + # Include SSL settings if using https + config["ssl_version"] = ssl.TLSVersion.TLSv1_3 # type: ignore + config["verify_certs"] = os.getenv("ES_VERIFY_CERTS", "true").lower() != "false" # type: ignore + + # Include CA Certificates if verifying certs + if config["verify_certs"]: + config["ca_certs"] = os.getenv( + "CURL_CA_BUNDLE", "/etc/ssl/certs/ca-certificates.crt" + ) + + # Handle authentication + if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")): + config["http_auth"] = (u, p) + + return config + + +_forbidden_fields: Set[str] = {"type"} + + +class ElasticsearchSettings(ApiSettings): + """API settings.""" + + # Fields which are defined by STAC but not included in the database model + forbidden_fields: Set[str] = _forbidden_fields + indexed_fields: Set[str] = {"datetime"} + + @property + def create_client(self): + """Create es client.""" + return OpenSearch(**_es_config()) + + +class AsyncElasticsearchSettings(ApiSettings): + """API settings.""" + + # Fields which are defined by STAC but not included in the database model + forbidden_fields: Set[str] = _forbidden_fields + indexed_fields: Set[str] = {"datetime"} + + @property + def create_client(self): + """Create async elasticsearch client.""" + return AsyncOpenSearch(**_es_config()) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index d65190fd..45dfefb4 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -19,8 +19,8 @@ from stac_pydantic.shared import MimeTypes from stac_fastapi.elasticsearch import serializers -from stac_fastapi.elasticsearch.config import ElasticsearchSettings -from 
stac_fastapi.elasticsearch.database_elasticsearch.database_logic import DatabaseLogic +from stac_fastapi.elasticsearch.config.config_elasticsearch import ElasticsearchSettings +from stac_fastapi.elasticsearch.database_elasticsearch.database_logic.database_logic_elasticsearch import DatabaseLogic from stac_fastapi.elasticsearch.models.links import PagingLinks from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.elasticsearch.session import Session diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py new file mode 100644 index 00000000..2a8962ca --- /dev/null +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/__init__.py @@ -0,0 +1 @@ +"""database logic implementations.""" diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py new file mode 100644 index 00000000..3ecc3797 --- /dev/null +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py @@ -0,0 +1,838 @@ +"""Database logic.""" +import asyncio +import logging +import os +from base64 import urlsafe_b64decode, urlsafe_b64encode +from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union + +import attr +from elasticsearch_dsl import Q, Search + +from elasticsearch import exceptions, helpers # type: ignore +from stac_fastapi.elasticsearch import serializers +from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncElasticsearchSettings +from stac_fastapi.elasticsearch.config.config_elasticsearch import ( + ElasticsearchSettings as SyncElasticsearchSettings, +) +from stac_fastapi.elasticsearch.extensions import filter +from stac_fastapi.types.errors import ConflictError, 
NotFoundError +from stac_fastapi.types.stac import Collection, Item +from stac_fastapi.elasticsearch.utilities import bbox2polygon + +logger = logging.getLogger(__name__) + +NumType = Union[float, int] + +COLLECTIONS_INDEX = os.getenv("STAC_COLLECTIONS_INDEX", "collections") +ITEMS_INDEX_PREFIX = os.getenv("STAC_ITEMS_INDEX_PREFIX", "items_") +ES_INDEX_NAME_UNSUPPORTED_CHARS = { + "\\", + "/", + "*", + "?", + '"', + "<", + ">", + "|", + " ", + ",", + "#", + ":", +} + +ITEM_INDICES = f"{ITEMS_INDEX_PREFIX}*,-*kibana*,-{COLLECTIONS_INDEX}*" + +DEFAULT_SORT = { + "properties.datetime": {"order": "desc"}, + "id": {"order": "desc"}, + "collection": {"order": "desc"}, +} + +ES_ITEMS_SETTINGS = { + "index": { + "sort.field": list(DEFAULT_SORT.keys()), + "sort.order": [v["order"] for v in DEFAULT_SORT.values()], + } +} + +ES_MAPPINGS_DYNAMIC_TEMPLATES = [ + # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md + { + "descriptions": { + "match_mapping_type": "string", + "match": "description", + "mapping": {"type": "text"}, + } + }, + { + "titles": { + "match_mapping_type": "string", + "match": "title", + "mapping": {"type": "text"}, + } + }, + # Projection Extension https://github.com/stac-extensions/projection + {"proj_epsg": {"match": "proj:epsg", "mapping": {"type": "integer"}}}, + { + "proj_projjson": { + "match": "proj:projjson", + "mapping": {"type": "object", "enabled": False}, + } + }, + { + "proj_centroid": { + "match": "proj:centroid", + "mapping": {"type": "geo_point"}, + } + }, + { + "proj_geometry": { + "match": "proj:geometry", + "mapping": {"type": "object", "enabled": False}, + } + }, + { + "no_index_href": { + "match": "href", + "mapping": {"type": "text", "index": False}, + } + }, + # Default all other strings not otherwise specified to keyword + {"strings": {"match_mapping_type": "string", "mapping": {"type": "keyword"}}}, + {"numerics": {"match_mapping_type": "long", "mapping": {"type": "float"}}}, +] + 
+ES_ITEMS_MAPPINGS = { + "numeric_detection": False, + "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, + "properties": { + "id": {"type": "keyword"}, + "collection": {"type": "keyword"}, + "geometry": {"type": "geo_shape"}, + "assets": {"type": "object", "enabled": False}, + "links": {"type": "object", "enabled": False}, + "properties": { + "type": "object", + "properties": { + # Common https://github.com/radiantearth/stac-spec/blob/master/item-spec/common-metadata.md + "datetime": {"type": "date"}, + "start_datetime": {"type": "date"}, + "end_datetime": {"type": "date"}, + "created": {"type": "date"}, + "updated": {"type": "date"}, + # Satellite Extension https://github.com/stac-extensions/sat + "sat:absolute_orbit": {"type": "integer"}, + "sat:relative_orbit": {"type": "integer"}, + }, + }, + }, +} + +ES_COLLECTIONS_MAPPINGS = { + "numeric_detection": False, + "dynamic_templates": ES_MAPPINGS_DYNAMIC_TEMPLATES, + "properties": { + "extent.spatial.bbox": {"type": "long"}, + "extent.temporal.interval": {"type": "date"}, + "providers": {"type": "object", "enabled": False}, + "links": {"type": "object", "enabled": False}, + "item_assets": {"type": "object", "enabled": False}, + }, +} + + +def index_by_collection_id(collection_id: str) -> str: + """ + Translate a collection id into an Elasticsearch index name. + + Args: + collection_id (str): The collection id to translate into an index name. + + Returns: + str: The index name derived from the collection id. + """ + return f"{ITEMS_INDEX_PREFIX}{''.join(c for c in collection_id.lower() if c not in ES_INDEX_NAME_UNSUPPORTED_CHARS)}" + + +def indices(collection_ids: Optional[List[str]]) -> str: + """ + Get a comma-separated string of index names for a given list of collection ids. + + Args: + collection_ids: A list of collection ids. + + Returns: + A string of comma-separated index names. If `collection_ids` is None, returns the default indices. 
+ """ + if collection_ids is None: + return ITEM_INDICES + else: + return ",".join([index_by_collection_id(c) for c in collection_ids]) + + +async def create_collection_index() -> None: + """ + Create the index for a Collection. + + Returns: + None + + """ + client = AsyncElasticsearchSettings().create_client + + await client.options(ignore_status=400).indices.create( + index=f"{COLLECTIONS_INDEX}-000001", + aliases={COLLECTIONS_INDEX: {}}, + mappings=ES_COLLECTIONS_MAPPINGS, + ) + await client.close() + + +async def create_item_index(collection_id: str): + """ + Create the index for Items. + + Args: + collection_id (str): Collection identifier. + + Returns: + None + + """ + client = AsyncElasticsearchSettings().create_client + index_name = index_by_collection_id(collection_id) + + await client.options(ignore_status=400).indices.create( + index=f"{index_by_collection_id(collection_id)}-000001", + aliases={index_name: {}}, + mappings=ES_ITEMS_MAPPINGS, + settings=ES_ITEMS_SETTINGS, + ) + await client.close() + + +async def delete_item_index(collection_id: str): + """Delete the index for items in a collection. + + Args: + collection_id (str): The ID of the collection whose items index will be deleted. + """ + client = AsyncElasticsearchSettings().create_client + + name = index_by_collection_id(collection_id) + resolved = await client.indices.resolve_index(name=name) + if "aliases" in resolved and resolved["aliases"]: + [alias] = resolved["aliases"] + await client.indices.delete_alias(index=alias["indices"], name=alias["name"]) + await client.indices.delete(index=alias["indices"]) + else: + await client.indices.delete(index=name) + await client.close() + + +def mk_item_id(item_id: str, collection_id: str): + """Create the document id for an Item in Elasticsearch. + + Args: + item_id (str): The id of the Item. + collection_id (str): The id of the Collection that the Item belongs to. 
+ + Returns: + str: The document id for the Item, combining the Item id and the Collection id, separated by a `|` character. + """ + return f"{item_id}|{collection_id}" + + +def mk_actions(collection_id: str, processed_items: List[Item]): + """Create Elasticsearch bulk actions for a list of processed items. + + Args: + collection_id (str): The identifier for the collection the items belong to. + processed_items (List[Item]): The list of processed items to be bulk indexed. + + Returns: + List[Dict[str, Union[str, Dict]]]: The list of bulk actions to be executed, + each action being a dictionary with the following keys: + - `_index`: the index to store the document in. + - `_id`: the document's identifier. + - `_source`: the source of the document. + """ + return [ + { + "_index": index_by_collection_id(collection_id), + "_id": mk_item_id(item["id"], item["collection"]), + "_source": item, + } + for item in processed_items + ] + + +# stac_pydantic classes extend _GeometryBase, which doesn't have a type field, +# So create our own Protocol for typing +# Union[ Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon, GeometryCollection] +class Geometry(Protocol): # noqa + type: str + coordinates: Any + + +@attr.s +class DatabaseLogic: + """Database logic.""" + + client = AsyncElasticsearchSettings().create_client + sync_client = SyncElasticsearchSettings().create_client + + item_serializer: Type[serializers.ItemSerializer] = attr.ib( + default=serializers.ItemSerializer + ) + collection_serializer: Type[serializers.CollectionSerializer] = attr.ib( + default=serializers.CollectionSerializer + ) + + """CORE LOGIC""" + + async def get_all_collections( + self, token: Optional[str], limit: int + ) -> Iterable[Dict[str, Any]]: + """Retrieve a list of all collections from the database. + + Args: + token (Optional[str]): The token used to return the next set of results. 
+ limit (int): Number of results to return + + Returns: + collections (Iterable[Dict[str, Any]]): A list of dictionaries containing the source data for each collection. + + Notes: + The collections are retrieved from the Elasticsearch database using the `client.search` method, + with the `COLLECTIONS_INDEX` as the target index and `size=limit` to retrieve records. + The result is a generator of dictionaries containing the source data for each collection. + """ + search_after = None + if token: + search_after = urlsafe_b64decode(token.encode()).decode().split(",") + collections = await self.client.search( + index=COLLECTIONS_INDEX, + search_after=search_after, + size=limit, + sort={"id": {"order": "asc"}}, + ) + hits = collections["hits"]["hits"] + return hits + + async def get_one_item(self, collection_id: str, item_id: str) -> Dict: + """Retrieve a single item from the database. + + Args: + collection_id (str): The id of the Collection that the Item belongs to. + item_id (str): The id of the Item. + + Returns: + item (Dict): A dictionary containing the source data for the Item. + + Raises: + NotFoundError: If the specified Item does not exist in the Collection. + + Notes: + The Item is retrieved from the Elasticsearch database using the `client.get` method, + with the index for the Collection as the target index and the combined `mk_item_id` as the document id. 
+ """ + try: + item = await self.client.get( + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), + ) + except exceptions.NotFoundError: + raise NotFoundError( + f"Item {item_id} does not exist in Collection {collection_id}" + ) + return item["_source"] + + @staticmethod + def make_search(): + """Database logic to create a Search instance.""" + return Search().sort(*DEFAULT_SORT) + + @staticmethod + def apply_ids_filter(search: Search, item_ids: List[str]): + """Database logic to search a list of STAC item ids.""" + return search.filter("terms", id=item_ids) + + @staticmethod + def apply_collections_filter(search: Search, collection_ids: List[str]): + """Database logic to search a list of STAC collection ids.""" + return search.filter("terms", collection=collection_ids) + + @staticmethod + def apply_datetime_filter(search: Search, datetime_search): + """Apply a filter to search based on datetime field. + + Args: + search (Search): The search object to filter. + datetime_search (dict): The datetime filter criteria. + + Returns: + Search: The filtered search object. + """ + if "eq" in datetime_search: + search = search.filter( + "term", **{"properties__datetime": datetime_search["eq"]} + ) + else: + search = search.filter( + "range", properties__datetime={"lte": datetime_search["lte"]} + ) + search = search.filter( + "range", properties__datetime={"gte": datetime_search["gte"]} + ) + return search + + @staticmethod + def apply_bbox_filter(search: Search, bbox: List): + """Filter search results based on bounding box. + + Args: + search (Search): The search object to apply the filter to. + bbox (List): The bounding box coordinates, represented as a list of four values [minx, miny, maxx, maxy]. + + Returns: + search (Search): The search object with the bounding box filter applied. 
+ + Notes: + The bounding box is transformed into a polygon using the `bbox2polygon` function and + a geo_shape filter is added to the search object, set to intersect with the specified polygon. + """ + return search.filter( + Q( + { + "geo_shape": { + "geometry": { + "shape": { + "type": "polygon", + "coordinates": bbox2polygon(*bbox), + }, + "relation": "intersects", + } + } + } + ) + ) + + @staticmethod + def apply_intersects_filter( + search: Search, + intersects: Geometry, + ): + """Filter search results based on intersecting geometry. + + Args: + search (Search): The search object to apply the filter to. + intersects (Geometry): The intersecting geometry, represented as a GeoJSON-like object. + + Returns: + search (Search): The search object with the intersecting geometry filter applied. + + Notes: + A geo_shape filter is added to the search object, set to intersect with the specified geometry. + """ + return search.filter( + Q( + { + "geo_shape": { + "geometry": { + "shape": { + "type": intersects.type.lower(), + "coordinates": intersects.coordinates, + }, + "relation": "intersects", + } + } + } + ) + ) + + @staticmethod + def apply_stacql_filter(search: Search, op: str, field: str, value: float): + """Filter search results based on a comparison between a field and a value. + + Args: + search (Search): The search object to apply the filter to. + op (str): The comparison operator to use. Can be 'eq' (equal), 'gt' (greater than), 'gte' (greater than or equal), + 'lt' (less than), or 'lte' (less than or equal). + field (str): The field to perform the comparison on. + value (float): The value to compare the field against. + + Returns: + search (Search): The search object with the specified filter applied. 
+ """ + if op != "eq": + key_filter = {field: {f"{op}": value}} + search = search.filter(Q("range", **key_filter)) + else: + search = search.filter("term", **{field: value}) + + return search + + @staticmethod + def apply_cql2_filter(search: Search, _filter: Optional[Dict[str, Any]]): + """Database logic to perform query for search endpoint.""" + if _filter is not None: + search = search.filter(filter.Clause.parse_obj(_filter).to_es()) + return search + + @staticmethod + def populate_sort(sortby: List) -> Optional[Dict[str, Dict[str, str]]]: + """Database logic to sort search instance.""" + if sortby: + return {s.field: {"order": s.direction} for s in sortby} + else: + return None + + async def execute_search( + self, + search: Search, + limit: int, + token: Optional[str], + sort: Optional[Dict[str, Dict[str, str]]], + collection_ids: Optional[List[str]], + ignore_unavailable: bool = True, + ) -> Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]: + """Execute a search query with limit and other optional parameters. + + Args: + search (Search): The search query to be executed. + limit (int): The maximum number of results to be returned. + token (Optional[str]): The token used to return the next set of results. + sort (Optional[Dict[str, Dict[str, str]]]): Specifies how the results should be sorted. + collection_ids (Optional[List[str]]): The collection ids to search. + ignore_unavailable (bool, optional): Whether to ignore unavailable collections. Defaults to True. + + Returns: + Tuple[Iterable[Dict[str, Any]], Optional[int], Optional[str]]: A tuple containing: + - An iterable of search results, where each result is a dictionary with keys and values representing the + fields and values of each document. + - The total number of results (if the count could be computed), or None if the count could not be + computed. + - The token to be used to retrieve the next set of results, or None if there are no more results. 
+ + Raises: + NotFoundError: If the collections specified in `collection_ids` do not exist. + """ + search_after = None + if token: + search_after = urlsafe_b64decode(token.encode()).decode().split(",") + + query = search.query.to_dict() if search.query else None + + index_param = indices(collection_ids) + + search_task = asyncio.create_task( + self.client.search( + index=index_param, + ignore_unavailable=ignore_unavailable, + query=query, + sort=sort or DEFAULT_SORT, + search_after=search_after, + size=limit, + ) + ) + + count_task = asyncio.create_task( + self.client.count( + index=index_param, + ignore_unavailable=ignore_unavailable, + body=search.to_dict(count=True), + ) + ) + + try: + es_response = await search_task + except exceptions.NotFoundError: + raise NotFoundError(f"Collections '{collection_ids}' do not exist") + + hits = es_response["hits"]["hits"] + items = (hit["_source"] for hit in hits) + + next_token = None + if hits and (sort_array := hits[-1].get("sort")): + next_token = urlsafe_b64encode( + ",".join([str(x) for x in sort_array]).encode() + ).decode() + + # (1) count should not block returning results, so don't wait for it to be done + # (2) don't cancel the task so that it will populate the ES cache for subsequent counts + maybe_count = None + if count_task.done(): + try: + maybe_count = count_task.result().get("count") + except Exception as e: + logger.error(f"Count task failed: {e}") + + return items, maybe_count, next_token + + """ TRANSACTION LOGIC """ + + async def check_collection_exists(self, collection_id: str): + """Database logic to check if a collection exists.""" + if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): + raise NotFoundError(f"Collection {collection_id} does not exist") + + async def prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: + """ + Preps an item for insertion into the database. + + Args: + item (Item): The item to be prepped for insertion. 
+ base_url (str): The base URL used to create the item's self URL. + exist_ok (bool): Indicates whether the item can exist already. + + Returns: + Item: The prepped item. + + Raises: + ConflictError: If the item already exists in the database. + + """ + await self.check_collection_exists(collection_id=item["collection"]) + + if not exist_ok and await self.client.exists( + index=index_by_collection_id(item["collection"]), + id=mk_item_id(item["id"], item["collection"]), + ): + raise ConflictError( + f"Item {item['id']} in collection {item['collection']} already exists" + ) + + return self.item_serializer.stac_to_db(item, base_url) + + def sync_prep_create_item( + self, item: Item, base_url: str, exist_ok: bool = False + ) -> Item: + """ + Prepare an item for insertion into the database. + + This method performs pre-insertion preparation on the given `item`, + such as checking if the collection the item belongs to exists, + and optionally verifying that an item with the same ID does not already exist in the database. + + Args: + item (Item): The item to be inserted into the database. + base_url (str): The base URL used for constructing URLs for the item. + exist_ok (bool): Indicates whether the item can exist already. + + Returns: + Item: The item after preparation is done. + + Raises: + NotFoundError: If the collection that the item belongs to does not exist in the database. + ConflictError: If an item with the same ID already exists in the collection. 
+ """ + item_id = item["id"] + collection_id = item["collection"] + if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=collection_id): + raise NotFoundError(f"Collection {collection_id} does not exist") + + if not exist_ok and self.sync_client.exists( + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), + ): + raise ConflictError( + f"Item {item_id} in collection {collection_id} already exists" + ) + + return self.item_serializer.stac_to_db(item, base_url) + + async def create_item(self, item: Item, refresh: bool = False): + """Database logic for creating one item. + + Args: + item (Item): The item to be created. + refresh (bool, optional): Refresh the index after performing the operation. Defaults to False. + + Raises: + ConflictError: If the item already exists in the database. + + Returns: + None + """ + # todo: check if collection exists, but cache + item_id = item["id"] + collection_id = item["collection"] + es_resp = await self.client.index( + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), + document=item, + refresh=refresh, + ) + + if (meta := es_resp.get("meta")) and meta.get("status") == 409: + raise ConflictError( + f"Item {item_id} in collection {collection_id} already exists" + ) + + async def delete_item( + self, item_id: str, collection_id: str, refresh: bool = False + ): + """Delete a single item from the database. + + Args: + item_id (str): The id of the Item to be deleted. + collection_id (str): The id of the Collection that the Item belongs to. + refresh (bool, optional): Whether to refresh the index after the deletion. Default is False. + + Raises: + NotFoundError: If the Item does not exist in the database. 
+ """ + try: + await self.client.delete( + index=index_by_collection_id(collection_id), + id=mk_item_id(item_id, collection_id), + refresh=refresh, + ) + except exceptions.NotFoundError: + raise NotFoundError( + f"Item {item_id} in collection {collection_id} not found" + ) + + async def create_collection(self, collection: Collection, refresh: bool = False): + """Create a single collection in the database. + + Args: + collection (Collection): The Collection object to be created. + refresh (bool, optional): Whether to refresh the index after the creation. Default is False. + + Raises: + ConflictError: If a Collection with the same id already exists in the database. + + Notes: + A new index is created for the items in the Collection using the `create_item_index` function. + """ + collection_id = collection["id"] + + if await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id): + raise ConflictError(f"Collection {collection_id} already exists") + + await self.client.index( + index=COLLECTIONS_INDEX, + id=collection_id, + document=collection, + refresh=refresh, + ) + + await create_item_index(collection_id) + + async def find_collection(self, collection_id: str) -> Collection: + """Find and return a collection from the database. + + Args: + self: The instance of the object calling this function. + collection_id (str): The ID of the collection to be found. + + Returns: + Collection: The found collection, represented as a `Collection` object. + + Raises: + NotFoundError: If the collection with the given `collection_id` is not found in the database. + + Notes: + This function searches for a collection in the database using the specified `collection_id` and returns the found + collection as a `Collection` object. If the collection is not found, a `NotFoundError` is raised. 
+ """ + try: + collection = await self.client.get( + index=COLLECTIONS_INDEX, id=collection_id + ) + except exceptions.NotFoundError: + raise NotFoundError(f"Collection {collection_id} not found") + + return collection["_source"] + + async def delete_collection(self, collection_id: str, refresh: bool = False): + """Delete a collection from the database. + + Parameters: + self: The instance of the object calling this function. + collection_id (str): The ID of the collection to be deleted. + refresh (bool): Whether to refresh the index after the deletion (default: False). + + Raises: + NotFoundError: If the collection with the given `collection_id` is not found in the database. + + Notes: + This function first verifies that the collection with the specified `collection_id` exists in the database, and then + deletes the collection. If `refresh` is set to True, the index is refreshed after the deletion. Additionally, this + function also calls `delete_item_index` to delete the index for the items in the collection. + """ + await self.find_collection(collection_id=collection_id) + await self.client.delete( + index=COLLECTIONS_INDEX, id=collection_id, refresh=refresh + ) + await delete_item_index(collection_id) + + async def bulk_async( + self, collection_id: str, processed_items: List[Item], refresh: bool = False + ) -> None: + """Perform a bulk insert of items into the database asynchronously. + + Args: + self: The instance of the object calling this function. + collection_id (str): The ID of the collection to which the items belong. + processed_items (List[Item]): A list of `Item` objects to be inserted into the database. + refresh (bool): Whether to refresh the index after the bulk insert (default: False). + + Notes: + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The + insert is performed asynchronously, and the event loop is used to run the operation in a separate executor. 
The + `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to True, the + index is refreshed after the bulk insert. The function does not return any value. + """ + await helpers.async_bulk( + self.client, + mk_actions(collection_id, processed_items), + refresh=refresh, + raise_on_error=False, + ) + + def bulk_sync( + self, collection_id: str, processed_items: List[Item], refresh: bool = False + ) -> None: + """Perform a bulk insert of items into the database synchronously. + + Args: + self: The instance of the object calling this function. + collection_id (str): The ID of the collection to which the items belong. + processed_items (List[Item]): A list of `Item` objects to be inserted into the database. + refresh (bool): Whether to refresh the index after the bulk insert (default: False). + + Notes: + This function performs a bulk insert of `processed_items` into the database using the specified `collection_id`. The + insert is performed synchronously and blocking, meaning that the function does not return until the insert has + completed. The `mk_actions` function is called to generate a list of actions for the bulk insert. If `refresh` is set to + True, the index is refreshed after the bulk insert. The function does not return any value. + """ + helpers.bulk( + self.sync_client, + mk_actions(collection_id, processed_items), + refresh=refresh, + raise_on_error=False, + ) + + # DANGER + async def delete_items(self) -> None: + """Danger. this is only for tests.""" + await self.client.delete_by_query( + index=ITEM_INDICES, + body={"query": {"match_all": {}}}, + wait_for_completion=True, + ) + + # DANGER + async def delete_collections(self) -> None: + """Danger. 
this is only for tests.""" + await self.client.delete_by_query( + index=COLLECTIONS_INDEX, + body={"query": {"match_all": {}}}, + wait_for_completion=True, + ) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py similarity index 99% rename from stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py rename to stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index 26d830f0..0bcf9d7a 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_elasticsearch/database_logic.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -10,8 +10,8 @@ from elasticsearch import exceptions, helpers # type: ignore from stac_fastapi.elasticsearch import serializers -from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings -from stac_fastapi.elasticsearch.config import ( +from stac_fastapi.elasticsearch.config.config_opensearch import AsyncElasticsearchSettings +from stac_fastapi.elasticsearch.config.config_opensearch import ( ElasticsearchSettings as SyncElasticsearchSettings, ) from stac_fastapi.elasticsearch.extensions import filter diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index 888ec8e3..23989f9d 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -10,13 +10,13 @@ from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model -from stac_fastapi.elasticsearch.config import AsyncElasticsearchSettings +from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncElasticsearchSettings from stac_fastapi.elasticsearch.core import ( BulkTransactionsClient, 
CoreClient, TransactionsClient, ) -from stac_fastapi.elasticsearch.database_elasticsearch.database_logic import create_collection_index +from stac_fastapi.elasticsearch.database_elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.extensions.core import ( # FieldsExtension, ContextExtension, From afbeb2025d1ffeb56b5af9af653e7104f13a1772 Mon Sep 17 00:00:00 2001 From: James Date: Tue, 23 Jan 2024 13:58:44 -0500 Subject: [PATCH 06/14] Opensearch database_logic, conditional imports. OS tests failing --- Makefile | 25 ++++- data_loader/data_loader.py | 14 ++- docker-compose.yml | 97 ++++++++++--------- opensearch/config/opensearch.yml | 46 +++++---- stac_fastapi/elasticsearch/setup.py | 4 +- .../stac_fastapi/elasticsearch/app.py | 14 ++- .../config/config_elasticsearch.py | 4 +- .../elasticsearch/config/config_opensearch.py | 6 +- .../stac_fastapi/elasticsearch/core.py | 14 ++- .../database_logic_elasticsearch.py | 14 +-- .../database_logic_opensearch.py | 62 +++++++----- stac_fastapi/elasticsearch/tests/conftest.py | 15 ++- 12 files changed, 193 insertions(+), 122 deletions(-) diff --git a/Makefile b/Makefile index ce2609bc..589f9503 100644 --- a/Makefile +++ b/Makefile @@ -1,20 +1,32 @@ #!make APP_HOST ?= 0.0.0.0 -APP_PORT ?= 8080 +ES_APP_PORT ?= 8080 EXTERNAL_APP_PORT ?= ${APP_PORT} -APP_PORT ?= 8080 +ES_APP_PORT ?= 8080 ES_HOST ?= docker.for.mac.localhost ES_PORT ?= 9200 +OS_APP_PORT ?= 8082 +ES_HOST ?= docker.for.mac.localhost +OS_PORT ?= 9202 + run_es = docker-compose \ run \ - -p ${EXTERNAL_APP_PORT}:${APP_PORT} \ + -p ${EXTERNAL_APP_PORT}:${ES_APP_PORT} \ -e PY_IGNORE_IMPORTMISMATCH=1 \ -e APP_HOST=${APP_HOST} \ - -e APP_PORT=${APP_PORT} \ + -e APP_PORT=${ES_APP_PORT} \ app-elasticsearch +run_os = docker-compose \ + run \ + -p ${EXTERNAL_APP_PORT}:${OS_APP_PORT} \ + -e PY_IGNORE_IMPORTMISMATCH=1 \ + -e APP_HOST=${APP_HOST} \ + -e 
APP_PORT=${OS_APP_PORT} \ + app-opensearch + .PHONY: image-deploy image-deploy: docker build -f Dockerfile.deploy -t stac-fastapi-elasticsearch:latest . @@ -45,6 +57,11 @@ test: -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' docker-compose down +.PHONY: test-opensearch +test-opensearch: + -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/clients && pytest' + docker-compose down + .PHONY: run-database run-database: docker-compose run --rm elasticsearch diff --git a/data_loader/data_loader.py b/data_loader/data_loader.py index b8d9f93e..54e6b57a 100644 --- a/data_loader/data_loader.py +++ b/data_loader/data_loader.py @@ -1,12 +1,24 @@ """Database ingestion script.""" import json import os +import sys import click import requests +if len(sys.argv) != 2: + print("Usage: python data_loader.py ") + sys.exit(1) + DATA_DIR = os.path.join(os.path.dirname(__file__), "setup_data/") -STAC_API_BASE_URL = "http://localhost:8080" + +backend = sys.argv[1].lower() +if backend == "opensearch": + STAC_API_BASE_URL = "http://localhost:8082" +elif backend == "elasticsearch": + STAC_API_BASE_URL = "http://localhost:8080" +else: + print("Invalid backend tag. Enter either 'opensearch' or 'elasticsearch'.") def load_data(filename): diff --git a/docker-compose.yml b/docker-compose.yml index c181b096..354c3823 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,33 +1,34 @@ version: '3.9' services: - # app-elasticsearch: - # container_name: stac-fastapi-es - # image: stac-utils/stac-fastapi-es - # restart: always - # build: - # context: . 
- # dockerfile: Dockerfile.dev - # environment: - # - APP_HOST=0.0.0.0 - # - APP_PORT=8080 - # - RELOAD=true - # - ENVIRONMENT=local - # - WEB_CONCURRENCY=10 - # - ES_HOST=172.17.0.1 - # - ES_PORT=9200 - # - ES_USE_SSL=false - # - ES_VERIFY_CERTS=false - # ports: - # - "8080:8080" - # volumes: - # - ./stac_fastapi:/app/stac_fastapi - # - ./scripts:/app/scripts - # - ./esdata:/usr/share/elasticsearch/data - # depends_on: - # - elasticsearch - # command: - # bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app" + app-elasticsearch: + container_name: stac-fastapi-es + image: stac-utils/stac-fastapi-es + restart: always + build: + context: . + dockerfile: Dockerfile.dev + environment: + - APP_HOST=0.0.0.0 + - APP_PORT=8080 + - RELOAD=true + - ENVIRONMENT=local + - WEB_CONCURRENCY=10 + - ES_HOST=172.17.0.1 + - ES_PORT=9200 + - ES_USE_SSL=false + - ES_VERIFY_CERTS=false + - BACKEND=elasticsearch + ports: + - "8080:8080" + volumes: + - ./stac_fastapi:/app/stac_fastapi + - ./scripts:/app/scripts + - ./esdata:/usr/share/elasticsearch/data + depends_on: + - elasticsearch + command: + bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app" app-opensearch: container_name: stac-fastapi-os @@ -38,16 +39,17 @@ services: dockerfile: Dockerfile.dev environment: - APP_HOST=0.0.0.0 - - APP_PORT=8080 + - APP_PORT=8082 - RELOAD=true - ENVIRONMENT=local - WEB_CONCURRENCY=10 - - OS_HOST=172.17.0.1 - - OS_PORT=9200 - - OS_USE_SSL=false - - OS_VERIFY_CERTS=false + - ES_HOST=172.17.0.1 + - ES_PORT=9202 + - ES_USE_SSL=false + - ES_VERIFY_CERTS=false + - BACKEND=opensearch ports: - - "8082:8080" + - "8082:8082" volumes: - ./stac_fastapi:/app/stac_fastapi - ./scripts:/app/scripts @@ -55,27 +57,28 @@ services: depends_on: - opensearch command: - bash -c "./scripts/wait-for-it-es.sh os-container:9200 && python -m stac_fastapi.elasticsearch.app" + bash -c "./scripts/wait-for-it-es.sh os-container:9202 && 
python -m stac_fastapi.elasticsearch.app" - # elasticsearch: - # container_name: es-container - # image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0} - # environment: - # ES_JAVA_OPTS: -Xms512m -Xmx1g - # volumes: - # - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml - # - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots - # ports: - # - "9200:9200" + elasticsearch: + container_name: es-container + image: docker.elastic.co/elasticsearch/elasticsearch:${ELASTICSEARCH_VERSION:-8.11.0} + environment: + ES_JAVA_OPTS: -Xms512m -Xmx1g + volumes: + - ./elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml + - ./elasticsearch/snapshots:/usr/share/elasticsearch/snapshots + ports: + - "9200:9200" opensearch: container_name: os-container image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1} environment: - - "discovery.type=single-node" - - "plugins.security.disabled=true" + - discovery.type=single-node + - plugins.security.disabled=true + - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m volumes: - ./opensearch/config/opensearch.yml:/usr/share/opensearch/config/opensearch.yml - ./opensearch/snapshots:/usr/share/opensearch/snapshots ports: - - "9202:9200" + - "9202:9202" diff --git a/opensearch/config/opensearch.yml b/opensearch/config/opensearch.yml index 71607e6c..249eb3e8 100644 --- a/opensearch/config/opensearch.yml +++ b/opensearch/config/opensearch.yml @@ -4,32 +4,38 @@ node.name: os01 network.host: 0.0.0.0 transport.host: 0.0.0.0 discovery.type: single-node -http.port: 9200 +http.port: 9202 +http.cors.enabled: true +http.cors.allow-headers: X-Requested-With,Content-Type,Content-Length,Accept,Authorization path: repo: - /usr/share/opensearch/snapshots -######## Start OpenSearch Security Demo Configuration ######## -# WARNING: revise all the lines below before you go into production -plugins.security.ssl.transport.pemcert_filepath: esnode.pem 
-plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem -plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem -plugins.security.ssl.transport.enforce_hostname_verification: false +# Security +plugins.security.disabled: true plugins.security.ssl.http.enabled: true -plugins.security.ssl.http.pemcert_filepath: esnode.pem -plugins.security.ssl.http.pemkey_filepath: esnode-key.pem -plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem -plugins.security.allow_unsafe_democertificates: true -plugins.security.allow_default_init_securityindex: true -plugins.security.authcz.admin_dn: - - CN=kirk,OU=client,O=client,L=test, C=de -plugins.security.audit.type: internal_opensearch -plugins.security.enable_snapshot_restore_privilege: true -plugins.security.check_snapshot_restore_write_privileges: true -plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"] -plugins.security.system_indices.enabled: true -plugins.security.system_indices.indices: [".plugins-ml-config", ".plugins-ml-connector", ".plugins-ml-model-group", ".plugins-ml-model", ".plugins-ml-task", ".plugins-ml-conversation-meta", ".plugins-ml-conversation-interactions", ".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opensearch-notifications-*", ".opensearch-notebooks", ".opensearch-observability", ".ql-datasources", ".opendistro-asynchronous-search-response*", ".replication-metadata-store", ".opensearch-knn-models", ".geospatial-ip2geo-data*"] node.max_local_storage_nodes: 3 + +######## Start OpenSearch Security Demo Configuration ######## +# WARNING: revise all the lines below before you go into production +# plugins.security.ssl.transport.pemcert_filepath: esnode.pem +# plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem +# 
plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem +# plugins.security.ssl.transport.enforce_hostname_verification: false +# plugins.security.ssl.http.pemcert_filepath: esnode.pem +# plugins.security.ssl.http.pemkey_filepath: esnode-key.pem +# plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem +# plugins.security.allow_unsafe_democertificates: true +# plugins.security.allow_default_init_securityindex: true +# plugins.security.authcz.admin_dn: +# - CN=kirk,OU=client,O=client,L=test, C=de + +# plugins.security.audit.type: internal_opensearch +# plugins.security.enable_snapshot_restore_privilege: true +# plugins.security.check_snapshot_restore_write_privileges: true +# plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"] +# plugins.security.system_indices.enabled: true +# plugins.security.system_indices.indices: [".plugins-ml-config", ".plugins-ml-connector", ".plugins-ml-model-group", ".plugins-ml-model", ".plugins-ml-task", ".plugins-ml-conversation-meta", ".plugins-ml-conversation-interactions", ".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opensearch-notifications-*", ".opensearch-notebooks", ".opensearch-observability", ".ql-datasources", ".opendistro-asynchronous-search-response*", ".replication-metadata-store", ".opensearch-knn-models", ".geospatial-ip2geo-data*"] ######## End OpenSearch Security Demo Configuration ######## diff --git a/stac_fastapi/elasticsearch/setup.py b/stac_fastapi/elasticsearch/setup.py index 201b6dc8..a34c5e7c 100644 --- a/stac_fastapi/elasticsearch/setup.py +++ b/stac_fastapi/elasticsearch/setup.py @@ -15,8 +15,8 @@ "stac-fastapi.extensions==2.4.9", "elasticsearch[async]==8.11.0", "elasticsearch-dsl==8.11.0", - "opensearch-py==2.1.1", - "opensearch-py[async]==2.1.1", + "opensearch-py==2.4.2", + 
"opensearch-py[async]==2.4.2", "pystac[validation]", "uvicorn", "orjson", diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index 5a37e3b5..1330ed89 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -1,14 +1,22 @@ """FastAPI application.""" from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model -from stac_fastapi.elasticsearch.config.config_elasticsearch import ElasticsearchSettings from stac_fastapi.elasticsearch.core import ( BulkTransactionsClient, CoreClient, EsAsyncBaseFiltersClient, TransactionsClient, ) -from stac_fastapi.elasticsearch.database_elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index +import os + +backend = os.getenv("BACKEND", "elasticsearch").lower() +if backend == "opensearch": + from stac_fastapi.elasticsearch.config.config_opensearch import SearchSettings + from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import create_collection_index +else: + from stac_fastapi.elasticsearch.config.config_elasticsearch import SearchSettings + from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index + from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.elasticsearch.session import Session from stac_fastapi.extensions.core import ( @@ -21,7 +29,7 @@ ) from stac_fastapi.extensions.third_party import BulkTransactionExtension -settings = ElasticsearchSettings() +settings = SearchSettings() session = Session.create_from_settings(settings) filter_extension = FilterExtension(client=EsAsyncBaseFiltersClient()) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py 
b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py index 8634d3b9..5df7a7db 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_elasticsearch.py @@ -45,7 +45,7 @@ def _es_config() -> Dict[str, Any]: _forbidden_fields: Set[str] = {"type"} -class ElasticsearchSettings(ApiSettings): +class SearchSettings(ApiSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model @@ -58,7 +58,7 @@ def create_client(self): return Elasticsearch(**_es_config()) -class AsyncElasticsearchSettings(ApiSettings): +class AsyncSearchSettings(ApiSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py index 3f8b88b8..291452d8 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py @@ -18,7 +18,7 @@ def _es_config() -> Dict[str, Any]: # Initialize the configuration dictionary config = { "hosts": hosts, - "headers": {"accept": "application/vnd.elasticsearch+json; compatible-with=7"}, + "headers": {"accept": "application/json", "Content-Type": "application/json"}, } # Explicitly exclude SSL settings when not using SSL @@ -45,7 +45,7 @@ def _es_config() -> Dict[str, Any]: _forbidden_fields: Set[str] = {"type"} -class ElasticsearchSettings(ApiSettings): +class SearchSettings(ApiSettings): """API settings.""" # Fields which are defined by STAC but not included in the database model @@ -58,7 +58,7 @@ def create_client(self): return OpenSearch(**_es_config()) -class AsyncElasticsearchSettings(ApiSettings): +class AsyncSearchSettings(ApiSettings): """API 
settings.""" # Fields which are defined by STAC but not included in the database model diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 45dfefb4..6bbc65d6 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -1,6 +1,7 @@ """Item crud client.""" import logging import re +import os from base64 import urlsafe_b64encode from datetime import datetime as datetime_type from datetime import timezone @@ -18,9 +19,16 @@ from stac_pydantic.links import Relations from stac_pydantic.shared import MimeTypes +backend = os.getenv("BACKEND", "elasticsearch").lower() +if backend == "opensearch": + from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import DatabaseLogic + from stac_fastapi.elasticsearch.config.config_opensearch import SearchSettings +else: + from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import DatabaseLogic + from stac_fastapi.elasticsearch.config.config_elasticsearch import SearchSettings + + from stac_fastapi.elasticsearch import serializers -from stac_fastapi.elasticsearch.config.config_elasticsearch import ElasticsearchSettings -from stac_fastapi.elasticsearch.database_elasticsearch.database_logic.database_logic_elasticsearch import DatabaseLogic from stac_fastapi.elasticsearch.models.links import PagingLinks from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer from stac_fastapi.elasticsearch.session import Session @@ -716,7 +724,7 @@ class BulkTransactionsClient(BaseBulkTransactionsClient): def __attrs_post_init__(self): """Create es engine.""" - settings = ElasticsearchSettings() + settings = SearchSettings() self.client = settings.create_client def preprocess_item( diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py 
b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py index 3ecc3797..3e871aec 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py @@ -10,9 +10,9 @@ from elasticsearch import exceptions, helpers # type: ignore from stac_fastapi.elasticsearch import serializers -from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncElasticsearchSettings +from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncSearchSettings from stac_fastapi.elasticsearch.config.config_elasticsearch import ( - ElasticsearchSettings as SyncElasticsearchSettings, + SearchSettings as SyncSearchSettings, ) from stac_fastapi.elasticsearch.extensions import filter from stac_fastapi.types.errors import ConflictError, NotFoundError @@ -178,7 +178,7 @@ async def create_collection_index() -> None: None """ - client = AsyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client await client.options(ignore_status=400).indices.create( index=f"{COLLECTIONS_INDEX}-000001", @@ -199,7 +199,7 @@ async def create_item_index(collection_id: str): None """ - client = AsyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client index_name = index_by_collection_id(collection_id) await client.options(ignore_status=400).indices.create( @@ -217,7 +217,7 @@ async def delete_item_index(collection_id: str): Args: collection_id (str): The ID of the collection whose items index will be deleted. 
""" - client = AsyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client name = index_by_collection_id(collection_id) resolved = await client.indices.resolve_index(name=name) @@ -279,8 +279,8 @@ class Geometry(Protocol): # noqa class DatabaseLogic: """Database logic.""" - client = AsyncElasticsearchSettings().create_client - sync_client = SyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client + sync_client = SyncSearchSettings().create_client item_serializer: Type[serializers.ItemSerializer] = attr.ib( default=serializers.ItemSerializer diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index 0bcf9d7a..8649553e 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -6,13 +6,13 @@ from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union import attr -from elasticsearch_dsl import Q, Search +from opensearchpy.helpers.search import Search -from elasticsearch import exceptions, helpers # type: ignore +from opensearchpy import helpers, exceptions from stac_fastapi.elasticsearch import serializers -from stac_fastapi.elasticsearch.config.config_opensearch import AsyncElasticsearchSettings +from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings from stac_fastapi.elasticsearch.config.config_opensearch import ( - ElasticsearchSettings as SyncElasticsearchSettings, + SearchSettings as SyncSearchSettings, ) from stac_fastapi.elasticsearch.extensions import filter from stac_fastapi.types.errors import ConflictError, NotFoundError @@ -178,12 +178,18 @@ async def create_collection_index() -> None: None """ - client = 
AsyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client - await client.options(ignore_status=400).indices.create( - index=f"{COLLECTIONS_INDEX}-000001", - aliases={COLLECTIONS_INDEX: {}}, - mappings=ES_COLLECTIONS_MAPPINGS, + search_body = { + "mappings": ES_COLLECTIONS_MAPPINGS, + "aliases":{COLLECTIONS_INDEX: {}} + } + + index = f"{COLLECTIONS_INDEX}-000001" + + await client.indices.create( + index=index, + body=search_body ) await client.close() @@ -199,14 +205,17 @@ async def create_item_index(collection_id: str): None """ - client = AsyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client index_name = index_by_collection_id(collection_id) + search_body = { + "aliases":{index_name: {}}, + "mappings":ES_ITEMS_MAPPINGS, + "settings":ES_ITEMS_SETTINGS, + } - await client.options(ignore_status=400).indices.create( + await client.indices.create( index=f"{index_by_collection_id(collection_id)}-000001", - aliases={index_name: {}}, - mappings=ES_ITEMS_MAPPINGS, - settings=ES_ITEMS_SETTINGS, + body=search_body ) await client.close() @@ -217,7 +226,7 @@ async def delete_item_index(collection_id: str): Args: collection_id (str): The ID of the collection whose items index will be deleted. 
""" - client = AsyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client name = index_by_collection_id(collection_id) resolved = await client.indices.resolve_index(name=name) @@ -279,8 +288,8 @@ class Geometry(Protocol): # noqa class DatabaseLogic: """Database logic.""" - client = AsyncElasticsearchSettings().create_client - sync_client = SyncElasticsearchSettings().create_client + client = AsyncSearchSettings().create_client + sync_client = SyncSearchSettings().create_client item_serializer: Type[serializers.ItemSerializer] = attr.ib( default=serializers.ItemSerializer @@ -308,12 +317,13 @@ async def get_all_collections( with the `COLLECTIONS_INDEX` as the target index and `size=limit` to retrieve records. The result is a generator of dictionaries containing the source data for each collection. """ - search_after = None + search_body = {} if token: search_after = urlsafe_b64decode(token.encode()).decode().split(",") + search_body["search_after"] = search_after collections = await self.client.search( index=COLLECTIONS_INDEX, - search_after=search_after, + body=search_body, size=limit, sort={"id": {"order": "asc"}}, ) @@ -518,11 +528,12 @@ async def execute_search( Raises: NotFoundError: If the collections specified in `collection_ids` do not exist. 
""" - search_after = None + search_body = {} + query = search.query.to_dict() if search.query else None + search_body["query"] = query if query else {} if token: search_after = urlsafe_b64decode(token.encode()).decode().split(",") - - query = search.query.to_dict() if search.query else None + search_body["search_after"] = search_after index_param = indices(collection_ids) @@ -530,9 +541,8 @@ async def execute_search( self.client.search( index=index_param, ignore_unavailable=ignore_unavailable, - query=query, + body=search_body, sort=sort or DEFAULT_SORT, - search_after=search_after, size=limit, ) ) @@ -663,7 +673,7 @@ async def create_item(self, item: Item, refresh: bool = False): es_resp = await self.client.index( index=index_by_collection_id(collection_id), id=mk_item_id(item_id, collection_id), - document=item, + body=item, refresh=refresh, ) @@ -717,7 +727,7 @@ async def create_collection(self, collection: Collection, refresh: bool = False) await self.client.index( index=COLLECTIONS_INDEX, id=collection_id, - document=collection, + body=collection, refresh=refresh, ) diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index 23989f9d..c18b3dde 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -10,13 +10,20 @@ from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model -from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncElasticsearchSettings + +backend = os.getenv("BACKEND", "elasticsearch").lower() +if backend == "opensearch": + from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings + from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import create_collection_index +else: + from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncSearchSettings + from 
stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index + from stac_fastapi.elasticsearch.core import ( BulkTransactionsClient, CoreClient, TransactionsClient, ) -from stac_fastapi.elasticsearch.database_elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.extensions.core import ( # FieldsExtension, ContextExtension, @@ -53,7 +60,7 @@ def __init__( self.query_params = query_params or {} -class TestSettings(AsyncElasticsearchSettings): +class TestSettings(AsyncSearchSettings): class Config: env_file = ".env.test" @@ -160,7 +167,7 @@ def bulk_txn_client(): @pytest_asyncio.fixture(scope="session") async def app(): - settings = AsyncElasticsearchSettings() + settings = AsyncSearchSettings() extensions = [ TransactionExtension( client=TransactionsClient(session=None), settings=settings From d39c9c70f3698ee61f963f805839bfb5d82754cd Mon Sep 17 00:00:00 2001 From: James Date: Mon, 29 Jan 2024 20:04:35 -0500 Subject: [PATCH 07/14] ignore 400 fix --- Makefile | 10 ++++-- README.md | 4 +++ .../database_logic_opensearch.py | 33 ++++++++++++++----- 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index 589f9503..9e3c8c1b 100644 --- a/Makefile +++ b/Makefile @@ -59,13 +59,17 @@ test: .PHONY: test-opensearch test-opensearch: - -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/clients && pytest' + -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' docker-compose down -.PHONY: run-database -run-database: +.PHONY: run-database-es +run-database-es: docker-compose run --rm elasticsearch +.PHONY: run-database-os +run-database-os: + docker-compose run --rm opensearch + .PHONY: pybase-install pybase-install: pip install wheel && \ diff 
--git a/README.md b/README.md index 0078d2ef..29868044 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,10 @@ curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token" ```shell make test ``` + +```shell +make test-opensearch +``` ## Ingest sample data diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index 8649553e..f937250e 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -7,8 +7,9 @@ import attr from opensearchpy.helpers.search import Search - from opensearchpy import helpers, exceptions +from opensearchpy.exceptions import TransportError + from stac_fastapi.elasticsearch import serializers from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings from stac_fastapi.elasticsearch.config.config_opensearch import ( @@ -187,10 +188,17 @@ async def create_collection_index() -> None: index = f"{COLLECTIONS_INDEX}-000001" - await client.indices.create( - index=index, - body=search_body - ) + try: + await client.indices.create( + index=index, + body=search_body + ) + except TransportError as e: + if e.status_code == 400: + pass # Ignore 400 status codes + else: + raise e + await client.close() @@ -213,10 +221,17 @@ async def create_item_index(collection_id: str): "settings":ES_ITEMS_SETTINGS, } - await client.indices.create( - index=f"{index_by_collection_id(collection_id)}-000001", - body=search_body - ) + try: + await client.indices.create( + index=f"{index_name}-000001", + body=search_body + ) + except TransportError as e: + if e.status_code == 400: + pass # Ignore 400 status codes + else: + raise e + await client.close() From 9d6a16ef408ba0304f97c74952c3baced22246aa Mon Sep 17 00:00:00 2001 
From: James Date: Tue, 30 Jan 2024 14:25:52 -0500 Subject: [PATCH 08/14] sort body param, cicd --- .github/workflows/cicd.yml | 29 ++++++++++++++++++- Makefile | 10 ++++++- README.md | 10 ++++++- opensearch/config/opensearch.yml | 22 -------------- .../stac_fastapi/elasticsearch/core.py | 1 + .../database_logic_opensearch.py | 13 +++++---- 6 files changed, 55 insertions(+), 30 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 9a50e28f..c8afdcc7 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -46,6 +46,22 @@ jobs: ES_JAVA_OPTS: -Xms512m -Xmx1g ports: - 9400:9400 + + opensearch_2_11: + image: opensearchproject/opensearch:2.11.1 + env: + cluster.name: stac-cluster + node.name: os01 + network.host: 0.0.0.0 + transport.host: 0.0.0.0 + discovery.type: single-node + http.port: 9202 + http.cors.enabled: true + plugins.security.disabled: true + plugins.security.ssl.http.enabled: true + OPENSEARCH_JAVA_OPTS: -Xms512m -Xmx512m + ports: + - 9202:9202 strategy: matrix: python-version: [ "3.8", "3.9", "3.10", "3.11"] @@ -90,4 +106,15 @@ jobs: ES_PORT: 9400 ES_HOST: 172.17.0.1 ES_USE_SSL: false - ES_VERIFY_CERTS: false \ No newline at end of file + ES_VERIFY_CERTS: false + + - name: Run test suite against OpenSearch 2.11.1 + run: | + cd stac_fastapi/elasticsearch && pipenv run pytest -svvv + env: + ENVIRONMENT: testing + ES_PORT: 9202 + ES_HOST: 172.17.0.1 + ES_USE_SSL: false + ES_VERIFY_CERTS: false + BACKEND: opensearch \ No newline at end of file diff --git a/Makefile b/Makefile index 9e3c8c1b..068c86d1 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ docker-run: image-dev docker-shell: $(run_es) /bin/bash -.PHONY: test +.PHONY: test-elasticsearch -test: +test-elasticsearch: -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' docker-compose down @@ -62,6 +62,14 @@ test-opensearch: -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh
opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' docker-compose down +.PHONY: test +test: + -$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' + docker-compose down + + -$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd /app/stac_fastapi/elasticsearch/tests/ && pytest' + docker-compose down + .PHONY: run-database-es run-database-es: docker-compose run --rm elasticsearch diff --git a/README.md b/README.md index 29868044..552ee0f4 100644 --- a/README.md +++ b/README.md @@ -80,11 +80,19 @@ curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token" ```shell make test ``` +Test against OpenSearch only ```shell make test-opensearch ``` - + +Test against Elasticsearch only + +```shell +make test-elasticsearch +``` + + ## Ingest sample data ```shell diff --git a/opensearch/config/opensearch.yml b/opensearch/config/opensearch.yml index 249eb3e8..5e44b259 100644 --- a/opensearch/config/opensearch.yml +++ b/opensearch/config/opensearch.yml @@ -17,25 +17,3 @@ plugins.security.disabled: true plugins.security.ssl.http.enabled: true node.max_local_storage_nodes: 3 - -######## Start OpenSearch Security Demo Configuration ######## -# WARNING: revise all the lines below before you go into production -# plugins.security.ssl.transport.pemcert_filepath: esnode.pem -# plugins.security.ssl.transport.pemkey_filepath: esnode-key.pem -# plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem -# plugins.security.ssl.transport.enforce_hostname_verification: false -# plugins.security.ssl.http.pemcert_filepath: esnode.pem -# plugins.security.ssl.http.pemkey_filepath: esnode-key.pem -# plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem -# plugins.security.allow_unsafe_democertificates: true -# plugins.security.allow_default_init_securityindex: true -# plugins.security.authcz.admin_dn: -# - 
CN=kirk,OU=client,O=client,L=test, C=de - -# plugins.security.audit.type: internal_opensearch -# plugins.security.enable_snapshot_restore_privilege: true -# plugins.security.check_snapshot_restore_write_privileges: true -# plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"] -# plugins.security.system_indices.enabled: true -# plugins.security.system_indices.indices: [".plugins-ml-config", ".plugins-ml-connector", ".plugins-ml-model-group", ".plugins-ml-model", ".plugins-ml-task", ".plugins-ml-conversation-meta", ".plugins-ml-conversation-interactions", ".opendistro-alerting-config", ".opendistro-alerting-alert*", ".opendistro-anomaly-results*", ".opendistro-anomaly-detector*", ".opendistro-anomaly-checkpoints", ".opendistro-anomaly-detection-state", ".opendistro-reports-*", ".opensearch-notifications-*", ".opensearch-notebooks", ".opensearch-observability", ".ql-datasources", ".opendistro-asynchronous-search-response*", ".replication-metadata-store", ".opensearch-knn-models", ".geospatial-ip2geo-data*"] -######## End OpenSearch Security Demo Configuration ######## diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 6bbc65d6..0a5d3413 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -110,6 +110,7 @@ async def all_collections(self, **kwargs) -> Collections: next_link = None if len(hits) == limit: last_hit = hits[-1] + logger.info(last_hit) next_search_after = last_hit["sort"] next_token = urlsafe_b64encode( ",".join(map(str, next_search_after)).encode() diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index f937250e..360837b4 100644 --- 
a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -7,6 +7,7 @@ import attr from opensearchpy.helpers.search import Search +from opensearchpy.helpers.query import Q from opensearchpy import helpers, exceptions from opensearchpy.exceptions import TransportError @@ -336,11 +337,12 @@ async def get_all_collections( if token: search_after = urlsafe_b64decode(token.encode()).decode().split(",") search_body["search_after"] = search_after + search_body["sort"] = {"id": {"order": "asc"}} + collections = await self.client.search( index=COLLECTIONS_INDEX, body=search_body, - size=limit, - sort={"id": {"order": "asc"}}, + size=limit ) hits = collections["hits"]["hits"] return hits @@ -545,10 +547,12 @@ async def execute_search( """ search_body = {} query = search.query.to_dict() if search.query else None - search_body["query"] = query if query else {} + if query: + search_body["query"] = query if token: search_after = urlsafe_b64decode(token.encode()).decode().split(",") search_body["search_after"] = search_after + search_body["sort"] = sort if sort else DEFAULT_SORT index_param = indices(collection_ids) @@ -557,8 +561,7 @@ async def execute_search( index=index_param, ignore_unavailable=ignore_unavailable, body=search_body, - sort=sort or DEFAULT_SORT, - size=limit, + size=limit ) ) From 6d48213c44d7b7e027f8b8a751729c27b3918678 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 31 Jan 2024 11:13:03 -0500 Subject: [PATCH 09/14] formatting and linting --- data_loader/data_loader.py | 6 +- .../stac_fastapi/elasticsearch/app.py | 14 +++-- .../elasticsearch/config/config_opensearch.py | 1 + .../stac_fastapi/elasticsearch/core.py | 14 +++-- .../database_logic_elasticsearch.py | 2 +- .../database_logic_opensearch.py | 55 +++++++++---------- .../stac_fastapi/elasticsearch/utilities.py | 6 ++ 
stac_fastapi/elasticsearch/tests/conftest.py | 15 +++-- 8 files changed, 63 insertions(+), 50 deletions(-) diff --git a/data_loader/data_loader.py b/data_loader/data_loader.py index 54e6b57a..c438811d 100644 --- a/data_loader/data_loader.py +++ b/data_loader/data_loader.py @@ -7,8 +7,8 @@ import requests if len(sys.argv) != 2: - print("Usage: python data_loader.py <opensearch|elasticsearch>") - sys.exit(1) + print("Usage: python data_loader.py <opensearch|elasticsearch>") + sys.exit(1) DATA_DIR = os.path.join(os.path.dirname(__file__), "setup_data/") @@ -18,7 +18,7 @@ elif backend == "elasticsearch": STAC_API_BASE_URL = "http://localhost:8080" else: - print("Invalid backend tag. Enter either 'opensearch' or 'elasticsearch'.") + print("Invalid backend tag. Enter either 'opensearch' or 'elasticsearch'.") def load_data(filename): diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index 1330ed89..570edcc5 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -1,4 +1,6 @@ """FastAPI application.""" +import os + from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model from stac_fastapi.elasticsearch.core import ( @@ -7,15 +9,17 @@ EsAsyncBaseFiltersClient, TransactionsClient, ) -import os -backend = os.getenv("BACKEND", "elasticsearch").lower() -if backend == "opensearch": +if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": from stac_fastapi.elasticsearch.config.config_opensearch import SearchSettings - from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import create_collection_index + from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import ( + create_collection_index, + ) else: from stac_fastapi.elasticsearch.config.config_elasticsearch import SearchSettings - from
stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index + from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import ( + create_collection_index, + ) from stac_fastapi.elasticsearch.extensions import QueryExtension from stac_fastapi.elasticsearch.session import Session diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py index 291452d8..643b81ce 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Set from opensearchpy import AsyncOpenSearch, OpenSearch + from stac_fastapi.types.config import ApiSettings diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 0a5d3413..cc261889 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -1,7 +1,7 @@ """Item crud client.""" import logging -import re import os +import re from base64 import urlsafe_b64encode from datetime import datetime as datetime_type from datetime import timezone @@ -19,15 +19,17 @@ from stac_pydantic.links import Relations from stac_pydantic.shared import MimeTypes -backend = os.getenv("BACKEND", "elasticsearch").lower() -if backend == "opensearch": - from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import DatabaseLogic +if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": from stac_fastapi.elasticsearch.config.config_opensearch import SearchSettings + from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import ( + DatabaseLogic, + ) else: - from 
stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import DatabaseLogic + from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import ( + DatabaseLogic, + ) from stac_fastapi.elasticsearch.config.config_elasticsearch import SearchSettings - from stac_fastapi.elasticsearch import serializers from stac_fastapi.elasticsearch.models.links import PagingLinks from stac_fastapi.elasticsearch.serializers import CollectionSerializer, ItemSerializer diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py index 3e871aec..9646d2e4 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_elasticsearch.py @@ -15,9 +15,9 @@ SearchSettings as SyncSearchSettings, ) from stac_fastapi.elasticsearch.extensions import filter +from stac_fastapi.elasticsearch.utilities import bbox2polygon from stac_fastapi.types.errors import ConflictError, NotFoundError from stac_fastapi.types.stac import Collection, Item -from stac_fastapi.elasticsearch.utilities import bbox2polygon logger = logging.getLogger(__name__) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index 360837b4..bff62d80 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -6,10 +6,10 @@ from typing import Any, Dict, Iterable, List, Optional, Protocol, Tuple, Type, Union import attr -from opensearchpy.helpers.search import Search -from opensearchpy.helpers.query 
import Q -from opensearchpy import helpers, exceptions +from opensearchpy import exceptions, helpers from opensearchpy.exceptions import TransportError +from opensearchpy.helpers.query import Q +from opensearchpy.helpers.search import Search from stac_fastapi.elasticsearch import serializers from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings @@ -17,9 +17,9 @@ SearchSettings as SyncSearchSettings, ) from stac_fastapi.elasticsearch.extensions import filter +from stac_fastapi.elasticsearch.utilities import bbox2polygon from stac_fastapi.types.errors import ConflictError, NotFoundError from stac_fastapi.types.stac import Collection, Item -from stac_fastapi.elasticsearch.utilities import bbox2polygon logger = logging.getLogger(__name__) @@ -184,22 +184,19 @@ async def create_collection_index() -> None: search_body = { "mappings": ES_COLLECTIONS_MAPPINGS, - "aliases":{COLLECTIONS_INDEX: {}} + "aliases": {COLLECTIONS_INDEX: {}}, } index = f"{COLLECTIONS_INDEX}-000001" try: - await client.indices.create( - index=index, - body=search_body - ) + await client.indices.create(index=index, body=search_body) except TransportError as e: if e.status_code == 400: pass # Ignore 400 status codes else: - raise e - + raise e + await client.close() @@ -217,22 +214,19 @@ async def create_item_index(collection_id: str): client = AsyncSearchSettings().create_client index_name = index_by_collection_id(collection_id) search_body = { - "aliases":{index_name: {}}, - "mappings":ES_ITEMS_MAPPINGS, - "settings":ES_ITEMS_SETTINGS, + "aliases": {index_name: {}}, + "mappings": ES_ITEMS_MAPPINGS, + "settings": ES_ITEMS_SETTINGS, } try: - await client.indices.create( - index=f"{index_name}-000001", - body=search_body - ) + await client.indices.create(index=f"{index_name}-000001", body=search_body) except TransportError as e: if e.status_code == 400: pass # Ignore 400 status codes else: - raise e - + raise e + await client.close() @@ -317,7 +311,9 @@ class DatabaseLogic: 
"""CORE LOGIC""" async def get_all_collections( - self, token: Optional[str], limit: int + self, + token: Optional[str], + limit: int, ) -> Iterable[Dict[str, Any]]: """Retrieve a list of all collections from the database. @@ -333,16 +329,15 @@ async def get_all_collections( with the `COLLECTIONS_INDEX` as the target index and `size=limit` to retrieve records. The result is a generator of dictionaries containing the source data for each collection. """ - search_body = {} + search_body: Dict[str, Any] = {} if token: search_after = urlsafe_b64decode(token.encode()).decode().split(",") search_body["search_after"] = search_after + search_body["sort"] = {"id": {"order": "asc"}} - + collections = await self.client.search( - index=COLLECTIONS_INDEX, - body=search_body, - size=limit + index=COLLECTIONS_INDEX, body=search_body, size=limit ) hits = collections["hits"]["hits"] return hits @@ -545,14 +540,14 @@ async def execute_search( Raises: NotFoundError: If the collections specified in `collection_ids` do not exist. 
""" - search_body = {} + search_body: Dict[str, Any] = {} query = search.query.to_dict() if search.query else None if query: - search_body["query"] = query + search_body["query"] = query if token: search_after = urlsafe_b64decode(token.encode()).decode().split(",") search_body["search_after"] = search_after - search_body["sort"] = sort if sort else DEFAULT_SORT + search_body["sort"] = sort if sort else DEFAULT_SORT index_param = indices(collection_ids) @@ -561,7 +556,7 @@ async def execute_search( index=index_param, ignore_unavailable=ignore_unavailable, body=search_body, - size=limit + size=limit, ) ) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py index 7be319a7..b5dac390 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/utilities.py @@ -1,5 +1,11 @@ +"""Module for geospatial processing functions. + +This module contains functions for transforming geospatial coordinates, +such as converting bounding boxes to polygon representations. +""" from typing import List + def bbox2polygon(b0: float, b1: float, b2: float, b3: float) -> List[List[List[float]]]: """Transform a bounding box represented by its four coordinates `b0`, `b1`, `b2`, and `b3` into a polygon. 
diff --git a/stac_fastapi/elasticsearch/tests/conftest.py b/stac_fastapi/elasticsearch/tests/conftest.py index c18b3dde..c50e5628 100644 --- a/stac_fastapi/elasticsearch/tests/conftest.py +++ b/stac_fastapi/elasticsearch/tests/conftest.py @@ -11,13 +11,18 @@ from stac_fastapi.api.app import StacApi from stac_fastapi.api.models import create_get_request_model, create_post_request_model -backend = os.getenv("BACKEND", "elasticsearch").lower() -if backend == "opensearch": +if os.getenv("BACKEND", "elasticsearch").lower() == "opensearch": from stac_fastapi.elasticsearch.config.config_opensearch import AsyncSearchSettings - from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import create_collection_index + from stac_fastapi.elasticsearch.database_logic.database_logic_opensearch import ( + create_collection_index, + ) else: - from stac_fastapi.elasticsearch.config.config_elasticsearch import AsyncSearchSettings - from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import create_collection_index + from stac_fastapi.elasticsearch.config.config_elasticsearch import ( + AsyncSearchSettings, + ) + from stac_fastapi.elasticsearch.database_logic.database_logic_elasticsearch import ( + create_collection_index, + ) from stac_fastapi.elasticsearch.core import ( BulkTransactionsClient, From ef27d9663c258ce61cf8a5ac0f122dbe3c26dc9f Mon Sep 17 00:00:00 2001 From: James Date: Wed, 31 Jan 2024 13:03:49 -0500 Subject: [PATCH 10/14] documentation --- .gitignore | 3 --- README.md | 3 +-- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 5cce79de..81ab98af 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,5 @@ elasticsearch/snapshots/ -# local testing -DEV.ipynb - # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/README.md b/README.md index 552ee0f4..9ae86aae 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ file named `.env` in the same directory you run docker-compose 
from: ELASTICSEARCH_VERSION=7.17.1 OPENSEARCH_VERSION=2.11.0 ``` -The most recent 7.x versions should also work. See the [opensearch-py docs](https://github.com/opensearch-project/opensearch-py/blob/main/COMPATIBILITY.md) for compatibility information. +The most recent Elasticsearch 7.x versions should also work. See the [opensearch-py docs](https://github.com/opensearch-project/opensearch-py/blob/main/COMPATIBILITY.md) for compatibility information. To create a new Collection: @@ -92,7 +92,6 @@ Test against Elasticsearch only make test-elasticsearch ``` - ## Ingest sample data ```shell From 61e563a45940b82fd17b31cad221606fccafdb38 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 31 Jan 2024 13:14:25 -0500 Subject: [PATCH 11/14] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c3f4a61..ed531e2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- OpenSearch 2.11.1 support [#187](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/187) - Advanced comparison (LIKE, IN, BETWEEN) operators to the Filter extension [#178](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/178) ### Changed From bf1598d6e162a02195a7a38b28187670f20dc581 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 2 Feb 2024 18:54:50 -0500 Subject: [PATCH 12/14] incorporate pr 177 --- CHANGELOG.md | 3 +- docker-compose.yml | 3 +- .../elasticsearch/config/config_opensearch.py | 9 ++++ .../stac_fastapi/elasticsearch/core.py | 1 - .../database_logic_opensearch.py | 47 +++++++++++++++++++ 5 files changed, 60 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e76fecef..36e46fdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. 
### Added -- OpenSearch 2.11.1 support [#187](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/187) - Advanced comparison (LIKE, IN, BETWEEN) operators to the Filter extension [#178](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/178) +- Collection update endpoint no longer delete all sub items [#177](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/177) +- OpenSearch 2.11.1 support [#187](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/187) ### Changed diff --git a/docker-compose.yml b/docker-compose.yml index 2010cd08..916b9e82 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: - RELOAD=true - ENVIRONMENT=local - WEB_CONCURRENCY=10 - - ES_HOST=172.17.0.1 + - ES_HOST=opensearch - ES_PORT=9202 - ES_USE_SSL=false - ES_VERIFY_CERTS=false @@ -74,6 +74,7 @@ services: opensearch: container_name: os-container image: opensearchproject/opensearch:${OPENSEARCH_VERSION:-2.11.1} + hostname: opensearch environment: - discovery.type=single-node - plugins.security.disabled=true diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py index 643b81ce..6ea49008 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/config/config_opensearch.py @@ -40,6 +40,15 @@ def _es_config() -> Dict[str, Any]: if (u := os.getenv("ES_USER")) and (p := os.getenv("ES_PASS")): config["http_auth"] = (u, p) + if api_key := os.getenv("ES_API_KEY"): + if isinstance(config["headers"], dict): + headers = {**config["headers"], "x-api-key": api_key} + + else: + config["headers"] = {"x-api-key": api_key} + + config["headers"] = headers + return config diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py index 
202dff6c..ed6d7da9 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py @@ -112,7 +112,6 @@ async def all_collections(self, **kwargs) -> Collections: next_link = None if len(hits) == limit: last_hit = hits[-1] - logger.info(last_hit) next_search_after = last_hit["sort"] next_token = urlsafe_b64encode( ",".join(map(str, next_search_after)).encode() diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index bff62d80..6bc3ee95 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -771,6 +771,53 @@ async def find_collection(self, collection_id: str) -> Collection: raise NotFoundError(f"Collection {collection_id} not found") return collection["_source"] + + async def update_collection( + self, collection_id: str, collection: Collection, refresh: bool = False + ): + """Update a collection from the database. + + Args: + self: The instance of the object calling this function. + collection_id (str): The ID of the collection to be updated. + collection (Collection): The Collection object to be used for the update. + + Raises: + NotFoundError: If the collection with the given `collection_id` is not + found in the database. + + Notes: + This function updates the collection in the database using the specified + `collection_id` and with the collection specified in the `Collection` object. + If the collection is not found, a `NotFoundError` is raised. 
+ """ + await self.find_collection(collection_id=collection_id) + + if collection_id != collection["id"]: + await self.create_collection(collection, refresh=refresh) + + await self.client.reindex( + body={ + "dest": {"index": f"{ITEMS_INDEX_PREFIX}{collection['id']}"}, + "source": {"index": f"{ITEMS_INDEX_PREFIX}{collection_id}"}, + "script": { + "lang": "painless", + "source": f"""ctx._id = ctx._id.replace('{collection_id}', '{collection["id"]}'); ctx._source.collection = '{collection["id"]}' ;""", + }, + }, + wait_for_completion=True, + refresh=refresh, + ) + + await self.delete_collection(collection_id) + + else: + await self.client.index( + index=COLLECTIONS_INDEX, + id=collection_id, + body=collection, + refresh=refresh, + ) async def delete_collection(self, collection_id: str, refresh: bool = False): """Delete a collection from the database. From 22c233ee8fdd8b371a1e79b7070d263bb46813b7 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 2 Feb 2024 18:57:35 -0500 Subject: [PATCH 13/14] linting --- .../elasticsearch/database_logic/database_logic_opensearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py index 6bc3ee95..b5ba29db 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic/database_logic_opensearch.py @@ -771,7 +771,7 @@ async def find_collection(self, collection_id: str) -> Collection: raise NotFoundError(f"Collection {collection_id} not found") return collection["_source"] - + async def update_collection( self, collection_id: str, collection: Collection, refresh: bool = False ): From ed469d31dd5303452da3a8abc26e53128590d228 Mon Sep 17 00:00:00 2001 From: James Date: Fri, 2 Feb 2024 19:05:43 -0500 Subject: 
[PATCH 14/14] update PR number --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36e46fdf..0df5a582 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Advanced comparison (LIKE, IN, BETWEEN) operators to the Filter extension [#178](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/178) - Collection update endpoint no longer delete all sub items [#177](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/177) -- OpenSearch 2.11.1 support [#187](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/187) +- OpenSearch 2.11.1 support [#188](https://github.com/stac-utils/stac-fastapi-elasticsearch/pull/188) ### Changed