Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX(server): Stale user pointers in whisper cache #6372

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions src/murmur/Messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2192,26 +2192,30 @@ void Server::msgVoiceTarget(ServerUser *uSource, MumbleProto::VoiceTarget &msg)
const MumbleProto::VoiceTarget_Target &t = msg.targets(i);
for (int j = 0; j < t.session_size(); ++j) {
unsigned int s = t.session(j);
if (qhUsers.contains(s))
wt.qlSessions << s;
if (qhUsers.contains(s)) {
wt.sessions.push_back(s);
}
}
if (t.has_channel_id()) {
unsigned int id = t.channel_id();
if (qhChannels.contains(id)) {
WhisperTarget::Channel wtc;
wtc.iId = static_cast< int >(id);
wtc.bChildren = t.children();
wtc.bLinks = t.links();
if (t.has_group())
wtc.qsGroup = u8(t.group());
wt.qlChannels << wtc;
wtc.id = id;
wtc.includeChildren = t.children();
wtc.includeLinks = t.links();
if (t.has_group()) {
wtc.targetGroup = u8(t.group());
}

wt.channels.push_back(wtc);
}
}
}
if (wt.qlSessions.isEmpty() && wt.qlChannels.isEmpty())
if (wt.sessions.empty() && wt.channels.empty()) {
uSource->qmTargets.remove(target);
else
uSource->qmTargets.insert(target, wt);
} else {
uSource->qmTargets.insert(target, std::move(wt));
}
}
}

Expand Down
188 changes: 114 additions & 74 deletions src/murmur/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1230,90 +1230,31 @@ void Server::processMsg(ServerUser *u, Mumble::Protocol::AudioData audioData, Au
} else {
ZoneScopedN(TracyConstants::AUDIO_WHISPER_CACHE_CREATE);

const WhisperTarget &wt = u->qmTargets.value(static_cast< int >(audioData.targetOrContext));
if (!wt.qlChannels.isEmpty()) {
QMutexLocker qml(&qmCache);

foreach (const WhisperTarget::Channel &wtc, wt.qlChannels) {
Channel *wc = qhChannels.value(static_cast< unsigned int >(wtc.iId));
if (wc) {
bool link = wtc.bLinks && !wc->qhLinks.isEmpty();
bool dochildren = wtc.bChildren && !wc->qlChannels.isEmpty();
bool group = !wtc.qsGroup.isEmpty();
if (!link && !dochildren && !group) {
// Common case
if (ChanACL::hasPermission(u, wc, ChanACL::Whisper, &acCache)) {
foreach (User *p, wc->qlUsers) { channel.insert(static_cast< ServerUser * >(p)); }

foreach (unsigned int currentSession,
m_channelListenerManager.getListenersForChannel(wc->iId)) {
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst) {
addListener(cachedListeners, *pDst, *wc);
}
}
}
} else {
QSet< Channel * > channels;
if (link)
channels = wc->allLinks();
else
channels.insert(wc);
if (dochildren)
channels.unite(wc->allChildren());
const QString &redirect = u->qmWhisperRedirect.value(wtc.qsGroup);
const QString &qsg = redirect.isEmpty() ? wtc.qsGroup : redirect;
foreach (Channel *tc, channels) {
if (ChanACL::hasPermission(u, tc, ChanACL::Whisper, &acCache)) {
foreach (User *p, tc->qlUsers) {
ServerUser *su = static_cast< ServerUser * >(p);

if (!group || Group::appliesToUser(*tc, *tc, qsg, *su)) {
channel.insert(su);
}
}

foreach (unsigned int currentSession,
m_channelListenerManager.getListenersForChannel(tc->iId)) {
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst && (!group || Group::appliesToUser(*tc, *tc, qsg, *pDst))) {
// Only send audio to listener if the user exists and it is in the group the
// speech is directed at (if any)
addListener(cachedListeners, *pDst, *tc);
}
}
}
}
}
}
}
const unsigned int uiSession = u->uiSession;
qrwlVoiceThread.unlock();
qrwlVoiceThread.lockForWrite();

if (!qhUsers.contains(uiSession)) {
return;
}

{
QMutexLocker qml(&qmCache);
// Create cache entry for the given target
// Note: We have to compute the cache entry and add it to the user's cache store in an atomic
// transaction (ensured by the lock) to avoid running into situations in which a user from the cache
// gets deleted without this particular cache entry being purged (which happens, if the cache entry is
// in the store at the point of deleting the user).
const WhisperTarget &wt = u->qmTargets.value(static_cast< int >(audioData.targetOrContext));
WhisperTargetCache cache = createWhisperTargetCacheFor(*u, wt);

foreach (unsigned int id, wt.qlSessions) {
ServerUser *pDst = qhUsers.value(id);
if (pDst && ChanACL::hasPermission(u, pDst->cChannel, ChanACL::Whisper, &acCache)
&& !channel.contains(pDst))
direct.insert(pDst);
}
}
u->qmTargetCache.insert(static_cast< int >(audioData.targetOrContext), std::move(cache));

unsigned int uiSession = u->uiSession;
qrwlVoiceThread.unlock();
qrwlVoiceThread.lockForWrite();

