[Unison-hackers] [unison-svn] r406 - in trunk: doc src src/lwt src/lwt/generic src/lwt/win src/system/win

vouillon@seas.upenn.edu vouillon at seas.upenn.edu
Fri Jan 22 04:52:57 EST 2010


Author: vouillon
Date: 2010-01-22 04:52:57 -0500 (Fri, 22 Jan 2010)
New Revision: 406

Added:
   trunk/src/lwt/generic/
   trunk/src/lwt/generic/lwt_unix_impl.ml
   trunk/src/lwt/lwt_unix.ml
   trunk/src/lwt/lwt_unix_stubs.c
   trunk/src/lwt/win/
   trunk/src/lwt/win/lwt_unix_impl.ml
Removed:
   trunk/src/lwt/lwt_unix.ml
Modified:
   trunk/doc/Makefile
   trunk/src/.depend
   trunk/src/Makefile.OCaml
   trunk/src/RECENTNEWS
   trunk/src/fingerprint.ml
   trunk/src/lwt/lwt_unix.mli
   trunk/src/mkProjectInfo.ml
   trunk/src/osx.ml
   trunk/src/osxsupport.c
   trunk/src/remote.ml
   trunk/src/system/win/
Log:
* Fixed bug which made Unison ignore finder information and resource
  fork when compiled to 64bit on Mac OSX.
* Use asynchronous I/O under Windows


Modified: trunk/doc/Makefile
===================================================================
--- trunk/doc/Makefile	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/doc/Makefile	2010-01-22 09:52:57 UTC (rev 406)
@@ -34,7 +34,7 @@
 	@echo HEVEAPATH = $(HEVEAPATH)
 	@(if [ ! -f prefs.tmp ]; then $(MAKE) prefs.tmp; fi)
 	@(if [ ! -f prefsdocs.tmp ]; then $(MAKE) prefsdocs.tmp; fi)
-ifdef HEVEA
+ifeq ($(HEVEA),true)
 	printf '$(TEXDIRECTIVES)\\textversiontrue\\draftfalse' \
            > texdirectives.tex
 	latex unison-manual.tex
@@ -90,4 +90,4 @@
 	../src/mkProjectInfo > $@
 
 ../src/mkProjectInfo: ../src/mkProjectInfo.ml
-	ocamlc -o $@ $^
+	ocamlc str.cma -o $@ $^

Modified: trunk/src/.depend
===================================================================
--- trunk/src/.depend	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/.depend	2010-01-22 09:52:57 UTC (rev 406)
@@ -7,7 +7,7 @@
     fileinfo.cmi 
 copy.cmi: uutil.cmi props.cmi path.cmi osx.cmi os.cmi lwt/lwt.cmi fspath.cmi \
     fileinfo.cmi common.cmi 
-external.cmi: 
+external.cmi: lwt/lwt.cmi 
 fileinfo.cmi: system.cmi props.cmi path.cmi osx.cmi fspath.cmi 
 files.cmi: uutil.cmi system.cmi props.cmi path.cmi lwt/lwt_util.cmi \
     lwt/lwt.cmi common.cmi 
@@ -154,9 +154,9 @@
 pred.cmx: ubase/util.cmx ubase/safelist.cmx ubase/rx.cmx ubase/prefs.cmx \
     case.cmx pred.cmi 
 props.cmo: uutil.cmi ubase/util.cmi ubase/prefs.cmi path.cmi osx.cmi \
-    fspath.cmi fs.cmi external.cmi props.cmi 
+    lwt/lwt_unix.cmi fspath.cmi fs.cmi external.cmi props.cmi 
 props.cmx: uutil.cmx ubase/util.cmx ubase/prefs.cmx path.cmx osx.cmx \
-    fspath.cmx fs.cmx external.cmx props.cmi 
+    lwt/lwt_unix.cmx fspath.cmx fs.cmx external.cmx props.cmi 
 recon.cmo: ubase/util.cmi update.cmi tree.cmi ubase/trace.cmi sortri.cmi \
     ubase/safelist.cmi props.cmi ubase/prefs.cmi pred.cmi path.cmi name.cmi \
     globals.cmi fileinfo.cmi common.cmi recon.cmi 
@@ -295,8 +295,8 @@
     fspath.cmx xferhint.cmi 
 lwt/lwt.cmo: lwt/lwt.cmi 
 lwt/lwt.cmx: lwt/lwt.cmi 
-lwt/lwt_unix.cmo: lwt/pqueue.cmi lwt/lwt.cmi lwt/lwt_unix.cmi 
-lwt/lwt_unix.cmx: lwt/pqueue.cmx lwt/lwt.cmx lwt/lwt_unix.cmi 
+lwt/lwt_unix.cmo: lwt/lwt_unix.cmi 
+lwt/lwt_unix.cmx: lwt/lwt_unix.cmi 
 lwt/lwt_util.cmo: lwt/lwt.cmi lwt/lwt_util.cmi 
 lwt/lwt_util.cmx: lwt/lwt.cmx lwt/lwt_util.cmi 
 lwt/pqueue.cmo: lwt/pqueue.cmi 
@@ -350,6 +350,10 @@
 lwt/example/editor.cmx: lwt/lwt_unix.cmx 
 lwt/example/relay.cmo: lwt/lwt_unix.cmi lwt/lwt.cmi 
 lwt/example/relay.cmx: lwt/lwt_unix.cmx lwt/lwt.cmx 
+lwt/generic/lwt_unix_impl.cmo: lwt/pqueue.cmi lwt/lwt.cmi 
+lwt/generic/lwt_unix_impl.cmx: lwt/pqueue.cmx lwt/lwt.cmx 
+lwt/win/lwt_unix_impl.cmo: lwt/pqueue.cmi lwt/lwt.cmi 
+lwt/win/lwt_unix_impl.cmx: lwt/pqueue.cmx lwt/lwt.cmx 
 system/generic/system_impl.cmo: system/system_generic.cmo 
 system/generic/system_impl.cmx: system/system_generic.cmx 
 system/win/system_impl.cmo: system/system_win.cmo system/system_generic.cmo 

Modified: trunk/src/Makefile.OCaml
===================================================================
--- trunk/src/Makefile.OCaml	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/Makefile.OCaml	2010-01-22 09:52:57 UTC (rev 406)
@@ -5,6 +5,11 @@
 ####################################################################
 ### Try to automatically guess OS
 
+ifeq (${OSCOMP},cross) # Cross-compilation under Linux
+  OSARCH=win32gnuc
+  PATH := /usr/i586-mingw32msvc/bin:$(PATH)
+endif
+
 ifeq (${OSCOMP},cygwingnuc) # Define this if compiling with Cygwin GNU C
   OSARCH=win32gnuc
   ETAGS=/bin/etags
@@ -86,7 +91,7 @@
 
 INCLFLAGS=-I lwt -I ubase -I system
 CAMLFLAGS+=$(INCLFLAGS)
-CAMLFLAGS+=-I system/$(SYSTEM)
+CAMLFLAGS+=-I system/$(SYSTEM) -I lwt/$(SYSTEM)
 
 ifeq ($(OSARCH),win32)
   # Win32 system
@@ -100,7 +105,7 @@
 #    issue."
 #  CLIBS+=-cclib win32rc/unison.res
 #  STATICLIBS+=-cclib win32rc/unison.res
-  COBJS+=system/system_win_stubs$(OBJ_EXT)
+  COBJS+=system/system_win_stubs$(OBJ_EXT) lwt/lwt_unix_stubs$(OBJ_EXT)
   WINOBJS=system/system_win.cmo
   SYSTEM=win
   CLIBS+=-cclib "-link win32rc/unison.res"
@@ -113,7 +118,7 @@
   ifeq ($(OSARCH),win32gnuc)
     CWD=.
     EXEC_EXT=.exe
-    COBJS+=system/system_win_stubs$(OBJ_EXT)
+    COBJS+=system/system_win_stubs$(OBJ_EXT) lwt/lwt_unix_stubs$(OBJ_EXT)
     WINOBJS=system/system_win.cmo
     SYSTEM=win
     CLIBS+=-cclib win32rc/unison.res.lib
@@ -206,7 +211,8 @@
           ubase/uprintf.cmo ubase/util.cmo ubase/uarg.cmo \
           ubase/prefs.cmo ubase/trace.cmo ubase/proplist.cmo \
           \
-          lwt/pqueue.cmo lwt/lwt.cmo lwt/lwt_util.cmo lwt/lwt_unix.cmo \
+          lwt/pqueue.cmo lwt/lwt.cmo lwt/lwt_util.cmo \
+          lwt/$(SYSTEM)/lwt_unix_impl.cmo lwt/lwt_unix.cmo \
           \
           case.cmo pred.cmo uutil.cmo \
           fileutil.cmo name.cmo path.cmo fspath.cmo fs.cmo fingerprint.cmo \
@@ -303,6 +309,8 @@
 # Additional dependencied depending on the system
 system.cmo fspath.cmo fs.cmo: system/$(SYSTEM)/system_impl.cmo
 system.cmx fspath.cmx fs.cmx: system/$(SYSTEM)/system_impl.cmx
+lwt/lwt_unix.cmo: lwt/$(SYSTEM)/lwt_unix_impl.cmo
+lwt/lwt_unix.cmx: lwt/$(SYSTEM)/lwt_unix_impl.cmx
 
 ifeq ($(OSARCH), OpenBSD)
   ifeq ($(shell echo type ocamldot | ksh), file)

Modified: trunk/src/RECENTNEWS
===================================================================
--- trunk/src/RECENTNEWS	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/RECENTNEWS	2010-01-22 09:52:57 UTC (rev 406)
@@ -1,5 +1,12 @@
 CHANGES FROM VERSION 2.39.6
 
+* Fixed bug which made Unison ignore finder information and resource
+  fork when compiled to 64bit on Mac OSX.
+* Use asynchronous I/O under Windows
+
+-------------------------------
+CHANGES FROM VERSION 2.39.6
+
 * Made a server waiting on a socket more resilient to unexpected
   lost connections from the client.
 * Fixed possible race condition in half-duplex communication mode.

Modified: trunk/src/fingerprint.ml
===================================================================
--- trunk/src/fingerprint.ml	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/fingerprint.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -84,6 +84,7 @@
   if d == dummy then
     1234577
   else begin
+    assert (String.length d >= 3);
     Char.code (String.unsafe_get d 0) +
     (Char.code (String.unsafe_get d 1) lsl 8) +
     (Char.code (String.unsafe_get d 2) lsl 16)


Property changes on: trunk/src/lwt/generic
___________________________________________________________________
Added: svn:ignore
   + *.cmx
*.cmi
*.cmo


