gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28335 - in msh: . src


From: gnunet
Subject: [GNUnet-SVN] r28335 - in msh: . src
Date: Mon, 29 Jul 2013 09:10:22 +0200

Author: harsha
Date: 2013-07-29 09:10:22 +0200 (Mon, 29 Jul 2013)
New Revision: 28335

Removed:
   msh/src/scheduler.c
   msh/src/scheduler.h
Modified:
   msh/configure.ac
   msh/src/Makefile.am
   msh/src/addressmap.c
   msh/src/mshd.c
   msh/src/mtypes.h
   msh/src/reduce.c
   msh/src/util.c
   msh/src/util.h
Log:
- use gnunetutil


Modified: msh/configure.ac
===================================================================
--- msh/configure.ac    2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/configure.ac    2013-07-29 07:10:22 UTC (rev 28335)
@@ -46,39 +46,34 @@
  AC_MSG_ERROR([MSH requires MPI libraries])
 fi
 
-# test for libevent
-libevent=0
-AC_MSG_CHECKING(for libevent)
-AC_ARG_WITH(libevent,
-   [AS_HELP_STRING([--with-libevent=PFX],
-       [base of libevent installation])],
-   [AC_MSG_RESULT([$with_libevent])
-    case $with_libevent in
+# test for libgnunetutil
+libgnunetutil=0
+AC_MSG_CHECKING(for libgnunetutil)
+AC_ARG_WITH(gnunet,
+   [AS_HELP_STRING([--with-gnunet=PFX], [base of gnunet installation])],
+   [AC_MSG_RESULT([$with_gnunet])
+    case $with_gnunet in
       no)
         ;;
       yes)
-        AC_CHECK_HEADERS([event2/event.h], [AC_CHECK_LIB([event], 
[event_base_new], libevent=1)])
+        AC_CHECK_HEADERS([gnunet/gnunet_util_lib.h], 
[AC_CHECK_LIB([gnunetutil], [GNUNET_SCHEDULER_run], libgnunetutil=1)])
         ;;
       *)
-        SAVE_LDFLAGS=$LDFLAGS
-        SAVE_CPPFLAGS=$CPPFLAGS
-        LDFLAGS="-L$with_libevent/lib $LDFLAGS"
-        CPPFLAGS="-I$with_libevent/include $CPPFLAGS"
-        AC_CHECK_HEADERS([event2/event.h],
-          [AC_CHECK_LIB([event], [event_base_new],
-            [LIBEVENT_LDFLAGS="-L$with_libevent/lib"
-             LIBEVENT_CPPFLAGS="-I$with_libevent/include"
-             libevent=1])])
-        LDFLAGS=$SAVE_LDFLAGS
-        CPPFLAGS=$SAVE_CPPFLAGS
+        LDFLAGS="-L$with_gnunet/lib $LDFLAGS"
+        CPPFLAGS="-I$with_gnunet/include $CPPFLAGS"
+        AC_CHECK_HEADERS([gnunet/gnunet_util_lib.h],
+          [AC_CHECK_LIB([gnunetutil], [GNUNET_SCHEDULER_run],
+            [GNUNET_LDFLAGS="-L$with_gnunet/lib"
+             GNUNET_CPPFLAGS="-I$with_gnunet/include"
+             libgnunetutil=1])])
         ;;
     esac
    ],
-   [AC_MSG_RESULT([--with-libevent not specified])
-    AC_CHECK_HEADERS([event2/event.h], [AC_CHECK_LIB([event], 
[event_base_new], libevent=1)])])
-if test "$libevent" != 1
+   [AC_MSG_RESULT([--with-gnunet not specified])
+    AC_CHECK_HEADERS([gnunet/gnunet_util_lib.h], [AC_CHECK_LIB([gnunetutil], 
[GNUNET_SCHEDULER_run], libgnunetutil=1)])])
+if test "$libgnunetutil" != 1
 then
- AC_MSG_ERROR([MSH requires libevent])
+ AC_MSG_ERROR([MSH requires gnunet])
 fi
 
 dnl have all messages as of now

Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/Makefile.am 2013-07-29 07:10:22 UTC (rev 28335)
@@ -2,30 +2,14 @@
 
 mping_SOURCES = mping.c
 
-mshd_SOURCES = mshd.c mshd.h util.c util.h scheduler.c scheduler.h \
+mshd_SOURCES = mshd.c mshd.h util.c util.h \
   common.h bitmap.c bitmap.h addressmap.c addressmap.h reduce.h reduce.c
-mshd_LDADD = -lgnunetutil -levent -lm
-mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
-mshd_LDFLAGS =  $(LIBEVENT_LDFLAGS)
+mshd_LDADD = -lgnunetutil -lm
 
 check_PROGRAMS = \
-  test-scheduler \
-  test-scheduler-socket \
   test-bitmap \
   test-addressmap
 
-test_scheduler_SOURCES = test_scheduler.c scheduler.c scheduler.h common.h \
-       util.c util.h common.h
-test_scheduler_LDADD = -lgnunetutil -levent
-test_scheduler_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
-test_scheduler_LDFLAGS = $(LIBEVENT_LDFLAGS)
-
-test_scheduler_socket_SOURCES = test_scheduler_socket.c scheduler.c 
scheduler.h \
-       common.h util.c util.h
-test_scheduler_socket_LDADD = -lgnunetutil -levent
-test_scheduler_socket_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
-test_scheduler_socket_LDFLAGS = $(LIBEVENT_LDFLAGS)
-
 test_bitmap_SOURCES = test_bitmap.c bitmap.c bitmap.h
 test_bitmap_LDADD = -lgnunetutil
 
@@ -38,6 +22,3 @@
   test-bitmap \
   test-addressmap
 
-noinst_PROGRAMS = test
-test_SOURCES = test.c
-test_LDADD = -lgnunetutil
\ No newline at end of file

Modified: msh/src/addressmap.c
===================================================================
--- msh/src/addressmap.c        2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/addressmap.c        2013-07-29 07:10:22 UTC (rev 28335)
@@ -75,7 +75,7 @@
  * Get the 32bit IP addresses from an instance address
  */
 #define instance_address_ip(iaddr) \
