gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] 01/02: - Trying to exchange iptables with nft, first shot faile


From: gnunet
Subject: [gnunet] 01/02: - Trying to exchange iptables with nft, first shot failed. - Fixed small bug in UDP communicator. - Fixed bug in DV circle test case - Introduced a default value to wait for a reliability ack. - Introduced a FC retransmission threshold together with a retransmission count. - Introduced a original size value for TransportDVBoxMessage - Checking if we have the root pending messge, when removing the pending message from virtual link. - Added delay value to schedule_transmit_on_queue to wait f [...]
Date: Thu, 17 Mar 2022 17:04:27 +0100

This is an automated email from the git hooks/post-receive script.

t3sserakt pushed a commit to branch master
in repository gnunet.

commit 95a1edacccd9b3bf769a144a12d41946d0ac25dc
Author: t3sserakt <t3ss@posteo.de>
AuthorDate: Thu Mar 17 14:28:40 2022 +0100

    - Trying to exchange iptables with nft, first shot failed.
    - Fixed small bug in UDP communicator.
    - Fixed bug in DV circle test case
    - Introduced a default value to wait for a reliability ack.
    - Introduced a FC retransmission threshold together with a retransmission 
count.
    - Introduced a original size value for TransportDVBoxMessage
    - Checking if we have the root pending messge, when removing the pending 
message from virtual link.
    - Added delay value to schedule_transmit_on_queue to wait for 
retransmitting.
    - Checking for confirmed virtual link, before routing.
    - Allow unconfirmed queues or DV routes when doing dv encapsulation for 
control traffic.
    - Changed check_vl_transmission to also check window size for DV next hop 
peer.
    - Fixed fragment box handling to also handle reliability boxed message 
which needed to be fragmented.
    - Fixed completing a message which was not only fragmented but also DV 
boxed.
    - Added logic to notify core about a new virtual link using distance vector 
without having validated next neighbour.
    - Added logic to create a virtual link to handle flow control messages.
    - fixed several smaller bugs in fragmentation logic.
    - Changed logic for adding the next_attempt value of PendingMessage.
---
 contrib/netjail/netjail_core.sh                    |   5 +-
 contrib/netjail/netjail_start.sh                   |   4 +
 src/transport/Makefile.am                          |   2 +-
 src/transport/gnunet-communicator-udp.c            |  29 +-
 src/transport/gnunet-service-tng.c                 | 699 +++++++++++++++------
 ...test_transport_distance_vector_circle_topo.conf |   2 +-
 .../test_transport_plugin_cmd_simple_send_dv.c     |  79 +--
 src/transport/transport_api_cmd_start_peer.c       |   2 +-
 src/util/mq.c                                      |  11 +-
 9 files changed, 581 insertions(+), 252 deletions(-)

diff --git a/contrib/netjail/netjail_core.sh b/contrib/netjail/netjail_core.sh
index ed363cf35..da784fa5e 100755
--- a/contrib/netjail/netjail_core.sh
+++ b/contrib/netjail/netjail_core.sh
@@ -188,7 +188,10 @@ netjail_node_add_nat() {
        local ADDRESS=$2
        local MASK=$3
 
-       ip netns exec $NODE iptables -t nat -A POSTROUTING -s "$ADDRESS/$MASK" 
-j MASQUERADE
+    ip netns exec $NODE nft add table nat
+    ip netns exec $NODE nft add chain nat postrouting { type nat hook 
postrouting priority 0 \; }
+    ip netns exec $NODE nft add rule ip nat postrouting ip saddr 
"$ADDRESS/$MASK" counter masquerade
+       # ip netns exec $NODE iptables -t nat -A POSTROUTING -s 
"$ADDRESS/$MASK" -j MASQUERADE
 }
 
 netjail_node_add_default() {
diff --git a/contrib/netjail/netjail_start.sh b/contrib/netjail/netjail_start.sh
index f7c417c27..e2d5fd634 100755
--- a/contrib/netjail/netjail_start.sh
+++ b/contrib/netjail/netjail_start.sh
@@ -77,11 +77,15 @@ for N in $(seq $GLOBAL_N); do
     
     if [ "1" == "${R_TCP[$N]}" ]
     then
+        #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr 
$GLOBAL_GROUP.$N tcp dport 60002 counter dnat to $LOCAL_GROUP.1
+        #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr 
$LOCAL_GROUP.1 ct state new,related,established  counter accept
         ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p tcp -d 
$GLOBAL_GROUP.$N --dport 60002 -j DNAT --to $LOCAL_GROUP.1
         ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1  -m 
state --state NEW,RELATED,ESTABLISHED -j ACCEPT
     fi
     if [ "1" == "${R_UDP[$N]}" ]
     then
+        #ip netns exec ${ROUTERS[$N]} nft add rule ip nat prerouting ip daddr 
$GLOBAL_GROUP.$N udp dport $PORT counter dnat to $LOCAL_GROUP.1
+        #ip netns exec ${ROUTERS[$N]} nft add rule ip filter FORWARD ip daddr 
$LOCAL_GROUP.1 ct state new,related,established  counter accept
         ip netns exec ${ROUTERS[$N]} iptables -t nat -A PREROUTING -p udp -d 
$GLOBAL_GROUP.$N --dport $PORT -j DNAT --to $LOCAL_GROUP.1
         ip netns exec ${ROUTERS[$N]} iptables -A FORWARD -d $LOCAL_GROUP.1  -m 
state --state NEW,RELATED,ESTABLISHED -j ACCEPT
     fi
diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index 5d6fb8421..ec00bc917 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -778,7 +778,7 @@ check_SCRIPTS= \
   test_transport_simple_send.sh \
   test_transport_simple_send_broadcast.sh \
   test_transport_udp_backchannel.sh \
-  # test_transport_simple_send_dv_circle.sh 
+  test_transport_simple_send_dv_circle.sh 
   # test_transport_simple_send_dv_inverse.sh
 
 test_transport_start_with_config_SOURCES = \
diff --git a/src/transport/gnunet-communicator-udp.c 
b/src/transport/gnunet-communicator-udp.c
index b6edff485..70848ff79 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -1273,7 +1273,7 @@ pass_plaintext_to_core (struct SenderAddress *sender,
   const struct GNUNET_MessageHeader *hdr = plaintext;
   const char *pos = plaintext;
 
-  while (ntohs (hdr->size) < plaintext_len)
+  while (ntohs (hdr->size) <= plaintext_len)
   {
     GNUNET_STATISTICS_update (stats,
                               "# bytes given to core",
@@ -1722,6 +1722,12 @@ try_handle_plaintext (struct SenderAddress *sender,
   const struct UDPAck *ack = (const struct UDPAck *) buf;
   uint16_t type;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "try_handle_plaintext of size %u (%u %u) and type %u\n",
+              buf_size,
+              ntohs (hdr->size),
+              sizeof(*hdr),
+              ntohs (hdr->type));
   if (sizeof(*hdr) > buf_size)
     return; /* not even a header */
   if (ntohs (hdr->size) > buf_size)
@@ -2202,7 +2208,8 @@ verify_confirmation (const struct 
GNUNET_CRYPTO_EcdhePublicKey *ephemeral,
 {
   struct UdpHandshakeSignature uhs;
 
-  uhs.purpose.purpose = htonl 
(GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
+  uhs.purpose.purpose = htonl (
+    GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
   uhs.purpose.size = htonl (sizeof(uhs));
   uhs.sender = uc->sender;
   uhs.receiver = my_identity;
@@ -2350,7 +2357,8 @@ sock_read (void *cls)
                 "received UDPBroadcast from %s\n",
                 GNUNET_a2s ((const struct sockaddr *) addr_verify, salen));
     ub = (const struct UDPBroadcast *) buf;
-    uhs.purpose.purpose = htonl 
(GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
+    uhs.purpose.purpose = htonl (
+      GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
     uhs.purpose.size = htonl (sizeof(uhs));
     uhs.sender = ub->sender;
     sender = ub->sender;
@@ -2366,10 +2374,11 @@ sock_read (void *cls)
                 GNUNET_i2s (&sender));
     GNUNET_CRYPTO_hash ((struct sockaddr *) addr_verify, salen, 
&uhs.h_address);
     if (GNUNET_OK ==
-        GNUNET_CRYPTO_eddsa_verify 
(GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST,
-                                    &uhs,
-                                    &ub->sender_sig,
-                                    &ub->sender.public_key))
+        GNUNET_CRYPTO_eddsa_verify (
+          GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST,
+          &uhs,
+          &ub->sender_sig,
+          &ub->sender.public_key))
     {
       char *addr_s;
       enum GNUNET_NetworkType nt;
@@ -2699,7 +2708,8 @@ mq_send_kx (struct GNUNET_MQ_Handle *mq,
   uc.sender = my_identity;
   uc.monotonic_time =
     GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get_monotonic (cfg));
-  uhs.purpose.purpose = htonl 
(GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
+  uhs.purpose.purpose = htonl (
+    GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_HANDSHAKE);
   uhs.purpose.size = htonl (sizeof(uhs));
   uhs.sender = my_identity;
   uhs.receiver = receiver->target;
@@ -3644,7 +3654,8 @@ iface_proc (void *cls,
   bi->salen = addrlen;
   bi->found = GNUNET_YES;
   bi->bcm.sender = my_identity;
-  ubs.purpose.purpose = htonl 
(GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
+  ubs.purpose.purpose = htonl (
+    GNUNET_SIGNATURE_PURPOSE_COMMUNICATOR_UDP_BROADCAST);
   ubs.purpose.size = htonl (sizeof(ubs));
   ubs.sender = my_identity;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
diff --git a/src/transport/gnunet-service-tng.c 
b/src/transport/gnunet-service-tng.c
index 56a854a70..0427bd229 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -81,6 +81,11 @@
 #include "gnunet_signatures.h"
 #include "transport.h"
 
+/**
+ * Maximum number of FC retransmissions for a runing retransmission task.
+ */
+#define MAX_FC_RETRANSMIT_COUNT 1000
+
 /**
  * Maximum number of messages we acknowledge together in one
  * cumulative ACK.  Larger values may save a bit of bandwidth.
@@ -185,6 +190,12 @@
 #define DV_FORWARD_TIMEOUT \
   GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 60)
 
+/**
+ * Default value for how long we wait for reliability ack.
+ */
+#define DEFAULT_ACK_WAIT_DURATION \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1)
+
 /**
  * We only consider queues as "quality" connections when
  * suppressing the generation of DV initiation messages if
@@ -781,6 +792,16 @@ struct TransportDVBoxMessage
    */
   struct GNUNET_HashCode hmac;
 
+  /**
+   * Size this msg had initially. This is needed to calculate the hmac at the 
target.
+   * The header size can not be used for that, because the box size is getting 
smaller at each hop.
+   */
+  /**
+   * The length of the struct (in bytes, including the length field itself),
+   * in big-endian format.
+   */
+  uint16_t orig_size GNUNET_PACKED;
+
   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
      excluding the @e origin and the current peer, the last must be
      the ultimate target; if @e num_hops is zero, the receiver of this
@@ -1341,6 +1362,17 @@ struct VirtualLink
    */
   struct GNUNET_SCHEDULER_Task *fc_retransmit_task;
 
+  /**
+   * Number of FC retransmissions for this running task.
+   */
+  unsigned int fc_retransmit_count;
+
+  /**
+   * Is this VirtualLink confirmed.
+   * A unconfirmed VirtualLink might exist, if we got a FC from that target.
+   */
+  unsigned int confirmed;
+
   /**
    * Neighbour used by this virtual link, NULL if @e dv is used.
    */
@@ -2845,8 +2877,8 @@ free_pending_acknowledgement (struct 
PendingAcknowledgement *pa)
   struct PendingMessage *pm = pa->pm;
   struct DistanceVectorHop *dvh = pa->dvh;
 
-  GNUNET_CONTAINER_MDLL_remove (pa, pa_head, pa_tail, pa);
-  pa_count--;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "free_pending_acknowledgement\n");
   if (NULL != q)
   {
     GNUNET_CONTAINER_MDLL_remove (queue, q->pa_head, q->pa_tail, pa);
@@ -2854,6 +2886,17 @@ free_pending_acknowledgement (struct 
PendingAcknowledgement *pa)
   }
   if (NULL != pm)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "remove pa from message\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "remove pa from message %llu\n",
+                pm->logging_uuid);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "remove pa from message %u\n",
+                pm->pmt);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "remove pa from message %s\n",
+                GNUNET_uuid2s (&pa->ack_uuid.value));
     GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
     pa->pm = NULL;
   }
@@ -2920,8 +2963,11 @@ free_pending_message (struct PendingMessage *pm)
                                   tc->details.core.pending_msg_tail,
                                   pm);
   }
-  if (NULL != vl)
+  if ((NULL != vl) && (NULL == pm->frag_parent))
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Removing pm %lu\n",
+                pm->logging_uuid);
     GNUNET_CONTAINER_MDLL_remove (vl,
                                   vl->pending_msg_head,
                                   vl->pending_msg_tail,
@@ -2929,6 +2975,18 @@ free_pending_message (struct PendingMessage *pm)
   }
   while (NULL != (pa = pm->pa_head))
   {
+    if (NULL == pa)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "free pending pa  null\n");
+    if (NULL == pm->pa_tail)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "free pending pa_tail null\n");
+    if (NULL == pa->prev_pa)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "free pending pa prev null\n");
+    if (NULL == pa->next_pa)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "free pending pa next null\n");
     GNUNET_CONTAINER_MDLL_remove (pm, pm->pa_head, pm->pa_tail, pa);
     pa->pm = NULL;
   }
