From 537f2ce708afcb2146f249b90fd442c016928a14 Mon Sep 17 00:00:00 2001 From: Dries C Date: Tue, 5 Mar 2024 13:27:20 +0100 Subject: [PATCH] Handle ping pong on thread --- ProxyListener.php | 40 -------- ProxyNetworkInterface.php | 87 ++-------------- ProxyServer.php | 207 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 207 insertions(+), 127 deletions(-) delete mode 100644 ProxyListener.php diff --git a/ProxyListener.php b/ProxyListener.php deleted file mode 100644 index 501e6f9..0000000 --- a/ProxyListener.php +++ /dev/null @@ -1,40 +0,0 @@ -getOrigin(); - $packet = $event->getPacket(); - - /** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */ - if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) { - if ($packet->timestamp === 0 && $packet->needResponse) { - if (($player = $origin->getPlayer()) !== null && $player->isConnected()) { - $origin->sendDataPacket(NetworkStackLatencyPacket::response(0)); - } - $event->cancel(); - } - } - } -} diff --git a/ProxyNetworkInterface.php b/ProxyNetworkInterface.php index 1503048..ca926a0 100644 --- a/ProxyNetworkInterface.php +++ b/ProxyNetworkInterface.php @@ -6,7 +6,6 @@ namespace libproxy; use Error; -use ErrorException; use Exception; use libproxy\data\LatencyData; use libproxy\data\TickSyncPacket; @@ -18,16 +17,12 @@ use libproxy\protocol\ProxyPacketSerializer; use pmmp\thread\Thread as NativeThread; use pmmp\thread\ThreadSafeArray; -use pocketmine\network\mcpe\compression\DecompressionException; use pocketmine\network\mcpe\compression\ZlibCompressor; use pocketmine\network\mcpe\convert\TypeConverter; use pocketmine\network\mcpe\EntityEventBroadcaster; use pocketmine\network\mcpe\NetworkSession; use pocketmine\network\mcpe\PacketBroadcaster; -use pocketmine\network\mcpe\protocol\PacketDecodeException; use pocketmine\network\mcpe\protocol\PacketPool; -use pocketmine\network\mcpe\protocol\serializer\PacketBatch; -use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm; use pocketmine\network\mcpe\raklib\PthreadsChannelReader; use pocketmine\network\mcpe\raklib\PthreadsChannelWriter; use pocketmine\network\NetworkInterface; @@ -37,16 +32,13 @@ use pocketmine\Server; use pocketmine\snooze\SleeperHandlerEntry; use pocketmine\thread\ThreadCrashException; -use pocketmine\timings\Timings; use pocketmine\utils\Binary; use pocketmine\utils\BinaryDataException; -use pocketmine\utils\BinaryStream; use Socket; use ThreadedArray; use WeakMap; use function base64_encode; use function bin2hex; -use function ord; use function socket_close; use function socket_create_pair; use function socket_last_error; @@ -55,7 +47,6 @@ use function strlen; use function substr; use function trim; -use function zstd_uncompress; use const AF_INET; use const AF_UNIX; use const SOCK_STREAM; @@ -148,8 +139,6 @@ public function __construct(PluginBase $plugin, int $port, ?string $composerPath $this->sendBytes = 0; $this->receiveBytes = 0; }), 20, 20); - - $server->getPluginManager()->registerEvents(new ProxyListener(), $plugin); } public static function handleRawLatency(NetworkSession $session, int $upstream, int $downstream): void @@ -217,82 +206,28 @@ private function onPacketReceive(string $buffer): void break; case ForwardPacket::NETWORK_ID: /** @var ForwardPacket $pk */ - if (($session = $this->getSession($socketId)) === null) { + if (($session = $this->getSession($socketId)) === null || !(fn() => $this->connected)->call($session)) { break; // might be data arriving from the client after the server has closed the connection } - $this->handleEncoded($session, $pk->payload); - $this->receiveBytes += strlen($pk->payload); - break; - } - } catch (PacketHandlingException|BinaryDataException $exception) { - $this->close($socketId, 'Error handling a Packet (Server)'); - - $this->server->getLogger()->logException($exception); - } - } - - /** - * @throws PacketHandlingException - */ - public function handleEncoded(NetworkSession $session, string $payload): void - { - if (!(fn() => $this->connected)->call($session)) { - return; - } - - Timings::$playerNetworkReceive->startTiming(); - try { - (fn() => $this->packetBatchLimiter->decrement())->call($session); - - if (strlen($payload) < 1) { - throw new PacketHandlingException("No bytes in payload"); - } - - Timings::$playerNetworkReceiveDecompress->startTiming(); - $compressionType = ord($payload[0]); - $compressed = substr($payload, 1); - - try { - $decompressed = match ($compressionType) { - CompressionAlgorithm::NONE => $compressed, - CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed), - CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d, - default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType") - }; - } catch (ErrorException|DecompressionException $e) { - $session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed)); - throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); - } finally { - Timings::$playerNetworkReceiveDecompress->stopTiming(); - } - - try { - $stream = new BinaryStream($decompressed); - $count = 0; - foreach (PacketBatch::decodeRaw($stream) as $buffer) { - (fn() => $this->gamePacketLimiter->decrement())->call($session); - if (++$count > 100) { - throw new PacketHandlingException("Too many packets in batch"); - } - $packet = PacketPool::getInstance()->getPacket($buffer); + $packet = PacketPool::getInstance()->getPacket($pk->payload); if ($packet === null) { - $session->getLogger()->debug("Unknown packet: " . base64_encode($buffer)); + $session->getLogger()->debug("Unknown packet: " . base64_encode($pk->payload)); throw new PacketHandlingException("Unknown packet received"); } try { - $session->handleDataPacket($packet, $buffer); + $session->handleDataPacket($packet, $pk->payload); } catch (PacketHandlingException $e) { - $session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer)); + $session->getLogger()->debug($packet->getName() . ": " . base64_encode($pk->payload)); throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); } - } - } catch (PacketDecodeException|BinaryDataException $e) { - $session->getLogger()->logException($e); - throw PacketHandlingException::wrap($e, "Packet batch decode error"); + $this->receiveBytes += strlen($pk->payload); + break; } - } finally { - Timings::$playerNetworkReceive->stopTiming(); + } catch (PacketHandlingException|BinaryDataException $exception) { + $this->close($socketId, 'Error handling a Packet (Server)'); + + $this->server->getLogger()->logException($exception); } } diff --git a/ProxyServer.php b/ProxyServer.php index 895b928..1ddcbfd 100644 --- a/ProxyServer.php +++ b/ProxyServer.php @@ -4,6 +4,7 @@ namespace libproxy; +use ErrorException; use libproxy\protocol\DisconnectPacket; use libproxy\protocol\ForwardPacket; use libproxy\protocol\LoginPacket; @@ -17,6 +18,19 @@ use NetherGames\Quiche\stream\BiDirectionalQuicheStream; use NetherGames\Quiche\stream\QuicheStream; use pmmp\thread\ThreadSafeArray; +use pocketmine\network\mcpe\compression\DecompressionException; +use pocketmine\network\mcpe\compression\ZlibCompressor; +use pocketmine\network\mcpe\NetworkSession; +use pocketmine\network\mcpe\PacketRateLimiter; +use pocketmine\network\mcpe\protocol\NetworkStackLatencyPacket; +use pocketmine\network\mcpe\protocol\Packet as BedrockPacket; +use pocketmine\network\mcpe\protocol\PacketDecodeException; +use pocketmine\network\mcpe\protocol\PacketPool; +use pocketmine\network\mcpe\protocol\ProtocolInfo; +use pocketmine\network\mcpe\protocol\RequestNetworkSettingsPacket; +use pocketmine\network\mcpe\protocol\serializer\PacketBatch; +use pocketmine\network\mcpe\protocol\serializer\PacketSerializer; +use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm; use pocketmine\network\mcpe\raklib\PthreadsChannelReader; use pocketmine\network\mcpe\raklib\SnoozeAwarePthreadsChannelWriter; use pocketmine\network\PacketHandlingException; @@ -25,13 +39,19 @@ use pocketmine\thread\log\AttachableThreadSafeLogger; use pocketmine\utils\Binary; use pocketmine\utils\BinaryDataException; +use pocketmine\utils\BinaryStream; use Socket; use function array_keys; +use function base64_encode; +use function bin2hex; +use function chr; use function getenv; +use function ord; use function socket_read; use function spl_object_id; use function strlen; use function substr; +use function zstd_uncompress; class ProxyServer { @@ -55,6 +75,13 @@ class ProxyServer /** @var array */ private array $streams = []; + /** @phpstan-var array */ + private array $gamePacketLimiter = []; + /** @phpstan-var array */ + private array $batchPacketLimiter = []; + /** @phpstan-var array */ + private array $protocolId = []; + /** @phpstan-var array */ private array $socketBuffer = []; /** @phpstan-var array */ @@ -153,7 +180,14 @@ public function waitShutdown(): void private function onStreamShutdown(int $streamIdentifier): void { - unset($this->streamWriters[$streamIdentifier], $this->connectionIdByStreamId[$streamIdentifier], $this->streams[$streamIdentifier]); + unset( + $this->streamWriters[$streamIdentifier], + $this->connectionIdByStreamId[$streamIdentifier], + $this->streams[$streamIdentifier], + $this->gamePacketLimiter[$streamIdentifier], + $this->batchPacketLimiter[$streamIdentifier], + $this->protocolId[$streamIdentifier] + ); } public function getTickSleeper(): SleeperHandler @@ -166,6 +200,16 @@ private function getStreamWriter(int $streamIdentifier): ?QueueWriter return $this->streamWriters[$streamIdentifier] ?? null; } + private function getGamePacketLimiter(int $streamIdentifier): PacketRateLimiter + { + return $this->gamePacketLimiter[$streamIdentifier] ??= new PacketRateLimiter("Game Packets", 2, 100); + } + + private function getBatchPacketLimiter(int $streamIdentifier): PacketRateLimiter + { + return $this->batchPacketLimiter[$streamIdentifier] ??= new PacketRateLimiter("Batch Packets", 2, 100); + } + private function shutdownStream(int $streamIdentifier, string $reason, bool $fromMain): void { if (!$fromMain) { @@ -229,17 +273,161 @@ private function pushSockets(): void break; case ForwardPacket::NETWORK_ID: /** @var ForwardPacket $pk */ - if (($writer = $this->getStreamWriter($streamIdentifier)) === null) { - $this->shutdownStream($streamIdentifier, 'stream not found', false); - return; - } - - $writer->write(Binary::writeInt(strlen($pk->payload)) . $pk->payload); + $this->sendPayload($streamIdentifier, $pk->payload); break; } } } + /** + * Sends a payload to the client + */ + private function sendPayload(int $streamIdentifier, string $payload): void + { + if (($writer = $this->getStreamWriter($streamIdentifier)) === null) { + $this->shutdownStream($streamIdentifier, 'stream not found', false); + return; + } + + $writer->write(Binary::writeInt(strlen($payload)) . $payload); + } + + /** + * Sends a data packet to the main thread. + */ + private function sendDataPacketToMain(int $socketIdentifier, string $payload): void + { + $pk = new ForwardPacket(); + $pk->payload = $payload; + + $this->sendToMainBuffer($socketIdentifier, $pk); + } + + /** + * Returns the protocol ID for the given socket identifier. + */ + private function getProtocolId(int $socketIdentifier): int + { + return $this->protocolId[$socketIdentifier] ?? ProtocolInfo::CURRENT_PROTOCOL; + } + + /** + * Sends a data packet to the client using a single packet in a batch. + */ + private function sendDataPacket(int $socketIdentifier, BedrockPacket $packet): void + { + $packetSerializer = PacketSerializer::encoder($protocolId = $this->getProtocolId($socketIdentifier)); + $packet->encode($packetSerializer); + + $stream = new BinaryStream(); + PacketBatch::encodeRaw($stream, [$packetSerializer->getBuffer()]); + $payload = ($protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr(CompressionAlgorithm::NONE) : '') . $stream->getBuffer(); + + $this->sendPayload($socketIdentifier, $payload); + } + + private function decodePacket(int $socketIdentifier, BedrockPacket $packet, string $buffer): void + { + $stream = PacketSerializer::decoder($this->protocolId[$socketIdentifier] ?? ProtocolInfo::CURRENT_PROTOCOL, $buffer, 0); + try { + $packet->decode($stream); + } catch (PacketDecodeException $e) { + throw PacketHandlingException::wrap($e); + } + if (!$stream->feof()) { + $remains = substr($stream->getBuffer(), $stream->getOffset()); + $this->logger->debug("Still " . strlen($remains) . " bytes unread in " . $packet->getName() . ": " . bin2hex($remains)); + } + } + + /** + * Returns true if the packet was handled successfully, false if it should be sent to the main thread. + * + * @return bool whether the packet was handled successfully + */ + private function handleDataPacket(int $socketIdentifier, BedrockPacket $packet, string $buffer): bool + { + if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) { + /** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */ + $this->decodePacket($socketIdentifier, $packet, $buffer); + + if ($packet->timestamp === 0 && $packet->needResponse) { + $this->sendDataPacket($socketIdentifier, NetworkStackLatencyPacket::response(0)); + return true; + } + } else if ($packet->pid() === RequestNetworkSettingsPacket::NETWORK_ID) { + /** @var RequestNetworkSettingsPacket $packet USED TO GET PROTOCOLID */ + $this->decodePacket($socketIdentifier, $packet, $buffer); + + $this->protocolId[$socketIdentifier] = $packet->getProtocolVersion(); + } + + return false; + } + + /** + * @param int $socketIdentifier + * @param string $payload + * @return void + * @see NetworkSession::handleEncoded($payload) + * + */ + private function onFullDataReceive(int $socketIdentifier, string $payload): void + { + try { + $this->getBatchPacketLimiter($socketIdentifier)->decrement(); + + if (strlen($payload) < 1) { + throw new PacketHandlingException("No bytes in payload"); + } + + $compressionType = ord($payload[0]); + $compressed = substr($payload, 1); + + try { + $decompressed = match ($compressionType) { + CompressionAlgorithm::NONE => $compressed, + CompressionAlgorithm::ZLIB => ZlibCompressor::getInstance()->decompress($compressed), + CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d, + default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType") + }; + } catch (ErrorException|DecompressionException $e) { + $this->logger->debug("Failed to decompress packet: " . base64_encode($compressed)); + throw PacketHandlingException::wrap($e, "Compressed packet batch decode error"); + } + + try { + $stream = new BinaryStream($decompressed); + $count = 0; + foreach (PacketBatch::decodeRaw($stream) as $buffer) { + $this->getGamePacketLimiter($socketIdentifier)->decrement(); + if (++$count > 100) { + throw new PacketHandlingException("Too many packets in batch"); + } + $packet = PacketPool::getInstance()->getPacket($buffer); + if ($packet === null) { + $this->logger->debug("Unknown packet: " . base64_encode($buffer)); + throw new PacketHandlingException("Unknown packet received"); + } + try { + if (!$this->handleDataPacket($socketIdentifier, $packet, $buffer)) { + $this->sendDataPacketToMain($socketIdentifier, $buffer); + } + } catch (PacketHandlingException $e) { + $this->logger->debug($packet->getName() . ": " . base64_encode($buffer)); + throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName()); + } + } + } catch (PacketDecodeException|BinaryDataException $e) { + $this->logger->logException($e); + throw PacketHandlingException::wrap($e, "Packet batch decode error"); + } + } catch (PacketHandlingException $e) { + $this->logger->logException($e); + $this->shutdownStream($socketIdentifier, "invalid packet", false); + } + } + private function onDataReceive(int $socketIdentifier, string $data): void { if (isset($this->socketBuffer[$socketIdentifier])) { @@ -268,10 +456,7 @@ private function onDataReceive(int $socketIdentifier, string $data): void $this->socketBuffer[$socketIdentifier] = substr($buffer, 4); } } else if ($length >= $lengthNeeded) { - $pk = new ForwardPacket(); - $pk->payload = substr($buffer, 0, $lengthNeeded); - - $this->sendToMainBuffer($socketIdentifier, $pk); + $this->onFullDataReceive($socketIdentifier, substr($buffer, 0, $lengthNeeded)); $this->socketBuffer[$socketIdentifier] = substr($buffer, $lengthNeeded); unset($this->socketBufferLengthNeeded[$socketIdentifier]);