-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.py
178 lines (128 loc) · 5.31 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from __future__ import annotations
import asyncio
import logging
import time
import typing as t
import asyncio_dgram
from .errors import IncorrectPasswordError, RCONError
# Generic result type of the coroutine factory handed to Client._retry.
T = t.TypeVar("T")
class Client:
    """
    Asynchronous RCON client for Quake 3 servers, speaking UDP via asyncio_dgram.

    Can be used as an async context manager, in which case the connection is
    established on entry and closed on exit:

        async with Client("127.0.0.1", password="...") as client:
            print(await client.send_command("status", interpret=True))
    """

    def __init__(
        self,
        host: str,
        port: int = 27960,
        password: str = "secret",
        *,
        timeout: float = 2.0,
        fragment_read_timeout: float = 0.25,
        retries: int = 2,
        logger: logging.Logger | None = None,
    ):
        """
        Store the connection settings; no network I/O happens here.

        host / port: address of the remote server.
        password: the rcon password sent with every command.
        timeout: upper bound, in seconds, for connecting, for sending a command and for reading a response.
        fragment_read_timeout: upper bound, in seconds, for each individual read while collecting a response.
        retries: number of attempts made for connecting / sending (at least one attempt is always made).
        logger: logger to emit debug messages to. If omitted, the "aio-q3-rcon" logger is used and disabled.
        """

        self.host = host
        self.port = port
        self.password = password
        self.timeout = timeout
        self.fragment_read_timeout = fragment_read_timeout
        self.retries = retries

        self.logger = logger or logging.getLogger("aio-q3-rcon")

        # NOTE: logging.getLogger returns one shared logger object per name, so this silences
        # the "aio-q3-rcon" logger for the whole process, not just for this instance.
        if logger is None:
            self.logger.disabled = True

        # Set by connect() and reset to None by close()
        self._dgram: asyncio_dgram.DatagramClient | None = None

    async def __aenter__(self) -> Client:
        """Connect (with verification) and return the client itself."""

        await self.connect()
        return self

    async def __aexit__(
        self,
        exc_type: t.Type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: t.Any,
    ) -> bool:
        """Close the connection when leaving the `async with` block."""

        await self.close()
        return False  # indicate we don't want to suppress the exception

    def _get_dgram(self) -> asyncio_dgram.DatagramClient:
        """Return the underlying datagram client, raising RuntimeError if connect() hasn't succeeded yet."""

        if self._dgram is None:
            raise RuntimeError("Connection not yet established")

        return self._dgram

    async def _retry(self, call: t.Callable[[], t.Coroutine[t.Any, t.Any, T]]) -> T | None:
        """
        Await call() and return its result, trying again if it raises an Exception.

        At most self.retries attempts are made, and always at least one, so a retries value of
        zero or less can't turn the call into a silent no-op. If every attempt fails, the
        exception raised by the first attempt is re-raised.
        """

        first_exc: Exception | None = None

        for _ in range(max(1, self.retries)):
            try:
                self.logger.debug("Calling %s with retry logic", call)
                return await call()
            except Exception as e:
                self.logger.debug(
                    "The call to %s failed and it may be retried", call, exc_info=True
                )

                if first_exc is None:
                    first_exc = e

        if first_exc is not None:
            raise first_exc

        # Not reachable since at least one attempt either returned or recorded an exception,
        # kept so the function has an explicit result on every path.
        return None

    async def connect(self, verify: bool = True) -> None:
        """
        Connects to the remote server.

        If verify is True, this attempts to verify if the server is indeed a working Quake 3 server by using
        the heartbeat command. If that verification fails for any reason the connection is closed again
        before the error is propagated, so a failed connect() doesn't leave an open socket behind.

        Raises RCONError if the server doesn't answer the heartbeat as expected; errors raised while
        connecting or while sending the heartbeat (e.g. IncorrectPasswordError, asyncio.TimeoutError)
        are propagated unchanged.
        """

        self.logger.debug("Connecting to %s:%s", self.host, self.port)

        self._dgram = await asyncio.wait_for(
            self._retry(lambda: asyncio_dgram.connect((self.host, self.port))), self.timeout
        )

        if not verify:
            return

        try:
            response = await self.send_command("heartbeat")
        except BaseException:
            # __aexit__ is never called when __aenter__ raises, so clean up here. BaseException is
            # used so the socket is released on cancellation as well; the error is always re-raised.
            await self.close()
            raise

        if not response.startswith("print\n"):
            await self.close()
            raise RCONError("Invalid / unsupported server")

    async def close(self) -> None:
        """Closes the connection between the server and client. Does nothing if not connected."""

        self.logger.debug("Closing")

        if self._dgram is None:
            return

        try:
            self._dgram.close()
        finally:
            # Forget the socket even if closing it raised, so the client can't reuse it
            self._dgram = None

    @staticmethod
    def _process_response(data: bytes, interpret: bool) -> bytes:
        """
        Process a response from the server.

        The four leading 0xFF bytes are validated and removed. If interpret is True, we do a lazy attempt
        of parsing the data like an actual Quake3 client might for displaying in the chat.

        Raises ValueError if the data doesn't start with four 0xFF bytes and IncorrectPasswordError if
        the server reports a bad rcon password.
        """

        if not data.startswith(b"\xFF" * 4):
            raise ValueError("Invalid data received from server")

        data = data[4:]

        if data == b"print\nBad rconpassword.\n":
            raise IncorrectPasswordError()

        if interpret:
            # removeprefix() leaves the data untouched when the prefix is absent, so no guards are needed
            data = (
                data.removeprefix(b"print\n")
                .removeprefix(b"broadcast: ")
                .removeprefix(b"print ")
                .strip(b'" \n\r')
            )

        return data

    async def _get_response(self, interpret: bool) -> bytes:
        """
        Read a response from the server

        Since packets may be fragmented and there is no way to tell if they are, the client makes an attempt to read
        more from the server until either the timeout is passed or the fragment_read_timeout is passed for an
        individual call to asyncio_dgram.DatagramClient(...).recv().

        Every received fragment is passed through _process_response() before being appended, so the errors
        raised there propagate from here. If nothing arrives in time, empty bytes are returned.
        """

        start_time = time.perf_counter()
        data = bytearray()

        while (time.perf_counter() - start_time) < self.timeout:
            try:
                part: bytes
                part, remote = await asyncio.wait_for(
                    self._get_dgram().recv(), self.fragment_read_timeout
                )
            except asyncio.TimeoutError:
                # Nothing more arrived within fragment_read_timeout, treat the response as complete
                break

            self.logger.debug("Received %s from %s", part, remote)
            data.extend(self._process_response(part, interpret=interpret))

        return bytes(data)

    async def _send_command(self, command: str) -> None:
        """Send a single rcon packet (four 0xFF bytes, then the password and command) to the server."""

        message = (b"\xFF" * 4) + f'rcon "{self.password}" {command}'.encode("ascii")
        await self._get_dgram().send(message)

    async def send_command(self, command: str, *, interpret: bool = False) -> str:
        """
        Sends a command to the server and reads the response back.

        If interpret is true, the response will be processed to be more similar to what would actually appear
        in the in-game chat.

        Both the sending and the reading are individually bounded by the timeout. The response is decoded
        as ASCII, and is an empty string if the server didn't answer in time.
        """

        await asyncio.wait_for(self._retry(lambda: self._send_command(command)), self.timeout)
        response = await asyncio.wait_for(self._get_response(interpret), self.timeout)

        return response.decode("ascii")