# # patch "ChangeLog" # from [464922b829dcadac9bb7b5fe425efb77e33647b1] # to [462c4cadf680d7c8ff7dfc39a40546dba6344969] # # patch "contrib/usher.cc" # from [49c1faf810dfd58a94b0a9ce0ab1aa376282addc] # to [5b222d1e4750f78fceaf30e5f0b583bf362b6507] # ======================================================================== --- ChangeLog 464922b829dcadac9bb7b5fe425efb77e33647b1 +++ ChangeLog 462c4cadf680d7c8ff7dfc39a40546dba6344969 @@ -1,3 +1,9 @@ +2005-10-18 Timothy Brownawell + + * contrib/usher.cc: support dynamic local servers + These are started when a client attempts to connect, and killed at + a set time interval after the last client has disconnected. + 2005-10-17 Timothy Brownawell * commands.cc (serve): check that the db is valid before beginning ======================================================================== --- contrib/usher.cc 49c1faf810dfd58a94b0a9ce0ab1aa376282addc +++ contrib/usher.cc 5b222d1e4750f78fceaf30e5f0b583bf362b6507 @@ -10,44 +10,80 @@ // This requires cooperation from the client, which means it only works // for recent (post-0.22) clients. // -// Usage: usher +// Usage: usher [-a ] [-p ] // +// is the address to listen on // is the local port to listen on // is a file containing lines of -// stem ip-address port-number +// stem remote ip-address port-number +// stem local // +// Example server-file: +// localhost.personal-project local -d /usr/local/src/project.db * +// net.venge.monotone remote 66.96.28.3 5253 +// // A request for a pattern starting with "stem" will be directed to the -// server at : +// server at :, if that stem is marked as remote, +// and to a local server managed by the usher, started with the given +// arguments ("monotone serve --bind=something "), +// if it is marked as local. +// No stem should have any other stem as an initial substring - having both +// "foo" and "foo-bar" as stems will not work properly. #include #include #include #include #include +#include #include #include #include #include #include #include +#include #include #include #include #include +#include +#include +#include + +using std::vector; +using std::max; +using std::string; +using std::list; +using std::set; +using boost::lexical_cast; +using std::cerr; + +// defaults, overridden by command line int listenport = 5253; +string listenaddr = "0.0.0.0"; -char const netsync_version = 5; +// keep local servers around for this many seconds after the last +// client disconnects from them (only accurate to ~10 seconds) +int const server_idle_timeout = 60; -char const * const greeting = " Hello! This is the monotone usher at localhost. What would you like?"; +// ranges that dynamic (local) servers can be put on +int const minport = 15000; +int const maxport = 65000; +int const minaddr[] = {127, 0, 1, 1}; +int const maxaddr[] = {127, 254, 254, 254}; +int currport = 0; +int curraddr[] = {0, 0, 0, 0}; -char const * const errmsg = "!Sorry, I don't know where to find that."; +char const netsync_version = 5; +string const greeting = " Hello! This is the monotone usher at localhost. What would you like?"; -#undef max -#define max(x,y) ((x) > (y) ? (x) : (y)) +string const errmsg = "!Sorry, I don't know where to find that."; + struct errstr { std::string name; @@ -64,32 +100,6 @@ return ret; } -struct record -{ - std::string stem; - std::string addr; - int port; -}; - -std::list servers; - -bool get_server(std::string const & reply, std::string & addr, int & port) -{ - std::list::iterator i; - for (i = servers.begin(); i != servers.end(); ++i) { - if (reply.find(i->stem) == 0) - break; - } - if (i == servers.end()) { - std::cerr<<"no server found for "<port; - addr = i->addr; -// std::cerr<<"server for "< all_socks; operator int() { if (!s) @@ -201,6 +212,7 @@ s = new int[2]; s[0] = ss; s[1] = 1; + all_socks.insert(s); } sock(sock const & ss) { @@ -208,19 +220,6 @@ if (s) s[1]++; } - void close() - { - if (!s || s[0] == -1) - return; - shutdown(s[0], SHUT_RDWR); - while (::close(s[0]) < 0) { - if (errno == EIO) - throw errstr("close failed", errno); - if (errno != EINTR) - break; - } - s[0]=-1; - } ~sock() { if (!s || s[1]--) @@ -231,14 +230,39 @@ // if you want it to throw errors, call close manually } delete[] s; + all_socks.erase(all_socks.find(s)); s = 0; } sock operator=(int ss) { - if (!s) + if (!s) { s = new int[2]; + all_socks.insert(s); + } s[0]=ss; } + void close() + { + if (!s || s[0] == -1) + return; + shutdown(s[0], SHUT_RDWR); + while (::close(s[0]) < 0) { + if (errno == EIO) + throw errstr("close failed", errno); + if (errno != EINTR) + break; + } + s[0]=-1; + } + static void close_all_socks() + { + for (set::iterator i = all_socks.begin(); i != all_socks.end(); ++i) { +// shutdown((*i)[0], SHUT_RDWR); + while (::close((*i)[0]) < 0) + if (errno != EINTR) + break; + } + } bool read_to(buffer & buf) { if (!s) @@ -270,8 +294,9 @@ return true; } }; +set sock::all_socks; -sock start(int port) +bool check_address_empty(string const & addr, int port) { sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()"); int yes = 1; @@ -279,29 +304,241 @@ &yes, sizeof(yes)), "setsockopt"); sockaddr_in a; memset (&a, 0, sizeof (a)); + if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr)) + throw errstr("bad ip address format", 0); a.sin_port = htons(port); a.sin_family = AF_INET; + int r = bind(s, (sockaddr *) &a, sizeof(a)); + s.close(); + return r == 0; +} + +void find_addr(string & addr, int & port) +{ + if (currport == 0) { + currport = minport-1; + for(int i = 0; i < 4; ++i) + curraddr[i] = minaddr[i]; + curraddr[0]; + } + do { + // get the next address in our list + if (++currport > maxport) { + currport = minport; + for (int i = 0; i < 4; ++i) { + if (++curraddr[i] <= maxaddr[i]) + break; + curraddr[i] = minaddr[i]; + } + } + port = currport; + addr = lexical_cast(curraddr[0]) + "." + + lexical_cast(curraddr[1]) + "." + + lexical_cast(curraddr[2]) + "." + + lexical_cast(curraddr[3]); + } while (!check_address_empty(addr, port)); +} + +sock start(string addr, int port) +{ + sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()"); + int yes = 1; + tosserr(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + &yes, sizeof(yes)), "setsockopt"); + sockaddr_in a; + memset (&a, 0, sizeof (a)); + if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr)) + throw errstr("bad ip address format", 0); + a.sin_port = htons(port); + a.sin_family = AF_INET; tosserr(bind(s, (sockaddr *) &a, sizeof(a)), "bind"); + cerr<<"bound to "< const & args) +{ + int err[2]; + if (pipe(err) < 0) + return false; + int pid = fork(); + if (pid == -1) { + close(err[0]); + close(err[1]); + cerr<<"Failed to fork server.\n"; + return false; + } else if (pid == 0) { + close(err[0]); + close(0); + close(1); + close(2); + sock::close_all_socks(); + if (dup2(err[1], 2) < 0) { + exit(1); + } + + char ** a = new char*[args.size()+1]; + for (int i = 0; i < args.size(); ++i) { + a[i] = new char[args[i].size()+1]; + memcpy(a[i], args[i].c_str(), args[i].size()+1); + } + a[args.size()] = 0; + + execvp(a[0], a); + exit(1); + } else { + close(err[1]); + char head[256]; + int got = 0; + int r = 0; + bool line = false; + // the first line output on the server's stderr will be either + // "monotone: beginning service on : " or + // "monotone: network error: bind(2) error: Address already in use" + while(r >= 0 && !line) { + r = read(err[0], head + got, 256 - got); + if (r > 0) { + for (int i = 0; i < r && !line; ++i) + if (head[got+i] == '\n') + line = true; + got += r; + } + } + head[got] = 0; + if (string(head).find("beginning service") != string::npos) + return pid; + kill(pid, SIGKILL); + do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR); + return -1; + } +} + +struct server +{ + bool local; + int pid; + string arguments; + string addr; + int port; + int connection_count; + int last_conn_time; + server() : pid(-1), local(false), port(0), + connection_count(0), last_conn_time(0) + { + } + ~server() + { + yeskill(); + } + sock connect() + { + if (local && pid == -1) { + // server needs to be started + // we'll try 3 times, since there's a delay between our checking that + // a port's available and the server taking it + for (int i = 0; i < 3 && pid == -1; ++i) { + if (i > 0 || port == 0) + find_addr(addr, port); + vector args; + args.push_back("monotone"); + args.push_back("serve"); + args.push_back("--bind=" + addr + ":" + lexical_cast(port)); + int n = 0, m = 0; + n = arguments.find_first_not_of(" \t"); + while (n != string::npos && m != string::npos) { + m = arguments.find_first_of(" ", n); + args.push_back(arguments.substr(n, m-n)); + n = arguments.find_first_not_of(" ", m); + } + pid = fork_server(args); + } + } + sock s = make_outgoing(port, addr); + ++connection_count; + return s; + } + void disconnect() + { + if (--connection_count || !local) + return; + last_conn_time = time(0); + } + void maybekill() + { + int difftime = time(0) - last_conn_time; + if (difftime > server_idle_timeout && !connection_count) + yeskill(); + else if (waitpid(pid, 0, WNOHANG) == 0) { + pid = -1; + port = 0; + } + } + void yeskill() + { + if (local && pid != -1) { + kill(pid, SIGTERM); + int r; + do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR); + pid = -1; + port = 0; + } + } + string name() + { + if (local && port == 0) + return "dynamic local server"; + else + return addr + ":" + lexical_cast(port); + } +}; + +struct record +{ + std::string stem; + server srv; +}; + +list servers; + +server * get_server(std::string const & stem) +{ + std::list::iterator i; + for (i = servers.begin(); i != servers.end(); ++i) { + if (stem.find(i->stem) == 0) + break; + } + if (i == servers.end()) { + std::cerr<<"no server found for "<srv; +} + +void kill_old_servers() +{ + std::list::iterator i; + for (i = servers.begin(); i != servers.end(); ++i) { + i->srv.maybekill(); + } +} + bool extract_reply(buffer & buf, std::string & out) { char *p; @@ -332,15 +569,16 @@ { static int counter; int num; - sock client; - sock server; + sock cli; + sock srv; bool have_routed; bool no_server; buffer cbuf; buffer sbuf; + server * who; channel(sock & c): num(++counter), - client(c), server(-1), - have_routed(false), no_server(false) + cli(c), srv(-1), + have_routed(false), no_server(false), who(0) { char * dat; int size; @@ -353,16 +591,21 @@ sbuf.fixwrite(size); delete[] dat; - client.write_from(sbuf); + cli.write_from(sbuf); } + ~channel() + { + if (who) + who->disconnect(); + } bool is_finished() { - return (client == -1) && (server == -1); + return (cli == -1) && (srv == -1); } void add_to_select(int & maxfd, fd_set & rd, fd_set & wr, fd_set & er) { - int c = client; - int s = server; + int c = cli; + int s = srv; if (c > 0) { FD_SET(c, &er); @@ -383,14 +626,14 @@ } bool process_selected(fd_set & rd, fd_set & wr, fd_set & er) { - int c = client; - int s = server; + int c = cli; + int s = srv; /* NB: read oob data before normal reads */ if (c > 0 && FD_ISSET(c, &er)) { char d; errno = 0; if (recv(c, &d, 1, MSG_OOB) < 1) - client.close(), c = -1; + cli.close(), c = -1; else send(s, &d, 1, MSG_OOB); } @@ -398,7 +641,7 @@ char d; errno = 0; if (recv(s, &d, 1, MSG_OOB) < 1) - server.close(), s = -1; + srv.close(), s = -1; else send(c, &d, 1, MSG_OOB); } @@ -407,19 +650,18 @@ int n; if (c > 0 && FD_ISSET(c, &rd)) { - if (!client.read_to(cbuf)) c = -1; + if (!cli.read_to(cbuf)) c = -1; if (!have_routed) { std::string reply; if (extract_reply(cbuf, reply)) { - int port; - std::string addr; - if (get_server(reply, addr, port)) { + who = get_server(reply); + if (who) { try { - server = make_outgoing(port, addr); + srv = who->connect(); have_routed = true; - s = server; + s = srv; } catch (errstr & e) { - std::cerr<<"cannot contact server "<name()<<"\n"; no_server = true; } } else { @@ -437,22 +679,22 @@ } } if (s > 0 && FD_ISSET(s, &rd)) { - if (!server.read_to(sbuf)) s = -1; + if (!srv.read_to(sbuf)) s = -1; } if (c > 0 && FD_ISSET(c, &wr)) { - if (!client.write_from(sbuf)) c = -1; + if (!cli.write_from(sbuf)) c = -1; } if (s > 0 && FD_ISSET(s, &wr)) { - if (!server.write_from(cbuf)) s = -1; + if (!srv.write_from(cbuf)) s = -1; } // close sockets we have nothing more to send to if (c < 0 && !cbuf.canread()) { - server.close(), s = -1; + srv.close(), s = -1; } if ((no_server || have_routed && s < 0) && !sbuf.canread()) { - client.close(), c = -1; + cli.close(), c = -1; } } }; @@ -460,33 +702,56 @@ int main (int argc, char **argv) { - if (argc != 3) { - fprintf (stderr, "Usage\n\tusher \n"); - exit (1); - } - - record rec; - std::ifstream cf(argv[2]); - int pos = 0; - while(cf) { - if (pos == 0) - cf>>rec.stem; - else if (pos == 1) - cf>>rec.addr; - else if (pos == 2) - cf>>rec.port; - else if (pos == 3) { - pos = -1; + { + char const * conffile = 0; + int i; + for (i = 1; i < argc; ++i) { + if (string(argv[i]) == "-a") + listenaddr = argv[++i]; + else if (string(argv[i]) == "-p") + listenport = lexical_cast(argv[++i]); + else + conffile = argv[i]; + } + if (conffile == 0 || i != argc) { + cerr<<"Usage:\n"; + cerr<<"\tusher [-a ] [-p ] \n"; + exit (1); + } + std::ifstream cf(conffile); + int pos = 0; + while(cf) { + string stem, type; + cf>>stem; + cf>>type; + if (!cf) + break; + record rec; + rec.stem = stem; + if (type == "local") { + rec.srv.local = true; + rec.srv.arguments.clear(); + while(cf.peek() != '\n') + rec.srv.arguments += cf.get(); + } else if (type == "remote") { + rec.srv.local = false; + cf>>rec.srv.addr; + cf>>rec.srv.port; + } else { + cerr<<"Error parsing "<add_to_select(nfds, rd, wr, er); - int r = select(nfds+1, &rd, &wr, &er, NULL); + timeval timeout; + timeout.tv_sec = 10; + timeout.tv_usec = 0; + int r = select(nfds+1, &rd, &wr, &er, &timeout); if (r == -1 && errno == EINTR) continue; @@ -525,7 +793,7 @@ // std::cerr<<"connect from "<::iterator> finished; @@ -537,7 +805,7 @@ finished.push_back(i); } catch (errstr & e) { finished.push_back(i); - std::cerr<<"Error proccessing connection "<num<<": "<num<<": "<::iterator>::iterator i = finished.begin(); @@ -548,6 +816,7 @@ delete newchan; newchan = 0; } + kill_old_servers(); } return 0; }