-  ((uint32_t) iaddr->ip)
+  (NULL == iaddr ? 0 : ((uint32_t) iaddr->ip))
 
 
 /**
@@ -323,11 +323,17 @@
   old_ia = old->addr_head;
   new_ia = new->addr_head;
   n = 0;
+  LOG_DEBUG ("Intersecting %u old address with %u new addresses for instance 
%u\n",
+             old->naddrs, new->naddrs, rank);
   while (NULL != old_ia)
   {
     if ((NULL == new_ia) 
         || (instance_address_ip (old_ia) < instance_address_ip (new_ia)) )
     {
+      LOG_DEBUG ("Removing old address %u < new address %u\n",
+                 instance_address_ip (old_ia), instance_address_ip (new_ia));
+      LOG_DEBUG ("\t old address: %s\n", ip2str (instance_address_ip(old_ia)));
+      LOG_DEBUG ("\t new address: %s\n", ip2str (instance_address_ip(new_ia)));
       tmp = old_ia->next;
       GNUNET_CONTAINER_DLL_remove (old->addr_head, old->addr_tail, old_ia);
       free (old_ia);
@@ -345,6 +351,8 @@
     new_ia = new_ia->next;
     n++;
   }
+  LOG_DEBUG ("Number of addresses for instance %u after intersection: %u\n",
+             rank, n);
   return n;
 }
 
@@ -472,7 +480,8 @@
     for (nip = 0; nip < iainfo->naddrs; nip++, iaddr = iaddr->next)
     {
       GNUNET_assert (NULL != iaddr);
-      _iaddr_msgs[cnt]->ipaddrs[nip] = instance_address_ip (iaddr);
+      GNUNET_assert (0 != htonl (instance_address_ip (iaddr)));
+      _iaddr_msgs[cnt]->ipaddrs[nip] = htonl (instance_address_ip (iaddr));    
  
     }
   }
   *iaddr_msgs = _iaddr_msgs;
@@ -519,7 +528,7 @@
   for (cnt = 0; cnt < n; cnt++)
   {
     iaddr = instance_address_create_sockaddr_in 
-        (0, (in_addr_t) iaddr_msg->ipaddrs[cnt]);
+        (0, (in_addr_t) ntohl (iaddr_msg->ipaddrs[cnt]));
     instance_address_info_add_address (iainfo, iaddr);
   }
   n = addressmap_intersect (m, iainfo);

Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c      2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/mshd.c      2013-07-29 07:10:22 UTC (rev 28335)
@@ -2,7 +2,6 @@
 #include <gnunet/gnunet_util_lib.h>
 #include <mpi.h>
 #include "util.h"
-#include "scheduler.h"
 #include "mtypes.h"
 #include "bitmap.h"
 #include "addressmap.h"
@@ -38,46 +37,66 @@
   struct InstanceAddrInfo *iainfo;
 
   /**
-   * The socket open handle to the instance address
+   * The connection handle to the received instance address
    */
-  struct SocketOpenHandle *soh;
+  struct GNUNET_CONNECTION_Handle *conn;
 
   /**
-   * close task handle
+   * The transmit handle for the above connection
    */
-  struct Task *close_task;
+  struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
 
   /**
-   * the port number
+   * task to close the connection
    */
-  uint16_t port;
+  GNUNET_SCHEDULER_TaskIdentifier close_task;
 
   /**
+   * state for the context 
+   */
+  enum {
+    VERIFY_ADDRESS_CTX_WRITE,
+
+    VERIFY_ADDRESS_CTX_CLOSE
+  } state;
+
+  /**
    * the ip address
    */
   in_addr_t ip;
 
   /**
-   * The socket file descriptor associated with the connection used to verify
-   * the address 
+   * the port number
    */
-  int sock;
+  uint16_t port;
+
 };
 
 
 struct ReadContext
 {
+  /**
+   * next pointer for DLL
+   */
   struct ReadContext *next;
 
+  /**
+   * prev pointer for DLL
+   */
   struct ReadContext *prev;
 
-  /* struct sockaddr_in addr; */
-  
-  /* socklen_t addrlen; */
-  
-  struct Task *task;
+  /**
+   * The connection
+   */
+  struct GNUNET_CONNECTION_Handle *conn;
+
+  /**
+   * are we waiting for a read on the above connection
+   */
+  int in_receive;
 };
 
+
 /**
  * Mapping for instance addresses
  */
@@ -115,31 +134,51 @@
 static struct VerifyAddressesCtx *vactx_tail;
 
 /**
- * Task for finalising a round
+ * Array of our IP addresses in network-byte format
  */
-static struct Task *finalise_task;
+static in_addr_t *s_addrs;
 
 /**
- * Array of our IP addresses in network-byte format
+ * Signal handler for SIGINT
  */
-static in_addr_t *s_addrs;
+static struct GNUNET_SIGNAL_Context *shc_int;
 
 /**
- * Tasks for handling SIGINT and SIGTERM
+ * Signal handler for SIGTERM
  */
-static struct Task *sigshut_tasks[2];
+static struct GNUNET_SIGNAL_Context *shc_term;
 
 /**
+ * Pipe used to communicate shutdown via signal.
+ */
+static struct GNUNET_DISK_PipeHandle *sigpipe;
+
+/**
+ * network handle for the listen socket
+ */
+static struct GNUNET_NETWORK_Handle *listen_socket;
+
+/**
  * Task for running a round
  */
-static struct Task *rtask;
+static GNUNET_SCHEDULER_TaskIdentifier rtask;
 
 /**
  * Task for asynchronous accept on the socket
  */
-static struct Task *atask;
+static GNUNET_SCHEDULER_TaskIdentifier atask;
 
 /**
+ * Task for finalising a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
+
+/**
+ * Task for waiting for a shutdown signal
+ */
+static GNUNET_SCHEDULER_TaskIdentifier sigread_task;
+
+/**
  * Bitmap for checking which MPI processes have verified our addresses in the
  * current round
  */
@@ -161,11 +200,6 @@
 static struct ReadContext *rtail;
 
 /**
- * The listen socket for the current round
- */
-static int listen_sock;
-
-/**
  * Number of IP addresses
  */
 static unsigned int nips;
@@ -178,7 +212,7 @@
 /**
  * The port number of our local socket
  */
-uint16_t lport;
+uint16_t listen_port;
 
 
 static char *
@@ -196,7 +230,7 @@
 }
 
 
