Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One pool per node test #24

Draft
wants to merge 23 commits into
base: cluster
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
10e07f3
Merge pull request #4 from juspay/cluster
aravindgopall Feb 7, 2023
910977c
Added retry logic for refreshShardMap
Mar 24, 2023
638e652
MissingNodeException bugfix
viv3kshukla-juspay Mar 27, 2023
3f49188
Merge pull request #21 from gupta-ujjwal/feature/NodeRetryLogicForRef…
aravindgopall Mar 27, 2023
a21fa1b
Merge pull request #22 from imviv3kshukla/fix/MissingNodeException
aravindgopall Mar 27, 2023
974d85e
throwing TimeoutException for timeouts
Apr 7, 2023
fc3b373
randomly choosing node for refreshShardMap
Apr 7, 2023
b7aea75
using fromException instead of displayException
Apr 7, 2023
22d8146
Merge pull request #23 from Candyman770/fix/timeoutException
aravindgopall Apr 10, 2023
039a83d
changing NodeConnection to Pool of NodeConnection and taking Cluster …
shashitnak Mar 17, 2023
bc79b73
moving withResource call to the beginning of requestNode
shashitnak Apr 10, 2023
8545c0e
moving NodeConnection HashMap in MVar and adding refreshCluster function
shashitnak Apr 12, 2023
0ee0bf0
refreshShardMap is now updating both MVar's and removing refreshCluster
shashitnak Apr 12, 2023
47613cf
dropping a NodeConnection from HashMap if it throws an exception in c…
shashitnak Apr 13, 2023
4362e64
only adding new nodes to hashmap and not changing the old ones
shashitnak Apr 13, 2023
4fd99de
saving a timestamp for last ShardMap refresh Time and creating entire…
shashitnak Apr 26, 2023
e22fbee
moving back to MVar's and fixing deadlock
shashitnak May 5, 2023
883936f
moving Pipeline from Connection object to ClusterEnv
shashitnak Jun 5, 2023
7427b2f
moving IORef inside Pool
shashitnak Jun 5, 2023
1230e4c
changing NodeResource from data to newtype
shashitnak Jun 5, 2023
8f27a40
addressing pr comments
shashitnak Jun 20, 2023
5e9065c
fixing redudant import error due to previous change
shashitnak Jun 20, 2023
7eed199
removing clusterConnect function
shashitnak Jun 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion hedis.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ library
HTTP,
errors,
network-uri,
unliftio-core
unliftio-core,
random
if !impl(ghc >= 8.0)
build-depends:
semigroups >= 0.11 && < 0.19
Expand Down
85 changes: 53 additions & 32 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ module Database.Redis.Cluster
, ShardMap(..)
, HashSlot
, Shard(..)
, TimeoutException(..)
, connect
, disconnect
, destroyNodeResources
, requestPipelined
, requestMasterNodes
, nodes
Expand All @@ -26,7 +27,8 @@ import Data.Maybe(mapMaybe, fromMaybe)
import Data.List(nub, sortBy, find)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, fromException)
import Data.Pool(Pool, createPool, withResource, destroyAllResources)
import Control.Concurrent.Async(race)
import Control.Concurrent(threadDelay)
import Control.Concurrent.MVar(MVar, newMVar, readMVar, modifyMVar, modifyMVar_)
Expand Down Expand Up @@ -61,7 +63,10 @@ type IsReadOnly = Bool
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap IsReadOnly

-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID

instance Show NodeConnection where
show (NodeConnection _ _ id1) = "nodeId: " <> show id1

instance Eq NodeConnection where
(NodeConnection _ _ id1) == (NodeConnection _ _ id2) = id1 == id2
Expand Down Expand Up @@ -121,8 +126,11 @@ instance Exception CrossSlotException
data NoNodeException = NoNodeException deriving (Show, Typeable)
instance Exception NoNodeException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> (NodeConnection -> IO ShardMap) -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap = do
data TimeoutException = TimeoutException String deriving (Show, Typeable)
instance Exception TimeoutException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> Time.NominalDiffTime -> Int -> IO Connection
connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap idleTime maxResources = do
shardMap <- readMVar shardMapVar
stateVar <- newMVar $ Pending []
pipelineVar <- newMVar $ Pipeline stateVar
Expand All @@ -134,7 +142,7 @@ connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap
if shouldRetry
then if not (HM.null eNodeConns)
then do
newShardMap <- refreshShardMap (head $ HM.elems eNodeConns)
newShardMap <- refreshShardMap (HM.elems eNodeConns)
refreshShardMapVar newShardMap
simpleNodeConnections newShardMap
else
Expand All @@ -154,15 +162,15 @@ connect withAuth commandInfos shardMapVar timeoutOpt isReadOnly refreshShardMap
) (mempty, False) info
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node n _ host port) = do
ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt
ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources
ref <- IOR.newIORef Nothing
return (n, NodeConnection ctx ref n)
refreshShardMapVar :: ShardMap -> IO ()
refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap))

