gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33776 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r33776 - gnunet/src/transport
Date: Mon, 23 Jun 2014 13:08:12 +0200

Author: grothoff
Date: 2014-06-23 13:08:12 +0200 (Mon, 23 Jun 2014)
New Revision: 33776

Modified:
   gnunet/src/transport/plugin_transport_udp.c
Log:
adding monitoring support to UDP plugin, plus some doxygen/indentation fixes

Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2014-06-23 10:53:10 UTC (rev 33775)
+++ gnunet/src/transport/plugin_transport_udp.c 2014-06-23 11:08:12 UTC (rev 33776)
@@ -185,13 +185,11 @@
 
   /**
    * Number of bytes waiting for transmission to this peer.
-   * FIXME: not updated yet!
    */
   unsigned long long bytes_in_queue;
 
   /**
    * Number of messages waiting for transmission to this peer.
-   * FIXME: not updated yet!
    */
   unsigned int msgs_in_queue;
 
@@ -1183,14 +1181,21 @@
 
 
 /**
- * FIXME.
+ * Remove a message from the transmission queue.
+ *
+ * @param plugin the UDP plugin
+ * @param udpw message wrapper to queue
  */
 static void
 dequeue (struct Plugin *plugin,
          struct UDP_MessageWrapper *udpw)
 {
+  struct Session *session = udpw->session;
+
   if (plugin->bytes_in_buffer < udpw->msg_size)
-    GNUNET_break(0);
+  {
+    GNUNET_break (0);
+  }
   else
   {
     GNUNET_STATISTICS_update (plugin->env->stats,
@@ -1203,13 +1208,22 @@
                             "# UDP, total, msgs in buffers",
                             -1, GNUNET_NO);
   if (udpw->session->address->address_length == sizeof(struct IPv4UdpAddress))
-    GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head,
-        plugin->ipv4_queue_tail, udpw);
+    GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
+                                 plugin->ipv4_queue_tail,
+                                 udpw);
   else if (udpw->session->address->address_length == sizeof(struct IPv6UdpAddress))
-    GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head,
-        plugin->ipv6_queue_tail, udpw);
+    GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
+                                 plugin->ipv6_queue_tail,
+                                 udpw);
   else
+  {
     GNUNET_break (0);
+    return;
+  }
+  GNUNET_assert (session->msgs_in_queue > 0);
+  session->msgs_in_queue--;
+  GNUNET_assert (session->bytes_in_queue >= udpw->msg_size);
+  session->bytes_in_queue -= udpw->msg_size;
 }
 
 
@@ -1240,9 +1254,7 @@
   dummy.cont = NULL;
   dummy.cont_cls = NULL;
   dummy.session = s;
-
   call_continuation (&dummy, result);
-
   /* Remove leftover fragments from queue */
   if (s->address->address_length == sizeof(struct IPv6UdpAddress))
   {
@@ -1273,12 +1285,15 @@
       udpw = tmp;
     }
   }
-
+  notify_session_monitor (s->plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_UP);
   /* Destroy fragmentation context */
-  GNUNET_FRAGMENT_context_destroy (fc->frag, &s->last_expected_msg_delay,
-      &s->last_expected_ack_delay);
+  GNUNET_FRAGMENT_context_destroy (fc->frag,
+                                   &s->last_expected_msg_delay,
+                                   &s->last_expected_ack_delay);
   s->frag_ctx = NULL;
-  GNUNET_free(fc);
+  GNUNET_free (fc);
 }
 
 
@@ -1363,7 +1378,6 @@
       GNUNET_free (d_ctx);
     }
   }
-
   next = plugin->ipv4_queue_head;
   while (NULL != (udpw = next))
   {
@@ -1386,6 +1400,9 @@
       GNUNET_free(udpw);
     }
   }
+  notify_session_monitor (s->plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_DOWN);
   plugin->env->session_end (plugin->env->cls,
                             s->address,
                             s);
@@ -1414,7 +1431,9 @@
                          GNUNET_CONTAINER_multipeermap_size (plugin->sessions),
                          GNUNET_NO);
   if (s->rc > 0)
