Skip to content

Commit

Permalink
Passed!
Browse files Browse the repository at this point in the history
  • Loading branch information
vczh committed Feb 17, 2025
1 parent 7234d8c commit c73c422
Show file tree
Hide file tree
Showing 4 changed files with 2,364 additions and 15 deletions.
60 changes: 45 additions & 15 deletions Source/PlatformProviders/Remote/GuiRemoteProtocol_Channel_Async.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ void ChannelPackageSemanticUnpack(
using TUIMainProc = Func<void(GuiRemoteProtocolAsyncChannelSerializer<TPackage>*)>;

protected:
using TPendingRequestGroup = collections::List<vint>;
using TPendingRequestStack = collections::List<Ptr<TPendingRequestGroup>>;

IGuiRemoteProtocolChannel<TPackage>* channel = nullptr;
IGuiRemoteProtocolChannelReceiver<TPackage>* receiver = nullptr;
Expand All @@ -116,7 +118,7 @@ void ChannelPackageSemanticUnpack(
SpinLock lockResponses;
EventObject eventManualResponses;
collections::Dictionary<vint, TPackage> queuedResponses;
collections::List<vint> pendingRequests;
TPendingRequestStack pendingRequests;

volatile bool started = false;
volatile bool stopping = false;
Expand All @@ -139,6 +141,7 @@ void ChannelPackageSemanticUnpack(

// All remaining queued callbacks should be executed
FetchAndExecuteUITasks();
eventManualUIThreadStopped.Signal();
}

void ChannelThreadProc()
Expand All @@ -161,10 +164,24 @@ void ChannelPackageSemanticUnpack(

// All remaining queued callbacks should be executed
FetchAndExecuteChannelTasks();
eventManualChannelThreadStopped.Signal();
}

protected:

bool AreCurrentPendingRequestGroupSatisfied()
{
if (pendingRequests.Count() == 0) return false;
for (vint requestId : *pendingRequests[pendingRequests.Count() - 1].Obj())
{
if (!queuedResponses.Keys().Contains(requestId))
{
return false;
}
}
return true;
}

void OnReceive(const TPackage& package) override
{
#define ERROR_MESSAGE_PREFIX L"vl::presentation::remoteprotocol::channeling::GuiRemoteProtocolAsyncChannelSerializer<TPackage>::OnReceive(...)#"
Expand Down Expand Up @@ -192,7 +209,7 @@ void ChannelPackageSemanticUnpack(
SPIN_LOCK(lockResponses)
{
queuedResponses.Add(id, package);
if (pendingRequests.Count() > 0 && pendingRequests[pendingRequests.Count() - 1] == id)
if (AreCurrentPendingRequestGroupSatisfied())
{
eventManualResponses.Signal();
}
Expand All @@ -218,8 +235,8 @@ void ChannelPackageSemanticUnpack(
{
#define ERROR_MESSAGE_PREFIX L"vl::presentation::remoteprotocol::channeling::GuiRemoteProtocolAsyncChannelSerializer<TPackage>::Submit(...)#"

// Ensure at most one request per submit
vint requestId = -1;
// Group all pending requests into a group
Ptr<TPendingRequestGroup> requestGroup;
for (auto&& package : uiPendingPackages)
{
auto semantic = ChannelPackageSemantic::Unknown;
Expand All @@ -229,12 +246,18 @@ void ChannelPackageSemanticUnpack(

if (semantic == ChannelPackageSemantic::Request)
{
CHECK_ERROR(requestId == -1, ERROR_MESSAGE_PREFIX L"Only one request (message that requires a response) is allowed between Submit() calls.");
requestId = id;
SPIN_LOCK(lockResponses)
if (!requestGroup)
{
pendingRequests.Add(id);
requestGroup = Ptr(new TPendingRequestGroup);
}
requestGroup->Add(id);
}
}
if (requestGroup)
{
SPIN_LOCK(lockResponses)
{
pendingRequests.Add(requestGroup);
}
}

Expand All @@ -248,24 +271,31 @@ void ChannelPackageSemanticUnpack(
channel->Submit();
}, &eventAutoChannelTaskQueued);

// Block until the response of the top request is received
// Block until the all responses of the top request group are received
// Re-entrance recursively is possible
if (requestId != -1)
if (requestGroup)
{
eventManualResponses.Wait();
TPackage response;
collections::List<TPackage> responses;
SPIN_LOCK(lockResponses)
{
response = queuedResponses[requestId];
queuedResponses.Remove(requestId);
for (vint id : *requestGroup.Obj())
{
responses.Add(queuedResponses[id]);
queuedResponses.Remove(id);
}
pendingRequests.RemoveAt(pendingRequests.Count() - 1);

if (pendingRequests.Count() == 0 || !queuedResponses.Keys().Contains(pendingRequests[pendingRequests.Count() - 1]))
if (!AreCurrentPendingRequestGroupSatisfied())
{
eventManualResponses.Unsignal();
}
}
receiver->OnReceive(response);

for (auto&& response : responses)
{
receiver->OnReceive(response);
}
}

#undef ERROR_MESSAGE_PREFIX
Expand Down
Loading

0 comments on commit c73c422

Please sign in to comment.