Skip to content

Commit

Permalink
Merge pull request #218 from stac-utils/readme-update
Browse files Browse the repository at this point in the history
Add notes to help new Users get started/ update data loader tool
  • Loading branch information
jamesfisher-geo authored Mar 29, 2024
2 parents 987f0b9 + 47b77e5 commit 0ac4d2f
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Changed

- Updated the pip_docker example to use stac-fastapi.elasticsearch 2.1.0 and the elasticsearch 8.11.0 docker image. [#216](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/216)
- Updated the Data Loader CLI tool to accept a base_url, a data directory, a custom collection id, and an option to use bulk insert. [#218](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/218)

### Fixed

Expand Down
4 changes: 0 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,3 @@ install-es: pybase-install
.PHONY: install-os
install-os: pybase-install
pip install -e ./stac_fastapi/opensearch[dev,server]

.PHONY: ingest
ingest:
python3 data_loader/data_loader.py
28 changes: 22 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

- Our Api core library can be used to create custom backends. See [stac-fastapi-mongo](https://github.com/Healy-Hyperspatial/stac-fastapi-mongo) for a working example.
- Reach out on our [Gitter](https://app.gitter.im/#/room/#stac-fastapi-elasticsearch_community:gitter.im) channel or feel free to add to our [Discussions](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/discussions) page here on github.
- There is [Postman](https://documenter.getpostman.com/view/12888943/2s8ZDSdRHA) documentation here for examples on how to run some of the API routes locally - after starting the elasticsearch backend via the docker-compose.yml file.
- The `/examples` folder shows an example of running stac-fastapi-elasticsearch from PyPI in docker without needing any code from the repository. There is also a Postman collection here that you can load into Postman for testing the API routes.

### To install from PyPI:

Expand Down Expand Up @@ -91,6 +93,26 @@ get the next page of results.
curl -X "GET" "http://localhost:8080/collections?limit=1&token=example_token"
```

## Ingesting Sample Data CLI Tool

```shell
Usage: data_loader.py [OPTIONS]

Load STAC items into the database.

Options:
--base-url TEXT Base URL of the STAC API [required]
--collection-id TEXT ID of the collection to which items are added
--use-bulk Use bulk insert method for items
--data-dir PATH Directory containing collection.json and feature
collection file
--help Show this message and exit.
```

```shell
python3 data_loader.py --base-url http://localhost:8080
```

## Testing

```shell
Expand All @@ -108,12 +130,6 @@ Test against Elasticsearch only
make test-elasticsearch
```

## Ingest sample data

```shell
make ingest
```

## Elasticsearch Mappings

Mappings apply to search index, not source. The mappings are stored in index templates on application startup.
Expand Down
121 changes: 121 additions & 0 deletions data_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Data Loader CLI STAC_API Ingestion Tool."""
import json
import os

import click
import requests


def load_data(data_dir, filename):
"""Load json data from a file within the specified data directory."""
filepath = os.path.join(data_dir, filename)
if not os.path.exists(filepath):
click.secho(f"File not found: {filepath}", fg="red", err=True)
raise click.Abort()
with open(filepath) as file:
return json.load(file)


def load_collection(base_url, collection_id, data_dir):
"""Load a STAC collection into the database."""
collection = load_data(data_dir, "collection.json")
collection["id"] = collection_id
try:
resp = requests.post(f"{base_url}/collections", json=collection)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added collection: {collection['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Collection: {collection['id']} already exists")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


def load_items(base_url, collection_id, use_bulk, data_dir):
"""Load STAC items into the database based on the method selected."""
# Attempt to dynamically find a suitable feature collection file
feature_files = [
file
for file in os.listdir(data_dir)
if file.endswith(".json") and file != "collection.json"
]
if not feature_files:
click.secho(
"No feature collection files found in the specified directory.",
fg="red",
err=True,
)
raise click.Abort()
feature_collection_file = feature_files[
0
] # Use the first found feature collection file
feature_collection = load_data(data_dir, feature_collection_file)

load_collection(base_url, collection_id, data_dir)
if use_bulk:
load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir)
else:
load_items_one_by_one(base_url, collection_id, feature_collection, data_dir)


def load_items_one_by_one(base_url, collection_id, feature_collection, data_dir):
"""Load STAC items into the database one by one."""
for feature in feature_collection["features"]:
try:
feature["collection"] = collection_id
resp = requests.post(
f"{base_url}/collections/{collection_id}/items", json=feature
)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Added item: {feature['id']}")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo(f"Item: {feature['id']} already exists")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


def load_items_bulk_insert(base_url, collection_id, feature_collection, data_dir):
"""Load STAC items into the database via bulk insert."""
try:
for i, _ in enumerate(feature_collection["features"]):
feature_collection["features"][i]["collection"] = collection_id
resp = requests.post(
f"{base_url}/collections/{collection_id}/items", json=feature_collection
)
if resp.status_code == 200:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk inserted items successfully.")
elif resp.status_code == 204:
click.echo(f"Status code: {resp.status_code}")
click.echo("Bulk update successful, no content returned.")
elif resp.status_code == 409:
click.echo(f"Status code: {resp.status_code}")
click.echo("Conflict detected, some items might already exist.")
except requests.ConnectionError:
click.secho("Failed to connect", fg="red", err=True)


@click.command()
@click.option("--base-url", required=True, help="Base URL of the STAC API")
@click.option(
"--collection-id",
default="test-collection",
help="ID of the collection to which items are added",
)
@click.option("--use-bulk", is_flag=True, help="Use bulk insert method for items")
@click.option(
"--data-dir",
type=click.Path(exists=True),
default="sample_data/",
help="Directory containing collection.json and feature collection file",
)
def main(base_url, collection_id, use_bulk, data_dir):
"""Load STAC items into the database."""
load_items(base_url, collection_id, use_bulk, data_dir)


if __name__ == "__main__":
main()
69 changes: 0 additions & 69 deletions data_loader/data_loader.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"id":"sentinel-s2-l2a-cogs",
"id":"sentinel-s2-l2a-cogs-test",
"stac_version":"1.0.0",
"description":"Sentinel-2a and Sentinel-2b imagery, processed to Level 2A (Surface Reflectance) and converted to Cloud-Optimized GeoTIFFs",
"links":[
Expand Down
File renamed without changes.

0 comments on commit 0ac4d2f

Please sign in to comment.