disconnect :: Connection -> IO ()
disconnect (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx
destroyNodeResources :: Connection -> IO ()
destroyNodeResources (Connection nodeConnMap _ _ _ _ ) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool

-- Add a request to the current pipeline for this connection. The pipeline will
-- be executed implicitly as soon as any result returned from this function is
Expand Down Expand Up @@ -239,18 +247,32 @@ rawResponse (CompletedRequest _ _ r) = r
evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
shardMap <- hasLocked $ readMVar shardMapVar
requestsByNode <- getRequestsByNode shardMap
erequestsByNode <- try $ getRequestsByNode shardMap
requestsByNode <- case erequestsByNode of
Right reqByNode-> pure reqByNode
Left (_ :: MissingNodeException) -> do
refreshShardMapVar
newShardMap <- hasLocked $ readMVar shardMapVar
getRequestsByNode newShardMap
-- catch the exception thrown at each node level
-- send the command to random node.
-- merge the current responses with new responses.
eresps <- mapM (try . uncurry executeRequests) requestsByNode
-- take a random connection where there are no exceptions.
-- PERF_CONCERN: Since usually we send only one request at time, this won't be
-- heavy perf issue. but still should be evaluated and figured out with complete rewrite.
resps <- concat <$> mapM (\(resp, (cc, r)) -> case resp of
Right v -> return v
Left (_ :: SomeException) -> executeRequests (getRandomConnection cc conn) r
) (zip eresps requestsByNode)

-- throwing exception for timeouts thus closing the connection instead of retrying.
-- otherwise if there is any response in the connection buffer it'll get forwarded to other requests that are reusing the same connection.
-- leading to jumbled up responses
resps <- concat <$>
mapM (\(resp, (cc, r)) -> case resp of
Right v -> return v
Left (err :: SomeException) ->
case fromException err of
Just (er :: TimeoutException) -> throwIO er
_ -> executeRequests (getRandomConnection cc conn) r
) (zip eresps requestsByNode)
-- check for any moved in both responses and continue the flow.
when (any (moved . rawResponse) resps) refreshShardMapVar
retriedResps <- mapM (retry 0) resps
Expand Down Expand Up @@ -432,34 +454,33 @@ allMasterNodes (Connection nodeConns _ _ _ _) (ShardMap shardMap) =
onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap)

requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode (NodeConnection ctx lastRecvRef _) requests = do
requestNode (NodeConnection pool lastRecvRef _) requests = do
envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets move these default values and environment variables into a single Config file. @shashitnak

eresp <- race requestNodeImpl (threadDelay envTimeout)
eresp <- race (withResource pool requestNodeImpl) (threadDelay envTimeout)
case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO NoNodeException

Right _ -> putStrLn "timeout happened" *> throwIO (TimeoutException "Request Timeout")
where
requestNodeImpl :: IO [Reply]
requestNodeImpl = do
mapM_ (sendNode . renderRequest) requests
requestNodeImpl :: CC.ConnectionContext -> IO [Reply]
requestNodeImpl ctx = do
mapM_ (sendNode ctx . renderRequest) requests
_ <- CC.flush ctx
replicateM (length requests) recvNode
sendNode :: B.ByteString -> IO ()
sendNode = CC.send ctx
recvNode :: IO Reply
recvNode = do
replicateM (length requests) $ recvNode ctx
sendNode :: CC.ConnectionContext -> B.ByteString -> IO ()
sendNode = CC.send
recvNode :: CC.ConnectionContext -> IO Reply
recvNode ctx = do
maybeLastRecv <- IOR.readIORef lastRecvRef
scanResult <- case maybeLastRecv of
Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv
Nothing -> Scanner.scanWith (CC.recv ctx) reply B.empty

