Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding AWS dynamodb support #496

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ wheels/
.installed.cfg
*.egg
.idea/*
*.iml

# PyInstaller
# Usually these files are written by a python script from a template
Expand All @@ -40,6 +41,7 @@ htmlcov/
.tox/
.coverage
.coverage.*
coverage_*
.cache
nosetests.xml
coverage.xml
Expand Down Expand Up @@ -119,4 +121,6 @@ venv.bak/
*.ipynb
*.rdb
/protobuf*
.DS_Store
.DS_Store

pychunkedgraph/tests/docker/
50 changes: 25 additions & 25 deletions pychunkedgraph/graph/chunkedgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
from . import attributes
from . import exceptions
from .client import base
from .client import BigTableClient
from .client import BackendClientInfo
from .client import get_default_client_info
from .client import get_default_client_info, get_client_class
from .cache import CacheService
from .meta import ChunkedGraphMeta
from .utils import basetypes
Expand Down Expand Up @@ -43,64 +42,64 @@ def __init__(
3. Existing graphs in other projects/clients,
Requires `graph_id` and `client_info`.
"""
# create client based on type
# for now, just use BigTableClient


# create client based on backend type specified in client_info.TYPE
client_class = get_client_class(client_info)
if meta:
graph_id = meta.graph_config.ID_PREFIX + meta.graph_config.ID
bt_client = BigTableClient(
bt_client = client_class(
graph_id, config=client_info.CONFIG, graph_meta=meta
)
self._meta = meta
else:
bt_client = BigTableClient(graph_id, config=client_info.CONFIG)
bt_client = client_class(graph_id, config=client_info.CONFIG)
self._meta = bt_client.read_graph_meta()

self._client = bt_client
self._id_client = bt_client
self._cache_service = None
self.mock_edges = None # hack for unit tests

@property
def meta(self) -> ChunkedGraphMeta:
return self._meta

@property
def graph_id(self) -> str:
return self.meta.graph_config.ID_PREFIX + self.meta.graph_config.ID

@property
def version(self) -> str:
return self.client.read_graph_version()

@property
def client(self) -> base.SimpleClient:
return self._client

@property
def id_client(self) -> base.ClientWithIDGen:
return self._id_client

@property
def cache(self):
return self._cache_service

@property
def segmentation_resolution(self) -> np.ndarray:
return np.array(self.meta.ws_cv.scale["resolution"])

@cache.setter
def cache(self, cache_service: CacheService):
self._cache_service = cache_service

def create(self):
"""Creates the graph in storage client and stores meta."""
self._client.create_graph(self._meta, version=__version__)

def update_meta(self, meta: ChunkedGraphMeta, overwrite: bool):
"""Update meta of an already existing graph."""
self.client.update_graph_meta(meta, overwrite=overwrite)

def range_read_chunk(
self,
chunk_id: basetypes.CHUNK_ID,
Expand Down Expand Up @@ -163,7 +162,7 @@ def get_atomic_ids_from_coords(
"""
if self.get_chunk_layer(parent_id) == 1:
return np.array([parent_id] * len(coordinates), dtype=np.uint64)

# Enable search with old parent by using its timestamp and map to parents
parent_ts = self.get_node_timestamps([parent_id], return_numpy=False)[0]
return id_helpers.get_atomic_ids_from_coords(
Expand All @@ -175,7 +174,7 @@ def get_atomic_ids_from_coords(
self.get_roots,
max_dist_nm,
)

def get_parents(
self,
node_ids: typing.Sequence[np.uint64],
Expand All @@ -199,7 +198,7 @@ def get_parents(
)
if not parent_rows:
return types.empty_1d

parents = []
if current:
for id_ in node_ids:
Expand All @@ -224,7 +223,7 @@ def get_parents(
raise KeyError from exc
return parents
return self.cache.parents_multiple(node_ids, time_stamp=time_stamp)

def get_parent(
self,
node_id: np.uint64,
Expand All @@ -241,13 +240,14 @@ def get_parent(
end_time=time_stamp,
end_time_inclusive=True,
)

if not parents:
return None
if latest:
return parents[0].value
return [(p.value, p.timestamp) for p in parents]
return self.cache.parent(node_id, time_stamp=time_stamp)

def get_children(
self,
node_id_or_ids: typing.Union[typing.Iterable[np.uint64], np.uint64],
Expand All @@ -274,7 +274,7 @@ def get_children(
return types.empty_1d.copy()
return np.concatenate(list(node_children_d.values()))
return node_children_d

def _get_children_multiple(
self, node_ids: typing.Iterable[np.uint64], *, raw_only=False
) -> typing.Dict:
Expand Down
51 changes: 43 additions & 8 deletions pychunkedgraph/graph/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
Please see `base.py` for more details.
"""

from os import environ
from collections import namedtuple

from .bigtable.client import Client as BigTableClient
from .base import SimpleClient


_backend_clientinfo_fields = ("TYPE", "CONFIG")
Expand All @@ -29,16 +30,50 @@
_backend_clientinfo_fields,
defaults=_backend_clientinfo_defaults,
)

GCP_BIGTABLE_BACKEND_TYPE = "bigtable"
AMAZON_DYNAMODB_BACKEND_TYPE = "amazon.dynamodb"
DEFAULT_BACKEND_TYPE = GCP_BIGTABLE_BACKEND_TYPE
SUPPORTED_BACKEND_TYPES={GCP_BIGTABLE_BACKEND_TYPE, AMAZON_DYNAMODB_BACKEND_TYPE}

def get_default_client_info():
"""
Load client from env variables.
Get backend client type from BACKEND_CLIENT_TYPE env variable.
"""
backend_type_env = environ.get("BACKEND_CLIENT_TYPE", DEFAULT_BACKEND_TYPE)
if backend_type_env == GCP_BIGTABLE_BACKEND_TYPE:
from .bigtable import get_client_info as get_bigtable_client_info
client_info = BackendClientInfo(
TYPE=backend_type_env,
CONFIG=get_bigtable_client_info(admin=True, read_only=False)
)
elif backend_type_env == AMAZON_DYNAMODB_BACKEND_TYPE:
from .amazon.dynamodb import get_client_info as get_amazon_dynamodb_client_info
client_info = BackendClientInfo(
TYPE=backend_type_env,
CONFIG=get_amazon_dynamodb_client_info(admin=True, read_only=False)
)
else:
raise TypeError(f"Client backend {backend_type_env} is not supported, supported backend types: {', '.join(list(SUPPORTED_BACKEND_TYPES))}")
return client_info

def get_client_class(client_info: BackendClientInfo):
if isinstance(client_info.TYPE, SimpleClient):
return client_info.TYPE

if client_info.TYPE is None:
class_type = DEFAULT_BACKEND_TYPE
elif isinstance(client_info.TYPE, str):
class_type = client_info.TYPE
else:
raise TypeError(f"Unsupported client backend {type(client_info.TYPE)}")

# TODO make dynamic after multiple platform support is added
from .bigtable import get_client_info as get_bigtable_client_info
if class_type == GCP_BIGTABLE_BACKEND_TYPE:
from .bigtable.client import Client as BigTableClient
ret_class_type = BigTableClient
elif class_type == AMAZON_DYNAMODB_BACKEND_TYPE:
from .amazon.dynamodb.client import Client as AmazonDynamoDbClient
ret_class_type = AmazonDynamoDbClient
else:
raise TypeError(f"Client backend {class_type} is not supported, supported backend types: {', '.join(list(SUPPORTED_BACKEND_TYPES))}")

return BackendClientInfo(
CONFIG=get_bigtable_client_info(admin=True, read_only=False)
)
return ret_class_type
Empty file.
42 changes: 42 additions & 0 deletions pychunkedgraph/graph/client/amazon/dynamodb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from collections import namedtuple
from os import environ

DEFAULT_TABLE_PREFIX = "neuromancer-seung-import.pychunkedgraph"
DEFAULT_AWS_REGION = "us-east-1"

_amazon_dynamodb_config_fields = (
"REGION",
"TABLE_PREFIX",
"ADMIN",
"READ_ONLY",
"END_POINT",
)
_amazon_dynamodb_config_defaults = (
environ.get("AWS_DEFAULT_REGION", DEFAULT_AWS_REGION),
environ.get("AMAZON_DYNAMODB_TABLE_PREFIX", DEFAULT_TABLE_PREFIX),
False,
True,
None,
)
AmazonDynamoDbConfig = namedtuple(
"AmazonDynamoDbConfig", _amazon_dynamodb_config_fields, defaults=_amazon_dynamodb_config_defaults
)


def get_client_info(
region: str = None,
table_prefix: str = None,
admin: bool = False,
read_only: bool = True,
):
"""Helper function to load config from env."""
_region = region if region else environ.get("AWS_DEFAULT_REGION", DEFAULT_AWS_REGION)
_table_prefix = table_prefix if table_prefix else environ.get("AMAZON_DYNAMODB_TABLE_PREFIX", DEFAULT_TABLE_PREFIX)

kwargs = {
"REGION": _region,
"TABLE_PREFIX": _table_prefix,
"ADMIN": admin,
"READ_ONLY": read_only
}
return AmazonDynamoDbConfig(**kwargs)
Loading
Loading