gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5094 - GNUnet/src/transports


From: gnunet
Subject: [GNUnet-SVN] r5094 - GNUnet/src/transports
Date: Sat, 16 Jun 2007 18:29:30 -0600 (MDT)

Author: grothoff
Date: 2007-06-16 18:29:29 -0600 (Sat, 16 Jun 2007)
New Revision: 5094

Modified:
   GNUnet/src/transports/http.c
Log:
http improvements

Modified: GNUnet/src/transports/http.c
===================================================================
--- GNUnet/src/transports/http.c        2007-06-16 23:23:19 UTC (rev 5093)
+++ GNUnet/src/transports/http.c        2007-06-17 00:29:29 UTC (rev 5094)
@@ -25,11 +25,17 @@
  *
  * TODO:
  * - connection timeout (shutdown inactive connections)
- * - proper connection re-establishment
- * - nothing copies TO wbuff, only from (see FIXMEs)
- * - free resources allocated for PUT!
- * - integrate MHD thread into CURL thread
- * - why does valgrind show "conditional jump depends on uninit values" for 
curl_multi_perform?
+ *   => CURL can help do this automatically, need to do it with MHD
+ *      and query CURL for timed-out connections (and then clean up)
+ * - proper connection re-establishment (i.e., if a GET times out or
+ *      dies otherwise, we need to re-start the TSession if the
+ *      core wants to keep using it!)
+ * - free resources allocated for PUT inside of CURL
+ *      select loop (as soon as PUT is complete)
+ * - bound the number of concurrent PUTs for a given
+ *      connection (to 1 + urgent?)
+ * - why does valgrind show "conditional jump depends on uninit values"
+ *   for curl_multi_perform?
  * - where does the 1s loopback-ping latency come from?
  */
 
@@ -77,6 +83,21 @@
 
 } HostAddress;
 
+struct HTTPPutData {
+  struct HTTPPutData * next;
+
+  char * msg;
+
+  CURL * curl_put;
+
+  unsigned int size;
+
+  unsigned int pos;
+
+  int done;
+
+};
+
 /**
  * Transport Session handle.
  */
@@ -190,25 +211,17 @@
        */
       char * url;
 
+      /**
+       * Linked list of PUT operations.
+       */
+      struct HTTPPutData * puts;
+
     } client;
 
   } cs;
 
 } HTTPSession;
 
-struct HTTPPutData {
-  struct HTTPPutData * next;
-
-  char * msg;
-
-  CURL * curl_put;
-
-  unsigned int size;
-
-  unsigned int pos;
-
-};
-
 /* *********** globals ************* */
 
 /**
@@ -265,10 +278,6 @@
  */
 static UPnP_ServiceAPI * upnp;
 
-/**
- * List of active PUT requests.
- */
-static struct HTTPPutData * putHead;
 
 /**
  * Lock for access to mutable state of the module,
@@ -325,6 +334,8 @@
  */
 static int httpDisconnect(TSession * tsession) {
   HTTPSession * httpsession = tsession->internal;
+  struct HTTPPutData * pos;
+  struct HTTPPutData * next;
 
   if (httpsession != NULL) {
     MUTEX_LOCK(httpsession->lock);
@@ -340,6 +351,17 @@
                               httpsession->cs.client.get);
       curl_easy_cleanup(httpsession->cs.client.get);
       FREE(httpsession->cs.client.url);
+      pos = httpsession->cs.client.puts;
+      while (pos != NULL) {
+       next = pos->next;
+       curl_multi_remove_handle(curl_multi,
+                                pos->curl_put);
+       curl_easy_cleanup(pos->curl_put);
+       FREE(pos->msg);
+       FREE(pos);
+       pos = next;
+      }
+
     } else {
       MHD_destroy_response(httpsession->cs.server.get);
     }
@@ -400,7 +422,7 @@
     GE_BREAK(NULL, 0);
     return SYSERR;
   }