Copied: trunk/src/lwt/generic/lwt_unix_impl.ml (from rev 402, trunk/src/lwt/lwt_unix.ml)
===================================================================
--- trunk/src/lwt/generic/lwt_unix_impl.ml	                        (rev 0)
+++ trunk/src/lwt/generic/lwt_unix_impl.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -0,0 +1,508 @@
+(*
+Non-blocking I/O and select do not (fully) work under Windows.
+The library therefore does not use them under Windows, and will
+therefore have the following limitations:
+- No read will be performed while there are some threads ready to run
+  or waiting to write;
+- When a read is pending, everything else will be blocked: [sleep]
+  will not terminate and other reads will not be performed before
+  this read terminates;
+- A write on a socket or a pipe can block the execution of the program
+  if the data are never consumed at the other end of the connection.
+  In particular, if both ends use this library and write at the same
+  time, this could result in a dead-lock.
+- [connect] is blocking
+*)
+let windows_hack = Sys.os_type <> "Unix"
+let recent_ocaml =
+  Scanf.sscanf Sys.ocaml_version "%d.%d"
+    (fun maj min -> (maj = 3 && min >= 11) || maj > 3)
+
+module SleepQueue =
+  Pqueue.Make (struct
+    type t = float * int * unit Lwt.t
+    let compare (t, i, _) (t', i', _) =
+      let c = compare t t' in
+      if c = 0 then i - i' else c
+  end)
+let sleep_queue = ref SleepQueue.empty
+
+let event_counter = ref 0
+
+let sleep d =
+  let res = Lwt.wait () in
+  incr event_counter;
+  let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
+  sleep_queue :=
+    SleepQueue.add (t, !event_counter, res) !sleep_queue;
+  res
+
+let yield () = sleep 0.
+
+let get_time t =
+  if !t = -1. then t := Unix.gettimeofday ();
+  !t
+
+let in_the_past now t =
+  t = 0. || t <= get_time now
+
+let rec restart_threads imax now =
+  match
+    try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
+  with
+    Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
+      sleep_queue := SleepQueue.remove_min !sleep_queue;
+      Lwt.wakeup thr ();
+      restart_threads imax now
+  | _ ->
+      ()
+
+type file_descr = Unix.file_descr
+
+let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd
+
+let inputs = ref []
+let outputs = ref []
+let wait_children = ref []
+
+let child_exited = ref false
+let _ =
+  if not windows_hack then
+    ignore(Sys.signal Sys.sigchld (Sys.Signal_handle (fun _ -> child_exited := true)))
+
+let bad_fd fd =
+  try ignore (Unix.LargeFile.fstat fd); false with
+    Unix.Unix_error (_, _, _) ->
+      true
+
+let wrap_syscall queue fd cont syscall =
+  let res =
+    try
+      Some (syscall ())
+    with
+      Exit
+    | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
+        (* EINTR because we are catching SIG_CHLD hence the system call
+           might be interrupted to handle the signal; this lets us restart
+           the system call eventually. *)
+        None
+    | e ->
+        queue := List.remove_assoc fd !queue;
+        Lwt.wakeup_exn cont e;
+        None
+  in
+  match res with
+    Some v ->
+      queue := List.remove_assoc fd !queue;
+      Lwt.wakeup cont v
+  | None ->
+      ()
+
+(* [run thread] loops until [Lwt.poll thread] yields a value and returns it.
+   Each pass picks the ready descriptors (via [Unix.select], except on the
+   old-OCaml Windows path), wakes expired sleepers, retries the queued I/O
+   requests and, once SIGCHLD has been seen, the queued [waitpid] requests. *)
+let rec run thread =
+  match Lwt.poll thread with
+    Some v -> v
+  | None ->
+      let next_event =
+        try
+          let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
+        with Not_found -> None
+      in
+      let now = ref (-1.) in
+      let delay =
+        match next_event with
+          None      -> -1.
+        | Some 0.   -> 0.
+        | Some time -> max 0. (time -. get_time now)
+      in
+      let infds = List.map fst !inputs in
+      let outfds = List.map fst !outputs in
+      let (readers, writers, _) =
+        if windows_hack && not recent_ocaml then
+          let writers = outfds in
+          let readers =
+            if delay = 0. || writers <> [] then [] else infds in
+          (readers, writers, [])
+        else if infds = [] && outfds = [] && delay = 0. then
+          ([], [], [])
+        else
+          try
+            let res = Unix.select infds outfds [] delay in
+            if delay > 0. && !now <> -1. then now := !now +. delay;
+            res
+          with
+            Unix.Unix_error (Unix.EINTR, _, _) ->
+              ([], [], [])
+          | Unix.Unix_error (Unix.EBADF, _, _) ->
+              (List.filter bad_fd infds, List.filter bad_fd outfds, [])
+          | Unix.Unix_error (Unix.EPIPE, _, _)
+            when windows_hack && recent_ocaml ->
+            (* Workaround for a bug in Ocaml 3.11: select fails with an
+               EPIPE error when the file descriptor is remotely closed *)
+              (infds, [], [])
+      in
+      restart_threads !event_counter now;
+      List.iter
+        (fun fd ->
+           try
+             match List.assoc fd !inputs with
+               `Read (buf, pos, len, res) ->
+                  wrap_syscall inputs fd res
+                    (fun () -> Unix.read fd buf pos len)
+             | `Accept res ->
+                  wrap_syscall inputs fd res
+                    (fun () ->
+                       let (s, _) as v = Unix.accept fd in
+                       if not windows_hack then Unix.set_nonblock s;
+                       v)
+             | `Wait res ->
+                  wrap_syscall inputs fd res (fun () -> ())
+           with Not_found -> ())
+        readers;
+      List.iter
+        (fun fd ->
+           try
+             match List.assoc fd !outputs with
+               `Write (buf, pos, len, res) ->
+                  wrap_syscall outputs fd res
+                    (fun () -> Unix.write fd buf pos len)
+             | `CheckSocket res ->
+                  wrap_syscall outputs fd res
+                    (fun () ->
+                       try ignore (Unix.getpeername fd) with
+                         Unix.Unix_error (Unix.ENOTCONN, _, _) ->
+                           ignore (Unix.read fd " " 0 1))
+             | `Wait res -> (* queued in [outputs] by wait_write *)
+                  wrap_syscall outputs fd res (fun () -> ())
+           with Not_found -> ())
+        writers;
+      if !child_exited then begin
+        child_exited := false;
+        List.iter
+          (fun (id, (res, flags, pid)) ->
+             wrap_syscall wait_children id res
+               (fun () ->
+                  let (pid', _) as v = Unix.waitpid flags pid in
+                  if pid' = 0 then raise Exit;
+                  v))
+          !wait_children
+      end;
+      run thread
+
+(****)
+
+let wait_read ch =
+  let res = Lwt.wait () in
+  inputs := (ch, `Wait res) :: !inputs;
+  res
+
+let wait_write ch =
+  let res = Lwt.wait () in
+  outputs := (ch, `Wait res) :: !outputs;
+  res
+
+let read ch buf pos len =
+  try
+    if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
+    Lwt.return (Unix.read ch buf pos len)
+  with
+    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
+      let res = Lwt.wait () in
+      inputs := (ch, `Read (buf, pos, len, res)) :: !inputs;
+      res
+  | e ->
+      Lwt.fail e
+
+let write ch buf pos len =
+  try
+    if windows_hack && recent_ocaml then
+      raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
+    Lwt.return (Unix.write ch buf pos len)
+  with
+    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
+      let res = Lwt.wait () in
+      outputs := (ch, `Write (buf, pos, len, res)) :: !outputs;
+      res
+  | e ->
+      Lwt.fail e
+
+(*
+let pipe () =
+  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
+  if not windows_hack then begin
+    Unix.set_nonblock in_fd;
+    Unix.set_nonblock out_fd
+  end;
+  fd_pair
+*)
+
+let pipe_in () =
+  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
+  if not windows_hack then
+    Unix.set_nonblock in_fd;
+  fd_pair
+
+let pipe_out () =
+  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
+  if not windows_hack then
+    Unix.set_nonblock out_fd;
+  fd_pair
+
+let socket dom typ proto =
+  let s = Unix.socket dom typ proto in
+  if not windows_hack then Unix.set_nonblock s;
+  s
+
+let socketpair dom typ proto =
+  let (s1, s2) as spair = Unix.socketpair dom typ proto in
+  if not windows_hack then begin
+    Unix.set_nonblock s1; Unix.set_nonblock s2
+  end;
+  Lwt.return spair
+
+let bind = Unix.bind
+let setsockopt = Unix.setsockopt
+let listen = Unix.listen
+let close = Unix.close
+let set_close_on_exec = Unix.set_close_on_exec
+
+let accept ch =
+  let res = Lwt.wait () in
+  inputs := (ch, `Accept res) :: !inputs;
+  res
+
+let check_socket ch =
+  let res = Lwt.wait () in
+  outputs := (ch, `CheckSocket res) :: !outputs;
+  res
+
+let connect s addr =
+  try
+    Unix.connect s addr;
+    Lwt.return ()
+  with
+    Unix.Unix_error
+      ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
+        check_socket s
+  | e ->
+      Lwt.fail e
+
+let ids = ref 0
+let new_id () = incr ids; !ids
+
+let _waitpid flags pid =
+  try
+    Lwt.return (Unix.waitpid flags pid)
+  with e ->
+    Lwt.fail e
+
+let waitpid flags pid =
+  if List.mem Unix.WNOHANG flags || windows_hack then
+    _waitpid flags pid
+  else
+    let flags = Unix.WNOHANG :: flags in
+    Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) ->
+    if pid' <> 0 then
+      Lwt.return res
+    else
+      let res = Lwt.wait () in
+      wait_children := (new_id (), (res, flags, pid)) :: !wait_children;
+      res)
+
+let wait () = waitpid [] (-1)
+
+let system cmd =
+  match Unix.fork () with
+     0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
+  | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status)
+
+(****)
+
+type lwt_in_channel = in_channel
+type lwt_out_channel = out_channel
+
+let intern_in_channel ch =
+  Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
+let intern_out_channel ch =
+  Unix.set_nonblock (Unix.descr_of_out_channel ch); ch
+
+
+let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
+let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)
+
+let rec input_char ic =
+  try
+    Lwt.return (Pervasives.input_char ic)
+  with
+    Sys_blocked_io ->
+      Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
+  | e ->
+      Lwt.fail e
+
+let rec input ic s ofs len =
+  try
+    Lwt.return (Pervasives.input ic s ofs len)
+  with
+    Sys_blocked_io ->
+      Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len)
+  | e ->
+      Lwt.fail e
+
+let rec unsafe_really_input ic s ofs len =
+  if len <= 0 then
+    Lwt.return ()
+  else begin
+    Lwt.bind (input ic s ofs len) (fun r ->
+    if r = 0
+    then Lwt.fail End_of_file
+    else unsafe_really_input ic s (ofs+r) (len-r))
+  end
+
+let really_input ic s ofs len =
+  if ofs < 0 || len < 0 || ofs > String.length s - len
+  then Lwt.fail (Invalid_argument "really_input")
+  else unsafe_really_input ic s ofs len
+
+let input_line ic =
+  let buf = ref (String.create 128) in
+  let pos = ref 0 in
+  let rec loop () =
+    if !pos = String.length !buf then begin
+      let newbuf = String.create (2 * !pos) in
+      String.blit !buf 0 newbuf 0 !pos;
+      buf := newbuf
+    end;
+    Lwt.bind (input_char ic) (fun c ->
+    if c = '\n' then
+      Lwt.return ()
+    else begin
+      !buf.[!pos] <- c;
+      incr pos;
+      loop ()
+    end)
+  in
+  Lwt.bind
+    (Lwt.catch loop
+       (fun e ->
+          match e with
+            End_of_file when !pos <> 0 ->
+              Lwt.return ()
+          | _ ->
+              Lwt.fail e))
+    (fun () ->
+       let res = String.create !pos in
+       String.blit !buf 0 res 0 !pos;
+       Lwt.return res)
+
+(****)
+
+type popen_process =
+    Process of in_channel * out_channel
+  | Process_in of in_channel
+  | Process_out of out_channel
+  | Process_full of in_channel * out_channel * in_channel
+
+let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t)
+
+let open_proc cmd proc input output toclose =
+  match Unix.fork () with
+     0 -> if input <> Unix.stdin then begin
+            Unix.dup2 input Unix.stdin;
+            Unix.close input
+          end;
+          if output <> Unix.stdout then begin
+            Unix.dup2 output Unix.stdout;
+            Unix.close output
+          end;
+          List.iter Unix.close toclose;
+          Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
+  | id -> Hashtbl.add popen_processes proc id
+
+let open_process_in cmd =
+  let (in_read, in_write) = pipe_in () in
+  let inchan = Unix.in_channel_of_descr in_read in
+  open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read];
+  Unix.close in_write;
+  Lwt.return inchan
+
+let open_process_out cmd =
+  let (out_read, out_write) = pipe_out () in
+  let outchan = Unix.out_channel_of_descr out_write in
+  open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write];
+  Unix.close out_read;
+  Lwt.return outchan
+
+let open_process cmd =
+  let (in_read, in_write) = pipe_in () in
+  let (out_read, out_write) = pipe_out () in
+  let inchan = Unix.in_channel_of_descr in_read in
+  let outchan = Unix.out_channel_of_descr out_write in
+  open_proc cmd (Process(inchan, outchan)) out_read in_write
+                                           [in_read; out_write];
+  Unix.close out_read;
+  Unix.close in_write;
+  Lwt.return (inchan, outchan)
+
+(* FIX: Subprocesses that use /dev/tty to print things on the terminal
+   will NOT have this output captured and returned to the caller of this
+   function.  There's an argument that this is correct, but if we are
+   running from a GUI the user may not be looking at any terminal and it
+   will appear that the process is just hanging.  This can be fixed, in
+   principle, by writing a little C code that opens /dev/tty and then uses
+   the TIOCNOTTY ioctl control to detach the terminal. *)
+
+let open_proc_full cmd env proc input output error toclose =
+  match Unix.fork () with
+     0 -> Unix.dup2 input Unix.stdin; Unix.close input;
+          Unix.dup2 output Unix.stdout; Unix.close output;
+          Unix.dup2 error Unix.stderr; Unix.close error;
+          List.iter Unix.close toclose;
+          Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env
+  | id -> Hashtbl.add popen_processes proc id
+
+let open_process_full cmd env =
+  let (in_read, in_write) = pipe_in () in
+  let (out_read, out_write) = pipe_out () in
+  let (err_read, err_write) = pipe_in () in
+  let inchan = Unix.in_channel_of_descr in_read in
+  let outchan = Unix.out_channel_of_descr out_write in
+  let errchan = Unix.in_channel_of_descr err_read in
+  open_proc_full cmd env (Process_full(inchan, outchan, errchan))
+                 out_read in_write err_write [in_write; out_read; err_read];
+  Unix.close out_read;
+  Unix.close in_write;
+  Unix.close err_write;
+  Lwt.return (inchan, outchan, errchan)
+
+let find_proc_id fun_name proc =
+  try
+    let pid = Hashtbl.find popen_processes proc in
+    Hashtbl.remove popen_processes proc;
+    pid
+  with Not_found ->
+    raise (Unix.Unix_error (Unix.EBADF, fun_name, ""))
+
+let close_process_in inchan =
+  let pid = find_proc_id "close_process_in" (Process_in inchan) in
+  close_in inchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process_out outchan =
+  let pid = find_proc_id "close_process_out" (Process_out outchan) in
+  close_out outchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process (inchan, outchan) =
+  let pid = find_proc_id "close_process" (Process(inchan, outchan)) in
+  close_in inchan; close_out outchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process_full (outchan, inchan, errchan) =
+  let pid =
+    find_proc_id "close_process_full"
+                 (Process_full(outchan, inchan, errchan)) in
+  close_out inchan; close_in outchan; close_in errchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)

