[Unison-hackers] [unison-svn] r371 - trunk/src

vouillon@seas.upenn.edu vouillon at seas.upenn.edu
Mon Jul 13 08:38:43 EDT 2009


Author: vouillon
Date: 2009-07-13 08:38:43 -0400 (Mon, 13 Jul 2009)
New Revision: 371

Modified:
   trunk/src/RECENTNEWS
   trunk/src/mkProjectInfo.ml
   trunk/src/remote.ml
Log:
* Clean-up in remote.ml
* Dead-lock free flow control mechanism


Modified: trunk/src/RECENTNEWS
===================================================================
--- trunk/src/RECENTNEWS	2009-07-12 16:28:33 UTC (rev 370)
+++ trunk/src/RECENTNEWS	2009-07-13 12:38:43 UTC (rev 371)
@@ -1,5 +1,11 @@
 CHANGES FROM VERSION 2.36.-27
 
+* Clean-up in remote.ml
+* Dead-lock free flow control mechanism
+
+-------------------------------
+CHANGES FROM VERSION 2.36.-27
+
 * Fixed bug with case insensitive mode on a case sensitive filesystem:
   - if file "a/a" is created on one replica and directory "A" is
     created on the other, the file failed to be synchronized the first

Modified: trunk/src/mkProjectInfo.ml
===================================================================
--- trunk/src/mkProjectInfo.ml	2009-07-12 16:28:33 UTC (rev 370)
+++ trunk/src/mkProjectInfo.ml	2009-07-13 12:38:43 UTC (rev 371)
@@ -89,3 +89,4 @@
 
 
 
+

Modified: trunk/src/remote.ml
===================================================================
--- trunk/src/remote.ml	2009-07-12 16:28:33 UTC (rev 370)
+++ trunk/src/remote.ml	2009-07-13 12:38:43 UTC (rev 371)
@@ -33,6 +33,15 @@
   Scanf.sscanf Sys.ocaml_version "%d.%d"
     (fun maj min -> (maj = 3 && min >= 11) || maj > 3)
 
+(*
+   Flow-control mechanism (only active under windows).
+   Only one side is allowed to send messages at any given time.
+   Once it has finished sending messages, a special message is sent
+   meaning that the destination is now allowed to send messages.
+*)
+let needFlowControl = windowsHack
+let readOrWrite = needFlowControl && not recent_ocaml
+
 (****)
 
 let intSize = 5
@@ -65,10 +74,10 @@
 (*                           LOW-LEVEL IO                                *)
 (*************************************************************************)
 
-let lost_connection () =
+let lostConnection () =
   Lwt.fail (Util.Fatal "Lost connection with the server")
 
-let catch_io_errors th =
+let catchIoErrors th =
   Lwt.catch th
     (fun e ->
        match e with
@@ -77,63 +86,66 @@
          (* Windows may also return the following errors... *)
        | Unix.Unix_error(Unix.EINVAL, _, _) ->
          (* Client has closed its end of the connection *)
-           lost_connection ()
+           lostConnection ()
        | _ ->
            Lwt.fail e)
 
 (****)
 
-type connection =
-  { inputChannel : Unix.file_descr;
-    inputBuffer : string;
-    mutable inputLength : int;
-    outputChannel : Unix.file_descr;
-    outputBuffer : string;
-    mutable outputLength : int;
-    outputQueue : (Bytearray.t * int * int) list Queue.t;
-    mutable pendingOutput : bool;
-    mutable flowControl : bool;
-    mutable canWrite : bool;
-    mutable tokens : int;
-    mutable reader : unit Lwt.t option }
-
 let receivedBytes = ref 0.
 let emittedBytes = ref 0.
 
-let inputBuffer_size = 8192
+(****)
 
