# # add_dir "" # # add_file "administrator.cc" # content [9dad369694bb93586422be021b9e0c97cb6cf073] # # add_file "administrator.hh" # content [f44bcbb5a1ed11d650be9000a67b1c9ba15c7509] # # add_file "basic_io_serverlist_reader.cc" # content [1e4823fd4213a8a1bef5bb35bcfe31562e737d64] # # add_file "basic_io_serverlist_reader.hh" # content [eec996e24c5eb8cea9fb675b2f61f096716f6b15] # # add_file "buffer.cc" # content [6930ee89bbdc297f11453ade805115110116acd1] # # add_file "buffer.hh" # content [4c747ee60d3e481cd03b17a4dff0d75a1dc7f474] # # add_file "channel.cc" # content [484866adc662ada029b74fe057c4f02a2d4149ce] # # add_file "channel.hh" # content [f6bce5e76eca88fd407e1bc87ded2647010cb3f6] # # add_file "err.cc" # content [2fef2a9feb7078a0f30ed64c6a1789ddfca7f5bb] # # add_file "err.hh" # content [6b90b83b476c2210c867fecfedefa65a40d7b31c] # # add_file "errstr.hh" # content [df9ea7a89888c78128c0a9e2bf3ce79dde7b6c18] # # add_file "io.cc" # content [b0753ee961d8799ad24f008d6497132fc07ad9c2] # # add_file "io.hh" # content [ccad8be31096e6ca11cae88425b5986ea258f07c] # # add_file "server.cc" # content [28e10a516fead692ae7458380b028278b2b8ff7a] # # add_file "server.hh" # content [ea20a68b9e2148f3a6ed3d2ec6c840ea908aa98c] # # add_file "server_manager.cc" # content [7d541212bc837f286f076a50f425539bb1e40f9d] # # add_file "server_manager.hh" # content [e8ce8f0ba21d9b3584ef88bc99397f431a4a56df] # # add_file "serverlist_reader.hh" # content [08600cfed47d72207bd084f9d1ab9d9cb5642fab] # # add_file "sock.cc" # content [35c7fe924e267ab211b550ef0af7d835e4224687] # # add_file "sock.hh" # content [9c83cf93213eae40a676161d3c29868e4d77503b] # # add_file "usher.cc" # content [6f96de0f0086858777b09d4dab290835d8d6e8cc] # # add_file "usher.txt" # content [c46389df6e6c49b81d83d05de68f546ccf4433cd] --- administrator.cc +++ administrator.cc @@ -0,0 +1,210 @@ +#include "administrator.hh" + +#include + +administrator::cstate::cstate() + : auth(false), rdone(false) +{ +} + 
+administrator::administrator() + : port(-1) +{ +} + +bool +administrator::process(cstate & cs) +{ + unsigned int n = cs.buf.find("\n"); + if (n == cs.buf.npos) + return true; + string l = cs.buf.substr(0, n); + cs.buf.erase(0, n+1); + std::istringstream iss(l); + string cmd; + iss>>cmd; + if (cmd == "USERPASS") { + string user, pass; + iss>>user>>pass; + map::iterator i = admins.find(user); + if (i == admins.end() || i->second != pass) { + cerr<<"Failed admin login.\n"; + return false; + } else { + if (cs.auth == true) + return false; + cs.auth = true; + return process(cs); + } + } else if (cmd == "STATUS") { + string srv; + iss>>srv; + std::ostringstream oss; + if (srv.empty()) { + serverstate ss; + ss.num = total_connections; + if (connections_allowed) { + if (total_connections) + ss.state = serverstate::active; + else + ss.state = serverstate::waiting; + } else { + if (total_connections) + ss.state = serverstate::shuttingdown; + else + ss.state = serverstate::shutdown; + } + oss< >::iterator i; + i = server::servers_by_name.find(srv); + if (i != server::servers_by_name.end()) + oss<second->get_state()<<"\n"; + else + oss<<"No such server.\n"; + } + cs.buf = oss.str(); + } else if (cmd == "START") { + string srv; + iss>>srv; + std::ostringstream oss; + map >::iterator i; + i = server::servers_by_name.find(srv); + if (i != server::servers_by_name.end()) { + i->second->enabled = true; + oss<second->get_state()<<"\n"; + } else + oss<<"No such server.\n"; + cs.buf = oss.str(); + } else if (cmd == "STOP") { + string srv; + iss>>srv; + std::ostringstream oss; + map >::iterator i; + i = server::servers_by_name.find(srv); + if (i != server::servers_by_name.end()) { + i->second->enabled = false; + i->second->maybekill(); + oss<second->get_state()<<"\n"; + } else + oss<<"No such server.\n"; + cs.buf = oss.str(); + } else if (cmd == "LIST") { + string state; + iss>>state; + map >::iterator i; + for (i = server::servers_by_name.begin(); + i != server::servers_by_name.end(); 
++i) { + if (state.empty() || i->second->get_state() == state) + cs.buf += (cs.buf.empty()?"":" ") + i->first; + } + cs.buf += "\n"; + } else if (cmd == "SHUTDOWN") { + connections_allowed = false; + kill_old_servers(); + cs.buf = "ok\n"; + } else if (cmd == "CONNECTIONS") { + cs.buf = lexical_cast(total_connections) + "\n"; + } else if (cmd == "RELOAD") { + reload_conffile(conffile); + cs.buf = "ok\n"; + } else if (cmd == "STARTUP") { + connections_allowed = true; + cs.buf = "ok\n"; + } else { + return true; + } + cs.rdone = true; + return true; +} + +void +administrator::initialize(string const & ap) +{ + try { + int c = ap.find(":"); + string a = ap.substr(0, c); + int p = lexical_cast(ap.substr(c+1)); + port = start(a, p); + } catch (errstr & s) { + cerr<<"Could not initialize admin port: "< >::iterator i = conns.begin(); + i != conns.end(); ++i) { + int c = i->second; + if (!i->first.rdone) + FD_SET(c, &rd); + else + FD_SET(c, &wr); + maxfd = max(maxfd, int(c)); + } +} + +void +administrator::process_selected(fd_set & rd, fd_set & wr, fd_set & er) +{ + if (int(port) == -1) + return; + if (FD_ISSET(port, &rd)) { + try { + struct sockaddr_in addr; + unsigned int l = sizeof(addr); + memset(&addr, 0, l); + sock nc = tosserr(accept(port, (struct sockaddr *) + &addr, &l), "accept()"); + conns.push_back(make_pair(cstate(), nc)); + } catch(errstr & s) { + cerr<<"During new admin connection: "< >::iterator> del; + for (list >::iterator i = conns.begin(); + i != conns.end(); ++i) { + int c = i->second; + if (c <= 0) { +// cerr<<"Bad socket.\n"; + del.push_back(i); + } else if (FD_ISSET(c, &rd)) { + char buf[120]; + int n; + n = read(c, buf, 120); + if (n < 1) { +// cerr<<"Read failed.\n"; + del.push_back(i); + } + i->first.buf.append(buf, n); + if (!process(i->first)) { +// cerr<<"Closing connection...\n"; +// i->second.close(); + del.push_back(i); + } + } + else if (FD_ISSET(c, &wr)) { + int n = write(c, i->first.buf.c_str(), i->first.buf.size()); + if (n < 1) { +// 
cerr<<"Write failed.\n"; + del.push_back(i); + } else { + i->first.buf.erase(0, n); + if (i->first.buf.empty() && i->first.rdone) { +// cerr<<"Done.\n"; + del.push_back(i); + } + } + } + } + for (list >::iterator>::iterator i = del.begin(); + i != del.end(); ++i) { + conns.erase(*i); + } +} --- administrator.hh +++ administrator.hh @@ -0,0 +1,31 @@ +#ifndef __ADMINISTRATOR_HH_ +#define __ADMINISTRATOR_HH_ + +#include "sock.hh" + +#include +using std::list; +#include +using std::pair; + +struct fd_set; + +struct administrator +{ + sock port; + struct cstate + { + bool auth; + bool rdone; + string buf; + cstate(); + }; + list > conns; + administrator(); + bool process(cstate & cs); + void initialize(string const & ap); + void add_to_select(int & maxfd, fd_set & rd, fd_set & wr, fd_set & er); + void process_selected(fd_set & rd, fd_set & wr, fd_set & er); +}; + +#endif --- basic_io_serverlist_reader.cc +++ basic_io_serverlist_reader.cc @@ -0,0 +1,109 @@ +#include "basic_io_serverlist_reader.hh" + +#include + +basic_io_serverlist_reader::basic_io_serverlist_reader(string const &fn) + : filename(fn) +{ +} + +basic_io_serverlist_reader::~basic_io_serverlist_reader() +{ +} + +serverspec +basic_io_serverlist_reader::get_next() +{ + serverspec ss; + if (!reader) + { + ss.valid = false; + return ss; + } + basic_io::stanza st; + bool ok = reader->get(st); + if (st.items.empty()) + ok = false; + else if (st.items[0].key != "server") + return get_next(); + ss.valid = ok; + if (!ok) + return ss; + for (vector::iterator i = st.items.begin(); + i != st.items.end(); ++i) + { + if (i->key == "server") + { + if (i->values.empty()) + { + ss.valid = false; + return ss; + } + ss.name = i->values[0].parsed; + } + else if (i->key == "pattern") + { + for (vector::iterator j = i->values.begin(); + j != i->values.end(); ++j) + { + ss.patterns.insert(j->parsed); + } + } + else if (i->key == "host") + { + for (vector::iterator j = i->values.begin(); + j != i->values.end(); ++j) + { + 
ss.hosts.insert(j->parsed); + } + } + else if (i->key == "remote") + { + ss.local = false; + if (i->values.empty()) + { + ss.valid = false; + return ss; + } + string str = i->values[0].parsed; + unsigned int c = str.find(":"); + ss.remote_host = str.substr(0, c); + if (c != lp.npos) + ss.remote_port = boost::lexical_cast(str.substr(c+1)); + } + else if (i->key == "local") + { + ss.local = true; + for (vector::iterator j = i->values.begin(); + j != i->values.end(); ++j) + { + ss.local_args.push_back(j->parsed); + } + } + else + { + ss.valid = false; + return ss; + } + } + return ss; +} + +bool +basic_io_serverlist_reader::begin_reading() +{ + if (ifs.is_open()) + ifs.close(); + ifs.open(filename.c_str()); + vector delims; + delims.push_back("server"); + reader.reset(new basic_io::stanza_reader(ifs, delims)); + return ifs; +} + +void +basic_io_serverlist_reader::end_reading() +{ + reader.reset(); + ifs.close(); +} --- basic_io_serverlist_reader.hh +++ basic_io_serverlist_reader.hh @@ -0,0 +1,23 @@ +#ifndef __SERVERLIST_READER_HH_ +#define __SERVERLIST_READER_HH_ + +#include "serverlist_reader.hh" +#include "io.hh" + +#include +#include +#include + +struct basic_io_serverlist_reader : public serverlist_reader +{ + std::string filename; + std::ifstream ifs; + boost::scoped_ptr reader; + basic_io_serverlist_reader(string const &fn); + ~basic_io_serverlist_reader(); + virtual serverspec get_next(); + virtual bool begin_reading(); + virtual void end_reading(); +}; + +#endif --- buffer.cc +++ buffer.cc @@ -0,0 +1,71 @@ +#include "buffer.hh" +#include "errstr.hh" + +#include + +buffer::buffer() + : readpos(0), writepos(0) +{ + ptr = new char[buf_size]; +} + +buffer::buffer(buffer const & b) +{ + ptr = new char[buf_size]; + memcpy(ptr, b.ptr, buf_size); + readpos = b.readpos; + writepos = b.writepos; +} + +buffer::~buffer() +{ + delete[] ptr; +} + +bool +buffer::canread() +{ + return writepos > readpos; +} + +bool +buffer::canwrite() +{ + return writepos < buf_size; +} + 
+void +buffer::getread(char *& p, int & n) +{ + p = ptr + readpos; + n = writepos - readpos; +} + +void +buffer::getwrite(char *& p, int & n) +{ + p = ptr + writepos; + n = buf_size - writepos; +} + +void +buffer::fixread(int n) +{ + if (n < 0) throw errstr("negative read\n", 0); + readpos += n; + if (readpos == writepos) { + readpos = writepos = 0; + } else if (readpos > buf_reset_size) { + memcpy(ptr, ptr+readpos, writepos-readpos); + writepos -= readpos; + readpos = 0; + } +} + +void +buffer::fixwrite(int n) +{ + if (n < 0) + throw errstr("negative write\n", 0); + writepos += n; +} --- buffer.hh +++ buffer.hh @@ -0,0 +1,22 @@ +#ifndef __BUFFER_HH_ +#define __BUFFER_HH_ + +struct buffer +{ + static int const buf_size = 2048; + static int const buf_reset_size = 1024; + char * ptr; + int readpos; + int writepos; + buffer(); + ~buffer(); + buffer(buffer const & b); + bool canread(); + bool canwrite(); + void getread(char *& p, int & n); + void getwrite(char *& p, int & n); + void fixread(int n); + void fixwrite(int n); +}; + +#endif --- channel.cc +++ channel.cc @@ -0,0 +1,131 @@ +#include "channel.hh" + +#include + +channel(sock & c): num(++counter), + cli(c), srv(-1), + have_routed(false), no_server(false) +{ + char * dat; + int size; + make_packet(greeting, dat, size); + char *p; + int n; + sbuf.getwrite(p, n); + if (n < size) size = n; + memcpy(p, dat, size); + sbuf.fixwrite(size); + delete[] dat; + + cli.write_from(sbuf); +} +~channel() +{ + if (who && !no_server) + who->disconnect(); +} +bool is_finished() +{ + return (cli == -1) && (srv == -1); +} +void add_to_select(int & maxfd, fd_set & rd, fd_set & wr, fd_set & er) +{ + int c = cli; + int s = srv; + + if (c > 0) { + FD_SET(c, &er); + if (cbuf.canwrite()) + FD_SET(c, &rd); + if (sbuf.canread()) + FD_SET(c, &wr); + maxfd = max(maxfd, c); + } + if (s > 0) { + FD_SET(s, &er); + if (sbuf.canwrite()) + FD_SET(s, &rd); + if (cbuf.canread()) + FD_SET(s, &wr); + maxfd = max(maxfd, s); + } +} +bool 
process_selected(fd_set & rd, fd_set & wr, fd_set & er) +{ + int c = cli; + int s = srv; +/* NB: read oob data before normal reads */ + if (c > 0 && FD_ISSET(c, &er)) { + char d; + errno = 0; + if (recv(c, &d, 1, MSG_OOB) < 1) + cli.close(), c = -1; + else + send(s, &d, 1, MSG_OOB); + } + if (s > 0 && FD_ISSET(s, &er)) { + char d; + errno = 0; + if (recv(s, &d, 1, MSG_OOB) < 1) + srv.close(), s = -1; + else + send(c, &d, 1, MSG_OOB); + } + + char *p=0; + int n; + + if (c > 0 && FD_ISSET(c, &rd)) { + if (!cli.read_to(cbuf)) c = -1; + if (!have_routed) { + string reply_srv, reply_pat; + if (extract_reply(cbuf, reply_srv, reply_pat)) { + who = get_server(reply_srv, reply_pat); + if (who && who->enabled) { + try { + srv = who->connect(); + have_routed = true; + s = srv; + } catch (errstr & e) { + cerr<<"cannot contact server "<name()<<"\n"; + no_server = true; + } + } else { + char * dat; + int size; + sbuf.getwrite(p, n); + if (who) + make_packet(srvdisabled, dat, size); + else + make_packet(notfound, dat, size); + if (n < size) size = n; + memcpy(p, dat, size); + sbuf.fixwrite(size); + delete[] dat; + no_server = true; + } + } + } + } + if (s > 0 && FD_ISSET(s, &rd)) { + if (!srv.read_to(sbuf)) s = -1; + } + + if (c > 0 && FD_ISSET(c, &wr)) { + if (!cli.write_from(sbuf)) c = -1; + } + if (s > 0 && FD_ISSET(s, &wr)) { + if (!srv.write_from(cbuf)) s = -1; + } + + // close sockets we have nothing more to send to + if (c < 0 && !cbuf.canread()) { + srv.close(), s = -1; + } + if ((no_server || have_routed && s < 0) && !sbuf.canread()) { + cli.close(), c = -1; + } + return true; +} + +int channel::counter = 0; --- channel.hh +++ channel.hh @@ -0,0 +1,31 @@ +#ifndef __CHANNEL_HH_ +#define __CHANNEL_HH_ + +#include "sock.hh" +#include "buffer.hh" +#include "server.hh" + +#include +using boost::shared_ptr; + +struct fd_set; + +struct channel +{ + static int counter; + int num; + sock cli; + sock srv; + bool have_routed; + bool no_server; + buffer cbuf; + buffer sbuf; + 
shared_ptr who; + channel(sock & c); + ~channel(); + bool is_finished(); + void add_to_select(int & maxfd, fd_set & rd, fd_set & wr, fd_set & er); + bool process_selected(fd_set & rd, fd_set & wr, fd_set & er); +}; + +#endif --- err.cc +++ err.cc @@ -0,0 +1,22 @@ +#include "err.hh" + +#include + +errstr::errstr(std::string const & s) + : name(s), err(0) +{ +} + +errstr::errstr(std::string const & s, int e) + : name(s), err(e) +{ +} + +int tosserr(int ret, std::string const & name) +{ + if (ret == -1) + throw errstr(name, errno); + if (ret < 0) + throw errstr(name, ret); + return ret; +} --- err.hh +++ err.hh @@ -0,0 +1,14 @@ +#ifndef __ERR_HH_ +#define __ERR_HH_ + +struct errstr +{ + std::string name; + int err; + errstr(std::string const & s); + errstr(std::string const & s, int e); +}; + +int tosserr(int ret, std::string const & name); + +#endif --- errstr.hh +++ errstr.hh @@ -0,0 +1,17 @@ + +struct errstr +{ + std::string name; + int err; + errstr(std::string const & s): name(s), err(0) {} + errstr(std::string const & s, int e): name(s), err(e) {} +}; + +int tosserr(int ret, std::string const & name) +{ + if (ret == -1) + throw errstr(name, errno); + if (ret < 0) + throw errstr(name, ret); + return ret; +} --- io.cc +++ io.cc @@ -0,0 +1,157 @@ +// 2006-04-13 Timothy Brownawell +// +// The basic_io implementation in monotone represents basic_io as a +// token stream. This represents it as a data structure. +// Also, this allows i/o to/from iostreams, not just strings. 
+ +#include "io.hh" + + +using namespace basic_io; + +istream &basic_io::operator>>(istream &in, value &v) +{ + v.literal.clear(); + v.parsed.clear(); + char c; + in.get(c); + if (c == '[') + { + v.literal = c; + while (in.get(c) && std::isxdigit(c)) + { + v.literal += c; + v.parsed += c; + } + if (c == ']') + v.literal += c; + else + in.setstate(std::ios_base::failbit);//parse error + } + else if (c == '"') + { + v.literal = c; + while (in.get(c) && c != '"') + { + if (c == '\\') + { + v.literal += c; + in.get(c); + } + v.literal += c; + v.parsed += c; + } + if (c == '"') + v.literal += c; + else + in.setstate(std::ios_base::failbit);//parse error + } + else + { + in.unget(); + } + return in; +} +ostream &basic_io::operator<<(ostream &out, value &v) +{ + out<>(istream &in, item &i) +{ + i.key.clear(); + i.values.clear(); + bool got_key = false, done = false; + char c; + in >> c; + if (!in) + return in; + in.unget(); + do + { + if ((c == '[' || c == '"') && got_key) + { + value v; + in >> v; + i.values.push_back(v); + } + else if (!got_key && c != '[' && c != '"') + { + in >> i.key; + i.longest = i.key.size(); + got_key = true; + } + else + done = true; + bool ok = in; + in >> c; + in.unget(); + if (ok && !in) + { + in.clear(); + done = true; + } + } + while (!done && in); + return in; +} +ostream &basic_io::operator<<(ostream &out, item const &i) +{ + out<::const_iterator j = i.values.begin(); + j != i.values.end(); ++j) + { + out << " " << j->literal; + } + out<<"\n"; + return out; +} + +void +stanza::compute_longest() +{ + unsigned int longest=0; + for (vector::const_iterator i = items.begin(); + i != items.end(); ++i) + { + longest=(i->key.size()>longest?i->key.size():longest); + } + for (vector::iterator i = items.begin(); + i != items.end(); ++i) + { + i->longest = longest; + } +} +ostream &basic_io::operator<<(ostream &out, stanza const &st) +{ + for (vector::const_iterator i = st.items.begin(); + i != st.items.end(); ++i) + { + out << *i; + } + return out; 
+} +bool +stanza_reader::get(stanza &st) +{ + st.items.clear(); + while ((st.items.empty() && !last_read.key.empty()) || is >> last_read) + { + if (delims.find(last_read.key) == delims.end() || st.items.empty()) + st.items.push_back(last_read); + else + { + st.compute_longest(); + return true; + } + } + if (!is && !st.items.empty()) + { + st.compute_longest(); + return true; + } + return false; +} --- io.hh +++ io.hh @@ -0,0 +1,57 @@ +#ifndef __IO_HH_ +#define __IO_HH_ +// 2006-04-13 Timothy Brownawell +// +// The basic_io implementation in monotone represents basic_io as a +// token stream. This represents it as a data structure. +// Also, this allows i/o to/from iostreams, not just strings. + +#include +#include +#include +#include + +namespace basic_io { + using std::string; + using std::vector; + using std::istream; + using std::ostream; + using std::set; + struct value + { + string literal, parsed; + }; + istream &operator>>(istream &in, value &v); + ostream &operator<<(ostream &out, value &v); + struct item + { + string key; + vector values; + int longest; + }; + istream &operator>>(istream &in, item &i); + ostream &operator<<(ostream &out, item const &i); + + struct stanza + { + vector items; + void compute_longest(); + }; + ostream &operator<<(ostream &out, stanza const &st); + // There is no operator>>(istream, stanza), because that operation + // requires knowing what the stanza delimiters are. + // So we have a stanza_reader instead. 
+ struct stanza_reader + { + istream &is; + set delims; + item last_read; + stanza_reader(istream &i, vector const &d) + : is(i), delims(d.begin(), d.end()) + { + } + bool get(stanza &st); + }; +} + +#endif --- server.cc +++ server.cc @@ -0,0 +1,239 @@ +#include "server.hh" + +int fork_server(vector const & args); + +serverstate::serverstate(): state(unknown), num(0) {} +serverstate const & serverstate::operator=(string const & s) +{ + if (s == "REMOTE") + state = remote; + else if (s == "ACTIVE") + state = active; + else if (s == "WAITING") + state = waiting; + else if (s == "SLEEPING") + state = sleeping; + else if (s == "STOPPING") + state = stopping; + else if (s == "STOPPED") + state = stopped; + else if (s == "SHUTTINGDOWN") + state = shuttingdown; + else if (s == "SHUTDOWN") + state = shutdown; + return *this; +} +bool serverstate::operator==(string const & s) +{ + serverstate foo; + foo = s; + return foo.state == state; +} + +std::ostream & operator<<(std::ostream & os, serverstate const & ss) +{ + switch (ss.state) { + case serverstate::remote: + os<<"REMOTE"; + break; + case serverstate::active: + os<<"ACTIVE "< 0 || port == 0) + find_addr(addr, port); + vector args; + args.push_back(monotone); + args.push_back("serve"); + args.push_back("--bind=" + addr + ":" + lexical_cast(port)); + unsigned int n = 0, m = 0; + n = arguments.find_first_not_of(" \t"); + while (n != string::npos && m != string::npos) { + m = arguments.find_first_of(" ", n); + args.push_back(arguments.substr(n, m-n)); + n = arguments.find_first_not_of(" ", m); + } + pid = fork_server(args); + } + } + sock s = make_outgoing(port, addr); + if (local && !connection_count) { + live_servers.insert(by_name->second); + } + ++connection_count; + return s; +} + +void +server::disconnect() +{ + if (--connection_count || !local) + return; + last_conn_time = time(0); + maybekill(); +} + +void +server::maybekill() +{ + if (!local) + return; + if (pid == -1) + return; + int difftime = time(0) - 
last_conn_time; + if (!connection_count + && (difftime > server_idle_timeout || !connections_allowed)) + yeskill(); + else if (waitpid(pid, 0, WNOHANG) > 0) { + pid = -1; + port = 0; + } +} + +void +server::yeskill() +{ + if (local && pid != -1) { + kill(pid, SIGTERM); + int r; + do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR); + pid = -1; + port = 0; + live_servers.erase(live_servers.find(by_name->second)); + } +} + +int fork_server(vector const & args) +{ + int err[2]; + if (pipe(err) < 0) + return false; + int pid = fork(); + if (pid == -1) { + close(err[0]); + close(err[1]); + cerr<<"Failed to fork server.\n"; + return false; + } else if (pid == 0) { + close(err[0]); + close(0); + close(1); + close(2); + sock::close_all_socks(); + if (dup2(err[1], 2) < 0) { + exit(1); + } + + char ** a = new char*[args.size()+1]; + for (unsigned int i = 0; i < args.size(); ++i) { + a[i] = new char[args[i].size()+1]; + memcpy(a[i], args[i].c_str(), args[i].size()+1); + } + a[args.size()] = 0; + + // Must set C locale, because usher interprets the output from + // monotone itself, and only deals with english! 
+ setlocale(LC_ALL,"C"); + setlocale(LC_MESSAGES,"C"); + setlocale(LC_NUMERIC,"C"); + setenv("LC_ALL","C",1); + + execvp(a[0], a); + perror("execvp failed"); + exit(1); + } else { + close(err[1]); + char head[256]; + int got = 0; + int r = 0; + bool line = false; + // the first line output on the server's stderr will be either + // "mtn: beginning service on : " or + // "mtn: network error: bind(2) error: Address already in use" + do { + r = read(err[0], head + got, 256 - got); + if (r) + cerr<<"Read '"< 0) { + for (int i = 0; i < r && !line; ++i) + if (head[got+i] == '\n') + line = true; + got += r; + } + } while(r > 0 && !line && got < 256); + head[got] = 0; + if (string(head).find("beginning service") != string::npos) + return pid; + kill(pid, SIGKILL); + do {r = waitpid(pid, 0, 0);} while (r==-1 && errno == EINTR); + return -1; + } +} --- server.hh +++ server.hh @@ -0,0 +1,40 @@ +#ifndef __SERVER_HH_ +#define __SERVER_HH_ +] +#include "sock.hh" + +#include +using std::string; + +struct serverstate +{ + enum ss {remote, active, waiting, sleeping, stopping, + stopped, shuttingdown, shutdown, unknown}; + ss state; + int num; + serverstate(); + serverstate const & operator=(string const & s); + bool operator==(string const & s); +}; +std::ostream & operator<<(std::ostream & os, serverstate const & ss); + +struct server +{ + bool enabled; + bool local; + int pid; + vector arguments; + string addr; + int port; + int connection_count; + int last_conn_time; + server(); + ~server(); + serverstate get_state(); + sock connect(); + void disconnect(); + void maybekill(); + void yeskill(); +}; + +#endif --- server_manager.cc +++ server_manager.cc @@ -0,0 +1,187 @@ +#include "server_manager.hh" +#include "serverlist_reader.hh" + +#include + +server_manager::server_manager(serverlist_reader &r) + : reader(r) +{ +} + +server_manager::prefix::prefix(string const &s) + : str(s), ok_shorter(true) {} +server_manager::prefix::prefix(string const &s, bool b) + : str(s), ok_shorter(b) 
{} +server_manager::prefix::~prefix() {} +server_manager::prefix::operator string() {return str;} +int server_manager::prefix::cmp(prefix const &r) +{ + unsigned int size = min(str.size(), r.str.size()); + int c = strncmp(str.c_str(), r.str.c_str(), size); + int longer = str.size() - r.str.size(); + if (ok_shorter == r.ok_shorter) + return (c==0)?longer:c; + else if (ok_shorter) + return (c==0)?max(longer,0):c; + else // (r.ok_shorter) + return (c==0)?min(longer,0):c; +} +bool server_manager::prefix::operator<(prefix const &r) +{ + return cmp(r) < 0; +} +bool server_manager::prefix::operator==(prefix const &r) +{ + return cmp(r) == 0; +} +bool server_manager::prefix::operator>(prefix const &r) +{ + return cmp(r) > 0; +} + +void +server_manager::add_replace_server(shared_ptr srv, serverdata const &dat) +{ + servers.insert(make_pair(srv, dat)); + by_name.insert(make_pair(dat.name, srv)); + for (set::iterator i = dat.hosts.begin(); + i != dat.hosts.end(); ++i) + by_host.insert(make_pair(*i, srv)); + for (set::iterator i = dat.patterns.begin(); + i != dat.patterns.end(); ++i) + by_pattern.insert(make_pair(*i, srv)); +} + +void +server_manager::delist_server(shared_ptr srv) +{ + map >::iterator j = by_name.find(i->second.name); + if (j == by_name.end()) + return; + serverdata sd; + map, serverdata>::iterator k = servers.find(j->second); + if (k != servers.end()) + { + sd = k->second; + servers.erase(k); + } + by_name.erase(j); + map >::iterator i; + for (set::iterator j = sd.hosts.begin(); + j != sd.hosts.end(); ++j) + { + i = by_host.find(*j); + if (i != by_host.end()) + by_host.erase(i); + } + for (set::iterator j = sd.patterns.begin(); + j != sd.patterns.end(); ++j) + { + i = by_pattern.find(*j); + if (i != by_pattern.end()) + by_pattern.erase(i); + } +} + +void +server_manager::reload_servers() +{ + if (!reader.begin_reading()) + return; + serverspec ss; + set all; + for (map >::iterator i = by_name.begin(); + i != by_name.end(); ++i) + all.insert(i->first); + while 
((ss = reader.get_next()).valid) + { + shared_ptr srv; + set::iterator i = all.find(ss.name); + bool preexist(false); + if (i != all.end()) + { + all.erase(i); + srv = by_name.find(ss.name)->second; + preexist = true; + } + else + srv.resert(new server()); + srv->local = ss.local; + if (ss.local) + { + srv->arguments = ss.local_args; + } + else + { + srv->addr = ss.remote_host; + srv->port = ss.remote_port; + } + serverdata sd; + sd.name = ss.name; + sd.hosts = ss.hosts; + sd.patterns = ss.patterns; + if (preexist) + { + map, serverdata>::iterator i = servers.find(srv); + if (i->second == sd) + continue; + else + delist_server(ss.name); + } + add_replace_server(srv, sd); + } + reader.end_reading(); + for (set::iterator j = all.begin(); + j != all.end(); ++j) + { + map >::iterator i = by_name.find(*j); + if (i != b_name.end()) + delist_server(i->second); + } +} + +serversock +server_manager::connect_to_server(string const &host, + string const &pattern) +{ + if (!connections_allowed) + throw errstr("All servers are disabled."); + shared_ptr srv; + map >::iterator i; + if (!host.empty() && !by_host.empty()) + { + i = --by_host.upper_bound(host); + if (i->first == prefix(host)) + srv = i->second; + } + if (!srv && !pattern.empty() && !by_pattern.empty()) + { + i = --by_pattern.upper_bound(pattern); + if (i->first == prefix(host)) + srv = i->second; + } + if (srv) + { + sock s = srv->connect(); + serversock ss(s); + ss.server = srv; + ++total_connections; + return ss; + } + else + throw errstr("Cannot find server."); +} +void +server_manager::disconnect_from_server(serversock const &s) +{ + s.server->disconnect(); + --total_connections; +} +serverstate +server_manager::get_server_state(string const &name) +{ + map >::iterator i = by_name.find(name); + if (i != by_name.end()) + return i->second.get_state(); + throw errstr("No such server."); +} --- server_manager.hh +++ server_manager.hh @@ -0,0 +1,62 @@ +#ifndef __SERVER_MANAGER_HH_ +#define __SERVER_MANAGER_HH_ + 
+#include "sock.hh" + +#include +using std::string; +#include +using std::set; +#include +using std::map; + +#include +using boost::shared_ptr; + +struct serverlist_reader; + +struct serversock : public sock +{ + shared_ptr srv; +}; + +struct server_manager +{ + struct serverdata + { + string name; + set hosts, patterns; + }; + struct prefix + { + string str; + bool ok_shorter; + prefix(string const &s); + prefix(string const &s, bool b); + ~prefix(); + operator string(); + int cmp(prefix const &r); + bool operator<(prefix const &r); + bool operator==(prefix const &r); + bool operator>(prefix const &r); + }; + map > by_host, by_pattern; + map > by_name; + set > live; + map, serverdata> servers; + bool connections_allowed; + int total_connections; + serverlist_reader &reader; + + server_manager(serverlist_reader &r); + + void add_replace_server(shared_ptr srv, serverdata const &dat); + void delist_server(shared_ptr srv); + + void reload_servers(); + serversock connect_to_server(string const &host, string const &pattern); + void disconnect_from_server(serversock const &s); + serverstate get_server_state(string const &name); +}; + +#endif --- serverlist_reader.hh +++ serverlist_reader.hh @@ -0,0 +1,23 @@ +#ifndef __SERVERLIST_READER_HH_ +#define __SERVERLIST_READER_HH_ + +struct serverspec +{ + string name; + set hosts, patterns; + bool local; + bool valid; + vector local_args; + string remote_host; + int remote_port; +}; + +struct serverlist_reader +{ + virtual ~serverlist_reader(); + virtual serverspec get_next() = 0; + virtual bool begin_reading() = 0; + virtual void end_reading() = 0; +}; + +#endif --- sock.cc +++ sock.cc @@ -0,0 +1,110 @@ +#include "sock.hh" +#include "buffer.hh" + +#include + +sock::operator int() +{ + if (!s) + return -1; + else + return s[0]; +} +sock::sock(int ss) +{ + s = new int[2]; + s[0] = ss; + s[1] = 1; + all_socks.insert(s); +} +sock::sock(sock const & ss) +{ + s = ss.s; + if (s) + s[1]++; +} +void sock::deref() +{ + if (s && 
!(--s[1])) { + try { + close(); + } catch(errstr & e) { + // if you want it to throw errors, call close manually + } + delete[] s; + all_socks.erase(all_socks.find(s)); + } + s = 0; +} +sock::~sock() +{ + deref(); +} +sock const & sock::operator=(int ss) +{ + deref(); + s = new int[2]; + all_socks.insert(s); + s[0]=ss; + return *this; +} +sock const & sock::operator=(sock const & ss) +{ + deref(); + s = ss.s; + if (s) + ++s[1]; + return *this; +} +void sock::close() +{ + if (!s || s[0] == -1) + return; + shutdown(s[0], SHUT_RDWR); + while (::close(s[0]) < 0) { + if (errno == EIO) + throw errstr("close failed", errno); + if (errno != EINTR) + break; + } + s[0]=-1; +} +static void sock::close_all_socks() +{ + for (set::iterator i = all_socks.begin(); i != all_socks.end(); ++i) { + while (::close((*i)[0]) < 0) + if (errno != EINTR) + break; + } +} +bool sock::read_to(buffer & buf) +{ + if (!s) + return false; + char *p; + int n; + buf.getwrite(p, n); + n = read(s[0], p, n); + if (n < 1) { + close(); + return false; + } else + buf.fixwrite(n); + return true; +} +bool sock::write_from(buffer & buf) +{ + if (!s) + return false; + char *p; + int n; + buf.getread(p, n); + n = write(s[0], p, n); + if (n < 1) { + close(); + return false; + } else + buf.fixread(n); + return true; +} +set sock::all_socks; --- sock.hh +++ sock.hh @@ -0,0 +1,23 @@ +#ifndef __SOCK_HH_ +#define __SOCK_HH_ + +struct buffer; + +struct sock +{ + int *s; + static set all_socks; + operator int(); + sock(int ss); + sock(sock const & ss); + void deref(); + ~sock(); + sock const & operator=(int ss); + sock const & operator=(sock const & ss); + void close(); + static void close_all_socks(); + bool read_to(buffer & buf); + bool write_from(buffer & buf); +}; + +#endif --- usher.cc +++ usher.cc @@ -0,0 +1,453 @@ +// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*- +// Timothy Brownawell +// GPL v2 + +#include +#include +#include +#include +#include +#include +#include +#include +#include 
+#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using std::vector; +using std::max; +using std::string; +using std::list; +using std::set; +using std::map; +using boost::lexical_cast; +using boost::shared_ptr; +using std::cerr; +using std::pair; +using std::make_pair; + +// defaults, overridden by command line +int listenport = 4691; +string listenaddr = "0.0.0.0"; +string monotone = "mtn"; + +// keep local servers around for this many seconds after the last +// client disconnects from them (only accurate to ~10 seconds) +int const server_idle_timeout = 60; + +// ranges that dynamic (local) servers can be put on +int const minport = 15000; +int const maxport = 65000; +int const minaddr[] = {127, 0, 1, 1}; +int const maxaddr[] = {127, 254, 254, 254}; +int currport = 0; +int curraddr[] = {0, 0, 0, 0}; + +char const netsync_version = 6; + +string const greeting = " Hello! This is the monotone usher at localhost. What would you like?"; + +string const notfound = "!Sorry, I don't know where to find that."; + +string const disabled = "!Sorry, this usher is not currently accepting connections."; + +string const srvdisabled = "!Sorry, that server is currently disabled."; + + +// packet format is: +// byte version +// byte cmd {100 if we send, 101 if we receive} +// uleb128 {size of everything after this} +// uleb128 {size of string} +// string +// { +// uleb128 {size of string} +// string +// } // only present if we're receiving + +// uleb128 is +// byte 0x80 | +// byte 0x80 | +// ... 
+// byte 0xff & +// +// the high bit says that this byte is not the last + +void make_packet(std::string const & msg, char * & pkt, int & size) +{ + size = msg.size(); + char const * txt = msg.c_str(); + char header[6]; + header[0] = netsync_version; + header[1] = 100; + int headersize; + if (size >= 128) { + header[2] = 0x80 | (0x7f & (char)(size+2)); + header[3] = (char)((size+2)>>7); + header[4] = 0x80 | (0x7f & (char)(size)); + header[5] = (char)(size>>7); + headersize = 6; + } else if (size >= 127) { + header[2] = 0x80 | (0x7f & (char)(size+1)); + header[3] = (char)((size+1)>>7); + header[4] = (char)(size); + headersize = 5; + } else { + header[2] = (char)(size+1); + header[3] = (char)(size); + headersize = 4; + } + pkt = new char[headersize + size]; + memcpy(pkt, header, headersize); + memcpy(pkt + headersize, txt, size); + size += headersize; +} + + +bool check_address_empty(string const & addr, int port) +{ + sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()"); + int yes = 1; + tosserr(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + &yes, sizeof(yes)), "setsockopt"); + sockaddr_in a; + memset (&a, 0, sizeof (a)); + if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr)) + throw errstr("bad ip address format", 0); + a.sin_port = htons(port); + a.sin_family = AF_INET; + int r = bind(s, (sockaddr *) &a, sizeof(a)); + s.close(); + return r == 0; +} + +void find_addr(string & addr, int & port) +{ + if (currport == 0) { + currport = minport-1; + for(int i = 0; i < 4; ++i) + curraddr[i] = minaddr[i]; + } + do { + // get the next address in our list + if (++currport > maxport) { + currport = minport; + for (int i = 0; i < 4; ++i) { + if (++curraddr[i] <= maxaddr[i]) + break; + curraddr[i] = minaddr[i]; + } + } + port = currport; + addr = lexical_cast(curraddr[0]) + "." + + lexical_cast(curraddr[1]) + "." + + lexical_cast(curraddr[2]) + "." 
+ + lexical_cast(curraddr[3]); + } while (!check_address_empty(addr, port)); +} + +sock start(string addr, int port) +{ + sock s = tosserr(socket(AF_INET, SOCK_STREAM, 0), "socket()"); + int yes = 1; + tosserr(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, + &yes, sizeof(yes)), "setsockopt"); + sockaddr_in a; + memset (&a, 0, sizeof (a)); + if (!inet_aton(addr.c_str(), (in_addr *) &a.sin_addr.s_addr)) + throw errstr("bad ip address format", 0); + a.sin_port = htons(port); + a.sin_family = AF_INET; + tosserr(bind(s, (sockaddr *) &a, sizeof(a)), "bind"); + cerr<<"bound to "< 0) + out.append(buf, got); + } while (in.fail() && !in.eof()); + return out; +} + + +void kill_old_servers() +{ + set >::iterator i; + for (i = server::live_servers.begin(); i != server::live_servers.end(); ++i) { + (*i)->maybekill(); + } +} + +int extract_uleb128(char *p, int maxsize, int & out) +{ + out = 0; + int b = 0; + unsigned char got; + do { + if (b == maxsize) + return -1; + got = p[b]; + out += ((int)(p[b] & 0x7f))<<(b*7); + ++b; + } while ((unsigned int)(b*7) < sizeof(int)*8-1 && (got & 0x80)); + return b; +} + +int extract_vstr(char *p, int maxsize, string & out) +{ + int size; + out.clear(); + int chars = extract_uleb128(p, maxsize, size); + if (chars == -1 || chars + size > maxsize) { + return -1; + } + out.append(p+chars, size); + return chars+size; +} + +bool extract_reply(buffer & buf, string & host, string & pat) +{ + char *p; + int n, s; + buf.getread(p, n); + if (n < 4) return false; + p += 2; // first 2 bytes are header + n -= 2; + // extract size, and make sure we have the entire packet + int pos = extract_uleb128(p, n, s); + if (pos == -1 || n < s+pos) { + return false; + } + // extract host vstr + int num = extract_vstr(p+pos, n-pos, host); + if (num == -1) { + return false; + } + pos += num; + // extract pattern vstr + num = extract_vstr(p+pos, n-pos, pat); + if (num == -1) { + cerr<<"old-style reply.\n"; + pat = host; + host.clear(); + return true; + } + pos += num; + 
buf.fixread(pos+2); + return true; +} + +bool reload_pending; +map admins; +string conffile; + +void reload_conffile(string const & file) +{ + reload_pending = false; + cerr<<"Reloading config file...\n"; + std::ifstream cf(file.c_str()); + + string line = getline(cf); + while (!line.empty()) { + std::istringstream iss(line); + string a, b, c; + iss>>a>>b>>c; + if (a == "userpass") + admins.insert(make_pair(b, c)); + line = getline(cf); + } + +} +void sched_reload(int sig) +{ + reload_pending = true; +} + +struct pidfile +{ + string filename; + void initialize(string const & file) + { + filename = file; + std::ofstream ofs(filename.c_str()); + ofs<(lp.substr(c+1)); + } else if (string(argv[i]) == "-m") + monotone = argv[++i]; + else if (string(argv[i]) == "-a") + admin.initialize(argv[++i]); + else if (string(argv[i]) == "-p") + pf.initialize(argv[++i]); + else + conffile = argv[i]; + } + if (conffile.empty() || i != argc) { + cerr<<"Usage:\n"; + cerr<<"\tusher [-l addr[:port]] [-a addr:port] \n"; + exit (1); + } + } + reload_conffile(conffile); + + + struct sigaction sa, sa_old; + sa.sa_handler = &sched_reload; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + while(sigaction(SIGHUP, &sa, &sa_old) == -1 && errno == EINTR); + sa.sa_handler = SIG_IGN; + while(sigaction(SIGPIPE, &sa, &sa_old) == -1 && errno == EINTR); + sa.sa_handler = sig_end; + while(sigaction(SIGTERM, &sa, &sa_old) == -1 && errno == EINTR); + while(sigaction(SIGINT, &sa, &sa_old) == -1 && errno == EINTR); + + sock h(-1); + try { + h = start(listenaddr, listenport); + } catch (errstr & s) { + std::cerr<<"Error while opening socket: "< channels; + + done = false; + while (!done) { + fd_set rd, wr, er; + FD_ZERO (&rd); + FD_ZERO (&wr); + FD_ZERO (&er); + FD_SET (h, &rd); + int nfds = h; + channel *newchan = 0; + + for (std::list::iterator i = channels.begin(); + i != channels.end(); ++i) + i->add_to_select(nfds, rd, wr, er); + + admin.add_to_select(nfds, rd, wr, er); + + timeval timeout; + 
timeout.tv_sec = 10; + timeout.tv_usec = 0; + int r = select(nfds+1, &rd, &wr, &er, &timeout); + + if (r < 0 && errno != EINTR) { + perror ("select()"); + exit (1); + } + if (done) + return 0; + if (FD_ISSET(h, &rd)) { + try { + struct sockaddr_in client_address; + unsigned int l = sizeof(client_address); + memset(&client_address, 0, l); + sock cli = tosserr(accept(h, (struct sockaddr *) + &client_address, &l), "accept()"); + if (connections_allowed) + newchan = new channel(cli); + else { + char * dat; + int size; + make_packet(disabled, dat, size); + write(cli, dat, size); + delete[] dat; + } + } catch(errstr & s) { + cerr<<"During new connection: "<::iterator> finished; + for (std::list::iterator i = channels.begin(); + i != channels.end(); ++i) { + try { + i->process_selected(rd, wr, er); + if (i->is_finished()) + finished.push_back(i); + } catch (errstr & e) { + finished.push_back(i); + cerr<<"Error proccessing connection "<num<<": "<::iterator>::iterator i = finished.begin(); + i != finished.end(); ++i) + channels.erase(*i); + if (newchan) { + channels.push_back(*newchan); + delete newchan; + newchan = 0; + } + kill_old_servers(); + if (reload_pending) + reload_conffile(conffile); + + admin.process_selected(rd, wr, er); + } + return 0; +} --- usher.txt +++ usher.txt @@ -0,0 +1,118 @@ + +This is an "usher" to allow multiple monotone servers to work from +the same port. It asks the client what it wants to sync, +and then looks up the matching server in a table. It then forwards +the connection to that server. All servers using the same usher need +to have the same server key. + +This requires cooperation from the client, which means it only works +for recent (0.23 or later) clients. In order to match against hostnames +a post-0.23 client is needed (0.23 clients can only be matched against +their include pattern). 
+ +Usage: usher [-m command] [-l address[:port]] [-a address:port] [-p pidfile] config-file + +options: +-m the monotone command, defaults to "mtn" +-l address and port to listen on, defaults to 0.0.0.0:4691 +-a address and port to listen for admin commands +-p a file (deleted on program exit) to record the pid of the usher in +config-file a file that looks like + userpass username password + + server monotone + host localhost + pattern net.venge.monotone + remote 66.96.28.3:4691 + + server local + host 127.0.0.1 + pattern * + local -d /usr/local/src/managed/mt.db~ * + +or in general, one block of one or more lines of + userpass username password +followed by any number of blocks of a + server name +line followed by one or more + host hostname +lines and/or one or more + pattern pattern +lines, and one of + remote address:port + local arguments +, with blocks separated by blank lines + +"userpass" lines specify who is allowed to use the administrative port. + +A request to server "hostname" will be directed to the +server at address:port, if that server is marked as remote, +and to a local server managed by the usher, started with the given +arguments ("mtn serve --bind=something arguments"), if it is +marked as local. +Note that "hostname" has to be an initial substring of who the client asked +to connect to, but does not have to match exactly. This means that you don't +have to know in advance whether clients will be asking for +hostname or hostname:port. + + +Admin commands + +If the -a option is given, the usher will listen for administrative +connections on that port. The connecting client gives commands of the form + COMMAND [arguments] +, and after any command except USERPASS the usher will send a reply and +close the connection. The reply will always end with a newline. + +USERPASS username password +Required before any other command, so random people can't do bad things. +If incorrect, the connection will be closed immediately. + +STATUS [servername] +Get the status of a server, as named by the "server" lines in the +config file. 
If a server is specified, the result will be one of: + REMOTE - this is a remote server without active connections + ACTIVE n - this server currently has n active connections + WAITING - this (local) server is running, but has no connections + SLEEPING - this (local) server is not running, but is available + STOPPING n - this (local) server has been asked to stop, but still has + n active connections. It will not accept further connections. + STOPPED - this (local) server has been stopped, and will not accept + connections. The server process is not running. + SHUTTINGDOWN - the usher has been shut down, no servers are accepting + connections. + SHUTDOWN - the usher has been shut down, all connections have been closed, + and all local server processes have been stopped. +If no server is specified, the response will be SHUTTINGDOWN, SHUTDOWN, +WAITING, or ACTIVE (with n being the total number of open connections, +across all servers). + +STOP servername +Prevent the given local server from receiving further connections, and stop +it once all connections are closed. The result will be the new status of +that server: ACTIVE local servers become STOPPING, and WAITING and SLEEPING +servers become STOPPED. Servers in other states are not affected. + +START servername +Allow a stopped or stopping server to receive connections again. The result +will be the new status of that server. (A server in the "STOPPING" state +becomes ACTIVE, and a STOPPED server becomes SLEEPING. A server in some +other state is not affected.) + +LIST [state] +Returns a space-separated list of all servers. If a state is given, only +list the servers that are in that state. + +SHUTDOWN +Do not accept new connections for any servers, local or remote. Returns "ok". + +STARTUP +Begin accepting connections again after a SHUTDOWN. Returns "ok". + +CONNECTIONS +Returns the number of connections currently open. + +RELOAD +Reload the config file (same as sending SIGHUP). 
The reply will be "ok", +and will not be given until the config file has been reloaded. +