gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r19695 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r19695 - gnunet/src/stream
Date: Sun, 5 Feb 2012 11:44:42 +0100

Author: harsha
Date: 2012-02-05 11:44:42 +0100 (Sun, 05 Feb 2012)
New Revision: 19695

Modified:
   gnunet/src/stream/stream_api.c
Log:
-added message sequencing

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-02-05 08:37:13 UTC (rev 19694)
+++ gnunet/src/stream/stream_api.c      2012-02-05 10:44:42 UTC (rev 19695)
@@ -86,6 +86,42 @@
 
 
 /**
+ * Functions of this type are called when a message is written
+ *
+ * @param socket the socket the written message was bound to
+ */
+typedef void (*SendFinishCallback) (void *cls,
+                                    struct GNUNET_STREAM_Socket *socket);
+
+
+/**
+ * The send message queue
+ */
+struct MessageQueue
+{
+  /**
+   * The message
+   */
+  struct GNUNET_STREAM_MessageHeader *message;
+
+  /**
+   * Callback to be called when the message is sent
+   */
+  SendFinishCallback finish_cb;
+
+  /**
+   * The closure for finish_cb
+   */
+  void *finish_cb_cls;
+
+  /**
+   * The next message in queue. Should be NULL in the last message
+   */
+  struct MessageQueue *next;
+};
+
+
+/**
  * The STREAM Socket Handler
  */
 struct GNUNET_STREAM_Socket
@@ -143,7 +179,17 @@
   /**
    * The current message associated with the transmit handle
    */
-  struct GNUNET_MessageHeader *message;
+  struct MessageQueue *queue;
+
+  /**
+   * The queue tail, should always point to the last message in queue
+   */
+  struct MessageQueue *queue_tail;
+
+  /**
+   * The number of previous timeouts
+   */
+  unsigned int retries;
 };
 
 
@@ -175,6 +221,7 @@
 
 };
 
+
 /**
  * Default value in seconds for various timeouts
  */
@@ -182,9 +229,9 @@
 
 
 /**
- * Callback function from send_message
+ * Callback function for sending hello message
  *
- * @param cls closure the socket on which the send message was called
+ * @param cls closure the socket
  * @param size number of bytes available in buf
  * @param buf where the callee should write the message
  * @return number of bytes written to buf
@@ -193,64 +240,108 @@
 send_message_notify (void *cls, size_t size, void *buf)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct MessageQueue *head;
   size_t ret;
 
+  head = socket->queue;
   socket->transmit_handle = NULL; /* Remove the transmit handle */
   if (0 == size)                /* request timed out */
     {
-      // statistics ("message timeout")
-      
-      
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                 "Message not sent as tunnel was closed \n");
-      ret = 0;
+      socket->retries++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Message sending timed out. Retry %d \n",
+                  socket->retries);
+      socket->transmit_handle = 
+        GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                           0, /* Corking */
+                                           1, /* Priority */
+                                           /* FIXME: exponential backoff */
+                                           socket->retransmit_timeout,
+                                           &socket->other_peer,
+                                           ntohs (head->message->header.size),
+                                           &send_message_notify,
+                                           socket);
+      return 0;
     }
-  else                          /* Size is more or equal to what was requested 
*/
+
+  ret = ntohs (head->message->header.size);
+  GNUNET_assert (size >= ret);
+  memcpy (buf, head->message, ret);
+  if (NULL != head->finish_cb)
     {
-      ret = ntohs (socket->message->size);
-      GNUNET_assert (size >= ret);
-      memcpy (buf, socket->message, ret);
+      head->finish_cb (socket, head->finish_cb_cls);
     }
