monotone-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Monotone-devel] server_request_sync vs pipe (uri)


From: Roland McGrath
Subject: [Monotone-devel] server_request_sync vs pipe (uri)
Date: Sun, 6 Jan 2008 05:14:46 -0800 (PST)

I tried a hook doing server_request_sync with a (file) URI, and the mtn
serve process went into a tight loop of "got woken up for action on unknown fd".
The fd it said was its fd for the pipe to the child 'mtn serve --stdio' stdin.

I think I more or less understood what was going on there and kind of fixed
it with the patch attached below.  Now it works once, so the push by
server_request_sync completes fine.  But after that, the server is stuck so
a client connecting again never gets anywhere.  I think I may be out of my
depth with poking more at this code without really trying to understand all
the libraries (not to mention C++ features).  Any help?


Thanks,
Roland


This is my test scenario:

mtn --db /tmp/test/server.mtn db init
mtn --db /tmp/test/mirror.mtn db init
mtn --debug serve --db /tmp/test/server.mtn --rcfile test.lua --bind localhost:9999 &
mtn sync --db /some/real/db.mtn localhost:9999

-- Maps a netsync session id to this side's role for that session.
-- Written by note_netsync_start and read by note_netsync_end.
sessions = {}

-- Hook called at the start of a netsync session.
-- Records the role this side plays (my_role) under the session id so that
-- note_netsync_end can look it up later, then logs the peer host and role
-- to stderr.  The remaining arguments are accepted but unused here.
function note_netsync_start(session_id, my_role, sync_type, remote_host,
                            remote_keyname, includes, excludes)
   sessions[session_id] = my_role
   local log_line = "XXX start " .. remote_host .. " " .. my_role .. "\n"
   io.stderr:write(log_line)
end

-- Hook called at the end of a netsync session.
-- If this side's recorded role for the session was "server", request a push
-- of every branch ("*", no excludes) to the mirror database given by the
-- file: URI.  status and any further arguments are ignored.
function note_netsync_end(session, status, ...)
   local role = sessions[session]
   -- Forget the finished session so the table does not accumulate one stale
   -- entry per connection for the lifetime of the serve process.
   sessions[session] = nil
   if role == "server" then
      server_request_sync("push", "file:///tmp/test/mirror.mtn", "*", "")
   end
end
#
# old_revision [fb5d7770cc09575641ace2932dbdedc5e9ec02dd]
#
# patch "netsync.cc"
#  from [1e875fe8ba23a5c956819d1d81b7322feff2682f]
#    to [25e27334aefa45ab627e409a05d0d532abf21fed]
# 
# patch "netxx_pipe.cc"
#  from [fd09aa9fe81d1f7d17c214b3489e23371e9aa056]
#    to [87ca9a30b2d368d8402fc9369b1b5a66efc0a0e5]
# 
# patch "netxx_pipe.hh"
#  from [873bc7d111d59db057cba5ea5f9ce31ad7608e1f]
#    to [3e118fa04de89325dfd7aafe1a970aed34861e20]
#
============================================================
--- netsync.cc  1e875fe8ba23a5c956819d1d81b7322feff2682f
+++ netsync.cc  25e27334aefa45ab627e409a05d0d532abf21fed
@@ -419,7 +419,7 @@ session:
   void note_item_sent(netcmd_item_type ty, id const & i);
 
   Netxx::Probe::ready_type which_events() const;
-  bool read_some();
+  int read_some();
   bool write_some();
 
   void error(int errcode, string const & errmsg);
@@ -1009,7 +1009,7 @@ session::which_events() const
     }
 }
 
-bool
+int
 session::read_some()
 {
   I(inbuf.size() < constants::netcmd_maxsz);
@@ -1028,10 +1028,10 @@ session::read_some()
       if (byte_in_ticker.get() != NULL)
         (*byte_in_ticker) += count;
       bytes_in += count;
-      return true;
+      return 1;
     }
   else
-    return false;
+    return count == 0 ? 0 : -1;
 }
 
 bool
@@ -2442,7 +2442,7 @@ call_server(protocol_role role,
       bool all_io_clean = (event != Netxx::Probe::ready_oobd);
 
       if (event & Netxx::Probe::ready_read)
-        all_io_clean = all_io_clean && sess.read_some();
+        all_io_clean = all_io_clean && sess.read_some() > 0;
 
       if (event & Netxx::Probe::ready_write)
         all_io_clean = all_io_clean && sess.write_some();
@@ -2530,6 +2530,25 @@ static void
 }
 
 static void