Deleted: trunk/src/lwt/lwt_unix.ml
===================================================================
--- trunk/src/lwt/lwt_unix.ml	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/lwt/lwt_unix.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -1,508 +0,0 @@
-(*
-Non-blocking I/O and select does not (fully) work under Windows.
-The libray therefore does not use them under Windows, and will
-therefore have the following limitations:
-- No read will be performed while there are some threads ready to run
-  or waiting to write;
-- When a read is pending, everything else will be blocked: [sleep]
-  will not terminate and other reads will not be performed before
-  this read terminates;
-- A write on a socket or a pipe can block the execution of the program
-  if the data are never consumed at the other end of the connection.
-  In particular, if both ends use this library and write at the same
-  time, this could result in a dead-lock.
-- [connect] is blocking
-*)
-let windows_hack = Sys.os_type <> "Unix"
-let recent_ocaml =
-  Scanf.sscanf Sys.ocaml_version "%d.%d"
-    (fun maj min -> (maj = 3 && min >= 11) || maj > 3)
-
-module SleepQueue =
-  Pqueue.Make (struct
-    type t = float * int * unit Lwt.t
-    let compare (t, i, _) (t', i', _) =
-      let c = compare t t' in
-      if c = 0 then i - i' else c
-  end)
-let sleep_queue = ref SleepQueue.empty
-
-let event_counter = ref 0
-
-let sleep d =
-  let res = Lwt.wait () in
-  incr event_counter;
-  let t = if d <= 0. then 0. else Unix.gettimeofday () +. d in
-  sleep_queue :=
-    SleepQueue.add (t, !event_counter, res) !sleep_queue;
-  res
-
-let yield () = sleep 0.
-
-let get_time t =
-  if !t = -1. then t := Unix.gettimeofday ();
-  !t
-
-let in_the_past now t =
-  t = 0. || t <= get_time now
-
-let rec restart_threads imax now =
-  match
-    try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
-  with
-    Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
-      sleep_queue := SleepQueue.remove_min !sleep_queue;
-      Lwt.wakeup thr ();
-      restart_threads imax now
-  | _ ->
-      ()
-
-type file_descr = Unix.file_descr
-
-let of_unix_file_descr fd = if not windows_hack then Unix.set_nonblock fd; fd
-
-let inputs = ref []
-let outputs = ref []
-let wait_children = ref []
-
-let child_exited = ref false
-let _ =
-  if not windows_hack then
-    ignore(Sys.signal Sys.sigchld (Sys.Signal_handle (fun _ -> child_exited := true)))
-
-let bad_fd fd =
-  try ignore (Unix.LargeFile.fstat fd); false with
-    Unix.Unix_error (_, _, _) ->
-      true
-
-let wrap_syscall queue fd cont syscall =
-  let res =
-    try
-      Some (syscall ())
-    with
-      Exit
-    | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
-        (* EINTR because we are catching SIG_CHLD hence the system call
-           might be interrupted to handle the signal; this lets us restart
-           the system call eventually. *)
-        None
-    | e ->
-        queue := List.remove_assoc fd !queue;
-        Lwt.wakeup_exn cont e;
-        None
-  in
-  match res with
-    Some v ->
-      queue := List.remove_assoc fd !queue;
-      Lwt.wakeup cont v
-  | None ->
-      ()
-
-let rec run thread =
-  match Lwt.poll thread with
-    Some v ->
-      v
-  | None ->
-      let next_event =
-        try
-          let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
-        with Not_found ->
-          None
-      in
-      let now = ref (-1.) in
-      let delay =
-        match next_event with
-          None      -> -1.
-        | Some 0.   -> 0.
-        | Some time -> max 0. (time -. get_time now)
-      in
-      let infds = List.map fst !inputs in
-      let outfds = List.map fst !outputs in
-      let (readers, writers, _) =
-        if windows_hack && not recent_ocaml then
-          let writers = outfds in
-          let readers =
-            if delay = 0. || writers <> [] then [] else infds in
-          (readers, writers, [])
-        else if infds = [] && outfds = [] && delay = 0. then
-          ([], [], [])
-        else
-          try
-            let res = Unix.select infds outfds [] delay in
-            if delay > 0. && !now <> -1. then now := !now +. delay;
-            res
-          with
-            Unix.Unix_error (Unix.EINTR, _, _) ->
-              ([], [], [])
-          | Unix.Unix_error (Unix.EBADF, _, _) ->
-              (List.filter bad_fd infds, List.filter bad_fd outfds, [])
-          | Unix.Unix_error (Unix.EPIPE, _, _)
-            when windows_hack && recent_ocaml ->
-            (* Workaround for a bug in Ocaml 3.11: select fails with an
-               EPIPE error when the file descriptor is remotely closed *)
-              (infds, [], [])
-      in
-      restart_threads !event_counter now;
-      List.iter
-        (fun fd ->
-           try
-             match List.assoc fd !inputs with
-               `Read (buf, pos, len, res) ->
-                  wrap_syscall inputs fd res
-                    (fun () -> Unix.read fd buf pos len)
-             | `Accept res ->
-                  wrap_syscall inputs fd res
-                    (fun () ->
-                       let (s, _) as v = Unix.accept fd in
-                       if not windows_hack then Unix.set_nonblock s;
-                       v)
-             | `Wait res ->
-                  wrap_syscall inputs fd res (fun () -> ())
-           with Not_found ->
-             ())
-        readers;
-      List.iter
-        (fun fd ->
-           try
-             match List.assoc fd !outputs with
-               `Write (buf, pos, len, res) ->
-                  wrap_syscall outputs fd res
-                    (fun () -> Unix.write fd buf pos len)
-             | `CheckSocket res ->
-                  wrap_syscall outputs fd res
-                    (fun () ->
-                       try ignore (Unix.getpeername fd) with
-                         Unix.Unix_error (Unix.ENOTCONN, _, _) ->
-                           ignore (Unix.read fd " " 0 1))
-             | `Wait res ->
-                  wrap_syscall inputs fd res (fun () -> ())
-           with Not_found ->
-             ())
-        writers;
-      if !child_exited then begin
-        child_exited := false;
-        List.iter
-          (fun (id, (res, flags, pid)) ->
-             wrap_syscall wait_children id res
-               (fun () ->
-                  let (pid', _) as v = Unix.waitpid flags pid in
-                  if pid' = 0 then raise Exit;
-                  v))
-          !wait_children
-      end;
-      run thread
-
-(****)
-
-let wait_read ch =
-  let res = Lwt.wait () in
-  inputs := (ch, `Wait res) :: !inputs;
-  res
-
-let wait_write ch =
-  let res = Lwt.wait () in
-  outputs := (ch, `Wait res) :: !outputs;
-  res
-
-let read ch buf pos len =
-  try
-    if windows_hack then raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
-    Lwt.return (Unix.read ch buf pos len)
-  with
-    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
-      let res = Lwt.wait () in
-      inputs := (ch, `Read (buf, pos, len, res)) :: !inputs;
-      res
-  | e ->
-      Lwt.fail e
-
-let write ch buf pos len =
-  try
-    if windows_hack && recent_ocaml then
-      raise (Unix.Unix_error (Unix.EAGAIN, "", ""));
-    Lwt.return (Unix.write ch buf pos len)
-  with
-    Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
-      let res = Lwt.wait () in
-      outputs := (ch, `Write (buf, pos, len, res)) :: !outputs;
-      res
-  | e ->
-      Lwt.fail e
-
-(*
-let pipe () =
-  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
-  if not windows_hack then begin
-    Unix.set_nonblock in_fd;
-    Unix.set_nonblock out_fd
-  end;
-  fd_pair
-*)
-
-let pipe_in () =
-  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
-  if not windows_hack then
-    Unix.set_nonblock in_fd;
-  fd_pair
-
-let pipe_out () =
-  let (in_fd, out_fd) as fd_pair = Unix.pipe() in
-  if not windows_hack then
-    Unix.set_nonblock out_fd;
-  fd_pair
-
-let socket dom typ proto =
-  let s = Unix.socket dom typ proto in
-  if not windows_hack then Unix.set_nonblock s;
-  s
-
-let socketpair dom typ proto =
-  let (s1, s2) as spair = Unix.socketpair dom typ proto in
-  if not windows_hack then begin
-    Unix.set_nonblock s1; Unix.set_nonblock s2
-  end;
-  Lwt.return spair
-
-let bind = Unix.bind
-let setsockopt = Unix.setsockopt
-let listen = Unix.listen
-let close = Unix.close
-let set_close_on_exec = Unix.set_close_on_exec
-
-let accept ch =
-  let res = Lwt.wait () in
-  inputs := (ch, `Accept res) :: !inputs;
-  res
-
-let check_socket ch =
-  let res = Lwt.wait () in
-  outputs := (ch, `CheckSocket res) :: !outputs;
-  res
-
-let connect s addr =
-  try
-    Unix.connect s addr;
-    Lwt.return ()
-  with
-    Unix.Unix_error
-      ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
-        check_socket s
-  | e ->
-      Lwt.fail e
-
-let ids = ref 0
-let new_id () = incr ids; !ids
-
-let _waitpid flags pid =
-  try
-    Lwt.return (Unix.waitpid flags pid)
-  with e ->
-    Lwt.fail e
-
-let waitpid flags pid =
-  if List.mem Unix.WNOHANG flags || windows_hack then
-    _waitpid flags pid
-  else
-    let flags = Unix.WNOHANG :: flags in
-    Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) ->
-    if pid' <> 0 then
-      Lwt.return res
-    else
-      let res = Lwt.wait () in
-      wait_children := (new_id (), (res, flags, pid)) :: !wait_children;
-      res)
-
-let wait () = waitpid [] (-1)
-
-let system cmd =
-  match Unix.fork () with
-     0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
-  | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status)
-
-(****)
-
-type lwt_in_channel = in_channel
-type lwt_out_channel = out_channel
-
-let intern_in_channel ch =
-  Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
-let intern_out_channel ch =
-  Unix.set_nonblock (Unix.descr_of_out_channel ch); ch
-
-
-let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
-let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)
-
-let rec input_char ic =
-  try
-    Lwt.return (Pervasives.input_char ic)
-  with
-    Sys_blocked_io ->
-      Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
-  | e ->
-      Lwt.fail e
-
-let rec input ic s ofs len =
-  try
-    Lwt.return (Pervasives.input ic s ofs len)
-  with
-    Sys_blocked_io ->
-      Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len)
-  | e ->
-      Lwt.fail e
-
-let rec unsafe_really_input ic s ofs len =
-  if len <= 0 then
-    Lwt.return ()
-  else begin
-    Lwt.bind (input ic s ofs len) (fun r ->
-    if r = 0
-    then Lwt.fail End_of_file
-    else unsafe_really_input ic s (ofs+r) (len-r))
-  end
-
-let really_input ic s ofs len =
-  if ofs < 0 || len < 0 || ofs > String.length s - len
-  then Lwt.fail (Invalid_argument "really_input")
-  else unsafe_really_input ic s ofs len
-
-let input_line ic =
-  let buf = ref (String.create 128) in
-  let pos = ref 0 in
-  let rec loop () =
-    if !pos = String.length !buf then begin
-      let newbuf = String.create (2 * !pos) in
-      String.blit !buf 0 newbuf 0 !pos;
-      buf := newbuf
-    end;
-    Lwt.bind (input_char ic) (fun c ->
-    if c = '\n' then
-      Lwt.return ()
-    else begin
-      !buf.[!pos] <- c;
-      incr pos;
-      loop ()
-    end)
-  in
-  Lwt.bind
-    (Lwt.catch loop
-       (fun e ->
-          match e with
-            End_of_file when !pos <> 0 ->
-              Lwt.return ()
-          | _ ->
-              Lwt.fail e))
-    (fun () ->
-       let res = String.create !pos in
-       String.blit !buf 0 res 0 !pos;
-       Lwt.return res)
-
-(****)
-
-type popen_process =
-    Process of in_channel * out_channel
-  | Process_in of in_channel
-  | Process_out of out_channel
-  | Process_full of in_channel * out_channel * in_channel
-
-let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t)
-
-let open_proc cmd proc input output toclose =
-  match Unix.fork () with
-     0 -> if input <> Unix.stdin then begin
-            Unix.dup2 input Unix.stdin;
-            Unix.close input
-          end;
-          if output <> Unix.stdout then begin
-            Unix.dup2 output Unix.stdout;
-            Unix.close output
-          end;
-          List.iter Unix.close toclose;
-          Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
-  | id -> Hashtbl.add popen_processes proc id
-
-let open_process_in cmd =
-  let (in_read, in_write) = pipe_in () in
-  let inchan = Unix.in_channel_of_descr in_read in
-  open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read];
-  Unix.close in_write;
-  Lwt.return inchan
-
-let open_process_out cmd =
-  let (out_read, out_write) = pipe_out () in
-  let outchan = Unix.out_channel_of_descr out_write in
-  open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write];
-  Unix.close out_read;
-  Lwt.return outchan
-
-let open_process cmd =
-  let (in_read, in_write) = pipe_in () in
-  let (out_read, out_write) = pipe_out () in
-  let inchan = Unix.in_channel_of_descr in_read in
-  let outchan = Unix.out_channel_of_descr out_write in
-  open_proc cmd (Process(inchan, outchan)) out_read in_write
-                                           [in_read; out_write];
-  Unix.close out_read;
-  Unix.close in_write;
-  Lwt.return (inchan, outchan)
-
-(* FIX: Subprocesses that use /dev/tty to print things on the terminal
-   will NOT have this output captured and returned to the caller of this
-   function.  There's an argument that this is correct, but if we are
-   running from a GUI the user may not be looking at any terminal and it
-   will appear that the process is just hanging.  This can be fixed, in
-   principle, by writing a little C code that opens /dev/tty and then uses
-   the TIOCNOTTY ioctl control to detach the terminal. *)
-
-let open_proc_full cmd env proc input output error toclose =
-  match Unix.fork () with
-     0 -> Unix.dup2 input Unix.stdin; Unix.close input;
-          Unix.dup2 output Unix.stdout; Unix.close output;
-          Unix.dup2 error Unix.stderr; Unix.close error;
-          List.iter Unix.close toclose;
-          Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env
-  | id -> Hashtbl.add popen_processes proc id
-
-let open_process_full cmd env =
-  let (in_read, in_write) = pipe_in () in
-  let (out_read, out_write) = pipe_out () in
-  let (err_read, err_write) = pipe_in () in
-  let inchan = Unix.in_channel_of_descr in_read in
-  let outchan = Unix.out_channel_of_descr out_write in
-  let errchan = Unix.in_channel_of_descr err_read in
-  open_proc_full cmd env (Process_full(inchan, outchan, errchan))
-                 out_read in_write err_write [in_write; out_read; err_read];
-  Unix.close out_read;
-  Unix.close in_write;
-  Unix.close err_write;
-  Lwt.return (inchan, outchan, errchan)
-
-let find_proc_id fun_name proc =
-  try
-    let pid = Hashtbl.find popen_processes proc in
-    Hashtbl.remove popen_processes proc;
-    pid
-  with Not_found ->
-    raise (Unix.Unix_error (Unix.EBADF, fun_name, ""))
-
-let close_process_in inchan =
-  let pid = find_proc_id "close_process_in" (Process_in inchan) in
-  close_in inchan;
-  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
-
-let close_process_out outchan =
-  let pid = find_proc_id "close_process_out" (Process_out outchan) in
-  close_out outchan;
-  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
-
-let close_process (inchan, outchan) =
-  let pid = find_proc_id "close_process" (Process(inchan, outchan)) in
-  close_in inchan; close_out outchan;
-  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
-
-let close_process_full (outchan, inchan, errchan) =
-  let pid =
-    find_proc_id "close_process_full"
-                 (Process_full(outchan, inchan, errchan)) in
-  close_out inchan; close_in outchan; close_in errchan;
-  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)