@@ -2944,6 +3002,9 @@ free_pending_message (struct PendingMessage *pm)
     free_fragment_tree (pm->bpm);
     GNUNET_free (pm->bpm);
   }
+  if (NULL == pm)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "free pending pm  null\n");
   GNUNET_free (pm);
 }
 
@@ -3028,6 +3089,9 @@ free_virtual_link (struct VirtualLink *vl)
   struct PendingMessage *pm;
   struct CoreSentContext *csc;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "free virtual link\n");
+
   if (NULL != vl->reassembly_map)
   {
     GNUNET_CONTAINER_multihashmap32_iterate (vl->reassembly_map,
@@ -3084,6 +3148,8 @@ free_validation_state (struct ValidationState *vs)
   vs->hn = NULL;
   if (NULL != vs->sc)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "store cancel\n");
     GNUNET_PEERSTORE_store_cancel (vs->sc);
     vs->sc = NULL;
   }
@@ -3392,6 +3458,8 @@ free_neighbour (struct Neighbour *neighbour)
   }
   if (NULL != neighbour->sc)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "store cancel\n");
     GNUNET_PEERSTORE_store_cancel (neighbour->sc);
     neighbour->sc = NULL;
   }
@@ -3503,7 +3571,8 @@ check_for_queue_with_higher_prio (struct Queue *queue, 
struct Queue *queue_head)
  * @param p task priority to use, if @a queue is scheduled
  */
 static void
