Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Pv/one index per collection #97

Merged
merged 4 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ repos:
# W503 black conflicts with "line break before operator" rule
# E203 black conflicts with "whitespace before ':'" rule
'--ignore=E501,W503,E203,C901' ]
# - repo: https://github.com/pre-commit/mirrors-mypy
# rev: v0.942
# hooks:
# - id: mypy
# args: [--no-strict-optional, --ignore-missing-imports]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.942
hooks:
- id: mypy
exclude: /tests/
# --strict
args: [--no-strict-optional, --ignore-missing-imports, --implicit-reexport]
additional_dependencies: [
"types-attrs",
"types-requests"
]
- repo: https://github.com/PyCQA/pydocstyle
rev: 6.1.1
hooks:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- Default to Python 3.10
- Default to Elasticsearch 8.x
- Collection objects are now stored in `collections` rather than `stac_collections`
gadomski marked this conversation as resolved.
Show resolved Hide resolved
philvarner marked this conversation as resolved.
Show resolved Hide resolved
- Item objects are no longer stored in `stac_items`, but in indices per collection named `items_{collection_id}`

### Removed

Expand Down
8 changes: 4 additions & 4 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
CoreClient,
TransactionsClient,
)
from stac_fastapi.elasticsearch.database_logic import create_collection_index
from stac_fastapi.elasticsearch.extensions import QueryExtension
from stac_fastapi.elasticsearch.indexes import IndexesClient
from stac_fastapi.elasticsearch.session import Session
from stac_fastapi.extensions.core import ( # FieldsExtension,
ContextExtension,
Expand Down Expand Up @@ -44,11 +44,11 @@


@app.on_event("startup")
async def _startup_event():
await IndexesClient().create_indexes()
async def _startup_event() -> None:
await create_collection_index()


def run():
def run() -> None:
"""Run app from command line using uvicorn if available."""
try:
import uvicorn
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
from typing import Any, Dict, Set

from elasticsearch import AsyncElasticsearch, Elasticsearch
from elasticsearch import AsyncElasticsearch, Elasticsearch # type: ignore
from stac_fastapi.types.config import ApiSettings


Expand Down
68 changes: 40 additions & 28 deletions stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,31 +52,30 @@ class CoreClient(AsyncBaseCoreClient):
async def all_collections(self, **kwargs) -> Collections:
"""Read all collections from the database."""
base_url = str(kwargs["request"].base_url)
collection_list = await self.database.get_all_collections()
collection_list = [
self.collection_serializer.db_to_stac(c, base_url=base_url)
for c in collection_list
]

links = [
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
]

return Collections(collections=collection_list, links=links)
return Collections(
collections=[
self.collection_serializer.db_to_stac(c, base_url=base_url)
for c in await self.database.get_all_collections()
],
links=[
{
"rel": Relations.root.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.parent.value,
"type": MimeTypes.json,
"href": base_url,
},
{
"rel": Relations.self.value,
"type": MimeTypes.json,
"href": urljoin(base_url, "collections"),
},
],
)

@overrides
async def get_collection(self, collection_id: str, **kwargs) -> Collection:
Expand All @@ -100,6 +99,8 @@ async def item_collection(
limit=limit,
token=token,
sort=None,
collection_ids=[collection_id],
ignore_unavailable=False,
)

items = [
Expand Down Expand Up @@ -276,6 +277,7 @@ async def post_search(
limit=limit,
token=search_request.token, # type: ignore
sort=sort,
collection_ids=search_request.collections,
)

items = [
Expand Down Expand Up @@ -341,8 +343,11 @@ async def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
processed_items = [
bulk_client.preprocess_item(item, base_url) for item in item["features"] # type: ignore
]

# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]
await self.database.bulk_async(
processed_items, refresh=kwargs.get("refresh", False)
collection_id, processed_items, refresh=kwargs.get("refresh", False)
)

return None # type: ignore
Expand All @@ -355,12 +360,14 @@ async def create_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
async def update_item(self, item: stac_types.Item, **kwargs) -> stac_types.Item:
"""Update item."""
base_url = str(kwargs["request"].base_url)
collection_id = item["collection"]

now = datetime_type.now(timezone.utc).isoformat().replace("+00:00", "Z")
item["properties"]["updated"] = str(now)

await self.database.check_collection_exists(collection_id=item["collection"])
await self.database.check_collection_exists(collection_id)
# todo: index instead of delete and create
await self.delete_item(item_id=item["id"], collection_id=item["collection"])
await self.delete_item(item_id=item["id"], collection_id=collection_id)
gadomski marked this conversation as resolved.
Show resolved Hide resolved
await self.create_item(item=item, **kwargs)

return ItemSerializer.db_to_stac(item, base_url)
Expand Down Expand Up @@ -440,6 +447,11 @@ def bulk_item_insert(
self.preprocess_item(item, base_url) for item in items.items.values()
]

self.database.bulk_sync(processed_items, refresh=kwargs.get("refresh", False))
# not a great way to get the collection_id-- should be part of the method signature
collection_id = processed_items[0]["collection"]

self.database.bulk_sync(
collection_id, processed_items, refresh=kwargs.get("refresh", False)
)

return f"Successfully added {len(processed_items)} Items."
Loading