Added: trunk/src/lwt/lwt_unix.ml
===================================================================
--- trunk/src/lwt/lwt_unix.ml	                        (rev 0)
+++ trunk/src/lwt/lwt_unix.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -0,0 +1 @@
+include Lwt_unix_impl

Modified: trunk/src/lwt/lwt_unix.mli
===================================================================
--- trunk/src/lwt/lwt_unix.mli	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/lwt/lwt_unix.mli	2010-01-22 09:52:57 UTC (rev 406)
@@ -42,9 +42,6 @@
 val pipe_out : unit -> Unix.file_descr * file_descr
 val socket :
   Unix.socket_domain -> Unix.socket_type -> int -> file_descr
-val socketpair :
-  Unix.socket_domain -> Unix.socket_type -> int ->
-  (file_descr * file_descr) Lwt.t
 val bind : file_descr -> Unix.sockaddr -> unit
 val setsockopt : file_descr -> Unix.socket_bool_option -> bool -> unit
 val accept : file_descr -> (file_descr * Unix.sockaddr) Lwt.t
@@ -53,32 +50,7 @@
 val close : file_descr -> unit
 val set_close_on_exec : file_descr -> unit
 
-val wait : unit -> (int * Unix.process_status) Lwt.t
-val waitpid : Unix.wait_flag list -> int -> (int * Unix.process_status) Lwt.t
-
-val system : string -> Unix.process_status Lwt.t
-
 type lwt_in_channel
-type lwt_out_channel
 
 val intern_in_channel : in_channel -> lwt_in_channel
-val intern_out_channel : out_channel -> lwt_out_channel
-
-val input_char : lwt_in_channel -> char Lwt.t
 val input_line : lwt_in_channel -> string Lwt.t
-val input : lwt_in_channel -> string -> int -> int -> int Lwt.t
-val really_input : lwt_in_channel -> string -> int -> int -> unit Lwt.t
-
-val open_process_in: string -> lwt_in_channel Lwt.t
-val open_process_out: string -> lwt_out_channel Lwt.t
-val open_process: string -> (lwt_in_channel * lwt_out_channel) Lwt.t
-val open_process_full:
-  string -> string array ->
-  (lwt_in_channel * lwt_out_channel * lwt_in_channel) Lwt.t
-val close_process_in: lwt_in_channel -> Unix.process_status Lwt.t
-val close_process_out: lwt_out_channel -> Unix.process_status Lwt.t
-val close_process:
-  lwt_in_channel * lwt_out_channel -> Unix.process_status Lwt.t
-val close_process_full:
-  lwt_in_channel * lwt_out_channel * lwt_in_channel ->
-  Unix.process_status Lwt.t

