[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28124 - in msh: . src
From: gnunet
Subject: [GNUnet-SVN] r28124 - in msh: . src
Date: Wed, 17 Jul 2013 18:30:10 +0200
Author: harsha
Date: 2013-07-17 18:30:10 +0200 (Wed, 17 Jul 2013)
New Revision: 28124
Modified:
msh/configure.ac
msh/src/Makefile.am
msh/src/mshd.c
msh/src/mtypes.h
Log:
message sending logic for reduce operation
Modified: msh/configure.ac
===================================================================
--- msh/configure.ac 2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/configure.ac 2013-07-17 16:30:10 UTC (rev 28124)
@@ -27,6 +27,15 @@
AC_CHECK_FUNCS([strdup strerror getnameinfo getifaddrs freeifaddrs],,
[AC_MSG_ERROR([a required C library function is missing])])
+# test for math functions
+# The checks are nested: math.h must be present, then log() must be found in
+# libm, then ceil() must be available; only then is $math set to 1.
+# Because AC_CHECK_LIB is given an explicit action-if-found, it does not add
+# -lm to LIBS by itself; the library is linked via mshd_LDADD in
+# src/Makefile.am.
+AC_CHECK_HEADERS([math.h],
+ [AC_CHECK_LIB([m], [log],
+ [AC_CHECK_FUNC([ceil],math=1)])])
+# Abort configuration when any of the nested checks above failed.
+if test "$math" != 1
+then
+ AC_MSG_ERROR([MSH requires math library])
+fi
+
# test for mpi
AC_CHECK_HEADERS([mpi.h], [AC_CHECK_LIB([mpi], [MPI_Init], [mpi=1])])
if test "$mpi" != 1
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/src/Makefile.am 2013-07-17 16:30:10 UTC (rev 28124)
@@ -4,7 +4,7 @@
mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h \
common.h bitmap.c bitmap.h addressmap.c addressmap.h
-mshd_LDADD = -levent
+mshd_LDADD = -levent -lm
mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
mshd_LDFLAGS = $(LIBEVENT_LDFLAGS)
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/src/mshd.c 2013-07-17 16:30:10 UTC (rev 28124)
@@ -1,5 +1,6 @@
#include "common.h"
#include <mpi.h>
+#include <math.h>
#include "util.h"
#include "scheduler.h"
#include "mtypes.h"
@@ -152,7 +153,7 @@
/**
* Current IP verification round
*/
-static unsigned int round;
+static unsigned int current_round;
/**
* width of the round -- how many other mshd instances verify our IP addresses
@@ -196,6 +197,131 @@
/**
+ * Send our addressmap to the instance with the given rank
+ *
+ * @param rank the rank of the instance to which our addressmap has to be sent
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+static int
+send_addressmap (int rank)
+{
+  struct MSH_MSG_AddressMap *map_msg;
+  struct MSH_MSG_InstanceAdresses **iaddr_msgs;
+  struct MSH_MessageHeader *msg;
+  MPI_Request *send_reqs;
+  unsigned int nmsg;            /* total number of messages to send */
+  unsigned int nposted;         /* requests posted and not yet completed */
+  unsigned int cnt;
+  int ret;
+  int type;
+
+  ret = MSH_SYSERR;
+  if (MSH_OK != addressmap_pack (addrmap, &map_msg, &iaddr_msgs))
+  {
+    MSH_break (0);
+    return ret;
+  }
+  /* One header message followed by map_msg->num instance address messages */
+  nmsg = ntohs (map_msg->num) + 1;
+  send_reqs = MSH_malloc (sizeof (MPI_Request) * nmsg);
+  for (nposted = 0; nposted < nmsg; nposted++)
+  {
+    if (0 == nposted)
+    {
+      msg = (struct MSH_MessageHeader *) map_msg;
+      type = MSH_MTYPE_ADDRESS_MAP;
+    }
+    else
+    {
+      msg = (struct MSH_MessageHeader *) iaddr_msgs[nposted - 1];
+      type = MSH_MTYPE_INSTANCE_ADDRESS;
+    }
+    if (MPI_SUCCESS !=
+        MPI_Isend (msg, ntohs (msg->size), MPI_BYTE, rank, type,
+                   MPI_COMM_WORLD, &send_reqs[nposted]))
+    {
+      MSH_break (0);
+      /* nposted now counts exactly the sends that were started successfully;
+         they are cancelled below before their buffers are released */
+      goto cleanup;
+    }
+  }
+  /* The per-status MPI_ERROR field is only meaningful when MPI_Waitall
+     returns MPI_ERR_IN_STATUS, so the statuses are not collected; the return
+     code alone decides success. */
+  if (MPI_SUCCESS != MPI_Waitall ((int) nmsg, send_reqs, MPI_STATUSES_IGNORE))
+  {
+    MSH_break (0);
+    /* The state of the requests is unknown after a failed wait; as before,
+       no cancellation is attempted on them. */
+    nposted = 0;
+    goto cleanup;
+  }
+  /* All requests completed: nothing is left to cancel.  MPI_Request values
+     are handles stored inside send_reqs, not separately allocated objects,
+     so they must not be passed to free(). */
+  nposted = 0;
+  ret = MSH_OK;
+
+ cleanup:
+  /* Cancel and reap the sends that are still pending so that the message
+     buffers are no longer in use when they are freed. */
+  for (cnt = 0; cnt < nposted; cnt++)
+  {
+    MSH_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt]));
+    MSH_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt], MPI_STATUS_IGNORE));
+  }
+  free (map_msg);
+  for (cnt = 0; cnt + 1 < nmsg; cnt++)
+    free (iaddr_msgs[cnt]);
+  /* NOTE(review): the iaddr_msgs array itself is not released here (it was
+     not released before either) -- confirm against addressmap_pack() whether
+     the caller owns it. */
+  MSH_free_non_null (send_reqs);
+
+  return ret;
+}
+
+
+/**
+ * Perform an ntree reduction on address maps
+ *
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+static int
+ntree_reduction (void)
+{
+  unsigned int total;           /* number of participating instances */
+  unsigned int aggregator;
+  unsigned int step_width;
+
+  /* A tree with fewer than two children per node never converges (and the
+     step width below would never grow). */
+  if (rwidth < 2)
+  {
+    MSH_break (0);
+    return MSH_SYSERR;
+  }
+  total = (unsigned int) nproc;
+  /* step_width takes the values rwidth^0, rwidth^1, ... while it is smaller
+     than nproc, i.e. ceil (log_rwidth (nproc)) steps.  Integer arithmetic is
+     used so that the number of steps and the widths are exact. */
+  step_width = 1;
+  while (step_width < total)
+  {
+    if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+      return MSH_SYSERR;
+    /* Instances whose rank is not a multiple of the current step width hand
+       their address map to the closest lower multiple and are done.
+       NOTE(review): in the first step the width is 1, so no instance sends;
+       confirm this is the intended schedule. */
+    if (0 != (aggregator = (rank % step_width)))
+    {
+      aggregator = rank - aggregator;
+      return send_addressmap ((int) aggregator);
+    }
+    /* receive address maps */
+
+    /* Stop when the next width would reach nproc; written as a division so
+       that the multiplication below cannot overflow. */
+    if (step_width > (total - 1) / rwidth)
+      break;
+    step_width *= rwidth;
+  }
+  return MSH_OK;
+}
+
+
+/**
* Callback function invoked for each interface found.
*
* @param cls closure
@@ -329,13 +455,13 @@
static void
schedule_next_round ()
{
- int trounds;
+ int total_rounds;
MSH_assert (NULL == rtask);
/* Number of rounds required to contact all processes except ourselves
(rwidth
in parallel in each round) */
- trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
- if (round < trounds)
+ total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+ if (current_round < total_rounds)
{
rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
return;
@@ -386,7 +512,7 @@
scheduler_shutdown ();
return;
}
- round++;
+ current_round++;
schedule_next_round ();
}
@@ -421,7 +547,7 @@
}
/* FIXME: add the addresses associated with the contex to the mapping */
- lb = rank - round * rwidth - rwidth + nproc;
+ lb = rank - current_round * rwidth - rwidth + nproc;
MSH_assert (0 <= lb);
lb %= nproc;
source = instance_address_info_get_rank (ctx->iainfo);
@@ -593,9 +719,9 @@
&status);
MPI_Get_elements (&status, MPI_BYTE, &rsize);
/* We expect a message from peers with id p in the range:
- (rank - round * rwidth - rwidth) <= p <= (rank - (round * rwidth) -1) */
- lb = rank - round * rwidth - rwidth + nproc;
- up = rank - (round * rwidth) - 1 + nproc;
+ (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round
* rwidth) -1) */
+ lb = rank - current_round * rwidth - rwidth + nproc;
+ up = rank - (current_round * rwidth) - 1 + nproc;
MSH_assert (lb >= 0);
MSH_assert (up >= 0);
lb %= nproc;
@@ -670,7 +796,7 @@
msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
}
width = rwidth;
- if ( (0 != ( (nproc - 1) % rwidth)) && (round == ( (nproc - 1) / rwidth)) )
+ if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) /
rwidth)) )
width = (nproc - 1) % rwidth;
cpys = NULL;
cpys = MSH_malloc (msize * width);
@@ -678,7 +804,7 @@
for (cnt=0; cnt < width; cnt++)
{
(void) memcpy (&cpys[cnt], msg, msize);
- target = (round * rwidth) + cnt + 1;
+ target = (current_round * rwidth) + cnt + 1;
MSH_assert (target < nproc);
target = (rank + target) % nproc;
LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
@@ -702,7 +828,7 @@
{
MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
}
- LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank, round);
+ LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank,
current_round);
if (NULL != cpys)
{
free (cpys);
Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h 2013-07-17 15:48:08 UTC (rev 28123)
+++ msh/src/mtypes.h 2013-07-17 16:30:10 UTC (rev 28124)
@@ -77,7 +77,8 @@
/**
- * Message for signifying transmission of an address map
+ * Message for signifying transmission of an address map. Use MPI tag
+ * MSH_MTYPE_ADDRESS_MAP
*/
struct MSH_MSG_AddressMap
{
@@ -95,7 +96,8 @@
/**
* Structure for representing verified addresses of an instance. This does not
- * denote a message but is used in @see MSH_MSG_AddressMap
+ * denote a message but is used in @see MSH_MSG_AddressMap. The type for these
+ * messages should be MSH_MTYPE_INSTANCE_ADDRESS
*/
struct MSH_MSG_InstanceAdresses
{
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28124 - in msh: . src,
gnunet <=