gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] 01/04: more work on TCP communicator, almost there


From: gnunet
Subject: [GNUnet-SVN] [gnunet] 01/04: more work on TCP communicator, almost there
Date: Mon, 28 Jan 2019 18:08:02 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

commit 07533eec5c7b1637374ea1496595918861ac8b6d
Author: Christian Grothoff <address@hidden>
AuthorDate: Mon Jan 28 12:43:09 2019 +0100

    more work on TCP communicator, almost there
---
 src/transport/Makefile.am               |   1 +
 src/transport/gnunet-communicator-tcp.c | 498 ++++++++++++++++++++++++++++----
 2 files changed, 438 insertions(+), 61 deletions(-)

diff --git a/src/transport/Makefile.am b/src/transport/Makefile.am
index 0df3e4e27..ead9beeec 100644
--- a/src/transport/Makefile.am
+++ b/src/transport/Makefile.am
@@ -256,6 +256,7 @@ gnunet_communicator_tcp_SOURCES = \
  gnunet-communicator-tcp.c
 gnunet_communicator_tcp_LDADD = \
   libgnunettransportcommunicator.la \
+  $(top_builddir)/src/nat/libgnunetnatnew.la \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(LIBGCRYPT_LIBS) 
diff --git a/src/transport/gnunet-communicator-tcp.c 
b/src/transport/gnunet-communicator-tcp.c
index a94559bd2..050a5f225 100644
--- a/src/transport/gnunet-communicator-tcp.c
+++ b/src/transport/gnunet-communicator-tcp.c
@@ -24,14 +24,8 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - lots of basic adaptations (see FIXMEs), need NAT service
- *   to determine our own listen IPs! Parsing of bindto spec!
- * - actual decryption and handling of boxes and rekeys!
- * - message queue management: flow control towards CORE!
- *   (stop reading from socket until MQ send to core is done;
- *    will need a counter as ONE read from socket may generate
- *    multiple messages en route to CORE; tricky bit: queue
- *    may die before we get MQ sent-done callbacks!)
+ * - NAT service API change to handle address stops!
+ * - handling of rekeys!
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
@@ -39,6 +33,7 @@
 #include "gnunet_signatures.h"
 #include "gnunet_constants.h"
 #include "gnunet_nt_lib.h"
+#include "gnunet_nat_service.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet_transport_communication_service.h"
 
@@ -390,11 +385,20 @@ struct Queue
    */
   struct GNUNET_TIME_Absolute timeout;
 
+  /**
+   * How many messages did we pass from this queue to CORE for which we
+   * have yet to receive an acknowledgement that CORE is done with
+   * them? If "large" (or even just non-zero), we should throttle
+   * reading to provide flow control.  See also #DEFAULT_MAX_QUEUE_LENGTH
+   * and #max_queue_length.
+   */ 
+  unsigned int backpressure;
+  
   /**
    * Which network type does this queue use?
    */
   enum GNUNET_NetworkType nt;
-
+  
   /**
    * Is MQ awaiting a #GNUNET_MQ_impl_send_continue() call?
    */
@@ -405,6 +409,14 @@ struct Queue
    */
   int finishing;
 
+  /**
+   * Did we technically destroy this queue, but kept the allocation
+   * around because of @e backpressure not being zero yet? Used
+   * simply to delay the final #GNUNET_free() operation until
+   * #core_read_finished_cb() has been called.
+   */
+  int destroyed;
+
   /**
    * #GNUNET_YES after #inject_key() placed the rekey message into the
    * plaintext buffer. Once the plaintext buffer is drained, this
@@ -474,11 +486,6 @@ struct ProtoQueue
  */
 static struct GNUNET_SCHEDULER_Task *listen_task;
 
-/**
- * Number of messages we currently have in our queues towards the transport 
service.
- */
-static unsigned long long delivering_messages;
-
 /**
  * Maximum queue length before we stop reading towards the transport service.
  */
