gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r10460 - in gnunet/src: datastore include transport


From: gnunet
Subject: [GNUnet-SVN] r10460 - in gnunet/src: datastore include transport
Date: Mon, 1 Mar 2010 14:16:24 +0100

Author: grothoff
Date: 2010-03-01 14:16:24 +0100 (Mon, 01 Mar 2010)
New Revision: 10460

Modified:
   gnunet/src/datastore/datastore.h
   gnunet/src/datastore/datastore_api.c
   gnunet/src/include/gnunet_core_service.h
   gnunet/src/transport/gnunet-service-transport.c
   gnunet/src/transport/plugin_transport.h
   gnunet/src/transport/plugin_transport_tcp.c
   gnunet/src/transport/plugin_transport_template.c
   gnunet/src/transport/plugin_transport_udp.c
   gnunet/src/transport/plugin_transport_udp_nat.c
Log:
fixing API issue of who is responsible for quota in

Modified: gnunet/src/datastore/datastore.h
===================================================================
--- gnunet/src/datastore/datastore.h    2010-02-28 12:59:17 UTC (rev 10459)
+++ gnunet/src/datastore/datastore.h    2010-03-01 13:16:24 UTC (rev 10460)
@@ -27,7 +27,7 @@
 #ifndef DATASTORE_H
 #define DATASTORE_H
 
-#define DEBUG_DATASTORE GNUNET_NO
+#define DEBUG_DATASTORE GNUNET_YES
 
 #include "gnunet_util_lib.h"
 

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2010-02-28 12:59:17 UTC (rev 
10459)
+++ gnunet/src/datastore/datastore_api.c        2010-03-01 13:16:24 UTC (rev 
10460)
@@ -656,6 +656,7 @@
     {
       gm->header.size = htons(sizeof (struct GetMessage) - 
sizeof(GNUNET_HashCode));
     }
+  GNUNET_assert (h->response_proc == NULL);
   transmit_for_result (h, iter, iter_cls, timeout);
 }
 
@@ -680,6 +681,7 @@
   m = (struct GNUNET_MessageHeader*) &h[1];
   m->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_RANDOM);
   m->size = htons(sizeof (struct GNUNET_MessageHeader));
+  GNUNET_assert (h->response_proc == NULL);
   transmit_for_result (h, iter, iter_cls, timeout);
 }
 

Modified: gnunet/src/include/gnunet_core_service.h
===================================================================
--- gnunet/src/include/gnunet_core_service.h    2010-02-28 12:59:17 UTC (rev 
10459)
+++ gnunet/src/include/gnunet_core_service.h    2010-03-01 13:16:24 UTC (rev 
10460)
@@ -273,7 +273,8 @@
  * @param bpm_out set to the current bandwidth limit (sending) for this peer
  * @param latency current latency estimate, "FOREVER" if we have been
  *                disconnected
- * @param amount set to the amount that was actually reserved or unreserved
+ * @param amount set to the amount that was actually reserved or unreserved;
+ *               either the full requested amount or zero (no partial 
reservations)
  * @param preference current traffic preference for the given peer
  */
 typedef void

Modified: gnunet/src/transport/gnunet-service-transport.c
===================================================================
--- gnunet/src/transport/gnunet-service-transport.c     2010-02-28 12:59:17 UTC 
(rev 10459)
+++ gnunet/src/transport/gnunet-service-transport.c     2010-03-01 13:16:24 UTC 
(rev 10460)
@@ -871,6 +871,37 @@
   uint64_t allowed;
   uint64_t remaining;
 
