gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r38174 - in gnunet/src: transport util


From: gnunet
Subject: [GNUnet-SVN] r38174 - in gnunet/src: transport util
Date: Fri, 21 Oct 2016 18:04:46 +0200

Author: grothoff
Date: 2016-10-21 18:04:46 +0200 (Fri, 21 Oct 2016)
New Revision: 38174

Modified:
   gnunet/src/transport/transport_api_core.c
   gnunet/src/util/client.c
   gnunet/src/util/client_new.c
   gnunet/src/util/mq.c
Log:
activating client_new implementation, seems to mostly work fine, or better than 
the old one

Modified: gnunet/src/transport/transport_api_core.c
===================================================================
--- gnunet/src/transport/transport_api_core.c   2016-10-21 14:26:15 UTC (rev 
38173)
+++ gnunet/src/transport/transport_api_core.c   2016-10-21 16:04:46 UTC (rev 
38174)
@@ -354,6 +354,25 @@
  * @param cls the `struct Neighbour` where the message was sent
  */
 static void
+notify_send_done_fin (void *cls)
+{
+  struct Neighbour *n = cls;
+
+  n->timeout_task = NULL;
+  n->is_ready = GNUNET_YES;
+  GNUNET_MQ_impl_send_continue (n->mq);
+}
+
+
+/**
+ * A message from the handler's message queue to a neighbour was
+ * transmitted.  Now trigger (possibly delayed) notification of the
+ * neighbour's message queue that we are done and thus ready for
+ * the next message.
+ *
+ * @param cls the `struct Neighbour` where the message was sent
+ */
+static void
 notify_send_done (void *cls)
 {
   struct Neighbour *n = cls;
@@ -364,8 +383,8 @@
   {
     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
                                       n->env_size + n->traffic_overhead);
+    n->env = NULL;
     n->traffic_overhead = 0;
-    n->env = NULL;
   }
   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
                                               128);
@@ -375,10 +394,11 @@
     GNUNET_MQ_impl_send_continue (n->mq);
     return;
   }
+  GNUNET_MQ_impl_send_in_flight (n->mq);
   /* cannot send even a small message without violating
-     quota, wait a before notifying MQ */
+     quota, wait a before allowing MQ to send next message */
   n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                                  &notify_send_done,
+                                                  &notify_send_done_fin,
                                                   n);
 }
 
@@ -411,6 +431,7 @@
     GNUNET_MQ_impl_send_continue (mq);
     return;
   }
+  GNUNET_assert (NULL == n->env);
   n->env = GNUNET_MQ_msg_nested_mh (obm,
                                     GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
                                     msg);

Modified: gnunet/src/util/client.c
===================================================================
--- gnunet/src/util/client.c    2016-10-21 14:26:15 UTC (rev 38173)
+++ gnunet/src/util/client.c    2016-10-21 16:04:46 UTC (rev 38174)
@@ -375,7 +375,7 @@
  * @return the message queue, NULL on error
  */
 struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg,
                        const char *service_name,
                        const struct GNUNET_MQ_MessageHandler *handlers,
                        GNUNET_MQ_ErrorHandler error_handler,

Modified: gnunet/src/util/client_new.c
===================================================================
--- gnunet/src/util/client_new.c        2016-10-21 14:26:15 UTC (rev 38173)
+++ gnunet/src/util/client_new.c        2016-10-21 16:04:46 UTC (rev 38174)
@@ -213,10 +213,9 @@
 static void
 connect_fail_continuation (struct ClientState *cstate)
 {
-  LOG (GNUNET_ERROR_TYPE_INFO,
-       "Failed to establish TCP connection to `%s:%u', no further addresses to 
try.\n",
-       cstate->hostname,
-       cstate->port);
+  LOG (GNUNET_ERROR_TYPE_WARNING,
+       "Failed to establish connection to `%s', no further addresses to 
try.\n",
+       cstate->service_name);
   GNUNET_break (NULL == cstate->ap_head);
   GNUNET_break (NULL == cstate->ap_tail);
   GNUNET_break (NULL == cstate->dns_active);
@@ -245,6 +244,7 @@
   ssize_t ret;
   size_t len;
   const char *pos;
+  int notify_in_flight;
 
   cstate->send_task = NULL;
   pos = (const char *) cstate->msg;
@@ -262,10 +262,7 @@
                             GNUNET_MQ_ERROR_WRITE);
     return;
   }
-  if (0 == cstate->msg_off)
-  {
-    GNUNET_MQ_impl_send_in_flight (cstate->mq);
-  }
+  notify_in_flight = (0 == cstate->msg_off);
   cstate->msg_off += ret;
   if (cstate->msg_off < len)
   {
@@ -274,6 +271,8 @@
                                         cstate->sock,
                                         &transmit_ready,
                                         cstate);