@@ -504,11 +511,6 @@ static struct GNUNET_CONTAINER_MultiPeerMap *queue_map;
  */
 static struct GNUNET_NETWORK_Handle *listen_sock;
 
-/**
- * Handle to the operation that publishes our address.
- */
-static struct GNUNET_TRANSPORT_AddressIdentifier *ai;
-
 /**
  * Our public key.
  */
@@ -524,6 +526,11 @@ static struct GNUNET_CRYPTO_EddsaPrivateKey 
*my_private_key;
  */
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
 
+/**
+ * Connection to NAT service.
+ */
+static struct GNUNET_NAT_Handle *nat;
+
 /**
  * Protoqueues DLL head.
  */ 
@@ -588,7 +595,10 @@ queue_destroy (struct Queue *queue)
   gcry_cipher_close (queue->in_cipher);
   gcry_cipher_close (queue->out_cipher);
   GNUNET_free (queue->address);
-  GNUNET_free (queue);
+  if (0 != queue->backpressure)
+    queue->destroyed = GNUNET_YES;
+  else
+    GNUNET_free (queue);
   if (NULL == listen_task)
     listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
                                                 listen_sock,
@@ -679,6 +689,213 @@ reschedule_queue_timeout (struct Queue *queue)
 }
 
 
+/**
+ * Queue read task. If we hit the timeout, disconnect it
+ *
+ * @param cls the `struct Queue *` to disconnect
+ */
+static void
+queue_read (void *cls);
+
+
+/**
+ * Core tells us it is done processing a message received on a queue
+ * (status @a success); unchoke reading or finish a deferred destroy.
+ *
+ * @param cls a `struct Queue *` where the message originally came from
+ * @param success #GNUNET_OK on success
+ */
+static void
+core_read_finished_cb (void *cls,
+                      int success)
+{
+  struct Queue *queue = cls;
+
+  if (GNUNET_OK != success)
+    GNUNET_STATISTICS_update (stats,
+                             "# messages lost in communicator API towards CORE",
+                             1,
+                             GNUNET_NO);
+  queue->backpressure--;
+  if (queue->destroyed)
+  {
+    /* deferred destroy: free with the last message, never reschedule */
+    if (0 == queue->backpressure)
+      GNUNET_free (queue);
+    return;
+  }
+  reschedule_queue_timeout (queue);
+  /* possibly unchoke reading, now that CORE made progress */
+  if (NULL == queue->read_task)
+    queue->read_task
+      = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_absolute_get_remaining (queue->timeout),
+                                      queue->sock,
+                                      &queue_read,
+                                      queue);
+}
+
+
+/**
+ * We received @a plaintext_len bytes of @a plaintext on @a queue.
+ * Pass it on to CORE (unless it is not exactly one well-sized message)
+ * and increase the backpressure counter if that returned #GNUNET_OK.
+ *
+ * @param queue the queue that received the plaintext
+ * @param plaintext the plaintext that was received
+ * @param plaintext_len number of bytes of plaintext received
+ */
+static void
+pass_plaintext_to_core (struct Queue *queue,
+                       const void *plaintext,
+                       size_t plaintext_len)
+{
+  const struct GNUNET_MessageHeader *hdr = plaintext;
+  int ret;
+
+  if ( (plaintext_len < sizeof (*hdr)) || /* header must be readable */
+       (ntohs (hdr->size) != plaintext_len) )
+  {
+    /* NOTE: If we ever allow multiple CORE messages in one
+       BOX, this will have to change! */
+    GNUNET_break (0);
+    return;
+  }
+  ret = GNUNET_TRANSPORT_communicator_receive (ch,
+                                              &queue->target,
+                                              hdr,
+                                              &core_read_finished_cb,
+                                              queue);
+  if (GNUNET_OK == ret)
+    queue->backpressure++;
+  GNUNET_break (GNUNET_NO != ret); /* backpressure not working!? */
+  if (GNUNET_SYSERR == ret)
+    GNUNET_STATISTICS_update (stats,
+                             "# bytes lost due to CORE not running",
+                             plaintext_len, GNUNET_NO);
+}
+
+
+/**
+ * Test if we have received a full message in plaintext.
+ * If so, handle it: verify its HMAC, then pass a BOX payload on to
+ * CORE, accept a REKEY (not acted upon yet) or destroy the queue on
+ * FINISH.  Malformed or unauthenticated input is answered with
+ * #queue_finish().  At most one message is consumed per call.
+ *
+ * @param queue queue to process inbound plaintext for
+ */
+static void
+try_handle_plaintext (struct Queue *queue)
+{
+  const struct GNUNET_MessageHeader *hdr
+    = (const struct GNUNET_MessageHeader *) queue->pread_buf;
+  const struct TCPBox *box
+    = (const struct TCPBox *) queue->pread_buf;
+  const struct TCPRekey *rekey
+    = (const struct TCPRekey *) queue->pread_buf;
+  const struct TCPFinish *fin
+    = (const struct TCPFinish *) queue->pread_buf;
+  struct TCPRekey rekeyz;
+  struct TCPFinish finz;
+  struct GNUNET_ShortHashCode tmac;
+  uint16_t type;
+  size_t size = 0; /* make compiler happy */
+
+  if (sizeof (*hdr) > queue->pread_off)
+    return; /* not even a header */
+  type = ntohs (hdr->type);
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_BOX:
+    /* Special case: header size excludes box itself! */
+    if (ntohs (hdr->size) + sizeof (struct TCPBox) > queue->pread_off)
+      return;
+    hmac (&queue->in_hmac,
+         &box[1],
+         ntohs (hdr->size),
+         &tmac);
+    if (0 != memcmp (&tmac,
+                    &box->hmac,
+                    sizeof (tmac)))
+    {
+      GNUNET_break_op (0);
+      queue_finish (queue);
+      return;
+    }
+    pass_plaintext_to_core (queue,
+                           (const void *) &box[1],
+                           ntohs (hdr->size));
+    size = ntohs (hdr->size) + sizeof (*box);
+    break;
+  case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_REKEY:
+    if (sizeof (*rekey) > queue->pread_off)
+      return;
+    if (ntohs (hdr->size) != sizeof (*rekey))
+    {
+      GNUNET_break_op (0);
+      queue_finish (queue);
+      return;
+    }
+    /* the HMAC is computed over a copy with the hmac field zeroed */
+    rekeyz = *rekey;
+    memset (&rekeyz.hmac, 0, sizeof (rekeyz.hmac));
+    hmac (&queue->in_hmac,
+         &rekeyz,
+         sizeof (rekeyz),
+         &tmac);
+    if (0 != memcmp (&tmac,
+                    &rekey->hmac,
+                    sizeof (tmac)))
+    {
+      GNUNET_break_op (0);
+      queue_finish (queue);
+      return;
+    }
+    // FIXME: handle rekey!
+    size = ntohs (hdr->size);
+    break;
+  case GNUNET_MESSAGE_TYPE_COMMUNICATOR_TCP_FINISH:
+    if (sizeof (*fin) > queue->pread_off)
+      return;
+    if (ntohs (hdr->size) != sizeof (*fin))
+    {
+      GNUNET_break_op (0);
+      queue_finish (queue);
+      return;
+    }
+    finz = *fin;
+    memset (&finz.hmac, 0, sizeof (finz.hmac));
+    hmac (&queue->in_hmac,
+         &finz,
+         sizeof (finz),
+         &tmac);
+    if (0 != memcmp (&tmac,
+                    &fin->hmac,
+                    sizeof (tmac)))
+    {
+      GNUNET_break_op (0);
+      queue_finish (queue);
+      return;
+    }
+    /* handle FINISH by destroying queue; the queue must not be touched
+       afterwards, so skip the buffer shift below */
+    queue_destroy (queue);
+    return;
+  default:
+    GNUNET_break_op (0);
+    queue_finish (queue);
+    return;
+  }
+  GNUNET_assert (0 != size);
+  /* 'size' bytes of plaintext were used, shift buffer */
+  GNUNET_assert (size <= queue->pread_off);
+  memmove (queue->pread_buf,
+          &queue->pread_buf[size],
+          queue->pread_off - size);
+  queue->pread_off -= size;
+}
+
+
 /**
  * Queue read task. If we hit the timeout, disconnect it
  *
@@ -718,10 +935,20 @@ queue_read (void *cls)
   queue->cread_off += rcvd;
   if (queue->pread_off < sizeof (queue->pread_buf))
   {
-    /* FIXME: decrypt */
-  
-    /* FIXME: check plaintext for complete messages, if complete, hand to CORE 
*/
-    /* FIXME: CORE flow control: suspend doing more until CORE has ACKed */
+    size_t max = GNUNET_MIN (sizeof (queue->pread_buf) - queue->pread_off,
+                            queue->cread_off);
+    GNUNET_assert (0 ==
+                  gcry_cipher_decrypt (queue->in_cipher,
+                                       &queue->pread_buf[queue->pread_off],
+                                       max,
+                                       queue->cread_buf,
+                                       max));
+    queue->pread_off += max;
+    memmove (queue->cread_buf,
+            &queue->cread_buf[max],
+            queue->cread_off - max);
+    queue->cread_off -= max;
+    try_handle_plaintext (queue);
   }
   
   if (BUF_SIZE == queue->cread_off)
