Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Handle ping pong on thread
Browse files Browse the repository at this point in the history
dries-c committed Mar 5, 2024
1 parent f9125b5 commit 44b9499
Showing 3 changed files with 210 additions and 132 deletions.
40 changes: 0 additions & 40 deletions ProxyListener.php

This file was deleted.

85 changes: 10 additions & 75 deletions ProxyNetworkInterface.php
Original file line number Diff line number Diff line change
@@ -6,7 +6,6 @@
namespace libproxy;

use Error;
use ErrorException;
use Exception;
use libproxy\data\LatencyData;
use libproxy\data\TickSyncPacket;
@@ -18,16 +17,12 @@
use libproxy\protocol\ProxyPacketSerializer;
use pmmp\thread\Thread as NativeThread;
use pmmp\thread\ThreadSafeArray;
use pocketmine\network\mcpe\compression\DecompressionException;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\convert\TypeConverter;
use pocketmine\network\mcpe\EntityEventBroadcaster;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketBroadcaster;
use pocketmine\network\mcpe\protocol\PacketDecodeException;
use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm;
use pocketmine\network\mcpe\raklib\PthreadsChannelReader;
use pocketmine\network\mcpe\raklib\PthreadsChannelWriter;
use pocketmine\network\NetworkInterface;
@@ -37,16 +32,13 @@
use pocketmine\Server;
use pocketmine\snooze\SleeperHandlerEntry;
use pocketmine\thread\ThreadCrashException;
use pocketmine\timings\Timings;
use pocketmine\utils\Binary;
use pocketmine\utils\BinaryDataException;
use pocketmine\utils\BinaryStream;
use Socket;
use ThreadedArray;
use WeakMap;
use function base64_encode;
use function bin2hex;
use function ord;
use function socket_close;
use function socket_create_pair;
use function socket_last_error;
@@ -55,7 +47,6 @@
use function strlen;
use function substr;
use function trim;
use function zstd_uncompress;
use const AF_INET;
use const AF_UNIX;
use const SOCK_STREAM;
@@ -148,8 +139,6 @@ public function __construct(PluginBase $plugin, int $port, ?string $composerPath
$this->sendBytes = 0;
$this->receiveBytes = 0;
}), 20, 20);

$server->getPluginManager()->registerEvents(new ProxyListener(), $plugin);
}

public static function handleRawLatency(NetworkSession $session, int $upstream, int $downstream): void
@@ -221,78 +210,24 @@ private function onPacketReceive(string $buffer): void
break; // might be data arriving from the client after the server has closed the connection
}

$this->handleEncoded($session, $pk->payload);
$this->receiveBytes += strlen($pk->payload);
break;
}
} catch (PacketHandlingException|BinaryDataException $exception) {
$this->close($socketId, 'Error handling a Packet (Server)');

$this->server->getLogger()->logException($exception);
}
}