+#if 0
+  struct GNUNET_TIME_Absolute now;
+  unsigned long long delta;
+  unsigned long long total_allowed;
+  unsigned long long total_remaining;
+
+  now = GNUNET_TIME_absolute_get ();
+  delta = now.value - session->last_quota_update.value;
+  if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
+    return;                     /* too early, not enough data */
+
+  total_allowed = session->quota_in * delta;
+  if (total_allowed > session->last_received)
+    {
+      /* got less than acceptable */
+      total_remaining = total_allowed - session->last_received;
+      session->last_received = 0;
+      delta = total_remaining / session->quota_in;      /* bonus seconds */
+      if (delta > MAX_BANDWIDTH_CARRY)
+        delta = MAX_BANDWIDTH_CARRY;    /* limit amount of carry-over */
+    }
+  else
+    {
+      /* got more than acceptable */
+      session->last_received -= total_allowed;
+      delta = 0;
+    }
+  session->last_quota_update.value = now.value - delta;
+#endif
+
+
   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
   if (delta.value < MIN_QUOTA_REFRESH_TIME)
     return;                     /* not enough time passed for doing quota 
update */
@@ -2539,24 +2570,63 @@
 
 
 /**
+ * Calculate how long we should delay reading from the TCP socket to
+ * ensure that we stay within our bandwidth limits (push back).
+ *
+ * @param n for which neighbour should this be calculated
+ * @return how long to delay receiving more data
+ */
+static struct GNUNET_TIME_Relative
+calculate_throttle_delay (struct NeighbourList *n)
+{
+  struct GNUNET_TIME_Relative ret;
+  struct GNUNET_TIME_Absolute now;
+  uint64_t del;
+  uint64_t avail;
+  uint64_t excess;
+
+  now = GNUNET_TIME_absolute_get ();
+  del = now.value - n->last_quota_update.value;
+  if (del > MAX_BANDWIDTH_CARRY)
+    {
+      update_quota (n /*, GNUNET_YES*/);
+      del = now.value - n->last_quota_update.value;
+      GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
+    }
+  if (n->quota_in == 0)
+    n->quota_in = 1;      /* avoid divison by zero */
+  avail = del * n->quota_in;
+  if (avail > n->last_received)
+    return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
+  excess = n->last_received - avail;
+  ret.value = excess / n->quota_in;
+  if (ret.value > 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Throttling read (%llu bytes excess at %llu b/ms), waiting 
%llums before reading more.\n",
+               (unsigned long long) excess,
+               (unsigned long long) n->quota_in,
+               (unsigned long long) ret.value);
+  return ret;
+}
+
+
+/**
  * Function called by the plugin for each received message.
  * Update data volumes, possibly notify plugins about
  * reducing the rate at which they read from the socket
  * and generally forward to our receive callback.
  *
  * @param cls the "struct TransportPlugin *" we gave to the plugin
- * @param message the message, NULL if peer was disconnected
- * @param distance the transport cost to this peer (not latency!)
- * @param sender_address the address that the sender reported
- *        (opaque to transport service)
- * @param sender_address_len the length of the sender address
  * @param peer (claimed) identity of the other peer
- * @return the new service_context that the plugin should use
- *         for future receive calls for messages from this
- *         particular peer
- *
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
+ * @param sender_address binary address of the sender (if observed)
+ * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ *         (plugins that do not support this, can ignore the return value)
  */
-static void
+static struct GNUNET_TIME_Relative
 plugin_env_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
                     unsigned int distance, const char *sender_address,
@@ -2572,99 +2642,86 @@
 
   n = find_neighbour (peer);
   if (n == NULL)
-    {
-      if (message == NULL)
-        return;                 /* disconnect of peer already marked down */
-      n = setup_new_neighbour (peer);
-    }
+    n = setup_new_neighbour (peer);    
   service_context = n->plugins;
   while ((service_context != NULL) && (plugin != service_context->plugin))
     service_context = service_context->next;
   GNUNET_assert ((plugin->api->send == NULL) || (service_context != NULL));