-let fill_inputBuffer conn =
-  assert (conn.inputLength = 0);
-  catch_io_errors
+(* I/O buffers *)
+
+type ioBuffer =
+  { channel : Unix.file_descr;
+    buffer : string;
+    mutable length : int }
+
+let bufferSize = 16384
+(* No point in making this larger, as the OCaml Unix library uses a
+   buffer of this size *)
+
+let makeBuffer ch =
+  { channel = ch; buffer = String.create bufferSize; length = 0 }
+
+(****)
+
+(* Low-level inputs *)
+
+let fillInputBuffer conn =
+  assert (conn.length = 0);
+  catchIoErrors
     (fun () ->
-       Lwt_unix.read conn.inputChannel conn.inputBuffer 0 inputBuffer_size
-         >>= (fun len ->
+       Lwt_unix.read conn.channel conn.buffer 0 bufferSize >>= fun len ->
        debugV (fun() ->
          if len = 0 then
            Util.msg "grab: EOF\n"
          else
            Util.msg "grab: %s\n"
-             (String.escaped (String.sub conn.inputBuffer 0 len)));
+             (String.escaped (String.sub conn.buffer 0 len)));
        if len = 0 then
-         lost_connection ()
+         lostConnection ()
        else begin
          receivedBytes := !receivedBytes +. float len;
-         conn.inputLength <- len;
+         conn.length <- len;
          Lwt.return ()
-       end))
+       end)
 
-let rec grab_rec conn s pos len =
-  if conn.inputLength = 0 then begin
-    fill_inputBuffer conn >>= (fun () ->
-    grab_rec conn s pos len)
+let rec grabRec conn s pos len =
+  if conn.length = 0 then begin
+    fillInputBuffer conn >>= fun () ->
+    grabRec conn s pos len
   end else begin
-    let l = min (len - pos) conn.inputLength in
-    Bytearray.blit_from_string conn.inputBuffer 0 s pos l;
-    conn.inputLength <- conn.inputLength - l;
-    if conn.inputLength > 0 then
-      String.blit conn.inputBuffer l conn.inputBuffer 0 conn.inputLength;
+    let l = min (len - pos) conn.length in
+    Bytearray.blit_from_string conn.buffer 0 s pos l;
+    conn.length <- conn.length - l;
+    if conn.length > 0 then
+      String.blit conn.buffer l conn.buffer 0 conn.length;
     if pos + l < len then
-      grab_rec conn s (pos + l) len
+      grabRec conn s (pos + l) len
     else
       Lwt.return ()
   end
@@ -141,215 +153,216 @@
 let grab conn s len =
   assert (len > 0);
   assert (Bytearray.length s <= len);
-  grab_rec conn s 0 len
+  grabRec conn s 0 len
 
-let peek_without_blocking conn =
-  String.sub conn.inputBuffer 0 conn.inputLength
+let peekWithoutBlocking conn =
+  String.sub conn.buffer 0 conn.length
 
 (****)
 
-let outputBuffer_size = 8192
+(* Low-level outputs *)
 
-let rec send_output conn =
-  catch_io_errors
+let rec sendOutput conn =
+  catchIoErrors
     (fun () ->
-       Lwt_unix.write
-         conn.outputChannel conn.outputBuffer 0 conn.outputLength
-         >>= (fun len ->
+       Lwt_unix.write conn.channel conn.buffer 0 conn.length >>= fun len ->
        debugV (fun() ->
          Util.msg "dump: %s\n"
-           (String.escaped (String.sub conn.outputBuffer 0 len)));
+           (String.escaped (String.sub conn.buffer 0 len)));
        emittedBytes := !emittedBytes +. float len;
-       conn.outputLength <- conn.outputLength - len;
-       if conn.outputLength > 0 then
+       conn.length <- conn.length - len;
+       if conn.length > 0 then
          String.blit
-           conn.outputBuffer len conn.outputBuffer 0 conn.outputLength;
-       Lwt.return ()))
+           conn.buffer len conn.buffer 0 conn.length;
+       Lwt.return ())
 
-let rec fill_buffer_2 conn s pos len =
-  if conn.outputLength = outputBuffer_size then
-    send_output conn >>= (fun () ->
-    fill_buffer_2 conn s pos len)
+let rec fillBuffer2 conn s pos len =
+  if conn.length = bufferSize then
+    sendOutput conn >>= fun () ->
+    fillBuffer2 conn s pos len
   else begin
