Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SyncV2 Causal Negotiation #5570

Merged
merged 8 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions unison-cli/src/Unison/Cli/DownloadUtils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar (modifyTVar', newTVarIO, readTVar, readTVarIO)
import Data.List.NonEmpty (pattern (:|))
import Data.Set qualified as Set
import System.Console.Regions qualified as Console.Regions
import U.Codebase.HashTags (CausalHash)
import U.Codebase.Sqlite.Queries qualified as Queries
Expand Down Expand Up @@ -67,11 +66,9 @@ downloadProjectBranchFromShare syncVersion useSquashed branch =
Cli.respond (Output.DownloadedEntities numDownloaded)
SyncV2 -> do
let branchRef = SyncV2.BranchRef (into @Text (ProjectAndBranch branch.projectName remoteProjectBranchName))
-- TODO: Fill this in.
let knownHashes = Set.empty
let downloadedCallback = \_ -> pure ()
let shouldValidate = not $ Codeserver.isCustomCodeserver Codeserver.defaultCodeserver
result <- SyncV2.syncFromCodeserver shouldValidate Share.hardCodedBaseUrl branchRef causalHashJwt knownHashes downloadedCallback
result <- SyncV2.syncFromCodeserver shouldValidate Share.hardCodedBaseUrl branchRef causalHashJwt downloadedCallback
result & onLeft \err0 -> do
done case err0 of
Share.SyncError pullErr ->
Expand Down
112 changes: 99 additions & 13 deletions unison-cli/src/Unison/Share/SyncV2.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import Data.Attoparsec.ByteString.Char8 qualified as A8
import Data.ByteString qualified as BS
import Data.ByteString.Lazy qualified as BL
import Data.Conduit.Attoparsec qualified as C
import Data.Conduit.List qualified as C
import Data.Conduit.Combinators qualified as C
import Data.Conduit.List qualified as CL
import Data.Conduit.Zlib qualified as C
import Data.Foldable qualified as Foldable
import Data.Graph qualified as Graph
Expand Down Expand Up @@ -62,7 +63,7 @@ import Unison.Sync.Types qualified as Share
import Unison.Sync.Types qualified as Sync
import Unison.SyncV2.API (Routes (downloadEntitiesStream))
import Unison.SyncV2.API qualified as SyncV2
import Unison.SyncV2.Types (CBORBytes, CBORStream)
import Unison.SyncV2.Types (CBORBytes, CBORStream, DependencyType (..))
import Unison.SyncV2.Types qualified as SyncV2
import Unison.Util.Servant.CBOR qualified as CBOR
import Unison.Util.Timing qualified as Timing
Expand Down Expand Up @@ -116,6 +117,9 @@ syncFromFile ::
Cli (Either (SyncError SyncV2.PullError) CausalHash)
syncFromFile shouldValidate syncFilePath = do
Cli.Env {codebase} <- ask
-- Every insert into SQLite checks the temp entity tables, but syncv2 doesn't actually use them, so it's faster
-- if we clear them out before starting a sync.
Cli.runTransaction Q.clearTempEntityTables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The temp entity table should be empty except if a past sync failed and was never completed, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup; but it's also never otherwise cleared, so if you accidentally clone @unison/base/main instead of lib.install @unison/base or something like that it'll leave junk in there basically forever.

runExceptT do
mapExceptT liftIO $ Timing.time "File Sync" $ do
header <- mapExceptT C.runResourceT $ do
Expand All @@ -135,6 +139,9 @@ syncFromCodebase ::
CausalHash ->
IO (Either (SyncError SyncV2.PullError) ())
syncFromCodebase shouldValidate srcConn destCodebase causalHash = do
-- Every insert into SQLite checks the temp entity tables, but syncv2 doesn't actually use them, so it's faster
-- if we clear them out before starting a sync.
Sqlite.runTransaction srcConn Q.clearTempEntityTables
liftIO . C.runResourceT . runExceptT $ withCodebaseEntityStream srcConn causalHash Nothing \_total entityStream -> do
(header, rest) <- initializeStream entityStream
streamIntoCodebase shouldValidate destCodebase header rest
Expand All @@ -148,13 +155,16 @@ syncFromCodeserver ::
SyncV2.BranchRef ->
-- | The hash to download.
Share.HashJWT ->
Set Hash32 ->
-- | Callback that's given a number of entities we just downloaded.
(Int -> IO ()) ->
Cli (Either (SyncError SyncV2.PullError) ())
syncFromCodeserver shouldValidate unisonShareUrl branchRef hashJwt knownHashes _downloadedCallback = do
syncFromCodeserver shouldValidate unisonShareUrl branchRef hashJwt _downloadedCallback = do
Cli.Env {authHTTPClient, codebase} <- ask
-- Every insert into SQLite checks the temp entity tables, but syncv2 doesn't actually use them, so it's faster
-- if we clear them out before starting a sync.
Cli.runTransaction Q.clearTempEntityTables
runExceptT do
knownHashes <- ExceptT $ negotiateKnownCausals unisonShareUrl branchRef hashJwt
let hash = Share.hashJWTHash hashJwt
ExceptT $ do
(Cli.runTransaction (Q.entityLocation hash)) >>= \case
Expand Down Expand Up @@ -218,7 +228,7 @@ syncUnsortedStream shouldValidate codebase stream = do
allEntities <-
C.runConduit $
stream
C..| C.chunksOf batchSize
C..| CL.chunksOf batchSize
C..| unpackChunks codebase
C..| validateBatch
C..| C.concat
Expand Down Expand Up @@ -247,7 +257,7 @@ syncSortedStream shouldValidate codebase stream = do
validateAndSave shouldValidate codebase entityBatch
C.runConduit $
stream
C..| C.chunksOf batchSize
C..| CL.chunksOf batchSize
C..| unpackChunks codebase
C..| handler

Expand Down Expand Up @@ -385,21 +395,21 @@ _decodeFramedEntity bs = do
Right chunk -> pure chunk

-- | Unpacks a stream of tightly-packed CBOR entities without any framing/separators.
decodeUnframedEntities :: Stream ByteString SyncV2.DownloadEntitiesChunk
decodeUnframedEntities :: forall a. (CBOR.Serialise a) => Stream ByteString a
decodeUnframedEntities = C.transPipe (mapExceptT (lift . stToIO)) $ do
C.await >>= \case
Nothing -> pure ()
Just bs -> do
d <- newDecoder
loop bs d
where
newDecoder :: ConduitT ByteString SyncV2.DownloadEntitiesChunk (ExceptT SyncErr (ST s)) (Maybe ByteString -> ST s (CBOR.IDecode s (SyncV2.DownloadEntitiesChunk)))
newDecoder :: ConduitT ByteString a (ExceptT SyncErr (ST s)) (Maybe ByteString -> ST s (CBOR.IDecode s a))
newDecoder = do
(lift . lift) CBOR.deserialiseIncremental >>= \case
CBOR.Done _ _ _ -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorStreamFailure "Invalid initial decoder"
CBOR.Fail _ _ err -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err
CBOR.Partial k -> pure k
loop :: ByteString -> (Maybe ByteString -> ST s (CBOR.IDecode s (SyncV2.DownloadEntitiesChunk))) -> ConduitT ByteString SyncV2.DownloadEntitiesChunk (ExceptT SyncErr (ST s)) ()
loop :: ByteString -> (Maybe ByteString -> ST s (CBOR.IDecode s a)) -> ConduitT ByteString a (ExceptT SyncErr (ST s)) ()
loop bs k = do
(lift . lift) (k (Just bs)) >>= \case
CBOR.Fail _ _ err -> throwError . SyncError . SyncV2.PullError'Sync $ SyncV2.SyncErrorDeserializationFailure err
Expand Down Expand Up @@ -441,13 +451,15 @@ syncAPI :: Proxy SyncAPI
syncAPI = Proxy @SyncAPI

downloadEntitiesStreamClientM :: SyncV2.DownloadEntitiesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.DownloadEntitiesChunk))
causalDependenciesStreamClientM :: SyncV2.CausalDependenciesRequest -> Servant.ClientM (Servant.SourceT IO (CBORStream SyncV2.CausalDependenciesChunk))
SyncV2.Routes
{ downloadEntitiesStream = downloadEntitiesStreamClientM
{ downloadEntitiesStream = downloadEntitiesStreamClientM,
causalDependenciesStream = causalDependenciesStreamClientM
} = Servant.client syncAPI

-- | Helper for running clientM that returns a stream of entities.
-- You MUST consume the stream within the callback, it will be closed when the callback returns.
withConduit :: forall r. Servant.ClientEnv -> (Stream () (SyncV2.DownloadEntitiesChunk) -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORStream SyncV2.DownloadEntitiesChunk)) -> StreamM r
withConduit :: forall r chunk. (CBOR.Serialise chunk) => Servant.ClientEnv -> (Stream () chunk -> StreamM r) -> Servant.ClientM (Servant.SourceIO (CBORStream chunk)) -> StreamM r
withConduit clientEnv callback clientM = do
ExceptT $ withRunInIO \runInIO -> do
Servant.withClientM clientM clientEnv $ \case
Expand All @@ -456,7 +468,7 @@ withConduit clientEnv callback clientM = do
conduit <- liftIO $ Servant.fromSourceIO sourceT
(runInIO . runExceptT $ callback (conduit C..| unpackCBORBytesStream))

unpackCBORBytesStream :: Stream (CBORStream SyncV2.DownloadEntitiesChunk) SyncV2.DownloadEntitiesChunk
unpackCBORBytesStream :: (CBOR.Serialise a) => Stream (CBORStream a) a
unpackCBORBytesStream =
C.map (BL.toStrict . coerce @_ @BL.ByteString) C..| decodeUnframedEntities

Expand All @@ -480,7 +492,6 @@ handleClientError clientEnv err =

-- | Stream entities from the codeserver.
httpStreamEntities ::
forall.
Auth.AuthenticatedHttpClient ->
Servant.BaseUrl ->
SyncV2.DownloadEntitiesRequest ->
Expand Down Expand Up @@ -522,6 +533,81 @@ initializeStream stream = do
SyncV2.ErrorC (SyncV2.ErrorChunk err) -> throwError . SyncError $ SyncV2.PullError'DownloadEntities err
SyncV2.InitialC {} -> throwError . SyncError $ SyncV2.PullError'Sync SyncV2.SyncErrorMisplacedInitialChunk

------------------------------------------------------------------------------------------------------------------------
-- Causal Dependency negotiation
------------------------------------------------------------------------------------------------------------------------

httpStreamCausalDependencies ::
forall r.
Auth.AuthenticatedHttpClient ->
Servant.BaseUrl ->
SyncV2.CausalDependenciesRequest ->
(Stream () SyncV2.CausalDependenciesChunk -> StreamM r) ->
StreamM r
httpStreamCausalDependencies (Auth.AuthenticatedHttpClient httpClient) unisonShareUrl req callback = do
let clientEnv =
(Servant.mkClientEnv httpClient unisonShareUrl)
{ Servant.makeClientRequest = \url request ->
-- Disable client-side timeouts
(Servant.defaultMakeClientRequest url request)
<&> \r ->
r
{ Http.Client.responseTimeout = Http.Client.responseTimeoutNone
}
}
(causalDependenciesStreamClientM req) & withConduit clientEnv callback

-- | Ask Share for the dependencies of a given hash jwt,
-- then filter them to get the set of causals which we have and don't need sent.
negotiateKnownCausals ::
-- | The Unison Share URL.
Servant.BaseUrl ->
-- | The branch to download from.
SyncV2.BranchRef ->
-- | The hash to download.
Share.HashJWT ->
Cli (Either (SyncError SyncV2.PullError) (Set Hash32))
negotiateKnownCausals unisonShareUrl branchRef hashJwt = do
Cli.Env {authHTTPClient, codebase} <- ask
liftIO $ Text.hPutStrLn IO.stderr $ " 🔎 Identifying missing entities..."
Timing.time "Causal Negotiation" $ do
liftIO . C.runResourceT . runExceptT $ httpStreamCausalDependencies
authHTTPClient
unisonShareUrl
SyncV2.CausalDependenciesRequest {branchRef, rootCausal = hashJwt}
\stream -> do
Set.fromList <$> C.runConduit (stream C..| C.map unpack C..| findKnownDeps codebase C..| C.sinkList)
where
-- Go through the dependencies of the remote root from top-down, yielding all causal hashes that we already
-- have until we find one in the causal spine we already have, then yield that one and stop since we'll implicitly
-- have all of its dependencies.
findKnownDeps :: Codebase.Codebase IO v a -> Stream (Hash32, DependencyType) Hash32
findKnownDeps codebase = do
C.await >>= \case
Just (hash, LibDependency) -> do
-- We yield all lib dependencies we have, it's possible we don't have any of the causal spine in common, but _do_ have
-- some of the libraries we can still save a lot of work.
whenM (lift $ haveCausalHash codebase hash) (C.yield hash)
-- We continue regardless.
findKnownDeps codebase
Just (hash, CausalSpineDependency) -> do
lift (haveCausalHash codebase hash) >>= \case
True -> do
-- If we find a causal hash we have in the spine, we don't need to look further,
-- we can pass it on, then hang up the stream.
C.yield hash
False -> do
-- Otherwise we keep looking, maybe we'll have one further in.
findKnownDeps codebase
Nothing -> pure ()
unpack :: SyncV2.CausalDependenciesChunk -> (Hash32, DependencyType)
unpack = \case
SyncV2.CausalHashDepC {causalHash, dependencyType} -> (causalHash, dependencyType)
haveCausalHash :: Codebase.Codebase IO v a -> Hash32 -> StreamM Bool
haveCausalHash codebase causalHash = do
liftIO $ Codebase.runTransaction codebase do
Q.causalExistsByHash32 causalHash

