Skip to content

Commit

Permalink
Now supporting net performance graphs on client.
Browse files Browse the repository at this point in the history
See RigsOfRods/rigs-of-rods#3056 - `RoRnet::Header` was updated with extra timestamps to track network performance. The problem is, the server didn't transfer the header as-is, so the data got lost. I added code to transfer the headers as-is, but to avoid massive commit, I left all the server messages as-is, only changed processing messages from clients.

Changes:
- RoRnet updated to support new timestamps
- Messaging: added `SendMessageWithHeader()`
- Broadcaster - transformed `QueueMessage()` with separate values to `QueuePacket` with header.
- Sequencer - accept and broadcast the entire header. Added `sendStreamData()` helper for that.
  • Loading branch information
ohlidalp committed Jun 16, 2023
1 parent 925e53c commit 4965c06
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 85 deletions.
116 changes: 67 additions & 49 deletions source/protocol/rornet.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*
This file is part of Rigs of Rods
Copyright 2007 Pierre-Michel Ricordel
Copyright 2014+ Petr Ohlidal & contributors.
Copyright 2007 Pierre-Michel Ricordel
Copyright 2014-2017 Ulteq
Copyright 2020-2023 Petr Ohlidal
Rigs of Rods is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
Expand All @@ -20,18 +21,20 @@
#pragma once

#include <stdint.h>

typedef uint32_t BitMask_t;
#define BITMASK(x) (1 << (x-1))

