#
# patch "ChangeLog"
# from [464922b829dcadac9bb7b5fe425efb77e33647b1]
# to [462c4cadf680d7c8ff7dfc39a40546dba6344969]
#
# patch "contrib/usher.cc"
# from [49c1faf810dfd58a94b0a9ce0ab1aa376282addc]
# to [5b222d1e4750f78fceaf30e5f0b583bf362b6507]
#
========================================================================
--- ChangeLog 464922b829dcadac9bb7b5fe425efb77e33647b1
+++ ChangeLog 462c4cadf680d7c8ff7dfc39a40546dba6344969
@@ -1,3 +1,9 @@
+2005-10-18 Timothy Brownawell
+
+ * contrib/usher.cc: support dynamic local servers
+ These are started when a client attempts to connect, and killed at
+ a set time interval after the last client has disconnected.
+
2005-10-17 Timothy Brownawell
* commands.cc (serve): check that the db is valid before beginning
========================================================================
--- contrib/usher.cc 49c1faf810dfd58a94b0a9ce0ab1aa376282addc
+++ contrib/usher.cc 5b222d1e4750f78fceaf30e5f0b583bf362b6507
@@ -10,44 +10,80 @@
// This requires cooperation from the client, which means it only works
// for recent (post-0.22) clients.
//
-// Usage: usher
+// Usage: usher [-a ] [-p ]
//
+// is the address to listen on
// is the local port to listen on
// is a file containing lines of
-// stem ip-address port-number
+// stem remote ip-address port-number
+// stem local
//
+// Example server-file:
+// localhost.personal-project local -d /usr/local/src/project.db *
+// net.venge.monotone remote 66.96.28.3 5253
+//
// A request for a pattern starting with "stem" will be directed to the
-// server at :
+// server at :, if that stem is marked as remote,
+// and to a local server managed by the usher, started with the given
+// arguments ("monotone serve --bind=something "),
+// if it is marked as local.
+// No stem should have any other stem as an initial substring - having both
+// "foo" and "foo-bar" as stems will not work properly.
#include
#include
#include
#include
#include
+#include
#include
#include
#include
#include
#include
#include
+#include
#include
#include
#include
#include
+#include
+#include
+#include
+
+using std::vector;
+using std::max;
+using std::string;
+using std::list;
+using std::set;
+using boost::lexical_cast;
+using std::cerr;
+
+// defaults, overridden by command line
int listenport = 5253;
+string listenaddr = "0.0.0.0";
-char const netsync_version = 5;
+// keep local servers around for this many seconds after the last
+// client disconnects from them (only accurate to ~10 seconds)
+int const server_idle_timeout = 60;
-char const * const greeting = " Hello! This is the monotone usher at localhost. What would you like?";
+// ranges that dynamic (local) servers can be put on
+int const minport = 15000;
+int const maxport = 65000;
+int const minaddr[] = {127, 0, 1, 1};
+int const maxaddr[] = {127, 254, 254, 254};
+int currport = 0;
+int curraddr[] = {0, 0, 0, 0};
-char const * const errmsg = "!Sorry, I don't know where to find that.";
+char const netsync_version = 5;
+string const greeting = " Hello! This is the monotone usher at localhost. What would you like?";
-#undef max
-#define max(x,y) ((x) > (y) ? (x) : (y))
+string const errmsg = "!Sorry, I don't know where to find that.";
+
struct errstr
{
std::string name;
@@ -64,32 +100,6 @@
return ret;
}
-struct record
-{
- std::string stem;
- std::string addr;
- int port;
-};
-
-std::list servers;
-
-bool get_server(std::string const & reply, std::string & addr, int & port)
-{
- std::list::iterator i;
- for (i = servers.begin(); i != servers.end(); ++i) {
- if (reply.find(i->stem) == 0)
- break;
- }
- if (i == servers.end()) {
- std::cerr<<"no server found for "<port;
- addr = i->addr;
-// std::cerr<<"server for "< all_socks;
operator int()
{
if (!s)
@@ -201,6 +212,7 @@
s = new int[2];
s[0] = ss;
s[1] = 1;
+ all_socks.insert(s);
}
sock(sock const & ss)
{
@@ -208,19 +220,6 @@
if (s)
s[1]++;
}
- void close()
- {
- if (!s || s[0] == -1)
- return;
- shutdown(s[0], SHUT_RDWR);
- while (::close(s[0]) < 0) {
- if (errno == EIO)
- throw errstr("close failed", errno);
- if (errno != EINTR)
- break;
- }
- s[0]=-1;
- }
~sock()
{
if (!s || s[1]--)
@@ -231,14 +230,39 @@
// if you want it to throw errors, call close manually
}
delete[] s;
+ all_socks.erase(all_socks.find(s));
s = 0;
}
sock operator=(int ss)
{
- if (!s)
+ if (!s) {
s = new int[2];
+ all_socks.insert(s);
+ }
s[0]=ss;
}
+ void close()
+ {
+ if (!s || s[0] == -1)
+ return;
+ shutdown(s[0], SHUT_RDWR);
+ while (::close(s[0]) < 0) {
+ if (errno == EIO)
+ throw errstr("close failed", errno);
+ if (errno != EINTR)
+ break;
+ }
+ s[0]=-1;
+ }
+ static void close_all_socks()
+ {
+ for (set::iterator i = all_socks.begin(); i != all_socks.end(); ++i) {
+// shutdown((*i)[0], SHUT_RDWR);
+ while (::close((*i)[0]) < 0)
+ if (errno != EINTR)
+ break;
+ }
+ }
bool read_to(buffer & buf)
{
if (!s)
@@ -270,8 +294,9 @@
return true;
}
};
+set sock::all_socks;
-sock start(int port)
+bool check_address_empty(string const & addr, int port)
{
sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()");
int yes = 1;
@@ -279,29 +304,241 @@
&yes, sizeof(yes)), "setsockopt");
sockaddr_in a;
memset (&a, 0, sizeof (a));
+ if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr))
+ throw errstr("bad ip address format", 0);
a.sin_port = htons(port);
a.sin_family = AF_INET;
+ int r = bind(s, (sockaddr *) &a, sizeof(a));
+ s.close();
+ return r == 0;
+}
+
+void find_addr(string & addr, int & port)
+{
+ if (currport == 0) {
+ currport = minport-1;
+ for(int i = 0; i < 4; ++i)
+ curraddr[i] = minaddr[i];
+ curraddr[0];
+ }
+ do {
+ // get the next address in our list
+ if (++currport > maxport) {
+ currport = minport;
+ for (int i = 0; i < 4; ++i) {
+ if (++curraddr[i] <= maxaddr[i])
+ break;
+ curraddr[i] = minaddr[i];
+ }
+ }
+ port = currport;
+ addr = lexical_cast(curraddr[0]) + "." +
+ lexical_cast(curraddr[1]) + "." +
+ lexical_cast(curraddr[2]) + "." +
+ lexical_cast(curraddr[3]);
+ } while (!check_address_empty(addr, port));
+}
+
+sock start(string addr, int port)
+{
+ sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()");
+ int yes = 1;
+ tosserr(setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
+ &yes, sizeof(yes)), "setsockopt");
+ sockaddr_in a;
+ memset (&a, 0, sizeof (a));
+ if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr))
+ throw errstr("bad ip address format", 0);
+ a.sin_port = htons(port);
+ a.sin_family = AF_INET;
tosserr(bind(s, (sockaddr *) &a, sizeof(a)), "bind");
+ cerr<<"bound to "< const & args)
+{
+ int err[2];
+ if (pipe(err) < 0)
+ return false;
+ int pid = fork();
+ if (pid == -1) {
+ close(err[0]);
+ close(err[1]);
+ cerr<<"Failed to fork server.\n";
+ return false;
+ } else if (pid == 0) {
+ close(err[0]);
+ close(0);
+ close(1);
+ close(2);
+ sock::close_all_socks();
+ if (dup2(err[1], 2) < 0) {
+ exit(1);
+ }
+
+ char ** a = new char*[args.size()+1];
+ for (int i = 0; i < args.size(); ++i) {
+ a[i] = new char[args[i].size()+1];
+ memcpy(a[i], args[i].c_str(), args[i].size()+1);
+ }
+ a[args.size()] = 0;
+
+ execvp(a[0], a);
+ exit(1);
+ } else {
+ close(err[1]);
+ char head[256];
+ int got = 0;
+ int r = 0;
+ bool line = false;
+ // the first line output on the server's stderr will be either
+ // "monotone: beginning service on : " or
+ // "monotone: network error: bind(2) error: Address already in use"
+ while(r >= 0 && !line) {
+ r = read(err[0], head + got, 256 - got);
+ if (r > 0) {
+ for (int i = 0; i < r && !line; ++i)
+ if (head[got+i] == '\n')
+ line = true;
+ got += r;
+ }
+ }
+ head[got] = 0;
+ if (string(head).find("beginning service") != string::npos)
+ return pid;
+ kill(pid, SIGKILL);
+ do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR);
+ return -1;
+ }
+}
+
+struct server
+{
+ bool local;
+ int pid;
+ string arguments;
+ string addr;
+ int port;
+ int connection_count;
+ int last_conn_time;
+ server() : pid(-1), local(false), port(0),
+ connection_count(0), last_conn_time(0)
+ {
+ }
+ ~server()
+ {
+ yeskill();
+ }
+ sock connect()
+ {
+ if (local && pid == -1) {
+ // server needs to be started
+ // we'll try 3 times, since there's a delay between our checking that
+ // a port's available and the server taking it
+ for (int i = 0; i < 3 && pid == -1; ++i) {
+ if (i > 0 || port == 0)
+ find_addr(addr, port);
+ vector args;
+ args.push_back("monotone");
+ args.push_back("serve");
+ args.push_back("--bind=" + addr + ":" + lexical_cast(port));
+ int n = 0, m = 0;
+ n = arguments.find_first_not_of(" \t");
+ while (n != string::npos && m != string::npos) {
+ m = arguments.find_first_of(" ", n);
+ args.push_back(arguments.substr(n, m-n));
+ n = arguments.find_first_not_of(" ", m);
+ }
+ pid = fork_server(args);
+ }
+ }
+ sock s = make_outgoing(port, addr);
+ ++connection_count;
+ return s;
+ }
+ void disconnect()
+ {
+ if (--connection_count || !local)
+ return;
+ last_conn_time = time(0);
+ }
+ void maybekill()
+ {
+ int difftime = time(0) - last_conn_time;
+ if (difftime > server_idle_timeout && !connection_count)
+ yeskill();
+ else if (waitpid(pid, 0, WNOHANG) == 0) {
+ pid = -1;
+ port = 0;
+ }
+ }
+ void yeskill()
+ {
+ if (local && pid != -1) {
+ kill(pid, SIGTERM);
+ int r;
+ do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR);
+ pid = -1;
+ port = 0;
+ }
+ }
+ string name()
+ {
+ if (local && port == 0)
+ return "dynamic local server";
+ else
+ return addr + ":" + lexical_cast(port);
+ }
+};
+
+struct record
+{
+ std::string stem;
+ server srv;
+};
+
+list servers;
+
+server * get_server(std::string const & stem)
+{
+ std::list::iterator i;
+ for (i = servers.begin(); i != servers.end(); ++i) {
+ if (stem.find(i->stem) == 0)
+ break;
+ }
+ if (i == servers.end()) {
+ std::cerr<<"no server found for "<srv;
+}
+
+void kill_old_servers()
+{
+ std::list::iterator i;
+ for (i = servers.begin(); i != servers.end(); ++i) {
+ i->srv.maybekill();
+ }
+}
+
bool extract_reply(buffer & buf, std::string & out)
{
char *p;
@@ -332,15 +569,16 @@
{
static int counter;
int num;
- sock client;
- sock server;
+ sock cli;
+ sock srv;
bool have_routed;
bool no_server;
buffer cbuf;
buffer sbuf;
+ server * who;
channel(sock & c): num(++counter),
- client(c), server(-1),
- have_routed(false), no_server(false)
+ cli(c), srv(-1),
+ have_routed(false), no_server(false), who(0)
{
char * dat;
int size;
@@ -353,16 +591,21 @@
sbuf.fixwrite(size);
delete[] dat;
- client.write_from(sbuf);
+ cli.write_from(sbuf);
}
+ ~channel()
+ {
+ if (who)
+ who->disconnect();
+ }
bool is_finished()
{
- return (client == -1) && (server == -1);
+ return (cli == -1) && (srv == -1);
}
void add_to_select(int & maxfd, fd_set & rd, fd_set & wr, fd_set & er)
{
- int c = client;
- int s = server;
+ int c = cli;
+ int s = srv;
if (c > 0) {
FD_SET(c, &er);
@@ -383,14 +626,14 @@
}
bool process_selected(fd_set & rd, fd_set & wr, fd_set & er)
{
- int c = client;
- int s = server;
+ int c = cli;
+ int s = srv;
/* NB: read oob data before normal reads */
if (c > 0 && FD_ISSET(c, &er)) {
char d;
errno = 0;
if (recv(c, &d, 1, MSG_OOB) < 1)
- client.close(), c = -1;
+ cli.close(), c = -1;
else
send(s, &d, 1, MSG_OOB);
}
@@ -398,7 +641,7 @@
char d;
errno = 0;
if (recv(s, &d, 1, MSG_OOB) < 1)
- server.close(), s = -1;
+ srv.close(), s = -1;
else
send(c, &d, 1, MSG_OOB);
}
@@ -407,19 +650,18 @@
int n;
if (c > 0 && FD_ISSET(c, &rd)) {
- if (!client.read_to(cbuf)) c = -1;
+ if (!cli.read_to(cbuf)) c = -1;
if (!have_routed) {
std::string reply;
if (extract_reply(cbuf, reply)) {
- int port;
- std::string addr;
- if (get_server(reply, addr, port)) {
+ who = get_server(reply);
+ if (who) {
try {
- server = make_outgoing(port, addr);
+ srv = who->connect();
have_routed = true;
- s = server;
+ s = srv;
} catch (errstr & e) {
- std::cerr<<"cannot contact server "<name()<<"\n";
no_server = true;
}
} else {
@@ -437,22 +679,22 @@
}
}
if (s > 0 && FD_ISSET(s, &rd)) {
- if (!server.read_to(sbuf)) s = -1;
+ if (!srv.read_to(sbuf)) s = -1;
}
if (c > 0 && FD_ISSET(c, &wr)) {
- if (!client.write_from(sbuf)) c = -1;
+ if (!cli.write_from(sbuf)) c = -1;
}
if (s > 0 && FD_ISSET(s, &wr)) {
- if (!server.write_from(cbuf)) s = -1;
+ if (!srv.write_from(cbuf)) s = -1;
}
// close sockets we have nothing more to send to
if (c < 0 && !cbuf.canread()) {
- server.close(), s = -1;
+ srv.close(), s = -1;
}
if ((no_server || have_routed && s < 0) && !sbuf.canread()) {
- client.close(), c = -1;
+ cli.close(), c = -1;
}
}
};
@@ -460,33 +702,56 @@
int main (int argc, char **argv)
{
- if (argc != 3) {
- fprintf (stderr, "Usage\n\tusher \n");
- exit (1);
- }
-
- record rec;
- std::ifstream cf(argv[2]);
- int pos = 0;
- while(cf) {
- if (pos == 0)
- cf>>rec.stem;
- else if (pos == 1)
- cf>>rec.addr;
- else if (pos == 2)
- cf>>rec.port;
- else if (pos == 3) {
- pos = -1;
+ {
+ char const * conffile = 0;
+ int i;
+ for (i = 1; i < argc; ++i) {
+ if (string(argv[i]) == "-a")
+ listenaddr = argv[++i];
+ else if (string(argv[i]) == "-p")
+ listenport = lexical_cast(argv[++i]);
+ else
+ conffile = argv[i];
+ }
+ if (conffile == 0 || i != argc) {
+ cerr<<"Usage:\n";
+ cerr<<"\tusher [-a ] [-p ] \n";
+ exit (1);
+ }
+ std::ifstream cf(conffile);
+ int pos = 0;
+ while(cf) {
+ string stem, type;
+ cf>>stem;
+ cf>>type;
+ if (!cf)
+ break;
+ record rec;
+ rec.stem = stem;
+ if (type == "local") {
+ rec.srv.local = true;
+ rec.srv.arguments.clear();
+ while(cf.peek() != '\n')
+ rec.srv.arguments += cf.get();
+ } else if (type == "remote") {
+ rec.srv.local = false;
+ cf>>rec.srv.addr;
+ cf>>rec.srv.port;
+ } else {
+ cerr<<"Error parsing "<add_to_select(nfds, rd, wr, er);
- int r = select(nfds+1, &rd, &wr, &er, NULL);
+ timeval timeout;
+ timeout.tv_sec = 10;
+ timeout.tv_usec = 0;
+ int r = select(nfds+1, &rd, &wr, &er, &timeout);
if (r == -1 && errno == EINTR)
continue;
@@ -525,7 +793,7 @@
// std::cerr<<"connect from "<::iterator> finished;
@@ -537,7 +805,7 @@
finished.push_back(i);
} catch (errstr & e) {
finished.push_back(i);
- std::cerr<<"Error proccessing connection "<num<<": "<num<<": "<::iterator>::iterator i = finished.begin();
@@ -548,6 +816,7 @@
delete newchan;
newchan = 0;
}
+ kill_old_servers();
}
return 0;
}