-  if (message == NULL)
+  if (message != NULL)
     {
+      peer_address = add_peer_address(n, 
+                                     plugin->short_name,
+                                     sender_address, 
+                                     sender_address_len);  
+      if (peer_address != NULL)
+       {
+         peer_address->distance = distance;
+         if (peer_address->connected == GNUNET_NO)
+           {
+             peer_address->connected = GNUNET_YES;
+             peer_address->connect_attempts++;
+           }
+         peer_address->timeout
+           =
+           GNUNET_TIME_relative_to_absolute
+           (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+       }
+      /* update traffic received amount ... */
+      msize = ntohs (message->size);
+      n->distance = distance;
+      n->peer_timeout =
+       GNUNET_TIME_relative_to_absolute
+       (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+      GNUNET_SCHEDULER_cancel (sched,
+                              n->timeout_task);
+      n->timeout_task =
+       GNUNET_SCHEDULER_add_delayed (sched,
+                                     GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
+                                     &neighbour_timeout_task, n);
+      if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
+       {
+         /* dropping message due to frequent inbound volume violations! */
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
+                     GNUNET_ERROR_TYPE_BULK,
+                     _
+                     ("Dropping incoming message due to repeated bandwidth 
quota violations (total of %u).\n"), 
+                     n->quota_violation_count);
+         return GNUNET_TIME_UNIT_MINUTES; /* minimum penalty, likely ignored 
(UDP...) */
+       }
+      switch (ntohs (message->type))
+       {
+       case GNUNET_MESSAGE_TYPE_HELLO:
+         process_hello (plugin, message);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
+         handle_ping(plugin, message, peer, sender_address, 
sender_address_len);
+         break;
+       case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
+         handle_pong(plugin, message, peer, sender_address, 
sender_address_len);
+         break;
+       default:
 #if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
-                  "Receive failed from `%4s', triggering disconnect\n",
-                  GNUNET_i2s (&n->id));
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Received message of type %u from `%4s', sending to all 
clients.\n",
+                     ntohs (message->type), GNUNET_i2s (peer));
 #endif
-      /* TODO: call stats */
-      disconnect_neighbour (n, GNUNET_YES);
-      return;
-    }
-  peer_address = add_peer_address(n, 
-                                 plugin->short_name,
-                                 sender_address, 
-                                 sender_address_len);  
-  if (peer_address != NULL)
-    {
-      peer_address->distance = distance;
-      if (peer_address->connected == GNUNET_NO)
-        {
-         peer_address->connected = GNUNET_YES;
-          peer_address->connect_attempts++;
-        }
-      peer_address->timeout
-        =
-        GNUNET_TIME_relative_to_absolute
-        (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-    }
-  /* update traffic received amount ... */
-  msize = ntohs (message->size);
-  n->last_received += msize;
-  n->distance = distance;
-  n->peer_timeout =
-    GNUNET_TIME_relative_to_absolute
-    (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
-  GNUNET_SCHEDULER_cancel (sched,
-                          n->timeout_task);
-  n->timeout_task =
-    GNUNET_SCHEDULER_add_delayed (sched,
-                                  GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
-                                  &neighbour_timeout_task, n);
-  update_quota (n);
-  if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD)
-    {
-      /* dropping message due to frequent inbound volume violations! */
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING |
-                  GNUNET_ERROR_TYPE_BULK,
-                  _
-                  ("Dropping incoming message due to repeated bandwidth quota 
violations (total of %u).\n"), n->quota_violation_count);
-      /* TODO: call stats */
-      return;
-    }
-  switch (ntohs (message->type))
-    {
-    case GNUNET_MESSAGE_TYPE_HELLO:
-      process_hello (plugin, message);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PING:
-      handle_ping(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    case GNUNET_MESSAGE_TYPE_TRANSPORT_PONG:
-      handle_pong(plugin, message, peer, sender_address, sender_address_len);
-      break;
-    default:
-#if DEBUG_TRANSPORT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Received message of type %u from `%4s', sending to all 
clients.\n",
-                  ntohs (message->type), GNUNET_i2s (peer));
-#endif
-      /* transmit message to all clients */
-      im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
-      im->header.size = htons (sizeof (struct InboundMessage) + msize);
-      im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
-      im->latency = GNUNET_TIME_relative_hton (n->latency);
-      im->peer = *peer;
-      memcpy (&im[1], message, msize);
-      cpos = clients;
-      while (cpos != NULL)
-        {
-          transmit_to_client (cpos, &im->header, GNUNET_YES);
-          cpos = cpos->next;
-        }
-      GNUNET_free (im);
-    }
+         /* transmit message to all clients */
+         im = GNUNET_malloc (sizeof (struct InboundMessage) + msize);
+         im->header.size = htons (sizeof (struct InboundMessage) + msize);
+         im->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RECV);
+         im->latency = GNUNET_TIME_relative_hton (n->latency);
+         im->peer = *peer;
+         memcpy (&im[1], message, msize);
+         cpos = clients;
+         while (cpos != NULL)
+           {
+             transmit_to_client (cpos, &im->header, GNUNET_YES);
+             cpos = cpos->next;
+           }
+         GNUNET_free (im);
+       }
+    }  
+  return calculate_throttle_delay (n);
 }
 
 