Added: trunk/src/lwt/lwt_unix_stubs.c
===================================================================
--- trunk/src/lwt/lwt_unix_stubs.c	                        (rev 0)
+++ trunk/src/lwt/lwt_unix_stubs.c	2010-01-22 09:52:57 UTC (rev 406)
@@ -0,0 +1,600 @@
+#include <wtypes.h>
+#include <winbase.h>
+#include <mswsock.h>
+#include <winsock2.h>
+#include <errno.h>
+#include <stdio.h>
+
+#include <caml/mlvalues.h>
+#include <caml/alloc.h>
+#include <caml/memory.h>
+#include <caml/fail.h>
+#include <caml/bigarray.h>
+#include <caml/callback.h>
+
+//#define D(x) x
+#define D(x) while(0){}
+
+#define UNIX_BUFFER_SIZE 16384
+#define Nothing ((value) 0)
+
+typedef struct
+{
+  OVERLAPPED overlapped; /* first field: LPOVERLAPPED is cast back to this */
+  long id;               /* request identifier handed to completion() */
+  long action;           /* READ_OVERLAPPED or WRITE_OVERLAPPED */
+} completionData;
+
+struct filedescr {
+  union {
+    HANDLE handle;
+    SOCKET socket;
+  } fd;
+  enum { KIND_HANDLE, KIND_SOCKET } kind;
+  int crt_fd;
+};
+#define Handle_val(v) (((struct filedescr *) Data_custom_val(v))->fd.handle)
+#define Socket_val(v) (((struct filedescr *) Data_custom_val(v))->fd.socket)
+
+extern void win32_maperr (DWORD errcode);
+extern void uerror (char * cmdname, value arg);
+extern value unix_error_of_code (int errcode);
+extern value win_alloc_handle (HANDLE h);
+extern value win_alloc_socket(SOCKET);
+extern void get_sockaddr (value mladdr,
+                          struct sockaddr * addr /*out*/,
+                          int * addr_len /*out*/);
+
+#define Array_data(a, i) (((char *) a->data) + Long_val(i))
+
+CAMLprim value ml_blit_string_to_buffer
+(value s, value i, value a, value j, value l)
+{
+  char *src = String_val(s) + Int_val(i);
+  char *dest = Array_data(Bigarray_val(a), j);
+  memcpy(dest, src, Long_val(l));
+  return Val_unit;
+}
+
+CAMLprim value ml_blit_buffer_to_string
+(value a, value i, value s, value j, value l)
+{
+  char *src = Array_data(Bigarray_val(a), i);
+  char *dest = String_val(s) + Long_val(j);
+  memcpy(dest, src, Long_val(l));
+  return Val_unit;
+}
+
+/****/
+
+#define READ 0
+#define WRITE 1
+#define READ_OVERLAPPED 2
+#define WRITE_OVERLAPPED 3
+
+static char * action_name[4] = {
+  "read", "write", "read(overlapped)", "write(overlapped)"
+};
+
+static value completionCallback;
+
+static void invoke_completion_callback
+(long id, long len, long errCode, long action) {
+  CAMLparam0 ();          /* lets CAMLreturn0 unlink the local roots below */
+  CAMLlocal2 (err, name);
+  value args[4];
+  err = Val_long(0);
+  if (errCode != NO_ERROR) {
+    len = -1;             /* a failed operation is reported with length -1 */
+    win32_maperr (errCode);
+    err = unix_error_of_code(errno);
+  }
+  name = copy_string (action_name[action]);
+  D(printf("Action %s completed: id %ld -> len %ld / err %d (errCode %ld)\n",
+           action_name[action], id, len, errno, errCode));
+  args[0] = Val_long(id); args[1] = Val_long(len);
+  args[2] = err;          args[3] = name;
+  caml_callbackN(completionCallback, 4, args);  /* (id, len, err, name) */
+  D(printf("Callback performed\n"));
+  CAMLreturn0;            /* restore caml_local_roots before returning */
+}
+
+typedef struct {
+  long id;
+  long len;
+  long errCode;
+  long action; } completionInfo;
+
+int compN = 0;
+int complQueueSize = 0;
+completionInfo * complQueue = NULL;
+
+static void completion (long id, long len, long errCode, long action) {
+  D(printf("Queueing action %s: id %ld -> len %ld / err %d (errCode %ld)\n",
+           action_name[action], id, len, errno, errCode));
+  if (compN + 1 > complQueueSize) {
+    int n = complQueueSize * 2 + 1;
+    D(printf("Resizing queue to %d\n", n));
+    completionInfo * queue =
+      (completionInfo *) GlobalAlloc(GPTR, n * sizeof(completionInfo));
+    if (complQueue != NULL)
+      CopyMemory (queue, complQueue, complQueueSize * sizeof(completionInfo));
+    complQueue = queue;
+    complQueueSize = n;
+  }
+  complQueue[compN].id = id;
+  complQueue[compN].len = len;
+  complQueue[compN].errCode = errCode;
+  complQueue[compN].action = action;
+  compN++;
+}
+
+CAMLprim value get_queue (value unit) {
+  CAMLparam1 (unit);
+  int i;
+  for (i = 0; i < compN; i++)
+    invoke_completion_callback
+      (complQueue[i].id, complQueue[i].len,
+       complQueue[i].errCode, complQueue[i].action);
+  compN = 0;
+  CAMLreturn (Val_unit);
+}
+
+/****/
+
+static HANDLE main_thread;
+
+static DWORD CALLBACK helper_thread (void * param) {
+  D(printf("Helper thread created\n"));
+  while (1) SleepEx(INFINITE, TRUE);
+}
+
+static VOID CALLBACK exit_thread(ULONG_PTR param) {
+  D(printf("Helper thread exiting\n"));
+  ExitThread(0);
+}
+
+static HANDLE get_helper_thread (value threads, int kind) {
+  HANDLE h = (HANDLE) Field(threads, kind);
+
+  if (h != INVALID_HANDLE_VALUE) return h;
+
+  h = CreateThread (NULL, 0, helper_thread, NULL, 0, NULL);
+  if (h == NULL) {
+    win32_maperr (GetLastError ());
+    uerror("createHelperThread", Nothing);
+  }
+  Field(threads, kind) = (value) h;
+  return h;
+}
+
+static void kill_thread (HANDLE *h) {
+  D(printf("Killing thread\n"));
+  QueueUserAPC(exit_thread, *h, 0);
+  CloseHandle(*h);
+  *h = INVALID_HANDLE_VALUE;
+}
+
+CAMLprim value win_kill_threads (value fd) {
+  CAMLparam1(fd);
+  if (Field(fd, 1) != Val_long(0)) {
+    kill_thread((HANDLE *) &Field(Field(fd, 1), READ));
+    kill_thread((HANDLE *) &Field(Field(fd, 1), WRITE));
+  }
+  CAMLreturn(Val_unit);
+}
+
+CAMLprim value win_wrap_fd (value fd) {
+  CAMLparam1(fd);
+  CAMLlocal2(th, res);
+  D(printf("Wrapping file descriptor (sync)\n"));
+  res = caml_alloc_tuple(2);
+  Store_field(res, 0, fd);
+  th = caml_alloc(2, Abstract_tag);
+  Field(th, READ) = (value) INVALID_HANDLE_VALUE;
+  Field(th, WRITE) = (value) INVALID_HANDLE_VALUE;
+  Store_field(res, 1, th);
+  CAMLreturn(res);
+}
+
+/****/
+
+typedef struct {
+  long action;    /* READ or WRITE */
+  long id;        /* request identifier handed to completion() */
+  HANDLE fd;      /* handle given to ReadFile / WriteFile */
+  char * buffer;  /* data buffer for the transfer */
+  long len;       /* in: bytes requested; out: bytes transferred, or -1 */
+  long error;     /* out: NO_ERROR or the GetLastError() code */
+} ioInfo;
+
+
+static VOID CALLBACK thread_completion(ULONG_PTR param) {
+  ioInfo * info = (ioInfo *) param;
+  completion (info->id, info->len, info->error, info->action);
+  GlobalFree (info);
+}
+
+static VOID CALLBACK perform_io_on_thread(ULONG_PTR param) {
+  ioInfo * info = (ioInfo *) param;
+  DWORD l;
+  BOOL res;
+
+  D(printf("Starting %s: id %ld, len %ld\n",
+           action_name[info->action], info->id, info->len));
+
+  res =
+    (info->action == READ)?
+    ReadFile(info->fd, info->buffer,info->len, &l, NULL):
+    WriteFile(info->fd, info->buffer,info->len, &l, NULL);
+  if (!res) {
+    info->len = -1;
+    info->error = GetLastError ();
+  } else {
+    info->len = l;
+    info->error = NO_ERROR;
+  }
+  D(printf("Action %s done: id %ld -> len %ld / err %d (errCode %ld)\n",
+           action_name[info->action],
+           info->id, info->len, errno, info->error));
+  QueueUserAPC(thread_completion, main_thread, param);
+}
+
+static void thread_io
+(long action, long id, value threads, HANDLE h, char * buf, long len) {
+  /* buf is already a raw pointer into the buffer data, not an OCaml value. */
+  ioInfo * info = GlobalAlloc(GPTR, sizeof(ioInfo));
+  if (info == NULL) {
+    errno = ENOMEM;
+    uerror(action_name[action], Nothing);
+  }
+
+  info->action = action;  /* READ or WRITE; also selects the helper thread */
+  info->id = id;
+  info->fd = h;
+  info->buffer = buf;
+  info->len = len;
+
+  h = get_helper_thread(threads, action);  /* h now names the helper thread */
+  QueueUserAPC(perform_io_on_thread, h, (ULONG_PTR) info);
+}
+
+/****/
+
+static void CALLBACK overlapped_completion
+(DWORD errCode, DWORD len, LPOVERLAPPED overlapped) {
+  completionData * d = (completionData * )overlapped;
+  completion (d->id, len, errCode, d->action);
+  GlobalFree (d);
+}
+
+static void overlapped_action(long action, long id,
+                              HANDLE fd, char *buf, long len) {
+  BOOL res;
+  long err;
+  completionData * d = GlobalAlloc(GPTR, sizeof(completionData));
+  if (d == NULL) {
+    errno = ENOMEM;
+    uerror(action_name[action], Nothing);
+  }
+  d->id = id;             /* read back by overlapped_completion */
+  d->action = action;
+
+  D(printf("Starting %s: id %ld, len %ld\n", action_name[action], id, len));
+  res =
+    (action == READ_OVERLAPPED)?
+    ReadFileEx(fd, buf, len, &(d->overlapped), overlapped_completion):
+    WriteFileEx(fd, buf, len, &(d->overlapped), overlapped_completion);
+
+  if (!res && (err = GetLastError ()) != ERROR_IO_PENDING) {
+    /* The request was never queued, so overlapped_completion will not run */
+    /* and free d; release it here before raising the Unix error. */
+    GlobalFree (d);
+    win32_maperr (err);
+    D(printf("Action %s failed: id %ld -> err %d (errCode %ld)\n",
+             action_name[action], id, errno, err));
+    uerror((action == READ_OVERLAPPED) ? "ReadFileEx" : "WriteFileEx", Nothing);
+  }
+}
+
+CAMLprim value win_wrap_overlapped (value fd) {
+  CAMLparam1(fd);
+  CAMLlocal1(res);
+  D(printf("Wrapping file descriptor (async)\n"));
+  res = caml_alloc_tuple(2);
+  Store_field(res, 0, fd);
+  Store_field(res, 1, Val_long(0));
+  CAMLreturn(res);
+}
+
+/****/
+
+#define Handle(fd) Handle_val(Field(fd, 0))
+
+CAMLprim value win_read
+(value fd, value buf, value ofs, value len, value id) {
+  CAMLparam4(fd, buf, ofs, len);
+  struct caml_bigarray *buf_arr = Bigarray_val(buf);
+
+  if (Field(fd, 1) == Val_long(0))
+    overlapped_action (READ_OVERLAPPED, Long_val(id), Handle(fd),
+                       Array_data (buf_arr, ofs), Long_val(len));
+  else
+    thread_io (READ, Long_val(id), Field(fd, 1), Handle(fd),
+               Array_data (buf_arr, ofs), Long_val(len));
+  CAMLreturn (Val_unit);
+}
+
+CAMLprim value win_write
+(value fd, value buf, value ofs, value len, value id) {
+  CAMLparam4(fd, buf, ofs, len);
+  struct caml_bigarray *buf_arr = Bigarray_val(buf);
+
+  if (Field(fd, 1) == Val_long(0))
+    overlapped_action (WRITE_OVERLAPPED, Long_val(id), Handle(fd),
+                       Array_data (buf_arr, ofs), Long_val(len));
+  else
+    thread_io (WRITE, Long_val(id), Field(fd, 1), Handle(fd),
+               Array_data (buf_arr, ofs), Long_val(len));
+  CAMLreturn (Val_unit);
+}
+
+/*
+#ifndef SO_UPDATE_CONNECT_CONTEXT
+#define SO_UPDATE_CONNECT_CONTEXT 0x7010
+#endif
+
+static void after_connect (SOCKET s) {
+  if (!setsockopt(s, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
+    win32_maperr (GetLastError ());
+    uerror("after_connect", Nothing);
+  }
+}
+*/
+
+static HANDLE events[MAXIMUM_WAIT_OBJECTS];
+//static OVERLAPPED oData[MAXIMUM_WAIT_OBJECTS];
+
+CAMLprim value win_register_wait (value socket, value kind, value idx) {
+  CAMLparam3(socket, kind, idx);
+  long i = Long_val(idx);
+  long mask;
+
+  D(printf("Register: i %ld, kind %ld\n", Long_val(i), Long_val(kind)));
+  events[i] = CreateEvent(NULL, TRUE, FALSE, NULL);
+  mask = (Long_val(kind) == 0) ? FD_CONNECT : FD_ACCEPT;
+  if (WSAEventSelect(Socket_val(socket), events[i], mask) == SOCKET_ERROR) {
+    win32_maperr(WSAGetLastError ());
+    uerror("WSAEventSelect", Nothing);
+  }
+
+  CAMLreturn (Val_unit);
+}
+
+CAMLprim value win_check_connection (value socket, value kind, value idx) {
+  CAMLparam3 (socket, kind, idx);
+  WSANETWORKEVENTS evs;
+  int res, err, i = Long_val(idx);
+
+  D(printf("Check connection... %d\n", i));
+  if (WSAEnumNetworkEvents(Socket_val(socket), NULL, &evs)) {
+    win32_maperr(WSAGetLastError ());
+    uerror("WSAEnumNetworkEvents", Nothing);
+  }
+  if (WSAEventSelect(Socket_val(socket), NULL, 0) == SOCKET_ERROR) {
+    win32_maperr(WSAGetLastError ());
+    uerror("WSAEventSelect", Nothing);
+  }
+  if (!CloseHandle(events[i])) {
+    win32_maperr(GetLastError ());
+    uerror("CloseHandle", Nothing);
+  }
+  err =
+    evs.iErrorCode[(Long_val(kind) == 0) ? FD_CONNECT_BIT : FD_ACCEPT_BIT];
+  D(printf("Check connection: %ld, err %d\n", evs.lNetworkEvents, err));
+  if (err != 0) {
+    win32_maperr(err);
+    uerror("check_connection", Nothing);
+  }
+  CAMLreturn (Val_unit);
+}
+
+static HANDLE dummyEvent;
+
+CAMLprim value init_lwt (value callback) {
+  CAMLparam1 (callback);
+  //  GUID GuidConnectEx = WSAID_CONNECTEX;
+  //  SOCKET s;
+  //  DWORD l;
+  int i;
+
+  D(printf("Init...\n"));
+  register_global_root (&completionCallback);
+  completionCallback = callback;
+
+  dummyEvent = CreateEvent(NULL, TRUE, FALSE, NULL);  // Dummy event
+
+  DuplicateHandle (GetCurrentProcess (), GetCurrentThread (),
+                   GetCurrentProcess (), &main_thread,
+                   0, FALSE, DUPLICATE_SAME_ACCESS);
+
+  /*
+  s = socket(AF_INET, SOCK_STREAM, 0);
+  if (s == INVALID_SOCKET) return Val_unit;
+  WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER,
+           &GuidConnectEx, sizeof(GuidConnectEx),
+           &ConnectEx, sizeof(ConnectExPtr),
+           &l, NULL, NULL);
+  closesocket(s);
+  */
+
+  D(printf("Init done\n"));
+  CAMLreturn (Val_long (MAXIMUM_WAIT_OBJECTS));
+}
+
+CAMLprim value win_wait (value timeout, value event_count) {
+  CAMLparam2(timeout, event_count);
+  DWORD t, t2;  /* t: requested timeout; t2: timeout really used for the wait */
+  DWORD res;
+  long ret, n = Long_val(event_count);
+  /* DWORD is unsigned: test the signed OCaml integer for "wait forever". */
+  t = (Long_val(timeout) < 0) ? INFINITE : (DWORD) Long_val(timeout);
+  t2 = (compN > 0) ? 0 : t;  /* only poll when completions are already queued */
+  D(printf("Waiting: %ld events, timeout %ldms -> %ldms\n", n, t, t2));
+  res =
+    (n > 0) ?
+    WaitForMultipleObjectsEx(n, events, FALSE, t2, TRUE) :
+    WaitForMultipleObjectsEx(1, &dummyEvent, FALSE, t2, TRUE);
+  D(printf("Done waiting\n"));
+  if ((t != t2) && (res == WAIT_TIMEOUT)) res = WAIT_IO_COMPLETION;
+  switch (res) {
+  case WAIT_TIMEOUT:
+    D(printf("Timeout\n"));
+    ret = -1;
+    break;
+  case WAIT_IO_COMPLETION:
+    D(printf("I/O completion\n"));
+    ret = -2;
+    break;
+  case WAIT_FAILED:
+    D(printf("Wait failed\n"));
+    ret = 0;
+    win32_maperr (GetLastError ());
+    uerror("WaitForMultipleObjectsEx", Nothing);
+    break;
+  default:
+    ret = res;
+    D(printf("Event: %ld\n", res));
+    break;
+  }
+  get_queue (Val_unit);  /* run the OCaml callback for each queued completion */
+  CAMLreturn (Val_long(ret));
+}
+
+static long pipeSerial;
+
+value win_pipe(long readMode, long writeMode) {
+  CAMLparam0();
+  SECURITY_ATTRIBUTES attr;
+  HANDLE readh, writeh;
+  CHAR name[MAX_PATH];
+  CAMLlocal3(readfd, writefd, res);
+
+  attr.nLength = sizeof(attr);
+  attr.lpSecurityDescriptor = NULL;
+  attr.bInheritHandle = TRUE;
+
+  sprintf(name, "\\\\.\\Pipe\\UnisonAnonPipe.%08lx.%08lx",
+             GetCurrentProcessId(), pipeSerial++);
+
+  readh =
+    CreateNamedPipeA
+    (name, PIPE_ACCESS_INBOUND | readMode, PIPE_TYPE_BYTE | PIPE_WAIT,
+     1, UNIX_BUFFER_SIZE, UNIX_BUFFER_SIZE, 0, &attr);
+
+  if (readh == INVALID_HANDLE_VALUE) {
+    win32_maperr(GetLastError());
+    uerror("CreateNamedPipe", Nothing);
+    return FALSE;
+  }
+
+  writeh =
+    CreateFileA
+    (name, GENERIC_WRITE, 0, &attr, OPEN_EXISTING,
+     FILE_ATTRIBUTE_NORMAL | writeMode, NULL);
+
+  if (writeh == INVALID_HANDLE_VALUE) {
+    win32_maperr(GetLastError());
+    CloseHandle(readh);
+    uerror("CreateFile", Nothing);
+    return FALSE;
+  }
+
+  readfd = win_alloc_handle(readh);
+  writefd = win_alloc_handle(writeh);
+  res = alloc_small(2, 0);
+  Store_field(res, 0, readfd);
+  Store_field(res, 1, writefd);
+  CAMLreturn (res);
+}
+
+CAMLprim value win_pipe_in (value unit) {
+  CAMLparam0();
+  CAMLreturn (win_pipe (FILE_FLAG_OVERLAPPED, 0));
+}
+
+CAMLprim value win_pipe_out (value unit) {
+  CAMLparam0();
+  CAMLreturn (win_pipe (0, FILE_FLAG_OVERLAPPED));
+}
+
+static int socket_domain_table[] = {
+  PF_UNIX, PF_INET
+};
+
+static int socket_type_table[] = {
+  SOCK_STREAM, SOCK_DGRAM, SOCK_RAW, SOCK_SEQPACKET
+};
+
+CAMLprim value win_socket (value domain, value type, value proto) {
+  CAMLparam3(domain, type, proto);
+  SOCKET s;
+
+  s = WSASocket(socket_domain_table[Int_val(domain)],
+                socket_type_table[Int_val(type)],
+                Int_val(proto),
+                NULL, 0, WSA_FLAG_OVERLAPPED);
+  D(printf("Created socket %lx\n", (long)s));
+  if (s == INVALID_SOCKET) {
+    win32_maperr(WSAGetLastError ());
+    uerror("WSASocket", Nothing);
+  }
+  CAMLreturn(win_alloc_socket(s));
+}
+
+/*
+#ifndef WSAID_CONNECTEX
+#define WSAID_CONNECTEX \
+        {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
+#endif
+
+typedef BOOL (WINAPI *ConnectExPtr)(SOCKET, const struct sockaddr *, int, PVOID, DWORD, LPDWORD, LPOVERLAPPED);
+
+static ConnectExPtr ConnectEx = NULL;
+
+CAMLprim value win_connect (value socket, value address, value id) {
+  CAMLparam3(socket, address, id);
+  SOCKET s = Socket_val (socket);
+  struct sockaddr addr;
+  int addr_len;
+  DWORD err;
+  int i;
+
+  if (ConnectEx == NULL) {
+    errno = ENOSYS;
+    uerror("ConnectEx", Nothing);
+  }
+  if (eventCount == MAXIMUM_WAIT_OBJECTS) {
+    errno = EAGAIN;
+    uerror("ConnectEx", Nothing);
+  }
+  i = free_list[eventCount];
+  eventCount++;
+
+  ZeroMemory(&(oData[i]), sizeof(OVERLAPPED));
+  oData[i].hEvent = events[i];
+  ids[i] = Long_val(id);
+  sockets[i] = s;
+
+  get_sockaddr(address, &addr, &addr_len);
+  if (!ConnectEx(s, &addr, addr_len, NULL, 0, 0, &(oData[i]))) {
+    err = WSAGetLastError ();
+    if (err != ERROR_IO_PENDING) {
+      win32_maperr(err);
+      uerror("ConnectEx", Nothing);
+    }
+  } else
+      after_connect(s);
+  CAMLreturn (Val_unit);
+}
+*/