-    let l = min (len - pos) (outputBuffer_size - conn.outputLength) in
-    Bytearray.blit_to_string s pos conn.outputBuffer conn.outputLength l;
-    conn.outputLength <- conn.outputLength + l;
+    let l = min (len - pos) (bufferSize - conn.length) in
+    Bytearray.blit_to_string s pos conn.buffer conn.length l;
+    conn.length <- conn.length + l;
     if pos + l < len then
-      fill_buffer_2 conn s (pos + l) len
+      fillBuffer2 conn s (pos + l) len
     else
       Lwt.return ()
   end
 
-let bufReg = Lwt_util.make_region 1
-
-let rec fill_buffer conn l =
+let rec fillBuffer conn l =
   match l with
     (s, pos, len) :: rem ->
       assert (pos >= 0);
       assert (len >= 0);
       assert (pos <= Bytearray.length s - len);
-      fill_buffer_2 conn s pos len >>= (fun () ->
-      fill_buffer conn rem)
+      fillBuffer2 conn s pos len >>= fun () ->
+      fillBuffer conn rem
   | [] ->
       Lwt.return ()
 
-let fill_buffer conn l =
-  Lwt_util.run_in_region bufReg 1 (fun () -> fill_buffer conn l)
-let send_output conn =
-  Lwt_util.run_in_region bufReg 1 (fun () -> send_output conn)
+let rec flushBuffer conn =
+  if conn.length > 0 then
+    sendOutput conn >>= fun () ->
+    flushBuffer conn
+  else
+    Lwt.return ()
 
+(****)
 
-let blockedStream = ref None
+(* Output scheduling *)
 
-let rec streamWaitForWrite conn =
-  if conn.canWrite then Lwt.return () else begin
-    debugE (fun() -> Util.msg "Stream: waiting for write token\n");
-    let w = Lwt.wait () in
-    blockedStream := Some w;
-    w >>= fun () ->
-    debugE (fun() -> Util.msg "Stream: restarting\n");
-    streamWaitForWrite conn
-  end
+type kind = Normal | Idle | Last | Urgent
 
-let restartStream () =
-  match !blockedStream with
-    Some w -> blockedStream := None; Lwt.wakeup w ()
-  | None   -> ()
+type outputQueue =
+  { mutable available : bool;
+    mutable canWrite : bool;
+    mutable flowControl : bool;
+    writes : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
+    urgentWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
+    idleWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
+    flush : outputQueue -> unit Lwt.t }
 
-(*
-   Flow-control mechanism (only active under windows).
-   Only one side is allowed to send message at any given time.
-   Once it has finished sending message, a special message is sent
-   meaning that the destination is now allowed to send messages.
-   A side is allowed to send any number of messages, but will then
-   not be allowed to send before having received the same number of
-   messages.
-   This way, there can be no dead-lock with both sides trying
-   simultaneously to send some messages.  Furthermore, multiple
-   messages can still be coalesced.
-*)
-let needFlowControl = windowsHack
+let rec performOutputRec q (kind, action, res) =
+  action () >>= fun () ->
+  if kind = Last then begin
+    assert (q.canWrite);
+    if q.flowControl then q.canWrite <- false
+  end;
+  Lwt.wakeup res ();
+  popOutputQueues q
 
-let rec flush_buffer_simpl conn =
-  if conn.outputLength > 0 then
-    send_output conn >>= fun () ->
-    flush_buffer_simpl conn
-  else
-    Lwt.return ()
-
-(* Loop until the output buffer is empty *)
-let rec flush_buffer conn =
-  if conn.tokens <= 0 && conn.canWrite then begin
-    assert conn.flowControl;
-    conn.canWrite <- false;
-    debugE (fun() -> Util.msg "Sending write token\n");
-    (* Special message allowing the other side to write *)
-    fill_buffer conn [encodeInt 0] >>= (fun () ->
-    flush_buffer conn) >>= (fun () ->
-    if windowsHack then begin
-      debugE (fun() -> Util.msg "Restarting reader\n");
-      match conn.reader with
-        None ->
-          ()
-      | Some r ->
-          conn.reader <- None;
-          Lwt.wakeup r ()
-    end;
-    Lwt.return ())
-  end else if conn.outputLength > 0 then
-    send_output conn >>= (fun () ->
-    flush_buffer conn)
+and popOutputQueues q =
+  if not (Queue.is_empty q.urgentWrites) then
+    performOutputRec q (Queue.take q.urgentWrites)
+  else if not (Queue.is_empty q.writes) && q.canWrite then
+    performOutputRec q (Queue.take q.writes)
+  else if not (Queue.is_empty q.idleWrites) && q.canWrite then
+    performOutputRec q (Queue.take q.idleWrites)
   else begin