-schedule_transmit_on_queue (struct Queue *queue,
+schedule_transmit_on_queue (struct GNUNET_TIME_Relative delay,
+                            struct Queue *queue,
                             enum GNUNET_SCHEDULER_Priority p)
 {
   if (check_for_queue_with_higher_prio (queue,
@@ -3552,7 +3621,8 @@ schedule_transmit_on_queue (struct Queue *queue,
   if (NULL != queue->transmit_task)
     GNUNET_SCHEDULER_cancel (queue->transmit_task);
   queue->transmit_task =
-    GNUNET_SCHEDULER_add_with_priority (p, &transmit_on_queue, queue);
+    GNUNET_SCHEDULER_add_delayed_with_priority (delay, p, &transmit_on_queue,
+                                                queue);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Considering transmission on queue `%s' to %s\n",
               queue->address,
@@ -3677,13 +3747,15 @@ free_queue (struct Queue *queue)
       GNUNET_NO);
     for (struct Queue *s = tc->details.communicator.queue_head; NULL != s;
          s = s->next_client)
-      schedule_transmit_on_queue (s, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                  s,
+                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   notify_monitors (&neighbour->pid, queue->address, queue->nt, &me);
   GNUNET_free (queue);
 
   vl = lookup_virtual_link (&neighbour->pid);
-  if ((NULL != vl) && (neighbour == vl->n))
+  if ((NULL != vl) && (GNUNET_YES == vl->confirmed) && (neighbour == vl->n))
   {
     GNUNET_SCHEDULER_cancel (vl->visibility_task);
     check_link_down (vl);
@@ -3710,6 +3782,8 @@ free_address_list_entry (struct AddressListEntry *ale)
                                ale);
   if (NULL != ale->sc)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "store cancel\n");
     GNUNET_PEERSTORE_store_cancel (ale->sc);
     ale->sc = NULL;
   }
@@ -3954,6 +4028,8 @@ client_send_response (struct PendingMessage *pm)
   struct TransportClient *tc = pm->client;
   struct VirtualLink *vl = pm->vl;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "client send response\n");
   if (NULL != tc)
   {
     struct GNUNET_MQ_Envelope *env;
@@ -4127,7 +4203,7 @@ handle_client_recv_ok (void *cls, const struct 
RecvOkMessage *rom)
     return;
   }
   vl = lookup_virtual_link (&rom->peer);
-  if (NULL == vl)
+  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
   {
     GNUNET_STATISTICS_update (GST_stats,
                               "# RECV_OK dropped: virtual link unknown",
@@ -4323,6 +4399,12 @@ queue_send_msg (struct Queue *queue,
       queue->idle = GNUNET_NO;
     if (0 == queue->q_capacity)
       queue->idle = GNUNET_NO;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending message of type %u (%u) and size %u with MQ %p\n",
+                ntohs (((const struct GNUNET_MessageHeader *) payload)->type),
+                ntohs (smt->header.size),
+                payload_size,
+                queue->tc->mq);
     GNUNET_MQ_send (queue->tc->mq, env);
   }
 }
@@ -4684,6 +4766,7 @@ encapsulate_for_dv (struct DistanceVector *dv,
     struct GNUNET_PeerIdentity *dhops;
 
     box_hdr.header.size = htons (sizeof(buf));
+    box_hdr.orig_size = htons (sizeof(buf));
     box_hdr.num_hops = htons (num_hops);
     memcpy (buf, &box_hdr, sizeof(box_hdr));
     dhops = (struct GNUNET_PeerIdentity *) &buf[sizeof(box_hdr)];
@@ -4741,7 +4824,7 @@ send_dv_to_neighbour (void *cls,
                       enum RouteMessageOptions options)
 {
   (void) cls;
-  (void) route_via_neighbour (next_hop, hdr, options);
+  (void) route_via_neighbour (next_hop, hdr, RMO_UNCONFIRMED_ALLOWED);
 }
 
 
@@ -4776,7 +4859,7 @@ route_control_message_without_fc (struct VirtualLink *vl,
 
   // TODO Do this elsewhere. vl should be given as parameter to method.
   // vl = lookup_virtual_link (target);
-  GNUNET_assert (NULL != vl);
+  GNUNET_assert (NULL != vl && GNUNET_YES == vl->confirmed);
   if (NULL == vl)
     return GNUNET_TIME_UNIT_FOREVER_REL;
   n = vl->n;
@@ -4916,7 +4999,7 @@ consider_sending_fc (void *cls)
   fc.outbound_sent = GNUNET_htonll (vl->outbound_fc_window_size_used);
   fc.outbound_window_size = GNUNET_htonll (vl->outbound_fc_window_size);
   fc.sender_time = GNUNET_TIME_absolute_hton (monotime);
-  rtt = route_control_message_without_fc (vl, &fc.header, RMO_NONE);
+  rtt = route_control_message_without_fc (vl, &fc.header, RMO_DV_ALLOWED);
   if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us == rtt.rel_value_us)
   {
     rtt = GNUNET_TIME_UNIT_SECONDS;
@@ -4933,8 +5016,14 @@ consider_sending_fc (void *cls)
   }
   if (NULL != vl->fc_retransmit_task)
     GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
+  if (MAX_FC_RETRANSMIT_COUNT == vl->fc_retransmit_count)
+  {
+    rtt = GNUNET_TIME_UNIT_MINUTES;
+    vl->fc_retransmit_count = 0;
+  }
   vl->fc_retransmit_task =
     GNUNET_SCHEDULER_add_delayed (rtt, &task_consider_sending_fc, vl);
+  vl->fc_retransmit_count++;
 }
 
 
@@ -4960,14 +5049,20 @@ check_vl_transmission (struct VirtualLink *vl)
   struct Neighbour *n = vl->n;
   struct DistanceVector *dv = vl->dv;
   struct GNUNET_TIME_Absolute now;
+  struct VirtualLink *vl_next_hop;
   int elig;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "check_vl_transmission to target %s\n",
+              GNUNET_i2s (&vl->target));
   /* Check that we have an eligible pending message!
      (cheaper than having #transmit_on_queue() find out!) */
   elig = GNUNET_NO;
   for (struct PendingMessage *pm = vl->pending_msg_head; NULL != pm;
        pm = pm->next_vl)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "check_vl_transmission loop\n");
     if (NULL != pm->qe)
       continue;   /* not eligible, is in a queue! */
     if (pm->bytes_msg + vl->outbound_fc_window_size_used >
@@ -4983,62 +5078,96 @@ check_vl_transmission (struct VirtualLink *vl)
       consider_sending_fc (vl);
       return;     /* We have a message, but flow control says "nope" */
     }
-    elig = GNUNET_YES;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Eligible message %lu of size %llu to %s: %llu/%llu\n",
-                pm->logging_uuid,
-                pm->bytes_msg,
-                GNUNET_i2s (&vl->target),
-                (unsigned long long) vl->outbound_fc_window_size,
-                (unsigned long long) (pm->bytes_msg
-                                      + vl->outbound_fc_window_size_used));
-    break;
-  }
-  if (GNUNET_NO == elig)
-    return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Not stalled. Scheduling transmission on queue\n");
-  /* Notify queues at direct neighbours that we are interested */
-  now = GNUNET_TIME_absolute_get ();
-  if (NULL != n)
-  {
-    for (struct Queue *queue = n->queue_head; NULL != queue;
-         queue = queue->next_neighbour)
-    {
-      if ((GNUNET_YES == queue->idle) &&
-          (queue->validated_until.abs_value_us > now.abs_value_us))
-        schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
-      else
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "Neighbour Queue QID: %u (%u) busy or invalid\n",
-                    queue->qid,
-                    queue->idle);
-    }
-  }
-  /* Notify queues via DV that we are interested */
-  if (NULL != dv)
-  {
-    /* Do DV with lower scheduler priority, which effectively means that
-       IF a neighbour exists and is available, we prefer it. */
-    for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
-         pos = pos->next_dv)
+                "Target window on VL %s not stalled. Scheduling transmission 
on queue\n",
+                GNUNET_i2s (&vl->target));
+    /* Notify queues at direct neighbours that we are interested */
+    now = GNUNET_TIME_absolute_get ();
+    if (NULL != n)
     {
-      struct Neighbour *nh = pos->next_hop;
-
-      if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
-        continue;     /* skip this one: path not validated */
-      for (struct Queue *queue = nh->queue_head; NULL != queue;
+      for (struct Queue *queue = n->queue_head; NULL != queue;
            queue = queue->next_neighbour)
+      {
         if ((GNUNET_YES == queue->idle) &&
             (queue->validated_until.abs_value_us > now.abs_value_us))
-          schedule_transmit_on_queue (queue,
-                                      GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "Direct neighbour %s not stalled\n",
+                      GNUNET_i2s (&n->pid));
+          schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                      queue,
+                                      GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+          elig = GNUNET_YES;
+        }
         else
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "DV Queue QID: %u (%u) busy or invalid\n",
+                      "Neighbour Queue QID: %u (%u) busy or invalid\n",
                       queue->qid,
                       queue->idle);
+      }
     }
+    /* Notify queues via DV that we are interested */
+    if (NULL != dv)
+    {
+      /* Do DV with lower scheduler priority, which effectively means that
+         IF a neighbour exists and is available, we prefer it. */
+      for (struct DistanceVectorHop *pos = dv->dv_head; NULL != pos;
+           pos = pos->next_dv)
+      {
+        struct Neighbour *nh = pos->next_hop;
+
+
+        if (pos->path_valid_until.abs_value_us <= now.abs_value_us)
+          continue;   /* skip this one: path not validated */
+        else
+        {
+          vl_next_hop = lookup_virtual_link (&nh->pid);
+          if (pm->bytes_msg + vl_next_hop->outbound_fc_window_size_used >
+              vl_next_hop->outbound_fc_window_size)
+          {
+            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                        "Stalled message %lu transmission on next hop %s due 
to flow control: %llu < %llu\n",
+                        pm->logging_uuid,
+                        GNUNET_i2s (&vl_next_hop->target),
+                        (unsigned long
+                         long) vl_next_hop->outbound_fc_window_size,
+                        (unsigned long long) (pm->bytes_msg
+                                              + vl_next_hop->
+                                              outbound_fc_window_size_used));
+            consider_sending_fc (vl_next_hop);
+            continue; /* We have a message, but flow control says "nope" for 
the first hop of this path */
+          }
+          for (struct Queue *queue = nh->queue_head; NULL != queue;
+               queue = queue->next_neighbour)
+            if ((GNUNET_YES == queue->idle) &&
+                (queue->validated_until.abs_value_us > now.abs_value_us))
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                          "Next hop neighbour %s not stalled\n",
+                          GNUNET_i2s (&nh->pid));
+              schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                          queue,
+                                          
GNUNET_SCHEDULER_PRIORITY_BACKGROUND);
+              elig = GNUNET_YES;
+            }
+            else
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                          "DV Queue QID: %u (%u) busy or invalid\n",
+                          queue->qid,
+                          queue->idle);
+        }
+      }
+    }
+    if (GNUNET_YES == elig)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Eligible message %lu of size %llu to %s: %llu/%llu\n",
+                  pm->logging_uuid,
+                  pm->bytes_msg,
+                  GNUNET_i2s (&vl->target),
+                  (unsigned long long) vl->outbound_fc_window_size,
+                  (unsigned long long) (pm->bytes_msg
+                                        + vl->outbound_fc_window_size_used));
+    break;
   }
 }
 
@@ -5064,7 +5193,7 @@ handle_client_send (void *cls, const struct 
OutboundMessage *obm)
   bytes_msg = ntohs (obmm->size);
   pp = (enum GNUNET_MQ_PriorityPreferences) ntohl (obm->priority);
   vl = lookup_virtual_link (&obm->peer);
-  if (NULL == vl)
+  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Don't have %s as a neighbour (anymore).\n",
@@ -5154,7 +5283,7 @@ handle_communicator_backchannel (
           strlen (is) + 1);
   // route_control_message_without_fc (&cb->pid, &be->header, RMO_DV_ALLOWED);
   vl = lookup_virtual_link (&cb->pid);
-  if (NULL != vl)
+  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
   {
     route_control_message_without_fc (vl, &be->header, RMO_DV_ALLOWED);
   }
@@ -5428,7 +5557,7 @@ handle_raw_message (void *cls, const struct 
GNUNET_MessageHeader *mh)
     return;
   }
   vl = lookup_virtual_link (&cmc->im.sender);
-  if (NULL == vl)
+  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
   {
     /* FIXME: sender is giving us messages for CORE but we don't have
        the link up yet! I *suspect* this can happen right now (i.e.
@@ -5624,7 +5753,7 @@ transmit_cummulative_ack_cb (void *cls)
     &ack->header,
     RMO_DV_ALLOWED);*/
   vl = lookup_virtual_link (&ac->target);
-  if (NULL != vl)
+  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
   {
     route_control_message_without_fc (
       vl,
@@ -5765,7 +5894,7 @@ handle_fragment_box (void *cls, const struct 
TransportFragmentBoxMessage *fb)
   struct FindByMessageUuidContext fc;
 
   vl = lookup_virtual_link (&cmc->im.sender);
-  if (NULL == vl)
+  if ((NULL == vl) || (GNUNET_NO == vl->confirmed))
   {
     struct GNUNET_SERVICE_Client *client = cmc->tc->client;
 
@@ -5794,6 +5923,7 @@ handle_fragment_box (void *cls, const struct 
TransportFragmentBoxMessage *fb)
                                                        fb->msg_uuid.uuid,
                                                        &find_by_message_uuid,
                                                        &fc);
+  fsize = ntohs (fb->header.size) - sizeof(*fb);
   if (NULL == (rc = fc.rc))
   {
     rc = GNUNET_malloc (sizeof(*rc) + msize    /* reassembly payload buffer */
@@ -5815,11 +5945,16 @@ handle_fragment_box (void *cls, const struct 
TransportFragmentBoxMessage *fb)
                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
     target = (char *) &rc[1];
     rc->bitfield = (uint8_t *) (target + rc->msg_size);
-    rc->msg_missing = rc->msg_size;
+    if (fsize != rc->msg_size)
+      rc->msg_missing = rc->msg_size;
+    else
+      rc->msg_missing = 0;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received fragment at offset %u/%u from %s for NEW message 
%u\n",
+                "Received fragment with size %u at offset %u/%u %u bytes 
missing from %s for NEW message %u\n",
+                fsize,
                 ntohs (fb->frag_off),
                 msize,
+                rc->msg_missing,
                 GNUNET_i2s (&cmc->im.sender),
                 (unsigned int) fb->msg_uuid.uuid);
   }
@@ -5841,7 +5976,6 @@ handle_fragment_box (void *cls, const struct 
TransportFragmentBoxMessage *fb)
   }
 
   /* reassemble */
-  fsize = ntohs (fb->header.size) - sizeof(*fb);
   if (0 == fsize)
   {
     GNUNET_break (0);
@@ -5918,6 +6052,16 @@ check_reliability_box (void *cls,
                        const struct TransportReliabilityBoxMessage *rb)
 {
   (void) cls;
+  const struct GNUNET_MessageHeader *inbox =  (const struct
+                                               GNUNET_MessageHeader *) &rb[1];
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "check_send_msg with size %u: inner msg type %u and size %u (%u 
%u)\n",
+              ntohs (rb->header.size),
+              ntohs (inbox->type),
+              ntohs (inbox->size),
+              sizeof (struct TransportReliabilityBoxMessage),
+              sizeof (struct GNUNET_MessageHeader));
   GNUNET_MQ_check_boxed_message (rb);
   return GNUNET_YES;
 }
@@ -6060,6 +6204,10 @@ completed_pending_message (struct PendingMessage *pm)
 {
   struct PendingMessage *pos;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Complete transmission of message %llu %u\n",
+              pm->logging_uuid,
+              pm->pmt);
   switch (pm->pmt)
   {
   case PMT_CORE:
@@ -6069,7 +6217,7 @@ completed_pending_message (struct PendingMessage *pm)
     return;
 
   case PMT_FRAGMENT_BOX:
-    /* Fragment sent over reliabile channel */
+    /* Fragment sent over reliable channel */
     free_fragment_tree (pm);
     pos = pm->frag_parent;
     GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
@@ -6080,11 +6228,17 @@ completed_pending_message (struct PendingMessage *pm)
     {
       pm = pos;
       pos = pm->frag_parent;
+      if (PMT_DV_BOX == pm->pmt)
+      {
+        GNUNET_free (pm);
+        client_send_response (pos);
+        return;
+      }
       GNUNET_CONTAINER_MDLL_remove (frag, pos->head_frag, pos->tail_frag, pm);
       GNUNET_free (pm);
     }
 
-    /* Was this the last applicable fragmment? */
+    /* Was this the last applicable fragment? */
     if ((NULL == pos->head_frag) && (NULL == pos->frag_parent) &&
         (pos->frag_off == pos->bytes_msg))
       client_send_response (pos);
@@ -6293,6 +6447,7 @@ handle_backchannel_encapsulation (
       target_communicator);
     GNUNET_STATISTICS_update (GST_stats, stastr, 1, GNUNET_NO);
     GNUNET_free (stastr);
+    finish_cmc_handling (cmc);
     return;
   }
   /* Finally, deliver backchannel message to communicator */
@@ -6309,6 +6464,7 @@ handle_backchannel_encapsulation (
   cbi->pid = cmc->im.sender;
   memcpy (&cbi[1], inbox, isize);
   GNUNET_MQ_send (tc->mq, env);
+  finish_cmc_handling (cmc);
 }
 
 
@@ -6359,39 +6515,54 @@ activate_core_visible_dv_path (struct DistanceVectorHop 
*hop)
   struct VirtualLink *vl;
 
   vl = lookup_virtual_link (&dv->target);
-  if (NULL != vl)
+  if (NULL == vl)
   {
-    /* Link was already up, remember dv is also now available and we are done 
*/
-    vl->dv = dv;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Virtual link to %s could now also use DV!\n",
+                "Creating new virtual link to %s using DV!\n",
                 GNUNET_i2s (&dv->target));
-    return;
+    vl = GNUNET_new (struct VirtualLink);
+    vl->confirmed = GNUNET_YES;
+    vl->message_uuid_ctr =
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+    vl->target = dv->target;
+    vl->core_recv_window = RECV_WINDOW_SIZE;
+    vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+    vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
+    GNUNET_break (GNUNET_YES ==
+                  GNUNET_CONTAINER_multipeermap_put (
+                    links,
+                    &vl->target,
+                    vl,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    vl->dv = dv;
+    dv->vl = vl;
+    vl->visibility_task =
+      GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+    consider_sending_fc (vl);
+    /* We lacked a confirmed connection to the target
+       before, so tell CORE about it (finally!) */
+    cores_send_connect_info (&dv->target);
+  }
+  else
+  {
+    /* Link was already up, remember dv is also now available and we are done 
*/
+    vl->dv = dv;
+    dv->vl = vl;
+    if (GNUNET_NO == vl->confirmed)
+    {
+      vl->confirmed = GNUNET_YES;
+      vl->visibility_task =
+        GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
+      consider_sending_fc (vl);
+      /* We lacked a confirmed connection to the target
+         before, so tell CORE about it (finally!) */
+      cores_send_connect_info (&dv->target);
+    }
+    else
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Virtual link to %s could now also use DV!\n",
+                  GNUNET_i2s (&dv->target));
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Creating new virtual link to %s using DV!\n",
-              GNUNET_i2s (&dv->target));
-  vl = GNUNET_new (struct VirtualLink);
-  vl->message_uuid_ctr =
-    GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
-  vl->target = dv->target;
-  vl->dv = dv;
-  dv->vl = vl;
-  vl->core_recv_window = RECV_WINDOW_SIZE;
-  vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
-  vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
-  vl->visibility_task =
-    GNUNET_SCHEDULER_add_at (hop->path_valid_until, &check_link_down, vl);
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CONTAINER_multipeermap_put (
-                  links,
-                  &vl->target,
-                  vl,
-                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  consider_sending_fc (vl);
-  /* We lacked a confirmed connection to the target
-     before, so tell CORE about it (finally!) */
-  cores_send_connect_info (&dv->target);
 }
 
 
@@ -6530,8 +6701,10 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
           return GNUNET_NO;
         }
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "Refreshed known path to %s, forwarding further\n",
-                    GNUNET_i2s (&dv->target));
+                    "Refreshed known path to %s valid until %s, forwarding 
further\n",
+                    GNUNET_i2s (&dv->target),
+                    GNUNET_STRINGS_absolute_time_to_string (
+                      pos->path_valid_until));
         return GNUNET_YES;
       }
     }
@@ -6548,8 +6721,9 @@ learn_dv_path (const struct GNUNET_PeerIdentity *path,
   }
   /* create new DV path entry */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Discovered new DV path to %s\n",
-              GNUNET_i2s (&dv->target));
+              "Discovered new DV path to %s valid until %s\n",
+              GNUNET_i2s (&dv->target),
+              GNUNET_STRINGS_absolute_time_to_string (path_valid_until));
   hop = GNUNET_malloc (sizeof(struct DistanceVectorHop)
                        + sizeof(struct GNUNET_PeerIdentity) * (path_len - 3));
   hop->next_hop = next_hop;
@@ -6680,7 +6854,7 @@ forward_dv_learn (const struct GNUNET_PeerIdentity 
*next_hop,
                                     &fwd->header,
                                     RMO_UNCONFIRMED_ALLOWED);*/
   vl = lookup_virtual_link (next_hop);
-  if (NULL != vl)
+  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
   {
     route_control_message_without_fc (vl,
                                       &fwd->header,
@@ -6977,13 +7151,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
   struct GNUNET_TIME_Absolute in_time;
   struct Neighbour *n;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "handle dv learn message sender %s\n",
-              GNUNET_i2s (&cmc->im.sender));
-
   nhops = ntohs (dvl->num_hops);  /* 0 = sender is initiator */
   bi_history = ntohs (dvl->bidirectional);
   hops = (const struct DVPathEntryP *) &dvl[1];
@@ -7017,10 +7184,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
             cc); // FIXME: add bi-directional flag to cc?
   in_time = GNUNET_TIME_absolute_get ();
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "2 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
-
   /* continue communicator here, everything else can happen asynchronous! */
   finish_cmc_handling (cmc);
 
@@ -7055,7 +7218,11 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
     if (GNUNET_YES == n->dv_monotime_available)
     {
       if (NULL != n->sc)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "store cancel\n");
         GNUNET_PEERSTORE_store_cancel (n->sc);
+      }
       n->sc =
         GNUNET_PEERSTORE_store (peerstore,
                                 "transport",
@@ -7069,9 +7236,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
                                 n);
     }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "3 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
   /* OPTIMIZE-FIXME: asynchronously (!) verify signatures!,
      If signature verification load too high, implement random drop strategy */
   for (unsigned int i = 0; i < nhops; i++)
@@ -7110,9 +7274,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
       return;
     }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "4 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
   if (GNUNET_EXTRA_LOGGING > 0)
   {
     char *path;
@@ -7137,9 +7298,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
                 GNUNET_i2s (&GST_my_identity));
     GNUNET_free (path);
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "5 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
   do_fwd = GNUNET_YES;
   if (0 == GNUNET_memcmp (&GST_my_identity, &dvl->initiator))
   {
@@ -7147,6 +7305,7 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
     struct GNUNET_TIME_Relative host_latency_sum;
     struct GNUNET_TIME_Relative latency;
     struct GNUNET_TIME_Relative network_latency;
+    struct GNUNET_TIME_Absolute now;
 
     /* We initiated this, learn the forward path! */
     path[0] = GST_my_identity;
@@ -7155,7 +7314,11 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
 
     // Need also something to lookup initiation time
     // to compute RTT! -> add RTT argument here?
-    latency = GNUNET_TIME_UNIT_FOREVER_REL;   // FIXME: initialize properly
+    now = GNUNET_TIME_absolute_get ();
+    latency = GNUNET_TIME_absolute_get_duration (GNUNET_TIME_absolute_ntoh (
+                                                   dvl->monotonic_time));
+    GNUNET_assert (latency.rel_value_us >= host_latency_sum.rel_value_us);
+    // latency = GNUNET_TIME_UNIT_FOREVER_REL;   // FIXME: initialize properly
     // (based on dvl->challenge, we can identify time of origin!)
 
     network_latency = GNUNET_TIME_relative_subtract (latency, 
host_latency_sum);
@@ -7184,9 +7347,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
     do_fwd = GNUNET_NO;
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "6 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
   if (bi_hop)
   {
     /* last hop was bi-directional, we could learn something here! */
@@ -7243,9 +7403,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
       }
     }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "7 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
   if (MAX_DV_HOPS_ALLOWED == nhops)
   {
     /* At limit, we're out of here! */
@@ -7305,9 +7462,6 @@ handle_dv_learn (void *cls, const struct 
TransportDVLearnMessage *dvl)
                                            &dv_neighbour_transmission,
                                            &nsc);
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "9 handle dv learn message from %s\n",
-              GNUNET_i2s (&dvl->initiator));
 }
 
 
@@ -7374,10 +7528,17 @@ forward_dv_box (struct Neighbour *next_hop,
   char msg_buf[msg_size] GNUNET_ALIGN;
   struct GNUNET_PeerIdentity *dhops;
 
+  if (GNUNET_NO == ntohs (hdr->without_fc))
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "forward dv box without fc\n");
+  if (NULL == vl)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "forward dv box vl null\n");
   GNUNET_assert (GNUNET_YES == ntohs (hdr->without_fc) || NULL != vl);
 
   hdr->num_hops = htons (num_hops);
   hdr->total_hops = htons (total_hops);
+  hdr->header.size = htons (msg_size);
   memcpy (msg_buf, hdr, sizeof(*hdr));
   dhops = (struct GNUNET_PeerIdentity *) &msg_buf[sizeof(struct
                                                          
TransportDVBoxMessage)];
@@ -7387,7 +7548,8 @@ forward_dv_box (struct Neighbour *next_hop,
   if (GNUNET_YES == ntohs (hdr->without_fc))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Forwarding control message in DV Box to next hop %s (%u/%u) 
\n",
+                "Forwarding control message (payload size %u) in DV Box to 
next hop %s (%u/%u) \n",
+                enc_payload_size,
                 GNUNET_i2s (&next_hop->pid),
                 (unsigned int) num_hops,
                 (unsigned int) total_hops);
@@ -7444,6 +7606,8 @@ free_backtalker (struct Backtalker *b)
   }
   if (NULL != b->sc)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "store cancel\n");
     GNUNET_PEERSTORE_store_cancel (b->sc);
     b->sc = NULL;
   }
@@ -7486,6 +7650,8 @@ backtalker_timeout_cb (void *cls)
 {
   struct Backtalker *b = cls;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "backtalker timeout.\n");
   b->task = NULL;
   if (0 != GNUNET_TIME_absolute_get_remaining (b->timeout).rel_value_us)
   {
@@ -7589,6 +7755,8 @@ update_backtalker_monotime (struct Backtalker *b)
 
   if (NULL != b->sc)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "store cancel\n");
     GNUNET_PEERSTORE_store_cancel (b->sc);
     b->sc = NULL;
   }
@@ -7714,8 +7882,8 @@ handle_dv_box (void *cls, const struct 
TransportDVBoxMessage *dvb)
 
   dh_key_derive_eph_pub (&dvb->ephemeral_key, &dvb->iv, key);
   hdr = (const char *) &dvb[1];
-  hdr_len = ntohs (dvb->header.size) - sizeof(*dvb) - sizeof(struct
-                                                             
GNUNET_PeerIdentity)
+  hdr_len = ntohs (dvb->orig_size) - sizeof(*dvb) - sizeof(struct
+                                                           GNUNET_PeerIdentity)
             * ntohs (dvb->total_hops);
 
   dv_hmac (key, &hmac, hdr, hdr_len);
@@ -8112,7 +8280,7 @@ handle_validation_challenge (
   }
   sender = cmc->im.sender;
   vl = lookup_virtual_link (&sender);
-  if (NULL != vl)
+  if ((NULL != vl) && (GNUNET_YES == vl->confirmed))
   {
     // route_control_message_without_fc (&cmc->im.sender,
     route_control_message_without_fc (vl,
@@ -8380,46 +8548,63 @@ handle_validation_response (
   q->pd.aged_rtt = vs->validation_rtt;
   n = q->neighbour;
   vl = lookup_virtual_link (&vs->pid);
-  if (NULL != vl)
+  if (NULL == vl)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Creating new virtual link to %s using direct neighbour!\n",
+                GNUNET_i2s (&vs->pid));
+    vl = GNUNET_new (struct VirtualLink);
+    vl->confirmed = GNUNET_YES;
+    vl->message_uuid_ctr =
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+    vl->target = n->pid;
+    vl->core_recv_window = RECV_WINDOW_SIZE;
+    vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+    vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
+    GNUNET_break (GNUNET_YES ==
+                  GNUNET_CONTAINER_multipeermap_put (
+                    links,
+                    &vl->target,
+                    vl,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    vl->n = n;
+    n->vl = vl;
+    q->idle = GNUNET_YES;
+    vl->visibility_task =
+      GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+    consider_sending_fc (vl);
+    /* We lacked a confirmed connection to the target
+       before, so tell CORE about it (finally!) */
+    cores_send_connect_info (&n->pid);
+  }
+  else
   {
     /* Link was already up, remember n is also now available and we are done */
     if (NULL == vl->n)
     {
       vl->n = n;
       n->vl = vl;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Virtual link to %s could now also direct neighbour!\n",
-                  GNUNET_i2s (&vs->pid));
+      if (GNUNET_YES == vl->confirmed)
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "Virtual link to %s could now also direct neighbour!\n",
+                    GNUNET_i2s (&vs->pid));
     }
     else
     {
       GNUNET_assert (n == vl->n);
     }
-    return;
+    if (GNUNET_NO == vl->confirmed)
+    {
+      vl->confirmed = GNUNET_YES;
+      q->idle = GNUNET_YES;
+      vl->visibility_task =
+        GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
+      consider_sending_fc (vl);
+      /* We lacked a confirmed connection to the target
+         before, so tell CORE about it (finally!) */
+      cores_send_connect_info (&n->pid);
+    }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Creating new virtual link to %s using direct neighbour!\n",
-              GNUNET_i2s (&vs->pid));
-  vl = GNUNET_new (struct VirtualLink);
-  vl->target = n->pid;
-  vl->n = n;
-  n->vl = vl;
-  q->idle = GNUNET_YES;
-  vl->core_recv_window = RECV_WINDOW_SIZE;
-  vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
-  vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
-  vl->visibility_task =
-    GNUNET_SCHEDULER_add_at (q->validated_until, &check_link_down, vl);
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CONTAINER_multipeermap_put (
-                  links,
-                  &vl->target,
-                  vl,
-                  GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
-  consider_sending_fc (vl);
-  /* We lacked a confirmed connection to the target
-     before, so tell CORE about it (finally!) */
-  cores_send_connect_info (&n->pid);
 }
 
 
@@ -8462,6 +8647,7 @@ handle_flow_control (void *cls, const struct 
TransportFlowControlMessage *fc)
   struct GNUNET_TIME_Absolute st;
   uint64_t os;
   uint64_t wnd;
+  uint32_t random;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received FC from %s\n", GNUNET_i2s (&cmc->im.sender));
@@ -8469,13 +8655,22 @@ handle_flow_control (void *cls, const struct 
TransportFlowControlMessage *fc)
   if (NULL == vl)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "FC dropped: VL unknown\n");
-    GNUNET_STATISTICS_update (GST_stats,
-                              "# FC dropped: Virtual link unknown",
-                              1,
-                              GNUNET_NO);
-    finish_cmc_handling (cmc);
-    return;
+                "No virtual link for FC creating new unconfirmed virtual link 
to %s!\n",
+                GNUNET_i2s (&cmc->im.sender));
+    vl = GNUNET_new (struct VirtualLink);
+    vl->confirmed = GNUNET_NO;
+    vl->message_uuid_ctr =
+      GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
+    vl->target = cmc->im.sender;
+    vl->core_recv_window = RECV_WINDOW_SIZE;
+    vl->available_fc_window_size = DEFAULT_WINDOW_SIZE;
+    vl->incoming_fc_window_size = DEFAULT_WINDOW_SIZE;
+    GNUNET_break (GNUNET_YES ==
+                  GNUNET_CONTAINER_multipeermap_put (
+                    links,
+                    &vl->target,
+                    vl,
+                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   }
   st = GNUNET_TIME_absolute_ntoh (fc->sender_time);
   if (st.abs_value_us < vl->last_fc_timestamp.abs_value_us)
@@ -8509,15 +8704,20 @@ handle_flow_control (void *cls, const struct 
TransportFlowControlMessage *fc)
               (unsigned long long) vl->outbound_fc_window_size,
               (long long) vl->incoming_fc_window_size_loss);
   wnd = GNUNET_ntohll (fc->outbound_window_size);
-  if ((wnd < vl->incoming_fc_window_size) ||
-      (vl->last_outbound_window_size_received != wnd) ||
-      (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX)
-       % FC_NO_CHANGE_REPLY_PROBABILITY))
+  random = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                     UINT32_MAX);
+  if ((GNUNET_YES == vl->confirmed) && ((wnd < vl->incoming_fc_window_size) ||
+                                        (vl->last_outbound_window_size_received
+                                         != wnd) ||
+                                        (0 == random
+                                         % FC_NO_CHANGE_REPLY_PROBABILITY)))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Consider re-sending our FC message, as clearly the other 
peer's idea of the window is not up-to-date (%llu vs %llu)\n",
+                "Consider re-sending our FC message, as clearly the other 
peer's idea of the window is not up-to-date (%llu vs %llu) or %llu last 
received differs, or random reply %lu\n",
                 (unsigned long long) wnd,
-                (unsigned long long) vl->incoming_fc_window_size);
+                (unsigned long long) vl->incoming_fc_window_size,
+                (unsigned long long) vl->last_outbound_window_size_received,
+                random % FC_NO_CHANGE_REPLY_PROBABILITY);
     consider_sending_fc (vl);
   }
   if ((wnd == vl->incoming_fc_window_size) &&
@@ -8530,6 +8730,7 @@ handle_flow_control (void *cls, const struct 
TransportFlowControlMessage *fc)
                 (unsigned long long) wnd);
     GNUNET_SCHEDULER_cancel (vl->fc_retransmit_task);
     vl->fc_retransmit_task = NULL;
+    vl->fc_retransmit_count = 0;
   }
   vl->last_outbound_window_size_received = wnd;
   /* FC window likely increased, check transmission possibilities! */
@@ -8729,7 +8930,6 @@ fragment_message (struct Queue *queue,
               pm->bytes_msg,
               GNUNET_i2s (&pm->vl->target),
               (unsigned int) mtu);
-  pa = prepare_pending_acknowledgement (queue, dvh, pm);
 
   /* This invariant is established in #handle_add_queue_message() */
   GNUNET_assert (mtu > sizeof(struct TransportFragmentBoxMessage));
@@ -8745,7 +8945,7 @@ fragment_message (struct Queue *queue,
     ff = ff->head_frag;   /* descent into fragmented fragments */
   }
 
-  if (((ff->bytes_msg > mtu) || (pm == ff)) && (pm->frag_off < pm->bytes_msg))
+  if (((ff->bytes_msg > mtu) || (pm == ff)) && (ff->frag_off < ff->bytes_msg))
   {
     /* Did not yet calculate all fragments, calculate next fragment */
     struct PendingMessage *frag;
@@ -8783,13 +8983,15 @@ fragment_message (struct Queue *queue,
     tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
     tfb.header.size =
       htons (sizeof(struct TransportFragmentBoxMessage) + fragsize);
+    pa = prepare_pending_acknowledgement (queue, dvh, frag);
     tfb.ack_uuid = pa->ack_uuid;
     tfb.msg_uuid = pm->msg_uuid;
     tfb.frag_off = htons (ff->frag_off + xoff);
     tfb.msg_size = htons (pm->bytes_msg);
     memcpy (msg, &tfb, sizeof(tfb));
     memcpy (&msg[sizeof(tfb)], &orig[ff->frag_off], fragsize);
-    GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag, ff->tail_frag, frag);
+    GNUNET_CONTAINER_MDLL_insert (frag, ff->head_frag,
+                                  ff->tail_frag, frag);
     ff->frag_off += fragsize;
     ff = frag;
   }
@@ -8803,6 +9005,7 @@ fragment_message (struct Queue *queue,
                                      ff->frag_parent->head_frag,
                                      ff->frag_parent->tail_frag,
                                      ff);
+
   return ff;
 }
 
@@ -8829,7 +9032,7 @@ reliability_box_message (struct Queue *queue,
   struct PendingMessage *bpm;
   char *msg;
 
-  if (PMT_CORE != pm->pmt)
+  if ((PMT_CORE != pm->pmt) && (PMT_DV_BOX != pm->pmt))
     return pm; /* already fragmented or reliability boxed, or control message:
                   do nothing */
   if (NULL != pm->bpm)
@@ -8842,11 +9045,7 @@ reliability_box_message (struct Queue *queue,
     client_send_response (pm);
     return NULL;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Preparing reliability box for message <%llu> to %s on queue 
%s\n",
-              pm->logging_uuid,
-              GNUNET_i2s (&pm->vl->target),
-              queue->address);
+
   pa = prepare_pending_acknowledgement (queue, dvh, pm);
 
   bpm = GNUNET_malloc (sizeof(struct PendingMessage) + sizeof(rbox)
@@ -8869,6 +9068,13 @@ reliability_box_message (struct Queue *queue,
   memcpy (msg, &rbox, sizeof(rbox));
   memcpy (&msg[sizeof(rbox)], &pm[1], pm->bytes_msg);
   pm->bpm = bpm;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Preparing reliability box for message <%llu> of size %lu (%lu) 
to %s on queue %s\n",
+              pm->logging_uuid,
+              pm->bytes_msg,
+              ntohs (((const struct GNUNET_MessageHeader *) &pm[1])->size),
+              GNUNET_i2s (&pm->vl->target),
+              queue->address);
   return bpm;
 }
 
@@ -8911,6 +9117,7 @@ update_pm_next_attempt (struct PendingMessage *pm,
 {
   struct VirtualLink *vl = pm->vl;
 
+  // TODO Do we really need a next_attempt value for PendingMessage other than 
the root Pending Message?
   pm->next_attempt = next_attempt;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Next attempt for message <%llu> set to %s\n",
@@ -8921,9 +9128,17 @@ update_pm_next_attempt (struct PendingMessage *pm,
   {
     reorder_root_pm (pm, next_attempt);
   }
-  else if ((PMT_RELIABILITY_BOX == pm->pmt)||(PMT_DV_BOX == pm->pmt))
+  else if ((PMT_RELIABILITY_BOX == pm->pmt) || (PMT_DV_BOX == pm->pmt))
   {
-    reorder_root_pm (pm->frag_parent, next_attempt);
+    struct PendingMessage *root = pm->frag_parent;
+
+    while (NULL != root->frag_parent)
+      root = root->frag_parent;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Next attempt for root message <%llu> set to %s\n",
+                root->logging_uuid);
+    root->next_attempt = next_attempt;
+    reorder_root_pm (root, next_attempt);
   }
   else
   {
@@ -8988,6 +9203,16 @@ struct PendingMessageScoreContext
    * Did we have to reliability box?
    */
   int relb;
+
+  /**
+   * There are pending messages, but it was to early to send one of them.
+   */
+  int to_early;
+
+  /**
+   * When will we try to transmit the message again for which it was to early 
to retry.
+   */
+  struct GNUNET_TIME_Relative to_early_retry_delay;
 };
 
 
@@ -9020,14 +9245,32 @@ select_best_pending_from_link (struct 
PendingMessageScoreContext *sc,
     int relb;
 
     if ((NULL != dvh) && (PMT_DV_BOX == pos->pmt))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "DV messages must not be DV-routed to next hop!\n");
       continue;   /* DV messages must not be DV-routed to next hop! */
+    }
     if (pos->next_attempt.abs_value_us > now.abs_value_us)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "too early for all messages, they are sorted by 
next_attempt\n");
+      sc->to_early = GNUNET_YES;
+
       break;   /* too early for all messages, they are sorted by next_attempt 
*/
+    }
+    sc->to_early = GNUNET_NO;
     if (NULL != pos->qe)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "not eligible\n");
       continue;   /* not eligible */
+    }
     sc->consideration_counter++;
     /* determine if we have to fragment, if so add fragmentation
        overhead! */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "check %u for sc->best\n",
+                pos->logging_uuid);
     frag = GNUNET_NO;
     if (((0 != queue->mtu) &&
          (pos->bytes_msg + real_overhead > queue->mtu)) ||
@@ -9038,6 +9281,10 @@ select_best_pending_from_link (struct 
PendingMessageScoreContext *sc,
                                      respect that even if MTU is UINT16_MAX for
                                      this queue */))
     {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "fragment msg with size %u, realoverhead is %u\n",
+                  pos->bytes_msg,
+                  real_overhead);
       frag = GNUNET_YES;
       if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
       {
@@ -9064,7 +9311,10 @@ select_best_pending_from_link (struct 
PendingMessageScoreContext *sc,
       {
         relb = GNUNET_YES;
       }
-
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Create reliability box of msg with size %u, realoverhead is 
%u\n",
+                  pos->bytes_msg,
+                  real_overhead);
     }
 
     /* Finally, compare to existing 'best' in sc to see if this 'pos' pending
@@ -9107,7 +9357,13 @@ select_best_pending_from_link (struct 
PendingMessageScoreContext *sc,
           pm_score -= queue->mtu - (real_overhead + pos->bytes_msg);
       }
       if (sc_score + time_delta > pm_score)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "sc_score of %u larger, keep sc->best %u\n",
+                    pos->logging_uuid,
+                    sc->best->logging_uuid);
         continue;     /* sc_score larger, keep sc->best */
+      }
     }
     sc->best = pos;
     sc->dvh = dvh;
@@ -9216,6 +9472,10 @@ transmit_on_queue (void *cls)
                 "No pending messages, queue `%s' to %s now idle\n",
                 queue->address,
                 GNUNET_i2s (&n->pid));
+    if (GNUNET_YES == sc.to_early)
+      schedule_transmit_on_queue (sc.to_early_retry_delay,
+                                  queue,
+                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT);
     queue->idle = GNUNET_YES;
     return;
   }
@@ -9226,9 +9486,17 @@ transmit_on_queue (void *cls)
   pm = sc.best;
   if (NULL != sc.dvh)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Is this %u a DV box?\n",
+                pm->pmt);
     GNUNET_assert (PMT_DV_BOX != pm->pmt);
     if (NULL != sc.best->bpm)
     {
+      const struct DVPathEntryP *hops_old;
+      const struct DVPathEntryP *hops_selected;
+
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Discard old box\n");
       /* We did this boxing before, but possibly for a different path!
          Discard old DV box!  OPTIMIZE-ME: we might want to check if
          it is the same and then not re-build the message... */
@@ -9246,6 +9514,13 @@ transmit_on_queue (void *cls)
                         RMO_NONE,
                         GNUNET_NO);
     GNUNET_assert (NULL != sc.best->bpm);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%u %u %u %u %u\n",
+                sizeof(struct GNUNET_PeerIdentity),
+                sizeof(struct TransportDVBoxMessage),
+                sizeof(struct TransportDVBoxPayloadP),
+                sizeof(struct TransportFragmentBoxMessage),
+                ((const struct GNUNET_MessageHeader *) &sc.best[1])->size);
     pm = sc.best->bpm;
   }
   if (GNUNET_YES == sc.frag)
