gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r16884 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r16884 - gnunet/src/transport
Date: Fri, 16 Sep 2011 16:24:29 +0200

Author: wachs
Date: 2011-09-16 16:24:29 +0200 (Fri, 16 Sep 2011)
New Revision: 16884

Modified:
   gnunet/src/transport/plugin_transport_http.h
   gnunet/src/transport/plugin_transport_http_client.c
   gnunet/src/transport/plugin_transport_http_server.c
Log:
client sending & receiving


Modified: gnunet/src/transport/plugin_transport_http.h
===================================================================
--- gnunet/src/transport/plugin_transport_http.h        2011-09-16 14:02:53 UTC 
(rev 16883)
+++ gnunet/src/transport/plugin_transport_http.h        2011-09-16 14:24:29 UTC 
(rev 16884)
@@ -82,6 +82,7 @@
    */
   struct GNUNET_NAT_Handle *nat;
 
+
   /**
    * ipv4 DLL head
    */
@@ -124,7 +125,6 @@
 
   int cur_connections;
   uint32_t last_tag;
-
   /*
    * Server handles
    */
@@ -180,9 +180,9 @@
   struct Plugin *plugin;
 
   /**
-   * The client (used to identify this connection)
+   * message stream tokenizer for incoming data
    */
-  /* void *client; */
+  struct GNUNET_SERVER_MessageStreamTokenizer *msg_tk;
 
   /**
    * Continuation function to call once the transmission buffer
@@ -232,7 +232,8 @@
 
   void *server_recv;
   void *server_send;
-
+  struct GNUNET_TIME_Absolute delay;
+  GNUNET_SCHEDULER_TaskIdentifier reset_task;
   uint32_t tag;
 
 };

Modified: gnunet/src/transport/plugin_transport_http_client.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_client.c 2011-09-16 14:02:53 UTC 
(rev 16883)
+++ gnunet/src/transport/plugin_transport_http_client.c 2011-09-16 14:24:29 UTC 
(rev 16884)
@@ -183,7 +183,9 @@
                    "Connection to '%s'  %s ended\n", GNUNET_i2s(&s->target), 
http_plugin_address_to_string(plugin, s->addr, s->addrlen));
 #endif
          client_disconnect(s);
-         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying 
about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), GNUNET_a2s 
(s->addr, s->addrlen));
+         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying 
about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), 
http_plugin_address_to_string (plugin, s->addr, s->addrlen));
+         if (s->msg_tk != NULL)
+           GNUNET_SERVER_mst_destroy (s->msg_tk);
          notify_session_end (plugin, &s->target, s);
        }
     }
@@ -252,7 +254,149 @@
   return res;
 }
 
+static void
+curl_receive_mst_cb (void *cls, void *client,
+                     const struct GNUNET_MessageHeader *message)
+{
+  struct Session *s = cls;      /* session handed to GNUNET_SERVER_mst_create */
+  struct Plugin *plugin = s->plugin;
+  struct GNUNET_TRANSPORT_ATS_Information distance[2];
+  struct GNUNET_TIME_Relative delay;
 
