# Source listing: test.py - 84 lines (73 loc), 2.89 KB.
# test.py
import asyncio
import aiohttp
import sqlite3
from datetime import datetime
import json
class BrainHealthMonitor:
    """Run health checks against the brain system and compile a status report.

    ``run_health_check`` is the entry point: it runs every check named in
    ``CHECK_NAMES`` concurrently and hands the results to
    ``compile_health_report``.

    Attributes:
        health_metrics: Empty dict created at construction; no method of this
            class writes to it.
        status_checklist: Per-component flags, all initialised to ``False``;
            no method of this class updates them.
    """

    # Endpoint probed for each agent; the trailing comments name the model
    # expected to be served on that port.
    AGENT_ENDPOINTS = {
        "memory": "http://localhost:1234/v1/chat/completions",  # phi-2
        "evolution": "http://localhost:1235/v1/chat/completions",  # mistral
        "supervisor": "http://localhost:1236/v1/chat/completions",  # llama2
    }

    # SQLite file queried by check_memory_coherence.
    MEMORY_DB_PATH = 'memory.sqlite'

    # Names of the check coroutines, in the order their results are reported.
    # Names without a matching method are reported as failed checks instead of
    # raising AttributeError, so a subclass can supply them later.
    CHECK_NAMES = (
        "check_agent_endpoints",
        "check_database_health",
        "check_memory_coherence",
        "check_system_resources",
        "check_agent_communication",
    )

    def __init__(self):
        self.health_metrics = {}
        self.status_checklist = {
            "memory_agent": False,
            "evolution_agent": False,
            "supervisor_agent": False,
            "database_connection": False,
            "api_endpoints": False,
            "memory_coherence": False,
        }

    async def run_health_check(self):
        """Run every check in ``CHECK_NAMES`` and return the compiled report.

        A check that is missing or raises is recorded as
        ``{check_name: {"error": ...}}`` so that one broken check cannot
        prevent the report from being produced.

        Returns:
            The dict built by ``compile_health_report``.
        """
        results = await asyncio.gather(
            *(self._run_check(name) for name in self.CHECK_NAMES)
        )
        return self.compile_health_report(results)

    async def _run_check(self, name):
        """Await the check called *name*, turning failures into a result dict."""
        check = getattr(self, name, None)
        if check is None:
            return {name: {"error": "check not implemented"}}
        try:
            return await check()
        except asyncio.CancelledError:
            # Cancellation must propagate; on Python 3.7 it is an Exception
            # subclass and would otherwise be caught below.
            raise
        except Exception as exc:  # boundary: a failing check is a finding
            return {name: {"error": f"{type(exc).__name__}: {exc}"}}

    async def check_agent_endpoints(self, timeout=5.0):
        """Verify all LLM endpoints are responsive.

        Args:
            timeout: Total seconds allowed per request before the endpoint is
                counted as unresponsive.

        Returns:
            ``{"agent_status": {agent: bool}}`` where the bool is ``True`` only
            when the endpoint answered a GET with HTTP 200.
        """
        # NOTE(review): OpenAI-style chat-completion routes usually accept
        # POST only, so a GET may never yield 200 - confirm against the
        # servers actually deployed on these ports.
        results = {}
        client_timeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(timeout=client_timeout) as session:
            for agent, url in self.AGENT_ENDPOINTS.items():
                try:
                    async with session.get(url) as response:
                        results[agent] = response.status == 200
                except (aiohttp.ClientError, asyncio.TimeoutError, OSError):
                    # Unreachable or too slow: the agent is down, not a crash.
                    results[agent] = False
        return {"agent_status": results}

    async def check_memory_coherence(self):
        """Check memory system coherence.

        Returns:
            ``{"memory_coherence": {...}}`` with the row counts of
            ``short_term_memory`` and ``long_term_memory`` and the number of
            ``memory_connections`` rows whose ``target_id`` has no matching
            ``long_term_memory.id``.

        Raises:
            sqlite3.Error: If the database cannot be queried (for example a
                table is missing).
        """
        conn = sqlite3.connect(self.MEMORY_DB_PATH)
        try:
            cursor = conn.cursor()

            def count(sql):
                return cursor.execute(sql).fetchone()[0]

            checks = {
                "stm_count": count("SELECT COUNT(*) FROM short_term_memory"),
                "ltm_count": count("SELECT COUNT(*) FROM long_term_memory"),
                "orphaned_memories": count("""
                    SELECT COUNT(*) FROM memory_connections
                    WHERE target_id NOT IN (SELECT id FROM long_term_memory)
                """),
            }
        finally:
            # Close even when a query raises so the handle is never leaked.
            conn.close()
        return {"memory_coherence": checks}

    def compile_health_report(self, results):
        """Compile health check results into a report.

        A result counts as failed when, at any nesting depth, it contains a
        value that is exactly ``False`` or a dict with an ``"error"`` key.
        Numeric values such as row counts are never treated as failures, so an
        empty table does not degrade the status. A positive
        ``orphaned_memories`` count degrades the status and adds a
        recommendation.

        Args:
            results: Iterable of result dicts, one per check.

        Returns:
            Dict with ``timestamp``, ``overall_status`` (``"healthy"`` or
            ``"degraded"``), ``checks``, ``warnings`` and ``recommendations``.
        """
        report = {
            "timestamp": datetime.now().isoformat(),
            "overall_status": "healthy",
            "checks": results,
            "warnings": [],
            "recommendations": [],
        }
        for check in results:
            if self._has_failure(check):
                report["overall_status"] = "degraded"
                report["warnings"].append(f"Failed check: {check}")

            coherence = (
                check.get("memory_coherence") if isinstance(check, dict) else None
            )
            if isinstance(coherence, dict):
                orphans = coherence.get("orphaned_memories", 0)
                if isinstance(orphans, int) and orphans > 0:
                    report["overall_status"] = "degraded"
                    report["warnings"].append(
                        f"Orphaned memory connections: {orphans}"
                    )
                    report["recommendations"].append(
                        "Remove memory_connections rows whose target_id is "
                        "missing from long_term_memory"
                    )
        return report

    def _has_failure(self, value):
        """Return True if *value* holds a ``False`` leaf or an ``"error"`` key."""
        if isinstance(value, dict):
            if "error" in value:
                return True
            return any(self._has_failure(item) for item in value.values())
        return value is False