@@ -2830,8 +2887,6 @@
   const struct QuotaSetMessage *qsm =
     (const struct QuotaSetMessage *) message;
   struct NeighbourList *n;
-  struct TransportPlugin *p;
-  struct ReadyList *rl;          
   uint32_t qin;
 
   n = find_neighbour (&qsm->peer);
@@ -2850,14 +2905,6 @@
   if (n->quota_in < qin)
     n->last_quota_update = GNUNET_TIME_absolute_get ();
   n->quota_in = qin;
-  rl = n->plugins;
-  while (rl != NULL)
-    {
-      p = rl->plugin;
-      p->api->set_receive_quota (p->api->cls,
-                                 &qsm->peer, qin);
-      rl = rl->next;
-    }
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -2974,8 +3021,6 @@
   plug->env.cls = plug;
   plug->env.receive = &plugin_env_receive;
   plug->env.notify_address = &plugin_env_notify_address;
-  plug->env.default_quota_in =
-    (GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT + 59999) / (60 * 1000);
   plug->env.max_connections = max_connect_per_transport;
 }
 

Modified: gnunet/src/transport/plugin_transport.h
===================================================================
--- gnunet/src/transport/plugin_transport.h     2010-02-28 12:59:17 UTC (rev 
10459)
+++ gnunet/src/transport/plugin_transport.h     2010-03-01 13:16:24 UTC (rev 
10460)
@@ -26,14 +26,6 @@
  *        Note that the destructors of transport plugins will
  *        be given the value returned by the constructor
  *        and is expected to return a NULL pointer.
- *
- * TODO:
- * - consider moving DATA message (latency measurement)
- *   to service; avoids encapsulation overheads and
- *   would enable latency measurements for non-bidi
- *   transports.
- * -
- *
  * @author Christian Grothoff
  */
 #ifndef PLUGIN_TRANSPORT_H
@@ -51,21 +43,24 @@
  *
  * @param cls closure
  * @param peer (claimed) identity of the other peer
- * @param message the message, NULL if peer was disconnected
- * @param distance in overlay hops; use 1 unless DV
+ * @param message the message, NULL if we only care about
+ *                learning about the delay until we should receive again
+ * @param distance in overlay hops; use 1 unless DV (or 0 if message == NULL)
  * @param sender_address binary address of the sender (if observed)
  * @param sender_address_len number of bytes in sender_address
+ * @return how long the plugin should wait until receiving more data
+ *         (plugins that do not support this, can ignore the return value)
  */