+  distance[0].type = htonl (GNUNET_TRANSPORT_ATS_QUALITY_NET_DISTANCE);
+  distance[0].value = htonl (1);
+  distance[1].type = htonl (GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR);
+  distance[1].value = htonl (0);
+  /* Tokenizer callback: pass the message up; the returned delay becomes s->delay */
+  delay = plugin->env->receive (plugin->env->cls, &s->target, message, (const 
struct GNUNET_TRANSPORT_ATS_Information*) &distance, 2, s, s->addr, s->addrlen);
+  s->delay = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), delay);
+  /* Only logs here; %llu must get the scalar rel_value, not the struct itself */
+  if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+  {
+#if VERBOSE_CLIENT
+    GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' 
address `%s' next read delayed for %llu ms\n",
+                GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), 
(unsigned long long) delay.rel_value);
+#endif
+  }
+}
+
+/**
+* Callback method used with libcurl
+* Method is called when libcurl has received data for this session
+* @param stream pointer to the received data
+* @param size size of an individual element
+* @param nmemb count of elements available in stream
+* @param cls the struct Session registered as CURLOPT_WRITEDATA
+* @return bytes consumed (size * nmemb), or 0 while reading is delayed
+*/
+static size_t
+curl_receive_cb (void *stream, size_t size, size_t nmemb, void *cls)
+{
+  struct Session *s = cls;
+  struct Plugin *plugin = s->plugin;
+
+  if (GNUNET_TIME_absolute_get().abs_value < s->delay.abs_value)
+  {
+#if DEBUG_HTTP
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Connection %p: no inbound bandwidth available! Next read was 
delayed for  %llu ms\n",
+                (void *) s, (unsigned long long) GNUNET_TIME_absolute_get_difference(
GNUNET_TIME_absolute_get(), s->delay).rel_value);
+#endif
+    return 0; /* NOTE(review): libcurl treats a short count as a write error; CURL_WRITEFUNC_PAUSE may be intended - confirm */
+  }
+  /* The tokenizer is created lazily on the first data received for the session */
+  if (s->msg_tk == NULL)
+      s->msg_tk = GNUNET_SERVER_mst_create (&curl_receive_mst_cb, s);
+
+  GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, size * nmemb, GNUNET_NO,
+                             GNUNET_NO);
+
+#if VERBOSE_CLIENT
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Received %u bytes 
from peer `%s'\n",
+                   (unsigned int) (size * nmemb),
+                   GNUNET_i2s (&s->target));
+#endif
+  return (size * nmemb);
+}
+
+/**
+ * Callback method used with libcurl; called when libcurl needs data to send.
+ * The queue handling below is compiled out (#if 0), so 0 is always returned.
+ * @param stream pointer where to write data
+ * @param size size of an individual element
+ * @param nmemb count of elements that can be written to the buffer
+ * @param ptr source pointer, passed to the libcurl handle
+ * @return bytes written to stream
+ */
+static size_t
+curl_send_cb (void *stream, size_t size, size_t nmemb, void *ptr)
+{
+  size_t bytes_sent = 0;
+
+#if 0
+  struct Session *ps = ptr;
+  struct HTTP_Message *msg = ps->pending_msgs_tail;
+
+  size_t len;
+
+  if (ps->send_active == GNUNET_NO)
+    return CURL_READFUNC_PAUSE;
+  if ((ps->pending_msgs_tail == NULL) && (ps->send_active == GNUNET_YES))
+  {
+#if DEBUG_CONNECTIONS
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Connection %X: No Message to send, pausing connection\n", ps);
+#endif
+    ps->send_active = GNUNET_NO;
+    return CURL_READFUNC_PAUSE;
+  }
+
+  GNUNET_assert (msg != NULL);
+
+  /* data to send */
+  if (msg->pos < msg->size)
+  {
+    /* data fit in buffer */
+    if ((msg->size - msg->pos) <= (size * nmemb))
+    {
+      len = (msg->size - msg->pos);
+      memcpy (stream, &msg->buf[msg->pos], len);
+      msg->pos += len;
+      bytes_sent = len;
+    }
+    else
+    {
+      len = size * nmemb;
+      memcpy (stream, &msg->buf[msg->pos], len);
+      msg->pos += len;
+      bytes_sent = len;
+    }
+  }
+  /* no data to send */
+  else
+  {
+    bytes_sent = 0;
+  }
+
+  if (msg->pos == msg->size)
+  {
+#if DEBUG_CONNECTIONS
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Connection %X: Message with %u bytes sent, removing message 
from queue\n",
+                ps, msg->pos);
+#endif
+    /* Calling transmit continuation  */
+    if (NULL != ps->pending_msgs_tail->transmit_cont)
+      msg->transmit_cont (ps->pending_msgs_tail->transmit_cont_cls,
+                          &(ps->peercontext)->identity, GNUNET_OK);
+    ps->queue_length_cur -= msg->size;
+    remove_http_message (ps, msg);
+  }
+
+#endif
+  return bytes_sent;
+}
+
 int
 client_connect (struct Session *s)
 {
@@ -292,10 +436,10 @@
   curl_easy_setopt (s->client_get, CURLOPT_URL, url);
   //curl_easy_setopt (s->client_get, CURLOPT_HEADERFUNCTION, 
&curl_get_header_cb);
   //curl_easy_setopt (s->client_get, CURLOPT_WRITEHEADER, ps);
-  //curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb);
-  //curl_easy_setopt (s->client_get, CURLOPT_READDATA, ps);
-  //curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb);
-  //curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, ps);
+  curl_easy_setopt (s->client_get, CURLOPT_READFUNCTION, curl_send_cb);
+  curl_easy_setopt (s->client_get, CURLOPT_READDATA, s);
+  curl_easy_setopt (s->client_get, CURLOPT_WRITEFUNCTION, curl_receive_cb);
+  curl_easy_setopt (s->client_get, CURLOPT_WRITEDATA, s);
   curl_easy_setopt (s->client_get, CURLOPT_TIMEOUT_MS,
                     (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
   curl_easy_setopt (s->client_get, CURLOPT_PRIVATE, s);
@@ -323,10 +467,10 @@
   curl_easy_setopt (s->client_put, CURLOPT_PUT, 1L);
   //curl_easy_setopt (s->client_put, CURLOPT_HEADERFUNCTION, 
&curl_put_header_cb);
   //curl_easy_setopt (s->client_put, CURLOPT_WRITEHEADER, ps);
-  //curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb);
-  //curl_easy_setopt (s->client_put, CURLOPT_READDATA, ps);
-  //curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb);
-  //curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, ps);
+  curl_easy_setopt (s->client_put, CURLOPT_READFUNCTION, curl_send_cb);
+  curl_easy_setopt (s->client_put, CURLOPT_READDATA, s);
+  curl_easy_setopt (s->client_put, CURLOPT_WRITEFUNCTION, curl_receive_cb);
+  curl_easy_setopt (s->client_put, CURLOPT_WRITEDATA, s);
   curl_easy_setopt (s->client_put, CURLOPT_TIMEOUT_MS,
                     (long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
   curl_easy_setopt (s->client_put, CURLOPT_PRIVATE, s);

Modified: gnunet/src/transport/plugin_transport_http_server.c
===================================================================
--- gnunet/src/transport/plugin_transport_http_server.c 2011-09-16 14:02:53 UTC 
(rev 16883)
+++ gnunet/src/transport/plugin_transport_http_server.c 2011-09-16 14:24:29 UTC 
(rev 16884)
@@ -292,6 +292,9 @@
 
     if (check == GNUNET_NO)
       goto error;
+
+    plugin->cur_connections++;
+
 #if VERBOSE_SERVER
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "server: New inbound connection from 
%s with tag %u\n", GNUNET_h2s_full(&(target.hashPubKey)), tag);
 #endif
@@ -495,6 +498,7 @@
     }
     t = t->next;
   }
+  plugin->cur_connections--;
 
   if ((s->server_send == NULL) && (s->server_recv == NULL))
   {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]