Skip to content

Commit

Permalink
Simple priorities
Browse files Browse the repository at this point in the history
Summary: This is not the full draft 5 scheme, this just munges all the bits into a uint64 and uses sequential ordering, with stream ID tiebreakers.

Reviewed By: jordicenzano

Differential Revision:
D59690806

Privacy Context Container: L1222497

fbshipit-source-id: 60a07e8d07b01585cef967f4c94df43636298496
  • Loading branch information
afrind authored and facebook-github-bot committed Aug 1, 2024
1 parent 5bbab8c commit 568cd84
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
38 changes: 38 additions & 0 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void MoQSession::start() {
return;
}
auto controlStream = cs.value();
controlStream.writeHandle->setPriority(0, 0, false);

auto mergeToken = folly::CancellationToken::merge(
cancellationSource_.getToken(),
Expand Down Expand Up @@ -271,11 +272,14 @@ void MoQSession::onSubscribe(SubscribeRequest subscribeRequest) {
// Subscribe ID -> Track Name, Locations [currently held in MoQForwarder]
// Track Alias -> Track Name
// If ths session holds this state, it can check for duplicate subscriptions
pubTracks_[subscribeRequest.subscribeID].priority = subscribeRequest.priority;
controlMessages_.enqueue(std::move(subscribeRequest));
}

void MoQSession::onSubscribeUpdate(SubscribeUpdate subscribeUpdate) {
XLOG(DBG1) << __func__;
pubTracks_[subscribeUpdate.subscribeID].priority = subscribeUpdate.priority;
// TODO: update priority of tracks in flight
controlMessages_.enqueue(std::move(subscribeUpdate));
}

Expand Down Expand Up @@ -464,6 +468,7 @@ MoQSession::subscribe(SubscribeRequest sub) {

void MoQSession::subscribeOk(SubscribeOk subOk) {
XLOG(DBG1) << __func__;
pubTracks_[subOk.subscribeID].groupOrder = subOk.groupOrder;
auto res = writeSubscribeOk(controlWriteBuf_, subOk);
if (!res) {
XLOG(ERR) << "writeSubscribeOk failed";
Expand All @@ -474,6 +479,7 @@ void MoQSession::subscribeOk(SubscribeOk subOk) {

void MoQSession::subscribeError(SubscribeError subErr) {
XLOG(DBG1) << __func__;
pubTracks_.erase(subErr.subscribeID);
auto res = writeSubscribeError(controlWriteBuf_, std::move(subErr));
if (!res) {
XLOG(ERR) << "writeSubscribeError failed";
Expand All @@ -494,6 +500,7 @@ void MoQSession::unsubscribe(Unsubscribe unsubscribe) {

void MoQSession::subscribeDone(SubscribeDone subDone) {
XLOG(DBG1) << __func__;
pubTracks_.erase(subDone.subscribeID);
auto res = writeSubscribeDone(controlWriteBuf_, std::move(subDone));
if (!res) {
XLOG(ERR) << "writeSubscribeDone failed";
Expand All @@ -502,6 +509,35 @@ void MoQSession::subscribeDone(SubscribeDone subDone) {
controlWriteEvent_.signal();
}

namespace {
constexpr uint32_t IdMask = 0x1FFFFF;
uint64_t groupOrder(GroupOrder groupOrder, uint64_t group) {
uint32_t truncGroup = static_cast<uint32_t>(group) & IdMask;
return groupOrder == GroupOrder::OldestFirst ? truncGroup
: (IdMask - truncGroup);
}

uint32_t objOrder(uint64_t objId) {
return static_cast<uint32_t>(objId) & IdMask;
}
} // namespace

uint64_t MoQSession::order(const ObjectHeader& objHeader) {
PubTrack pubTrack{
std::numeric_limits<uint8_t>::max(), GroupOrder::OldestFirst};
auto pubTrackIt = pubTracks_.find(objHeader.subscribeID);
if (pubTrackIt != pubTracks_.end()) {
pubTrack = pubTrackIt->second;
}
// 6 reserved bits | 58 bit order
// 6 reserved | 8 sub pri | 8 pub pri | 21 group order | 21 obj order
return (
(uint64_t(pubTrack.priority) << 50) |
(uint64_t(objHeader.priority) << 42) |
(groupOrder(pubTrack.groupOrder, objHeader.group) << 21) |
objOrder(objHeader.id));
}

void MoQSession::publish(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
Expand Down Expand Up @@ -565,6 +601,7 @@ void MoQSession::publishImpl(
}
stream = *res;
XLOG(DBG4) << "New stream created, id: " << stream->getID();
stream->setPriority(1, order(objHeader), false);
}

// Add publishing key
Expand Down Expand Up @@ -680,6 +717,7 @@ void MoQSession::onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) {
bh.writeHandle->resetStream(/*error=*/0);
bh.readHandle->stopSending(/*error=*/0);
} else {
bh.writeHandle->setPriority(0, 0, false);
readLoop(StreamType::CONTROL, bh.readHandle).scheduleOn(evb_).start();
auto mergeToken = folly::CancellationToken::merge(
cancellationSource_.getToken(), bh.writeHandle->getCancelToken());
Expand Down
8 changes: 7 additions & 1 deletion moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@ class MoQSession : public MoQCodec::Callback {
std::unique_ptr<folly::IOBuf> payload,
bool eom);

uint64_t order(const ObjectHeader& objHeader);

struct PublishKey {
uint64_t subscribeID;
uint64_t group;
Expand Down Expand Up @@ -366,7 +368,11 @@ class MoQSession : public MoQCodec::Callback {
std::string,
folly::coro::Promise<folly::Expected<AnnounceOk, AnnounceError>>>
pendingAnnounce_;
folly::F14FastMap<uint64_t, FullTrackName> pubTracks_;
struct PubTrack {
uint8_t priority;
GroupOrder groupOrder;
};
folly::F14FastMap<uint64_t, PubTrack> pubTracks_;
folly::F14FastMap<PublishKey, PublishData, PublishKey::hash> publishDataMap_;
uint64_t nextTrackId_{0};

Expand Down

0 comments on commit 568cd84

Please sign in to comment.