[Unison-hackers] [unison-svn] r372 - in trunk/src: . lwt

vouillon@seas.upenn.edu vouillon at seas.upenn.edu
Mon Jul 13 18:26:15 EDT 2009


Author: vouillon
Date: 2009-07-13 18:26:15 -0400 (Mon, 13 Jul 2009)
New Revision: 372

Modified:
   trunk/src/RECENTNEWS
   trunk/src/copy.ml
   trunk/src/copy.mli
   trunk/src/lwt/lwt_util.ml
   trunk/src/mkProjectInfo.ml
   trunk/src/transfer.ml
   trunk/src/transfer.mli
   trunk/src/transport.ml
   trunk/src/uigtk2.ml
   trunk/src/update.ml
   trunk/src/update.mli
Log:
* When a file transfer fails, turn off fastcheck for this file on the
  next sync.
* Limit the number of simultaneous transfer using rsync
  (as the rsync algorithm can use a large amount of memory when
   processing huge files)
* Raise the number of concurrent threads, as there is not much reason
  to leave it low anymore.


Modified: trunk/src/RECENTNEWS
===================================================================
--- trunk/src/RECENTNEWS	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/RECENTNEWS	2009-07-13 22:26:15 UTC (rev 372)
@@ -1,5 +1,16 @@
 CHANGES FROM VERSION 2.36.-27
 
+* When a file transfer fails, turn off fastcheck for this file on the
+  next sync.
+* Limit the number of simultaneous transfer using rsync
+  (as the rsync algorithm can use a large amount of memory when
+   processing huge files)
+* Raise the number of concurrent threads, as there is not much reason
+  to leave it low anymore.
+
+-------------------------------
+CHANGES FROM VERSION 2.36.-27
+
 * Clean-up in remote.ml
 * Dead-lock free flow control mechanism
 

Modified: trunk/src/copy.ml
===================================================================
--- trunk/src/copy.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/copy.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -42,6 +42,11 @@
 
 (****)
 
+(* From update.ml *)
+(* (there is a dependency loop between copy.ml and update.ml...) *)
+let excelFile = ref (fun _ -> false)
+let markPossiblyUpdated = ref (fun _ _ -> ())
+
 (* Check whether the source file has been modified during synchronization *)
 let checkContentsChangeLocal
       fspathFrom pathFrom archDesc archDig archStamp archRess paranoid =
@@ -55,9 +60,7 @@
   let dataClearlyUnchanged =
     not clearlyModified
     && Props.same_time info.Fileinfo.desc archDesc
-(*FIX: should export from update.ml?
-    && not (excelFile path)
-*)
+    && not (!excelFile pathFrom)
     && match archStamp with
          Some (Fileinfo.InodeStamp inode) -> info.Fileinfo.inode = inode
        | Some (Fileinfo.CtimeStamp ctime) -> true
@@ -71,13 +74,15 @@
   if dataClearlyUnchanged && ressClearlyUnchanged then begin
     if paranoid then begin
       let newDig = Os.fingerprint fspathFrom pathFrom info in
-      if archDig <> newDig then
+      if archDig <> newDig then begin
+        !markPossiblyUpdated fspathFrom pathFrom;
         raise (Util.Transient (Printf.sprintf
           "The source file %s\n\
            has been modified but the fast update detection mechanism\n\
            failed to detect it.  Try running once with the fastcheck\n\
            option set to 'no'."
           (Fspath.toPrintString (Fspath.concat fspathFrom pathFrom))))
+      end
     end
   end else if
     clearlyModified
@@ -403,6 +408,12 @@
   | Some fd ->
       fd
 
+let rsyncReg = Lwt_util.make_region (40 * 1024)
+let rsyncThrottle useRsync sz f =
+  if not useRsync then f () else
+  let l = Transfer.Rsync.memoryFootprint sz in
+  Lwt_util.run_in_region rsyncReg l f
+
 let transferFileContents
       connFrom fspathFrom pathFrom fspathTo pathTo realPathTo update
       fileKind srcFileSize id =
@@ -412,19 +423,26 @@
   let infd = ref None in
   let showProgress count =
     Uutil.showProgress id (Uutil.Filesize.ofInt count) "r" in
-  let (bi, decompr) =
+
+  let destFileSize =
     match update with