Property changes on: trunk/src/lwt/win
___________________________________________________________________
Added: svn:ignore
   + *.cmx
*.cmi
*.cmo


Added: trunk/src/lwt/win/lwt_unix_impl.ml
===================================================================
--- trunk/src/lwt/win/lwt_unix_impl.ml	                        (rev 0)
+++ trunk/src/lwt/win/lwt_unix_impl.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -0,0 +1,645 @@
+(*
+- should check all events before looping again for avoiding race
+  conditions...
+  (we have the first, scan the subsequent ones)
+*)
+
+(* When [true], descriptors are wrapped with [of_unix_file_descr] and
+   pipes/sockets are created with the plain [Unix] primitives instead of
+   the overlapped variants. *)
+let no_overlapped_io : bool = false
+
+(* Debug switch: when set, the functions of this file print trace
+   messages on stderr. *)
+let d : bool ref = ref false
+
+(****)
+
+(* I/O buffers are one-dimensional bigarrays of characters in C layout;
+   they are the values handed to the C stubs declared in this file. *)
+type buffer =
+  (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
+
+(* [buffer_create l] allocates a fresh buffer holding [l] characters. *)
+let buffer_create l =
+  Bigarray.Array1.create Bigarray.char Bigarray.c_layout l
+
+(* C stubs copying characters between a string and a buffer.  Arguments
+   are [src src_pos dst dst_pos len].  They are only called through the
+   range-checked wrappers defined right after. *)
+external unsafe_blit_string_to_buffer :
+  string -> int -> buffer -> int -> int -> unit = "ml_blit_string_to_buffer"
+external unsafe_blit_buffer_to_string :
+  buffer -> int -> string -> int -> int -> unit = "ml_blit_buffer_to_string"
+
+(* Number of characters a buffer holds. *)
+let buffer_length = Bigarray.Array1.dim
+
+(* [blit_string_to_buffer s i a j l] copies [l] characters of string [s],
+   starting at [i], into buffer [a] starting at [j].
+   Raises [Invalid_argument] when either range is out of bounds. *)
+let blit_string_to_buffer s i a j l =
+  let src_ok = i >= 0 && i <= String.length s - l in
+  let dst_ok = j >= 0 && j <= buffer_length a - l in
+  if l >= 0 && src_ok && dst_ok
+  then unsafe_blit_string_to_buffer s i a j l
+  else invalid_arg "Lwt_unix.blit_string_to_buffer"
+
+(* [blit_buffer_to_string a i s j l] copies [l] characters of buffer [a],
+   starting at [i], into string [s] starting at [j].
+   Raises [Invalid_argument] when either range is out of bounds. *)
+let blit_buffer_to_string a i s j l =
+  let src_ok = i >= 0 && i <= buffer_length a - l in
+  let dst_ok = j >= 0 && j <= String.length s - l in
+  if l >= 0 && src_ok && dst_ok
+  then unsafe_blit_buffer_to_string a i s j l
+  else invalid_arg "Lwt_unix.blit_buffer_to_string"
+
+(* Number of characters in every pooled I/O buffer. *)
+let buffer_size = 16384
+
+(* Stack of buffers that are currently not in use. *)
+let avail_buffers = ref []
+
+(* Take a buffer from the pool; a new one of [buffer_size] characters is
+   allocated when the pool is empty. *)
+let acquire_buffer () =
+  match !avail_buffers with
+  | buf :: rest ->
+      avail_buffers := rest;
+      buf
+  | [] ->
+      buffer_create buffer_size
+
+(* Give buffer [b] back to the pool. *)
+let release_buffer b =
+  avail_buffers := b :: !avail_buffers
+
+(****)
+
+(* Allocator for the integer ids that tag in-flight read/write operations.
+   [!free_list] is used as a stack: the entries at indices >= [!last_id]
+   are the ids currently available. *)
+let last_id = ref 0
+let free_list = ref (Array.init 1 (fun i -> i))
+
+(* Returns an unused id.  When every slot is taken the table is doubled;
+   the new upper half is filled with the fresh ids [len .. 2 * len - 1]. *)
+let acquire_id () =
+  let old = !free_list in
+  let len = Array.length old in
+  if !last_id = len then
+    free_list :=
+      Array.init (len * 2) (fun k -> if k < len then old.(k) else k);
+  let slot = !last_id in
+  let id = !free_list.(slot) in
+  last_id := slot + 1;
+  id
+
+(* Puts id [i] back on top of the stack of available ids. *)
+let release_id i =
+  let slot = !last_id - 1 in
+  last_id := slot;
+  !free_list.(slot) <- i
+
+(****)
+
+(* Completed I/O operations, most recent first.  Each entry is the
+   [(id, len, errno, name)] tuple recorded by [actionCompleted]. *)
+let completionEvents = ref []
+
+(* Callback handed to the C side through [init_lwt]: records one
+   completed operation. *)
+let actionCompleted id len errno name =
+  let ev = (id, len, errno, name) in
+  completionEvents := ev :: !completionEvents
+
+external init_lwt :
+  (int -> int -> Unix.error -> string -> unit) -> int = "init_lwt"
+
+(* Registers the callback at module initialisation; the integer returned
+   by the stub is used as the number of event slots available. *)
+let max_event_count = init_lwt actionCompleted
+
+(* Allocator for event slots, same stack discipline as the id allocator
+   but with a fixed capacity.  Note that this [free_list] shadows the one
+   used by [acquire_id]/[release_id]. *)
+let event_count = ref 0
+let free_list = Array.init max_event_count (fun i -> i)
+
+(* Returns a free event slot.  Raises [Unix.Unix_error (EAGAIN, nm, "")]
+   when all [max_event_count] slots are in use. *)
+let acquire_event nm =
+  let used = !event_count in
+  if used = max_event_count then
+    raise (Unix.Unix_error (Unix.EAGAIN, nm, ""))
+  else begin
+    let slot = free_list.(used) in
+    event_count := used + 1;
+    slot
+  end
+
+(* Makes event slot [i] available again. *)
+let release_event i =
+  let top = !event_count - 1 in
+  event_count := top;
+  free_list.(top) <- i
+
+(****)
+
+(* Opaque value attached to a descriptor; it is only produced and
+   consumed by the C stubs. *)
+type helpers
+
+(* A Unix descriptor together with its stub-side [helpers] value. *)
+type file_descr = { fd : Unix.file_descr; helpers : helpers }
+
+external of_unix_file_descr : Unix.file_descr -> file_descr = "win_wrap_fd"
+
+external win_wrap_async : Unix.file_descr -> file_descr = "win_wrap_overlapped"
+
+(* Wrapper applied to descriptors created by this module: the overlapped
+   one unless [no_overlapped_io] is set. *)
+let wrap_async =
+  match no_overlapped_io with
+  | true  -> of_unix_file_descr
+  | false -> win_wrap_async
+
+(****)
+
+(* Priority queue of sleeping threads, keyed by
+   [(wake-up time, sequence number, thread)].  Entries with equal times
+   are ordered by sequence number, compared by subtraction. *)
+module SleepQueue =
+  Pqueue.Make (struct
+    type t = float * int * unit Lwt.t
+    let compare (t, i, _) (t', i', _) =
+      match compare t t' with
+      | 0 -> i - i'
+      | c -> c
+  end)
+let sleep_queue = ref SleepQueue.empty
+
+(* Sequence number of the most recently queued sleeper. *)
+let event_counter = ref 0
+
+(* [sleep d] returns a thread that is queued for wake-up [d] seconds from
+   now.  A non-positive [d] is stored with the special wake-up time [0.],
+   which [in_the_past] always treats as elapsed. *)
+let sleep d =
+  let res = Lwt.wait () in
+  incr event_counter;
+  let seq = !event_counter in
+  let wake_time = if d <= 0. then 0. else Unix.gettimeofday () +. d in
+  sleep_queue := SleepQueue.add (wake_time, seq, res) !sleep_queue;
+  res
+
+(* Gives other threads a chance to run: a sleep of zero seconds. *)
+let yield () = sleep 0.
+
+(* [get_time t] returns the time cached in [t]; the sentinel [-1.] means
+   "not read yet", in which case the clock is read and cached first. *)
+let get_time t =
+  match !t with
+  | cached when cached = -1. ->
+      let current = Unix.gettimeofday () in
+      t := current;
+      current
+  | cached ->
+      cached
+
+(* [in_the_past now t] tells whether wake-up time [t] has been reached.
+   The special time [0.] always counts as reached, without reading the
+   clock. *)
+let in_the_past now t =
+  if t = 0. then true else t <= get_time now
+
+(* [restart_threads imax now] wakes up, in queue order, every sleeping
+   thread whose wake-up time has been reached (see [in_the_past]) and
+   whose sequence number is not newer than [imax].  It stops at the first
+   queue entry that does not qualify; sleepers queued after [imax] was
+   sampled are therefore left for a later call. *)
+let rec restart_threads imax now =
+  match
+    try Some (SleepQueue.find_min !sleep_queue) with Not_found -> None
+  with
+    Some (time, i, thr) when in_the_past now time && i - imax <= 0 ->
+      (* Dequeue before waking: the woken thread may itself call [sleep]. *)
+      sleep_queue := SleepQueue.remove_min !sleep_queue;
+if !d then Format.eprintf "RESTART@.";
+      Lwt.wakeup thr ();
+if !d then Format.eprintf "RESTART...DONE@.";
+      restart_threads imax now
+  | _ ->
+      ()
+
+(* Hash tables keyed by integers, using the key itself as hash value. *)
+module IntTbl =
+  Hashtbl.Make
+    (struct
+       type t = int
+       let equal (x : int) y = x = y
+       let hash x = x
+     end)
+
+(* Pending reads and writes, keyed by operation id (see [acquire_id]). *)
+let ioInFlight = IntTbl.create 17
+(* Pending accepts and connects, keyed by event slot (see
+   [acquire_event]). *)
+let connInFlight = IntTbl.create 17
+
+(* Finishes the read or write operation [id] reported by the C side.
+   For a read, the [len] characters received are copied from the
+   intermediate buffer into the caller's string.  The id and the buffer
+   are then recycled and the waiting thread is woken up: with
+   [Unix.Unix_error (errno, name, "")] when [len] is [-1], with [len]
+   otherwise. *)
+let handleCompletionEvent (id, len, errno, name) =
+  if !d then Format.eprintf "Handling event %d (len %d)@." id len;
+  let (action, buf, res) =
+    try IntTbl.find ioInFlight id with Not_found -> assert false
+  in
+  (match action with
+   | `Read (s, pos) when len > 0 -> blit_buffer_to_string buf 0 s pos len
+   | `Read _ | `Write -> ());
+  IntTbl.remove ioInFlight id;
+  release_id id;
+  release_buffer buf;
+  match len with
+  | -1 -> Lwt.wakeup_exn res (Unix.Unix_error (errno, name, ""))
+  | n  -> Lwt.wakeup res n
+
+(* Kind of socket event waited for; the value is passed to the C stubs. *)
+type kind = CONNECT | ACCEPT
+
+(* [win_wait timeout count]: called by [run] with a timeout in
+   milliseconds and the number of event slots in use.  [run] interprets a
+   result >= 0 as the slot of a signalled event and [-1] as "none". *)
+external win_wait : int -> int -> int = "win_wait"
+
+(* [win_register_wait fd kind slot] (re)arms the wait for [kind] on [fd]
+   using event slot [slot]. *)
+external win_register_wait :
+  Unix.file_descr -> kind -> int -> unit = "win_register_wait"
+
+external win_check_connection :
+  Unix.file_descr -> kind -> int -> unit = "win_check_connection"
+
+(* Runs [action] for the event signalled on slot [i] of channel [ch].
+   - On success the slot is released and [cont] is woken up with the
+     result.
+   - On [EAGAIN], [EWOULDBLOCK] or [EINTR] the wait is registered again
+     and [cont] stays asleep.
+   - On any other exception the slot is released and [cont] is woken up
+     with that exception. *)
+let handle_wait_event i ch kind cont action =
+  if !d then prerr_endline "MMM";
+  let forget () =
+    release_event i;
+    IntTbl.remove connInFlight i
+  in
+  let outcome =
+    try `Done (action ()) with
+    | Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) ->
+        `Retry
+    | e ->
+        `Failed e
+  in
+  match outcome with
+  | `Done v ->
+      if !d then prerr_endline "PPP";
+      forget ();
+      Lwt.wakeup cont v
+  | `Retry ->
+      if !d then prerr_endline "NNN";
+      win_register_wait ch.fd kind i
+  | `Failed e ->
+      if !d then prerr_endline "OOO";
+      forget ();
+      Lwt.wakeup_exn cont e
+
+(* [run thread] drives the event loop until [thread] has terminated and
+   returns its value.  Each iteration:
+   - computes how long to wait from the earliest entry of the sleep queue;
+   - blocks in [win_wait];
+   - wakes up the sleepers whose time has come;
+   - dispatches the completed reads/writes recorded by [actionCompleted],
+     oldest first;
+   - if [win_wait] returned an event slot, handles the corresponding
+     connect/accept. *)
+let rec run thread =
+if !d then Format.eprintf "Main loop@.";
+  match Lwt.poll thread with
+    Some v ->
+if !d then Format.eprintf "DONE!@.";
+      v
+  | None ->
+      (* Wake-up time of the earliest sleeping thread, if any. *)
+      let next_event =
+        try
+          let (time, _, _) = SleepQueue.find_min !sleep_queue in Some time
+        with Not_found ->
+          None
+      in
+      (* Cached current time; [-1.] means "clock not read yet". *)
+      let now = ref (-1.) in
+      (* Timeout in seconds: [-1.] when nothing is sleeping, [0.] for
+         threads queued with the special time [0.] (see [sleep]). *)
+      let delay =
+        match next_event with
+          None      -> -1.
+        | Some 0.   -> 0.
+        | Some time -> max 0. (time -. get_time now)
+      in
+if !d then Format.eprintf "vvv@.";
+      (* NOTE(review): any exception from the stub is turned into an
+         assertion failure, which hides the original error. *)
+      let i =
+        try
+          win_wait (truncate (ceil (delay *. 1000.))) !event_count
+        with e -> assert false
+      in
+if !d then Format.eprintf "^^^@.";
+      (* No event was signalled: assume the whole delay has elapsed rather
+         than reading the clock again.  Only done for a positive delay, so
+         that the [-1.] "wait forever" value is never added to an unset
+         clock. *)
+      if i = -1 && delay > 0. then now := !now +. delay;
+      restart_threads !event_counter now;
+if !d then Format.eprintf "threads restarted@.";
+      (* The list is built most recent first; process it oldest first. *)
+      let ev = !completionEvents in
+      completionEvents := [];
+      List.iter handleCompletionEvent (List.rev ev);
+      if i >= 0 then begin
+        let (kind, ch) =
+          try IntTbl.find connInFlight i with Not_found -> assert false in
+        match kind with
+          `CheckSocket res  ->
+if !d then prerr_endline "CHECK CONN";
+            handle_wait_event i ch CONNECT res
+               (fun () -> win_check_connection ch.fd CONNECT i)
+        | `Accept res ->
+if !d then prerr_endline "ACCEPT";
+            handle_wait_event i ch ACCEPT res
+              (fun () ->
+                 win_check_connection ch.fd ACCEPT i;
+                 let (v, info) = Unix.accept ch.fd in
+                 (wrap_async v, info))
+      end;
+(*
+      let infds = List.map fst !inputs in
+      let outfds = List.map fst !outputs in
+      let (readers, writers, _) =
+        if windows_hack && not recent_ocaml then
+          let writers = outfds in
+          let readers =
+            if delay = 0. || writers <> [] then [] else infds in
+          (readers, writers, [])
+        else if infds = [] && outfds = [] && delay = 0. then
+          ([], [], [])
+        else
+          try
+            let res = Unix.select infds outfds [] delay in
+            if delay > 0. && !now <> -1. then now := !now +. delay;
+            res
+          with
+            Unix.Unix_error (Unix.EINTR, _, _) ->
+              ([], [], [])
+          | Unix.Unix_error (Unix.EBADF, _, _) ->
+              (List.filter bad_fd infds, List.filter bad_fd outfds, [])
+          | Unix.Unix_error (Unix.EPIPE, _, _)
+            when windows_hack && recent_ocaml ->
+            (* Workaround for a bug in Ocaml 3.11: select fails with an
+               EPIPE error when the file descriptor is remotely closed *)
+              (infds, [], [])
+      in
+      restart_threads !event_counter now;
+      List.iter
+        (fun fd ->
+           try
+             match List.assoc fd !inputs with
+               `Read (buf, pos, len, res) ->
+                  wrap_syscall inputs fd res
+                    (fun () -> Unix.read fd buf pos len)
+             | `Accept res ->
+                  wrap_syscall inputs fd res
+                    (fun () ->
+                       let (s, i) = Unix.accept fd.fd in
+                       if not windows_hack then Unix.set_nonblock s;
+                       (wrap_async s, i))
+             | `Wait res ->
+                  wrap_syscall inputs fd res (fun () -> ())
+           with Not_found ->
+             ())
+        readers;
+      List.iter
+        (fun fd ->
+           try
+             match List.assoc fd !outputs with
+               `Write (buf, pos, len, res) ->
+                  wrap_syscall outputs fd res
+                    (fun () -> Unix.write fd buf pos len)
+             | `Wait res ->
+                  wrap_syscall inputs fd res (fun () -> ())
+           with Not_found ->
+             ())
+        writers;
+      if !child_exited then begin
+        child_exited := false;
+        List.iter
+          (fun (id, (res, flags, pid)) ->
+             wrap_syscall wait_children id res
+               (fun () ->
+                  let (pid', _) as v = Unix.waitpid flags pid in
+                  if pid' = 0 then raise Exit;
+                  v))
+          !wait_children
+      end;
+*)
+      run thread
+
+(****)
+
+(* Not implemented in this back end: calling either function fails with
+   an assertion failure. *)
+let wait_read ch = assert false
+
+let wait_write ch = assert false
+
+(* [start_read ch buf pos len id] / [start_write ch buf pos len id] start
+   an asynchronous transfer on [ch] through buffer [buf], tagged with
+   operation id [id].  [run] later reports its completion through
+   [handleCompletionEvent]. *)
+external start_read :
+  file_descr -> buffer -> int -> int -> int -> unit = "win_read"
+external start_write :
+  file_descr -> buffer -> int -> int -> int -> unit = "win_write"
+
+(* [read ch s pos len] starts reading at most [len] characters (capped at
+   [buffer_size]) from [ch] into [s] at offset [pos].  The returned thread
+   is woken up by the event loop with the number of characters read, or
+   fails with a [Unix.Unix_error].
+   Raises [Invalid_argument] when [pos] and [len] do not designate a valid
+   range of [s].  The check is made up front: otherwise a bad range would
+   only be detected inside the event loop, after the data has already been
+   consumed from the channel. *)
+let read ch s pos len =
+if !d then Format.eprintf "Start reading@.";
+  let len = if len > buffer_size then buffer_size else len in
+  if pos < 0 || len < 0 || pos > String.length s - len then
+    invalid_arg "Lwt_unix.read";
+  let id = acquire_id () in
+  let buf = acquire_buffer () in
+  let res = Lwt.wait () in
+  (* Remember where the data must be copied once the read completes. *)
+  IntTbl.add ioInFlight id (`Read (s, pos), buf, res);
+  start_read ch buf 0 len id;
+if !d then Format.eprintf "Reading started@.";
+  res
+
+(* [write ch s pos len] starts writing at most [len] characters (capped
+   at [buffer_size]) of [s], starting at offset [pos], to [ch].  The data
+   is first copied into a pooled buffer.  The returned thread is woken up
+   by the event loop with the number of characters written, or fails with
+   a [Unix.Unix_error].
+   Raises [Invalid_argument] when [pos] and [len] do not designate a valid
+   range of [s]. *)
+let write ch s pos len =
+if !d then Format.eprintf "Start writing@.";
+  let len = if len > buffer_size then buffer_size else len in
+  let buf = acquire_buffer () in
+  (* Copy before allocating the operation id, and give the buffer back if
+     the range check fails, so that an invalid call leaks neither. *)
+  begin try
+    blit_string_to_buffer s pos buf 0 len
+  with e ->
+    release_buffer buf;
+    raise e
+  end;
+  let id = acquire_id () in
+  let res = Lwt.wait () in
+  IntTbl.add ioInFlight id (`Write, buf, res);
+  start_write ch buf 0 len id;
+if !d then Format.eprintf "Writing started@.";
+  res
+
+external win_pipe_in :
+  unit -> Unix.file_descr * Unix.file_descr = "win_pipe_in"
+external win_pipe_out :
+  unit -> Unix.file_descr * Unix.file_descr = "win_pipe_out"
+
+(* Creates a pipe whose first (read) end is wrapped for use with this
+   module; the second end is returned as a plain Unix descriptor. *)
+let pipe_in () =
+  let (rd, wr) =
+    match no_overlapped_io with
+    | true  -> Unix.pipe ()
+    | false -> win_pipe_in ()
+  in
+  (wrap_async rd, wr)
+
+(* Creates a pipe whose second (write) end is wrapped for use with this
+   module; the first end is returned as a plain Unix descriptor. *)
+let pipe_out () =
+  let (rd, wr) =
+    match no_overlapped_io with
+    | true  -> Unix.pipe ()
+    | false -> win_pipe_out ()
+  in
+  (rd, wrap_async wr)
+
+external win_socket :
+  Unix.socket_domain -> Unix.socket_type -> int -> Unix.file_descr =
+  "win_socket"
+
+(* [socket d t p] creates a socket of domain [d], type [t] and protocol
+   [p], puts it in non-blocking mode and wraps it. *)
+let socket d t p =
+  let s =
+    match no_overlapped_io with
+    | true  -> Unix.socket d t p
+    | false -> win_socket d t p
+  in
+  Unix.set_nonblock s;
+  wrap_async s
+
+(* Thin wrappers applying the [Unix] function of the same name to the
+   underlying descriptor. *)
+let bind ch addr =
+  Unix.bind ch.fd addr
+let setsockopt ch opt v =
+  Unix.setsockopt ch.fd opt v
+let listen ch n =
+  Unix.listen ch.fd n
+let set_close_on_exec ch =
+  Unix.set_close_on_exec ch.fd
+
+external kill_threads : file_descr -> unit = "win_kill_threads"
+
+(* Closes the underlying descriptor, then calls the [win_kill_threads]
+   stub on the wrapper.
+   NOTE(review): if [Unix.close] raises, [kill_threads] is not called --
+   confirm that this is intended. *)
+let close ch =
+  Unix.close ch.fd;
+  kill_threads ch
+
+(* [accept ch] returns a thread that is woken up by the event loop with
+   the accepted (wrapped) socket and the peer address.
+   Raises [Unix.Unix_error (EAGAIN, "accept", "")] when no event slot is
+   available. *)
+let accept ch =
+  let res = Lwt.wait () in
+  let slot = acquire_event "accept" in
+  IntTbl.add connInFlight slot (`Accept res, ch);
+  win_register_wait ch.fd ACCEPT slot;
+  res
+
+(* [check_socket ch] returns a thread that is woken up by the event loop
+   once the pending connection on [ch] has been checked with
+   [win_check_connection].
+   Raises [Unix.Unix_error (EAGAIN, "connect", "")] when no event slot is
+   available. *)
+let check_socket ch =
+  let res = Lwt.wait () in
+  let slot = acquire_event "connect" in
+  IntTbl.add connInFlight slot (`CheckSocket res, ch);
+  win_register_wait ch.fd CONNECT slot;
+  res
+
+(* [connect s addr] connects socket [s] to [addr].  When [Unix.connect]
+   reports [EINPROGRESS], [EWOULDBLOCK] or [EAGAIN], the result is the
+   thread returned by [check_socket]; any other exception is returned as
+   a failed thread. *)
+let connect s addr =
+  let outcome =
+    try
+      Unix.connect s.fd addr;
+      if !d then prerr_endline "AAA";
+      `Connected
+    with
+    | Unix.Unix_error
+        ((Unix.EINPROGRESS | Unix.EWOULDBLOCK | Unix.EAGAIN), _, _) ->
+        `Pending
+    | e ->
+        `Failed e
+  in
+  match outcome with
+  | `Connected ->
+      Lwt.return ()
+  | `Pending ->
+      if !d then prerr_endline "BBB";
+      check_socket s
+  | `Failed e ->
+      if !d then prerr_endline "CCC";
+      Lwt.fail e
+
+(*
+let ids = ref 0
+let new_id () = incr ids; !ids
+
+let _waitpid flags pid =
+  try
+    Lwt.return (Unix.waitpid flags pid)
+  with e ->
+    Lwt.fail e
+
+let waitpid flags pid =
+  if List.mem Unix.WNOHANG flags || windows_hack then
+    _waitpid flags pid
+  else
+    let flags = Unix.WNOHANG :: flags in
+    Lwt.bind (_waitpid flags pid) (fun ((pid', _) as res) ->
+    if pid' <> 0 then
+      Lwt.return res
+    else
+      let res = Lwt.wait () in
+      wait_children := (new_id (), (res, flags, pid)) :: !wait_children;
+      res)
+
+let wait () = waitpid [] (-1)
+
+let system cmd =
+  match Unix.fork () with
+     0 -> Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
+  | id -> Lwt.bind (waitpid [] id) (fun (pid, status) -> Lwt.return status)
+*)
+
+(****)
+(*
+type lwt_in_channel = in_channel
+type lwt_out_channel = out_channel
+
+let intern_in_channel ch =
+  Unix.set_nonblock (Unix.descr_of_in_channel ch); ch
+let intern_out_channel ch =
+  Unix.set_nonblock (Unix.descr_of_out_channel ch); ch
+
+
+let wait_inchan ic = wait_read (Unix.descr_of_in_channel ic)
+let wait_outchan oc = wait_write (Unix.descr_of_out_channel oc)
+
+let rec input_char ic =
+  try
+    Lwt.return (Pervasives.input_char ic)
+  with
+    Sys_blocked_io ->
+      Lwt.bind (wait_inchan ic) (fun () -> input_char ic)
+  | e ->
+      Lwt.fail e
+
+let rec input ic s ofs len =
+  try
+    Lwt.return (Pervasives.input ic s ofs len)
+  with
+    Sys_blocked_io ->
+      Lwt.bind (wait_inchan ic) (fun () -> input ic s ofs len)
+  | e ->
+      Lwt.fail e
+
+let rec unsafe_really_input ic s ofs len =
+  if len <= 0 then
+    Lwt.return ()
+  else begin
+    Lwt.bind (input ic s ofs len) (fun r ->
+    if r = 0
+    then Lwt.fail End_of_file
+    else unsafe_really_input ic s (ofs+r) (len-r))
+  end
+
+let really_input ic s ofs len =
+  if ofs < 0 || len < 0 || ofs > String.length s - len
+  then Lwt.fail (Invalid_argument "really_input")
+  else unsafe_really_input ic s ofs len
+
+let input_line ic =
+  let buf = ref (String.create 128) in
+  let pos = ref 0 in
+  let rec loop () =
+    if !pos = String.length !buf then begin
+      let newbuf = String.create (2 * !pos) in
+      String.blit !buf 0 newbuf 0 !pos;
+      buf := newbuf
+    end;
+    Lwt.bind (input_char ic) (fun c ->
+    if c = '\n' then
+      Lwt.return ()
+    else begin
+      !buf.[!pos] <- c;
+      incr pos;
+      loop ()
+    end)
+  in
+  Lwt.bind
+    (Lwt.catch loop
+       (fun e ->
+          match e with
+            End_of_file when !pos <> 0 ->
+              Lwt.return ()
+          | _ ->
+              Lwt.fail e))
+    (fun () ->
+       let res = String.create !pos in
+       String.blit !buf 0 res 0 !pos;
+       Lwt.return res)
+*)
+(****)
+
+(*
+type popen_process =
+    Process of in_channel * out_channel
+  | Process_in of in_channel
+  | Process_out of out_channel
+  | Process_full of in_channel * out_channel * in_channel
+
+let popen_processes = (Hashtbl.create 7 : (popen_process, int) Hashtbl.t)
+
+let open_proc cmd proc input output toclose =
+  match Unix.fork () with
+     0 -> if input <> Unix.stdin then begin
+            Unix.dup2 input Unix.stdin;
+            Unix.close input
+          end;
+          if output <> Unix.stdout then begin
+            Unix.dup2 output Unix.stdout;
+            Unix.close output
+          end;
+          List.iter Unix.close toclose;
+          Unix.execv "/bin/sh" [| "/bin/sh"; "-c"; cmd |]
+  | id -> Hashtbl.add popen_processes proc id
+
+let open_process_in cmd =
+  let (in_read, in_write) = pipe_in () in
+  let inchan = Unix.in_channel_of_descr in_read in
+  open_proc cmd (Process_in inchan) Unix.stdin in_write [in_read];
+  Unix.close in_write;
+  Lwt.return inchan
+
+let open_process_out cmd =
+  let (out_read, out_write) = pipe_out () in
+  let outchan = Unix.out_channel_of_descr out_write in
+  open_proc cmd (Process_out outchan) out_read Unix.stdout [out_write];
+  Unix.close out_read;
+  Lwt.return outchan
+
+let open_process cmd =
+  let (in_read, in_write) = pipe_in () in
+  let (out_read, out_write) = pipe_out () in
+  let inchan = Unix.in_channel_of_descr in_read in
+  let outchan = Unix.out_channel_of_descr out_write in
+  open_proc cmd (Process(inchan, outchan)) out_read in_write
+                                           [in_read; out_write];
+  Unix.close out_read;
+  Unix.close in_write;
+  Lwt.return (inchan, outchan)
+
+(* FIX: Subprocesses that use /dev/tty to print things on the terminal
+   will NOT have this output captured and returned to the caller of this
+   function.  There's an argument that this is correct, but if we are
+   running from a GUI the user may not be looking at any terminal and it
+   will appear that the process is just hanging.  This can be fixed, in
+   principle, by writing a little C code that opens /dev/tty and then uses
+   the TIOCNOTTY ioctl control to detach the terminal. *)
+
+let open_proc_full cmd env proc input output error toclose =
+  match Unix.fork () with
+     0 -> Unix.dup2 input Unix.stdin; Unix.close input;
+          Unix.dup2 output Unix.stdout; Unix.close output;
+          Unix.dup2 error Unix.stderr; Unix.close error;
+          List.iter Unix.close toclose;
+          Unix.execve "/bin/sh" [| "/bin/sh"; "-c"; cmd |] env
+  | id -> Hashtbl.add popen_processes proc id
+
+let open_process_full cmd env =
+  let (in_read, in_write) = pipe_in () in
+  let (out_read, out_write) = pipe_out () in
+  let (err_read, err_write) = pipe_in () in
+  let inchan = Unix.in_channel_of_descr in_read in
+  let outchan = Unix.out_channel_of_descr out_write in
+  let errchan = Unix.in_channel_of_descr err_read in
+  open_proc_full cmd env (Process_full(inchan, outchan, errchan))
+                 out_read in_write err_write [in_write; out_read; err_read];
+  Unix.close out_read;
+  Unix.close in_write;
+  Unix.close err_write;
+  Lwt.return (inchan, outchan, errchan)
+
+let find_proc_id fun_name proc =
+  try
+    let pid = Hashtbl.find popen_processes proc in
+    Hashtbl.remove popen_processes proc;
+    pid
+  with Not_found ->
+    raise (Unix.Unix_error (Unix.EBADF, fun_name, ""))
+*)
+(*
+let close_process_in inchan =
+  let pid = find_proc_id "close_process_in" (Process_in inchan) in
+  close_in inchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process_out outchan =
+  let pid = find_proc_id "close_process_out" (Process_out outchan) in
+  close_out outchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process (inchan, outchan) =
+  let pid = find_proc_id "close_process" (Process(inchan, outchan)) in
+  close_in inchan; close_out outchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+
+let close_process_full (outchan, inchan, errchan) =
+  let pid =
+    find_proc_id "close_process_full"
+                 (Process_full(outchan, inchan, errchan)) in
+  close_out inchan; close_in outchan; close_in errchan;
+  Lwt.bind (waitpid [] pid) (fun (_, status) -> Lwt.return status)
+*)
+
+(* Channel support is not implemented in this back end: the type is
+   abstract and both functions fail with an assertion failure when
+   called. *)
+type lwt_in_channel
+let input_line = fun _ -> assert false (*XXXXX*)
+let intern_in_channel = fun _ -> assert false (*XXXXX*)

Modified: trunk/src/mkProjectInfo.ml
===================================================================
--- trunk/src/mkProjectInfo.ml	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/mkProjectInfo.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -98,3 +98,4 @@
 Printf.printf "NAME=%s\n" projectName;;
 
 
+

Modified: trunk/src/osx.ml
===================================================================
--- trunk/src/osx.ml	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/osx.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -514,6 +514,7 @@
         output_string outch "\000\000\014\176"; (* length *)
         output_string outch "\000\000\000\002"; (* Resource fork *)
         output_string outch "\000\000\014\226"; (* offset *)
+(* FIX: should check for overflow! *)
         output_string outch (setInt4 (Uutil.Filesize.toInt64 length));
                                                 (* length *)
         output_string outch (emptyFinderInfo ());

Modified: trunk/src/osxsupport.c
===================================================================
--- trunk/src/osxsupport.c	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/osxsupport.c	2010-01-22 09:52:57 UTC (rev 406)
@@ -33,12 +33,12 @@
   CAMLlocal3(res, fInfo, length);
   int retcode;
   struct attrlist attrList;
-  unsigned long options = 0;
+  unsigned long options = FSOPT_REPORT_FULLSIZE;
   struct {
-    unsigned long length;
-    char          finderInfo [32];
-    off_t         rsrcLength;
-  } attrBuf;
+    u_int32_t length;
+    char      finderInfo [32];
+    off_t     rsrcLength;
+  } __attribute__ ((packed)) attrBuf;
 
   attrList.bitmapcount = ATTR_BIT_MAP_COUNT;
   attrList.reserved = 0;
@@ -58,10 +58,10 @@
 
   if (Bool_val (need_size)) {
     if (attrBuf.length != sizeof attrBuf)
-      unix_error (EOPNOTSUPP, "getattrlist", path);
+      unix_error (EINVAL, "getattrlist", path);
   } else {
-    if (attrBuf.length < sizeof (unsigned long) + 32)
-      unix_error (EOPNOTSUPP, "getattrlist", path);
+    if (attrBuf.length != sizeof (u_int32_t) + 32)
+      unix_error (EINVAL, "getattrlist", path);
   }
 
   fInfo = alloc_string (32);
@@ -92,9 +92,9 @@
   struct attrlist attrList;
   unsigned long options = 0;
   struct {
-    unsigned long length;
-    char          finderInfo [32];
-  } attrBuf;
+    u_int32_t length;
+    char      finderInfo [32];
+  } __attribute__ ((packed))  attrBuf;
 
   attrList.bitmapcount = ATTR_BIT_MAP_COUNT;
   attrList.reserved = 0;

