-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathses-dma.py
160 lines (139 loc) · 5.73 KB
/
ses-dma.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Core System Configuration
# NOTE(review): this dict is read by the classes below (MemoryController,
# MonitoringSystem, LLMInterface); keys must stay in sync with those readers.
config = {
    "llm": {
        "endpoint": "http://localhost:1234/v1/chat/completions",  # LM Studio endpoint
        "model": "local-model",
        "temperature": 0.7,   # sampling temperature passed through to the LLM
        "max_tokens": 512     # hard cap on generated tokens per response
    },
    "memory": {
        "db_path": "memory.sqlite",
        "stm_capacity": 10,   # Number of items in short-term memory
        "ltm_threshold": 0.6,  # Fitness threshold for long-term memory
        "consolidation_interval": 300  # 5 minutes
    },
    "logging": {
        "log_path": "ses_dma.log",
        "metrics_db": "metrics.sqlite",
        "log_level": "INFO"   # passed to logging.basicConfig(level=...)
    }
}
# Core Memory Architecture
class MemoryStore:
    """SQLite-backed store holding the system's memory tables.

    Owns a single connection to the database at *db_path* and guarantees
    the three core tables (short_term_memory, long_term_memory,
    memory_metrics) exist.
    """

    def __init__(self, db_path):
        """Open (or create) the SQLite database and ensure tables exist.

        Args:
            db_path: filesystem path (or ":memory:") for the database.
        """
        self.conn = sqlite3.connect(db_path)
        self.init_tables()

    def init_tables(self):
        """Initialize core memory tables (idempotent via IF NOT EXISTS)."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS short_term_memory (
                id INTEGER PRIMARY KEY,
                content TEXT,
                embedding BLOB,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                access_count INTEGER DEFAULT 1,
                importance_score FLOAT DEFAULT 0.5
            );
            CREATE TABLE IF NOT EXISTS long_term_memory (
                id INTEGER PRIMARY KEY,
                content TEXT,
                embedding BLOB,
                created_at DATETIME,
                last_accessed DATETIME,
                access_count INTEGER,
                importance_score FLOAT,
                connections JSON
            );
            CREATE TABLE IF NOT EXISTS memory_metrics (
                id INTEGER PRIMARY KEY,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                memory_type TEXT,
                operation TEXT,
                details JSON
            );
        """)
        # fix: make the DDL durable immediately rather than relying on
        # implicit commit behavior of later statements.
        self.conn.commit()

    def close(self):
        """Release the underlying SQLite connection.

        fix: the original class opened a connection it could never close,
        leaking the handle for the process lifetime.
        """
        self.conn.close()
