Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One pool per node test #24

Draft
wants to merge 23 commits into
base: cluster
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
10e07f3
Merge pull request #4 from juspay/cluster
aravindgopall Feb 7, 2023
910977c
Added retry logic for refreshShardMap
Mar 24, 2023
638e652
MissingNodeException bugfix
viv3kshukla-juspay Mar 27, 2023
3f49188
Merge pull request #21 from gupta-ujjwal/feature/NodeRetryLogicForRef…
aravindgopall Mar 27, 2023
a21fa1b
Merge pull request #22 from imviv3kshukla/fix/MissingNodeException
aravindgopall Mar 27, 2023
974d85e
throwing TimeoutException for timeouts
Apr 7, 2023
fc3b373
randomly choosing node for refreshShardMap
Apr 7, 2023
b7aea75
using fromException instead of displayException
Apr 7, 2023
22d8146
Merge pull request #23 from Candyman770/fix/timeoutException
aravindgopall Apr 10, 2023
039a83d
changing NodeConnection to Pool of NodeConnection and taking Cluster …
shashitnak Mar 17, 2023
bc79b73
moving withResource call to the beginning of requestNode
shashitnak Apr 10, 2023
8545c0e
moving NodeConnection HashMap in MVar and adding refreshCluster function
shashitnak Apr 12, 2023
0ee0bf0
refreshShardMap is now updating both MVar's and removing refreshCluster
shashitnak Apr 12, 2023
47613cf
dropping a NodeConnection from HashMap if it throws an exception in c…
shashitnak Apr 13, 2023
4362e64
only adding new nodes to hashmap and not changing the old ones
shashitnak Apr 13, 2023
4fd99de
saving a timestamp for last ShardMap refresh Time and creating entire…
shashitnak Apr 26, 2023
e22fbee
moving back to MVar's and fixing deadlock
shashitnak May 5, 2023
883936f
moving Pipeline from Connection object to ClusterEnv
shashitnak Jun 5, 2023
7427b2f
moving IORef inside Pool
shashitnak Jun 5, 2023
1230e4c
changing NodeResource from data to newtype
shashitnak Jun 5, 2023
8f27a40
addressing pr comments
shashitnak Jun 20, 2023
5e9065c
fixing redudant import error due to previous change
shashitnak Jun 20, 2023
7eed199
removing clusterConnect function
shashitnak Jun 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Database.Redis.Cluster
, nodes
, Pipeline
, newPipelineVar
, NodeResource(..)
) where

import qualified Data.ByteString as B
Expand Down Expand Up @@ -71,16 +72,18 @@ type IsReadOnly = Bool
data Connection = Connection (MVar NodeConnectionMap) (MVar ShardMap) CMD.InfoMap IsReadOnly TcpInfo

-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection (Pool CC.ConnectionContext) (IOR.IORef (Maybe B.ByteString)) NodeID
data NodeConnection = NodeConnection (Pool NodeResource) NodeID

data NodeResource = NodeResource CC.ConnectionContext (IOR.IORef (Maybe B.ByteString))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shashitnak lets use a newtype here, data declaration are lesser performant than newtype since newtype is transparent at runtime.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ishan-juspay converted NodeResource to newtype. since newtype can only take one parameter so I made the two parameters into a tuple


instance Show NodeConnection where
show (NodeConnection _ _ id1) = "nodeId: " <> show id1
show (NodeConnection _ id1) = "nodeId: " <> show id1

instance Eq NodeConnection where
(NodeConnection _ _ id1) == (NodeConnection _ _ id2) = id1 == id2
(NodeConnection _ id1) == (NodeConnection _ id2) = id1 == id2

instance Ord NodeConnection where
compare (NodeConnection _ _ id1) (NodeConnection _ _ id2) = compare id1 id2
compare (NodeConnection _ id1) (NodeConnection _ id2) = compare id1 id2

data PipelineState =
-- Nothing in the pipeline has been evaluated yet so nothing has been
Expand Down Expand Up @@ -185,10 +188,16 @@ connect withAuth commandInfos shardMapVar isReadOnly refreshShardMap (tcpInfo@Tc
Left (_ :: SomeException) -> (acc, True)
) (mempty, False) info
connectNode :: Node -> IO (NodeID, NodeConnection)
connectNode (Node n _ host port) = do
ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources
connectNode node@(Node n _ _ _) = do
pool <- createPool (createNodeResource node) destroyNodeResource 1 idleTime maxResources
return (n, NodeConnection pool n)
createNodeResource :: Node -> IO NodeResource
createNodeResource (Node _ _ host port) = do
ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt
ref <- IOR.newIORef Nothing
return (n, NodeConnection ctx ref n)
return $ NodeResource ctx ref
destroyNodeResource :: NodeResource -> IO ()
destroyNodeResource (NodeResource ctx _) = CC.disconnect ctx
refreshShardMapVar :: ShardMap -> IO ()
refreshShardMapVar shardMap = hasLocked $ modifyMVar_ shardMapVar (const (pure shardMap))

Expand All @@ -199,7 +208,7 @@ newPipelineVar = do