namespace RoRnet {

#define RORNET_MAX_PEERS 64 //!< maximum clients connected at the same time
#define RORNET_MAX_MESSAGE_LENGTH 8192 //!< maximum size of a RoR message. 8192 bytes = 8 kibibytes
#define RORNET_LAN_BROADCAST_PORT 13000 //!< port used to send the broadcast announcement in LAN mode
#define RORNET_MAX_USERNAME_LEN 40 //!< port used to send the broadcast announcement in LAN mode
#define RORNET_MAX_USERNAME_LEN 40 //!< bytes.

#define RORNET_VERSION "RoRnet_2.44"

typedef uint32_t NetTime32_t; //!< Milliseconds

enum MessageType
{
MSG2_HELLO = 1025, //!< client sends its version as first message
Expand All @@ -54,8 +57,8 @@ enum MessageType
MSG2_GAME_CMD, //!< Script message. Can be sent in both directions.
MSG2_USER_JOIN, //!< new user joined
MSG2_USER_LEAVE, //!< user leaves
MSG2_UTF8_CHAT, //!< chat line in UTF8 encoding
MSG2_UTF8_PRIVCHAT, //!< private chat line in UTF8 encoding
MSG2_UTF8_CHAT, //!< broadcast chat line in UTF8 encoding; Payload: const char*(text)
MSG2_UTF8_PRIVCHAT, //!< private chat line in UTF8 encoding; Payload: uint32_t(uniqueid), const char*(text)

// Stream functions
MSG2_STREAM_REGISTER, //!< create new stream
Expand All @@ -67,6 +70,7 @@ enum MessageType
// Legacy values (RoRnet_2.38 and earlier)
MSG2_WRONG_VER_LEGACY = 1003, //!< Wrong version

// Special values
MSG2_INVALID = 0 //!< Not to be transmitted
};

Expand All @@ -82,37 +86,45 @@ enum UserAuth

enum Netmask
{
NETMASK_HORN = BITMASK(1), //!< horn is in use
NETMASK_LIGHTS = BITMASK(2), //!< lights on
NETMASK_BRAKES = BITMASK(3), //!< brake lights on
NETMASK_REVERSE = BITMASK(4), //!< reverse light on
NETMASK_BEACONS = BITMASK(5), //!< beacons on
NETMASK_BLINK_LEFT = BITMASK(6), //!< left blinker on
NETMASK_BLINK_RIGHT = BITMASK(7), //!< right blinker on
NETMASK_BLINK_WARN = BITMASK(8), //!< warn blinker on
NETMASK_CLIGHT1 = BITMASK(9), //!< custom light 1 on
NETMASK_CLIGHT2 = BITMASK(10), //!< custom light 2 on
NETMASK_CLIGHT3 = BITMASK(11), //!< custom light 3 on
NETMASK_CLIGHT4 = BITMASK(12), //!< custom light 4 on
NETMASK_CLIGHT5 = BITMASK(13), //!< custom light 5 on
NETMASK_CLIGHT6 = BITMASK(14), //!< custom light 6 on
NETMASK_CLIGHT7 = BITMASK(15), //!< custom light 7 on
NETMASK_CLIGHT8 = BITMASK(16), //!< custom light 8 on
NETMASK_CLIGHT9 = BITMASK(17), //!< custom light 9 on
NETMASK_CLIGHT10 = BITMASK(18), //!< custom light 10 on
NETMASK_POLICEAUDIO = BITMASK(19), //!< police siren on
NETMASK_PARTICLE = BITMASK(20), //!< custom particles on
NETMASK_PBRAKE = BITMASK(21), //!< parking brake on
NETMASK_TC_ACTIVE = BITMASK(22), //!< traction control light on?
NETMASK_ALB_ACTIVE = BITMASK(23), //!< anti lock brake light on?
NETMASK_ENGINE_CONT = BITMASK(24), //!< ignition on?
NETMASK_ENGINE_RUN = BITMASK(25), //!< engine running?

NETMASK_ENGINE_MODE_AUTOMATIC = BITMASK(26), //!< engine mode
NETMASK_ENGINE_MODE_SEMIAUTO = BITMASK(27), //!< engine mode
NETMASK_ENGINE_MODE_MANUAL = BITMASK(28), //!< engine mode
NETMASK_ENGINE_MODE_MANUAL_STICK = BITMASK(29), //!< engine mode
NETMASK_ENGINE_MODE_MANUAL_RANGES = BITMASK(30) //!< engine mode
NETMASK_HORN = BITMASK(1), //!< horn is in use
NETMASK_POLICEAUDIO = BITMASK(2), //!< police siren on
NETMASK_PARTICLE = BITMASK(3), //!< custom particles on
NETMASK_PBRAKE = BITMASK(4), //!< parking brake
NETMASK_TC_ACTIVE = BITMASK(5), //!< traction control light on?
NETMASK_ALB_ACTIVE = BITMASK(6), //!< anti lock brake light on?
NETMASK_ENGINE_CONT = BITMASK(7), //!< ignition on?
NETMASK_ENGINE_RUN = BITMASK(8), //!< engine running?

NETMASK_ENGINE_MODE_AUTOMATIC = BITMASK(9), //!< engine mode
NETMASK_ENGINE_MODE_SEMIAUTO = BITMASK(10), //!< engine mode
NETMASK_ENGINE_MODE_MANUAL = BITMASK(11), //!< engine mode
NETMASK_ENGINE_MODE_MANUAL_STICK = BITMASK(12), //!< engine mode
NETMASK_ENGINE_MODE_MANUAL_RANGES = BITMASK(13), //!< engine mode
};

enum Lightmask
{
LIGHTMASK_CUSTOM1 = BITMASK(1), //!< custom light 1 on
LIGHTMASK_CUSTOM2 = BITMASK(2), //!< custom light 2 on
LIGHTMASK_CUSTOM3 = BITMASK(3), //!< custom light 3 on
LIGHTMASK_CUSTOM4 = BITMASK(4), //!< custom light 4 on
LIGHTMASK_CUSTOM5 = BITMASK(5), //!< custom light 5 on
LIGHTMASK_CUSTOM6 = BITMASK(6), //!< custom light 6 on
LIGHTMASK_CUSTOM7 = BITMASK(7), //!< custom light 7 on
LIGHTMASK_CUSTOM8 = BITMASK(8), //!< custom light 8 on
LIGHTMASK_CUSTOM9 = BITMASK(9), //!< custom light 9 on
LIGHTMASK_CUSTOM10 = BITMASK(10), //!< custom light 10 on

LIGHTMASK_HEADLIGHT = BITMASK(11),
LIGHTMASK_HIGHBEAMS = BITMASK(12),
LIGHTMASK_FOGLIGHTS = BITMASK(13),
LIGHTMASK_SIDELIGHTS = BITMASK(14),
LIGHTMASK_BRAKES = BITMASK(15), //!< brake lights on
LIGHTMASK_REVERSE = BITMASK(16), //!< reverse light on
LIGHTMASK_BEACONS = BITMASK(17), //!< beacons on
LIGHTMASK_BLINK_LEFT = BITMASK(18), //!< left blinker on
LIGHTMASK_BLINK_RIGHT = BITMASK(19), //!< right blinker on
LIGHTMASK_BLINK_WARN = BITMASK(20), //!< warn blinker on
};

// -------------------------------- structs -----------------------------------
Expand All @@ -123,32 +135,37 @@ enum Netmask

struct Header //!< Common header for every packet
{
uint32_t command; //!< the command of this packet: MSG2_*
int32_t source; //!< source of this command: 0 = server
uint32_t streamid; //!< streamid for this command
uint32_t size; //!< size of the attached data block
uint32_t command; //!< the command of this packet: MSG2_*
int32_t source; //!< client who sent this command: 0 = server
NetTime32_t source_queue_time; //!< client time when queuing packet for sending
NetTime32_t source_send_time; //!< client time when actually sending the packet
uint32_t streamid; //!< streamid for this command
uint32_t size; //!< size of the attached data block

};

struct StreamRegister //!< Sent from the client to server and vice versa, to broadcast a new stream
{
int32_t type; //!< stream type
int32_t type; //!< 0 = Actor, 1 = Character, 3 = ChatSystem
int32_t status; //!< initial stream status
int32_t origin_sourceid; //!< origin sourceid
int32_t origin_streamid; //!< origin streamid
char name[128]; //!< the actor filename
char name[128]; //!< file name
char data[128]; //!< data used for stream setup
};

struct ActorStreamRegister
struct ActorStreamRegister //!< Must preserve mem. layout of RoRnet::StreamRegister
{
int32_t type; //!< stream type
// RoRnet::StreamRegister: Common
int32_t type; //!< 0
int32_t status; //!< initial stream status
int32_t origin_sourceid; //!< origin sourceid
int32_t origin_streamid; //!< origin streamid
char name[128]; //!< filename
char name[128]; //!< truck file name
// RoRnet::StreamRegister: Data buffer (128B)
int32_t bufferSize; //!< initial stream status
int32_t time; //!< initial time stamp
char skin[60]; //!< skin
char skin[60]; //!< skin
char sectionconfig[60]; //!< section configuration
};

Expand All @@ -164,7 +181,7 @@ struct UserInfo
int32_t slotnum; //!< slot number set by server
int32_t colournum; //!< colour set by server

char username[RORNET_MAX_USERNAME_LEN]; //!< the nickname of the user WIDE CHAR!
char username[RORNET_MAX_USERNAME_LEN]; //!< the nickname of the user (UTF-8)
char usertoken[40]; //!< user token
char serverpassword[40]; //!< server password
char language[10]; //!< user's language. For example "de-DE" or "en-US"
Expand All @@ -185,7 +202,8 @@ struct VehicleState //!< Formerly `oob_t`
float hydrodirstate; //!< the turning direction status
float brake; //!< the brake value
float wheelspeed; //!< the wheel speed value
uint32_t flagmask; //!< flagmask: NETMASK_*
BitMask_t flagmask; //!< flagmask: NETMASK_*
BitMask_t lightmask; //!< flagmask: LIGHTMASK_*
};