+associate_session_with_fds(shared_ptr<session> sess,
+                           map<Netxx::socket_type, shared_ptr<session> > & sessions)
+{
+  if (sess->str->get_socketfd() != -1)
+    sessions[sess->str->get_socketfd()] = sess;
+  else
+    {
+      // Unix pipes are non-duplex, have two filedescriptors
+      shared_ptr<Netxx::PipeStream> pipe =
+        boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
+      I(static_cast<bool>(pipe));
+      I(pipe->get_writefd() != -1);
+      I(pipe->get_readfd() != -1);
+      sessions[pipe->get_readfd()] = sess;
+      sessions[pipe->get_writefd()] = sess;
+    }
+}
+
+static void
 arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
                                  map<Netxx::socket_type, shared_ptr<session> > & sessions,
                                  set<Netxx::socket_type> & armed_sessions,
@@ -2613,7 +2632,9 @@ handle_read_available(Netxx::socket_type
                       set<Netxx::socket_type> & armed_sessions,
                       bool & live_p)
 {
-  if (sess->read_some())
+  int ok = sess->read_some();
+
+  if (ok > 0)
     {
       try
         {
@@ -2644,8 +2665,12 @@ handle_read_available(Netxx::socket_type
           break;
 
         case session::confirmed_state:
-          P(F("peer %s read failed in confirmed state (success)")
-            % sess->peer_id);
+          if (ok == 0)
+            P(F("successful exchange with %s")
+              % sess->peer_id);
+          else
+            P(F("peer %s read failed in confirmed state (success)")
+              % sess->peer_id);
           break;
         }
       drop_session_associated_with_fd(sessions, fd);
@@ -2871,7 +2896,7 @@ serve_connections(protocol_role role,
                                                            inc, exc,
                                                            app, addr(), server, true));
 
-                      sessions.insert(make_pair(server->get_socketfd(), sess));
+                      associate_session_with_fds(sess, sessions);
                     }
                   catch (Netxx::NetworkException & e)
                     {
@@ -3011,17 +3036,7 @@ serve_single_connection(shared_ptr<sessi
   map<Netxx::socket_type, shared_ptr<session> > sessions;
   set<Netxx::socket_type> armed_sessions;
 
-  if (sess->str->get_socketfd() == -1)
-    {
-      // Unix pipes are non-duplex, have two filedescriptors
-      shared_ptr<Netxx::PipeStream> pipe =
-        boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
-      I(pipe);
-      sessions[pipe->get_writefd()]=sess;
-      sessions[pipe->get_readfd()]=sess;
-    }
-  else
-    sessions[sess->str->get_socketfd()]=sess;
+  associate_session_with_fds(sess, sessions);
 
   while (!sessions.empty())
     {
============================================================
--- netxx_pipe.cc       fd09aa9fe81d1f7d17c214b3489e23371e9aa056
+++ netxx_pipe.cc       87ca9a30b2d368d8402fc9369b1b5a66efc0a0e5
@@ -539,6 +539,32 @@ Netxx::PipeCompatibleProbe::add(const St
 {
   Probe::add(ss,rt);
 }
+
+void
+Netxx::PipeCompatibleProbe::remove(PipeStream &ps)
+{
+  remove_socket(ps.get_readfd());
+  remove_socket(ps.get_writefd());
+}
+
+void
+Netxx::PipeCompatibleProbe::remove(const StreamBase &sb)
+{
+  try
+    {
+      remove(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)));
+    }
+  catch (...)
+    {
+      Probe::remove(sb);
+    }
+}
+
+void
+Netxx::PipeCompatibleProbe::remove(const StreamServer &ss)
+{
+  Probe::remove(ss);
+}
 #endif
 
 #ifdef BUILD_UNIT_TESTS
============================================================
--- netxx_pipe.hh       873bc7d111d59db057cba5ea5f9ce31ad7608e1f
+++ netxx_pipe.hh       3e118fa04de89325dfd7aafe1a970aed34861e20
@@ -127,6 +127,10 @@ namespace Netxx
       void add(PipeStream &ps, ready_type rt=ready_none);
       void add(const StreamBase &sb, ready_type rt=ready_none);
       void add(const StreamServer &ss, ready_type rt=ready_none);
+
+      void remove(PipeStream &ps);
+      void remove(const StreamBase &sb);
+      void remove(const StreamServer &ss);
     };
 #endif
 

reply via email to

[Prev in Thread] Current Thread [Next in Thread]