You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
It would be great if this backend could be added to the list.
Here is an attempt:
import datetime
import zlib
from typing import Any
import boto3
import msgspec
from aiocache.base import BaseCache
from aiocache.serializers import BaseSerializer
from aiocache.serializers.serializers import _NOT_SET
from boto3.dynamodb.types import Binary
class MsgSpecSerializer(BaseSerializer):
    """Transform data to MessagePack bytes with ``msgspec`` and back again.

    You need to have ``msgspec`` installed in order to be able to use this serializer.

    :param encoding: str. Stored as ``self.encoding``; falls back to
        ``DEFAULT_ENCODING`` when not supplied.
    :param use_list: bool. Stored as ``self.use_list``; not consulted by
        ``dumps``/``loads`` in this class. Default is True.
    :param compress: bool. When True, the MessagePack payload is additionally
        compressed with ``zlib`` on ``dumps`` and decompressed on ``loads``.
        Default is False.
    """

    def __init__(self, encoding=_NOT_SET, use_list=True, compress=False):
        if not msgspec:
            raise RuntimeError("msgspec not installed, MsgSpecSerializer unavailable")
        self.encoding = self.DEFAULT_ENCODING if encoding is _NOT_SET else encoding
        self.use_list = use_list
        self.compress = compress

    def dumps(self, value) -> bytes:
        """Serialize the received value using ``msgspec.msgpack.encode``.

        :param value: obj
        :returns: bytes, zlib-compressed when ``compress`` is enabled
        """
        output = msgspec.msgpack.encode(value)
        if self.compress:
            output = zlib.compress(output)
        return output

    def loads(self, value: bytes) -> Any:
        """Deserialize value using ``msgspec.msgpack.decode``.

        :param value: bytes, ``memoryview`` or boto3 ``Binary`` produced by
            :meth:`dumps`; ``None`` is passed through unchanged
        :returns: obj
        """
        if value is None:
            return None
        # Unwrap the containers the storage layer may hand back.
        if isinstance(value, Binary):
            value = value.value
        if isinstance(value, memoryview):
            value = bytes(value)
        # Undo the steps of ``dumps`` in reverse order: the payload was
        # encoded first and compressed second, so decompress before decoding.
        if self.compress:
            value = zlib.decompress(value)
        return msgspec.msgpack.decode(value)
