Skip to content

Commit

Permalink
update persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
trisongz committed Mar 6, 2024
1 parent ae7eb3f commit 06caa96
Show file tree
Hide file tree
Showing 5 changed files with 434 additions and 3 deletions.
170 changes: 170 additions & 0 deletions lazyops/libs/persistence/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,3 +531,173 @@ async def areplicate_from(self, source: Any, **kwargs):
Replicates the Cache from a Source
"""
return await ThreadPooler.run_async(self.replicate_from, source, **kwargs)

"""
Math Related Methods
"""

def incr(self, key: str, amount: Union[int, float] = 1, **kwargs) -> Union[int, float]:
"""
[int/float] Increments the value of the key by the given amount
"""
if isinstance(amount, int): return self.incrby(key, amount = amount, **kwargs)
return self.incrbyfloat(key, amount = amount, **kwargs)

def incrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Increments the value of the key by the given amount
"""
raise NotImplementedError

def incrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Increments the value of the key by the given amount
"""
raise NotImplementedError

async def aincr(self, key: str, amount: Union[int, float] = 1, **kwargs) -> Union[int, float]:
"""
Increments the value of the key by the given amount
"""
if isinstance(amount, int): return await self.aincrby(key, amount = amount, **kwargs)
return await self.aincrbyfloat(key, amount = amount, **kwargs)

async def aincrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Increments the value of the key by the given amount
"""
raise NotImplementedError

async def aincrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Increments the value of the key by the given amount
"""
raise NotImplementedError

def decr(self, key: str, amount: Union[int, float] = 1, **kwargs) -> Union[int, float]:
"""
[int/float] Decrements the value of the key by the given amount
"""
if isinstance(amount, int): return self.decrby(key, amount = amount, **kwargs)
return self.decrbyfloat(key, amount = amount, **kwargs)

def decrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Decrements the value of the key by the given amount
"""
raise NotImplementedError

def decrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Decrements the value of the key by the given amount
"""
raise NotImplementedError

async def adecr(self, key: str, amount: Union[int, float] = 1, **kwargs) -> Union[int, float]:
"""
Decrements the value of the key by the given amount
"""
if isinstance(amount, int): return await self.adecrby(key, amount = amount, **kwargs)
return await self.adecrbyfloat(key, amount = amount, **kwargs)

async def adecrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Decrements the value of the key by the given amount
"""
raise NotImplementedError

async def adecrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Decrements the value of the key by the given amount
"""
raise NotImplementedError

"""
Set Operations
"""

def sadd(self, key: str, *values: Any, **kwargs) -> int:
"""
Adds the value to the set
"""
raise NotImplementedError

async def asadd(self, key: str, *values: Any, **kwargs) -> int:
"""
Adds the value to the set
"""
return await ThreadPooler.run_async(self.sadd, key, *values, **kwargs)

def slength(self, key: str, **kwargs) -> int:
"""
Returns the length of the set
"""
raise NotImplementedError

async def aslength(self, key: str, **kwargs) -> int:
"""
Returns the length of the set
"""
return await ThreadPooler.run_async(self.slength, key, **kwargs)


def sismember(self, key: str, value: Any, **kwargs) -> bool:
"""
Returns whether the value is a member of the set
"""
raise NotImplementedError

async def asismember(self, key: str, value: Any, **kwargs) -> bool:
"""
Returns whether the value is a member of the set
"""
return await ThreadPooler.run_async(self.sismember, key, value, **kwargs)

def smembers(self, key: str, **kwargs) -> List[Any]:
"""
Returns the members of the set
"""
raise NotImplementedError

async def assembers(self, key: str, **kwargs) -> List[Any]:
"""
Returns the members of the set
"""
return await ThreadPooler.run_async(self.smembers, key, **kwargs)

def smismember(self, key: str, *values: Any, **kwargs) -> bool:
"""
Returns whether the values are members of the set
"""
raise NotImplementedError

async def assmismember(self, key: str, *values: Any, **kwargs) -> bool:
"""
Returns whether the values are members of the set
"""
return await ThreadPooler.run_async(self.smismember, key, *values, **kwargs)

def srem(self, key: str, *values: Any, **kwargs) -> int:
"""
Removes the value from the set
"""
raise NotImplementedError

async def assrem(self, key: str, *values: Any, **kwargs) -> int:
"""
Removes the value from the set
"""
return await ThreadPooler.run_async(self.srem, key, *values, **kwargs)

def spop(self, key: str, **kwargs) -> Any:
"""
Removes and returns a random member of the set
"""
raise NotImplementedError

async def aspop(self, key: str, **kwargs) -> Any:
"""
Removes and returns a random member of the set
"""
return await ThreadPooler.run_async(self.spop, key, **kwargs)

77 changes: 76 additions & 1 deletion lazyops/libs/persistence/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,79 @@ async def awrite_data(self, data: Dict):
with self.file_lock:
await self.file_path.async_write_text(json.dumps(data, indent = 4, ensure_ascii = False))
self.file_hash = create_unique_id()
await self.file_hash_path.async_write_text(self.file_hash)
await self.file_hash_path.async_write_text(self.file_hash)



def incrby(self, key: str, amount: int = 1, **kwargs) -> int:
# sourcery skip: class-extract-method
"""
[int] Increments the value of the key by the given amount
"""
existing = self.get(key, 0)
value = existing + amount
self.set(key, value)
return value

def incrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Increments the value of the key by the given amount
"""
existing = self.get(key, 0.0)
value = existing + amount
self.set(key, value)
return value

