gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r22366 - in gnunet/src: include stream


From: gnunet
Subject: [GNUnet-SVN] r22366 - in gnunet/src: include stream
Date: Thu, 28 Jun 2012 21:42:01 +0200

Author: harsha
Date: 2012-06-28 21:42:01 +0200 (Thu, 28 Jun 2012)
New Revision: 22366

Modified:
   gnunet/src/include/gnunet_stream_lib.h
   gnunet/src/stream/stream_api.c
Log:
-control retransmission for HELLO and HELLO_ACK

Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h      2012-06-28 18:14:01 UTC (rev 22365)
+++ gnunet/src/include/gnunet_stream_lib.h      2012-06-28 19:42:01 UTC (rev 22366)
@@ -140,7 +140,8 @@
  * @param target the target peer to which the stream has to be opened
  * @param app_port the application port number which uniquely identifies this
  *            stream
- * @param open_cb this function will be called after stream has be established 
+ * @param open_cb this function will be called after stream has be established;
+ *          cannot be NULL
  * @param open_cb_cls the closure for open_cb
  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
  * @return if successful it returns the stream socket; NULL if stream cannot be

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-06-28 18:14:01 UTC (rev 22365)
+++ gnunet/src/stream/stream_api.c      2012-06-28 19:42:01 UTC (rev 22366)
@@ -272,9 +272,14 @@
   /**
    * Task identifier for retransmission task after timeout
    */
-  GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+  GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
 
   /**
+   * Task identifier for retransmission of control messages
+   */
+  GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
+
+  /**
    * The task for sending timely Acks
    */
   GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
@@ -576,7 +581,6 @@
                                          socket);
     return 0;
   }
-
   ret = ntohs (head->message->header.size);
   GNUNET_assert (size >= ret);
   memcpy (buf, head->message, ret);
@@ -731,17 +735,16 @@
  * @param tc the Task context
  */
 static void
-retransmission_timeout_task (void *cls,
-                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+data_retransmission_task (void *cls,
+                          const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_STREAM_Socket *socket = cls;
   
   if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
     return;
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
-  socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+  socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   write_data (socket);
 }
 
@@ -925,11 +928,11 @@
                             NULL);
     packet++;
   }
-  if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
-    socket->retransmission_timeout_task_id = 
+  if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+    socket->data_retransmission_task_id = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
                                     (GNUNET_TIME_UNIT_SECONDS, 8),
-                                    &retransmission_timeout_task,
+                                    &data_retransmission_task,
                                     socket);
 }
 
@@ -1292,7 +1295,7 @@
 /**
  * Callback to set state to ESTABLISHED
  *
- * @param cls the closure from queue_message FIXME: document
+ * @param cls the closure NULL;
  * @param socket the socket to requiring state change
  */
 static void
@@ -1305,6 +1308,10 @@
   socket->write_offset = 0;
   socket->read_offset = 0;
   socket->state = STATE_ESTABLISHED;
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+                 socket->control_retransmission_task_id);
+  GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   if (NULL != socket->lsocket)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1321,7 +1328,7 @@
       GNUNET_free (socket);
     }
   }
-  else if (NULL != socket->open_cb)
+  else
     socket->open_cb (socket->open_cls, socket);
 }
 
@@ -1337,7 +1344,7 @@
                       struct GNUNET_STREAM_Socket *socket)
 {
   GNUNET_assert (STATE_INIT == socket->state);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Attaining HELLO_WAIT state\n",
        GNUNET_i2s (&socket->other_peer));
   socket->state = STATE_HELLO_WAIT;
@@ -1416,41 +1423,102 @@
 
 
 /**
+ * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @return the generate hello message
+ */
+static struct GNUNET_STREAM_MessageHeader *
+generate_hello (void)
+{
+  struct GNUNET_STREAM_MessageHeader *msg;
+
+  msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+  msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+  msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  return msg;
+}
+
+
+/**
  * Returns a new HelloAckMessage. Also sets the write sequence number for the
  * socket
  *
  * @param socket the socket for which this HelloAckMessage has to be generated
+ * @param generate_seq GNUNET_YES to generate the write sequence number,
+ *          GNUNET_NO to use the existing sequence number
  * @return the HelloAckMessage
  */
 static struct GNUNET_STREAM_HelloAckMessage *
-generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
+                    int generate_seq)
 {
   struct GNUNET_STREAM_HelloAckMessage *msg;
 
-  /* Get the random sequence number */
-  if (GNUNET_YES == socket->testing_active)
-    socket->write_sequence_number =
-      socket->testing_set_write_sequence_number_value;
-  else
-    socket->write_sequence_number = 
-      GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s: write sequence number %u\n",
-       GNUNET_i2s (&socket->other_peer),
-       (unsigned int) socket->write_sequence_number);
-  
+  if (GNUNET_YES == generate_seq)
+  {
+    if (GNUNET_YES == socket->testing_active)
+      socket->write_sequence_number =
+        socket->testing_set_write_sequence_number_value;
+    else
+      socket->write_sequence_number = 
+        GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%s: write sequence number %u\n",
+         GNUNET_i2s (&socket->other_peer),
+         (unsigned int) socket->write_sequence_number);
+  }
   msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
   msg->header.header.size = 
     htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
   msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
   msg->sequence_number = htonl (socket->write_sequence_number);
   msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
-
   return msg;
 }
 
 
 /**
+ * Task for retransmitting control messages if they aren't ACK'ed before a
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+control_retransmission_task (void *cls,
+                             const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+    
+  if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+    return;
+  socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+  switch (socket->status)
+  {
+  case STATE_INIT:    
+    GNUNET_break (0);
+    break;
+  case STATE_LISTEN:
+    GNUNET_break (0);
+    break;
+  case STATE_HELLO_WAIT:
+    if (NULL == socket->lsocket) /* We are client */
+      queue_message (socket, generate_hello (), NULL, NULL);
+    else
+      queue_message (socket,
+                     (struct GNUNET_STREAM_MessageHeader *)
+                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+    break;
+  default:
+    GNUNET_break (0);
+  }
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
+}
+
+
+/**
  * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
  *
  * @param cls the socket (set from GNUNET_MESH_connect)
@@ -1499,11 +1567,11 @@
          GNUNET_i2s (&socket->other_peer),
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
-    reply = generate_hello_ack_msg (socket);
+    reply = generate_hello_ack (socket, GNUNET_YES);
     queue_message (socket,
                    &reply->header, 
-                   &set_state_established, 
-                   NULL);      
+                   &set_state_established,
+                   NULL);    
     return GNUNET_OK;
   case STATE_ESTABLISHED:
   case STATE_RECEIVE_CLOSE_WAIT:
@@ -2087,31 +2155,34 @@
     return GNUNET_YES;
   }
 
-  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == 
-                 ntohs (message->type));
+  GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
   GNUNET_assert (socket->tunnel == tunnel);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Received HELLO from %s\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
 
-  if (STATE_INIT == socket->state)
+  switch (socket->status)
   {
-    reply = generate_hello_ack_msg (socket);
+  case STATE_INIT:
+    reply = generate_hello_ack (socket, GNUNET_YES);
     queue_message (socket, 
                    &reply->header,
                    &set_state_hello_wait, 
                    NULL);
-  }
-  else
-  {
+    break;
+  default:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%s: Client sent HELLO when in state %d\n", 
          GNUNET_i2s (&socket->other_peer),
          socket->state);
     /* FIXME: Send RESET? */
-      
   }
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                 socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
   return GNUNET_OK;
 }
 
@@ -2427,8 +2498,6 @@
            GNUNET_i2s (&socket->other_peer));
       return GNUNET_OK;
     }
-    /* FIXME: increment in the base sequence number is breaking current flow
-     */
     if (!((socket->write_sequence_number 
            - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
     {
@@ -2450,10 +2519,10 @@
          GNUNET_i2s (&socket->other_peer));
       
     /* Cancel the retransmission task */
-    if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+    if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
     {
-      GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-      socket->retransmission_timeout_task_id = 
+      GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+      socket->data_retransmission_task_id = 
         GNUNET_SCHEDULER_NO_TASK;
     }
     for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
@@ -2665,30 +2734,23 @@
          GNUNET_i2s(peer));
     return;
   }
-  
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Target peer %s connected\n",
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  
   /* Set state to INIT */
   socket->state = STATE_INIT;
-
   /* Send HELLO message */
-  message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
-  message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
-  message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+  message = generate_hello ();
   queue_message (socket,
                  message,
                  &set_state_hello_wait,
                  NULL);
-
-  /* Call open callback */
-  if (NULL == socket->open_cb)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "STREAM_open callback is NULL\n");
-  }
+  GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+                 socket->control_retransmission_task_id);
+  socket->control_retransmission_task_id =
+    GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+                                  &control_retransmission_task, socket);
 }
 
 