+  {
     s->in_destroy = GNUNET_YES;
+  }
   else
   {
     GNUNET_HELLO_address_free (s->address);
@@ -1881,30 +1900,45 @@
 
 
 /**
- * FIXME.
+ * Enqueue a message for transmission.
+ *
+ * @param plugin the UDP plugin
+ * @param udpw message wrapper to queue
  */
 static void
 enqueue (struct Plugin *plugin,
          struct UDP_MessageWrapper *udpw)
 {
+  struct Session *session = udpw->session;
+
   if (plugin->bytes_in_buffer + udpw->msg_size > INT64_MAX)
-    GNUNET_break(0);
+  {
+    GNUNET_break (0);
+  }
   else
   {
     GNUNET_STATISTICS_update (plugin->env->stats,
         "# UDP, total, bytes in buffers", udpw->msg_size, GNUNET_NO);
     plugin->bytes_in_buffer += udpw->msg_size;
   }
-  GNUNET_STATISTICS_update (plugin->env->stats, "# UDP, total, msgs in buffers",
-      1, GNUNET_NO);
+  GNUNET_STATISTICS_update (plugin->env->stats,
+                            "# UDP, total, msgs in buffers",
+                            1, GNUNET_NO);
   if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
     GNUNET_CONTAINER_DLL_insert(plugin->ipv4_queue_head,
-        plugin->ipv4_queue_tail, udpw);
+                                plugin->ipv4_queue_tail,
+                                udpw);
   else if (udpw->session->address->address_length == sizeof (struct IPv6UdpAddress))
-    GNUNET_CONTAINER_DLL_insert(plugin->ipv6_queue_head,
-        plugin->ipv6_queue_tail, udpw);
+    GNUNET_CONTAINER_DLL_insert (plugin->ipv6_queue_head,
+                                 plugin->ipv6_queue_tail,
+                                 udpw);
   else
+  {
     GNUNET_break (0);
+    return;
+  }
+  session->msgs_in_queue++;
+  session->bytes_in_queue += udpw->msg_size;
 }
 
 
@@ -1949,7 +1983,9 @@
   struct UDP_MessageWrapper * udpw;
   size_t msg_len = ntohs (msg->size);
 
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Enqueuing fragment with %u bytes\n", msg_len);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Enqueuing fragment with %u bytes\n",
+       msg_len);
   frag_ctx->fragments_used++;
   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
   udpw->session = frag_ctx->session;
@@ -2073,8 +2109,9 @@
     GNUNET_STATISTICS_update (plugin->env->stats,
         "# UDP, unfragmented msgs, messages, attempt", 1, GNUNET_NO);
     GNUNET_STATISTICS_update (plugin->env->stats,
-        "# UDP, unfragmented msgs, bytes payload, attempt", udpw->payload_size,
-        GNUNET_NO);
+                              "# UDP, unfragmented msgs, bytes payload, attempt",
+                              udpw->payload_size,
+                              GNUNET_NO);
   }
   else
   {
@@ -2092,17 +2129,30 @@
     frag_ctx->payload_size = msgbuf_size; /* unfragmented message size without UDP overhead */
     frag_ctx->on_wire_size = 0; /* bytes with UDP and fragmentation overhead */
     frag_ctx->frag = GNUNET_FRAGMENT_context_create (plugin->env->stats,
-        UDP_MTU, &plugin->tracker, s->last_expected_msg_delay,
-        s->last_expected_ack_delay, &udp->header, &enqueue_fragment, frag_ctx);
+                                                     UDP_MTU,
+                                                     &plugin->tracker,
+                                                     s->last_expected_msg_delay,
+                                                     s->last_expected_ack_delay,
+                                                     &udp->header,
+                                                     &enqueue_fragment,
+                                                     frag_ctx);
     s->frag_ctx = frag_ctx;
     GNUNET_STATISTICS_update (plugin->env->stats,
-        "# UDP, fragmented msgs, messages, pending", 1, GNUNET_NO);
+                              "# UDP, fragmented msgs, messages, pending",
+                              1,
+                              GNUNET_NO);
     GNUNET_STATISTICS_update (plugin->env->stats,
-        "# UDP, fragmented msgs, messages, attempt", 1, GNUNET_NO);
+                              "# UDP, fragmented msgs, messages, attempt",
+                              1,
+                              GNUNET_NO);
     GNUNET_STATISTICS_update (plugin->env->stats,
-        "# UDP, fragmented msgs, bytes payload, attempt",
-        frag_ctx->payload_size, GNUNET_NO);
+                              "# UDP, fragmented msgs, bytes payload, attempt",
+                              frag_ctx->payload_size,
+                              GNUNET_NO);
   }
+  notify_session_monitor (s->plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_UP);
   schedule_select (plugin);
   return udpmlen;
 }
@@ -2392,6 +2442,9 @@
   udp_ack->sender = *rc->plugin->env->my_identity;
   memcpy (&udp_ack[1], msg, ntohs (msg->size));
   enqueue (rc->plugin, udpw);
+  notify_session_monitor (s->plugin,
+                          s,
+                          GNUNET_TRANSPORT_SS_UP);
   schedule_select (rc->plugin);
 }
 
@@ -2658,7 +2711,9 @@
   struct GNUNET_TIME_Relative remaining;
   struct Session *session;
   struct Plugin *plugin;
+  int removed;
 
+  removed = GNUNET_NO;
   udpw = head;
   while (NULL != udpw)
   {
@@ -2673,52 +2728,74 @@
       {
       case UMT_MSG_UNFRAGMENTED:
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, total, bytes, sent, timeout", udpw->msg_size, GNUNET_NO);
+                                  "# UDP, total, bytes, sent, timeout",
+                                  udpw->msg_size,
+                                  GNUNET_NO);
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
+                                  "# UDP, total, messages, sent, timeout",
+                                  1,
+                                  GNUNET_NO);
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, unfragmented msgs, messages, sent, timeout", 1, GNUNET_NO);
+                                  "# UDP, unfragmented msgs, messages, sent, timeout",
+                                  1,
+                                  GNUNET_NO);
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, unfragmented msgs, bytes, sent, timeout",
-            udpw->payload_size, GNUNET_NO);
+                                  "# UDP, unfragmented msgs, bytes, sent, timeout",
+                                  udpw->payload_size,
+                                  GNUNET_NO);
         /* Not fragmented message */
-        LOG(GNUNET_ERROR_TYPE_DEBUG,
-            "Message for peer `%s' with size %u timed out\n",
-            GNUNET_i2s (&udpw->session->target), udpw->payload_size);
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Message for peer `%s' with size %u timed out\n",
+             GNUNET_i2s (&udpw->session->target),
+             udpw->payload_size);
         call_continuation (udpw, GNUNET_SYSERR);
         /* Remove message */
+        removed = GNUNET_YES;
         dequeue (plugin, udpw);
         GNUNET_free(udpw);
         break;
       case UMT_MSG_FRAGMENTED:
         /* Fragmented message */
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, total, bytes, sent, timeout", udpw->frag_ctx->on_wire_size,
-            GNUNET_NO);
+                                  "# UDP, total, bytes, sent, timeout",
+                                  udpw->frag_ctx->on_wire_size,
+                                  GNUNET_NO);
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
+                                  "# UDP, total, messages, sent, timeout",
+                                  1,
+                                  GNUNET_NO);
         call_continuation (udpw, GNUNET_SYSERR);
-        LOG(GNUNET_ERROR_TYPE_DEBUG,
-            "Fragment for message for peer `%s' with size %u timed out\n",
-            GNUNET_i2s (&udpw->session->target), udpw->frag_ctx->payload_size);
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Fragment for message for peer `%s' with size %u timed out\n",
+             GNUNET_i2s (&udpw->session->target),
+            udpw->frag_ctx->payload_size);
 
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, fragmented msgs, messages, sent, timeout", 1, GNUNET_NO);
+                                  "# UDP, fragmented msgs, messages, sent, timeout",
+                                  1,
+                                  GNUNET_NO);
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, fragmented msgs, bytes, sent, timeout",
-            udpw->frag_ctx->payload_size, GNUNET_NO);
+                                  "# UDP, fragmented msgs, bytes, sent, timeout",
+                                  udpw->frag_ctx->payload_size,
+                                  GNUNET_NO);
         /* Remove fragmented message due to timeout */
         fragmented_message_done (udpw->frag_ctx, GNUNET_SYSERR);
         break;
       case UMT_MSG_ACK:
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, total, bytes, sent, timeout", udpw->msg_size, GNUNET_NO);
+                                  "# UDP, total, bytes, sent, timeout",
+                                  udpw->msg_size,
+                                  GNUNET_NO);
         GNUNET_STATISTICS_update (plugin->env->stats,
-            "# UDP, total, messages, sent, timeout", 1, GNUNET_NO);
-        LOG(GNUNET_ERROR_TYPE_DEBUG,
-            "ACK Message for peer `%s' with size %u timed out\n",
-            GNUNET_i2s (&udpw->session->target), udpw->payload_size);
+                                  "# UDP, total, messages, sent, timeout",
+                                  1,
+                                  GNUNET_NO);
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "ACK Message for peer `%s' with size %u timed out\n",
+             GNUNET_i2s (&udpw->session->target),
+             udpw->payload_size);
         call_continuation (udpw, GNUNET_SYSERR);
