gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5309 - in GNUnet/src: applications/dht/module applications


From: gnunet
Subject: [GNUnet-SVN] r5309 - in GNUnet/src: applications/dht/module applications/fs/module applications/getoption applications/identity applications/stats applications/tbench applications/tracekit applications/traffic applications/vpn include server transports util/network
Date: Sun, 15 Jul 2007 11:46:06 -0600 (MDT)

Author: grothoff
Date: 2007-07-15 11:46:06 -0600 (Sun, 15 Jul 2007)
New Revision: 5309

Modified:
   GNUnet/src/applications/dht/module/cs.c
   GNUnet/src/applications/fs/module/querymanager.c
   GNUnet/src/applications/getoption/getoption.c
   GNUnet/src/applications/identity/identity.c
   GNUnet/src/applications/stats/statistics.c
   GNUnet/src/applications/tbench/tbench.c
   GNUnet/src/applications/tracekit/tracekit.c
   GNUnet/src/applications/traffic/traffic.c
   GNUnet/src/applications/vpn/vpn.c
   GNUnet/src/include/gnunet_core.h
   GNUnet/src/server/connection.c
   GNUnet/src/server/tcpserver.c
   GNUnet/src/server/tcpserver.h
   GNUnet/src/transports/tcp_helper.c
   GNUnet/src/util/network/select.c
Log:
stc

Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c     2007-07-15 16:08:22 UTC (rev 
5308)
+++ GNUnet/src/applications/dht/module/cs.c     2007-07-15 17:46:06 UTC (rev 
5309)
@@ -121,7 +121,7 @@
           __FILE__,
           __LINE__, ntohl (value->size) - sizeof (DataContainer), &value[1]);
 #endif
-  if (OK != coreAPI->sendToClient (record->client, &msg->header))
+  if (OK != coreAPI->sendToClient (record->client, &msg->header, YES))
     {
       GE_LOG (coreAPI->ectx,
               GE_ERROR | GE_IMMEDIATE | GE_USER,

Modified: GNUnet/src/applications/fs/module/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/module/querymanager.c    2007-07-15 16:08:22 UTC 
(rev 5308)
+++ GNUnet/src/applications/fs/module/querymanager.c    2007-07-15 17:46:06 UTC 
(rev 5309)
@@ -193,7 +193,7 @@
 #endif
           if (stats != NULL)
             stats->change (stat_replies_transmitted, 1);
-          coreAPI->sendToClient (trackers[i]->client, &rc->header);
+          coreAPI->sendToClient (trackers[i]->client, &rc->header, NO);
           FREE (rc);
         }
     }

Modified: GNUnet/src/applications/getoption/getoption.c
===================================================================
--- GNUnet/src/applications/getoption/getoption.c       2007-07-15 16:08:22 UTC 
(rev 5308)
+++ GNUnet/src/applications/getoption/getoption.c       2007-07-15 17:46:06 UTC 
(rev 5309)
@@ -60,7 +60,7 @@
   rep->header.size = htons (sizeof (MESSAGE_HEADER) + strlen (val) + 1);
   memcpy (rep->value, val, strlen (val) + 1);
   rep->header.type = htons (CS_PROTO_GET_OPTION_REPLY);
-  ret = coreAPI->sendToClient (sock, &rep->header);
+  ret = coreAPI->sendToClient (sock, &rep->header, YES);
   FREE (rep);
   FREE (val);
   return ret;

Modified: GNUnet/src/applications/identity/identity.c
===================================================================
--- GNUnet/src/applications/identity/identity.c 2007-07-15 16:08:22 UTC (rev 
5308)
+++ GNUnet/src/applications/identity/identity.c 2007-07-15 17:46:06 UTC (rev 
5309)
@@ -1215,7 +1215,7 @@
   if (hello == NULL)
     return SYSERR;
   hello->header.type = htons (CS_PROTO_identity_HELLO);
-  ret = coreAPI->sendToClient (sock, &hello->header);
+  ret = coreAPI->sendToClient (sock, &hello->header, YES);
   FREE (hello);
   return ret;
 }
@@ -1234,7 +1234,7 @@
                       ntohs (message->size) - sizeof (MESSAGE_HEADER),
                       &reply.sig))
     return SYSERR;
