Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One pool per node test #24

Draft
wants to merge 23 commits into
base: cluster
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
10e07f3
Merge pull request #4 from juspay/cluster
aravindgopall Feb 7, 2023
910977c
Added retry logic for refreshShardMap
Mar 24, 2023
638e652
MissingNodeException bugfix
viv3kshukla-juspay Mar 27, 2023
3f49188
Merge pull request #21 from gupta-ujjwal/feature/NodeRetryLogicForRef…
aravindgopall Mar 27, 2023
a21fa1b
Merge pull request #22 from imviv3kshukla/fix/MissingNodeException
aravindgopall Mar 27, 2023
974d85e
throwing TimeoutException for timeouts
Apr 7, 2023
fc3b373
randomly choosing node for refreshShardMap
Apr 7, 2023
b7aea75
using fromException instead of displayException
Apr 7, 2023
22d8146
Merge pull request #23 from Candyman770/fix/timeoutException
aravindgopall Apr 10, 2023
039a83d
changing NodeConnection to Pool of NodeConnection and taking Cluster …
shashitnak Mar 17, 2023
bc79b73
moving withResource call to the beginning of requestNode
shashitnak Apr 10, 2023
8545c0e
moving NodeConnection HashMap in MVar and adding refreshCluster function
shashitnak Apr 12, 2023
0ee0bf0
refreshShardMap is now updating both MVar's and removing refreshCluster
shashitnak Apr 12, 2023
47613cf
dropping a NodeConnection from HashMap if it throws an exception in c…
shashitnak Apr 13, 2023
4362e64
only adding new nodes to hashmap and not changing the old ones
shashitnak Apr 13, 2023
4fd99de
saving a timestamp for last ShardMap refresh Time and creating entire…
shashitnak Apr 26, 2023
e22fbee
moving back to MVar's and fixing deadlock
shashitnak May 5, 2023
883936f
moving Pipeline from Connection object to ClusterEnv
shashitnak Jun 5, 2023
7427b2f
moving IORef inside Pool
shashitnak Jun 5, 2023
1230e4c
changing NodeResource from data to newtype
shashitnak Jun 5, 2023
8f27a40
addressing pr comments
shashitnak Jun 20, 2023
5e9065c
fixing redundant import error due to previous change
shashitnak Jun 20, 2023
7eed199
removing clusterConnect function
shashitnak Jun 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion hedis.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ library
HTTP,
errors,
network-uri,
unliftio-core
unliftio-core,
random
if !impl(ghc >= 8.0)
build-depends:
semigroups >= 0.11 && < 0.19
Expand Down
15 changes: 8 additions & 7 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Database.Redis.Cluster
, ShardMap(..)
, HashSlot
, Shard(..)
, TimeoutException(..)
, connect
, disconnect
, requestPipelined
Expand All @@ -23,7 +24,7 @@ import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.Maybe(mapMaybe, fromMaybe)
import Data.List(nub, sortBy, find)
import Data.List(nub, sortBy, find, isPrefixOf)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try, displayException)
Expand Down Expand Up @@ -124,7 +125,7 @@ instance Exception CrossSlotException
data NoNodeException = NoNodeException deriving (Show, Typeable)
instance Exception NoNodeException

data TimeoutException = TimeoutException deriving (Show, Typeable)
data TimeoutException = TimeoutException String deriving (Show, Typeable)
instance Exception TimeoutException

connect :: (Host -> CC.PortID -> Maybe Int -> IO CC.ConnectionContext) -> [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> Bool -> ([NodeConnection] -> IO ShardMap) -> IO Connection
Expand Down Expand Up @@ -266,10 +267,10 @@ evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
resps <- concat <$>
mapM (\(resp, (cc, r)) -> case resp of
Right v -> return v
Left (e :: SomeException) ->
case displayException e of
"TimeoutException" -> throwIO TimeoutException
_ -> executeRequests (getRandomConnection cc conn) r
Left (err :: SomeException) ->
if isPrefixOf "TimeoutException" (displayException err)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to match on a string to detect this exception? Can't we define a dedicated exception type and pattern match on it instead?

then throwIO err
else executeRequests (getRandomConnection cc conn) r
) (zip eresps requestsByNode)
-- check for any moved in both responses and continue the flow.
when (any (moved . rawResponse) resps) refreshShardMapVar
Expand Down Expand Up @@ -457,7 +458,7 @@ requestNode (NodeConnection ctx lastRecvRef _) requests = do
eresp <- race requestNodeImpl (threadDelay envTimeout)
case eresp of
Left e -> return e
Right _ -> putStrLn "timeout happened" *> throwIO TimeoutException
Right _ -> putStrLn "timeout happened" *> throwIO (TimeoutException "Request Timeout")

where
requestNodeImpl :: IO [Reply]
Expand Down
15 changes: 11 additions & 4 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import qualified Data.Time as Time
import Network.TLS (ClientParams)
import qualified Network.Socket as NS
import qualified Data.HashMap.Strict as HM
import System.Random (randomRIO)
import System.Environment (lookupEnv)
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)

import qualified Database.Redis.ProtocolPipelining as PP
import Database.Redis.Core(Redis, runRedisInternal, runRedisClusteredInternal)
Expand Down Expand Up @@ -276,19 +280,22 @@ refreshShardMap (Cluster.Connection nodeConns _ _ _ _) =

refreshShardMapWithNodeConn :: [Cluster.NodeConnection] -> IO ShardMap
refreshShardMapWithNodeConn [] = throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")
refreshShardMapWithNodeConn ((Cluster.NodeConnection ctx _ _) : xs) = do
refreshShardMapWithNodeConn nodeConnsList = do
selectedIdx <- randomRIO (0, (length nodeConnsList) - 1)
let (Cluster.NodeConnection ctx _ _) = nodeConnsList !! selectedIdx
pipelineConn <- PP.fromCtx ctx
raceResult <- race (threadDelay (10^(3 :: Int))) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of 1 ms
envTimeout <- fromMaybe (10 ^ (3 :: Int)) . (>>= readMaybe) <$> lookupEnv "REDIS_CLUSTER_SLOTS_TIMEOUT"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put these default timeouts in a single Config file within Hedis? Having them scattered across modules like this looks untidy.

raceResult <- race (threadDelay envTimeout) (try $ refreshShardMapWithConn pipelineConn True) -- racing with delay of default 1 ms
case raceResult of
Left () -> do
print $ "TimeoutForConnection " <> show ctx
refreshShardMapWithNodeConn xs
throwIO $ Cluster.TimeoutException "ClusterSlots Timeout"
Right eiShardMapResp ->
case eiShardMapResp of
Right shardMap -> pure shardMap
Left (err :: SomeException) -> do
print $ "ShardMapRefreshError-" <> show err
refreshShardMapWithNodeConn xs
throwIO $ ClusterConnectError (Error "Couldn't refresh shardMap due to connection error")

refreshShardMapWithConn :: PP.Connection -> Bool -> IO ShardMap
refreshShardMapWithConn pipelineConn _ = do
Expand Down