/**
* @throws PacketHandlingException
*/
public function handleEncoded(NetworkSession $session, string $payload): void
{
if (!(fn() => $this->connected)->call($session)) {
return;
}

Timings::$playerNetworkReceive->startTiming();
try {
(fn() => $this->packetBatchLimiter->decrement())->call($session);

if (strlen($payload) < 1) {
throw new PacketHandlingException("No bytes in payload");
}

Timings::$playerNetworkReceiveDecompress->startTiming();
$compressionType = ord($payload[0]);
$compressed = substr($payload, 1);

try {
$decompressed = match ($compressionType) {
CompressionAlgorithm::NONE => $compressed,
CompressionAlgorithm::ZLIB => $session->getCompressor()->decompress($compressed),
CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d,
default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType")
};
} catch (ErrorException|DecompressionException $e) {
$session->getLogger()->debug("Failed to decompress packet: " . base64_encode($compressed));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
} finally {
Timings::$playerNetworkReceiveDecompress->stopTiming();
}

try {
$stream = new BinaryStream($decompressed);
$count = 0;
foreach (PacketBatch::decodeRaw($stream) as $buffer) {
(fn() => $this->gamePacketLimiter->decrement())->call($session);
if (++$count > 100) {
throw new PacketHandlingException("Too many packets in batch");
}
$packet = PacketPool::getInstance()->getPacket($buffer);
$packet = PacketPool::getInstance()->getPacket($pk->payload);
if ($packet === null) {
$session->getLogger()->debug("Unknown packet: " . base64_encode($buffer));
$session->getLogger()->debug("Unknown packet: " . base64_encode($pk->payload));
throw new PacketHandlingException("Unknown packet received");
}
try {
$session->handleDataPacket($packet, $buffer);
$session->handleDataPacket($packet, $pk->payload);
} catch (PacketHandlingException $e) {
$session->getLogger()->debug($packet->getName() . ": " . base64_encode($buffer));
$session->getLogger()->debug($packet->getName() . ": " . base64_encode($pk->payload));
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
}
}
} catch (PacketDecodeException|BinaryDataException $e) {
$session->getLogger()->logException($e);
throw PacketHandlingException::wrap($e, "Packet batch decode error");
$this->receiveBytes += strlen($pk->payload);
break;
}
} finally {
Timings::$playerNetworkReceive->stopTiming();
} catch (PacketHandlingException|BinaryDataException $exception) {
$this->close($socketId, 'Error handling a Packet (Server)');

$this->server->getLogger()->logException($exception);
}
}

217 changes: 200 additions & 17 deletions ProxyServer.php
Original file line number Diff line number Diff line change
@@ -12,6 +12,19 @@
use libproxy\protocol\ProxyPacketPool;
use libproxy\protocol\ProxyPacketSerializer;
use pmmp\thread\ThreadSafeArray;
use pocketmine\network\mcpe\compression\DecompressionException;
use pocketmine\network\mcpe\compression\ZlibCompressor;
use pocketmine\network\mcpe\NetworkSession;
use pocketmine\network\mcpe\PacketRateLimiter;
use pocketmine\network\mcpe\protocol\NetworkStackLatencyPacket;
use pocketmine\network\mcpe\protocol\Packet as BedrockPacket;
use pocketmine\network\mcpe\protocol\PacketDecodeException;
use pocketmine\network\mcpe\protocol\PacketPool;
use pocketmine\network\mcpe\protocol\ProtocolInfo;
use pocketmine\network\mcpe\protocol\RequestNetworkSettingsPacket;
use pocketmine\network\mcpe\protocol\serializer\PacketBatch;
use pocketmine\network\mcpe\protocol\serializer\PacketSerializer;
use pocketmine\network\mcpe\protocol\types\CompressionAlgorithm;
use pocketmine\network\mcpe\raklib\PthreadsChannelReader;
use pocketmine\network\mcpe\raklib\SnoozeAwarePthreadsChannelWriter;
use pocketmine\network\PacketHandlingException;
@@ -29,6 +42,14 @@
use function socket_close;
use function socket_getpeername;
use function socket_last_error;
use pocketmine\utils\BinaryStream;
use Socket;

Check failure on line 46 in ProxyServer.php

GitHub Actions / PHPStan Analysis

Cannot use Socket as Socket because the name is already in use on line 46
use function array_keys;
use function base64_encode;
use function bin2hex;
use function chr;
use function getenv;
use function ord;
use function socket_read;
use function socket_recv;
use function socket_select;
@@ -45,6 +66,8 @@
use const SO_SNDTIMEO;
use const SOCKET_EWOULDBLOCK;
use const SOL_SOCKET;
use function substr;
use function zstd_uncompress;