@@ -9258,7 +9533,9 @@ transmit_on_queue (void *cls)
                   queue->address,
                   GNUNET_i2s (&n->pid),
                   sc.best->logging_uuid);
-      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                  queue,
+                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT);
       return;
     }
   }
@@ -9274,7 +9551,9 @@ transmit_on_queue (void *cls)
         queue->address,
         GNUNET_i2s (&n->pid),
         sc.best->logging_uuid);
-      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                  queue,
+                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT);
       return;
     }
   }
@@ -9323,6 +9602,8 @@ transmit_on_queue (void *cls)
   }
   else
   {
+    struct GNUNET_TIME_Relative wait_duration;
+
     /* Message not finished, waiting for acknowledgement.
        Update time by which we might retransmit 's' based on queue
        characteristics (i.e. RTT); it takes one RTT for the message to
@@ -9333,18 +9614,33 @@ transmit_on_queue (void *cls)
        OPTIMIZE: Note that in the future this heuristic should likely
        be improved further (measure RTT stability, consider message
        urgency and size when delaying ACKs, etc.) */
+
+    if (GNUNET_TIME_UNIT_FOREVER_REL.rel_value_us !=
+        queue->pd.aged_rtt.rel_value_us)
+      wait_duration = queue->pd.aged_rtt;
+    else
+      wait_duration = DEFAULT_ACK_WAIT_DURATION;
+    struct GNUNET_TIME_Absolute next = GNUNET_TIME_relative_to_absolute (
+      GNUNET_TIME_relative_multiply (
+        wait_duration, 4));
+    struct GNUNET_TIME_Relative plus = GNUNET_TIME_relative_multiply (
+      wait_duration, 4);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Waiting %s for ACK\n",
+                "Waiting %s (%llu) for ACK until %llu\n",
                 GNUNET_STRINGS_relative_time_to_string (
                   GNUNET_TIME_relative_multiply (
-                    queue->pd.aged_rtt, 4), GNUNET_NO));
+                    queue->pd.aged_rtt, 4), GNUNET_NO),
+                plus,
+                next);
     update_pm_next_attempt (pm,
                             GNUNET_TIME_relative_to_absolute (
-                              GNUNET_TIME_relative_multiply 
(queue->pd.aged_rtt,
+                              GNUNET_TIME_relative_multiply (wait_duration,
                                                              4)));
   }
   /* finally, re-schedule queue transmission task itself */
