# # # add_dir "tests/server_initiated_sync" # # add_file "tests/server_initiated_sync/__driver__.lua" # content [f7bc7d804399c89dd54309f88857bf5b0341b09d] # # add_file "tests/server_initiated_sync/server1.rc" # content [bf90ea7239e109825c276126b698019128be1ba6] # # patch "netsync.cc" # from [90d845d5e17b5e303e43573dd1274196f589ee47] # to [93ebea4aca4d92fc925111c041c52b93fd6306d1] # ============================================================ --- tests/server_initiated_sync/__driver__.lua f7bc7d804399c89dd54309f88857bf5b0341b09d +++ tests/server_initiated_sync/__driver__.lua f7bc7d804399c89dd54309f88857bf5b0341b09d @@ -0,0 +1,27 @@ + +include("common/netsync.lua") +mtn_setup() +netsync.setup() + +addfile("a", "b") +check(mtn2("commit", "-m", "foo"), 0, false, false) + + +srv2 = netsync.start(3) + +get("server1.rc") +rcdata = readfile("server1.rc") +rcdata = string.gsub(rcdata, "localhost:12345", srv2.address) +writefile("server1.rc", rcdata) + +srv1 = netsync.start({"--rcfile=server1.rc"}) + +srv1:push({"*"}) + +sleep(5) + +srv1:stop() +srv2:stop() + +-- should now exist in mtn3 +check(mtn3("update"), 0, false, false) ============================================================ --- tests/server_initiated_sync/server1.rc bf90ea7239e109825c276126b698019128be1ba6 +++ tests/server_initiated_sync/server1.rc bf90ea7239e109825c276126b698019128be1ba6 @@ -0,0 +1,12 @@ +function note_netsync_start(session_id, my_role, sync_type, remote_host, + remote_keyname, includes, excludes) + if netsync_info == nil then netsync_info = {} end + netsync_info[session_id] = remote_host +end + +function note_netsync_end(session_id, status, bytes_in, bytes_out, certs_in, + certs_out, revs_in, revs_out, keys_in, keys_out) + if netsync_info[session_id] ~= "localhost:12345" then + server_request_sync("push", "localhost:12345", "*", "") + end +end ============================================================ --- netsync.cc 90d845d5e17b5e303e43573dd1274196f589ee47 +++ netsync.cc 93ebea4aca4d92fc925111c041c52b93fd6306d1 @@ -28,6 +28,7 @@ #include "constants.hh" #include "enumerator.hh" #include "keys.hh" +#include "lua.hh" #include "merkle_tree.hh" #include "netcmd.hh" #include "netio.hh" @@ -249,6 +250,29 @@ using boost::lexical_cast; using boost::shared_ptr; using boost::lexical_cast; +struct server_initiated_sync_request +{ + string what; + string address; + string include; + string exclude; +}; +deque server_initiated_sync_requests; +LUAEXT(server_request_sync, ) +{ + char const * w = luaL_checkstring(L, 1); + char const * a = luaL_checkstring(L, 2); + char const * i = luaL_checkstring(L, 3); + char const * e = luaL_checkstring(L, 4); + server_initiated_sync_request request; + request.what = string(w); + request.address = string(a); + request.include = string(i); + request.exclude = string(e); + server_initiated_sync_requests.push_back(request); + return 0; +} + static inline void require(bool check, string const & context) { @@ -269,8 +293,8 @@ session: { protocol_role role; protocol_voice const voice; - globish const & our_include_pattern; - globish const & our_exclude_pattern; + globish our_include_pattern; + globish our_exclude_pattern; globish_matcher our_matcher; app_state & app; @@ -372,7 +396,8 @@ session: globish const & our_exclude_pattern, app_state & app, string const & peer, - shared_ptr sock); + shared_ptr sock, + bool initiated_by_server = false); virtual ~session(); @@ -477,6 +502,8 @@ session: void send_all_data(netcmd_item_type ty, set const & items); void begin_service(); bool process(transaction_guard & guard); + + bool initiated_by_server; }; size_t session::session_count = 0; @@ -486,7 +513,8 @@ session::session(protocol_role role, globish const & our_exclude_pattern, app_state & app, string const & peer, - shared_ptr sock) : + shared_ptr sock, + bool initiated_by_server) : role(role), voice(voice), our_include_pattern(our_include_pattern), @@ -525,7 +553,8 @@ session::session(protocol_role role, key_refiner(key_item, voice, *this), cert_refiner(cert_item, voice, *this), rev_refiner(revision_item, voice, *this), - rev_enumerator(*this, app) + rev_enumerator(*this, app), + initiated_by_server(initiated_by_server) {} session::~session() @@ -1313,7 +1342,8 @@ session::process_hello_cmd(rsa_keypair_i } rebuild_merkle_trees(app, ok_branches); - setup_client_tickers(); + if (!initiated_by_server) + setup_client_tickers(); if (app.opts.use_transport_auth && app.opts.signing_key() != "") @@ -2494,7 +2524,8 @@ arm_sessions_and_calculate_probe(Netxx:: static void arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, map > & sessions, - set & armed_sessions) + set & armed_sessions, + transaction_guard & guard) { set arm_failed; for (mapsecond->maybe_step(); + i->second->maybe_say_goodbye(guard); try { if (i->second->arm()) @@ -2773,8 +2805,13 @@ serve_connections(protocol_role role, else probe.add(server); - arm_sessions_and_calculate_probe(probe, sessions, armed_sessions); + if (!guard) + guard = shared_ptr(new transaction_guard(app.db)); + I(guard); + + arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard); + L(FL("i/o probe with %d armed") % armed_sessions.size()); Netxx::socket_type fd; Netxx::Timeout how_long; @@ -2791,11 +2828,6 @@ serve_connections(protocol_role role, Netxx::Probe::ready_type event = res.second; fd = res.first; - if (!guard) - guard = shared_ptr(new transaction_guard(app.db)); - - I(guard); - if (fd == -1) { if (armed_sessions.empty()) @@ -2852,6 +2884,42 @@ serve_connections(protocol_role role, process_armed_sessions(sessions, armed_sessions, *guard); reap_dead_sessions(sessions, timeout_seconds); + while (!server_initiated_sync_requests.empty()) + { + server_initiated_sync_request request + = server_initiated_sync_requests.front(); + server_initiated_sync_requests.pop_front(); + + utf8 addr(request.address); + globish inc(request.include); + globish exc(request.exclude); + + P(F("connecting to %s") % address()); + shared_ptr server + = build_stream_to_server(app, inc, exc, + addr, default_port, + timeout); + + // 'false' here means not to revert changes when the SockOpt + // goes out of scope. + Netxx::SockOpt socket_options(server->get_socketfd(), false); + socket_options.set_non_blocking(); + + protocol_role role = source_and_sink_role; + if (request.what == "sync") + role = source_and_sink_role; + else if (request.what == "push") + role = source_role; + else if (request.what == "pull") + role = sink_role; + + shared_ptr sess(new session(role, client_voice, + inc, exc, + app, addr(), server, true)); + + sessions.insert(make_pair(server->get_socketfd(), sess)); + } + if (sessions.empty()) { // Let the guard die completely if everything's gone quiet. @@ -2927,7 +2995,7 @@ serve_single_connection(shared_ptr