Skip to content

Commit

Permalink
Add initial revision of the WS/WSS socket server.
Browse files Browse the repository at this point in the history
  • Loading branch information
sobomax committed Jul 10, 2024
1 parent f1ed5f9 commit 97e82ed
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 3 deletions.
5 changes: 5 additions & 0 deletions sippy/MyConfigParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
'and "SUBSCRIBE" messages. Address in the format ' \
'"host[:port]"'),
'nat_traversal': ('B', 'enable NAT traversal for signalling'), \
'wss_socket': ('S', 'WSS (SIP via websockets, RFC7118) socket ' \
'configuration. Configuration in the format ' \
'"host:port:cert_file:key_file", where "cert_file" ' \
'/ "key_file" are paths to the TLS certificate ' \
'and key file respectively in the X.509 PEM format'),
'xmpp_b2bua_id': ('I', 'ID passed to the XMPP socket server')}

class MyConfigParser(RawConfigParser):
Expand Down
147 changes: 147 additions & 0 deletions sippy/Wss_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright (c) 2006-2024 Sippy Software, Inc. All rights reserved.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from typing import Optional, Dict, Tuple
from threading import Thread
from asyncio import get_event_loop, all_tasks, new_event_loop, set_event_loop, CancelledError, \
Queue as AsyncQueue, create_task
from ssl import SSLContext, PROTOCOL_TLS_SERVER
from uuid import UUID
from websockets import WebSocketServerProtocol, ConnectionClosed, serve as ws_serve

from sippy.Core.EventDispatcher import ED2
from sippy.Network_server import Network_server, Network_server_opts, Remote_address
from sippy.Time.MonoTime import MonoTime

class Wss_server_opts(Network_server_opts):
certfile: Optional[str] = None
keyfile: Optional[str] = None

def __init__(self, *args, certfile = None, keyfile = None, o = None):
super().__init__(*args, o = o)
if o != None:
self.certfile, self.keyfile = o.certfile, o.keyfile
return
self.certfile = certfile
self.keyfile = keyfile

class Wss_server(Thread, Network_server):
transport = 'ws'
daemon = True
ssl_context: Optional[SSLContext] = None
connections: Dict[UUID, Tuple[WebSocketServerProtocol, AsyncQueue]]

def __init__(self, global_config, uopts:Wss_server_opts):
Thread.__init__(self)
Network_server.__init__(self, uopts)
if self.uopts.certfile is not None:
self.ssl_context = SSLContext(PROTOCOL_TLS_SERVER)
self.ssl_context.load_cert_chain(self.uopts.certfile, self.uopts.keyfile)
self.connections = {}
self.start()

async def monitor_queue(self):
while True:
item = await get_event_loop().run_in_executor(None, self.sendqueue.get)
if item is None:
for task in all_tasks():
task.cancel()
break
data, address = item
uaddress = address[0]
if uaddress not in self.connections:
print(f'ERROR: Invalid address {uaddress=}')
continue
await self.connections[uaddress][1].put(data)

async def sip_to_ws(self, queue:AsyncQueue, websocket:WebSocketServerProtocol):
while True:
item = await queue.get()
await websocket.send(item)

async def ws_to_sip(self, websocket, path):
print(f'New connection {websocket.id=}')
queue = AsyncQueue()
sender = create_task(self.sip_to_ws(queue, websocket))
conn_id = f'{websocket.id}.invalid'
self.connections[conn_id] = (websocket, queue)
address = Remote_address(websocket.remote_address, self.transport)
address.received = conn_id
try:
while True:
data = await websocket.recv()
rtime = MonoTime()
ED2.callFromThread(self.handle_read, data, address, rtime)
except ConnectionClosed:
print(f'Connection {websocket.id} closed')
finally:
del self.connections[conn_id]
sender.cancel()
await sender

async def async_run(self):
start_server = ws_serve(
self.ws_to_sip, self.uopts.laddress[0], self.uopts.laddress[1], ssl = self.ssl_context,
subprotocols = ['sip']
)
server = await start_server
await self.monitor_queue()
server.close()
await server.wait_closed()

def runFailed(self, exception):
ED2.breakLoop(255)
raise exception

def run(self):
loop = new_event_loop()
set_event_loop(loop)
try:
loop.run_until_complete(self.async_run())
except CancelledError:
pass
except OSError as ex:
ED2.callFromThread(self.runFailed, ex)
finally:
loop.close()

if __name__ == '__main__':
laddr = ('192.168.23.43', 9878)
certfile = '/home/sobomax/server.crt'
keyfile = '/home/sobomax/server.key'
from sippy.SipRequest import SipRequest
def data_callback(data, address, server, rtime):
sr = SipRequest(data)
print(f'Got {sr=} from {address=}')
for rr in (100, 'Trying'), (666, 'Busy Here'):
res = sr.genResponse(rr[0], rr[1])
server.send_to(str(res), address)
ED2.breakLoop()
wopts = Wss_server_opts(laddr, data_callback, certfile = certfile, keyfile = keyfile)
wserv = Wss_server(None, wopts)
try:
ED2.loop()
finally:
wserv.shutdown()
24 changes: 21 additions & 3 deletions sippy/b2bua_radius.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,8 +435,11 @@ def recvRequest(self, req, sip_t):
via = req.getHFBody('via', 1)
else:
via = req.getHFBody('via', 0)
remote_ip = via.getTAddr()[0]
source = req.getSource()
if not via.transport == 'WSS':
remote_ip = via.getTAddr()[0]
else:
remote_ip = source[0]

# First check if request comes from IP that
# we want to accept our traffic from
Expand Down Expand Up @@ -820,7 +823,18 @@ def main_func():

if global_config.getdefault('xmpp_b2bua_id', None) != None:
global_config['_xmpp_mode'] = True
global_config['_sip_tm'] = SipTransactionManager(global_config, global_config['_cmap'].recvRequest)
stm = SipTransactionManager(global_config, global_config['_cmap'].recvRequest)

from sippy.Wss_server import Wss_server, Wss_server_opts
if 'wss_socket' in global_config:
parts = global_config['wss_socket'].split(':', 3)
wss_laddr = (parts[0], int(parts[1]))
wss_certfile = parts[2]
wss_keyfile = parts[3]
wss_opts = Wss_server_opts(wss_laddr, stm.handleIncoming, certfile=wss_certfile, keyfile=wss_keyfile)
global_config['_wss_server'] = Wss_server(global_config, wss_opts)

global_config['_sip_tm'] = stm
global_config['_sip_tm'].nat_traversal = global_config.getdefault('nat_traversal', False)

cmdfile = global_config['b2bua_socket']
Expand All @@ -832,7 +846,11 @@ def main_func():
open(global_config['pidfile'], 'w').write(str(os.getpid()) + '\n')
Signal(SIGUSR1, reopen, SIGUSR1, global_config['logfile'])

ED2.loop()
try:
ED2.loop()
finally:
if '_wss_server' in global_config:
global_config['_wss_server'].shutdown()

if __name__ == '__main__':
main_func()

0 comments on commit 97e82ed

Please sign in to comment.