Modified: trunk/src/remote.ml
===================================================================
--- trunk/src/remote.ml	2010-01-20 16:11:27 UTC (rev 405)
+++ trunk/src/remote.ml	2010-01-22 09:52:57 UTC (rev 406)
@@ -28,10 +28,9 @@
    But that resulted in huge amounts of output from '-debug all'.
 *)
 
-let windowsHack = Sys.os_type <> "Unix"
-let recent_ocaml =
-  Scanf.sscanf Sys.ocaml_version "%d.%d"
-    (fun maj min -> (maj = 3 && min >= 11) || maj > 3)
+let _ =
+  if Sys.os_type = "Unix" then
+    ignore(Sys.set_signal Sys.sigpipe Sys.Signal_ignore)
 
 let _ =
   if Sys.os_type = "Unix" then
@@ -53,8 +52,6 @@
    But then, there is the risk that the two sides exchange spurious
    messages.
 *)
-let needFlowControl = windowsHack
-let readOrWrite = needFlowControl && not recent_ocaml
 
 (****)
 
@@ -307,10 +304,9 @@
 type connection =
   { inputBuffer : ioBuffer;
     outputBuffer : ioBuffer;
-    outputQueue : outputQueue;
-    receiver :  (unit -> unit Lwt.t) option ref }
+    outputQueue : outputQueue }
 