destroyNodeResources :: Connection -> IO ()
destroyNodeResources (Connection nodeConnMapVar _ _ _ _) = (readMVar nodeConnMapVar) >>= (mapM_ disconnectNode . HM.elems) where
disconnectNode (NodeConnection nodePool _ _) = destroyAllResources nodePool
disconnectNode (NodeConnection nodePool _) = destroyAllResources nodePool

-- Add a request to the current pipeline for this connection. The pipeline will
-- be executed implicitly as soon as any result returned from this function is
Expand Down Expand Up @@ -494,22 +503,25 @@ allMasterNodes (Connection nodeConnsVar _ _ _ _) (ShardMap shardMap) = do
onlyMasterNodes = (\(Shard master _) -> master) <$> nub (IntMap.elems shardMap)

requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
requestNode (NodeConnection pool lastRecvRef _) requests = withResource pool $ \ctx -> do
requestNode (NodeConnection pool _) requests = withResource pool (`requestNodeResource` requests)

requestNodeResource :: NodeResource -> [[B.ByteString]] -> IO [Reply]
requestNodeResource (NodeResource ctx lastRecvRef) requests = do
envTimeout <- round . (\x -> (x :: Time.NominalDiffTime) * 1000000) . realToFrac . fromMaybe (0.5 :: Double) . (>>= readMaybe) <$> lookupEnv "REDIS_REQUEST_NODE_TIMEOUT"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets move these default values and environment variables into a single Config file. @shashitnak

eresp <- race (requestNodeImpl ctx) (threadDelay envTimeout)
eresp <- race requestNodeImpl (threadDelay envTimeout)
case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO (RequestTimingOut requests)
where
requestNodeImpl :: CC.ConnectionContext -> IO [Reply]
requestNodeImpl ctx = do
mapM_ (sendNode ctx . renderRequest) requests
requestNodeImpl :: IO [Reply]
requestNodeImpl = do
mapM_ (sendNode . renderRequest) requests
_ <- CC.flush ctx
replicateM (length requests) $ recvNode ctx
sendNode :: CC.ConnectionContext -> B.ByteString -> IO ()
sendNode = CC.send
recvNode :: CC.ConnectionContext -> IO Reply
recvNode ctx = do
replicateM (length requests) $ recvNode
sendNode :: B.ByteString -> IO ()
sendNode = CC.send ctx
recvNode :: IO Reply
recvNode = do
maybeLastRecv <- IOR.readIORef lastRecvRef
scanResult <- case maybeLastRecv of
Just lastRecv -> Scanner.scanWith (CC.recv ctx) reply lastRecv
Expand Down
20 changes: 13 additions & 7 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ connectCluster bootstrapConnInfo = do
return clusterConn
where
ctxToConn :: Cluster.NodeConnection -> IO (Maybe PP.Connection)
ctxToConn (Cluster.NodeConnection pool _ nid) = do
maybeConn <- try $ withResource pool PP.fromCtx
ctxToConn (Cluster.NodeConnection pool nid) = do
maybeConn <- try $ withResource pool (\(Cluster.NodeResource ctx _) -> PP.fromCtx ctx)
case maybeConn of
Right ppConn -> return $ Just ppConn
Left (_ :: SomeException) -> do
Expand Down Expand Up @@ -337,18 +337,24 @@ refreshShardMap (Cluster.Connection nodeConnsVar shardMapVar _ _ Cluster.TcpInfo
Left (_ :: SomeException) -> acc
) mempty info
connectNode :: Cluster.Node -> IO (Cluster.NodeID, Cluster.NodeConnection)
connectNode (Cluster.Node n _ host port) = do
ctx <- createPool (withAuth host (CC.PortNumber $ toEnum port) timeoutOpt) CC.disconnect 1 idleTime maxResources
connectNode node@(Cluster.Node n _ _ _) = do
pool <- createPool (createNodeResource node) destroyNodeResource 1 idleTime maxResources
return (n, Cluster.NodeConnection pool n)
createNodeResource :: Cluster.Node -> IO Cluster.NodeResource
createNodeResource (Cluster.Node _ _ host port) = do
ctx <- withAuth host (CC.PortNumber $ toEnum port) timeoutOpt
ref <- IOR.newIORef Nothing
return (n, Cluster.NodeConnection ctx ref n)
return $ Cluster.NodeResource ctx ref
destroyNodeResource :: Cluster.NodeResource -> IO ()
destroyNodeResource (Cluster.NodeResource ctx _) = CC.disconnect ctx

refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
refreshShardMapWithNodeConn nodeConnsList = do
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1)
print selectedIdx
let (Cluster.NodeConnection pool _ _) = nodeConnsList !! selectedIdx
withResource pool $ \ctx -> do
let (Cluster.NodeConnection pool _) = nodeConnsList !! selectedIdx
withResource pool $ \(Cluster.NodeResource ctx _) -> do
pipelineConn <- PP.fromCtx ctx
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment, lets combine all such configs in a single place to be more readable / trackable.

raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
Expand Down