Skip to content

Commit

Permalink
serve multiple slaves at once #5
Browse files Browse the repository at this point in the history
  • Loading branch information
smarek committed Feb 18, 2022
1 parent 1cb4c63 commit 23aa916
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 210 deletions.
177 changes: 99 additions & 78 deletions bin/hytera-homebrew-bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,34 @@
import sys
from asyncio import AbstractEventLoop, Queue
from signal import SIGINT, SIGTERM
from typing import Optional


class HyteraHomebrewBridge:
def __init__(self, settings_ini_path: str):
self.loop: Optional[AbstractEventLoop] = None
self.settings: BridgeSettings = BridgeSettings(filepath=settings_ini_path)
from typing import Optional, Dict

self_name: str = "hytera_homebrew_bridge"
self_spec = importlib.util.find_spec(self_name)
if self_spec is None:
parent_folder: str = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
expected_folder: str = f"{parent_folder}{os.path.sep}"
if os.path.isdir(expected_folder):
sys.path.append(expected_folder)

from hytera_homebrew_bridge.lib.hytera_protocols import (
HyteraP2PProtocol,
HyteraDMRProtocol,
HyteraRDACProtocol,
)
from hytera_homebrew_bridge.lib.mmdvm_protocol import MMDVMProtocol
from hytera_homebrew_bridge.lib.settings import BridgeSettings
from hytera_homebrew_bridge.lib.hytera_mmdvm_translator import HyteraMmdvmTranslator

from hytera_homebrew_bridge.lib.callback_interface import CallbackInterface


class HyteraRepeater(CallbackInterface):
def __init__(self, ip: str, settings: BridgeSettings, asyncloop: AbstractEventLoop):
# ip of hytera repeater
self.ip: str = ip
self.settings: BridgeSettings = settings
self.loop: AbstractEventLoop = asyncloop
# message queues for translator
self.queue_mmdvm_outgoing: Queue = Queue()
self.queue_hytera_incoming: Queue = Queue()
Expand All @@ -25,73 +46,35 @@ def __init__(self, settings_ini_path: str):
connection_lost_callback=self.homebrew_connection_lost,
queue_outgoing=self.queue_mmdvm_outgoing,
queue_incoming=self.queue_mmdvm_incoming,
)
# hytera ipsc: p2p dmr and rdac
self.hytera_p2p_protocol: HyteraP2PProtocol = HyteraP2PProtocol(
settings=self.settings, repeater_accepted_callback=self.homebrew_connect()
hytera_repeater_ip=ip,
)
self.hytera_dmr_protocol: HyteraDMRProtocol = HyteraDMRProtocol(
settings=self.settings,
queue_incoming=self.queue_hytera_incoming,
queue_outgoing=self.queue_hytera_outgoing,
hytera_repeater_ip=ip,
)
self.hytera_rdac_protocol: HyteraRDACProtocol = HyteraRDACProtocol(
settings=self.settings, rdac_completed_callback=self.homebrew_connect()
)
# prepare translator
self.hytera_mmdvm_translator: HyteraMmdvmTranslator = HyteraMmdvmTranslator(
settings=self.settings,
mmdvm_incoming=self.queue_mmdvm_incoming,
hytera_incoming=self.queue_hytera_incoming,
mmdvm_outgoing=self.queue_mmdvm_outgoing,
hytera_outgoing=self.queue_hytera_outgoing,
hytera_repeater_ip=self.ip,
)

async def go(self) -> None:
self.loop = asyncio.get_running_loop()
self.settings.print_settings()

# start translator tasks
self.loop.create_task(self.hytera_mmdvm_translator.translate_from_mmdvm())
self.loop.create_task(self.hytera_mmdvm_translator.translate_from_hytera())

# mmdvm maintenance (auto login, auth, ping/pong)
self.loop.create_task(self.homebrew_protocol.periodic_maintenance())

# send translated or protocol generated packets to respective upstreams
self.loop.create_task(self.hytera_dmr_protocol.send_hytera_from_queue())
self.loop.create_task(self.homebrew_protocol.send_mmdvm_from_queue())

