Add notes to help new users get started / update data loader tool #218

Merged: 9 commits, Mar 29, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Changed

- Updated the pip_docker example to use stac-fastapi.elasticsearch 2.1.0 and the elasticsearch 8.11.0 docker image. [#216](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/216)
- Updated the Data Loader CLI tool to accept a base_url, a data directory, a custom collection id, and an option to use bulk insert. [#218](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/218)

### Fixed

4 changes: 0 additions & 4 deletions Makefile
@@ -104,7 +104,3 @@ install-es: pybase-install
.PHONY: install-os
install-os: pybase-install
	pip install -e ./stac_fastapi/opensearch[dev,server]

.PHONY: ingest
ingest:
	python3 data_loader/data_loader.py
28 changes: 22 additions & 6 deletions README.md
Expand Up @@ -6,6 +6,8 @@

- Our API core library can be used to create custom backends. See [stac-fastapi-mongo](https://github.com/Healy-Hyperspatial/stac-fastapi-mongo) for a working example.
- Reach out on our [Gitter](https://app.gitter.im/#/room/#stac-fastapi-elasticsearch_community:gitter.im) channel or feel free to add to our [Discussions](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/discussions) page here on GitHub.
- The [Postman](https://documenter.getpostman.com/view/12888943/2s8ZDSdRHA) documentation provides examples of running some of the API routes locally, after starting the Elasticsearch backend via the docker-compose.yml file.
- The `/examples` folder shows how to run stac-fastapi-elasticsearch from PyPI in Docker without needing any code from this repository. It also contains a Postman collection you can load into Postman for testing the API routes.

### To install from PyPI:

@@ -91,6 +93,26 @@ get the next page of results.
curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token"
```

## Ingesting Sample Data CLI Tool

```shell
Usage: data_loader.py [OPTIONS]

  Load STAC items into the database.

Options:
  --base-url TEXT       Base URL of the STAC API  [required]
  --collection-id TEXT  ID of the collection to which items are added
  --use-bulk            Use bulk insert method for items
  --data-dir PATH       Directory containing collection.json and feature
                        collection file
  --help                Show this message and exit.
```

For example, to ingest the default sample data into a locally running API:

```shell
python3 data_loader.py --base-url http://localhost:8080
```
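
To use a custom collection id, a different data directory, or the bulk insert path, pass the corresponding options (the collection id and directory below are illustrative):

```shell
python3 data_loader.py --base-url http://localhost:8080 --collection-id my-collection --data-dir my_data/ --use-bulk
```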

## Testing

@@ -108,12 +130,6 @@ Test against Elasticsearch only

```shell
make test-elasticsearch
```

## Ingest sample data

```shell
make ingest
```

## Elasticsearch Mappings

Mappings apply to the search index, not the source data. The mappings are stored in index templates that are created on application startup.
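
Once the application has started, one quick way to verify that the templates were installed is to query the Elasticsearch index template API directly (assuming the Elasticsearch container from docker-compose.yml is reachable on localhost:9200):

```shell
curl -X "GET" "http://localhost:9200/_index_template?pretty"
```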
121 changes: 121 additions & 0 deletions data_loader.py
@@ -0,0 +1,121 @@
"""Data Loader CLI STAC_API Ingestion Tool."""
import json
import os

import click
import requests


def load_data(data_dir, filename):
"""Load json data from a file within the specified data directory."""
filepath = os.path.join(data_dir, filename)
if not os.path.exists(filepath):
click.secho(f"File not found: {filepath}", fg="red", err=True)
raise click.Abort()
with open(filepath) as file:
return json.load(file)


def load_collection(base_url, collection_id, data_dir):
"""Load a STAC collection into the database."""
collection = load_data(data_dir, "collection.json")
collection["id"] = collection_id
try:
resp = requests.post(f"{base_url}/collections", json=collection)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added collection: {collection['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Collection: {collection['id']} already exists")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


def load_items(base_url, collection_id, use_bulk, data_dir):
"""Load STAC items into the database based on the method selected."""
# Attempt to dynamically find a suitable feature collection file
feature_files = [
file
for file in os.listdir(data_dir)
if file.endswith(".json") and file != "collection.json"
]
if not feature_files:
click.secho(
"No feature collection files found in the specified directory.",
fg="red",
err=True,
)
raise click.Abort()
feature_collection_file = feature_files[
0
] # Use the first found feature collection file
feature_collection = load_data(data_dir, feature_collection_file)

load_collection(base_url, collection_id, data_dir)
if use_bulk:
load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir)
else:
load_items_one_by_one(base_url, collection_id, feature_collection, data_dir)


def load_items_one_by_one(base_url, collection_id, feature_collection, data_dir):
"""Load STAC items into the database one by one."""
for feature in feature_collection["features"]:
try:
feature["collection"] = collection_id
resp = requests.post(
f"{base_url}/collections/{collection_id}/items", json=feature
)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added item: {feature['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Item: {feature['id']} already exists")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir):
"""Load STAC items into the database via bulk insert."""
try:
for i, _ in enumerate(feature_collection["features"]):
feature_collection["features"][i]["collection"] = collection_id
resp = requests.post(
f"{base_url}/collections/{collection_id}/items", json=feature_collection
)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk inserted items successfully.")
elif resp.status_code == 204:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk update successful, no content returned.")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo("Conflict detected, some items might already exist.")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


@click.command()
@click.option("--base-url", required=True, help="Base URL of the STAC API")
@click.option(
"--collection-id",
default="test-collection",
help="ID of the collection to which items are added",
)
@click.option("--use-bulk", is_flag=True, help="Use bulk insert method for items")
@click.option(
"--data-dir",
type=click.Path(exists=True),
default="sample_data/",
help="Directory containing collection.json and feature collection file",
)
def main(base_url, collection_id, use_bulk, data_dir):
"""Load STAC items into the database."""
load_items(base_url, collection_id, use_bulk, data_dir)


if __name__ == "__main__":
main()
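
A note on the two ingestion paths: both stamp each feature's `collection` field with the target collection id, but `load_items_one_by_one` issues a separate POST per feature, while `load_items_bulk_insert` posts the entire FeatureCollection to the items endpoint in a single request, which is generally faster for large files.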
69 changes: 0 additions & 69 deletions data_loader/data_loader.py

This file was deleted.

@@ -1,5 +1,5 @@
{
"id":"sentinel-s2-l2a-cogs",
"id":"sentinel-s2-l2a-cogs-test",
"stac_version":"1.0.0",
"description":"Sentinel-2a and Sentinel-2b imagery, processed to Level 2A (Surface Reflectance) and converted to Cloud-Optimized GeoTIFFs",
"links":[