diff --git a/bin/hytera-homebrew-bridge.py b/bin/hytera-homebrew-bridge.py index d924f47..799249b 100755 --- a/bin/hytera-homebrew-bridge.py +++ b/bin/hytera-homebrew-bridge.py @@ -7,13 +7,34 @@ import sys from asyncio import AbstractEventLoop, Queue from signal import SIGINT, SIGTERM -from typing import Optional - - -class HyteraHomebrewBridge: - def __init__(self, settings_ini_path: str): - self.loop: Optional[AbstractEventLoop] = None - self.settings: BridgeSettings = BridgeSettings(filepath=settings_ini_path) +from typing import Optional, Dict + +self_name: str = "hytera_homebrew_bridge" +self_spec = importlib.util.find_spec(self_name) +if self_spec is None: + parent_folder: str = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + expected_folder: str = f"{parent_folder}{os.path.sep}" + if os.path.isdir(expected_folder): + sys.path.append(expected_folder) + +from hytera_homebrew_bridge.lib.hytera_protocols import ( + HyteraP2PProtocol, + HyteraDMRProtocol, + HyteraRDACProtocol, +) +from hytera_homebrew_bridge.lib.mmdvm_protocol import MMDVMProtocol +from hytera_homebrew_bridge.lib.settings import BridgeSettings +from hytera_homebrew_bridge.lib.hytera_mmdvm_translator import HyteraMmdvmTranslator + +from hytera_homebrew_bridge.lib.callback_interface import CallbackInterface + + +class HyteraRepeater(CallbackInterface): + def __init__(self, ip: str, settings: BridgeSettings, asyncloop: AbstractEventLoop): + # ip of hytera repeater + self.ip: str = ip + self.settings: BridgeSettings = settings + self.loop: AbstractEventLoop = asyncloop # message queues for translator self.queue_mmdvm_outgoing: Queue = Queue() self.queue_hytera_incoming: Queue = Queue() @@ -25,73 +46,35 @@ def __init__(self, settings_ini_path: str): connection_lost_callback=self.homebrew_connection_lost, queue_outgoing=self.queue_mmdvm_outgoing, queue_incoming=self.queue_mmdvm_incoming, - ) - # hytera ipsc: p2p dmr and rdac - self.hytera_p2p_protocol: HyteraP2PProtocol = HyteraP2PProtocol( - settings=self.settings, repeater_accepted_callback=self.homebrew_connect() + hytera_repeater_ip=ip, ) self.hytera_dmr_protocol: HyteraDMRProtocol = HyteraDMRProtocol( settings=self.settings, queue_incoming=self.queue_hytera_incoming, queue_outgoing=self.queue_hytera_outgoing, + hytera_repeater_ip=ip, ) - self.hytera_rdac_protocol: HyteraRDACProtocol = HyteraRDACProtocol( - settings=self.settings, rdac_completed_callback=self.homebrew_connect() - ) - # prepare translator self.hytera_mmdvm_translator: HyteraMmdvmTranslator = HyteraMmdvmTranslator( settings=self.settings, mmdvm_incoming=self.queue_mmdvm_incoming, hytera_incoming=self.queue_hytera_incoming, mmdvm_outgoing=self.queue_mmdvm_outgoing, hytera_outgoing=self.queue_hytera_outgoing, + hytera_repeater_ip=self.ip, ) - async def go(self) -> None: - self.loop = asyncio.get_running_loop() - self.settings.print_settings() - - # start translator tasks - self.loop.create_task(self.hytera_mmdvm_translator.translate_from_mmdvm()) - self.loop.create_task(self.hytera_mmdvm_translator.translate_from_hytera()) - - # mmdvm maintenance (auto login, auth, ping/pong) - self.loop.create_task(self.homebrew_protocol.periodic_maintenance()) - - # send translated or protocol generated packets to respective upstreams - self.loop.create_task(self.hytera_dmr_protocol.send_hytera_from_queue()) - self.loop.create_task(self.homebrew_protocol.send_mmdvm_from_queue()) - - # connect Hytera repeater - await self.hytera_p2p_connect() - await self.hytera_dmr_connect() - if not self.settings.hytera_disable_rdac: - await self.hytera_rdac_connect() - - # MMDVM will get connected once the Hytera is set-up and running correctly - # it is not meant to be started here - - async def hytera_p2p_connect(self) -> None: - # P2P/IPSC Service address - await self.loop.create_datagram_endpoint( - lambda: self.hytera_p2p_protocol, - local_addr=(self.settings.ipsc_ip, self.settings.p2p_port), - ) + def homebrew_connection_lost(self, ip: str) -> None: + asyncio.run(self.homebrew_connect(ip=ip)) async def hytera_dmr_connect(self) -> None: await self.loop.create_datagram_endpoint( lambda: self.hytera_dmr_protocol, local_addr=(self.settings.ipsc_ip, self.settings.dmr_port), + reuse_port=True, ) - async def hytera_rdac_connect(self) -> None: - await self.loop.create_datagram_endpoint( - lambda: self.hytera_rdac_protocol, - local_addr=(self.settings.ipsc_ip, self.settings.rdac_port), - ) - - async def homebrew_connect(self) -> None: - incorrect_config_params = self.settings.get_incorrect_configurations() + async def homebrew_connect(self, ip: str) -> None: + incorrect_config_params = self.settings.get_incorrect_configurations(ip) if len(incorrect_config_params) > 0: self.homebrew_protocol.log_error( "Current configuration is not valid for connection" @@ -116,11 +99,71 @@ async def homebrew_connect(self) -> None: # Extract bound socket port self.settings.hb_local_port = hb_local_socket.getsockname()[1] - def homebrew_connection_lost(self) -> None: - asyncio.run(self.homebrew_connect()) + async def go(self): + # start DMR protocol + await self.hytera_dmr_connect() + + # start translator tasks + self.loop.create_task(self.hytera_mmdvm_translator.translate_from_mmdvm()) + self.loop.create_task(self.hytera_mmdvm_translator.translate_from_hytera()) + + # mmdvm maintenance (auto login, auth, ping/pong) + self.loop.create_task(self.homebrew_protocol.periodic_maintenance()) + + # send translated or protocol generated packets to respective upstreams + self.loop.create_task(self.hytera_dmr_protocol.send_hytera_from_queue()) + self.loop.create_task(self.homebrew_protocol.send_mmdvm_from_queue()) + + +class HyteraHomebrewBridge(CallbackInterface): + def __init__(self, settings_ini_path: str): + self.loop: Optional[AbstractEventLoop] = None + self.settings: BridgeSettings = BridgeSettings(filepath=settings_ini_path) + self.repeaters: Dict[str, HyteraRepeater] = {} + # hytera ipsc: p2p dmr and rdac + self.hytera_p2p_protocol: HyteraP2PProtocol = HyteraP2PProtocol( + settings=self.settings, repeater_accepted_callback=self + ) + self.hytera_rdac_protocol: HyteraRDACProtocol = HyteraRDACProtocol( + settings=self.settings, rdac_completed_callback=self + ) + # prepare translator + + async def go(self) -> None: + self.loop = asyncio.get_running_loop() + self.settings.print_settings() + + # connect Hytera repeater + await self.hytera_p2p_connect() + if not self.settings.hytera_disable_rdac: + await self.hytera_rdac_connect() + + async def homebrew_connect(self, ip: str) -> None: + if not self.repeaters.get(ip): + self.repeaters[ip] = HyteraRepeater( + ip=ip, settings=self.settings, asyncloop=self.loop + ) + await self.repeaters[ip].go() + + await self.repeaters[ip].homebrew_connect(ip) + + async def hytera_p2p_connect(self) -> None: + # P2P/IPSC Service address + await self.loop.create_datagram_endpoint( + lambda: self.hytera_p2p_protocol, + local_addr=(self.settings.ipsc_ip, self.settings.p2p_port), + ) + + async def hytera_rdac_connect(self) -> None: + await self.loop.create_datagram_endpoint( + lambda: self.hytera_rdac_protocol, + local_addr=(self.settings.ipsc_ip, self.settings.rdac_port), + ) def stop_running(self) -> None: - self.homebrew_protocol.disconnect() + for ip, repeater in self.repeaters.items(): + repeater.homebrew_protocol.disconnect() + self.hytera_p2p_protocol.disconnect() self.loop.stop() for task in asyncio.Task.all_tasks(): @@ -156,28 +199,6 @@ def stop_running(self) -> None: ) exit(1) - self_name: str = "hytera_homebrew_bridge" - self_spec = importlib.util.find_spec(self_name) - if self_spec is None: - mainlog.debug( - "Package hytera-homebrew-bridge is not installed, trying locally\n" - ) - parent_folder: str = os.path.dirname( - os.path.dirname(os.path.realpath(__file__)) - ) - expected_folder: str = f"{parent_folder}{os.path.sep}" - if os.path.isdir(expected_folder): - sys.path.append(expected_folder) - - from hytera_homebrew_bridge.lib.hytera_protocols import ( - HyteraP2PProtocol, - HyteraDMRProtocol, - HyteraRDACProtocol, - ) - from hytera_homebrew_bridge.lib.mmdvm_protocol import MMDVMProtocol - from hytera_homebrew_bridge.lib.settings import BridgeSettings - from hytera_homebrew_bridge.lib.hytera_mmdvm_translator import HyteraMmdvmTranslator - uvloop_spec = importlib.util.find_spec("uvloop") if uvloop_spec: import uvloop diff --git a/hytera_homebrew_bridge/dmrlib/transmission.py b/hytera_homebrew_bridge/dmrlib/transmission.py index 906bfa7..c9357b1 100644 --- a/hytera_homebrew_bridge/dmrlib/transmission.py +++ b/hytera_homebrew_bridge/dmrlib/transmission.py @@ -130,13 +130,13 @@ def process_csbk(self, csbk: DmrCsbk): self.new_transmission(TransmissionType.DataTransmission) if csbk.csbk_opcode == DmrCsbk.CsbkoTypes.preamble: if self.blocks_expected == 0: - self.blocks_expected = csbk.preamble_csbk_blocks_to_follow + 1 + self.blocks_expected = csbk.csbk_data.preamble_csbk_blocks_to_follow + 1 elif ( self.blocks_expected - self.blocks_received - != csbk.preamble_csbk_blocks_to_follow + 1 + != csbk.csbk_data.preamble_csbk_blocks_to_follow + 1 ): print( - f"CSBK not setting expected to {self.blocks_expected} - {self.blocks_received} != {csbk.preamble_csbk_blocks_to_follow}" + f"CSBK not setting expected to {self.blocks_expected} - {self.blocks_received} != {csbk.csbk_data.preamble_csbk_blocks_to_follow}" ) self.blocks_received += 1 @@ -266,7 +266,7 @@ def end_data_transmission(self): for packet in self.blocks: if isinstance(packet, DmrCsbk): print( - f"[CSBK] [{packet.preamble_source_address} -> {packet.preamble_target_address}] [{packet.preamble_group_or_individual}]" + f"[CSBK] [{packet.csbk_data.source_address} -> {packet.csbk_data.target_address}]" ) elif isinstance(packet, DmrDataHeader): print( @@ -355,14 +355,12 @@ def process_packet(self, burst: BurstInfo) -> BurstInfo: lc_info_bits = decode_complete_lc(burst.data_bits[:98] + burst.data_bits[166:]) if burst.data_type == DataType.VoiceLCHeader: - print("voice header", lc_info_bits.tobytes().hex()) self.process_voice_header(FullLinkControl.from_bytes(lc_info_bits)) elif burst.data_type == DataType.DataHeader: self.process_data_header(DmrDataHeader.from_bytes(lc_info_bits)) elif burst.data_type == DataType.CSBK: self.process_csbk(DmrCsbk.from_bytes(lc_info_bits)) elif burst.data_type == DataType.TerminatorWithLC: - print("voice terminator", lc_info_bits.tobytes().hex()) self.blocks_received += 1 self.end_voice_transmission() elif burst.data_type in [ diff --git a/hytera_homebrew_bridge/lib/callback_interface.py b/hytera_homebrew_bridge/lib/callback_interface.py new file mode 100644 index 0000000..6b0a609 --- /dev/null +++ b/hytera_homebrew_bridge/lib/callback_interface.py @@ -0,0 +1,3 @@ +class CallbackInterface: + async def homebrew_connect(self, ip: str) -> None: + pass diff --git a/hytera_homebrew_bridge/lib/custom_bridge_datagram_protocol.py b/hytera_homebrew_bridge/lib/custom_bridge_datagram_protocol.py index 894b6b3..1d89dd4 100644 --- a/hytera_homebrew_bridge/lib/custom_bridge_datagram_protocol.py +++ b/hytera_homebrew_bridge/lib/custom_bridge_datagram_protocol.py @@ -14,7 +14,7 @@ def __init__(self, settings: BridgeSettings) -> None: def hytera_repeater_obtain_snmp(self, address: tuple, force: bool = False) -> None: self.settings.hytera_repeater_ip = address[0] if self.settings.snmp_enabled: - if force or not self.settings.hytera_snmp_data: + if force or not self.settings.hytera_snmp_data.get(address[0]): SNMP().walk_ip(address, self.settings) else: self.log_warning("SNMP is disabled") diff --git a/hytera_homebrew_bridge/lib/hytera_mmdvm_translator.py b/hytera_homebrew_bridge/lib/hytera_mmdvm_translator.py index 7bfb4b8..a2bb9ae 100644 --- a/hytera_homebrew_bridge/lib/hytera_mmdvm_translator.py +++ b/hytera_homebrew_bridge/lib/hytera_mmdvm_translator.py @@ -28,6 +28,7 @@ def __init__( hytera_outgoing: Queue, mmdvm_incoming: Queue, mmdvm_outgoing: Queue, + hytera_repeater_ip: str, ): self.transmission_watcher: TransmissionWatcher = TransmissionWatcher() self.settings = settings @@ -35,12 +36,15 @@ def __init__( self.queue_hytera_output = hytera_outgoing self.queue_mmdvm_to_translate = mmdvm_incoming self.queue_mmdvm_output = mmdvm_outgoing + self.hytera_ip: str = hytera_repeater_ip async def translate_from_hytera(self): loop = asyncio.get_running_loop() while loop.is_running(): try: - packet: KaitaiStruct = await self.queue_hytera_to_translate.get() + ip, packet = await self.queue_hytera_to_translate.get() + assert isinstance(packet, KaitaiStruct) + assert isinstance(ip, str) if not isinstance(packet, IpSiteConnectProtocol): self.queue_hytera_to_translate.task_done() continue @@ -68,7 +72,7 @@ async def translate_from_hytera(self): + burst.sequence_no.to_bytes(1, byteorder="big") + packet.source_radio_id.to_bytes(3, byteorder="big") + packet.destination_radio_id.to_bytes(3, byteorder="big") - + self.settings.get_repeater_dmrid().to_bytes( + + self.settings.get_repeater_dmrid(ip).to_bytes( 4, byteorder="big" ) + get_mmdvm_bitflags(burst, packet) @@ -77,7 +81,7 @@ async def translate_from_hytera(self): + byteswap_bytes(packet.ipsc_payload)[0:-1] ) - self.queue_mmdvm_output.put_nowait(mmdvm_out) + self.queue_mmdvm_output.put_nowait((self.hytera_ip, mmdvm_out)) else: print( "Hytera BurstInfo not available", @@ -105,7 +109,9 @@ async def translate_from_mmdvm(self): loop = asyncio.get_running_loop() while loop.is_running(): try: - packet: Mmdvm2020 = await self.queue_mmdvm_to_translate.get() + ip, packet = await self.queue_mmdvm_to_translate.get() + assert isinstance(packet, Mmdvm2020) + assert isinstance(ip, str) if not isinstance(packet.command_data, Mmdvm2020.TypeDmrData): self.queue_mmdvm_to_translate.task_done() continue @@ -133,7 +139,9 @@ async def translate_from_mmdvm(self): frame_type=get_ipsc_frame_type(burst), hytera_slot_type=get_ipsc_slot_type(burst), ) - self.queue_hytera_output.put_nowait(hytera_out_packet) + self.queue_hytera_output.put_nowait( + (self.hytera_ip, hytera_out_packet) + ) else: print( f"MMDVM BurstInfo not available", diff --git a/hytera_homebrew_bridge/lib/hytera_protocols.py b/hytera_homebrew_bridge/lib/hytera_protocols.py index b871a11..f3b3ec9 100644 --- a/hytera_homebrew_bridge/lib/hytera_protocols.py +++ b/hytera_homebrew_bridge/lib/hytera_protocols.py @@ -2,12 +2,13 @@ import asyncio from asyncio import transports, Queue from binascii import hexlify -from typing import Optional, Tuple, Coroutine +from typing import Optional, Tuple, Dict from kaitaistruct import ValidationNotEqualError, KaitaiStruct from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol from hytera_homebrew_bridge.dmrlib.packet_utils import parse_hytera_data +from hytera_homebrew_bridge.lib.callback_interface import CallbackInterface from hytera_homebrew_bridge.lib.custom_bridge_datagram_protocol import ( CustomBridgeDatagramProtocol, ) @@ -31,10 +32,11 @@ class HyteraP2PProtocol(CustomBridgeDatagramProtocol): PACKET_TYPE_REQUEST_REGISTRATION, ] - def __init__(self, settings: BridgeSettings, repeater_accepted_callback: Coroutine): + def __init__( + self, settings: BridgeSettings, repeater_accepted_callback: CallbackInterface + ): super().__init__(settings) self.transport: Optional[transports.DatagramTransport] = None - self.settings.hytera_is_registered = False self.repeater_accepted_callback = repeater_accepted_callback @staticmethod @@ -53,7 +55,7 @@ def packet_is_ack(data: bytes) -> bool: def command_get_type(data: bytes) -> int: return data[20] if len(data) > 20 else 0 - def handle_registration(self, data: bytes, address: tuple) -> None: + def handle_registration(self, data: bytes, address: Tuple[str, int]) -> None: data = bytearray(data) data[3] = 0x50 # set repeater ID @@ -67,12 +69,16 @@ def handle_registration(self, data: bytes, address: tuple) -> None: self.transport.sendto(data, address) self.hytera_repeater_obtain_snmp(address) - self.settings.hytera_is_registered = True - asyncio.get_running_loop().create_task(self.repeater_accepted_callback) + self.settings.hytera_is_registered[address[0]] = True + asyncio.get_running_loop().create_task( + self.repeater_accepted_callback.homebrew_connect(address[0]) + ) - def handle_rdac_request(self, data: bytes, address: tuple) -> None: - if not self.settings.hytera_is_registered: - self.log_debug("Rejecting RDAC request for not-registered repeater") + def handle_rdac_request(self, data: bytes, address: Tuple[str, int]) -> None: + if not self.settings.hytera_is_registered.get(address[0]): + self.log_debug( + f"Rejecting RDAC request for not-registered repeater {address[0]}" + ) self.transport.sendto(bytes([0x00]), address) return @@ -85,7 +91,7 @@ def handle_rdac_request(self, data: bytes, address: tuple) -> None: data[13] = 0x01 data.append(0x01) - self.settings.hytera_repeater_ip = address[0] + self.settings.hytera_repeater_data[address[0]].hytera_repeater_ip = address[0] self.transport.sendto(data, response_address) self.log_debug("RDAC Accept for %s.%s" % address) @@ -106,9 +112,11 @@ def get_redirect_packet(data: bytearray, target_port: int): data += target_port.to_bytes(2, "little") return data - def handle_dmr_request(self, data: bytes, address: tuple) -> None: - if not self.settings.hytera_is_registered: - self.log_debug("Rejecting DMR request for not-registered repeater") + def handle_dmr_request(self, data: bytes, address: Tuple[str, int]) -> None: + if not self.settings.hytera_is_registered.get(address[0]): + self.log_debug( + f"Rejecting DMR request for not-registered repeater {address[0]}" + ) self.transport.sendto(bytes([0x00]), address) return @@ -126,9 +134,11 @@ def handle_dmr_request(self, data: bytes, address: tuple) -> None: data = self.get_redirect_packet(data, self.settings.dmr_port) self.transport.sendto(data, response_address) - def handle_ping(self, data: bytes, address: tuple) -> None: - if not self.settings.hytera_is_registered: - self.log_debug("Rejecting ping request for not-registered repeater") + def handle_ping(self, data: bytes, address: Tuple[str, int]) -> None: + if not self.settings.hytera_is_registered.get(address[0]): + self.log_debug( + f"Rejecting ping request for not-registered repeater {address[0]}" + ) self.transport.sendto(bytes([0x00]), address) return data = bytearray(data) @@ -148,6 +158,7 @@ def connection_made(self, transport: transports.BaseTransport) -> None: def datagram_received(self, data: bytes, address: Tuple[str, int]) -> None: packet_type = self.command_get_type(data) is_command = self.packet_is_command(data) + self.settings.ensure_repeater_data(address) if is_command: if packet_type not in self.KNOWN_PACKET_TYPES: if not self.packet_is_ack(data): @@ -353,118 +364,125 @@ class HyteraRDACProtocol(CustomBridgeDatagramProtocol): ) STEP12_RESPONSE = bytes([0x7E, 0x04, 0x00, 0xFA]) - def __init__(self, settings: BridgeSettings, rdac_completed_callback: Coroutine): + def __init__( + self, settings: BridgeSettings, rdac_completed_callback: CallbackInterface + ): super().__init__(settings) self.transport: Optional[transports.DatagramTransport] = None self.rdac_completed_callback = rdac_completed_callback - self.step = 0 + self.step: Dict[str, int] = dict() - def step0(self, _: bytes, address: tuple) -> None: + def step0(self, _: bytes, address: Tuple[str, int]) -> None: self.log_debug("RDAC identification started") - self.step = 1 + self.step[address[0]] = 1 self.transport.sendto(self.STEP0_REQUEST, address) - def step1(self, data: bytes, address: tuple) -> None: + def step1(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP0_RESPONSE)] == self.STEP0_RESPONSE: - self.step = 2 + self.step[address[0]] = 2 self.transport.sendto(self.STEP1_REQUEST, address) - def step2(self, data: bytes, _: tuple) -> None: + def step2(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP1_RESPONSE)] == self.STEP1_RESPONSE: - self.step = 3 + self.step[address[0]] = 3 - def step3(self, data: bytes, address: tuple) -> None: + def step3(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP2_RESPONSE)] == self.STEP2_RESPONSE: self.settings.hytera_repeater_id = int.from_bytes( data[18:21], byteorder="little" ) - self.step = 4 + self.step[address[0]] = 4 self.transport.sendto(self.STEP3_REQUEST, address) - def step4(self, data: bytes, address: tuple) -> None: + def step4(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP3_RESPONSE)] == self.STEP3_RESPONSE: - self.step = 5 + self.step[address[0]] = 5 self.transport.sendto(self.STEP4_REQUEST_1, address) self.transport.sendto(self.STEP4_REQUEST_2, address) - def step5(self, data: bytes, _: tuple) -> None: + def step5(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP4_RESPONSE_1)] == self.STEP4_RESPONSE_1: - self.step = 6 + self.step[address[0]] = 6 - def step6(self, data: bytes, address: tuple) -> None: + def step6(self, data: bytes, address: Tuple[str, int]) -> None: + ip: str = address[0] if data[: len(self.STEP4_RESPONSE_2)] == self.STEP4_RESPONSE_2: - self.settings.hytera_callsign = ( + self.settings.hytera_repeater_data[ip].hytera_callsign = ( data[88:108] .decode("utf_16_le") .encode("utf-8") .strip(b"\x00") .decode("utf-8") ) - self.settings.hytera_hardware = ( + self.settings.hytera_repeater_data[ip].hytera_hardware = ( data[120:184] .decode("utf_16_le") .encode("utf-8") .strip(b"\x00") .decode("utf-8") ) - self.settings.hytera_firmware = ( + self.settings.hytera_repeater_data[ip].hytera_firmware = ( data[56:88] .decode("utf_16_le") .encode("utf-8") .strip(b"\x00") .decode("utf-8") ) - self.settings.hytera_serial_number = ( + self.settings.hytera_repeater_data[ip].hytera_serial_number = ( data[184:216] .decode("utf_16_le") .encode("utf-8") .strip(b"\x00") .decode("utf-8") ) - self.step = 7 + self.step[address[0]] = 7 self.transport.sendto(self.STEP6_REQUEST_1, address) self.transport.sendto(self.STEP6_REQUEST_2, address) - def step7(self, data: bytes, address: tuple) -> None: + def step7(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP6_RESPONSE)] == self.STEP6_RESPONSE: - self.step = 8 + self.step[address[0]] = 8 self.transport.sendto(self.STEP7_REQUEST, address) - def step8(self, data: bytes, _: tuple) -> None: + def step8(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP7_RESPONSE_1)] == self.STEP7_RESPONSE_1: - self.step = 10 + self.step[address[0]] = 10 - def step10(self, data: bytes, address: tuple) -> None: + def step10(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP7_RESPONSE_2)] == self.STEP7_RESPONSE_2: - self.settings.hytera_repeater_mode = data[26] - self.settings.hytera_tx_freq = int.from_bytes( - data[29:33], byteorder="little" - ) - self.settings.hytera_rx_freq = int.from_bytes( - data[33:37], byteorder="little" - ) - self.step = 11 + self.settings.hytera_repeater_data[address[0]].hytera_repeater_mode = data[ + 26 + ] + self.settings.hytera_repeater_data[ + address[0] + ].hytera_tx_freq = int.from_bytes(data[29:33], byteorder="little") + self.settings.hytera_repeater_data[ + address[0] + ].hytera_rx_freq = int.from_bytes(data[33:37], byteorder="little") + self.step[address[0]] = 11 self.transport.sendto(self.STEP10_REQUEST, address) - def step11(self, data: bytes, _: tuple) -> None: + def step11(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP10_RESPONSE_1)] == self.STEP10_RESPONSE_1: - self.step = 12 + self.step[address[0]] = 12 - def step12(self, data: bytes, address: tuple) -> None: + def step12(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP10_RESPONSE_2)] == self.STEP10_RESPONSE_2: - self.step = 13 + self.step[address[0]] = 13 self.transport.sendto(self.STEP12_REQUEST_1, address) self.transport.sendto(self.STEP12_REQUEST_2, address) - def step13(self, data: bytes, address: tuple) -> None: + def step13(self, data: bytes, address: Tuple[str, int]) -> None: if data[: len(self.STEP12_RESPONSE)] == self.STEP12_RESPONSE: - self.step = 14 + self.step[address[0]] = 14 self.log_debug("rdac completed identification") self.settings.print_repeater_configuration() self.hytera_repeater_obtain_snmp(address) - asyncio.get_running_loop().create_task(self.rdac_completed_callback) + asyncio.get_running_loop().create_task( + self.rdac_completed_callback.homebrew_connect(address[0]) + ) - def step14(self, data: bytes, address: tuple) -> None: + def step14(self, data: bytes, address: Tuple[str, int]) -> None: pass def connection_lost(self, exc: Optional[Exception]) -> None: @@ -477,8 +495,13 @@ def connection_made(self, transport: transports.BaseTransport) -> None: self.log_debug("connection prepared") def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: - if len(data) == 1 and self.step != 14: - if self.step == 4: + self.settings.ensure_repeater_data(addr) + + if not self.step.get(addr[0]): + self.step[addr[0]] = 0 + + if len(data) == 1 and self.step[addr[0]] != 14: + if self.step[addr[0]] == 4: self.log_warning( "check repeater zone programming, if Digital IP" "Multi-Site Connect mode allows data pass from timeslots" @@ -486,16 +509,16 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: self.log_warning( "restart process if response is protocol reset and current step is not 14" ) - self.step = 0 + self.step[addr[0]] = 0 self.step0(data, addr) - elif len(data) != 1 and self.step == 14: + elif len(data) != 1 and self.step[addr[0]] == 14: self.log_error("RDAC finished, received extra data %s" % hexlify(data)) - elif len(data) == 1 and self.step == 14: + elif len(data) == 1 and self.step[addr[0]] == 14: if data[0] == 0x00: # no data available response self.transport.sendto(bytes(0x41), addr) else: - getattr(self, "step%d" % self.step)(data, addr) + getattr(self, "step%d" % self.step[addr[0]])(data, addr) class HyteraDMRProtocol(CustomBridgeDatagramProtocol): @@ -504,15 +527,19 @@ def __init__( settings: BridgeSettings, queue_incoming: Queue, queue_outgoing: Queue, + hytera_repeater_ip: str, ) -> None: super().__init__(settings) self.transport: Optional[transports.DatagramTransport] = None self.queue_incoming = queue_incoming self.queue_outgoing = queue_outgoing + self.ip: str = hytera_repeater_ip async def send_hytera_from_queue(self) -> None: while asyncio.get_running_loop().is_running(): - packet: bytes = await self.queue_outgoing.get() + ip, packet = await self.queue_outgoing.get() + assert isinstance(ip, str) + assert isinstance(packet, bytes) if self.transport and not self.transport.is_closing(): ipsc = IpSiteConnectProtocol.from_bytes(packet) self.log_debug( @@ -525,9 +552,7 @@ async def send_hytera_from_queue(self) -> None: dmrdata_hash="", ) ) - self.transport.sendto( - packet, (self.settings.hytera_repeater_ip, self.settings.dmr_port) - ) + self.transport.sendto(packet, (ip, self.settings.dmr_port)) # notify about outbound done self.queue_outgoing.task_done() @@ -542,10 +567,16 @@ def connection_made(self, transport: transports.BaseTransport) -> None: self.log_debug("connection prepared") def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: + if self.ip != addr[0]: + print(f"HyteraDMRProtocol ignore from {addr[0]} expected {self.ip}") + return + + self.settings.ensure_repeater_data(addr) + self.log_debug(f"HYTER->HHB {data.hex()}") try: hytera_data: KaitaiStruct = parse_hytera_data(data) - self.queue_incoming.put_nowait(hytera_data) + self.queue_incoming.put_nowait((addr[0], hytera_data)) self.log_debug( common_log_format( diff --git a/hytera_homebrew_bridge/lib/mmdvm_protocol.py b/hytera_homebrew_bridge/lib/mmdvm_protocol.py index d19052d..f1859d6 100644 --- a/hytera_homebrew_bridge/lib/mmdvm_protocol.py +++ b/hytera_homebrew_bridge/lib/mmdvm_protocol.py @@ -34,14 +34,16 @@ def __init__( connection_lost_callback: Callable, queue_outgoing: Queue, queue_incoming: Queue, + hytera_repeater_ip: str, ) -> None: super().__init__(settings) self.settings = settings self.transport: Optional[transports.DatagramTransport] = None self.connection_lost_callback = connection_lost_callback - self.connection_status = self.CON_NEW - self.queue_outgoing = queue_outgoing - self.queue_incoming = queue_incoming + self.connection_status: int = self.CON_NEW + self.queue_outgoing: Queue = queue_outgoing + self.queue_incoming: Queue = queue_incoming + self.ip: str = hytera_repeater_ip async def periodic_maintenance(self) -> None: while not asyncio.get_running_loop().is_closed(): @@ -58,7 +60,9 @@ async def periodic_maintenance(self) -> None: async def send_mmdvm_from_queue(self) -> None: while not asyncio.get_running_loop().is_closed(): - packet: bytes = await self.queue_outgoing.get() + ip, packet = await self.queue_outgoing.get() + assert isinstance(ip, str) + assert isinstance(packet, bytes) if self.transport and not self.transport.is_closing(): self.transport.sendto(packet) try: @@ -150,7 +154,7 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: self.connection_status = self.CON_NEW is_handled = True elif isinstance(packet.command_data, Mmdvm2020.TypeDmrData): - self.queue_incoming.put_nowait(packet) + self.queue_incoming.put_nowait((self.ip, packet)) is_handled = True self.log_debug( @@ -173,7 +177,10 @@ def send_login_request(self) -> None: self.log_info("Sending Login Request") self.connection_status = self.CON_LOGIN_REQUEST_SENT self.queue_outgoing.put_nowait( - struct.pack(">4sI", b"RPTL", self.settings.get_repeater_dmrid()) + ( + self.ip, + struct.pack(">4sI", b"RPTL", self.settings.get_repeater_dmrid(self.ip)), + ) ) def send_login_response(self, challenge: int) -> None: @@ -182,7 +189,7 @@ def send_login_response(self, challenge: int) -> None: challenge_response = struct.pack( ">4sI32s", b"RPTK", - self.settings.get_repeater_dmrid(), + self.settings.get_repeater_dmrid(self.ip), a2b_hex( sha256( b"".join( @@ -194,17 +201,17 @@ def send_login_response(self, challenge: int) -> None: ).hexdigest() ), ) - self.queue_outgoing.put_nowait(challenge_response) + self.queue_outgoing.put_nowait((self.ip, challenge_response)) def send_configuration(self) -> None: self.log_info(f"Sending self configuration to master") packet = struct.pack( ">4sI8s9s9s2s2s8s9s3s20s19s1s124s40s40s", b"RPTC", - self.settings.get_repeater_dmrid(), - self.settings.get_repeater_callsign()[0:8].ljust(8).encode(), - self.settings.get_repeater_rx_freq()[0:9].rjust(9, "0").encode(), - self.settings.get_repeater_tx_freq()[0:9].rjust(9, "0").encode(), + self.settings.get_repeater_dmrid(self.ip), + self.settings.get_repeater_callsign(self.ip)[0:8].ljust(8).encode(), + self.settings.get_repeater_rx_freq(self.ip)[0:9].rjust(9, "0").encode(), + self.settings.get_repeater_tx_freq(self.ip)[0:9].rjust(9, "0").encode(), str(self.settings.hb_tx_power & 0xFFFF).rjust(2, "0").encode(), str(self.settings.hb_color_code & 0xF).rjust(2, "0").encode(), self.settings.hb_latitude[0:8].rjust(8, "0").encode(), @@ -220,19 +227,23 @@ def send_configuration(self) -> None: self.settings.hb_package_id[0:40].ljust(40).encode(), ) - self.queue_outgoing.put_nowait(packet) + self.queue_outgoing.put_nowait((self.ip, packet)) config: Mmdvm2020 = Mmdvm2020.from_bytes(packet) log_mmdvm_configuration(logger=self.get_logger(), packet=config) def send_ping(self) -> None: - packet = struct.pack(">7sI", b"RPTPING", self.settings.get_repeater_dmrid()) - self.queue_outgoing.put_nowait(packet) + packet = struct.pack( + ">7sI", b"RPTPING", self.settings.get_repeater_dmrid(self.ip) + ) + self.queue_outgoing.put_nowait((self.ip, packet)) def send_closing(self) -> None: self.log_info("Closing MMDVM connection") - packet = struct.pack(">5sI", b"RPTCL", self.settings.get_repeater_dmrid()) - self.queue_outgoing.put_nowait(packet) + packet = struct.pack( + ">5sI", b"RPTCL", self.settings.get_repeater_dmrid(self.ip) + ) + self.queue_outgoing.put_nowait((self.ip, packet)) def disconnect(self) -> None: if self.transport and not self.transport.is_closing(): diff --git a/hytera_homebrew_bridge/lib/settings.py b/hytera_homebrew_bridge/lib/settings.py index 4c43f46..c668c56 100644 --- a/hytera_homebrew_bridge/lib/settings.py +++ b/hytera_homebrew_bridge/lib/settings.py @@ -1,12 +1,29 @@ #!/usr/bin/env python3 import configparser +from typing import Dict, Tuple from hytera_homebrew_bridge.lib.logging_trait import LoggingTrait _UNSET = object() +class HyteraRepeaterData: + def __init__(self): + self.hytera_repeater_id: int = 0 + self.hytera_callsign: str = "" + self.hytera_hardware: str = "" + self.hytera_firmware: str = "" + self.hytera_serial_number: str = "" + self.hytera_repeater_mode: int = 0 + self.hytera_tx_freq: int = 0 + self.hytera_rx_freq: int = 0 + self.hytera_repeater_ip: str = "" + + def __repr__(self) -> str: + return f"[DMRID: {self.hytera_repeater_id}] [IP: {self.hytera_repeater_ip}] [CALL: {self.hytera_callsign}]" + + class BridgeSettings(LoggingTrait): SECTION_GENERAL = "general" SECTION_HOMEBREW = "homebrew" @@ -144,19 +161,11 @@ def __init__(self, filepath: str = None, filedata: str = None) -> None: ) # hytera_protocols variables - self.hytera_is_registered: bool = False - self.hytera_snmp_data: dict = dict() + self.hytera_is_registered: Dict[str, bool] = dict() + self.hytera_snmp_data: Dict[str, dict] = dict() # hytera repeater data - self.hytera_repeater_id: int = 0 - self.hytera_callsign: str = "" - self.hytera_hardware: str = "" - self.hytera_firmware: str = "" - self.hytera_serial_number: str = "" - self.hytera_repeater_mode: int = 0 - self.hytera_tx_freq: int = 0 - self.hytera_rx_freq: int = 0 - self.hytera_repeater_ip: str = "" + self.hytera_repeater_data: Dict[str, HyteraRepeaterData] = dict() @staticmethod def getint_safe( @@ -179,44 +188,50 @@ def getint_safe( raise return fallback - def get_repeater_rx_freq(self) -> str: + def get_repeater_rx_freq(self, ip: str) -> str: from hytera_homebrew_bridge.lib import snmp return ( self.hb_rx_freq - or str(self.hytera_rx_freq) - or str(self.hytera_snmp_data.get(snmp.SNMP.OID_RX_FREQUENCE)) + or str( + self.hytera_repeater_data.get(ip, HyteraRepeaterData()).hytera_rx_freq + ) + or str(self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_RX_FREQUENCE)) ) - def get_repeater_tx_freq(self) -> str: + def get_repeater_tx_freq(self, ip: str) -> str: from hytera_homebrew_bridge.lib import snmp return ( self.hb_tx_freq - or str(self.hytera_tx_freq) - or str(self.hytera_snmp_data.get(snmp.SNMP.OID_TX_FREQUENCE)) + or str( + self.hytera_repeater_data.get(ip, HyteraRepeaterData()).hytera_tx_freq + ) + or str(self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_TX_FREQUENCE)) ) - def get_repeater_callsign(self) -> str: + def get_repeater_callsign(self, ip: str) -> str: from hytera_homebrew_bridge.lib import snmp return ( self.hb_callsign - or self.hytera_callsign - or self.hytera_snmp_data.get(snmp.SNMP.OID_RADIO_ALIAS) + or self.hytera_repeater_data.get(ip, HyteraRepeaterData()).hytera_callsign + or self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_RADIO_ALIAS) ) - def get_repeater_dmrid(self) -> int: + def get_repeater_dmrid(self, ip: str) -> int: from hytera_homebrew_bridge.lib import snmp return int( self.hb_repeater_dmr_id - or self.hytera_repeater_id - or self.hytera_snmp_data.get(snmp.SNMP.OID_RADIO_ID) + or self.hytera_repeater_data.get( + ip, HyteraRepeaterData() + ).hytera_repeater_id + or self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_RADIO_ID) or 0 ) - def get_incorrect_configurations(self) -> list: + def get_incorrect_configurations(self, ip: str) -> list: rtn: list = list() generic_error_message: str = ( @@ -224,11 +239,11 @@ def get_incorrect_configurations(self) -> list: "configuration process (either P2P, RDAC or SNMP) " ) - repeater_id = self.get_repeater_dmrid() + repeater_id = self.get_repeater_dmrid(ip=ip) if repeater_id < 1: rtn.append(("homebrew.repeater_dmr_id", repeater_id, generic_error_message)) - repeater_callsign = self.get_repeater_callsign() + repeater_callsign = self.get_repeater_callsign(ip=ip) if not repeater_callsign: rtn.append(("homebrew.callsign", repeater_callsign, generic_error_message)) @@ -246,3 +261,7 @@ def print_settings(self) -> None: def print_repeater_configuration(self): pass + + def ensure_repeater_data(self, address: Tuple[str, int]): + if not self.hytera_repeater_data.get(address[0]): + self.hytera_repeater_data[address[0]] = HyteraRepeaterData() diff --git a/hytera_homebrew_bridge/lib/snmp.py b/hytera_homebrew_bridge/lib/snmp.py index 3eddc26..2cd3ae9 100755 --- a/hytera_homebrew_bridge/lib/snmp.py +++ b/hytera_homebrew_bridge/lib/snmp.py @@ -123,6 +123,10 @@ def walk_ip( ) -> dict: ip, port = address is_success: bool = False + + if not settings_storage.hytera_snmp_data.get(address[0]): + settings_storage.hytera_snmp_data[address[0]] = {} + other_family: str = ( "public" if settings_storage.snmp_family == "hytera" else "hytera" ) @@ -139,7 +143,7 @@ def walk_ip( snmp_result = octet_string_to_utf8(str(snmp_result, "utf8")) elif oid in SNMP.ALL_FLOATS: snmp_result = int.from_bytes(snmp_result, byteorder="big") - settings_storage.hytera_snmp_data[oid] = snmp_result + settings_storage.hytera_snmp_data[address[0]][oid] = snmp_result is_success = True except SystemError: self.log_error("SNMP failed to obtain repeater info") @@ -163,11 +167,11 @@ def walk_ip( self.log_exception(e) if is_success: - self.print_snmp_data(settings_storage=settings_storage) + self.print_snmp_data(settings_storage=settings_storage, address=address) - return settings_storage.hytera_snmp_data + return settings_storage.hytera_snmp_data[address[0]] - def print_snmp_data(self, settings_storage: BridgeSettings): + def print_snmp_data(self, settings_storage: BridgeSettings, address: tuple): self.log_debug( "-------------- REPEATER SNMP CONFIGURATION ----------------------------" ) @@ -177,10 +181,10 @@ def print_snmp_data(self, settings_storage: BridgeSettings): if label_len > longest_label: longest_label = label_len - for key in settings_storage.hytera_snmp_data: + for key in settings_storage.hytera_snmp_data[address[0]]: print_settings = SNMP.READABLE_LABELS.get(key) if print_settings: - value = settings_storage.hytera_snmp_data.get(key) + value = settings_storage.hytera_snmp_data[address[0]].get(key) self.log_debug( "%s| %s" % ( diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..2f6c8d1 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +asyncio_mode=strict diff --git a/requirements.txt b/requirements.txt index f2a371b..f270797 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,12 +4,11 @@ asyncio>=3.4.3 dmr-utils3>=0.1.29 setuptools>=60.9.1 bitarray>=2.3.5 -dmr-kaitai>=0.8 -ok-dmrlib>=0.2 +ok-dmrlib>=0.3 # developers / contributors scapy>=2.4.5 uvloop>=0.15.2 pytest pytest-cov -pytest-asyncio>=0.18.1 \ No newline at end of file +pytest-asyncio>=0.18.1