gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (eead33d85 -> 43de1e4a0)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (eead33d85 -> 43de1e4a0)
Date: Thu, 01 Nov 2018 15:29:53 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a change to branch master
in repository gnunet.

    from eead33d85 RPS service: Add more detailed statistics on 
multi/single-hop peers
     new 11916b980 attempting to fix #5464
     new 43de1e4a0 work on TNG

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/cadet/.gitignore                               |   3 +-
 src/cadet/gnunet-service-cadet_channel.c           |   7 +-
 src/include/gnunet_protocols.h                     |  51 +-
 .../gnunet_transport_communication_service.h       |  14 +-
 src/transport/transport.h                          | 257 ++++++
 src/transport/transport_api2_communication.c       | 959 +++++++++++++++++++++
 6 files changed, 1280 insertions(+), 11 deletions(-)
 create mode 100644 src/transport/transport_api2_communication.c

diff --git a/src/cadet/.gitignore b/src/cadet/.gitignore
index 44382fde9..935049ce8 100644
--- a/src/cadet/.gitignore
+++ b/src/cadet/.gitignore
@@ -21,4 +21,5 @@ test_cadet_local
 test_cadet_single
 gnunet-service-cadet-new
 test_cadet_local_mq
-test_cadet_*_new
\ No newline at end of file
+test_cadet_*_newtest_cadet_2_reopen
+test_cadet_5_reopen
diff --git a/src/cadet/gnunet-service-cadet_channel.c 
b/src/cadet/gnunet-service-cadet_channel.c
index 06711dc8b..8ef598132 100644
--- a/src/cadet/gnunet-service-cadet_channel.c
+++ b/src/cadet/gnunet-service-cadet_channel.c
@@ -500,6 +500,11 @@ channel_destroy (struct CadetChannel *ch)
     GNUNET_free (crm->data_message);
     GNUNET_free (crm);
   }
