diff --git a/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs b/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs index 02e1c1a0c..1c30c3993 100644 --- a/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs +++ b/lib/mobility-core/src/Kernel/External/Notification/GRPC/Flow.hs @@ -43,6 +43,7 @@ notifyPerson cfg notificationData = do case T.splitOn "-" notificationData.streamId of [startUuid, midOneUuid, _, _] -> T.intercalate "-" [startUuid, midOneUuid] _ -> notificationData.streamId + _ <- Hedis.withCrossAppRedis $ Hedis.publish "active-notification" notificationStreamId void $ Hedis.withCrossAppRedis $ Hedis.xAddExp ("N" <> notificationStreamId <> "{" <> (show shardId) <> "}") "*" (buildFieldValue notificationData now) cfg.streamExpirationTime where buildFieldValue notifData createdAt = diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index 43a38ef44..04c9bea13 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -824,3 +824,24 @@ ttl key = withLogTag "Redis" $ do withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_TTL" $ show err pure (-1) -- Returning -1 if there was an error Right expSec -> pure expSec + +publish :: (HedisFlow m env) => Text -> Text -> m () +publish channel message = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "publish" $ + try @_ @SomeException $ + runWithPrefix'_ channel $ \prefChannel -> + Hedis.publish prefChannel (BSL.toStrict $ Ae.encode message) + + whenLeft res $ \err -> + withLogTag "STANDALONE" $ + logTagInfo "FAILED_TO_PUBLISH" (show err) + + res <- withTimeRedis "RedisCluster" "publish" $ + try @_ @SomeException $ + runWithPrefix_ channel $ \prefChannel -> + Hedis.publish prefChannel (BSL.toStrict $ Ae.encode message) + whenLeft res $ \err -> + withLogTag "CLUSTER" $ + logTagInfo "FAILED_TO_PUBLISH" (show err)