@@ -2752,15 +2814,12 @@
   socket->retransmit_timeout = lsocket->retransmit_timeout;
   socket->testing_active = lsocket->testing_active;
   socket->testing_set_write_sequence_number_value =
-    lsocket->testing_set_write_sequence_number_value;
-    
+    lsocket->testing_set_write_sequence_number_value;    
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Peer %s initiated tunnel to us\n", 
        GNUNET_i2s (&socket->other_peer),
        GNUNET_i2s (&socket->other_peer));
-  
   /* FIXME: Copy MESH handle from lsocket to socket */
-  
   return socket;
 }
 
@@ -2814,10 +2873,10 @@
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+  if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
   {
-    GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-    socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+    socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
   /* FIXME: Cancel all other tasks using socket->tunnel */
   socket->tunnel = NULL;
@@ -2905,7 +2964,8 @@
  * @param target the target peer to which the stream has to be opened
  * @param app_port the application port number which uniquely identifies this
  *            stream
- * @param open_cb this function will be called after stream has be established 
+ * @param open_cb this function will be called after stream has be established;
+ *          cannot be NULL
  * @param open_cb_cls the closure for open_cb
  * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
  * @return if successful it returns the stream socket; NULL if stream cannot be
@@ -2922,17 +2982,17 @@
   struct GNUNET_STREAM_Socket *socket;
   enum GNUNET_STREAM_Option option;
   GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
-  va_list vargs;                /* Variable arguments */
+  va_list vargs;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s\n", __func__);
+  GNUNET_assert (NULL != open_cb);
   socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
   socket->other_peer = *target;
   socket->open_cb = open_cb;
   socket->open_cls = open_cb_cls;
   /* Set defaults */
-  socket->retransmit_timeout = 
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+  socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
   socket->testing_active = GNUNET_NO;
   va_start (vargs, open_cb_cls); /* Parse variable args */
   do {
@@ -2972,10 +3032,8 @@
     GNUNET_free (socket);
     return NULL;
   }
-
   /* Now create the mesh tunnel to target */
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Creating MESH Tunnel\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
   socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
                                               NULL, /* Tunnel context */
                                               &mesh_peer_connect_callback,
@@ -2984,9 +3042,7 @@
   GNUNET_assert (NULL != socket->tunnel);
   GNUNET_MESH_peer_request_connect_add (socket->tunnel,
                                         &socket->other_peer);
-  
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "%s() END\n", __func__);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
   return socket;
 }
 
@@ -3088,6 +3144,7 @@
 {
   if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
     GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+  handle->socket->shutdown_handle = NULL;
   GNUNET_free (handle);
 }
 
@@ -3114,22 +3171,24 @@
     GNUNET_STREAM_io_write_cancel (socket->write_handle);
     //socket->write_handle = NULL;
   }
-
   if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     /* socket closed with read task pending!? */
     GNUNET_break (0);
     GNUNET_SCHEDULER_cancel (socket->read_task_id);
     socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
-  }
-  
+  }  
   /* Terminate the ack'ing tasks if they are still present */
   if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (socket->ack_task_id);
     socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
-
+  /* Terminate the control retransmission tasks */
+  if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+  {
+    GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+  }
   /* Clear Transmit handles */
   if (NULL != socket->transmit_handle)
   {
@@ -3143,7 +3202,6 @@
     socket->ack_msg = NULL;
     socket->ack_transmit_handle = NULL;
   }
-
   /* Clear existing message queue */
   while (NULL != (head = socket->queue_head)) {
     GNUNET_CONTAINER_DLL_remove (socket->queue_head,
@@ -3213,8 +3271,7 @@
   }
   lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */  
   /* Set defaults */
-  lsocket->retransmit_timeout = 
-    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+  lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
   lsocket->testing_active = GNUNET_NO;
   lsocket->listen_ok_cb = NULL;
   listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */  
@@ -3491,10 +3548,10 @@
   GNUNET_assert (NULL != socket->write_handle);
   GNUNET_assert (socket->write_handle == ioh);
 
-  if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+  if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
   {
-    GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
-    socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+    GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+    socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
   }
 
   for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]