[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30825 - in msh: . mshd2 src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r30825 - in msh: . mshd2 src |
Date: |
Wed, 20 Nov 2013 11:44:32 +0100 |
Author: harsha
Date: 2013-11-20 11:44:31 +0100 (Wed, 20 Nov 2013)
New Revision: 30825
Added:
msh/mshd2/
msh/mshd2/mshd.c
Removed:
msh/mshd2/mshd.c
msh/mshd2/mshd2.c
Modified:
msh/mshd2/Makefile.am
msh/mshd2/mshd-server.c
msh/src/Makefile.am
msh/src/bitmap.c
msh/src/bitmap.h
msh/src/mshd.c
Log:
- rename and fix compile warnings
Modified: msh/mshd2/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-10-10 15:15:25 UTC (rev 30107)
+++ msh/mshd2/Makefile.am 2013-11-20 10:44:31 UTC (rev 30825)
@@ -1,14 +1,14 @@
-bin_PROGRAMS = mping mshd msh test-pty msh-waiter
+bin_PROGRAMS = mping mshd2 msh2 test-pty msh-waiter
mping_SOURCES = mping.c
-mshd_SOURCES = mshd.c mshd.h util.c util.h mtypes.h \
+mshd2_SOURCES = mshd.c mshd.h util.c util.h mtypes.h \
common.h bitmap.c bitmap.h addressmap.c addressmap.h reduce.h reduce.c \
mshd-server.c mshd_pmonitor.c mshd_pmonitor.h
-mshd_LDADD = -lgnunetutil -lm
+mshd2_LDADD = -lgnunetutil -lm
-msh_SOURCES = msh.c mtypes.h
-msh_LDADD = -lgnunetutil util.$(OBJEXT)
+msh2_SOURCES = msh.c mtypes.h
+msh2_LDADD = -lgnunetutil util.$(OBJEXT)
check_PROGRAMS = \
test-bitmap \
Modified: msh/mshd2/mshd-server.c
===================================================================
--- msh/src/mshd-server.c 2013-10-10 15:15:25 UTC (rev 30107)
+++ msh/mshd2/mshd-server.c 2013-11-20 10:44:31 UTC (rev 30825)
@@ -367,7 +367,8 @@
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{&handle_addresslookup, NULL, MSH_MTYPE_ADDRESS_LOOKUP, sizeof (struct
MSH_MSG_AddressLookup)},
- {&handle_auth_challenge, NULL, MSH_MTYPE_CHALLENGE, sizeof (struct
MSH_MSG_Challenge)},
+ {&handle_auth_challenge, NULL, MSH_MTYPE_CHALLENGE, sizeof (struct
+
MSH_MSG_Challenge)},
{NULL, NULL, 0, 0}
};
struct sockaddr *saddrs[] = {
@@ -761,6 +762,7 @@
{&handle_auth_challenge_response, NULL, MSH_MTYPE_CHALLENGE_RESPONSE,
sizeof (struct MSH_MSG_ChallengeResponse)},
{&handle_command_input, NULL, MSH_MTYPE_CMD_STREAM_STDIN, 0},
+ {&handle_pty_mode, NULL, MSH_MTYPE_CHALLENGE, sizeof (struct
MSH_MSG_Challenge)},
{NULL, NULL, 0, 0}
};
Deleted: msh/mshd2/mshd.c
===================================================================
--- msh/src/mshd.c 2013-10-10 15:15:25 UTC (rev 30107)
+++ msh/mshd2/mshd.c 2013-11-20 10:44:31 UTC (rev 30825)
@@ -1,1190 +0,0 @@
-/**
- * @file mshd.c
- * @brief implementation of the MSH Daemon
- * @author Sree Harsha Totakura <address@hidden>
- */
-
-#include "common.h"
-#include <gnunet/gnunet_util_lib.h>
-#include <mpi.h>
-#include "util.h"
-#include "mtypes.h"
-#include "bitmap.h"
-#include "addressmap.h"
-
-#define LOG(kind,...) \
- GNUNET_log (kind, __VA_ARGS__)
-
-#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
-
-#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
-
-#define LOG_STRERROR(kind,cmd) \
- GNUNET_log_from_strerror (kind, "mshd", cmd)
-
-/**
- * Polling interval for checking termination signal
- */
-#define POLL_SHUTDOWN_INTERVAL \
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
-
-/**
- * Context for verifying addresses
- */
-struct VerifyAddressesCtx
-{
- /**
- * The DLL next ptr
- */
- struct VerifyAddressesCtx *next;
-
- /**
- * The DLL prev ptr
- */
- struct VerifyAddressesCtx *prev;
-
- /**
- * The instance addresses
- */
- struct InstanceAddrInfo *iainfo;
-
- /**
- * The connection handle to the received instance address
- */
- struct GNUNET_CONNECTION_Handle *conn;
-
- /**
- * The transmit handle for the above connection
- */
- struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
-
- /**
- * task to close the connection
- */
- GNUNET_SCHEDULER_TaskIdentifier close_task;
-
- /**
- * state for the context
- */
- enum {
- VERIFY_ADDRESS_CTX_WRITE,
-
- VERIFY_ADDRESS_CTX_CLOSE
- } state;
-
- /**
- * the ip address
- */
- in_addr_t ip;
-
- /**
- * the port number
- */
- uint16_t port;
-
-};
-
-
-/**
- * Context information for reading from incoming connections
- */
-struct ReadContext
-{
- /**
- * next pointer for DLL
- */
- struct ReadContext *next;
-
- /**
- * prev pointer for DLL
- */
- struct ReadContext *prev;
-
- /**
- * The connection
- */
- struct GNUNET_CONNECTION_Handle *conn;
-
- /**
- * are we waiting for a read on the above connection
- */
- int in_receive;
-};
-
-
-/**
- * The mode of the current listen socket;
- */
-enum ListenMode
-{
- /**
- * Mode in which the listen socket accepts connections from other instances
- * and closes them immediately after reading some data. The incoming
- * connections are used to verify which IP addresses of this instance are
- * reachable from other instances
- */
- LISTEN_MODE_PROBE,
-
- /**
- * In this mode the listen socket accepts requests for starting remote
processes
- */
- LISTEN_MODE_SERV
-} listen_mode;;
-
-
-/**
- * Mapping for instance addresses
- */
-AddressMap *addrmap;
-
-/**
- * Reverse mapping of the address map
- */
-struct ReverseAddressMap *rmap;
-
-/**
- * Rank of this process
- */
-int rank;
-
-/**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
- */
-unsigned int rwidth;
-
-/**
- * The number of total mshd processes
- */
-int nproc;
-
-
-/****************************/
-/* static variables */
-/****************************/
-
-/**
- * DLL head for address verification contexts
- */
-static struct VerifyAddressesCtx *vactx_head;
-
-/**
- * DLL tail for address verification contexts
- */
-static struct VerifyAddressesCtx *vactx_tail;
-
-/**
- * Array of our IP addresses in network-byte format
- */
-static in_addr_t *s_addrs;
-
-/**
- * network handle for the listen socket
- */
-static struct GNUNET_NETWORK_Handle *listen_socket;
-
-/**
- * The process handle of the process started by instance running with rank 0
- */
-static struct GNUNET_OS_Process *proc;
-
-/**
- * Task for running a round
- */
-static GNUNET_SCHEDULER_TaskIdentifier rtask;
-
-/**
- * Task for asynchronous accept on the socket
- */
-static GNUNET_SCHEDULER_TaskIdentifier atask;
-
-/**
- * Task for finalising a round
- */
-static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
-
-/**
- * Bitmap for checking which MPI processes have verified our addresses in the
- * current round
- */
-static struct BitMap *bitmap;
-
-/**
- * Instances addresses learnt in the current round
- */
-struct InstanceAddrInfo **riainfos;
-
-/**
- * head for read context DLL
- */
-static struct ReadContext *rhead;
-
-/**
- * tail for read context DLL
- */
-static struct ReadContext *rtail;
-
-/**
- * arguments representing the command to run and its arguments
- */
-static char **run_args;
-
-/**
- * The path of the unix domain socket we use for communication with local MSH
clients
- */
-static char *unixpath;
-
-/**
- * The file where the addresses of available hosts are written to
- */
-static char *hostsfile;
-
-/**
- * shutdown task
- */
-GNUNET_SCHEDULER_TaskIdentifier shutdown_task;
-
-/**
- * Shutdown polling task
- */
-GNUNET_SCHEDULER_TaskIdentifier poll_shutdown_task;
-
-/**
- * Random hashcode for authentication
- */
-struct GNUNET_HashCode shash;
-
-/**
- * Number of IP addresses
- */
-static unsigned int nips;
-
-/**
- * Current IP verification round
- */
-static unsigned int current_round;
-
-/**
- * The port number of our local socket
- */
-uint16_t listen_port;
-
-
-/**
- * Perform cleanup for shutdown
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- shutdown_task = GNUNET_SCHEDULER_NO_TASK;
- shutdown_local_server ();
- shutdown_daemon_server ();
- MSH_pmonitor_shutdown ();
-}
-
-
-/**
- * Callback function invoked for each interface found.
- *
- * @param cls closure
- * @param name name of the interface (can be NULL for unknown)
- * @param isDefault is this presumably the default interface
- * @param addr address of this interface (can be NULL for unknown or
unassigned)
- * @param broadcast_addr the broadcast address (can be NULL for unknown or
unassigned)
- * @param netmask the network mask (can be NULL for unknown or unassigned))
- * @param addrlen length of the address
- * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort
- */
-static int net_if_processor (void *cls, const char *name,
- int isDefault,
- const struct sockaddr *addr,
- const struct sockaddr *broadcast_addr,
- const struct sockaddr *netmask,
- socklen_t addrlen)
-{
- in_addr_t ip;
- const struct sockaddr_in *inaddr;
-
- if (sizeof (struct sockaddr_in) != addrlen)
- return GNUNET_OK; /* Only consider IPv4 for now */
- inaddr = (const struct sockaddr_in *) addr;
- ip = ntohl (inaddr->sin_addr.s_addr);
- if (127 == ip >> 24) /* ignore loopback addresses */
- return GNUNET_OK;
- GNUNET_array_append (s_addrs, nips, ip);
- LOG_DEBUG ("%d: Found IP: %s\n", rank, ip2str (ip));
- addressmap_add (addrmap, rank, listen_port, ip);
- return GNUNET_OK;
-}
-
-
-/**
- * Callback function for data received from the network. Note that
- * both "available" and "err" would be 0 if the read simply timed out.
- *inaddr->sin_addr.s_addrinaddr->sin_addr.s_addr
- * @param cls the read context
- * @param buf pointer to received data
- * @param available number of bytes availabe in "buf",
- * possibly 0 (on errors)
- * @param addr address of the sender
- * @param addrlen size of addr
- * @param errCode value of errno (on receiving errors)
- */
-static void
-conn_reader(void *cls, const void *buf, size_t available,
- const struct sockaddr * addr, socklen_t addrlen, int errCode)
-{
- struct ReadContext *rc = cls;
- uint32_t cid;
-
- if (0 == available)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- if ((NULL == buf) || (0 == available))
- goto clo_ret;
- (void) memcpy (&cid, buf, sizeof (uint32_t));
- cid = ntohl (cid);
- LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
-
- clo_ret:
- GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
- GNUNET_CONNECTION_destroy (rc->conn);
- GNUNET_free (rc);
-}
-
-
-/**
- * Task to call accept and close on a listening socket
- *
- * @param cls NULL
- * @param tc the scheduler task context
- */
-static void
-accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct ReadContext *rctx;
- struct GNUNET_CONNECTION_Handle *conn;
-
- atask = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- {
- goto clo_ret;
- }
- switch (listen_mode)
- {
- case LISTEN_MODE_PROBE:
- LOG_DEBUG ("%d: Got a probe connect\n", rank);
- conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
- if (NULL == conn)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- rctx = GNUNET_malloc (sizeof (struct ReadContext));
- rctx->conn = conn;
- rctx->in_receive = GNUNET_YES;
- GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
- GNUNET_TIME_UNIT_FOREVER_REL, conn_reader,
rctx);
- GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);
- break;
- case LISTEN_MODE_SERV:
- {
- struct GNUNET_NETWORK_Handle *client_sock;
- struct GNUNET_CONNECTION_Handle *client_conn;
-
- LOG_DEBUG ("Got a command execution connection\n");
- client_sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL);
- client_conn = GNUNET_CONNECTION_create_from_existing (client_sock);
- daemon_server_add_connection (client_conn);
- }
- break;
- default:
- GNUNET_assert (0);
- }
- atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- listen_socket, &accept_task, NULL);
- return;
-
- clo_ret:
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
- listen_socket = NULL;
-}
-
-
-/**
- * Task to check if we received a shutdown signal through MPI message from
- * instance 0. This task is to be run every 500ms
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-poll_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- int flag;
-
- poll_shutdown_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
- return;
- flag = 0;
- if (MPI_SUCCESS != MPI_Iprobe(0, MSH_MTYPE_SHUTDOWN, MPI_COMM_WORLD, &flag,
- MPI_STATUS_IGNORE))
- {
- GNUNET_break (0);
- goto reschedule;
- }
- if (0 == flag)
- goto reschedule;
- LOG_DEBUG ("Got termination signal. Shutting down\n");
- GNUNET_SCHEDULER_shutdown (); /* We terminate */
- return;
-
- reschedule:
- poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
- &poll_shutdown, NULL);
-}
-
-
-/**
- * Sends termination signal to all other instances through MPI messaging
- */
-static void
-send_term_signal ()
-{
- unsigned int cnt;
- MPI_Request *req;
-
- /* We broadcase termination signal. Can't use MPI_Bcast here... */
- req = GNUNET_malloc (sizeof (MPI_Request) * (nproc - 1));
- for (cnt = 1; cnt < nproc; cnt++)
- {
- GNUNET_assert (MPI_SUCCESS ==
- MPI_Isend (&cnt, 1, MPI_INT, cnt, MSH_MTYPE_SHUTDOWN,
- MPI_COMM_WORLD, &req[cnt - 1]));
- }
- GNUNET_assert (MPI_SUCCESS == MPI_Waitall (nproc - 1, req,
- MPI_STATUSES_IGNORE));
- GNUNET_free (req);
-}
-
-
-/**
- * Callbacks of this type can be supplied to MSH_monitor_process() to be
- * notified when the corresponding processes exits.
- *
- * @param cls the closure passed to MSH_monitor_process()
- * @param type the process status type
- * @param long the return/exit code of the process
- */
-static void
-proc_exit_cb (void *cls, enum GNUNET_OS_ProcessStatusType type, int code)
-{
- GNUNET_OS_process_destroy (proc);
- proc = NULL;
- LOG (GNUNET_ERROR_TYPE_INFO, "Main process died. Exiting.\n");
- GNUNET_SCHEDULER_shutdown ();
- send_term_signal ();
-}
-
-
-/**
- * Task for running a round
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-/**
- * Schedules next round. If all the rounds are completed, call the next
- */
-static void
-schedule_next_round ()
-{
- intmax_t pid;
- int total_rounds;
-
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
- /* Number of rounds required to contact all processes except ourselves
(rwidth
- in parallel in each round) */
- total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
- if (current_round < total_rounds)
- {
- rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
- return;
- }
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- {
- GNUNET_break (0);
- return;
- }
- LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
- GNUNET_break (GNUNET_OK == reduce_ntree ());
- addressmap_print (addrmap);
- listen_mode = LISTEN_MODE_SERV;
- rmap = addressmap_create_reverse_mapping (addrmap);
- shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
- &do_shutdown, NULL);
- pid = (intmax_t) getpid ();
- GNUNET_assert (0 < asprintf (&unixpath, "%ju.sock", pid));
- hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
- if (GNUNET_OK != addressmap_write_hosts (addrmap, hostsfile))
- {
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- setenv (MSHD_HOSTSFILE, hostsfile, 1);
- setenv (MSHD_SOCK_NAME, unixpath, 1);
- init_local_server (unixpath);
- init_daemon_server ();
- MSH_pmonitor_init ();
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == atask);
- atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- listen_socket, &accept_task, NULL);
- if (0 == rank)
- {
- proc = GNUNET_OS_start_process_vap (GNUNET_NO,
- GNUNET_OS_INHERIT_STD_ALL,
- NULL,
- NULL,
- run_args[0],
- run_args);
- if (NULL == proc)
- {
- LOG_ERROR ("Unable to start process `%s'\n", run_args[0]);
- GNUNET_SCHEDULER_shutdown ();
- send_term_signal ();
- return;
- }
- MSH_monitor_process (proc, &proc_exit_cb, NULL);
- return;
- }
- poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
- &poll_shutdown, NULL);
-}
-
-
-/**
- * Cleans up the address verification context
- *
- * @param ctx the context
- */
-static void
-cleanup_verifyaddressctx (struct VerifyAddressesCtx *ctx)
-{
- if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
- GNUNET_SCHEDULER_cancel (ctx->close_task);
- if (NULL != ctx->conn)
- GNUNET_CONNECTION_destroy (ctx->conn);
- GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
- GNUNET_free (ctx);
-}
-
-
-/**
- * Finalise a round by freeing the resources used by it, cancel the accept task
- * and schedule next round
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct VerifyAddressesCtx *ctx;
- unsigned int cnt;
-
- finalise_task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_SCHEDULER_cancel (atask);
- atask = GNUNET_SCHEDULER_NO_TASK;
- while (NULL != (ctx = vactx_head))
- {
- cleanup_verifyaddressctx (ctx);
- }
- for (cnt = 0; cnt < rwidth; cnt++)
- instance_address_info_destroy (riainfos[cnt]);
- if (1 != bitmap_allset (bitmap))
- {
- LOG_ERROR ("Could not verify addresses of all hosts\n");
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- current_round++;
- schedule_next_round ();
-}
-
-
-/**
- * Task for closing a connection
- *
- * @param cls the verify address context
- * @param tc the scheduler task context
- */
-static void
-conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct VerifyAddressesCtx *ctx = cls;
- int lb;
- int source;
- int off;
-
- ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
- lb = rank - (current_round * rwidth) - rwidth + nproc;
- GNUNET_assert (0 <= lb);
- lb %= nproc;
- source = instance_address_info_get_rank (ctx->iainfo);
- if (lb <= source)
- off = source - lb;
- else
- off = nproc - lb + source;
- bitmap_set (bitmap, off, 1);
- addressmap_add (addrmap, instance_address_info_get_rank (ctx->iainfo),
- ctx->port, ctx->ip);
- cleanup_verifyaddressctx (ctx);
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-conn_write_cb (void *cls, size_t size, void *buf)
-{
- struct VerifyAddressesCtx *ctx = cls;
- size_t rsize;
- uint32_t rank_;
-
- ctx->transmit_handle = NULL;
- rsize = 0;
- if ((NULL == buf) || (0 == size))
- {
- goto clo_ret;
- }
- if (size < sizeof (uint32_t))
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- switch (ctx->state)
- {
- case VERIFY_ADDRESS_CTX_WRITE:
- rank_ = htonl (rank);
- rsize = sizeof (uint32_t);
- (void) memcpy (buf, &rank_, rsize);
- ctx->transmit_handle =
- GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &conn_write_cb, ctx);
- ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
- return rsize;
- case VERIFY_ADDRESS_CTX_CLOSE:
- ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
- GNUNET_CONNECTION_destroy (ctx->conn);
- ctx->conn = NULL;
- return 0;
- default:
- GNUNET_assert (0);
- }
-
- clo_ret:
- cleanup_verifyaddressctx (ctx);
- return size;
-}
-
-
-static unsigned int bmx;
-
-static int
-address_iterator_cb (void *cls, uint16_t port, in_addr_t ip)
-{
- struct VerifyAddressesCtx *ctx;
- struct InstanceAddrInfo *iainfo = cls;
- struct sockaddr_in in_addr;;
-
- LOG_DEBUG ("%d: \t %d Opening connection to: %s\n", rank, bmx++, ip2str
((uint32_t) ip) );
- in_addr.sin_family = AF_INET;
- in_addr.sin_port = htons (port);
- in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
- ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
- ctx->conn =
- GNUNET_CONNECTION_create_from_sockaddr (AF_INET,
- (const struct sockaddr *)
- &in_addr,
- sizeof (struct sockaddr_in));
- if (NULL == ctx->conn)
- {
- GNUNET_break (0);
- free (ctx);
- return GNUNET_SYSERR;
- }
- ctx->port = port;
- ctx->ip = ip;
- ctx->iainfo = iainfo;
- ctx->state = VERIFY_ADDRESS_CTX_WRITE;
- GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
- ctx->transmit_handle =
- GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
- GNUNET_TIME_UNIT_FOREVER_REL,
- &conn_write_cb, ctx);
- return GNUNET_OK;
-}
-
-
-/**
- * Verify the addresses of an instance by connecting to the instance's listen
- * socket
- *
- * @param iainfo the instance's address information
- * @return GNUNET_OK upon success initialisation of the connection to
instance's
- * listen socket (this does not mean that the connection is
- * established or an address is verified); GNUNET_SYSERR upon error
- */
-static int
-verify_addresses (struct InstanceAddrInfo *iainfo)
-{
-
- bmx = 0;
- if (GNUNET_OK != instance_address_info_iterate_addresses (iainfo,
-
&address_iterator_cb,
- iainfo))
- return GNUNET_SYSERR;
- return GNUNET_OK;
-}
-
-
-/**
- * Parse a verfication message from a source for its address information
- *
- * @param msg the message to parse
- * @param source the MPI id of the instance which has sent this message
- * @return the instance's address information
- */
-static struct InstanceAddrInfo *
-parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
-{
- struct InstanceAddr *iaddr;
- struct InstanceAddrInfo *iainfo;
- size_t size;
- uint16_t nips;
- uint16_t cnt;
-
- size = ntohs (msg->header.size);
- nips = ntohs (msg->nips);
- if (size != (sizeof (struct MSH_MSG_VerifyAddress)
- + (sizeof (uint32_t) * nips)))
- {
- LOG_ERROR ("Parsing failed\n");
- return NULL;
- }
- iainfo = instance_address_info_create (source);
- for (cnt = 0; cnt < nips; cnt++)
- {
- LOG_DEBUG ("%d: Parsed address: %s\n", rank, ip2str ((in_addr_t) ntohl
(msg->ipaddrs[cnt])));
- iaddr = instance_address_create_sockaddr_in (ntohs (msg->port),
- (in_addr_t) ntohl
(msg->ipaddrs[cnt]));
- GNUNET_break (GNUNET_OK == instance_address_info_add_address (iainfo,
iaddr));
- }
- return iainfo;
-}
-
-
-/**
- * Receives the IP addresses to verify in the current round from instances
- *
- * @return an array containing the instance addresses; NULL upon a receive
error
- */
-static struct InstanceAddrInfo **
-receive_addresses ()
-{
- struct InstanceAddrInfo **iainfos;
- MPI_Status status;
- int cnt;
-
- iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
- for (cnt=0; cnt < rwidth; cnt++)
- {
- struct MSH_MSG_VerifyAddress *msg;
- int rsize;
- int lb;
- int up;
- int source;
-
- GNUNET_break (MPI_SUCCESS ==
- MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES,
- MPI_COMM_WORLD, &status));
- MPI_Get_elements (&status, MPI_BYTE, &rsize);
- /* We expect a message from peers with id p in the range:
- (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round
* rwidth) -1) */
- lb = rank - current_round * rwidth - rwidth + nproc;
- up = rank - (current_round * rwidth) - 1 + nproc;
- GNUNET_assert (lb >= 0);
- GNUNET_assert (up >= 0);
- lb %= nproc;
- up %= nproc;
- source = status.MPI_SOURCE;
- if (lb == up)
- {
- if (source != lb)
- {
- GNUNET_break (0);
- LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb,
up);
- goto err_ret;
- }
- }
- else if(lb < up)
- {
- if ((source < lb) || (source > up))
- {
- GNUNET_break (0);
- goto err_ret;
- }
- }
- else if (up < lb)
- {
- if ((source > up) && (source < lb))
- {
- GNUNET_break (0);
- goto err_ret;
- }
- }
- msg = GNUNET_malloc (rsize);
- if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
- MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
- MPI_STATUS_IGNORE))
- {
- GNUNET_break (0);
- goto err_ret;
- }
- LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize,
source);
- if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
- {
- free (msg);
- goto err_ret;
- }
- free (msg);
- }
- return iainfos;
-
- err_ret:
- for (cnt=0; cnt < rwidth; cnt++)
- {
- if (NULL != iainfos[cnt])
- instance_address_info_destroy (iainfos[cnt]);
- }
- free (iainfos);
- return NULL;
-}
-
-
-/**
- * Send our addresses to an MPI processes
- *
- * @param rank the rank of the process which has to receive our request
- * @return GNUNET_OK on success; GNUNET_SYSERR upon error
- */
-static int
-send_addresses ()
-{
- struct MSH_MSG_VerifyAddress *msg;
- struct MSH_MSG_VerifyAddress *cpys;
- MPI_Request *sreqs;
- size_t msize;
- int cnt;
- int ret;
- int target;
- unsigned int width;
-
- msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
- msg = GNUNET_malloc (msize);
- msg->header.size = htons (msize);
- msg->port = htons (listen_port);
- msg->nips = htons (nips);
- for (cnt = 0; cnt < nips; cnt++)
- {
- msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
- }
- width = rwidth;
- if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) /
rwidth)) )
- width = (nproc - 1) % rwidth;
- cpys = NULL;
- cpys = GNUNET_malloc (msize * width);
- sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
- for (cnt=0; cnt < width; cnt++)
- {
- (void) memcpy (&cpys[cnt], msg, msize);
- target = (current_round * rwidth) + cnt + 1;
- GNUNET_assert (target < nproc);
- target = (rank + target) % nproc;
- LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
- ret = MPI_Isend (&cpys[cnt], msize, MPI_BYTE, target,
- MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[cnt]);
- if (MPI_SUCCESS != ret)
- break;
- }
- free (msg);
- msg = NULL;
- if (cnt != width)
- {
- for (cnt--; cnt >= 0; cnt--)
- {
- GNUNET_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
- GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
- }
- goto err_ret;
- }
- for (cnt=0; cnt < width; cnt++)
- {
- GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
- }
- LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank,
current_round);
- if (NULL != cpys)
- {
- free (cpys);
- cpys = NULL;
- }
-
- err_ret:
- GNUNET_free_non_null (cpys);
- GNUNET_free_non_null (sreqs);
- return (MPI_SUCCESS == ret) ? GNUNET_OK : GNUNET_SYSERR;
-}
-
-
-/**
- * This functions opens a listen socket, sends this instance's IP addresses to
- * other instances and receives their IP addresses, starts accepting
connections
- * on listen socket and verifies the IP addresses of other instances by
- * connecting to their listen sockets
- *
- * @return GNUNET_OK if verification is successful; GNUNET_SYSERR upon error
(an error
- * message is logged)
- */
-static int
-run_round_ ()
-{
- unsigned int cnt;
-
- if (GNUNET_SYSERR == send_addresses ())
- return GNUNET_SYSERR;
- if (NULL == (riainfos = receive_addresses ()))
- return GNUNET_SYSERR;
- atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- listen_socket, &accept_task, NULL);
-
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- for (cnt = 0; cnt < rwidth; cnt++)
- verify_addresses (riainfos[cnt]);
- finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &finalise_round, NULL);
- return GNUNET_OK;
-}
-
-
-/**
- * Task for running a round
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- rtask = GNUNET_SCHEDULER_NO_TASK;
- if (GNUNET_OK != run_round_ ())
- GNUNET_SCHEDULER_shutdown ();
-}
-
-
-/**
- * Function to copy NULL terminated list of arguments
- *
- * @param argv the NULL terminated list of arguments. Cannot be NULL.
- * @return the copied NULL terminated arguments
- */
-static char **
-copy_argv (char *const *argv)
-{
- char **argv_dup;
- unsigned int argp;
-
- GNUNET_assert (NULL != argv);
- for (argp = 0; NULL != argv[argp]; argp++) ;
- argv_dup = GNUNET_malloc (sizeof (char *) * (argp + 1));
- for (argp = 0; NULL != argv[argp]; argp++)
- argv_dup[argp] = strdup (argv[argp]);
- return argv_dup;
-}
-
-
-/**
- * Frees the given NULL terminated arguments
- *
- * @param argv the NULL terminated list of arguments
- */
-static void
-free_argv (char **argv)
-{
- unsigned int argp;
-
- for (argp = 0; NULL != argv[argp]; argp++)
- GNUNET_free (argv[argp]);
- GNUNET_free (argv);
-}
-
-
-/**
- * Main function that will be run.
- *
- * @param cls closure
- * @param args remaining command-line arguments
- * @param cfgfile name of the configuration file used (for saving, can be
NULL!)
- * @param cfg configuration
- */
-static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
- struct sockaddr_in addr;
- socklen_t addrlen;
- unsigned int cnt;
-
- LOG_DEBUG ("Running main task\n");
- if (0 == rwidth)
- {
- LOG_ERROR ("Round width cannot be 0. Exiting\n");
- return;
- }
- if (nproc <= rwidth)
- {
- LOG_ERROR ("Round width should be less than the number of processes\n");
- return;
- }
- for (cnt = 0; NULL != args[cnt]; cnt++);
- if (0 == cnt)
- {
- LOG_ERROR ("Need a command to execute\n");
- return;
- }
- run_args = copy_argv (args);
- bitmap = bitmap_create (rwidth);
- addrmap = addressmap_create (nproc);
- addrlen = sizeof (struct sockaddr_in);
- (void) memset (&addr, 0, addrlen);
- addr.sin_addr.s_addr = INADDR_ANY; /* bind to all available addresses */
- listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen,
rwidth);
- listen_port = ntohs (addr.sin_port);
- if (NULL == listen_socket)
- return;
- if (0 == listen_port)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- LOG_DEBUG ("Listening on port %u\n", listen_port);
- GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
- if (0 == nips)
- {
- LOG_ERROR ("No IP addresses found\n");
- return;
- }
- schedule_next_round ();
- return;
-
- clo_ret:
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
- listen_socket = NULL;
-}
-
-
-/**
- * The execution start point
- *
- * @param argc the number of arguments
- * @param argv the argument strings
- * @return 0 for successful termination; 1 for termination upon error
- */
-int
-main (int argc, char **argv)
-{
- static const struct GNUNET_GETOPT_CommandLineOption options[] = {
- {'w', "round-width", "COUNT",
- "set the size of each round to COUNT",
- GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
- GNUNET_GETOPT_OPTION_END
- };
- int ret;
-
- ret = 1;
- rwidth = 1;
- GNUNET_log_setup ("mshd", NULL, NULL);
- if (MPI_SUCCESS != MPI_Init(&argc, &argv))
- {
- LOG_ERROR ("Failed to initialise MPI\n");
- return 1;
- }
- if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
- {
- LOG_ERROR ("Cannot determine the number of mshd processes\n");
- goto fail;
- }
- if (nproc <= rwidth)
- {
- LOG_ERROR ("Given round width is greater than or equal to number of mshd
processes\n");
- goto fail;
- }
- if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
- {
- LOG_ERROR ("Cannot determine our MPI rank\n");
- goto fail;
- }
- if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
- options, &run, NULL))
- {
- GNUNET_break (0);
- goto fail;
- }
- ret = 0;
-
- fail:
- if (NULL != bitmap)
- {
- bitmap_destroy (bitmap);
- bitmap = NULL;
- }
- if (NULL != addrmap)
- addressmap_destroy (addrmap);
- if (NULL != rmap)
- reverse_map_destroy (rmap);
- GNUNET_free_non_null (s_addrs);
- if (NULL != run_args)
- free_argv (run_args);
- GNUNET_free_non_null (unixpath);
- if (NULL != hostsfile)
- {
- (void) unlink (hostsfile);
- GNUNET_free (hostsfile);
- }
- LOG_DEBUG ("Finalizing\n");
- GNUNET_break (MPI_SUCCESS == MPI_Finalize());
- LOG_DEBUG ("Returning\n");
- return ret;
-}
Copied: msh/mshd2/mshd.c (from rev 30107, msh/src/mshd2.c)
===================================================================
--- msh/mshd2/mshd.c (rev 0)
+++ msh/mshd2/mshd.c 2013-11-20 10:44:31 UTC (rev 30825)
@@ -0,0 +1,1287 @@
+/**
+ * @file mshd.c
+ * @brief implementation of the MSH Daemon
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#include "common.h"
+#include <gnunet/gnunet_util_lib.h>
+#include <mpi.h>
+#include "util.h"
+#include "mtypes.h"
+#include "bitmap.h"
+#include "addressmap.h"
+
+#define LOG(kind,...) \
+ GNUNET_log (kind, __VA_ARGS__)
+
+#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
+
+#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
+
+#define LOG_STRERROR(kind,cmd) \
+ GNUNET_log_from_strerror (kind, "mshd", cmd)
+
+/**
+ * Polling interval for checking termination signal
+ */
+#define POLL_SHUTDOWN_INTERVAL \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
+
+/**
+ * Context for verifying addresses
+ */
+struct VerifyAddressesCtx
+{
+ /**
+ * The DLL next ptr
+ */
+ struct VerifyAddressesCtx *next;
+
+ /**
+ * The DLL prev ptr
+ */
+ struct VerifyAddressesCtx *prev;
+
+ /**
+ * The instance addresses
+ */
+ struct InstanceAddrInfo *iainfo;
+
+ /**
+ * The connection handle to the received instance address
+ */
+ struct GNUNET_CONNECTION_Handle *conn;
+
+ /**
+ * The transmit handle for the above connection
+ */
+ struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
+
+ /**
+ * task to close the connection
+ */
+ GNUNET_SCHEDULER_TaskIdentifier close_task;
+
+ /**
+ * state for the context
+ */
+ enum {
+ VERIFY_ADDRESS_CTX_WRITE,
+
+ VERIFY_ADDRESS_CTX_CLOSE
+ } state;
+
+ /**
+ * the ip address
+ */
+ in_addr_t ip;
+
+ /**
+ * the port number
+ */
+ uint16_t port;
+
+};
+
+
+/**
+ * Context information for reading from incoming connections
+ */
+struct ReadContext
+{
+ /**
+ * next pointer for DLL
+ */
+ struct ReadContext *next;
+
+ /**
+ * prev pointer for DLL
+ */
+ struct ReadContext *prev;
+
+ /**
+ * The connection
+ */
+ struct GNUNET_CONNECTION_Handle *conn;
+
+ /**
+ * are we waiting for a read on the above connection
+ */
+ int in_receive;
+};
+
+
+/**
+ * The mode of the current listen socket;
+ */
+enum ListenMode
+{
+ /**
+ * Mode in which the listen socket accepts connections from other instances
+ * and closes them immediately after reading some data. The incoming
+ * connections are used to verify which IP addresses of this instance are
+ * reachable from other instances
+ */
+ MODE_PROBE,
+
+ /**
+ * In this mode the listen socket accepts requests for starting remote
processes
+ */
+ MODE_SERV,
+
+ /**
+ * Simple worker mode. No listening is done.
+ */
+ MODE_WORKER,
+
+ /**
+ * Worker mode with protocol.
+ */
+ MODE_PROTOWORKER
+
+} mode;
+
+
+/**
+ * Mapping for instance addresses
+ */
+AddressMap *addrmap;
+
+/**
+ * Reverse mapping of the address map
+ */
+struct ReverseAddressMap *rmap;
+
+/**
+ * Rank of this process
+ */
+int rank;
+
+/**
+ * width of the round -- how many other mshd instances verify our IP addresses
+ * in a round
+ */
+unsigned int rwidth;
+
+/**
+ * The number of total mshd processes
+ */
+int nproc;
+
+
+/****************************/
+/* static variables */
+/****************************/
+
+/**
+ * DLL head for address verification contexts
+ */
+static struct VerifyAddressesCtx *vactx_head;
+
+/**
+ * DLL tail for address verification contexts
+ */
+static struct VerifyAddressesCtx *vactx_tail;
+
+/**
+ * Array of our IP addresses in network-byte format
+ */
+static in_addr_t *s_addrs;
+
+/**
+ * network handle for the listen socket
+ */
+static struct GNUNET_NETWORK_Handle *listen_socket;
+
+/**
+ * The process handle of the process started by instance running with rank 0
+ */
+static struct GNUNET_OS_Process *proc;
+
+/**
+ * Task for running a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier rtask;
+
+/**
+ * Task for asynchronous accept on the socket
+ */
+static GNUNET_SCHEDULER_TaskIdentifier atask;
+
+/**
+ * Task for finalising a round
+ */
+static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
+
+/**
+ * Task for waiting for a shutdown signal
+ */
+static GNUNET_SCHEDULER_TaskIdentifier sigread_task;
+
+/**
+ * Bitmap for checking which MPI processes have verified our addresses in the
+ * current round
+ */
+static struct BitMap *bitmap;
+
+/**
+ * Instances addresses learnt in the current round
+ */
+struct InstanceAddrInfo **riainfos;
+
+/**
+ * head for read context DLL
+ */
+static struct ReadContext *rhead;
+
+/**
+ * tail for read context DLL
+ */
+static struct ReadContext *rtail;
+
+/**
+ * arguments representing the command to run and its arguments
+ */
+static char **run_args;
+
+/**
+ * the process handle for the command to run
+ */
+static struct GNUNET_OS_Process *process;
+
+/**
+ * The path of the unix domain socket we use for communication with local MSH
clients
+ */
+static char *unixpath;
+
+/**
+ * The file where the addresses of available hosts are written to
+ */
+static char *hostsfile;
+
+/**
+ * shutdown task
+ */
+GNUNET_SCHEDULER_TaskIdentifier shutdown_task;
+
+/**
+ * Shutdown polling task
+ */
+GNUNET_SCHEDULER_TaskIdentifier poll_shutdown_task;
+
+/**
+ * Random hashcode for authentication
+ */
+struct GNUNET_HashCode shash;
+
+/**
+ * Number of IP addresses
+ */
+static unsigned int nips;
+
+/**
+ * Current IP verification round
+ */
+static unsigned int current_round;
+
+/**
+ * Do we have to create a pty
+ */
+static int need_pty;
+
+/**
+ * The port number of our local socket
+ */
+uint16_t listen_port;
+
+
+/**
+ * Perform cleanup for shutdown
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+ switch (mode)
+ {
+ case MODE_PROBE:
+ break;
+ case MODE_SERV:
+ shutdown_local_server ();
+ MSH_pmonitor_shutdown ();
+ break;
+ case MODE_WORKER:
+ break;
+ case MODE_PROTOWORKER:
+ shutdown_daemon_server ();
+ break;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != accept_task)
+ {
+ GNUNET_SCHEDULER_cancel (accept_task);
+ accept_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL != listen_socket)
+ {
+ GNUNET_NETWORK_socket_close (listen_socket);
+ listen_socket = NULL;
+ }
+ if (NULL != bitmap)
+ {
+ bitmap_destroy (bitmap);
+ bitmap = NULL;
+ }
+ if (NULL != addrmap)
+ {
+ addressmap_destroy (addrmap);
+ addressmap = NULL;
+ }
+ if (NULL != rmap)
+ {
+ reverse_map_destroy (rmap);
+ rmap = NULL;
+ }
+ GNUNET_free_non_null (s_addrs);
+ s_addrs = NULL;
+ if (NULL != run_args)
+ {
+ free_argv (run_args);
+ run_args = NULL;
+ }
+ GNUNET_free_non_null (unixpath);
+ unixpath = NULL;
+ if (NULL != hostsfile)
+ {
+ (void) unlink (hostsfile);
+ GNUNET_free (hostsfile);
+ hostsfile = NULL;
+ }
+}
+
+
+/**
+ * Callback function invoked for each interface found.
+ *
+ * @param cls closure
+ * @param name name of the interface (can be NULL for unknown)
+ * @param isDefault is this presumably the default interface
+ * @param addr address of this interface (can be NULL for unknown or
unassigned)
+ * @param broadcast_addr the broadcast address (can be NULL for unknown or
unassigned)
+ * @param netmask the network mask (can be NULL for unknown or unassigned))
+ * @param addrlen length of the address
+ * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort
+ */
+static int net_if_processor (void *cls, const char *name,
+ int isDefault,
+ const struct sockaddr *addr,
+ const struct sockaddr *broadcast_addr,
+ const struct sockaddr *netmask,
+ socklen_t addrlen)
+{
+ char *hostip;
+ in_addr_t ip;
+ const struct sockaddr_in *inaddr;
+
+ if (sizeof (struct sockaddr_in) != addrlen)
+ return GNUNET_OK; /* Only consider IPv4 for now */
+ inaddr = (const struct sockaddr_in *) addr;
+ ip = ntohl (inaddr->sin_addr.s_addr);
+ if (127 == ip >> 24) /* ignore loopback addresses */
+ return GNUNET_OK;
+ GNUNET_array_append (s_addrs, nips, ip);
+ LOG_DEBUG ("%d: Found IP: %s\n", rank, ip2str (ip));
+ addressmap_add (addrmap, rank, listen_port, ip);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Callback function for data received from the network. Note that
+ * both "available" and "err" would be 0 if the read simply timed out.
+ *inaddr->sin_addr.s_addrinaddr->sin_addr.s_addr
+ * @param cls the read context
+ * @param buf pointer to received data
+ * @param available number of bytes availabe in "buf",
+ * possibly 0 (on errors)
+ * @param addr address of the sender
+ * @param addrlen size of addr
+ * @param errCode value of errno (on receiving errors)
+ */
+static void
+conn_reader(void *cls, const void *buf, size_t available,
+ const struct sockaddr * addr, socklen_t addrlen, int errCode)
+{
+ struct ReadContext *rc = cls;
+ uint32_t cid;
+
+ if (0 == available)
+ {
+ GNUNET_break (0);
+ goto clo_ret;
+ }
+ if ((NULL == buf) || (0 == available))
+ goto clo_ret;
+ (void) memcpy (&cid, buf, sizeof (uint32_t));
+ cid = ntohl (cid);
+ LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
+
+ clo_ret:
+ GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
+ GNUNET_CONNECTION_destroy (rc->conn);
+ GNUNET_free (rc);
+}
+
+
+/**
+ * Fork a worker process. This process sets up a PTY if needed, forks a child
+ * which exec's the binary to start and manages the communication between the
+ * binary and network if given a network connection.
+ */
+static pid_t
+spawn_worker (int do_protocol)
+{
+ struct GNUNET_NETWORK_Handle *sock;
+ struct GNUNET_CONNECTION_Handle *conn;
+ pid_t ret;
+
+ ret = fork ();
+ if (0 != ret)
+ return ret;
+ /* Child process continues here */
+ if (do_protocol)
+ {
+ GNUNET_assert (MODE_SERV == mode);
+ GNUNET_assert (NULL != listen_socket);
+ sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL);
+ conn = GNUNET_CONNECTION_create_from_existing (sock);
+ }
+ GNUNET_SCHEDULER_cancel (shutdown_task);
+ shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+ do_shutdown (NULL, NULL);
+ mode = MODE_WORKER;
+ if (do_protocol)
+ {
+ mode = MODE_PROTOWORKER;
+ init_daemon_server ();
+ daemon_server_add_connection (conn);
+ }
+ return 0;
+}
+
+
+/**
+ * Task to call accept and close on a listening socket
+ *
+ * @param cls NULL
+ * @param tc the scheduler task context
+ */
+static void
+accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct ReadContext *rctx;
+ struct GNUNET_CONNECTION_Handle *conn;
+ pid_t pid;
+ int csock;
+
+ atask = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ {
+ goto clo_ret;
+ }
+ switch (mode)
+ {
+ case MODE_PROBE:
+ LOG_DEBUG ("%d: Got a probe connect\n", rank);
+ conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
+ if (NULL == conn)
+ {
+ GNUNET_break (0);
+ goto clo_ret;
+ }
+ rctx = GNUNET_malloc (sizeof (struct ReadContext));
+ rctx->conn = conn;
+ rctx->in_receive = GNUNET_YES;
+ GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
+ GNUNET_TIME_UNIT_FOREVER_REL, conn_reader,
rctx);
+ GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);
+ break;
+ case MODE_SERV:
+ pid = spawn_worker (1);
+ if (-1 == pid)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown (0);
+ goto clo_ret;
+ }
+ if (0 == pid) /* state is cleared and hence we return */
+ return;
+ break;
+ case MODE_WORKER:
+ case MODE_PROTOWORKER:
+ GNUNET_assert (0);
+ }
+ atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ listen_socket, &accept_task, NULL);
+ return;
+
+ clo_ret:
+ GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+ listen_socket = NULL;
+}
+
+
+/**
+ * Task to check if we received a shutdown signal through MPI message from
+ * instance 0. This task is to be run every 500ms
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+poll_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ MPI_Status status;
+ int flag;
+
+ poll_shutdown_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ flag = 0;
+ if (MPI_SUCCESS != MPI_Iprobe(0, MSH_MTYPE_SHUTDOWN, MPI_COMM_WORLD, &flag,
+ MPI_STATUS_IGNORE))
+ {
+ GNUNET_break (0);
+ goto reschedule;
+ }
+ if (0 == flag)
+ goto reschedule;
+ LOG_DEBUG ("Got termination signal. Shutting down\n");
+ GNUNET_SCHEDULER_shutdown (); /* We terminate */
+ return;
+
+ reschedule:
+ poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
+ &poll_shutdown, NULL);
+}
+
+
+/**
+ * Sends termination signal to all other instances through MPI messaging
+ */
+static void
+send_term_signal ()
+{
+ unsigned int cnt;
+ MPI_Request *req;
+
+ /* We broadcase termination signal. Can't use MPI_Bcast here... */
+ req = GNUNET_malloc (sizeof (MPI_Request) * (nproc - 1));
+ for (cnt = 1; cnt < nproc; cnt++)
+ {
+ GNUNET_assert (MPI_SUCCESS ==
+ MPI_Isend (&cnt, 1, MPI_INT, cnt, MSH_MTYPE_SHUTDOWN,
+ MPI_COMM_WORLD, &req[cnt - 1]));
+ }
+ GNUNET_assert (MPI_SUCCESS == MPI_Waitall (nproc - 1, req,
+ MPI_STATUSES_IGNORE));
+ GNUNET_free (req);
+}
+
+
+/**
+ * Callbacks of this type can be supplied to MSH_monitor_process() to be
+ * notified when the corresponding processes exits.
+ *
+ * @param cls the closure passed to MSH_monitor_process()
+ * @param type the process status type
+ * @param long the return/exit code of the process
+ */
+static void
+proc_exit_cb (void *cls, enum GNUNET_OS_ProcessStatusType type, int code)
+{
+ GNUNET_OS_process_destroy (proc);
+ proc = NULL;
+ LOG (GNUNET_ERROR_TYPE_INFO, "Main process died. Exiting.\n");
+ GNUNET_SCHEDULER_shutdown ();
+ send_term_signal ();
+}
+
+
+/**
+ * Task for running a round
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Schedules next round. If all the rounds are completed, call the next
+ */
+static void
+schedule_next_round ()
+{
+ pid_t pid;
+ int total_rounds;
+
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
+ /* Number of rounds required to contact all processes except ourselves
(rwidth
+ in parallel in each round) */
+ total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+ if (current_round < total_rounds)
+ {
+ rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
+ return;
+ }
+ if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
+ GNUNET_break (GNUNET_OK == reduce_ntree ());
+ addressmap_print (addrmap);
+ rmap = addressmap_create_reverse_mapping (addrmap);
+ pid = getpid ();
+ GNUNET_assert (0 < asprintf (&unixpath, "%ju.sock", (intmax_t) pid));
+ setenv (MSHD_SOCK_NAME, unixpath, 1);
+ hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
+ if (GNUNET_OK != addressmap_write_hosts (addrmap, hostsfile))
+ {
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ setenv (MSHD_HOSTSFILE, hostsfile, 1);
+ init_local_server (unixpath);
+ MSH_pmonitor_init ();
+ mode = MODE_SERV;
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == atask);
+ atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ listen_socket, &accept_task, NULL);
+ if (0 == rank)
+ {
+ pid = spawn_worker (0);
+ if (-1 == pid)
+ {
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown (0);
+ return;
+ }
+ if (0 != pid)
+ {
+ MSH_monitor_process_pid (proc, &proc_exit_cb, NULL);
+ goto end;
+ }
+ if (reverse_connect)
+ do_reverse_connect ();
+ if (need_pty)
+ create_pty ();
+ fork_and_exec (run_args[0]);
+ return;
+ }
+ end:
+ poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
+ &poll_shutdown, NULL);
+}
+
+
+/**
+ * Cleans up the address verification context
+ *
+ * @param ctx the context
+ */
+static void
+cleanup_verifyaddressctx (struct VerifyAddressesCtx *ctx)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
+ GNUNET_SCHEDULER_cancel (ctx->close_task);
+ if (NULL != ctx->conn)
+ GNUNET_CONNECTION_destroy (ctx->conn);
+ GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
+ GNUNET_free (ctx);
+}
+
+
+/**
+ * Finalise a round by freeing the resources used by it, cancel the accept task
+ * and schedule next round
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct VerifyAddressesCtx *ctx;
+ unsigned int cnt;
+
+ finalise_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (atask);
+ atask = GNUNET_SCHEDULER_NO_TASK;
+ while (NULL != (ctx = vactx_head))
+ {
+ cleanup_verifyaddressctx (ctx);
+ }
+ for (cnt = 0; cnt < rwidth; cnt++)
+ instance_address_info_destroy (riainfos[cnt]);
+ if (1 != bitmap_allset (bitmap))
+ {
+ LOG_ERROR ("Could not verify addresses of all hosts\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ current_round++;
+ schedule_next_round ();
+}
+
+
+/**
+ * Task for closing a connection
+ *
+ * @param cls the verify address context
+ * @param tc the scheduler task context
+ */
+static void
+conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct VerifyAddressesCtx *ctx = cls;
+ int lb;
+ int source;
+ int off;
+
+ ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
+ lb = rank - (current_round * rwidth) - rwidth + nproc;
+ GNUNET_assert (0 <= lb);
+ lb %= nproc;
+ source = instance_address_info_get_rank (ctx->iainfo);
+ if (lb <= source)
+ off = source - lb;
+ else
+ off = nproc - lb + source;
+ bitmap_set (bitmap, off, 1);
+ addressmap_add (addrmap, instance_address_info_get_rank (ctx->iainfo),
+ ctx->port, ctx->ip);
+ cleanup_verifyaddressctx (ctx);
+}
+
+
+/**
+ * Function called to notify a client about the connection
+ * begin ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+conn_write_cb (void *cls, size_t size, void *buf)
+{
+ struct VerifyAddressesCtx *ctx = cls;
+ size_t rsize;
+ uint32_t rank_;
+
+ ctx->transmit_handle = NULL;
+ rsize = 0;
+ if ((NULL == buf) || (0 == size))
+ {
+ goto clo_ret;
+ }
+ if (size < sizeof (uint32_t))
+ {
+ GNUNET_break (0);
+ goto clo_ret;
+ }
+ switch (ctx->state)
+ {
+ case VERIFY_ADDRESS_CTX_WRITE:
+ rank_ = htonl (rank);
+ rsize = sizeof (uint32_t);
+ (void) memcpy (buf, &rank_, rsize);
+ ctx->transmit_handle =
+ GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &conn_write_cb, ctx);
+ ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
+ return rsize;
+ case VERIFY_ADDRESS_CTX_CLOSE:
+ ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
+ GNUNET_CONNECTION_destroy (ctx->conn);
+ ctx->conn = NULL;
+ return 0;
+ default:
+ GNUNET_assert (0);
+ }
+
+ clo_ret:
+ cleanup_verifyaddressctx (ctx);
+ return size;
+}
+
+
+static unsigned int bmx;
+
+static int
+address_iterator_cb (void *cls, uint16_t port, in_addr_t ip)
+{
+ struct VerifyAddressesCtx *ctx;
+ struct InstanceAddrInfo *iainfo = cls;
+ struct sockaddr_in in_addr;;
+
+ LOG_DEBUG ("%d: \t %d Opening connection to: %s\n", rank, bmx++, ip2str
((uint32_t) ip) );
+ in_addr.sin_family = AF_INET;
+ in_addr.sin_port = htons (port);
+ in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
+ ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
+ ctx->conn =
+ GNUNET_CONNECTION_create_from_sockaddr (AF_INET,
+ (const struct sockaddr *)
+ &in_addr,
+ sizeof (struct sockaddr_in));
+ if (NULL == ctx->conn)
+ {
+ GNUNET_break (0);
+ free (ctx);
+ return GNUNET_SYSERR;
+ }
+ ctx->port = port;
+ ctx->ip = ip;
+ ctx->iainfo = iainfo;
+ ctx->state = VERIFY_ADDRESS_CTX_WRITE;
+ GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
+ ctx->transmit_handle =
+ GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &conn_write_cb, ctx);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Verify the addresses of an instance by connecting to the instance's listen
+ * socket
+ *
+ * @param iainfo the instance's address information
+ * @return GNUNET_OK upon success initialisation of the connection to
instance's
+ * listen socket (this does not mean that the connection is
+ * established or an address is verified); GNUNET_SYSERR upon error
+ */
+static int
+verify_addresses (struct InstanceAddrInfo *iainfo)
+{
+
+ struct InstanceAddr *iaddr;
+
+ bmx = 0;
+ if (GNUNET_OK != instance_address_info_iterate_addresses (iainfo,
+
&address_iterator_cb,
+ iainfo))
+ return GNUNET_SYSERR;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Parse a verfication message from a source for its address information
+ *
+ * @param msg the message to parse
+ * @param source the MPI id of the instance which has sent this message
+ * @return the instance's address information
+ */
+static struct InstanceAddrInfo *
+parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
+{
+ struct InstanceAddr *iaddr;
+ struct InstanceAddrInfo *iainfo;
+ size_t size;
+ uint16_t nips;
+ uint16_t cnt;
+
+ size = ntohs (msg->header.size);
+ nips = ntohs (msg->nips);
+ if (size != (sizeof (struct MSH_MSG_VerifyAddress)
+ + (sizeof (uint32_t) * nips)))
+ {
+ LOG_ERROR ("Parsing failed\n");
+ return NULL;
+ }
+ iainfo = instance_address_info_create (source);
+ for (cnt = 0; cnt < nips; cnt++)
+ {
+ LOG_DEBUG ("%d: Parsed address: %s\n", rank, ip2str ((in_addr_t) ntohl
(msg->ipaddrs[cnt])));
+ iaddr = instance_address_create_sockaddr_in (ntohs (msg->port),
+ (in_addr_t) ntohl
(msg->ipaddrs[cnt]));
+ GNUNET_break (GNUNET_OK == instance_address_info_add_address (iainfo,
iaddr));
+ }
+ return iainfo;
+}
+
+
+/**
+ * Receives the IP addresses to verify in the current round from instances
+ *
+ * @return an array containing the instance addresses; NULL upon a receive
error
+ */
+static struct InstanceAddrInfo **
+receive_addresses ()
+{
+ struct InstanceAddrInfo **iainfos;
+ MPI_Status status;
+ int cnt;
+
+ iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
+ for (cnt=0; cnt < rwidth; cnt++)
+ {
+ struct MSH_MSG_VerifyAddress *msg;
+ int rsize;
+ int lb;
+ int up;
+ int source;
+ int ret;
+
+ GNUNET_break (MPI_SUCCESS ==
+ MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES,
+ MPI_COMM_WORLD, &status));
+ MPI_Get_elements (&status, MPI_BYTE, &rsize);
+ /* We expect a message from peers with id p in the range:
+ (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round
* rwidth) -1) */
+ lb = rank - current_round * rwidth - rwidth + nproc;
+ up = rank - (current_round * rwidth) - 1 + nproc;
+ GNUNET_assert (lb >= 0);
+ GNUNET_assert (up >= 0);
+ lb %= nproc;
+ up %= nproc;
+ source = status.MPI_SOURCE;
+ if (lb == up)
+ if (source != lb)
+ {
+ GNUNET_break (0);
+ LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb,
up);
+ goto err_ret;
+ }
+ else if ((source > up) || (source < lb))
+ {
+ GNUNET_break (0);
+ goto err_ret;
+ }
+ msg = GNUNET_malloc (rsize);
+ if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
+ MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+ MPI_STATUS_IGNORE))
+ {
+ GNUNET_break (0);
+ goto err_ret;
+ }
+ LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize,
source);
+ if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
+ {
+ free (msg);
+ goto err_ret;
+ }
+ free (msg);
+ }
+ return iainfos;
+
+ err_ret:
+ for (cnt=0; cnt < rwidth; cnt++)
+ {
+ if (NULL != iainfos[cnt])
+ instance_address_info_destroy (iainfos[cnt]);
+ }
+ free (iainfos);
+ return NULL;
+}
+
+
+/**
+ * Send our addresses to an MPI processes
+ *
+ * @param rank the rank of the process which has to receive our request
+ * @return GNUNET_OK on success; GNUNET_SYSERR upon error
+ */
+static int
+send_addresses ()
+{
+ struct MSH_MSG_VerifyAddress *msg;
+ struct MSH_MSG_VerifyAddress *cpys;
+ MPI_Request *sreqs;
+ size_t msize;
+ int cnt;
+ int ret;
+ int target;
+ unsigned int width;
+
+ msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
+ msg = GNUNET_malloc (msize);
+ msg->header.size = htons (msize);
+ msg->port = htons (listen_port);
+ msg->nips = htons (nips);
+ for (cnt = 0; cnt < nips; cnt++)
+ {
+ msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
+ }
+ width = rwidth;
+ if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) /
rwidth)) )
+ width = (nproc - 1) % rwidth;
+ cpys = NULL;
+ cpys = GNUNET_malloc (msize * width);
+ sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
+ for (cnt=0; cnt < width; cnt++)
+ {
+ (void) memcpy (&cpys[cnt], msg, msize);
+ target = (current_round * rwidth) + cnt + 1;
+ GNUNET_assert (target < nproc);
+ target = (rank + target) % nproc;
+ LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
+ ret = MPI_Isend (&cpys[cnt], msize, MPI_BYTE, target,
+ MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[cnt]);
+ if (MPI_SUCCESS != ret)
+ break;
+ }
+ free (msg);
+ msg = NULL;
+ if (cnt != width)
+ {
+ for (cnt--; cnt >= 0; cnt--)
+ {
+ GNUNET_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
+ GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+ }
+ goto err_ret;
+ }
+ for (cnt=0; cnt < width; cnt++)
+ {
+ GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+ }
+ LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank,
current_round);
+ if (NULL != cpys)
+ {
+ free (cpys);
+ cpys = NULL;
+ }
+
+ err_ret:
+ GNUNET_free_non_null (cpys);
+ GNUNET_free_non_null (sreqs);
+ return (MPI_SUCCESS == ret) ? GNUNET_OK : GNUNET_SYSERR;
+}
+
+
+/**
+ * This functions opens a listen socket, sends this instance's IP addresses to
+ * other instances and receives their IP addresses, starts accepting
connections
+ * on listen socket and verifies the IP addresses of other instances by
+ * connecting to their listen sockets
+ *
+ * @return GNUNET_OK if verification is successful; GNUNET_SYSERR upon error
(an error
+ * message is logged)
+ */
+static int
+run_round_ ()
+{
+ unsigned int cnt;
+
+ if (GNUNET_SYSERR == send_addresses ())
+ return GNUNET_SYSERR;
+ if (NULL == (riainfos = receive_addresses ()))
+ return GNUNET_SYSERR;
+ atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
+ listen_socket, &accept_task, NULL);
+
+ if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ for (cnt = 0; cnt < rwidth; cnt++)
+ verify_addresses (riainfos[cnt]);
+ finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &finalise_round, NULL);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Task for running a round
+ *
+ * @param cls NULL
+ * @param tc scheduler task context
+ */
+static void
+run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ rtask = GNUNET_SCHEDULER_NO_TASK;
+ if (GNUNET_OK != run_round_ ())
+ GNUNET_SCHEDULER_shutdown ();
+}
+
+
+/**
+ * Function to copy NULL terminated list of arguments
+ *
+ * @param argv the NULL terminated list of arguments. Cannot be NULL.
+ * @return the copied NULL terminated arguments
+ */
+static char **
+copy_argv (char *const *argv)
+{
+ char **argv_dup;
+ unsigned int argp;
+
+ GNUNET_assert (NULL != argv);
+ for (argp = 0; NULL != argv[argp]; argp++) ;
+ argv_dup = GNUNET_malloc (sizeof (char *) * (argp + 1));
+ for (argp = 0; NULL != argv[argp]; argp++)
+ argv_dup[argp] = strdup (argv[argp]);
+ return argv_dup;
+}
+
+
+/**
+ * Frees the given NULL terminated arguments
+ *
+ * @param argv the NULL terminated list of arguments
+ */
+static void
+free_argv (char **argv)
+{
+ unsigned int argp;
+
+ for (argp = 0; NULL != argv[argp]; argp++)
+ GNUNET_free (argv[argp]);
+ GNUNET_free (argv);
+}
+
+
+/**
+ * Main function that will be run.
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be
NULL!)
+ * @param cfg configuration
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ const struct GNUNET_DISK_FileHandle *fh;
+ struct sockaddr_in addr;
+ socklen_t addrlen;
+ unsigned int cnt;
+
+ LOG_DEBUG ("Running main task\n");
+ if (0 == rwidth)
+ {
+ LOG_ERROR ("Round width cannot be 0. Exiting\n");
+ return;
+ }
+ if (nproc <= rwidth)
+ {
+ LOG_ERROR ("Round width should be less than the number of processes\n");
+ return;
+ }
+ for (cnt = 0; NULL != args[cnt]; cnt++);
+ if (0 == cnt)
+ {
+ LOG_ERROR ("Need a command to execute\n");
+ return;
+ }
+ run_args = copy_argv (args);
+ bitmap = bitmap_create (rwidth);
+ addrmap = addressmap_create (nproc);
+ addrlen = sizeof (struct sockaddr_in);
+ (void) memset (&addr, 0, addrlen);
+ addr.sin_addr.s_addr = INADDR_ANY; /* bind to all available addresses */
+ listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen,
rwidth);
+ listen_port = ntohs (addr.sin_port);
+ if (NULL == listen_socket)
+ return;
+ if (0 == listen_port)
+ {
+ GNUNET_break (0);
+ goto clo_ret;
+ }
+ LOG_DEBUG ("Listening on port %u\n", listen_port);
+ GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
+ if (0 == nips)
+ {
+ LOG_ERROR ("No IP addresses found\n");
+ return;
+ }
+ schedule_next_round ();
+ shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ &do_shutdown, NULL);
+ return;
+
+ clo_ret:
+ GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
+ listen_socket = NULL;
+}
+
+
+/**
+ * The execution start point
+ *
+ * @param argc the number of arguments
+ * @param argv the argument strings
+ * @return 0 for successful termination; 1 for termination upon error
+ */
+int
+main (int argc, char **argv)
+{
+ static const struct GNUNET_GETOPT_CommandLineOption options[] = {
+ {'w', "round-width", "COUNT",
+ "set the size of each round to COUNT",
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+ GNUNET_GETOPT_OPTION_END
+ };
+ int ret;
+ int c;
+
+ ret = 1;
+ rwidth = 1;
+ GNUNET_log_setup ("mshd", NULL, NULL);
+ if (MPI_SUCCESS != MPI_Init(&argc, &argv))
+ {
+ LOG_ERROR ("Failed to initialise MPI\n");
+ return 1;
+ }
+ if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
+ {
+ LOG_ERROR ("Cannot determine the number of mshd processes\n");
+ goto fail;
+ }
+ if (nproc <= rwidth)
+ {
+ LOG_ERROR ("Given round width is greater than or equal to number of mshd
processes\n");
+ goto fail;
+ }
+ if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
+ {
+ LOG_ERROR ("Cannot determine our MPI rank\n");
+ goto fail;
+ }
+ if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
+ options, &run, NULL))
+ {
+ GNUNET_break (0);
+ goto fail;
+ }
+ ret = 0;
+
+ fail:
+ if (MODE_WORKER <= mode)
+ return;
+ LOG_DEBUG ("Returning\n");
+ GNUNET_break (MPI_SUCCESS == MPI_Finalize());
+ return ret;
+}
Deleted: msh/mshd2/mshd2.c
===================================================================
--- msh/src/mshd2.c 2013-10-10 15:15:25 UTC (rev 30107)
+++ msh/mshd2/mshd2.c 2013-11-20 10:44:31 UTC (rev 30825)
@@ -1,1287 +0,0 @@
-/**
- * @file mshd.c
- * @brief implementation of the MSH Daemon
- * @author Sree Harsha Totakura <address@hidden>
- */
-
-#include "common.h"
-#include <gnunet/gnunet_util_lib.h>
-#include <mpi.h>
-#include "util.h"
-#include "mtypes.h"
-#include "bitmap.h"
-#include "addressmap.h"
-
-#define LOG(kind,...) \
- GNUNET_log (kind, __VA_ARGS__)
-
-#define LOG_DEBUG(...) LOG(GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
-
-#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
-
-#define LOG_STRERROR(kind,cmd) \
- GNUNET_log_from_strerror (kind, "mshd", cmd)
-
-/**
- * Polling interval for checking termination signal
- */
-#define POLL_SHUTDOWN_INTERVAL \
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500)
-
-/**
- * Context for verifying addresses
- */
-struct VerifyAddressesCtx
-{
- /**
- * The DLL next ptr
- */
- struct VerifyAddressesCtx *next;
-
- /**
- * The DLL prev ptr
- */
- struct VerifyAddressesCtx *prev;
-
- /**
- * The instance addresses
- */
- struct InstanceAddrInfo *iainfo;
-
- /**
- * The connection handle to the received instance address
- */
- struct GNUNET_CONNECTION_Handle *conn;
-
- /**
- * The transmit handle for the above connection
- */
- struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
-
- /**
- * task to close the connection
- */
- GNUNET_SCHEDULER_TaskIdentifier close_task;
-
- /**
- * state for the context
- */
- enum {
- VERIFY_ADDRESS_CTX_WRITE,
-
- VERIFY_ADDRESS_CTX_CLOSE
- } state;
-
- /**
- * the ip address
- */
- in_addr_t ip;
-
- /**
- * the port number
- */
- uint16_t port;
-
-};
-
-
-/**
- * Context information for reading from incoming connections
- */
-struct ReadContext
-{
- /**
- * next pointer for DLL
- */
- struct ReadContext *next;
-
- /**
- * prev pointer for DLL
- */
- struct ReadContext *prev;
-
- /**
- * The connection
- */
- struct GNUNET_CONNECTION_Handle *conn;
-
- /**
- * are we waiting for a read on the above connection
- */
- int in_receive;
-};
-
-
-/**
- * The mode of the current listen socket;
- */
-enum ListenMode
-{
- /**
- * Mode in which the listen socket accepts connections from other instances
- * and closes them immediately after reading some data. The incoming
- * connections are used to verify which IP addresses of this instance are
- * reachable from other instances
- */
- MODE_PROBE,
-
- /**
- * In this mode the listen socket accepts requests for starting remote
processes
- */
- MODE_SERV,
-
- /**
- * Simple worker mode. No listening is done.
- */
- MODE_WORKER,
-
- /**
- * Worker mode with protocol.
- */
- MODE_PROTOWORKER
-
-} mode;
-
-
-/**
- * Mapping for instance addresses
- */
-AddressMap *addrmap;
-
-/**
- * Reverse mapping of the address map
- */
-struct ReverseAddressMap *rmap;
-
-/**
- * Rank of this process
- */
-int rank;
-
-/**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
- */
-unsigned int rwidth;
-
-/**
- * The number of total mshd processes
- */
-int nproc;
-
-
-/****************************/
-/* static variables */
-/****************************/
-
-/**
- * DLL head for address verification contexts
- */
-static struct VerifyAddressesCtx *vactx_head;
-
-/**
- * DLL tail for address verification contexts
- */
-static struct VerifyAddressesCtx *vactx_tail;
-
-/**
- * Array of our IP addresses in network-byte format
- */
-static in_addr_t *s_addrs;
-
-/**
- * network handle for the listen socket
- */
-static struct GNUNET_NETWORK_Handle *listen_socket;
-
-/**
- * The process handle of the process started by instance running with rank 0
- */
-static struct GNUNET_OS_Process *proc;
-
-/**
- * Task for running a round
- */
-static GNUNET_SCHEDULER_TaskIdentifier rtask;
-
-/**
- * Task for asynchronous accept on the socket
- */
-static GNUNET_SCHEDULER_TaskIdentifier atask;
-
-/**
- * Task for finalising a round
- */
-static GNUNET_SCHEDULER_TaskIdentifier finalise_task;
-
-/**
- * Task for waiting for a shutdown signal
- */
-static GNUNET_SCHEDULER_TaskIdentifier sigread_task;
-
-/**
- * Bitmap for checking which MPI processes have verified our addresses in the
- * current round
- */
-static struct BitMap *bitmap;
-
-/**
- * Instances addresses learnt in the current round
- */
-struct InstanceAddrInfo **riainfos;
-
-/**
- * head for read context DLL
- */
-static struct ReadContext *rhead;
-
-/**
- * tail for read context DLL
- */
-static struct ReadContext *rtail;
-
-/**
- * arguments representing the command to run and its arguments
- */
-static char **run_args;
-
-/**
- * the process handle for the command to run
- */
-static struct GNUNET_OS_Process *process;
-
-/**
- * The path of the unix domain socket we use for communication with local MSH
clients
- */
-static char *unixpath;
-
-/**
- * The file where the addresses of available hosts are written to
- */
-static char *hostsfile;
-
-/**
- * shutdown task
- */
-GNUNET_SCHEDULER_TaskIdentifier shutdown_task;
-
-/**
- * Shutdown polling task
- */
-GNUNET_SCHEDULER_TaskIdentifier poll_shutdown_task;
-
-/**
- * Random hashcode for authentication
- */
-struct GNUNET_HashCode shash;
-
-/**
- * Number of IP addresses
- */
-static unsigned int nips;
-
-/**
- * Current IP verification round
- */
-static unsigned int current_round;
-
-/**
- * Do we have to create a pty
- */
-static int need_pty;
-
-/**
- * The port number of our local socket
- */
-uint16_t listen_port;
-
-
-/**
- * Perform cleanup for shutdown
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- shutdown_task = GNUNET_SCHEDULER_NO_TASK;
- switch (mode)
- {
- case MODE_PROBE:
- break;
- case MODE_SERV:
- shutdown_local_server ();
- MSH_pmonitor_shutdown ();
- break;
- case MODE_WORKER:
- break;
- case MODE_PROTOWORKER:
- shutdown_daemon_server ();
- break;
- }
- if (GNUNET_SCHEDULER_NO_TASK != accept_task)
- {
- GNUNET_SCHEDULER_cancel (accept_task);
- accept_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != listen_socket)
- {
- GNUNET_NETWORK_socket_close (listen_socket);
- listen_socket = NULL;
- }
- if (NULL != bitmap)
- {
- bitmap_destroy (bitmap);
- bitmap = NULL;
- }
- if (NULL != addrmap)
- {
- addressmap_destroy (addrmap);
- addressmap = NULL;
- }
- if (NULL != rmap)
- {
- reverse_map_destroy (rmap);
- rmap = NULL;
- }
- GNUNET_free_non_null (s_addrs);
- s_addrs = NULL;
- if (NULL != run_args)
- {
- free_argv (run_args);
- run_args = NULL;
- }
- GNUNET_free_non_null (unixpath);
- unixpath = NULL;
- if (NULL != hostsfile)
- {
- (void) unlink (hostsfile);
- GNUNET_free (hostsfile);
- hostsfile = NULL;
- }
-}
-
-
-/**
- * Callback function invoked for each interface found.
- *
- * @param cls closure
- * @param name name of the interface (can be NULL for unknown)
- * @param isDefault is this presumably the default interface
- * @param addr address of this interface (can be NULL for unknown or
unassigned)
- * @param broadcast_addr the broadcast address (can be NULL for unknown or
unassigned)
- * @param netmask the network mask (can be NULL for unknown or unassigned))
- * @param addrlen length of the address
- * @return GNUNET_OK to continue iteration, GNUNET_SYSERR to abort
- */
-static int net_if_processor (void *cls, const char *name,
- int isDefault,
- const struct sockaddr *addr,
- const struct sockaddr *broadcast_addr,
- const struct sockaddr *netmask,
- socklen_t addrlen)
-{
- char *hostip;
- in_addr_t ip;
- const struct sockaddr_in *inaddr;
-
- if (sizeof (struct sockaddr_in) != addrlen)
- return GNUNET_OK; /* Only consider IPv4 for now */
- inaddr = (const struct sockaddr_in *) addr;
- ip = ntohl (inaddr->sin_addr.s_addr);
- if (127 == ip >> 24) /* ignore loopback addresses */
- return GNUNET_OK;
- GNUNET_array_append (s_addrs, nips, ip);
- LOG_DEBUG ("%d: Found IP: %s\n", rank, ip2str (ip));
- addressmap_add (addrmap, rank, listen_port, ip);
- return GNUNET_OK;
-}
-
-
-/**
- * Callback function for data received from the network. Note that
- * both "available" and "err" would be 0 if the read simply timed out.
- *inaddr->sin_addr.s_addrinaddr->sin_addr.s_addr
- * @param cls the read context
- * @param buf pointer to received data
- * @param available number of bytes availabe in "buf",
- * possibly 0 (on errors)
- * @param addr address of the sender
- * @param addrlen size of addr
- * @param errCode value of errno (on receiving errors)
- */
-static void
-conn_reader(void *cls, const void *buf, size_t available,
- const struct sockaddr * addr, socklen_t addrlen, int errCode)
-{
- struct ReadContext *rc = cls;
- uint32_t cid;
-
- if (0 == available)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- if ((NULL == buf) || (0 == available))
- goto clo_ret;
- (void) memcpy (&cid, buf, sizeof (uint32_t));
- cid = ntohl (cid);
- LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
-
- clo_ret:
- GNUNET_CONTAINER_DLL_remove (rhead, rtail, rc);
- GNUNET_CONNECTION_destroy (rc->conn);
- GNUNET_free (rc);
-}
-
-
-/**
- * Fork a worker process. This process sets up a PTY if needed, forks a child
- * which exec's the binary to start and manages the communication between the
- * binary and network if given a network connection.
- */
-static pid_t
-spawn_worker (int do_protocol)
-{
- struct GNUNET_NETWORK_Handle *sock;
- struct GNUNET_CONNECTION_Handle *conn;
- pid_t ret;
-
- ret = fork ();
- if (0 != ret)
- return ret;
- /* Child process continues here */
- if (do_protocol)
- {
- GNUNET_assert (MODE_SERV == mode);
- GNUNET_assert (NULL != listen_socket);
- sock = GNUNET_NETWORK_socket_accept (listen_socket, NULL, NULL);
- conn = GNUNET_CONNECTION_create_from_existing (sock);
- }
- GNUNET_SCHEDULER_cancel (shutdown_task);
- shutdown_task = GNUNET_SCHEDULER_NO_TASK;
- do_shutdown (NULL, NULL);
- mode = MODE_WORKER;
- if (do_protocol)
- {
- mode = MODE_PROTOWORKER;
- init_daemon_server ();
- daemon_server_add_connection (conn);
- }
- return 0;
-}
-
-
-/**
- * Task to call accept and close on a listening socket
- *
- * @param cls NULL
- * @param tc the scheduler task context
- */
-static void
-accept_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct ReadContext *rctx;
- struct GNUNET_CONNECTION_Handle *conn;
- pid_t pid;
- int csock;
-
- atask = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- {
- goto clo_ret;
- }
- switch (mode)
- {
- case MODE_PROBE:
- LOG_DEBUG ("%d: Got a probe connect\n", rank);
- conn = GNUNET_CONNECTION_create_from_accept (NULL, NULL, listen_socket);
- if (NULL == conn)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- rctx = GNUNET_malloc (sizeof (struct ReadContext));
- rctx->conn = conn;
- rctx->in_receive = GNUNET_YES;
- GNUNET_CONNECTION_receive (rctx->conn, sizeof (unsigned int),
- GNUNET_TIME_UNIT_FOREVER_REL, conn_reader,
rctx);
- GNUNET_CONTAINER_DLL_insert_tail (rhead, rtail, rctx);
- break;
- case MODE_SERV:
- pid = spawn_worker (NULL);
- if (-1 == pid)
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown (0);
- goto clo_ret;
- }
- if (0 == pid) /* state is cleared and hence we return */
- return;
- break;
- case MODE_WORKER:
- case MODE_PROTOWORKER:
- GNUNET_assert (0);
- }
- atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- listen_socket, &accept_task, NULL);
- return;
-
- clo_ret:
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
- listen_socket = NULL;
-}
-
-
-/**
- * Task to check if we received a shutdown signal through MPI message from
- * instance 0. This task is to be run every 500ms
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-poll_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- MPI_Status status;
- int flag;
-
- poll_shutdown_task = GNUNET_SCHEDULER_NO_TASK;
- if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
- return;
- flag = 0;
- if (MPI_SUCCESS != MPI_Iprobe(0, MSH_MTYPE_SHUTDOWN, MPI_COMM_WORLD, &flag,
- MPI_STATUS_IGNORE))
- {
- GNUNET_break (0);
- goto reschedule;
- }
- if (0 == flag)
- goto reschedule;
- LOG_DEBUG ("Got termination signal. Shutting down\n");
- GNUNET_SCHEDULER_shutdown (); /* We terminate */
- return;
-
- reschedule:
- poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
- &poll_shutdown, NULL);
-}
-
-
-/**
- * Sends termination signal to all other instances through MPI messaging
- */
-static void
-send_term_signal ()
-{
- unsigned int cnt;
- MPI_Request *req;
-
- /* We broadcase termination signal. Can't use MPI_Bcast here... */
- req = GNUNET_malloc (sizeof (MPI_Request) * (nproc - 1));
- for (cnt = 1; cnt < nproc; cnt++)
- {
- GNUNET_assert (MPI_SUCCESS ==
- MPI_Isend (&cnt, 1, MPI_INT, cnt, MSH_MTYPE_SHUTDOWN,
- MPI_COMM_WORLD, &req[cnt - 1]));
- }
- GNUNET_assert (MPI_SUCCESS == MPI_Waitall (nproc - 1, req,
- MPI_STATUSES_IGNORE));
- GNUNET_free (req);
-}
-
-
-/**
- * Callbacks of this type can be supplied to MSH_monitor_process() to be
- * notified when the corresponding processes exits.
- *
- * @param cls the closure passed to MSH_monitor_process()
- * @param type the process status type
- * @param long the return/exit code of the process
- */
-static void
-proc_exit_cb (void *cls, enum GNUNET_OS_ProcessStatusType type, int code)
-{
- GNUNET_OS_process_destroy (proc);
- proc = NULL;
- LOG (GNUNET_ERROR_TYPE_INFO, "Main process died. Exiting.\n");
- GNUNET_SCHEDULER_shutdown ();
- send_term_signal ();
-}
-
-
-/**
- * Task for running a round
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-/**
- * Schedules next round. If all the rounds are completed, call the next
- */
-static void
-schedule_next_round ()
-{
- pid_t pid;
- int total_rounds;
-
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
- /* Number of rounds required to contact all processes except ourselves
(rwidth
- in parallel in each round) */
- total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
- if (current_round < total_rounds)
- {
- rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
- return;
- }
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- {
- GNUNET_break (0);
- return;
- }
- LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
- GNUNET_break (GNUNET_OK == reduce_ntree ());
- addressmap_print (addrmap);
- rmap = addressmap_create_reverse_mapping (addrmap);
- pid = getpid ();
- GNUNET_assert (0 < asprintf (&unixpath, "%ju.sock", (intmax_t) pid));
- setenv (MSHD_SOCK_NAME, unixpath, 1);
- hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
- if (GNUNET_OK != addressmap_write_hosts (addrmap, hostsfile))
- {
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- setenv (MSHD_HOSTSFILE, hostsfile, 1);
- init_local_server (unixpath);
- MSH_pmonitor_init ();
- mode = MODE_SERV;
- GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == atask);
- atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- listen_socket, &accept_task, NULL);
- if (0 == rank)
- {
- pid = spawn_worker (0);
- if (-1 == pid)
- {
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown (0);
- return;
- }
- if (0 != pid)
- {
- MSH_monitor_process_pid (proc, &proc_exit_cb, NULL);
- goto end;
- }
- if (reverse_connect)
- do_reverse_connect ();
- if (need_pty)
- create_pty ();
- fork_and_exec (run_args[0]);
- return;
- }
- end:
- poll_shutdown_task = GNUNET_SCHEDULER_add_delayed (POLL_SHUTDOWN_INTERVAL,
- &poll_shutdown, NULL);
-}
-
-
-/**
- * Cleans up the address verification context
- *
- * @param ctx the context
- */
-static void
-cleanup_verifyaddressctx (struct VerifyAddressesCtx *ctx)
-{
- if (GNUNET_SCHEDULER_NO_TASK != ctx->close_task)
- GNUNET_SCHEDULER_cancel (ctx->close_task);
- if (NULL != ctx->conn)
- GNUNET_CONNECTION_destroy (ctx->conn);
- GNUNET_CONTAINER_DLL_remove (vactx_head, vactx_tail, ctx);
- GNUNET_free (ctx);
-}
-
-
-/**
- * Finalise a round by freeing the resources used by it, cancel the accept task
- * and schedule next round
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-finalise_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct VerifyAddressesCtx *ctx;
- unsigned int cnt;
-
- finalise_task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_SCHEDULER_cancel (atask);
- atask = GNUNET_SCHEDULER_NO_TASK;
- while (NULL != (ctx = vactx_head))
- {
- cleanup_verifyaddressctx (ctx);
- }
- for (cnt = 0; cnt < rwidth; cnt++)
- instance_address_info_destroy (riainfos[cnt]);
- if (1 != bitmap_allset (bitmap))
- {
- LOG_ERROR ("Could not verify addresses of all hosts\n");
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- current_round++;
- schedule_next_round ();
-}
-
-
-/**
- * Task for closing a connection
- *
- * @param cls the verify address context
- * @param tc the scheduler task context
- */
-static void
-conn_close_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct VerifyAddressesCtx *ctx = cls;
- int lb;
- int source;
- int off;
-
- ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
- lb = rank - (current_round * rwidth) - rwidth + nproc;
- GNUNET_assert (0 <= lb);
- lb %= nproc;
- source = instance_address_info_get_rank (ctx->iainfo);
- if (lb <= source)
- off = source - lb;
- else
- off = nproc - lb + source;
- bitmap_set (bitmap, off, 1);
- addressmap_add (addrmap, instance_address_info_get_rank (ctx->iainfo),
- ctx->port, ctx->ip);
- cleanup_verifyaddressctx (ctx);
-}
-
-
-/**
- * Function called to notify a client about the connection
- * begin ready to queue more data. "buf" will be
- * NULL and "size" zero if the connection was closed for
- * writing in the meantime.
- *
- * @param cls closure
- * @param size number of bytes available in buf
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-conn_write_cb (void *cls, size_t size, void *buf)
-{
- struct VerifyAddressesCtx *ctx = cls;
- size_t rsize;
- uint32_t rank_;
-
- ctx->transmit_handle = NULL;
- rsize = 0;
- if ((NULL == buf) || (0 == size))
- {
- goto clo_ret;
- }
- if (size < sizeof (uint32_t))
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- switch (ctx->state)
- {
- case VERIFY_ADDRESS_CTX_WRITE:
- rank_ = htonl (rank);
- rsize = sizeof (uint32_t);
- (void) memcpy (buf, &rank_, rsize);
- ctx->transmit_handle =
- GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, 0,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &conn_write_cb, ctx);
- ctx->state = VERIFY_ADDRESS_CTX_CLOSE;
- return rsize;
- case VERIFY_ADDRESS_CTX_CLOSE:
- ctx->close_task = GNUNET_SCHEDULER_add_now (&conn_close_task, ctx);
- GNUNET_CONNECTION_destroy (ctx->conn);
- ctx->conn = NULL;
- return 0;
- default:
- GNUNET_assert (0);
- }
-
- clo_ret:
- cleanup_verifyaddressctx (ctx);
- return size;
-}
-
-
-static unsigned int bmx;
-
-static int
-address_iterator_cb (void *cls, uint16_t port, in_addr_t ip)
-{
- struct VerifyAddressesCtx *ctx;
- struct InstanceAddrInfo *iainfo = cls;
- struct sockaddr_in in_addr;;
-
- LOG_DEBUG ("%d: \t %d Opening connection to: %s\n", rank, bmx++, ip2str
((uint32_t) ip) );
- in_addr.sin_family = AF_INET;
- in_addr.sin_port = htons (port);
- in_addr.sin_addr.s_addr = htonl ((uint32_t) ip);
- ctx = GNUNET_malloc (sizeof (struct VerifyAddressesCtx));
- ctx->conn =
- GNUNET_CONNECTION_create_from_sockaddr (AF_INET,
- (const struct sockaddr *)
- &in_addr,
- sizeof (struct sockaddr_in));
- if (NULL == ctx->conn)
- {
- GNUNET_break (0);
- free (ctx);
- return GNUNET_SYSERR;
- }
- ctx->port = port;
- ctx->ip = ip;
- ctx->iainfo = iainfo;
- ctx->state = VERIFY_ADDRESS_CTX_WRITE;
- GNUNET_CONTAINER_DLL_insert_tail (vactx_head, vactx_tail, ctx);
- ctx->transmit_handle =
- GNUNET_CONNECTION_notify_transmit_ready (ctx->conn, sizeof (uint32_t),
- GNUNET_TIME_UNIT_FOREVER_REL,
- &conn_write_cb, ctx);
- return GNUNET_OK;
-}
-
-
-/**
- * Verify the addresses of an instance by connecting to the instance's listen
- * socket
- *
- * @param iainfo the instance's address information
- * @return GNUNET_OK upon success initialisation of the connection to
instance's
- * listen socket (this does not mean that the connection is
- * established or an address is verified); GNUNET_SYSERR upon error
- */
-static int
-verify_addresses (struct InstanceAddrInfo *iainfo)
-{
-
- struct InstanceAddr *iaddr;
-
- bmx = 0;
- if (GNUNET_OK != instance_address_info_iterate_addresses (iainfo,
-
&address_iterator_cb,
- iainfo))
- return GNUNET_SYSERR;
- return GNUNET_OK;
-}
-
-
-/**
- * Parse a verfication message from a source for its address information
- *
- * @param msg the message to parse
- * @param source the MPI id of the instance which has sent this message
- * @return the instance's address information
- */
-static struct InstanceAddrInfo *
-parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
-{
- struct InstanceAddr *iaddr;
- struct InstanceAddrInfo *iainfo;
- size_t size;
- uint16_t nips;
- uint16_t cnt;
-
- size = ntohs (msg->header.size);
- nips = ntohs (msg->nips);
- if (size != (sizeof (struct MSH_MSG_VerifyAddress)
- + (sizeof (uint32_t) * nips)))
- {
- LOG_ERROR ("Parsing failed\n");
- return NULL;
- }
- iainfo = instance_address_info_create (source);
- for (cnt = 0; cnt < nips; cnt++)
- {
- LOG_DEBUG ("%d: Parsed address: %s\n", rank, ip2str ((in_addr_t) ntohl
(msg->ipaddrs[cnt])));
- iaddr = instance_address_create_sockaddr_in (ntohs (msg->port),
- (in_addr_t) ntohl
(msg->ipaddrs[cnt]));
- GNUNET_break (GNUNET_OK == instance_address_info_add_address (iainfo,
iaddr));
- }
- return iainfo;
-}
-
-
-/**
- * Receives the IP addresses to verify in the current round from instances
- *
- * @return an array containing the instance addresses; NULL upon a receive
error
- */
-static struct InstanceAddrInfo **
-receive_addresses ()
-{
- struct InstanceAddrInfo **iainfos;
- MPI_Status status;
- int cnt;
-
- iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
- for (cnt=0; cnt < rwidth; cnt++)
- {
- struct MSH_MSG_VerifyAddress *msg;
- int rsize;
- int lb;
- int up;
- int source;
- int ret;
-
- GNUNET_break (MPI_SUCCESS ==
- MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES,
- MPI_COMM_WORLD, &status));
- MPI_Get_elements (&status, MPI_BYTE, &rsize);
- /* We expect a message from peers with id p in the range:
- (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round
* rwidth) -1) */
- lb = rank - current_round * rwidth - rwidth + nproc;
- up = rank - (current_round * rwidth) - 1 + nproc;
- GNUNET_assert (lb >= 0);
- GNUNET_assert (up >= 0);
- lb %= nproc;
- up %= nproc;
- source = status.MPI_SOURCE;
- if (lb == up)
- if (source != lb)
- {
- GNUNET_break (0);
- LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb,
up);
- goto err_ret;
- }
- else if ((source > up) || (source < lb))
- {
- GNUNET_break (0);
- goto err_ret;
- }
- msg = GNUNET_malloc (rsize);
- if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
- MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
- MPI_STATUS_IGNORE))
- {
- GNUNET_break (0);
- goto err_ret;
- }
- LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize,
source);
- if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
- {
- free (msg);
- goto err_ret;
- }
- free (msg);
- }
- return iainfos;
-
- err_ret:
- for (cnt=0; cnt < rwidth; cnt++)
- {
- if (NULL != iainfos[cnt])
- instance_address_info_destroy (iainfos[cnt]);
- }
- free (iainfos);
- return NULL;
-}
-
-
-/**
- * Send our addresses to an MPI processes
- *
- * @param rank the rank of the process which has to receive our request
- * @return GNUNET_OK on success; GNUNET_SYSERR upon error
- */
-static int
-send_addresses ()
-{
- struct MSH_MSG_VerifyAddress *msg;
- struct MSH_MSG_VerifyAddress *cpys;
- MPI_Request *sreqs;
- size_t msize;
- int cnt;
- int ret;
- int target;
- unsigned int width;
-
- msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
- msg = GNUNET_malloc (msize);
- msg->header.size = htons (msize);
- msg->port = htons (listen_port);
- msg->nips = htons (nips);
- for (cnt = 0; cnt < nips; cnt++)
- {
- msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
- }
- width = rwidth;
- if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) /
rwidth)) )
- width = (nproc - 1) % rwidth;
- cpys = NULL;
- cpys = GNUNET_malloc (msize * width);
- sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
- for (cnt=0; cnt < width; cnt++)
- {
- (void) memcpy (&cpys[cnt], msg, msize);
- target = (current_round * rwidth) + cnt + 1;
- GNUNET_assert (target < nproc);
- target = (rank + target) % nproc;
- LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
- ret = MPI_Isend (&cpys[cnt], msize, MPI_BYTE, target,
- MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[cnt]);
- if (MPI_SUCCESS != ret)
- break;
- }
- free (msg);
- msg = NULL;
- if (cnt != width)
- {
- for (cnt--; cnt >= 0; cnt--)
- {
- GNUNET_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
- GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
- }
- goto err_ret;
- }
- for (cnt=0; cnt < width; cnt++)
- {
- GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
- }
- LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank,
current_round);
- if (NULL != cpys)
- {
- free (cpys);
- cpys = NULL;
- }
-
- err_ret:
- GNUNET_free_non_null (cpys);
- GNUNET_free_non_null (sreqs);
- return (MPI_SUCCESS == ret) ? GNUNET_OK : GNUNET_SYSERR;
-}
-
-
-/**
- * This functions opens a listen socket, sends this instance's IP addresses to
- * other instances and receives their IP addresses, starts accepting
connections
- * on listen socket and verifies the IP addresses of other instances by
- * connecting to their listen sockets
- *
- * @return GNUNET_OK if verification is successful; GNUNET_SYSERR upon error
(an error
- * message is logged)
- */
-static int
-run_round_ ()
-{
- unsigned int cnt;
-
- if (GNUNET_SYSERR == send_addresses ())
- return GNUNET_SYSERR;
- if (NULL == (riainfos = receive_addresses ()))
- return GNUNET_SYSERR;
- atask = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
- listen_socket, &accept_task, NULL);
-
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- for (cnt = 0; cnt < rwidth; cnt++)
- verify_addresses (riainfos[cnt]);
- finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &finalise_round, NULL);
- return GNUNET_OK;
-}
-
-
-/**
- * Task for running a round
- *
- * @param cls NULL
- * @param tc scheduler task context
- */
-static void
-run_round (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- rtask = GNUNET_SCHEDULER_NO_TASK;
- if (GNUNET_OK != run_round_ ())
- GNUNET_SCHEDULER_shutdown ();
-}
-
-
-/**
- * Function to copy NULL terminated list of arguments
- *
- * @param argv the NULL terminated list of arguments. Cannot be NULL.
- * @return the copied NULL terminated arguments
- */
-static char **
-copy_argv (char *const *argv)
-{
- char **argv_dup;
- unsigned int argp;
-
- GNUNET_assert (NULL != argv);
- for (argp = 0; NULL != argv[argp]; argp++) ;
- argv_dup = GNUNET_malloc (sizeof (char *) * (argp + 1));
- for (argp = 0; NULL != argv[argp]; argp++)
- argv_dup[argp] = strdup (argv[argp]);
- return argv_dup;
-}
-
-
-/**
- * Frees the given NULL terminated arguments
- *
- * @param argv the NULL terminated list of arguments
- */
-static void
-free_argv (char **argv)
-{
- unsigned int argp;
-
- for (argp = 0; NULL != argv[argp]; argp++)
- GNUNET_free (argv[argp]);
- GNUNET_free (argv);
-}
-
-
-/**
- * Main function that will be run.
- *
- * @param cls closure
- * @param args remaining command-line arguments
- * @param cfgfile name of the configuration file used (for saving, can be
NULL!)
- * @param cfg configuration
- */
-static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
-{
- const struct GNUNET_DISK_FileHandle *fh;
- struct sockaddr_in addr;
- socklen_t addrlen;
- unsigned int cnt;
-
- LOG_DEBUG ("Running main task\n");
- if (0 == rwidth)
- {
- LOG_ERROR ("Round width cannot be 0. Exiting\n");
- return;
- }
- if (nproc <= rwidth)
- {
- LOG_ERROR ("Round width should be less than the number of processes\n");
- return;
- }
- for (cnt = 0; NULL != args[cnt]; cnt++);
- if (0 == cnt)
- {
- LOG_ERROR ("Need a command to execute\n");
- return;
- }
- run_args = copy_argv (args);
- bitmap = bitmap_create (rwidth);
- addrmap = addressmap_create (nproc);
- addrlen = sizeof (struct sockaddr_in);
- (void) memset (&addr, 0, addrlen);
- addr.sin_addr.s_addr = INADDR_ANY; /* bind to all available addresses */
- listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen,
rwidth);
- listen_port = ntohs (addr.sin_port);
- if (NULL == listen_socket)
- return;
- if (0 == listen_port)
- {
- GNUNET_break (0);
- goto clo_ret;
- }
- LOG_DEBUG ("Listening on port %u\n", listen_port);
- GNUNET_OS_network_interfaces_list (&net_if_processor, NULL);
- if (0 == nips)
- {
- LOG_ERROR ("No IP addresses found\n");
- return;
- }
- schedule_next_round ();
- shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
- &do_shutdown, NULL);
- return;
-
- clo_ret:
- GNUNET_break (GNUNET_OK == GNUNET_NETWORK_socket_close (listen_socket));
- listen_socket = NULL;
-}
-
-
-/**
- * The execution start point
- *
- * @param argc the number of arguments
- * @param argv the argument strings
- * @return 0 for successful termination; 1 for termination upon error
- */
-int
-main (int argc, char **argv)
-{
- static const struct GNUNET_GETOPT_CommandLineOption options[] = {
- {'w', "round-width", "COUNT",
- "set the size of each round to COUNT",
- GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
- GNUNET_GETOPT_OPTION_END
- };
- int ret;
- int c;
-
- ret = 1;
- rwidth = 1;
- GNUNET_log_setup ("mshd", NULL, NULL);
- if (MPI_SUCCESS != MPI_Init(&argc, &argv))
- {
- LOG_ERROR ("Failed to initialise MPI\n");
- return 1;
- }
- if (MPI_SUCCESS != MPI_Comm_size (MPI_COMM_WORLD, &nproc))
- {
- LOG_ERROR ("Cannot determine the number of mshd processes\n");
- goto fail;
- }
- if (nproc <= rwidth)
- {
- LOG_ERROR ("Given round width is greater than or equal to number of mshd
processes\n");
- goto fail;
- }
- if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
- {
- LOG_ERROR ("Cannot determine our MPI rank\n");
- goto fail;
- }
- if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "mshd", "mshd: MSH daemon",
- options, &run, NULL))
- {
- GNUNET_break (0);
- goto fail;
- }
- ret = 0;
-
- fail:
- if (MODE_WORKER <= mode)
- return;
- LOG_DEBUG ("Returning\n");
- GNUNET_break (MPI_SUCCESS == MPI_Finalize());
- return ret;
-}
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-11-20 10:41:28 UTC (rev 30824)
+++ msh/src/Makefile.am 2013-11-20 10:44:31 UTC (rev 30825)
@@ -4,7 +4,7 @@
mshd_SOURCES = mshd.c mshd.h util.c util.h mtypes.h \
common.h bitmap.c bitmap.h addressmap.c addressmap.h reduce.h reduce.c \
- mshd-server.c mshd_pmonitor.c mshd_pmonitor.h \
+ server.c pmonitor.c pmonitor.h \
ttymodes.h ttymodes.c
mshd_LDADD = -lgnunetutil -lm
Modified: msh/src/bitmap.c
===================================================================
--- msh/src/bitmap.c 2013-11-20 10:41:28 UTC (rev 30824)
+++ msh/src/bitmap.c 2013-11-20 10:44:31 UTC (rev 30825)
@@ -148,7 +148,7 @@
GNUNET_assert (off < bm->array_size);
for (cnt = 0; cnt < off; cnt ++)
{
- if (0 != max ^ bm->barray[cnt])
+ if (0 != (max ^ bm->barray[cnt]))
return 0;
}
if (0 == bitidx)
Modified: msh/src/bitmap.h
===================================================================
--- msh/src/bitmap.h 2013-11-20 10:41:28 UTC (rev 30824)
+++ msh/src/bitmap.h 2013-11-20 10:44:31 UTC (rev 30825)
@@ -64,7 +64,17 @@
bitmap_isbitset (struct BitMap *bm, unsigned int id);
+/**
+ * Check if all relevant bits in the bitmap are set
+ *
+ * @param bm the handle to the bitmap
+ * @return 1 if all the bits are set; 0 if not
+ */
+int
+bitmap_allset (struct BitMap *bm);
+
+
#endif /* #ifndef BITMAP_H_ */
/* End of bitmap.h */
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-11-20 10:41:28 UTC (rev 30824)
+++ msh/src/mshd.c 2013-11-20 10:44:31 UTC (rev 30825)
@@ -11,6 +11,9 @@
#include "mtypes.h"
#include "bitmap.h"
#include "addressmap.h"
+#include "reduce.h"
+#include "pmonitor.h"
+#include "server.h"
#define LOG(kind,...) \
GNUNET_log (kind, __VA_ARGS__)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r30825 - in msh: . mshd2 src,
gnunet <=