Skip to content

Commit

Permalink
Revert durable consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Dec 11, 2024
1 parent be8a4ea commit 71e722a
Showing 1 changed file with 5 additions and 26 deletions.
31 changes: 5 additions & 26 deletions roomserver/internal/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (r *Inputer) startWorkerForRoom(roomID string) {
}

consumerConfig := &nats.ConsumerConfig{
Name: consumer,
Durable: consumer,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverAllPolicy,
Expand Down Expand Up @@ -228,43 +227,23 @@ func (r *Inputer) Start() error {
if r.EnableMetrics {
prometheus.MustRegister(roomserverInputBackpressure, processRoomEventDuration)
}

ccfg := &nats.ConsumerConfig{
Name: "supervisor",
Durable: "supervisor",
DeliverPolicy: nats.DeliverAllPolicy,
AckPolicy: nats.AckExplicitPolicy,
MaxDeliver: -1,
MaxAckPending: 1,
ReplayPolicy: nats.ReplayInstantPolicy,
}

if _, err := r.JetStream.AddConsumer(r.InputRoomEventTopic, ccfg); err != nil {
if !errors.Is(err, nats.ErrConsumerNameAlreadyInUse) {
return err
}
if _, err = r.JetStream.UpdateConsumer(r.InputRoomEventTopic, ccfg); err != nil {
return err
}
}

_, err := r.JetStream.Subscribe(
"", // This is blank because we specified Bind().
"", // This is blank because we specified it in BindStream.
func(m *nats.Msg) {
roomID := m.Header.Get(jetstream.RoomID)
r.startWorkerForRoom(roomID)
_ = m.AckSync()
},
nats.HeadersOnly(),
nats.Bind(r.InputRoomEventTopic, "supervisor"),
nats.BindStream(r.InputRoomEventTopic),
nats.OrderedConsumer(),
)

// Make sure that the room consumers have the right config.
stream := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent)
for consumer := range r.JetStream.Consumers(stream) {
switch {
case consumer.Config.Name == "supervisor" || consumer.Config.Durable == "":
continue // Ignore supervisor consumers
case consumer.Config.Durable == "":
continue // Ignore ephemeral consumers
case consumer.Config.InactiveThreshold != inactiveThreshold:
consumer.Config.InactiveThreshold = inactiveThreshold
if _, cerr := r.JetStream.UpdateConsumer(stream, &consumer.Config); cerr != nil {
Expand Down

0 comments on commit 71e722a

Please sign in to comment.