-  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+  schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                              queue,
+                              GNUNET_SCHEDULER_PRIORITY_DEFAULT);
 }
 
 
@@ -9476,7 +9772,9 @@ handle_send_message_ack (void *cls,
          NULL != queue;
          queue = queue->next_client)
     {
-      schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+      schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                  queue,
+                                  GNUNET_SCHEDULER_PRIORITY_DEFAULT);
     }
   }
   else if (QUEUE_LENGTH_LIMIT - 1 == qe->queue->queue_length)
@@ -9486,7 +9784,9 @@ handle_send_message_ack (void *cls,
                               "# Transmission throttled due to queue queue 
limit",
                               -1,
                               GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+    schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                qe->queue,
+                                GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
   else if (1 == qe->queue->q_capacity)
   {
@@ -9500,7 +9800,9 @@ handle_send_message_ack (void *cls,
                               "# Transmission throttled due to message queue 
capacity",
                               -1,
                               GNUNET_NO);
-    schedule_transmit_on_queue (qe->queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+    schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                                qe->queue,
+                                GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   }
 
   if (NULL != (pm = qe->pm))
@@ -10090,7 +10392,8 @@ handle_add_queue_message (void *cls,
                                               
&check_validation_request_pending,
                                               queue);
   /* look for traffic for this queue */
-  schedule_transmit_on_queue (queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
+  schedule_transmit_on_queue (GNUNET_TIME_UNIT_ZERO,
+                              queue, GNUNET_SCHEDULER_PRIORITY_DEFAULT);
   /* might be our first queue, try launching DV learning */
   if (NULL == dvlearn_task)
     dvlearn_task = GNUNET_SCHEDULER_add_now (&start_dv_learn, NULL);
diff --git a/src/transport/test_transport_distance_vector_circle_topo.conf 
b/src/transport/test_transport_distance_vector_circle_topo.conf
index ce26bf923..210179291 100644
--- a/src/transport/test_transport_distance_vector_circle_topo.conf
+++ b/src/transport/test_transport_distance_vector_circle_topo.conf
@@ -2,7 +2,7 @@ M:1
 N:3
 X:0
 AC:1
-B:1
+B:0
 T:libgnunet_test_transport_plugin_cmd_simple_send_dv
 R:1|{tcp_port:0}|{udp_port:1}
 R:2|{tcp_port:0}|{udp_port:1}
diff --git a/src/transport/test_transport_plugin_cmd_simple_send_dv.c 
b/src/transport/test_transport_plugin_cmd_simple_send_dv.c
index 167120e2b..f1f168102 100644
--- a/src/transport/test_transport_plugin_cmd_simple_send_dv.c
+++ b/src/transport/test_transport_plugin_cmd_simple_send_dv.c
@@ -65,12 +65,12 @@ struct TestState
    */
   struct GNUNET_TESTING_NetjailTopology *topology;
 
-  /**
+};
+
+/**
    * The number of messages received.
    */
-  unsigned int number_received;
-
-};
+static unsigned int number_received;
 
 static struct GNUNET_TESTING_Command block_send;
 
@@ -105,52 +105,61 @@ static void
 handle_test (void *cls,
              const struct GNUNET_TRANSPORT_TESTING_TestMessage *message)
 {
-  struct TestState *ts = cls;
+  struct GNUNET_PeerIdentity *peer = cls;
   const struct GNUNET_TESTING_AsyncContext *ac_block;
   const struct GNUNET_TESTING_AsyncContext *ac_start;
   const struct GNUNET_TESTING_Command *cmd;
   const struct GNUNET_CONTAINER_MultiShortmap *connected_peers_map;
   unsigned int connected;
   struct BlockState *bs;
+  struct GNUNET_TRANSPORT_CoreHandle *ch;
+  const struct StartPeerState *sps;
 
 
-
+  GNUNET_TRANSPORT_get_trait_state (&start_peer,
+                                    &sps);
+  ch = sps->th;
   GNUNET_TRANSPORT_get_trait_connected_peers_map (&start_peer,
                                                   &connected_peers_map);
 
-  connected = GNUNET_CONTAINER_multishortmap_size (
-    connected_peers_map);
+  if (NULL != connected_peers_map)
+  {
+    connected = GNUNET_CONTAINER_multishortmap_size (
+      connected_peers_map);
 
-  ts->number_received++;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received %u test message(s) from %u connected peer(s)\n",
-              ts->number_received,
-              connected);
+    number_received++;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received %u test message(s) from %s, %u connected peer(s)\n",
+                number_received,
+                GNUNET_i2s (peer),
+                connected);
 
-  GNUNET_TESTING_get_trait_async_context (&block_receive,
-                                          &ac_block);
+    GNUNET_TESTING_get_trait_async_context (&block_receive,
+                                            &ac_block);
 
-  if ( connected == ts->number_received)
-  {
-    if (NULL != ac_block->is)
+    if ( connected == number_received)
     {
-      GNUNET_assert  (NULL != ac_block);
-      if (NULL == ac_block->cont)
-        GNUNET_TESTING_async_fail ((struct
-                                    GNUNET_TESTING_AsyncContext *) ac_block);
-      else
-        GNUNET_TESTING_async_finish ((struct
+      if (NULL != ac_block->is)
+      {
+        GNUNET_assert  (NULL != ac_block);
+        if (NULL == ac_block->cont)
+          GNUNET_TESTING_async_fail ((struct
                                       GNUNET_TESTING_AsyncContext *) ac_block);
-    }
-    else
-    {
-      GNUNET_TESTING_get_trait_block_state (
-        &block_receive,
-        (const struct BlockState  **) &bs);
-      bs->asynchronous_finish = GNUNET_YES;
-    }
+        else
+          GNUNET_TESTING_async_finish ((struct
+                                        GNUNET_TESTING_AsyncContext *) 
ac_block);
+      }
+      else
+      {
+        GNUNET_TESTING_get_trait_block_state (
+          &block_receive,
+          (const struct BlockState  **) &bs);
+        bs->asynchronous_finish = GNUNET_YES;
+      }
 
+    }
   }
+  GNUNET_TRANSPORT_core_receive_continue (ch, peer);
 }
 
 
@@ -284,10 +293,6 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, 
char *router_ip,
     GNUNET_MQ_handler_end ()
   };
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "number_received %u\n",
-       ts->number_received);
-
   if (GNUNET_YES == *read_file)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -347,7 +352,7 @@ start_testcase (TESTING_CMD_HELPER_write_cb write_message, 
char *router_ip,
                                                 handlers,
                                                 ts->cfgname,
                                                 notify_connect,
-                                                GNUNET_YES);
+                                                GNUNET_NO);
   struct GNUNET_TESTING_Command commands[] = {
     GNUNET_TESTING_cmd_system_create ("system-create",
                                       ts->testdir),
diff --git a/src/transport/transport_api_cmd_start_peer.c 
b/src/transport/transport_api_cmd_start_peer.c
index e305e24e1..7448eff5a 100644
--- a/src/transport/transport_api_cmd_start_peer.c
+++ b/src/transport/transport_api_cmd_start_peer.c
@@ -124,7 +124,7 @@ notify_connect (void *cls,
   struct GNUNET_HashCode hc;
   struct GNUNET_CRYPTO_EddsaPublicKey public_key = peer->public_key;
 
-  void *ret = sps->handlers;
+  void *ret = (struct GNUNET_PeerIdentity *) peer;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "This Peer %s \n",
diff --git a/src/util/mq.c b/src/util/mq.c
index b09837459..40ac97bbe 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -273,7 +273,7 @@ GNUNET_MQ_handle_message (const struct 
GNUNET_MQ_MessageHandler *handlers,
       break;
     }
   }
-done:
+  done:
   if (GNUNET_NO == handled)
   {
     LOG (GNUNET_ERROR_TYPE_INFO,
@@ -384,8 +384,9 @@ GNUNET_MQ_send (struct GNUNET_MQ_Handle *mq,
   mq->current_envelope = ev;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "sending message of type %u, queue empty (MQ: %p)\n",
+       "sending message of type %u and size %u, queue empty (MQ: %p)\n",
        ntohs (ev->mh->type),
+       ntohs (ev->mh->size),
        mq);
 
   mq->send_impl (mq,
@@ -479,8 +480,10 @@ impl_send_continue (void *cls)
                                mq->current_envelope);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "sending message of type %u from queue\n",
-       ntohs (mq->current_envelope->mh->type));
+       "sending message of type %u and size %u from queue (MQ: %p)\n",
+       ntohs (mq->current_envelope->mh->type),
+       ntohs (mq->current_envelope->mh->size),
+       mq);
 
   mq->send_impl (mq,
                  mq->current_envelope->mh,

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]