@@ -729,14 +956,15 @@ queue_read (void *cls)
   left = GNUNET_TIME_absolute_get_remaining (queue->timeout);
   if (0 != left.rel_value_us) 
   {
-    /* not actually our turn yet, but let's at least update
-       the monitor, it may think we're about to die ... */
-    queue->read_task
-      = GNUNET_SCHEDULER_add_read_net (left,
-                                      queue->sock,
-                                      &queue_read,
-                                      queue);
-
+    if (max_queue_length < queue->backpressure)
+    {
+      /* continue reading */
+      queue->read_task
+       = GNUNET_SCHEDULER_add_read_net (left,
+                                        queue->sock,
+                                        &queue_read,
+                                        queue);
+    }
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -760,9 +988,119 @@ tcp_address_to_sockaddr (const char *bindto,
                         socklen_t *sock_len)
 {
   struct sockaddr *in;
-  size_t slen;
+  unsigned int port;
+  char dummy[2];
+  char *colon;
+  char *cp;
+  
+  if (1 == SSCANF (bindto,
+                  "%u%1s",
+                  &port,
+                  dummy))
+  {
+    /* interpreting value as just a PORT number */
+    if (port > UINT16_MAX)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "BINDTO specification `%s' invalid: value too large for 
port\n",
+                 bindto);
+      return NULL;
+    }
+    if (GNUNET_YES ==
+       GNUNET_CONFIGURATION_get_value_yesno (cfg,
+                                             COMMUNICATOR_CONFIG_SECTION,
+                                             "DISABLE_V6"))
+    {
+      struct sockaddr_in *i4;
+      
+      i4 = GNUNET_malloc (sizeof (struct sockaddr_in));
+      i4->sin_family = AF_INET;
+      i4->sin_port = htons ((uint16_t) port);
+      *sock_len = sizeof (struct sockaddr_in);
+      in = (struct sockaddr *) i4;
+    }
+    else
+    {
+      struct sockaddr_in6 *i6;
+      
+      i6 = GNUNET_malloc (sizeof (struct sockaddr_in6));
+      i6->sin6_family = AF_INET6;
+      i6->sin6_port = htons ((uint16_t) port);
+      *sock_len = sizeof (struct sockaddr_in6);
+      in = (struct sockaddr *) i6;
+    }
+    return in;
+  }
+  cp = GNUNET_strdup (bindto);
+  colon = strrchr (cp, ':');
+  if (NULL != colon)
+  {
+    /* interpret value after colon as port */
+    *colon = '\0';
+    colon++;
+    if (1 == SSCANF (colon,
+                    "%u%1s",
+                    &port,
+                    dummy))
+    {
+      /* interpreting value as just a PORT number */
+      if (port > UINT16_MAX)
+      {
+       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                   "BINDTO specification `%s' invalid: value too large for 
port\n",
+                   bindto);
+       GNUNET_free (cp);
+       return NULL;
+      }
+    }
+    else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 "BINDTO specification `%s' invalid: last ':' not followed by 
number\n",
+                 bindto);
+      GNUNET_free (cp);
+      return NULL;
+    }
+  }
+  else
+  {
+    /* interpret missing port as 0, aka pick any free one */
+    port = 0;
+  }
+  {
+    /* try IPv4 */
+    struct sockaddr_in v4;
 
-  /* FIXME: parse, allocate, return! */
+    if (1 == inet_pton (AF_INET,
+                       cp,
+                       &v4))
+    {
+      v4.sin_port = htons ((uint16_t) port);
+      in = GNUNET_memdup (&v4,
+                         sizeof (v4));
+      *sock_len = sizeof (v4);
+      GNUNET_free (cp);
+      return in;
+    }
+  }
+  {
+    /* try IPv6 */
+    struct sockaddr_in6 v6;
+
+    if (1 == inet_pton (AF_INET6,
+                       cp,
+                       &v6))
+    {
+      v6.sin6_port = htons ((uint16_t) port);
+      in = GNUNET_memdup (&v6,
+                         sizeof (v6));
+      *sock_len = sizeof (v6);
+      GNUNET_free (cp);
+      return in;
+    }
+  }
+  /* FIXME (feature!): maybe also try getnameinfo()? */
+  GNUNET_free (cp);
   return NULL;
 }
 