class ProxyServer
{
@@ -70,6 +93,14 @@

/** @var Socket[] */
private array $sockets = [];

/** @phpstan-var array<int, PacketRateLimiter> */
private array $gamePacketLimiter = [];
/** @phpstan-var array<int, PacketRateLimiter> */
private array $batchPacketLimiter = [];
/** @phpstan-var array<int, int> */
private array $protocolId = [];

/** @phpstan-var array<int, array<int|string>> */
private array $socketBuffer = [];

@@ -109,7 +140,13 @@
$this->logger->debug('Socket is not connected anymore.');
}
socket_close($socket);
unset($this->sockets[$socketId], $this->socketBuffer[$socketId]);
unset(
$this->sockets[$socketId],
$this->socketBuffer[$socketId],
$this->gamePacketLimiter[$socketId],
$this->batchPacketLimiter[$socketId],
$this->protocolId[$socketId]
);
}

$this->logger->debug("Disconnected socket with id " . $socketId);
@@ -127,6 +164,16 @@
return $this->sockets[$socketId] ?? null;
}

private function getGamePacketLimiter(int $streamIdentifier): PacketRateLimiter
{
return $this->gamePacketLimiter[$streamIdentifier] ??= new PacketRateLimiter("Game Packets", 2, 100);
}

private function getBatchPacketLimiter(int $streamIdentifier): PacketRateLimiter
{
return $this->batchPacketLimiter[$streamIdentifier] ??= new PacketRateLimiter("Batch Packets", 2, 100);
}

private function putPacket(int $socketId, ProxyPacket $pk): void
{
$serializer = new ProxyPacketSerializer();
@@ -190,17 +237,7 @@
break;
case ForwardPacket::NETWORK_ID:
/** @var ForwardPacket $pk */
if (($socket = $this->getSocket($socketId)) === null) {
throw new PacketHandlingException('Socket with id (' . $socketId . ") doesn't exist.");
}

try {
if (socket_write($socket, Binary::writeInt(strlen($pk->payload)) . $pk->payload) === false) {
throw new PacketHandlingException('client disconnect');
}
} catch (ErrorException $exception) {
throw PacketHandlingException::wrap($exception, 'client disconnect');
}
$this->sendPayload($socketId, $pk->payload);
break;
}
} catch (PacketHandlingException $exception) {
@@ -209,6 +246,156 @@
}
}

/**
* Sends a payload to the client
*/
private function sendPayload(int $socketId, string $payload): void
{
if (($socket = $this->getSocket($socketId)) === null) {
throw new PacketHandlingException('Socket with id (' . $socketId . ") doesn't exist.");
}

try {
if (socket_write($socket, Binary::writeInt(strlen($payload)) . $payload) === false) {
throw new PacketHandlingException('client disconnect');
}
} catch (ErrorException $exception) {
throw PacketHandlingException::wrap($exception, 'client disconnect');
}
}

/**
* Sends a data packet to the main thread.
*/
private function sendDataPacketToMain(int $socketId, string $payload): void
{
$pk = new ForwardPacket();
$pk->payload = $payload;

$this->sendToMainBuffer($socketId, $pk);
}

/**
* Returns the protocol ID for the given socket identifier.
*/
private function getProtocolId(int $socketId): int
{
return $this->protocolId[$socketId] ?? ProtocolInfo::CURRENT_PROTOCOL;
}

/**
* Sends a data packet to the client using a single packet in a batch.
*/
private function sendDataPacket(int $socketId, BedrockPacket $packet): void
{
$packetSerializer = PacketSerializer::encoder($protocolId = $this->getProtocolId($socketId));
$packet->encode($packetSerializer);

$stream = new BinaryStream();
PacketBatch::encodeRaw($stream, [$packetSerializer->getBuffer()]);
$payload = ($protocolId >= ProtocolInfo::PROTOCOL_1_20_60 ? chr(CompressionAlgorithm::NONE) : '') . $stream->getBuffer();

$this->sendPayload($socketId, $payload);
}

private function decodePacket(int $socketId, BedrockPacket $packet, string $buffer): void
{
$stream = PacketSerializer::decoder($this->protocolId[$socketId] ?? ProtocolInfo::CURRENT_PROTOCOL, $buffer, 0);
try {
$packet->decode($stream);
} catch (PacketDecodeException $e) {
throw PacketHandlingException::wrap($e);
}
if (!$stream->feof()) {
$remains = substr($stream->getBuffer(), $stream->getOffset());
$this->logger->debug("Still " . strlen($remains) . " bytes unread in " . $packet->getName() . ": " . bin2hex($remains));
}
}