-static char *
+char *
 ip2str (const in_addr_t ip)
 {
   static char hostip[NI_MAXHOST];
@@ -223,7 +257,7 @@
  * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort
  */
 static int net_if_processor (void *cls, const char *name,
-                               int isDefault,
+                             int isDefault,
                              const struct sockaddr *addr,
                              const struct sockaddr *broadcast_addr,
                              const struct sockaddr *netmask, 
@@ -234,108 +268,104 @@
 
   if (sizeof (struct sockaddr_in) != addrlen)
     return GNUNET_OK;           /* Only consider IPv4 for now */
-  hostip = saddr2str (addr, addrlen);
-  if (NULL == hostip)
-    return GNUNET_OK;
   inaddr = (const struct sockaddr_in *) addr;
-  GNUNET_array_append (s_addrs, nips, inaddr->sin_addr.s_addr);
-  LOG_DEBUG ("%d: Found IP: %s\n", rank, hostip);
+  GNUNET_array_append (s_addrs, nips, ntohl (inaddr->sin_addr.s_addr));
+  LOG_DEBUG ("%d: Found IP: %s\n", rank, 
+             ip2str (ntohl (inaddr->sin_addr.s_addr)));
+  addressmap_add (addrmap, rank, listen_port,
+                  ntohl (inaddr->sin_addr.s_addr));
   return GNUNET_OK;
 }
 
 
 /**
- * Task to read from socket
- *
- * @param sock the socket
- * @param flags EV_* flags
- * @param cls &atask
+ * Callback function for data received from the network.  Note that
+ * both "available" and "err" would be 0 if the read simply timed out.
+ *inaddr->sin_addr.s_addrinaddr->sin_addr.s_addr
+ * @param cls the read context
+ * @param buf pointer to received data
+ * @param available number of bytes availabe in "buf",
+ *        possibly 0 (on errors)
+ * @param addr address of the sender
+ * @param addrlen size of addr
+ * @param errCode value of errno (on receiving errors)
  */
 static void
-read_socket (evutil_socket_t sock, short flags, void *cls)
+conn_reader(void *cls, const void *buf, size_t available,
+            const struct sockaddr * addr, socklen_t addrlen, int errCode)
 {
-  struct ReadContext *ctx = cls;
-  ssize_t rsize;
+  struct ReadContext *rc = cls;
   uint32_t cid;
 
-  scheduler_remove (ctx->task);
-  GNUNET_CONTAINER_DLL_remove (rhead, rtail, ctx);
-  free (ctx);
-  if (IS_SHUTDOWN_EVENT (flags))
+  if (0 == available)
   {
-    MSH_close (sock);
-    return;
-  }
-  rsize = read (sock, &cid, sizeof (uint32_t));
-  if (rsize < 0)
-  {
-    LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "read");  
-    goto err_ret;
-  }
-  if (rsize == 0)
-  {
     GNUNET_break (0);
-    goto err_ret;
+    goto clo_ret;
   }
+  if ((NULL == buf) || (0 == available))
+    goto clo_ret;
+  (void) memcpy (&cid, buf, sizeof (uint32_t));
   cid = ntohl (cid);
   LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
-  /* if (!barray_isset (cid)) */
-  /*   barray_set (cid); */
-  MSH_close (sock);
-  return;
-  
- err_ret:
-  MSH_close (sock);
-  scheduler_shutdown ();
-  return;
+
+ clo_ret:
+  GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
+  GNUNET_CONNECTION_destroy (rc->conn);
+  GNUNET_free (rc);
 }
 
 
 /**
  * Task to call accept and close on a listening socket
  *
- * @param sock the socket
- * @param flags EV_* flags
- * @param cls &atask
+ * @param cls NULL
+ * @param tc the scheduler task context
  */
 static void
-accept_task (evutil_socket_t sock, short flags, void *cls)
+accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct ReadContext *rctx;
+  struct GNUNET_CONNECTION_Handle *conn;
   int csock;
 
-  scheduler_remove (atask);
-  atask = NULL;
-  if (IS_SHUTDOWN_EVENT (flags))
+  atask = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
   {
-    (void) close (sock);
-    return;
+    GNUNET_break (0);
+    goto clo_ret;
   }
   LOG_DEBUG ("%d: Got a connect\n", rank);
-  if (0 > (csock = accept4 (sock, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)))
+  conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
+  if (NULL == conn)
   {
-    LOG_STRERROR (GNUNET_ERROR_TYPE_ERROR, "accept4");
-    MSH_close (sock);
-    scheduler_shutdown ();
-    return;
+    GNUNET_break (0);
+    goto clo_ret;
   }
   rctx = GNUNET_malloc (sizeof (struct ReadContext));
+  rctx->conn = conn;
+  rctx->in_receive = GNUNET_YES;
+  GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
+                             GNUNET_TIME_UNIT_FOREVER_REL, conn_reader, rctx);
   GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);
-  rctx->task = scheduler_add_socket (csock, EV_READ, &read_socket, rctx, NULL);
   /* resume accepting connections on the listen sock */
-  atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                         listen_socket, &accept_task, NULL);
+  return;
+
+ clo_ret:
+  GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+  listen_socket = NULL;
 }
 
 
 /**
  * Task for running a round
  *
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
  * @param cls NULL
+ * @param tc scheduler task context
  */
 static void
-run_round (evutil_socket_t nosock, short flags, void *cls);
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
@@ -346,13 +376,13 @@
 {
   int total_rounds;
 
-  GNUNET_assert (NULL == rtask);
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
   /* Number of rounds required to contact all processes except ourselves 
(rwidth
      in parallel in each round) */
   total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
   if (current_round < total_rounds)
   {
-    rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+    rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
     return;
   }
   LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
@@ -361,44 +391,45 @@
 
 
 /**
+ * Cleans up the address verification context
+ *
+ * @param ctx the context
+ */
+static void
+cleanup_verifiyaddressctx (struct VerifyAddressesCtx *ctx)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
+    GNUNET_SCHEDULER_cancel (ctx->close_task);
+  GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);  
+  GNUNET_free (ctx);  
+}
+
+
+/**
  * Callback triggered to finalise a round
  *
- * @param sock -1 do not use this
- * @param flags EV_* flags
- * @param cls
+ * @param cls NULL
+ * @param tc scheduler task context
  */
 static void
-finalise_round (evutil_socket_t sock, short flags, void *cls)
+finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct VerifyAddressesCtx *ctx;
   unsigned int cnt;
 
-  scheduler_remove (finalise_task);
-  finalise_task = NULL;
+  finalise_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_SCHEDULER_cancel (atask);
+  atask = GNUNET_SCHEDULER_NO_TASK;
   while (NULL != (ctx = vactx_head))
   {
-    if (NULL != ctx->soh)
-      scheduler_open_socket_cancel (ctx->soh);
-    if (NULL != ctx->close_task)
-    {
-      MSH_close (ctx->sock);
-      scheduler_remove (ctx->close_task);
-    }
-    GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
-    free (ctx);
+    cleanup_verifiyaddressctx (ctx);
   }
   for (cnt = 0; cnt < rwidth; cnt++)
     instance_address_info_destroy (riainfos[cnt]);
-  if (IS_SHUTDOWN_EVENT (flags))
-    return;
-  MSH_close (listen_sock);
-  listen_sock = -1;
-  scheduler_remove (atask);
-  atask = NULL;
   if (1 != bitmap_allset (bitmap))
   {
     LOG_ERROR ("Could not verify addresses of all hosts\n");
-    scheduler_shutdown ();
+    GNUNET_SCHEDULER_shutdown ();
     return;
   }
   current_round++;
@@ -407,36 +438,21 @@
 
 
 /**
- * Callback triggered when the data on the sock is written and the socket is
- * available for writing again.  We close the associated socket in this 
callback.
+ * Task for closing a connection
  *
- * @param sock the socket file descriptor
- * @param flags EV_* flags
- * @param cls context for verifying addresses
+ * @param cls the verify address context
+ * @param tc the scheduler task context
  */
 static void
-socket_write_cb (evutil_socket_t sock, short flags, void *cls)
+conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct VerifyAddressesCtx *ctx = cls;
   int lb;
   int source;
   int off;
 
-  scheduler_remove (ctx->close_task);
-  ctx->close_task = NULL;
-  if (ctx->sock == sock)
-    MSH_close (sock);
-  else if (-1 == sock)
-    GNUNET_break (0);
-  if (IS_SHUTDOWN_EVENT (flags))
-  {
-    GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
-    free (ctx);
-    return;
-  }
-  /* FIXME: add the addresses associated with the contex to the mapping */
-  
-  lb = rank - current_round * rwidth - rwidth + nproc;
+  ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+  lb = rank - (current_round * rwidth) - rwidth + nproc;
   GNUNET_assert (0 <= lb);
   lb %= nproc;
   source = instance_address_info_get_rank (ctx->iainfo);
