Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: initial state sync #1936

Merged
merged 30 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
38dbfa1
rename WalletId to FromWalletId
NickKhalow Aug 27, 2024
b76b87a
ISDKMessageBusCommsControllerAPI clear contract
NickKhalow Aug 27, 2024
c40a273
nullables
NickKhalow Aug 27, 2024
494ac31
ISDKObservableEventsEngineApi simplify contracts
NickKhalow Aug 27, 2024
54c5504
EngineApiWrapper update contract
NickKhalow Aug 27, 2024
eabcb22
SceneRuntimeImpl remove unused params
NickKhalow Aug 27, 2024
3ba260c
remove unused import
NickKhalow Aug 27, 2024
a4d8874
SubscribeToSDKObservableEvent simplify
NickKhalow Aug 27, 2024
3df5ceb
ISDKObservableEventsEngineApi simplify
NickKhalow Aug 27, 2024
c4fee02
messagePipesHub rename to communicationControllerHub
NickKhalow Aug 27, 2024
f69a1a4
CommunicationsControllerAPIImplementationBase as abstract
NickKhalow Aug 27, 2024
f8f72bb
remove double decoding OnMessageReceived
NickKhalow Aug 27, 2024
d3c0cb0
remove logic duplication for encoding
NickKhalow Aug 27, 2024
9462152
decode message private static
NickKhalow Aug 27, 2024
8196ad0
ICommunicationControllerHub to ISceneCommunicationPipe
NickKhalow Aug 27, 2024
8822345
InitialSceneSyncMessagePipe
NickKhalow Aug 28, 2024
5b66a1e
constant sync
NickKhalow Aug 28, 2024
e240c5d
fix nullables
NickKhalow Aug 28, 2024
6a9ee97
check zero byte
NickKhalow Aug 28, 2024
f11bbe4
Revert "check zero byte"
NickKhalow Aug 28, 2024
a44efec
remove content size check
NickKhalow Aug 28, 2024
fb82010
Revert "remove content size check"
NickKhalow Aug 28, 2024
400b270
special msg signal send
NickKhalow Aug 28, 2024
bac8771
Revert "special msg signal send"
NickKhalow Aug 28, 2024
4ea67c4
Merge remote-tracking branch 'origin/main' into fix/initial-state-sync
NickKhalow Aug 29, 2024
13d550d
remove unused pipe
NickKhalow Aug 29, 2024
e48ba50
fix encoding EncodeAndSendMessage
NickKhalow Aug 29, 2024
be1b682
Merge branch 'main' into fix/initial-state-sync
NickKhalow Sep 2, 2024
a8d461a
Merge branch 'main' into fix/initial-state-sync
NickKhalow Sep 9, 2024
9d38064
fix unsigned commits
NickKhalow Sep 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
"GUID:e0eedfa2deb9406daf86fd8368728e39",
"GUID:3640f3c0b42946b0b8794a1ed8e06ca5",
"GUID:d414ef88f3b15f746a4b97636b50dfb4",
"GUID:ca4e81cdd6a34d1aa54c32ad41fc5b3b"
"GUID:ca4e81cdd6a34d1aa54c32ad41fc5b3b",
"GUID:1d2c76eb8b48e0b40940e8b31a679ce1",
"GUID:286980af24684da6acc1caa413039811",
"GUID:e25ef972de004615a22937e739de2def"
],
"includePlatforms": [],
"excludePlatforms": [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ public class MessagePipesHub : IMessagePipesHub
private readonly IMessagePipe islandPipe;

public MessagePipesHub(IRoomHub roomHub, IMultiPool sendingMultiPool, IMultiPool receivingMultiPool, IMemoryPool memoryPool) : this(
new MessagePipe(roomHub.SceneRoom().DataPipe, sendingMultiPool, receivingMultiPool, memoryPool).WithLog("Scene"),
new MessagePipe(roomHub.IslandRoom().DataPipe, sendingMultiPool, receivingMultiPool, memoryPool).WithLog("Island")
new MessagePipe(roomHub.SceneRoom().DataPipe, sendingMultiPool, receivingMultiPool, memoryPool)
.WithLog("Scene"),
new MessagePipe(roomHub.IslandRoom().DataPipe, sendingMultiPool, receivingMultiPool, memoryPool)
.WithLog("Island")
) { }

public MessagePipesHub(IMessagePipe scenePipe, IMessagePipe islandPipe)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Decentraland.Kernel.Comms.Rfc4;
using ECS.SceneLifeCycle;
using Google.Protobuf;
using LiveKit.Rooms;
using System;

namespace DCL.Multiplayer.Connections.Messaging.Pipe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@
using SceneRuntime;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;
using Utility;

namespace CrdtEcsBridge.JsModulesImplementation.Communications
{
Expand All @@ -18,7 +16,7 @@ public class CommunicationsControllerAPIImplementation : CommunicationsControlle
public CommunicationsControllerAPIImplementation(
IRealmData realmData,
ISceneData sceneData,
ICommunicationControllerHub messagePipesHub,
ISceneCommunicationPipe messagePipesHub,
IJsOperations jsOperations,
ICRDTMemoryAllocator crdtMemoryAllocator,
ISceneStateProvider sceneStateProvider) : base(
Expand All @@ -31,18 +29,15 @@ public CommunicationsControllerAPIImplementation(
this.crdtMemoryAllocator = crdtMemoryAllocator;
}

protected override void OnMessageReceived(ICommunicationControllerHub.SceneMessage receivedMessage)
protected override void OnMessageReceived(MsgType messageType, ReadOnlySpan<byte> decodedMessage, string fromWalletId)
{
ReadOnlySpan<byte> decodedMessage = receivedMessage.Data.Span;
MsgType msgType = DecodeMessage(ref decodedMessage);

if (msgType != MsgType.Uint8Array || decodedMessage.Length == 0)
if (messageType != MsgType.Uint8Array)
return;

// Wallet Id
int walletBytesCount = Encoding.UTF8.GetByteCount(receivedMessage.WalletId);
int walletBytesCount = Encoding.UTF8.GetByteCount(fromWalletId);
Span<byte> senderBytes = stackalloc byte[walletBytesCount];
Encoding.UTF8.GetBytes(receivedMessage.WalletId, senderBytes);
Encoding.UTF8.GetBytes(fromWalletId, senderBytes);

int messageLength = senderBytes.Length + decodedMessage.Length + 1;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using CrdtEcsBridge.PoolsProviders;
using DCL.Multiplayer.Connections.Messaging;
using Decentraland.Kernel.Comms.Rfc4;
using ECS;
using SceneRunner.Scene;
using SceneRuntime;
Expand All @@ -13,47 +11,48 @@

namespace CrdtEcsBridge.JsModulesImplementation.Communications
{
public class CommunicationsControllerAPIImplementationBase : ICommunicationsControllerAPI
public abstract class CommunicationsControllerAPIImplementationBase : ICommunicationsControllerAPI
{
internal enum MsgType
public enum MsgType
{
String = 1, // SDK scenes MessageBus messages
Uint8Array = 2,
}

protected readonly CancellationTokenSource cancellationTokenSource = new ();
protected readonly ICommunicationControllerHub messagePipesHub;
protected readonly ISceneData sceneData;
protected readonly ISceneStateProvider sceneStateProvider;
protected readonly IJsOperations jsOperations;
protected readonly Action<ICommunicationControllerHub.SceneMessage> onMessageReceivedCached;
protected readonly List<IMemoryOwner<byte>> eventsToProcess = new ();
private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly ISceneCommunicationPipe sceneCommunicationPipe;
private readonly IRealmData realmData;
private readonly ISceneData sceneData;
private readonly ISceneStateProvider sceneStateProvider;
private readonly IJsOperations jsOperations;
private readonly Action<ISceneCommunicationPipe.SceneMessage> onMessageReceivedCached;

internal IReadOnlyList<IMemoryOwner<byte>> EventsToProcess => eventsToProcess;

public CommunicationsControllerAPIImplementationBase(
protected CommunicationsControllerAPIImplementationBase(
IRealmData realmData,
ISceneData sceneData,
ICommunicationControllerHub messagePipesHub,
ISceneCommunicationPipe sceneCommunicationPipe,
IJsOperations jsOperations,
ISceneStateProvider sceneStateProvider)
{
this.realmData = realmData;
this.sceneData = sceneData;
this.messagePipesHub = messagePipesHub;
this.sceneCommunicationPipe = sceneCommunicationPipe;
this.jsOperations = jsOperations;
this.sceneStateProvider = sceneStateProvider;

onMessageReceivedCached = OnMessageReceived;

// if it's the world subscribe to the messages straight-away
if (IgnoreIsCurrentScene())
this.messagePipesHub.SetSceneMessageHandler(onMessageReceivedCached);
this.sceneCommunicationPipe.SetSceneMessageHandler(onMessageReceivedCached);
}

public void Dispose()
{
messagePipesHub.RemoveSceneMessageHandler(onMessageReceivedCached);
sceneCommunicationPipe.RemoveSceneMessageHandler(onMessageReceivedCached);

lock (eventsToProcess) { CleanUpReceivedMessages(); }

Expand All @@ -65,9 +64,9 @@ public void OnSceneIsCurrentChanged(bool isCurrent)
if (IgnoreIsCurrentScene()) return;

if (isCurrent)
messagePipesHub.SetSceneMessageHandler(onMessageReceivedCached);
sceneCommunicationPipe.SetSceneMessageHandler(onMessageReceivedCached);
else
messagePipesHub.RemoveSceneMessageHandler(onMessageReceivedCached);
sceneCommunicationPipe.RemoveSceneMessageHandler(onMessageReceivedCached);
}

public object SendBinary(IReadOnlyList<PoolableByteArray> data)
Expand All @@ -76,22 +75,8 @@ public object SendBinary(IReadOnlyList<PoolableByteArray> data)
return jsOperations.ConvertToScriptTypedArrays(Array.Empty<IMemoryOwner<byte>>());

foreach (PoolableByteArray poolable in data)
{
if (poolable.Length == 0)
continue;

Memory<byte> message = poolable.Memory;

EncodeAndSend();

void EncodeAndSend()
{
Span<byte> encodedMessage = stackalloc byte[message.Length + 1];
encodedMessage[0] = (byte)MsgType.Uint8Array;
message.Span.CopyTo(encodedMessage[1..]);
SendMessage(encodedMessage);
}
}
if (poolable.Length > 0)
EncodeAndSendMessage(MsgType.Uint8Array, poolable.Memory.Span);

lock (eventsToProcess)
{
Expand All @@ -118,18 +103,32 @@ private void CleanUpReceivedMessages()
eventsToProcess.Clear();
}

private void SendMessage(ReadOnlySpan<byte> message)
protected void EncodeAndSendMessage(MsgType msgType, ReadOnlySpan<byte> message)
{
messagePipesHub.SendMessage(message, sceneData.SceneEntityDefinition.id!, cancellationTokenSource.Token);
Span<byte> encodedMessage = stackalloc byte[message.Length + 1];
encodedMessage[0] = (byte)msgType;
message.CopyTo(encodedMessage[1..]);
sceneCommunicationPipe.SendMessage(encodedMessage, sceneData.SceneEntityDefinition.id!, cancellationTokenSource.Token);
}

protected virtual void OnMessageReceived(ICommunicationControllerHub.SceneMessage receivedMessage) { }

internal static MsgType DecodeMessage(ref ReadOnlySpan<byte> value)
private static MsgType DecodeMessage(ref ReadOnlySpan<byte> value)
{
var msgType = (MsgType)value[0];
value = value[1..];
return msgType;
}

private void OnMessageReceived(ISceneCommunicationPipe.SceneMessage receivedMessage)
{
ReadOnlySpan<byte> decodedMessage = receivedMessage.Data.Span;
MsgType msgType = DecodeMessage(ref decodedMessage);

if (decodedMessage.Length == 0)
return;

OnMessageReceived(msgType, decodedMessage, receivedMessage.FromWalletId);
}

protected abstract void OnMessageReceived(MsgType messageType, ReadOnlySpan<byte> decodedMessage, string fromWalletId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace CrdtEcsBridge.JsModulesImplementation.Communications
{
public interface ICommunicationControllerHub
public interface ISceneCommunicationPipe
{
void SetSceneMessageHandler(Action<SceneMessage> onSceneMessage);

Expand All @@ -17,13 +17,13 @@ readonly struct SceneMessage
{
public readonly ReadOnlyMemory<byte> Data;
public readonly string SceneId;
public readonly string WalletId;
public readonly string FromWalletId;

private SceneMessage(in ReceivedMessage<Scene> message)
{
Data = message.Payload.Data.Memory;
SceneId = message.Payload.SceneId;
WalletId = message.FromWalletId;
FromWalletId = message.FromWalletId;
}

public static SceneMessage CopyFrom(in ReceivedMessage<Scene> message) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,32 @@ namespace CrdtEcsBridge.JsModulesImplementation.Communications.SDKMessageBus
{
public class SDKMessageBusCommsAPIImplementation : CommunicationsControllerAPIImplementationBase, ISDKMessageBusCommsControllerAPI
{
public List<CommsPayload> SceneCommsMessages { get; } = new ();
private readonly List<CommsPayload> messages = new ();

public SDKMessageBusCommsAPIImplementation(
IRealmData realmData,
ISceneData sceneData,
ICommunicationControllerHub messagePipesHub,
IJsOperations jsOperations,
ISceneStateProvider sceneStateProvider)
: base(realmData, sceneData, messagePipesHub, jsOperations, sceneStateProvider)
public IReadOnlyList<CommsPayload> SceneCommsMessages => messages;

public SDKMessageBusCommsAPIImplementation(IRealmData realmData, ISceneData sceneData, ISceneCommunicationPipe sceneCommunicationPipe, IJsOperations jsOperations, ISceneStateProvider sceneStateProvider)
: base(realmData, sceneData, sceneCommunicationPipe, jsOperations, sceneStateProvider) { }

public void ClearMessages()
{
messages.Clear();
}

public void Send(string data)
{
var dataBytes = Encoding.UTF8.GetBytes(data);
Span<byte> encodedMessage = stackalloc byte[dataBytes.Length + 1];
encodedMessage[0] = (byte)MsgType.String;
dataBytes.CopyTo(encodedMessage[1..]);

messagePipesHub.SendMessage(encodedMessage, sceneData.SceneEntityDefinition.id, cancellationTokenSource.Token);
byte[] dataBytes = Encoding.UTF8.GetBytes(data);
EncodeAndSendMessage(MsgType.String, dataBytes);
}

protected override void OnMessageReceived(ICommunicationControllerHub.SceneMessage receivedMessage)
protected override void OnMessageReceived(MsgType messageType, ReadOnlySpan<byte> decodedMessage, string fromWalletId)
{
ReadOnlySpan<byte> decodedMessage = receivedMessage.Data.Span;
MsgType msgType = DecodeMessage(ref decodedMessage);

if (msgType != MsgType.String || decodedMessage.Length == 0)
if (messageType != MsgType.String)
return;

SceneCommsMessages.Add(new CommsPayload
messages.Add(new CommsPayload
{
sender = receivedMessage.WalletId,
sender = fromWalletId,
message = Encoding.UTF8.GetString(decodedMessage)
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ namespace CrdtEcsBridge.JsModulesImplementation.Communications
/// We can't subscribe to the `Scene` message multiple times
/// so Hub handles the subscription and the API implementation handles the message processing
/// </summary>
public class CommunicationControllerHub : ICommunicationControllerHub
public class SceneCommunicationPipe : ISceneCommunicationPipe
{
private Action<ICommunicationControllerHub.SceneMessage>? onSceneMessage;
private Action<ISceneCommunicationPipe.SceneMessage>? onSceneMessage;
private readonly IMessagePipe messagePipe;

public CommunicationControllerHub(IMessagePipesHub messagePipesHub)
public SceneCommunicationPipe(IMessagePipesHub messagePipesHub)
{
messagePipe = messagePipesHub.ScenePipe();
messagePipe.Subscribe<Scene>(Packet.MessageOneofCase.Scene, InvokeCurrentHandler);
Expand All @@ -27,15 +27,15 @@ public CommunicationControllerHub(IMessagePipesHub messagePipesHub)
private void InvokeCurrentHandler(ReceivedMessage<Scene> message)
{
using (message)
onSceneMessage?.Invoke(ICommunicationControllerHub.SceneMessage.CopyFrom(in message));
onSceneMessage?.Invoke(ISceneCommunicationPipe.SceneMessage.CopyFrom(in message));
}

public void RemoveSceneMessageHandler(Action<ICommunicationControllerHub.SceneMessage> onSceneMessage)
public void RemoveSceneMessageHandler(Action<ISceneCommunicationPipe.SceneMessage> onSceneMessage)
{
lock (this) { this.onSceneMessage -= onSceneMessage; }
}

public void SetSceneMessageHandler(Action<ICommunicationControllerHub.SceneMessage> onSceneMessage)
public void SetSceneMessageHandler(Action<ISceneCommunicationPipe.SceneMessage> onSceneMessage)
{
lock (this) { this.onSceneMessage += onSceneMessage; }
}
Expand Down
Loading
Loading