Skip to content

Commit

Permalink
Merge PR #6382: Backport "Merge PR #6372: FIX(server): Stale user poi…
Browse files Browse the repository at this point in the history
…nters in whisper cache " to 1.5.x
  • Loading branch information
Hartmnt authored Apr 9, 2024
2 parents dc1ac26 + 5fb748b commit ae3e59b
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 91 deletions.
26 changes: 15 additions & 11 deletions src/murmur/Messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2192,26 +2192,30 @@ void Server::msgVoiceTarget(ServerUser *uSource, MumbleProto::VoiceTarget &msg)
const MumbleProto::VoiceTarget_Target &t = msg.targets(i);
for (int j = 0; j < t.session_size(); ++j) {
unsigned int s = t.session(j);
if (qhUsers.contains(s))
wt.qlSessions << s;
if (qhUsers.contains(s)) {
wt.sessions.push_back(s);
}
}
if (t.has_channel_id()) {
unsigned int id = t.channel_id();
if (qhChannels.contains(id)) {
WhisperTarget::Channel wtc;
wtc.iId = static_cast< int >(id);
wtc.bChildren = t.children();
wtc.bLinks = t.links();
if (t.has_group())
wtc.qsGroup = u8(t.group());
wt.qlChannels << wtc;
wtc.id = id;
wtc.includeChildren = t.children();
wtc.includeLinks = t.links();
if (t.has_group()) {
wtc.targetGroup = u8(t.group());
}

wt.channels.push_back(wtc);
}
}
}
if (wt.qlSessions.isEmpty() && wt.qlChannels.isEmpty())
if (wt.sessions.empty() && wt.channels.empty()) {
uSource->qmTargets.remove(target);
else
uSource->qmTargets.insert(target, wt);
} else {
uSource->qmTargets.insert(target, std::move(wt));
}
}
}

Expand Down
188 changes: 114 additions & 74 deletions src/murmur/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,90 +1230,31 @@ void Server::processMsg(ServerUser *u, Mumble::Protocol::AudioData audioData, Au
} else {
ZoneScopedN(TracyConstants::AUDIO_WHISPER_CACHE_CREATE);

const WhisperTarget &wt = u->qmTargets.value(static_cast< int >(audioData.targetOrContext));
if (!wt.qlChannels.isEmpty()) {
QMutexLocker qml(&qmCache);

foreach (const WhisperTarget::Channel &wtc, wt.qlChannels) {
Channel *wc = qhChannels.value(static_cast< unsigned int >(wtc.iId));
if (wc) {
bool link = wtc.bLinks && !wc->qhLinks.isEmpty();
bool dochildren = wtc.bChildren && !wc->qlChannels.isEmpty();
bool group = !wtc.qsGroup.isEmpty();
if (!link && !dochildren && !group) {
// Common case
if (ChanACL::hasPermission(u, wc, ChanACL::Whisper, &acCache)) {
foreach (User *p, wc->qlUsers) { channel.insert(static_cast< ServerUser * >(p)); }

foreach (unsigned int currentSession,
m_channelListenerManager.getListenersForChannel(wc->iId)) {
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst) {
addListener(cachedListeners, *pDst, *wc);
}
}
}
} else {
QSet< Channel * > channels;
if (link)
channels = wc->allLinks();
else
channels.insert(wc);
if (dochildren)
channels.unite(wc->allChildren());
const QString &redirect = u->qmWhisperRedirect.value(wtc.qsGroup);
const QString &qsg = redirect.isEmpty() ? wtc.qsGroup : redirect;
foreach (Channel *tc, channels) {
if (ChanACL::hasPermission(u, tc, ChanACL::Whisper, &acCache)) {
foreach (User *p, tc->qlUsers) {
ServerUser *su = static_cast< ServerUser * >(p);

if (!group || Group::appliesToUser(*tc, *tc, qsg, *su)) {
channel.insert(su);
}
}

foreach (unsigned int currentSession,
m_channelListenerManager.getListenersForChannel(tc->iId)) {
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst && (!group || Group::appliesToUser(*tc, *tc, qsg, *pDst))) {
// Only send audio to listener if the user exists and it is in the group the
// speech is directed at (if any)
addListener(cachedListeners, *pDst, *tc);
}
}
}
}
}
}
}
const unsigned int uiSession = u->uiSession;
qrwlVoiceThread.unlock();
qrwlVoiceThread.lockForWrite();

if (!qhUsers.contains(uiSession)) {
return;
}

{
QMutexLocker qml(&qmCache);
// Create cache entry for the given target
// Note: We have to compute the cache entry and add it to the user's cache store in an atomic
// transaction (ensured by the lock) to avoid running into situations in which a user from the cache
// gets deleted without this particular cache entry being purged (which happens, if the cache entry is
// in the store at the point of deleting the user).
const WhisperTarget &wt = u->qmTargets.value(static_cast< int >(audioData.targetOrContext));
WhisperTargetCache cache = createWhisperTargetCacheFor(*u, wt);

foreach (unsigned int id, wt.qlSessions) {
ServerUser *pDst = qhUsers.value(id);
if (pDst && ChanACL::hasPermission(u, pDst->cChannel, ChanACL::Whisper, &acCache)
&& !channel.contains(pDst))
direct.insert(pDst);
}
}
u->qmTargetCache.insert(static_cast< int >(audioData.targetOrContext), std::move(cache));