@@ -447,49 +463,66 @@
   bitmap_set (bitmap, off, 1);
   addressmap_add (addrmap, instance_address_info_get_rank (ctx->iainfo),
                   ctx->port, ctx->ip);
-  return;
+  cleanup_verifiyaddressctx (ctx);
 }
 
 
 /**
- * Callback triggered when a socket connection is ready to be written to
+ * Function called to notify a client about the connection
+ * begin ready to queue more data.  "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
  *
- * @param sockfd the file descriptor of the socket which is ready to be written
- *          to
- * @param cls context information for verifying an instance address
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
  */
-static void 
-socket_open_cb (int sockfd, void *cls)
+static size_t 
+conn_write_cb (void *cls, size_t size, void *buf)
 {
   struct VerifyAddressesCtx *ctx = cls;
-  uint32_t id;
+  size_t rsize;
+  uint32_t rank_;
 
-  ctx->soh = NULL;
-  if (-1 == sockfd)
+  ctx->transmit_handle = NULL;
+  rsize = 0;
+  if ((NULL == buf) || (0 == size))
   {
+    goto clo_ret;
+  }
+  if (size < sizeof (uint32_t))
+  {
     GNUNET_break (0);
-    /* FIXME: Check if we already got a mapping for the instance */
-    goto err_ret;
+    goto clo_ret;
   }
-  LOG_DEBUG ("%d: Opened a connection to %s:%u\n", rank, 
-             ip2str (ctx->ip), ctx->port);
-  ctx->sock = sockfd;
-  id = htonl ((uint32_t) rank);
-  if (sizeof (uint32_t) != write (sockfd, &id, sizeof (uint32_t)))
+  switch (ctx->state)
   {
-    GNUNET_break (0);  /* FIXME: handle error */
-    MSH_close (sockfd);
-    goto err_ret;
+  case VERIFY_ADDRESS_CTX_WRITE:
+    rank_ = htonl (rank);
+    rsize = sizeof (uint32_t);
+    (void) memcpy (buf, &rank_, rsize);
+    ctx->transmit_handle =
+        GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
+                                                 GNUNET_TIME_UNIT_FOREVER_REL,
+                                                 &conn_write_cb, ctx);
+    ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
+    return rsize;
+  case VERIFY_ADDRESS_CTX_CLOSE:
+    ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
+    return 0;
+  default:
+    GNUNET_assert (0);
   }
-  ctx->close_task = 
-      scheduler_add_socket (sockfd, EV_WRITE, &socket_write_cb, ctx, NULL);
-  return;
 
- err_ret:
+ clo_ret:
   GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
-  free (ctx);
+  GNUNET_CONNECTION_destroy (ctx->conn);
+  GNUNET_free (ctx);
+  return size;
 }
 
+
 static unsigned int bmx;
 
 static int
@@ -504,20 +537,26 @@
   in_addr.sin_port = htons (port);
   in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
   ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
-  ctx->soh = scheduler_open_socket ((struct sockaddr *) &in_addr,
-                                    sizeof (struct sockaddr_in),
-                                    &socket_open_cb, ctx);
-  ctx->port = port;
-  ctx->ip = ip;
-  ctx->sock = -1;
-  ctx->iainfo = iainfo;
-  if (NULL == ctx->soh)
+  ctx->conn = 
+      GNUNET_CONNECTION_create_from_sockaddr (AF_INET, 
+                                              (const struct sockaddr *)
+                                              &in_addr, 
+                                              sizeof (struct sockaddr_in));
+  if (NULL == ctx->conn)
   {
     GNUNET_break (0);
     free (ctx);
     return GNUNET_SYSERR;
   }
+  ctx->port = port;
+  ctx->ip = ip;
+  ctx->iainfo = iainfo;
+  ctx->state = VERIFY_ADDRESS_CTX_WRITE;
   GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
+  ctx->transmit_handle = 
+      GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               &conn_write_cb, ctx);
   return GNUNET_OK;
 }
 
@@ -539,8 +578,8 @@
   
   bmx = 0;
   if (GNUNET_OK != instance_address_info_iterate_addresses (iainfo,
-                                                         &address_iterator_cb,
-                                                         iainfo))
+                                                            
&address_iterator_cb,
+                                                            iainfo))
     return GNUNET_SYSERR;
   return GNUNET_OK;
 }
@@ -679,11 +718,11 @@
   msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
   msg = GNUNET_malloc (msize);
   msg->header.size = htons (msize);
-  msg->port = htons (lport);
+  msg->port = htons (listen_port);
   msg->nips = htons (nips);
   for (cnt = 0; cnt < nips; cnt++)
   {    
-    msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
+    msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
   }
   width = rwidth;  
   if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) / 
rwidth)) )
@@ -744,49 +783,30 @@
 static int
 run_round_ ()
 {
-  struct sockaddr_in addr;
-  struct timeval tv;
-  socklen_t addrlen;
-  int sock;
   unsigned int cnt;
-
-  addrlen = sizeof (struct sockaddr_in);
-  (void) memset (&addr, 0, addrlen);
-  sock = open_listen_socket ((struct sockaddr *) &addr, addrlen, rwidth);
-  if (-1 == sock)
-    return GNUNET_SYSERR;
-  lport = ntohs (addr.sin_port);
-  if (0 == lport)
-  {
-    GNUNET_break (0);
-    goto clo_ret;
-  }
+  
   if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
   {
     GNUNET_break (0);
-    goto clo_ret;
+    return GNUNET_SYSERR;
   }
   if (GNUNET_SYSERR == send_addresses ())
-    goto clo_ret;
+    return GNUNET_SYSERR;
   if (NULL == (riainfos = receive_addresses ()))
-    goto clo_ret;
-  atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
+    return GNUNET_SYSERR;
+  atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+                                         listen_socket, &accept_task, NULL);
+
   if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
   {
     GNUNET_break (0);
-    goto clo_ret;
+    return GNUNET_SYSERR;
   }
-  tv.tv_sec = 1;
-  tv.tv_usec = 0;
   for (cnt = 0; cnt < rwidth; cnt++)
     verify_addresses (riainfos[cnt]);
-  listen_sock = sock;
-  finalise_task = scheduler_add (&finalise_round, NULL, &tv);
+  finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, 
+                                                &finalise_round, NULL);
   return GNUNET_OK;
-
- clo_ret:
-  (void) close (sock);
-  return GNUNET_SYSERR;
 }
 
 
@@ -798,75 +818,109 @@
  * @param cls pointer to the corresponding Task
  */
 static void
