gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r22955 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r22955 - gnunet/src/stream
Date: Sun, 29 Jul 2012 22:40:55 +0200

Author: harsha
Date: 2012-07-29 22:40:55 +0200 (Sun, 29 Jul 2012)
New Revision: 22955

Modified:
   gnunet/src/stream/stream_api.c
Log:
fixes

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-07-29 17:41:22 UTC (rev 22954)
+++ gnunet/src/stream/stream_api.c      2012-07-29 20:40:55 UTC (rev 22955)
@@ -226,16 +226,6 @@
   struct GNUNET_MESH_TransmitHandle *transmit_handle;
 
   /**
-   * The current act transmit handle (if a pending ack transmit request exists)
-   */
-  struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
-
-  /**
-   * Pointer to the current ack message using in ack_task
-   */
-  struct GNUNET_STREAM_AckMessage *ack_msg;
-
-  /**
    * The current message associated with the transmit handle
    */
   struct MessageQueue *queue_head;
@@ -629,19 +619,21 @@
  * @param message the message to be sent
  * @param finish_cb the callback to be called when the message is sent
  * @param finish_cb_cls the closure for the callback
+ * @param urgent set to GNUNET_YES to add the message to the beginning of the
+ *          queue; GNUNET_NO to add at the tail
  */
 static void
 queue_message (struct GNUNET_STREAM_Socket *socket,
                struct GNUNET_STREAM_MessageHeader *message,
                SendFinishCallback finish_cb,
-               void *finish_cb_cls)
+               void *finish_cb_cls,
+               int urgent)
 {
   struct MessageQueue *queue_entity;
 
   GNUNET_assert 
     ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
     && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "%s: Queueing message of type %d and size %d\n",
        GNUNET_i2s (&socket->other_peer),
@@ -652,9 +644,20 @@
   queue_entity->message = message;
   queue_entity->finish_cb = finish_cb;
   queue_entity->finish_cb_cls = finish_cb_cls;
-  GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
-                                   socket->queue_tail,
-                                   queue_entity);
+  if (GNUNET_YES == urgent)
+  {
+    GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
+                                 queue_entity);
+    if (NULL != socket->transmit_handle)
+    {
+      GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+      socket->transmit_handle = NULL;
+    }
+  }
+  else
+    GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
+                                      socket->queue_tail,
+                                      queue_entity);
   if (NULL == socket->transmit_handle)
   {
     socket->retries = 0;
@@ -691,42 +694,11 @@
   size = ntohs (message->header.size);
   msg_copy = GNUNET_malloc (size);
   memcpy (msg_copy, message, size);
-  queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
+  queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
 }
 
 
 /**
- * Callback function for sending ack message
- *
- * @param cls closure the ACK message created in ack_task
- * @param size number of bytes available in buffer
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-send_ack_notify (void *cls, size_t size, void *buf)
-{
-  struct GNUNET_STREAM_Socket *socket = cls;
-
-  if (0 == size)
-  {
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "%s called with size 0\n", __func__);
-    return 0;
-  }
-  GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
-  
-  size = ntohs (socket->ack_msg->header.header.size);
-  memcpy (buf, socket->ack_msg, size);
-  
-  GNUNET_free (socket->ack_msg);
-  socket->ack_msg = NULL;
-  socket->ack_transmit_handle = NULL;
-  return size;
-}
-
-
-/**
  * Writes data using the given socket. The amount of data written is limited by
  * the receiver_window_size
  *
@@ -785,16 +757,8 @@
   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
   ack_msg->receive_window_remaining = 
     htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
-  socket->ack_msg = ack_msg;
-  /* Request MESH for sending ACK */
-  socket->ack_transmit_handle = 
-    GNUNET_MESH_notify_transmit_ready (socket->tunnel,
-                                       GNUNET_NO, /* Corking */
-                                       socket->retransmit_timeout,
-                                       &socket->other_peer,
-                                       ntohs (ack_msg->header.header.size),
-                                       &send_ack_notify,
-                                       socket);
+  /* Queue up ACK for immediate sending */
+  queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
 }
 
 
@@ -834,7 +798,7 @@
       GNUNET_SCHEDULER_NO_TASK;
     return;
   }
-  queue_message (socket, msg, NULL, NULL);
+  queue_message (socket, msg, NULL, NULL, GNUNET_NO);
   shutdown_handle->close_msg_retransmission_task_id =
     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
                                   &close_msg_retransmission_task,
@@ -1512,11 +1476,12 @@
     break;
   case STATE_HELLO_WAIT:
     if (NULL == socket->lsocket) /* We are client */