-typedef void (*GNUNET_TRANSPORT_PluginReceiveCallback) (void *cls,
-                                                        const struct
-                                                        GNUNET_PeerIdentity *
-                                                        peer,
-                                                       const struct
-                                                        GNUNET_MessageHeader *
-                                                        message,
-                                                        uint32_t distance,
-                                                       const char 
*sender_address,
-                                                       size_t 
sender_address_len);
+typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_PluginReceiveCallback) 
(void *cls,
+                                                                              
const struct
+                                                                              
GNUNET_PeerIdentity *
+                                                                              
peer,
+                                                                              
const struct
+                                                                              
GNUNET_MessageHeader *
+                                                                              
message,
+                                                                              
uint32_t distance,
+                                                                              
const char *sender_address,
+                                                                              
size_t sender_address_len);
 
 
 /**
@@ -89,6 +84,27 @@
 
 
 /**
+ * Function that will be called whenever the plugin receives data over
+ * the network and wants to determine how long it should wait until
+ * the next time it reads from the given peer.  Note that some plugins
+ * (such as UDP) may not be able to wait (for a particular peer), so
+ * the waiting part is optional.  Plugins that can wait should call
+ * this function, sleep the given amount of time, and call it again
+ * (with zero bytes read) UNTIL it returns zero and only then read.
+ * 
+ * @param cls closure
+ * @param peer which peer did we read data from 
+ * @param amount_recved number of bytes read (can be zero)
+ * @return how long to wait until reading more from this peer
+ *         (to enforce inbound quotas)
+ */
+typedef struct GNUNET_TIME_Relative (*GNUNET_TRANSPORT_TrafficReport) (void 
*cls,
+                                                                      const 
struct 
+                                                                      
GNUNET_PeerIdentity *peer,
+                                                                      size_t 
amount_recved);
+
+
+/**
  * The transport service will pass a pointer to a struct
  * of this type as the first and only argument to the
  * entry point of each transport plugin.
@@ -129,10 +145,10 @@
   GNUNET_TRANSPORT_AddressNotification notify_address;
 
   /**
-   * What is the default quota (in terms of incoming bytes per
-   * ms) for new connections?
+   * Inform service about traffic received, get information
+   * about when we might be willing to receive more.
    */
-  uint32_t default_quota_in;
+  GNUNET_TRANSPORT_TrafficReport traffic_report;
 
   /**
    * What is the maximum number of connections that this transport
@@ -270,21 +286,6 @@
 
 
 /**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit.  The transport should limit its read/select
- * calls to stay below the quota (in terms of incoming data).
- *
- * @param cls closure
- * @param peer the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-typedef void
-  (*GNUNET_TRANSPORT_SetQuota) (void *cls,
-                                const struct GNUNET_PeerIdentity * target,
-                                uint32_t quota_in);
-
-
-/**
  * Another peer has suggested an address for this peer and transport
  * plugin.  Check that this could be a valid address.  This function
  * is not expected to 'validate' the address in the sense of trying to
@@ -338,14 +339,6 @@
   GNUNET_TRANSPORT_AddressPrettyPrinter address_pretty_printer;
 
   /**
-   * Function that the transport service can use to try to enforce a
-   * quota for the number of bytes received via this transport.
-   * Transports that can not refuse incoming data (such as UDP)
-   * are free to ignore these calls.
-   */
-  GNUNET_TRANSPORT_SetQuota set_receive_quota;
-
-  /**
    * Function that will be called to check if a binary address
    * for this plugin is well-formed.  If clearly needed, patch
    * up information such as port numbers.

Modified: gnunet/src/transport/plugin_transport_tcp.c
===================================================================
--- gnunet/src/transport/plugin_transport_tcp.c 2010-02-28 12:59:17 UTC (rev 
10459)
+++ gnunet/src/transport/plugin_transport_tcp.c 2010-03-01 13:16:24 UTC (rev 
10460)
@@ -165,9 +165,9 @@
   struct GNUNET_PeerIdentity target;
 
   /**
-   * At what time did we reset last_received last?
+   * ID of task used to delay receiving more to throttle sender.
    */
-  struct GNUNET_TIME_Absolute last_quota_update;
+  GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
 
   /**
    * Address of the other peer (either based on our 'connect'
@@ -176,18 +176,6 @@
   void *connect_addr;
 
   /**
-   * How many bytes have we received since the "last_quota_update"
-   * timestamp?
-   */
-  uint64_t last_received;
-
-  /**
-   * Number of bytes per ms that this peer is allowed
-   * to send to us.
-   */
-  uint32_t quota_in;
-
-  /**
    * Length of connect_addr.
    */
   size_t connect_alen;
