# # # add_file "net_common.cc" # content [149d1818998cabb5f1272b7661187bf6a089fb32] # # add_file "net_common.hh" # content [35e33459670654ffe1b980f0cb85058d56df2096] # # patch "Makefile.am" # from [e9e3d2c42df7879640a66e21854ad11a3f8eba30] # to [6f447accf75920caed5092841638835d59a429d9] # # patch "cmd_scgi.cc" # from [0ba8a300f548e224ba3c7f716de24fa4aac2279b] # to [18ff8d191dfc47514b90fd910d811a25284b0266] # # patch "constants.hh" # from [26ea1fa80b9236854f37ac0fbdeaccf6236a5c90] # to [543fe816b29fe333a10afbc71e64e63164d8c9c3] # # patch "http_client.cc" # from [52d1fd430adb95bae1d814a2bebbdb99128fa0f4] # to [04067dcf693ff4f61072c778668e58c69d728fe6] # # patch "netsync.cc" # from [1e875fe8ba23a5c956819d1d81b7322feff2682f] # to [6ccf9f2ca21fcc1865ac22d243eb381519a3d37e] # ============================================================ --- net_common.cc 149d1818998cabb5f1272b7661187bf6a089fb32 +++ net_common.cc 149d1818998cabb5f1272b7661187bf6a089fb32 @@ -0,0 +1,110 @@ +// Copyright (C) 2008 Graydon Hoare +// +// This program is made available under the GNU GPL version 2.0 or +// greater. See the accompanying file COPYING for details. +// +// This program is distributed WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE. + +#include "base.hh" + +#include "app_state.hh" +#include "globish.hh" +#include "net_common.hh" +#include "uri.hh" +#include "vocab.hh" + +#include +#include +#include + +#include + +#include "netxx/address.h" +#include "netxx/stream.h" +#include "netxx/streambase.h" +#include "netxx/timeout.h" +#include "netxx_pipe.hh" + + +using std::string; +using std::list; +using std::vector; +using boost::shared_ptr; + +void +add_address_names(Netxx::Address & addr, + std::list const & addresses, + Netxx::port_type default_port) +{ + if (addresses.empty()) + addr.add_all_addresses(default_port); + else + { + for (std::list::const_iterator it = addresses.begin(); it != addresses.end(); ++it) + { + const utf8 & address = *it; + if (!address().empty()) + { + size_t l_colon = address().find(':'); + size_t r_colon = address().rfind(':'); + + if (l_colon == r_colon && l_colon == 0) + { + // can't be an IPv6 address as there is only one colon + // must be a : followed by a port + string port_str = address().substr(1); + addr.add_all_addresses(std::atoi(port_str.c_str())); + } + else + addr.add_address(address().c_str(), default_port); + } + } + } +} + +shared_ptr +build_stream_to_server(app_state & app, + uri const & u, + globish const & include_pattern, + globish const & exclude_pattern, + Netxx::port_type default_port, + Netxx::Timeout timeout) +{ + shared_ptr server; + vector argv; + + if (app.lua.hook_get_netsync_connect_command(u, + include_pattern, + exclude_pattern, + global_sanity.debug_p(), + argv)) + { + I(argv.size() > 0); + string cmd = argv[0]; + argv.erase(argv.begin()); + app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u); + return shared_ptr + (new Netxx::PipeStream(cmd, argv)); + } + else + { +#ifdef USE_IPV6 + bool use_ipv6=true; +#else + bool use_ipv6=false; +#endif + Netxx::Address addr(u.host.c_str(), default_port, use_ipv6); + return shared_ptr(new Netxx::Stream(addr, timeout)); + } +} + + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: ============================================================ --- net_common.hh 35e33459670654ffe1b980f0cb85058d56df2096 +++ net_common.hh 35e33459670654ffe1b980f0cb85058d56df2096 @@ -0,0 +1,57 @@ +#ifndef __NET_COMMON_HH__ +#define __NET_COMMON_HH__ + +// Copyright (C) 2008 Graydon Hoare +// +// This program is made available under the GNU GPL version 2.0 or +// greater. See the accompanying file COPYING for details. +// +// This program is distributed WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE. + +#include "base.hh" + +#include + +#include "netxx/types.h" + +namespace Netxx { + class Address; + class Timeout; + class StreamBase; +} + +struct globish; +struct utf8; +struct uri; +struct app_state; + + +// This just covers helper routines that are shared across networking +// facilities (netsync and gsync). When we retire netsync, we can retire +// this file and shift the code into http_client or gsync. + +void +add_address_names(Netxx::Address & addr, + std::list const & addresses, + Netxx::port_type default_port); + +boost::shared_ptr +build_stream_to_server(app_state & app, + uri const & u, + globish const & include_pattern, + globish const & exclude_pattern, + Netxx::port_type default_port, + Netxx::Timeout timeout); + + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: + +#endif // __NET_COMMON_HH__ ============================================================ --- Makefile.am e9e3d2c42df7879640a66e21854ad11a3f8eba30 +++ Makefile.am 6f447accf75920caed5092841638835d59a429d9 @@ -45,6 +45,7 @@ MOST_SOURCES = \ schema_migration.cc schema_migration.hh \ refiner.cc refiner.hh \ enumerator.cc enumerator.hh \ + net_common.cc net_common.hh \ netsync.cc \ gsync.cc \ netxx_pipe.cc netxx_pipe.hh \ ============================================================ --- cmd_scgi.cc 0ba8a300f548e224ba3c7f716de24fa4aac2279b +++ cmd_scgi.cc 18ff8d191dfc47514b90fd910d811a25284b0266 @@ -12,14 +12,25 @@ #include #include +#include "app_state.hh" #include "cmd.hh" -#include "app_state.hh" -#include "ui.hh" +#include "constants.hh" +#include "globish.hh" +#include "json_io.hh" +#include "keys.hh" #include "lexical_cast.hh" #include "lua.hh" #include "lua_hooks.hh" -#include "json_io.hh" +#include "net_common.hh" +#include "ui.hh" +#include "netxx/address.h" +#include "netxx/peer.h" +#include "netxx/netbuf.h" +#include "netxx/socket.h" +#include "netxx/stream.h" +#include "netxx/streamserver.h" + using std::istream; using std::make_pair; using std::map; @@ -216,27 +227,15 @@ do_cmd(app_state & app, json_io::json_ob } -CMD_NO_WORKSPACE(scgi, // C - "scgi", // name - "", // aliases - CMD_REF(network), // parent - N_(""), // params - N_("Serves SCGI+JSON connections"), // abstract - "", // desc - options::opts::none // options - ) +void +process_scgi_transaction(app_state & app, + std::istream & in, + std::ostream & out) { - // FIXME: expand this function to take a pathname for a win32 named pipe - // or unix domain socket, for accept/read/dispatch loop. - - N(args.size() == 0, - F("no arguments needed")); - string data; - - if (parse_scgi(std::cin, data)) + if (parse_scgi(in, data)) { - L(FL("read SCGI request: [[%s]]") % data); + L(FL("read %d-byte SCGI request") % data.size()); json_io::input_source in(data, "scgi"); json_io::tokenizer tok(in); @@ -245,30 +244,138 @@ CMD_NO_WORKSPACE(scgi, // C if (static_cast(obj)) { + transaction_guard guard(app.db); L(FL("read JSON object")); json_io::json_object_t res = do_cmd(app, obj); if (static_cast(res)) { - L(FL("sending JSON response")); - json_io::printer out; - res->write(out); + json_io::printer out_data; + res->write(out_data); + L(FL("sending JSON %d-byte response") % (out_data.buf.size() + 1)); - std::cout << "Status: 200 OK\r\n" - << "Content-Length: " << (out.buf.size() + 1) << "\r\n" - << "Content-Type: application/jsonrequest\r\n" - << "\r\n"; + out << "Status: 200 OK\r\n" + << "Content-Length: " << (out_data.buf.size() + 1) << "\r\n" + << "Content-Type: application/jsonrequest\r\n" + << "\r\n"; - std::cout.write(out.buf.data(), out.buf.size()); - std::cout << "\n"; + out.write(out_data.buf.data(), out_data.buf.size()); + out << "\n"; + out.flush(); return; } } + } + out << "Status: 400 Bad request\r\n" + << "Content-Type: application/jsonrequest\r\n" + << "\r\n"; + out.flush(); +} + + +CMD_NO_WORKSPACE(scgi, // C + "scgi", // name + "", // aliases + CMD_REF(network), // parent + N_(""), // params + N_("Serves SCGI+JSON connections"), // abstract + "", // desc + options::opts::bind | + options::opts::pidfile | + options::opts::bind_stdio | + options::opts::no_transport_auth + ) +{ + if (!args.empty()) + throw usage(execid); + + if (app.opts.signing_key() == "") + { + rsa_keypair_id key; + get_user_key(key, app); + app.opts.signing_key = key; + } + + if (app.opts.use_transport_auth) + { + N(app.lua.hook_persist_phrase_ok(), + F("need permission to store persistent passphrase (see hook persist_phrase_ok())")); + require_password(app.opts.signing_key, app); + } + else if (!app.opts.bind_stdio) + W(F("The --no-transport-auth option is usually only used in combination with --stdio")); - std::cout << "Status: 400 Bad request\r\n" - << "Content-Type: application/jsonrequest\r\n" - << "\r\n"; + if (app.opts.bind_stdio) + process_scgi_transaction(app, std::cin, std::cout); + else + { + +#ifdef USE_IPV6 + bool use_ipv6=true; +#else + bool use_ipv6=false; +#endif + // This will be true when we try to bind while using IPv6. See comments + // further down. + bool try_again=false; + + do + { + try + { + try_again = false; + + Netxx::Address addr(use_ipv6); + + add_address_names(addr, app.opts.bind_uris, constants::default_scgi_port); + + // If we use IPv6 and the initialisation of server fails, we want + // to try again with IPv4. The reason is that someone may have + // downloaded a IPv6-enabled monotone on a system that doesn't + // have IPv6, and which might fail therefore. + // On failure, Netxx::NetworkException is thrown, and we catch + // it further down. + try_again=use_ipv6; + + Netxx::StreamServer server(addr); + + // If we came this far, whatever we used (IPv6 or IPv4) was + // accepted, so we don't need to try again any more. + try_again=false; + + while (true) + { + Netxx::Peer peer = server.accept_connection(); + if (peer) + { + Netxx::Stream stream(peer.get_socketfd()); + Netxx::Netbuf buf(stream); + std::iostream io(&buf); + process_scgi_transaction(app, io, io); + } + else + break; + } + } + // Possibly loop around if we get exceptions from Netxx and we're + // attempting to use ipv6, or have some other reason to try again. + catch (Netxx::NetworkException &) + { + if (try_again) + use_ipv6 = false; + else + throw; + } + catch (Netxx::Exception &) + { + if (try_again) + use_ipv6 = false; + else + throw; + } + } while (try_again); + } } // Local Variables: ============================================================ --- constants.hh 26ea1fa80b9236854f37ac0fbdeaccf6236a5c90 +++ constants.hh 543fe816b29fe333a10afbc71e64e63164d8c9c3 @@ -101,6 +101,9 @@ namespace constants // maximum number nodes in a randomized gsync probe set std::size_t const gsync_max_probe_set_size = 128; + // TCP port to listen on / connect to when doing scgi service + std::size_t const default_scgi_port = 3000; + // standard HTTP port number std::size_t const default_http_port = 80; ============================================================ --- http_client.cc 52d1fd430adb95bae1d814a2bebbdb99128fa0f4 +++ http_client.cc 04067dcf693ff4f61072c778668e58c69d728fe6 @@ -14,6 +14,7 @@ #include "globish.hh" #include "http_client.hh" #include "json_io.hh" +#include "net_common.hh" #include "sanity.hh" #include "lexical_cast.hh" #include "constants.hh" @@ -45,42 +46,6 @@ using Netxx::PipeStream; using Netxx::PipeStream; - -static shared_ptr -build_stream(app_state & app, - uri const & u, - globish const & include_pattern, - globish const & exclude_pattern, - Netxx::port_type default_port) -{ - Timeout timeout(static_cast(constants::netsync_timeout_seconds)), - instant(0,1); - vector argv; - if (app.lua.hook_get_netsync_connect_command(u, - include_pattern, - exclude_pattern, - global_sanity.debug_p(), - argv)) - { - I(argv.size() > 0); - string cmd = argv[0]; - argv.erase(argv.begin()); - app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u); - return shared_ptr(new PipeStream(cmd, argv)); - } - else - { -#ifdef USE_IPV6 - bool use_ipv6=true; -#else - bool use_ipv6=false; -#endif - Netxx::Address addr(u.host.c_str(), default_port, use_ipv6); - return shared_ptr(new Stream(addr, timeout)); - } -} - - http_client::http_client(app_state & app, uri const & u, globish const & include_pattern, @@ -89,8 +54,9 @@ http_client::http_client(app_state & app u(u), include_pattern(include_pattern), exclude_pattern(exclude_pattern), - stream(build_stream(app, u, include_pattern, exclude_pattern, - constants::default_http_port)), + stream(build_stream_to_server(app, u, include_pattern, exclude_pattern, + constants::default_http_port, + Netxx::Timeout(static_cast(constants::netsync_timeout_seconds)))), nb(new Netbuf(*stream)), io(new iostream(&(*nb))), open(true) @@ -102,8 +68,9 @@ http_client::transact_json(json_value_t if (!open) { L(FL("reopening connection")); - stream = build_stream(app, u, include_pattern, exclude_pattern, - constants::default_http_port); + stream = build_stream_to_server(app, u, include_pattern, exclude_pattern, + constants::default_http_port, + Netxx::Timeout(static_cast(constants::netsync_timeout_seconds))); nb = shared_ptr< Netbuf >(new Netbuf(*stream)); io = shared_ptr(new iostream(&(*nb))); open = true; @@ -131,11 +98,12 @@ http_client::transact_json(json_value_t L(FL("http_client: sending request [[POST %s HTTP/1.0]]") % (u.path.empty() ? "/" : u.path)); L(FL("http_client: to [[Host: %s]]") % u.host); + L(FL("http_client: sending %d-byte body") % out.buf.size()); io->write(header.data(), header.size()); io->write(out.buf.data(), out.buf.length()); io->flush(); + L(FL("http_client: sent %d-byte body") % out.buf.size()); - L(FL("http_client: sending %d-byte body") % out.buf.size()); // Now read back the result string data; @@ -147,14 +115,14 @@ http_client::transact_json(json_value_t } - void http_client::parse_http_status_line() { // We're only interested in 200-series responses string tmp; string pat("HTTP/1.0 200"); - while (tmp.empty()) + L(FL("http_client: reading response...")); + while (io->good() && tmp.empty()) std::getline(*io, tmp); L(FL("http_client: response: [[%s]]") % tmp); E(tmp.substr(0,pat.size()) == pat, F("HTTP status line: %s") % tmp); @@ -180,6 +148,7 @@ http_client::parse_http_header_line(size || v == "keep-alive"); } + void http_client::crlf() { @@ -187,6 +156,7 @@ http_client::crlf() E(io->get() == '\n', F("expected LF in HTTP response")); } + void http_client::parse_http_response(std::string & data) { @@ -194,7 +164,7 @@ http_client::parse_http_response(std::st bool keepalive = false; data.clear(); parse_http_status_line(); - while (io->peek() != '\r') + while (io->good() && io->peek() != '\r') parse_http_header_line(content_length, keepalive); crlf(); ============================================================ --- netsync.cc 1e875fe8ba23a5c956819d1d81b7322feff2682f +++ netsync.cc 6ccf9f2ca21fcc1865ac22d243eb381519a3d37e @@ -18,9 +18,7 @@ #include #include "lexical_cast.hh" -#include #include -#include #include "app_state.hh" #include "cert.hh" @@ -30,6 +28,7 @@ #include "lua.hh" #include "merkle_tree.hh" #include "netcmd.hh" +#include "net_common.hh" #include "netio.hh" #include "numeric_vocab.hh" #include "refiner.hh" @@ -43,6 +42,7 @@ #include "hmac.hh" #include "globish.hh" #include "uri.hh" +#include "vocab.hh" #include "botan/botan.h" @@ -2333,47 +2333,6 @@ bool session::process(transaction_guard } -static shared_ptr -build_stream_to_server(app_state & app, - globish const & include_pattern, - globish const & exclude_pattern, - utf8 const & address, - Netxx::port_type default_port, - Netxx::Timeout timeout) -{ - shared_ptr server; - uri u; - vector argv; - - parse_uri(address(), u); - if (app.lua.hook_get_netsync_connect_command(u, - include_pattern, - exclude_pattern, - global_sanity.debug_p(), - argv)) - { - I(argv.size() > 0); - string cmd = argv[0]; - argv.erase(argv.begin()); - app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u); - return shared_ptr - (new Netxx::PipeStream(cmd, argv)); - - } - else - { -#ifdef USE_IPV6 - bool use_ipv6=true; -#else - bool use_ipv6=false; -#endif - Netxx::Address addr(address().c_str(), - default_port, use_ipv6); - return shared_ptr - (new Netxx::Stream(addr, timeout)); - } -} - static void call_server(protocol_role role, globish const & include_pattern, @@ -2390,13 +2349,16 @@ call_server(protocol_role role, Netxx::Timeout timeout(static_cast(timeout_seconds)), instant(0,1); - P(F("connecting to %s") % address); + uri u; + parse_uri(address(), u); + P(F("connecting to %s") % address); shared_ptr server = build_stream_to_server(app, + u, include_pattern, exclude_pattern, - address, default_port, + default_port, timeout); @@ -2770,32 +2732,8 @@ serve_connections(protocol_role role, try_again = false; Netxx::Address addr(use_ipv6); + add_address_names(addr, addresses, default_port); - if (addresses.empty()) - addr.add_all_addresses(default_port); - else - { - for (std::list::const_iterator it = addresses.begin(); it != addresses.end(); ++it) - { - const utf8 & address = *it; - if (!address().empty()) - { - size_t l_colon = address().find(':'); - size_t r_colon = address().rfind(':'); - - if (l_colon == r_colon && l_colon == 0) - { - // can't be an IPv6 address as there is only one colon - // must be a : followed by a port - string port_str = address().substr(1); - addr.add_all_addresses(std::atoi(port_str.c_str())); - } - else - addr.add_address(address().c_str(), default_port); - } - } - } - // If se use IPv6 and the initialisation of server fails, we want // to try again with IPv4. The reason is that someone may have // downloaded a IPv6-enabled monotone on a system that doesn't @@ -2848,10 +2786,12 @@ serve_connections(protocol_role role, try { + uri u; + parse_uri(addr(), u); P(F("connecting to %s") % addr()); shared_ptr server - = build_stream_to_server(app, inc, exc, - addr, default_port, + = build_stream_to_server(app, u, inc, exc, + default_port, timeout); // 'false' here means not to revert changes when @@ -2961,29 +2901,19 @@ serve_connections(protocol_role role, } } } - // This exception is thrown when bind() fails somewhere in Netxx. + // Possibly loop around if we get exceptions from Netxx and we're + // attempting to use ipv6, or have some other reason to try again. catch (Netxx::NetworkException &) { - // If we tried with IPv6 and failed, we want to try again using IPv4. if (try_again) - { - use_ipv6 = false; - } - // In all other cases, just rethrow the exception. + use_ipv6 = false; else throw; } - // This exception is thrown when there is no support for the type of - // connection we want to do in the kernel, for example when a socket() - // call fails somewhere in Netxx. catch (Netxx::Exception &) { - // If we tried with IPv6 and failed, we want to try again using IPv4. if (try_again) - { use_ipv6 = false; - } - // In all other cases, just rethrow the exception. else throw; }