From 039a83d914c08c87cf2ca95d2cc16cd63c9748af Mon Sep 17 00:00:00 2001 From: shashitnak Date: Fri, 17 Mar 2023 10:52:29 +0530 Subject: [PATCH] changing NodeConnection to Pool of NodeConnection and taking Cluster Connection out of the Pool --- src/Database/Redis/Cluster.hs | 48 +++++++++++------------ src/Database/Redis/Connection.hs | 65 +++++++++++++++++++------------- 2 files changed, 63 insertions(+), 50 deletions(-) diff --git a/src/Database/Redis/Cluster.hs b/src/Database/Redis/Cluster.hs index e94c9f08..a9ae2910 100644 --- a/src/Database/Redis/Cluster.hs +++ b/src/Database/Redis/Cluster.hs @@ -14,7 +14,7 @@ module Database.Redis.Cluster , Shard(..) , TimeoutException(..) , connect - , disconnect + , destroyNodeResources , requestPipelined , requestMasterNodes , nodes @@ -28,6 +28,7 @@ import Data.List(nub, sortBy, find) import Data.Map(fromListWith, assocs) import Data.Function(on) import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException) +import Data.Pool(Pool, createPool, withResource, destroyAllResources) import Control.Concurrent.Async(race) import Control.Concurrent(threadDelay) import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_) @@ -62,7 +63,7 @@ type IsReadOnly = Bool data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly -- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection' -data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID +data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID instance Show NodeConnection where show (NodeConnection _ _ id1) = "nodeId: " <> show id1 @@ -128,8 +129,8 @@ instance Exception NoNodeException data TimeoutException = TimeoutException String deriving (Show, Typeable) instance Exception TimeoutException -connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> IO Connection -connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do +connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> Time.NominalDiffTime -> Int -> IO Connection +connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap idleTime maxResources = do shardMap <- readMVar shardMapVar stateVar <- newMVar $ Pending [] pipelineVar <- newMVar $ Pipeline stateVar @@ -161,15 +162,15 @@ connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap ) (mempty, False) info connectNode :: Node -> IO (NodeID, NodeConnection) connectNode (Node n _ host port) = do - ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt + ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources ref <- IOR.newIORef Nothing return (n, NodeConnection ctx ref n) refreshShardMapVar :: ShardMap -> IO () refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap)) -disconnect :: Connection -> IO () -disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where - disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx +destroyNodeResources :: Connection -> IO () +destroyNodeResources (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where + disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool -- Add a request to the current pipeline for this connection. The pipeline will -- be executed implicitly as soon as any result returned from this function is @@ -453,34 +454,33 @@ allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) = onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap) requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply] -requestNode (NodeConnection ctx lastRecvRef _) requests = do +requestNode (NodeConnection pool lastRecvRef _) requests = do envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT" - eresp <- race requestNodeImpl (threadDelay envTimeout) + eresp <- race (withResource pool requestNodeImpl) (threadDelay envTimeout) case eresp of Left e -> return e Right _ -> putStrLn "timeout happened" *> throwIO (TimeoutException "Request Timeout") - where - requestNodeImpl :: IO [Reply] - requestNodeImpl = do - mapM_ (sendNode . renderRequest) requests + requestNodeImpl :: CC.ConnectionContext -> IO [Reply] + requestNodeImpl ctx = do + mapM_ (sendNode ctx . renderRequest) requests _ <- CC.flush ctx - replicateM (length requests) recvNode - sendNode :: B.ByteString -> IO () - sendNode = CC.send ctx - recvNode :: IO Reply - recvNode = do + replicateM (length requests) $ recvNode ctx + sendNode :: CC.ConnectionContext -> B.ByteString -> IO () + sendNode = CC.send + recvNode :: CC.ConnectionContext -> IO Reply + recvNode ctx = do maybeLastRecv <- IOR.readIORef lastRecvRef scanResult <- case maybeLastRecv of Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty case scanResult of - Scanner.Fail{} -> CC.errConnClosed - Scanner.More{} -> error "Hedis: parseWith returned Partial" - Scanner.Done rest' r -> do - IOR.writeIORef lastRecvRef (Just rest') - return r + Scanner.Fail{} -> CC.errConnClosed + Scanner.More{} -> error "Hedis: parseWith returned Partial" + Scanner.Done rest' r -> do + IOR.writeIORef lastRecvRef (Just rest') + return r {-# INLINE nodes #-} nodes :: ShardMap -> [Node] diff --git a/src/Database/Redis/Connection.hs b/src/Database/Redis/Connection.hs index 01daec53..b5373761 100644 --- a/src/Database/Redis/Connection.hs +++ b/src/Database/Redis/Connection.hs @@ -2,7 +2,6 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} - module Database.Redis.Connection where import Control.Exception @@ -33,6 +32,7 @@ import qualified Database.Redis.Cluster as Cluster import qualified Database.Redis.ConnectionContext as CC import Control.Concurrent (threadDelay) import Control.Concurrent.Async (race) +import qualified Database.Redis.Types as T --import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline import Database.Redis.Commands @@ -54,7 +54,7 @@ import Database.Redis.Commands -- 'connect' function to create one. data Connection = NonClusteredConnection (Pool PP.Connection) - | ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection) + | ClusteredConnection (MVar ShardMap) Cluster.Connection -- |Information for connnecting to a Redis server. -- @@ -173,7 +173,7 @@ checkedConnect connInfo = do -- |Destroy all idle resources in the pool. disconnect :: Connection -> IO () disconnect (NonClusteredConnection pool) = destroyAllResources pool -disconnect (ClusteredConnection _ pool) = destroyAllResources pool +disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn -- | Memory bracket around 'connect' and 'disconnect'. withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c @@ -191,8 +191,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect runRedis :: Connection -> Redis a -> IO a runRedis (NonClusteredConnection pool) redis = withResource pool $ \conn -> runRedisInternal conn redis -runRedis (ClusteredConnection _ pool) redis = - withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis +runRedis (ClusteredConnection _ conn) redis = + runRedisClusteredInternal conn (refreshShardMap conn) redis newtype ClusterConnectError = ClusterConnectError Reply deriving (Eq, Show, Typeable) @@ -224,9 +224,10 @@ connectCluster bootstrapConnInfo = do Right infos -> do let isConnectionReadOnly = connectReadOnly bootstrapConnInfo - clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn - pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) - return $ ClusteredConnection shardMapVar pool + clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) + -- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo) + connection <- clusterConnect isConnectionReadOnly clusterConnection + return $ ClusteredConnection shardMapVar connection where withAuth host port timeout = do conn <- PP.connect host port timeout @@ -249,13 +250,24 @@ connectCluster bootstrapConnInfo = do clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection clusterConnect readOnlyConnection connection = do clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection - nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap) + nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap) when readOnlyConnection $ - mapM_ (\conn -> do - PP.beginReceiving conn - runRedisInternal conn readOnly + mapM_ (\maybeConn -> case maybeConn of + Just conn -> do + PP.beginReceiving conn + runRedisInternal conn readOnly + Nothing -> return $ Right (T.Status "Connection does not exist") ) nodesConns return clusterConn + where + ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection) + ctxToConn (Cluster.NodeConnection pool _ nid) = do + maybeConn <- try $ withResource pool PP.fromCtx + case maybeConn of + Right ppConn -> return $ Just ppConn + Left (_ :: SomeException) -> do + putStrLn ("SomeException Occured in NodeID " ++ show nid) + return Nothing shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where @@ -282,20 +294,21 @@ refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error") refreshShardMapWithNodeConn nodeConnsList = do selectedIdx <- randomRIO (0, (length nodeConnsList) - 1) - let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx - pipelineConn <- PP.fromCtx ctx - envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT" - raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms - case raceResult of - Left () -> do - print $ "TimeoutForConnection " <> show ctx - throwIO $ Cluster.TimeoutException "ClusterSlots Timeout" - Right eiShardMapResp -> - case eiShardMapResp of - Right shardMap -> pure shardMap - Left (err :: SomeException) -> do - print $ "ShardMapRefreshError-" <> show err - throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error") + let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx + withResource pool $ \ctx -> do + pipelineConn <- PP.fromCtx ctx + envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT" + raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms + case raceResult of + Left () -> do + print $ "TimeoutForConnection " <> show ctx + throwIO $ Cluster.TimeoutException "ClusterSlots Timeout" + Right eiShardMapResp -> + case eiShardMapResp of + Right shardMap -> pure shardMap + Left (err :: SomeException) -> do + print $ "ShardMapRefreshError-" <> show err + throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error") refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap refreshShardMapWithConn pipelineConn _ = do