-  GNUNET_free (socket->message); /* Free the message memory */
-  socket->message = NULL;
+
+  socket->queue = head->next;   /* Will be NULL if the queue is empty */
+  GNUNET_free (head->message);
+  GNUNET_free (head);
+  head = socket->queue;
+  if (NULL != head)    /* more pending messages to send */
+    {
+      socket->retries = 0;
+      socket->transmit_handle = 
+        GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                           0, /* Corking */
+                                           1, /* Priority */
+                                           /* FIXME: exponential backoff */
+                                           socket->retransmit_timeout,
+                                           &socket->other_peer,
+                                           ntohs (head->message->header.size),
+                                           &send_message_notify,
+                                           socket);
+    }
   return ret;
 }
 
 
 /**
- * Sends a message using the mesh connection of a socket
+ * Queues a message for sending using the mesh connection of a socket
  *
  * @param socket the socket whose mesh connection is used
  * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
  */
 static void
-send_message (struct GNUNET_STREAM_Socket *socket,
-              struct GNUNET_MessageHeader *message)
+queue_message (struct GNUNET_STREAM_Socket *socket,
+               struct GNUNET_STREAM_MessageHeader *message,
+               SendFinishCallback finish_cb,
+               void *finish_cb_cls)
 {
-  socket->message = message;
-  socket->transmit_handle = 
-    GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                       0, /* Corking */
-                                       1, /* Priority */
-                                       socket->retransmit_timeout,
-                                       &socket->other_peer,
-                                       ntohs (message->size),
-                                       &send_message_notify,
-                                       socket);
-}
+  struct MessageQueue *msg_info;
 
-/**
- * Makes state transition dependending on the given state
- *
- * @param socket the socket whose state has to be transitioned
- */
-static void
-make_state_transition (struct GNUNET_STREAM_Socket *socket)
-{
+  msg_info = GNUNET_malloc (sizeof (struct MessageQueue));
+  msg_info->message = message;
+  msg_info->finish_cb = finish_cb;
+  msg_info->finish_cb_cls = finish_cb_cls;
+  msg_info->next = NULL;
 
+
+  if (NULL == socket->queue)
+    {
+      socket->queue = msg_info;
+      socket->queue_tail = msg_info;
+      socket->retries = 0;
+      socket->transmit_handle = 
+        GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+                                           0, /* Corking */
+                                           1, /* Priority */
+                                           socket->retransmit_timeout,
+                                           &socket->other_peer,
+                                           ntohs (message->header.size),
+                                           &send_message_notify,
+                                           socket);
+    }
+  else                          /* There is a pending message in queue */
+    {
+      socket->queue_tail->next = msg_info; /* Add to tail */
+      socket->queue_tail = msg_info;
+    }
 }
 
 
+
+
 /**
  * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
  *
@@ -290,29 +381,33 @@
   return GNUNET_OK;
 }
 
+
 /**
- * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ * Callback to set state to ESTABLISHED
  *
- * @param cls the socket (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end
- * @param tunnel_ctx this is NULL
- * @param sender who sent the message
- * @param message the actual message
- * @param atsi performance data for the connection
- * @return GNUNET_OK to keep the connection open,
- *         GNUNET_SYSERR to close it (signal serious error)
+ * @param cls the closure from queue_message
+ * @param socket the socket to requiring state change
  */
-static int
-client_handle_hello (void *cls,
-                     struct GNUNET_MESH_Tunnel *tunnel,
-                     void **tunnel_ctx,
-                     const struct GNUNET_PeerIdentity *sender,
-                     const struct GNUNET_MessageHeader *message,
-                     const struct GNUNET_ATS_Information*atsi)
+static void
+set_state_established (void *cls,
+                       struct GNUNET_STREAM_Socket *socket)
 {
-  struct GNUNET_STREAM_Socket *socket = cls;
+  socket->state = STATE_ESTABLISHED;
+}
 
-  return GNUNET_OK;
+
+/**
+ * Callback to set state to HELLO_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket to requiring state change
+ */
+static void
+set_state_hello_wait (void *cls,
+                      struct GNUNET_STREAM_Socket *socket)
+{
+  GNUNET_assert (STATE_INIT == socket->state);
+  socket->state = STATE_HELLO_WAIT;
 }
 
 
@@ -337,7 +432,23 @@
                          const struct GNUNET_ATS_Information*atsi)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_MessageHeader *reply;
 