@@ -266,28 +254,6 @@
 
 
 /**
- * Find a session handle for the given peer. 
- * FIXME: using a hash map we could do this in O(1).
- *
- * @return NULL if no matching session exists
- */
-static struct Session *
-find_session_by_target (struct Plugin *plugin,
-                        const struct GNUNET_PeerIdentity *target)
-{
-  struct Session *ret;
-
-  ret = plugin->sessions;
-  while ( (ret != NULL) &&
-         ((GNUNET_SYSERR == ret->expecting_welcome) ||
-          (0 != memcmp (target,
-                        &ret->target, sizeof (struct GNUNET_PeerIdentity)))))
-    ret = ret->next;
-  return ret;
-}
-
-
-/**
  * Find the session handle for the given client.
  *
  * @return NULL if no matching session exists
@@ -332,9 +298,6 @@
   plugin->sessions = ret;
   ret->client = client;
   ret->target = *target;
-  ret->last_quota_update = GNUNET_TIME_absolute_get ();
-  // FIXME: This is simply wrong...
-  ret->quota_in = plugin->env->default_quota_in;
   ret->expecting_welcome = GNUNET_YES;
   pm = GNUNET_malloc (sizeof (struct PendingMessage) + sizeof (struct 
WelcomeMessage));
   pm->msg = (const char*) &pm[1];
@@ -534,18 +497,22 @@
          transport service may know about this one, so we need to
          notify transport service about disconnect */
       // FIXME: we should have a very clear connect-disconnect
-      // protocol with gnunet-service-transport!
-      session->plugin->env->receive (session->plugin->env->cls,
-                                     &session->target, NULL,
-                                     1,
-                                    session->connect_addr,
-                                    session->connect_alen);
+      // protocol with gnunet-service-transport! 
+      // FIXME: but this is not possible for all plugins, so what gives?
     }
+  if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (session->plugin->env->sched,
+                              session->receive_delay_task);
+      if (session->client != NULL)
+       GNUNET_SERVER_receive_done (session->client, 
+                                   GNUNET_SYSERR);
+    }
   if (session->client != NULL)
     {
       GNUNET_SERVER_client_drop (session->client);
       session->client = NULL;
-    }
+    } 
   GNUNET_free_non_null (session->connect_addr);
   GNUNET_free (session);
 }
@@ -846,98 +813,6 @@
 
 
 /**
- * Update the last-received and bandwidth quota values
- * for this session.
- *
- * @param session session to update
- * @param force set to GNUNET_YES if we should update even
- *        though the minimum refresh time has not yet expired
- */
-static void
-update_quota (struct Session *session, int force)
-{
-  struct GNUNET_TIME_Absolute now;
-  unsigned long long delta;
-  unsigned long long total_allowed;
-  unsigned long long total_remaining;
-
-  now = GNUNET_TIME_absolute_get ();
-  delta = now.value - session->last_quota_update.value;
-  if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
-    return;                     /* too early, not enough data */
-
-  total_allowed = session->quota_in * delta;
-  if (total_allowed > session->last_received)
-    {
-      /* got less than acceptable */
-      total_remaining = total_allowed - session->last_received;
-      session->last_received = 0;
-      delta = total_remaining / session->quota_in;      /* bonus seconds */
-      if (delta > MAX_BANDWIDTH_CARRY)
-        delta = MAX_BANDWIDTH_CARRY;    /* limit amount of carry-over */
-    }
-  else
-    {
-      /* got more than acceptable */
-      session->last_received -= total_allowed;
-      delta = 0;
-    }
-  session->last_quota_update.value = now.value - delta;
-}
-
-
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit.  The transport should limit its read/select
- * calls to stay below the quota (in terms of incoming data).
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-tcp_plugin_set_receive_quota (void *cls,
-                              const struct GNUNET_PeerIdentity *target,
-                              uint32_t quota_in)
-{
-  struct Plugin *plugin = cls;
-  struct Session *session;
-
-  // FIXME: This is simply wrong:
-  // We may have multiple sessions for the target,
-  // and some OTHER session might be the one to 
-  // survive; not to mention the inbound-quota should
-  // be enforced across transports!
-  // => keep quota-related states in the service (globally, per peer)
-  //    and update/query the information when it is needed!
-  // => we can likely get rid of this entire function and
-  //    replace it with a query/update API!
-  session = find_session_by_target (plugin, target);
-  if (session == NULL)
-    {
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "tcp",
-                      "Could not find session for peer `%4s' to update 
quota.\n",
-                      GNUNET_i2s (target));
-      return;                     /* peer must have disconnected, ignore */
-    }
-  if (session->quota_in != quota_in)
-    {
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "tcp",
-                      "Changing quota for peer `%4s' from %u to %u\n",
-                      GNUNET_i2s (target),
-                      session->quota_in,
-                      quota_in);
-      update_quota (session, GNUNET_YES);
-      if (session->quota_in > quota_in)
-        session->last_quota_update = GNUNET_TIME_absolute_get ();
-      session->quota_in = quota_in;
-    }
-}
-
-
-/**
  * Check if the given port is plausible (must be either
  * our listen port or our advertised port).  If it is
  * neither, we return one of these two ports at random.
@@ -1073,46 +948,6 @@
 
 
 /**
- * Calculate how long we should delay reading from the TCP socket to
- * ensure that we stay within our bandwidth limits (push back).
- *
- * @param session for which client should this be calculated
- */
-static struct GNUNET_TIME_Relative
-calculate_throttle_delay (struct Session *session)
-{
-  struct GNUNET_TIME_Relative ret;
-  struct GNUNET_TIME_Absolute now;
-  uint64_t del;
-  uint64_t avail;
-  uint64_t excess;
-
-  now = GNUNET_TIME_absolute_get ();
-  del = now.value - session->last_quota_update.value;
-  if (del > MAX_BANDWIDTH_CARRY)
-    {
-      update_quota (session, GNUNET_YES);
-      del = now.value - session->last_quota_update.value;
-      GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
-    }
-  if (session->quota_in == 0)
-    session->quota_in = 1;      /* avoid divison by zero */
-  avail = del * session->quota_in;
-  if (avail > session->last_received)
-    return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
-  excess = session->last_received - avail;
-  ret.value = excess / session->quota_in;
-  GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
-                  "tcp",
-                  "Throttling read (%llu bytes excess at %llu b/ms), waiting 
%llums before reading more.\n",
-                  (unsigned long long) excess,
-                  (unsigned long long) session->quota_in,
-                  (unsigned long long) ret.value);
-  return ret;
-}
-
-
-/**
  * Task to signal the server that we can continue
  * receiving from the TCP client now.
  */
@@ -1120,7 +955,18 @@
 delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Session *session = cls;
-  GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+  struct GNUNET_TIME_Relative delay;
+
+  session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
+  delay = session->plugin->env->receive (session->plugin->env->cls,
+                                        &session->target,
+                                        NULL, 0, NULL, 0);
+  if (delay.value == 0)
+    GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
+  else
+    session->receive_delay_task = 
+      GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
+                                   delay, &delayed_done, session);
 }
 
 
@@ -1165,18 +1011,16 @@
                   (unsigned int) ntohs (message->type),
                   GNUNET_i2s (&session->target));
 #endif
-  plugin->env->receive (plugin->env->cls, &session->target, message, 1,
-                       session->connect_addr,
-                       session->connect_alen);
-  /* update bandwidth used */
-  session->last_received += msize;
-  update_quota (session, GNUNET_NO);
-  delay = calculate_throttle_delay (session);
+  delay = plugin->env->receive (plugin->env->cls, &session->target, message, 1,
+                               session->connect_addr,
+                               session->connect_alen);
+
   if (delay.value == 0)
     GNUNET_SERVER_receive_done (client, GNUNET_OK);
   else
-    GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
-                                  delay, &delayed_done, session);
+    session->receive_delay_task = 
+      GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
+                                   delay, &delayed_done, session);
 }
 
 
