# # # patch "contrib/monotone-cluster-push.lua" # from [a563958864b495032320d5fa766ca07bcc8eac84] # to [e19bd126d888a624a547b90a321cba02546ebf84] # # patch "netsync.cc" # from [334420d569bc76a9ad178b7baba5b028f8a42a6e] # to [b74410f8fd40aefcf4f0a1c650426c1a0bb78437] # ============================================================ --- contrib/monotone-cluster-push.lua a563958864b495032320d5fa766ca07bcc8eac84 +++ contrib/monotone-cluster-push.lua e19bd126d888a624a547b90a321cba02546ebf84 @@ -154,6 +154,9 @@ do end local dat = rcfile:read("*a") io.close(rcfile) + if debug then + io.stderr:write("note_netsync_end: got this:\n", dat, "\n") + end local res = parse_basic_io(dat) if res == nil then io.stderr:write("file ", MCP_rcfile, " cannot be parsed\n") @@ -162,42 +165,114 @@ do local matches = false local patterns = {} + local previous_name = "" for i, item in pairs(res) do if item.name == "pattern" then - matches = false - patterns = {} - for j, val in pairs(item.values) do + if previous_name ~= "pattern" then if debug then - io.stderr:write("note_netsync_end: found pattern ", val, - "\n") + io.stderr:write("note_netsync_end: clearing matches and patterns because previous_name = \"", previous_name, "\"\n") end + matches = false + patterns = {} + end + for j, pattern in pairs(item.values) do + if debug then + io.stderr:write("note_netsync_end: found ", + item.name, " = \"", pattern, "\"\n") + end for branch, b in pairs(branches[nonce]) do if debug then io.stderr:write("note_netsync_end: trying to match branch ", branch, "\n") end - if globish_match(val, branch) then + if globish_match(pattern, branch) then if debug then io.stderr:write("note_netsync_end: it matches branch ", branch, "\n") end matches = true - patterns[val] = true + patterns[pattern] = true end end end elseif matches then if item.name == "server" then - for k, server in pairs(item.values) do + for j, server in pairs(item.values) do + if debug then + io.stderr:write("note_netsync_end: found ", + item.name, " = \"", server, "\"\n") + end for pattern, b in pairs(patterns) do - io.stderr:write("pushing pattern \"" .. pattern .. - "\" to server " .. server .. "\n") + io.stderr:write("pushing pattern \"", pattern, + "\" to server ", server, "\n") server_request_sync("push", server, pattern, "") end end end end + previous_name = item.name end end end + + local saved_note_mtn_startup = note_mtn_startup + function note_mtn_startup(...) + if saved_note_mtn_startup then + saved_note_mtn_startup(unpack(arg)) + end + + if debug then + io.stderr:write("note_mtn_startup: reading ", MCP_rcfile, + "\n") + end + local rcfile = io.open(MCP_rcfile, "r") + if (rcfile == nil) then + io.stderr:write("file ", MCP_rcfile, " cannot be opened\n") + return false + end + local dat = rcfile:read("*a") + io.close(rcfile) + if debug then + io.stderr:write("note_mtn_startup: got this:\n", dat, "\n") + end + local res = parse_basic_io(dat) + if res == nil then + io.stderr:write("file ", MCP_rcfile, " cannot be parsed\n") + return false + end + + local patterns = {} + local previous_name = "" + for i, item in pairs(res) do + if item.name == "pattern" then + if previous_name ~= "pattern" then + if debug then + io.stderr:write("note_mtn_startup: clearing patterns because previous_name = \"", previous_name, "\"\n") + end + patterns = {} + end + for j, pattern in pairs(item.values) do + if debug then + io.stderr:write("note_mtn_startup: found ", + item.name, " = \"", pattern, "\"\n") + end + patterns[pattern] = true + end + elseif item.name == "server" then + for j, server in pairs(item.values) do + if debug then + io.stderr:write("note_mtn_startup: found ", + item.name, " = \"", server, "\"\n") + end + for pattern, b in pairs(patterns) do + io.stderr:write("pushing pattern \"", pattern, + "\" to server ", server, "\n") + server_request_sync("push", server, pattern, "") + end + end + end + previous_name = item.name + end + return nil + end end ============================================================ --- netsync.cc 334420d569bc76a9ad178b7baba5b028f8a42a6e +++ netsync.cc b74410f8fd40aefcf4f0a1c650426c1a0bb78437 @@ -2810,6 +2810,49 @@ serve_connections(protocol_role role, I(guard); + while (!server_initiated_sync_requests.empty()) + { + server_initiated_sync_request request + = server_initiated_sync_requests.front(); + server_initiated_sync_requests.pop_front(); + + utf8 addr(request.address); + globish inc(request.include); + globish exc(request.exclude); + + try + { + P(F("connecting to %s") % addr()); + shared_ptr server + = build_stream_to_server(app, inc, exc, + addr, default_port, + timeout); + + // 'false' here means not to revert changes when + // the SockOpt goes out of scope. + Netxx::SockOpt socket_options(server->get_socketfd(), false); + socket_options.set_non_blocking(); + + protocol_role role = source_and_sink_role; + if (request.what == "sync") + role = source_and_sink_role; + else if (request.what == "push") + role = source_role; + else if (request.what == "pull") + role = sink_role; + + shared_ptr sess(new session(role, client_voice, + inc, exc, + app, addr(), server, true)); + + sessions.insert(make_pair(server->get_socketfd(), sess)); + } + catch (Netxx::NetworkException & e) + { + P(F("Network error: %s") % e.what()); + } + } + arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard); L(FL("i/o probe with %d armed") % armed_sessions.size()); @@ -2884,49 +2927,6 @@ serve_connections(protocol_role role, process_armed_sessions(sessions, armed_sessions, *guard); reap_dead_sessions(sessions, timeout_seconds); - while (!server_initiated_sync_requests.empty()) - { - server_initiated_sync_request request - = server_initiated_sync_requests.front(); - server_initiated_sync_requests.pop_front(); - - utf8 addr(request.address); - globish inc(request.include); - globish exc(request.exclude); - - try - { - P(F("connecting to %s") % addr()); - shared_ptr server - = build_stream_to_server(app, inc, exc, - addr, default_port, - timeout); - - // 'false' here means not to revert changes when - // the SockOpt goes out of scope. - Netxx::SockOpt socket_options(server->get_socketfd(), false); - socket_options.set_non_blocking(); - - protocol_role role = source_and_sink_role; - if (request.what == "sync") - role = source_and_sink_role; - else if (request.what == "push") - role = source_role; - else if (request.what == "pull") - role = sink_role; - - shared_ptr sess(new session(role, client_voice, - inc, exc, - app, addr(), server, true)); - - sessions.insert(make_pair(server->get_socketfd(), sess)); - } - catch (Netxx::NetworkException & e) - { - P(F("Network error: %s") % e.what()); - } - } - if (sessions.empty()) { // Let the guard die completely if everything's gone quiet.