-let maybeFlush receiver pendingFlush q buf =
+let maybeFlush pendingFlush q buf =
   (* We return immediately if a flush is already scheduled, or if the
      output buffer is already empty. *)
   (* If we are doing flow control and we can write, we need to send
@@ -335,25 +331,19 @@
              flushBuffer buf
            end else
              flushBuffer buf) >>= fun () ->
-      assert (not (q.flowControl && q.canWrite));
-      (* Restart the reader thread if needed *)
-      match !receiver with
-        None   -> Lwt.return ()
-      | Some f -> f ()
+      Lwt.return ()
     end else
       Lwt.return ()
   end
 
 let makeConnection isServer inCh outCh =
   let pendingFlush = ref false in
-  let receiver = ref None in
   let outputBuffer = makeBuffer outCh in
-    { inputBuffer = makeBuffer inCh;
-      outputBuffer = outputBuffer;
-      outputQueue =
-        makeOutputQueue isServer
-          (fun q -> maybeFlush receiver pendingFlush q outputBuffer);
-      receiver = receiver }
+  { inputBuffer = makeBuffer inCh;
+    outputBuffer = outputBuffer;
+    outputQueue =
+      makeOutputQueue isServer
+        (fun q -> maybeFlush pendingFlush q outputBuffer) }
 
 (* Send message [l] *)
 let dump conn l =
@@ -694,9 +684,7 @@
 (* Receiving thread: read a message and dispatch it to the right
    thread or create a new thread to process requests. *)
 let rec receive conn =
-  if readOrWrite && conn.outputQueue.canWrite then begin
-    conn.receiver := Some (fun () -> receive conn); Lwt.return ()
-  end else begin
+  begin
     debugE (fun () -> Util.msg "Waiting for next message\n");
     (* Get the message ID *)
     let id = Bytearray.create intSize in
@@ -966,15 +954,15 @@
      in a deadlock."
 
 let negociateFlowControlLocal conn () =
-  if not needFlowControl then disableFlowControl conn.outputQueue;
-  Lwt.return needFlowControl
+  disableFlowControl conn.outputQueue;
+  Lwt.return false
 
 let negociateFlowControlRemote =
   registerServerCmd "negociateFlowControl" negociateFlowControlLocal
 
 let negociateFlowControl conn =
   (* Flow control negociation can be done asynchronously. *)
-  if not (needFlowControl || Prefs.read halfduplex) then
+  if not (Prefs.read halfduplex) then
     Lwt.ignore_result
       (negociateFlowControlRemote conn () >>= fun needed ->
        if not needed then


Property changes on: trunk/src/system/win
___________________________________________________________________
Added: svn:ignore
   + *.cmx
*.cmi
*.cmo




More information about the Unison-hackers mailing list