From ef732cab3b631c293764263775994dcf8a3e8680 Mon Sep 17 00:00:00 2001 From: Alberto Valverde Date: Wed, 16 Oct 2024 12:49:53 +0200 Subject: [PATCH] [inferno-vc] Cached client and server logging. More granular cache locks. Cleanup --- inferno-vc/CHANGELOG.md | 7 + inferno-vc/inferno-vc.cabal | 3 +- .../Inferno/VersionControl/Client/Cached.hs | 213 +++++++++++------- inferno-vc/src/Inferno/VersionControl/Log.hs | 28 ++- .../src/Inferno/VersionControl/Server.hs | 17 +- 5 files changed, 184 insertions(+), 84 deletions(-) diff --git a/inferno-vc/CHANGELOG.md b/inferno-vc/CHANGELOG.md index f572decc..42461d1d 100644 --- a/inferno-vc/CHANGELOG.md +++ b/inferno-vc/CHANGELOG.md @@ -1,6 +1,13 @@ # Revision History for inferno-vc *Note*: we use https://pvp.haskell.org/ (MAJOR.MAJOR.MINOR.PATCH) +## 0.3.8.0 -- 2024-10-16 +* Added logging to cached client to see hits and misses +* Added logging to server to see what scriptIds are being used to request + fetchObjects and fetchObjectClosureHashes +* Made locks on cache more granular and only fetch a single upstream object per + request + ## 0.3.7.1 -- 2024-09-23 * Fix overflowing threadDelay on armv7l diff --git a/inferno-vc/inferno-vc.cabal b/inferno-vc/inferno-vc.cabal index 38f8ac4a..20e38bd8 100644 --- a/inferno-vc/inferno-vc.cabal +++ b/inferno-vc/inferno-vc.cabal @@ -1,6 +1,6 @@ cabal-version: >=1.10 name: inferno-vc -version: 0.3.7.1 +version: 0.3.8.0 synopsis: Version control server for Inferno description: A version control server for Inferno scripts category: DSL,Scripting @@ -69,6 +69,7 @@ library , QuickCheck , stm , unbounded-delays + , random-shuffle default-language: Haskell2010 default-extensions: diff --git a/inferno-vc/src/Inferno/VersionControl/Client/Cached.hs b/inferno-vc/src/Inferno/VersionControl/Client/Cached.hs index 7f049a8c..5bd452f3 100644 --- a/inferno-vc/src/Inferno/VersionControl/Client/Cached.hs +++ b/inferno-vc/src/Inferno/VersionControl/Client/Cached.hs @@ -16,8 +16,8 @@ import Control.Concurrent.STM retry, writeTVar, ) -import Control.Monad (forM, forM_) -import Control.Monad.Catch (MonadMask, bracket_) +import Control.Monad (forM, forM_, guard) +import Control.Monad.Catch (MonadMask, bracket_, tryJust) import Control.Monad.Error.Lens (throwing) import Control.Monad.Except (MonadError (..)) import Control.Monad.IO.Class (MonadIO (..)) @@ -27,15 +27,14 @@ import Data.Aeson (FromJSON, ToJSON, eitherDecodeStrict, encode) import qualified Data.ByteString as B import qualified Data.ByteString.Base64.URL as Base64 import qualified Data.ByteString.Char8 as Char8 -import qualified Data.ByteString.Lazy as BL -import Data.Either (partitionEithers) +import qualified Data.ByteString.Lazy.Char8 as BL import Data.Generics.Product (HasType, getTyped) import Data.Generics.Sum (AsType (..)) -import Data.List (foldl') import qualified Data.Map as Map import qualified Data.Set as Set import GHC.Generics (Generic) import qualified Inferno.VersionControl.Client as VCClient +import Inferno.VersionControl.Log (VCCacheTrace (..)) import Inferno.VersionControl.Operations.Error (VCStoreError (..)) import Inferno.VersionControl.Server (VCServerError) import Inferno.VersionControl.Types @@ -44,32 +43,59 @@ import Inferno.VersionControl.Types VCObjectHash (..), vcObjectHashToByteString, ) +import Plow.Logging (IOTracer (..), traceWith) import Servant.Client (ClientEnv, ClientError) import Servant.Typed.Error (TypedClientM, runTypedClientM) import System.AtomicWrite.Writer.LazyByteString (atomicWriteFile) -import System.Directory (createDirectoryIfMissing, doesFileExist) +import System.Directory (createDirectoryIfMissing) import System.FilePath.Posix (()) +import System.IO.Error (isDoesNotExistError) +import System.Random.Shuffle (shuffleM) data VCCacheEnv = VCCacheEnv { cachePath :: FilePath, - cacheInFlight :: TVar (Set.Set VCObjectHash) + cacheObjInFlight :: TVar (Set.Set VCObjectHash), + cacheDepInFlight :: TVar (Set.Set VCObjectHash), + tracer :: IOTracer VCCacheTrace } deriving (Generic) --- | Makes sure only one thread at a time fetches the closure for certain --- VCObjectHashes -withInFlight :: (MonadMask m, MonadIO m) => VCCacheEnv -> [VCObjectHash] -> m a -> m a -withInFlight VCCacheEnv {cacheInFlight} hashes = bracket_ acquire release +-- | Prevents thundering-herd problem by locking the key 'k' so only a single +-- 'fetchAndSave' runs concurrently +withSingleConcurrentFetch :: + (MonadMask m, MonadIO m, Ord k) => + -- | A 'TVar' holding the set of keys which are currently being fetched + TVar (Set.Set k) -> + -- | A function that returns Just the value being cached if found locally or + -- Nothing if it hasn't being cached yet. No lock is taken when calling this + -- function + (k -> m (Maybe a)) -> + -- | A function that fetches the value and caches it. A lock is taken so only + -- a single call per key to this function runs concurrently + (k -> m a) -> + -- | The key associated to the value being cached + k -> + m a +withSingleConcurrentFetch keySetRef check fetchAndSave key = + check key >>= \case + Just x -> pure x + Nothing -> + bracket_ acquire release $ + -- check again because another thread may have fetched while we were + -- blocked + check key >>= \case + Just x -> pure x + Nothing -> fetchAndSave key where acquire = liftIO $ atomically $ do - inFlight <- readTVar cacheInFlight - if any (`Set.member` inFlight) hashes + inFlight <- readTVar keySetRef + if key `Set.member` inFlight then retry else do - writeTVar cacheInFlight $! foldl' (flip Set.insert) inFlight hashes + writeTVar keySetRef $! key `Set.insert` inFlight release = liftIO $ atomically $ do - inFlight <- readTVar cacheInFlight - writeTVar cacheInFlight $! foldl' (flip Set.delete) inFlight hashes + inFlight <- readTVar keySetRef + writeTVar keySetRef $! key `Set.delete` inFlight data CachedVCClientError = ClientVCStoreError VCServerError @@ -77,11 +103,12 @@ data CachedVCClientError | LocalVCStoreError VCStoreError deriving (Show, Generic) -initVCCachedClient :: FilePath -> IO VCCacheEnv -initVCCachedClient cachePath = do +initVCCachedClient :: FilePath -> IOTracer VCCacheTrace -> IO VCCacheEnv +initVCCachedClient cachePath tracer = do createDirectoryIfMissing True $ cachePath "deps" - cacheInFlight <- newTVarIO mempty - pure VCCacheEnv {cachePath, cacheInFlight} + cacheObjInFlight <- newTVarIO mempty + cacheDepInFlight <- newTVarIO mempty + pure VCCacheEnv {cachePath, cacheObjInFlight, cacheDepInFlight, tracer} fetchVCObjectClosure :: ( MonadError err m, @@ -103,82 +130,116 @@ fetchVCObjectClosure :: VCObjectHash -> m (Map.Map VCObjectHash (VCMeta a g VCObject)) fetchVCObjectClosure fetchVCObjects remoteFetchVCObjectClosureHashes objHash = do - env@VCCacheEnv {cachePath} <- asks getTyped - deps <- - withInFlight env [objHash] $ - liftIO (doesFileExist $ cachePath "deps" show objHash) >>= \case - False -> do - deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash - liftIO - $ atomicWriteFile - (cachePath "deps" show objHash) - $ BL.concat [BL.fromStrict (vcObjectHashToByteString h) <> "\n" | h <- deps] - pure deps - True -> fetchVCObjectClosureHashes objHash - withInFlight env deps $ do - (nonLocalHashes, localHashes) <- - partitionEithers - <$> forM - (objHash : deps) - ( \depHash -> do - liftIO (doesFileExist $ cachePath show depHash) >>= \case - True -> pure $ Right depHash - False -> pure $ Left depHash - ) - localObjs <- - Map.fromList - <$> forM - localHashes - ( \h -> - (h,) <$> fetchVCObjectUnsafe h - ) + VCCacheEnv {cacheObjInFlight, cacheDepInFlight} <- asks getTyped + deps <- withSingleConcurrentFetch cacheDepInFlight maybeReadCachedClosureHashes (fetchAndCacheClosureHashes remoteFetchVCObjectClosureHashes) objHash + -- shuffle scriptIds to improve concurrent performance when cache is cold + shuffledDeps <- liftIO $ shuffleM $ objHash : deps + mconcat + <$> mapM (withSingleConcurrentFetch cacheObjInFlight maybeReadCachedVCObject (fetchAndCacheVCObject fetchVCObjects)) shuffledDeps - nonLocalObjs <- liftServantClient $ fetchVCObjects nonLocalHashes - forM_ (Map.toList nonLocalObjs) $ \(h, o) -> - liftIO $ atomicWriteFile (cachePath show h) $ encode o - pure $ localObjs `Map.union` nonLocalObjs - -fetchVCObjectClosureHashes :: +maybeReadCachedClosureHashes :: ( MonadError err m, - MonadIO m, - MonadReader env m, + HasType VCCacheEnv env, AsType VCStoreError err, - HasType VCCacheEnv env + MonadReader env m, + MonadIO m, + MonadMask m ) => VCObjectHash -> - m [VCObjectHash] -fetchVCObjectClosureHashes h = do - VCCacheEnv {cachePath} <- asks getTyped - let fp = cachePath "deps" show h - readVCObjectHashTxt fp + m (Maybe [VCObjectHash]) +maybeReadCachedClosureHashes objHash = do + VCCacheEnv {tracer} <- asks getTyped + tryJust (guard . isDoesNotExistError) readCachedClosureHashes >>= \case + Right deps -> + Just deps <$ traceWith tracer (VCCacheDepsHit objHash) + Left _ -> + Nothing <$ traceWith tracer (VCCacheDepsMiss objHash) + where + readCachedClosureHashes = do + path <- cachedDepsPath objHash + deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile path) + forM deps $ \dep -> do + decoded <- + either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $ + Base64.decode dep + maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $ + digestFromByteString decoded -readVCObjectHashTxt :: +fetchAndCacheClosureHashes :: ( MonadError err m, - AsType VCStoreError err, + HasType VCCacheEnv env, + HasType ClientEnv env, + AsType VCServerError err, + AsType ClientError err, + MonadReader env m, MonadIO m ) => - FilePath -> + (VCObjectHash -> VCClient.ClientMWithVCStoreError [VCObjectHash]) -> + VCObjectHash -> m [VCObjectHash] -readVCObjectHashTxt fp = do - deps <- filter (not . B.null) . Char8.lines <$> liftIO (B.readFile fp) - forM deps $ \dep -> do - decoded <- either (const $ throwing _Typed $ InvalidHash $ Char8.unpack dep) pure $ Base64.decode dep - maybe (throwing _Typed $ InvalidHash $ Char8.unpack dep) (pure . VCObjectHash) $ digestFromByteString decoded +fetchAndCacheClosureHashes remoteFetchVCObjectClosureHashes objHash = do + deps <- liftServantClient $ remoteFetchVCObjectClosureHashes objHash + path <- cachedDepsPath objHash + liftIO $ + atomicWriteFile path $ + BL.unlines $ + map (BL.fromStrict . vcObjectHashToByteString) deps + pure deps -fetchVCObjectUnsafe :: +maybeReadCachedVCObject :: ( MonadReader r m, HasType VCCacheEnv r, MonadError e m, AsType VCStoreError e, MonadIO m, + MonadMask m, FromJSON b ) => VCObjectHash -> - m b -fetchVCObjectUnsafe h = do + m (Maybe (Map.Map VCObjectHash b)) +maybeReadCachedVCObject objHash = do + VCCacheEnv {tracer} <- asks getTyped + tryJust (guard . isDoesNotExistError) readCachedVCObject >>= \case + Left _ -> + Nothing <$ traceWith tracer (VCCacheMiss objHash) + Right obj -> + Just (Map.singleton objHash obj) <$ traceWith tracer (VCCacheHit objHash) + where + readCachedVCObject = do + path <- cachedObjPath objHash + either (throwing _Typed . CouldNotDecodeObject objHash) pure + =<< liftIO (eitherDecodeStrict <$> Char8.readFile path) + +fetchAndCacheVCObject :: + ( MonadError err m, + HasType VCCacheEnv env, + HasType ClientEnv env, + AsType VCServerError err, + AsType ClientError err, + MonadReader env m, + MonadIO m, + ToJSON a, + ToJSON g + ) => + ([VCObjectHash] -> VCClient.ClientMWithVCStoreError (Map.Map VCObjectHash (VCMeta a g VCObject))) -> + VCObjectHash -> + m (Map.Map VCObjectHash (VCMeta a g VCObject)) +fetchAndCacheVCObject fetchVCObjects objHash = do + objs <- liftServantClient $ fetchVCObjects [objHash] + forM_ (Map.toList objs) $ \(h, o) -> do + path <- cachedObjPath h + liftIO $ atomicWriteFile path $ encode o + pure objs + +cachedDepsPath :: (MonadReader r m, HasType VCCacheEnv r) => VCObjectHash -> m FilePath +cachedDepsPath objHash = do + VCCacheEnv {cachePath} <- asks getTyped + pure $ cachePath "deps" show objHash + +cachedObjPath :: (MonadReader r m, HasType VCCacheEnv r) => VCObjectHash -> m FilePath +cachedObjPath objHash = do VCCacheEnv {cachePath} <- asks getTyped - let fp = cachePath show h - either (throwing _Typed . CouldNotDecodeObject h) pure =<< liftIO (eitherDecodeStrict <$> Char8.readFile fp) + pure $ cachePath show objHash liftServantClient :: ( MonadError e m, diff --git a/inferno-vc/src/Inferno/VersionControl/Log.hs b/inferno-vc/src/Inferno/VersionControl/Log.hs index 5f76a878..b7409bb9 100644 --- a/inferno-vc/src/Inferno/VersionControl/Log.hs +++ b/inferno-vc/src/Inferno/VersionControl/Log.hs @@ -1,9 +1,16 @@ {-# LANGUAGE OverloadedStrings #-} -module Inferno.VersionControl.Log where +module Inferno.VersionControl.Log + ( VCServerTrace (..), + VCCacheTrace (..), + vcServerTraceToText, + vcCacheTraceToText, + ) +where -import Data.Text (Text, pack) +import Data.Text (Text, intercalate, pack) import Inferno.VersionControl.Operations.Error (VCStoreError, vcStoreErrorToString) +import Inferno.VersionControl.Types (VCObjectHash) data VCServerTrace = ThrownVCStoreError VCStoreError @@ -14,6 +21,8 @@ data VCServerTrace | ReadJSON FilePath | ReadTxt FilePath | DeleteFile FilePath + | VCFetchObjects [VCObjectHash] + | VCFetchObjectClosureHashes VCObjectHash vcServerTraceToText :: VCServerTrace -> Text vcServerTraceToText = \case @@ -25,3 +34,18 @@ vcServerTraceToText = \case ThrownVCStoreError e -> pack (vcStoreErrorToString e) ThrownVCOtherError e -> "Other server error: " <> e DeleteFile fp -> "Deleting file: " <> pack fp + VCFetchObjects objs -> "FetchObjects " <> intercalate ", " (map (pack . show) objs) + VCFetchObjectClosureHashes obj -> "FetchObjectClosureHashes " <> pack (show obj) + +data VCCacheTrace + = VCCacheHit VCObjectHash + | VCCacheMiss VCObjectHash + | VCCacheDepsHit VCObjectHash + | VCCacheDepsMiss VCObjectHash + +vcCacheTraceToText :: VCCacheTrace -> Text +vcCacheTraceToText = \case + VCCacheHit h -> "VC Cache hit " <> pack (show h) + VCCacheMiss h -> "VC Cache miss " <> pack (show h) + VCCacheDepsHit h -> "VC Cache deps hit " <> pack (show h) + VCCacheDepsMiss h -> "VC Cache deps miss " <> pack (show h) diff --git a/inferno-vc/src/Inferno/VersionControl/Server.hs b/inferno-vc/src/Inferno/VersionControl/Server.hs index e8414ed8..fa0abbd4 100644 --- a/inferno-vc/src/Inferno/VersionControl/Server.hs +++ b/inferno-vc/src/Inferno/VersionControl/Server.hs @@ -37,7 +37,7 @@ import Data.Time.Clock.POSIX (getPOSIXTime) import GHC.Generics (Generic) import Inferno.Types.Syntax (Expr) import Inferno.Types.Type (TCScheme) -import Inferno.VersionControl.Log (VCServerTrace (ThrownVCOtherError, ThrownVCStoreError), vcServerTraceToText) +import Inferno.VersionControl.Log (VCServerTrace (..), vcServerTraceToText) import qualified Inferno.VersionControl.Operations as Ops import qualified Inferno.VersionControl.Operations.Error as Ops import Inferno.VersionControl.Server.Types (readServerConfig) @@ -100,14 +100,21 @@ vcServer :: Ord (Ops.Group m) ) => (forall x. m x -> Handler (Union (WithError VCServerError x))) -> + IOTracer VCServerTrace -> Server (VersionControlAPI (Ops.Author m) (Ops.Group m)) -vcServer toHandler = +vcServer toHandler tracer = toHandler . fetchFunctionH :<|> toHandler . Ops.fetchFunctionsForGroups :<|> toHandler . Ops.fetchVCObject :<|> toHandler . Ops.fetchVCObjectHistory - :<|> toHandler . fetchVCObjects - :<|> toHandler . Ops.fetchVCObjectClosureHashes + :<|> ( \objs -> + traceWith tracer (VCFetchObjects objs) + >> toHandler (fetchVCObjects objs) + ) + :<|> ( \obj -> + traceWith tracer (VCFetchObjectClosureHashes obj) + >> toHandler (Ops.fetchVCObjectClosureHashes obj) + ) :<|> toHandler . pushFunctionH :<|> toHandler . Ops.deleteAutosavedVCObject :<|> toHandler . Ops.deleteVCObjects @@ -187,7 +194,7 @@ runServerConfig middleware withEnv runOp serverConfig = do gzip def $ middleware env $ serve (Proxy :: Proxy (VersionControlAPI a g)) $ - vcServer (liftIO . liftTypedError . flip runOp env) + vcServer (liftIO . liftTypedError . flip runOp env) serverTracer withLinkedAsync_ :: IO a -> IO b -> IO b withLinkedAsync_ f g = withAsync f $ \h -> link h >> g