-    conn.pendingOutput <- false;
+    q.available <- true;
+    (* Flush the output asynchronously *)
+    Lwt.ignore_result (q.flush q);
     Lwt.return ()
   end
 
-(* Send all pending messages *)
-let rec dump_rec conn =
-  try
-    let l = Queue.take conn.outputQueue in
-    fill_buffer conn l >>= (fun () ->
-    if conn.flowControl then conn.tokens <- conn.tokens - 1;
-    debugE (fun () -> Util.msg "Remaining tokens: %d\n" conn.tokens);
-    dump_rec conn)
-  with Queue.Empty ->
-    (* We wait a bit before flushing everything, so that other packets
-       send just afterwards can be coalesced *)
-    Lwt_unix.yield () >>= (fun () ->
-    if not (Queue.is_empty conn.outputQueue) then
-      dump_rec conn
-    else begin
-      flush_buffer conn >>= fun () ->
-      if not (Queue.is_empty conn.outputQueue) then
-        signalSomethingToWrite conn;
+(* Perform an output action in an atomic way *)
+let performOutput q kind action =
+  if q.available && (kind = Urgent || q.canWrite) then begin
+    q.available <- false;
+    performOutputRec q (kind, action, Lwt.wait ())
+  end else begin
+    let res = Lwt.wait () in
+    Queue.add (kind, action, res)
+      (match kind with
+         Urgent -> q.urgentWrites
+       | Normal -> q.writes
+       | Idle   -> q.idleWrites
+       | Last   -> assert false);
+    res
+  end
+
+let allowWrites q =
+  assert (not q.canWrite);
+  q.canWrite <- true;
+  q.available <- false;
+  (* We yield to let the receiving thread restart and to give pending
+     requests some time to be processed *)
+  Lwt.ignore_result (Lwt_unix.yield () >>= fun () -> popOutputQueues q)
+
+
+let disableFlowControl q =
+  q.flowControl <- false;
+  if not q.canWrite then allowWrites q
+
+let outputQueueIsEmpty q = q.available
+
+let makeOutputQueue isServer flush =
+  { available = true; canWrite = isServer; flowControl = true;
+    writes = Queue.create (); urgentWrites = Queue.create ();
+    idleWrites = Queue.create ();
+    flush = flush }
+
+(****)
+
+type connection =
+  { inputBuffer : ioBuffer;
+    outputBuffer : ioBuffer;
+    outputQueue : outputQueue;
+    receiver :  (unit -> unit Lwt.t) option ref }
+
+let maybeFlush receiver pendingFlush q buf =
+  (* We return immediately if a flush is already scheduled, or if the
+     output buffer is already empty. *)
+  (* If we are doing flow control and we can write, we need to send
+     a write token even when the buffer is empty. *)
+  if
+    !pendingFlush || (buf.length = 0 && not (q.flowControl && q.canWrite))
+  then
+    Lwt.return ()
+  else begin
+    pendingFlush := true;
+    (* Wait a bit, in case there are some new requests being processed *)
+    Lwt_unix.yield () >>= fun () ->
+    pendingFlush := false;
+    (* If there are other writes scheduled, we do not flush yet *)
+    if outputQueueIsEmpty q then begin
+      performOutput q Last
+        (fun () ->
+           if q.flowControl then begin
+             debugE (fun() -> Util.msg "Sending write token\n");
+             fillBuffer buf [encodeInt 0] >>= fun () ->
+             flushBuffer buf
+           end else
+             flushBuffer buf) >>= fun () ->
+      (* Restart the reader thread if needed *)
+      match !receiver with
+        None   -> Lwt.return ()
+      | Some f -> f ()
+    end else
       Lwt.return ()
-    end)
+  end
 