-  httpSession = (HTTPSession*) tsession->internal;
+  httpSession = tsession->internal;
   MUTEX_LOCK(httpSession->lock);
   if (httpSession->destroyed == YES) {
     MUTEX_UNLOCK(httpSession->lock);
@@ -455,7 +477,7 @@
           "HTTP port is 0, will only send using HTTP.\n");
     return NULL; /* HTTP transport is configured SEND-only! */
   }
-  msg = (P2P_hello_MESSAGE *) MALLOC(sizeof(P2P_hello_MESSAGE) + 
sizeof(HostAddress));
+  msg = MALLOC(sizeof(P2P_hello_MESSAGE) + sizeof(HostAddress));
   haddr = (HostAddress*) &msg[1];
 
   if (! ( ( (upnp != NULL) &&
@@ -590,21 +612,13 @@
 
   if (httpSession == NULL) {
     httpSession = MALLOC(sizeof(HTTPSession));
-    httpSession->destroyed = NO;
-    httpSession->rpos1 = 0;
-    httpSession->rpos2 = 0;
-    httpSession->rsize2 = 0;
-    httpSession->rbuff2 = NULL;
-    httpSession->wsize = 0;
-    httpSession->woff = 0;
-    httpSession->wpos = 0;
-    httpSession->wbuff = NULL;
+    memset(httpSession,
+          0,
+          sizeof(HTTPSession));
     httpSession->sender = *(coreAPI->myIdentity);
     httpSession->lock = MUTEX_CREATE(YES);
     httpSession->users = 1; /* us only, core has not seen this tsession! */
     httpSession->lastUse = get_time();
-    httpSession->is_client = NO;
-    httpSession->cs.client.get = NULL;
     tsession = MALLOC(sizeof(TSession));
     tsession->ttype = HTTP_PROTOCOL_NUMBER;
     tsession->internal = httpSession;
@@ -784,10 +798,28 @@
   CURLMcode mret;
   char * url;
   EncName enc;
+  int i;
 
-  /* FIXME: check if we have a GET pending for
-     this peer, and if so, use that! */
+  /* check if we have a session pending for this peer */
+  tsession = NULL;
+  MUTEX_LOCK(httplock);
+  for (i=0;i<tsessionCount;i++) {
+    if (0 == memcmp(&hello->senderIdentity,
+                   &tsessions[i]->peer,
+                   sizeof(PeerIdentity))) {
+      tsession = tsessions[i];
+      break;
+    }
+  }
+  if ( (tsession != NULL) &&
+       (OK == httpAssociate(tsession)) ) {
+    *tsessionPtr = tsession;
+    MUTEX_UNLOCK(httplock);
+    return OK;
+  }
+  MUTEX_UNLOCK(httplock);
 
+  /* no session pending, initiate a new one! */
   curl_get = curl_easy_init();
   if (curl_get == NULL)
     return SYSERR;
@@ -833,6 +865,9 @@
                   &receiveContentCallback);
 
   httpSession = MALLOC(sizeof(HTTPSession));
+  memset(httpSession,
+        0,
+        sizeof(HTTPSession));
   httpSession->cs.client.url = url;
   CURL_EASY_SETOPT(curl_get,
                   CURLOPT_WRITEDATA,
@@ -852,15 +887,6 @@
   }
 
   /* create SESSION */
-  httpSession->destroyed = NO;
-  httpSession->rpos1 = 0;
-  httpSession->rpos2 = 0;
-  httpSession->rsize2 = 0;
-  httpSession->rbuff2 = NULL;
-  httpSession->wsize = 0;
-  httpSession->woff = 0;
-  httpSession->wpos = 0;
-  httpSession->wbuff = NULL;
   httpSession->sender = hello->senderIdentity;
   httpSession->lock = MUTEX_CREATE(YES);
   httpSession->users = 1; /* us only, core has not seen this tsession! */
@@ -950,57 +976,152 @@
   CURL * curl_put;
   CURLMcode mret;
   MESSAGE_HEADER * hdr;
+  char * tmp;
 
-  /* FIXME: check if we have a GET pending for
-     this peer, and if so, use that! */
+  if (httpSession->is_client) {
+    if (size >= MAX_BUFFER_SIZE)
+      return SYSERR;
+    if (size == 0) {
+      GE_BREAK(NULL, 0);
+      return SYSERR;
+    }
+    putData = MALLOC(sizeof(struct HTTPPutData));
+    memset(putData,
+          0,
+          sizeof(struct HTTPPutData));
+    putData->msg = MALLOC(size + sizeof(MESSAGE_HEADER));
+    hdr = (MESSAGE_HEADER*) putData->msg;
+    hdr->size = htons(size + sizeof(MESSAGE_HEADER));
+    hdr->type = htons(0);
+    memcpy(&putData->msg[sizeof(MESSAGE_HEADER)],
+          msg,
+          size);
+    putData->size = size + sizeof(MESSAGE_HEADER);
+    MUTEX_LOCK(httpSession->lock);
+    curl_put = create_curl_put(httpSession,
+                              putData,
+                              size + sizeof(MESSAGE_HEADER)); 
+    if (curl_put == NULL) {
+      MUTEX_UNLOCK(httpSession->lock);
+      FREE(putData->msg);
+      FREE(putData);
+      return SYSERR;
+    }
+    putData->curl_put = curl_put;
+    putData->next = httpSession->cs.client.puts;
+    httpSession->cs.client.puts = putData;  
+    MUTEX_UNLOCK(httpSession->lock);
+    MUTEX_LOCK(httplock);
+    mret = curl_multi_add_handle(curl_multi, curl_put);
+    if (mret != CURLM_OK) {
+      GE_LOG(coreAPI->ectx,
+            GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
+            _("%s failed at %s:%d: `%s'\n"),
+            "curl_multi_add_handle",
+            __FILE__,
+            __LINE__,
+            curl_multi_strerror(mret));
+      putData->done = YES;
+      MUTEX_UNLOCK(httplock);
+      return SYSERR;
+    }
+    MUTEX_UNLOCK(httplock);
+    return OK;
+  } else { /* httpSession->isClient == false */
+    MUTEX_LOCK(httpSession->lock);
+    if (httpSession->wsize == 0) 
+      GROW(httpSession->wbuff,
+          httpSession->wsize,
+          HTTP_BUF_SIZE);    
+    if (httpSession->wpos + size > httpSession->wsize) {
+      /* need to grow or discard */
+      if (! important) {
+       MUTEX_UNLOCK(httpSession->lock);
+       return NO;
+      }
+      tmp = MALLOC(httpSession->wpos + size);
+      memcpy(tmp,
+            &httpSession->wbuff[httpSession->woff],
+            httpSession->wpos);
+      FREE(httpSession->wbuff);
+      httpSession->wbuff = tmp;
+      httpSession->wsize = httpSession->wpos + size;
+      httpSession->woff = 0;
+      httpSession->wpos = httpSession->wpos + size;
+    } else {
+      /* fits without growing */
+      if (httpSession->wpos + httpSession->woff + size > httpSession->wsize) {
+       /* need to compact first */
+       memmove(httpSession->wbuff,
+               &httpSession->wbuff[httpSession->woff],
+               httpSession->wpos);
+       httpSession->woff = 0;
+      }
+      /* append */
+      memcpy(&httpSession->wbuff[httpSession->woff + httpSession->wpos],
+            msg,
+            size);
+      httpSession->wpos += size;
+    }    
+    MUTEX_UNLOCK(httpSession->lock);
+    return OK;
+  }
+}
 
-  if (size >= MAX_BUFFER_SIZE)
-    return SYSERR;
-  if (size == 0) {
-    GE_BREAK(NULL, 0);
-    return SYSERR;
-  }
-  putData = MALLOC(sizeof(struct HTTPPutData));
-  putData->msg = MALLOC(size + sizeof(MESSAGE_HEADER));
-  hdr = (MESSAGE_HEADER*) putData->msg;
-  hdr->size = htons(size + sizeof(MESSAGE_HEADER));
-  hdr->type = htons(0);
-  memcpy(&putData->msg[sizeof(MESSAGE_HEADER)],
-        msg,
-        size);
-  putData->size = size + sizeof(MESSAGE_HEADER);
-  MUTEX_LOCK(httpSession->lock);
-  curl_put = create_curl_put(httpSession,
-                            putData,
-                            size + sizeof(MESSAGE_HEADER)); 
-  MUTEX_UNLOCK(httpSession->lock);
-  putData->curl_put = curl_put;
-  if (curl_put == NULL) {
-    FREE(putData->msg);
-    FREE(putData);
-    return SYSERR;
-  }
+/**
+ * Function called to cleanup dead connections
+ * (completed PUTs, GETs that have timed out,
+ * etc.).
+ */
+static void 
+cleanup_connections() {
+  int i;
+  HTTPSession * s;
+  struct HTTPPutData * prev;
+  struct HTTPPutData * pos;
+
   MUTEX_LOCK(httplock);
-  putData->next = putHead;
-  putHead = putData;
-  mret = curl_multi_add_handle(curl_multi, curl_put);
-  if (mret != CURLM_OK) {
-    GE_LOG(coreAPI->ectx,
-          GE_ERROR | GE_ADMIN | GE_USER | GE_BULK,
-          _("%s failed at %s:%d: `%s'\n"),
-          "curl_multi_add_handle",
-          __FILE__,
-          __LINE__,
-          curl_multi_strerror(mret));
-    putHead = putData->next;
-    curl_easy_cleanup(curl_put);
-    FREE(putData->msg);
-    FREE(putData);
-    MUTEX_UNLOCK(httplock);
-    return SYSERR;
+  for (i=0;i<tsessionCount;i++) {
+    s = tsessions[i]->internal;
+    if (s->is_client) {
+      prev = NULL;
+      pos = s->cs.client.puts;
+      while (pos != NULL) {
+       /* FIXME: check if CURL has timed out
+          the GET operation! If so, clean up! 
+          (and make sure we re-establish GET 
+          as needed!) */
+       
+
+       if (pos->done) {
+         if (prev == NULL)
+           s->cs.client.puts = pos->next;
+         else
+           prev->next = pos->next;
+         FREE(pos->msg);
+         curl_multi_remove_handle(curl_multi,
+                                  pos->curl_put);
+         curl_easy_cleanup(pos->curl_put);
+         FREE(pos);      
+         if (prev == NULL)
+           pos = s->cs.client.puts;
+         else
+           pos = pos->next;
+         continue;
+       }
+       prev = pos;
+       pos = pos->next;
+      }
+    } else {
+      /* FIXME: add code to close MHD connection
+        from the server side (timeout!); need
+        to 
+        A) tell GET callback to return "end of transmission"
+        B) destroy response object
+      */
+    }
   }
-  MUTEX_UNLOCK(httplock);
-  return OK;
+  MUTEX_UNLOCK(httplock);  
 }
 
 static void *
@@ -1033,6 +1154,12 @@
             curl_multi_strerror(mret));
       break;
     }
+    if (mhd_daemon != NULL)
+      MHD_get_fdset(mhd_daemon,
+                   &rs,
+                   &ws,
+                   &es,
+                   &max);
     /* CURL requires a regular timeout... */
     tv.tv_sec = 0;
     tv.tv_usec = 1000;
@@ -1045,6 +1172,9 @@
       break;
     running = 0;
     curl_multi_perform(curl_multi, &running);
+    if (mhd_daemon != NULL)
+      MHD_run(mhd_daemon);
+    cleanup_connections();
   }
   return NULL;
 }
@@ -1065,7 +1195,7 @@
   port = getGNUnetHTTPPort();
   if ( (mhd_daemon == NULL) &&
        (port != 0) ) {
-    mhd_daemon = MHD_start_daemon(MHD_USE_SELECT_INTERNALLY | MHD_USE_IPv4,
+    mhd_daemon = MHD_start_daemon(MHD_USE_IPv4,
                                  port,
                                  &acceptPolicyCallback,
                                  NULL,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]