gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r20051 - gnunet/src/stream


From: gnunet
Subject: [GNUnet-SVN] r20051 - gnunet/src/stream
Date: Sun, 26 Feb 2012 23:32:23 +0100

Author: harsha
Date: 2012-02-26 23:32:23 +0100 (Sun, 26 Feb 2012)
New Revision: 20051

Modified:
   gnunet/src/stream/stream_api.c
Log:
-copy buffer and STREAM_read(incomplete)

Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c      2012-02-26 22:31:40 UTC (rev 20050)
+++ gnunet/src/stream/stream_api.c      2012-02-26 22:32:23 UTC (rev 20051)
@@ -219,12 +219,12 @@
   /**
    * The write IO_handle associated with this socket
    */
-  struct GNUNET_STREAM_IOHandle *write_handle;
+  struct GNUNET_STREAM_IOWriteHandle *write_handle;
 
   /**
    * The read IO_handle associated with this socket
    */
-  struct GNUNET_STREAM_IOHandle *read_handle;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
 
   /**
    * Buffer for storing received messages
@@ -232,6 +232,11 @@
   void *receive_buffer;
 
   /**
+   * Copy buffer pointer; Used during read operations
+   */
+  void *copy_buffer;
+
+  /**
    * The state of the protocol associated with this socket
    */
   enum State state;
@@ -269,6 +274,11 @@
   uint32_t receive_buffer_size;
 
   /**
+   * The receiver buffer boundaries
+   */
+  uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
+  /**
    * receiver's available buffer after the last acknowledged packet
    */
   uint32_t receive_window_available;
@@ -282,6 +292,16 @@
    * The offset after which we are expecting data
    */
   uint32_t read_offset;
+
+  /**
+   * The size of the copy buffer
+   */
+  uint32_t copy_buffer_size;
+  
+  /**
+   * The read offset of copy buffer
+   */
+  uint32_t copy_buffer_read_offset;
 };
 
 
@@ -314,9 +334,9 @@
 
 
 /**
- * The IO Handle
+ * The IO Write Handle
  */