async def aincrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Increments the value of the key by the given amount
"""
existing = await self.aget(key, 0)
value = existing + amount
await self.aset(key, value)
return value

async def aincrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Increments the value of the key by the given amount
"""
existing = await self.aget(key, 0.0)
value = existing + amount
await self.aset(key, value)
return value

def decrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Decrements the value of the key by the given amount
"""
existing = self.get(key, 0)
value = existing - amount
self.set(key, value)
return value

def decrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Decrements the value of the key by the given amount
"""
existing = self.get(key, 0.0)
value = existing - amount
self.set(key, value)
return value

async def adecrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Decrements the value of the key by the given amount
"""
existing = await self.aget(key, 0)
value = existing - amount
await self.aset(key, value)
return value

async def adecrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Decrements the value of the key by the given amount
"""
existing = await self.aget(key, 0.0)
value = existing - amount
await self.aset(key, value)
return value
71 changes: 70 additions & 1 deletion lazyops/libs/persistence/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,73 @@ async def amigrate_compression(self, **kwargs) -> List[str]:

if results: await self.aset_batch(results)
if failed_keys: logger.warning(f'Failed to migrate keys: {failed_keys}')
return failed_keys
return failed_keys

"""
Math Related Methods
"""

def incrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Increments the value of the key by the given amount
"""
if self.hset_enabled:
return self.cache.hincrby(self.base_key, key, amount = amount, **kwargs)
return self.cache.incrby(self.get_key(key), amount = amount, **kwargs)

def incrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Increments the value of the key by the given amount
"""
if self.hset_enabled:
return self.cache.hincrbyfloat(self.base_key, key, amount = amount, **kwargs)
return self.cache.incrbyfloat(self.get_key(key), amount = amount, **kwargs)

async def aincrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Increments the value of the key by the given amount
"""
if self.hset_enabled:
return await self.cache.async_hincrby(self.base_key, key, amount = amount, **kwargs)
return await self.cache.async_incrby(self.get_key(key), amount = amount, **kwargs)

async def aincrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Increments the value of the key by the given amount
"""
if self.hset_enabled:
return await self.cache.async_hincrbyfloat(self.base_key, key, amount = amount, **kwargs)
return await self.cache.async_incrbyfloat(self.get_key(key), amount = amount, **kwargs)

def decrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Decrements the value of the key by the given amount
"""
if self.hset_enabled:
return self.cache.hincrby(self.base_key, key, amount = (amount * -1), **kwargs)
return self.cache.decrby(self.get_key(key), amount = amount, **kwargs)

def decrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Decrements the value of the key by the given amount
"""
if self.hset_enabled:
return self.cache.hincrbyfloat(self.base_key, key, amount = (amount * -1), **kwargs)
return self.cache.incrbyfloat(self.get_key(key), amount = (amount * -1), **kwargs)

async def adecrby(self, key: str, amount: int = 1, **kwargs) -> int:
"""
[int] Decrements the value of the key by the given amount
"""
if self.hset_enabled:
return await self.cache.async_hincrby(self.base_key, key, amount = (amount * -1), **kwargs)
return await self.cache.async_decrby(self.get_key(key), amount = amount, **kwargs)

async def adecrbyfloat(self, key: str, amount: float = 1.0, **kwargs) -> float:
"""
[float] Decrements the value of the key by the given amount
"""
if self.hset_enabled:
return await self.cache.async_hincrbyfloat(self.base_key, key, amount = (amount * -1), **kwargs)
return await self.cache.async_incrbyfloat(self.get_key(key), amount = (amount * -1), **kwargs)

Loading

0 comments on commit 06caa96

Please sign in to comment.