# # # patch "cmd_scgi.cc" # from [5a5a2d9288e6506b6d6d7c3afbbe19e587e7aa83] # to [5af4065b772cd457d94270d6c2b5cd54f3b0bdc8] # # patch "gsync.cc" # from [2fe15a794fc2956289fb0397051607fec89ea736] # to [29cebe236d2164c9985f9d967ca99c405e238d3c] # # patch "gsync.hh" # from [55c2690c41c78c33df2678cfc6e277baff0459c0] # to [752324fb915ee92a4c74f8fe03d71c243f51eb5e] # # patch "http_client.cc" # from [d3f43f0481c7101204fc17f03f2e5b4ef7a01010] # to [36476ca29218bcc77fbf8a1424fdc6a706bb02fd] # # patch "http_client.hh" # from [fc8581a72087cf0924b2fa0dab5353756e07f560] # to [72ea8083efe626659072e69a49fe9f23de9ba25d] # # patch "json_msgs.cc" # from [a2358398a8ac5c553e0693efbba21881745be452] # to [f14bafa35f5544386705727fcfd4c599320edcb6] # # patch "json_msgs.hh" # from [94731c4b1fe5c8b2a48bf27a16bed8aeb0bef035] # to [49912f31573a7568ff4b112db180da479735be03] # ============================================================ --- cmd_scgi.cc 5a5a2d9288e6506b6d6d7c3afbbe19e587e7aa83 +++ cmd_scgi.cc 5af4065b772cd457d94270d6c2b5cd54f3b0bdc8 @@ -18,6 +18,7 @@ #include "database.hh" #include "globish.hh" #include "graph.hh" +#include "gsync.hh" #include "json_io.hh" #include "json_msgs.hh" #include "keys.hh" @@ -175,6 +176,8 @@ do_cmd(database & db, json_io::json_obje file_id fid, old_id, new_id; file_data data; file_delta delta; + vector data_records; + vector delta_records; db.ensure_open(); if (decode_msg_inquire_request(cmd_obj, request_revs)) @@ -208,6 +211,20 @@ do_cmd(database & db, json_io::json_obje toposort(db, response_set, response_revs); return encode_msg_descendants_response(response_revs); } + else if (decode_msg_get_full_rev_request(cmd_obj, rid)) + { + load_full_rev(db, rid, rev, data_records, delta_records); + return encode_msg_get_full_rev_response(rev, data_records, delta_records); + } + else if (decode_msg_put_full_rev_request(cmd_obj, rid, rev, + data_records, delta_records)) + { + revision_id check; + calculate_ident(rev, check); + I(rid == check); + store_full_rev(db, rid, rev, data_records, delta_records); + return encode_msg_put_full_rev_response(); + } else if (decode_msg_get_rev_request(cmd_obj, rid)) { db.get_revision(rid, rev); ============================================================ --- gsync.cc 2fe15a794fc2956289fb0397051607fec89ea736 +++ gsync.cc 29cebe236d2164c9985f9d967ca99c405e238d3c @@ -20,6 +20,7 @@ #include "globish.hh" #include "graph.hh" #include "gsync.hh" +#include "json_msgs.hh" #include "revision.hh" #include "sanity.hh" #include "ui.hh" @@ -181,7 +182,112 @@ invert_ancestry(rev_ancestry_map const & out.insert(make_pair(i->second, i->first)); } +void +load_full_rev(database & db, + revision_id const rid, + revision_t & rev, + vector & data_records, + vector & delta_records) +{ + db.get_revision(rid, rev); + + for (edge_map::const_iterator e = rev.edges.begin(); + e != rev.edges.end(); ++e) + { + cset const & cs = edge_changes(e); + for (map::const_iterator + f = cs.files_added.begin(); f != cs.files_added.end(); ++f) + { + file_data data; + db.get_file_version(f->second, data); + data_records.push_back(file_data_record(f->second, data)); + } + + for (map >::const_iterator + f = cs.deltas_applied.begin(); f != cs.deltas_applied.end(); ++f) + { + file_delta delta; + db.get_arbitrary_file_delta(f->second.first, f->second.second, delta); + delta_records.push_back(file_delta_record(f->second.first, + f->second.second, + delta)); + } + } +} + +void +store_full_rev(database & db, + revision_id const rid, + revision_t const & rev, + vector const & data_records, + vector const & delta_records) +{ + for (vector::const_iterator + f = data_records.begin(); f != data_records.end(); ++f) + db.put_file(f->id, f->dat); + + for (vector::const_iterator + f = delta_records.begin(); f != delta_records.end(); ++f) + db.put_file_version(f->src_id, f->dst_id, f->del); + + db.put_revision(rid, rev); +} + static void +push_full_revs(database & db, + channel const & ch, + vector const & outbound_revs, + bool const dryrun) +{ + ticker rev_ticker(N_("pushing revisions"), "R", 1); + rev_ticker.set_total(outbound_revs.size()); + + for (vector::const_iterator i = outbound_revs.begin(); + i != outbound_revs.end(); ++i) + { + revision_t rev; + vector data_records; + vector delta_records; + + load_full_rev(db, *i, rev, data_records, delta_records); + + if (!dryrun) + ch.push_full_rev(*i, rev, data_records, delta_records); + + ++rev_ticker; + } +} + +static void +pull_full_revs(database & db, + channel const & ch, + vector const & inbound_revs, + bool const dryrun) +{ + ticker rev_ticker(N_("pulling revisions"), "R", 1); + rev_ticker.set_total(inbound_revs.size()); + + for (vector::const_iterator i = inbound_revs.begin(); + i != inbound_revs.end(); ++i) + { + revision_t rev; + vector data_records; + vector delta_records; + + ch.pull_full_rev(*i, rev, data_records, delta_records); + + if (!dryrun) + { + transaction_guard guard(db); + store_full_rev(db, *i, rev, data_records, delta_records); + guard.commit(); + } + + ++rev_ticker; + } +} + +static void push_revs(database & db, channel const & ch, vector const & outbound_revs, @@ -192,8 +298,6 @@ push_revs(database & db, rev_ticker.set_total(outbound_revs.size()); - transaction_guard guard(db); - for (vector::const_iterator i = outbound_revs.begin(); i != outbound_revs.end(); ++i) { @@ -229,9 +333,6 @@ push_revs(database & db, if (!dryrun) ch.push_rev(*i, rev); } - - if (!dryrun) - guard.commit(); } static void @@ -254,8 +355,6 @@ pull_revs(database & db, ch.pull_rev(*i, rev); ++rev_ticker; - transaction_guard guard(db); - for (edge_map::const_iterator e = rev.edges.begin(); e != rev.edges.end(); ++e) { @@ -344,10 +443,10 @@ run_gsync_protocol(lua_hooks & lua, data // always received before child revisions. if (pushing) - push_revs(db, ch, outbound_revs, dryrun); + push_full_revs(db, ch, outbound_revs, dryrun); if (pulling) - pull_revs(db, ch, inbound_revs, dryrun); + pull_full_revs(db, ch, inbound_revs, dryrun); } #ifdef BUILD_UNIT_TESTS ============================================================ --- gsync.hh 55c2690c41c78c33df2678cfc6e277baff0459c0 +++ gsync.hh 752324fb915ee92a4c74f8fe03d71c243f51eb5e @@ -18,6 +18,9 @@ struct globish; struct uri; struct globish; +struct file_data_record; +struct file_delta_record; + class lua_hooks; class database; class revision_t; @@ -31,6 +34,16 @@ public: virtual void get_descendants(std::set const & common_revs, std::vector & inbound_revs) const = 0; + virtual void push_full_rev(revision_id const & rid, + revision_t const & rev, + std::vector const & data_records, + std::vector const & delta_records) const = 0; + + virtual void pull_full_rev(revision_id const & rid, + revision_t & rev, + std::vector & data_records, + std::vector & delta_records) const = 0; + virtual void push_file_data(file_id const & id, file_data const & data) const = 0; virtual void push_file_delta(file_id const & old_id, @@ -55,6 +68,20 @@ run_gsync_protocol(lua_hooks & lua, data globish const & exclude_pattern, bool const dryrun); +void +load_full_rev(database & db, + revision_id const rid, + revision_t & rev, + std::vector & data_records, + std::vector & delta_records); + +void +store_full_rev(database & db, + revision_id const rid, + revision_t const & rev, + std::vector const & data_records, + std::vector const & delta_records); + // Local Variables: // mode: C++ // fill-column: 76 ============================================================ --- http_client.cc d3f43f0481c7101204fc17f03f2e5b4ef7a01010 +++ http_client.cc 36476ca29218bcc77fbf8a1424fdc6a706bb02fd @@ -230,7 +230,35 @@ http_channel::get_descendants(set const & data_records, + vector const & delta_records) const +{ + json_value_t request = encode_msg_put_full_rev_request(rid, rev, + data_records, + delta_records); + json_value_t response = client.transact_json(request); + E(decode_msg_put_full_rev_response(response), + F("received unexpected reply to 'put_full_rev_request' message")); +} + +void +http_channel::pull_full_rev(revision_id const & rid, + revision_t & rev, + vector & data_records, + vector & delta_records) const +{ + json_value_t request = encode_msg_get_full_rev_request(rid); + json_value_t response = client.transact_json(request); + E(decode_msg_get_full_rev_response(response, rev, + data_records, delta_records), + F("received unexpected reply to 'get_full_rev_request' message")); +} + +void http_channel::push_file_data(file_id const & id, file_data const & data) const { ============================================================ --- http_client.hh fc8581a72087cf0924b2fa0dab5353756e07f560 +++ http_client.hh 72ea8083efe626659072e69a49fe9f23de9ba25d @@ -67,6 +67,16 @@ public: virtual void get_descendants(std::set const & common_revs, std::vector & inbound_revs) const; + virtual void push_full_rev(revision_id const & rid, + revision_t const & rev, + std::vector const & data_records, + std::vector const & delta_records) const; + + virtual void pull_full_rev(revision_id const & rid, + revision_t & rev, + std::vector & data_records, + std::vector & delta_records) const; + virtual void push_file_data(file_id const & id, file_data const & data) const; virtual void push_file_delta(file_id const & old_id, ============================================================ --- json_msgs.cc a2358398a8ac5c553e0693efbba21881745be452 +++ json_msgs.cc f14bafa35f5544386705727fcfd4c599320edcb6 @@ -66,6 +66,8 @@ namespace symbol const dst_id("dst_id"); symbol const delta("delta"); symbol const data("data"); + symbol const data_records("data_records"); + symbol const delta_records("delta_records"); // command symbols symbol const type("type"); @@ -82,6 +84,12 @@ namespace symbol const descendants_request("descendants_request"); symbol const descendants_response("descendants_response"); + symbol const get_full_rev_request("get_full_rev_request"); + symbol const get_full_rev_response("get_full_rev_response"); + + symbol const put_full_rev_request("put_full_rev_request"); + symbol const put_full_rev_response("put_full_rev_response"); + symbol const get_rev_request("get_rev_request"); symbol const get_rev_response("get_rev_response"); @@ -464,6 +472,224 @@ decode_rev(query q, revision_t & rev) } ///////////////////////////////////////////////////////////////////// +// encode/decode file data records +///////////////////////////////////////////////////////////////////// + +static void +encode_data_records(builder b, vector const & data_records) +{ + for (vector::const_iterator + i = data_records.begin(); i != data_records.end(); ++i) + { + builder tmp = b.add_obj(); + tmp[syms::id].str(i->id.inner()()); + tmp[syms::data].str(encode_base64(i->dat.inner())()); + } +} + +static void +decode_data_records(query q, vector & data_records) +{ + size_t nargs = 0; + I(q.len(nargs)); + + for (size_t i = 0; i < nargs; ++i) + { + query d = q[i]; + string id, dat; + d[syms::id].get(id); + d[syms::data].get(dat); + file_data data(decode_base64_as(dat)); + data_records.push_back(file_data_record(file_id(id), + data)); + } +} + +///////////////////////////////////////////////////////////////////// +// encode/decode file delta records +///////////////////////////////////////////////////////////////////// + +static void +encode_delta_records(builder b, vector const & delta_records) +{ + for (vector::const_iterator + i = delta_records.begin(); i != delta_records.end(); ++i) + { + builder tmp = b.add_obj(); + tmp[syms::src_id].str(i->src_id.inner()()); + tmp[syms::dst_id].str(i->dst_id.inner()()); + tmp[syms::delta].str(encode_base64(i->del.inner())()); + } +} + +static void +decode_delta_records(query q, vector & delta_records) +{ + size_t nargs = 0; + I(q.len(nargs)); + + for (size_t i = 0; i < nargs; ++i) + { + query d = q[i]; + string src_id, dst_id, del; + d[syms::src_id].get(src_id); + d[syms::dst_id].get(dst_id); + d[syms::delta].get(del); + file_delta delta(decode_base64_as(del)); + delta_records.push_back(file_delta_record(file_id(src_id), + file_id(dst_id), + delta)); + } +} + +///////////////////////////////////////////////////////////////////// +// message type 'get_full_rev_request' +///////////////////////////////////////////////////////////////////// + +json_value_t +encode_msg_get_full_rev_request(revision_id const & rid) +{ + builder b; + b[syms::type].str(syms::get_full_rev_request()); + b[syms::vers].str("1"); + b[syms::id].str(rid.inner()()); + return b.v; +} + +bool +decode_msg_get_full_rev_request(json_value_t val, + revision_id & rid) +{ + string type, vers, id; + query q(val); + if (q[syms::type].get(type) && type == syms::get_full_rev_request() && + q[syms::vers].get(vers) && vers == "1" && + q[syms::id].get(id)) + { + rid = revision_id(id); + return true; + } + return false; +} + +///////////////////////////////////////////////////////////////////// +// message type 'get_full_rev_response' +///////////////////////////////////////////////////////////////////// + +json_value_t +encode_msg_get_full_rev_response(revision_t const & rev, + vector const & data_records, + vector const & delta_records) +{ + builder b; + b[syms::type].str(syms::get_full_rev_response()); + b[syms::vers].str("1"); + builder rb = b[syms::rev].obj(); + encode_rev(rb, rev); + builder dat = b[syms::data_records].arr(); + encode_data_records(dat, data_records); + builder del = b[syms::delta_records].arr(); + encode_delta_records(del, delta_records); + return b.v; +} + +bool +decode_msg_get_full_rev_response(json_value_t val, + revision_t & rev, + vector & data_records, + vector & delta_records) +{ + string type, vers; + query q(val); + if (q[syms::type].get(type) && type == syms::get_full_rev_response() && + q[syms::vers].get(vers) && vers == "1") + { + query rq = q[syms::rev]; + decode_rev(rq, rev); + query dat = q[syms::data_records]; + decode_data_records(dat, data_records); + query del = q[syms::delta_records]; + decode_delta_records(del, delta_records); + return true; + } + return false; +} + +///////////////////////////////////////////////////////////////////// +// message type 'put_full_rev_request' +///////////////////////////////////////////////////////////////////// + +json_value_t encode_msg_put_full_rev_request(revision_id const & rid, + revision_t const & rev, + vector const & data_records, + vector const & delta_records) +{ + builder b; + b[syms::type].str(syms::put_full_rev_request()); + b[syms::vers].str("1"); + b[syms::id].str(rid.inner()()); + builder rb = b[syms::rev].obj(); + encode_rev(rb, rev); + builder dat = b[syms::data_records].arr(); + encode_data_records(dat, data_records); + builder del = b[syms::delta_records].arr(); + encode_delta_records(del, delta_records); + return b.v; +} + +bool +decode_msg_put_full_rev_request(json_value_t val, + revision_id & rid, + revision_t & rev, + vector & data_records, + vector & delta_records) +{ + string type, vers, id; + query q(val); + if (q[syms::type].get(type) && type == syms::put_full_rev_request() && + q[syms::vers].get(vers) && vers == "1" && + q[syms::id].get(id)) + { + rid = revision_id(id); + query rq = q[syms::rev]; + decode_rev(rq, rev); + query dat = q[syms::data_records]; + decode_data_records(dat, data_records); + query del = q[syms::delta_records]; + decode_delta_records(del, delta_records); + return true; + } + return false; +} + +///////////////////////////////////////////////////////////////////// +// message type 'put_full_rev_response' +///////////////////////////////////////////////////////////////////// + +json_value_t encode_msg_put_full_rev_response() +{ + builder b; + b[syms::type].str(syms::put_full_rev_response()); + b[syms::vers].str("1"); + b[syms::status].str("received"); + return b.v; +} + +bool +decode_msg_put_full_rev_response(json_value_t val) +{ + string type, vers, status; + query q(val); + if (q[syms::type].get(type) && type == syms::put_full_rev_response() && + q[syms::vers].get(vers) && vers == "1" && + q[syms::status].get(status)) + { + return true; + } + return false; +} + +///////////////////////////////////////////////////////////////////// // message type 'get_rev_request' ///////////////////////////////////////////////////////////////////// ============================================================ --- json_msgs.hh 94731c4b1fe5c8b2a48bf27a16bed8aeb0bef035 +++ json_msgs.hh 49912f31573a7568ff4b112db180da479735be03 @@ -64,7 +64,50 @@ bool decode_msg_put_rev_response(json_io json_io::json_value_t encode_msg_put_rev_response(); bool decode_msg_put_rev_response(json_io::json_value_t val); +// full revs with all file data and deltas in one request/response +struct file_data_record +{ + file_id id; + file_data dat; + file_data_record(file_id id, file_data dat) : + id(id), dat(dat) {} +}; + +struct file_delta_record +{ + file_id src_id; + file_id dst_id; + file_delta del; + file_delta_record(file_id src_id, file_id dst_id, file_delta del) : + src_id(src_id), dst_id(dst_id), del(del) {} +}; + +json_io::json_value_t encode_msg_get_full_rev_request(revision_id const & rid); +bool decode_msg_get_full_rev_request(json_io::json_value_t val, + revision_id & rid); + +json_io::json_value_t encode_msg_get_full_rev_response(revision_t const & rev, + std::vector const & data_records, + std::vector const & delta_records); +bool decode_msg_get_full_rev_response(json_io::json_value_t val, + revision_t & rev, + std::vector & data_records, + std::vector & delta_records); + +json_io::json_value_t encode_msg_put_full_rev_request(revision_id const & rid, + revision_t const & rev, + std::vector const & data_records, + std::vector const & delta_records); +bool decode_msg_put_full_rev_request(json_io::json_value_t val, + revision_id & rid, + revision_t & rev, + std::vector & data_records, + std::vector & delta_records); + +json_io::json_value_t encode_msg_put_full_rev_response(); +bool decode_msg_put_full_rev_response(json_io::json_value_t val); + // file data json_io::json_value_t encode_msg_get_file_data_request(file_id const & fid);