# connect Hytera repeater
await self.hytera_p2p_connect()
await self.hytera_dmr_connect()
if not self.settings.hytera_disable_rdac:
await self.hytera_rdac_connect()

# MMDVM will get connected once the Hytera is set-up and running correctly
# it is not meant to be started here

async def hytera_p2p_connect(self) -> None:
# P2P/IPSC Service address
await self.loop.create_datagram_endpoint(
lambda: self.hytera_p2p_protocol,
local_addr=(self.settings.ipsc_ip, self.settings.p2p_port),
)
def homebrew_connection_lost(self, ip: str) -> None:
asyncio.run(self.homebrew_connect(ip=ip))

async def hytera_dmr_connect(self) -> None:
await self.loop.create_datagram_endpoint(
lambda: self.hytera_dmr_protocol,
local_addr=(self.settings.ipsc_ip, self.settings.dmr_port),
reuse_port=True,
)

async def hytera_rdac_connect(self) -> None:
await self.loop.create_datagram_endpoint(
lambda: self.hytera_rdac_protocol,
local_addr=(self.settings.ipsc_ip, self.settings.rdac_port),
)

async def homebrew_connect(self) -> None:
incorrect_config_params = self.settings.get_incorrect_configurations()
async def homebrew_connect(self, ip: str) -> None:
incorrect_config_params = self.settings.get_incorrect_configurations(ip)
if len(incorrect_config_params) > 0:
self.homebrew_protocol.log_error(
"Current configuration is not valid for connection"
Expand All @@ -116,11 +99,71 @@ async def homebrew_connect(self) -> None:
# Extract bound socket port
self.settings.hb_local_port = hb_local_socket.getsockname()[1]

def homebrew_connection_lost(self) -> None:
asyncio.run(self.homebrew_connect())
async def go(self):
# start DMR protocol
await self.hytera_dmr_connect()

# start translator tasks
self.loop.create_task(self.hytera_mmdvm_translator.translate_from_mmdvm())
self.loop.create_task(self.hytera_mmdvm_translator.translate_from_hytera())

# mmdvm maintenance (auto login, auth, ping/pong)
self.loop.create_task(self.homebrew_protocol.periodic_maintenance())

# send translated or protocol generated packets to respective upstreams
self.loop.create_task(self.hytera_dmr_protocol.send_hytera_from_queue())
self.loop.create_task(self.homebrew_protocol.send_mmdvm_from_queue())


class HyteraHomebrewBridge(CallbackInterface):
def __init__(self, settings_ini_path: str):
self.loop: Optional[AbstractEventLoop] = None
self.settings: BridgeSettings = BridgeSettings(filepath=settings_ini_path)
self.repeaters: Dict[str, HyteraRepeater] = {}
# hytera ipsc: p2p dmr and rdac
self.hytera_p2p_protocol: HyteraP2PProtocol = HyteraP2PProtocol(
settings=self.settings, repeater_accepted_callback=self
)
self.hytera_rdac_protocol: HyteraRDACProtocol = HyteraRDACProtocol(
settings=self.settings, rdac_completed_callback=self
)
# prepare translator

async def go(self) -> None:
self.loop = asyncio.get_running_loop()
self.settings.print_settings()

# connect Hytera repeater
await self.hytera_p2p_connect()
if not self.settings.hytera_disable_rdac:
await self.hytera_rdac_connect()

async def homebrew_connect(self, ip: str) -> None:
if not self.repeaters.get(ip):
self.repeaters[ip] = HyteraRepeater(
ip=ip, settings=self.settings, asyncloop=self.loop
)
await self.repeaters[ip].go()

await self.repeaters[ip].homebrew_connect(ip)

async def hytera_p2p_connect(self) -> None:
# P2P/IPSC Service address
await self.loop.create_datagram_endpoint(
lambda: self.hytera_p2p_protocol,
local_addr=(self.settings.ipsc_ip, self.settings.p2p_port),
)

async def hytera_rdac_connect(self) -> None:
await self.loop.create_datagram_endpoint(
lambda: self.hytera_rdac_protocol,
local_addr=(self.settings.ipsc_ip, self.settings.rdac_port),
)

def stop_running(self) -> None:
self.homebrew_protocol.disconnect()
for ip, repeater in self.repeaters.items():
repeater.homebrew_protocol.disconnect()

self.hytera_p2p_protocol.disconnect()
self.loop.stop()
for task in asyncio.Task.all_tasks():
Expand Down Expand Up @@ -156,28 +199,6 @@ def stop_running(self) -> None:
)
exit(1)