if (qhUsers.contains(uiSession))
u->qmTargetCache.insert(static_cast< int >(audioData.targetOrContext),
{ channel, direct, cachedListeners });
qrwlVoiceThread.unlock();
qrwlVoiceThread.lockForRead();
if (!qhUsers.contains(uiSession))
return;
}

// These users receive the audio because someone is shouting to their channel
for (ServerUser *pDst : channel) {
buffer.addReceiver(*u, *pDst, Mumble::Protocol::AudioContext::SHOUT, audioData.containsPositionalData);
Expand Down Expand Up @@ -2412,5 +2353,104 @@ bool Server::canNest(Channel *newParent, Channel *channel) const {
return (parentLevel + channelDepth) < iChannelNestingLimit;
}

WhisperTargetCache Server::createWhisperTargetCacheFor(ServerUser &speaker, const WhisperTarget &target) {
ZoneScoped;

QMutexLocker qml(&qmCache);

WhisperTargetCache cache;

if (!target.channels.empty()) {
for (const WhisperTarget::Channel &currentTarget : target.channels) {
Channel *targetChannel = qhChannels.value(currentTarget.id);

if (targetChannel) {
bool includeLinks = currentTarget.includeLinks && !targetChannel->qhLinks.isEmpty();
bool includeChildren = currentTarget.includeChildren && !targetChannel->qlChannels.isEmpty();
bool restrictToGroup = !currentTarget.targetGroup.isEmpty();

if (!includeLinks && !includeChildren && !restrictToGroup) {
// Common case
if (ChanACL::hasPermission(&speaker, targetChannel, ChanACL::Whisper, &acCache)) {
for (User *p : targetChannel->qlUsers) {
// Add users of the target channel
cache.channelTargets.insert(static_cast< ServerUser * >(p));
}

for (unsigned int currentSession :
m_channelListenerManager.getListenersForChannel(targetChannel->iId)) {
// Add users that listen to the target channel (duplicates with users directly
// in this channel are handled further down)
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst) {
addListener(cache.listeningTargets, *pDst, *targetChannel);
}
}
}
} else {
QSet< Channel * > channels;

if (includeLinks) {
// allLinks contains the channel itself
channels = targetChannel->allLinks();
} else {
channels.insert(targetChannel);
}

if (includeChildren) {
channels.unite(targetChannel->allChildren());
}

// The target group might be changed by a redirect set up via RPC (Ice/gRPC). In that
// case the shout is sent to the redirection target instead the originally specified group
const QString &redirect = speaker.qmWhisperRedirect.value(currentTarget.targetGroup);
const QString &targetGroup = redirect.isEmpty() ? currentTarget.targetGroup : redirect;

for (Channel *subTargetChan : channels) {
if (ChanACL::hasPermission(&speaker, subTargetChan, ChanACL::Whisper, &acCache)) {
for (User *p : subTargetChan->qlUsers) {
ServerUser *su = static_cast< ServerUser * >(p);

if (!restrictToGroup
|| Group::appliesToUser(*subTargetChan, *subTargetChan, targetGroup, *su)) {
cache.channelTargets.insert(su);
}
}

for (unsigned int currentSession :
m_channelListenerManager.getListenersForChannel(subTargetChan->iId)) {
ServerUser *pDst = static_cast< ServerUser * >(qhUsers.value(currentSession));

if (pDst
&& (!restrictToGroup
|| Group::appliesToUser(*subTargetChan, *subTargetChan, targetGroup, *pDst))) {
// Only send audio to listener if the user exists and it is in the group the
// speech is directed at (if any)
addListener(cache.listeningTargets, *pDst, *subTargetChan);
}
}
}
}
}
}
}
}

for (unsigned int id : target.sessions) {
ServerUser *pDst = qhUsers.value(id);
if (pDst && ChanACL::hasPermission(&speaker, pDst->cChannel, ChanACL::Whisper, &acCache)
&& !cache.channelTargets.contains(pDst))
cache.directTargets.insert(pDst);
}

// Make sure the speaker themselves is not contained in these lists
cache.channelTargets.remove(&speaker);
cache.directTargets.remove(&speaker);
cache.listeningTargets.remove(&speaker);

return cache;
}

#undef SIO_UDP_CONNRESET
#undef SENDTO
2 changes: 2 additions & 0 deletions src/murmur/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ class Server : public QThread {
QTimer qtTick;
void initRegister();

WhisperTargetCache createWhisperTargetCacheFor(ServerUser &speaker, const WhisperTarget &target);

private:
int iChannelNestingLimit;
int iChannelCountLimit;
Expand Down
15 changes: 9 additions & 6 deletions src/murmur/ServerUser.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
# include <sys/socket.h>
#endif

#include <vector>

// Unfortunately, this needs to be "large enough" to hold
// enough frames to account for both short-term and
// long-term "maladjustments".
Expand All @@ -52,13 +54,14 @@ struct BandwidthRecord {

struct WhisperTarget {
struct Channel {
int iId;
bool bChildren;
bool bLinks;
QString qsGroup;
unsigned int id;
bool includeChildren;
bool includeLinks;
QString targetGroup;
};
QList< unsigned int > qlSessions;
QList< WhisperTarget::Channel > qlChannels;

std::vector< unsigned int > sessions;
std::vector< WhisperTarget::Channel > channels;
};

class ServerUser;
Expand Down
Loading