-      queue_message (socket, generate_hello (), NULL, NULL);
+      queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
     else
       queue_message (socket,
                      (struct GNUNET_STREAM_MessageHeader *)
-                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+                     GNUNET_NO);
     socket->control_retransmission_task_id =
     GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
                                   &control_retransmission_task, socket);
@@ -1525,7 +1490,8 @@
     if (NULL == socket->lsocket)
       queue_message (socket,
                      (struct GNUNET_STREAM_MessageHeader *)
-                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL);
+                     generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+                     GNUNET_NO);
     else
       GNUNET_break (0);
   default:
@@ -1584,10 +1550,8 @@
          (unsigned int) socket->read_sequence_number);
     socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
     reply = generate_hello_ack (socket, GNUNET_YES);
-    queue_message (socket,
-                   &reply->header, 
-                   &set_state_established,
-                   NULL);    
+    queue_message (socket, &reply->header, &set_state_established, 
+                   NULL, GNUNET_NO);    
     return GNUNET_OK;
   case STATE_ESTABLISHED:
     // call statistics (# ACKs ignored++)
@@ -1663,7 +1627,7 @@
     reply->header.type = 
       htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
     reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
-    queue_message (socket, reply, NULL, NULL);
+    queue_message (socket, reply, NULL, NULL, GNUNET_NO);
     break;
 
   default:
@@ -1914,11 +1878,8 @@
     htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   receive_close_ack->header.type =
     htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
-  queue_message (socket,
-                 receive_close_ack,
-                 &set_state_closed,
-                 NULL);
-  
+  queue_message (socket, receive_close_ack, &set_state_closed,
+                 NULL, GNUNET_NO);  
   /* FIXME: Handle the case where write handle is present; the write operation
      should be deemed as finised and the write continuation callback
      has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
@@ -2029,10 +1990,7 @@
   close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
   close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
   close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
-  queue_message (socket,
-                 close_ack,
-                 &set_state_closed,
-                 NULL);
+  queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
   if (socket->state == STATE_CLOSED)
     return GNUNET_OK;
 
@@ -2177,7 +2135,8 @@
   {
   case STATE_INIT:
     reply = generate_hello_ack (socket, GNUNET_YES);
-    queue_message (socket, &reply->header, &set_state_hello_wait, NULL);
+    queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
+                   GNUNET_NO);
     GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
                    socket->control_retransmission_task_id);
     socket->control_retransmission_task_id =
@@ -2753,10 +2712,7 @@
   socket->state = STATE_INIT;
   /* Send HELLO message */
   message = generate_hello ();
-  queue_message (socket,
-                 message,
-                 &set_state_hello_wait,
-                 NULL);
+  queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
                  socket->control_retransmission_task_id);
   socket->control_retransmission_task_id =
@@ -2873,13 +2829,6 @@
     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
     socket->transmit_handle = NULL;
   }
-  if (NULL != socket->ack_transmit_handle)
-  {
-    GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
-    GNUNET_free (socket->ack_msg);
-    socket->ack_msg = NULL;
-    socket->ack_transmit_handle = NULL;
-  }
   /* Stop Tasks using socket->tunnel */
   if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
   {
@@ -3096,10 +3045,8 @@
            "Existing read handle should be cancelled before shutting"
            " down reading\n");
     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
-    queue_message (socket,
-                   msg,
-                   &set_state_receive_close_wait,
-                   NULL);
+    queue_message (socket, msg, &set_state_receive_close_wait, NULL,
+                   GNUNET_NO);
     break;
   case SHUT_WR:
     handle->operation = SHUT_WR;
@@ -3108,10 +3055,8 @@
            "Existing write handle should be cancelled before shutting"
            " down writing\n");
     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
-    queue_message (socket,
-                   msg,
-                   &set_state_transmit_close_wait,
-                   NULL);
+    queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
+                   GNUNET_NO);
     break;
   case SHUT_RDWR:
     handle->operation = SHUT_RDWR;
@@ -3124,10 +3069,7 @@
            "Existing read handle should be cancelled before shutting"
            " down reading\n");
     msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
-    queue_message (socket,
-                   msg,
-                   &set_state_close_wait,
-                   NULL);
+    queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
     break;
   default:
     LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -3206,13 +3148,6 @@
     GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
     socket->transmit_handle = NULL;
   }
-  if (NULL != socket->ack_transmit_handle)
-  {
-    GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
-    GNUNET_free (socket->ack_msg);
-    socket->ack_msg = NULL;
-    socket->ack_transmit_handle = NULL;
-  }
   /* Clear existing message queue */
   while (NULL != (head = socket->queue_head)) {
     GNUNET_CONTAINER_DLL_remove (socket->queue_head,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]