-(* Start the thread that write all pending messages, if this thread is
-   not running at this time *)
-and signalSomethingToWrite conn =
-  if not conn.canWrite && conn.pendingOutput then
-    debugE
-      (fun () -> Util.msg "Something to write, but no write token (%d)\n"
-                          conn.tokens);
-  if not conn.pendingOutput && conn.canWrite then begin
-    conn.pendingOutput <- true;
-    Lwt.ignore_result (dump_rec conn)
-  end;
-  if conn.canWrite then restartStream ()
+let makeConnection isServer inCh outCh =
+  let pendingFlush = ref false in
+  let receiver = ref None in
+  let outputBuffer = makeBuffer outCh in
+    { inputBuffer = makeBuffer inCh;
+      outputBuffer = outputBuffer;
+      outputQueue =
+        makeOutputQueue isServer
+          (fun q -> maybeFlush receiver pendingFlush q outputBuffer);
+      receiver = receiver }
 
-(* Add a message to the output queue and schedule its emission *)
-(* A message is a list of fragments of messages, represented by triplets
-   (string, position in string, length) *)
+(* Send message [l] *)
 let dump conn l =
-  Queue.add l conn.outputQueue;
-  signalSomethingToWrite conn;
-  Lwt.return ()
+  performOutput
+    conn.outputQueue Normal (fun () -> fillBuffer conn.outputBuffer l)
 
-(* Invoked when a special message is received from the other side,
-   allowing this side to send messages *)
-let allowWrites conn =
-  if conn.flowControl then begin
-    assert (conn.pendingOutput = false);
-    assert (not conn.canWrite);
-    conn.canWrite <- true;
-    debugE (fun () -> Util.msg "Received write token (%d)\n" conn.tokens);
-    (* Flush pending messages, if there are any *)
-    signalSomethingToWrite conn
-  end
+(* Send message [l] when idle *)
+let dumpIdle conn l =
+  performOutput
+    conn.outputQueue Idle (fun () -> fillBuffer conn.outputBuffer l)
 
-(* Invoked when a special message is received from the other side,
-   meaning that the other side does not block on write, and that
-   therefore there can be no dead-lock. *)
-let disableFlowControl conn =
-  debugE (fun () -> Util.msg "Flow control disabled\n");
-  conn.flowControl <- false;
-  conn.canWrite <- true;
-  conn.tokens <- 1;
-  (* We are allowed to write, so we flush pending messages, if there
-     are any *)
-  signalSomethingToWrite conn
+(* Send message [l], even if writes are disabled.  This is used for
+   rapidly aborting a stream.  This works as long as only one small
+   message is written at a time (the write will succeed as the pipe
+   will not be full) *)
+let dumpUrgent conn l =
+  performOutput conn.outputQueue Urgent
+    (fun () ->
+       fillBuffer conn.outputBuffer l >>= fun () ->
+       flushBuffer conn.outputBuffer)
 
 (****)
 
 (* Initialize the connection *)
-let setupIO in_ch out_ch =
+let setupIO isServer inCh outCh =
   if not windowsHack then begin
-    Unix.set_nonblock in_ch;
-    Unix.set_nonblock out_ch
+    Unix.set_nonblock inCh;
+    Unix.set_nonblock outCh
   end;
-  { inputChannel = in_ch;
-    inputBuffer = String.create inputBuffer_size;
-    inputLength = 0;
-    outputChannel = out_ch;
-    outputBuffer = String.create outputBuffer_size;
-    outputLength = 0;
-    outputQueue = Queue.create ();
-    pendingOutput = false;
-    flowControl = true;
-    canWrite = true;
-    tokens = 1;
-    reader = None }
+  makeConnection isServer inCh outCh
 
 (* XXX *)
 module Thread = struct
@@ -558,12 +571,12 @@
 let receivePacket conn =
   (* Get the length of the packet *)
   let int_buf = Bytearray.create intSize in