-sig_shutdown (evutil_socket_t signal, short flags, void *cls)
+sighandler_shutdown ()
 {
-  struct Task **task = cls;
-  unsigned int cnt;
+  static char c;
+  int old_errno;       /* back-up errno */
 
-  scheduler_remove (*task);
-  *task = NULL;
-  if (IS_SHUTDOWN_EVENT (flags))
-    return;
-  LOG_DEBUG ("Got signal %d.  Exiting.\n", signal);
-  scheduler_shutdown ();
+  old_errno = errno;
+  GNUNET_break (1 ==
+                GNUNET_DISK_file_write (GNUNET_DISK_pipe_handle
+                                        (sigpipe, GNUNET_DISK_PIPE_END_WRITE),
+                                        &c, sizeof (c)));
+  errno = old_errno;
 }
 
 
 /**
  * Task for running a round
  *
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
  * @param cls NULL
+ * @param tc scheduler task context
  */
 static void
-run_round (evutil_socket_t nosock, short flags, void *cls)
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  scheduler_remove (rtask);
-  rtask = NULL;
-  if (IS_SHUTDOWN_EVENT (flags))
-    return;
+  rtask = GNUNET_SCHEDULER_NO_TASK;
   if (GNUNET_OK != run_round_ ())
-    scheduler_shutdown ();
+    GNUNET_SCHEDULER_shutdown ();
 }
 
 
 /**
- * Event callback for the first running task
+ * Function called whenever a signal is written to the signal pipe
  *
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
  * @param cls NULL
+ * @param tc scheduler task context
  */
 static void
-run (evutil_socket_t nosock, short flags, void *cls)
+sigpipe_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  LOG_DEBUG ("Running main task\n");  
-  sigshut_tasks[0] = scheduler_add_signal (SIGINT, &sig_shutdown,
-                                           &sigshut_tasks[0], NULL);
-  sigshut_tasks[1] = scheduler_add_signal (SIGTERM, &sig_shutdown,
-                                           &sigshut_tasks[1], NULL);
-  //rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
-  schedule_next_round ();
+  const struct GNUNET_DISK_FileHandle *pr;
+  char c[16];
+
+  sigread_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+  pr = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+  GNUNET_break (0 < GNUNET_DISK_file_read (pr, &c, sizeof (c)));
+  LOG_DEBUG ("Caught signal.  Exiting.\n");
+  GNUNET_SCHEDULER_shutdown ();
 }
 
 
 /**
- * Prints help message for this program
+ * Main function that will be run.
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be 
NULL!)
+ * @param cfg configuration
  */
-static void
-print_help ()
+static void 
+run (void *cls, char *const *args, const char *cfgfile,
+     const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  char *msg = 
-"mshd: MSH daemon.\n" 
-"This binary is a part of "PACKAGE_NAME "-" PACKAGE_VERSION " available from " 
PACKAGE_URL ".\n"
-"This program takes the following options:\n"
-" -w num\t: \t The number of processes which verify at each round.\n"
-" -h \t: \t Print this help\n"
-"Report bugs to " PACKAGE_BUGREPORT "\n"
-      ;
-  
-  fprintf (stderr, "%s", msg);
+  const struct GNUNET_DISK_FileHandle *fh;
+  struct sockaddr_in addr;
+  socklen_t addrlen;
+  unsigned int cnt;
+
+  LOG_DEBUG ("Running main task\n");
+  if (0 == rwidth)
+  {
+    LOG_ERROR ("Round width cannot be 0.  Exiting\n");
+    return;
+  }
+  bitmap = bitmap_create (rwidth);
+  addrmap = addressmap_create (nproc);
+  fh = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
+  sigread_task = 
+      GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL, fh,
+                                      &sigpipe_read, NULL);
+  addrlen = sizeof (struct sockaddr_in);
+  (void) memset (&addr, 0, addrlen);
+  listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen, 
rwidth);
+  listen_port = ntohs (addr.sin_port);
+  if (NULL == listen_socket)
+    return;
+  if (0 == listen_port)
+  {
+    GNUNET_break (0);
+    goto clo_ret;
+  }  
+  GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
+  if (0 == nips)
+  {
+    LOG_ERROR ("No IP addresses found\n");
+    return;
+  }
+  schedule_next_round ();
+  return;
+
+ clo_ret:
+  GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+  listen_socket = NULL;
 }
 
 
@@ -880,51 +934,36 @@
 int 
 main (int argc, char **argv)
 {
-  extern char *optarg;
+  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+    {'w', "round-width", "COUNT",
+     "set the size of each round to COUNT",
+     GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+    GNUNET_GETOPT_OPTION_END
+  };
   int ret;
   int c;
+
   ret = 1;
-  rwidth = 1;
-  
-  listen_sock = -1;
-  GNUNET_log_setup ("mshd", "DEBUG", NULL);
-  while (-1 != (c = getopt (argc, argv, "hw:")))
+  rwidth = 1;  
+  if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, 
+                                                 &argc, (char *const **) 
&argv))
   {
-    switch (c)
-    {
-    case 'w':
-      if (NULL == optarg)
-      {
-        LOG_ERROR ("Argument is NULL\n");
-        return 1;
-      }
-      if (1 != sscanf (optarg, "%u", &rwidth))
-      {
-        LOG_ERROR ("-w option requires an unsinged number argument.\n");
-        print_help ();
-        return 1;
-      }
-      if (0 == rwidth)
-      {
-        LOG_ERROR ("Round width cannot be 0\n");
-        return 1;
-      }
-      break;
-    case 'h':
-      print_help ();
-      return 0;
-    case '?':
-      print_help();
-      return 1;
-    default:
-      printf ("Unknown option: %c\n", c);
-      GNUNET_assert (0);
-    }
+    GNUNET_break (0);
+    return 2; 
   }
+  if (NULL == (sigpipe = GNUNET_DISK_pipe (GNUNET_NO, GNUNET_NO, 
+                                           GNUNET_NO, GNUNET_NO)))
+  {
+    GNUNET_break (0);
+    ret = GNUNET_SYSERR;
+    return 1;
+  }
+  shc_int = GNUNET_SIGNAL_handler_install (SIGINT, &sighandler_shutdown);
+  shc_term = GNUNET_SIGNAL_handler_install (SIGTERM, &sighandler_shutdown);
   if (MPI_SUCCESS != MPI_Init(&argc, &argv))
   {
     LOG_ERROR ("Failed to initialise MPI\n");
-    return 1;
+    goto uninstall_sighandlers;
   }
   if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
   {
@@ -941,16 +980,9 @@
     LOG_ERROR ("Cannot determine our MPI rank\n");
     goto fail;
   }
-  GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);  
-  if (0 == nips)
+  if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
+                                       options, &run, NULL))
   {
-    LOG_ERROR ("No IP addresses found\n");
-    goto fail;
-  }
-  bitmap = bitmap_create (rwidth);
-  addrmap = addressmap_create (nproc);
-  if (GNUNET_OK != scheduler_run (&run, NULL))
-  {
     GNUNET_break (0);
     goto fail;
   }
@@ -970,5 +1002,12 @@
   GNUNET_break (MPI_SUCCESS == MPI_Finalize());
   GNUNET_free_non_null (s_addrs);
   LOG_ERROR ("Returning\n");
+
+ uninstall_sighandlers:
+  if (NULL != listen_socket)
+    GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+  GNUNET_DISK_pipe_close (sigpipe);
+  GNUNET_SIGNAL_handler_uninstall (shc_int);
+  GNUNET_SIGNAL_handler_uninstall (shc_term);
   return ret;
 }

Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h    2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/mtypes.h    2013-07-29 07:10:22 UTC (rev 28335)
@@ -9,11 +9,6 @@
 
 #include "common.h"
 