@@ -966,8 +1304,8 @@ queue_write (void *cls)
     size_t usent = (size_t) sent;
 
     memmove (queue->cwrite_buf,
-            &queue->cwrite_buf[sent],
-            queue->cwrite_off - sent);
+            &queue->cwrite_buf[usent],
+            queue->cwrite_off - usent);
     reschedule_queue_timeout (queue);
  }
   /* can we encrypt more? (always encrypt full messages, needed
@@ -1670,6 +2008,11 @@ get_queue_delete_it (void *cls,
 static void
 do_shutdown (void *cls)
 {
+  if (NULL != nat)
+  {
+     GNUNET_NAT_unregister (nat);
+     nat = NULL;
+  }
   if (NULL != listen_task)
   {
     GNUNET_SCHEDULER_cancel (listen_task);
@@ -1685,11 +2028,6 @@ do_shutdown (void *cls)
                                         &get_queue_delete_it,
                                          NULL);
   GNUNET_CONTAINER_multipeermap_destroy (queue_map);
-  if (NULL != ai)
-  {
-    GNUNET_TRANSPORT_communicator_address_remove (ai);
-    ai = NULL;
-  }
   if (NULL != ch)
   {
     GNUNET_TRANSPORT_communicator_disconnect (ch);
@@ -1732,6 +2070,51 @@ enc_notify_cb (void *cls,
 }
 
 
+/**
+ * Callback for #GNUNET_NAT_register(), called whenever our set of
+ * 'valid' addresses changes; (un)announces the address to transport.
+ *
+ * @param cls closure
+ * @param add_remove #GNUNET_YES to add a new public IP address,
+ *                   #GNUNET_NO to remove a previous (now invalid) one
+ * @param ac address class the address belongs to
+ * @param addr either the previous or the new public IP address
+ * @param addrlen actual length of the @a addr
+ */
+static void
+nat_address_cb (void *cls,
+               int add_remove,
+               enum GNUNET_NAT_AddressClass ac,
+               const struct sockaddr *addr,
+               socklen_t addrlen)
+{
+  char *my_addr;
+  static struct GNUNET_TRANSPORT_AddressIdentifier *ai; // FIXME: store in *ctx of NAT!
+
+  if (GNUNET_YES == add_remove)
+  {
+    // FIXME: do better job at stringification of @a addr?
+    GNUNET_asprintf (&my_addr,
+                    "%s-%s",
+                    COMMUNICATOR_ADDRESS_PREFIX,
+                    GNUNET_a2s (addr, addrlen));
+    // FIXME: translate 'ac' to 'nt'?
+    ai = GNUNET_TRANSPORT_communicator_address_add (ch,
+                                                   my_addr,
+                                                   GNUNET_NT_LOOPBACK, // FIXME: wrong NT!
+                                                   GNUNET_TIME_UNIT_FOREVER_REL);
+    GNUNET_free (my_addr);
+  }
+  else if (NULL != ai)
+  {
+    // FIXME: support removal! => improve NAT API!
+    /* only the latest announcement is tracked; it may already be gone */
+    GNUNET_TRANSPORT_communicator_address_remove (ai);
+    ai = NULL;
+  }
+}
+
+
 /**
  * Setup communicator and launch network interactions.
  *
@@ -1749,9 +2132,8 @@ run (void *cls,
   char *bindto;
   struct sockaddr *in;
   socklen_t in_len;
-  char *my_addr;
-  (void) cls;
 
+  (void) cls;
   cfg = c;
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_filename (cfg,
@@ -1810,6 +2192,7 @@ run (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Bound to `%s'\n",
              bindto);
+  GNUNET_free (bindto);
   stats = GNUNET_STATISTICS_create ("C-TCP",
                                    cfg);
   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
@@ -1824,13 +2207,13 @@ run (void *cls,
   }
   GNUNET_CRYPTO_eddsa_key_get_public (my_private_key,
                                       &my_identity.public_key);
-
+  /* start listening */
   listen_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
                                               listen_sock,
                                               &listen_cb,
                                               NULL);
   queue_map = GNUNET_CONTAINER_multipeermap_create (10,
-                                                     GNUNET_NO);
+                                                   GNUNET_NO);
   ch = GNUNET_TRANSPORT_communicator_connect (cfg,
                                              COMMUNICATOR_CONFIG_SECTION,
                                              COMMUNICATOR_ADDRESS_PREFIX,
@@ -1843,24 +2226,17 @@ run (void *cls,
   {
     GNUNET_break (0);
     GNUNET_SCHEDULER_shutdown ();
-    GNUNET_free (bindto);
     return;
   }
-  // FIXME: bindto is wrong here, we MUST get our external
-  // IP address and really look at 'in' here as we might
-  // be bound to loopback or some other specific IP address!
-  GNUNET_asprintf (&my_addr,
-                  "%s-%s",
-                  COMMUNICATOR_ADDRESS_PREFIX,
-                  bindto);
-  GNUNET_free (bindto);
-  // FIXME: based on our bindto, we might not be able to tell the
-  // network type yet! What to do here!?
-  ai = GNUNET_TRANSPORT_communicator_address_add (ch,
-                                                 my_addr,
-                                                 GNUNET_NT_LOOPBACK, // FIXME: 
wrong NT!
-                                                 GNUNET_TIME_UNIT_FOREVER_REL);
-  GNUNET_free (my_addr);
+  nat = GNUNET_NAT_register (cfg,
+                            COMMUNICATOR_CONFIG_SECTION,
+                            IPPROTO_TCP,
+                            1 /* one address */,
+                            (const struct sockaddr **) &in,
+                            &in_len,
+                            &nat_address_cb,
+                            NULL /* FIXME: support reversal! */,
+                            NULL /* closure */);
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]