struct ServerInfo
Expand Down
21 changes: 10 additions & 11 deletions source/server/broadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,39 +122,38 @@ Broadcaster::ThreadState Broadcaster::ThreadWaitForMessage(QueueEntry& out_messa


bool Broadcaster::ThreadTransmitMessage(QueueEntry const& msg) {
int type = msg.type;
int type = msg.header.command;
if (type == RoRnet::MSG2_INVALID)
return true; // No error.
if (type == RoRnet::MSG2_STREAM_DATA_DISCARDABLE)
type = RoRnet::MSG2_STREAM_DATA;

int res = Messaging::SendMessage(m_client->GetSocket(), type, msg.uid, msg.streamid, msg.datalen, msg.data);
int res = Messaging::SendMessageWithHeader(m_client->GetSocket(), msg.header, msg.data);
return res == 0;
}


void Broadcaster::QueueMessage(int type, int uid, unsigned int streamid, unsigned int len, const char *data) {
void Broadcaster::QueuePacket(RoRnet::Header header, const char *data) {
QueueEntry msg;
msg.type = (RoRnet::MessageType)type;
msg.uid = uid;
msg.streamid = streamid;
msg.datalen = len;
std::memcpy(msg.data, data, len);
msg.header = header;
std::memcpy(msg.data, data, header.size);

{
std::lock_guard<std::mutex> scoped_lock(m_mutex);
if (m_msg_queue.empty()) {
m_packet_drop_counter = 0;
m_is_dropping_packets = (++m_packet_good_counter > 3) ? false : m_is_dropping_packets;
} else if (type == RoRnet::MSG2_STREAM_DATA_DISCARDABLE) {
} else if (msg.header.command == RoRnet::MSG2_STREAM_DATA_DISCARDABLE) {
auto search = std::find_if(m_msg_queue.begin(), m_msg_queue.end(), [&](const QueueEntry& m)
{ return m.type == RoRnet::MSG2_STREAM_DATA_DISCARDABLE && m.uid == uid && m.streamid == streamid; });
{ return m.header.command == RoRnet::MSG2_STREAM_DATA_DISCARDABLE
&& m.header.source == msg.header.source
&& m.header.streamid == msg.header.streamid; });
if (search != m_msg_queue.end()) {
// Found outdated discardable streamdata -> replace it
(*search) = msg;
m_packet_good_counter = 0;
m_is_dropping_packets = (++m_packet_drop_counter > 3) ? true : m_is_dropping_packets;
Messaging::StatsAddOutgoingDrop(sizeof(RoRnet::Header) + msg.datalen); // Statistics
Messaging::StatsAddOutgoingDrop(sizeof(RoRnet::Header) + msg.header.size); // Statistics
return;
}
}
Expand Down
7 changes: 2 additions & 5 deletions source/server/broadcaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ along with Foobar. If not, see <http://www.gnu.org/licenses/>.
#include <thread>

struct QueueEntry {
RoRnet::MessageType type = RoRnet::MSG2_INVALID;
int uid;
unsigned int streamid;
unsigned int datalen;
RoRnet::Header header;
char data[RORNET_MAX_MESSAGE_LENGTH];
};

Expand All @@ -55,7 +52,7 @@ class Broadcaster {
void Start(Client* client);
void Stop();

void QueueMessage(int msg_type, int client_id, unsigned int streamid, unsigned int payload_len, const char *payload);
void QueuePacket(RoRnet::Header header, const char *payload);
bool IsDroppingPackets() const { return m_is_dropping_packets; }

private:
Expand Down
31 changes: 21 additions & 10 deletions source/server/messaging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,22 @@ namespace Messaging {
return s_traffic;
}

int SendMessage(SWInetSocket *socket,
int type, int source, unsigned int streamid, unsigned int len,
const char *content)
{
assert(socket != nullptr);

RoRnet::Header head;
memset(&head, 0, sizeof(RoRnet::Header));
head.command = type;
head.source = source;
head.size = len;
head.streamid = streamid;

return SendMessageWithHeader(socket, head, content);
}

/**
* @param socket Socket to communicate over
* @param type Command ID
Expand All @@ -93,32 +109,27 @@ namespace Messaging {
* @param content Payload
* @return 0 on success
*/
int SendMessage(SWInetSocket *socket, int type, int source, unsigned int streamid, unsigned int len,
int SendMessageWithHeader(SWInetSocket *socket, RoRnet::Header head,
const char *content) {
assert(socket != nullptr);

SWBaseSocket::SWBaseError error;
RoRnet::Header head;

const int msgsize = sizeof(RoRnet::Header) + len;
const int msgsize = sizeof(RoRnet::Header) + head.size;

if (msgsize >= RORNET_MAX_MESSAGE_LENGTH) {
Logger::Log(LOG_ERROR, "UID: %d - attempt to send too long message", source);
Logger::Log(LOG_ERROR, "UID: %d - attempt to send too long message", head.source);
return -4;
}

char buffer[RORNET_MAX_MESSAGE_LENGTH];

memset(&head, 0, sizeof(RoRnet::Header));
head.command = type;
head.source = source;
head.size = len;
head.streamid = streamid;


// construct buffer
memset(buffer, 0, RORNET_MAX_MESSAGE_LENGTH);
memcpy(buffer, (char *) &head, sizeof(RoRnet::Header));
memcpy(buffer + sizeof(RoRnet::Header), content, len);
memcpy(buffer + sizeof(RoRnet::Header), content, head.size);

if (socket->fsend(buffer, msgsize, &error) < msgsize)
{
Expand Down
5 changes: 5 additions & 0 deletions source/server/messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ namespace Messaging {
unsigned int payload_len,
const char *payload);

int SendMessageWithHeader(
SWInetSocket *socket,
RoRnet::Header header,
const char *payload);

int ReceiveMessage(
SWInetSocket *socket,
int *out_msg_type,
Expand Down
3 changes: 1 addition & 2 deletions source/server/receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ void Receiver::ThreadMain() {
break;
}

m_sequencer->queueMessage(m_client->GetUserId(),
(int)m_recv_header.command, m_recv_header.streamid, m_recv_payload, m_recv_header.size);
m_sequencer->queueMessage(m_recv_header, m_recv_payload);
}

Logger::Log(LOG_DEBUG, "Receiver thread (user ID %d) exits", m_client->GetUserId());
Expand Down
Loading

0 comments on commit 4965c06

Please sign in to comment.