+    if (notify_in_flight) 
+      GNUNET_MQ_impl_send_in_flight (cstate->mq);
     return;
   }
   cstate->msg = NULL;
@@ -345,6 +344,7 @@
   {
     /* defer destruction */
     cstate->in_destroy = GNUNET_YES;
+    cstate->mq = NULL;
     return;
   }
   if (NULL != cstate->dns_active)
@@ -384,8 +384,12 @@
                          GNUNET_NO);
   if (GNUNET_SYSERR == ret)
   {
-    GNUNET_MQ_inject_error (cstate->mq,
-                            GNUNET_MQ_ERROR_READ);
+    if (NULL != cstate->mq)
+      GNUNET_MQ_inject_error (cstate->mq,
+                             GNUNET_MQ_ERROR_READ);
+    if (GNUNET_YES == cstate->in_destroy)
+      connection_client_destroy_impl (cstate->mq,
+                                     cstate);
     return;
   }
   if (GNUNET_YES == cstate->in_destroy)
@@ -723,9 +727,11 @@
 #endif
 
   if ( (0 == (cstate->attempts++ % 2)) ||
-       (0 == cstate->port) )
+       (0 == cstate->port) ||
+       (NULL == cstate->hostname) )
   {
-    /* on even rounds, try UNIX first */
+    /* on even rounds, try UNIX first, or always
+       if we do not have a DNS name and TCP port. */
     cstate->sock = try_unixpath (cstate->service_name,
                                  cstate->cfg);
     if (NULL != cstate->sock)
@@ -732,8 +738,15 @@
     {
       connect_success_continuation (cstate);
       return;
-    }
+    }    
   }
+  if ( (NULL == cstate->hostname) ||
+       (0 == cstate->port) )
+  {
+    /* All options failed. Boo! */
+    connect_fail_continuation (cstate);
+    return;
+  }
   cstate->dns_active
     = GNUNET_RESOLVER_ip_get (cstate->hostname,
                              AF_UNSPEC,
@@ -807,11 +820,11 @@
  * @return the message queue, NULL on error
  */
 struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
-                        const char *service_name,
-                        const struct GNUNET_MQ_MessageHandler *handlers,
-                        GNUNET_MQ_ErrorHandler error_handler,
-                        void *error_handler_cls)
+GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                      const char *service_name,
+                      const struct GNUNET_MQ_MessageHandler *handlers,
+                      GNUNET_MQ_ErrorHandler error_handler,
+                      void *error_handler_cls)
 {
   struct ClientState *cstate;
 

Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c        2016-10-21 14:26:15 UTC (rev 38173)
+++ gnunet/src/util/mq.c        2016-10-21 16:04:46 UTC (rev 38174)
@@ -128,6 +128,11 @@
   void *error_handler_cls;
 
   /**
+   * Task to asynchronously run #impl_send_continue(). 
+   */
+  struct GNUNET_SCHEDULER_Task *send_task;
+  
+  /**
    * Linked list of messages pending to be sent
    */
   struct GNUNET_MQ_Envelope *envelope_head;
@@ -145,23 +150,11 @@
   struct GNUNET_MQ_Envelope *current_envelope;
 
   /**
-   * GNUNET_YES if the sent notification was called 
-   * for the current envelope.
-   */
-  int send_notification_called;
-
-  /**
    * Map of associations, lazily allocated
    */
   struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
 
   /**
-   * Task scheduled during #GNUNET_MQ_impl_send_continue
-   * or #GNUNET_MQ_impl_send_in_flight
-   */
-  struct GNUNET_SCHEDULER_Task *send_task;
-
-  /**
    * Functions to call on queue destruction; kept in a DLL.
    */
   struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
@@ -196,9 +189,15 @@
   unsigned int queue_length;
 
   /**
-   * GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+   * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+   * FIXME: is this dead?
    */
   int evacuate_called;
+
+  /**
+   * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
+   */
+  int in_flight;
 };
 
 
@@ -364,7 +363,7 @@
 unsigned int
 GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
 {
-  return mq->queue_length;
+  return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
 }
 
 
@@ -385,7 +384,8 @@
   mq->queue_length++;
   ev->parent_queue = mq;
   /* is the implementation busy? queue it! */
