gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28124 - in msh: . src


From: gnunet
Subject: [GNUnet-SVN] r28124 - in msh: . src
Date: Wed, 17 Jul 2013 18:30:10 +0200

Author: harsha
Date: 2013-07-17 18:30:10 +0200 (Wed, 17 Jul 2013)
New Revision: 28124

Modified:
   msh/configure.ac
   msh/src/Makefile.am
   msh/src/mshd.c
   msh/src/mtypes.h
Log:
message sending logic for reduce operation


Modified: msh/configure.ac
===================================================================
--- msh/configure.ac    2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/configure.ac    2013-07-17 16:30:10 UTC (rev 28124)
@@ -27,6 +27,15 @@
 AC_CHECK_FUNCS([strdup strerror getnameinfo getifaddrs freeifaddrs],,
                [AC_MSG_ERROR([a required C library function is missing])])
 
+# test for math functions
+AC_CHECK_HEADERS([math.h], 
+                 [AC_CHECK_LIB([m], [log],
+                               [AC_CHECK_FUNC([ceil],math=1)])])
+if test "$math" != 1
+then
+  AC_MSG_ERROR([MSH requires math library])
+fi
+
 # test for mpi
 AC_CHECK_HEADERS([mpi.h], [AC_CHECK_LIB([mpi], [MPI_Init], [mpi=1])])
 if test "$mpi" != 1

Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/src/Makefile.am 2013-07-17 16:30:10 UTC (rev 28124)
@@ -4,7 +4,7 @@
 
 mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h \
   common.h bitmap.c bitmap.h addressmap.c addressmap.h
-mshd_LDADD = -levent
+mshd_LDADD = -levent -lm
 mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
 mshd_LDFLAGS =  $(LIBEVENT_LDFLAGS)
 

Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c      2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/src/mshd.c      2013-07-17 16:30:10 UTC (rev 28124)
@@ -1,5 +1,6 @@
 #include "common.h"
 #include <mpi.h>
+#include <math.h>
 #include "util.h"
 #include "scheduler.h"
 #include "mtypes.h"
@@ -152,7 +153,7 @@
 /**
  * Current IP verification round 
  */
-static unsigned int round;
+static unsigned int current_round;
 
 /**
  * width of the round -- how many other mshd instances verify our IP addresses
@@ -196,6 +197,131 @@
 
 
 /**
+ * Send our addressmap to the instance with the given rank
+ *
+ * @param rank the rank of the instance to which our addressmap has to be sent
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+static int
+send_addressmap (int rank)
+{
+  struct MSH_MSG_AddressMap *map_msg;
+  struct MSH_MSG_InstanceAdresses **iaddr_msgs;
+  struct MSH_MessageHeader *msg;
+  MPI_Request *send_reqs;
+  unsigned int nmsg;
+  unsigned int nposted;
+  unsigned int cnt;
+  int ret;
+  int type;
+
+  ret = MSH_SYSERR;
+  if (MSH_OK != addressmap_pack (addrmap, &map_msg, &iaddr_msgs))
+  {
+    MSH_break (0);
+    return ret;
+  }
+  /* One address map message followed by `num' instance address messages */
+  nmsg = ntohs (map_msg->num) + 1;
+  send_reqs = MSH_malloc (sizeof (MPI_Request) * nmsg);
+  /* Number of sends that were posted and have not been completed yet; only
+     these may be cancelled in the cleanup path */
+  nposted = 0;
+  for (cnt = 0; cnt < nmsg; cnt++)
+  {
+    if (0 == cnt)
+    {
+      msg = (struct MSH_MessageHeader *) map_msg;
+      type = MSH_MTYPE_ADDRESS_MAP;
+    }
+    else
+    {
+      msg = (struct MSH_MessageHeader *) iaddr_msgs[cnt - 1];
+      type = MSH_MTYPE_INSTANCE_ADDRESS;
+    }
+    /* The message type doubles as the MPI tag */
+    if (MPI_SUCCESS !=
+        MPI_Isend (msg, ntohs (msg->size), MPI_BYTE, rank, type,
+                   MPI_COMM_WORLD, &send_reqs[cnt]))
+    {
+      MSH_break (0);
+      goto cleanup;
+    }
+    nposted++;
+  }
+  /* The return code is the only reliable failure indicator here: MPI fills in
+     the per-status MPI_ERROR field only when the call itself reports an
+     error, so no status array is requested. */
+  if (MPI_SUCCESS != MPI_Waitall (nmsg, send_reqs, MPI_STATUSES_IGNORE))
+  {
+    MSH_break (0);
+    /* Waitall has already processed the handles; do not cancel them again */
+    nposted = 0;
+    goto cleanup;
+  }
+  /* All sends completed.  MPI_Request values are opaque handles owned by the
+     MPI library and must never be passed to free(). */
+  nposted = 0;
+  ret = MSH_OK;
+
+ cleanup:
+  /* Withdraw, newest first, whatever was posted before a failed MPI_Isend */
+  while (nposted > 0)
+  {
+    nposted--;
+    MSH_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[nposted]));
+    MSH_break (MPI_SUCCESS == MPI_Wait (&send_reqs[nposted],
+                                        MPI_STATUS_IGNORE));
+  }
+  /* The packed messages are no longer needed once the sends are finished */
+  free (map_msg);
+  for (cnt = 0; cnt + 1 < nmsg; cnt++)
+    free (iaddr_msgs[cnt]);
+  /* NOTE(review): the iaddr_msgs array itself is not released here; confirm
+     against addressmap_pack() whether the caller owns it. */
+  MSH_free_non_null (send_reqs);
+
+  return ret;
+}
+
+
+/**
+ * Perform an ntree reduction on address maps
+ *
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+static int
+ntree_reduction ()
+{
+  unsigned int aggregator;
+  unsigned int step_width;
+
+  /* With a single instance there is nothing to reduce */
+  if (nproc <= 1)
+    return MSH_OK;
+  /* A tree with a branching factor below 2 never shrinks the set of senders */
+  if (rwidth < 2)
+  {
+    MSH_break (0);
+    return MSH_SYSERR;
+  }
+  /* step_width runs through rwidth^0, rwidth^1, ... while it is smaller than
+     nproc, i.e. for ceil (log (nproc) / log (rwidth)) steps.  Integer
+     arithmetic is used so that rounding in log()/pow() cannot add a step or
+     truncate a power. */
+  for (step_width = 1;
+       step_width < (unsigned int) nproc;
+       step_width *= rwidth)
+  {
+    /* NOTE(review): instances that returned in an earlier step no longer
+       reach this barrier on MPI_COMM_WORLD while the remaining ones still
+       do -- confirm this cannot block once receiving is implemented. */
+    if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+      return MSH_SYSERR;
+    /* Ranks that are not a multiple of step_width hand their address map to
+       the nearest lower multiple and are done */
+    if (0 != (aggregator = (rank % step_width)))
+    {
+      aggregator = rank - aggregator;
+      return send_addressmap (aggregator);
+    }
+    /* receive address maps */
+  }
+  return MSH_OK;
+}
+
+
+/**
  * Callback function invoked for each interface found.
  *
  * @param cls closure
@@ -329,13 +455,13 @@
 static void
 schedule_next_round ()
 {
-  int trounds;
+  int total_rounds;
 
   MSH_assert (NULL == rtask);
   /* Number of rounds required to contact all processes except ourselves (rwidth
      in parallel in each round) */