case scanResult of
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r
Scanner.Fail{} -> CC.errConnClosed
Scanner.More{} -> error "Hedis: parseWith returned Partial"
Scanner.Done rest' r -> do
IOR.writeIORef lastRecvRef (Just rest')
return r

{-# INLINE nodes #-}
nodes :: ShardMap -> [Node]
Expand Down
72 changes: 55 additions & 17 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Database.Redis.Connection where

import Control.Exception
Expand All @@ -18,14 +19,22 @@ import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM
import System.Random (randomRIO)
import System.Environment (lookupEnv)
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)

import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
import Database.Redis.Protocol(Reply(..))
import Database.Redis.Cluster(ShardMap(..), Node, Shard(..))
import qualified Database.Redis.Cluster as Cluster
import qualified Database.Redis.ConnectionContext as CC
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (race)
import qualified Database.Redis.Types as T
--import qualified Database.Redis.Cluster.Pipeline as ClusterPipeline

import Database.Redis.Commands
( ping
, select
Expand All @@ -45,7 +54,7 @@ import Database.Redis.Commands
-- 'connect' function to create one.
data Connection
= NonClusteredConnection (Pool PP.Connection)
| ClusteredConnection (MVar ShardMap) (Pool Cluster.Connection)
| ClusteredConnection (MVar ShardMap) Cluster.Connection

-- |Information for connnecting to a Redis server.
--
Expand Down Expand Up @@ -164,7 +173,7 @@ checkedConnect connInfo = do
-- |Destroy all idle resources in the pool.
disconnect :: Connection -> IO ()
disconnect (NonClusteredConnection pool) = destroyAllResources pool
disconnect (ClusteredConnection _ pool) = destroyAllResources pool
disconnect (ClusteredConnection _ conn) = Cluster.destroyNodeResources conn

-- | Memory bracket around 'connect' and 'disconnect'.
withConnect :: (Catch.MonadMask m, MonadIO m) => ConnectInfo -> (Connection -> m c) -> m c
Expand All @@ -182,8 +191,8 @@ withCheckedConnect connInfo = bracket (checkedConnect connInfo) disconnect
runRedis :: Connection -> Redis a -> IO a
runRedis (NonClusteredConnection pool) redis =
withResource pool $ \conn -> runRedisInternal conn redis
runRedis (ClusteredConnection _ pool) redis =
withResource pool $ \conn -> runRedisClusteredInternal conn (refreshShardMap conn) redis
runRedis (ClusteredConnection _ conn) redis =
runRedisClusteredInternal conn (refreshShardMap conn) redis

newtype ClusterConnectError = ClusterConnectError Reply
deriving (Eq, Show, Typeable)
Expand Down Expand Up @@ -215,9 +224,10 @@ connectCluster bootstrapConnInfo = do
Right infos -> do
let
isConnectionReadOnly = connectReadOnly bootstrapConnInfo
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn
pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
return $ ClusteredConnection shardMapVar pool
clusterConnection = Cluster.connect withAuth infos shardMapVar timeoutOptUs isConnectionReadOnly refreshShardMapWithNodeConn (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
-- pool <- createPool (clusterConnect isConnectionReadOnly clusterConnection) Cluster.disconnect 3 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
connection <- clusterConnect isConnectionReadOnly clusterConnection
return $ ClusteredConnection shardMapVar connection
where
withAuth host port timeout = do
conn <- PP.connect host port timeout
Expand All @@ -240,13 +250,24 @@ connectCluster bootstrapConnInfo = do
clusterConnect :: Bool -> IO Cluster.Connection -> IO Cluster.Connection
clusterConnect readOnlyConnection connection = do
clusterConn@(Cluster.Connection nodeMap _ _ _ _) <- connection
nodesConns <- sequence $ ( PP.fromCtx . (\(Cluster.NodeConnection ctx _ _) -> ctx ) . snd) <$> (HM.toList nodeMap)
nodesConns <- sequence $ (ctxToConn . snd) <$> (HM.toList nodeMap)
when readOnlyConnection $
mapM_ (\conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
mapM_ (\maybeConn -> case maybeConn of
Just conn -> do
PP.beginReceiving conn
runRedisInternal conn readOnly
Nothing -> return $ Right (T.Status "Connection does not exist")
) nodesConns
return clusterConn
where
ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection)
ctxToConn (Cluster.NodeConnection pool _ nid) = do
maybeConn <- try $ withResource pool PP.fromCtx
case maybeConn of
Right ppConn -> return $ Just ppConn
Left (_ :: SomeException) -> do
putStrLn ("SomeException Occured in NodeID " ++ show nid)
return Nothing

shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr mkShardMap (pure IntMap.empty) clusterSlotsResponseEntries where
Expand All @@ -267,17 +288,34 @@ shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr m

refreshShardMap :: Cluster.Connection -> IO ShardMap
refreshShardMap (Cluster.Connection nodeConns _ _ _ _) =
refreshShardMapWithNodeConn (head $ HM.elems nodeConns)
refreshShardMapWithNodeConn (HM.elems nodeConns)

refreshShardMapWithNodeConn :: Cluster.NodeConnection -> IO ShardMap
refreshShardMapWithNodeConn (Cluster.NodeConnection ctx _ _) = do
pipelineConn <- PP.fromCtx ctx
refreshShardMapWithConn pipelineConn True
refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
refreshShardMapWithNodeConn nodeConnsList = do
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1)
let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx
withResource pool $ \ctx -> do
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment, lets combine all such configs in a single place to be more readable / trackable.

raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")

refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap
refreshShardMapWithConn pipelineConn _ = do
_ <- PP.beginReceiving pipelineConn
slotsResponse <- runRedisInternal pipelineConn clusterSlots
case slotsResponse of
Left e -> throwIO $ ClusterConnectError e
Right slots -> shardMapFromClusterSlotsResponse slots
Right slots -> case clusterSlotsResponseEntries slots of
[] -> throwIO $ ClusterConnectError $ SingleLine "empty slotsResponse"
_ -> shardMapFromClusterSlotsResponse slots