+  if (CADET_CHANNEL_LOOSE == ch->state)
+  {
+    GSC_drop_loose_channel (&ch->h_port,
+                           ch);
+  }
   if (NULL != ch->owner)
   {
     free_channel_client (ch->owner);
@@ -1136,8 +1141,6 @@ GCCH_channel_local_destroy (struct CadetChannel *ch,
          target, but that never went anywhere. Nothing to do here. */
       break;
     case CADET_CHANNEL_LOOSE:
-      GSC_drop_loose_channel (&ch->h_port,
-                              ch);
       break;
     default:
       GCT_send_channel_destroy (ch->t,
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 03b13fd48..4831c9215 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3005,9 +3005,58 @@ extern "C"
 #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL  1135
 
 
+/*******************************************************
+  NEW (TNG) Transport service
+  ******************************************************* */
 
 /**
- * Next available: 1200
+ * @brief inform transport to add an address of this peer
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS 1200
+
+/**
+ * @brief inform transport to delete an address of this peer
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS 1201
+
+/**
+ * @brief inform transport about an incoming message
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG 1202
+
+/**
+ * @brief transport acknowledges processing an incoming message
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK 1203
+
+/**
+ * @brief inform transport that a queue was setup to talk to some peer
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_SETUP 1204
+
+/**
+ * @brief inform transport that a queue was torn down 
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_TEARDOWN 1205
+
+/**
+ * @brief transport tells communicator it wants a queue
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE 1206
+
+/**
+ * @brief transport tells communicator it wants to transmit
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG 1207
+
+/**
+ * @brief communicator tells transports that message was sent
+ */
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK 1208
+
+
+/**
+ * Next available: 1300
  */
 
 
diff --git a/src/include/gnunet_transport_communication_service.h 
b/src/include/gnunet_transport_communication_service.h
index 94d15af22..d93d5134e 100644
--- a/src/include/gnunet_transport_communication_service.h
+++ b/src/include/gnunet_transport_communication_service.h
@@ -137,8 +137,8 @@ typedef void
  * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
  *         immediately dropped due to memory limitations (communicator
  *         should try to apply back pressure),
- *         #GNUNET_SYSERR if the message is ill formed and communicator
- *         should try to reset stream
+ *         #GNUNET_SYSERR if the message could not be delivered because
+ *         the tranport service is not yet up
  */
 int
 GNUNET_TRANSPORT_communicator_receive (struct 
GNUNET_TRANSPORT_CommunicatorHandle *handle,
@@ -162,7 +162,7 @@ struct GNUNET_TRANSPORT_QueueHandle;
  * "inbound" connection or because the communicator discovered the
  * presence of another peer.
  *
- * @param handle connection to transport service
+ * @param ch connection to transport service
  * @param peer peer with which we can now communicate
  * @param address address in human-readable format, 0-terminated, UTF-8
  * @param nt which network type does the @a address belong to?
@@ -170,7 +170,7 @@ struct GNUNET_TRANSPORT_QueueHandle;
  * @return API handle identifying the new MQ
  */
 struct GNUNET_TRANSPORT_QueueHandle *
-GNUNET_TRANSPORT_communicator_mq_add (struct 
GNUNET_TRANSPORT_CommunicatorHandle *handle,
+GNUNET_TRANSPORT_communicator_mq_add (struct 
GNUNET_TRANSPORT_CommunicatorHandle *ch,
                                       const struct GNUNET_PeerIdentity *peer,
                                       const char *address,
                                       enum GNUNET_ATS_Network_Type nt,
@@ -198,16 +198,16 @@ struct GNUNET_TRANSPORT_AddressIdentifier;
  * Notify transport service about an address that this communicator
  * provides for this peer.
  *
- * @param handle connection to transport service
+ * @param ch connection to transport service
  * @param address our address in human-readable format, 0-terminated, UTF-8
  * @param nt which network type does the address belong to?
  * @param expiration when does the communicator forsee this address expiring?
  */
 struct GNUNET_TRANSPORT_AddressIdentifier *
-GNUNET_TRANSPORT_communicator_address_add (struct 
GNUNET_TRANSPORT_CommunicatorHandle *handle,
+GNUNET_TRANSPORT_communicator_address_add (struct 
GNUNET_TRANSPORT_CommunicatorHandle *ch,
                                            const char *address,
                                            enum GNUNET_ATS_Network_Type nt,
-                                           struct GNUNET_TIME_Absolute 
expiration);
+                                           struct GNUNET_TIME_Relative 
expiration);
 
 
 /**
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 75726e462..ec373286d 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -644,6 +644,263 @@ struct TransportPluginMonitorMessage
 };
 
 
+
+
+
+
+
+
+
+/* *********************** TNG messages ***************** */
+
+/**
+ * Add address to the list.
+ */
+struct GNUNET_TRANSPORT_AddAddressMessage
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Address identifier (used during deletion).
+   */
+  uint32_t aid GNUNET_PACKED;
+
+  /**
+   * When does the address expire?
+   */
+  struct GNUNET_TIME_RelativeNBO expiration;
+
+  /**
+   * An `enum GNUNET_ATS_Network_Type` in NBO.
+   */
+  uint32_t nt;
+  
+  /* followed by UTF-8 encoded, 0-terminated human-readable address */
+};
+
+
+/**
+ * Remove address from the list.
+ */
+struct GNUNET_TRANSPORT_DelAddressMessage
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Address identifier.
+   */
+  uint32_t aid GNUNET_PACKED;
+
+};
+
+
+/**
+ * Inform transport about an incoming message.
+ */
+struct GNUNET_TRANSPORT_IncomingMessage
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Do we use flow control or not?
+   */
+  uint32_t fc_on GNUNET_PACKED;
+  
+  /**
+   * 64-bit number to identify the matching ACK.
+   */
+  uint64_t fc_id GNUNET_PACKED;
+  
+  /**
+   * Sender identifier.
+   */
+  struct GNUNET_PeerIdentity sender GNUNET_PACKED;
+
+  /* followed by the message */
+};
+
+
+/**
+ * Transport informs us about being done with an incoming message.
+ * (only sent if fc_on was set).
+ */
+struct GNUNET_TRANSPORT_IncomingMessageAck
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Reserved (0)
+   */
+  uint32_t reserved GNUNET_PACKED;
+  
+  /**
+   * Which message is being ACKed?
+   */
+  uint64_t fc_id GNUNET_PACKED;
+  
+  /**
+   * Sender identifier of the original message.
+   */
+  struct GNUNET_PeerIdentity sender GNUNET_PACKED;
+
+};
+
+
+/**
+ * Add queue to the transport
+ */
+struct GNUNET_TRANSPORT_AddQueueMessage
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Queue identifier (used to identify the queue).
+   */
+  uint32_t qid GNUNET_PACKED;
+
+  /**
+   * Receiver that can be addressed via the queue.
+   */
+  struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+  /**
+   * An `enum GNUNET_ATS_Network_Type` in NBO.
+   */
+  uint32_t nt;
+  
+  /* followed by UTF-8 encoded, 0-terminated human-readable address */
+};
+
+
+/**
+ * Remove queue, it is no longer available.
+ */
+struct GNUNET_TRANSPORT_DelQueueMessage
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Address identifier.
+   */
+  uint32_t qid GNUNET_PACKED;
+
+  /**
+   * Receiver that can be addressed via the queue.
+   */
+  struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+};
+
+
+/**
+ * Transport tells communicator that it wants a new queue.
+ */
+struct GNUNET_TRANSPORT_CreateQueue
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_QUEUE_CREATE.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Always zero.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /**
+   * Receiver that can be addressed via the queue.
+   */
+  struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+  /* followed by UTF-8 encoded, 0-terminated human-readable address */
+};
+
+
+/**
+ * Inform communicator about transport's desire to send a message.
+ */
+struct GNUNET_TRANSPORT_SendMessageTo
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Which queue should we use?
+   */
+  uint32_t qid GNUNET_PACKED;
+
+  /**
+   * Message ID, used for flow control.
+   */
+  uint64_t mid GNUNET_PACKED;
+  
+  /**
+   * Receiver identifier.
+   */
+  struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+  /* followed by the message */
+};
+
+
+/**
+ * Inform transport that message was sent.
+ */
+struct GNUNET_TRANSPORT_SendMessageToAck
+{
+
+  /**
+   * Type will be #GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Success (#GNUNET_OK), failure (#GNUNET_SYSERR).
+   */
+  uint32_t status GNUNET_PACKED;
+
+  /**
+   * Message ID of the original message.
+   */
+  uint64_t mid GNUNET_PACKED;
+  
+  /**
+   * Receiver identifier.
+   */
+  struct GNUNET_PeerIdentity receiver GNUNET_PACKED;
+
+};
+
+
+
 GNUNET_NETWORK_STRUCT_END
 
 /* end of transport.h */
