From 31da03ff425020f730126080acdbbd17142c4842 Mon Sep 17 00:00:00 2001 From: Tom Hensel Date: Fri, 5 Jul 2024 17:08:52 +0200 Subject: [PATCH] refactor; unify emoji style; use recent python features --- src/meshgram.py | 17 +-- src/meshtastic_interface.py | 14 +- src/message_processor.py | 293 ++++++++++++++++-------------------- src/node_manager.py | 20 +-- src/telegram_interface.py | 228 ++++++++++------------------ 5 files changed, 239 insertions(+), 333 deletions(-) diff --git a/src/meshgram.py b/src/meshgram.py index 4631c9d..bb35f98 100644 --- a/src/meshgram.py +++ b/src/meshgram.py @@ -1,6 +1,7 @@ import argparse import asyncio -from typing import Optional, List +from typing import Optional +from collections.abc import Sequence from meshtastic_interface import MeshtasticInterface from telegram_interface import TelegramInterface from message_processor import MessageProcessor @@ -14,7 +15,7 @@ def __init__(self, config: ConfigManager) -> None: self.meshtastic: Optional[MeshtasticInterface] = None self.telegram: Optional[TelegramInterface] = None self.message_processor: Optional[MessageProcessor] = None - self.tasks: List[Task] = [] + self.tasks: Sequence[Task] = () async def setup(self) -> None: self.logger.info("Setting up meshgram...") @@ -40,13 +41,12 @@ async def run(self) -> None: return self.logger.info("Meshgram is running ใƒฝ(ยดโ–ฝ`)/") - self.tasks = [ + self.tasks = ( asyncio.create_task(self.message_processor.process_messages()), asyncio.create_task(self.meshtastic.process_thread_safe_queue()), asyncio.create_task(self.meshtastic.process_pending_messages()), asyncio.create_task(self.telegram.start_polling()), - asyncio.create_task(self.message_processor.check_heartbeats()) - ] + ) try: await asyncio.gather(*self.tasks) except asyncio.CancelledError: @@ -67,10 +67,7 @@ async def shutdown(self) -> None: if self.telegram: await self.telegram.close() if self.message_processor: - if hasattr(self.message_processor, 'close'): - await self.message_processor.close() - else: - self.logger.warning("MessageProcessor does not have a close method.") + await self.message_processor.close() self.logger.info("Meshgram shutdown complete.") async def main() -> None: @@ -97,4 +94,4 @@ async def main() -> None: try: asyncio.run(main()) except KeyboardInterrupt: - print("\nShutdown complete.") + print("\nShutdown complete.") \ No newline at end of file diff --git a/src/meshtastic_interface.py b/src/meshtastic_interface.py index cb87bfd..5b8c796 100644 --- a/src/meshtastic_interface.py +++ b/src/meshtastic_interface.py @@ -68,7 +68,19 @@ async def _fetch_node_info(self) -> None: def on_meshtastic_message(self, packet, interface): self.logger.info(f"Received message from Meshtastic: {packet}") self.logger.debug(f"Message details - fromId: {packet.get('fromId')}, toId: {packet.get('toId')}, portnum: {packet.get('decoded', {}).get('portnum')}") - self.thread_safe_queue.put(packet) + if packet.get('decoded', {}).get('portnum') == 'ROUTING_APP': + self.handle_ack(packet) + else: + self.thread_safe_queue.put(packet) + + def handle_ack(self, packet): + ack_data = { + 'type': 'ack', + 'from': packet.get('fromId'), + 'to': packet.get('toId'), + 'message_id': packet.get('id') + } + self.loop.call_soon_threadsafe(self.message_queue.put_nowait, ack_data) def on_connection(self, interface, topic=pub.AUTO_TOPIC): self.logger.info(f"Connected to Meshtastic interface: {interface}") diff --git a/src/message_processor.py b/src/message_processor.py index 0397b36..638c2eb 100644 --- a/src/message_processor.py +++ b/src/message_processor.py @@ -1,5 +1,5 @@ import asyncio -from typing import Dict, Any +from typing import Dict, Any, List from datetime import datetime, timezone from meshtastic_interface import MeshtasticInterface from telegram_interface import TelegramInterface @@ -13,10 +13,53 @@ def __init__(self, meshtastic: MeshtasticInterface, telegram: TelegramInterface, self.telegram = telegram self.node_manager = meshtastic.node_manager self.start_time = datetime.now(timezone.utc) - self.last_heartbeat = {} + self.last_heartbeat: Dict[str, datetime] = {} self.heartbeat_timeout = config.get('meshtastic.heartbeat_timeout', 300) self.local_nodes = config.get('meshtastic.local_nodes', []) - self.pending_requests = {} # For tracking location, telemetry, and traceroute requests + + async def process_messages(self) -> None: + tasks = [ + self.process_meshtastic_messages(), + self.process_telegram_messages(), + self.periodic_status_update(), + self.check_heartbeats() + ] + await asyncio.gather(*tasks) + + async def process_meshtastic_messages(self) -> None: + while True: + try: + message = await self.meshtastic.message_queue.get() + self.logger.debug(f"Processing Meshtastic message: {message}") + if 'decoded' in message and 'portnum' in message['decoded']: + await self.handle_meshtastic_message(message) + else: + self.logger.warning(f"Received unexpected message format: {message}") + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Error processing Meshtastic message: {e}", exc_info=True) + await asyncio.sleep(0.1) + + async def process_telegram_messages(self) -> None: + while True: + try: + message = await self.telegram.message_queue.get() + self.logger.info(f"Processing Telegram message: {message}") + match message['type']: + case 'command': + await self.handle_telegram_command(message) + case 'telegram': + await self.handle_telegram_text(message) + case 'location': + await self.handle_telegram_location(message) + case _: + self.logger.warning(f"Received unknown message type: {message['type']}") + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Error processing Telegram message: {e}", exc_info=True) + await asyncio.sleep(0.1) async def handle_meshtastic_message(self, packet: Dict[str, Any]) -> None: self.logger.debug(f"Received Meshtastic message: {packet}") @@ -42,30 +85,12 @@ async def handle_text_message_app(self, packet: Dict[str, Any]) -> None: self.logger.info(f"Sending Meshtastic message to Telegram: {message}") await self.telegram.send_message(message, disable_notification=False) - async def handle_telegram_text(self, message: Dict[str, Any]) -> None: - self.logger.info(f"Handling Telegram text message: {message}") - sender = message['sender'][:10] - recipient = self.config.get('meshtastic.default_node_id', '^all') - text = message['text'] - - meshtastic_message = f"[TG:{sender}] {text}" - self.logger.info(f"Preparing to send Telegram message to Meshtastic: {meshtastic_message}") - try: - await self.meshtastic.send_message(meshtastic_message, recipient) - self.logger.info(f"Successfully sent message to Meshtastic: {meshtastic_message}") - except Exception as e: - self.logger.error(f"Failed to send message to Meshtastic: {e}", exc_info=True) - await self.telegram.send_message("Failed to send message to Meshtastic. Please try again.") - - # Add this line to check if the message is being processed - self.logger.info("Finished handling Telegram text message") - async def handle_position_app(self, packet: Dict[str, Any]) -> None: position = packet['decoded'].get('position', {}) sender = packet.get('fromId', 'unknown') self.node_manager.update_node_position(sender, position) position_info = self.node_manager.get_node_position(sender) - await self.update_or_send_message('location', sender, position_info) + await self.telegram.send_or_edit_message('location', sender, position_info) async def handle_telemetry_app(self, packet: Dict[str, Any]) -> None: node_id = packet.get('fromId', 'unknown') @@ -74,106 +99,38 @@ async def handle_telemetry_app(self, packet: Dict[str, Any]) -> None: self.node_manager.update_node_telemetry(node_id, device_metrics) self.last_heartbeat[node_id] = datetime.now(timezone.utc) telemetry_info = self.node_manager.get_node_telemetry(node_id) - await self.update_or_send_message('telemetry', node_id, telemetry_info) - - async def handle_admin_app(self, packet: Dict[str, Any]) -> None: - admin_message = packet.get('decoded', {}).get('admin', {}) - self.logger.info(f"Received admin message: {admin_message}") - if 'getRouteReply' in admin_message: - route = admin_message['getRouteReply'].get('route', []) - dest_id = packet.get('toId', 'unknown') - route_str = " -> ".join(map(str, route)) if route else "No route found" - traceroute_result = f"๐Ÿ” Traceroute to {dest_id}:\n{route_str}" - await self.update_or_send_message('traceroute', dest_id, traceroute_result) - elif 'getChannelResponse' in admin_message: - self.logger.info(f"Received channel response: {admin_message['getChannelResponse']}") - else: - self.logger.warning(f"Unhandled admin message: {admin_message}") - - async def update_or_send_message(self, request_type: str, node_id: str, content: str) -> None: - request_key = f"{request_type}:{node_id}" - if request_key in self.pending_requests: - await self.telegram.send_or_update_message(content, message_id=self.pending_requests[request_key]) - del self.pending_requests[request_key] - else: - await self.telegram.send_message(content, disable_notification=True) - - async def process_messages(self) -> None: - tasks = [ - self.process_meshtastic_messages(), - self.process_telegram_messages(), - self.periodic_status_update(), - self.check_heartbeats() - ] - await asyncio.gather(*tasks) - - async def process_meshtastic_messages(self) -> None: - while True: - try: - message = await self.meshtastic.message_queue.get() - self.logger.debug(f"Processing Meshtastic message: {message}") - if 'decoded' in message and 'portnum' in message['decoded']: - await self.handle_meshtastic_message(message) - else: - self.logger.warning(f"Received unexpected message format: {message}") - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Error processing Meshtastic message: {e}", exc_info=True) - await asyncio.sleep(0.1) - - async def process_telegram_messages(self) -> None: - while True: - try: - self.logger.debug("Waiting for Telegram message...") - message = await self.telegram.message_queue.get() - self.logger.info(f"Processing Telegram message: {message}") - if message['type'] == 'command': - await self.handle_telegram_command(message) - elif message['type'] == 'telegram': - await self.handle_telegram_text(message) - elif message['type'] == 'location': - await self.handle_telegram_location(message) - else: - self.logger.warning(f"Received unknown message type: {message['type']}") - self.logger.debug("Finished processing Telegram message") - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Error processing Telegram message: {e}", exc_info=True) - await asyncio.sleep(0.1) - - async def periodic_status_update(self) -> None: - while True: - try: - await asyncio.sleep(3600) - status = await self.get_status() - await self.telegram.send_message(status) - except asyncio.CancelledError: - break - except Exception as e: - self.logger.error(f"Error in periodic status update: {e}", exc_info=True) - - async def get_status(self) -> str: - uptime = datetime.now(timezone.utc) - self.start_time - meshtastic_status = await self.meshtastic.get_status() - num_nodes = len(self.node_manager.get_all_nodes()) - return f"Meshgram Status:\nUptime: {uptime}\nConnected Nodes: {num_nodes}\nMeshtastic Status:\n{meshtastic_status}" + await self.telegram.send_or_edit_message('telemetry', node_id, telemetry_info) async def handle_nodeinfo_app(self, packet: Dict[str, Any]) -> None: - node_id = packet.get('from', 'unknown') + node_id = packet.get('fromId', 'unknown') node_info = packet['decoded'] self.node_manager.update_node(node_id, { 'shortName': node_info.get('user', {}).get('shortName', 'unknown'), 'longName': node_info.get('user', {}).get('longName', 'unknown'), 'hwModel': node_info.get('user', {}).get('hwModel', 'unknown') }) - await self.telegram.send_message(self.node_manager.format_node_info(node_id), disable_notification=True) + info_text = self.node_manager.format_node_info(node_id) + await self.telegram.send_or_edit_message('nodeinfo', node_id, info_text) + + async def handle_telegram_text(self, message: Dict[str, Any]) -> None: + self.logger.info(f"Handling Telegram text message: {message}") + sender = message['sender'][:10] + recipient = self.config.get('meshtastic.default_node_id', '^all') + text = message['text'] + + meshtastic_message = f"[TG:{sender}] {text}" + self.logger.info(f"Preparing to send Telegram message to Meshtastic: {meshtastic_message}") + try: + await self.meshtastic.send_message(meshtastic_message, recipient) + self.logger.info(f"Successfully sent message to Meshtastic: {meshtastic_message}") + except Exception as e: + self.logger.error(f"Failed to send message to Meshtastic: {e}", exc_info=True) + await self.telegram.send_message("Failed to send message to Meshtastic. Please try again.") async def handle_telegram_command(self, message: Dict[str, Any]) -> None: try: command = message.get('command', '').partition('@')[0] - args, user_id = message.get('args', {}), message.get('user_id') + args, user_id = message.get('args', []), message.get('user_id') if not self.telegram.is_user_authorized(user_id) and command not in ['start', 'help', 'user']: await self.telegram.send_message("You are not authorized to use this command.") @@ -188,83 +145,91 @@ async def handle_telegram_command(self, message: Dict[str, Any]) -> None: self.logger.error(f'Error handling Telegram command: {e}', exc_info=True) await self.telegram.send_message(f"Error executing command: {e}") - def validate_node_id(self, node_id: str) -> bool: - return len(node_id) == 8 and all(c in '0123456789abcdefABCDEF' for c in node_id) - async def handle_telegram_location(self, message: Dict[str, Any]) -> None: lat, lon = message['location']['latitude'], message['location']['longitude'] sender = message['sender'] await self.meshtastic.send_location(lat, lon, f"Location from telegram user {sender}") - async def cmd_status(self, args: Dict[str, Any], user_id: int) -> None: - nodes = self.node_manager.get_all_nodes() - total_nodes = len(nodes) - active_nodes = sum(1 for node in nodes.values() if 'last_updated' in node) - - status_text = f"๐Ÿ“Š Meshgram Status\n๐Ÿ”ข Total nodes: {total_nodes}\nโœ… Active nodes: {active_nodes}\n\n" - for node_id, node_info in nodes.items(): - status_text += (f"๐Ÿ”ท Node {node_id}:\n" - f"๐Ÿ“› Name: {node_info.get('name', 'Unknown')}\n" - f"๐Ÿ”‹ Battery: {node_info.get('batteryLevel', 'Unknown')}\n" - f"โฑ๏ธ Uptime: {node_info.get('uptimeSeconds', 'Unknown')} seconds\n" - f"๐Ÿ•’ Last updated: {node_info.get('last_updated', 'Unknown')}\n\n") - - await self.telegram.send_message(status_text) + async def periodic_status_update(self) -> None: + while True: + try: + await asyncio.sleep(3600) + status = await self.get_status() + await self.telegram.send_message(status) + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Error in periodic status update: {e}", exc_info=True) - async def cmd_node(self, args: Dict[str, Any], user_id: int) -> None: - node_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - if not node_id: + async def get_status(self) -> str: + uptime = datetime.now(timezone.utc) - self.start_time + meshtastic_status = await self.meshtastic.get_status() + num_nodes = len(self.node_manager.get_all_nodes()) + return f"Meshgram Status:\nUptime: {uptime}\nConnected Nodes: {num_nodes}\nMeshtastic Status:\n{meshtastic_status}" + + async def check_heartbeats(self) -> None: + while True: + try: + now = datetime.now(timezone.utc) + for node_id, last_heartbeat in list(self.last_heartbeat.items()): + if (now - last_heartbeat).total_seconds() > self.heartbeat_timeout: + del self.last_heartbeat[node_id] + await self.telegram.send_message(f"โš ๏ธ Node {node_id} is no longer active.") + await asyncio.sleep(60) + except Exception as e: + self.logger.error(f"Error in check_heartbeats: {e}", exc_info=True) + await asyncio.sleep(60) + + async def cmd_status(self, args: List[str], user_id: int) -> None: + status = await self.get_status() + await self.telegram.send_message(status) + + async def cmd_bell(self, args: List[str], user_id: int) -> None: + dest_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not dest_id: await self.telegram.send_message("No node ID provided and no default node ID set.") return - node_info = self.node_manager.format_node_info(node_id) - await self.telegram.send_message(node_info) - - async def cmd_bell(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') self.logger.info(f"Sending bell to node {dest_id}") try: await self.meshtastic.send_bell(dest_id) - self.logger.info(f"Bell sent successfully to node {dest_id}") await self.telegram.send_message(f"๐Ÿ”” Bell sent to node {dest_id}.", disable_notification=True) except Exception as e: self.logger.error(f"Failed to send bell to node {dest_id}: {e}", exc_info=True) await self.telegram.send_message(f"Failed to send bell to node {dest_id}. Error: {str(e)}") - async def cmd_location(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') + async def cmd_location(self, args: List[str], user_id: int) -> None: + dest_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not dest_id: + await self.telegram.send_message("No node ID provided and no default node ID set.") + return await self.meshtastic.request_location(dest_id) - message = await self.telegram.send_message(f"๐Ÿ“ Location request sent to node {dest_id}. Waiting for response...") - if message: - self.pending_requests[f"location:{dest_id}"] = message.message_id + await self.telegram.send_message(f"๐Ÿ“ Location request sent to node {dest_id}. Waiting for response...") - async def cmd_telemetry(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') + async def cmd_telemetry(self, args: List[str], user_id: int) -> None: + dest_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not dest_id: + await self.telegram.send_message("No node ID provided and no default node ID set.") + return await self.meshtastic.request_telemetry(dest_id) - message = await self.telegram.send_message(f"๐Ÿ“Š Telemetry request sent to node {dest_id}. Waiting for response...") - if message: - self.pending_requests[f"telemetry:{dest_id}"] = message.message_id + await self.telegram.send_message(f"๐Ÿ“Š Telemetry request sent to node {dest_id}. Waiting for response...") - async def cmd_traceroute(self, args: Dict[str, Any], user_id: int) -> None: - dest_id = args.get('node_id') or self.config.get('meshtastic.default_node_id') - message = await self.telegram.send_message(f"๐Ÿ” Initiating traceroute to {dest_id}...") - if message: - self.pending_requests[f"traceroute:{dest_id}"] = message.message_id + async def cmd_traceroute(self, args: List[str], user_id: int) -> None: + dest_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not dest_id: + await self.telegram.send_message("No node ID provided and no default node ID set.") + return + await self.telegram.send_message(f"๐Ÿ” Initiating traceroute to {dest_id}...") await self.meshtastic.traceroute(dest_id) - async def check_heartbeats(self) -> None: - while True: - try: - now = datetime.now(timezone.utc) - for node_id, last_heartbeat in list(self.last_heartbeat.items()): - if (now - last_heartbeat).total_seconds() > self.heartbeat_timeout: - del self.last_heartbeat[node_id] - await self.telegram.send_message(f"โš ๏ธ Node {node_id} is no longer active.") - await asyncio.sleep(60) - except Exception as e: - self.logger.error(f"Error in check_heartbeats: {e}", exc_info=True) - await asyncio.sleep(60) + async def cmd_node(self, args: List[str], user_id: int) -> None: + node_id = args[0] if args else self.config.get('meshtastic.default_node_id') + if not node_id: + await self.telegram.send_message("No node ID provided and no default node ID set.") + return + node_info = self.node_manager.format_node_info(node_id) + await self.telegram.send_message(node_info) async def close(self) -> None: self.logger.info("Closing MessageProcessor...") # Add any cleanup operations here if needed - self.logger.info("MessageProcessor closed.") + self.logger.info("MessageProcessor closed.") \ No newline at end of file diff --git a/src/node_manager.py b/src/node_manager.py index 1ddcbe3..948d73b 100644 --- a/src/node_manager.py +++ b/src/node_manager.py @@ -32,7 +32,7 @@ def format_node_info(self, node_id: str) -> str: node = self.get_node(node_id) if not node: return f"โ„น๏ธ No information available for node {node_id}" - info = [f"๐Ÿ”ท Discovered Node ID: {node_id}"] + info = [f"๐Ÿ”ท Node {node_id}:"] for key, value in node.items(): if key != 'last_updated': emoji = { @@ -48,23 +48,23 @@ def format_node_info(self, node_id: str) -> str: def get_node_telemetry(self, node_id: str) -> str: node = self.get_node(node_id) if not node: - return f"No telemetry available for node {node_id}" + return f"๐Ÿ“Š No telemetry available for node {node_id}" air_util_tx = node.get('airUtilTx', 'Unknown') air_util_tx_str = f"{air_util_tx:.2f}%" if isinstance(air_util_tx, (int, float)) else air_util_tx return (f"๐Ÿ“Š Telemetry for node {node_id}:\n" - f"โ€ข Battery: {node.get('batteryLevel', 'Unknown')}%\n" - f"โ€ข Air Utilization TX: {air_util_tx_str}\n" - f"โ€ข Uptime: {node.get('uptimeSeconds', 'Unknown')} seconds\n" - f"โ€ข Last updated: {node.get('last_updated', 'Unknown')}") + f"๐Ÿ”‹ Battery: {node.get('batteryLevel', 'Unknown')}%\n" + f"๐Ÿ“ก Air Utilization TX: {air_util_tx_str}\n" + f"โฑ๏ธ Uptime: {node.get('uptimeSeconds', 'Unknown')} seconds\n" + f"๐Ÿ•’ Last updated: {node.get('last_updated', 'Unknown')}") def get_node_position(self, node_id: str) -> str: node = self.get_node(node_id) if not node: - return f"No position available for node {node_id}" + return f"๐Ÿ“ No position available for node {node_id}" return (f"๐Ÿ“ Position for node {node_id}:\n" - f"โ€ข Latitude: {node.get('latitude', 'Unknown')}\n" - f"โ€ข Longitude: {node.get('longitude', 'Unknown')}\n" - f"โ€ข Last updated: {node.get('last_position_update', 'Unknown')}") + f"๐ŸŒŽ Latitude: {node.get('latitude', 'Unknown')}\n" + f"๐ŸŒ Longitude: {node.get('longitude', 'Unknown')}\n" + f"๐Ÿ•’ Last updated: {node.get('last_position_update', 'Unknown')}") def validate_node_id(self, node_id: str) -> bool: return len(node_id) == 8 and all(c in '0123456789abcdefABCDEF' for c in node_id) diff --git a/src/telegram_interface.py b/src/telegram_interface.py index 169796a..1c8b52f 100644 --- a/src/telegram_interface.py +++ b/src/telegram_interface.py @@ -1,28 +1,20 @@ import asyncio -from typing import Dict, Any, Optional, List -from telegram import Bot, Update, BotCommand, Message, InlineKeyboardButton, InlineKeyboardMarkup +from typing import Dict, Any, Optional, Callable +from telegram import Bot, Update from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters from config_manager import ConfigManager, get_logger -import time - -class RateLimiter: - def __init__(self, max_calls: int, period: float): - self.max_calls, self.period, self.calls = max_calls, period, [] - - async def wait(self): - now = time.time() - self.calls = [call for call in self.calls if now - call < self.period] - if len(self.calls) >= self.max_calls: - await asyncio.sleep(self.period - (now - self.calls[0])) - self.calls.append(time.time()) class TelegramInterface: def __init__(self, config: ConfigManager) -> None: - self.config, self.logger = config, get_logger(__name__) - self.bot, self.application = None, None - self.message_queue, self._stop_event = asyncio.Queue(), asyncio.Event() - self.chat_id, self.last_node_messages = None, {} - self.commands = { + self.config = config + self.logger = get_logger(__name__) + self.bot: Optional[Bot] = None + self.application: Optional[Application] = None + self.message_queue: asyncio.Queue[Dict[str, Any]] = asyncio.Queue() + self._stop_event: asyncio.Event = asyncio.Event() + self.chat_id: Optional[int] = None + self.last_messages: Dict[str, int] = {} + self.commands: Dict[str, Dict[str, str | Callable]] = { 'start': {'description': 'Start the bot and see available commands', 'handler': self.start_command}, 'help': {'description': 'Show help message', 'handler': self.help_command}, 'status': {'description': 'Check the current status', 'handler': self.handle_command}, @@ -33,7 +25,6 @@ def __init__(self, config: ConfigManager) -> None: 'node': {'description': 'Get information about a specific node', 'handler': self.handle_command}, 'user': {'description': 'Get information about your Telegram user', 'handler': self.user_command}, } - self.rate_limiter = RateLimiter(max_calls=30, period=60) async def setup(self) -> None: self.logger.info("Setting up telegram interface...") @@ -42,12 +33,12 @@ async def setup(self) -> None: if not token: raise ValueError("Telegram bot token not found in configuration") self.bot = Bot(token=token) - self.application = Application.builder().token(token=token).build() + self.application = Application.builder().token(token).build() self.application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.on_telegram_message)) self.application.add_handler(MessageHandler(filters.LOCATION, self.on_telegram_location)) for command, data in self.commands.items(): self.application.add_handler(CommandHandler(command, data['handler'])) - await self.bot.set_my_commands([BotCommand(command, data['description']) for command, data in self.commands.items()]) + await self.bot.set_my_commands([(cmd, data['description']) for cmd, data in self.commands.items()]) self.chat_id = self.config.get('telegram.chat_id') if not self.chat_id: raise ValueError("Telegram chat id not found in configuration") @@ -64,170 +55,111 @@ async def start_polling(self) -> None: self.logger.info("Starting telegram polling...") await self.application.initialize() await self.application.start() + await self.application.updater.start_polling(drop_pending_updates=True) - retry_delay = 1 - while not self._stop_event.is_set(): - try: - await self.application.updater.start_polling(drop_pending_updates=True) - self.logger.info("Telegram polling started") - await self._stop_event.wait() - except NetworkError as e: - self.logger.error(f"Network error occurred: {e}. Retrying in {retry_delay} seconds...") - await asyncio.sleep(retry_delay) - retry_delay = min(retry_delay * 2, 60) # Exponential backoff, max 60 seconds - except Exception as e: - self.logger.error(f"Unexpected error in Telegram polling: {e}") - break - - await self._shutdown_polling() + try: + await self._stop_event.wait() + finally: + await self._shutdown_polling() async def _shutdown_polling(self) -> None: self.logger.info("Stopping telegram polling...") - try: - if self.application.updater.running: - await self.application.updater.stop() + if self.application: await self.application.stop() await self.application.shutdown() - except RuntimeError as e: - self.logger.warning(f"RuntimeError during shutdown: {e}") - except Exception as e: - self.logger.error(f"Error during shutdown: {e}") self.logger.info("Telegram polling stopped") async def on_telegram_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - self.logger.debug(f"Received message from Telegram: {update.message.text}") - try: - await self.message_queue.put({ - 'text': update.message.text, - 'sender': update.effective_user.username or update.effective_user.first_name, - 'type': 'telegram', - 'message_id': update.message.message_id, - 'user_id': update.effective_user.id - }) - self.logger.info(f"Received message from Telegram: {update.message.text}") - await update.message.reply_text("Message received and will be sent to Meshtastic.") - await self.bot.send_chat_action(chat_id=update.effective_chat.id, action="typing") - except Exception as e: - self.logger.error(f'Error handling Telegram message: {e}') - await update.message.reply_text("An error occurred while processing your message. Please try again.") - - async def send_message(self, text: str, disable_notification: bool = False, pin_message: bool = False) -> Optional[Message]: - self.logger.debug(f"Attempting to send message to Telegram: {text}") - try: - message = await self.bot.send_message(chat_id=self.chat_id, text=text, disable_notification=disable_notification, disable_web_page_preview=True) - self.logger.info(f"Sent message to Telegram: {text}") - if pin_message: - await self.bot.pin_chat_message(chat_id=self.chat_id, message_id=message.message_id) - self.logger.info("Pinned message to Telegram") - return message - except Exception as e: - self.logger.error(f"Failed to send Telegram message: {e}") - return None + await self.message_queue.put({ + 'text': update.message.text, + 'sender': update.effective_user.username or update.effective_user.first_name, + 'type': 'telegram', + 'message_id': update.message.message_id, + 'user_id': update.effective_user.id + }) - async def send_or_update_message(self, text: str, message_id: Optional[int] = None, disable_notification: bool = False) -> None: - await self.rate_limiter.wait() - try: + async def on_telegram_location(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + await self.message_queue.put({ + 'location': { + 'latitude': update.message.location.latitude, + 'longitude': update.message.location.longitude + }, + 'sender': update.effective_user.username or update.effective_user.first_name, + 'type': 'location', + 'message_id': update.message.message_id + }) + + async def send_or_edit_message(self, message_type: str, node_id: str, content: str) -> None: + message_key = f"{message_type}:{node_id}" + if message_key in self.last_messages: + await self.edit_message(self.last_messages[message_key], content) + else: + message_id = await self.send_message(content) if message_id: - await self.bot.edit_message_text(chat_id=self.chat_id, message_id=message_id, text=text) - else: - message = await self.bot.send_message(chat_id=self.chat_id, text=text, disable_notification=disable_notification) - return message.message_id - except Exception as e: - self.logger.error(f"Failed to send or update telegram message: {e}") + self.last_messages[message_key] = message_id - async def on_telegram_location(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + async def send_message(self, text: str, disable_notification: bool = False) -> Optional[int]: try: - await self.message_queue.put({ - 'location': { - 'latitude': update.message.location.latitude, - 'longitude': update.message.location.longitude - }, - 'sender': update.effective_user.username or update.effective_user.first_name, - 'type': 'telegram', - 'message_id': update.message.message_id - }) - await update.message.reply_text("Location received and will be sent to meshtastic.") + message = await self.bot.send_message( + chat_id=self.chat_id, + text=text, + disable_notification=disable_notification, + disable_web_page_preview=True + ) + return message.message_id except Exception as e: - self.logger.error(f'Error handling telegram location: {e}') - await update.message.reply_text("An error occurred while processing your location. Please try again.") + self.logger.error(f"Failed to send Telegram message: {e}") + return None - async def send_or_update_node_info(self, node_id: str, info_text: str) -> None: + async def edit_message(self, message_id: int, text: str) -> bool: try: - if node_id in self.last_node_messages: - await self.send_or_update_message(info_text, message_id=self.last_node_messages[node_id]) - else: - message_id = await self.send_or_update_message(info_text, disable_notification=True) - self.last_node_messages[node_id] = message_id - self.logger.info(f"Updated node info for {node_id}") + await self.bot.edit_message_text( + chat_id=self.chat_id, + message_id=message_id, + text=text + ) + return True except Exception as e: - self.logger.error(f"Failed to update node info for {node_id}: {e}") + self.logger.error(f"Failed to edit Telegram message: {e}") + return False - def generate_help_text(self) -> str: - help_text = "๐Ÿš€ Welcome to Meshgram! Here are the available commands:\n\n" - for command, data in self.commands.items(): - help_text += f"/{command} - {data['description']}\n" - - help_text += "\n๐Ÿ” Advanced Usage:\n" - help_text += "Some commands can target specific nodes by adding a node ID:\n" - help_text += "โ€ข /location [node_id] - ๐Ÿ“ Request location (e.g., /location !abc123)\n" - help_text += "โ€ข /telemetry [node_id] - ๐Ÿ“Š Request telemetry data\n" - help_text += "โ€ข /node [node_id] - โ„น๏ธ Get node information\n" - help_text += "\nIf no node ID is provided, the default node will be used." - - return help_text + def is_user_authorized(self, user_id: int) -> bool: + authorized_users = self.config.get_authorized_users() + return not authorized_users or user_id in authorized_users async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - try: - await update.message.reply_text(self.generate_help_text()) - except Exception as e: - self.logger.error(f"Error in start command: {e}") - await update.message.reply_text("An error occurred. Please try again.") + await self.help_command(update, context) async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - await self.start_command(update, context) - - def is_user_authorized(self, user_id: int) -> bool: - authorized_users = self.config.get_authorized_users() - return not authorized_users or user_id in authorized_users + help_text = "Available commands:\n\n" + for command, data in self.commands.items(): + help_text += f"/{command} - {data['description']}\n" + await update.message.reply_text(help_text) async def user_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user - user_info = (f"๐Ÿ‘ค User Information:\n" - f"๐Ÿ†” ID: {user.id}\n" - f"๐Ÿ“› Name: {user.full_name}\n" - f"๐Ÿท๏ธ Username: @{user.username}\n" - f"๐Ÿค– Is Bot: {'Yes' if user.is_bot else 'No'}") + user_info = f"User ID: {user.id}\nUsername: {user.username}\nName: {user.full_name}" await update.message.reply_text(user_info) async def handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: command = update.message.text.split()[0][1:].partition('@')[0] + args = context.args user_id = update.effective_user.id - if command not in ['start', 'help', 'user', 'status'] and not self.is_user_authorized(user_id): + if not self.is_user_authorized(user_id) and command not in ['start', 'help', 'user']: await update.message.reply_text("You are not authorized to use this command.") return - try: - args = {'node_id': context.args[0]} if context.args else {} - await self.message_queue.put({ - 'type': 'command', - 'command': command, - 'args': args, - 'user_id': user_id - }) - - if command in ['location', 'telemetry']: - node_id = args.get('node_id', 'default node') - await update.message.reply_text(f"Sending {command} request to {node_id}...", disable_notification=True) - except Exception as e: - self.logger.error(f"Error in {command} command: {e}") - await update.message.reply_text(f"An error occurred while processing the {command} command. Please try again.") + await self.message_queue.put({ + 'type': 'command', + 'command': command, + 'args': args, + 'user_id': user_id + }) async def close(self) -> None: self.logger.info("Stopping telegram interface...") self._stop_event.set() - if self.application and self.application.updater.running: - await self.application.updater.stop() if self.application: await self.application.stop() await self.application.shutdown()