-/**
- * gcc-ism to get packed structs.  This won't work on W32 but then do we use 
W32
- * for HPC? ;)
- */
-#define MSH_PACKED __attribute__((packed))
 
 /**
  * Message header that will be included for all messages
@@ -23,7 +18,7 @@
   /**
    * The size of the message
    */
-  uint16_t size MSH_PACKED;
+  uint16_t size GNUNET_PACKED;
 };
 
 
@@ -40,12 +35,12 @@
   /**
    * Randomly chosen port number
    */
-  uint16_t port MSH_PACKED;
+  uint16_t port GNUNET_PACKED;
 
   /**
    * Number of IP addresses
    */
-  uint16_t nips MSH_PACKED;
+  uint16_t nips GNUNET_PACKED;
 
   /**
    * IPv4 addresses to follow as 32 bit unsigned integeters
@@ -90,12 +85,12 @@
   /**
    * The rank of the instance
    */
-  uint16_t rank MSH_PACKED;
+  uint16_t rank GNUNET_PACKED;
 
   /**
    * The number of addresses
    */
-  uint16_t nips MSH_PACKED;
+  uint16_t nips GNUNET_PACKED;
 
   /**
    * IPv4 addresses to follow as 32 bit unsigned integers

Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c    2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/reduce.c    2013-07-29 07:10:22 UTC (rev 28335)
@@ -12,7 +12,7 @@
 #include "mtypes.h"
 
 #define LOG(kind,...)                           \
-  GNUNET_log_from (kind, "mshd-addressmap", __VA_ARGS__)
+  GNUNET_log_from (kind, "mshd-reduce", __VA_ARGS__)
 
 #define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
 
@@ -42,6 +42,7 @@
     return ret;
   }
   send_reqs = GNUNET_malloc (sizeof (MPI_Request) * nmsg);
+  LOG_DEBUG ("%d: Sending addressmap to instance %d\n", rank, instance);
   for (cnt = 0; cnt < nmsg; cnt++)
   {
     if (MPI_SUCCESS != 
@@ -60,6 +61,8 @@
     GNUNET_break (0);
     goto cleanup;
   }
+  GNUNET_free (send_reqs);
+  send_reqs = NULL;
   for (cnt = 0; cnt < nmsg; cnt++)
   {
     if (MPI_SUCCESS != stats[cnt].MPI_ERROR)
@@ -69,12 +72,12 @@
     }
   }
   ret = GNUNET_OK;
-
+  
  cleanup:
-  for (;(NULL != send_reqs) && (cnt >= 0); cnt--)
+  for (;(cnt > 0) && (NULL != send_reqs); cnt--)
   {
-    GNUNET_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt]));
-    GNUNET_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt], 
MPI_STATUS_IGNORE));
+    GNUNET_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt - 1]));
+    GNUNET_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt - 1], 
MPI_STATUS_IGNORE));
   }
   for (cnt = 0; cnt < nmsg; cnt++)
     free (iaddr_msgs[cnt]);
@@ -118,14 +121,13 @@
   else
   {
     nrecv = ((nproc - rank) - 1) / step_width;
-    if (0 != nrecv)
-    {
-      lb = rank + 1;
-      ub = nproc - 1;
-    }
+    if (0 == nrecv)
+      return GNUNET_OK;
+    lb = rank + 1;
+    ub = nproc - 1;
   }
   GNUNET_assert (nrecv >= 0);
-  nrecv *= nproc;  
+  nrecv *= nproc; /* we get a message for each instance from each instance */
   for (cnt = 0; cnt < nrecv; cnt++)
   {
     msg = NULL;
@@ -140,6 +142,8 @@
       GNUNET_break (0);
       goto cleanup;
     }
+    LOG_DEBUG ("%d: Receiving %d (nd/th) addressmap message from instance 
%d\n",
+               rank, cnt, stat.MPI_SOURCE);
     msize = 0;
     if ((MPI_SUCCESS != MPI_Get_elements (&stat, MPI_BYTE, &msize))
         || (msize <= 0))
@@ -198,9 +202,12 @@
     if (0 != (aggregator = (rank % step_width)))
     {
       aggregator = rank - aggregator;
-      return send_addressmap (aggregator);
+      if (GNUNET_SYSERR == send_addressmap (aggregator))
+        return GNUNET_SYSERR;
     }
     /* receive address maps */
-    receive_addressmap (step);
+    if (GNUNET_SYSERR  == receive_addressmap (step))
+      return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
 }

