gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20043 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r20043 - gnunet/src/stream
Date: Sun, 26 Feb 2012 16:50:19 +0100

Author: harsha
Date: 2012-02-26 16:50:19 +0100 (Sun, 26 Feb 2012)
New Revision: 20043

Modified:
   gnunet/src/stream/stream_api.c
   gnunet/src/stream/stream_protocol.h
Log:
-receive buffer (re)allocation in handle_data

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-02-26 15:26:02 UTC (rev 20042)
+++ gnunet/src/stream/stream_api.c      2012-02-26 15:50:19 UTC (rev 20043)
@@ -139,7 +139,7 @@
   struct MessageQueue *next;
 
   /**
-   * The next message in queue. Should be NULL in the last message
+   * The next message in queue. Should be NULL in the first message
    */
   struct MessageQueue *prev;
 };
@@ -264,9 +264,24 @@
   uint32_t read_sequence_number;
 
   /**
+   * The receiver buffer size
+   */
+  uint32_t receive_buffer_size;
+
+  /**
    * receiver's available buffer after the last acknowledged packet
    */
   uint32_t receive_window_available;
+
+  /**
+   * The offset pointer used during write operation
+   */
+  uint32_t write_offset;
+
+  /**
+   * The offset after which we are expecting data
+   */
+  uint32_t read_offset;
 };
 
 
@@ -298,8 +313,6 @@
 };
 
 
-
-
 /**
  * The IO Handle
  */
@@ -635,10 +648,12 @@
              const struct GNUNET_STREAM_DataMessage *msg,
              const struct GNUNET_ATS_Information*atsi)
 {
+  const void *payload;
+  uint32_t bytes_needed;
+  uint32_t relative_offset;
   uint16_t size;
-  const void *payload;
 
-  size = msg->header.header.size;
+  size = htons (msg->header.header.size);
   if (size < sizeof (struct GNUNET_STREAM_DataMessage))
     {
       GNUNET_break_op (0);
@@ -650,23 +665,44 @@
     case STATE_ESTABLISHED:
     case STATE_TRANSMIT_CLOSED:
     case STATE_TRANSMIT_CLOSE_WAIT:
-      GNUNET_assert (NULL != socket->receive_buffer);
+
       /* check if the message's sequence number is greater than the one we are
          expecting */
-      if (ntohl (msg->sequence_number) - socket->read_sequence_number >= 64)
-        {/* We are receiving a retransmitted message */
+      if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64)
+        {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                      "Message with sequence number %d retransmitted\n",
-                      ntohl (socket->read_sequence_number));
+                      "Ignoring received message with sequence number %d",
+                      ntohl (msg->sequence_number));
           return GNUNET_YES;
         }
 
+      /* Check if we have to allocate the buffer */
+      size -= sizeof (struct GNUNET_STREAM_DataMessage);
+      relative_offset = ntohl (msg->offset) - socket->read_offset;
+      bytes_needed = relative_offset + size;
+      
+      if (bytes_needed > socket->receive_buffer_size)
+        {
+          if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+            {
+              socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+                                                       bytes_needed);
+              socket->receive_buffer_size = bytes_needed;
+              socket->receive_window_available = 
+                RECEIVE_BUFFER_SIZE - socket->receive_buffer_size;
+            }
+          else
+            {
+              GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                          "Cannot accommodate packet %d as buffer is full\n",
+                          ntohl (msg->sequence_number));
+              return GNUNET_YES;
+            }
+        }
+      
       /* Copy Data to buffer and send acknowledgement for this packet */
-      size -= sizeof (struct GNUNET_STREAM_DataMessage);
       payload = &msg[1];
-      memcpy (socket->receive_buffer 
-              + (ntohl (msg->sequence_number) - socket->read_sequence_number)
-              * MAX_PACKET_SIZE,
+      memcpy (socket->receive_buffer + relative_offset,
               payload,
               size);
       
@@ -736,8 +772,8 @@
                        struct GNUNET_STREAM_Socket *socket)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
-  /* Initialize the receive buffer */
-  socket->receive_buffer = GNUNET_malloc (RECEIVE_BUFFER_SIZE);
+  socket->write_offset = 0;
+  socket->read_offset = 0;
   socket->state = STATE_ESTABLISHED;
 }
 
@@ -1162,7 +1198,8 @@
       socket->read_sequence_number = ntohl (ack_message->sequence_number);
       socket->receive_window_available = 
         ntohl (ack_message->receive_window_size);
-      socket->state = STATE_ESTABLISHED;
+      /* Attain ESTABLISHED state */
+      set_state_established (NULL, socket);
     }
   else
     {
@@ -1852,7 +1889,8 @@
   unsigned int num_needed_packets;
   unsigned int packet;
   struct GNUNET_STREAM_IOHandle *io_handle;
-  size_t packet_size;
+  uint32_t packet_size;
+  uint32_t payload_size;
   struct GNUNET_STREAM_DataMessage *data_msg;
   const void *sweep;
 
@@ -1882,12 +1920,14 @@
     {
       if ((packet + 1) * max_payload_size < size) 
         {
+          payload_size = max_payload_size;
           packet_size = MAX_PACKET_SIZE;
         }
       else 
         {
-          packet_size = size - packet * max_payload_size
-            + sizeof (struct GNUNET_STREAM_DataMessage);
+          payload_size = size - packet * max_payload_size;
+          packet_size =  payload_size + sizeof (struct
+                                                GNUNET_STREAM_DataMessage); 
         }
       io_handle->messages[packet] = GNUNET_malloc (packet_size);
       io_handle->messages[packet]->header.header.size = htons (packet_size);
@@ -1895,6 +1935,7 @@
         htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
       io_handle->messages[packet]->sequence_number =
         htons (socket->write_sequence_number++);
+      io_handle->messages[packet]->offset = htons (socket->write_offset);
 
       /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
          determined from RTT */
@@ -1905,8 +1946,9 @@
       /* Copy data from given buffer to the packet */
       memcpy (&data_msg[1],
               sweep,
-              packet_size - sizeof (struct GNUNET_STREAM_DataMessage));
-      sweep += packet_size - sizeof (struct GNUNET_STREAM_DataMessage);
+              payload_size);
+      sweep += payload_size;
+      socket->write_offset += payload_size;
     }
   socket->write_handle = io_handle;
   write_data (socket);

Modified: gnunet/src/stream/stream_protocol.h
===================================================================
--- gnunet/src/stream/stream_protocol.h 2012-02-26 15:26:02 UTC (rev 20042)
+++ gnunet/src/stream/stream_protocol.h 2012-02-26 15:50:19 UTC (rev 20043)
@@ -89,9 +89,6 @@
    * Offset of the packet in the overall stream, modulo 2^32; allows
    * the receiver to calculate where in the destination buffer the
    * message should be placed.  In network byte order.
-   *
-   * FIXME: if all the packets except the last one are of constant size we
-   * don't need this anymore
    */
   uint32_t offset GNUNET_PACKED;
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]