#
# patch "ChangeLog"
# from [ca7af199e3ec7cbe2e7333e3a4c107ed23161796]
# to [7d63b013470e09c70dd05b9da8f899ec63ed2201]
#
# patch "netsync.cc"
# from [356f622b68dcfef9265dad15a8c988f2ea62741f]
# to [0aef99065fae04d90bbf3ae8bcf2df03097539cd]
#
# patch "netxx_pipe.hh"
# from [7f13e757f0721fccd410751bdcc10f2d604de2a3]
# to [74bdced617276bc40eca3e34f8be2b6ca8d390a9]
#
========================================================================
--- ChangeLog ca7af199e3ec7cbe2e7333e3a4c107ed23161796
+++ ChangeLog 7d63b013470e09c70dd05b9da8f899ec63ed2201
@@ -1,3 +1,17 @@
+2005-08-13 Christof Petig
+
+ use portable PipeStream implementation to reimplement changes on the
+ ssh branch
+
+2004-06-18 Christof Petig
+
+ * netsync.cc:
+ (struct session): use PipeStream, pair of sockets
+ (find_wordend,parse_ssh_url): helper functions to parse a URL
+ (call_server): recognize ssh: and file: URLs
+ (serve_stdio): variant of serve_connections for stdio
+ (run_netsync_protocol): call serve_stdio if address is -
+
2005-07-26 Nathaniel Smith
* change_set.cc (dump): Add state_renumbering dumper.
========================================================================
--- netsync.cc 356f622b68dcfef9265dad15a8c988f2ea62741f
+++ netsync.cc 0aef99065fae04d90bbf3ae8bcf2df03097539cd
@@ -45,6 +45,7 @@
#include "netxx/stream.h"
#include "netxx/streamserver.h"
#include "netxx/timeout.h"
+#include "netxx_pipe.h"
//
// this is the "new" network synchronization (netsync) system in
@@ -234,8 +235,8 @@
app_state & app;
string peer_id;
- Netxx::socket_type fd;
- Netxx::Stream str;
+// Netxx::socket_type fd;
+ shared_ptr str;
string inbuf;
// deque of pair
@@ -292,8 +293,7 @@
utf8 const & our_exclude_pattern,
app_state & app,
string const & peer,
- Netxx::socket_type sock,
- Netxx::Timeout const & to);
+ shared_ptr sock);
virtual ~session();
@@ -458,8 +458,7 @@
utf8 const & our_exclude_pattern,
app_state & app,
string const & peer,
- Netxx::socket_type sock,
- Netxx::Timeout const & to) :
+ shared_ptr sock) :
role(role),
voice(voice),
our_include_pattern(our_include_pattern),
@@ -467,8 +466,7 @@
our_matcher(our_include_pattern, our_exclude_pattern),
app(app),
peer_id(peer),
- fd(sock),
- str(sock, to),
+ str(sock),
inbuf(""),
outbuf_size(0),
armed(false),
@@ -1327,10 +1325,10 @@
{
I(inbuf.size() < constants::netcmd_maxsz);
char tmp[constants::bufsz];
- Netxx::signed_size_type count = str.read(tmp, sizeof(tmp));
+ Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
if (count > 0)
{
- L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id);
+ L(F("read %d bytes from fd %d (peer %s)\n") % count % str->get_socketfd() % peer_id);
if (encountered_error)
{
L(F("in error unwind mode, so throwing them into the bit bucket\n"));
@@ -1351,7 +1349,7 @@
{
I(!outbuf.empty());
size_t writelen = outbuf.front().first.size() - outbuf.front().second;
- Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second,
+ Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
std::min(writelen,
constants::bufsz));
if (count > 0)
@@ -1366,7 +1364,7 @@
outbuf.front().second += count;
}
L(F("wrote %d bytes to fd %d (peer %s)\n")
- % count % fd % peer_id);
+ % count % str->get_socketfd() % peer_id);
mark_recent_io();
if (byte_out_ticker.get() != NULL)
(*byte_out_ticker) += count;
@@ -3215,7 +3213,42 @@
}
}
+static std::string::size_type find_wordend(const std::string &address,
+ std::string::size_type begin, std::string::size_type end,
+ const std::string &breaks)
+{ for (;begin server;
+ if (address().substr(0,5)=="file:")
+ { int fd1[2],fd2[2];
+ pid_t child=pipe_and_fork(fd1,fd2);
+ if (child<0)
+ { L(F("pipe/fork failed"));
+ return;
+ }
+ else if (!child)
+ { std::string db_path=address().substr(5);
+ const unsigned newsize=64;
+ const char *newargv[newsize];
+ unsigned newargc=0;
+ newargv[newargc++]="monotone";
+ if (global_sanity.debug) newargv[newargc++]="--debug";
+ newargv[newargc++]="--db";
+ newargv[newargc++]=db_path.c_str();
+ newargv[newargc++]="--";
+ newargv[newargc++]="serve";
+ newargv[newargc++]="-";
+ for (unsigned i=0; i str(new Netxx::Stream(client.get_socketfd(),
+ timeout));
shared_ptr sess(new session(role, server_voice,
include_pattern, exclude_pattern,
app,
- lexical_cast(client),
- client.get_socketfd(), timeout));
+ lexical_cast(client), str));
sess->begin_service();
sessions.insert(make_pair(client.get_socketfd(), sess));
}
@@ -3583,7 +3694,87 @@
}
}
+// Serve one already-constructed session until the session table is empty.
+// The visible caller uses this for a session that talks over a PipeStream
+// built from descriptors 0 and 1 instead of a listening socket.
+//
+// sess            - the session to serve; begin_service() is called on it
+//                   here, so the caller must not have started it already.
+// timeout_seconds - how long a probe may block while no session is armed;
+//                   also handed to reap_dead_sessions().
+static void
+serve_single_connection(shared_ptr<session> sess,
+                        unsigned long timeout_seconds)
+{
+  Netxx::PipeCompatibleProbe probe;
+
+  Netxx::Timeout
+    timeout(static_cast<long>(timeout_seconds)),
+    instant(0,1);
+
+  P(F("beginning service on %s\n") % sess->peer_id);
+
+  sess->begin_service();
+
+  map<Netxx::socket_type, shared_ptr<session> > sessions;
+  set<Netxx::socket_type> armed_sessions;
+
+  // session::str is a shared_ptr, so the stream is reached through ->.
+  sessions[sess->str->get_socketfd()] = sess;
+
+  // A pipe-backed stream exposes a separate write descriptor; register the
+  // session under it as well so events on either descriptor find it.
+  PipeStream *pipe = dynamic_cast<PipeStream *>(&*sess->str);
+  if (pipe)
+    sessions[pipe->get_writefd()] = sess;
+
+  // There is no address and no listening server here: the loop only
+  // services the session registered above and ends once it is gone.
+  while (!sessions.empty())
+    {
+      probe.clear();
+      armed_sessions.clear();
+
+      arm_sessions_and_calculate_probe(probe, sessions, armed_sessions);
+
+      L(F("i/o probe with %d armed\n") % armed_sessions.size());
+      // Block for the full timeout only when nothing is armed; otherwise
+      // just poll so the armed sessions get processed promptly.
+      Netxx::Probe::result_type res =
+        probe.ready(armed_sessions.empty() ? timeout : instant);
+      Netxx::Probe::ready_type event = res.second;
+      Netxx::socket_type fd = res.first;
+
+      if (fd == -1)
+        {
+          // No descriptor became ready within the probe timeout.
+          if (armed_sessions.empty())
+            L(F("timed out waiting for I/O (listening on %s)\n")
+              % sess->peer_id);
+        }
+      else
+        {
+          // an existing session woke up
+          map<Netxx::socket_type, shared_ptr<session> >::iterator i;
+          i = sessions.find(fd);
+          if (i == sessions.end())
+            {
+              L(F("got woken up for action on unknown fd %d\n") % fd);
+            }
+          else
+            {
+              // Distinct name: do not shadow the function parameter.
+              shared_ptr<session> woken = i->second;
+              bool live_p = true;
+
+              if (event & Netxx::Probe::ready_read)
+                handle_read_available(fd, woken, sessions, armed_sessions, live_p);
+
+              if (live_p && (event & Netxx::Probe::ready_write))
+                handle_write_available(fd, woken, sessions, live_p);
+
+              if (live_p && (event & Netxx::Probe::ready_oobd))
+                {
+                  P(F("got some OOB data on fd %d (peer %s), disconnecting\n")
+                    % fd % woken->peer_id);
+                  sessions.erase(i);
+                }
+            }
+        }
+      process_armed_sessions(sessions, armed_sessions);
+      reap_dead_sessions(sessions, timeout_seconds);
+    }
+}
+
+
/////////////////////////////////////////////////
//
// layer 4: monotone interface layer
@@ -3765,7 +3956,15 @@
start_platform_netsync();
if (voice == server_voice)
{
- serve_connections(role, include_pattern, exclude_pattern, app,
+ if (addr==utf8("-"))
+ { shared_ptr str(new PipeStream(0,1));
+ shared_ptr sess(new session(role, server_voice,
+ include_pattern, exclude_pattern,
+ app, "stdio", str));
+ serve_single_connection(sess,constants::netsync_timeout_seconds)
+ }
+ else
+ serve_connections(role, include_pattern, exclude_pattern, app,
addr, static_cast(constants::netsync_default_port),
static_cast(constants::netsync_timeout_seconds),
static_cast(constants::netsync_connection_limit));
========================================================================
--- netxx_pipe.hh 7f13e757f0721fccd410751bdcc10f2d604de2a3
+++ netxx_pipe.hh 74bdced617276bc40eca3e34f8be2b6ca8d390a9
@@ -29,7 +29,7 @@
friend class PipeCompatibleProbe;
#endif
public:
- explicit PipeStream (int readfd, int writefd);
+ explicit PipeStream (int readfd, int writefd); // Timeout?
explicit PipeStream (const std::string &cmd, const std::vector &args);
virtual signed_size_type read (void *buffer, size_type length);
virtual signed_size_type write (const void *buffer, size_type length);