self_name: str = "hytera_homebrew_bridge"
self_spec = importlib.util.find_spec(self_name)
if self_spec is None:
mainlog.debug(
"Package hytera-homebrew-bridge is not installed, trying locally\n"
)
parent_folder: str = os.path.dirname(
os.path.dirname(os.path.realpath(__file__))
)
expected_folder: str = f"{parent_folder}{os.path.sep}"
if os.path.isdir(expected_folder):
sys.path.append(expected_folder)

from hytera_homebrew_bridge.lib.hytera_protocols import (
HyteraP2PProtocol,
HyteraDMRProtocol,
HyteraRDACProtocol,
)
from hytera_homebrew_bridge.lib.mmdvm_protocol import MMDVMProtocol
from hytera_homebrew_bridge.lib.settings import BridgeSettings
from hytera_homebrew_bridge.lib.hytera_mmdvm_translator import HyteraMmdvmTranslator

uvloop_spec = importlib.util.find_spec("uvloop")
if uvloop_spec:
import uvloop
Expand Down
10 changes: 4 additions & 6 deletions hytera_homebrew_bridge/dmrlib/transmission.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ def process_csbk(self, csbk: DmrCsbk):
self.new_transmission(TransmissionType.DataTransmission)
if csbk.csbk_opcode == DmrCsbk.CsbkoTypes.preamble:
if self.blocks_expected == 0:
self.blocks_expected = csbk.preamble_csbk_blocks_to_follow + 1
self.blocks_expected = csbk.csbk_data.preamble_csbk_blocks_to_follow + 1
elif (
self.blocks_expected - self.blocks_received
!= csbk.preamble_csbk_blocks_to_follow + 1
!= csbk.csbk_data.preamble_csbk_blocks_to_follow + 1
):
print(
f"CSBK not setting expected to {self.blocks_expected} - {self.blocks_received} != {csbk.preamble_csbk_blocks_to_follow}"
f"CSBK not setting expected to {self.blocks_expected} - {self.blocks_received} != {csbk.csbk_data.preamble_csbk_blocks_to_follow}"
)