diff --git a/src/transport/transport_api2_communication.c 
b/src/transport/transport_api2_communication.c
new file mode 100644
index 000000000..d446516bd
--- /dev/null
+++ b/src/transport/transport_api2_communication.c
@@ -0,0 +1,959 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2018 GNUnet e.V.
+
+     GNUnet is free software: you can redistribute it and/or modify it
+     under the terms of the GNU Affero General Public License as published
+     by the Free Software Foundation, either version 3 of the License,
+     or (at your option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     Affero General Public License for more details.
+    
+     You should have received a copy of the GNU Affero General Public License
+     along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+/**
+ * @file transport/transport_api2_communication.c
+ * @brief implementation of the gnunet_transport_communication_service.h API
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_transport_communication_service.h"
+#include "transport.h"
+
+
+/**
+ * How many messages do we keep at most in the queue to the 
+ * transport service before we start to drop (default, 
+ * can be changed via the configuration file).
+ */
+#define DEFAULT_MAX_QUEUE_LENGTH 16
+
+
+/**
+ * Information we track per packet to enable flow control.
+ */
+struct FlowControl
+{
+  /**
+   * Kept in a DLL.
+   */
+  struct FlowControl *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct FlowControl *prev;
+
+  /**
+   * Function to call once the message was processed.
+   */
+  GNUNET_TRANSPORT_MessageCompletedCallback cb;
+
+  /**
+   * Closure for @e cb
+   */
+  void *cb_cls;
+  
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity sender;
+
+  /**
+   * More-or-less unique ID for the message.
+   */
+  uint64_t id;
+};
+
+
+/**
+ * Information we track per message to tell the transport about
+ * success or failures.
+ */
+struct AckPending
+{
+  /**
+   * Kept in a DLL.
+   */
+  struct AckPending *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct AckPending *prev;
+
+  /**
+   * Which peer is this about?
+   */
+  struct GNUNET_PeerIdentity receiver;
+
+  /**
+   * More-or-less unique ID for the message.
+   */
+  uint64_t mid;
+};
+
+
+/**
+ * Opaque handle to the transport service for communicators.
+ */
+struct GNUNET_TRANSPORT_CommunicatorHandle
+{
+  /**
+   * Head of DLL of addresses this communicator offers to the transport 
service.
+   */
+  struct GNUNET_TRANSPORT_AddressIdentifier *ai_head;
+
+  /**
+   * Tail of DLL of addresses this communicator offers to the transport 
service.
+   */
+  struct GNUNET_TRANSPORT_AddressIdentifier *ai_tail;
+
+  /**
+   * DLL of messages awaiting flow control confirmation (ack).
+   */
+  struct FlowControl *fc_head;
+
+  /**
+   * DLL of messages awaiting flow control confirmation (ack).
+   */
+  struct FlowControl *fc_tail;
+
+  /**
+   * DLL of messages awaiting transmission confirmation (ack).
+   */
+  struct AckPending *ap_head;
+
+  /**
+   * DLL of messages awaiting transmission confirmation (ack).
+   */
+  struct AckPending *ac_tail;
+
+  /**
+   * DLL of queues we offer.
+   */
+  struct QueueHandle *queue_head;
+  
+  /**
+   * DLL of queues we offer.
+   */
+  struct QueueHandle *queue_tail;
+    
+  /**
+   * Our configuration.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * Name of the communicator.
+   */
+  const char *name;
+
+  /**
+   * Function to call when the transport service wants us to initiate
+   * a communication channel with another peer.
+   */
+  GNUNET_TRANSPORT_CommunicatorMqInit mq_init;
+
+  /**
+   * Closure for @e mq_init.
+   */
+  void *mq_init_cls;
+
+  /**
+   * Maximum permissable queue length.
+   */
+  unsigned long long max_queue_length;
+  
+  /**
+   * Flow-control identifier generator.
+   */
+  uint64_t fc_gen;
+
+  /**
+   * MTU of the communicator
+   */
+  size_t mtu;
+  
+  /**
+   * Internal UUID for the address used in communication with the
+   * transport service.
+   */
+  uint32_t aid_gen;
+
+  /**
+   * Queue identifier generator.
+   */
+  uint32_t queue_gen;
+  
+};
+
+
+/**
+ * Handle returned to identify the internal data structure the transport
+ * API has created to manage a message queue to a particular peer.
+ */
+struct GNUNET_TRANSPORT_QueueHandle
+{
+  /**
+   * Handle this queue belongs to.
+   */
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+
+  /**
+   * Which peer we can communciate with.
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  /**
+   * Address used by the communication queue.
+   */ 
+  char *address;
+
+  /**
+   * Network type of the communciation queue.
+   */
+  enum GNUNET_ATS_Network_Type nt;
+
+  /**
+   * The queue itself.
+   */ 
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * ID for this queue when talking to the transport service.
+   */
+  uint32_t queue_id;
+  
+};
+
+
+/**
+ * Internal representation of an address a communicator is
+ * currently providing for the transport service.
+ */
+struct GNUNET_TRANSPORT_AddressIdentifier
+{
+
+  /**
+   * Kept in a DLL.
+   */
+  struct GNUNET_TRANSPORT_AddressIdentifier *next;
+
+  /**
+   * Kept in a DLL.
+   */
+  struct GNUNET_TRANSPORT_AddressIdentifier *prev;
+  
+  /**
+   * Transport handle where the address was added.
+   */
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+
+  /**
+   * The actual address.
+   */
+  char *address;
+
+  /**
+   * When does the address expire? (Expected lifetime of the
+   * address.)
+   */
+  struct GNUNET_TIME_Relative expiration;
+  
+  /**
+   * Internal UUID for the address used in communication with the
+   * transport service.
+   */
+  uint32_t aid;
+
+  /**
+   * Network type for the address.
+   */
+  enum GNUNET_ATS_Network_Type nt;
+  
+};
+
+
+/**
+ * (re)connect our communicator to the transport service
+ *
+ * @param ch handle to reconnect
+ */
+static void
+reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch);
+
+
+/**
+ * Send message to the transport service about address @a ai
+ * being now available.
+ *
+ * @param ai address to add
+ */
+static void
+send_add_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_AddAddressMessage *aam;
+  
+  if (NULL == ai->ch->mq)
+    return;
+  env = GNUNET_MQ_msg_extra (aam,
+                            strlen (ai->address) + 1,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_ADDRESS);
+  aam->expiration = GNUNET_TIME_relative_to_nbo (ai->expiration);
+  aam->nt = htonl ((uint32_t) ai->nt);
+  memcpy (&aam[1],
+         ai->address,
+         strlen (ai->address) + 1);
+  GNUNET_MQ_send (ai->ch->mq,
+                 env);
+}
+
+
+/**
+ * Send message to the transport service about address @a ai
+ * being no longer available.
+ *
+ * @param ai address to delete
+ */
+static void
+send_del_address (struct GNUNET_TRANSPORT_AddressIdentifier *ai)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_DelAddressMessage *dam;
+  
+  if (NULL == ai->ch->mq)
+    return;
+  env = GNUNET_MQ_msg (dam,                         
+                      GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_ADDRESS);
+  dam.aid = htonl (ai->aid);
+  GNUNET_MQ_send (ai->ch->mq,
+                 env);
+}
+
+
+/**
+ * Send message to the transport service about queue @a qh
+ * being now available.
+ *
+ * @param qh queue to add
+ */
+static void
+send_add_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_AddQueueMessage *aqm;
+  
+  if (NULL == ai->ch->mq)
+    return;
+  env = GNUNET_MQ_msg_extra (aqm,
+                            strlen (ai->address) + 1,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_ADD_QUEUE);
+  aqm.receiver = qh->peer;
+  aqm.nt = htonl ((uint32_t) qh->nt);
+  aqm.qid = htonl (qh->qid);
+  memcpy (&aqm[1],
+         ai->address,
+         strlen (ai->address) + 1);
+  GNUNET_MQ_send (ai->ch->mq,
+                 env);
+}
+
+
+/**
+ * Send message to the transport service about queue @a qh
+ * being no longer available.
+ *
+ * @param qh queue to delete
+ */
+static void
+send_del_queue (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_DelQueueMessage *dqm;
+  
+  if (NULL == ai->ch->mq)
+    return;
+  env = GNUNET_MQ_msg (dqm,                         
+                      GNUNET_MESSAGE_TYPE_TRANSPORT_DEL_QUEUE);
+  dqm.qid = htonl (qh->qid);
+  dqm.receiver = qh->peer;
+  GNUNET_MQ_send (ai->ch->mq,
+                 env);
+}
+
+
+/**
+ * Disconnect from the transport service.  Purges
+ * all flow control entries as we will no longer receive
+ * the ACKs.  Purges the ack pending entries as the
+ * transport will no longer expect the confirmations.
+ *
+ * @param ch service to disconnect from
+ */
+static void
+disconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
+{
+  struct FlowControl *fcn;
+  struct AckPending *apn;
+  
+  for (struct FlowControl *fc = ch->fc_head;
+       NULL != fc;
+       fc = fcn)
+  {
+    fcn = fc->next;
+    GNUNET_CONTAINER_DLL_remove (ch->fc_head,
+                                ch->fc_tail,
+                                fc);
+    fc->cb (fc->cb_cls,
+           GNUNET_SYSERR);
+    GNUNET_free (fc);
+  }
+  for (struct AckPending *ap = ch->ap_head;
+       NULL != ap;
+       ap = apn)
+  {
+    apn = ap->next;
+    GNUNET_CONTAINER_DLL_remove (ch->ap_head,
+                                ch->ap_tail,
+                                ap);
+    GNUNET_free (ap);
+  }
+  if (NULL == ch->mq)
+    return;
+  GNUNET_MQ_destroy (ch->mq);
+  ch->mq = NULL;
+}
+
+
+/**
+ * Function called on MQ errors.
+ */
+static void
+error_handler (void *cls,
+              enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             "MQ failure, reconnecting to transport service.\n");
+  disconnect (ch);
+  /* TODO: maybe do this with exponential backoff/delay */
+  reconnect (ch);
+}
+
+
+/**
+ * Transport service acknowledged a message we gave it
+ * (with flow control enabled). Tell the communicator.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param incoming_ack the ack
+ */
+static void
+handle_incoming_ack (void *cls,
+                    struct GNUNET_TRANSPORT_IncomingMessageAck *incoming_ack)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+  
+  for (struct FlowControl *fc = ch->fc_head;
+       NULL != fc;
+       fc = fc->next)
+  {
+    if ( (fc->id == incoming_ack->fc_id) &&
+        (0 == memcmp (&fc->sender,
+                      incoming_ack->sender,
+                      sizeof (struct GNUNET_PeerIdentity))) )
+    {
+      GNUNET_CONTAINER_DLL_remove (ch->fc_head,
+                                  ch->fc_tail,
+                                  fc);
+      fc->cb (fc->cb_cls,
+             GNUNET_OK);
+      GNUNET_free (fc);
+      return;
+    }
+  }
+  GNUNET_break (0);
+  disconnect (ch);
+  /* TODO: maybe do this with exponential backoff/delay */
+  reconnect (ch);
+}
+
+
+/**
+ * Transport service wants us to create a queue. Check if @a cq
+ * is well-formed.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param cq the queue creation request
+ * @return #GNUNET_OK if @a smt is well-formed
+ */
+static int
+check_create_queue (void *cls,
+                   struct GNUNET_TRANSPORT_CreateQueue *cq)
+{
+  uint16_t len = ntohs (cq->header.size) - sizeof (*cq);
+  const char *addr = (const char *) &cq[1];
+
+  if ( (0 == len) ||
+       ('\0' != addr[len-1]) )
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Transport service wants us to create a queue. Tell the communicator.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param cq the queue creation request
+ */
+static void
+handle_create_queue (void *cls,
+                    struct GNUNET_TRANSPORT_CreateQueue *cq)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+  const char *addr = (const char *) &cq[1];
+
+  if (GNUNET_OK !=
+      ch->mq_init (ch->mq_init_cls,
+                  &cq->receiver,
+                  addr))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               "Address `%s' invalid for this communicator\n",
+               addr);
+    // TODO: do we notify the transport!?
+  }
+}
+
+
+/**
+ * Transport service wants us to send a message. Check if @a smt
+ * is well-formed.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param smt the transmission request
+ * @return #GNUNET_OK if @a smt is well-formed
+ */
+static int
+check_send_msg (void *cls,
+               struct GNUNET_TRANSPORT_SendMessageTo *smt)
+{
+  uint16_t len = ntohs (smt->header.size) - sizeof (*smt);
+  const struct GNUNET_MessageHeader *mh = (const struct GNUNET_MessageHeader 
*) &smt[1];
+
+  if (ntohs (mh->size) != len)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Notify transport service about @a status of a message with
+ * @a mid sent to @a receiver.
+ *
+ * @param ch handle
+ * @param status #GNUNET_OK on success, #GNUNET_SYSERR on failure
+ * @param receiver which peer was the receiver
+ * @param mid message that the ack is about
+ */
+static void
+send_ack (struct GNUNET_TRANSPORT_CommunicatorHandle *ch,
+         int status,
+         const struct GNUNET_PeerIdentity *receiver,
+         uint64_t mid)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_SendMessageToAck *ack;
+
+  env = GNUNET_MQ_msg (ack,
+                      GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG_ACK);
+  ack->status = htonl (GNUNET_OK);
+  ack->mid = ap->mid;
+  ack->receiver = ap->receiver;
+  GNUNET_MQ_send (ch->mq,
+                 env);
+}
+
+
+/**
+ * Message queue transmission by communicator was successful,
+ * notify transport service.
+ *
+ * @param cls an `struct AckPending *`
+ */
+static void
+send_ack_cb (void *cls)
+{
+  struct AckPending *ap = cls;
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ap->ch;
+
+  GNUNET_CONTAINER_DLL_remove (ch->ap_head,
+                              ch->ap_tail,
+                              ap);
+  send_ack (ch,
+           GNUNET_OK,
+           &ap->receiver,
+           ap->mid);
+  GNUNET_free (ap);
+}
+
+
+/**
+ * Transport service wants us to send a message. Tell the communicator.
+ *
+ * @param cls our `struct GNUNET_TRANSPORT_CommunicatorHandle *`
+ * @param smt the transmission request
+ */
+static void
+handle_send_msg (void *cls,
+                struct GNUNET_TRANSPORT_SendMessageTo *smt)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = cls;
+  const struct GNUNET_MessageHeader *mh;
+  struct GNUNET_MQ_Envelope *env;
+  struct AckPending *ap;
+  struct QueueHandle *qh;
+
+  for (qh = ch->queue_head;NULL != qh; qh = qh->next)  
+    if ( (qh->queue_id == smt->qid) &&
+        (0 == memcmp (&qh->peer,
+                      &smt->target,
+                      sizeof (struct GNUNET_PeerIdentity))) )
+      break;  
+  if (NULL == qh)
+  {
+    /* queue is already gone, tell transport this one failed */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+               "Transmission failed, queue no longer exists.\n");
+    send_ack (ch,
+             GNUNET_NO,
+             &smt->receiver,
+             smt->mid);
+    return;
+  }
+  ap = GNUNET_new (struct AckPending);
+  ap->ch = ch;
+  ap->receiver = smt->receiver;
+  ap->mid = smt->mid;
+  GNUNET_CONTAINER_DLL_insert (ch->ap_head,
+                              cp->ap_tail,
+                              ap);
+  mh = (const struct GNUNET_MessageHeader *) &smt[1];
+  env = GNUNET_MQ_msg_copy (mh);
+  GNUNET_MQ_notify_sent (env,
+                        &send_ack_cb,
+                        ap);
+  GNUNET_MQ_send (qh->mq,
+                 env);
+}
+
+
+/**
+ * (re)connect our communicator to the transport service
+ *
+ * @param ch handle to reconnect
+ */
+static void
+reconnect (struct GNUNET_TRANSPORT_CommunicatorHandle *ch)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (incoming_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG_ACK,
+                            struct GNUNET_TRANSPORT_IncomingMessageAck,
+                            ch),
+    GNUNET_MQ_hd_var_size (create_queue,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_CREATE_QUEUE,
+                          struct GNUNET_TRANSPORT_CreateQueue,
+                          ch),
+    GNUNET_MQ_hd_var_size (send_msg,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_MSG,
+                          struct GNUNET_TRANSPORT_SendMessageTo,
+                          ch),
+    GNUNET_MQ_handler_end()
+  };
+  
+  ch->mq = GNUNET_CLIENT_connect (cfg,
+                                 "transport",
+                                 handlers,
+                                 &error_handler,
+                                 ch);
+  for (struct GNUNET_TRANSPORT_AddressIdentifier *ai = ch->ai_head;
+       NULL != ai;
+       ai = ai->next)
+    send_add_address (ai);
+  for (struct GNUNET_TRANSPORT_QueueHandle *qh = ch->queue_head;
+       NULL != qh;
+       qh = qh->next)
+    send_add_queue (qh);
+}
+
+
+/**
+ * Connect to the transport service.
+ *
+ * @param cfg configuration to use
+ * @param name name of the communicator that is connecting
+ * @param mtu maximum message size supported by communicator, 0 if
+ *            sending is not supported, SIZE_MAX for no MTU
+ * @param mq_init function to call to initialize a message queue given
+ *                the address of another peer, can be NULL if the
+ *                communicator only supports receiving messages
+ * @param mq_init_cls closure for @a mq_init
+ * @return NULL on error
+ */
+struct GNUNET_TRANSPORT_CommunicatorHandle *
+GNUNET_TRANSPORT_communicator_connect (const struct 
GNUNET_CONFIGURATION_Handle *cfg,
+                                       const char *name,
+                                       size_t mtu,
+                                       GNUNET_TRANSPORT_CommunicatorMqInit 
mq_init,
+                                       void *mq_init_cls)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch;
+  
+  ch = GNUNET_new (struct GNUNET_TRANSPORT_CommunicatorHandle);
+  ch->cfg = cfg;
+  ch->name = name;
+  ch->mtu = mtu;
+  ch->mq_init = mq_init;
+  ch->mq_init_cls = mq_init_cls;
+  reconnect (ch);
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (cfg,
+                                            name,
+                                            "MAX_QUEUE_LENGTH",
+                                            &ch->max_queue_length))
+    ch->max_queue_length = DEFAULT_MAX_QUEUE_LENGTH;
+  if (NULL == ch->mq)
+  {
+    GNUNET_free (ch);
+    return NULL;
+  }
+  return ch;
+}
+
+
+/**
+ * Disconnect from the transport service.
+ *
+ * @param ch handle returned from connect
+ */
+void
+GNUNET_TRANSPORT_communicator_disconnect (struct 
GNUNET_TRANSPORT_CommunicatorHandle *ch)
+{
+  disconnect (ch);
+  while (NULL != ch->ai_head)
+  {
+    GNUNET_break (0); /* communicator forgot to remove address, warn! */
+    GNUNET_TRANSPORT_communicator_address_remove (ch->ai_head);
+  }
+  GNUNET_free (ch);
+}
+
+
+/* ************************* Receiving *************************** */
+
+
+/**
+ * Notify transport service that the communicator has received
+ * a message.
+ *
+ * @param ch connection to transport service
+ * @param sender presumed sender of the message (details to be checked
+ *        by higher layers)
+ * @param msg the message
+ * @param cb function to call once handling the message is done, NULL if
+ *         flow control is not supported by this communicator
+ * @param cb_cls closure for @a cb
+ * @return #GNUNET_OK if all is well, #GNUNET_NO if the message was
+ *         immediately dropped due to memory limitations (communicator
+ *         should try to apply back pressure),
+ *         #GNUNET_SYSERR if the message could not be delivered because
+ *         the tranport service is not yet up
+ */
+int
+GNUNET_TRANSPORT_communicator_receive (struct 
GNUNET_TRANSPORT_CommunicatorHandle *ch,
+                                       const struct GNUNET_PeerIdentity 
*sender,
+                                       const struct GNUNET_MessageHeader *msg,
+                                       
GNUNET_TRANSPORT_MessageCompletedCallback cb,
+                                       void *cb_cls)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_TRANSPORT_IncomingMessage *im;
+  uint16_t msize;
+  
+  if (NULL == ai->ch->mq)
+    return GNUNET_SYSERR;
+  if (NULL != cb)
+  {
+    struct FlowControl *fc;
+
+    im->fc_on = htonl (GNUNET_YES);
+    im->fc_id = ai->ch->fc_gen++;
+    fc = GNUNET_new (struct FlowControl);
+    fc->sender = *sender;
+    fc->id = im->fc_id;
+    fc->cb = cb;
+    fc->cb_cls = cb_cls;
+    GNUNET_CONTAINER_DLL_insert (ch->fc_head,
+                                ch->fc_tail,
+                                fc);
+  }
+  else
+  {
+    if (GNUNET_MQ_get_length (ch->mq) >= ch->max_queue_length)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Dropping message: transprot is too slow, queue length %u 
exceeded\n",
+                 ch->max_queue_length);
+      return GNUNET_NO;
+    }
+  }
+  
+  msize = ntohs (msg->size);
+  env = GNUNET_MQ_msg_extra (im,
+                            msize,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_INCOMING_MSG);
+  if (NULL == env)
+  {
+    GNUNET_break (0);
+    return GNUNET_SYSERR;
+  }
+  im->sender = *sender;
+  memcpy (&im[1],
+         msg,
+         msize);
+  GNUNET_MQ_send (ai->ch->mq,
+                 env);
+  return GNUNET_OK;
+}
+
+
+/* ************************* Discovery *************************** */
+
+
+/**
+ * Notify transport service that an MQ became available due to an
+ * "inbound" connection or because the communicator discovered the
+ * presence of another peer.
+ *
+ * @param ch connection to transport service
+ * @param peer peer with which we can now communicate
+ * @param address address in human-readable format, 0-terminated, UTF-8
+ * @param nt which network type does the @a address belong to?
+ * @param mq message queue of the @a peer
+ * @return API handle identifying the new MQ
+ */
+struct GNUNET_TRANSPORT_QueueHandle *
+GNUNET_TRANSPORT_communicator_mq_add (struct 
GNUNET_TRANSPORT_CommunicatorHandle *ch,
+                                      const struct GNUNET_PeerIdentity *peer,
+                                      const char *address,
+                                      enum GNUNET_ATS_Network_Type nt,
+                                      struct GNUNET_MQ_Handle *mq)
+{
+  struct GNUNET_TRANSPORT_QueueHandle *qh;
+
+  qh = GNUNET_new (struct GNUNET_TRANSPORT_QueueHandle);
+  qh->ch = ch;
+  qh->peer = *peer;
+  qh->address = GNUNET_strdup (address);
+  qh->nt = nt;
+  qh->mq = mq;
+  qh->queue_id = ch->queue_gen++;
+  GNUNET_CONTAINER_DLL_insert (ch->queue_head,
+                              ch->queue_tail,
+                              qh);
+  send_add_queue (qh);
+  return qh;
+}
+
+
+/**
+ * Notify transport service that an MQ became unavailable due to a
+ * disconnect or timeout.
+ *
+ * @param qh handle for the queue that must be invalidated
+ */
+void
+GNUNET_TRANSPORT_communicator_mq_del (struct GNUNET_TRANSPORT_QueueHandle *qh)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = qh->ch;
+  
+  send_del_queue (qh);
+  GNUNET_CONTAINER_DLL_remove (ch->queue_head,
+                              ch->queue_tail,
+                              qh);
+  GNUNET_MQ_destroy (qh->mq);
+  GNUNET_free (qh->address);
+  GNUNET_free (qh);
+}
+
+
+/**
+ * Notify transport service about an address that this communicator
+ * provides for this peer.
+ *
+ * @param ch connection to transport service
+ * @param address our address in human-readable format, 0-terminated, UTF-8
+ * @param nt which network type does the address belong to?
+ * @param expiration when does the communicator forsee this address expiring?
+ */
+struct GNUNET_TRANSPORT_AddressIdentifier *
+GNUNET_TRANSPORT_communicator_address_add (struct 
GNUNET_TRANSPORT_CommunicatorHandle *ch,
+                                           const char *address,
+                                           enum GNUNET_ATS_Network_Type nt,
+                                           struct GNUNET_TIME_Relative 
expiration)
+{
+  struct GNUNET_TRANSPORT_AddressIdentifier *ai;
+
+  ai = GNUNET_new (struct GNUNET_TRANSPORT_AddressIdentifier);
+  ai->ch = ch;
+  ai->address = GNUNET_strdup (address);
+  ai->nt = nt;
+  ai->expiration = expiration;
+  ai->aid = handle->aid_gen++;
+  GNUNET_CONTAINER_DLL_insert (handle->ai_head,
+                              handle->ai_tail,
+                              ai);
+  send_add_address (ai);
+  return ai;
+}
+
+
+/**
+ * Notify transport service about an address that this communicator no
+ * longer provides for this peer.
+ *
+ * @param ai address that is no longer provided
+ */
+void
+GNUNET_TRANSPORT_communicator_address_remove (struct 
GNUNET_TRANSPORT_AddressIdentifier *ai)
+{
+  struct GNUNET_TRANSPORT_CommunicatorHandle *ch = ai->ch;
+
+  send_del_address (ai);
+  GNUNET_CONTAINER_DLL_remove (ch->ai_head,
+                              ch->ai_tail,
+                              ai);
+  GNUNET_free (ai->address);
+  GNUNET_free (ai);
+}
+
+
+/* end of transport_api2_communication.c */

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]