+  GNUNET_assert (socket->tunnel == tunnel);
+  if (STATE_HELLO_WAIT == socket->state)
+    {
+      reply = 
+        GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+      reply->header.size = 
+        htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+      reply->header.type = 
+        htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+      queue_message (socket, 
+                     reply, 
+                     &set_state_established, 
+                     NULL);
+    }
+
   return GNUNET_OK;
 }
 
@@ -872,8 +983,6 @@
   {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
   {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK, 
    sizeof (struct GNUNET_STREAM_AckMessage) },
-  {&client_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO, 
-   sizeof (struct GNUNET_STREAM_MessageHeader)},
   {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
    sizeof (struct GNUNET_STREAM_MessageHeader)},
   {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
@@ -936,6 +1045,7 @@
                             const struct GNUNET_ATS_Information * atsi)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
+  struct GNUNET_STREAM_MessageHeader *message;
 
   if (0 != memcmp (&socket->other_peer, 
                    peer, 
@@ -953,13 +1063,19 @@
   /* Set state to INIT */
   socket->state = STATE_INIT;
 
-  /* Try to achieve ESTABLISHED state */
-  make_state_transition (socket);
+  /* Send HELLO message */
+  message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+  message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  queue_message (socket,
+                 message,
+                 &set_state_hello_wait,
+                 NULL);
 
   /* Call open callback */
   if (NULL == socket->open_cls)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "STREAM_open callback is NULL\n");
     }
   if (NULL != socket->open_cb)
@@ -982,18 +1098,6 @@
 }
 
 
-/**
- * Function to find the mapped socket of a tunnel
- *
- * @param tunnel the tunnel whose associated socket has to be retrieved
- * @return the socket corresponding to the tunnel
- */
-static struct GNUNET_STREAM_Socket *
-find_socket (const struct GNUNET_MESH_Tunnel *tunnel)
-{
-  /* Search tunnel in a list or hashtable and retrieve the socket */
-}
-
 /*****************/
 /* API functions */
 /*****************/
@@ -1084,11 +1188,7 @@
     {
       GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
     }
-  /* Clear existing message queue message */
-  if (NULL != socket->message)
-    {
-      GNUNET_free (socket->message);
-    }
+  /* FIXME: Clear message queue */
   /* Close associated tunnel */
   if (NULL != socket->tunnel)
     {
@@ -1126,21 +1226,17 @@
   socket->tunnel = tunnel;
   socket->session_id = 0;       /* FIXME */
   socket->other_peer = *initiator;
-  socket->state = STATE_LISTEN;
+  socket->state = STATE_INIT;
 
   if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
                                            socket,
                                            &socket->other_peer))
     {
       socket->state = STATE_CLOSED;
-      make_state_transition (socket);
+      /* FIXME: Send CLOSE message and then free */
       GNUNET_free (socket);
       GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
     }
-  else
-    {
-      make_state_transition (socket);
-    }
   return socket;
 }
 
@@ -1164,8 +1260,8 @@
 {
   struct GNUNET_STREAM_ListenSocket *lsocket = cls;
   struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+  struct MessageQueue *head;
 
-  socket = find_socket (tunnel);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Peer %s has terminated connection abruptly\n",
               GNUNET_i2s (&socket->other_peer));
@@ -1178,12 +1274,13 @@
       socket->transmit_handle = NULL;
     }
    
-  /* Clear existing message queue message */
-  if (NULL != socket->message)
-    {
-      GNUNET_free (socket->message);
-      socket->message = NULL;
-    }
+  /* Clear existing message queue */
+  while (NULL != socket->queue) {
+    head = socket->queue;
+    socket->queue = head->next;
+    GNUNET_free (head->message);
+    GNUNET_free (head);
+  }
 }
 
 
@@ -1208,7 +1305,7 @@
   GNUNET_MESH_ApplicationType app_types[2];
 
   app_types[0] = app_port;
-  app_types[1] = NULL;
+  app_types[1] = 0;
   lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
   lsocket->port = app_port;
   lsocket->listen_cb = listen_cb;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]