-struct GNUNET_STREAM_IOHandle
+struct GNUNET_STREAM_IOWriteHandle
 {
   /**
    * The packet_buffers associated with this Handle
@@ -330,11 +350,6 @@
   GNUNET_STREAM_AckBitmap ack_bitmap;
 
   /**
-   * receiver's available buffer
-   */
-  uint32_t receive_window_available;
-
-  /**
    * Number of packets sent before waiting for an ack
    *
    * FIXME: Do we need this?
@@ -344,6 +359,23 @@
 
 
 /**
+ * The IO Read Handle
+ */
+struct GNUNET_STREAM_IOReadHandle
+{
+  /**
+   * Callback for the read processor
+   */
+  GNUNET_STREAM_DataProcessor proc;
+
+  /**
+   * The closure pointer for the read processor callback
+   */
+  void *proc_cls;
+};
+
+
+/**
  * Default value in seconds for various timeouts
  */
 static unsigned int default_timeout = 300;
@@ -511,7 +543,8 @@
   ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
   ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
   ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
-  ack_msg->receive_window_remaining = htonl (socket->receive_window_available);
+  ack_msg->receive_window_remaining = 
+    htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
 
   /* Request MESH for sending ACK */
   GNUNET_MESH_notify_transmit_ready (socket->tunnel,
@@ -574,7 +607,7 @@
 write_data_finish_cb (void *cls,
                       struct GNUNET_STREAM_Socket *socket)
 {
-  struct GNUNET_STREAM_IOHandle *io_handle = cls;
+  struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
 
   io_handle->sent_packets++;
 }
@@ -589,7 +622,7 @@
 static void 
 write_data (struct GNUNET_STREAM_Socket *socket)
 {
-  struct GNUNET_STREAM_IOHandle *io_handle = socket->write_handle;
+  struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
   unsigned int packet;
   int ack_packet;
 
@@ -618,9 +651,9 @@
   packet = ack_packet + 1;
   /* Now send new packets if there is enough buffer space */
   while ( (NULL != io_handle->messages[packet]) &&
-         (io_handle->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
+         (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
     {
-      io_handle->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
+      socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
       queue_message (socket,
                      &io_handle->messages[packet]->header,
                      &write_data_finish_cb,
@@ -651,6 +684,7 @@
   const void *payload;
   uint32_t bytes_needed;
   uint32_t relative_offset;
+  uint32_t relative_sequence_number;
   uint16_t size;
 
   size = htons (msg->header.header.size);
@@ -666,9 +700,11 @@
     case STATE_TRANSMIT_CLOSED:
     case STATE_TRANSMIT_CLOSE_WAIT:
 
-      /* check if the message's sequence number is greater than the one we are
+      /* check if the message's sequence number is in the range we are
          expecting */
-      if (ntohl (msg->sequence_number) - socket->read_sequence_number <= 64)
+      relative_sequence_number = 
+        ntohl (msg->sequence_number) - socket->read_sequence_number;
+      if ( relative_sequence_number > 64)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Ignoring received message with sequence number %d",
@@ -688,8 +724,6 @@
               socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
                                                        bytes_needed);
               socket->receive_buffer_size = bytes_needed;
-              socket->receive_window_available = 
-                RECEIVE_BUFFER_SIZE - socket->receive_buffer_size;
             }
           else
             {
@@ -700,16 +734,18 @@
             }
         }
       
-      /* Copy Data to buffer and send acknowledgement for this packet */
+      /* Copy Data to buffer */
       payload = &msg[1];
+      GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
       memcpy (socket->receive_buffer + relative_offset,
               payload,
               size);
+      socket->receive_buffer_boundaries[relative_sequence_number] = 
+        relative_offset + size;
       
       /* Modify the ACK bitmap */
       ackbitmap_modify_bit (&socket->ack_bitmap,
-                            ntohl (msg->sequence_number) -
-                            socket->read_sequence_number,
+                            relative_sequence_number,
                             GNUNET_YES);
 
       /* Start ACK sending task if one is not already present */
@@ -1427,7 +1463,7 @@
         }
 
       socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
-      socket->write_handle->receive_window_available = 
+      socket->receive_window_available = 
         ntohl (ack->receive_window_remaining);
       write_data (socket);
       break;
@@ -1617,6 +1653,53 @@
 }
 
 
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ */
+static void
+call_read_processor_task (void *cls,
+                          const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_STREAM_Socket *socket = cls;
+  size_t read_size;
+  size_t valid_read_size;
+
+  if (tc->reason == GNUNET_SCHEDULER_REASON_SHUTDOWN) return;
+
+  GNUNET_assert (NULL != socket->read_handle);
+  GNUNET_assert (NULL != socket->read_handle->proc);
+  GNUNET_assert (NULL != socket->copy_buffer);
+  GNUNET_assert (0 != socket->copy_buffer_size);
+
+  valid_read_size = socket->copy_buffer_size - socket->copy_buffer_read_offset;
+  GNUNET_assert (0 != valid_read_size);
+
+  read_size = socket->read_handle->proc (socket->read_handle->proc_cls,
+                                         socket->status,
+                                         socket->copy_buffer 
+                                         + socket->copy_buffer_read_offset,
+                                         valid_read_size);
+
+  GNUNET_assert (read_size <= valid_read_size);
+  socket->copy_buffer_read_offset += read_size;
+
+  /* Free the copy buffer once it has been read entirely */
+  if (socket->copy_buffer_read_offset == socket->copy_buffer_size)
+    {
+      GNUNET_free (socket->copy_buffer);
+      socket->copy_buffer = NULL;
+      socket->copy_buffer_size = 0;
+      socket->copy_buffer_read_offset = 0;
+    }
+
+  /* Free the read handle */
+  GNUNET_free (socket->read_handle);
+  socket->read_handle = NULL;
+}
+
+
 /*****************/
 /* API functions */
 /*****************/
@@ -1878,7 +1961,7 @@
  * @param write_cont_cls the closure
  * @return handle to cancel the operation
  */
-struct GNUNET_STREAM_IOHandle *
+struct GNUNET_STREAM_IOWriteHandle *
 GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
                      const void *data,
                      size_t size,
@@ -1888,7 +1971,7 @@
 {
   unsigned int num_needed_packets;
   unsigned int packet;
-  struct GNUNET_STREAM_IOHandle *io_handle;
+  struct GNUNET_STREAM_IOWriteHandle *io_handle;
   uint32_t packet_size;
   uint32_t payload_size;
   struct GNUNET_STREAM_DataMessage *data_msg;
@@ -1912,8 +1995,7 @@
   if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
     size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH  * max_payload_size;
   num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
-  io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOHandle));
-  io_handle->receive_window_available = socket->receive_window_available;
+  io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
   sweep = data;
   /* Divide the given buffer into packets for sending */
   for (packet=0; packet < num_needed_packets; packet++)
@@ -1966,22 +2048,59 @@
  * @param proc_cls the closure for proc
  * @return handle to cancel the operation
  */
-struct GNUNET_STREAM_IOHandle *
-GNUNET_STREAM_read (const struct GNUNET_STREAM_Socket *socket,
+struct GNUNET_STREAM_IOReadHandle *
+GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
                     struct GNUNET_TIME_Relative timeout,
                    GNUNET_STREAM_DataProcessor proc,
                    void *proc_cls)
 {
+  unsigned int packet;
+  struct GNUNET_STREAM_IOReadHandle *read_handle;
+  
+  /* Return NULL if there is already a read handle; the user has to cancel that
+  first before continuing or has to wait until it is completed */
+  if (NULL != socket->read_handle) return NULL;
+
+  read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
+  read_handle->proc = proc;
+  socket->read_handle = read_handle;
+
+  /* if previous copy buffer is still not read call the data processor on it */
+  if (NULL != socket->copy_buffer)
+    {
+      GNUNET_SCHEDULER_add_now (&call_read_processor_task,
+                                socket);
+    }
+
   /* Check the bitmap for any holes */
+  for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+    {
+      if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+                                             packet))
+        break;
+    }
 
-  /* Deem the data from the starting of the bitmap upto a hole as available
-  data */
+  if (0 == packet)              /* The first packet is still missing */
+    {
+      /* We can't do anything until it arrives */
+    }
+  else
+    {
+      /* Copy data to copy buffer */
+      socket->copy_buffer = 
+        GNUNET_malloc (socket->receive_buffer_boundaries[packet-1]);
+      
+      /* Shift the bitmap */
+      socket->ack_bitmap << packet;
 
-  /* Create an IO handle */
+      /* Set read_sequence_number */
+      socket->read_sequence_number += packet;
 
-  /* Call the Data processor with this available data */
-  
-  /* Update the read_sequence_number to the first hole in the bitmap */
+      /* Set read_offset */
+      socket->read_offset += packet;
 
-  /* Shift the bitmap so that the first hole is now at the start */
+      /* FIXME: Fix relative calculations in receive buffer management */
+    }
+
+  return read_handle;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]