-  return coreAPI->sendToClient (sock, &reply.header);
+  return coreAPI->sendToClient (sock, &reply.header, YES);
 }
 
 static int
@@ -1288,7 +1288,7 @@
   reply->bpm = htonl (bpm);
   memcpy (&reply[1], address, len);
   FREENONNULL (address);
-  ret = coreAPI->sendToClient (sock, &reply->header);
+  ret = coreAPI->sendToClient (sock, &reply->header, YES);
   FREE (reply);
   return ret;
 }

Modified: GNUnet/src/applications/stats/statistics.c
===================================================================
--- GNUnet/src/applications/stats/statistics.c  2007-07-15 16:08:22 UTC (rev 
5308)
+++ GNUnet/src/applications/stats/statistics.c  2007-07-15 17:46:06 UTC (rev 
5309)
@@ -319,7 +319,7 @@
       /* printf("writing message of size %d with stats %d to %d out of %d to 
socket\n",
          ntohs(statMsg->header.size),
          start, end, statCounters); */
-      if (SYSERR == coreAPI->sendToClient (sock, &statMsg->header))
+      if (SYSERR == coreAPI->sendToClient (sock, &statMsg->header, YES))
         break;                  /* abort, socket error! */
       start = end;
     }

Modified: GNUnet/src/applications/tbench/tbench.c
===================================================================
--- GNUnet/src/applications/tbench/tbench.c     2007-07-15 16:08:22 UTC (rev 
5308)
+++ GNUnet/src/applications/tbench/tbench.c     2007-07-15 17:46:06 UTC (rev 
5309)
@@ -377,7 +377,7 @@
   reply.variance_loss = sum_variance_loss / (iterations - 1);
   FREE (results);
   results = NULL;
-  return coreAPI->sendToClient (client, &reply.header);
+  return coreAPI->sendToClient (client, &reply.header, YES);
 }
 
 /**

Modified: GNUnet/src/applications/tracekit/tracekit.c
===================================================================
--- GNUnet/src/applications/tracekit/tracekit.c 2007-07-15 16:08:22 UTC (rev 
5308)
+++ GNUnet/src/applications/tracekit/tracekit.c 2007-07-15 17:46:06 UTC (rev 
5309)
@@ -131,7 +131,7 @@
                       peerList[0],
                       &((P2P_tracekit_reply_MESSAGE_GENERIC *) reply)->
                       peerList[0], hostCount * sizeof (PeerIdentity));
-              coreAPI->sendToClient (clients[idx], &csReply->header);
+              coreAPI->sendToClient (clients[idx], &csReply->header, YES);
               FREE (csReply);
             }
           else

Modified: GNUnet/src/applications/traffic/traffic.c
===================================================================
--- GNUnet/src/applications/traffic/traffic.c   2007-07-15 16:08:22 UTC (rev 
5308)
+++ GNUnet/src/applications/traffic/traffic.c   2007-07-15 17:46:06 UTC (rev 
5309)
@@ -347,7 +347,7 @@
     return SYSERR;
   msg = (const CS_traffic_request_MESSAGE *) message;
   reply = buildReply (ntohl (msg->timePeriod));
-  ret = coreAPI->sendToClient (sock, &reply->header);
+  ret = coreAPI->sendToClient (sock, &reply->header, YES);
   FREE (reply);
   return ret;
 }

Modified: GNUnet/src/applications/vpn/vpn.c
===================================================================
--- GNUnet/src/applications/vpn/vpn.c   2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/applications/vpn/vpn.c   2007-07-15 17:46:06 UTC (rev 5309)
@@ -241,13 +241,13 @@
   b->size = htons (sizeof (MESSAGE_HEADER) + strlen ((char *) (b + 1)));
   if (c != NULL)
     {
-      coreAPI->sendToClient (c, b);
+      coreAPI->sendToClient (c, b, YES);
     }
   else
     {
       for (r = 0; r < clients_entries; r++)
         {
-          coreAPI->sendToClient (*(clients_store + r), b);
+          coreAPI->sendToClient (*(clients_store + r), b, YES);
         }
     }
   FREE (b);

Modified: GNUnet/src/include/gnunet_core.h
===================================================================
--- GNUnet/src/include/gnunet_core.h    2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/include/gnunet_core.h    2007-07-15 17:46:06 UTC (rev 5309)
@@ -157,9 +157,12 @@
  * and only return SYSERR if it runs out of buffers.  Returning OK
  * on the other hand does NOT confirm delivery since the actual
  * transfer happens asynchronously.
+ *
+ * @param force YES if this message MUST be queued
  */
 typedef int (*SendToClientCallback) (struct ClientHandle * handle,
-                                     const MESSAGE_HEADER * message);
+                                     const MESSAGE_HEADER * message,
+                                    int force);
 
 /**
  * GNUnet CORE API for applications and services that are implemented

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/server/connection.c      2007-07-15 17:46:06 UTC (rev 5309)
@@ -3353,6 +3353,7 @@
           shutdownConnection (be);
           prev = be;
           be = be->overflowChain;
+         CONNECTION_buffer_[i] = be;
           FREE (prev);
         }
     }

Modified: GNUnet/src/server/tcpserver.c
===================================================================
--- GNUnet/src/server/tcpserver.c       2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/server/tcpserver.c       2007-07-15 17:46:06 UTC (rev 5309)
@@ -190,16 +190,19 @@
  * and only return errors if it runs out of buffers.  Returning OK
  * on the other hand does NOT confirm delivery since the actual
  * transfer happens asynchronously.
+ *
+ * @param force YES if this message MUST be queued
  */
 int