unsigned int uiSession = u->uiSession;
qrwlVoiceThread.unlock();
qrwlVoiceThread.lockForWrite();

if (qhUsers.contains(uiSession))
u->qmTargetCache.insert(static_cast< int >(audioData.targetOrContext),
{ channel, direct, cachedListeners });
qrwlVoiceThread.unlock();
qrwlVoiceThread.lockForRead();
if (!qhUsers.contains(uiSession))
return;
}

// These users receive the audio because someone is shouting to their channel
for (ServerUser *pDst : channel) {
buffer.addReceiver(*u, *pDst, Mumble::Protocol::AudioContext::SHOUT, audioData.containsPositionalData);
Expand Down Expand Up @@ -2412,5 +2353,104 @@ bool Server::canNest(Channel *newParent, Channel *channel) const {
return (parentLevel + channelDepth) < iChannelNestingLimit;
}

WhisperTargetCache Server::createWhisperTargetCacheFor(ServerUser &speaker, const WhisperTarget &target) {
ZoneScoped;

QMutexLocker qml(&qmCache);

WhisperTargetCache cache;

if (!target.channels.empty()) {
for (const WhisperTarget::Channel &currentTarget : target.channels) {
Channel *targetChannel = qhChannels.value(currentTarget.id);

if (targetChannel) {
bool includeLinks = currentTarget.includeLinks && !targetChannel->qhLinks.isEmpty();
bool includeChildren = currentTarget.includeChildren && !targetChannel->qlChannels.isEmpty();
bool restrictToGroup = !currentTarget.targetGroup.isEmpty();

if (!includeLinks && !includeChildren && !restrictToGroup) {
// Common case
if (ChanACL::hasPermission(&speaker, targetChannel, ChanACL::Whisper, &acCache)) {
for (User *p : targetChannel->qlUsers) {
// Add users of the target channel
cache.channelTargets.insert(static_cast< ServerUser * >(p));
}

for (unsigned int currentSession :
m_channelListenerManager.getListenersForChannel(targetChannel->iId)) {
// Add users that listen to the target channel (duplicates with users directly
// in this channel are handled further down)
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst) {
addListener(cache.listeningTargets, *pDst, *targetChannel);
}
}
}
} else {
QSet< Channel * > channels;

if (includeLinks) {
// allLinks contains the channel itself
channels = targetChannel->allLinks();
} else {
channels.insert(targetChannel);
}

if (includeChildren) {
channels.unite(targetChannel->allChildren());
}

// The target group might be changed by a redirect set up via RPC (Ice/gRPC). In that
// case the shout is sent to the redirection target instead the originally specified group
const QString &redirect = speaker.qmWhisperRedirect.value(currentTarget.targetGroup);
const QString &targetGroup = redirect.isEmpty() ? currentTarget.targetGroup : redirect;

for (Channel *subTargetChan : channels) {
if (ChanACL::hasPermission(&speaker, subTargetChan, ChanACL::Whisper, &acCache)) {
for (User *p : subTargetChan->qlUsers) {
ServerUser *su = static_cast< ServerUser * >(p);

if (!restrictToGroup
|| Group::appliesToUser(*subTargetChan, *subTargetChan, targetGroup, *su)) {
cache.channelTargets.insert(su);
}
}

for (unsigned int currentSession :
m_channelListenerManager.getListenersForChannel(subTargetChan->iId)) {
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst
&& (!restrictToGroup
|| Group::appliesToUser(*subTargetChan, *subTargetChan, targetGroup, *pDst))) {
// Only send audio to listener if the user exists and it is in the group the
// speech is directed at (if any)
addListener(cache.listeningTargets, *pDst, *subTargetChan);
}
}
}
}
}
}
}
}

for (unsigned int id : target.sessions) {
ServerUser *pDst = qhUsers.value(id);
if (pDst && ChanACL::hasPermission(&speaker, pDst->cChannel, ChanACL::Whisper, &acCache)
&& !cache.channelTargets.contains(pDst))
cache.directTargets.insert(pDst);
}

// Make sure the speaker themselves is not contained in these lists
cache.channelTargets.remove(&speaker);
cache.directTargets.remove(&speaker);
cache.listeningTargets.remove(&speaker);

return cache;
}

#undef SIO_UDP_CONNRESET
#undef SENDTO
2 changes: 2 additions & 0 deletions src/murmur/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class Server : public QThread {
QTimer qtTick;
void initRegister();

WhisperTargetCache createWhisperTargetCacheFor(ServerUser &speaker, const WhisperTarget &target);

private:
int iChannelNestingLimit;
int iChannelCountLimit;
Expand Down
15 changes: 9 additions & 6 deletions src/murmur/ServerUser.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
# include <sys/socket.h>
#endif

#include <vector>

// Unfortunately, this needs to be "large enough" to hold
// enough frames to account for both short-term and
// long-term "maladjustments".
Expand All @@ -52,13 +54,14 @@ struct BandwidthRecord {

struct WhisperTarget {
struct Channel {
int iId;
bool bChildren;
bool bLinks;
QString qsGroup;
unsigned int id;
bool includeChildren;
bool includeLinks;
QString targetGroup;
};
QList< unsigned int > qlSessions;
QList< WhisperTarget::Channel > qlChannels;

std::vector< unsigned int > sessions;
std::vector< WhisperTarget::Channel > channels;
};

class ServerUser;
Expand Down

0 comments on commit ae3e59b

Please sign in to comment.