From dd8d49b9be2e1b2a07ed4d07954ffb87f7f86d8b Mon Sep 17 00:00:00 2001
From: Henri Dickson <90480431+alphatownsman@users.noreply.github.com>
Date: Tue, 2 Jan 2024 14:26:45 -0500
Subject: [PATCH 1/4] relay: manage relays
---
takahe/urls.py | 5 +
templates/admin/_menu.html | 4 +
templates/admin/relays.html | 53 ++++++++++
users/migrations/0023_add_relay.py | 60 ++++++++++++
users/models/__init__.py | 1 +
users/models/inbox_message.py | 12 ++-
users/models/relay.py | 150 +++++++++++++++++++++++++++++
users/views/admin/__init__.py | 1 +
users/views/admin/relays.py | 31 ++++++
9 files changed, 314 insertions(+), 3 deletions(-)
create mode 100644 templates/admin/relays.html
create mode 100644 users/migrations/0023_add_relay.py
create mode 100644 users/models/relay.py
create mode 100644 users/views/admin/relays.py
diff --git a/takahe/urls.py b/takahe/urls.py
index cb833ae92..63cd0bfa3 100644
--- a/takahe/urls.py
+++ b/takahe/urls.py
@@ -153,6 +153,11 @@
admin.FederationEdit.as_view(),
name="admin_federation_edit",
),
+ path(
+ "admin/relays/",
+ admin.RelaysRoot.as_view(),
+ name="admin_relays",
+ ),
path(
"admin/users/",
admin.UsersRoot.as_view(),
diff --git a/templates/admin/_menu.html b/templates/admin/_menu.html
index 0b0ec10f3..07fe06aef 100644
--- a/templates/admin/_menu.html
+++ b/templates/admin/_menu.html
@@ -45,6 +45,10 @@
Administration
Federation
+
+
+ Relays
+
Users
diff --git a/templates/admin/relays.html b/templates/admin/relays.html
new file mode 100644
index 000000000..9038dfd7e
--- /dev/null
+++ b/templates/admin/relays.html
@@ -0,0 +1,53 @@
+{% extends "admin/base_main.html" %}
+{% load activity_tags %}
+{% block subtitle %}Relay{% endblock %}
+{% block settings_content %}
+
+
+ {% for relay in page_obj %}
+
+
+ {% if relay.state == 'subscribed' %}
+
+ {% elif relay.state == 'failed' or relay.state == 'rejected' or relay.state == 'unsubscribed' %}
+
+ {% else %}
+
+ {% endif %}
+ |
+ {{ relay.inbox_uri }} |
+ {{ relay.state }} |
+
+
+ |
+
+
+ |
+
+ {% empty %}
+
+ There are no relay yet. |
+
+ {% endfor %}
+
+
+ Use remove only when it's stuck in (un)subscribing state for more than 10 minutes.
+
+ {% include "admin/_pagination.html" with nouns="relay,relays" %}
+{% endblock %}
diff --git a/users/migrations/0023_add_relay.py b/users/migrations/0023_add_relay.py
new file mode 100644
index 000000000..d215ea8bc
--- /dev/null
+++ b/users/migrations/0023_add_relay.py
@@ -0,0 +1,60 @@
+# Generated by Django 4.2.8 on 2024-01-02 16:20
+
+from django.db import migrations, models
+
+import stator.models
+import users.models.relay
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("users", "0022_follow_request"),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name="Relay",
+ fields=[
+ (
+ "id",
+ models.BigAutoField(
+ auto_created=True,
+ primary_key=True,
+ serialize=False,
+ verbose_name="ID",
+ ),
+ ),
+ ("state_changed", models.DateTimeField(auto_now_add=True)),
+ ("state_next_attempt", models.DateTimeField(blank=True, null=True)),
+ (
+ "state_locked_until",
+ models.DateTimeField(blank=True, db_index=True, null=True),
+ ),
+ ("inbox_uri", models.CharField(max_length=500, unique=True)),
+ (
+ "state",
+ stator.models.StateField(
+ choices=[
+ ("new", "new"),
+ ("subscribed", "subscribed"),
+ ("unsubscribing", "unsubscribing"),
+ ("unsubscribed", "unsubscribed"),
+ ],
+ default="new",
+ graph=users.models.relay.RelayStates,
+ max_length=100,
+ ),
+ ),
+ ("created", models.DateTimeField(auto_now_add=True)),
+ ("updated", models.DateTimeField(auto_now=True)),
+ ],
+ options={
+ "indexes": [
+ models.Index(
+ fields=["state", "state_next_attempt", "state_locked_until"],
+ name="ix_relay_state_next",
+ )
+ ],
+ },
+ ),
+ ]
diff --git a/users/models/__init__.py b/users/models/__init__.py
index 8396e424e..be5b97972 100644
--- a/users/models/__init__.py
+++ b/users/models/__init__.py
@@ -8,6 +8,7 @@
from .inbox_message import InboxMessage, InboxMessageStates # noqa
from .invite import Invite # noqa
from .password_reset import PasswordReset # noqa
+from .relay import Relay, RelayStates # noqa
from .report import Report # noqa
from .system_actor import SystemActor # noqa
from .user import User # noqa
diff --git a/users/models/inbox_message.py b/users/models/inbox_message.py
index 5609a77f3..61a61ed0b 100644
--- a/users/models/inbox_message.py
+++ b/users/models/inbox_message.py
@@ -16,7 +16,7 @@ class InboxMessageStates(StateGraph):
@classmethod
def handle_received(cls, instance: "InboxMessage"):
from activities.models import Post, PostInteraction, TimelineEvent
- from users.models import Block, Follow, Identity, Report
+ from users.models import Block, Follow, Identity, Relay, Report
from users.services import IdentityService
try:
@@ -68,7 +68,10 @@ def handle_received(cls, instance: "InboxMessage"):
case "accept":
match instance.message_object_type:
case "follow":
- Follow.handle_accept_ap(instance.message)
+ if Relay.is_ap_message_for_relay(instance.message):
+ Relay.handle_accept_ap(instance.message)
+ else:
+ Follow.handle_accept_ap(instance.message)
case None:
# It's a string object, but these will only be for Follows
Follow.handle_accept_ap(instance.message)
@@ -77,7 +80,10 @@ def handle_received(cls, instance: "InboxMessage"):
case "reject":
match instance.message_object_type:
case "follow":
- Follow.handle_reject_ap(instance.message)
+ if Relay.is_ap_message_for_relay(instance.message):
+ Relay.handle_reject_ap(instance.message)
+ else:
+ Follow.handle_reject_ap(instance.message)
case None:
# It's a string object, but these will only be for Follows
Follow.handle_reject_ap(instance.message)
diff --git a/users/models/relay.py b/users/models/relay.py
new file mode 100644
index 000000000..1bce4b264
--- /dev/null
+++ b/users/models/relay.py
@@ -0,0 +1,150 @@
+import logging
+import re
+from typing import Optional
+
+import httpx
+from django.db import models, transaction
+
+from core.ld import canonicalise, get_str_or_id
+from core.snowflake import Snowflake
+from stator.models import State, StateField, StateGraph, StatorModel
+from users.models.system_actor import SystemActor
+
+logger = logging.getLogger(__name__)
+
+
+class RelayStates(StateGraph):
+ new = State(try_interval=600)
+ subscribing = State(externally_progressed=True)
+ subscribed = State(externally_progressed=True)
+ failed = State(externally_progressed=True)
+ rejected = State(externally_progressed=True)
+ unsubscribing = State(try_interval=600)
+ unsubscribed = State(delete_after=1)
+
+ new.transitions_to(subscribing)
+ new.transitions_to(unsubscribing)
+ new.transitions_to(failed)
+ new.times_out_to(failed, seconds=38400)
+ subscribing.transitions_to(subscribed)
+ subscribing.transitions_to(unsubscribing)
+ subscribing.transitions_to(unsubscribed)
+ subscribing.transitions_to(rejected)
+ subscribing.transitions_to(failed)
+ subscribed.transitions_to(unsubscribing)
+ subscribed.transitions_to(rejected)
+ failed.transitions_to(unsubscribed)
+ rejected.transitions_to(unsubscribed)
+ unsubscribing.transitions_to(failed)
+ unsubscribing.transitions_to(unsubscribed)
+ unsubscribing.times_out_to(failed, seconds=38400)
+
+ @classmethod
+ def handle_new(cls, instance: "Relay"):
+ system_actor = SystemActor()
+ try:
+ response = system_actor.signed_request(
+ method="post",
+ uri=instance.inbox_uri,
+ body=instance.to_follow_ap(),
+ )
+ except:
+ logger.warning(f"Error sending follow: {instance.inbox_uri}")
+ return cls.failed
+ if response.status_code >= 200 and response.status_code < 300:
+ return cls.subscribing
+ else:
+ logger.error(f"Follow {instance.inbox_uri} HTTP {response.status_code}")
+ return cls.failed
+
+ @classmethod
+ def handle_unsubscribing(cls, instance: "Relay"):
+ system_actor = SystemActor()
+ try:
+ response = system_actor.signed_request(
+ method="post",
+ uri=instance.inbox_uri,
+ body=instance.to_unfollow_ap(),
+ )
+ except:
+ logger.error(f"Error sending unfollow {instance.inbox_uri}")
+ return cls.failed
+ if response.status_code >= 200 and response.status_code < 300:
+ return cls.unsubscribed
+ else:
+ logger.error(f"Unfollow {instance.inbox_uri} HTTP {response.status_code}")
+ return cls.failed
+
+
+class Relay(StatorModel):
+ inbox_uri = models.CharField(max_length=500, unique=True)
+
+ state = StateField(RelayStates)
+
+ created = models.DateTimeField(auto_now_add=True)
+ updated = models.DateTimeField(auto_now=True)
+
+ class Meta:
+ indexes: list = []
+
+ @classmethod
+ def active_inbox_uris(cls):
+ return list(
+ cls.objects.filter(state=RelayStates.subscribed).values_list(
+ "inbox_uri", flat=True
+ )
+ )
+
+ @classmethod
+ def subscribe(cls, inbox_uri: str) -> "Relay":
+ return cls.objects.get_or_create(inbox_uri=inbox_uri.strip())[0]
+
+ def unsubscribe(self):
+ self.transition_perform(RelayStates.unsubscribing)
+
+ def force_unsubscribe(self):
+ self.transition_perform(RelayStates.unsubscribed)
+
+ def to_follow_ap(self):
+ system_actor = SystemActor()
+ return { # skip canonicalise here to keep Public addressing as full URI
+ "@context": ["https://www.w3.org/ns/activitystreams"],
+ "id": f"{system_actor.actor_uri}relay/{self.pk}/#follow",
+ "type": "Follow",
+ "actor": system_actor.actor_uri,
+ "object": "https://www.w3.org/ns/activitystreams#Public",
+ }
+
+ def to_unfollow_ap(self):
+ system_actor = SystemActor()
+ return { # skip canonicalise here to keep Public addressing as full URI
+ "@context": ["https://www.w3.org/ns/activitystreams"],
+ "id": f"{system_actor.actor_uri}relay/{self.pk}/#unfollow",
+ "type": "Undo",
+ "actor": system_actor.actor_uri,
+ "object": self.to_follow_ap(),
+ }
+
+ @classmethod
+ def is_ap_message_for_relay(cls, message) -> bool:
+ return (
+ re.match(r".+/relay/(\d+)/#(follow|unfollow)$", message["object"]["id"])
+ is not None
+ )
+
+ @classmethod
+ def get_by_ap(cls, message) -> "Relay":
+ m = re.match(r".+/relay/(\d+)/#(follow|unfollow)$", message["object"]["id"])
+ if not m:
+ raise ValueError("Not a valid relay follow response")
+ return cls.objects.get(pk=int(m[1]))
+
+ @classmethod
+ def handle_accept_ap(cls, message):
+ relay = cls.get_by_ap(message)
+ relay.transition_perform(RelayStates.subscribed)
+
+ @classmethod
+ def handle_reject_ap(cls, message):
+ relay = cls.get_by_ap(message)
+ relay.transition_perform(RelayStates.rejected)
diff --git a/users/views/admin/__init__.py b/users/views/admin/__init__.py
index 3330b1bf8..d66384c29 100644
--- a/users/views/admin/__init__.py
+++ b/users/views/admin/__init__.py
@@ -31,6 +31,7 @@
from users.views.admin.hashtags import HashtagEdit, HashtagEnable, Hashtags # noqa
from users.views.admin.identities import IdentitiesRoot, IdentityEdit # noqa
from users.views.admin.invites import InviteCreate, InvitesRoot, InviteView # noqa
+from users.views.admin.relays import RelaysRoot # noqa
from users.views.admin.reports import ReportsRoot, ReportView # noqa
from users.views.admin.settings import ( # noqa
BasicSettings,
diff --git a/users/views/admin/relays.py b/users/views/admin/relays.py
new file mode 100644
index 000000000..e6ed3d05d
--- /dev/null
+++ b/users/views/admin/relays.py
@@ -0,0 +1,31 @@
+from django.db import models
+from django.shortcuts import redirect
+from django.utils.decorators import method_decorator
+from django.views.generic import ListView
+
+from users.decorators import admin_required
+from users.models import Identity, Relay
+
+
+@method_decorator(admin_required, name="dispatch")
+class RelaysRoot(ListView):
+ template_name = "admin/relays.html"
+ paginate_by = 30
+
+ def get(self, request, *args, **kwargs):
+ self.extra_context = {
+ "section": "relays",
+ }
+ return super().get(request, *args, **kwargs)
+
+ def get_queryset(self):
+ return Relay.objects.all().order_by("-id")
+
+ def post(self, request, *args, **kwargs):
+ if "subscribe" in request.GET:
+ Relay.subscribe(request.POST.get("inbox_uri"))
+ elif "unsubscribe" in request.GET:
+ Relay.objects.get(pk=int(request.POST.get("id"))).unsubscribe()
+ elif "remove" in request.GET:
+ Relay.objects.get(pk=int(request.POST.get("id"))).force_unsubscribe()
+ return redirect(".")
From 32200e5868917831189bee3357fd2e3bdeed4fb5 Mon Sep 17 00:00:00 2001
From: Henri Dickson <90480431+alphatownsman@users.noreply.github.com>
Date: Tue, 2 Jan 2024 17:48:25 -0500
Subject: [PATCH 2/4] relay: fanout public posts to relays
---
activities/models/post.py | 26 ++++++++++++++++++++++++++
1 file changed, 26 insertions(+)
diff --git a/activities/models/post.py b/activities/models/post.py
index fef4db4e3..1fc6a39bf 100644
--- a/activities/models/post.py
+++ b/activities/models/post.py
@@ -40,10 +40,12 @@
from core.snowflake import Snowflake
from stator.exceptions import TryAgainLater
from stator.models import State, StateField, StateGraph, StatorModel
+from users.models import relay
from users.models.follow import FollowStates
from users.models.hashtag_follow import HashtagFollow
from users.models.identity import Identity, IdentityStates
from users.models.inbox_message import InboxMessage
+from users.models.relay import Relay
from users.models.system_actor import SystemActor
logger = logging.getLogger(__name__)
@@ -77,6 +79,30 @@ def targets_fan_out(cls, post: "Post", type_: str) -> None:
type=type_,
subject_post=post,
)
+ cls.fan_out_to_relay(post, type_)
+
+ @classmethod
+ def fan_out_to_relay(cls, post: "Post", type_: str) -> None:
+ if not post.local or post.visibility != Post.Visibilities.public:
+ return
+ relay_uris = Relay.active_inbox_uris()
+ if not relay_uris:
+ return
+ obj = None
+ match type_:
+ case FanOut.Types.post:
+ obj = canonicalise(post.to_create_ap())
+ case FanOut.Types.post_edited:
+ obj = canonicalise(post.to_update_ap())
+ case FanOut.Types.post_deleted:
+ obj = canonicalise(post.to_delete_ap())
+ if not obj:
+ return
+ for uri in relay_uris:
+ try:
+ post.author.signed_request(method="post", uri=uri, body=obj)
+ except Exception as e:
+ logger.warning(f"Error sending relay: {uri} {e}")
@classmethod
def handle_new(cls, instance: "Post"):
From 0f4dc3b19fd81e206f9834b1bf710f2ce4649730 Mon Sep 17 00:00:00 2001
From: Henri Dickson <90480431+alphatownsman@users.noreply.github.com>
Date: Tue, 2 Jan 2024 18:21:29 -0500
Subject: [PATCH 3/4] remove unused imports
---
activities/models/post.py | 1 -
users/models/relay.py | 6 +-----
users/views/admin/relays.py | 3 +--
3 files changed, 2 insertions(+), 8 deletions(-)
diff --git a/activities/models/post.py b/activities/models/post.py
index 1fc6a39bf..f907f1cf3 100644
--- a/activities/models/post.py
+++ b/activities/models/post.py
@@ -40,7 +40,6 @@
from core.snowflake import Snowflake
from stator.exceptions import TryAgainLater
from stator.models import State, StateField, StateGraph, StatorModel
-from users.models import relay
from users.models.follow import FollowStates
from users.models.hashtag_follow import HashtagFollow
from users.models.identity import Identity, IdentityStates
diff --git a/users/models/relay.py b/users/models/relay.py
index 1bce4b264..ae7edb04d 100644
--- a/users/models/relay.py
+++ b/users/models/relay.py
@@ -1,12 +1,8 @@
import logging
import re
-from typing import Optional
-import httpx
-from django.db import models, transaction
+from django.db import models
-from core.ld import canonicalise, get_str_or_id
-from core.snowflake import Snowflake
from stator.models import State, StateField, StateGraph, StatorModel
from users.models.system_actor import SystemActor
diff --git a/users/views/admin/relays.py b/users/views/admin/relays.py
index e6ed3d05d..19be5ebf2 100644
--- a/users/views/admin/relays.py
+++ b/users/views/admin/relays.py
@@ -1,10 +1,9 @@
-from django.db import models
from django.shortcuts import redirect
from django.utils.decorators import method_decorator
from django.views.generic import ListView
from users.decorators import admin_required
-from users.models import Identity, Relay
+from users.models import Relay
@method_decorator(admin_required, name="dispatch")
From fb02bf8492e212703f627c41070c6d7d4703bd87 Mon Sep 17 00:00:00 2001
From: Henri Dickson <90480431+alphatownsman@users.noreply.github.com>
Date: Tue, 2 Jan 2024 18:28:40 -0500
Subject: [PATCH 4/4] fix precommit checks
---
users/models/relay.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/users/models/relay.py b/users/models/relay.py
index ae7edb04d..8d2932fca 100644
--- a/users/models/relay.py
+++ b/users/models/relay.py
@@ -44,8 +44,8 @@ def handle_new(cls, instance: "Relay"):
uri=instance.inbox_uri,
body=instance.to_follow_ap(),
)
- except:
- logger.warning(f"Error sending follow: {instance.inbox_uri}")
+ except Exception as e:
+ logger.error(f"Error sending follow request: {instance.inbox_uri} {e}")
return cls.failed
if response.status_code >= 200 and response.status_code < 300:
return cls.subscribing
@@ -62,8 +62,8 @@ def handle_unsubscribing(cls, instance: "Relay"):
uri=instance.inbox_uri,
body=instance.to_unfollow_ap(),
)
- except:
- logger.error(f"Error sending unfollow {instance.inbox_uri}")
+ except Exception as e:
+ logger.error(f"Error sending unfollow request: {instance.inbox_uri} {e}")
return cls.failed
if response.status_code >= 200 and response.status_code < 300:
return cls.unsubscribed