-sendToClient (struct ClientHandle *handle, const MESSAGE_HEADER * message)
+sendToClient (struct ClientHandle *handle, const MESSAGE_HEADER * message,
+             int force)
 {
 #if DEBUG_TCPHANDLER
   GE_LOG (ectx,
           GE_DEBUG | GE_DEVELOPER | GE_REQUEST,
           "%s: sending reply to client\n", __FUNCTION__);
 #endif
-  return select_write (selector, handle->sock, message, YES, YES);
+  return select_write (selector, handle->sock, message, YES, force);
 }
 
 void
@@ -513,7 +516,7 @@
   rv.header.size = htons (sizeof (RETURN_VALUE_MESSAGE));
   rv.header.type = htons (CS_PROTO_RETURN_VALUE);
   rv.return_value = htonl (ret);
-  return sendToClient (sock, &rv.header);
+  return sendToClient (sock, &rv.header, YES);
 }
 
 /**
@@ -542,7 +545,7 @@
   rv->header.type = htons (CS_PROTO_RETURN_ERROR);
   rv->kind = htonl (kind);
   memcpy (&rv[1], message, strlen (message));
-  ret = sendToClient (sock, &rv->header);
+  ret = sendToClient (sock, &rv->header, YES);
   FREE (rv);
   return ret;
 }

Modified: GNUnet/src/server/tcpserver.h
===================================================================
--- GNUnet/src/server/tcpserver.h       2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/server/tcpserver.h       2007-07-15 17:46:06 UTC (rev 5309)
@@ -84,7 +84,8 @@
  * transfer happens asynchronously.
  */
 int sendToClient (struct ClientHandle *handle,
-                  const MESSAGE_HEADER * message);
+                  const MESSAGE_HEADER * message,
+                 int force);
 
 
 /**

Modified: GNUnet/src/transports/tcp_helper.c
===================================================================
--- GNUnet/src/transports/tcp_helper.c  2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/transports/tcp_helper.c  2007-07-15 17:46:06 UTC (rev 5309)
@@ -162,8 +162,8 @@
       MUTEX_UNLOCK (tcplock);
       if (tcpsession->users == 0) {
         select_change_timeout (selector, tcpsession->sock, TCP_FAST_TIMEOUT);
-       GE_BREAK(ectx,
-                OK == coreAPI->assertUnused(tsession));
+       GE_ASSERT(ectx,
+                 OK == coreAPI->assertUnused(tsession));
       }
       return OK;
     }

Modified: GNUnet/src/util/network/select.c
===================================================================
--- GNUnet/src/util/network/select.c    2007-07-15 16:08:22 UTC (rev 5308)
+++ GNUnet/src/util/network/select.c    2007-07-15 17:46:06 UTC (rev 5309)
@@ -30,13 +30,6 @@
 
 #define DEBUG_SELECT NO
 
-struct SemaphoreList
-{
-  struct SemaphoreList * next;
-
-  struct SEMAPHORE * sem;
-};
-
 /**
  * Select Session handle.
  */
@@ -49,12 +42,6 @@
   struct SocketHandle *sock;
 
   /**
-   * List of semaphores to raise whenever the
-   * write buffer is empty.
-   */
-  struct SemaphoreList * list;
-
-  /**
    * Client connection context.
    */
   void *sock_ctx;
@@ -86,6 +73,12 @@
   int locked;
 
   /**
+   * Do not read from this socket until the
+   * current write is complete.
+   */
+  int no_read;
+
+  /**
    * Current read position in the buffer.
    */
   unsigned int pos;
@@ -240,6 +233,7 @@
              "Destroying session %p of select %p with loss of %u in read and 
%u in write buffer.\n",
              s, sh, s->pos, s->wapos - s->wspos);
 #endif
+  /* signal waiting threads, if any */
   MUTEX_UNLOCK (sh->lock);
   sh->ch (sh->ch_cls, sh, s->sock, s->sock_ctx);
   MUTEX_LOCK (sh->lock);
@@ -404,7 +398,7 @@
               /* free compaction! */
               session->wspos = 0;
               session->wapos = 0;
-
+             session->no_read = NO;
               if (session->wsize > sh->memory_quota)
                 {
                   /* if we went over quota before because of
@@ -503,8 +497,9 @@
             }
           else
             {
-              add_to_select_set (sock, &readSet, &max);
               add_to_select_set (sock, &errorSet, &max);
+             if (session->no_read != YES)
+               add_to_select_set (sock, &readSet, &max);
               GE_ASSERT (NULL, session->wapos >= session->wspos);
               if (session->wapos > session->wspos)
                 add_to_select_set (sock, &writeSet, &max);      /* do we have 
a pending write request? */
@@ -945,9 +940,6 @@
   unsigned short len;
   char *newBuffer;
   unsigned int newBufferSize;
-  struct SemaphoreList list;
-  struct SemaphoreList * prev;
-  struct SemaphoreList * pos;
 
 #if DEBUG_SELECT
   GE_LOG (sh->ectx,
@@ -970,9 +962,11 @@
       return SYSERR;
     }
   GE_ASSERT (NULL, session->wapos >= session->wspos);
-  if ((sh->memory_quota > 0) &&
-      (session->wapos - session->wspos + len > sh->memory_quota) &&
-      (force == NO))
+  if ( (force == NO) &&
+       ( ( (sh->memory_quota > 0) &&
+          (session->wapos - session->wspos + len > sh->memory_quota) ) ||
+        ( (sh->memory_quota == 0) &&
+          (session->wapos - session->wspos + len > MAX_MALLOC_CHECKED / 2) ) ) 
) 
     {
       /* not enough free space, not allowed to grow that much */
       MUTEX_UNLOCK (sh->lock);
@@ -1017,29 +1011,10 @@
   GE_ASSERT (NULL, session->wapos + len <= session->wsize);
   memcpy (&session->wbuff[session->wapos], msg, len);
   session->wapos += len;
-  if (mayBlock) {
-    list.next = session->list;
-    list.sem = SEMAPHORE_CREATE(0);
-    session->list = &list;
-  }
+  if (mayBlock) 
+    session->no_read = YES;  
   MUTEX_UNLOCK (sh->lock);
   signalSelect (sh);
-  if (mayBlock) {
-    SEMAPHORE_DOWN(list.sem, YES);
-    prev = NULL;
-    MUTEX_LOCK (sh->lock);
-    pos = session->list;
-    while (pos != &list) {
-      GE_ASSERT(NULL, pos != NULL);
-      prev = pos;
-      pos = pos->next;
-    }
-    if (prev == NULL)
-      session->list = list.next;
-    else
-      prev->next = list.next;
-    MUTEX_UNLOCK (sh->lock);
-  }
   return OK;
 }
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]