# # # add_dir "tests/serve-automate" # # add_file "automate_ostream.hh" # content [a7d6900e637ed9f9987da5593a64bbe8f6c15953] # # add_file "tests/serve-automate/__driver__.lua" # content [22b275f3775911eab4c3a184779543931a24ff29] # # patch "Makefile.am" # from [9f43a6ddc7e20848595d446afc4e1996b4f297d7] # to [9718304152afdf5dbc4eb43e4cc6ccd1e40c6570] # # patch "cmd.hh" # from [b1068b8f8f0fe3b691a0a2b5624527d37adc4e48] # to [2ac1dfe625bcadd2ce41a5d46c1214643b2b4054] # # patch "cmd_automate.cc" # from [a73f7f3582b7341889daff6d03a27dfc5e1f0290] # to [793a0682e8545ffd16a1d36169183462238c9d08] # # patch "cmd_netsync.cc" # from [ccffd713e9f1139b29b9726fb5de936b2683a265] # to [e67546897e6e02942f15dfb76345413092f89301] # # patch "netcmd.hh" # from [10527907b0ef2b48afc16182cc998211ba879d08] # to [711df49e30cc0e6160b9b9067be4717be4163d63] # # patch "netsync.cc" # from [550bd33295edd35c99ac6befbe6f93da9cca8639] # to [01c71b1e96ca666173b26a0276dc113ee3bbca40] # # patch "options_list.hh" # from [67023185e0e775e535d178143ddad30bd6b2f298] # to [a234c3ac2d68bbc08a04c62194ae870ba99c5df8] # ============================================================ --- automate_ostream.hh a7d6900e637ed9f9987da5593a64bbe8f6c15953 +++ automate_ostream.hh a7d6900e637ed9f9987da5593a64bbe8f6c15953 @@ -0,0 +1,109 @@ +#ifndef __AUTOMATE_OSTREAM_HH__ +#define __AUTOMATE_OSTREAM_HH__ + +#include + +template > +class basic_automate_streambuf : public std::basic_streambuf<_CharT, _Traits> +{ + typedef _Traits traits_type; + typedef typename _Traits::int_type int_type; + size_t _bufsize; + std::basic_ostream<_CharT, _Traits> *out; + int cmdnum; + int err; +public: + /* + automate_streambuf(size_t bufsize) + : std::streambuf(), _bufsize(bufsize), out(0), cmdnum(0), err(0) + { + char *inbuf = new char[_bufsize]; + setp(inbuf, inbuf + _bufsize); + } + */ + basic_automate_streambuf(std::ostream & o, size_t bufsize) + : std::streambuf(), _bufsize(bufsize), out(&o), cmdnum(0), err(0) + { + _CharT *inbuf = new _CharT[_bufsize]; + setp(inbuf, inbuf + _bufsize); + } + ~basic_automate_streambuf() + {} + + void set_err(int e) + { + sync(); + err = e; + } + + void end_cmd() + { + _M_sync(true); + ++cmdnum; + err = 0; + } + + virtual int sync() + { + _M_sync(); + return 0; + } + + void _M_sync(bool end = false) + { + if (!out) + { + setp(this->pbase(), this->pbase() + _bufsize); + return; + } + int num = this->pptr() - this->pbase(); + if (num || end) + { + (*out) << cmdnum << ':' + << err << ':' + << (end?'l':'m') << ':' + << num << ':' + << std::basic_string<_CharT,_Traits>(this->pbase(), num); + setp(this->pbase(), this->pbase() + _bufsize); + out->flush(); + } + } + int_type + overflow(int_type c = traits_type::eof()) + { + sync(); + sputc(c); + return 0; + } +}; + +template > +struct basic_automate_ostream : public std::basic_ostream<_CharT, _Traits> +{ + typedef basic_automate_streambuf<_CharT, _Traits> streambuf_type; + streambuf_type _M_autobuf; + + basic_automate_ostream(std::basic_ostream<_CharT, _Traits> &out, + size_t blocksize) + : std::ostream(NULL), + _M_autobuf(out, blocksize) + { this->init(&_M_autobuf); } + + ~basic_automate_ostream() + {} + + streambuf_type * + rdbuf() const + { return const_cast(&_M_autobuf); } + + void set_err(int e) + { _M_autobuf.set_err(e); } + + void end_cmd() + { _M_autobuf.end_cmd(); } +}; + +typedef basic_automate_streambuf automate_streambuf; +typedef basic_automate_ostream automate_ostream; + +#endif ============================================================ --- tests/serve-automate/__driver__.lua 22b275f3775911eab4c3a184779543931a24ff29 +++ tests/serve-automate/__driver__.lua 22b275f3775911eab4c3a184779543931a24ff29 @@ -0,0 +1,19 @@ +-- this test uses netcat +skip_if(not existsonpath("nc")) + +include("common/netsync.lua") + +mtn_setup() +netsync.setup() + +automate_port = math.random(1024, 65535) +server = netsync.start({"--bind-automate", "localhost:" .. automate_port}) + +check({"nc", "-q", "10", "localhost", automate_port}, 0, true, false, + "l17:interface_versione") + +rename("stdout", "version") + +check(qgrep("^0:0:l:", "version")) + +server:stop() \ No newline at end of file ============================================================ --- Makefile.am 9f43a6ddc7e20848595d446afc4e1996b4f297d7 +++ Makefile.am 9718304152afdf5dbc4eb43e4cc6ccd1e40c6570 @@ -27,6 +27,7 @@ MOST_SOURCES = \ $(SANITY_CORE_SOURCES) $(LUAEXT_SOURCES) platform-wrapped.hh \ rev_types.hh mtn-sanity.cc mtn-sanity.hh ui.cc ui.hh \ app_state.cc app_state.hh \ + automate_ostream.hh \ botan_pipe_cache.hh \ commands.cc commands.hh $(CMD_SOURCES) \ diff_output.cc diff_output.hh \ ============================================================ --- cmd.hh b1068b8f8f0fe3b691a0a2b5624527d37adc4e48 +++ cmd.hh 2ac1dfe625bcadd2ce41a5d46c1214643b2b4054 @@ -19,6 +19,8 @@ class app_state; class app_state; +class automate_session; + namespace commands { class command @@ -111,6 +113,7 @@ namespace commands args_vector const & args, std::ostream & output) const = 0; friend class automate_stdio; + friend class ::automate_session; public: automate(std::string const & name, ============================================================ --- cmd_automate.cc a73f7f3582b7341889daff6d03a27dfc5e1f0290 +++ cmd_automate.cc 793a0682e8545ffd16a1d36169183462238c9d08 @@ -14,6 +14,7 @@ #include "cmd.hh" #include "app_state.hh" +#include "automate_ostream.hh" #include "ui.hh" #include "lua.hh" #include "lua_hooks.hh" @@ -248,105 +249,7 @@ public: } }; -struct automate_streambuf : public std::streambuf -{ -private: - size_t _bufsize; - std::ostream *out; - automate_reader *in; - int cmdnum; - int err; -public: - automate_streambuf(size_t bufsize) - : std::streambuf(), _bufsize(bufsize), out(0), in(0), cmdnum(0), err(0) - { - char *inbuf = new char[_bufsize]; - setp(inbuf, inbuf + _bufsize); - } - automate_streambuf(std::ostream & o, size_t bufsize) - : std::streambuf(), _bufsize(bufsize), out(&o), in(0), cmdnum(0), err(0) - { - char *inbuf = new char[_bufsize]; - setp(inbuf, inbuf + _bufsize); - } - automate_streambuf(automate_reader & i, size_t bufsize) - : std::streambuf(), _bufsize(bufsize), out(0), in(&i), cmdnum(0), err(0) - { - char *inbuf = new char[_bufsize]; - setp(inbuf, inbuf + _bufsize); - } - ~automate_streambuf() - {} - void set_err(int e) - { - sync(); - err = e; - } - - void end_cmd() - { - _M_sync(true); - ++cmdnum; - err = 0; - } - - virtual int sync() - { - _M_sync(); - return 0; - } - - void _M_sync(bool end = false) - { - if (!out) - { - setp(pbase(), pbase() + _bufsize); - return; - } - int num = pptr() - pbase(); - if (num || end) - { - (*out) << cmdnum << ':' - << err << ':' - << (end?'l':'m') << ':' - << num << ':' << std::string(pbase(), num); - setp(pbase(), pbase() + _bufsize); - out->flush(); - } - } - int_type - overflow(int_type c = traits_type::eof()) - { - sync(); - sputc(c); - return 0; - } -}; - -struct automate_ostream : public std::ostream -{ - automate_streambuf _M_autobuf; - - automate_ostream(std::ostream &out, size_t blocksize) - : std::ostream(NULL), - _M_autobuf(out, blocksize) - { this->init(&_M_autobuf); } - - ~automate_ostream() - {} - - automate_streambuf * - rdbuf() const - { return const_cast(&_M_autobuf); } - - void set_err(int e) - { _M_autobuf.set_err(e); } - - void end_cmd() - { _M_autobuf.end_cmd(); } -}; - CMD_AUTOMATE(stdio, "", N_("Automates several commands in one run"), "", ============================================================ --- cmd_netsync.cc ccffd713e9f1139b29b9726fb5de936b2683a265 +++ cmd_netsync.cc e67546897e6e02942f15dfb76345413092f89301 @@ -236,7 +236,7 @@ CMD(push, "push", "", CMD_REF(network), netsync_connection_info info; extract_client_connection_info(app.opts, app.lua, db, keys, args, info); - run_netsync_protocol(app.opts, app.lua, project, keys, + run_netsync_protocol(app, app.opts, app.lua, project, keys, client_voice, source_role, info); } @@ -258,7 +258,7 @@ CMD(pull, "pull", "", CMD_REF(network), if (app.opts.signing_key() == "") P(F("doing anonymous pull; use -kKEYNAME if you need authentication")); - run_netsync_protocol(app.opts, app.lua, project, keys, + run_netsync_protocol(app, app.opts, app.lua, project, keys, client_voice, sink_role, info); } @@ -284,7 +284,7 @@ CMD(sync, "sync", "", CMD_REF(network), workspace work(app, true); } - run_netsync_protocol(app.opts, app.lua, project, keys, + run_netsync_protocol(app, app.opts, app.lua, project, keys, client_voice, source_and_sink_role, info); } @@ -392,7 +392,7 @@ CMD(clone, "clone", "", CMD_REF(network) // make sure we're back in the original dir so that file: URIs work change_current_working_dir(start_dir); - run_netsync_protocol(app.opts, app.lua, project, keys, + run_netsync_protocol(app, app.opts, app.lua, project, keys, client_voice, sink_role, info); change_current_working_dir(workspace_dir); @@ -487,7 +487,8 @@ CMD_NO_WORKSPACE(serve, "serve", "", CMD N_("Serves the database to connecting clients"), "", options::opts::bind | options::opts::pidfile | - options::opts::bind_stdio | options::opts::no_transport_auth ) + options::opts::bind_stdio | options::opts::no_transport_auth | + options::opts::bind_automate_uris) { if (!args.empty()) throw usage(execid); @@ -518,7 +519,7 @@ CMD_NO_WORKSPACE(serve, "serve", "", CMD W(F("The --no-transport-auth option is usually only used " "in combination with --stdio")); - run_netsync_protocol(app.opts, app.lua, project, keys, + run_netsync_protocol(app, app.opts, app.lua, project, keys, server_voice, source_and_sink_role, info); } ============================================================ --- netcmd.hh 10527907b0ef2b48afc16182cc998211ba879d08 +++ netcmd.hh 711df49e30cc0e6160b9b9067be4717be4163d63 @@ -29,6 +29,8 @@ class options; class lua_hooks; class options; +class app_state; + typedef enum { server_voice, @@ -195,7 +197,8 @@ struct netsync_connection_info } client; }; -void run_netsync_protocol(options & opts, lua_hooks & lua, +void run_netsync_protocol(app_state & app, + options & opts, lua_hooks & lua, project_t & project, key_store & keys, protocol_voice voice, protocol_role role, ============================================================ --- netsync.cc 550bd33295edd35c99ac6befbe6f93da9cca8639 +++ netsync.cc 01c71b1e96ca666173b26a0276dc113ee3bbca40 @@ -26,6 +26,12 @@ #include #include +#include "automate_ostream.hh" +#include +#include "cmd.hh" +#include "work.hh" +#include "app_state.hh" + #include "lua_hooks.hh" #include "key_store.hh" #include "project.hh" @@ -378,6 +384,10 @@ protected: { return outbuf_bytes > constants::bufsz * 10; } + bool output_empty() const + { + return outbuf.empty(); + } public: string peer_id; string name() { return peer_id; } @@ -3214,8 +3224,287 @@ session_from_server_sync_item(options & } } +CMD_FWD_DECL(automate); + +class automate_session : public session_base +{ + app_state & app; + typedef commands::command_id command_id; + typedef commands::command command; + typedef commands::automate automate; + struct Command + { + vector > opts; + vector args; + }; + bool skip_ws(size_t & pos, size_t len) + { + static string whitespace(" \r\n\t"); + while (pos < len && whitespace.find(inbuf[pos]) != string::npos) + { + ++pos; + } + if (pos == len) + return false; + return true; + } + bool read_str(size_t & pos, size_t len, string & out) + { + if (pos >= len) + return false; + if (!skip_ws(pos, len)) + return false; + size_t size = 0; + char c = inbuf[pos++]; + while (pos < len && c <= '9' && c >= '0') + { + size = (size * 10) + (c - '0'); + c = inbuf[pos++]; + } + if (pos == len && c <= '9' && c >= '0') + return false; + + if (c != ':') + throw bad_decode(F("bad automate stdio input; cannot read string")); + + if (pos + size > len) + return false; + + out = inbuf.substr(pos, size); + pos += size; + return true; + } + bool read_cmd(Command & cmd) + { + cmd.opts.clear(); + cmd.args.clear(); + + size_t len = inbuf.size(); + if (len < 2) + return false; + size_t pos = 0; + if (!skip_ws(pos, len)) + return false; + if (inbuf[pos] == 'o') + { + ++pos; + while (inbuf[pos] != 'e') + { + string opt, val; + if (!read_str(pos, len, opt)) + return false; + if (!read_str(pos, len, val)) + return false; + cmd.opts.push_back(make_pair(opt, val)); + if (!skip_ws(pos, len)) + return false; + if (pos == len) + return false; + }; + ++pos; + } + if (inbuf[pos] == 'l') + { + ++pos; + while (inbuf[pos] != 'e') + { + string arg; + if (!read_str(pos, len, arg)) + return false; + cmd.args.push_back(arg); + if (!skip_ws(pos, len)) + return false; + if (pos == len) + return false; + } + ++pos; + } + else + throw bad_decode(F("bad automate stdio input; cannot find command")); + + if (cmd.args.empty()) + throw bad_decode(F("bad automate stdio input: empty command")); + inbuf.pop_front(pos); + return true; + } + bool armed; + Command cmd; + + void note_bytes_in(int count) + { + protocol_state = working_state; + } + void note_bytes_out(int count) + { + size_t len = inbuf.size(); + size_t pos = 0; + if (output_empty() && !skip_ws(pos, len)) + { + protocol_state = confirmed_state; + } + } + std::ostringstream oss; + automate_ostream os; +public: + automate_session(app_state & app, + string const & peer_id, + shared_ptr str) : + session_base(peer_id, str), + app(app), armed(false), + os(oss, app.opts.automate_stdio_size) + { } + bool arm() + { + if (!armed) + { + if (output_overfull()) + { + return false; + } + armed = read_cmd(cmd); + } + return armed; + } + bool do_work(transaction_guard & guard) + { + try + { + if (!arm()) + return true; + } + catch (bad_decode & bd) + { + W(F("stdio protocol error processing %s : '%s'") + % peer_id % bd.what); + return false; + } + armed = false; + + args_vector args; + for (vector::iterator i = cmd.args.begin(); + i != cmd.args.end(); ++i) + { + args.push_back(arg_type(*i, origin::user)); + } + + oss.str(string()); + + try + { + options::options_type opts; + opts = options::opts::all_options() - options::opts::globals(); + opts.instantiate(&app.opts).reset(); + + command_id id; + for (args_vector::const_iterator iter = args.begin(); + iter != args.end(); iter++) + id.push_back(typecast_vocab(*iter)); + + set< command_id > matches = + CMD_REF(automate)->complete_command(id); + + if (matches.empty()) + { + E(false, origin::user, + F("no completions for this command")); + } + else if (matches.size() > 1) + { + E(false, origin::user, + F("multiple completions possible for this command")); + } + + id = *matches.begin(); + + I(args.size() >= id.size()); + for (command_id::size_type i = 0; i < id.size(); i++) + args.erase(args.begin()); + + command const * cmd = CMD_REF(automate)->find_command(id); + I(cmd != NULL); + automate const * acmd = reinterpret_cast< automate const * >(cmd); + + opts = options::opts::globals() | acmd->opts(); + + if (cmd->use_workspace_options()) + { + // Re-read the ws options file, rather than just copying + // the options from the previous apts.opts object, because + // the file may have changed due to user activity. + workspace::check_ws_format(); + workspace::get_ws_options(app.opts); + } + + opts.instantiate(&app.opts).from_key_value_pairs(this->cmd.opts); + acmd->exec_from_automate(app, id, args, os); + } + catch (recoverable_failure & f) + { + os.set_err(2); + os << f.what(); + } + os.end_cmd(); + queue_output(oss.str()); + return true; + } +}; + +class automate_listener : public listener_base +{ + app_state & app; + shared_ptr &guard; + Netxx::Address addr; + Netxx::Timeout timeout; + reactor & react; +public: + automate_listener(app_state & app, + shared_ptr &guard, + reactor & react, + bool use_ipv6) : + listener_base(shared_ptr()), + app(app), guard(guard), addr(use_ipv6), + timeout(static_cast(constants::netsync_timeout_seconds)), + react(react) + { + srv = make_server(app.opts.bind_automate_uris, 0, timeout, use_ipv6, addr); + } + bool do_io(Netxx::Probe::ready_type event) + { + L(FL("accepting new automate connection on %s : %s") + % (addr.get_name()?addr.get_name():"") % lexical_cast(addr.get_port())); + Netxx::Peer client = srv->accept_connection(); + + if (!client) + { + L(FL("accept() returned a dead client")); + } + else + { + P(F("accepted new client connection from %s : %s") + % client.get_address() % lexical_cast(client.get_port())); + + // 'false' here means not to revert changes when the SockOpt + // goes out of scope. + Netxx::SockOpt socket_options(client.get_socketfd(), false); + socket_options.set_non_blocking(); + + shared_ptr str = + shared_ptr + (new Netxx::Stream(client.get_socketfd(), timeout)); + + shared_ptr sess(new automate_session(app, + lexical_cast(client), + str)); + I(guard); + react.add(sess, *guard); + } + return true; + } +}; + static void -serve_connections(options & opts, +serve_connections(app_state & app, + options & opts, lua_hooks & lua, project_t & project, key_store & keys, @@ -3236,7 +3525,14 @@ serve_connections(options & opts, guard, use_ipv6)); react.add(listen, *guard); + if (!app.opts.bind_automate_uris.empty()) + { + shared_ptr al(new automate_listener(app, guard, + react, use_ipv6)); + react.add(al, *guard); + } + while (true) { if (!guard) @@ -3474,7 +3770,8 @@ void } void -run_netsync_protocol(options & opts, lua_hooks & lua, +run_netsync_protocol(app_state & app, + options & opts, lua_hooks & lua, project_t & project, key_store & keys, protocol_voice voice, protocol_role role, @@ -3510,7 +3807,7 @@ run_netsync_protocol(options & opts, lua serve_single_connection(project, sess); } else - serve_connections(opts, lua, project, keys, + serve_connections(app, opts, lua, project, keys, role, info.server.addrs); } else ============================================================ --- options_list.hh 67023185e0e775e535d178143ddad30bd6b2f298 +++ options_list.hh a234c3ac2d68bbc08a04c62194ae870ba99c5df8 @@ -113,6 +113,14 @@ OPTION(bind_opts, bind_stdio, false, "st } #endif +OPT(bind_automate_uris, "bind-automate", std::list, , + gettext_noop("serve 'automate stdio' connections on this address")) +#ifdef option_bodies +{ + bind_automate_uris.push_back(utf8(arg, origin::user)); +} +#endif + OPTVAR(branch, branch_name, branchname, ) OPTION(branch, branch, true, "branch,b", gettext_noop("select branch cert for operation"))