-  if (NULL != mq->current_envelope)
+  if ( (NULL != mq->current_envelope) ||
+       (NULL != mq->send_task) )
   {
     GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
                                       mq->envelope_tail,
@@ -428,35 +428,6 @@
 
 
 /**
- * Task run to call the send notification for the next queued
- * message, if any.  Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param cls message queue to send the next message with
- */
-static void
-impl_send_in_flight (void *cls)
-{
-  struct GNUNET_MQ_Handle *mq = cls;
-  struct GNUNET_MQ_Envelope *current_envelope;
-
-  mq->send_task = NULL;
-  /* call is only valid if we're actually currently sending
-   * a message */
-  current_envelope = mq->current_envelope;
-  GNUNET_assert (NULL != current_envelope);
-  /* can't call cancel from now on anymore */
-  current_envelope->parent_queue = NULL;
-  if ( (GNUNET_NO == mq->send_notification_called) &&
-       (NULL != current_envelope->sent_cb) )
-  {
-    current_envelope->sent_cb (current_envelope->sent_cls);
-  }
-  mq->send_notification_called = GNUNET_YES;
-}
-
-
-/**
  * Task run to call the send implementation for the next queued
  * message, if any.  Only useful for implementing message queues,
  * results in undefined behavior if not used carefully.
@@ -467,32 +438,19 @@
 impl_send_continue (void *cls)
 {
   struct GNUNET_MQ_Handle *mq = cls;
-  struct GNUNET_MQ_Envelope *current_envelope;
-
+  
   mq->send_task = NULL;
   /* call is only valid if we're actually currently sending
    * a message */
-  current_envelope = mq->current_envelope;
-  GNUNET_assert (NULL != current_envelope);
-  impl_send_in_flight (mq);
-  GNUNET_assert (0 < mq->queue_length);
-  mq->queue_length--;
   if (NULL == mq->envelope_head)
-  {
-    mq->current_envelope = NULL;
-  }
-  else
-  {
-    mq->current_envelope = mq->envelope_head;
-    GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
-                                 mq->envelope_tail,
-                                 mq->current_envelope);
-    mq->send_notification_called = GNUNET_NO;
-    mq->send_impl (mq,
-                  mq->current_envelope->mh,
-                  mq->impl_state);
-  }
-  GNUNET_free (current_envelope);
+    return;
+  mq->current_envelope = mq->envelope_head;
+  GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+                              mq->envelope_tail,
+                              mq->current_envelope);
+  mq->send_impl (mq,
+                mq->current_envelope->mh,
+                mq->impl_state);
 }
 
 
@@ -506,22 +464,32 @@
 void
 GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
 {
-  /* maybe #GNUNET_MQ_impl_send_in_flight was called? */
-  if (NULL != mq->send_task)
+  struct GNUNET_MQ_Envelope *current_envelope;
+  GNUNET_MQ_NotifyCallback cb;
+  
+  GNUNET_assert (0 < mq->queue_length);
+  mq->queue_length--;
+  current_envelope = mq->current_envelope;
+  current_envelope->parent_queue = NULL;
+  mq->current_envelope = NULL;
+  GNUNET_assert (NULL == mq->send_task);
+  mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+                                           mq);
+  if (NULL != (cb = current_envelope->sent_cb))
   {
-    GNUNET_SCHEDULER_cancel (mq->send_task);
-  }
-  mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
-                                            mq);
+    current_envelope->sent_cb = NULL;
+    cb (current_envelope->sent_cls);
+  }  
+  GNUNET_free (current_envelope);
 }
 
 
 /**
  * Call the send notification for the current message, but do not
- * try to send the next message until #gnunet_mq_impl_send_continue
+ * try to send the next message until #GNUNET_MQ_impl_send_continue
  * is called.
  *
- * only useful for implementing message queues, results in undefined
+ * Only useful for implementing message queues, results in undefined
  * behavior if not used carefully.
  *
  * @param mq message queue to send the next message with
@@ -529,9 +497,21 @@
 void
 GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
 {
-  GNUNET_assert (NULL == mq->send_task);
-  mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight,
-                                            mq);
+  struct GNUNET_MQ_Envelope *current_envelope;
+  GNUNET_MQ_NotifyCallback cb;
+  
+  mq->in_flight = GNUNET_YES;
+  /* call is only valid if we're actually currently sending
+   * a message */
+  current_envelope = mq->current_envelope;
+  GNUNET_assert (NULL != current_envelope);
+  /* can't call cancel from now on anymore */
+  current_envelope->parent_queue = NULL;
+  if (NULL != (cb = current_envelope->sent_cb))
+  {
+    current_envelope->sent_cb = NULL;
+    cb (current_envelope->sent_cls);
+  }
 }
 
 
@@ -1187,7 +1167,6 @@
       GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
                                    mq->envelope_tail,
                                    mq->current_envelope);
-      mq->send_notification_called = GNUNET_NO;
       mq->send_impl (mq,
                     mq->current_envelope->mh,
                     mq->impl_state);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]