-      `Update (destFileDataSize, destFileRessSize)
-          when Prefs.read rsyncActivated
-                 &&
-               let destFileSize =
-                 match fileKind with
-                   `DATA -> destFileDataSize
-                 | `RESS -> destFileRessSize
-               in
-               Transfer.Rsync.aboveRsyncThreshold destFileSize
-                 &&
-               Transfer.Rsync.aboveRsyncThreshold srcFileSize ->
+      `Copy ->
+        Uutil.Filesize.zero
+    | `Update (destFileDataSize, destFileRessSize) ->
+        match fileKind with
+            `DATA -> destFileDataSize
+          | `RESS -> destFileRessSize
+  in
+  let useRsync =
+    Prefs.read rsyncActivated
+      &&
+    Transfer.Rsync.aboveRsyncThreshold destFileSize
+      &&
+    Transfer.Rsync.aboveRsyncThreshold srcFileSize
+  in
+  rsyncThrottle useRsync destFileSize (fun () ->
+    let (bi, decompr) =
+      if useRsync then
         Util.convertUnixErrorsToTransient
           "preprocessing file"
           (fun () ->
@@ -444,7 +462,7 @@
                 Transfer.Rsync.rsyncDecompress ifd fd showProgress ti
               in
               if eof then begin close_out fd; outfd := None end))
-    | _ ->
+      else
         (None,
          (* Simple generic decompressor *)
          fun ti ->
@@ -452,23 +470,23 @@
            destinationFd fspathTo pathTo fileKind srcFileSize outfd id in
          let eof = Transfer.receive fd showProgress ti in
          if eof then begin close_out fd; outfd := None end)
-  in
-  let file_id = Remote.newMsgId () in
-  Lwt.catch
-    (fun () ->
-       decompressor := Remote.MsgIdMap.add file_id decompr !decompressor;
-       compressRemotely connFrom
-         (bi, fspathFrom, pathFrom, fileKind, srcFileSize, id, file_id)
-         >>= fun () ->
-       decompressor :=
-         Remote.MsgIdMap.remove file_id !decompressor; (* For GC *)
-       close_all infd outfd;
-       Lwt.return ())
-    (fun e ->
-       decompressor :=
-         Remote.MsgIdMap.remove file_id !decompressor; (* For GC *)
-       close_all_no_error infd outfd;
-       Lwt.fail e)
+    in
+    let file_id = Remote.newMsgId () in
+    Lwt.catch
+      (fun () ->
+         decompressor := Remote.MsgIdMap.add file_id decompr !decompressor;
+         compressRemotely connFrom
+           (bi, fspathFrom, pathFrom, fileKind, srcFileSize, id, file_id)
+           >>= fun () ->
+         decompressor :=
+           Remote.MsgIdMap.remove file_id !decompressor; (* For GC *)
+         close_all infd outfd;
+         Lwt.return ())
+      (fun e ->
+         decompressor :=
+           Remote.MsgIdMap.remove file_id !decompressor; (* For GC *)
+         close_all_no_error infd outfd;
+         Lwt.fail e))
 
 (****)
 
@@ -739,8 +757,6 @@
     f ()
   else
     let bufSz = bufferSize (max (Props.length desc) (Osx.ressLength ress)) in
-    (* This must be on the client: any lock on the server side may result
-       in a deadlock under windows *)
     Lwt_util.run_in_region transferFileReg bufSz f
 
 (****)

Modified: trunk/src/copy.mli
===================================================================
--- trunk/src/copy.mli	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/copy.mli	2009-07-13 22:26:15 UTC (rev 372)
@@ -27,3 +27,8 @@
  -> Uutil.Filesize.t     (* fork length *)
  -> Uutil.File.t option  (* file's index in UI (for progress bars), as appropriate *)
  -> unit
+
+(* From update.ml *)
+(* (there is a dependency loop between copy.ml and update.ml...) *)
+val excelFile : (Path.local -> bool) ref
+val markPossiblyUpdated : (Fspath.t -> Path.local -> unit) ref

Modified: trunk/src/lwt/lwt_util.ml
===================================================================
--- trunk/src/lwt/lwt_util.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/lwt/lwt_util.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -66,7 +66,7 @@
 
 let leave_region reg sz =
    try
-     if reg.count > reg.size then raise Queue.Empty;
+     if reg.count - sz >= reg.size then raise Queue.Empty;
      let (w, sz') = Queue.take reg.waiters in
      reg.count <- reg.count - sz + sz';
      Lwt.wakeup w ()

Modified: trunk/src/mkProjectInfo.ml
===================================================================
--- trunk/src/mkProjectInfo.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/mkProjectInfo.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -90,3 +90,4 @@
 
 
 
+

Modified: trunk/src/transfer.ml
===================================================================
--- trunk/src/transfer.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/transfer.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -364,6 +364,14 @@
     Trace.showTimer timer;
     bi
 
+  (* Expected size of the [rsync_block_info] datastructure (in KiB). *)
+  (* The calculation here are for a 64 bit architecture. *)
+  (* When serialized, the datastructure takes currently 24 bytes per block. *)
+  (* In theory, 12 byte per block should be enough! *)
+  let memoryFootprint sz =
+    Int64.to_int
+      (min (Int64.div (Uutil.Filesize.toInt64 sz) 716800L) 16384L)
+    * 72
 
   (*** DECOMPRESSION ***)
 

Modified: trunk/src/transfer.mli
===================================================================
--- trunk/src/transfer.mli	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/transfer.mli	2009-07-13 22:26:15 UTC (rev 372)
@@ -77,6 +77,9 @@
     (* Built from the old file by the destination computer *)
     type rsync_block_info
 
+    (* Expected size of the [rsync_block_info] datastructure (in KiB). *)
+    val memoryFootprint : Uutil.Filesize.t -> int
+
     (* Compute block informations from the old file *)
     val rsyncPreprocess :
 	   in_channel            (* old file descriptor *)

Modified: trunk/src/transport.ml
===================================================================
--- trunk/src/transport.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/transport.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -36,16 +36,18 @@
       assert false
 
 let maxthreads =
-  Prefs.createInt "maxthreads" 20
+  Prefs.createInt "maxthreads" 0
     "!maximum number of simultaneous file transfers"
-    ("This preference controls how much concurrency is allowed during"
-     ^ " the transport phase.  Normally, it should be set reasonably high "
-     ^ "(default is 20) to maximize performance, but when Unison is used "
-     ^ "over a low-bandwidth link it may be helpful to set it lower (e.g. "
-     ^ "to 1) so that Unison doesn't soak up all the available bandwidth."
-    )
+    ("This preference controls how much concurrency is allowed during \
+      the transport phase.  Normally, it should be set reasonably high \
+      to maximize performance, but when Unison is used over a \
+      low-bandwidth link it may be helpful to set it lower (e.g. \
+      to 1) so that Unison doesn't soak up all the available bandwidth. \
+      The default is the special value 0, which mean 20 threads \
+      when file content streaming is desactivated and 1000 threads \
+      when it is activated.")
 
-let actionReg = Lwt_util.make_region (Prefs.read maxthreads)
+let actionReg = Lwt_util.make_region 50
 
 (* Logging for a thread: write a message before and a message after the
    execution of the thread. *)
@@ -76,13 +78,16 @@
       Printf.sprintf "[END] %s\n" lwtShortDescription)
 
 let doAction fromRoot fromPath fromContents toRoot toPath toContents id =
-  Lwt_util.resize_region actionReg (Prefs.read maxthreads);
   (* When streaming, we can transfer many file simultaneously:
      as the contents of only one file is transferred in one direction
      at any time, little ressource is consumed this way. *)
-  Lwt_util.resize_region Files.copyReg
-    (if Prefs.read Remote.streamingActivated then 4000 else
-     Prefs.read maxthreads);
+  let limit =
+    let n = Prefs.read maxthreads in
+    if n > 0 then n else
+    if Prefs.read Remote.streamingActivated then 1000 else 20
+  in
+  Lwt_util.resize_region actionReg limit;
+  Lwt_util.resize_region Files.copyReg limit;
   Lwt_util.run_in_region actionReg 1 (fun () ->
     if not !Trace.sendLogMsgsToStderr then
       Trace.statusDetail (Path.toString toPath);

Modified: trunk/src/uigtk2.ml
===================================================================
--- trunk/src/uigtk2.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/uigtk2.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -2047,6 +2047,15 @@
        ~callback:(fun () ->
                     getLock synchronize) ());
 
+  (* Does not quite work: too slow, and Files.copy must be modifed to
+     support an interruption without error. *)
+  (*
+  ignore (actionBar#insert_button ~text:"Stop"
+            ~icon:((GMisc.image ~stock:`STOP ())#coerce)
+            ~tooltip:"Exit Unison"
+            ~callback:Abort.all ());
+  *)
+
   (*********************************************************************
     Rescan button
    *********************************************************************)