-  grab conn int_buf intSize >>= (fun () ->
+  grab conn.inputBuffer int_buf intSize >>= (fun () ->
   let length = decodeInt int_buf 0 in
   assert (length >= 0);
   (* Get packet *)
   let buf = Bytearray.create length in
-  grab conn buf length >>= (fun () ->
+  grab conn.inputBuffer buf length >>= (fun () ->
   (debugE (fun () ->
              let start =
                if length > 10 then (Bytearray.sub buf 0 10) ^ "..."
@@ -622,8 +635,7 @@
   if not !streamAbortedDst then begin
     streamAbortedDst := true;
     let request = encodeInt id :: marshalHeader StreamAbort [] in
-    fill_buffer conn request >>= fun () ->
-    flush_buffer_simpl conn
+    dumpUrgent conn request
   end else
     Lwt.return ()
 
@@ -665,63 +677,56 @@
 (* Receiving thread: read a message and dispatch it to the right
    thread or create a new thread to process requests. *)
 let rec receive conn =
-  (if windowsHack && conn.canWrite && not recent_ocaml then
-     let wait = Lwt.wait () in
-     assert (conn.reader = None);
-     conn.reader <- Some wait;
-     wait
-   else
-     Lwt.return ()) >>= (fun () ->
-  debugE (fun () -> Util.msg "Waiting for next message\n");
-  (* Get the message ID *)
-  let id = Bytearray.create intSize in
-  grab conn id intSize >>= (fun () ->
-  let num_id = decodeInt id 0 in
-  if num_id = 0 then begin
-    debugE (fun () -> Util.msg "Received the write permission\n");
-    allowWrites conn;
-    receive conn
+  if readOrWrite && conn.outputQueue.canWrite then begin
+    conn.receiver := Some (fun () -> receive conn); Lwt.return ()
   end else begin
-    if conn.flowControl then conn.tokens <- conn.tokens + 1;
-    debugE
-      (fun () -> Util.msg "Message received (id: %d) (tokens: %d)\n"
-                          num_id  conn.tokens);
-    (* Read the header *)
-    receivePacket conn >>= (fun buf ->
-    let req = unmarshalHeader buf in
-    begin match req with
-      Request cmdName ->
-        receivePacket conn >>= (fun buf ->
-        (* We yield before starting processing the request.
-           This way, the request may call [Lwt_unix.run] and this will
-           not block the receiving thread. *)
-        Lwt.ignore_result
-          (Lwt_unix.yield () >>= (fun () ->
-           processRequest conn id cmdName buf));
-        receive conn)
-    | NormalResult ->
-        receivePacket conn >>= (fun buf ->
-        Lwt.wakeup (find_receiver num_id) buf;
-        receive conn)
-    | TransientExn s ->
-        debugV (fun() -> Util.msg "receive: Transient remote error '%s']" s);
-        Lwt.wakeup_exn (find_receiver num_id) (Util.Transient s);
-        receive conn
-    | FatalExn s ->
-        debugV (fun() -> Util.msg "receive: Fatal remote error '%s']" s);
-        Lwt.wakeup_exn (find_receiver num_id) (Util.Fatal ("Server: " ^ s));
-        receive conn
-    | Stream cmdName ->
-        receivePacket conn >>= fun buf ->
-        if conn.flowControl then conn.tokens <- conn.tokens - 1;
-        processStream conn id cmdName buf >>= fun () ->
-        receive conn
-    | StreamAbort ->
-        if conn.flowControl then conn.tokens <- conn.tokens - 1;
-        streamAbortedSrc := num_id;
-        receive conn
+    debugE (fun () -> Util.msg "Waiting for next message\n");
+    (* Get the message ID *)
+    let id = Bytearray.create intSize in
+    grab conn.inputBuffer id intSize >>= (fun () ->
+    let num_id = decodeInt id 0 in
+    if num_id = 0 then begin
+      debugE (fun () -> Util.msg "Received the write permission\n");
+      allowWrites conn.outputQueue;
+      receive conn
+    end else begin
+      debugE
+        (fun () -> Util.msg "Message received (id: %d)\n" num_id);
+      (* Read the header *)
+      receivePacket conn >>= (fun buf ->
+      let req = unmarshalHeader buf in
+      begin match req with
+        Request cmdName ->
+          receivePacket conn >>= (fun buf ->
+          (* We yield before starting processing the request.
+             This way, the request may call [Lwt_unix.run] and this will
+             not block the receiving thread. *)
+          Lwt.ignore_result
+            (Lwt_unix.yield () >>= (fun () ->
+             processRequest conn id cmdName buf));
+          receive conn)
+      | NormalResult ->
+          receivePacket conn >>= (fun buf ->
+          Lwt.wakeup (find_receiver num_id) buf;
+          receive conn)
+      | TransientExn s ->
+          debugV (fun() -> Util.msg "receive: Transient remote error '%s']" s);
+          Lwt.wakeup_exn (find_receiver num_id) (Util.Transient s);
+          receive conn
+      | FatalExn s ->
+          debugV (fun() -> Util.msg "receive: Fatal remote error '%s']" s);
+          Lwt.wakeup_exn (find_receiver num_id) (Util.Fatal ("Server: " ^ s));
+          receive conn
+      | Stream cmdName ->
+          receivePacket conn >>= fun buf ->
+          processStream conn id cmdName buf >>= fun () ->
+          receive conn
+      | StreamAbort ->
+          streamAbortedSrc := num_id;
+          receive conn
+      end)
     end)
-  end))
+  end
 
 let wait_for_reply id =
   let res = Lwt.wait () in
@@ -867,15 +872,13 @@
   serverStreams := Util.StringMap.add cmdName server !serverStreams;
   (* Create a client function and return it *)
   let client conn id serverArgs =
-    if !streamAbortedSrc = id then raise (Util.Transient "Streaming aborted");
-    streamWaitForWrite conn >>= fun () ->
     debugE (fun () -> Util.msg "Sending stream chunk (id: %d)\n" id);
     if !streamAbortedSrc = id then raise (Util.Transient "Streaming aborted");
     let request =
       encodeInt id ::
       marshalHeader (Stream cmdName) (marshalArgs serverArgs [])
     in
-    fill_buffer conn request
+    dumpIdle conn request
   in
   fun conn sender ->
     if not (Prefs.read streamingActivated) then
@@ -886,9 +889,7 @@
       Lwt.try_bind
         (fun () ->
            Lwt_util.run_in_region streamReg 1
-             (fun () ->
-                Lwt_unix.yield () >>= fun () ->
-                sender (fun v -> client conn id v)))
+             (fun () -> sender (fun v -> client conn id v)))
         (fun v -> ping conn id >>= fun () -> Lwt.return v)
 	(fun e -> ping conn id >>= fun () -> Lwt.fail e)
     end
@@ -903,11 +904,11 @@
   if pos = len then
     Lwt.return ()
   else begin
-    (grab conn buffer 1 >>= (fun () ->
+    (grab conn.inputBuffer buffer 1 >>= (fun () ->
     if buffer.{0} <> connectionHeader.[pos] then
       let prefix =
         String.sub connectionHeader 0 pos ^ Bytearray.to_string buffer in
-      let rest = peek_without_blocking conn in
+      let rest = peekWithoutBlocking conn.inputBuffer in
       Lwt.fail
         (Util.Fatal
            ("Received unexpected header from the server:\n \
@@ -934,7 +935,7 @@
 *)
 
 let negociateFlowControlLocal conn () =
-  if not needFlowControl then disableFlowControl conn;
+  if not needFlowControl then disableFlowControl conn.outputQueue;
   Lwt.return needFlowControl
 
 let negociateFlowControlRemote =
@@ -955,8 +956,7 @@
 let initConnection in_ch out_ch =
   if not windowsHack then
     ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore);
-  let conn = setupIO in_ch out_ch in
-  conn.canWrite <- false;
+  let conn = setupIO false in_ch out_ch in
   checkHeader
     conn (Bytearray.create 1) 0 (String.length connectionHeader) >>= (fun () ->
   Lwt.ignore_result (receive conn);
@@ -1293,7 +1293,7 @@
   Trace.runningasserver := true;
   (* Send header indicating to the client that it has successfully
      connected to the server *)
-  let conn = setupIO in_ch out_ch in
+  let conn = setupIO true in_ch out_ch in
   try
     Lwt_unix.run
       (dump conn [(Bytearray.of_string connectionHeader, 0,



More information about the Unison-hackers mailing list