Deleted: msh/src/scheduler.c
===================================================================
--- msh/src/scheduler.c 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/scheduler.c 2013-07-29 07:10:22 UTC (rev 28335)
@@ -1,305 +0,0 @@
-/**
- * @file scheduler.c
- * @brief task scheduler based on libevent
- * @author Sree Harsha Totakura <address@hidden> 
- */
-
-#include "common.h"
-#include "gnunet/gnunet_util_lib.h"
-#include "scheduler.h"
-
-#define LOG(kind,...)                           \
-  GNUNET_log_from (kind, "mshd-scheduler", __VA_ARGS__);
-
-#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__);
-
-#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__);
-
-#define LOG_STRERROR(cmd)                       \
-  GNUNET_log_from_strerror (GNUNET_ERROR_TYPE_WARNING, "mshd", cmd)
-
-/**
- * variable for 0 time.  Externalised in scheduler.h
- */
-struct timeval tv_immediate;
-
-struct Task
-{
-  /**
-   * DLL next
-   */
-  struct Task *next;
-
-  /**
-   * DLL prev
-   */
-  struct Task *prev;
-  
-  struct event *ev;
-};
-
-/**
- * Head for the DLL
- */
-static struct Task *thead;
-
-/**
- * Tail for the DLL
- */
-static struct Task *ttail;
-
-/**
- * Our event base
- */
-static struct event_base *ebase;
-
-
-/**
- * Adds a task which is to be executed when one of the given events are
- * triggered on the given socket or upon the expiry of the given timeout
- *
- * @param sock the sock to wait for
- * @param flags EV_* events; the callback cb
- * @param cb the callback to call when one of the events marked in flags are
- *          triggered on for the sock
- * @param cls closure for the callback
- * @param tv how long should we wait for the events.  Upon this value the cb is
- *          called with EV_TIMEOUT flag.  Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb, 
-                      void *cls, const struct timeval *tv)
-{
-  struct Task *task;
-
-  GNUNET_assert (NULL != ebase);
-  task = GNUNET_malloc (sizeof (struct Task));
-  task->ev = event_new (ebase, sock, flags, cb, cls);
-  if (0 != event_add (task->ev, tv))
-  {
-    free (task);
-    return NULL;
-  }
-  GNUNET_CONTAINER_DLL_insert_tail (thead, ttail, task);
-  return task;
-}
-
-
-/**
- * Adds a task which is to be executed after given interval
- *
- * @param cb the callback to call for executing the task
- * @param cls closure for the above callback
- * @param tv the interval after which the task has to be executed; NULL to
- *          denote infinite delay
- * @return handle for task; NULL upon error
- */
-struct Task *
-scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv)
-{
-  return scheduler_add_socket (-1, 0, cb, cls, tv);
-}
-
-
-/**
- * Add a task to be executed upon reception of a signal or upon the expiry of a
- * given timeout
- *
- * @param signal the signal to wait for
- * @param cb the callback to call upon reception of the signal
- * @param cls closure for the above callback
- * @param tv how long should we wait for the signal before.  Upon this value 
the cb is
- *          called with EV_TIMEOUT flag.  Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_signal (int signal, event_callback_fn cb, void *cls, 
-                      const struct timeval *tv)
-{
-  return scheduler_add_socket (signal, EV_SIGNAL, cb, cls, tv);
-}
-
-
-/**
- * Remove a task.  All tasks are to be removed (even after their respective
- * callbacks are executed)
- *
- * @param task the task handle to remove
- */
-void
-scheduler_remove (struct Task *task)
-{
-  GNUNET_CONTAINER_DLL_remove (thead, ttail, task);
-  GNUNET_break (0 == event_del (task->ev));
-  event_free (task->ev);
-  free (task);
-}
-
-/**
- * Shutdowns the scheduler.  All pending tasks are executed (their respective
- * callbacks will be called).  Use IS_SHUTDOWN_EVENT() to check if the 
callbacks
- * are called upon scheduler's shutdown.  It is not possible to add any tasks
- * after this function is called.
- *
- * @see IS_SHUTDOWN_EVENT
- */
-void
-scheduler_shutdown ()
-{
-  struct Task *task;
-
-  for (task = thead; NULL != task; task = task->next)
-  {
-    event_active (task->ev, EV_READ | EV_WRITE | EV_TIMEOUT, 0);
-  }
-}
-
-
-/**
- * Run the scheduler loop by calling the given callback.  This function returns
- * once all tasks are finished or after a call to scheduler_shutdown() (which
- * causes all waiting tasks to be executed)
- *
- * @param cb the callback to call when the scheduler is ready.  Further tasks
- *          can be added through this callback.
- * @return GNUNET_OK if all tasks are successfully executed; GNUNET_SYSERR 
upon error
- */
-int
-scheduler_run (event_callback_fn cb, void *cls)
-{
-  struct Task *task;
-  struct event *sev;
-  int ret;
-
-  ebase = event_base_new ();
-  if (NULL == ebase)
-  {
-    LOG_ERROR ("Cannot allocate libevent event base\n");
-    return GNUNET_SYSERR;
-  }
-  sev = evtimer_new (ebase, cb, cls);
-  evtimer_add (sev, TV_IMMEDIATE);
-  ret = event_base_dispatch (ebase);
-  evtimer_del (sev);
-  event_free (sev);
-  event_base_free (ebase);  
-  return (1 == ret) ? GNUNET_OK : GNUNET_SYSERR;
-}
-
-
-/**
- * Handle to be returned from scheduler_open_socket()
- */
-struct SocketOpenHandle
-{
-  /**
-   * the function to call when the socket is ready
-   */
-  socket_open_fn cb;
-
-  /**
-   * The closure for the above callback
-   */
-  void *cls;
-
-  /**
-   * The task associated with the socket.  Will be executed when the connection
-   * on the socket is ready
-   */
-  struct Task *task;
-
-  /**
-   * The file descriptor of the socket
-   */
-  int sock;
-};
-
-
-/**
- * Callback that will be called when the socket is ready for reading
- *
- * @param sock the file descriptor of the socket
- * @param flags EV_* flags
- * @param cls the closure
- */
-static void
-open_socket_cb (evutil_socket_t sock, short flags, void *cls)
-{
-  struct SocketOpenHandle *h = cls;
-  socket_open_fn cb;
-  void *cbcls;
-  int errval;
-  socklen_t optlen;
-  
-  scheduler_remove (h->task);
-  h->task = NULL;
-  cb = h->cb;
-  cbcls = h->cls;
-  GNUNET_assert (h->sock == sock);
-  free (h);
-  if (IS_SHUTDOWN_EVENT (flags))
-    goto err_ret;
-  errval = 1;
-  optlen = sizeof (errval);
-  if (0 != getsockopt (sock, SOL_SOCKET, SO_ERROR, &errval, &optlen))
-  {
-    LOG_STRERROR ("getsockopt");
-    goto err_ret;
-  }
-  if (0 != errval)
-  {
-    LOG_ERROR ("connect() failed for a socket: %s\n", strerror (errval));
-    goto err_ret;
-  }
-  cb (sock, cbcls);
-  return;
-
- err_ret:
-  MSH_close (sock);
-  cb (-1, cbcls);
-}
-
-
-/**
- * Open a socket, connect it to the target address and schedule a task to be
- * executed when the connection is ready.
- *
- * @param addr the target address to connect
- * @param addrlen the length of the addr
- * @param cb the callback to call to signal success or failure
- * @param cls the closure for the above callback
- * @return a handle which can be used to cancel the task to be executed when 
the
- *           connection is ready; NULL upon error
- */
-struct SocketOpenHandle *
-scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
-                       socket_open_fn cb, void *cls)
-{
-  struct SocketOpenHandle *h;
-  int sock;
-
-  GNUNET_assert (NULL != cb);
-  if (-1 == (sock = open_socket (addr, addrlen)))
-    return NULL;
-  h = GNUNET_malloc (sizeof (struct SocketOpenHandle));
-  h->cb = cb;
-  h->cls = cls;
-  h->sock = sock;
-  h->task = scheduler_add_socket (sock, EV_WRITE, &open_socket_cb, h, NULL);
-  return h;
-}
-
-
-/**
- * Cancel a handle created with scheduler_open_socket()
- *
- * @param h the handle to cancel
- */
-void
-scheduler_open_socket_cancel (struct SocketOpenHandle *h)
-{
-  scheduler_remove (h->task);
-  MSH_close (h->sock);
-  free (h);
-}