-  trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
-  if (round < trounds)
+  total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+  if (current_round < total_rounds)
   {
     rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
     return;
@@ -386,7 +512,7 @@
     scheduler_shutdown ();
     return;
   }
-  round++;
+  current_round++;
   schedule_next_round ();
 }
 
@@ -421,7 +547,7 @@
   }
   /* FIXME: add the addresses associated with the contex to the mapping */
   
-  lb = rank - round * rwidth - rwidth + nproc;
+  lb = rank - current_round * rwidth - rwidth + nproc;
   MSH_assert (0 <= lb);
   lb %= nproc;
   source = instance_address_info_get_rank (ctx->iainfo);
@@ -593,9 +719,9 @@
                &status);
     MPI_Get_elements (&status, MPI_BYTE, &rsize);
     /* We expect a message from peers with id p in the range:       
-       (rank - round * rwidth - rwidth) <= p <= (rank - (round * rwidth) -1) */
-    lb = rank - round * rwidth - rwidth + nproc;
-    up = rank - (round * rwidth) - 1 + nproc;
+       (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round * rwidth) -1) */
+    lb = rank - current_round * rwidth - rwidth + nproc;
+    up = rank - (current_round * rwidth) - 1 + nproc;
     MSH_assert (lb >= 0);
     MSH_assert (up >= 0);
     lb %= nproc;
@@ -670,7 +796,7 @@
     msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
   }
   width = rwidth;  
-  if ( (0 != ( (nproc - 1) % rwidth)) && (round == ( (nproc - 1) / rwidth)) )
+  if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) / rwidth)) )
     width = (nproc - 1) % rwidth;
   cpys = NULL;
   cpys = MSH_malloc (msize * width);
@@ -678,7 +804,7 @@
   for (cnt=0; cnt < width; cnt++)
   {    
     (void) memcpy (&cpys[cnt], msg, msize);
-    target = (round * rwidth) + cnt + 1;
+    target = (current_round * rwidth) + cnt + 1;
     MSH_assert (target < nproc);
     target = (rank + target) % nproc;
     LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
@@ -702,7 +828,7 @@
   {
     MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));    
   }
-  LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank, round);
+  LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank, current_round);
   if (NULL != cpys)
   {    
     free (cpys);

Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h    2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/src/mtypes.h    2013-07-17 16:30:10 UTC (rev 28124)
@@ -77,7 +77,8 @@
 
 
 /**
- * Message for signifying transmission of an address map
+ * Message for signifying transmission of an address map.  Use MPI tag
+ * MSH_MTYPE_ADDRESS_MAP 
  */
 struct MSH_MSG_AddressMap
 {
@@ -95,7 +96,8 @@
 
 /**
  * Structure for representing verified addresses of an instance.  This does not
- * denote a message but is used in @see MSH_MSG_AddressMap
+ * denote a message but is used in @see MSH_MSG_AddressMap.  The type for these
+ * messages should be MSH_MTYPE_INSTANCE_ADDRESS
  */
 struct MSH_MSG_InstanceAdresses
 {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]