From eda25febe854ba89b647e29b5d0fbbaa626e003f Mon Sep 17 00:00:00 2001 From: Tom Hensel Date: Fri, 5 Jul 2024 17:08:52 +0200 Subject: [PATCH] refactor; unify emoji style; use recent python features --- .github/workflows/code-quality.yaml | 20 +- .gitignore | 2 + config/config.yaml | 2 + pytest.ini | 3 + src/config_manager.py | 41 +-- src/meshgram.py | 78 +++--- src/meshtastic_interface.py | 129 ++++++---- src/message_processor.py | 387 +++++++++++++++------------- src/node_manager.py | 158 +++++++++--- src/telegram_interface.py | 291 +++++++++------------ tests/test_meshtastic_interface.py | 34 ++- 11 files changed, 639 insertions(+), 506 deletions(-) create mode 100644 pytest.ini diff --git a/.github/workflows/code-quality.yaml b/.github/workflows/code-quality.yaml index a74184f..1cf9ac2 100644 --- a/.github/workflows/code-quality.yaml +++ b/.github/workflows/code-quality.yaml @@ -19,15 +19,13 @@ jobs: - name: Install dependencies run: | python -m pip install -q -U pip - pip install -q -U flake8 pytest - if [ -f requirements.txt ]; then pip install -q -U -r requirements.txt; fi - - name: Lint with flake8 - continue-on-error: true - run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics - # exit-zero treats all errors as warnings - flake8 . --count --exit-zero --max-complexity=10 --max-line-length=120 --statistics - - name: Test with pytest + pip install -q -U -r requirements.txt + pip install -q -U pytest pytest-asyncio coverage + + - name: Run tests with coverage run: | - pytest \ No newline at end of file + coverage run -m pytest + coverage report -m + + - name: Run Prospector + uses: jpetrucciani/prospector-check@master diff --git a/.gitignore b/.gitignore index a191a23..70d9861 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,8 @@ wheels/ .installed.cfg *.egg +.coverage + # dotenv .envrc diff --git a/config/config.yaml b/config/config.yaml index 1cba5b2..2d3e726 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -21,6 +21,8 @@ meshtastic: logging: level: 'info' + level_telegram: 'warn' + level_httpx: 'warn' use_syslog: false syslog_host: "${SYSLOG_HOST}" syslog_port: 514 diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..442bb94 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +pythonpath = src +testpaths = tests diff --git a/src/config_manager.py b/src/config_manager.py index 04a349c..9343919 100644 --- a/src/config_manager.py +++ b/src/config_manager.py @@ -1,12 +1,12 @@ import logging import re -from typing import Any, Optional, List +from typing import Any, Optional, List, Dict from envyaml import EnvYAML class ConfigManager: def __init__(self, config_path: str = 'config/config.yaml'): try: - self.config = EnvYAML(config_path) + self.config: Dict[str, Any] = EnvYAML(config_path) except Exception as e: raise ValueError(f"Failed to load configuration from {config_path}: {e}") self.setup_logging() @@ -23,19 +23,23 @@ def get_authorized_users(self) -> List[int]: def setup_logging(self) -> None: log_level = self._parse_log_level(self.get('logging.level', 'INFO')) - formatter = SensitiveFormatter('%(asctime)s %(levelname)s [%(name)s] %(message)s',) + log_level_telegram = self._parse_log_level(self.get('logging.level_telegram', 'INFO')) + log_level_httpx = self._parse_log_level(self.get('logging.level_telegram', 'WARN')) + + formatter = SensitiveFormatter('%(asctime)s %(levelname)s %(name)s - %(message)s') + + handlers = [logging.StreamHandler()] + if self.get('logging.file_log', False): + handlers.append(logging.FileHandler(self.get('logging.file_path', 'meshgram.log'))) - logging.basicConfig( - level=log_level, - handlers=[ - logging.StreamHandler(), - logging.FileHandler('meshgram.log') - ] - ) + logging.basicConfig(level=log_level, handlers=handlers, format='%(asctime)s %(levelname)s %(name)s - %(message)s') - for handler in logging.getLogger().handlers: + for handler in logging.root.handlers: handler.setFormatter(formatter) + logging.getLogger('httpx').setLevel(log_level_httpx) + logging.getLogger('telegram').setLevel(log_level_telegram) + if self.get('logging.use_syslog', False): self._setup_syslog_handler() @@ -52,11 +56,13 @@ def _parse_log_level(self, level: Any) -> int: logging.warning(f"Invalid log level type: {type(level)}. Defaulting to INFO.") return logging.INFO + # TODO: seperate logging level for syslog handler def _setup_syslog_handler(self) -> None: try: - syslog_handler = logging.handlers.SysLogHandler( + from logging.handlers import SysLogHandler + syslog_handler = SysLogHandler( address=(self.get('logging.syslog_host'), self.get('logging.syslog_port', 514)), - socktype=logging.handlers.socket.SOCK_DGRAM if self.get('logging.syslog_protocol', 'udp') == 'udp' else logging.handlers.socket.SOCK_STREAM + socktype=SysLogHandler.UDP_SOCKET if self.get('logging.syslog_protocol', 'udp').lower() == 'udp' else SysLogHandler.TCP_SOCKET ) syslog_handler.setFormatter(SensitiveFormatter('%(name)s - %(levelname)s - %(message)s')) logging.getLogger().addHandler(syslog_handler) @@ -70,16 +76,15 @@ def validate_config(self) -> None: 'meshtastic.connection_type', 'meshtastic.device', ] - for key in required_keys: - if not self.get(key): - raise ValueError(f"Missing required configuration: {key}") + missing_keys = [key for key in required_keys if not self.get(key)] + if missing_keys: + raise ValueError(f"Missing required configuration: {', '.join(missing_keys)}") class SensitiveFormatter(logging.Formatter): def __init__(self, fmt: Optional[str] = None, datefmt: Optional[str] = None): super().__init__(fmt, datefmt) self.sensitive_patterns = [ - (re.compile(r'(bot\d+):(AAH[\w-]{34})'), r'\1:[REDACTED]'), - (re.compile(r'(token=)([A-Za-z0-9-_]{35,})'), r'\1[REDACTED]'), + (re.compile(r'(https://api\.telegram\.org/bot)([A-Za-z0-9:_-]{35,})(/\w+)'), r'\1[redacted]\3') ] def format(self, record: logging.LogRecord) -> str: diff --git a/src/meshgram.py b/src/meshgram.py index 4631c9d..be65569 100644 --- a/src/meshgram.py +++ b/src/meshgram.py @@ -1,6 +1,6 @@ import argparse import asyncio -from typing import Optional, List +from typing import Optional, Sequence from meshtastic_interface import MeshtasticInterface from telegram_interface import TelegramInterface from message_processor import MessageProcessor @@ -14,69 +14,72 @@ def __init__(self, config: ConfigManager) -> None: self.meshtastic: Optional[MeshtasticInterface] = None self.telegram: Optional[TelegramInterface] = None self.message_processor: Optional[MessageProcessor] = None - self.tasks: List[Task] = [] + self.tasks: Sequence[Task] = () async def setup(self) -> None: self.logger.info("Setting up meshgram...") try: - self.meshtastic = MeshtasticInterface(self.config) - await self.meshtastic.setup() - - self.telegram = TelegramInterface(self.config) - await self.telegram.setup() - + self.meshtastic = await self._setup_meshtastic() + self.telegram = await self._setup_telegram() self.message_processor = MessageProcessor(self.meshtastic, self.telegram, self.config) self.logger.info("Meshgram setup complete.") except Exception as e: - self.logger.error(f"Error during setup: {e}") + self.logger.error(f"Error during setup: {e}", exc_info=True) await self.shutdown() raise + async def _setup_meshtastic(self) -> MeshtasticInterface: + meshtastic = MeshtasticInterface(self.config) + await meshtastic.setup() + return meshtastic + + async def _setup_telegram(self) -> TelegramInterface: + telegram = TelegramInterface(self.config) + await telegram.setup() + return telegram + + async def shutdown(self) -> None: + self.logger.info("Shutting down meshgram...") + components = [self.message_processor, self.telegram, self.meshtastic] + for component in components: + if component: + try: + await component.close() + except Exception as e: + self.logger.error(f"Error closing {component.__class__.__name__}: {e}", exc_info=True) + + for task in self.tasks: + if not task.done(): + task.cancel() + await asyncio.gather(*self.tasks, return_exceptions=True) + self.logger.info("Meshgram shutdown complete.") + async def run(self) -> None: try: await self.setup() except Exception as e: - self.logger.error(f"Failed to set up Meshgram: {e}") + self.logger.error(f"Failed to set up Meshgram: {e}", exc_info=True) return - self.logger.info("Meshgram is running ใƒฝ(ยดโ–ฝ`)/") - self.tasks = [ + self.logger.info("Meshgram is running.") + self.tasks = ( asyncio.create_task(self.message_processor.process_messages()), asyncio.create_task(self.meshtastic.process_thread_safe_queue()), asyncio.create_task(self.meshtastic.process_pending_messages()), asyncio.create_task(self.telegram.start_polling()), - asyncio.create_task(self.message_processor.check_heartbeats()) - ] + ) try: await asyncio.gather(*self.tasks) except asyncio.CancelledError: self.logger.info("Received cancellation signal.") except Exception as e: - self.logger.error(f"An error occurred: {e}", exc_info=True) + self.logger.error(f"Unexpected error: {e}", exc_info=True) finally: await self.shutdown() - async def shutdown(self) -> None: - self.logger.info("Shutting down meshgram...") - for task in self.tasks: - if not task.done(): - task.cancel() - await asyncio.gather(*self.tasks, return_exceptions=True) - if self.meshtastic: - await self.meshtastic.close() - if self.telegram: - await self.telegram.close() - if self.message_processor: - if hasattr(self.message_processor, 'close'): - await self.message_processor.close() - else: - self.logger.warning("MessageProcessor does not have a close method.") - self.logger.info("Meshgram shutdown complete.") - async def main() -> None: parser = argparse.ArgumentParser(description='Meshgram: Meshtastic-Telegram Bridge') parser.add_argument('-c', '--config', default='config/config.yaml', help='Path to configuration file') - parser.add_argument('--version', action='version', version='%(prog)s 1.0.0') args = parser.parse_args() config = ConfigManager(args.config) @@ -86,10 +89,9 @@ async def main() -> None: app = Meshgram(config) try: await app.run() - except KeyboardInterrupt: - logger.info("Received keyboard interrupt. Shutting down gracefully...") - except Exception as e: - logger.error(f"Unhandled exception: {e}", exc_info=True) + except ExceptionGroup as eg: + for i, e in enumerate(eg.exceptions, 1): + logger.error(f"Exception {i}: {e}", exc_info=e) finally: await app.shutdown() @@ -97,4 +99,4 @@ async def main() -> None: try: asyncio.run(main()) except KeyboardInterrupt: - print("\nShutdown complete.") + print("\nShutdown complete.") \ No newline at end of file diff --git a/src/meshtastic_interface.py b/src/meshtastic_interface.py index cb87bfd..f1b3999 100644 --- a/src/meshtastic_interface.py +++ b/src/meshtastic_interface.py @@ -1,5 +1,6 @@ import asyncio import queue +import traceback from typing import Dict, Any, Optional, Union, List from dataclasses import dataclass from datetime import datetime, timedelta @@ -22,21 +23,21 @@ def __init__(self, config: ConfigManager) -> None: self.config = config self.logger = get_logger(__name__) self.interface: Optional[Union[SerialInterface, TCPInterface]] = None - self.message_queue = asyncio.Queue() - self.thread_safe_queue = queue.Queue() - self.loop = asyncio.get_event_loop() + self.message_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + self.thread_safe_queue: queue.Queue[Dict[str, Any]] = queue.Queue() + self.loop = asyncio.get_running_loop() self.pending_messages: List[PendingMessage] = [] self.last_telemetry: Dict[str, Any] = {} - self.max_retries, self.retry_interval = 3, 60 + self.max_retries: int = 3 + self.retry_interval: int = 60 self.node_manager = NodeManager() - self.is_setup = False + self.is_setup: bool = False async def setup(self) -> None: self.logger.info("Setting up meshtastic interface...") try: self.interface = await self._create_interface() pub.subscribe(self.on_meshtastic_message, "meshtastic.receive") - pub.subscribe(self.on_connection, "meshtastic.connection.established") await self._fetch_node_info() self.is_setup = True self.logger.info("Meshtastic interface setup complete.") @@ -60,20 +61,37 @@ async def _create_interface(self) -> Union[SerialInterface, TCPInterface]: async def _fetch_node_info(self) -> None: try: - node_info = await asyncio.to_thread(self.interface.getMyNodeInfo) - await self.send_node_info(node_info) + my_node_info = await asyncio.to_thread(self.interface.getMyNodeInfo) + node_id = my_node_info.get('user', {}).get('id') + if node_id: + self.logger.info(f"Received info on our node: {my_node_info}") + else: + self.logger.error(f"Received node info without a node ID: {my_node_info}") except Exception as e: - self.logger.error(f"Failed to get node info: {e}") + self.logger.error(f"Failed to get node info: {e}", exc_info=True) - def on_meshtastic_message(self, packet, interface): - self.logger.info(f"Received message from Meshtastic: {packet}") + def on_meshtastic_message(self, packet: Dict[str, Any], interface: Any) -> None: self.logger.debug(f"Message details - fromId: {packet.get('fromId')}, toId: {packet.get('toId')}, portnum: {packet.get('decoded', {}).get('portnum')}") - self.thread_safe_queue.put(packet) + if packet.get('decoded', {}).get('portnum') == 'ROUTING_APP': + self.handle_ack(packet) + else: + self.thread_safe_queue.put(packet) - def on_connection(self, interface, topic=pub.AUTO_TOPIC): - self.logger.info(f"Connected to Meshtastic interface: {interface}") + def handle_ack(self, packet: Dict[str, Any]) -> None: + ack_data = { + 'type': 'ack', + 'from': packet.get('fromId'), + 'to': packet.get('toId'), + 'message_id': packet.get('id') + } + self.loop.call_soon_threadsafe(self.message_queue.put_nowait, ack_data) async def send_message(self, text: str, recipient: str) -> None: + if not text or not recipient: + raise ValueError("Text and recipient must not be empty") + if len(text) > 230: # Meshtastic message size limit + raise ValueError("Message too long") + self.logger.info(f"Attempting to send message to Meshtastic: {text}") try: self.logger.debug(f"Sending message to Meshtastic with recipient: {recipient}") @@ -84,46 +102,15 @@ async def send_message(self, text: str, recipient: str) -> None: self.logger.error(f"Error sending message to Meshtastic: {e}", exc_info=True) self.pending_messages.append(PendingMessage(text, recipient)) - async def send_node_info(self, node_info: Dict[str, Any]) -> None: - node_id = node_info.get('user', {}).get('id', 'unknown') - self.node_manager.update_node(node_id, { - 'name': node_info.get('user', {}).get('longName', 'unknown'), - 'shortName': node_info.get('user', {}).get('shortName', 'unknown'), - 'hwModel': node_info.get('user', {}).get('hwModel', 'unknown') - }) - await self.message_queue.put({'type': 'node_info', 'text': self.node_manager.format_node_info(node_id)}) - async def send_bell(self, dest_id: str) -> None: + if not dest_id: + raise ValueError("Destination ID must not be empty") + try: await asyncio.to_thread(self.interface.sendText, "๐Ÿ””", destinationId=dest_id) self.logger.info(f"Bell (text message) sent to node {dest_id}") except Exception as e: - self.logger.error(f"Error sending bell to node {dest_id}: {e}") - raise - - async def request_location(self, dest_id: str) -> None: - try: - await asyncio.to_thread(self.interface.sendText, "Please share your location", destinationId=dest_id) - self.logger.info(f"Location request (text message) sent to node {dest_id}") - except Exception as e: - self.logger.error(f"Error requesting location from node {dest_id}: {e}") - raise - - async def request_telemetry(self, dest_id: str) -> None: - try: - await asyncio.to_thread(self.interface.sendTelemetry) - self.logger.info(f"Telemetry request sent to node {dest_id}") - except Exception as e: - self.logger.error(f"Error requesting telemetry from node {dest_id}: {e}") - raise - - async def traceroute(self, dest_id: str) -> None: - try: - self.logger.info(f"Initiating traceroute to {dest_id}") - await asyncio.to_thread(self.interface.sendText, f"!traceroute {dest_id}", destinationId=dest_id) - self.logger.info(f"Traceroute request sent to {dest_id}") - except Exception as e: - self.logger.error(f"Error performing traceroute to node {dest_id}: {e}") + self.logger.error(f"Error sending bell to node {dest_id}: {e}", exc_info=True) raise async def process_pending_messages(self) -> None: @@ -148,7 +135,7 @@ async def process_thread_safe_queue(self) -> None: while True: try: packet = self.thread_safe_queue.get_nowait() - self.loop.call_soon_threadsafe(self.message_queue.put_nowait, packet) + await self.message_queue.put(packet) except queue.Empty: await asyncio.sleep(0.1) @@ -157,10 +144,17 @@ async def get_status(self) -> str: return "Meshtastic interface not connected" try: node_info = await asyncio.to_thread(self.interface.getMyNodeInfo) - return f"Connected to node: {node_info.get('user', {}).get('longName', 'Unknown')}\n" \ - f"Battery level: {node_info.get('deviceMetrics', {}).get('batteryLevel', 'Unknown')}\n" \ - f"Channel utilization: {node_info.get('deviceMetrics', {}).get('channelUtilization', 'Unknown')}" + battery_level = node_info.get('deviceMetrics', {}).get('batteryLevel', 'N/A') + battery_str = "PWR" if battery_level == 101 else f"{battery_level}%" + air_util_tx = node_info.get('deviceMetrics', {}).get('airUtilTx', 'N/A') + air_util_tx_str = f"{air_util_tx:.2f}%" if isinstance(air_util_tx, (int, float)) else air_util_tx + return ( + f"Node: {node_info.get('user', {}).get('longName', 'N/A')}\n" + f"Battery: {battery_str}\n" + f"Air Utilization TX: {air_util_tx_str}" + ) except Exception as e: + self.logger.error(f"Error getting meshtastic status: {e}", exc_info=True) return f"Error getting meshtastic status: {e}" async def close(self) -> None: @@ -170,7 +164,30 @@ async def close(self) -> None: try: await asyncio.to_thread(self.interface.close) pub.unsubscribe(self.on_meshtastic_message, "meshtastic.receive") - pub.unsubscribe(self.on_connection, "meshtastic.connection.established") except Exception as e: - self.logger.error(f"Error closing Meshtastic interface: {e}") - self.logger.info("Meshtastic interface closed.") \ No newline at end of file + self.logger.error(f"Error closing Meshtastic interface: {e}", exc_info=True) + self.logger.info("Meshtastic interface closed.") + + async def reconnect(self) -> None: + self.logger.info("Attempting to reconnect to Meshtastic...") + try: + if self.interface: + await asyncio.to_thread(self.interface.close) + self.interface = await self._create_interface() + self.logger.info("Reconnected to Meshtastic successfully.") + except Exception as e: + self.logger.error(f"Failed to reconnect to Meshtastic: {e}", exc_info=True) + + async def periodic_health_check(self) -> None: + while True: + try: + await asyncio.to_thread(self.interface.ping) + except Exception as e: + self.logger.error(f"Health check failed: {e}", exc_info=True) + await self.reconnect() + await asyncio.sleep(60) # Check every minute + + def start_background_tasks(self) -> None: + asyncio.create_task(self.process_pending_messages()) + asyncio.create_task(self.process_thread_safe_queue()) + asyncio.create_task(self.periodic_health_check()) diff --git a/src/message_processor.py b/src/message_processor.py index 0397b36..f1f2916 100644 --- a/src/message_processor.py +++ b/src/message_processor.py @@ -1,6 +1,7 @@ import asyncio -from typing import Dict, Any +from typing import Dict, Any, List from datetime import datetime, timezone +from telegram import Update from meshtastic_interface import MeshtasticInterface from telegram_interface import TelegramInterface from config_manager import ConfigManager, get_logger @@ -13,97 +14,12 @@ def __init__(self, meshtastic: MeshtasticInterface, telegram: TelegramInterface, self.telegram = telegram self.node_manager = meshtastic.node_manager self.start_time = datetime.now(timezone.utc) - self.last_heartbeat = {} - self.heartbeat_timeout = config.get('meshtastic.heartbeat_timeout', 300) self.local_nodes = config.get('meshtastic.local_nodes', []) - self.pending_requests = {} # For tracking location, telemetry, and traceroute requests - - async def handle_meshtastic_message(self, packet: Dict[str, Any]) -> None: - self.logger.debug(f"Received Meshtastic message: {packet}") - if packet.get('fromId') not in self.local_nodes: - self.logger.info(f"Message from non-local node: {packet.get('fromId')}") - - try: - portnum = packet.get('decoded', {}).get('portnum') - handler = getattr(self, f"handle_{portnum.lower()}", None) - if handler: - self.logger.info(f"Handling Meshtastic message type: {portnum}") - await handler(packet) - else: - self.logger.warning(f"Unhandled Meshtastic message type: {portnum}") - except Exception as e: - self.logger.error(f'Error handling Meshtastic message: {e}', exc_info=True) - - async def handle_text_message_app(self, packet: Dict[str, Any]) -> None: - text = packet['decoded']['payload'].decode('utf-8') - sender, recipient = packet.get('fromId', 'unknown'), packet.get('toId', 'unknown') - - message = f"[Meshtastic:{sender}->{recipient}] {text}" - self.logger.info(f"Sending Meshtastic message to Telegram: {message}") - await self.telegram.send_message(message, disable_notification=False) - - async def handle_telegram_text(self, message: Dict[str, Any]) -> None: - self.logger.info(f"Handling Telegram text message: {message}") - sender = message['sender'][:10] - recipient = self.config.get('meshtastic.default_node_id', '^all') - text = message['text'] - - meshtastic_message = f"[TG:{sender}] {text}" - self.logger.info(f"Preparing to send Telegram message to Meshtastic: {meshtastic_message}") - try: - await self.meshtastic.send_message(meshtastic_message, recipient) - self.logger.info(f"Successfully sent message to Meshtastic: {meshtastic_message}") - except Exception as e: - self.logger.error(f"Failed to send message to Meshtastic: {e}", exc_info=True) - await self.telegram.send_message("Failed to send message to Meshtastic. Please try again.") - - # Add this line to check if the message is being processed - self.logger.info("Finished handling Telegram text message") - - async def handle_position_app(self, packet: Dict[str, Any]) -> None: - position = packet['decoded'].get('position', {}) - sender = packet.get('fromId', 'unknown') - self.node_manager.update_node_position(sender, position) - position_info = self.node_manager.get_node_position(sender) - await self.update_or_send_message('location', sender, position_info) - - async def handle_telemetry_app(self, packet: Dict[str, Any]) -> None: - node_id = packet.get('fromId', 'unknown') - telemetry = packet.get('decoded', {}).get('telemetry', {}) - device_metrics = telemetry.get('deviceMetrics', {}) - self.node_manager.update_node_telemetry(node_id, device_metrics) - self.last_heartbeat[node_id] = datetime.now(timezone.utc) - telemetry_info = self.node_manager.get_node_telemetry(node_id) - await self.update_or_send_message('telemetry', node_id, telemetry_info) - - async def handle_admin_app(self, packet: Dict[str, Any]) -> None: - admin_message = packet.get('decoded', {}).get('admin', {}) - self.logger.info(f"Received admin message: {admin_message}") - if 'getRouteReply' in admin_message: - route = admin_message['getRouteReply'].get('route', []) - dest_id = packet.get('toId', 'unknown') - route_str = " -> ".join(map(str, route)) if route else "No route found" - traceroute_result = f"๐Ÿ” Traceroute to {dest_id}:\n{route_str}" - await self.update_or_send_message('traceroute', dest_id, traceroute_result) - elif 'getChannelResponse' in admin_message: - self.logger.info(f"Received channel response: {admin_message['getChannelResponse']}") - else: - self.logger.warning(f"Unhandled admin message: {admin_message}") - - async def update_or_send_message(self, request_type: str, node_id: str, content: str) -> None: - request_key = f"{request_type}:{node_id}" - if request_key in self.pending_requests: - await self.telegram.send_or_update_message(content, message_id=self.pending_requests[request_key]) - del self.pending_requests[request_key] - else: - await self.telegram.send_message(content, disable_notification=True) async def process_messages(self) -> None: tasks = [ self.process_meshtastic_messages(), - self.process_telegram_messages(), - self.periodic_status_update(), - self.check_heartbeats() + self.process_telegram_messages() ] await asyncio.gather(*tasks) @@ -125,146 +41,251 @@ async def process_meshtastic_messages(self) -> None: async def process_telegram_messages(self) -> None: while True: try: - self.logger.debug("Waiting for Telegram message...") message = await self.telegram.message_queue.get() self.logger.info(f"Processing Telegram message: {message}") - if message['type'] == 'command': - await self.handle_telegram_command(message) - elif message['type'] == 'telegram': - await self.handle_telegram_text(message) - elif message['type'] == 'location': - await self.handle_telegram_location(message) - else: - self.logger.warning(f"Received unknown message type: {message['type']}") - self.logger.debug("Finished processing Telegram message") + await self.handle_telegram_message(message) except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Error processing Telegram message: {e}", exc_info=True) await asyncio.sleep(0.1) - async def periodic_status_update(self) -> None: - while True: - try: - await asyncio.sleep(3600) - status = await self.get_status() - await self.telegram.send_message(status) - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Error in periodic status update: {e}", exc_info=True) + async def handle_telegram_message(self, message: Dict[str, Any]) -> None: + handlers = { + 'command': self.handle_telegram_command, + 'telegram': self.handle_telegram_text, + 'location': self.handle_telegram_location + } + handler = handlers.get(message['type']) + if handler: + await handler(message) + else: + self.logger.warning(f"Received unknown message type: {message['type']}") - async def get_status(self) -> str: - uptime = datetime.now(timezone.utc) - self.start_time - meshtastic_status = await self.meshtastic.get_status() - num_nodes = len(self.node_manager.get_all_nodes()) - return f"Meshgram Status:\nUptime: {uptime}\nConnected Nodes: {num_nodes}\nMeshtastic Status:\n{meshtastic_status}" + async def handle_meshtastic_message(self, packet: Dict[str, Any]) -> None: + self.logger.debug(f"Received Meshtastic message: {packet}") + if packet.get('fromId') not in self.local_nodes: + self.logger.info(f"Message from non-local node: {packet.get('fromId')}") + + try: + portnum = packet.get('decoded', {}).get('portnum') + handler = getattr(self, f"handle_{portnum.lower()}", None) + if handler: + self.logger.info(f"Handling Meshtastic message type {portnum} from {packet.get('fromId')}") + await handler(packet) + else: + self.logger.warning(f"Unhandled Meshtastic message type: {portnum} from: {packet.get('fromId')}") + except Exception as e: + self.logger.error(f'Error handling Meshtastic message: {e}', exc_info=True) + + async def handle_text_message_app(self, packet: Dict[str, Any]) -> None: + text = packet['decoded']['payload'].decode('utf-8') + sender, recipient = packet.get('fromId', 'unknown'), packet.get('toId', 'unknown') + + message = f"๐Ÿ“ก Meshtastic: {sender} โ†’ {recipient}\n๐Ÿ’ฌ {text}" + self.logger.info(f"Sending Meshtastic message to Telegram: {message}") + await self.telegram.send_message(message, disable_notification=False) async def handle_nodeinfo_app(self, packet: Dict[str, Any]) -> None: - node_id = packet.get('from', 'unknown') + node_id = packet.get('fromId', 'unknown') node_info = packet['decoded'] self.node_manager.update_node(node_id, { 'shortName': node_info.get('user', {}).get('shortName', 'unknown'), 'longName': node_info.get('user', {}).get('longName', 'unknown'), 'hwModel': node_info.get('user', {}).get('hwModel', 'unknown') }) - await self.telegram.send_message(self.node_manager.format_node_info(node_id), disable_notification=True) + info_text = self.node_manager.format_node_info(node_id) + await self.telegram.send_or_edit_message('nodeinfo', node_id, info_text) + + async def handle_admin_app(self, packet: Dict[str, Any]) -> None: + admin_message = packet.get('decoded', {}).get('admin', {}) + if 'getRouteReply' in admin_message: + await self._handle_route_reply(admin_message, packet.get('toId', 'unknown')) + elif 'deviceMetrics' in admin_message: + await self._handle_device_metrics(packet.get('fromId', 'unknown'), admin_message['deviceMetrics']) + elif 'position' in admin_message: + await self._handle_position(packet.get('fromId', 'unknown'), admin_message['position']) + else: + self.logger.warning(f"Received unexpected admin message: {admin_message}") + + async def _handle_route_reply(self, admin_message: Dict[str, Any], dest_id: str) -> None: + route = admin_message['getRouteReply'].get('route', []) + if route: + route_str = " โ†’ ".join(f"!{node:08x}" for node in route) + traceroute_result = f"๐Ÿ” Traceroute to {dest_id}:\n{route_str}" + else: + traceroute_result = f"๐Ÿ” Traceroute to {dest_id}: No route found" + await self.telegram.send_message(traceroute_result) + + async def _handle_device_metrics(self, node_id: str, device_metrics: Dict[str, Any]) -> None: + self.node_manager.update_node_telemetry(node_id, device_metrics) + telemetry_info = self.node_manager.get_node_telemetry(node_id) + await self.telegram.send_or_edit_message('telemetry', node_id, telemetry_info) + + async def _handle_position(self, node_id: str, position: Dict[str, Any]) -> None: + self.node_manager.update_node_position(node_id, position) + position_info = self.node_manager.get_node_position(node_id) + await self.telegram.send_or_edit_message('location', node_id, position_info) + + async def handle_routing_app(self, packet: Dict[str, Any]) -> None: + routing_info = packet.get('decoded', {}).get('routing', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_routing(node_id, routing_info) + routing_text = self.node_manager.format_node_routing(node_id) + await self.telegram.send_or_edit_message('routing', node_id, routing_text) + + async def handle_neighborinfo_app(self, packet: Dict[str, Any]) -> None: + neighbor_info = packet.get('decoded', {}).get('neighbors', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_neighbors(node_id, neighbor_info) + neighbor_text = self.node_manager.format_node_neighbors(node_id) + await self.telegram.send_or_edit_message('neighbors', node_id, neighbor_text) + + async def handle_telemetry_app(self, packet: Dict[str, Any]) -> None: + node_id = packet.get('fromId', 'unknown') + telemetry = packet.get('decoded', {}).get('telemetry', {}) + device_metrics = telemetry.get('deviceMetrics', {}) + self.node_manager.update_node_telemetry(node_id, device_metrics) + telemetry_info = self.node_manager.get_node_telemetry(node_id) + await self.telegram.send_or_edit_message('telemetry', node_id, telemetry_info) + + async def handle_position_app(self, packet: Dict[str, Any]) -> None: + position = packet['decoded'].get('position', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_position(node_id, position) + position_info = self.node_manager.get_node_position(node_id) + await self.telegram.send_or_edit_message('location', node_id, position_info) + + latitude = position.get('latitudeI', 0) / 1e7 + longitude = position.get('longitudeI', 0) / 1e7 + if latitude != 0 and longitude != 0: + await self.telegram.bot.send_location(chat_id=self.telegram.chat_id, latitude=latitude, longitude=longitude) + + async def handle_detection_sensor_app(self, packet: Dict[str, Any]) -> None: + sensor_data = packet.get('decoded', {}).get('detectionSensor', {}) + node_id = packet.get('fromId', 'unknown') + self.node_manager.update_node_sensor(node_id, sensor_data) + sensor_info = self.node_manager.get_node_sensor_info(node_id) + await self.telegram.send_or_edit_message('sensor', node_id, sensor_info) + + async def handle_telegram_text(self, message: Dict[str, Any]) -> None: + self.logger.info(f"Handling Telegram text message: {message}") + sender = message['sender'][:10] + recipient = self.config.get('meshtastic.default_node_id') + text = message['text'] + + meshtastic_message = f"[TG:{sender}] {text}" + self.logger.info(f"Preparing to send Telegram message to Meshtastic: {meshtastic_message}") + try: + await self.meshtastic.send_message(meshtastic_message, recipient) + self.logger.info(f"Successfully sent message to Meshtastic: {meshtastic_message}") + except Exception as e: + self.logger.error(f"Failed to send message to Meshtastic: {e}", exc_info=True) + await self.telegram.send_message("Failed to send message to Meshtastic. Please try again.") async def handle_telegram_command(self, message: Dict[str, Any]) -> None: try: command = message.get('command', '').partition('@')[0] - args, user_id = message.get('args', {}), message.get('user_id') + args, user_id, update = message.get('args', []), message.get('user_id'), message.get('update') if not self.telegram.is_user_authorized(user_id) and command not in ['start', 'help', 'user']: - await self.telegram.send_message("You are not authorized to use this command.") + await update.message.reply_text("You are not authorized to use this command.") return handler = getattr(self, f"cmd_{command}", None) if handler: - await handler(args, user_id) + await handler(args, user_id, update) else: - await self.telegram.send_message(f"Unknown command: {command}") + await update.message.reply_text(f"Unknown command: {command}") except Exception as e: self.logger.error(f'Error handling Telegram command: {e}', exc_info=True) - await self.telegram.send_message(f"Error executing command: {e}") - - def validate_node_id(self, node_id: str) -> bool: - return len(node_id) == 8 and all(c in '0123456789abcdefABCDEF' for c in node_id) + await update.message.reply_text(f"Error executing command: {e}") async def handle_telegram_location(self, message: Dict[str, Any]) -> None: lat, lon = message['location']['latitude'], message['location']['longitude'] + alt = message['location'].get('altitude', 0) sender = message['sender'] - await self.meshtastic.send_location(lat, lon, f"Location from telegram user {sender}") + try: + if not self.is_valid_coordinate(lat, lon, alt): + raise ValueError("Invalid coordinates") - async def cmd_status(self, args: Dict[str, Any], user_id: int) -> None: - nodes = self.node_manager.get_all_nodes() - total_nodes = len(nodes) - active_nodes = sum(1 for node in nodes.values() if 'last_updated' in node) - - status_text = f"๐Ÿ“Š Meshgram Status\n๐Ÿ”ข Total nodes: {total_nodes}\nโœ… Active nodes: {active_nodes}\n\n" - for node_id, node_info in nodes.items(): - status_text += (f"๐Ÿ”ท Node {node_id}:\n" - f"๐Ÿ“› Name: {node_info.get('name', 'Unknown')}\n" - f"๐Ÿ”‹ Battery: {node_info.get('batteryLevel', 'Unknown')}\n" - f"โฑ๏ธ Uptime: {node_info.get('uptimeSeconds', 'Unknown')} seconds\n" - f"๐Ÿ•’ Last updated: {node_info.get('last_updated', 'Unknown')}\n\n") - - await self.telegram.send_message(status_text) + recipient = self.config.get('meshtastic.default_node_id') + await self.meshtastic.send_message(f"[TG:{sender}] location lat={lat:.6f}, lon={lon:.6f}, alt={alt:.1f}m", recipient) + await self.telegram.send_message(f"๐Ÿ“ Location sent to Meshtastic network: lat={lat:.6f}, lon={lon:.6f}, alt={alt:.1f}m") + except ValueError as e: + self.logger.error(f"Invalid location data: {e}") + await self.telegram.send_message(f"Failed to send location to Meshtastic. Invalid data: {e}") + except Exception as e: + self.logger.error(f"Failed to send location to Meshtastic: {e}", exc_info=True) + await self.telegram.send_message("Failed to send location to Meshtastic. Please try again.") - async def cmd_node(self, args: Dict[str, Any], user_id: int) -> None: - node_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - if not node_id: - await self.telegram.send_message("No node ID provided and no default node ID set.") - return - node_info = self.node_manager.format_node_info(node_id) - await self.telegram.send_message(node_info) + def is_valid_coordinate(self, lat: float, lon: float, alt: float) -> bool: + return -90 <= lat <= 90 and -180 <= lon <= 180 and -1000 <= alt <= 50000 + + async def get_status(self) -> str: + uptime = datetime.now(timezone.utc) - self.start_time + meshtastic_status = await self.meshtastic.get_status() + num_nodes = len(self.node_manager.get_all_nodes()) + return (f"๐Ÿ“Š Meshgram Status:\n" + f"โฑ๏ธ Uptime: {self._format_uptime(uptime.total_seconds())}\n" + f"๐Ÿ”ข Connected Nodes: {num_nodes}\n\n" + f"๐Ÿ“ก Meshtastic Status:\n{meshtastic_status}") - async def cmd_bell(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') + async def cmd_status(self, args: List[str], user_id: int, update: Update) -> None: + status = await self.get_status() + await update.message.reply_text(status) + + async def cmd_bell(self, args: List[str], user_id: int, update: Update) -> None: + dest_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not dest_id: + await update.message.reply_text("No node ID provided and no default node ID set.") + return self.logger.info(f"Sending bell to node {dest_id}") try: await self.meshtastic.send_bell(dest_id) - self.logger.info(f"Bell sent successfully to node {dest_id}") - await self.telegram.send_message(f"๐Ÿ”” Bell sent to node {dest_id}.", disable_notification=True) + await update.message.reply_text(f"๐Ÿ”” Bell sent to node {dest_id}.", disable_notification=True) except Exception as e: self.logger.error(f"Failed to send bell to node {dest_id}: {e}", exc_info=True) - await self.telegram.send_message(f"Failed to send bell to node {dest_id}. Error: {str(e)}") - - async def cmd_location(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - await self.meshtastic.request_location(dest_id) - message = await self.telegram.send_message(f"๐Ÿ“ Location request sent to node {dest_id}. Waiting for response...") - if message: - self.pending_requests[f"location:{dest_id}"] = message.message_id - - async def cmd_telemetry(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - await self.meshtastic.request_telemetry(dest_id) - message = await self.telegram.send_message(f"๐Ÿ“Š Telemetry request sent to node {dest_id}. Waiting for response...") - if message: - self.pending_requests[f"telemetry:{dest_id}"] = message.message_id - - async def cmd_traceroute(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - message = await self.telegram.send_message(f"๐Ÿ” Initiating traceroute to {dest_id}...") - if message: - self.pending_requests[f"traceroute:{dest_id}"] = message.message_id - await self.meshtastic.traceroute(dest_id) - - async def check_heartbeats(self) -> None: - while True: - try: - now = datetime.now(timezone.utc) - for node_id, last_heartbeat in list(self.last_heartbeat.items()): - if (now - last_heartbeat).total_seconds() > self.heartbeat_timeout: - del self.last_heartbeat[node_id] - await self.telegram.send_message(f"โš ๏ธ Node {node_id} is no longer active.") - await asyncio.sleep(60) - except Exception as e: - self.logger.error(f"Error in check_heartbeats: {e}", exc_info=True) - await asyncio.sleep(60) + await update.message.reply_text(f"Failed to send bell to node {dest_id}. Error: {str(e)}") + + async def cmd_node(self, args: List[str], user_id: int, update: Update) -> None: + node_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not node_id: + await update.message.reply_text("No node ID provided and no default node ID set.") + return + node_info = self.node_manager.format_node_info(node_id) + telemetry_info = self.node_manager.get_node_telemetry(node_id) + position_info = self.node_manager.get_node_position(node_id) + routing_info = self.node_manager.format_node_routing(node_id) + neighbor_info = self.node_manager.format_node_neighbors(node_id) + sensor_info = self.node_manager.get_node_sensor_info(node_id) + + full_info = f"{node_info}\n\n{telemetry_info}\n\n{position_info}\n\n{routing_info}\n\n{neighbor_info}\n\n{sensor_info}" + await update.message.reply_text(full_info) async def close(self) -> None: self.logger.info("Closing MessageProcessor...") # Add any cleanup operations here if needed self.logger.info("MessageProcessor closed.") + + def _format_uptime(self, seconds: int) -> str: + days, remainder = divmod(seconds, 86400) + hours, remainder = divmod(remainder, 3600) + minutes, _ = divmod(remainder, 60) + return f"{int(days)}d {int(hours)}h {int(minutes)}m" + + def _format_channel_utilization(self, value: float) -> str: + return f"{value:.2f}%" if isinstance(value, (int, float)) else str(value) + + async def _update_telemetry_message(self, node_id: str, telemetry_data: Dict[str, Any]) -> None: + self.node_manager.update_node_telemetry(node_id, telemetry_data) + telemetry_info = self.node_manager.get_node_telemetry(node_id) + await self.telegram.send_or_edit_message('telemetry', node_id, telemetry_info) + + async def _update_location_message(self, node_id: str, position_data: Dict[str, Any]) -> None: + self.node_manager.update_node_position(node_id, position_data) + position_info = self.node_manager.get_node_position(node_id) + await self.telegram.send_or_edit_message('location', node_id, position_info) + + def _get_battery_status(self, battery_level: int) -> str: + return "PWR" if battery_level == 101 else f"{battery_level}%" \ No newline at end of file diff --git a/src/node_manager.py b/src/node_manager.py index 1ddcbe3..280f485 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -1,11 +1,14 @@ -from typing import Dict, Any, Optional, List +from typing import Dict, Any, Optional, List, Union from datetime import datetime, timedelta class NodeManager: def __init__(self): self.nodes: Dict[str, Dict[str, Any]] = {} self.node_history: Dict[str, List[Dict[str, Any]]] = {} - self.history_limit = 100 + self.history_limit: int = 100 + + def format_node_name(self, node_id: Union[str, int], short_name: str) -> str: + return f'{node_id} [{short_name}]' def update_node(self, node_id: str, data: Dict[str, Any]) -> None: if node_id not in self.nodes: @@ -25,46 +28,44 @@ def get_node(self, node_id: str) -> Optional[Dict[str, Any]]: def get_all_nodes(self) -> Dict[str, Dict[str, Any]]: return self.nodes - def get_node_history(self, node_id: str) -> List[Dict[str, Any]]: - return self.node_history.get(node_id, []) - - def format_node_info(self, node_id: str) -> str: + def get_node_position(self, node_id: str) -> str: node = self.get_node(node_id) if not node: - return f"โ„น๏ธ No information available for node {node_id}" - info = [f"๐Ÿ”ท Discovered Node ID: {node_id}"] - for key, value in node.items(): - if key != 'last_updated': - emoji = { - 'name': '๐Ÿ“›', 'shortName': '๐Ÿท๏ธ', 'hwModel': '๐Ÿ–ฅ๏ธ', - 'batteryLevel': '๐Ÿ”‹', 'voltage': 'โšก', 'channelUtilization': '๐Ÿ“Š', - 'airUtilTx': '๐Ÿ“ก', 'temperature': '๐ŸŒก๏ธ', 'relativeHumidity': '๐Ÿ’ง', - 'barometricPressure': '๐ŸŒช๏ธ', 'gasResistance': '๐Ÿ’จ', 'current': 'โšก' - }.get(key, '๐Ÿ”น') - info.append(f"{emoji} {key.capitalize()}: {value}") - info.append(f"๐Ÿ•’ Last updated: {node.get('last_updated', 'Unknown')}") - return "\n".join(info) + return self.escape_markdown(f"๐Ÿ“ No position available for node {node_id}") + latitude = node.get('latitude', 'N/A') + longitude = node.get('longitude', 'N/A') + last_position_update = node.get('last_position_update', 'N/A') + + return self.escape_markdown( + f"๐Ÿ“ Position for node {node_id}:\n" + f"๐ŸŒŽ Latitude: {latitude}\n" + f"๐ŸŒ Longitude: {longitude}\n" + f"๐Ÿ•’ Last updated: {self._format_date(last_position_update) if last_position_update != 'N/A' else 'N/A'}" + ) + def get_node_telemetry(self, node_id: str) -> str: node = self.get_node(node_id) if not node: - return f"No telemetry available for node {node_id}" - air_util_tx = node.get('airUtilTx', 'Unknown') - air_util_tx_str = f"{air_util_tx:.2f}%" if isinstance(air_util_tx, (int, float)) else air_util_tx - return (f"๐Ÿ“Š Telemetry for node {node_id}:\n" - f"โ€ข Battery: {node.get('batteryLevel', 'Unknown')}%\n" - f"โ€ข Air Utilization TX: {air_util_tx_str}\n" - f"โ€ข Uptime: {node.get('uptimeSeconds', 'Unknown')} seconds\n" - f"โ€ข Last updated: {node.get('last_updated', 'Unknown')}") + return self.escape_markdown(f"๐Ÿ“Š No telemetry available for node {node_id}") + + battery_level = node.get('batteryLevel', 'N/A') + battery_str = "PWR" if battery_level == 101 else f"{battery_level}%" + air_util_tx = node.get('airUtilTx', 'N/A') + air_util_tx_str = self._format_percentage(air_util_tx) + channel_utilization = node.get('channelUtilization', 'N/A') + channel_utilization_str = self._format_percentage(channel_utilization) + uptime = node.get('uptimeSeconds', 'N/A') + last_updated = node.get('last_updated', 'N/A') - def get_node_position(self, node_id: str) -> str: - node = self.get_node(node_id) - if not node: - return f"No position available for node {node_id}" - return (f"๐Ÿ“ Position for node {node_id}:\n" - f"โ€ข Latitude: {node.get('latitude', 'Unknown')}\n" - f"โ€ข Longitude: {node.get('longitude', 'Unknown')}\n" - f"โ€ข Last updated: {node.get('last_position_update', 'Unknown')}") + return self.escape_markdown( + f"๐Ÿ“Š Telemetry for node {node_id}:\n" + f"๐Ÿ”‹ Battery: {battery_str}\n" + f"๐Ÿ“ก Air Utilization TX: {air_util_tx_str}\n" + f"๐Ÿ“Š Channel Utilization: {channel_utilization_str}\n" + f"โฑ๏ธ Uptime: {uptime} seconds\n" + f"๐Ÿ•’ Last updated: {self._format_date(last_updated) if last_updated != 'N/A' else 'N/A'}" + ) def validate_node_id(self, node_id: str) -> bool: return len(node_id) == 8 and all(c in '0123456789abcdefABCDEF' for c in node_id) @@ -90,6 +91,91 @@ def get_inactive_nodes(self, timeout: int = 300) -> List[str]: (now - datetime.fromisoformat(node['last_updated'])) > timedelta(seconds=timeout) ] + def format_node_info(self, node_id: str) -> str: + node = self.get_node(node_id) + if not node: + return self.escape_markdown(f"โ„น๏ธ No information available for node {node_id}") + + short_name = node.get('shortName', 'unknown') + formatted_name = self.format_node_name(node_id, short_name) + info = [f"๐Ÿ”ท Node {formatted_name}:"] + emoji_map = { + 'name': '๐Ÿ“›', 'longName': '๐Ÿ“', 'hwModel': '๐Ÿ–ฅ๏ธ', + 'batteryLevel': '๐Ÿ”‹', 'voltage': 'โšก', 'channelUtilization': '๐Ÿ“Š', + 'airUtilTx': '๐Ÿ“ก', 'temperature': '๐ŸŒก๏ธ', 'relativeHumidity': '๐Ÿ’ง', + 'barometricPressure': '๐ŸŒช๏ธ', 'gasResistance': '๐Ÿ’จ', 'current': 'โšก', + 'last_updated': '๐Ÿ•’' + } + + for key, value in node.items(): + if key == 'last_updated': + value = self._format_date(value) + elif key in ['channelUtilization', 'airUtilTx']: + value = self._format_percentage(value) + elif key == 'shortName': + continue # Skip shortName as it's already included in the formatted name + emoji = emoji_map.get(key, '๐Ÿ”น') + info.append(self.escape_markdown(f"{emoji} {key.capitalize()}: {value}")) + + return "\n".join(info) + + def _format_date(self, date_str: str) -> str: + try: + date = datetime.fromisoformat(date_str) + now = datetime.now(date.tzinfo) + delta = now - date + if delta < timedelta(minutes=1): + return "just now" + elif delta < timedelta(hours=1): + return f"{delta.seconds // 60} minutes ago" + elif delta < timedelta(days=1): + return f"{delta.seconds // 3600} hours ago" + else: + return f"{delta.days} days ago" + except ValueError: + return "Unknown" + + def _format_percentage(self, value: Any) -> str: + return f"{value:.2f}%" if isinstance(value, (int, float)) else str(value) + def remove_node(self, node_id: str) -> None: self.nodes.pop(node_id, None) - self.node_history.pop(node_id, None) \ No newline at end of file + self.node_history.pop(node_id, None) + + def update_node_routing(self, node_id: str, routing_info: Dict[str, Any]) -> None: + self.update_node(node_id, {'routing': routing_info}) + + def update_node_neighbors(self, node_id: str, neighbor_info: Dict[str, Any]) -> None: + self.update_node(node_id, {'neighbors': neighbor_info}) + + def update_node_sensor(self, node_id: str, sensor_data: Dict[str, Any]) -> None: + self.update_node(node_id, {'sensor': sensor_data}) + + def format_node_routing(self, node_id: str) -> str: + node = self.get_node(node_id) + if not node or 'routing' not in node: + return self.escape_markdown(f"๐Ÿ”€ No routing information available for node {node_id}") + + routing_info = node['routing'] + return self.escape_markdown(f"๐Ÿ”€ Routing information for node {node_id}:\n" + "\n".join(f" {k}: {v}" for k, v in routing_info.items())) + + def format_node_neighbors(self, node_id: str) -> str: + node = self.get_node(node_id) + if not node or 'neighbors' not in node: + return self.escape_markdown(f"๐Ÿ‘ฅ No neighbor information available for node {node_id}") + + neighbor_info = node['neighbors'] + return self.escape_markdown(f"๐Ÿ‘ฅ Neighbor information for node {node_id}:\n" + "\n".join(f" {k}: {v}" for k, v in neighbor_info.items())) + + def get_node_sensor_info(self, node_id: str) -> str: + node = self.get_node(node_id) + if not node or 'sensor' not in node: + return self.escape_markdown(f"๐Ÿ”ฌ No sensor information available for node {node_id}") + + sensor_data = node['sensor'] + return self.escape_markdown(f"๐Ÿ”ฌ Sensor information for node {node_id}:\n" + "\n".join(f" {k}: {v}" for k, v in sensor_data.items())) + + @staticmethod + def escape_markdown(text: str) -> str: + # FIXME: convert all callers to not use this method + return text diff --git a/src/telegram_interface.py b/src/telegram_interface.py index 169796a..b82bed8 100644 --- a/src/telegram_interface.py +++ b/src/telegram_interface.py @@ -1,39 +1,30 @@ import asyncio -from typing import Dict, Any, Optional, List -from telegram import Bot, Update, BotCommand, Message, InlineKeyboardButton, InlineKeyboardMarkup +import re +from typing import Dict, Any, Optional, Callable +from telegram import Bot, Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters +from telegram.constants import ParseMode +from telegram.helpers import escape_markdown from config_manager import ConfigManager, get_logger -import time - -class RateLimiter: - def __init__(self, max_calls: int, period: float): - self.max_calls, self.period, self.calls = max_calls, period, [] - - async def wait(self): - now = time.time() - self.calls = [call for call in self.calls if now - call < self.period] - if len(self.calls) >= self.max_calls: - await asyncio.sleep(self.period - (now - self.calls[0])) - self.calls.append(time.time()) class TelegramInterface: def __init__(self, config: ConfigManager) -> None: - self.config, self.logger = config, get_logger(__name__) - self.bot, self.application = None, None - self.message_queue, self._stop_event = asyncio.Queue(), asyncio.Event() - self.chat_id, self.last_node_messages = None, {} - self.commands = { + self.config = config + self.logger = get_logger(__name__) + self.bot: Optional[Bot] = None + self.application: Optional[Application] = None + self.message_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + self._stop_event: asyncio.Event = asyncio.Event() + self.chat_id: Optional[int] = None + self.last_messages: Dict[str, int] = {} + self.commands: Dict[str, Dict[str, str | Callable]] = { 'start': {'description': 'Start the bot and see available commands', 'handler': self.start_command}, 'help': {'description': 'Show help message', 'handler': self.help_command}, 'status': {'description': 'Check the current status', 'handler': self.handle_command}, - 'bell': {'description': 'Send a bell to the meshtastic chat group', 'handler': self.handle_command}, - 'location': {'description': 'Request location from meshtastic side', 'handler': self.handle_command}, - 'telemetry': {'description': 'Request telemetry or display last received value', 'handler': self.handle_command}, - 'traceroute': {'description': 'Trace route to a specific node', 'handler': self.handle_command}, + 'bell': {'description': 'Send a bell to the meshtastic user', 'handler': self.handle_command}, 'node': {'description': 'Get information about a specific node', 'handler': self.handle_command}, 'user': {'description': 'Get information about your Telegram user', 'handler': self.user_command}, } - self.rate_limiter = RateLimiter(max_calls=30, period=60) async def setup(self) -> None: self.logger.info("Setting up telegram interface...") @@ -42,12 +33,9 @@ async def setup(self) -> None: if not token: raise ValueError("Telegram bot token not found in configuration") self.bot = Bot(token=token) - self.application = Application.builder().token(token=token).build() - self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.on_telegram_message)) - self.application.add_handler(MessageHandler(filters.LOCATION, self.on_telegram_location)) - for command, data in self.commands.items(): - self.application.add_handler(CommandHandler(command, data['handler'])) - await self.bot.set_my_commands([BotCommand(command, data['description']) for command, data in self.commands.items()]) + self.application = Application.builder().token(token).build() + self._setup_handlers() + await self.bot.set_my_commands([(cmd, data['description']) for cmd, data in self.commands.items()]) self.chat_id = self.config.get('telegram.chat_id') if not self.chat_id: raise ValueError("Telegram chat id not found in configuration") @@ -56,179 +44,156 @@ async def setup(self) -> None: self.logger.exception(f"Failed to set up telegram: {e}") raise + def _setup_handlers(self) -> None: + self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.on_telegram_message)) + self.application.add_handler(MessageHandler(filters.LOCATION, self.on_telegram_location)) + for command, data in self.commands.items(): + self.application.add_handler(CommandHandler(command, data['handler'])) + async def start_polling(self) -> None: if not self.application: - self.logger.error("Telegram application not initialized") + self.logger.error("Telegram application not initialized", exc_info=True) return self.logger.info("Starting telegram polling...") - await self.application.initialize() - await self.application.start() - - retry_delay = 1 - while not self._stop_event.is_set(): - try: - await self.application.updater.start_polling(drop_pending_updates=True) - self.logger.info("Telegram polling started") - await self._stop_event.wait() - except NetworkError as e: - self.logger.error(f"Network error occurred: {e}. Retrying in {retry_delay} seconds...") - await asyncio.sleep(retry_delay) - retry_delay = min(retry_delay * 2, 60) # Exponential backoff, max 60 seconds - except Exception as e: - self.logger.error(f"Unexpected error in Telegram polling: {e}") - break - - await self._shutdown_polling() + try: + await self.application.initialize() + await self.application.start() + await self.application.updater.start_polling(drop_pending_updates=True) + await self._stop_event.wait() + except Exception as e: + self.logger.error(f"Error in Telegram polling: {e}", exc_info=True) + finally: + await self._shutdown_polling() async def _shutdown_polling(self) -> None: self.logger.info("Stopping telegram polling...") - try: - if self.application.updater.running: - await self.application.updater.stop() - await self.application.stop() - await self.application.shutdown() - except RuntimeError as e: - self.logger.warning(f"RuntimeError during shutdown: {e}") - except Exception as e: - self.logger.error(f"Error during shutdown: {e}") + if self.application: + try: + await self.application.stop() + await self.application.shutdown() + except Exception as e: + self.logger.error(f"Error during Telegram shutdown: {e}", exc_info=True) self.logger.info("Telegram polling stopped") async def on_telegram_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - self.logger.debug(f"Received message from Telegram: {update.message.text}") - try: - await self.message_queue.put({ - 'text': update.message.text, - 'sender': update.effective_user.username or update.effective_user.first_name, - 'type': 'telegram', - 'message_id': update.message.message_id, - 'user_id': update.effective_user.id - }) - self.logger.info(f"Received message from Telegram: {update.message.text}") - await update.message.reply_text("Message received and will be sent to Meshtastic.") - await self.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing") - except Exception as e: - self.logger.error(f'Error handling Telegram message: {e}') - await update.message.reply_text("An error occurred while processing your message. Please try again.") + await self.message_queue.put({ + 'text': update.message.text, + 'sender': update.effective_user.username or update.effective_user.first_name, + 'type': 'telegram', + 'message_id': update.message.message_id, + 'user_id': update.effective_user.id + }) - async def send_message(self, text: str, disable_notification: bool = False, pin_message: bool = False) -> Optional[Message]: - self.logger.debug(f"Attempting to send message to Telegram: {text}") - try: - message = await self.bot.send_message(chat_id=self.chat_id, text=text, disable_notification=disable_notification, disable_web_page_preview=True) - self.logger.info(f"Sent message to Telegram: {text}") - if pin_message: - await self.bot.pin_chat_message(chat_id=self.chat_id, message_id=message.message_id) - self.logger.info("Pinned message to Telegram") - return message - except Exception as e: - self.logger.error(f"Failed to send Telegram message: {e}") - return None - - async def send_or_update_message(self, text: str, message_id: Optional[int] = None, disable_notification: bool = False) -> None: - await self.rate_limiter.wait() - try: + async def on_telegram_location(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await self.message_queue.put({ + 'location': { + 'latitude': update.message.location.latitude, + 'longitude': update.message.location.longitude + }, + 'sender': update.effective_user.username or update.effective_user.first_name, + 'type': 'location', + 'message_id': update.message.message_id + }) + + async def send_or_edit_message(self, message_type: str, node_id: str, content: str) -> None: + message_key = f"{message_type}:{node_id}" + if message_key in self.last_messages: + await self.edit_message(self.last_messages[message_key], content) + else: + message_id = await self.send_message(content) if message_id: - await self.bot.edit_message_text(chat_id=self.chat_id, message_id=message_id, text=text) - else: - message = await self.bot.send_message(chat_id=self.chat_id, text=text, disable_notification=disable_notification) - return message.message_id - except Exception as e: - self.logger.error(f"Failed to send or update telegram message: {e}") + self.last_messages[message_key] = message_id - async def on_telegram_location(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + async def send_message(self, text: str, disable_notification: bool = False) -> Optional[int]: try: - await self.message_queue.put({ - 'location': { - 'latitude': update.message.location.latitude, - 'longitude': update.message.location.longitude - }, - 'sender': update.effective_user.username or update.effective_user.first_name, - 'type': 'telegram', - 'message_id': update.message.message_id - }) - await update.message.reply_text("Location received and will be sent to meshtastic.") + escaped_text = self.escape_markdown(text) + message = await self.bot.send_message( + chat_id=self.chat_id, + text=escaped_text, + disable_notification=disable_notification, + disable_web_page_preview=True, + parse_mode=ParseMode.MARKDOWN_V2 + ) + return message.message_id except Exception as e: - self.logger.error(f'Error handling telegram location: {e}') - await update.message.reply_text("An error occurred while processing your location. Please try again.") + self.logger.error(f"Failed to send Telegram message: {e}", exc_info=True) + return None - async def send_or_update_node_info(self, node_id: str, info_text: str) -> None: + # FIXME: "Message is not modified: specified new message content and reply markup are exactly the same as a current content and reply markup of the message" + async def edit_message(self, message_id: int, text: str) -> bool: try: - if node_id in self.last_node_messages: - await self.send_or_update_message(info_text, message_id=self.last_node_messages[node_id]) - else: - message_id = await self.send_or_update_message(info_text, disable_notification=True) - self.last_node_messages[node_id] = message_id - self.logger.info(f"Updated node info for {node_id}") + escaped_text = self.escape_markdown(text) + await self.bot.edit_message_text( + chat_id=self.chat_id, + message_id=message_id, + text=escaped_text, + parse_mode=ParseMode.MARKDOWN_V2 + ) + return True except Exception as e: - self.logger.error(f"Failed to update node info for {node_id}: {e}") + self.logger.error(f"Failed to edit Telegram message: {e}", exc_info=True) + return False - def generate_help_text(self) -> str: - help_text = "๐Ÿš€ Welcome to Meshgram! Here are the available commands:\n\n" - for command, data in self.commands.items(): - help_text += f"/{command} - {data['description']}\n" - - help_text += "\n๐Ÿ” Advanced Usage:\n" - help_text += "Some commands can target specific nodes by adding a node ID:\n" - help_text += "โ€ข /location [node_id] - ๐Ÿ“ Request location (e.g., /location !abc123)\n" - help_text += "โ€ข /telemetry [node_id] - ๐Ÿ“Š Request telemetry data\n" - help_text += "โ€ข /node [node_id] - โ„น๏ธ Get node information\n" - help_text += "\nIf no node ID is provided, the default node will be used." - - return help_text + @staticmethod + def escape_markdown(text: str) -> str: + # https://docs.python-telegram-bot.org/en/stable/telegram.helpers.html#telegram.helpers.escape_markdown + return escape_markdown(text, version=2) - async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + async def add_reaction(self, message_id: int, emoji: str) -> None: try: - await update.message.reply_text(self.generate_help_text()) + keyboard = [[InlineKeyboardButton(emoji, callback_data=f"reaction:{emoji}")]] + reply_markup = InlineKeyboardMarkup(keyboard) + await self.bot.edit_message_reply_markup( + chat_id=self.chat_id, + message_id=message_id, + reply_markup=reply_markup + ) except Exception as e: - self.logger.error(f"Error in start command: {e}") - await update.message.reply_text("An error occurred. Please try again.") - - async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await self.start_command(update, context) + self.logger.error(f"Failed to add reaction to Telegram message: {e}", exc_info=True) def is_user_authorized(self, user_id: int) -> bool: authorized_users = self.config.get_authorized_users() return not authorized_users or user_id in authorized_users + async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await self.help_command(update, context) + + async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + help_text = "๐Ÿ“š Available commands:\n\n" + help_text += "\n".join(f"/{command} - {data['description']}" for command, data in self.commands.items()) + await update.message.reply_text(self.escape_markdown(help_text), parse_mode=ParseMode.MARKDOWN_V2) + async def user_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user - user_info = (f"๐Ÿ‘ค User Information:\n" - f"๐Ÿ†” ID: {user.id}\n" - f"๐Ÿ“› Name: {user.full_name}\n" - f"๐Ÿท๏ธ Username: @{user.username}\n" - f"๐Ÿค– Is Bot: {'Yes' if user.is_bot else 'No'}") - await update.message.reply_text(user_info) + user_info = ( + f"๐Ÿ†” ID: {user.id}\n" + f"๐Ÿ‘ค Username: @{user.username}\n" + f"๐Ÿ“› Name: {user.full_name}\n" + f"๐Ÿค– Is Bot: {'Yes' if user.is_bot else 'No'}" + ) + await update.message.reply_text(self.escape_markdown(user_info), parse_mode=ParseMode.MARKDOWN_V2) async def handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: command = update.message.text.split()[0][1:].partition('@')[0] + args = context.args user_id = update.effective_user.id - if command not in ['start', 'help', 'user', 'status'] and not self.is_user_authorized(user_id): - await update.message.reply_text("You are not authorized to use this command.") + if not self.is_user_authorized(user_id) and command not in ['start', 'help', 'user']: + await update.message.reply_text(self.escape_markdown("You are not authorized to use this command."), parse_mode=ParseMode.MARKDOWN_V2) return - try: - args = {'node_id': context.args[0]} if context.args else {} - await self.message_queue.put({ - 'type': 'command', - 'command': command, - 'args': args, - 'user_id': user_id - }) - - if command in ['location', 'telemetry']: - node_id = args.get('node_id', 'default node') - await update.message.reply_text(f"Sending {command} request to {node_id}...", disable_notification=True) - except Exception as e: - self.logger.error(f"Error in {command} command: {e}") - await update.message.reply_text(f"An error occurred while processing the {command} command. Please try again.") + await self.message_queue.put({ + 'type': 'command', + 'command': command, + 'args': args, + 'user_id': user_id, + 'update': update + }) async def close(self) -> None: self.logger.info("Stopping telegram interface...") self._stop_event.set() - if self.application and self.application.updater.running: - await self.application.updater.stop() if self.application: await self.application.stop() - await self.application.shutdown() - self.logger.info("Telegram interface stopped.") \ No newline at end of file + self.logger.info("Telegram interface stopped.") diff --git a/tests/test_meshtastic_interface.py b/tests/test_meshtastic_interface.py index 6ee4f7b..2e62a24 100644 --- a/tests/test_meshtastic_interface.py +++ b/tests/test_meshtastic_interface.py @@ -1,2 +1,34 @@ -# tests/test_meshtastic_interface.py import pytest +from unittest.mock import AsyncMock, MagicMock +from meshtastic_interface import MeshtasticInterface +from config_manager import ConfigManager + +@pytest.fixture +def mock_config(): + config = MagicMock(spec=ConfigManager) + config.get.return_value = 'serial' + return config + +@pytest.mark.asyncio +async def test_meshtastic_interface_setup(mock_config): + interface = MeshtasticInterface(mock_config) + interface._create_interface = AsyncMock() + interface._fetch_node_info = AsyncMock() + + await interface.setup() + + assert interface.is_setup == True + interface._create_interface.assert_called_once() + interface._fetch_node_info.assert_called_once() + +@pytest.mark.asyncio +async def test_meshtastic_interface_send_message(mock_config): + interface = MeshtasticInterface(mock_config) + interface.interface = MagicMock() + interface.interface.sendText = AsyncMock() + + await interface.send_message("Test message", "!4e19d9a4") + + interface.interface.sendText.assert_called_once_with("Test message", destinationId="!4e19d9a4") + +# Add more tests for other methods... \ No newline at end of file