Modified: trunk/src/update.ml
===================================================================
--- trunk/src/update.ml	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/update.ml	2009-07-13 22:26:15 UTC (rev 372)
@@ -38,13 +38,10 @@
   time the format is modified *)
 (*FIX: also make Jerome's suggested change about file times (see his mesg in
        unison-pending email folder). *)
-(*FIX: one should also store whether we are in case-insensitive mode
-  in the archive and check the mode has not changed when the archive
-  is loaded *)
 (*FIX: we could also drop the use of 8.3-style filenames on Windows, next
   time the format is changed *)
-(* FIX: Another thing we should really consider doing is leaving a flag
-   in the archive when a file transfer fails and turning off fastcheck
+(* FIX: use a special stamp rather than the current hack to leave a flag
+   in the archive when a file transfer fails so as to turn off fastcheck
    for this file on the next sync. *)
 (*FIX: consider changing the way case-sensitivity mode is stored in
   the archive *)
@@ -1834,6 +1831,51 @@
 (*                  Make sure no change has happened                     *)
 (*************************************************************************)
 
+let fastCheckMiss path desc ress oldDesc oldRess =
+  useFastChecking()
+    &&
+  Props.same_time desc oldDesc
+    &&
+  Props.length desc = Props.length oldDesc
+    &&
+  not (excelFile path)
+    &&
+  Osx.ressUnchanged oldRess ress None true
+
+let doMarkPossiblyUpdated arch =
+  match arch with
+    ArchiveFile (desc, dig, stamp, ress) ->
+      (* It would be cleaner to have a special stamp for this *)
+      ArchiveFile (desc, dig, Fileinfo.InodeStamp (-1), ress)
+  | _ ->
+      (* Should not happen, actually.  But this is hard to test... *)
+      arch
+
+let markPossiblyUpdated fspath path =
+  debug (fun() ->
+    Util.msg "markPossiblyUpdated %s %s\n"
+      (Fspath.toDebugString fspath) (Path.toString path));
+  let root = thisRootsGlobalName fspath in
+  let archive = getArchive root in
+  let archive =
+    updatePathInArchive archive fspath Path.empty path
+      (fun arch _ -> doMarkPossiblyUpdated arch) in
+  setArchiveLocal root archive
+
+let rec markPossiblyUpdatedRec fspath path ui =
+  match ui with
+    Updates (File (desc, ContentsUpdated (_, _, ress)),
+             Previous (`FILE, oldDesc, _, oldRess)) ->
+      if fastCheckMiss path desc ress oldDesc oldRess then
+        markPossiblyUpdated fspath path
+  | Updates (Dir (_, uiChildren, _, _), _) ->
+      List.iter
+        (fun (nm, uiChild) ->
+           markPossiblyUpdatedRec fspath (Path.child path nm) uiChild)
+        uiChildren
+  | _ ->
+      ()
+
 let reportUpdate warnFastCheck explanation =
   let msg =
     "Destination updated during synchronization\n" ^ explanation ^
@@ -1861,16 +1903,7 @@
            (Path.toString path))
   | Updates (File (desc, ContentsUpdated (_, _, ress)),
              Previous (`FILE, oldDesc, _, oldRess)) ->
-      reportUpdate
-        (useFastChecking()
-           &&
-         Props.same_time desc oldDesc
-           &&
-         Props.length desc = Props.length oldDesc
-           &&
-         not (excelFile path)
-           &&
-         Osx.ressUnchanged oldRess ress None true)
+      reportUpdate (fastCheckMiss path desc ress oldDesc oldRess)
         (Format.sprintf "The contents of file %s has been modified\n"
            (Path.toString path))
   | Updates (File (_, ContentsUpdated _), _) ->
@@ -1911,6 +1944,7 @@
   let archive = updateArchiveRec ui archive in
   (* ...and check that this is a good description of what's out in the world *)
   let (_, uiNew) = buildUpdateRec archive fspath localPath false in
+  markPossiblyUpdatedRec fspath pathInArchive uiNew;
   explainUpdate pathInArchive uiNew
 
 (*****************************************************************************)
@@ -1987,3 +2021,10 @@
   let archive = getArchive root in
   let (_, subArch) = getPathInArchive archive Path.empty path in
   updateSizeRec subArch ui
+
+(*****)
+
+(* There is a dependency loop between copy.ml and update.ml... *)
+let _ =
+Copy.excelFile := excelFile;
+Copy.markPossiblyUpdated := markPossiblyUpdated

Modified: trunk/src/update.mli
===================================================================
--- trunk/src/update.mli	2009-07-13 12:38:43 UTC (rev 371)
+++ trunk/src/update.mli	2009-07-13 22:26:15 UTC (rev 372)
@@ -38,6 +38,9 @@
 (* Check that no updates has taken place in a given place of the filesystem *)
 val checkNoUpdates : Fspath.t -> Path.local -> Common.updateItem -> unit
 
+(* Turn off fastcheck for the given file on the next sync. *)
+val markPossiblyUpdated : Fspath.t -> Path.local -> unit
+
 (* Save to disk the archive updates *)
 val commitUpdates : unit -> unit
 



More information about the Unison-hackers mailing list