[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r26423 - gnunet/src/dv
From: gnunet
Subject: [GNUnet-SVN] r26423 - gnunet/src/dv
Date: Thu, 14 Mar 2013 14:46:14 +0100
Author: grothoff
Date: 2013-03-14 14:46:14 +0100 (Thu, 14 Mar 2013)
New Revision: 26423
Modified:
gnunet/src/dv/Makefile.am
gnunet/src/dv/gnunet-service-dv.c
Log:
-more DV hacking
Modified: gnunet/src/dv/Makefile.am
===================================================================
--- gnunet/src/dv/Makefile.am 2013-03-14 12:49:54 UTC (rev 26422)
+++ gnunet/src/dv/Makefile.am 2013-03-14 13:46:14 UTC (rev 26423)
@@ -38,6 +38,7 @@
gnunet_service_dv_SOURCES = \
gnunet-service-dv.c dv.h
gnunet_service_dv_LDADD = \
+ $(top_builddir)/src/consensus/libgnunetconsensus.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/util/libgnunetutil.la \
Modified: gnunet/src/dv/gnunet-service-dv.c
===================================================================
--- gnunet/src/dv/gnunet-service-dv.c 2013-03-14 12:49:54 UTC (rev 26422)
+++ gnunet/src/dv/gnunet-service-dv.c 2013-03-14 13:46:14 UTC (rev 26423)
@@ -43,6 +43,11 @@
#define GNUNET_DV_CONSENSUS_FREQUENCY GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5))
/**
+ * Maximum number of messages we queue per peer.
+ */
+#define MAX_QUEUE_SIZE 16
+
+/**
* The default fisheye depth, from how many hops away will
* we keep peers?
*/
@@ -101,6 +106,11 @@
*/
struct GNUNET_PeerIdentity target;
+ /**
+ * The (actual) sender of the message.
+ */
+ struct GNUNET_PeerIdentity sender;
+
};
GNUNET_NETWORK_STRUCT_END
@@ -190,6 +200,12 @@
struct GNUNET_CONSENSUS_Handle *consensus;
/**
+ * ID of the task we use to (periodically) update our consensus
+ * with this peer.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier consensus_task;
+
+ /**
* At what offset are we, with respect to inserting our own routes
* into the consensus?
*/
@@ -201,6 +217,11 @@
*/
unsigned int consensus_insertion_distance;
+ /**
+ * Number of messages currently in the 'pm_XXXX'-DLL.
+ */
+ unsigned int pm_queue_size;
+
};
@@ -272,12 +293,6 @@
static struct ConsensusSet consensi[DEFAULT_FISHEYE_DEPTH - 1];
/**
- * ID of the task we use to (periodically) update our consensus
- * with other peers.
- */
-static GNUNET_SCHEDULER_Task consensus_task;
-
-/**
* Handle to the core service api.
*/
static struct GNUNET_CORE_Handle *core_api;
@@ -319,12 +334,7 @@
*/
struct GNUNET_STATISTICS_Handle *stats;
-/**
- * How far out to keep peers we learn about.
- */
-static unsigned long long fisheye_depth;
-
/**
* Get distance information from 'atsi'.
*
@@ -400,7 +410,7 @@
*/
static void
send_data_to_plugin (const struct GNUNET_MessageHeader *message,
- struct GNUNET_PeerIdentity *distant_neighbor,
+ const struct GNUNET_PeerIdentity *distant_neighbor,
uint32_t distance)
{
struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -487,7 +497,7 @@
* @param uid plugin-chosen UID for the message
*/
static void
-send_ack_to_plugin (struct GNUNET_PeerIdentity *target,
+send_ack_to_plugin (const struct GNUNET_PeerIdentity *target,
uint32_t uid)
{
struct GNUNET_DV_AckMessage ack_msg;
@@ -581,6 +591,7 @@
while ( (NULL != (pending = dn->pm_head)) &&
(size >= off + (msize = ntohs (pending->msg->size))))
{
+ dn->pm_queue_size--;
GNUNET_CONTAINER_DLL_remove (dn->pm_head,
dn->pm_tail,
pending);
@@ -604,6 +615,59 @@
/**
+ * Forward the given payload to the given target.
+ *
+ * @param target where to send the message
+ * @param distance expected (remaining) distance to the target
+ * @param sender original sender of the message
+ * @param payload payload of the message
+ */
+static void
+forward_payload (struct DirectNeighbor *target,
+ uint32_t distance,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *payload)
+{
+ struct PendingMessage *pm;
+ struct RouteMessage *rm;
+ size_t msize;
+
+ if ( (target->pm_queue_size >= MAX_QUEUE_SIZE) &&
+ (0 != memcmp (sender,
+ &my_identity,
+ sizeof (struct GNUNET_PeerIdentity))) )
+ return;
+ msize = sizeof (struct RouteMessage) + ntohs (payload->size);
+ if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
+ rm = (struct RouteMessage *) &pm[1];
+ rm->header.size = htons ((uint16_t) msize);
+ rm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_ROUTE);
+ rm->distance = htonl (distance);
+ rm->target = target->peer;
+ rm->sender = *sender;
+ memcpy (&rm[1], payload, ntohs (payload->size));
+ GNUNET_CONTAINER_DLL_insert_tail (target->pm_head,
+ target->pm_tail,
+ pm);
+ target->pm_queue_size++;
+ if (NULL == target->cth)
+ target->cth = GNUNET_CORE_notify_transmit_ready (core_api,
+ GNUNET_YES /* cork */,
+ 0 /* priority */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &target->peer,
+ msize,
+ &core_transmit_notify, target);
+}
+
+
+/**
* Find a free slot for storing a 'route' in the 'consensi'
* set at the given distance.
*
@@ -741,6 +805,7 @@
* @param message the message
* @param atsi transport ATS information (latency, distance, etc.)
* @param atsi_count number of entries in atsi
+ * @return GNUNET_OK on success, GNUNET_SYSERR if the other peer violated the protocol
*/
static int
handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
@@ -748,7 +813,62 @@
const struct GNUNET_ATS_Information *atsi,
unsigned int atsi_count)
{
- GNUNET_break (0); // FIXME
+ const struct RouteMessage *rm;
+ const struct GNUNET_MessageHeader *payload;
+ struct Route *route;
+
+ if (ntohs (message->size) < sizeof (struct RouteMessage) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ rm = (const struct RouteMessage *) message;
+ payload = (const struct GNUNET_MessageHeader *) &rm[1];
+ if (ntohs (message->size) != sizeof (struct RouteMessage) + ntohs (payload->size))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ if (0 == memcmp (&rm->target,
+ &my_identity,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ /* message is for me, check reverse route! */
+ route = GNUNET_CONTAINER_multihashmap_get (all_routes,
+ &rm->sender.hashPubKey);
+ if (NULL == route)
+ {
+ /* don't have reverse route, drop */
+ GNUNET_STATISTICS_update (stats,
+ "# message discarded (no reverse route)",
+ 1, GNUNET_NO);
+ return GNUNET_OK;
+ }
+ send_data_to_plugin (payload,
+ &rm->sender,
+ route->target.distance);
+ return GNUNET_OK;
+ }
+ route = GNUNET_CONTAINER_multihashmap_get (all_routes,
+ &rm->target.hashPubKey);
+ if (NULL == route)
+ {
+ GNUNET_STATISTICS_update (stats,
+ "# messages discarded (no route)",
+ 1, GNUNET_NO);
+ return GNUNET_OK;
+ }
+ if (route->target.distance > ntohl (rm->distance) + 1)
+ {
+ GNUNET_STATISTICS_update (stats,
+ "# messages discarded (target too far)",
+ 1, GNUNET_NO);
+ return GNUNET_OK;
+ }
+ forward_payload (route->next_hop,
+ route->target.distance,
+ &rm->sender,
+ payload);
return GNUNET_OK;
}
@@ -765,7 +885,43 @@
handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
- GNUNET_break (0); // FIXME
+ struct Route *route;
+ const struct GNUNET_DV_SendMessage *msg;
+ const struct GNUNET_MessageHeader *payload;
+
+ if (ntohs (message->size) < sizeof (struct GNUNET_DV_SendMessage) + sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ msg = (const struct GNUNET_DV_SendMessage *) message;
+ payload = (const struct GNUNET_MessageHeader *) &msg[1];
+ if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ route = GNUNET_CONTAINER_multihashmap_get (all_routes,
+ &msg->target.hashPubKey);
+ if (NULL == route)
+ {
+ /* got disconnected, send ACK anyway?
+ FIXME: What we really want is an 'NACK' here... */
+ GNUNET_STATISTICS_update (stats,
+ "# local messages discarded (no route)",
+ 1, GNUNET_NO);
+ send_ack_to_plugin (&msg->target, htonl (msg->uid));
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ return;
+ }
+ // FIXME: flow control (send ACK only once message has left the queue...)
+ send_ack_to_plugin (&msg->target, htonl (msg->uid));
+ forward_payload (route->next_hop,
+ route->target.distance,
+ &my_identity,
+ payload);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -876,6 +1032,7 @@
while (NULL != (pending = neighbor->pm_head))
{
+ neighbor->pm_queue_size--;
GNUNET_CONTAINER_DLL_remove (neighbor->pm_head,
neighbor->pm_tail,
pending);
@@ -889,6 +1046,16 @@
GNUNET_CORE_notify_transmit_ready_cancel (neighbor->cth);
neighbor->cth = NULL;
}
+ if (GNUNET_SCHEDULER_NO_TASK != neighbor->consensus_task)
+ {
+ GNUNET_SCHEDULER_cancel (neighbor->consensus_task);
+ neighbor->consensus_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL != neighbor->consensus)
+ {
+ GNUNET_CONSENSUS_destroy (neighbor->consensus);
+ neighbor->consensus = NULL;
+ }
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (direct_neighbors,
&neighbor->peer.hashPubKey,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r26423 - gnunet/src/dv,
gnunet <=