class DynamoDBBackend(BaseCache):
    """Cache backend that stores each entry as an item in a DynamoDB table.

    Every item holds the cache key, the serialized value and, when a positive
    ttl is given, an expiry timestamp in epoch seconds.

    NOTE: the boto3 calls used here are synchronous, so each operation blocks
    the event loop until the request completes.

    :param table_name: str with the name of the DynamoDB table.
    :param region_name: str with the AWS region passed to ``boto3.resource``.
    :param key_column: str, attribute holding the cache key.
    :param val_column: str, attribute holding the serialized value.
    :param ttl_attribute_name: str, attribute holding the expiry timestamp.
    :param kwargs: forwarded to ``BaseCache.__init__``.
    """

    def __init__(self, table_name="cache_table", region_name="us-east-1", key_column="CacheKey", val_column="CacheValue", ttl_attribute_name="CacheTTL", **kwargs):
        super().__init__(**kwargs)
        self.key_column = key_column
        self.val_column = val_column
        self.ttl_attribute_name = ttl_attribute_name
        self.table_name = table_name
        self.region_name = region_name
        self.resource = boto3.resource("dynamodb", region_name=region_name)
        self.table = self.resource.Table(table_name)

    def _fetch_live_item(self, key):
        """Return the stored item for ``key``, or None if missing or expired."""
        response = self.table.get_item(Key={self.key_column: key})
        item = response.get("Item")
        if not item:
            return None
        expires_at = item.get(self.ttl_attribute_name)
        if expires_at is not None:
            # DynamoDB removes expired items lazily, so an item can still be
            # returned after its TTL has passed; filter those out here.
            now = datetime.datetime.now(datetime.timezone.utc).timestamp()
            if int(expires_at) < now:
                return None
        return item

    async def _get(self, key, encoding="utf-8", _conn=None):
        """Return the deserialized value for ``key``, or None when absent or expired."""
        item = self._fetch_live_item(key)
        if item is None:
            return None
        # NOTE(review): the value goes through self.serializer here; confirm
        # that the public get/set wrappers do not serialize a second time.
        return self.serializer.loads(item[self.val_column])

    async def _set(self, key, value, ttl=0, _cas_token=None, _conn=None):
        """Store ``value`` under ``key``; a positive ``ttl`` (seconds) sets an expiry.

        ``ttl`` of ``None`` or ``0`` stores the item without an expiry attribute.
        """
        serialized_value = self.serializer.dumps(value)
        item = {self.key_column: key, self.val_column: serialized_value}
        # Guard against None before comparing: ``None > 0`` raises TypeError.
        if ttl is not None and ttl > 0:
            expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=ttl)
            item[self.ttl_attribute_name] = int(expires_at.timestamp())
        self.table.put_item(Item=item)
        return True

    async def _delete(self, key, _conn=None):
        """Delete the item stored under ``key`` and return True."""
        self.table.delete_item(Key={self.key_column: key})
        return True

    async def _exists(self, key, _conn=None):
        """Return True when a non-expired item is stored under ``key``."""
        return self._fetch_live_item(key) is not None

    async def _clear(self, namespace=None, _conn=None):
        """Not supported: always raises ``NotImplementedError``."""
        raise NotImplementedError("Clearing all items is not supported for DynamoDB.")

    async def _close(self, *args, _conn=None, **kwargs):
        """No-op."""
        pass  # No specific close logic required for boto3

    def build_key(self, key: str, namespace: str | None = None) -> str:
        """Build the stored key by delegating to ``_str_build_key``."""
        return self._str_build_key(key, namespace)
class DynamoDBCache(DynamoDBBackend):
    """Cache backed by a DynamoDB table.

    Defaults:
        - serializer: :class:`MsgSpecSerializer`
        - plugins: []

    Config options are:

    :param serializer: obj derived from :class:`aiocache.serializers.BaseSerializer`.
        When omitted, a :class:`MsgSpecSerializer` is created.
    :param compress: bool forwarded to the default :class:`MsgSpecSerializer`;
        it has no effect when a serializer is supplied. Default is False.
    :param plugins: list of :class:`aiocache.plugins.BasePlugin` derived classes.
    :param namespace: string to use as default prefix for the key used in all operations of
        the backend. Default is an empty string, "".
    :param timeout: int or float in seconds specifying maximum timeout for the operations to last.
        By default its 5.
    :param table_name: str with the name of the DynamoDB table. Default is "cache_table".
    :param region_name: str with the AWS region for DynamoDB. Default is "us-east-1".
    """

    NAME = "dynamodb"

    def __init__(self, serializer=None, compress=False, **kwargs):
        # Build the default serializer only when the caller supplied none.
        if not serializer:
            serializer = MsgSpecSerializer(compress=compress)
        super().__init__(serializer=serializer, **kwargs)

    @classmethod
    def parse_uri_path(cls, path):
        """Return an empty mapping; the path contributes no options."""
        return dict()

    def __repr__(self):
        template = "DynamoDBCache (table_name={}, region_name={})"
        return template.format(self.table_name, self.region_name)
It is important to create a DynamoDB table with a TTL column. The default table columns are:
key_column: CacheKey
val_column: CacheValue
ttl_attribute_name: CacheTTL
The text was updated successfully, but these errors were encountered:
As I've mentioned elsewhere, it's probably best to create additional backends in separate libraries, rather than us trying to maintain dozens of backends. If you do this, then open an issue/PR to link to your backend from the README, then other users will be able to find it.
It would be great if this backend could be added to the list.
Here is an attempt:
It is important to create a DynamoDB table with a TTL column. The default table columns are:
CacheKey
CacheValue
CacheTTL
The text was updated successfully, but these errors were encountered: