Skip to content

Commit

Permalink
Use the same code path for local and remote syncs
Browse files Browse the repository at this point in the history
In the Copy module, local syncs do a direct copy and that's it. Remote
syncs meanwhile check for already transferred files, check for partially
transferred resumable files, can use the rsync algorithm and an external
copyprog.

Make local and remote syncs use the same code path. The functionality
for both cases is now the same, but since the code was optimized for the
remote case then there could be some optimization opportunities for
local syncs. This is something this patch does not include.
  • Loading branch information
tleedjarv committed Apr 1, 2024
1 parent f55cf70 commit f3fde14
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 34 deletions.
21 changes: 7 additions & 14 deletions src/copy.ml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ let rec fingerprintPrefix fspath path offset len accu =
end

let fingerprintPrefixRemotely =
Remote.registerServerCmd
Remote.registerRemoteCmd
"fingerprintSubfile"
Umarshal.(prod3 Fspath.m Path.mlocal Uutil.Filesize.m id id)
Umarshal.(list Fingerprint.m)
Expand Down Expand Up @@ -382,7 +382,7 @@ let loadPropsExtDataLocal (fspath, path, desc) =
| `Global p -> Update.translatePathLocal fspath p in
(Some localPath, Props.loadExtData fspath localPath desc)

let loadPropsExtDataOnServer = Remote.registerServerCmd "propsExtData"
let loadPropsExtDataOnServer = Remote.registerRemoteCmd "propsExtData"
Umarshal.(prod3 Fspath.m mxpath Props.m id id)
Umarshal.(prod2 (option Path.mlocal) Props.mx id id)
(fun connFrom args -> Lwt.return (loadPropsExtDataLocal args))
Expand Down Expand Up @@ -638,7 +638,7 @@ let convV0 = Remote.makeConvV0FunArg
((biOpt, fspathFrom, pathFrom, fileKind), (sizeFrom, id, file_id)))

let compressRemotely =
Remote.registerServerCmd "compress" ~convV0 mcompress Umarshal.unit compress
Remote.registerRemoteCmd "compress" ~convV0 mcompress Umarshal.unit compress

let close_all infd outfd =
Util.convertUnixErrorsToTransient
Expand Down Expand Up @@ -1181,17 +1181,10 @@ let file rootFrom pathFrom rootTo fspathTo pathTo realPathTo
(Fspath.toDebugString fspathTo) (Path.toString pathTo)
(Props.toString desc));
let timer = Trace.startTimer "Transmitting file" in
begin match rootFrom, rootTo with
(Common.Local, fspathFrom), (Common.Local, realFspathTo) ->
localFile
fspathFrom pathFrom fspathTo pathTo realPathTo
update desc (Osx.ressLength ress) (Some id);
paranoidCheck fspathTo pathTo realPathTo desc fp ress
| _ ->
transferFile
rootFrom pathFrom rootTo fspathTo pathTo realPathTo
update desc fp ress id
end >>= fun status ->
transferFile
rootFrom pathFrom rootTo fspathTo pathTo realPathTo
update desc fp ress id
>>= fun status ->
Trace.showTimer timer;
match status with
TransferSucceeded info ->
Expand Down
46 changes: 34 additions & 12 deletions src/remote.ml
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,9 @@ let connectionIO conn =
let setConnectionVersion conn ver =
conn.version <- ver

let connectionVersion conn = conn.version
let connectionVersion = function
| None -> rpcDefaultVersion
| Some conn -> conn.version

let connEq conn conn' =
conn.inputBuffer.channel = conn'.inputBuffer.channel
Expand Down Expand Up @@ -596,7 +598,12 @@ module ClientConn = struct

end (* module ClientConn *)

let connectionOfRoot root = ClientConn.ofRoot root
let connectionOfRoot root =
(* This is not the same as [ClientConn.ofRootOpt]. We want this function
to fail when the remote connection is not found. *)
match root with
| (Common.Local, _) -> None
| (Common.Remote _ , _) -> Some (ClientConn.ofRoot root)

(****)

Expand Down Expand Up @@ -1141,6 +1148,17 @@ let registerServerCmd name ?(convV0=convV0_id_pair) mArg mRet f =
name (defaultMarshalingFunctions (fst convV0) mArg)
(defaultMarshalingFunctions (snd convV0) mRet) f

(* Same as [registerServerCmd] but returns a function that runs either
the proxy or the local version, depending on whether the call is to
the local host (in this case [conn] is None) or a remote one. *)
let registerRemoteCmd name ?convV0 mArg mRet f =
let serverSide = (fun conn args -> f (Some conn) args) in
let client0 = registerServerCmd name ?convV0 mArg mRet serverSide in
fun conn args ->
match conn with
| None -> f None args
| Some conn -> client0 conn args

(* RegisterHostCmd is a simpler version of registerClientServer [registerServerCmd?].
It is used to create remote procedure calls: the only communication
between the client and server is the sending of arguments from
Expand Down Expand Up @@ -1174,16 +1192,15 @@ let registerRootCmd (cmdName : string)
fun root args -> r root ((snd root), args)

let registerRootCmdWithConnection (cmdName : string)
?(convV0=convV0_id_pair) mArg mRet (cmd : connection -> 'a -> 'b) =
let client0 = registerServerCmd cmdName ~convV0 mArg mRet cmd in
?(convV0=convV0_id_pair) mArg mRet (cmd : connection option -> 'a -> 'b) =
let serverSide = (fun conn args -> cmd (Some conn) args) in
let client0 = registerServerCmd cmdName ~convV0 mArg mRet serverSide in
(* Return a function that runs either the proxy or the local version,
depending on whether the call is to the local host or a remote one *)
fun localRoot remoteRoot args ->
match (fst localRoot) with
| Common.Local -> let conn = ClientConn.ofRoot remoteRoot in
cmd conn args
| _ -> let conn = ClientConn.ofRoot localRoot in
client0 conn args
| Common.Local -> cmd (connectionOfRoot remoteRoot) args
| _ -> client0 (ClientConn.ofRoot localRoot) args

let streamReg = lwtRegionWithConnCleanup 1

Expand All @@ -1199,13 +1216,13 @@ let streamingActivated =
let registerStreamCmd
(cmdName : string)
marshalingFunctionsArgs
(serverSide : connection -> 'a -> unit)
(serverSide : connection option -> 'a -> unit)
=
let cmd =
registerSpecialServerCmd
cmdName marshalingFunctionsArgs
(defaultMarshalingFunctions convV0_id Umarshal.unit)
(fun conn v -> serverSide conn v; Lwt.return ())
(fun conn v -> serverSide (Some conn) v; Lwt.return ())
in
let ping =
registerServerCmd (cmdName ^ "Ping") Umarshal.int Umarshal.unit
Expand All @@ -1227,7 +1244,7 @@ let registerStreamCmd
(* Create a server function and remember it *)
let server conn buf =
let args = unmarshalArgs conn buf in
serverSide conn args
serverSide (Some conn) args
in
serverStreams := Util.StringMap.add cmdName server !serverStreams;
(* Create a client function and return it *)
Expand All @@ -1240,7 +1257,7 @@ let registerStreamCmd
in
dumpIdle conn request
in
fun conn sender ->
let proxy conn sender =
if not (Prefs.read streamingActivated) then
sender (fun v -> cmd conn v)
else begin
Expand All @@ -1259,6 +1276,11 @@ let registerStreamCmd
end else
Lwt.fail e)
end
in
fun conn sender ->
match conn with
| None -> sender (fun v -> Lwt.return (serverSide conn v))
| Some conn -> proxy conn sender

let commandAvailable =
registerRootCmd "commandAvailable" Umarshal.string Umarshal.bool
Expand Down
19 changes: 11 additions & 8 deletions src/remote.mli
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,18 @@ module MsgIdMap : Map.S with type key = msgId
val newMsgId : unit -> msgId

type connection
val connectionVersion : connection -> int
val connectionOfRoot : Common.root -> connection
val connectionVersion : connection option -> int
val connectionOfRoot : Common.root -> connection option
(* [connectionOfRoot] is None for a local root,
Some connection for a remote root,
raises if a remote connection is not found. *)

val registerServerCmd :
val registerRemoteCmd :
string
-> ?convV0: 'a convV0Fun * 'b convV0Fun
-> 'a Umarshal.t -> 'b Umarshal.t
-> (connection -> 'a -> 'b Lwt.t)
-> connection -> 'a -> 'b Lwt.t
-> (connection option -> 'a -> 'b Lwt.t)
-> connection option -> 'a -> 'b Lwt.t
val intSize : int
val encodeInt : int -> Bytearray.t * int * int
val decodeInt : Bytearray.t -> int -> int
Expand All @@ -144,7 +147,7 @@ val registerRootCmdWithConnection :
(* 2.51-compatibility functions for args
and result *)
-> 'a Umarshal.t -> 'b Umarshal.t
-> (connection -> 'a -> 'b Lwt.t) (* local command *)
-> (connection option -> 'a -> 'b Lwt.t) (* local command *)
-> Common.root (* root on which the command is executed *)
-> Common.root (* other root *)
-> 'a (* additional arguments *)
Expand All @@ -157,8 +160,8 @@ val registerStreamCmd :
(connection -> 'a ->
(Bytearray.t * int * int) list -> (Bytearray.t * int * int) list * int) *
(connection -> Bytearray.t -> int -> 'a) ->
(connection -> 'a -> unit) ->
connection -> (('a -> unit Lwt.t) -> 'b Lwt.t) -> 'b Lwt.t
(connection option -> 'a -> unit) ->
connection option -> (('a -> unit Lwt.t) -> 'b Lwt.t) -> 'b Lwt.t

(* Register a function to be run when the connection between client and server
is closed (willingly or unexpectedly). The function should not raise
Expand Down

0 comments on commit f3fde14

Please sign in to comment.