diff --git a/lib/mobility-core/src/Kernel/Mock/App.hs b/lib/mobility-core/src/Kernel/Mock/App.hs index 8bdf2f48a..ae7b1d749 100644 --- a/lib/mobility-core/src/Kernel/Mock/App.hs +++ b/lib/mobility-core/src/Kernel/Mock/App.hs @@ -58,6 +58,9 @@ instance CoreMetrics (MockM e) where incrementErrorCounter _ _ = return () addUrlCallRetries _ _ = return () addUrlCallRetryFailures _ = return () + incrementSortedSetCounter _ _ = return () + incrementStreamCounter _ _ = return () + addGenericLatency _ _ = return () instance MonadTime (MockM e) where getCurrentTime = liftIO getCurrentTime diff --git a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs index a9c6fd36f..137a616f0 100644 --- a/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs +++ b/lib/mobility-core/src/Kernel/Storage/Hedis/Queries.hs @@ -18,9 +18,8 @@ import qualified Data.Aeson as Ae import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.String.Conversions -import Data.Text hiding (map, null) +import Data.Text hiding (concatMap, map, null) import qualified Data.Text as Text -import qualified Data.Text.Encoding as DE import Database.Redis (Queued, Redis, RedisTx, Reply, TxResult (..)) import qualified Database.Redis as Hedis import EulerHS.Prelude (whenLeft) @@ -34,6 +33,32 @@ import Kernel.Utils.Logging type ExpirationTime = Int +data XReadResponse = XReadResponse + { stream :: BS.ByteString, + records :: [StreamsRecord] + } + deriving (Show) + +data StreamsRecord = StreamsRecord + { recordId :: BS.ByteString, + keyValues :: [(BS.ByteString, BS.ByteString)] + } + deriving (Show) + +convertFromHedisResponse :: Hedis.XReadResponse -> XReadResponse +convertFromHedisResponse hedisResponse = + XReadResponse + { stream = Hedis.stream hedisResponse, + records = map convertFromHedisRecord (Hedis.records hedisResponse) + } + +convertFromHedisRecord :: Hedis.StreamsRecord -> StreamsRecord +convertFromHedisRecord hedisRecord = + StreamsRecord + { recordId = Hedis.recordId hedisRecord, + keyValues = Hedis.keyValues hedisRecord + } + runHedis :: HedisFlow m env => Redis (Either Reply a) -> m a runHedis action = do @@ -153,7 +178,7 @@ getImpl decodeResult key = withLogTag "Redis" $ do Just res' -> decodeResult res' get :: (FromJSON a, HedisFlow m env) => Text -> m (Maybe a) -get key = getImpl decodeResult key +get = getImpl decodeResult where decodeResult bs = Error.fromMaybeM (HedisDecodeError $ cs bs) $ Ae.decode $ BSL.fromStrict bs @@ -415,7 +440,7 @@ zrevrangeWithscores key start stop = do pure $ map (\(k, score) -> (cs' k, score)) res where cs' :: BS.ByteString -> Text - cs' = DE.decodeUtf8 + cs' = cs zScore :: (FromJSON Double, HedisFlow m env) => Text -> Text -> m (Maybe Double) zScore key member = do @@ -429,3 +454,143 @@ zRevRank key member = do zCard :: (HedisFlow m env) => Text -> m Integer zCard key = runWithPrefix key Hedis.zcard + +zAdd :: + (ToJSON member, HedisFlow m env) => + Text -> + [(Double, member)] -> + m () +zAdd key members = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + if migrating + then do + res <- withTimeRedis "RedisStandalone" "zAdd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members) + whenLeft res (\err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZADD" $ show err) + else pure () + res <- withTimeRedis "RedisCluster" "zAdd" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.zadd prefKey $ map (\(score, member) -> (score, BSL.toStrict $ Ae.encode member)) members) + whenLeft res (\err -> withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZADD" $ show err) + +xInfoGroups :: + (HedisFlow m env) => + Text -> -- Stream key + m Bool +xInfoGroups key = do + eitherMaybeBS <- withTimeRedis "RedisStandalone" "get" $ try @_ @SomeException (runWithPrefix key Hedis.xinfoGroups) + ls <- + case eitherMaybeBS of + Left err -> logTagInfo "ERROR_WHILE_GET" (show err) $> [] + Right maybeBS -> pure maybeBS + return $ not (null ls) + +-- Function to create a new consumer group for a stream +xGroupCreate :: + (HedisFlow m env) => + Text -> -- Stream key + Text -> -- Group name + Text -> -- Start ID + m () +xGroupCreate key groupName startId = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xGroupCreate" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId)) + whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_xGroupCreate" . show) + res <- withTimeRedis "RedisCluster" "xGroupCreate" $ try @_ @SomeException (runWithPrefix_ key $ \prefKey -> Hedis.xgroupCreate prefKey (cs groupName) (cs startId)) + whenLeft res (withLogTag "CLUSTER" . logTagInfo "FAILED_TO_xGroupCreate" . show) + +extractKeyValuePairs :: [StreamsRecord] -> [(Text, Text)] +extractKeyValuePairs = concatMap (\(StreamsRecord _ keyVals) -> map (\(k, v) -> (cs k, cs v)) keyVals) + +extractRecordIds :: [StreamsRecord] -> [BS.ByteString] +extractRecordIds = map (\(StreamsRecord recordId _) -> recordId) + +xReadGroup :: + (HedisFlow m env) => + Text -> -- group name + Text -> -- consumer name + [(Text, Text)] -> -- (stream, id) pairs + m (Maybe [XReadResponse]) +xReadGroup groupName consumerName pairsList = do + let bsPairsList = map (\(stream, id) -> (cs stream, cs id)) pairsList + let mbKeyVal = listToMaybe bsPairsList + case mbKeyVal of + Just keyVal -> do + eitherMaybeBS <- withTimeRedis "RedisStandalone" "get" $ try @_ @SomeException (runWithPrefix (cs $ fst keyVal) $ \_ -> Hedis.xreadGroup (cs groupName) (cs consumerName) bsPairsList) + mbRes <- + case eitherMaybeBS of + Left err -> logTagInfo "ERROR_WHILE_GET" (show err) $> Nothing + Right maybeBS -> pure maybeBS + case mbRes of + Just res -> return $ Just (map convertFromHedisResponse res) + Nothing -> pure Nothing + Nothing -> pure Nothing + +xAdd :: (HedisFlow m env) => Text -> Text -> [(BS.ByteString, BS.ByteString)] -> m BS.ByteString +xAdd key entryId fieldValues = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xadd" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues) + whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_xadd" . show) + res <- withTimeRedis "RedisCluster" "xadd" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xadd prefKey (cs entryId) fieldValues) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "xadd" $ show err + pure "" + Right items -> pure items + +zRangeByScore :: (HedisFlow m env) => Text -> Double -> Double -> m [BS.ByteString] +zRangeByScore key start end = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "zRangeByScore" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zrangebyscore prefKey start end) + whenLeft res (withLogTag "STANDALONE" . logTagInfo "FAILED_TO_ZRANGEBYSCORE" . show) + res <- withTimeRedis "RedisCluster" "zRangeByScore" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.zrangebyscore prefKey start end) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZRANGEBYSCORE" $ show err + pure [] -- Return an empty list if there was an error + Right items -> pure items + +zRemRangeByScore :: (HedisFlow m env) => Text -> Double -> Double -> m Integer +zRemRangeByScore key start end = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "zRemRangeByScore" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.zremrangebyscore prefKey start end) + case res of + Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_ZREMRANGEBYSCORE" $ show err + Right items -> pure items + res <- withTimeRedis "RedisCluster" "zRemRangeByScore" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.zremrangebyscore prefKey start end) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_ZREMRANGEBYSCORE" $ show err + pure (-1) -- Return -1 if there was an error + Right items -> pure items + +xDel :: (HedisFlow m env) => Text -> [BS.ByteString] -> m Integer +xDel key entryId = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xDel" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xdel prefKey entryId) + case res of + Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_XDEL" $ show err + Right items -> pure items + res <- withTimeRedis "RedisCluster" "xDel" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xdel prefKey entryId) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_XDEL" $ show err + pure (-1) -- Return -1 if there was an error + Right items -> pure items + +xAck :: (HedisFlow m env) => Text -> Text -> [BS.ByteString] -> m Integer +xAck key groupName entryId = withLogTag "Redis" $ do + migrating <- asks (.hedisMigrationStage) + when migrating $ do + res <- withTimeRedis "RedisStandalone" "xAck" $ try @_ @SomeException (runWithPrefix'_ key $ \prefKey -> Hedis.xack prefKey (cs groupName) entryId) + case res of + Left err -> withLogTag "STANDALONE" $ logTagInfo "FAILED_TO_xAck" $ show err + Right items -> pure items + res <- withTimeRedis "RedisCluster" "xAck" $ try @_ @SomeException (runWithPrefix key $ \prefKey -> Hedis.xack prefKey (cs groupName) entryId) + case res of + Left err -> do + withLogTag "CLUSTER" $ logTagInfo "FAILED_TO_xAck" $ show err + pure (-1) -- Return -1 if there was an error + Right items -> pure items diff --git a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs index fd2f60e10..9af0cd8b3 100644 --- a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs +++ b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics.hs @@ -163,3 +163,64 @@ addUrlCallFailuresImplementation' cmContainers url version = do urlCallRetriesMetric (showBaseUrlText url, version.getDeploymentVersion) P.incCounter + +incrementSortedSetCounterImplementation :: + ( HasCoreMetrics r, + L.MonadFlow m, + MonadReader r m + ) => + Text -> + Int -> + m () +incrementSortedSetCounterImplementation context scheduledSecond = do + cmContainer <- asks (.coreMetrics) + version <- asks (.version) + incrementSortedSetCounterImplementation' cmContainer context scheduledSecond version + +incrementSortedSetCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> Int -> DeploymentVersion -> m () +incrementSortedSetCounterImplementation' cmContainers context scheduledSecond version = do + let sortedSetMetric = cmContainers.sortedSetCounter + L.runIO $ + P.withLabel + sortedSetMetric + (context, show scheduledSecond, version.getDeploymentVersion) + P.incCounter + +incrementStreamCounterImplementation :: + ( HasCoreMetrics r, + L.MonadFlow m, + MonadReader r m + ) => + Text -> + Int -> + m () +incrementStreamCounterImplementation context executedseconds = do + cmContainer <- asks (.coreMetrics) + version <- asks (.version) + incrementStreamCounterImplementation' cmContainer context executedseconds version + +incrementStreamCounterImplementation' :: L.MonadFlow m => CoreMetricsContainer -> Text -> Int -> DeploymentVersion -> m () +incrementStreamCounterImplementation' cmContainers context executedseconds version = do + let sortedSetMetric = cmContainers.sortedSetCounter + L.runIO $ + P.withLabel + sortedSetMetric + (context, show executedseconds, version.getDeploymentVersion) + P.incCounter + +addGenericLatencyImplementation :: + ( HasCoreMetrics r, + L.MonadFlow m, + MonadReader r m + ) => + Text -> + NominalDiffTime -> + m () +addGenericLatencyImplementation operation latency = do + cmContainer <- asks (.coreMetrics) + version <- asks (.version) + L.runIO $ + P.withLabel + cmContainer.genericLatency + (operation, version.getDeploymentVersion) + (`P.observe` (fromIntegral $ div (fromEnum . nominalDiffTimeToSeconds $ latency) 1000000000000)) diff --git a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs index 3a15a0a4f..bc8d777ad 100644 --- a/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs +++ b/lib/mobility-core/src/Kernel/Tools/Metrics/CoreMetrics/Types.hs @@ -38,6 +38,12 @@ type URLCallRetriesMetric = P.Vector P.Label3 P.Counter type URLCallRetryFailuresMetric = P.Vector P.Label2 P.Counter +type SortedSetMetric = P.Vector P.Label3 P.Counter + +type StreamMetric = P.Vector P.Label3 P.Counter + +type GenericLatencyMetric = P.Vector P.Label2 P.Histogram + type HasCoreMetrics r = ( HasField "coreMetrics" r CoreMetricsContainer, HasField "version" r DeploymentVersion @@ -46,32 +52,36 @@ type HasCoreMetrics r = newtype DeploymentVersion = DeploymentVersion {getDeploymentVersion :: Text} class CoreMetrics m where - addRequestLatency :: - Text -> - Text -> - Milliseconds -> - Either ClientError a -> - m () + addRequestLatency :: Text -> Text -> Milliseconds -> Either ClientError a -> m () addDatastoreLatency :: Text -> Text -> NominalDiffTime -> m () incrementErrorCounter :: Text -> SomeException -> m () addUrlCallRetries :: BaseUrl -> Int -> m () addUrlCallRetryFailures :: BaseUrl -> m () + incrementSortedSetCounter :: Text -> Int -> m () + incrementStreamCounter :: Text -> Int -> m () + addGenericLatency :: Text -> NominalDiffTime -> m () data CoreMetricsContainer = CoreMetricsContainer { requestLatency :: RequestLatencyMetric, datastoresLatency :: DatastoresLatencyMetric, + genericLatency :: GenericLatencyMetric, errorCounter :: ErrorCounterMetric, urlCallRetries :: URLCallRetriesMetric, - urlCallRetryFailures :: URLCallRetryFailuresMetric + urlCallRetryFailures :: URLCallRetryFailuresMetric, + sortedSetCounter :: SortedSetMetric, + streamCounter :: StreamMetric } registerCoreMetricsContainer :: IO CoreMetricsContainer registerCoreMetricsContainer = do requestLatency <- registerRequestLatencyMetric datastoresLatency <- registerDatastoresLatencyMetrics + genericLatency <- registerGenericLatencyMetrics errorCounter <- registerErrorCounterMetric urlCallRetries <- registerURLCallRetriesMetric urlCallRetryFailures <- registerURLCallRetryFailuresMetric + sortedSetCounter <- registerSortedSetMetric + streamCounter <- registerStreamCounter return CoreMetricsContainer {..} @@ -114,3 +124,27 @@ registerURLCallRetryFailuresMetric = P.counter info where info = P.Info "url_call_retry_failures_counter" "" + +registerSortedSetMetric :: IO SortedSetMetric +registerSortedSetMetric = + P.register $ + P.vector ("job_type", "scheduled_second", "version") $ + P.counter info + where + info = P.Info "sortedset_scheduled_jobs_counter" "" + +registerStreamCounter :: IO StreamMetric +registerStreamCounter = + P.register $ + P.vector ("job_type", "executed_seconds", "version") $ + P.counter info + where + info = P.Info "stream_jobs_counter" "" + +registerGenericLatencyMetrics :: IO GenericLatencyMetric +registerGenericLatencyMetrics = + P.register $ + P.vector ("operation", "version") $ + P.histogram info P.defaultBuckets + where + info = P.Info "producer_operation_duration" "" diff --git a/lib/mobility-core/src/Kernel/Types/Flow.hs b/lib/mobility-core/src/Kernel/Types/Flow.hs index 9a990a4bc..51da445c3 100644 --- a/lib/mobility-core/src/Kernel/Types/Flow.hs +++ b/lib/mobility-core/src/Kernel/Types/Flow.hs @@ -163,6 +163,9 @@ instance Metrics.HasCoreMetrics r => Metrics.CoreMetrics (FlowR r) where incrementErrorCounter = Metrics.incrementErrorCounterImplementation addUrlCallRetries = Metrics.addUrlCallRetriesImplementation addUrlCallRetryFailures = Metrics.addUrlCallFailuresImplementation + incrementSortedSetCounter = Metrics.incrementSortedSetCounterImplementation + incrementStreamCounter = Metrics.incrementStreamCounterImplementation + addGenericLatency = Metrics.addGenericLatencyImplementation instance MonadMonitor (FlowR r) where doIO = liftIO diff --git a/lib/mobility-core/src/Kernel/Utils/Time.hs b/lib/mobility-core/src/Kernel/Utils/Time.hs index ab7027abc..fdc051b57 100644 --- a/lib/mobility-core/src/Kernel/Utils/Time.hs +++ b/lib/mobility-core/src/Kernel/Utils/Time.hs @@ -24,6 +24,7 @@ where import qualified Data.Text as T import Data.Time hiding (getCurrentTime, nominalDiffTimeToSeconds, secondsToNominalDiffTime) import qualified Data.Time as Time hiding (secondsToNominalDiffTime) +import Data.Time.Clock.System import EulerHS.Prelude import Kernel.Types.Time import Kernel.Utils.Logging @@ -113,3 +114,14 @@ compareTimeWithInterval dt time1 time2 | abs (diffUTCTime time1 time2) < abs dt = EQ | time1 < time2 = LT | otherwise = GT -- time1 > time2 + +utcToMilliseconds :: UTCTime -> Double +utcToMilliseconds utcTime = fromIntegral $ div (systemSeconds systemTime * 1000000000 + fromIntegral (systemNanoseconds systemTime)) 1000000 + where + systemTime = utcToSystemTime utcTime + +getCurrentTimestamp :: (Monad m, MonadTime m) => m Double +getCurrentTimestamp = do + now <- getCurrentTime + let systemTime = utcToSystemTime now + pure . fromIntegral $ div (systemSeconds systemTime * 1000000000 + fromIntegral (systemNanoseconds systemTime)) 1000000 diff --git a/lib/mobility-core/test/src/APIExceptions.hs b/lib/mobility-core/test/src/APIExceptions.hs index b6a8e4f46..554e95f20 100644 --- a/lib/mobility-core/test/src/APIExceptions.hs +++ b/lib/mobility-core/test/src/APIExceptions.hs @@ -65,6 +65,9 @@ instance Metrics.CoreMetrics IO where incrementErrorCounter _ _ = return () addUrlCallRetries _ _ = return () addUrlCallRetryFailures _ = return () + incrementSortedSetCounter _ _ = return () + incrementStreamCounter _ _ = return () + addGenericLatency _ _ = return () httpExceptionTests :: TestTree httpExceptionTests =