[Unison-hackers] [unison-svn] r371 - trunk/src
vouillon@seas.upenn.edu
vouillon at seas.upenn.edu
Mon Jul 13 08:38:43 EDT 2009
Author: vouillon
Date: 2009-07-13 08:38:43 -0400 (Mon, 13 Jul 2009)
New Revision: 371
Modified:
trunk/src/RECENTNEWS
trunk/src/mkProjectInfo.ml
trunk/src/remote.ml
Log:
* Clean-up in remote.ml
* Dead-lock free flow control mechanism
Modified: trunk/src/RECENTNEWS
===================================================================
--- trunk/src/RECENTNEWS 2009-07-12 16:28:33 UTC (rev 370)
+++ trunk/src/RECENTNEWS 2009-07-13 12:38:43 UTC (rev 371)
@@ -1,5 +1,11 @@
CHANGES FROM VERSION 2.36.-27
+* Clean-up in remote.ml
+* Dead-lock free flow control mechanism
+
+-------------------------------
+CHANGES FROM VERSION 2.36.-27
+
* Fixed bug with case insensitive mode on a case sensitive filesystem:
- if file "a/a" is created on one replica and directory "A" is
created on the other, the file failed to be synchronized the first
Modified: trunk/src/mkProjectInfo.ml
===================================================================
--- trunk/src/mkProjectInfo.ml 2009-07-12 16:28:33 UTC (rev 370)
+++ trunk/src/mkProjectInfo.ml 2009-07-13 12:38:43 UTC (rev 371)
@@ -89,3 +89,4 @@
+
Modified: trunk/src/remote.ml
===================================================================
--- trunk/src/remote.ml 2009-07-12 16:28:33 UTC (rev 370)
+++ trunk/src/remote.ml 2009-07-13 12:38:43 UTC (rev 371)
@@ -33,6 +33,15 @@
Scanf.sscanf Sys.ocaml_version "%d.%d"
(fun maj min -> (maj = 3 && min >= 11) || maj > 3)
+(*
+ Flow-control mechanism (only active under windows).
+ Only one side is allowed to send messages at any given time.
+ Once it has finished sending messages, a special message is sent
+ meaning that the destination is now allowed to send messages.
+*)
+let needFlowControl = windowsHack
+let readOrWrite = needFlowControl && not recent_ocaml
+
(****)
let intSize = 5
@@ -65,10 +74,10 @@
(* LOW-LEVEL IO *)
(*************************************************************************)
-let lost_connection () =
+let lostConnection () =
Lwt.fail (Util.Fatal "Lost connection with the server")
-let catch_io_errors th =
+let catchIoErrors th =
Lwt.catch th
(fun e ->
match e with
@@ -77,63 +86,66 @@
(* Windows may also return the following errors... *)
| Unix.Unix_error(Unix.EINVAL, _, _) ->
(* Client has closed its end of the connection *)
- lost_connection ()
+ lostConnection ()
| _ ->
Lwt.fail e)
(****)
-type connection =
- { inputChannel : Unix.file_descr;
- inputBuffer : string;
- mutable inputLength : int;
- outputChannel : Unix.file_descr;
- outputBuffer : string;
- mutable outputLength : int;
- outputQueue : (Bytearray.t * int * int) list Queue.t;
- mutable pendingOutput : bool;
- mutable flowControl : bool;
- mutable canWrite : bool;
- mutable tokens : int;
- mutable reader : unit Lwt.t option }
-
let receivedBytes = ref 0.
let emittedBytes = ref 0.
-let inputBuffer_size = 8192
+(****)
-let fill_inputBuffer conn =
- assert (conn.inputLength = 0);
- catch_io_errors
+(* I/O buffers *)
+
+type ioBuffer =
+ { channel : Unix.file_descr;
+ buffer : string;
+ mutable length : int }
+
+let bufferSize = 16384
+(* No point in making this larger, as the Ocaml Unix library uses a
+ buffer of this size *)
+
+let makeBuffer ch =
+ { channel = ch; buffer = String.create bufferSize; length = 0 }
+
+(****)
+
+(* Low-level inputs *)
+
+let fillInputBuffer conn =
+ assert (conn.length = 0);
+ catchIoErrors
(fun () ->
- Lwt_unix.read conn.inputChannel conn.inputBuffer 0 inputBuffer_size
- >>= (fun len ->
+ Lwt_unix.read conn.channel conn.buffer 0 bufferSize >>= fun len ->
debugV (fun() ->
if len = 0 then
Util.msg "grab: EOF\n"
else
Util.msg "grab: %s\n"
- (String.escaped (String.sub conn.inputBuffer 0 len)));
+ (String.escaped (String.sub conn.buffer 0 len)));
if len = 0 then
- lost_connection ()
+ lostConnection ()
else begin
receivedBytes := !receivedBytes +. float len;
- conn.inputLength <- len;
+ conn.length <- len;
Lwt.return ()
- end))
+ end)
-let rec grab_rec conn s pos len =
- if conn.inputLength = 0 then begin
- fill_inputBuffer conn >>= (fun () ->
- grab_rec conn s pos len)
+let rec grabRec conn s pos len =
+ if conn.length = 0 then begin
+ fillInputBuffer conn >>= fun () ->
+ grabRec conn s pos len
end else begin
- let l = min (len - pos) conn.inputLength in
- Bytearray.blit_from_string conn.inputBuffer 0 s pos l;
- conn.inputLength <- conn.inputLength - l;
- if conn.inputLength > 0 then
- String.blit conn.inputBuffer l conn.inputBuffer 0 conn.inputLength;
+ let l = min (len - pos) conn.length in
+ Bytearray.blit_from_string conn.buffer 0 s pos l;
+ conn.length <- conn.length - l;
+ if conn.length > 0 then
+ String.blit conn.buffer l conn.buffer 0 conn.length;
if pos + l < len then
- grab_rec conn s (pos + l) len
+ grabRec conn s (pos + l) len
else
Lwt.return ()
end
@@ -141,215 +153,216 @@
let grab conn s len =
assert (len > 0);
assert (Bytearray.length s <= len);
- grab_rec conn s 0 len
+ grabRec conn s 0 len
-let peek_without_blocking conn =
- String.sub conn.inputBuffer 0 conn.inputLength
+let peekWithoutBlocking conn =
+ String.sub conn.buffer 0 conn.length
(****)
-let outputBuffer_size = 8192
+(* Low-level outputs *)
-let rec send_output conn =
- catch_io_errors
+let rec sendOutput conn =
+ catchIoErrors
(fun () ->
- Lwt_unix.write
- conn.outputChannel conn.outputBuffer 0 conn.outputLength
- >>= (fun len ->
+ Lwt_unix.write conn.channel conn.buffer 0 conn.length >>= fun len ->
debugV (fun() ->
Util.msg "dump: %s\n"
- (String.escaped (String.sub conn.outputBuffer 0 len)));
+ (String.escaped (String.sub conn.buffer 0 len)));
emittedBytes := !emittedBytes +. float len;
- conn.outputLength <- conn.outputLength - len;
- if conn.outputLength > 0 then
+ conn.length <- conn.length - len;
+ if conn.length > 0 then
String.blit
- conn.outputBuffer len conn.outputBuffer 0 conn.outputLength;
- Lwt.return ()))
+ conn.buffer len conn.buffer 0 conn.length;
+ Lwt.return ())
-let rec fill_buffer_2 conn s pos len =
- if conn.outputLength = outputBuffer_size then
- send_output conn >>= (fun () ->
- fill_buffer_2 conn s pos len)
+let rec fillBuffer2 conn s pos len =
+ if conn.length = bufferSize then
+ sendOutput conn >>= fun () ->
+ fillBuffer2 conn s pos len
else begin
- let l = min (len - pos) (outputBuffer_size - conn.outputLength) in
- Bytearray.blit_to_string s pos conn.outputBuffer conn.outputLength l;
- conn.outputLength <- conn.outputLength + l;
+ let l = min (len - pos) (bufferSize - conn.length) in
+ Bytearray.blit_to_string s pos conn.buffer conn.length l;
+ conn.length <- conn.length + l;
if pos + l < len then
- fill_buffer_2 conn s (pos + l) len
+ fillBuffer2 conn s (pos + l) len
else
Lwt.return ()
end
-let bufReg = Lwt_util.make_region 1
-
-let rec fill_buffer conn l =
+let rec fillBuffer conn l =
match l with
(s, pos, len) :: rem ->
assert (pos >= 0);
assert (len >= 0);
assert (pos <= Bytearray.length s - len);
- fill_buffer_2 conn s pos len >>= (fun () ->
- fill_buffer conn rem)
+ fillBuffer2 conn s pos len >>= fun () ->
+ fillBuffer conn rem
| [] ->
Lwt.return ()
-let fill_buffer conn l =
- Lwt_util.run_in_region bufReg 1 (fun () -> fill_buffer conn l)
-let send_output conn =
- Lwt_util.run_in_region bufReg 1 (fun () -> send_output conn)
+let rec flushBuffer conn =
+ if conn.length > 0 then
+ sendOutput conn >>= fun () ->
+ flushBuffer conn
+ else
+ Lwt.return ()
+(****)
-let blockedStream = ref None
+(* Output scheduling *)
-let rec streamWaitForWrite conn =
- if conn.canWrite then Lwt.return () else begin
- debugE (fun() -> Util.msg "Stream: waiting for write token\n");
- let w = Lwt.wait () in
- blockedStream := Some w;
- w >>= fun () ->
- debugE (fun() -> Util.msg "Stream: restarting\n");
- streamWaitForWrite conn
- end
+type kind = Normal | Idle | Last | Urgent
-let restartStream () =
- match !blockedStream with
- Some w -> blockedStream := None; Lwt.wakeup w ()
- | None -> ()
+type outputQueue =
+ { mutable available : bool;
+ mutable canWrite : bool;
+ mutable flowControl : bool;
+ writes : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
+ urgentWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
+ idleWrites : (kind * (unit -> unit Lwt.t) * unit Lwt.t) Queue.t;
+ flush : outputQueue -> unit Lwt.t }
-(*
- Flow-control mechanism (only active under windows).
- Only one side is allowed to send message at any given time.
- Once it has finished sending message, a special message is sent
- meaning that the destination is now allowed to send messages.
- A side is allowed to send any number of messages, but will then
- not be allowed to send before having received the same number of
- messages.
- This way, there can be no dead-lock with both sides trying
- simultaneously to send some messages. Furthermore, multiple
- messages can still be coalesced.
-*)
-let needFlowControl = windowsHack
+let rec performOutputRec q (kind, action, res) =
+ action () >>= fun () ->
+ if kind = Last then begin
+ assert (q.canWrite);
+ if q.flowControl then q.canWrite <- false
+ end;
+ Lwt.wakeup res ();
+ popOutputQueues q
-let rec flush_buffer_simpl conn =
- if conn.outputLength > 0 then
- send_output conn >>= fun () ->
- flush_buffer_simpl conn
- else
- Lwt.return ()
-
-(* Loop until the output buffer is empty *)
-let rec flush_buffer conn =
- if conn.tokens <= 0 && conn.canWrite then begin
- assert conn.flowControl;
- conn.canWrite <- false;
- debugE (fun() -> Util.msg "Sending write token\n");
- (* Special message allowing the other side to write *)
- fill_buffer conn [encodeInt 0] >>= (fun () ->
- flush_buffer conn) >>= (fun () ->
- if windowsHack then begin
- debugE (fun() -> Util.msg "Restarting reader\n");
- match conn.reader with
- None ->
- ()
- | Some r ->
- conn.reader <- None;
- Lwt.wakeup r ()
- end;
- Lwt.return ())
- end else if conn.outputLength > 0 then
- send_output conn >>= (fun () ->
- flush_buffer conn)
+and popOutputQueues q =
+ if not (Queue.is_empty q.urgentWrites) then
+ performOutputRec q (Queue.take q.urgentWrites)
+ else if not (Queue.is_empty q.writes) && q.canWrite then
+ performOutputRec q (Queue.take q.writes)
+ else if not (Queue.is_empty q.idleWrites) && q.canWrite then
+ performOutputRec q (Queue.take q.idleWrites)
else begin
- conn.pendingOutput <- false;
+ q.available <- true;
+ (* Flush asynchronously the output *)
+ Lwt.ignore_result (q.flush q);
Lwt.return ()
end
-(* Send all pending messages *)
-let rec dump_rec conn =
- try
- let l = Queue.take conn.outputQueue in
- fill_buffer conn l >>= (fun () ->
- if conn.flowControl then conn.tokens <- conn.tokens - 1;
- debugE (fun () -> Util.msg "Remaining tokens: %d\n" conn.tokens);
- dump_rec conn)
- with Queue.Empty ->
- (* We wait a bit before flushing everything, so that other packets
- send just afterwards can be coalesced *)
- Lwt_unix.yield () >>= (fun () ->
- if not (Queue.is_empty conn.outputQueue) then
- dump_rec conn
- else begin
- flush_buffer conn >>= fun () ->
- if not (Queue.is_empty conn.outputQueue) then
- signalSomethingToWrite conn;
+(* Perform an output action in an atomic way *)
+let performOutput q kind action =
+ if q.available && (kind = Urgent || q.canWrite) then begin
+ q.available <- false;
+ performOutputRec q (kind, action, Lwt.wait ())
+ end else begin
+ let res = Lwt.wait () in
+ Queue.add (kind, action, res)
+ (match kind with
+ Urgent -> q.urgentWrites
+ | Normal -> q.writes
+ | Idle -> q.idleWrites
+ | Last -> assert false);
+ res
+ end
+
+let allowWrites q =
+ assert (not q.canWrite);
+ q.canWrite <- true;
+ q.available <- false;
+ (* We yield to let the receiving thread restart and to let some time
+ to the requests to be processed *)
+ Lwt.ignore_result (Lwt_unix.yield () >>= fun () -> popOutputQueues q)
+
+
+let disableFlowControl q =
+ q.flowControl <- false;
+ if not q.canWrite then allowWrites q
+
+let outputQueueIsEmpty q = q.available
+
+let makeOutputQueue isServer flush =
+ { available = true; canWrite = isServer; flowControl = true;
+ writes = Queue.create (); urgentWrites = Queue.create ();
+ idleWrites = Queue.create ();
+ flush = flush }
+
+(****)
+
+type connection =
+ { inputBuffer : ioBuffer;
+ outputBuffer : ioBuffer;
+ outputQueue : outputQueue;
+ receiver : (unit -> unit Lwt.t) option ref }
+
+let maybeFlush receiver pendingFlush q buf =
+ (* We return immediately if a flush is already scheduled, or if the
+ output buffer is already empty. *)
+ (* If we are doing flow control and we can write, we need to send
+ a write token even when the buffer is empty. *)
+ if
+ !pendingFlush || (buf.length = 0 && not (q.flowControl && q.canWrite))
+ then
+ Lwt.return ()
+ else begin
+ pendingFlush := true;
+ (* Wait a bit, in case there are some new requests being processed *)
+ Lwt_unix.yield () >>= fun () ->
+ pendingFlush := false;
+ (* If there are other writes scheduled, we do not flush yet *)
+ if outputQueueIsEmpty q then begin
+ performOutput q Last
+ (fun () ->
+ if q.flowControl then begin
+ debugE (fun() -> Util.msg "Sending write token\n");
+ fillBuffer buf [encodeInt 0] >>= fun () ->
+ flushBuffer buf
+ end else
+ flushBuffer buf) >>= fun () ->
+ (* Restart the reader thread if needed *)
+ match !receiver with
+ None -> Lwt.return ()
+ | Some f -> f ()
+ end else
Lwt.return ()
- end)
+ end
-(* Start the thread that write all pending messages, if this thread is
- not running at this time *)
-and signalSomethingToWrite conn =
- if not conn.canWrite && conn.pendingOutput then
- debugE
- (fun () -> Util.msg "Something to write, but no write token (%d)\n"
- conn.tokens);
- if not conn.pendingOutput && conn.canWrite then begin
- conn.pendingOutput <- true;
- Lwt.ignore_result (dump_rec conn)
- end;
- if conn.canWrite then restartStream ()
+let makeConnection isServer inCh outCh =
+ let pendingFlush = ref false in
+ let receiver = ref None in
+ let outputBuffer = makeBuffer outCh in
+ { inputBuffer = makeBuffer inCh;
+ outputBuffer = outputBuffer;
+ outputQueue =
+ makeOutputQueue isServer
+ (fun q -> maybeFlush receiver pendingFlush q outputBuffer);
+ receiver = receiver }
-(* Add a message to the output queue and schedule its emission *)
-(* A message is a list of fragments of messages, represented by triplets
- (string, position in string, length) *)
+(* Send message [l] *)
let dump conn l =
- Queue.add l conn.outputQueue;
- signalSomethingToWrite conn;
- Lwt.return ()
+ performOutput
+ conn.outputQueue Normal (fun () -> fillBuffer conn.outputBuffer l)
-(* Invoked when a special message is received from the other side,
- allowing this side to send messages *)
-let allowWrites conn =
- if conn.flowControl then begin
- assert (conn.pendingOutput = false);
- assert (not conn.canWrite);
- conn.canWrite <- true;
- debugE (fun () -> Util.msg "Received write token (%d)\n" conn.tokens);
- (* Flush pending messages, if there are any *)
- signalSomethingToWrite conn
- end
+(* Send message [l] when idle *)
+let dumpIdle conn l =
+ performOutput
+ conn.outputQueue Idle (fun () -> fillBuffer conn.outputBuffer l)
-(* Invoked when a special message is received from the other side,
- meaning that the other side does not block on write, and that
- therefore there can be no dead-lock. *)
-let disableFlowControl conn =
- debugE (fun () -> Util.msg "Flow control disabled\n");
- conn.flowControl <- false;
- conn.canWrite <- true;
- conn.tokens <- 1;
- (* We are allowed to write, so we flush pending messages, if there
- are any *)
- signalSomethingToWrite conn
+(* Send message [l], even if write are disabled. This is used for
+ aborting rapidly a stream. This works as long as only one small
+ message is written at a time (the write will succeed as the pipe
+ will not be full) *)
+let dumpUrgent conn l =
+ performOutput conn.outputQueue Urgent
+ (fun () ->
+ fillBuffer conn.outputBuffer l >>= fun () ->
+ flushBuffer conn.outputBuffer)
(****)
(* Initialize the connection *)
-let setupIO in_ch out_ch =
+let setupIO isServer inCh outCh =
if not windowsHack then begin
- Unix.set_nonblock in_ch;
- Unix.set_nonblock out_ch
+ Unix.set_nonblock inCh;
+ Unix.set_nonblock outCh
end;
- { inputChannel = in_ch;
- inputBuffer = String.create inputBuffer_size;
- inputLength = 0;
- outputChannel = out_ch;
- outputBuffer = String.create outputBuffer_size;
- outputLength = 0;
- outputQueue = Queue.create ();
- pendingOutput = false;
- flowControl = true;
- canWrite = true;
- tokens = 1;
- reader = None }
+ makeConnection isServer inCh outCh
(* XXX *)
module Thread = struct
@@ -558,12 +571,12 @@
let receivePacket conn =
(* Get the length of the packet *)
let int_buf = Bytearray.create intSize in
- grab conn int_buf intSize >>= (fun () ->
+ grab conn.inputBuffer int_buf intSize >>= (fun () ->
let length = decodeInt int_buf 0 in
assert (length >= 0);
(* Get packet *)
let buf = Bytearray.create length in
- grab conn buf length >>= (fun () ->
+ grab conn.inputBuffer buf length >>= (fun () ->
(debugE (fun () ->
let start =
if length > 10 then (Bytearray.sub buf 0 10) ^ "..."
@@ -622,8 +635,7 @@
if not !streamAbortedDst then begin
streamAbortedDst := true;
let request = encodeInt id :: marshalHeader StreamAbort [] in
- fill_buffer conn request >>= fun () ->
- flush_buffer_simpl conn
+ dumpUrgent conn request
end else
Lwt.return ()
@@ -665,63 +677,56 @@
(* Receiving thread: read a message and dispatch it to the right
thread or create a new thread to process requests. *)
let rec receive conn =
- (if windowsHack && conn.canWrite && not recent_ocaml then
- let wait = Lwt.wait () in
- assert (conn.reader = None);
- conn.reader <- Some wait;
- wait
- else
- Lwt.return ()) >>= (fun () ->
- debugE (fun () -> Util.msg "Waiting for next message\n");
- (* Get the message ID *)
- let id = Bytearray.create intSize in
- grab conn id intSize >>= (fun () ->
- let num_id = decodeInt id 0 in
- if num_id = 0 then begin
- debugE (fun () -> Util.msg "Received the write permission\n");
- allowWrites conn;
- receive conn
+ if readOrWrite && conn.outputQueue.canWrite then begin
+ conn.receiver := Some (fun () -> receive conn); Lwt.return ()
end else begin
- if conn.flowControl then conn.tokens <- conn.tokens + 1;
- debugE
- (fun () -> Util.msg "Message received (id: %d) (tokens: %d)\n"
- num_id conn.tokens);
- (* Read the header *)
- receivePacket conn >>= (fun buf ->
- let req = unmarshalHeader buf in
- begin match req with
- Request cmdName ->
- receivePacket conn >>= (fun buf ->
- (* We yield before starting processing the request.
- This way, the request may call [Lwt_unix.run] and this will
- not block the receiving thread. *)
- Lwt.ignore_result
- (Lwt_unix.yield () >>= (fun () ->
- processRequest conn id cmdName buf));
- receive conn)
- | NormalResult ->
- receivePacket conn >>= (fun buf ->
- Lwt.wakeup (find_receiver num_id) buf;
- receive conn)
- | TransientExn s ->
- debugV (fun() -> Util.msg "receive: Transient remote error '%s']" s);
- Lwt.wakeup_exn (find_receiver num_id) (Util.Transient s);
- receive conn
- | FatalExn s ->
- debugV (fun() -> Util.msg "receive: Fatal remote error '%s']" s);
- Lwt.wakeup_exn (find_receiver num_id) (Util.Fatal ("Server: " ^ s));
- receive conn
- | Stream cmdName ->
- receivePacket conn >>= fun buf ->
- if conn.flowControl then conn.tokens <- conn.tokens - 1;
- processStream conn id cmdName buf >>= fun () ->
- receive conn
- | StreamAbort ->
- if conn.flowControl then conn.tokens <- conn.tokens - 1;
- streamAbortedSrc := num_id;
- receive conn
+ debugE (fun () -> Util.msg "Waiting for next message\n");
+ (* Get the message ID *)
+ let id = Bytearray.create intSize in
+ grab conn.inputBuffer id intSize >>= (fun () ->
+ let num_id = decodeInt id 0 in
+ if num_id = 0 then begin
+ debugE (fun () -> Util.msg "Received the write permission\n");
+ allowWrites conn.outputQueue;
+ receive conn
+ end else begin
+ debugE
+ (fun () -> Util.msg "Message received (id: %d)\n" num_id);
+ (* Read the header *)
+ receivePacket conn >>= (fun buf ->
+ let req = unmarshalHeader buf in
+ begin match req with
+ Request cmdName ->
+ receivePacket conn >>= (fun buf ->
+ (* We yield before starting processing the request.
+ This way, the request may call [Lwt_unix.run] and this will
+ not block the receiving thread. *)
+ Lwt.ignore_result
+ (Lwt_unix.yield () >>= (fun () ->
+ processRequest conn id cmdName buf));
+ receive conn)
+ | NormalResult ->
+ receivePacket conn >>= (fun buf ->
+ Lwt.wakeup (find_receiver num_id) buf;
+ receive conn)
+ | TransientExn s ->
+ debugV (fun() -> Util.msg "receive: Transient remote error '%s']" s);
+ Lwt.wakeup_exn (find_receiver num_id) (Util.Transient s);
+ receive conn
+ | FatalExn s ->
+ debugV (fun() -> Util.msg "receive: Fatal remote error '%s']" s);
+ Lwt.wakeup_exn (find_receiver num_id) (Util.Fatal ("Server: " ^ s));
+ receive conn
+ | Stream cmdName ->
+ receivePacket conn >>= fun buf ->
+ processStream conn id cmdName buf >>= fun () ->
+ receive conn
+ | StreamAbort ->
+ streamAbortedSrc := num_id;
+ receive conn
+ end)
end)
- end))
+ end
let wait_for_reply id =
let res = Lwt.wait () in
@@ -867,15 +872,13 @@
serverStreams := Util.StringMap.add cmdName server !serverStreams;
(* Create a client function and return it *)
let client conn id serverArgs =
- if !streamAbortedSrc = id then raise (Util.Transient "Streaming aborted");
- streamWaitForWrite conn >>= fun () ->
debugE (fun () -> Util.msg "Sending stream chunk (id: %d)\n" id);
if !streamAbortedSrc = id then raise (Util.Transient "Streaming aborted");
let request =
encodeInt id ::
marshalHeader (Stream cmdName) (marshalArgs serverArgs [])
in
- fill_buffer conn request
+ dumpIdle conn request
in
fun conn sender ->
if not (Prefs.read streamingActivated) then
@@ -886,9 +889,7 @@
Lwt.try_bind
(fun () ->
Lwt_util.run_in_region streamReg 1
- (fun () ->
- Lwt_unix.yield () >>= fun () ->
- sender (fun v -> client conn id v)))
+ (fun () -> sender (fun v -> client conn id v)))
(fun v -> ping conn id >>= fun () -> Lwt.return v)
(fun e -> ping conn id >>= fun () -> Lwt.fail e)
end
@@ -903,11 +904,11 @@
if pos = len then
Lwt.return ()
else begin
- (grab conn buffer 1 >>= (fun () ->
+ (grab conn.inputBuffer buffer 1 >>= (fun () ->
if buffer.{0} <> connectionHeader.[pos] then
let prefix =
String.sub connectionHeader 0 pos ^ Bytearray.to_string buffer in
- let rest = peek_without_blocking conn in
+ let rest = peekWithoutBlocking conn.inputBuffer in
Lwt.fail
(Util.Fatal
("Received unexpected header from the server:\n \
@@ -934,7 +935,7 @@
*)
let negociateFlowControlLocal conn () =
- if not needFlowControl then disableFlowControl conn;
+ if not needFlowControl then disableFlowControl conn.outputQueue;
Lwt.return needFlowControl
let negociateFlowControlRemote =
@@ -955,8 +956,7 @@
let initConnection in_ch out_ch =
if not windowsHack then
ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore);
- let conn = setupIO in_ch out_ch in
- conn.canWrite <- false;
+ let conn = setupIO false in_ch out_ch in
checkHeader
conn (Bytearray.create 1) 0 (String.length connectionHeader) >>= (fun () ->
Lwt.ignore_result (receive conn);
@@ -1293,7 +1293,7 @@
Trace.runningasserver := true;
(* Send header indicating to the client that it has successfully
connected to the server *)
- let conn = setupIO in_ch out_ch in
+ let conn = setupIO true in_ch out_ch in
try
Lwt_unix.run
(dump conn [(Bytearray.of_string connectionHeader, 0,
More information about the Unison-hackers
mailing list