diff --git a/CHANGELOG.md b/CHANGELOG.md index 73c29c2f..f768d50f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Changed - Updated the pip_docker example to use stac-fastapi.elasticsearch 2.1.0 and the elasticsearch 8.11.0 docker image. [#216](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/216) +- Updated the Data Loader CLI tool to accept a base_url, a data directory, a custom collection id, and an option to use bulk insert. [#218](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/218) ### Fixed diff --git a/Makefile b/Makefile index 4dfb2aec..3025a3e4 100644 --- a/Makefile +++ b/Makefile @@ -104,7 +104,3 @@ install-es: pybase-install .PHONY: install-os install-os: pybase-install pip install -e ./stac_fastapi/opensearch[dev,server] - -.PHONY: ingest -ingest: - python3 data_loader/data_loader.py diff --git a/README.md b/README.md index a6d04609..0fe027fd 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ - Our Api core library can be used to create custom backends. See [stac-fastapi-mongo](https://github.com/Healy-Hyperspatial/stac-fastapi-mongo) for a working example. - Reach out on our [Gitter](https://app.gitter.im/#/room/#stac-fastapi-elasticsearch_community:gitter.im) channel or feel free to add to our [Discussions](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/discussions) page here on github. +- There is [Postman](https://documenter.getpostman.com/view/12888943/2s8ZDSdRHA) documentation here for examples on how to run some of the API routes locally - after starting the elasticsearch backend via the docker-compose.yml file. +- The `/examples` folder shows an example of running stac-fastapi-elasticsearch from PyPI in docker without needing any code from the repository. There is also a Postman collection here that you can load into Postman for testing the API routes. ### To install from PyPI: @@ -91,6 +93,26 @@ get the next page of results. curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token" ``` +## Ingesting Sample Data CLI Tool + +```shell +Usage: data_loader.py [OPTIONS] + + Load STAC items into the database. + +Options: + --base-url TEXT Base URL of the STAC API [required] + --collection-id TEXT ID of the collection to which items are added + --use-bulk Use bulk insert method for items + --data-dir PATH Directory containing collection.json and feature + collection file + --help Show this message and exit. +``` + +```shell +python3 data_loader.py --base-url http://localhost:8080 +``` + ## Testing ```shell @@ -108,12 +130,6 @@ Test against Elasticsearch only make test-elasticsearch ``` -## Ingest sample data - -```shell -make ingest -``` - ## Elasticsearch Mappings Mappings apply to search index, not source. The mappings are stored in index templates on application startup. diff --git a/data_loader.py b/data_loader.py new file mode 100644 index 00000000..1cccffd5 --- /dev/null +++ b/data_loader.py @@ -0,0 +1,121 @@ +"""Data Loader CLI STAC_API Ingestion Tool.""" +import json +import os + +import click +import requests + + +def load_data(data_dir, filename): + """Load json data from a file within the specified data directory.""" + filepath = os.path.join(data_dir, filename) + if not os.path.exists(filepath): + click.secho(f"File not found: {filepath}", fg="red", err=True) + raise click.Abort() + with open(filepath) as file: + return json.load(file) + + +def load_collection(base_url, collection_id, data_dir): + """Load a STAC collection into the database.""" + collection = load_data(data_dir, "collection.json") + collection["id"] = collection_id + try: + resp = requests.post(f"{base_url}/collections", json=collection) + if resp.status_code == 200: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Added collection: {collection['id']}") + elif resp.status_code == 409: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Collection: {collection['id']} already exists") + except requests.ConnectionError: + click.secho("Failed to connect", fg="red", err=True) + + +def load_items(base_url, collection_id, use_bulk, data_dir): + """Load STAC items into the database based on the method selected.""" + # Attempt to dynamically find a suitable feature collection file + feature_files = [ + file + for file in os.listdir(data_dir) + if file.endswith(".json") and file != "collection.json" + ] + if not feature_files: + click.secho( + "No feature collection files found in the specified directory.", + fg="red", + err=True, + ) + raise click.Abort() + feature_collection_file = feature_files[ + 0 + ] # Use the first found feature collection file + feature_collection = load_data(data_dir, feature_collection_file) + + load_collection(base_url, collection_id, data_dir) + if use_bulk: + load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir) + else: + load_items_one_by_one(base_url, collection_id, feature_collection, data_dir) + + +def load_items_one_by_one(base_url, collection_id, feature_collection, data_dir): + """Load STAC items into the database one by one.""" + for feature in feature_collection["features"]: + try: + feature["collection"] = collection_id + resp = requests.post( + f"{base_url}/collections/{collection_id}/items", json=feature + ) + if resp.status_code == 200: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Added item: {feature['id']}") + elif resp.status_code == 409: + click.echo(f"Status code: {resp.status_code}") + click.echo(f"Item: {feature['id']} already exists") + except requests.ConnectionError: + click.secho("Failed to connect", fg="red", err=True) + + +def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir): + """Load STAC items into the database via bulk insert.""" + try: + for i, _ in enumerate(feature_collection["features"]): + feature_collection["features"][i]["collection"] = collection_id + resp = requests.post( + f"{base_url}/collections/{collection_id}/items", json=feature_collection + ) + if resp.status_code == 200: + click.echo(f"Status code: {resp.status_code}") + click.echo("Bulk inserted items successfully.") + elif resp.status_code == 204: + click.echo(f"Status code: {resp.status_code}") + click.echo("Bulk update successful, no content returned.") + elif resp.status_code == 409: + click.echo(f"Status code: {resp.status_code}") + click.echo("Conflict detected, some items might already exist.") + except requests.ConnectionError: + click.secho("Failed to connect", fg="red", err=True) + + +@click.command() +@click.option("--base-url", required=True, help="Base URL of the STAC API") +@click.option( + "--collection-id", + default="test-collection", + help="ID of the collection to which items are added", +) +@click.option("--use-bulk", is_flag=True, help="Use bulk insert method for items") +@click.option( + "--data-dir", + type=click.Path(exists=True), + default="sample_data/", + help="Directory containing collection.json and feature collection file", +) +def main(base_url, collection_id, use_bulk, data_dir): + """Load STAC items into the database.""" + load_items(base_url, collection_id, use_bulk, data_dir) + + +if __name__ == "__main__": + main() diff --git a/data_loader/data_loader.py b/data_loader/data_loader.py deleted file mode 100644 index 315068b6..00000000 --- a/data_loader/data_loader.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Database ingestion script.""" -import json -import os -import sys - -import click -import requests - -if len(sys.argv) != 2: - print("Usage: python data_loader.py ") - sys.exit(1) - -DATA_DIR = os.path.join(os.path.dirname(__file__), "setup_data/") - -backend = sys.argv[1].lower() - -if backend == "opensearch": - STAC_API_BASE_URL = "http://localhost:8082" -elif backend == "elasticsearch": - STAC_API_BASE_URL = "http://localhost:8080" -else: - print("Invalid backend tag. Enter either 'opensearch' or 'elasticsearch'.") - - -def load_data(filename): - """Load json data.""" - with open(os.path.join(DATA_DIR, filename)) as file: - return json.load(file) - - -def load_collection(collection_id): - """Load stac collection into the database.""" - collection = load_data("collection.json") - collection["id"] = collection_id - try: - resp = requests.post(f"{STAC_API_BASE_URL}/collections", json=collection) - if resp.status_code == 200: - print(f"Status code: {resp.status_code}") - print(f"Added collection: {collection['id']}") - elif resp.status_code == 409: - print(f"Status code: {resp.status_code}") - print(f"Collection: {collection['id']} already exists") - except requests.ConnectionError: - click.secho("failed to connect") - - -def load_items(): - """Load stac items into the database.""" - feature_collection = load_data("sentinel-s2-l2a-cogs_0_100.json") - collection = "test-collection" - load_collection(collection) - - for feature in feature_collection["features"]: - try: - feature["collection"] = collection - resp = requests.post( - f"{STAC_API_BASE_URL}/collections/{collection}/items", json=feature - ) - if resp.status_code == 200: - print(f"Status code: {resp.status_code}") - print(f"Added item: {feature['id']}") - elif resp.status_code == 409: - print(f"Status code: {resp.status_code}") - print(f"Item: {feature['id']} already exists") - except requests.ConnectionError: - click.secho("failed to connect") - - -load_items() diff --git a/data_loader/setup_data/collection.json b/sample_data/collection.json similarity index 99% rename from data_loader/setup_data/collection.json rename to sample_data/collection.json index f4b2acdf..dd68234d 100644 --- a/data_loader/setup_data/collection.json +++ b/sample_data/collection.json @@ -1,5 +1,5 @@ { - "id":"sentinel-s2-l2a-cogs", + "id":"sentinel-s2-l2a-cogs-test", "stac_version":"1.0.0", "description":"Sentinel-2a and Sentinel-2b imagery, processed to Level 2A (Surface Reflectance) and converted to Cloud-Optimized GeoTIFFs", "links":[ diff --git a/data_loader/setup_data/sentinel-s2-l2a-cogs_0_100.json b/sample_data/sentinel-s2-l2a-cogs_0_100.json similarity index 100% rename from data_loader/setup_data/sentinel-s2-l2a-cogs_0_100.json rename to sample_data/sentinel-s2-l2a-cogs_0_100.json