/**
* Returns true if the packet was handled successfully, false if it should be sent to the main thread.
*
* @return bool whether the packet was handled successfully
*/
private function handleDataPacket(int $socketId, BedrockPacket $packet, string $buffer): bool
{
if ($packet->pid() == NetworkStackLatencyPacket::NETWORK_ID) {
/** @var NetworkStackLatencyPacket $packet USED FOR PING CALCULATIONS */
$this->decodePacket($socketId, $packet, $buffer);

if ($packet->timestamp === 0 && $packet->needResponse) {
$this->sendDataPacket($socketId, NetworkStackLatencyPacket::response(0));
return true;
}
} else if ($packet->pid() === RequestNetworkSettingsPacket::NETWORK_ID) {
/** @var RequestNetworkSettingsPacket $packet USED TO GET PROTOCOLID */
$this->decodePacket($socketId, $packet, $buffer);

$this->protocolId[$socketId] = $packet->getProtocolVersion();
}

return false;
}

/**
* @see NetworkSession::handleEncoded($payload)
*/
private function onFullDataReceive(int $socketId, string $payload): void
{
try {
$this->getBatchPacketLimiter($socketId)->decrement();

if (strlen($payload) < 1) {
throw new PacketHandlingException("No bytes in payload");
}

$compressionType = ord($payload[0]);
$compressed = substr($payload, 1);

try {
$decompressed = match ($compressionType) {
CompressionAlgorithm::NONE => $compressed,
CompressionAlgorithm::ZLIB => ZlibCompressor::getInstance()->decompress($compressed),
CompressionAlgorithm::NONE - 1 => ($d = zstd_uncompress($compressed)) === false ? throw new DecompressionException("Failed to decompress packet") : $d,
default => throw new PacketHandlingException("Packet compressed with unexpected compression type $compressionType")
};
} catch (ErrorException|DecompressionException $e) {
$this->logger->debug("Failed to decompress packet: " . base64_encode($compressed));
throw PacketHandlingException::wrap($e, "Compressed packet batch decode error");
}

try {
$stream = new BinaryStream($decompressed);
$count = 0;
foreach (PacketBatch::decodeRaw($stream) as $buffer) {
$this->getGamePacketLimiter($socketId)->decrement();
if (++$count > 100) {
throw new PacketHandlingException("Too many packets in batch");
}
$packet = PacketPool::getInstance()->getPacket($buffer);
if ($packet === null) {
$this->logger->debug("Unknown packet: " . base64_encode($buffer));
throw new PacketHandlingException("Unknown packet received");
}
try {
if (!$this->handleDataPacket($socketId, $packet, $buffer)) {
$this->sendDataPacketToMain($socketId, $buffer);
}
} catch (PacketHandlingException $e) {
$this->logger->debug($packet->getName() . ": " . base64_encode($buffer));
throw PacketHandlingException::wrap($e, "Error processing " . $packet->getName());
}
}
} catch (PacketDecodeException|BinaryDataException $e) {
$this->logger->logException($e);
throw PacketHandlingException::wrap($e, "Packet batch decode error");
}
} catch (PacketHandlingException $e) {
$this->logger->logException($e);
$this->closeSocket($socketId, $exception->getMessage());
}
}

private function onServerSocketReceive(): void
{
$socket = socket_accept($this->serverSocket);
@@ -273,11 +460,7 @@
// A null frame data indicates that there is not enough bytes to read.
if ($rawFrameData !== null) {
unset($this->socketBuffer[$socketId]);

$pk = new ForwardPacket();
$pk->payload = $rawFrameData;

$this->putPacket($socketId, $pk);
$this->onFullDataReceive($socketId, $rawFrameData);
}
}

0 comments on commit 44b9499

Please sign in to comment.