@@ -1342,7 +1186,6 @@
   api->send = &tcp_plugin_send;
   api->disconnect = &tcp_plugin_disconnect;
   api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
-  api->set_receive_quota = &tcp_plugin_set_receive_quota;
   api->check_address = &tcp_plugin_check_address;
   plugin->service = service;
   plugin->server = GNUNET_SERVICE_get_server (service);

Modified: gnunet/src/transport/plugin_transport_template.c
===================================================================
--- gnunet/src/transport/plugin_transport_template.c    2010-02-28 12:59:17 UTC 
(rev 10459)
+++ gnunet/src/transport/plugin_transport_template.c    2010-03-01 13:16:24 UTC 
(rev 10460)
@@ -219,23 +219,6 @@
   asc (asc_cls, NULL);
 }
 
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit.  The transport should limit its read/select
- * calls to stay below the quota (in terms of incoming data).
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-template_plugin_set_receive_quota (void *cls,
-                                   const struct GNUNET_PeerIdentity *target,
-                                   uint32_t quota_in)
-{
-  // struct Plugin *plugin = cls;
-  // FIXME!
-}
 
 
 /**
@@ -280,7 +263,6 @@
   api->send = &template_plugin_send;
   api->disconnect = &template_plugin_disconnect;
   api->address_pretty_printer = &template_plugin_address_pretty_printer;
-  api->set_receive_quota = &template_plugin_set_receive_quota;
   api->check_address = &template_plugin_address_suggested;
   return api;
 }

Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2010-02-28 12:59:17 UTC (rev 
10459)
+++ gnunet/src/transport/plugin_transport_udp.c 2010-03-01 13:16:24 UTC (rev 
10460)
@@ -682,23 +682,6 @@
                                 !numeric, timeout, &append_port, ppc);
 }
 
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit.  This call has no meaning for UDP, as if
- * we don't receive data it still comes in.  UDP has no friendliness
- * guarantees, and our buffers will fill at some level.
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-udp_plugin_set_receive_quota (void *cls,
-                              const struct GNUNET_PeerIdentity *target,
-                              uint32_t quota_in)
-{
-  return; /* Do nothing */
-}
 
 /**
  * The exported method. Makes the core api available via a global and
@@ -766,7 +749,6 @@
   api->send = &udp_plugin_send;
   api->disconnect = &udp_disconnect;
   api->address_pretty_printer = &udp_plugin_address_pretty_printer;
-  api->set_receive_quota = &udp_plugin_set_receive_quota;
   api->check_address = &udp_check_address;
 
   plugin->service = service;

Modified: gnunet/src/transport/plugin_transport_udp_nat.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp_nat.c     2010-02-28 12:59:17 UTC 
(rev 10459)
+++ gnunet/src/transport/plugin_transport_udp_nat.c     2010-03-01 13:16:24 UTC 
(rev 10460)
@@ -1595,23 +1595,6 @@
                                 !numeric, timeout, &append_port, ppc);
 }
 
-/**
- * Set a quota for receiving data from the given peer; this is a
- * per-transport limit.  This call has no meaning for UDP, as if
- * we don't receive data it still comes in.  UDP has no friendliness
- * guarantees, and our buffers will fill at some level.
- *
- * @param cls closure
- * @param target the peer for whom the quota is given
- * @param quota_in quota for receiving/sending data in bytes per ms
- */
-static void
-udp_nat_plugin_set_receive_quota (void *cls,
-                              const struct GNUNET_PeerIdentity *target,
-                              uint32_t quota_in)
-{
-  return; /* Do nothing */
-}
 
 /**
  * The exported method. Makes the core api available via a global and
@@ -1715,7 +1698,6 @@
   api->send = &udp_nat_plugin_send;
   api->disconnect = &udp_nat_disconnect;
   api->address_pretty_printer = &udp_nat_plugin_address_pretty_printer;
-  api->set_receive_quota = &udp_nat_plugin_set_receive_quota;
   api->check_address = &udp_nat_check_address;
 
   plugin->service = service;





reply via email to

[Prev in Thread] Current Thread [Next in Thread]