Deleted: msh/src/scheduler.h
===================================================================
--- msh/src/scheduler.h 2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/scheduler.h 2013-07-29 07:10:22 UTC (rev 28335)
@@ -1,151 +0,0 @@
-/**
- * @file scheduler.h
- * @brief interface for task scheduler based on libevent
- * @author Sree Harsha Totakura <address@hidden> 
- */
-
-#ifndef SCHEDULER_H_
-#define SCHEDULER_H_
-
-#include "common.h"
-#include "event2/event.h"
-
-extern struct timeval tv_immediate;
-
-/**
- * Use this for scheduling tasks immediately
- */
-#define TV_IMMEDIATE &tv_immediate
-
-/**
- * Returns true if the flags denote a shutdown event
- */
-#define IS_SHUTDOWN_EVENT(flags) ((flags & (EV_READ | EV_WRITE | EV_TIMEOUT)) 
== (EV_READ | EV_WRITE | EV_TIMEOUT))
-
-
-/**
- * Opaque handle for a task
- */
-struct Task;
-
-
-/**
- * Adds a task which is to be executed when one of the given events are
- * triggered on the given socket or upon the expiry of the given timeout
- *
- * @param sock the sock to wait for
- * @param flags EV_* events; the callback cb
- * @param cb the callback to call when one of the events marked in flags are
- *          triggered on for the sock
- * @param cls closure for the callback
- * @param tv how long should we wait for the events.  Upon this value the cb is
- *          called with EV_TIMEOUT flag.  Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb, 
-                      void *cls, const struct timeval *tv);
-
-
-/**
- * Adds a task which is to be executed after given interval
- *
- * @param cb the callback to call for executing the task
- * @param cls closure for the above callback
- * @param tv the interval after which the task has to be executed; NULL to
- *          denote infinite delay
- * @return handle for task; NULL upon error
- */
-struct Task *
-scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv);
-
-
-/**
- * Add a task to be executed upon reception of a signal or upon the expiry of a
- * given timeout
- *
- * @param signal the signal to wait for
- * @param cb the callback to call upon reception of the signal
- * @param cls closure for the above callback
- * @param tv how long should we wait for the signal before.  Upon this value 
the cb is
- *          called with EV_TIMEOUT flag.  Use NULL to wait forever.
- * @return handle for the task; NULL upon error
- */
-struct Task *
-scheduler_add_signal (int signal, event_callback_fn cb, void *cls, 
-                      const struct timeval *tv);
-
-
-/**
- * Remove a task.  All tasks are to be removed (even after their respective
- * callbacks are executed)
- *
- * @param task the task handle to remove
- */
-void
-scheduler_remove (struct Task *task);
-
-
-/**
- * Shutdowns the scheduler.  All pending tasks are executed (their respective
- * callbacks will be called).  Use IS_SHUTDOWN_EVENT() to check if the 
callbacks
- * are called upon scheduler's shutdown.  It is not possible to add any tasks
- * after this function is called.
- *
- * @see IS_SHUTDOWN_EVENT
- */
-void
-scheduler_shutdown ();
-
-
-/**
- * Run the scheduler loop by calling the given callback.  This function returns
- * once all tasks are finished or after a call to scheduler_shutdown() (which
- * causes all waiting tasks to be executed)
- *
- * @param cb the callback to call when the scheduler is ready.  Further tasks
- *          can be added through this callback.
- * @return MSH_OK if all tasks are successfully executed; MSH_SYSERR upon error
- */
-int
-scheduler_run (event_callback_fn cb, void *cls);
-
-
-/**
- * The type of the function which is used as a callback argument to
- * scheduler_open_socket().  The callback will be called when a socket
- * connection is either successfully established or failed
- *
- * @param sockfd the socket file descriptor; upon failure its value is -1
- * @param cls the closure for this callback as passed to 
scheduler_open_socket()
- */
-typedef void (* socket_open_fn) (int sockfd, void *cls);
-
-
-/**
- * Open a socket, connect it to the target address and schedule a task to be
- * executed when the connection is ready.
- *
- * @param addr the target address to connect
- * @param addrlen the length of the addr
- * @param cb the callback to call to signal success or failure
- * @param cls the closure for the above callback
- * @return a handle which can be used to cancel the task to be executed when 
the
- *           connection is ready; NULL upon error
- */
-struct SocketOpenHandle *
-scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
-                       socket_open_fn cb, void *cls);
-
-
-/**
- * Cancel a handle created with scheduler_open_socket()
- *
- * @param h the handle to cancel
- */
-void
-scheduler_open_socket_cancel (struct SocketOpenHandle *h);
-
-#endif  /* SCHEDULER_H_ */
-
-/* End of scheduler.h */

Modified: msh/src/util.c
===================================================================
--- msh/src/util.c      2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/util.c      2013-07-29 07:10:22 UTC (rev 28335)
@@ -1,9 +1,12 @@
 #include "common.h"
+#include <gnunet/gnunet_util_lib.h>
+#include "mshd.h"
 #include "util.h"
 
 #define LOG_STRERROR(cmd)                  \
   GNUNET_log_from_strerror (GNUNET_ERROR_TYPE_WARNING, "mshd-util", cmd)
 
+
 /**
  * Creates a new non-blocking socket and binds it to the given address and 
makes
  * it a listen socket
@@ -12,46 +15,47 @@
  * @param addrlen the length of the addr
  * @param backlog the max length of the pending connections.  This will be
  *          passed to listen()
- * @return the socket's fd; -1 on error
+ * @return the handler to the socket; NULL upon error
  */
-int
+struct GNUNET_NETWORK_Handle *
 open_listen_socket (struct sockaddr *addr, const socklen_t addrlen, int 
backlog)
 {
+  struct GNUNET_NETWORK_Handle *lsock;
   socklen_t newaddrlen;
-  int sock;
+  int sockfd;
     
-  sock = socket (AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-  if (-1 == sock)
+  if (NULL == (lsock = GNUNET_NETWORK_socket_create (AF_INET, SOCK_STREAM, 0)))
   {
-    LOG_STRERROR ("socket");
-    return -1;
+    GNUNET_break (0);
+    return NULL;
   }
-  if (-1 == bind (sock, addr, addrlen))
+  if (GNUNET_OK != GNUNET_NETWORK_socket_bind (lsock, addr, addrlen, 0))
   {
-    LOG_STRERROR ("bind");
+    GNUNET_break (0);
     goto clo_ret;
   }
+  sockfd = GNUNET_NETWORK_get_fd (lsock);
   newaddrlen = addrlen;
-  if (-1 == getsockname (sock, addr, &newaddrlen))
+  if (-1 == getsockname (sockfd, addr, &newaddrlen))
   {
     LOG_STRERROR ("getsockname");
     goto clo_ret;
-  }  
+  }
   if (newaddrlen != addrlen)
   {
     GNUNET_break (0);
     goto clo_ret;
   }
-  if (-1 == listen (sock, backlog))
+  if (GNUNET_OK != GNUNET_NETWORK_socket_listen (lsock, rwidth))
   {
-    LOG_STRERROR ("listen");
+    GNUNET_break (0);
     goto clo_ret;
   }
-  return sock;
+  return lsock;
 
  clo_ret:
-  (void) close (sock);
-  return -1;
+  GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (lsock));
+  return NULL;
 }
 
 

Modified: msh/src/util.h
===================================================================
--- msh/src/util.h      2013-07-29 03:50:22 UTC (rev 28334)
+++ msh/src/util.h      2013-07-29 07:10:22 UTC (rev 28335)
@@ -3,6 +3,7 @@
 
 #include <gnunet/gnunet_common.h>
 
+
 /**
  * Creates a new non-blocking socket and binds it to the given address and 
makes
  * it a listen socket
@@ -11,9 +12,9 @@
  * @param addrlen the length of the addr
  * @param backlog the max length of the pending connections.  This will be
  *          passed to listen()
- * @return the socket's fd; -1 on error
+ * @return the handler to the socket; NULL upon error
  */
-int
+struct GNUNET_NETWORK_Handle *
 open_listen_socket (struct sockaddr *addr, const socklen_t addrlen, int 
backlog);
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]