class MemoryController:
    """Coordinates memory storage, logging and metrics for incoming queries."""

    def __init__(self, config):
        # fix: consolidate_memories() reads self.config, but the original
        # __init__ never stored it (AttributeError at runtime).
        self.config = config
        self.store = MemoryStore(config["memory"]["db_path"])
        self.logger = self.setup_logger(config["logging"])
        self.metrics = MetricsCollector(config["logging"]["metrics_db"])

    def setup_logger(self, log_config):
        """Configure file logging and return this controller's logger.

        fix: setup_logger was called from __init__ but never defined on
        this class (AttributeError on construction).

        Args:
            log_config: the "logging" sub-dict of the system config.
        """
        logging.basicConfig(
            filename=log_config["log_path"],
            level=log_config["log_level"],
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        return logging.getLogger("SES-DMA")

    async def process_input(self, query: str):
        """Process incoming query and manage memory operations.

        Records input metrics, stores the query in short-term memory and
        returns the new memory id.
        """
        self.logger.info(f"Processing query: {query}")
        # Record input metrics
        await self.metrics.record_operation("input_processing", {
            "query_length": len(query),
            "timestamp": datetime.now().isoformat()
        })
        # Store in short-term memory
        memory_id = await self.store.add_to_stm(query)
        return memory_id

    async def consolidate_memories(self):
        """Promote short-term memories above the fitness threshold to LTM."""
        memories = await self.store.get_stm_candidates()
        for memory in memories:
            # NOTE(review): self.calculate_fitness is not defined on this
            # class — it exists on EvolutionSystem. Confirm whether this
            # should delegate to the evolution system instead.
            fitness = await self.calculate_fitness(memory)
            if fitness >= self.config["memory"]["ltm_threshold"]:
                await self.store.promote_to_ltm(memory)
                self.logger.info(f"Memory {memory.id} promoted to LTM")
class EvolutionSystem:
    """Scores memories for fitness and prunes those that fall below threshold."""

    # Age (seconds) at which the recency score decays to zero: one day.
    RECENCY_HORIZON = 86400.0

    def __init__(self, memory_controller, config=None):
        """
        Args:
            memory_controller: controller providing pruning candidates and archival.
            config: system config dict; fix — prune_memories() reads
                self.config, which the original never set. Defaults to the
                controller's config when available (backward compatible).
        """
        self.memory_controller = memory_controller
        self.config = config if config is not None else getattr(
            memory_controller, "config", None)
        self.fitness_calculator = MemoryFitnessCalculator()

    def calculate_recency(self, timestamp):
        """Return a recency score in [0, 1] that decays linearly with age.

        fix: calculate_fitness() called this method, but it was never
        defined — every fitness calculation raised AttributeError.

        Args:
            timestamp: naive datetime of the memory's creation/last touch.
        """
        age_seconds = (datetime.now() - timestamp).total_seconds()
        return max(0.0, 1.0 - age_seconds / self.RECENCY_HORIZON)

    async def calculate_fitness(self, memory_item):
        """Calculate a memory item's fitness score.

        The score is the unweighted mean of recency, raw access count and
        the stored importance score.
        """
        metrics = {
            "recency": self.calculate_recency(memory_item.timestamp),
            "access_frequency": memory_item.access_count,
            "importance": memory_item.importance_score,
        }
        return sum(metrics.values()) / len(metrics)

    async def prune_memories(self):
        """Archive memories whose fitness falls below the LTM threshold."""
        candidates = await self.memory_controller.get_pruning_candidates()
        for memory in candidates:
            fitness = await self.calculate_fitness(memory)
            if fitness < self.config["memory"]["ltm_threshold"]:
                await self.memory_controller.archive_memory(memory.id)
class MonitoringSystem:
    """File logging plus a small SQLite sink for system metrics."""

    def __init__(self, config):
        """
        Args:
            config: full system config dict (uses the "logging" section).
        """
        self.logger = self.setup_logger(config)
        self.metrics_db = self.init_metrics_db(config)

    def setup_logger(self, config):
        """Configure structured file logging and return the system logger."""
        logging.basicConfig(
            filename=config["logging"]["log_path"],
            level=config["logging"]["log_level"],
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        return logging.getLogger("SES-DMA")

    def init_metrics_db(self, config):
        """Open the metrics database and ensure the metrics table exists.

        fix: __init__ called this method, but it was never defined —
        constructing a MonitoringSystem raised AttributeError. The table
        schema matches the columns record_metric() inserts.
        """
        conn = sqlite3.connect(config["logging"]["metrics_db"])
        conn.execute("""
            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                type TEXT,
                value REAL,
                metadata JSON
            )
        """)
        conn.commit()
        return conn

    async def record_metric(self, metric_type, value, metadata=None):
        """Record a system metric row.

        fix: the original awaited sqlite3's execute(), but sqlite3 is
        synchronous — awaiting a Cursor raises TypeError. The async
        signature is kept so existing callers' `await` still works.
        """
        self.metrics_db.execute(
            "INSERT INTO metrics (type, value, metadata) VALUES (?, ?, ?)",
            (metric_type, value, json.dumps(metadata or {}))
        )
        self.metrics_db.commit()
class LLMInterface:
    """Thin async client for an OpenAI-compatible LM Studio endpoint."""

    def __init__(self, config):
        """
        Args:
            config: full system config dict (uses the "llm" section).
        """
        # fix: generate_response() reads self.config["llm"], but the
        # original __init__ never stored config — every call raised
        # AttributeError.
        self.config = config
        self.endpoint = config["llm"]["endpoint"]
        self.headers = {
            "Content-Type": "application/json"
        }

    async def generate_response(self, prompt, context=None):
        """POST a chat-completion request to the LM Studio endpoint.

        Args:
            prompt: user message text.
            context: optional prior context, injected as an assistant turn.

        Returns:
            The decoded JSON response body from the endpoint.
        """
        payload = {
            "messages": [
                {"role": "system", "content": "You are a helpful assistant with evolving memory."},
                {"role": "user", "content": prompt}
            ],
            "temperature": self.config["llm"]["temperature"],
            "max_tokens": self.config["llm"]["max_tokens"]
        }
        if context:
            # Insert remembered context between the system and user turns.
            payload["messages"].insert(1, {
                "role": "assistant",
                "content": f"Previous context: {context}"
            })
        async with aiohttp.ClientSession() as session:
            async with session.post(self.endpoint, json=payload) as response:
                return await response.json()