# # # patch "cmd_scgi.cc" # from [73b6d790262a839810f04d79d886a56e9e12f917] # to [b526e6a8d5d3e6f20689fb80511fd67c3ba30512] # # patch "gsync.cc" # from [fa8fcd0b060bca33ac0ae1032f447b050ed381f7] # to [d81cf073a1d92362cd99c270106b18f468bcfd6d] # # patch "gsync.hh" # from [c7544438a00e6d3224c9886a8fbc00ed3abbcb18] # to [dff14231ff185742fdbd7941cdb44c0e94218edd] # # patch "http_client.cc" # from [6561236ce5d96d312751a03ce24d5de94f1d5664] # to [0d6e55b1421bcd7a4047fe5fc4b0b7d296dd2e30] # # patch "http_client.hh" # from [8eac2ba77c84328372e3f2980bac2c91f70a5304] # to [807bce3cae5c6ed4881393fddf658f4dc7020165] # # patch "json_msgs.cc" # from [59063a3a187dc0017d80f187b3ce5e05353a69a2] # to [6c003e8759e0f22ae7b3e7f971937ed6da834165] # # patch "json_msgs.hh" # from [5057e28480a4d7e2f673bd4111e8b10bd169782d] # to [86a66c961fb6eef48b00414a74865929d662d855] # ============================================================ --- cmd_scgi.cc 73b6d790262a839810f04d79d886a56e9e12f917 +++ cmd_scgi.cc b526e6a8d5d3e6f20689fb80511fd67c3ba30512 @@ -17,6 +17,7 @@ #include "constants.hh" #include "database.hh" #include "globish.hh" +#include "graph.hh" #include "json_io.hh" #include "json_msgs.hh" #include "keys.hh" @@ -179,9 +180,31 @@ do_cmd(database & db, json_io::json_obje response_revs.insert(*i); return encode_msg_inquire_response(response_revs); } + else if (decode_msg_descendants_request(cmd_obj, request_revs)) + { + L(FL("descendants %d revisions") % request_revs.size()); + db.ensure_open(); + rev_ancestry_map parent_to_child_map; + db.get_revision_ancestry(parent_to_child_map); + + set descendant_set, response_set; + // get_all_ancestors can be used as get_all_descendants if used with + // the normal parent-to-child order ancestry map. the resulting + // ancestors include all those in the frontier we started from which + // we don't want so remove these to arrive at the set of revs this + // server has the the attached client does not. + get_all_ancestors(request_revs, parent_to_child_map, descendant_set); + set_difference(descendant_set.begin(), descendant_set.end(), + request_revs.begin(), request_revs.end(), + inserter(response_set, response_set.begin())); + + vector response_revs; + toposort(db, response_set, response_revs); + return encode_msg_descendants_response(response_revs); + } else { - return encode_msg_error("request not understood"); + return encode_msg_error("unknown request"); } } @@ -228,9 +251,15 @@ process_scgi_transaction(database & db, return; } } + else + { + // FIXME: do something better for reporting errors from the server + std::cerr << "parse error" << std::endl; + } } catch (scgi_error & e) { + std::cerr << "scgi error -- " << e.msg << std::endl; out << "Status: 400 Bad request\r\n" << "Content-Type: application/jsonrequest\r\n" << "\r\n"; @@ -238,6 +267,7 @@ process_scgi_transaction(database & db, } catch (informative_failure & e) { + std::cerr << "informative failure -- " << e.what() << std::endl; out << "Status: 400 Bad request\r\n" << "Content-Type: application/jsonrequest\r\n" << "\r\n"; ============================================================ --- gsync.cc fa8fcd0b060bca33ac0ae1032f447b050ed381f7 +++ gsync.cc d81cf073a1d92362cd99c270106b18f468bcfd6d @@ -180,73 +180,33 @@ static void } static void -do_missing_playback(database & db, - channel const & ch, - set & core_frontier, - set & revs_to_push, - rev_ancestry_map const & parent_to_child_map) +push_revs(database & db, + channel const & ch, + vector const & outbound_revs) { - // add the root revision to the frontier, so we also push - // initial revisions. - core_frontier.insert(revision_id("")); - while (1) + for (vector::const_iterator i = outbound_revs.begin(); + i != outbound_revs.end(); ++i) { - // collect a set of revisions we can push immediately, i.e. for - // which the other peer has the necessary ancestor revision(s). - vector< pair > pushable_revs; - for (set::const_iterator i = core_frontier.begin(); - i != core_frontier.end(); ++i) - { - typedef rev_ancestry_map::const_iterator ci; - pair range = parent_to_child_map.equal_range(*i); - - for (ci j = range.first; j != range.second; ++j) - if ((!j->second.inner()().empty()) - && (revs_to_push.find(j->second) != revs_to_push.end())) - pushable_revs.push_back(*j); - } - - // abort condition and some invariants - if (revs_to_push.empty()) - { - I(pushable_revs.empty()); - break; - } - else - I(!pushable_revs.empty()); - - // push that set of revisions and advance the frontier. - for (vector< pair >::const_iterator i = pushable_revs.begin(); - i != pushable_revs.end(); ++i) - { - L(FL(" pushing revision %s (child of rev %s)") - % i->second % i->first); - - ch.push_rev(i->second); - - revs_to_push.erase(i->second); - core_frontier.erase(i->first); - core_frontier.insert(i->second); - } + ch.push_rev(*i); } - - // remove that root revision again. - core_frontier.erase(revision_id("")); } - static void -request_missing_playback(database & db, - channel const & ch, - set const & core_frontier) +pull_revs(database & db, + channel const & ch, + vector const & inbound_revs) { - + for (vector::const_iterator i = inbound_revs.begin(); + i != inbound_revs.end(); ++i) + { + ch.pull_rev(*i); + } } void run_gsync_protocol(lua_hooks & lua, database & db, channel const & ch, - globish const & include_pattern, - globish const & exclude_pattern) + globish const & include_pattern, // FIXME: use this pattern + globish const & exclude_pattern) // FIXME: use this pattern { bool pushing = true, pulling = true; @@ -264,22 +224,39 @@ run_gsync_protocol(lua_hooks & lua, data our_revs.insert(i->second); } - set common_core; - determine_common_core(ch, our_revs, child_to_parent_map, parent_to_child_map, common_core); + set common_revs; + determine_common_core(ch, our_revs, child_to_parent_map, parent_to_child_map, common_revs); - set ours_alone; - do_set_difference(our_revs, common_core, ours_alone); - P(F("revs to send: %d") % ours_alone.size()); + P(F("%d common revisions") % common_revs.size()); - set core_frontier = common_core; + set core_frontier = common_revs; erase_ancestors(db, core_frontier); + P(F("%d frontier revisions") % core_frontier.size()); + + set outbound_set; + do_set_difference(our_revs, common_revs, outbound_set); + vector outbound_revs, inbound_revs; + toposort(db, outbound_set, outbound_revs); + + P(F("%d outbound revisions") % outbound_revs.size()); + + ch.get_descendants(core_frontier, inbound_revs); + + P(F("%d inbound revisions") % inbound_revs.size()); + + // gsync is a request/response protocol where a "client" sends requests to + // a "server" and then receives responses from the "server". the client + // will first push any unique revs that only it has and the server will + // then push back any unique revs that only it has. both sides push revs + // from the core frontier in ancestry order, so that parent revisions are + // always received before child revisions. + if (pushing) - do_missing_playback(db, ch, core_frontier, ours_alone, - parent_to_child_map); + push_revs(db, ch, outbound_revs); if (pulling) - request_missing_playback(db, ch, core_frontier); + pull_revs(db, ch, inbound_revs); } ============================================================ --- gsync.hh c7544438a00e6d3224c9886a8fbc00ed3abbcb18 +++ gsync.hh dff14231ff185742fdbd7941cdb44c0e94218edd @@ -25,11 +25,14 @@ public: channel { public: - virtual - void inquire_about_revs(std::set const & query_set, - std::set & theirs) const = 0; - virtual - void push_rev(revision_id const & rid) const = 0; + virtual void inquire_about_revs(std::set const & query_set, + std::set & theirs) const = 0; + virtual void get_descendants(std::set const & common_revs, + std::vector & inbound_revs) const = 0; + + virtual void push_rev(revision_id const & rid) const = 0; + virtual void pull_rev(revision_id const & rid) const = 0; + virtual ~channel() {} }; ============================================================ --- http_client.cc 6561236ce5d96d312751a03ce24d5de94f1d5664 +++ http_client.cc 0d6e55b1421bcd7a4047fe5fc4b0b7d296dd2e30 @@ -110,6 +110,7 @@ http_client::transact_json(json_value_t // Now read back the result string data; parse_http_response(data); + json_io::input_source in(data, "scgi"); json_io::tokenizer tok(in); json_io::parser p(tok); @@ -207,10 +208,26 @@ void } void +http_channel::get_descendants(set const & common_revs, + vector & inbound_revs) const +{ + inbound_revs.clear(); + json_value_t request = encode_msg_descendants_request(common_revs); + json_value_t response = client.transact_json(request); + E(decode_msg_descendants_response(response, inbound_revs), + F("received unexpected reply to 'descendants_request' message")); +} + +void http_channel::push_rev(revision_id const & rid) const { } +void +http_channel::pull_rev(revision_id const & rid) const +{ +} + // Local Variables: // mode: C++ // fill-column: 76 ============================================================ --- http_client.hh 8eac2ba77c84328372e3f2980bac2c91f70a5304 +++ http_client.hh 807bce3cae5c6ed4881393fddf658f4dc7020165 @@ -42,16 +42,16 @@ http_client bool open; http_client(options & opts, lua_hooks & lua, - uri const & u, + uri const & u, globish const & include_pattern, - globish const & exclude_pattern); + globish const & exclude_pattern); json_io::json_value_t transact_json(json_io::json_value_t v); void parse_http_status_line(); void parse_http_header_line(size_t & content_length, bool & keepalive); void parse_http_response(std::string & data); - void crlf(); + void crlf(); }; class http_channel @@ -64,7 +64,11 @@ public: { }; virtual void inquire_about_revs(std::set const & query_set, std::set & theirs) const; + virtual void get_descendants(std::set const & common_revs, + std::vector & inbound_revs) const; + virtual void push_rev(revision_id const & rid) const; + virtual void pull_rev(revision_id const & rid) const; }; // Local Variables: ============================================================ --- json_msgs.cc 59063a3a187dc0017d80f187b3ce5e05353a69a2 +++ json_msgs.cc 6c003e8759e0f22ae7b3e7f971937ed6da834165 @@ -21,6 +21,7 @@ using std::pair; using std::set; using std::string; using std::pair; +using std::vector; using json_io::json_value_t; using json_io::json_array_t; @@ -187,10 +188,38 @@ json_value_t ///////////////////////////////////////////////////////////////////// json_value_t -encode_msg_descendants_request(set const & revs); +encode_msg_descendants_request(set const & revs) +{ + json_io::builder b; + b[syms::type].str(syms::descendants_request()); + b[syms::vers].str("1"); + json_io::builder r = b[syms::revs].arr(); + for (set::const_iterator i = revs.begin(); i != revs.end(); ++i) + r.add_str(i->inner()()); + return b.v; +} + bool -decode_msg_descendants_response(json_value_t val, - set & revs); +decode_msg_descendants_request(json_value_t val, + set & revs) +{ + string type, vers; + json_io::query q(val); + if (q[syms::type].get(type) && type == syms::descendants_request() && + q[syms::vers].get(vers) && vers == "1") + { + size_t nargs = 0; + if (q[syms::revs].len(nargs)) + { + std::string s; + for (size_t i = 0; i < nargs; ++i) + if (q[syms::revs][i].get(s)) + revs.insert(revision_id(s)); + return true; + } + } + return false; +} ///////////////////////////////////////////////////////////////////// @@ -198,10 +227,38 @@ json_value_t ///////////////////////////////////////////////////////////////////// json_value_t -encode_descendants_response(rev_ancestry_map const & parent_to_child_map); +encode_msg_descendants_response(vector const & revs) +{ + json_io::builder b; + b[syms::type].str(syms::descendants_response()); + b[syms::vers].str("1"); + json_io::builder r = b[syms::revs].arr(); + for (vector::const_iterator i = revs.begin(); i != revs.end(); ++i) + r.add_str(i->inner()()); + return b.v; +} + bool -decode_descendants_response(json_value_t val, - rev_ancestry_map & parent_to_child_map); +decode_msg_descendants_response(json_value_t val, + vector & revs) +{ + string type, vers; + json_io::query q(val); + if (q[syms::type].get(type) && type == syms::descendants_response() && + q[syms::vers].get(vers) && vers == "1") + { + size_t nargs = 0; + if (q[syms::revs].len(nargs)) + { + std::string s; + for (size_t i = 0; i < nargs; ++i) + if (q[syms::revs][i].get(s)) + revs.push_back(revision_id(s)); + return true; + } + } + return false; +} ///////////////////////////////////////////////////////////////////// @@ -286,7 +343,6 @@ encode_msg_rev(revision_t const & rev) } - json_value_t encode_msg_full_rev(revision_id const & rid, revision_t const & rev, ============================================================ --- json_msgs.hh 5057e28480a4d7e2f673bd4111e8b10bd169782d +++ json_msgs.hh 86a66c961fb6eef48b00414a74865929d662d855 @@ -33,14 +33,15 @@ bool decode_msg_inquire_response(json_io bool decode_msg_inquire_response(json_io::json_value_t val, std::set & revs); -json_io::json_value_t encode_msg_descendants_request(std::set const & start); +json_io::json_value_t encode_msg_descendants_request(std::set const & revs); bool decode_msg_descendants_request(json_io::json_value_t val, - std::set & start); + std::set & revs); +json_io::json_value_t encode_msg_descendants_response(std::vector const & revs); +bool decode_msg_descendants_response(json_io::json_value_t val, + std::vector & revs); -json_io::json_value_t encode_msg_descendants_response(rev_ancestry_map const & parent_to_child_map); -bool decode_msg_descendants_response(json_io::json_value_t val, - rev_ancestry_map & parent_to_child_map); + json_io::json_value_t encode_msg_get_file_data(file_id const & fid); bool decode_msg_get_file_data(json_io::json_value_t val, file_id & fid);