From be524759327d5594bbbf6baf2de41b480f9663ab Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 30 Aug 2024 18:37:39 -0300 Subject: [PATCH 1/6] Mechanism exploration --- dev/memory_storage.ipynb | 465 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 465 insertions(+) create mode 100644 dev/memory_storage.ipynb diff --git a/dev/memory_storage.ipynb b/dev/memory_storage.ipynb new file mode 100644 index 0000000..97486ca --- /dev/null +++ b/dev/memory_storage.ipynb @@ -0,0 +1,465 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import redis\n", + "import cst_python as cst\n", + "import json" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "client = redis.Redis(decode_responses=True)\n", + "pubsub = client.pubsub()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.flushall()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Node1 publica que existe" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.lpush(\"nodes\", \"node1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "class MemoryEncoder(json.JSONEncoder):\n", + " def default(self, memory:cst.core.entities.Memory):\n", + " return MemoryEncoder.to_dict(memory)\n", + " \n", + " @staticmethod\n", + " def to_dict(memory:cst.core.entities.Memory):\n", + " data = {\n", + " \"timestamp\": memory.get_timestamp(),\n", + " \"evaluation\": memory.get_evaluation(),\n", + " \"I\": memory.get_info(),\n", + " \"name\": memory.get_name(),\n", + " \"id\": memory.get_id()\n", + " }\n", + "\n", + " return data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Node1 checa se memória com id \"memory1\" existe. Como não, publica key:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "def update_memory(memory_name, memory_object:cst.MemoryObject, client:redis.Redis):\n", + " timestamp = float(client.hget(f\"memories:{memory_name}\", \"timestamp\"))\n", + " \n", + " if memory_object.timestamp < timestamp:\n", + " print(\"Retrieve update\")\n", + " memory_dict = client.hgetall(f\"memories:{memory_name}\")\n", + "\n", + " memory_object.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory_object.set_name(memory_dict[\"name\"])\n", + " memory_object.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " memory_object.set_info(info)\n", + "\n", + " memory_object.timestamp = float(memory_dict[\"timestamp\"])\n", + " elif memory_object.timestamp > timestamp:\n", + " print(\"Send update\")\n", + " memory_dict = MemoryEncoder.to_dict(memory_object)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + "\n", + " client.hset(f\"memories:{memory_name}\", mapping=memory_dict)\n", + " client.publish(f\"memories:{memory_name}:update\", \"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "def create_memory(node, memory_name, client:redis.Redis, pubsub:redis.client.PubSub) -> cst.MemoryObject:\n", + " nodes = client.lrange(\"nodes\", 0, -1)\n", + "\n", + " memory_exist = False\n", + " memory_node = \"\"\n", + "\n", + " for n in nodes:\n", + " if n == node:\n", + " continue\n", + " \n", + " if memory_name in client.lrange(f\"{n}:memories\", 0, -1):\n", + " memory_exist = True\n", + " memory_node = n\n", + "\n", + " break\n", + "\n", + " memory = cst.MemoryObject()\n", + "\n", + " if memory_exist:\n", + " #Copia memória\n", + " print(\"Copia\")\n", + "\n", + " memory_dict = client.hgetall(f\"memories:{memory_name}\")\n", + "\n", + " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory.set_name(memory_dict[\"name\"])\n", + " memory.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " memory.set_info(info)\n", + "\n", + " memory.timestamp = float(memory_dict[\"timestamp\"])\n", + " \n", + " else:\n", + " #Indica que memória existe\n", + " print(\"Cria\")\n", + " client.lpush(f\"{node}:memories\", memory_name)\n", + "\n", + " memory_dict = MemoryEncoder.to_dict(memory)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + "\n", + " client.hset(f\"memories:{memory_name}\", mapping=memory_dict)\n", + "\n", + " subscribe_func = lambda message : update_memory(memory_name, memory, client)\n", + " pubsub.subscribe(**{f\"memories:{memory_name}:update\":subscribe_func})\n", + "\n", + " return memory" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + } + ], + "source": [ + "memory1 = create_memory(\"node1\", \"memory1\", client, pubsub)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "node2 entra no jogo" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "client2 = redis.Redis()\n", + "pubsub2 = client2.pubsub()" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client2.lpush(\"nodes\", \"node2\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[b'node2', b'node1']" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "nodes = client2.lrange(\"nodes\", 0, -1)\n", + "nodes" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "node2 tenta criar memória, percebe que existe e sincroniza" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node2_memory1 = create_memory(\"node2\", \"memory1\", client2, pubsub2)\n", + "\n", + "node2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Cria\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory2 = create_memory(\"node2\", \"memory2\", client2, pubsub2)\n", + "\n", + "memory2" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Send update\n" + ] + } + ], + "source": [ + "node2_memory1.set_info(\"INFO\")\n", + "update_memory(\"memory1\", node2_memory1, client2)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'type': 'subscribe', 'pattern': None, 'channel': 'memories:memory1:update', 'data': 1}\n", + "Retrieve update\n", + "{'type': 'subscribe', 'pattern': None, 'channel': b'memories:memory1:update', 'data': 1}\n", + "{'type': 'subscribe', 'pattern': None, 'channel': b'memories:memory2:update', 'data': 2}\n" + ] + } + ], + "source": [ + "for _pubsub in [pubsub, pubsub2]:\n", + " msg = _pubsub.get_message()\n", + " while msg is not None:\n", + " print(msg)\n", + " msg = _pubsub.get_message()" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'timestamp': '1725053432.9742534',\n", + " 'evaluation': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'name': '',\n", + " 'id': '0.0'}" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"memories:memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1725053432.9742534, evaluation=0.0, I=INFO, name=]" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From d4157264f59f9797f8d68f2087df49d69ce6f8e0 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 6 Sep 2024 17:25:44 -0300 Subject: [PATCH 2/6] MemoryStorageCodelet --- dev/memory_storage.ipynb | 167 ++++---- dev/memory_storage_codelet.ipynb | 715 +++++++++++++++++++++++++++++++ 2 files changed, 802 insertions(+), 80 deletions(-) create mode 100644 dev/memory_storage_codelet.ipynb diff --git a/dev/memory_storage.ipynb b/dev/memory_storage.ipynb index 97486ca..06854c0 100644 --- a/dev/memory_storage.ipynb +++ b/dev/memory_storage.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": 22, "metadata": {}, "outputs": [], "source": [ @@ -13,7 +13,16 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "mind_name = \"default_mind\"" + ] + }, + { + "cell_type": "code", + "execution_count": 24, "metadata": {}, "outputs": [], "source": [ @@ -23,7 +32,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 25, "metadata": {}, "outputs": [ { @@ -32,7 +41,7 @@ "True" ] }, - "execution_count": 3, + "execution_count": 25, "metadata": {}, "output_type": "execute_result" } @@ -50,7 +59,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 26, "metadata": {}, "outputs": [ { @@ -59,18 +68,18 @@ "1" ] }, - "execution_count": 4, + "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "client.lpush(\"nodes\", \"node1\")" + "client.lpush(f\"{mind_name}:nodes\", \"node1\")" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 27, "metadata": {}, "outputs": [], "source": [ @@ -100,16 +109,16 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "def update_memory(memory_name, memory_object:cst.MemoryObject, client:redis.Redis):\n", - " timestamp = float(client.hget(f\"memories:{memory_name}\", \"timestamp\"))\n", + " timestamp = float(client.hget(f\"{mind_name}:memories:{memory_name}\", \"timestamp\"))\n", " \n", " if memory_object.timestamp < timestamp:\n", " print(\"Retrieve update\")\n", - " memory_dict = client.hgetall(f\"memories:{memory_name}\")\n", + " memory_dict = client.hgetall(f\"{mind_name}:memories:{memory_name}\")\n", "\n", " memory_object.set_evaluation(float(memory_dict[\"evaluation\"]))\n", " memory_object.set_name(memory_dict[\"name\"])\n", @@ -126,39 +135,31 @@ " memory_dict = MemoryEncoder.to_dict(memory_object)\n", " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", "\n", - " client.hset(f\"memories:{memory_name}\", mapping=memory_dict)\n", - " client.publish(f\"memories:{memory_name}:update\", \"\")" + " client.hset(f\"{mind_name}:memories:{memory_name}\", mapping=memory_dict)\n", + " client.publish(f\"{mind_name}:memories:{memory_name}:update\", \"\")" ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "def create_memory(node, memory_name, client:redis.Redis, pubsub:redis.client.PubSub) -> cst.MemoryObject:\n", - " nodes = client.lrange(\"nodes\", 0, -1)\n", - "\n", - " memory_exist = False\n", - " memory_node = \"\"\n", + " memory = cst.MemoryObject()\n", "\n", - " for n in nodes:\n", - " if n == node:\n", - " continue\n", - " \n", - " if memory_name in client.lrange(f\"{n}:memories\", 0, -1):\n", - " memory_exist = True\n", - " memory_node = n\n", + " if client.exists(f\"{mind_name}:memories:{memory_name}\"):\n", + " memory_dict = client.hgetall(f\"{mind_name}:memories:{memory_name}\")\n", "\n", - " break\n", + " if memory_dict[\"owner\"] != \"\":\n", + " #Solicita memória\n", + " pass\n", "\n", - " memory = cst.MemoryObject()\n", "\n", - " if memory_exist:\n", " #Copia memória\n", " print(\"Copia\")\n", "\n", - " memory_dict = client.hgetall(f\"memories:{memory_name}\")\n", + " memory_dict = client.hgetall(f\"{mind_name}:memories:{memory_name}\")\n", "\n", " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", " memory.set_name(memory_dict[\"name\"])\n", @@ -170,7 +171,6 @@ " memory.set_info(info)\n", "\n", " memory.timestamp = float(memory_dict[\"timestamp\"])\n", - " \n", " else:\n", " #Indica que memória existe\n", " print(\"Cria\")\n", @@ -178,18 +178,19 @@ "\n", " memory_dict = MemoryEncoder.to_dict(memory)\n", " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + " memory_dict[\"owner\"] = \"\" #node\n", "\n", - " client.hset(f\"memories:{memory_name}\", mapping=memory_dict)\n", + " client.hset(f\"{mind_name}:memories:{memory_name}\", mapping=memory_dict)\n", "\n", " subscribe_func = lambda message : update_memory(memory_name, memory, client)\n", - " pubsub.subscribe(**{f\"memories:{memory_name}:update\":subscribe_func})\n", + " pubsub.subscribe(**{f\"{mind_name}:memories:{memory_name}:update\":subscribe_func})\n", "\n", " return memory" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 30, "metadata": {}, "outputs": [ { @@ -206,7 +207,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 31, "metadata": {}, "outputs": [ { @@ -215,7 +216,7 @@ "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" ] }, - "execution_count": 9, + "execution_count": 31, "metadata": {}, "output_type": "execute_result" } @@ -233,17 +234,17 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 32, "metadata": {}, "outputs": [], "source": [ - "client2 = redis.Redis()\n", + "client2 = redis.Redis(decode_responses=True)\n", "pubsub2 = client2.pubsub()" ] }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 33, "metadata": {}, "outputs": [ { @@ -252,33 +253,33 @@ "2" ] }, - "execution_count": 11, + "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "client2.lpush(\"nodes\", \"node2\")" + "client2.lpush(f\"{mind_name}:nodes\", \"node2\")" ] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "[b'node2', b'node1']" + "['node2', 'node1']" ] }, - "execution_count": 12, + "execution_count": 34, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "nodes = client2.lrange(\"nodes\", 0, -1)\n", + "nodes = client2.lrange(f\"{mind_name}:nodes\", 0, -1)\n", "nodes" ] }, @@ -291,14 +292,39 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'timestamp': '0.0',\n", + " 'evaluation': '0.0',\n", + " 'I': 'null',\n", + " 'name': '',\n", + " 'id': '0.0',\n", + " 'owner': ''}" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(f\"{mind_name}:memories:memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Cria\n" + "Copia\n" ] }, { @@ -307,7 +333,7 @@ "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" ] }, - "execution_count": 13, + "execution_count": 36, "metadata": {}, "output_type": "execute_result" } @@ -320,7 +346,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 37, "metadata": {}, "outputs": [ { @@ -336,7 +362,7 @@ "MemoryObject [idmemoryobject=0.0, timestamp=0.0, evaluation=0.0, I=None, name=]" ] }, - "execution_count": 14, + "execution_count": 37, "metadata": {}, "output_type": "execute_result" } @@ -349,7 +375,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 38, "metadata": {}, "outputs": [ { @@ -367,17 +393,17 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "{'type': 'subscribe', 'pattern': None, 'channel': 'memories:memory1:update', 'data': 1}\n", + "{'type': 'subscribe', 'pattern': None, 'channel': 'default_mind:memories:memory1:update', 'data': 1}\n", "Retrieve update\n", - "{'type': 'subscribe', 'pattern': None, 'channel': b'memories:memory1:update', 'data': 1}\n", - "{'type': 'subscribe', 'pattern': None, 'channel': b'memories:memory2:update', 'data': 2}\n" + "{'type': 'subscribe', 'pattern': None, 'channel': 'default_mind:memories:memory1:update', 'data': 1}\n", + "{'type': 'subscribe', 'pattern': None, 'channel': 'default_mind:memories:memory2:update', 'data': 2}\n" ] } ], @@ -391,51 +417,32 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 40, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "{'timestamp': '1725053432.9742534',\n", + "{'timestamp': '1725638895.8791993',\n", " 'evaluation': '0.0',\n", " 'I': '\"INFO\"',\n", " 'name': '',\n", - " 'id': '0.0'}" + " 'id': '0.0',\n", + " 'owner': ''}" ] }, - "execution_count": 17, + "execution_count": 40, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "client.hgetall(\"memories:memory1\")" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "MemoryObject [idmemoryobject=0.0, timestamp=1725053432.9742534, evaluation=0.0, I=INFO, name=]" - ] - }, - "execution_count": 18, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "memory1" + "client.hgetall(f\"{mind_name}:memories:memory1\")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 42, "metadata": {}, "outputs": [], "source": [] diff --git a/dev/memory_storage_codelet.ipynb b/dev/memory_storage_codelet.ipynb new file mode 100644 index 0000000..581e762 --- /dev/null +++ b/dev/memory_storage_codelet.ipynb @@ -0,0 +1,715 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import weakref\n", + "import json\n", + "import asyncio\n", + "import threading\n", + "from concurrent.futures import ThreadPoolExecutor\n", + "from typing import Optional, cast, List\n", + "\n", + "import redis\n", + "\n", + "\n", + "import cst_python as cst\n", + "from cst_python.core.entities import Memory, Mind" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client = redis.Redis(decode_responses=True)\n", + "client.flushall()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "class MemoryEncoder(json.JSONEncoder):\n", + " def default(self, memory:cst.core.entities.Memory):\n", + " return MemoryEncoder.to_dict(memory)\n", + " \n", + " @staticmethod\n", + " def to_dict(memory:cst.core.entities.Memory):\n", + " data = {\n", + " \"timestamp\": memory.get_timestamp(),\n", + " \"evaluation\": memory.get_evaluation(),\n", + " \"I\": memory.get_info(),\n", + " \"name\": memory.get_name(),\n", + " \"id\": memory.get_id()\n", + " }\n", + "\n", + " return data" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "class MemoryStorageCodelet:\n", + " def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3) -> None:\n", + " self._mind = mind\n", + " self._request_timeout = request_timeout\n", + " \n", + " if mind_name is None:\n", + " mind_name = \"default_mind\"\n", + " self._mind_name = cast(str, mind_name)\n", + " \n", + " self._memories : weakref.WeakValueDictionary[str, Memory] = weakref.WeakValueDictionary()\n", + " \n", + " self._client = redis.Redis(decode_responses=True)\n", + " self._pubsub = self._client.pubsub()\n", + " self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread()\n", + "\n", + " if node_name is None:\n", + " node_number = self._client.scard(f\"{mind_name}:nodes\")\n", + "\n", + " node_name = f\"node{node_number}\"\n", + " while self._client.sismember(f\"{mind_name}:nodes\", node_name):\n", + " node_number += 1\n", + " node_name = f\"node{node_number}\"\n", + "\n", + " self._node_name = cast(str, node_name)\n", + "\n", + " self._client.sadd(f\"{mind_name}:nodes\", node_name)\n", + "\n", + " transfer_service_addr = f\"{self._mind_name}:nodes:{node_name}:transfer_memory\"\n", + " self._pubsub.subscribe(**{transfer_service_addr:self.transfer_memory})\n", + "\n", + " transfer_done_addr = f\"{self._mind_name}:nodes:{node_name}:transfer_done\"\n", + " self._pubsub.subscribe(**{transfer_done_addr:self.notify_transfer})\n", + "\n", + " self._last_update : dict[str, float] = {}\n", + " self._waiting_retrieve : set[str] = set()\n", + " \n", + " self._retrieve_executor = ThreadPoolExecutor(3)\n", + "\n", + " self._waiting_request_events : dict[str, threading.Event] = {}\n", + "\n", + " self._request = None\n", + "\n", + " def proc(self) -> None:\n", + " \n", + " #Check new memories\n", + "\n", + " mind_memories = {}\n", + " for memory in self._mind.raw_memory.all_memories:\n", + " if memory.get_name() == \"\": #No name -> No MS\n", + " continue\n", + "\n", + " mind_memories[memory.get_name()] = memory\n", + "\n", + " mind_memories_names = set(mind_memories.keys())\n", + " memories_names = set(self._memories.keys())\n", + "\n", + " #Check only not here (memories_names not in mind should be garbage collected)\n", + " difference = mind_memories_names - memories_names\n", + " for memory_name in difference:\n", + " memory : Memory = mind_memories[memory_name]\n", + " self._memories[memory_name] = memory\n", + "\n", + " if self._client.exists(f\"{self._mind_name}:memories:{memory_name}\"):\n", + " self._retrieve_executor.submit(self.retrieve_memory, memory)\n", + " \n", + " else: #Send impostor with owner\n", + " memory_impostor = {\"name\":memory.get_name(),\n", + " \"evalution\" : 0.0,\n", + " \"I\": \"\",\n", + " \"id\" : \"0.0\",\n", + " \"owner\": self._node_name}\n", + " \n", + " self._client.hset(f\"{self._mind_name}:memories:{memory_name}\", mapping=memory_impostor)\n", + "\n", + " subscribe_func = lambda message : self.update_memory(memory_name)\n", + " self._pubsub.subscribe(**{f\"{self._mind_name}:memories:{memory_name}:update\":subscribe_func})\n", + "\n", + " #Update memories\n", + " to_update = self._last_update.keys()\n", + " for memory_name in to_update:\n", + " if memory_name not in self._memories:\n", + " del self._last_update[memory_name]\n", + " continue\n", + "\n", + " memory = self._memories[memory_name]\n", + " if memory.get_timestamp() > self._last_update[memory_name]:\n", + " self.update_memory(memory_name)\n", + "\n", + " def transfer_memory(self, message) -> None:\n", + " request = json.loads(message[\"data\"])\n", + " \n", + " memory_name = request[\"memory_name\"]\n", + " requesting_node = request[\"node\"]\n", + "\n", + " print(self._node_name, \"Tranfering\", memory_name)\n", + "\n", + " if memory_name in self._memories:\n", + " memory = self._memories[memory_name]\n", + " else:\n", + " memory = cst.MemoryObject()\n", + " memory.set_name(memory_name)\n", + " \n", + " self.send_memory(memory)\n", + "\n", + " response_addr = f\"{self._mind_name}:nodes:{requesting_node}:transfer_done\"\n", + " self._client.publish(response_addr, memory_name)\n", + "\n", + "\n", + " def send_memory(self, memory:Memory) -> None:\n", + " memory_name = memory.get_name()\n", + " print(self._node_name, \"Send memory\", memory_name)\n", + " \n", + " memory_dict = MemoryEncoder.to_dict(memory)\n", + " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", + " memory_dict[\"owner\"] = \"\"\n", + "\n", + "\n", + " self._client.hset(f\"{self._mind_name}:memories:{memory_name}\", mapping=memory_dict)\n", + " self._client.publish(f\"{self._mind_name}:memories:{memory_name}:update\", \"\")\n", + "\n", + " self._last_update[memory_name] = memory.get_timestamp()\n", + " \n", + " def update_memory(self, memory_name:str) -> None:\n", + " print(self._node_name, \"Updating memory\", memory_name)\n", + "\n", + " timestamp = float(self._client.hget(f\"{self._mind_name}:memories:{memory_name}\", \"timestamp\"))\n", + " memory = self._memories[memory_name]\n", + " memory_timestamp = memory.get_timestamp()\n", + " \n", + " if memory_timestamp < timestamp:\n", + " self._retrieve_executor.submit(self.retrieve_memory, memory)\n", + "\n", + " elif memory_timestamp> timestamp:\n", + " self.send_memory(memory)\n", + "\n", + " self._last_update[memory_name] = memory.get_timestamp()\n", + "\n", + " def retrieve_memory(self, memory:Memory) -> None:\n", + " memory_name = memory.get_name()\n", + "\n", + " print(self._node_name, \"Retrieve\", memory_name)\n", + "\n", + " if memory_name in self._waiting_retrieve:\n", + " return\n", + " self._waiting_retrieve.add(memory_name)\n", + "\n", + " memory_dict = self._client.hgetall(f\"{self._mind_name}:memories:{memory_name}\")\n", + "\n", + " if memory_dict[\"owner\"] != \"\":\n", + " event = threading.Event()\n", + " self._waiting_request_events[memory_name] = event\n", + " self.request_memory(memory_name, memory_dict[\"owner\"])\n", + "\n", + " if not event.wait(timeout=self._request_timeout):\n", + " print(self._node_name, \"Request failed\", memory_name)\n", + " #Request failed\n", + " self.send_memory(memory)\n", + " return \n", + " \n", + " memory_dict = self._client.hgetall(f\"{self._mind_name}:memories:{memory_name}\")\n", + "\n", + " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", + " memory.set_id(float(memory_dict[\"id\"]))\n", + "\n", + " info_json = memory_dict[\"I\"]\n", + " info = json.loads(info_json)\n", + "\n", + " print(self._node_name, \"INFO\", info, info_json)\n", + "\n", + " memory.set_info(info)\n", + "\n", + " self._last_update[memory_name] = memory.get_timestamp()\n", + "\n", + " self._waiting_retrieve.remove(memory_name)\n", + "\n", + " def request_memory(self, memory_name:str, owner_name:str):\n", + " print(self._node_name, \"Requesting\", memory_name)\n", + "\n", + " request_addr = f\"{self._mind_name}:nodes:{owner_name}:transfer_memory\"\n", + " \n", + " request_dict = {\"memory_name\":memory_name, \"node\":self._node_name}\n", + " request = json.dumps(request_dict)\n", + " self._client.publish(request_addr, request)\n", + "\n", + " def notify_transfer(self, message:str) -> None:\n", + " memory_name = message[\"data\"]\n", + " if memory_name in self._waiting_request_events:\n", + " event = self._waiting_request_events[memory_name]\n", + " event.set()\n", + " del self._waiting_request_events[memory_name]\n", + "\n", + " def __del__(self) -> None:\n", + " self._pubsub_thread.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "mind = cst.Mind()\n", + "memory1 = mind.create_memory_object(\"Memory1\", \"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "ms_codelet = MemoryStorageCodelet(mind, \"node0\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "ms_codelet.proc()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'node0'}" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.smembers(\"default_mind:nodes\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1', 'evalution': '0.0', 'I': '', 'id': '0.0', 'owner': 'node0'}" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "mind2 = cst.Mind()\n", + "mind2_memory1 = mind2.create_memory_object(\"Memory1\", \"\")\n", + "mind2_ms_codelet = MemoryStorageCodelet(mind2)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0, timestamp=1725654264.298408, evaluation=0.0, I=, name=Memory1]" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'node1'" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_ms_codelet._node_name" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'node0', 'node1'}" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.smembers(\"default_mind:nodes\")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 Retrieve Memory1\n", + "node1 Requesting Memory1\n", + "node0 Tranfering Memory1\n", + "node0 Send memory Memory1\n" + ] + } + ], + "source": [ + "mind2_ms_codelet.proc()" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Retrieve Memory1\n", + "node0 INFO \"\"\n", + "node1 INFO \"\"\n" + ] + }, + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1725654264.7690487, evaluation=0.0, I=, name=Memory1]" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evalution': '0.0',\n", + " 'I': '\"\"',\n", + " 'id': '0',\n", + " 'owner': '',\n", + " 'timestamp': '1725654264.298408',\n", + " 'evaluation': '0.0'}" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "-1" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1.set_info(\"INFO\")" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Retrieve Memory1\n" + ] + } + ], + "source": [ + "ms_codelet.proc()" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 INFO INFO \"INFO\"\n" + ] + }, + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evalution': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'id': '0.0',\n", + " 'owner': '',\n", + " 'timestamp': '1725654264.794752',\n", + " 'evaluation': '0.0'}" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1725654264.8138657, evaluation=0.0, I=INFO, name=Memory1]" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [], + "source": [ + "mind2_ms_codelet.proc()" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evalution': '0.0',\n", + " 'I': '\"INFO\"',\n", + " 'id': '0.0',\n", + " 'owner': '',\n", + " 'timestamp': '1725654264.794752',\n", + " 'evaluation': '0.0'}" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 Updating memory Memory1\n", + "node1 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Retrieve Memory1\n", + "node0 INFO INFO2 \"INFO2\"\n" + ] + } + ], + "source": [ + "mind2_memory1.set_info(\"INFO2\")\n", + "mind2_ms_codelet.proc()" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'name': 'Memory1',\n", + " 'evalution': '0.0',\n", + " 'I': '\"INFO2\"',\n", + " 'id': '0.0',\n", + " 'owner': '',\n", + " 'timestamp': '1725654302.9360394',\n", + " 'evaluation': '0.0'}" + ] + }, + "execution_count": 24, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.hgetall(\"default_mind:memories:Memory1\")" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=0.0, timestamp=1725654302.943039, evaluation=0.0, I=INFO2, name=Memory1]" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 837723d9d2d54a078dc08e06ac97acf125d75803 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 13 Sep 2024 11:12:31 -0300 Subject: [PATCH 3/6] Convert MS to Codelet --- dev/memory_storage_codelet.ipynb | 294 ++++++++++++++++++++++++------- dev/weak_test.py | 18 ++ 2 files changed, 249 insertions(+), 63 deletions(-) create mode 100644 dev/weak_test.py diff --git a/dev/memory_storage_codelet.ipynb b/dev/memory_storage_codelet.ipynb index 581e762..d90f888 100644 --- a/dev/memory_storage_codelet.ipynb +++ b/dev/memory_storage_codelet.ipynb @@ -9,14 +9,13 @@ "import json\n", "import weakref\n", "import json\n", - "import asyncio\n", "import threading\n", + "import time\n", "from concurrent.futures import ThreadPoolExecutor\n", "from typing import Optional, cast, List\n", "\n", "import redis\n", "\n", - "\n", "import cst_python as cst\n", "from cst_python.core.entities import Memory, Mind" ] @@ -71,8 +70,10 @@ "metadata": {}, "outputs": [], "source": [ - "class MemoryStorageCodelet:\n", + "class MemoryStorageCodelet(cst.Codelet):\n", " def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3) -> None:\n", + " super().__init__()\n", + " \n", " self._mind = mind\n", " self._request_timeout = request_timeout\n", " \n", @@ -113,6 +114,12 @@ "\n", " self._request = None\n", "\n", + " def calculate_activation(self) -> None:\n", + " pass\n", + "\n", + " def access_memory_objects(self) -> None:\n", + " pass\n", + "\n", " def proc(self) -> None:\n", " \n", " #Check new memories\n", @@ -138,7 +145,7 @@ " \n", " else: #Send impostor with owner\n", " memory_impostor = {\"name\":memory.get_name(),\n", - " \"evalution\" : 0.0,\n", + " \"evaluation\" : 0.0,\n", " \"I\": \"\",\n", " \"id\" : \"0.0\",\n", " \"owner\": self._node_name}\n", @@ -196,6 +203,9 @@ " def update_memory(self, memory_name:str) -> None:\n", " print(self._node_name, \"Updating memory\", memory_name)\n", "\n", + " if memory_name not in self._memories:\n", + " self._pubsub.unsubscribe(f\"{self._mind_name}:memories:{memory_name}:update\")\n", + "\n", " timestamp = float(self._client.hget(f\"{self._mind_name}:memories:{memory_name}\", \"timestamp\"))\n", " memory = self._memories[memory_name]\n", " memory_timestamp = memory.get_timestamp()\n", @@ -263,7 +273,8 @@ " del self._waiting_request_events[memory_name]\n", "\n", " def __del__(self) -> None:\n", - " self._pubsub_thread.stop()" + " self._pubsub_thread.stop()\n", + " self._retrieve_executor.shutdown(cancel_futures=True)" ] }, { @@ -282,7 +293,11 @@ "metadata": {}, "outputs": [], "source": [ - "ms_codelet = MemoryStorageCodelet(mind, \"node0\")" + "ms_codelet = MemoryStorageCodelet(mind, \"node0\")\n", + "ms_codelet.time_step = 100\n", + "\n", + "mind.insert_codelet(ms_codelet)\n", + "mind.start()" ] }, { @@ -291,7 +306,7 @@ "metadata": {}, "outputs": [], "source": [ - "ms_codelet.proc()" + "time.sleep(1)" ] }, { @@ -322,7 +337,11 @@ { "data": { "text/plain": [ - "{'name': 'Memory1', 'evalution': '0.0', 'I': '', 'id': '0.0', 'owner': 'node0'}" + "{'name': 'Memory1',\n", + " 'evaluation': '0.0',\n", + " 'I': '',\n", + " 'id': '0.0',\n", + " 'owner': 'node0'}" ] }, "execution_count": 9, @@ -342,7 +361,10 @@ "source": [ "mind2 = cst.Mind()\n", "mind2_memory1 = mind2.create_memory_object(\"Memory1\", \"\")\n", - "mind2_ms_codelet = MemoryStorageCodelet(mind2)" + "mind2_ms_codelet = MemoryStorageCodelet(mind2)\n", + "mind2_ms_codelet.time_step = 100\n", + "mind2.insert_codelet(mind2_ms_codelet)\n", + "mind2.start()" ] }, { @@ -350,10 +372,28 @@ "execution_count": 11, "metadata": {}, "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node1 Retrieve Memory1\n", + "node1 Requesting Memory1\n", + "node0 Tranfering Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node0 Retrieve Memory1\n", + "node0 INFO \"\"\n", + "node1 INFO \"\"\n" + ] + }, { "data": { "text/plain": [ - "MemoryObject [idmemoryobject=0, timestamp=1725654264.298408, evaluation=0.0, I=, name=Memory1]" + "MemoryObject [idmemoryobject=0.0, timestamp=1726077369.7999365, evaluation=0.0, I=, name=Memory1]" ] }, "execution_count": 11, @@ -409,20 +449,9 @@ "cell_type": "code", "execution_count": 14, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "node1 Retrieve Memory1\n", - "node1 Requesting Memory1\n", - "node0 Tranfering Memory1\n", - "node0 Send memory Memory1\n" - ] - } - ], + "outputs": [], "source": [ - "mind2_ms_codelet.proc()" + "time.sleep(1)" ] }, { @@ -430,24 +459,10 @@ "execution_count": 15, "metadata": {}, "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "node1 Updating memory Memory1\n", - "node0 Updating memory Memory1\n", - "node1 Send memory Memory1\n", - "node0 Updating memory Memory1\n", - "node1 Updating memory Memory1\n", - "node0 Retrieve Memory1\n", - "node0 INFO \"\"\n", - "node1 INFO \"\"\n" - ] - }, { "data": { "text/plain": [ - "MemoryObject [idmemoryobject=0.0, timestamp=1725654264.7690487, evaluation=0.0, I=, name=Memory1]" + "MemoryObject [idmemoryobject=0.0, timestamp=1726077369.7999365, evaluation=0.0, I=, name=Memory1]" ] }, "execution_count": 15, @@ -468,12 +483,11 @@ "data": { "text/plain": [ "{'name': 'Memory1',\n", - " 'evalution': '0.0',\n", + " 'evaluation': '0.0',\n", " 'I': '\"\"',\n", " 'id': '0',\n", " 'owner': '',\n", - " 'timestamp': '1725654264.298408',\n", - " 'evaluation': '0.0'}" + " 'timestamp': '1726077369.5866976'}" ] }, "execution_count": 16, @@ -518,12 +532,13 @@ "node0 Send memory Memory1\n", "node1 Updating memory Memory1\n", "node0 Updating memory Memory1\n", - "node1 Retrieve Memory1\n" + "node1 Retrieve Memory1\n", + "node1 INFO INFO \"INFO\"\n" ] } ], "source": [ - "ms_codelet.proc()" + "time.sleep(1)" ] }, { @@ -531,23 +546,15 @@ "execution_count": 19, "metadata": {}, "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "node1 INFO INFO \"INFO\"\n" - ] - }, { "data": { "text/plain": [ "{'name': 'Memory1',\n", - " 'evalution': '0.0',\n", + " 'evaluation': '0.0',\n", " 'I': '\"INFO\"',\n", " 'id': '0.0',\n", " 'owner': '',\n", - " 'timestamp': '1725654264.794752',\n", - " 'evaluation': '0.0'}" + " 'timestamp': '1726077370.926107'}" ] }, "execution_count": 19, @@ -567,7 +574,7 @@ { "data": { "text/plain": [ - "MemoryObject [idmemoryobject=0.0, timestamp=1725654264.8138657, evaluation=0.0, I=INFO, name=Memory1]" + "MemoryObject [idmemoryobject=0.0, timestamp=1726077371.003417, evaluation=0.0, I=INFO, name=Memory1]" ] }, "execution_count": 20, @@ -585,7 +592,7 @@ "metadata": {}, "outputs": [], "source": [ - "mind2_ms_codelet.proc()" + "time.sleep(1)" ] }, { @@ -597,12 +604,11 @@ "data": { "text/plain": [ "{'name': 'Memory1',\n", - " 'evalution': '0.0',\n", + " 'evaluation': '0.0',\n", " 'I': '\"INFO\"',\n", " 'id': '0.0',\n", " 'owner': '',\n", - " 'timestamp': '1725654264.794752',\n", - " 'evaluation': '0.0'}" + " 'timestamp': '1726077370.926107'}" ] }, "execution_count": 22, @@ -634,7 +640,7 @@ ], "source": [ "mind2_memory1.set_info(\"INFO2\")\n", - "mind2_ms_codelet.proc()" + "time.sleep(1)" ] }, { @@ -646,12 +652,11 @@ "data": { "text/plain": [ "{'name': 'Memory1',\n", - " 'evalution': '0.0',\n", + " 'evaluation': '0.0',\n", " 'I': '\"INFO2\"',\n", " 'id': '0.0',\n", " 'owner': '',\n", - " 'timestamp': '1725654302.9360394',\n", - " 'evaluation': '0.0'}" + " 'timestamp': '1726077373.0085642'}" ] }, "execution_count": 24, @@ -671,7 +676,7 @@ { "data": { "text/plain": [ - "MemoryObject [idmemoryobject=0.0, timestamp=1725654302.943039, evaluation=0.0, I=INFO2, name=Memory1]" + "MemoryObject [idmemoryobject=0.0, timestamp=1726077373.1104536, evaluation=0.0, I=INFO2, name=Memory1]" ] }, "execution_count": 25, @@ -683,6 +688,169 @@ "memory1" ] }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO 1 1\n" + ] + } + ], + "source": [ + "memory1.set_info(1)\n", + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1.get_info()" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO 1 \"1\"\n" + ] + } + ], + "source": [ + "memory1.set_info(\"1\")\n", + "time.sleep(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'1'" + ] + }, + "execution_count": 29, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1.get_info()" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO True true\n" + ] + } + ], + "source": [ + "memory1.set_info(True)\n", + "time.sleep(1)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(True, bool)" + ] + }, + "execution_count": 31, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "mind2_memory1.get_info(), type(mind2_memory1.get_info())" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node0 Updating memory Memory1\n", + "node0 Send memory Memory1\n", + "node1 Updating memory Memory1\n", + "node0 Updating memory Memory1\n", + "node1 Retrieve Memory1\n", + "node1 INFO [1, 2, 3] [1, 2, 3]\n" + ] + }, + { + "data": { + "text/plain": [ + "([1, 2, 3], list)" + ] + }, + "execution_count": 32, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1.set_info([1,2,3])\n", + "time.sleep(1)\n", + "mind2_memory1.get_info(), type(mind2_memory1.get_info())" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/dev/weak_test.py b/dev/weak_test.py new file mode 100644 index 0000000..1669116 --- /dev/null +++ b/dev/weak_test.py @@ -0,0 +1,18 @@ +import weakref + +class Dummy: + + def __init__(self, value): + self.value = value + +weak_dict = weakref.WeakValueDictionary() + +var = Dummy(1) + +weak_dict["var"] = var + +print("var" in weak_dict) + +del var + +print("var" in weak_dict) \ No newline at end of file From 054b59a68d38464880580315970ecc936581d16d Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 20 Sep 2024 16:07:56 -0300 Subject: [PATCH 4/6] Fix Memory/MO data types timestamp and id float->int --- src/cst_python/core/entities/memory.py | 8 ++++---- src/cst_python/core/entities/memory_object.py | 18 +++++++++--------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/cst_python/core/entities/memory.py b/src/cst_python/core/entities/memory.py index 30e0160..bbaa0a6 100644 --- a/src/cst_python/core/entities/memory.py +++ b/src/cst_python/core/entities/memory.py @@ -38,7 +38,7 @@ def set_evaluation(self, evaluation:float) -> None: #@alias.alias("getTimestamp") @abc.abstractmethod - def get_timestamp(self) -> float: + def get_timestamp(self) -> int: ... #@alias.alias("addMemoryObserver") @@ -53,17 +53,17 @@ def remove_memory_observer(self, observer:MemoryObserver) -> None: #@alias.alias("getId") @abc.abstractmethod - def get_id(self) -> float: + def get_id(self) -> int: ... #@alias.alias("setId") @abc.abstractmethod - def set_id(self, memory_id:float) -> None: + def set_id(self, memory_id:int) -> None: ... #@alias.alias("getTimestamp") @abc.abstractmethod - def get_timestamp(self) -> float: + def get_timestamp(self) -> int: ... diff --git a/src/cst_python/core/entities/memory_object.py b/src/cst_python/core/entities/memory_object.py index 9ab855d..48a8160 100644 --- a/src/cst_python/core/entities/memory_object.py +++ b/src/cst_python/core/entities/memory_object.py @@ -10,8 +10,8 @@ class MemoryObject(Memory): def __init__(self) -> None: - self._id = 0.0 - self._timestamp = 0.0 + self._id = 0 + self._timestamp = 0 self._evaluation = 0.0 self._info : Any = None self._name = "" @@ -24,10 +24,10 @@ def __getstate__(self) -> object: return state - def get_id(self) -> float: + def get_id(self) -> int: return self._id - def set_id(self, memory_id: float) -> None: + def set_id(self, memory_id: int) -> None: self._id = memory_id def get_info(self) -> Any: @@ -35,7 +35,7 @@ def get_info(self) -> Any: def set_info(self, value: Any) -> int: self._info = value - self._timestamp = time.time() + self._timestamp = int(time.time()*1000) self._notify_memory_observers() return -1 @@ -47,16 +47,16 @@ def _notify_memory_observers(self) -> None: def update_info(self, info:Any) -> None: self.set_info(info) - def get_timestamp(self) -> float: + def get_timestamp(self) -> int: return self._timestamp @property - def timestamp(self) -> float: + def timestamp(self) -> int: return self._timestamp #@alias.alias("setTimestamp") @timestamp.setter - def timestamp(self, value:float) -> None: + def timestamp(self, value:int) -> None: self._timestamp = value def get_name(self) -> str: @@ -72,7 +72,7 @@ def get_evaluation(self) -> float: return self._evaluation def set_evaluation(self, evaluation: float) -> None: - return self._evaluation + self._evaluation = evaluation #@alias.alias("toString", "to_string") def __str__(self) -> str: From 1f52cf6c44cb5d2477a3e181b49974cd9de55253 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 20 Sep 2024 16:12:55 -0300 Subject: [PATCH 5/6] MS: Fix type safety warning --- dev/memory_storage_codelet.ipynb | 201 +++++++++++++++++++++++-------- 1 file changed, 149 insertions(+), 52 deletions(-) diff --git a/dev/memory_storage_codelet.ipynb b/dev/memory_storage_codelet.ipynb index d90f888..d1dbc3f 100644 --- a/dev/memory_storage_codelet.ipynb +++ b/dev/memory_storage_codelet.ipynb @@ -43,7 +43,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ @@ -66,7 +66,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 3, "metadata": {}, "outputs": [], "source": [ @@ -87,25 +87,30 @@ " self._pubsub = self._client.pubsub()\n", " self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread()\n", "\n", - " if node_name is None:\n", - " node_number = self._client.scard(f\"{mind_name}:nodes\")\n", + " base_name = node_name\n", + " if base_name is None:\n", + " base_name = \"node\"\n", "\n", - " node_name = f\"node{node_number}\"\n", + " \n", + " if self._client.sismember(f\"{mind_name}:nodes\", node_name):\n", + " node_number = self._client.scard(f\"{mind_name}:nodes\")\n", + " node_name = base_name+str(node_number)\n", " while self._client.sismember(f\"{mind_name}:nodes\", node_name):\n", " node_number += 1\n", - " node_name = f\"node{node_number}\"\n", + " node_name = base_name+str(node_number)\n", + " \n", "\n", " self._node_name = cast(str, node_name)\n", "\n", " self._client.sadd(f\"{mind_name}:nodes\", node_name)\n", "\n", " transfer_service_addr = f\"{self._mind_name}:nodes:{node_name}:transfer_memory\"\n", - " self._pubsub.subscribe(**{transfer_service_addr:self.transfer_memory})\n", + " self._pubsub.subscribe(**{transfer_service_addr:self._handler_transfer_memory})\n", "\n", " transfer_done_addr = f\"{self._mind_name}:nodes:{node_name}:transfer_done\"\n", - " self._pubsub.subscribe(**{transfer_done_addr:self.notify_transfer})\n", + " self._pubsub.subscribe(**{transfer_done_addr:self._handler_notify_transfer})\n", "\n", - " self._last_update : dict[str, float] = {}\n", + " self._last_update : dict[str, int] = {}\n", " self._waiting_retrieve : set[str] = set()\n", " \n", " self._retrieve_executor = ThreadPoolExecutor(3)\n", @@ -141,13 +146,13 @@ " self._memories[memory_name] = memory\n", "\n", " if self._client.exists(f\"{self._mind_name}:memories:{memory_name}\"):\n", - " self._retrieve_executor.submit(self.retrieve_memory, memory)\n", + " self._retrieve_executor.submit(self._retrieve_memory, memory)\n", " \n", " else: #Send impostor with owner\n", " memory_impostor = {\"name\":memory.get_name(),\n", " \"evaluation\" : 0.0,\n", " \"I\": \"\",\n", - " \"id\" : \"0.0\",\n", + " \"id\" : 0,\n", " \"owner\": self._node_name}\n", " \n", " self._client.hset(f\"{self._mind_name}:memories:{memory_name}\", mapping=memory_impostor)\n", @@ -166,27 +171,25 @@ " if memory.get_timestamp() > self._last_update[memory_name]:\n", " self.update_memory(memory_name)\n", "\n", - " def transfer_memory(self, message) -> None:\n", - " request = json.loads(message[\"data\"])\n", - " \n", - " memory_name = request[\"memory_name\"]\n", - " requesting_node = request[\"node\"]\n", + " def update_memory(self, memory_name:str) -> None:\n", + " print(self._node_name, \"Updating memory\", memory_name)\n", "\n", - " print(self._node_name, \"Tranfering\", memory_name)\n", + " if memory_name not in self._memories:\n", + " self._pubsub.unsubscribe(f\"{self._mind_name}:memories:{memory_name}:update\")\n", "\n", - " if memory_name in self._memories:\n", - " memory = self._memories[memory_name]\n", - " else:\n", - " memory = cst.MemoryObject()\n", - " memory.set_name(memory_name)\n", + " timestamp = float(self._client.hget(f\"{self._mind_name}:memories:{memory_name}\", \"timestamp\"))\n", + " memory = self._memories[memory_name]\n", + " memory_timestamp = memory.get_timestamp()\n", " \n", - " self.send_memory(memory)\n", + " if memory_timestamp < timestamp:\n", + " self._retrieve_executor.submit(self._retrieve_memory, memory)\n", "\n", - " response_addr = f\"{self._mind_name}:nodes:{requesting_node}:transfer_done\"\n", - " self._client.publish(response_addr, memory_name)\n", + " elif memory_timestamp> timestamp:\n", + " self._send_memory(memory)\n", "\n", + " self._last_update[memory_name] = memory.get_timestamp()\n", "\n", - " def send_memory(self, memory:Memory) -> None:\n", + " def _send_memory(self, memory:Memory) -> None:\n", " memory_name = memory.get_name()\n", " print(self._node_name, \"Send memory\", memory_name)\n", " \n", @@ -200,25 +203,8 @@ "\n", " self._last_update[memory_name] = memory.get_timestamp()\n", " \n", - " def update_memory(self, memory_name:str) -> None:\n", - " print(self._node_name, \"Updating memory\", memory_name)\n", - "\n", - " if memory_name not in self._memories:\n", - " self._pubsub.unsubscribe(f\"{self._mind_name}:memories:{memory_name}:update\")\n", - "\n", - " timestamp = float(self._client.hget(f\"{self._mind_name}:memories:{memory_name}\", \"timestamp\"))\n", - " memory = self._memories[memory_name]\n", - " memory_timestamp = memory.get_timestamp()\n", - " \n", - " if memory_timestamp < timestamp:\n", - " self._retrieve_executor.submit(self.retrieve_memory, memory)\n", - "\n", - " elif memory_timestamp> timestamp:\n", - " self.send_memory(memory)\n", "\n", - " self._last_update[memory_name] = memory.get_timestamp()\n", - "\n", - " def retrieve_memory(self, memory:Memory) -> None:\n", + " def _retrieve_memory(self, memory:Memory) -> None:\n", " memory_name = memory.get_name()\n", "\n", " print(self._node_name, \"Retrieve\", memory_name)\n", @@ -232,18 +218,18 @@ " if memory_dict[\"owner\"] != \"\":\n", " event = threading.Event()\n", " self._waiting_request_events[memory_name] = event\n", - " self.request_memory(memory_name, memory_dict[\"owner\"])\n", + " self._request_memory(memory_name, memory_dict[\"owner\"])\n", "\n", " if not event.wait(timeout=self._request_timeout):\n", " print(self._node_name, \"Request failed\", memory_name)\n", " #Request failed\n", - " self.send_memory(memory)\n", + " self._send_memory(memory)\n", " return \n", " \n", " memory_dict = self._client.hgetall(f\"{self._mind_name}:memories:{memory_name}\")\n", "\n", " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", - " memory.set_id(float(memory_dict[\"id\"]))\n", + " memory.set_id(int(memory_dict[\"id\"]))\n", "\n", " info_json = memory_dict[\"I\"]\n", " info = json.loads(info_json)\n", @@ -256,7 +242,7 @@ "\n", " self._waiting_retrieve.remove(memory_name)\n", "\n", - " def request_memory(self, memory_name:str, owner_name:str):\n", + " def _request_memory(self, memory_name:str, owner_name:str) -> None:\n", " print(self._node_name, \"Requesting\", memory_name)\n", "\n", " request_addr = f\"{self._mind_name}:nodes:{owner_name}:transfer_memory\"\n", @@ -265,21 +251,74 @@ " request = json.dumps(request_dict)\n", " self._client.publish(request_addr, request)\n", "\n", - " def notify_transfer(self, message:str) -> None:\n", + " def _handler_notify_transfer(self, message:str) -> None:\n", " memory_name = message[\"data\"]\n", " if memory_name in self._waiting_request_events:\n", " event = self._waiting_request_events[memory_name]\n", " event.set()\n", " del self._waiting_request_events[memory_name]\n", "\n", + " def _handler_transfer_memory(self, message) -> None:\n", + " request = json.loads(message[\"data\"])\n", + " \n", + " memory_name = request[\"memory_name\"]\n", + " requesting_node = request[\"node\"]\n", + "\n", + " print(self._node_name, \"Tranfering\", memory_name)\n", + "\n", + " if memory_name in self._memories:\n", + " memory = self._memories[memory_name]\n", + " else:\n", + " memory = cst.MemoryObject()\n", + " memory.set_name(memory_name)\n", + " \n", + " self._send_memory(memory)\n", + "\n", + " response_addr = f\"{self._mind_name}:nodes:{requesting_node}:transfer_done\"\n", + " self._client.publish(response_addr, memory_name)\n", + "\n", " def __del__(self) -> None:\n", " self._pubsub_thread.stop()\n", " self._retrieve_executor.shutdown(cancel_futures=True)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```mermaid\n", + "flowchart LR\n", + "\n", + "update[Update Memory]\n", + "send[Send Memory]\n", + "retrieve[Retrieve Memory]\n", + "request[Request Memory]\n", + "handler_notify_transfer[Handler: Notify Transfer]\n", + "handler_transfer_memory[Handler: Transfer Memory]\n", + "\n", + "\n", + "update --> |\"timestamp(MS) < timestamp(local)\"| send\n", + "update --> |\"timestamp(MS) > timestamp(local)\"| retrieve\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "handler_transfer_memory --> send\n", + "\n", + "subgraph retrieveContext\n", + "retrieve --> |owner is not empty| request\n", + "\n", + "request -.->|Wait transfer event| handler_notify_transfer\n", + "\n", + "end\n", + "\n", + "```" + ] + }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 4, "metadata": {}, "outputs": [], "source": [ @@ -289,9 +328,18 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 5, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node03 Retrieve Memory1\n", + "node03 INFO INFO \"INFO\"\n" + ] + } + ], "source": [ "ms_codelet = MemoryStorageCodelet(mind, \"node0\")\n", "ms_codelet.time_step = 100\n", @@ -300,6 +348,55 @@ "mind.start()" ] }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "MemoryObject [idmemoryobject=1, timestamp=1726858916840, evaluation=0.0, I=INFO, name=Memory1]" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "memory1" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "-1" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "node03 Updating memory Memory1\n", + "node03 Send memory Memory1\n", + "node03 Updating memory Memory1\n" + ] + } + ], + "source": [ + "memory1.set_info(1.0)" + ] + }, { "cell_type": "code", "execution_count": 7, From b5651144f8e85f383a80805c7b94f1feab7e9397 Mon Sep 17 00:00:00 2001 From: Elton Cardoso do Nascimento <43186596+EltonCN@users.noreply.github.com> Date: Fri, 27 Sep 2024 13:58:36 -0300 Subject: [PATCH 6/6] Move MS and MemoryEncoder to module --- dev/memory_storage_codelet.ipynb | 274 +----------------- src/cst_python/memory_storage/__init__.py | 1 + .../memory_storage/memory_encoder.py | 32 ++ .../memory_storage/memory_storage.py | 212 ++++++++++++++ 4 files changed, 258 insertions(+), 261 deletions(-) create mode 100644 src/cst_python/memory_storage/__init__.py create mode 100644 src/cst_python/memory_storage/memory_encoder.py create mode 100644 src/cst_python/memory_storage/memory_storage.py diff --git a/dev/memory_storage_codelet.ipynb b/dev/memory_storage_codelet.ipynb index d1dbc3f..25f6692 100644 --- a/dev/memory_storage_codelet.ipynb +++ b/dev/memory_storage_codelet.ipynb @@ -6,18 +6,12 @@ "metadata": {}, "outputs": [], "source": [ - "import json\n", - "import weakref\n", - "import json\n", - "import threading\n", "import time\n", - "from concurrent.futures import ThreadPoolExecutor\n", - "from typing import Optional, cast, List\n", "\n", "import redis\n", "\n", "import cst_python as cst\n", - "from cst_python.core.entities import Memory, Mind" + "from cst_python.memory_storage import MemoryStorageCodelet" ] }, { @@ -41,247 +35,6 @@ "client.flushall()" ] }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "class MemoryEncoder(json.JSONEncoder):\n", - " def default(self, memory:cst.core.entities.Memory):\n", - " return MemoryEncoder.to_dict(memory)\n", - " \n", - " @staticmethod\n", - " def to_dict(memory:cst.core.entities.Memory):\n", - " data = {\n", - " \"timestamp\": memory.get_timestamp(),\n", - " \"evaluation\": memory.get_evaluation(),\n", - " \"I\": memory.get_info(),\n", - " \"name\": memory.get_name(),\n", - " \"id\": memory.get_id()\n", - " }\n", - "\n", - " return data" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "class MemoryStorageCodelet(cst.Codelet):\n", - " def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3) -> None:\n", - " super().__init__()\n", - " \n", - " self._mind = mind\n", - " self._request_timeout = request_timeout\n", - " \n", - " if mind_name is None:\n", - " mind_name = \"default_mind\"\n", - " self._mind_name = cast(str, mind_name)\n", - " \n", - " self._memories : weakref.WeakValueDictionary[str, Memory] = weakref.WeakValueDictionary()\n", - " \n", - " self._client = redis.Redis(decode_responses=True)\n", - " self._pubsub = self._client.pubsub()\n", - " self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread()\n", - "\n", - " base_name = node_name\n", - " if base_name is None:\n", - " base_name = \"node\"\n", - "\n", - " \n", - " if self._client.sismember(f\"{mind_name}:nodes\", node_name):\n", - " node_number = self._client.scard(f\"{mind_name}:nodes\")\n", - " node_name = base_name+str(node_number)\n", - " while self._client.sismember(f\"{mind_name}:nodes\", node_name):\n", - " node_number += 1\n", - " node_name = base_name+str(node_number)\n", - " \n", - "\n", - " self._node_name = cast(str, node_name)\n", - "\n", - " self._client.sadd(f\"{mind_name}:nodes\", node_name)\n", - "\n", - " transfer_service_addr = f\"{self._mind_name}:nodes:{node_name}:transfer_memory\"\n", - " self._pubsub.subscribe(**{transfer_service_addr:self._handler_transfer_memory})\n", - "\n", - " transfer_done_addr = f\"{self._mind_name}:nodes:{node_name}:transfer_done\"\n", - " self._pubsub.subscribe(**{transfer_done_addr:self._handler_notify_transfer})\n", - "\n", - " self._last_update : dict[str, int] = {}\n", - " self._waiting_retrieve : set[str] = set()\n", - " \n", - " self._retrieve_executor = ThreadPoolExecutor(3)\n", - "\n", - " self._waiting_request_events : dict[str, threading.Event] = {}\n", - "\n", - " self._request = None\n", - "\n", - " def calculate_activation(self) -> None:\n", - " pass\n", - "\n", - " def access_memory_objects(self) -> None:\n", - " pass\n", - "\n", - " def proc(self) -> None:\n", - " \n", - " #Check new memories\n", - "\n", - " mind_memories = {}\n", - " for memory in self._mind.raw_memory.all_memories:\n", - " if memory.get_name() == \"\": #No name -> No MS\n", - " continue\n", - "\n", - " mind_memories[memory.get_name()] = memory\n", - "\n", - " mind_memories_names = set(mind_memories.keys())\n", - " memories_names = set(self._memories.keys())\n", - "\n", - " #Check only not here (memories_names not in mind should be garbage collected)\n", - " difference = mind_memories_names - memories_names\n", - " for memory_name in difference:\n", - " memory : Memory = mind_memories[memory_name]\n", - " self._memories[memory_name] = memory\n", - "\n", - " if self._client.exists(f\"{self._mind_name}:memories:{memory_name}\"):\n", - " self._retrieve_executor.submit(self._retrieve_memory, memory)\n", - " \n", - " else: #Send impostor with owner\n", - " memory_impostor = {\"name\":memory.get_name(),\n", - " \"evaluation\" : 0.0,\n", - " \"I\": \"\",\n", - " \"id\" : 0,\n", - " \"owner\": self._node_name}\n", - " \n", - " self._client.hset(f\"{self._mind_name}:memories:{memory_name}\", mapping=memory_impostor)\n", - "\n", - " subscribe_func = lambda message : self.update_memory(memory_name)\n", - " self._pubsub.subscribe(**{f\"{self._mind_name}:memories:{memory_name}:update\":subscribe_func})\n", - "\n", - " #Update memories\n", - " to_update = self._last_update.keys()\n", - " for memory_name in to_update:\n", - " if memory_name not in self._memories:\n", - " del self._last_update[memory_name]\n", - " continue\n", - "\n", - " memory = self._memories[memory_name]\n", - " if memory.get_timestamp() > self._last_update[memory_name]:\n", - " self.update_memory(memory_name)\n", - "\n", - " def update_memory(self, memory_name:str) -> None:\n", - " print(self._node_name, \"Updating memory\", memory_name)\n", - "\n", - " if memory_name not in self._memories:\n", - " self._pubsub.unsubscribe(f\"{self._mind_name}:memories:{memory_name}:update\")\n", - "\n", - " timestamp = float(self._client.hget(f\"{self._mind_name}:memories:{memory_name}\", \"timestamp\"))\n", - " memory = self._memories[memory_name]\n", - " memory_timestamp = memory.get_timestamp()\n", - " \n", - " if memory_timestamp < timestamp:\n", - " self._retrieve_executor.submit(self._retrieve_memory, memory)\n", - "\n", - " elif memory_timestamp> timestamp:\n", - " self._send_memory(memory)\n", - "\n", - " self._last_update[memory_name] = memory.get_timestamp()\n", - "\n", - " def _send_memory(self, memory:Memory) -> None:\n", - " memory_name = memory.get_name()\n", - " print(self._node_name, \"Send memory\", memory_name)\n", - " \n", - " memory_dict = MemoryEncoder.to_dict(memory)\n", - " memory_dict[\"I\"] = json.dumps(memory_dict[\"I\"])\n", - " memory_dict[\"owner\"] = \"\"\n", - "\n", - "\n", - " self._client.hset(f\"{self._mind_name}:memories:{memory_name}\", mapping=memory_dict)\n", - " self._client.publish(f\"{self._mind_name}:memories:{memory_name}:update\", \"\")\n", - "\n", - " self._last_update[memory_name] = memory.get_timestamp()\n", - " \n", - "\n", - " def _retrieve_memory(self, memory:Memory) -> None:\n", - " memory_name = memory.get_name()\n", - "\n", - " print(self._node_name, \"Retrieve\", memory_name)\n", - "\n", - " if memory_name in self._waiting_retrieve:\n", - " return\n", - " self._waiting_retrieve.add(memory_name)\n", - "\n", - " memory_dict = self._client.hgetall(f\"{self._mind_name}:memories:{memory_name}\")\n", - "\n", - " if memory_dict[\"owner\"] != \"\":\n", - " event = threading.Event()\n", - " self._waiting_request_events[memory_name] = event\n", - " self._request_memory(memory_name, memory_dict[\"owner\"])\n", - "\n", - " if not event.wait(timeout=self._request_timeout):\n", - " print(self._node_name, \"Request failed\", memory_name)\n", - " #Request failed\n", - " self._send_memory(memory)\n", - " return \n", - " \n", - " memory_dict = self._client.hgetall(f\"{self._mind_name}:memories:{memory_name}\")\n", - "\n", - " memory.set_evaluation(float(memory_dict[\"evaluation\"]))\n", - " memory.set_id(int(memory_dict[\"id\"]))\n", - "\n", - " info_json = memory_dict[\"I\"]\n", - " info = json.loads(info_json)\n", - "\n", - " print(self._node_name, \"INFO\", info, info_json)\n", - "\n", - " memory.set_info(info)\n", - "\n", - " self._last_update[memory_name] = memory.get_timestamp()\n", - "\n", - " self._waiting_retrieve.remove(memory_name)\n", - "\n", - " def _request_memory(self, memory_name:str, owner_name:str) -> None:\n", - " print(self._node_name, \"Requesting\", memory_name)\n", - "\n", - " request_addr = f\"{self._mind_name}:nodes:{owner_name}:transfer_memory\"\n", - " \n", - " request_dict = {\"memory_name\":memory_name, \"node\":self._node_name}\n", - " request = json.dumps(request_dict)\n", - " self._client.publish(request_addr, request)\n", - "\n", - " def _handler_notify_transfer(self, message:str) -> None:\n", - " memory_name = message[\"data\"]\n", - " if memory_name in self._waiting_request_events:\n", - " event = self._waiting_request_events[memory_name]\n", - " event.set()\n", - " del self._waiting_request_events[memory_name]\n", - "\n", - " def _handler_transfer_memory(self, message) -> None:\n", - " request = json.loads(message[\"data\"])\n", - " \n", - " memory_name = request[\"memory_name\"]\n", - " requesting_node = request[\"node\"]\n", - "\n", - " print(self._node_name, \"Tranfering\", memory_name)\n", - "\n", - " if memory_name in self._memories:\n", - " memory = self._memories[memory_name]\n", - " else:\n", - " memory = cst.MemoryObject()\n", - " memory.set_name(memory_name)\n", - " \n", - " self._send_memory(memory)\n", - "\n", - " response_addr = f\"{self._mind_name}:nodes:{requesting_node}:transfer_done\"\n", - " self._client.publish(response_addr, memory_name)\n", - "\n", - " def __del__(self) -> None:\n", - " self._pubsub_thread.stop()\n", - " self._retrieve_executor.shutdown(cancel_futures=True)" - ] - }, { "cell_type": "markdown", "metadata": {}, @@ -318,7 +71,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 2, "metadata": {}, "outputs": [], "source": [ @@ -328,15 +81,14 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "node03 Retrieve Memory1\n", - "node03 INFO INFO \"INFO\"\n" + "node03 Retrieve Memory1\n" ] } ], @@ -350,16 +102,16 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "MemoryObject [idmemoryobject=1, timestamp=1726858916840, evaluation=0.0, I=INFO, name=Memory1]" + "MemoryObject [idmemoryobject=1, timestamp=1727456263799, evaluation=0.0, I=[1, 1, 1], name=Memory1]" ] }, - "execution_count": 6, + "execution_count": 4, "metadata": {}, "output_type": "execute_result" } @@ -370,7 +122,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 6, "metadata": {}, "outputs": [ { @@ -379,7 +131,7 @@ "-1" ] }, - "execution_count": 9, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" }, @@ -387,14 +139,14 @@ "name": "stdout", "output_type": "stream", "text": [ - "node03 Updating memory Memory1\n", - "node03 Send memory Memory1\n", - "node03 Updating memory Memory1\n" + "node02 Updating memory Memory1\n", + "node02 Send memory Memory1\n", + "node02 Updating memory Memory1\n" ] } ], "source": [ - "memory1.set_info(1.0)" + "memory1.set_info([1,1,1])" ] }, { diff --git a/src/cst_python/memory_storage/__init__.py b/src/cst_python/memory_storage/__init__.py new file mode 100644 index 0000000..5abd58d --- /dev/null +++ b/src/cst_python/memory_storage/__init__.py @@ -0,0 +1 @@ +from .memory_storage import MemoryStorageCodelet \ No newline at end of file diff --git a/src/cst_python/memory_storage/memory_encoder.py b/src/cst_python/memory_storage/memory_encoder.py new file mode 100644 index 0000000..7c5a9e9 --- /dev/null +++ b/src/cst_python/memory_storage/memory_encoder.py @@ -0,0 +1,32 @@ +import json +from typing import Any + +from cst_python.core.entities import Memory + +class MemoryEncoder(json.JSONEncoder): + def default(self, memory:Memory): + return MemoryEncoder.to_dict(memory) + + @staticmethod + def to_dict(memory:Memory, jsonify_info:bool=False): + data = { + "timestamp": memory.get_timestamp(), + "evaluation": memory.get_evaluation(), + "I": memory.get_info(), + "name": memory.get_name(), + "id": memory.get_id() + } + + if jsonify_info: + data["I"] = json.dumps(data["I"]) + + return data + + def load_memory(memory:Memory, memory_dict:dict[str,Any], load_json:bool=True): + memory.set_evaluation(float(memory_dict["evaluation"])) + memory.set_id(int(memory_dict["id"])) + + info_json = memory_dict["I"] + info = json.loads(info_json) + + memory.set_info(info) diff --git a/src/cst_python/memory_storage/memory_storage.py b/src/cst_python/memory_storage/memory_storage.py new file mode 100644 index 0000000..7abb474 --- /dev/null +++ b/src/cst_python/memory_storage/memory_storage.py @@ -0,0 +1,212 @@ +import json +import weakref +import json +import threading +from concurrent.futures import ThreadPoolExecutor +from typing import Optional, cast + +import redis + +from cst_python.core.entities import Codelet, Mind, Memory, MemoryObject +from .memory_encoder import MemoryEncoder + +class MemoryStorageCodelet(Codelet): + def __init__(self, mind:Mind, node_name:Optional[str]=None, mind_name:Optional[str]=None, request_timeout:float=500e-3) -> None: + super().__init__() + + self._mind = mind + self._request_timeout = request_timeout + + if mind_name is None: + mind_name = "default_mind" + self._mind_name = cast(str, mind_name) + + self._memories : weakref.WeakValueDictionary[str, Memory] = weakref.WeakValueDictionary() + + self._client = redis.Redis(decode_responses=True) + self._pubsub = self._client.pubsub() + self._pubsub_thread : redis.client.PubSubWorkerThread = self._pubsub.run_in_thread() + + base_name = node_name + if base_name is None: + base_name = "node" + + if self._client.sismember(f"{mind_name}:nodes", node_name): + node_number = self._client.scard(f"{mind_name}:nodes") + node_name = base_name+str(node_number) + while self._client.sismember(f"{mind_name}:nodes", node_name): + node_number += 1 + node_name = base_name+str(node_number) + + + self._node_name = cast(str, node_name) + + self._client.sadd(f"{mind_name}:nodes", node_name) + + transfer_service_addr = f"{self._mind_name}:nodes:{node_name}:transfer_memory" + self._pubsub.subscribe(**{transfer_service_addr:self._handler_transfer_memory}) + + transfer_done_addr = f"{self._mind_name}:nodes:{node_name}:transfer_done" + self._pubsub.subscribe(**{transfer_done_addr:self._handler_notify_transfer}) + + self._last_update : dict[str, int] = {} + self._waiting_retrieve : set[str] = set() + + self._retrieve_executor = ThreadPoolExecutor(3) + + self._waiting_request_events : dict[str, threading.Event] = {} + + self._request = None + + def calculate_activation(self) -> None: + pass + + def access_memory_objects(self) -> None: + pass + + def proc(self) -> None: + + #Check new memories + + mind_memories = {} + for memory in self._mind.raw_memory.all_memories: + if memory.get_name() == "": #No name -> No MS + continue + + mind_memories[memory.get_name()] = memory + + mind_memories_names = set(mind_memories.keys()) + memories_names = set(self._memories.keys()) + + #Check only not here (memories_names not in mind should be garbage collected) + difference = mind_memories_names - memories_names + for memory_name in difference: + memory : Memory = mind_memories[memory_name] + self._memories[memory_name] = memory + + if self._client.exists(f"{self._mind_name}:memories:{memory_name}"): + self._retrieve_executor.submit(self._retrieve_memory, memory) + + else: #Send impostor with owner + memory_impostor = {"name":memory.get_name(), + "evaluation" : 0.0, + "I": "", + "id" : 0, + "owner": self._node_name} + + self._client.hset(f"{self._mind_name}:memories:{memory_name}", mapping=memory_impostor) + + subscribe_func = lambda message : self.update_memory(memory_name) + self._pubsub.subscribe(**{f"{self._mind_name}:memories:{memory_name}:update":subscribe_func}) + + #Update memories + to_update = self._last_update.keys() + for memory_name in to_update: + if memory_name not in self._memories: + del self._last_update[memory_name] + continue + + memory = self._memories[memory_name] + if memory.get_timestamp() > self._last_update[memory_name]: + self.update_memory(memory_name) + + def update_memory(self, memory_name:str) -> None: + print(self._node_name, "Updating memory", memory_name) + + if memory_name not in self._memories: + self._pubsub.unsubscribe(f"{self._mind_name}:memories:{memory_name}:update") + + timestamp = float(self._client.hget(f"{self._mind_name}:memories:{memory_name}", "timestamp")) + memory = self._memories[memory_name] + memory_timestamp = memory.get_timestamp() + + if memory_timestamp < timestamp: + self._retrieve_executor.submit(self._retrieve_memory, memory) + + elif memory_timestamp> timestamp: + self._send_memory(memory) + + self._last_update[memory_name] = memory.get_timestamp() + + def _send_memory(self, memory:Memory) -> None: + memory_name = memory.get_name() + print(self._node_name, "Send memory", memory_name) + + memory_dict = MemoryEncoder.to_dict(memory, jsonify_info=True) + memory_dict["owner"] = "" + + + self._client.hset(f"{self._mind_name}:memories:{memory_name}", mapping=memory_dict) + self._client.publish(f"{self._mind_name}:memories:{memory_name}:update", "") + + self._last_update[memory_name] = memory.get_timestamp() + + + def _retrieve_memory(self, memory:Memory) -> None: + memory_name = memory.get_name() + + print(self._node_name, "Retrieve", memory_name) + + if memory_name in self._waiting_retrieve: + return + self._waiting_retrieve.add(memory_name) + + memory_dict = self._client.hgetall(f"{self._mind_name}:memories:{memory_name}") + + if memory_dict["owner"] != "": + event = threading.Event() + self._waiting_request_events[memory_name] = event + self._request_memory(memory_name, memory_dict["owner"]) + + if not event.wait(timeout=self._request_timeout): + print(self._node_name, "Request failed", memory_name) + #Request failed + self._send_memory(memory) + return + + memory_dict = self._client.hgetall(f"{self._mind_name}:memories:{memory_name}") + + MemoryEncoder.load_memory(memory, memory_dict) + + self._last_update[memory_name] = memory.get_timestamp() + + self._waiting_retrieve.remove(memory_name) + + def _request_memory(self, memory_name:str, owner_name:str) -> None: + print(self._node_name, "Requesting", memory_name) + + request_addr = f"{self._mind_name}:nodes:{owner_name}:transfer_memory" + + request_dict = {"memory_name":memory_name, "node":self._node_name} + request = json.dumps(request_dict) + self._client.publish(request_addr, request) + + def _handler_notify_transfer(self, message:str) -> None: + memory_name = message["data"] + if memory_name in self._waiting_request_events: + event = self._waiting_request_events[memory_name] + event.set() + del self._waiting_request_events[memory_name] + + def _handler_transfer_memory(self, message) -> None: + request = json.loads(message["data"]) + + memory_name = request["memory_name"] + requesting_node = request["node"] + + print(self._node_name, "Tranfering", memory_name) + + if memory_name in self._memories: + memory = self._memories[memory_name] + else: + memory = MemoryObject() + memory.set_name(memory_name) + + self._send_memory(memory) + + response_addr = f"{self._mind_name}:nodes:{requesting_node}:transfer_done" + self._client.publish(response_addr, memory_name) + + def __del__(self) -> None: + self._pubsub_thread.stop() + self._retrieve_executor.shutdown(cancel_futures=True) \ No newline at end of file