From 233781a987305690f4bd2323662207a255709b1f Mon Sep 17 00:00:00 2001 From: Daniel Haarhoff Date: Thu, 21 Nov 2024 15:36:08 +0000 Subject: [PATCH] Handle cases where we have failed to assign a DOI Refs: #2100 --- src/Comments/index.ts | 75 ++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/src/Comments/index.ts b/src/Comments/index.ts index 1fe3d98d0..2ca989824 100644 --- a/src/Comments/index.ts +++ b/src/Comments/index.ts @@ -1,4 +1,4 @@ -import { Array, Effect, Layer, Match, pipe, PubSub, Queue } from 'effect' +import { Array, Effect, flow, Layer, Match, pipe, PubSub, Queue, Schedule } from 'effect' import { EventStore } from '../Context.js' import { RequiresAVerifiedEmailAddress } from '../feature-flags.js' import type { Uuid } from '../types/index.js' @@ -132,36 +132,61 @@ export const ReactToCommentEvents: Layer.Layer< const eventStore = yield* EventStore const dequeue = yield* PubSub.subscribe(commentEvents) - yield* pipe( - Queue.take(dequeue), - Effect.andThen( - pipe( - Match.type<{ commentId: Uuid.Uuid; event: CommentEvent }>(), - Match.when({ event: { _tag: 'CommentWasStarted' } }, ({ commentId }) => - pipe( - React.CheckIfUserHasAVerifiedEmailAddress(commentId), - Effect.tapError(() => Effect.annotateLogs(Effect.logError('ReactToCommentEvents failed'), { commentId })), + yield* Effect.all( + [ + Effect.repeat( + pipe( + eventStore.getAllEvents, + Effect.andThen(events => Queries.GetACommentInNeedOfADoi(events)), + Effect.andThen( + flow( + React.AssignCommentADoiWhenPublicationWasRequested, + Effect.tapError(() => Effect.annotateLogs(Effect.logError('ReactToCommentEvents on timer failed'), {})), + ), ), + Effect.catchAll(() => Effect.void), ), - Match.when({ event: { _tag: 'CommentPublicationWasRequested' } }, ({ commentId }) => - pipe( - eventStore.getAllEvents, - Effect.andThen(events => Queries.GetACommentInNeedOfADoi(events)), - Effect.andThen(React.AssignCommentADoiWhenPublicationWasRequested), - Effect.tapError(() => Effect.annotateLogs(Effect.logError('ReactToCommentEvents failed'), { commentId })), - ), - ), - Match.when({ event: { _tag: 'DoiWasAssigned' } }, ({ commentId, event }) => + Schedule.fixed('1 minute'), + ), + pipe( + Queue.take(dequeue), + Effect.andThen( pipe( - React.PublishCommentWhenDoiWasAssigned({ commentId, event }), - Effect.tapError(() => Effect.annotateLogs(Effect.logError('ReactToCommentEvents failed'), { commentId })), + Match.type<{ commentId: Uuid.Uuid; event: CommentEvent }>(), + Match.when({ event: { _tag: 'CommentWasStarted' } }, ({ commentId }) => + pipe( + React.CheckIfUserHasAVerifiedEmailAddress(commentId), + Effect.tapError(() => + Effect.annotateLogs(Effect.logError('ReactToCommentEvents failed'), { commentId }), + ), + ), + ), + Match.when({ event: { _tag: 'CommentPublicationWasRequested' } }, ({ commentId }) => + pipe( + eventStore.getAllEvents, + Effect.andThen(events => Queries.GetACommentInNeedOfADoi(events)), + Effect.andThen(React.AssignCommentADoiWhenPublicationWasRequested), + Effect.tapError(() => + Effect.annotateLogs(Effect.logError('ReactToCommentEvents failed'), { commentId }), + ), + ), + ), + Match.when({ event: { _tag: 'DoiWasAssigned' } }, ({ commentId, event }) => + pipe( + React.PublishCommentWhenDoiWasAssigned({ commentId, event }), + Effect.tapError(() => + Effect.annotateLogs(Effect.logError('ReactToCommentEvents failed'), { commentId }), + ), + ), + ), + Match.orElse(() => Effect.void), ), ), - Match.orElse(() => Effect.void), + Effect.catchAll(() => Effect.void), + Effect.forever, ), - ), - Effect.catchAll(() => Effect.void), - Effect.forever, + ], + { concurrency: 'unbounded' }, ) }), )