self.blocks_received += 1
Expand Down Expand Up @@ -266,7 +266,7 @@ def end_data_transmission(self):
for packet in self.blocks:
if isinstance(packet, DmrCsbk):
print(
f"[CSBK] [{packet.preamble_source_address} -> {packet.preamble_target_address}] [{packet.preamble_group_or_individual}]"
f"[CSBK] [{packet.csbk_data.source_address} -> {packet.csbk_data.target_address}]"
)
elif isinstance(packet, DmrDataHeader):
print(
Expand Down Expand Up @@ -355,14 +355,12 @@ def process_packet(self, burst: BurstInfo) -> BurstInfo:

lc_info_bits = decode_complete_lc(burst.data_bits[:98] + burst.data_bits[166:])
if burst.data_type == DataType.VoiceLCHeader:
print("voice header", lc_info_bits.tobytes().hex())
self.process_voice_header(FullLinkControl.from_bytes(lc_info_bits))
elif burst.data_type == DataType.DataHeader:
self.process_data_header(DmrDataHeader.from_bytes(lc_info_bits))
elif burst.data_type == DataType.CSBK:
self.process_csbk(DmrCsbk.from_bytes(lc_info_bits))
elif burst.data_type == DataType.TerminatorWithLC:
print("voice terminator", lc_info_bits.tobytes().hex())
self.blocks_received += 1
self.end_voice_transmission()
elif burst.data_type in [
Expand Down
3 changes: 3 additions & 0 deletions hytera_homebrew_bridge/lib/callback_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class CallbackInterface:
async def homebrew_connect(self, ip: str) -> None:
pass
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def __init__(self, settings: BridgeSettings) -> None:
def hytera_repeater_obtain_snmp(self, address: tuple, force: bool = False) -> None:
self.settings.hytera_repeater_ip = address[0]
if self.settings.snmp_enabled:
if force or not self.settings.hytera_snmp_data:
if force or not self.settings.hytera_snmp_data.get(address[0]):
SNMP().walk_ip(address, self.settings)
else:
self.log_warning("SNMP is disabled")
18 changes: 13 additions & 5 deletions hytera_homebrew_bridge/lib/hytera_mmdvm_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,23 @@ def __init__(
hytera_outgoing: Queue,
mmdvm_incoming: Queue,
mmdvm_outgoing: Queue,
hytera_repeater_ip: str,
):
self.transmission_watcher: TransmissionWatcher = TransmissionWatcher()
self.settings = settings
self.queue_hytera_to_translate = hytera_incoming
self.queue_hytera_output = hytera_outgoing
self.queue_mmdvm_to_translate = mmdvm_incoming
self.queue_mmdvm_output = mmdvm_outgoing
self.hytera_ip: str = hytera_repeater_ip

async def translate_from_hytera(self):
loop = asyncio.get_running_loop()
while loop.is_running():
try:
packet: KaitaiStruct = await self.queue_hytera_to_translate.get()
ip, packet = await self.queue_hytera_to_translate.get()
assert isinstance(packet, KaitaiStruct)
assert isinstance(ip, str)
if not isinstance(packet, IpSiteConnectProtocol):
self.queue_hytera_to_translate.task_done()
continue
Expand Down Expand Up @@ -68,7 +72,7 @@ async def translate_from_hytera(self):
+ burst.sequence_no.to_bytes(1, byteorder="big")
+ packet.source_radio_id.to_bytes(3, byteorder="big")
+ packet.destination_radio_id.to_bytes(3, byteorder="big")
+ self.settings.get_repeater_dmrid().to_bytes(
+ self.settings.get_repeater_dmrid(ip).to_bytes(
4, byteorder="big"
)
+ get_mmdvm_bitflags(burst, packet)
Expand All @@ -77,7 +81,7 @@ async def translate_from_hytera(self):
+ byteswap_bytes(packet.ipsc_payload)[0:-1]
)

self.queue_mmdvm_output.put_nowait(mmdvm_out)
self.queue_mmdvm_output.put_nowait((self.hytera_ip, mmdvm_out))
else:
print(
"Hytera BurstInfo not available",
Expand Down Expand Up @@ -105,7 +109,9 @@ async def translate_from_mmdvm(self):
loop = asyncio.get_running_loop()
while loop.is_running():
try:
packet: Mmdvm2020 = await self.queue_mmdvm_to_translate.get()
ip, packet = await self.queue_mmdvm_to_translate.get()
assert isinstance(packet, Mmdvm2020)
assert isinstance(ip, str)
if not isinstance(packet.command_data, Mmdvm2020.TypeDmrData):
self.queue_mmdvm_to_translate.task_done()
continue
Expand Down Expand Up @@ -133,7 +139,9 @@ async def translate_from_mmdvm(self):
frame_type=get_ipsc_frame_type(burst),
hytera_slot_type=get_ipsc_slot_type(burst),
)
self.queue_hytera_output.put_nowait(hytera_out_packet)
self.queue_hytera_output.put_nowait(
(self.hytera_ip, hytera_out_packet)
)
else:
print(
f"MMDVM BurstInfo not available",
Expand Down
Loading

0 comments on commit 23aa916

Please sign in to comment.