# # # patch "automate.ml" # from [ad1749526d052113da277fbcb68813a836cfed37] # to [226131260a583b06dfd4c61465ea9c8635df5737] # # patch "automate.mli" # from [af3fd8e56df2a66b0113a47ca374f1d065211b44] # to [e5b4eb3d7c214f7613e11395a39e15e30357a273] # # patch "database.ml" # from [50a5a35ff7dc8d9910fe824e9ff81b6d8afae95b] # to [2add1983b5938540a6271edd7fc62ec45f74df8c] # ============================================================ --- automate.ml ad1749526d052113da277fbcb68813a836cfed37 +++ automate.ml 226131260a583b06dfd4c61465ea9c8635df5737 @@ -14,15 +14,18 @@ type pb = [ (** Type definitions *) type pb = [ - | `EXIT of int + | `HANGUP | `FAILURE | `ERROR of exn ] +type watch_state = [ + | `DISABLED + | `WATCH of Giochannel.source_id + | pb ] type watch = { w_name : string ; w_chan : Giochannel.t ; - mutable w_finished : bool ; - mutable w : Giochannel.source_id option ; - exn_cb : pb -> unit ; + mutable w_state : watch_state ; + mutable exn_cb : pb -> unit ; } type in_watch = { in_w : watch ; @@ -33,32 +36,37 @@ type out_watch = { out_w : watch ; out_sb : string ; out_buffer : Buffer.t ; - out_cb : (Buffer.t -> unit) + mutable out_cb : (Buffer.t -> unit) } -type process = { - p_in : in_watch ; - p_out : out_watch ; - p_err : out_watch ; - pid : Gspawn.pid ; - mutable state : [`RUNNING | `EXITING ] - } - type command_id = int type output = [ | `OUTPUT of string | `ERROR of string | `SYNTAX_ERROR of string] type chunk = command_id * int * bool * string +type process = { + p_in : in_watch ; + p_out : out_watch ; + p_err : out_watch ; + mutable cmd_number : command_id ; + mutable callbacks : (command_id * (output -> unit)) list ; + mutable chunks : (command_id * chunk list ref) list ; + mutable state : [`RUNNING|`EXITING|`EXIT of int] ; + mutable exit_cb : (pb -> string -> unit) ; + } + + type t = { mtn : string ; db_fname : string ; - mutable process : process option ; - mutable cmd_number : command_id ; - mutable callbacks : (command_id * (output -> unit)) list ; - mutable chunks : (command_id * chunk list ref) list ; + mutable process : process option ; } + + + + let string_of_conditions cond = let s = String.make 6 '.' in Array.iteri @@ -67,24 +75,42 @@ let string_of_conditions cond = `HUP, 'H' ; `ERR, 'E' ; `PRI, 'P' ; `NVAL, 'N' |] ; s +let string_of_pb = function + | `ERROR exn -> Printf.sprintf "EXN '%s'" (Printexc.to_string exn) + | `FAILURE -> "ERR" + | `HANGUP -> "HUP" + +let error_cb w conditions = + if debug then log "%s hup_cb = %s" w.w_name (string_of_conditions conditions) ; + if List.mem `ERR conditions + then begin + w.exn_cb `FAILURE + end else begin + assert (conditions = [`HUP]) ; + w.exn_cb `HANGUP + end + + let do_write w data = let bytes_written = ref 0 in try match Giochannel.write_chars w.in_w.w_chan ~bytes_written ~off:w.in_pos data with | `NORMAL written -> if debug then log "%s cb: wrote %d" w.in_w.w_name written ; - w.in_pos <- w.in_pos + written + w.in_pos <- w.in_pos + written ; + w.in_pos >= String.length data | `AGAIN -> (* should not happen, our out channels are blocking *) - if debug then log "%s cb: EAGAIN ?" w.in_w.w_name + if debug then log "%s cb: EAGAIN ?" w.in_w.w_name ; + false with | Giochannel.Error (_, msg) as exn -> - (* an error occurred, we continue here but the the exn callback will probably tear down everything *) if debug then log "%s cb: error %s, wrote %d" w.in_w.w_name msg !bytes_written ; - w.in_w.exn_cb (`ERROR exn) + w.in_w.exn_cb (`ERROR exn) ; + false let _write_cb w conditions = if debug then log "%s cb = %s" w.in_w.w_name (string_of_conditions conditions) ; @@ -92,10 +118,9 @@ let _write_cb w conditions = | [] -> (* nothing to write, remove the source from the main loop *) if debug then log "%s cb: empty write queue, removing watch" w.in_w.w_name ; - w.in_w.w <- None ; - false + w.in_w.w_state <- `DISABLED - | (_, data) :: tl -> + | (nb, data) :: tl -> (* some data to write *) let len = String.length data in let to_write = len - w.in_pos in @@ -105,23 +130,18 @@ let _write_cb w conditions = if List.mem `OUT conditions then begin - do_write w data ; - if w.in_pos >= len + let finished = do_write w data in + if finished then begin + if debug then log "%s cb: finished writing cmd %d" w.in_w.w_name nb ; (* written everything, proceed to the next chunk *) w.in_data <- tl ; w.in_pos <- 0 - end ; - true + end end - else begin - (* no `OUT condition, only `HUP or `ERR: not good, the channel is going down ! *) - w.in_w.exn_cb `FAILURE ; - false - end + else + error_cb w.in_w conditions - - let _read_cb w conditions = if debug then log "%s cb = %s" w.out_w.w_name (string_of_conditions conditions) ; if List.mem `IN conditions @@ -131,76 +151,81 @@ let _read_cb w conditions = | `NORMAL read -> if debug then log "%s cb: read %d" w.out_w.w_name read ; Buffer.add_substring w.out_buffer w.out_sb 0 read ; - w.out_cb w.out_buffer ; - true + w.out_cb w.out_buffer | `EOF -> - if debug then log "%s cb: eof" w.out_w.w_name ; - w.out_w.exn_cb `FAILURE ; - false + if debug then log "%s cb: eof ?" w.out_w.w_name ; + w.out_w.exn_cb `FAILURE | `AGAIN -> - if debug then log "%s cb: AGAIN" w.out_w.w_name ; - true + if debug then log "%s cb: AGAIN" w.out_w.w_name with exn -> if debug then log "%s cb: error %s" w.out_w.w_name (Printexc.to_string exn) ; - w.out_w.exn_cb (`ERROR exn) ; - false + w.out_w.exn_cb (`ERROR exn) end - else begin - (* no `IN condition, only `HUP or `ERR: not good, the channel is going down ! *) - w.out_w.exn_cb `FAILURE ; - false - end - + else + error_cb w.out_w conditions + + +let reschedule_watch w = + match w.w_state with + | `WATCH _ -> true + | _ -> false + let write_cb w c = - try _write_cb w c + try _write_cb w c ; reschedule_watch w.in_w with exn -> if debug then log "write cb %s: uncaught exception '%s'" w.in_w.w_name (Printexc.to_string exn) ; - false + true let read_cb w c = - try _read_cb w c + try _read_cb w c ; reschedule_watch w.out_w with exn -> if debug then log "read cb %s: uncaught exception '%s'" w.out_w.w_name (Printexc.to_string exn) ; - false + true let setup_watch_write w nb data = - match w.in_w.w with - | None -> + match w.in_w.w_state with + | `DISABLED -> + assert (w.in_data = []) ; w.in_data <- [ nb, data ] ; w.in_pos <- 0 ; let id = Giochannel.add_watch w.in_w.w_chan [ `OUT ; `HUP ; `ERR ] (write_cb w) in - w.in_w.w <- Some id - | Some id -> + w.in_w.w_state <- `WATCH id ; + | `WATCH _ -> w.in_data <- w.in_data @ [ nb, data ] + | _ -> + assert (false) let setup_watch_read w = - assert (w.out_w.w = None) ; + assert (w.out_w.w_state = `DISABLED) ; let id = Giochannel.add_watch w.out_w.w_chan [ `IN ; `HUP ; `ERR ] (read_cb w) in - w.out_w.w <- Some id + w.out_w.w_state <- `WATCH id +let setup_exn_cb w cb = + w.exn_cb <- cb w + let setup_channel ~nonblock fd = let chan = Giochannel.unix_new (some fd) in if nonblock then Giochannel.set_flags chan [`NONBLOCK] ; Giochannel.set_encoding chan None ; Giochannel.set_buffered chan false ; chan -let make_watch name chan exn_cb = - { w_name = name ; w_chan = chan ; w_finished = false ; w = None ; exn_cb = exn_cb } -let make_in_watch name fd exn_cb = +let make_watch name chan = + { w_name = name ; w_chan = chan ; w_state = `DISABLED ; exn_cb = ignore } +let make_in_watch name fd = let chan = setup_channel ~nonblock:true fd in - { in_w = make_watch name chan exn_cb ; in_data = [] ; in_pos = 0 } -let make_out_watch name fd exn_cb out_cb = + { in_w = make_watch name chan ; in_data = [] ; in_pos = 0 } +let make_out_watch name fd = let chan = setup_channel ~nonblock:false fd in let w = { - out_w = make_watch name chan exn_cb ; + out_w = make_watch name chan ; out_sb = String.create 4096 ; out_buffer = Buffer.create 1024 ; - out_cb = out_cb + out_cb = ignore } in setup_watch_read w ; w @@ -209,22 +234,6 @@ let make_out_watch name fd exn_cb out_cb -let spawn mtn db exn_cb out_cb = - let flags = - [ `PIPE_STDIN ; `PIPE_STDOUT ; `PIPE_STDERR ; - `SEARCH_PATH ; `DO_NOT_REAP_CHILD ] in - let child = - Gspawn.async_with_pipes - ~flags - [ mtn ; "-d" ; db ; "automate" ; "stdio" ] in - let pid = some child.Gspawn.pid in - ignore (Gspawn.add_child_watch pid (fun st -> exn_cb (`EXIT st))) ; - { p_in = make_in_watch "stdin" child.Gspawn.standard_input exn_cb ; - p_out = make_out_watch "stdout" child.Gspawn.standard_output exn_cb out_cb ; - p_err = make_out_watch "stderr" child.Gspawn.standard_error exn_cb ignore ; - pid = pid ; - state = `RUNNING - } @@ -285,38 +294,40 @@ let decode_stdio_chunk b = `INCOMPLETE -let aborted_cmd ctrl nb = - not (List.mem_assoc nb ctrl.callbacks) +let aborted_cmd p nb = + not (List.mem_assoc nb p.callbacks) -let rec out_cb ctrl b = +let rec out_cb p b = match decode_stdio_chunk b with | `INCOMPLETE -> () - | `CHUNK (nb, _, _, _) when aborted_cmd ctrl nb -> - ctrl.chunks <- List.remove_assoc nb ctrl.chunks ; - out_cb ctrl b + | `CHUNK (nb, _, _, _) when aborted_cmd p nb -> + p.chunks <- List.remove_assoc nb p.chunks ; + out_cb p b | `CHUNK ((nb, code, false, data) as chunk) -> + if debug then log "decoded a chunk" ; let previous_chunks = - try List.assoc nb ctrl.chunks + try List.assoc nb p.chunks with Not_found -> let c = ref [] in - ctrl.chunks <- (nb, c) :: ctrl.chunks ; + p.chunks <- (nb, c) :: p.chunks ; c in previous_chunks := chunk :: !previous_chunks ; - out_cb ctrl b + out_cb p b | `CHUNK ((nb, code, true, data) as chunk) -> + if debug then log "decoded last chunk" ; let chunks = try - let c = List.assoc nb ctrl.chunks in - ctrl.chunks <- List.remove_assoc nb ctrl.chunks ; + let c = List.assoc nb p.chunks in + p.chunks <- List.remove_assoc nb p.chunks ; List.rev (chunk :: !c) with Not_found -> [ chunk ] in - let cb = List.assoc nb ctrl.callbacks in - ctrl.callbacks <- List.remove_assoc nb ctrl.callbacks ; + let cb = List.assoc nb p.callbacks in + p.callbacks <- List.remove_assoc nb p.callbacks ; let msg = String.concat "" (List.map (fun (_, _, _, d) -> d) chunks) in @@ -327,77 +338,104 @@ let rec out_cb ctrl b = | 2 -> `ERROR msg | _ -> failwith "invalid_code in automate stdio output" in ignore (Glib.Idle.add ~prio:0 (fun () -> cb data ; false)) ; - out_cb ctrl b + out_cb p b -let stop_watch w = - if debug then log "stopping watch %s" w.w_name ; - begin - match w.w with - | Some id -> - if debug then log "stopping watch %s: remove" w.w_name ; - Giochannel.remove_watch id ; - w.w <- None - | None -> - () - end ; - if not w.w_finished - then begin - if debug then log "stopping watch %s: shutdown" w.w_name ; - Giochannel.shutdown w.w_chan false ; - w.w_finished <- true - end ; - if debug then log "stopping watch %s: all done" w.w_name +let check_exit p = + match p.state with + | `RUNNING + | `EXITING -> + () + | `EXIT _ -> + let stderr = Buffer.contents p.p_err.out_buffer in + let r = + if p.p_in.in_w.w_state <> `HANGUP + then p.p_in.in_w.w_state + else if p.p_out.out_w.w_state <> `HANGUP + then p.p_out.out_w.w_state + else if p.p_err.out_w.w_state <> `HANGUP + then p.p_err.out_w.w_state + else `HANGUP in + match r with + | #pb as r -> p.exit_cb r stderr + | _ -> () -let tear_down ctrl arg = - let msg = - match arg with - | `EXIT s -> - Printf.sprintf "exiting with status %d" s - | `FAILURE -> - "some channel is closing" - | `ERROR (Giochannel.Error (_, msg)) -> - Printf.sprintf "GIOChannel error: %s" msg - | `ERROR exn -> - Printf.sprintf "uncaught exception: %s" (Printexc.to_string exn) in - if debug then log "tear_down cb: '%s'" msg ; +let exn_cb p w r = + if debug then log "%s exn_cb: %s" w.w_name (string_of_pb r) ; + if p.state = `RUNNING then p.state <- `EXITING ; + Giochannel.shutdown w.w_chan false ; + w.w_state <- (r : pb :> watch_state) ; + check_exit p + +let reap_cb p pid st = + if debug then log "reap_cb: %d" st ; + Gspawn.close_pid pid ; + if p.p_in.in_w.w_state = `DISABLED + then exn_cb p p.p_in.in_w `HANGUP ; + p.state <- `EXIT st ; + check_exit p + + + + +let _submit p cmd cb = + let id = p.cmd_number in + send_data p id (encode_stdio cmd) ; + p.cmd_number <- id + 1 ; + p.callbacks <- (id, cb) :: p.callbacks ; + id + + +let spawn mtn db = + let flags = + [ `PIPE_STDIN ; `PIPE_STDOUT ; `PIPE_STDERR ; + `SEARCH_PATH ; `DO_NOT_REAP_CHILD] in + let child = + Gspawn.async_with_pipes + ~flags + [ mtn ; "-d" ; db ; "automate" ; "stdio" ] in + let p = + { p_in = make_in_watch "stdin" child.Gspawn.standard_input ; + p_out = make_out_watch "stdout" child.Gspawn.standard_output ; + p_err = make_out_watch "stderr" child.Gspawn.standard_error ; + state = `RUNNING ; + cmd_number = 0 ; + callbacks = [] ; + chunks = [] ; + exit_cb = (fun _ -> assert false) + } in + let pid = some child.Gspawn.pid in + ignore (Gspawn.add_child_watch ~prio:50 pid (reap_cb p pid)) ; + p.p_out.out_cb <- out_cb p ; + setup_exn_cb p.p_in.in_w (exn_cb p) ; + setup_exn_cb p.p_out.out_w (exn_cb p) ; + setup_exn_cb p.p_err.out_w (exn_cb p) ; + p + + + + + +let exit_cb ctrl p r stderr = + if debug then log "exit_cb: r='%s' stderr='%s'" (string_of_pb r) stderr ; + (* display dialog box in case of error ... *) match ctrl.process with - | None -> - if debug then log "tear_down cb: no process ?" - | Some p -> - if p.state <> `EXITING - then begin - if debug then log "tear_down cb: removing watches" ; - p.state <- `EXITING ; - stop_watch p.p_in.in_w ; - stop_watch p.p_out.out_w ; - stop_watch p.p_err.out_w ; - end - else - if debug then log "tear_down cb: process already exiting" ; - match arg with - | `EXIT _ -> - if debug then log "tear_down cb: clearing process" ; - ctrl.callbacks <- [] ; - ctrl.chunks <- [] ; - Gspawn.close_pid p.pid ; - ctrl.process <- None - | _ -> () + | Some p' when p' == p -> + ctrl.process <- None + | _ -> + () + let ensure_process ctrl = match ctrl.process with - | Some p -> - assert (p.state = `RUNNING) ; + | Some ({ state = `RUNNING } as p) -> p + | Some { state = `EXITING | `EXIT _ } | None -> - let p = - spawn - ctrl.mtn ctrl.db_fname - (tear_down ctrl) - (out_cb ctrl) in + let p = spawn ctrl.mtn ctrl.db_fname in + p.exit_cb <- exit_cb ctrl p ; ctrl.process <- Some p ; - ctrl.cmd_number <- 0 ; p @@ -411,38 +449,62 @@ let make mtn db = { let make mtn db = { mtn = mtn ; db_fname = db ; - process = None ; - cmd_number = 0 ; - callbacks = [] ; - chunks = [] ; + process = None } let exit ctrl = match ctrl.process with - | None -> + | Some ({ state = `RUNNING } as p) -> + if debug then log "forced exit" ; + let w = p.p_in.in_w in + begin + match w.w_state with + | `WATCH id -> + Giochannel.remove_watch id + | _ -> () + end ; + Giochannel.shutdown w.w_chan false ; + w.w_state <- `HANGUP + | Some { state = `EXITING | `EXIT _ } + | None -> () - | Some p -> - stop_watch p.p_in.in_w -let submit c cmd cb = - let p = ensure_process c in - let id = c.cmd_number in - send_data p id (encode_stdio cmd) ; - c.cmd_number <- id + 1 ; - c.callbacks <- (id, cb) :: c.callbacks ; - id +let submit ctrl cmd cb = + _submit (ensure_process ctrl) cmd cb +let submit_sync ctrl cmd = + let output = ref None in + let exit_loop = ref false in + let _ = + submit + ctrl cmd + (fun v -> output := Some v ; exit_loop := true) in + while not !exit_loop do + ignore (Glib.Main.iteration true) + done ; + match some !output with + | `OUTPUT msg -> + msg + | `ERROR msg + | `SYNTAX_ERROR msg -> + Viz_types.errorf "mtn automate error: %s" msg + + + let abort ctrl nb = - ctrl.callbacks <- List.remove_assoc nb ctrl.callbacks ; match ctrl.process with | None -> () | Some p -> + p.callbacks <- List.remove_assoc nb p.callbacks ; match p.p_in.in_data with - | (id, _) :: tl when id = nb && p.p_in.in_pos = 0 -> - p.p_in.in_data <- tl ; - p.p_in.in_pos <- 0 + | (id, _) :: tl when id = nb -> + if p.p_in.in_pos = 0 + then begin + p.p_in.in_data <- tl ; + p.p_in.in_pos <- 0 + end | h :: tl -> p.p_in.in_data <- h :: (List.remove_assoc nb tl) | [] -> ============================================================ --- automate.mli af3fd8e56df2a66b0113a47ca374f1d065211b44 +++ automate.mli e5b4eb3d7c214f7613e11395a39e15e30357a273 @@ -10,6 +10,7 @@ val submit : t -> string list -> (output val exit : t -> unit val submit : t -> string list -> (output -> unit) -> command_id +val submit_sync : t -> string list -> string val abort : t -> command_id -> unit ============================================================ --- database.ml 50a5a35ff7dc8d9910fe824e9ff81b6d8afae95b +++ database.ml 2add1983b5938540a6271edd7fc62ec45f74df8c @@ -488,7 +488,9 @@ type t = { stmts : Sqlite3.stmt array ; rostered : bool ; base64 : bool ; - schema_id : string + schema_id : string ; + + mtn_automate : Automate.t ; } @@ -525,7 +527,8 @@ let open_db ?busy_handler fname = stmts = stmts ; rostered = rostered ; base64 = base64 ; - schema_id = schema + schema_id = schema ; + mtn_automate = Automate.make "mtn" fname ; } with Sqlite3.Error (_, msg) -> Sqlite3.close_db db ; @@ -541,8 +544,15 @@ let get_filename d = d.filename let get_filename d = d.filename +let decode_branches msg = + let l = string_split '\n' msg in + List.map (fun l -> l, 0) l + let fetch_branches db = - sqlite_try (fetch_branches db.base64) db + decode_branches + (Automate.submit_sync + db.mtn_automate + [ "branches" ]) let fetch_ancestry_graph db query = sqlite_try (fetch_agraph query db.base64) db