Skip to content

Commit

Permalink
Track subscriptions per backend; update logic after discussion
Browse files Browse the repository at this point in the history
  • Loading branch information
Syndace committed Sep 30, 2024
1 parent 3153a29 commit f56ec6e
Showing 1 changed file with 125 additions and 51 deletions.
176 changes: 125 additions & 51 deletions slixmpp_omemo/xep_0384.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,69 +737,134 @@ async def _on_subscription_changed(self, presence: Presence) -> None:
presence: The presence stanza triggering this callback.
"""

# TODO: There is currently no way to untrack a JID, for example in case an account is deleted or
# blocked.

jid = JID(presence["from"].bare)

roster: RosterNode = self.xmpp.client_roster

pep_enabled = jid in roster and roster[jid]["subscription"] == "both"
subscribed = (await self.storage.load_primitive(f"/slixmpp/subscribed/{jid}", bool)).maybe(None)

if subscribed is None:
# This JID is not tracked.
return
for namespace in [ twomemo.twomemo.NAMESPACE, oldmemo.oldmemo.NAMESPACE ]:
subscribed = (await self.storage.load_primitive(
f"/slixmpp/subscribed/{jid}/{namespace}",
bool
)).maybe(None)

# Remove manual subscriptions if PEP is enabled now
if pep_enabled and subscribed:
await self._unsubscribe(jid)
if subscribed is None:
# This JID is not tracked.
return

# Add a manual subscription if PEP is disabled now
if not pep_enabled and not subscribed:
await self._subscribe(jid)
# Remove manual subscriptions if PEP is enabled now
if pep_enabled and subscribed:
await self._unsubscribe(namespace, jid)

async def _subscribe(self, jid: JID) -> None:
# Add a manual subscription if PEP is disabled now
if not pep_enabled and not subscribed:
await self._subscribe(namespace, jid)

async def _subscribe(self, namespace: str, jid: JID) -> None:
"""
Manually subscribe to the device list pubsub nodes of the JID and track the subscription status.
Manually subscribe to the device list pubsub node of the JID and track the subscription status.
Args:
jid: The JID whose device lists to manually subscribe to. Can be a bare (aka "userhost") JID but
namespace: The OMEMO version namespace (not the node).
jid: The JID whose device list to manually subscribe to. Can be a bare (aka "userhost") JID but
doesn't have to.
"""

jid = JID(jid.bare)

node = {
twomemo.twomemo.NAMESPACE: TWOMEMO_DEVICE_LIST_NODE,
oldmemo.oldmemo.NAMESPACE: OLDMEMO_DEVICE_LIST_NODE
}.get(namespace, None)

if node is None:
raise UnknownNamespace(f"Unknown namespace during device list subscription: {namespace}")

xep_0060: XEP_0060 = self.xmpp["xep_0060"]
xep_0060.subscribe(jid, OLDMEMO_DEVICE_LIST_NODE)
xep_0060.subscribe(jid, TWOMEMO_DEVICE_LIST_NODE)

await self.storage.store(f"/slixmpp/subscribed/{jid}", True)
try:
xep_0060.subscribe(jid, node)
except IqError as e:
# Failure to subscribe is non-critical here, simply debug log the error (and don't update the
# subscription status).
log.debug(f"Couldn't subscribe to {namespace} device list of {jid.bare}", exc_info=e)
else:
await self.storage.store(f"/slixmpp/subscribed/{jid.bare}/{namespace}", True)

async def _unsubscribe(self, jid: JID) -> None:
async def _unsubscribe(self, namespace: str, jid: JID) -> None:
"""
Manually unsubscribe from the device list pubsub nodes of the JID and track the subscription status.
Manually unsubscribe from the device list pubsub node of the JID and track the subscription status.
Args:
jid: The JID whose device lists to manually unsubscribe from. Can be a bare (aka "userhost") JID
namespace: The OMEMO version namespace (not the node).
jid: The JID whose device list to manually unsubscribe from. Can be a bare (aka "userhost") JID
but doesn't have to.
"""

jid = JID(jid.bare)

node = {
twomemo.twomemo.NAMESPACE: TWOMEMO_DEVICE_LIST_NODE,
oldmemo.oldmemo.NAMESPACE: OLDMEMO_DEVICE_LIST_NODE
}.get(namespace, None)

if node is None:
raise UnknownNamespace(f"Unknown namespace during device list unsubscription: {namespace}")

xep_0060: XEP_0060 = self.xmpp["xep_0060"]
xep_0060.unsubscribe(jid, OLDMEMO_DEVICE_LIST_NODE)
xep_0060.unsubscribe(jid, TWOMEMO_DEVICE_LIST_NODE)

await self.storage.store(f"/slixmpp/subscribed/{jid}", False)
try:
xep_0060.unsubscribe(jid, node)
except IqError as e:
# Don't really care about any of the possible Iq error cases:
# https://xmpp.org/extensions/xep-0060.html#subscriber-unsubscribe-error
# Worst case we keep receiving updates we don't need.
log.debug(f"Couldn't unsubscribe from {namespace} device list of {jid.bare}", exc_info=e)

await self.storage.store(f"/slixmpp/subscribed/{jid.bare}/{namespace}", False)

async def refresh_device_lists(self, jid: JID, force_download: bool = False) -> None:
async def _fetch_subscription_status(self, namespace: str, jid: JID) -> Optional[bool]:
"""
Ensure that up-to-date device lists for the JID are cached. This is done automatically by
:meth:`encrypt_message`; you shouldn't need to manually call this method.
Fetches the subscription status to the device list pubsub node of the JID, updates the status in
storage and returns it.
Args:
jid: The JID whose device lists to refresh. Can be a bare (aka "userhost") JID but doesn't have
namespace: The OMEMO version namespace (not the node).
jid: The JID whose device list to manually unsubscribe from. Can be a bare (aka "userhost") JID
but doesn't have to.
Returns:
The updated subscription status. Can still be ``None`` if the server doesn't support fetching
subscription status.
"""

jid = JID(jid.bare)

node = {
twomemo.twomemo.NAMESPACE: TWOMEMO_DEVICE_LIST_NODE,
oldmemo.oldmemo.NAMESPACE: OLDMEMO_DEVICE_LIST_NODE
}.get(namespace, None)

if node is None:
raise UnknownNamespace(f"Unknown namespace during device list unsubscription: {namespace}")

# xep_0060: XEP_0060 = self.xmpp["xep_0060"]

# TODO

return None

async def refresh_device_lists(self, jids: Set[JID], force_download: bool = False) -> None:
"""
Ensure that up-to-date device lists for the JIDs are cached. This is done automatically by
:meth:`encrypt_message`. You don't have to ever manually call this method, but you can do so for
optimization reasons. For example, in a UI-based IM application, this method can be called when an
OMEMO-enabled chat tab/window is opened, to be optimally prepared if the user decides to send an
encrypted message.
Args:
jids: The JIDs whose device lists to refresh. Can be bare (aka "userhost") JIDs but don't have
to.
force_download: Force downloading the device list even if pubsub/PEP are enabled to automatically
keep the cached device lists up-to-date.
Expand All @@ -809,32 +874,42 @@ async def refresh_device_lists(self, jid: JID, force_download: bool = False) ->
as-is.
"""

jid = JID(jid.bare)

session_manager = await self.get_session_manager()
storage = self.storage

roster: RosterNode = self.xmpp.client_roster

pep_enabled = jid in roster and roster[jid]["subscription"] == "both"
subscribed = (await storage.load_primitive(f"/slixmpp/subscribed/{jid}", bool)).maybe(False)
for jid in jids:
jid = JID(jid.bare)

if pep_enabled:
# If PEP is enabled, return unless the download is forced
if not force_download:
return
else:
# If PEP is not enabled, check whether manual subscription is enabled instead
if subscribed:
# If manual subscription is enabled, return unless the download is forced
if not force_download:
return
else:
# Otherwise, manually subscribe to stay up-to-date automatically in the future
await self._subscribe(jid)
# Track which namespaces require a manual refresh
refresh_namespaces: Set[str] = \
{ twomemo.twomemo.NAMESPACE, oldmemo.oldmemo.NAMESPACE } if force_download else set()

# PEP is "enabled" with mutual presence subscription and applies to all backends when enabled.
pep_enabled = jid in roster and roster[jid]["subscription"] == "both"

if not pep_enabled:
# If PEP is not enabled, check whether manual subscription is enabled instead. Manual
# subscription is tracked per-backend.
for namespace in [ twomemo.twomemo.NAMESPACE, oldmemo.oldmemo.NAMESPACE ]:
subscribed = (await storage.load_primitive(
f"/slixmpp/subscribed/{jid.bare}/{namespace}",
bool
)).maybe(None)

if subscribed is None:
# Subscription status unknown, ask the server
subscribed = await self._fetch_subscription_status(namespace, jid)

if not subscribed:
# If not subscribed already, manually subscribe to stay up-to-date automatically in
# the future
await self._subscribe(namespace, jid)
refresh_namespaces.add(namespace)

# Manually force-download all device lists
await session_manager.refresh_device_lists(jid.bare)
for namespace in refresh_namespaces:
# Force-download the device lists that need a manual refresh
await session_manager.refresh_device_list(namespace, jid.bare)

async def encrypt_message(
self,
Expand Down Expand Up @@ -889,8 +964,7 @@ async def encrypt_message(
raise ValueError("At least one JID must be specified")

# Make sure all recipient device lists are available
for recipient_jid in recipient_jids:
await self.refresh_device_lists(recipient_jid)
await self.refresh_device_lists(recipient_jids)

recipient_bare_jids = frozenset({ recipient_jid.bare for recipient_jid in recipient_jids })

Expand Down

0 comments on commit f56ec6e

Please sign in to comment.