+        removed = GNUNET_YES;
         dequeue (plugin, udpw);
         GNUNET_free(udpw);
         break;
@@ -2735,32 +2812,38 @@
         udpw = NULL;
       }
       GNUNET_STATISTICS_update (plugin->env->stats,
-          "# messages dismissed due to timeout", 1, GNUNET_NO);
+                                "# messages discarded due to timeout",
+                                1,
+                                GNUNET_NO);
     }
     else
     {
       /* Message did not time out, check flow delay */
-      remaining = GNUNET_TIME_absolute_get_remaining (
-          udpw->session->flow_delay_from_other_peer);
+      remaining = GNUNET_TIME_absolute_get_remaining (udpw->session->flow_delay_from_other_peer);
       if (GNUNET_TIME_UNIT_ZERO.rel_value_us == remaining.rel_value_us)
       {
         /* this message is not delayed */
-        LOG(GNUNET_ERROR_TYPE_DEBUG,
-            "Message for peer `%s' (%u bytes) is not delayed \n",
-            GNUNET_i2s (&udpw->session->target), udpw->payload_size);
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Message for peer `%s' (%u bytes) is not delayed \n",
+             GNUNET_i2s (&udpw->session->target),
+             udpw->payload_size);
         break; /* Found message to send, break */
       }
       else
       {
         /* Message is delayed, try next */
-        LOG(GNUNET_ERROR_TYPE_DEBUG,
-            "Message for peer `%s' (%u bytes) is delayed for %s\n",
-            GNUNET_i2s (&udpw->session->target), udpw->payload_size,
-            GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
+             "Message for peer `%s' (%u bytes) is delayed for %s\n",
+             GNUNET_i2s (&udpw->session->target), udpw->payload_size,
+             GNUNET_STRINGS_relative_time_to_string (remaining, GNUNET_YES));
         udpw = udpw->next;
       }
     }
   }
+  if (GNUNET_YES == removed)
+    notify_session_monitor (session->plugin,
+                            session,
+                            GNUNET_TRANSPORT_SS_UP);
   return udpw;
 }
 
@@ -2868,13 +2951,18 @@
   {
     call_continuation (udpw, GNUNET_OK);
     dequeue (plugin, udpw);
+    notify_session_monitor (plugin,
+                            udpw->session,
+                            GNUNET_TRANSPORT_SS_UP);
     GNUNET_free (udpw);
     return GNUNET_SYSERR;
   }
 
-  sent = GNUNET_NETWORK_socket_sendto (sock, udpw->msg_buf, udpw->msg_size, a,
-      slen);
-
+  sent = GNUNET_NETWORK_socket_sendto (sock,
+                                       udpw->msg_buf,
+                                       udpw->msg_size,
+                                       a,
+                                       slen);
   if (GNUNET_SYSERR == sent)
   {
     /* Failure */
@@ -2902,9 +2990,10 @@
     call_continuation (udpw, GNUNET_OK);
   }
   dequeue (plugin, udpw);
+  notify_session_monitor (plugin,
+                          udpw->session,
+                          GNUNET_TRANSPORT_SS_UP);
   GNUNET_free(udpw);
-  udpw = NULL;
-
   return sent;
 }
 
@@ -3146,8 +3235,10 @@
     }
     else
     {
-      LOG(GNUNET_ERROR_TYPE_ERROR, "Failed to bind UDP socket to %s: %s\n",
-          GNUNET_a2s (server_addr, addrlen), STRERROR (eno));
+      LOG (GNUNET_ERROR_TYPE_ERROR,
+           _("Failed to bind UDP socket to %s: %s\n"),
+           GNUNET_a2s (server_addr, addrlen),
+           STRERROR (eno));
     }
   }
 
@@ -3185,10 +3276,15 @@
   }
 
   schedule_select (plugin);
-  plugin->nat = GNUNET_NAT_register (plugin->env->cfg, GNUNET_NO, plugin->port,
+  plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
+                                     GNUNET_NO,
+                                     plugin->port,
                                      sockets_created,
-                                     (const struct sockaddr **) addrs, addrlens,
-                                     &udp_nat_port_map_callback, NULL, plugin);
+                                     (const struct sockaddr **) addrs,
+                                     addrlens,
+                                     &udp_nat_port_map_callback,
+                                     NULL,
+                                     plugin);
 
   return sockets_created;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]