------------------------------------------------------------------------------------------------------------------------
-- Progress Tracking
------------------------------------------------------------------------------------------------------------------------
Expand Down
8 changes: 7 additions & 1 deletion unison-share-api/src/Unison/SyncV2/API.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ type DownloadEntitiesStream =
ReqBody '[CBOR, JSON] DownloadEntitiesRequest
:> StreamPost NoFraming OctetStream (SourceIO (CBORStream DownloadEntitiesChunk))

-- | Get the relevant dependencies of a causal, including the causal spine and the causal hashes of any library roots.
type CausalDependenciesStream =
ReqBody '[CBOR, JSON] CausalDependenciesRequest
:> StreamPost NoFraming OctetStream (SourceIO (CBORStream CausalDependenciesChunk))

data Routes mode = Routes
{ downloadEntitiesStream :: mode :- "entities" :> "download" :> DownloadEntitiesStream
{ downloadEntitiesStream :: mode :- "entities" :> "download" :> DownloadEntitiesStream,
causalDependenciesStream :: mode :- "entities" :> "dependencies" :> CausalDependenciesStream
}
deriving stock (Generic)
90 changes: 89 additions & 1 deletion unison-share-api/src/Unison/SyncV2/Types.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE RecordWildCards #-}

module Unison.SyncV2.Types
( DownloadEntitiesRequest (..),
DownloadEntitiesChunk (..),
Expand All @@ -6,8 +8,11 @@ module Unison.SyncV2.Types
StreamInitInfo (..),
SyncError (..),
DownloadEntitiesError (..),
CausalDependenciesRequest (..),
CausalDependenciesChunk (..),
DependencyType (..),
CBORBytes (..),
CBORStream(..),
CBORStream (..),
EntityKind (..),
serialiseCBORBytes,
deserialiseOrFailCBORBytes,
Expand All @@ -24,6 +29,7 @@ import Codec.Serialise qualified as CBOR
import Codec.Serialise.Decoding qualified as CBOR
import Control.Exception (Exception)
import Data.Aeson (FromJSON (..), ToJSON (..), object, withObject, (.:), (.=))
import Data.Aeson qualified as Aeson
import Data.Map (Map)
import Data.Map qualified as Map
import Data.Set (Set)
Expand Down Expand Up @@ -299,3 +305,85 @@ instance Serialise EntityKind where
3 -> pure TypeEntity
4 -> pure PatchEntity
_ -> fail "invalid tag"

------------------------------------------------------------------------------------------------------------------------
-- Causal Dependencies

data CausalDependenciesRequest = CausalDependenciesRequest
{ branchRef :: BranchRef,
rootCausal :: HashJWT
}
deriving stock (Show, Eq, Ord)

instance ToJSON CausalDependenciesRequest where
toJSON (CausalDependenciesRequest branchRef rootCausal) =
object
[ "branch_ref" .= branchRef,
"root_causal" .= rootCausal
]

instance FromJSON CausalDependenciesRequest where
parseJSON = Aeson.withObject "CausalDependenciesRequest" \obj -> do
branchRef <- obj .: "branch_ref"
rootCausal <- obj .: "root_causal"
pure CausalDependenciesRequest {..}

instance Serialise CausalDependenciesRequest where
encode (CausalDependenciesRequest {branchRef, rootCausal}) =
encode branchRef <> encode rootCausal
decode = CausalDependenciesRequest <$> decode <*> decode

data DependencyType
= -- This is a top-level history node of the root we're pulling.
CausalSpineDependency
| -- This is the causal root of a library dependency.
LibDependency
deriving (Show, Eq, Ord)

instance Serialise DependencyType where
encode = \case
CausalSpineDependency -> CBOR.encodeWord8 0
LibDependency -> CBOR.encodeWord8 1
decode = do
tag <- CBOR.decodeWord8
case tag of
0 -> pure CausalSpineDependency
1 -> pure LibDependency
_ -> fail "invalid tag"

instance ToJSON DependencyType where
toJSON = \case
CausalSpineDependency -> "causal_spine"
LibDependency -> "lib"

instance FromJSON DependencyType where
parseJSON = Aeson.withText "DependencyType" \case
"causal_spine" -> pure CausalSpineDependency
"lib" -> pure LibDependency
_ -> fail "invalid DependencyType"

-- | A chunk of the download entities response stream.
data CausalDependenciesChunk
= CausalHashDepC {causalHash :: Hash32, dependencyType :: DependencyType}
deriving (Show, Eq, Ord)

data CausalDependenciesChunkTag = CausalHashDepChunkTag
deriving (Show, Eq, Ord)

instance Serialise CausalDependenciesChunkTag where
encode = \case
CausalHashDepChunkTag -> CBOR.encodeWord8 0
decode = do
tag <- CBOR.decodeWord8
case tag of
0 -> pure CausalHashDepChunkTag
_ -> fail "invalid tag"

instance Serialise CausalDependenciesChunk where
encode = \case
(CausalHashDepC {causalHash, dependencyType}) -> do
encode CausalHashDepChunkTag <> CBOR.encode causalHash <> CBOR.encode dependencyType
decode = do
tag <- decode
case tag of
CausalHashDepChunkTag -> CausalHashDepC <$> CBOR.decode <*> CBOR.decode
Loading