gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (e15a29fb9 -> 9727e5e53)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (e15a29fb9 -> 9727e5e53)
Date: Fri, 17 Feb 2017 11:06:16 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a change to branch master
in repository gnunet.

    from e15a29fb9 ignore perf files
     new c793bffc3 assertion was inverted, also probably too strict
     new 9727e5e53 convert to new CADET API, not working due to CADET-API 
internal bugs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/fs/fs_download.c                    |  23 ++-
 src/fs/gnunet-service-fs.c              |   2 -
 src/fs/gnunet-service-fs_cadet.h        |  49 ++++--
 src/fs/gnunet-service-fs_cadet_client.c | 290 ++++++++++++++++----------------
 src/fs/gnunet-service-fs_cadet_server.c | 286 +++++++++++++------------------
 5 files changed, 301 insertions(+), 349 deletions(-)

diff --git a/src/fs/fs_download.c b/src/fs/fs_download.c
index a89a95907..0789162bf 100644
--- a/src/fs/fs_download.c
+++ b/src/fs/fs_download.c
@@ -1363,7 +1363,6 @@ do_reconnect (void *cls)
 static void
 try_reconnect (struct GNUNET_FS_DownloadContext *dc)
 {
-
   if (NULL != dc->mq)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1380,7 +1379,7 @@ try_reconnect (struct GNUNET_FS_DownloadContext *dc)
               "Will try to reconnect in %s\n",
              GNUNET_STRINGS_relative_time_to_string (dc->reconnect_backoff,
                                                       GNUNET_YES));
-  GNUNET_assert (NULL == dc->job_queue);
+  GNUNET_break (NULL != dc->job_queue);
   dc->task =
     GNUNET_SCHEDULER_add_delayed (dc->reconnect_backoff,
                                  &do_reconnect,
@@ -2211,8 +2210,8 @@ GNUNET_FS_download_start_downloading_ (struct 
GNUNET_FS_DownloadContext *dc)
   GNUNET_assert (NULL == dc->job_queue);
   GNUNET_assert (NULL == dc->task);
   GNUNET_assert (NULL != dc->active);
-  dc->job_queue =
-      GNUNET_FS_queue_ (dc->h,
+  dc->job_queue
+    = GNUNET_FS_queue_ (dc->h,
                         &activate_fs_download,
                         &deactivate_fs_download,
                         dc,
@@ -2252,14 +2251,14 @@ GNUNET_FS_download_resume (struct 
GNUNET_FS_DownloadContext *dc)
   GNUNET_FS_download_make_status_ (&pi, dc);
 
   GNUNET_assert (NULL == dc->task);
-  dc->job_queue =
-    GNUNET_FS_queue_ (dc->h,
-                      &activate_fs_download,
-                      &deactivate_fs_download,
-                      dc, (dc->length + DBLOCK_SIZE - 1) / DBLOCK_SIZE,
-                      (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
-                      ? GNUNET_FS_QUEUE_PRIORITY_NORMAL
-                      : GNUNET_FS_QUEUE_PRIORITY_PROBE);
+  dc->job_queue
+    = GNUNET_FS_queue_ (dc->h,
+                        &activate_fs_download,
+                        &deactivate_fs_download,
+                        dc, (dc->length + DBLOCK_SIZE - 1) / DBLOCK_SIZE,
+                        (0 == (dc->options & GNUNET_FS_DOWNLOAD_IS_PROBE))
+                        ? GNUNET_FS_QUEUE_PRIORITY_NORMAL
+                        : GNUNET_FS_QUEUE_PRIORITY_PROBE);
 
 }
 
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index e38fdb032..8c605c6a2 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1177,7 +1177,6 @@ handle_client_unindex (void *cls,
 static void
 shutdown_task (void *cls)
 {
-  GSF_cadet_stop_client ();
   GSF_cadet_stop_server ();
   if (NULL != GSF_core)
   {
@@ -1320,7 +1319,6 @@ main_init (const struct GNUNET_CONFIGURATION_Handle *c)
                                     NULL);
   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
   GSF_cadet_start_server ();
-  GSF_cadet_start_client ();
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                 NULL);
   return GNUNET_OK;
diff --git a/src/fs/gnunet-service-fs_cadet.h b/src/fs/gnunet-service-fs_cadet.h
index 060a3993c..1fbd3a406 100644
--- a/src/fs/gnunet-service-fs_cadet.h
+++ b/src/fs/gnunet-service-fs_cadet.h
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     Copyright (C) 2012 GNUnet e.V.
+     Copyright (C) 2012, 2017 GNUnet e.V.
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -38,14 +38,15 @@ struct GSF_CadetRequest;
  * @param cls closure
  * @param type type of the block, ANY on error
  * @param expiration expiration time for the block
- * @param data_size number of bytes in 'data', 0 on error
+ * @param data_size number of bytes in @a data, 0 on error
  * @param data reply block data, NULL on error
  */
-typedef void (*GSF_CadetReplyProcessor)(void *cls,
-                                        enum GNUNET_BLOCK_Type type,
-                                        struct GNUNET_TIME_Absolute expiration,
-                                        size_t data_size,
-                                        const void *data);
+typedef void
+(*GSF_CadetReplyProcessor)(void *cls,
+                           enum GNUNET_BLOCK_Type type,
+                           struct GNUNET_TIME_Absolute expiration,
+                           size_t data_size,
+                           const void *data);
 
 
 /**
@@ -55,14 +56,28 @@ typedef void (*GSF_CadetReplyProcessor)(void *cls,
  * @param query hash to query for the block
  * @param type desired type for the block
  * @param proc function to call with result
- * @param proc_cls closure for 'proc'
+ * @param proc_cls closure for @a proc
  * @return handle to cancel the operation
  */
 struct GSF_CadetRequest *
 GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
-                 const struct GNUNET_HashCode *query,
-                 enum GNUNET_BLOCK_Type type,
-                 GSF_CadetReplyProcessor proc, void *proc_cls);
+                 const struct GNUNET_HashCode *query,
+                 enum GNUNET_BLOCK_Type type,
+                 GSF_CadetReplyProcessor proc,
+                 void *proc_cls);
+
+/**
+ * Function called on each active cadets to shut them down.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the `struct CadetHandle` to destroy
+ * @return #GNUNET_YES (continue to iterate)
+ */
+int
+GSF_cadet_release_clients (void *cls,
+                           const struct GNUNET_PeerIdentity *key,
+                           void *value);
 
 
 /**
@@ -89,17 +104,15 @@ void
 GSF_cadet_stop_server (void);
 
 /**
- * Initialize subsystem for non-anonymous file-sharing.
+ * Cadet channel for creating outbound channels.
  */
-void
-GSF_cadet_start_client (void);
-
+extern struct GNUNET_CADET_Handle *cadet_handle;
 
 /**
- * Shutdown subsystem for non-anonymous file-sharing.
+ * Map from peer identities to 'struct CadetHandles' with cadet
+ * channels to those peers.
  */
-void
-GSF_cadet_stop_client (void);
+extern struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
 
 
 GNUNET_NETWORK_STRUCT_BEGIN
diff --git a/src/fs/gnunet-service-fs_cadet_client.c 
b/src/fs/gnunet-service-fs_cadet_client.c
index 4e268b93c..193fe2263 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -155,13 +155,13 @@ struct CadetHandle
 /**
  * Cadet channel for creating outbound channels.
  */
-static struct GNUNET_CADET_Handle *cadet_handle;
+struct GNUNET_CADET_Handle *cadet_handle;
 
 /**
  * Map from peer identities to 'struct CadetHandles' with cadet
  * channels to those peers.
  */
-static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
+struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
 
 
 /* ********************* client-side code ************************* */
@@ -419,9 +419,9 @@ struct HandleReplyClosure
  * @return #GNUNET_YES (continue to iterate)
  */
 static int
-handle_reply (void *cls,
-             const struct GNUNET_HashCode *key,
-             void *value)
+process_reply (void *cls,
+               const struct GNUNET_HashCode *key,
+               void *value)
 {
   struct HandleReplyClosure *hrc = cls;
   struct GSF_CadetRequest *sr = value;
@@ -443,38 +443,43 @@ handle_reply (void *cls,
  * is received.
  *
  * @param cls closure with the `struct CadetHandle`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
+ * @param srm the actual message
  * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
  */
 static int
-reply_cb (void *cls,
-         struct GNUNET_CADET_Channel *channel,
-         void **channel_ctx,
-          const struct GNUNET_MessageHeader *message)
+check_reply (void *cls,
+             const struct CadetReplyMessage *srm)
 {
-  struct CadetHandle *mh = *channel_ctx;
-  const struct CadetReplyMessage *srm;
+  /* We check later... */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions with this signature are called whenever a complete reply
+ * is received.
+ *
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ */
+static void
+handle_reply (void *cls,
+              const struct CadetReplyMessage *srm)
+{
+  struct CadetHandle *mh = cls;
   struct HandleReplyClosure hrc;
   uint16_t msize;
   enum GNUNET_BLOCK_Type type;
   struct GNUNET_HashCode query;
 
-  msize = ntohs (message->size);
-  if (sizeof (struct CadetReplyMessage) > msize)
-  {
-    GNUNET_break_op (0);
-    reset_cadet_async (mh);
-    return GNUNET_SYSERR;
-  }
-  srm = (const struct CadetReplyMessage *) message;
-  msize -= sizeof (struct CadetReplyMessage);
+  msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
   type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
   if (GNUNET_YES !=
       GNUNET_BLOCK_get_key (GSF_block_ctx,
                            type,
-                           &srm[1], msize, &query))
+                           &srm[1],
+                            msize,
+                            &query))
   {
     GNUNET_break_op (0);
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -483,13 +488,13 @@ reply_cb (void *cls,
                 msize,
                 GNUNET_i2s (&mh->target));
     reset_cadet_async (mh);
-    return GNUNET_SYSERR;
+    return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received reply `%s' via cadet from peer %s\n",
              GNUNET_h2s (&query),
              GNUNET_i2s (&mh->target));
-  GNUNET_CADET_receive_done (channel);
+  GNUNET_CADET_receive_done (mh->channel);
   GNUNET_STATISTICS_update (GSF_stats,
                            gettext_noop ("# replies received via cadet"), 1,
                            GNUNET_NO);
@@ -500,16 +505,103 @@ reply_cb (void *cls,
   hrc.found = GNUNET_NO;
   GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
                                              &query,
-                                             &handle_reply,
+                                             &process_reply,
                                              &hrc);
   if (GNUNET_NO == hrc.found)
   {
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# replies received via cadet 
dropped"), 1,
                              GNUNET_NO);
-    return GNUNET_OK;
   }
-  return GNUNET_OK;
+}
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct GSF_CadetRequest *sr = value;
+
+  GSF_cadet_query_cancel (sr);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Function called by cadet when a client disconnects.
+ * Cleans up our `struct CadetClient` of that channel.
+ *
+ * @param cls our `struct CadetClient`
+ * @param channel channel of the disconnecting client
+ */
+static void
+disconnect_cb (void *cls,
+               const struct GNUNET_CADET_Channel *channel)
+{
+  struct CadetHandle *mh = cls;
+  struct GSF_CadetRequest *sr;
+
+  if (NULL == mh->channel)
+    return; /* being destroyed elsewhere */
+  GNUNET_assert (channel == mh->channel);
+  mh->channel = NULL;
+  while (NULL != (sr = mh->pending_head))
+    GSF_cadet_query_cancel (sr);
+  /* first remove `mh` from the `cadet_map`, so that if the
+     callback from `free_waiting_entry()` happens to re-issue
+     the request, we don't immediately have it back in the
+     `waiting_map`. */
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multipeermap_remove (cadet_map,
+                                                      &mh->target,
+                                                      mh));
+  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+                                        &free_waiting_entry,
+                                        mh);
+  if (NULL != mh->wh)
+    GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
+  if (NULL != mh->timeout_task)
+    GNUNET_SCHEDULER_cancel (mh->timeout_task);
+  if (NULL != mh->reset_task)
+    GNUNET_SCHEDULER_cancel (mh->reset_task);
+  GNUNET_assert (0 ==
+                 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
+  GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
+  GNUNET_free (mh);
+}
+
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ *                    this value will be negative..
+ */
+static void
+window_change_cb (void *cls,
+                  const struct GNUNET_CADET_Channel *channel,
+                  int window_size)
+{
+  /* FIXME: for flow control, implement? */
 }
 
 
@@ -552,14 +644,25 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
                       &port);
-  mh->channel = GNUNET_CADET_channel_create (cadet_handle,
-                                             mh,
-                                             &mh->target,
-                                             &port,
-                                             GNUNET_CADET_OPTION_RELIABLE);
-  GNUNET_assert (mh ==
-                 GNUNET_CONTAINER_multipeermap_get (cadet_map,
-                                                    target));
+
+  {
+    struct GNUNET_MQ_MessageHandler handlers[] = {
+      GNUNET_MQ_hd_var_size (reply,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+                             struct CadetReplyMessage,
+                             mh),
+      GNUNET_MQ_handler_end ()
+    };
+
+    mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+                                               mh,
+                                               &mh->target,
+                                               &port,
+                                               GNUNET_CADET_OPTION_RELIABLE,
+                                               &window_change_cb,
+                                               &disconnect_cb,
+                                               handlers);
+  }
   return mh;
 }
 
@@ -646,93 +749,6 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
 
 
 /**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
-{
-  struct GSF_CadetRequest *sr = value;
-
-  GSF_cadet_query_cancel (sr);
-  return GNUNET_YES;
-}
-
-
-/**
- * Function called by cadet when a client disconnects.
- * Cleans up our `struct CadetClient` of that channel.
- *
- * @param cls NULL
- * @param channel channel of the disconnecting client
- * @param channel_ctx our `struct CadetClient`
- */
-static void
-cleaner_cb (void *cls,
-           const struct GNUNET_CADET_Channel *channel,
-           void *channel_ctx)
-{
-  struct CadetHandle *mh = channel_ctx;
-  struct GSF_CadetRequest *sr;
-
-  if (NULL == mh->channel)
-    return; /* being destroyed elsewhere */
-  GNUNET_assert (channel == mh->channel);
-  mh->channel = NULL;
-  while (NULL != (sr = mh->pending_head))
-    GSF_cadet_query_cancel (sr);
-  /* first remove `mh` from the `cadet_map`, so that if the
-     callback from `free_waiting_entry()` happens to re-issue
-     the request, we don't immediately have it back in the
-     `waiting_map`. */
-  GNUNET_assert (GNUNET_OK ==
-                GNUNET_CONTAINER_multipeermap_remove (cadet_map,
-                                                      &mh->target,
-                                                      mh));
-  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
-                                        &free_waiting_entry,
-                                        mh);
-  if (NULL != mh->wh)
-    GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
-  if (NULL != mh->timeout_task)
-    GNUNET_SCHEDULER_cancel (mh->timeout_task);
-  if (NULL != mh->reset_task)
-    GNUNET_SCHEDULER_cancel (mh->reset_task);
-  GNUNET_assert (0 ==
-                 GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
-  GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
-  GNUNET_free (mh);
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_client ()
-{
-  static const struct GNUNET_CADET_MessageHandler handlers[] = {
-    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
-    { NULL, 0, 0 }
-  };
-
-  cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
-  cadet_handle = GNUNET_CADET_connect (GSF_cfg,
-                                    NULL,
-                                    &cleaner_cb,
-                                    handlers);
-}
-
-
-/**
  * Function called on each active cadets to shut them down.
  *
  * @param cls NULL
@@ -740,10 +756,10 @@ GSF_cadet_start_client ()
  * @param value the `struct CadetHandle` to destroy
  * @return #GNUNET_YES (continue to iterate)
  */
-static int
-release_cadets (void *cls,
-              const struct GNUNET_PeerIdentity *key,
-              void *value)
+int
+GSF_cadet_release_clients (void *cls,
+                           const struct GNUNET_PeerIdentity *key,
+                           void *value)
 {
   struct CadetHandle *mh = value;
 
@@ -756,23 +772,5 @@ release_cadets (void *cls,
 }
 
 
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_client ()
-{
-  GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
-                                        &release_cadets,
-                                        NULL);
-  GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
-  cadet_map = NULL;
-  if (NULL != cadet_handle)
-  {
-    GNUNET_CADET_disconnect (cadet_handle);
-    cadet_handle = NULL;
-  }
-}
-
 
 /* end of gnunet-service-fs_cadet_client.c */
diff --git a/src/fs/gnunet-service-fs_cadet_server.c 
b/src/fs/gnunet-service-fs_cadet_server.c
index ac86537c3..0a72a8279 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -124,9 +124,9 @@ struct CadetClient
 
 
 /**
- * Listen channel for incoming requests.
+ * Listen port for incoming requests.
  */
-static struct GNUNET_CADET_Handle *listen_channel;
+static struct GNUNET_CADET_Port *cadet_port;
 
 /**
  * Head of DLL of cadet clients.
@@ -188,121 +188,29 @@ refresh_timeout_task (struct CadetClient *sc)
 
 
 /**
- * We're done handling a request from a client, read the next one.
+ * Check if we are done with the write queue, and if so tell CADET
+ * that we are ready to read more.
  *
- * @param sc client to continue reading requests from
+ * @param cls where to process the write queue
  */
 static void
-continue_reading (struct CadetClient *sc)
-{
-  refresh_timeout_task (sc);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Finished processing cadet request from client %p, ready to 
receive the next one\n",
-             sc);
-  GNUNET_CADET_receive_done (sc->channel);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc);
-
-
-/**
- * Send a reply now, cadet is ready.
- *
- * @param cls closure with the `struct CadetClient` which sent the query
- * @param size number of bytes available in @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-write_continuation (void *cls,
-                   size_t size,
-                   void *buf)
+continue_writing (void *cls)
 {
   struct CadetClient *sc = cls;
-  struct GNUNET_CADET_Channel *tun;
-  struct WriteQueueItem *wqi;
-  size_t ret;
-
-  sc->wh = NULL;
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    return 0;
-  }
-  if ( (0 == size) ||
-       (size < wqi->msize) )
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Transmission of reply failed, terminating cadet\n");
-    tun = sc->channel;
-    sc->channel = NULL;
-    GNUNET_CADET_channel_destroy (tun);
-    return 0;
-  }
-  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Transmitted %u byte reply via cadet to %p\n",
-             (unsigned int) size,
-             sc);
-  GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# Blocks transferred via cadet"), 1,
-                           GNUNET_NO);
-  ret = wqi->msize;
-  GNUNET_memcpy (buf, &wqi[1], ret);
-  GNUNET_free (wqi);
-  continue_writing (sc);
-  return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc)
-{
-  struct WriteQueueItem *wqi;
-  struct GNUNET_CADET_Channel *tun;
+  struct GNUNET_MQ_Handle *mq;
 
-  if (NULL != sc->wh)
+  mq = GNUNET_CADET_get_mq (sc->channel);
+  if (0 != GNUNET_MQ_get_length (mq))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Write pending, waiting for it to complete\n");
-    return; /* write already pending */
-  }
-  if (NULL == (wqi = sc->wqi_head))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write queue empty, reading more requests\n");
-    continue_reading (sc);
-    return;
-  }
-  sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             wqi->msize,
-                                             &write_continuation,
-                                             sc);
-  if (NULL == sc->wh)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-               "Write failed; terminating cadet\n");
-    tun = sc->channel;
-    sc->channel = NULL;
-    GNUNET_CADET_channel_destroy (tun);
     return;
   }
+  refresh_timeout_task (sc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Finished processing cadet request from client %p, ready to 
receive the next one\n",
+             sc);
+  GNUNET_CADET_receive_done (sc->channel);
 }
 
 
@@ -333,7 +241,7 @@ handle_datastore_reply (void *cls,
 {
   struct CadetClient *sc = cls;
   size_t msize = size + sizeof (struct CadetReplyMessage);
-  struct WriteQueueItem *wqi;
+  struct GNUNET_MQ_Envelope *env;
   struct CadetReplyMessage *srm;
 
   sc->qe = NULL;
@@ -357,7 +265,8 @@ handle_datastore_reply (void *cls,
                  GNUNET_h2s (key));
     }
     GNUNET_STATISTICS_update (GSF_stats,
-                              gettext_noop ("# queries received via CADET not 
answered"), 1,
+                              gettext_noop ("# queries received via CADET not 
answered"),
+                              1,
                               GNUNET_NO);
     continue_writing (sc);
     return;
@@ -369,9 +278,13 @@ handle_datastore_reply (void *cls,
                GNUNET_h2s (key));
     if (GNUNET_OK !=
        GNUNET_FS_handle_on_demand_block (key,
-                                         size, data, type,
-                                         priority, anonymity,
-                                         expiration, uid,
+                                         size,
+                                          data,
+                                          type,
+                                         priority,
+                                          anonymity,
+                                         expiration,
+                                          uid,
                                          &handle_datastore_reply,
                                          sc))
     {
@@ -394,19 +307,23 @@ handle_datastore_reply (void *cls,
               (unsigned int) type,
              GNUNET_h2s (key),
              sc);
-  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
-  wqi->msize = msize;
-  srm = (struct CadetReplyMessage *) &wqi[1];
-  srm->header.size = htons ((uint16_t) msize);
-  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
+  env = GNUNET_MQ_msg_extra (srm,
+                             size,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
   srm->type = htonl (type);
   srm->expiration = GNUNET_TIME_absolute_hton (expiration);
-  GNUNET_memcpy (&srm[1], data, size);
-  sc->reply_size = msize;
-  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
-                              sc->wqi_tail,
-                              wqi);
-  continue_writing (sc);
+  GNUNET_memcpy (&srm[1],
+                 data,
+                 size);
+  GNUNET_MQ_notify_sent (env,
+                         &continue_writing,
+                         sc);
+  GNUNET_STATISTICS_update (GSF_stats,
+                           gettext_noop ("# Blocks transferred via cadet"),
+                            1,
+                           GNUNET_NO);
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
+                  env);
 }
 
 
@@ -414,30 +331,22 @@ handle_datastore_reply (void *cls,
  * Functions with this signature are called whenever a
  * complete query message is received.
  *
- * Do not call #GNUNET_SERVER_mst_destroy() in callback
- *
  * @param cls closure with the `struct CadetClient`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param sqm the actual message
  */
-static int
-request_cb (void *cls,
-           struct GNUNET_CADET_Channel *channel,
-           void **channel_ctx,
-           const struct GNUNET_MessageHeader *message)
+static void
+handle_request (void *cls,
+                const struct CadetQueryMessage *sqm)
 {
-  struct CadetClient *sc = *channel_ctx;
-  const struct CadetQueryMessage *sqm;
+  struct CadetClient *sc = cls;
 
-  sqm = (const struct CadetQueryMessage *) message;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Received query for `%s' via cadet from client %p\n",
              GNUNET_h2s (&sqm->query),
              sc);
   GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# queries received via cadet"), 1,
+                           gettext_noop ("# queries received via cadet"),
+                            1,
                            GNUNET_NO);
   refresh_timeout_task (sc);
   sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
@@ -446,14 +355,14 @@ request_cb (void *cls,
                                     ntohl (sqm->type),
                                     0 /* priority */,
                                     GSF_datastore_queue_size,
-                                    &handle_datastore_reply, sc);
+                                    &handle_datastore_reply,
+                                     sc);
   if (NULL == sc->qe)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Queueing request with datastore failed (queue full?)\n");
     continue_writing (sc);
   }
-  return GNUNET_OK;
 }
 
 
@@ -464,16 +373,12 @@ request_cb (void *cls,
  * @param channel the channel representing the cadet
  * @param initiator the identity of the peer who wants to establish a cadet
  *            with us; NULL on binding error
- * @param port cadet port used for the incoming connection
- * @param options channel option flags
- * @return initial channel context (our 'struct CadetClient')
+ * @return initial channel context (our `struct CadetClient`)
  */
 static void *
-accept_cb (void *cls,
-          struct GNUNET_CADET_Channel *channel,
-          const struct GNUNET_PeerIdentity *initiator,
-          const struct GNUNET_HashCode *port,
-           enum GNUNET_CADET_ChannelOption options)
+connect_cb (void *cls,
+            struct GNUNET_CADET_Channel *channel,
+            const struct GNUNET_PeerIdentity *initiator)
 {
   struct CadetClient *sc;
 
@@ -481,13 +386,15 @@ accept_cb (void *cls,
   if (sc_count >= sc_count_max)
   {
     GNUNET_STATISTICS_update (GSF_stats,
-                             gettext_noop ("# cadet client connections 
rejected"), 1,
+                             gettext_noop ("# cadet client connections 
rejected"),
+                              1,
                              GNUNET_NO);
     GNUNET_CADET_channel_destroy (channel);
     return NULL;
   }
   GNUNET_STATISTICS_update (GSF_stats,
-                           gettext_noop ("# cadet connections active"), 1,
+                           gettext_noop ("# cadet connections active"),
+                            1,
                            GNUNET_NO);
   sc = GNUNET_new (struct CadetClient);
   sc->channel = channel;
@@ -506,18 +413,17 @@ accept_cb (void *cls,
 
 /**
  * Function called by cadet when a client disconnects.
- * Cleans up our 'struct CadetClient' of that channel.
+ * Cleans up our `struct CadetClient` of that channel.
  *
- * @param cls NULL
+ * @param cls  our `struct CadetClient`
  * @param channel channel of the disconnecting client
- * @param channel_ctx our 'struct CadetClient'
+ * @param channel_ctx
  */
 static void
-cleaner_cb (void *cls,
-           const struct GNUNET_CADET_Channel *channel,
-           void *channel_ctx)
+disconnect_cb (void *cls,
+               const struct GNUNET_CADET_Channel *channel)
 {
-  struct CadetClient *sc = channel_ctx;
+  struct CadetClient *sc = cls;
   struct WriteQueueItem *wqi;
 
   if (NULL == sc)
@@ -552,15 +458,42 @@ cleaner_cb (void *cls,
 }
 
 
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ *                    this value will be negative..
+ */
+static void
+window_change_cb (void *cls,
+                  const struct GNUNET_CADET_Channel *channel,
+                  int window_size)
+{
+  /* FIXME: could do flow control here... */
+}
+
+
 /**
  * Initialize subsystem for non-anonymous file-sharing.
  */
 void
 GSF_cadet_start_server ()
 {
-  static const struct GNUNET_CADET_MessageHandler handlers[] = {
-    { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct 
CadetQueryMessage)},
-    { NULL, 0, 0 }
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_fixed_size (request,
+                             GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
+                             struct CadetQueryMessage,
+                             NULL),
+    GNUNET_MQ_handler_end ()
   };
   struct GNUNET_HashCode port;
 
@@ -573,18 +506,19 @@ GSF_cadet_start_server ()
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Initializing cadet FS server with a limit of %llu connections\n",
              sc_count_max);
-  listen_channel = GNUNET_CADET_connect (GSF_cfg,
-                                         NULL,
-                                         &cleaner_cb,
-                                         handlers);
-  GNUNET_assert (NULL != listen_channel);
+  cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+  cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
+  GNUNET_assert (NULL != cadet_handle);
   GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
                       strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
                       &port);
-  GNUNET_CADET_open_port (listen_channel,
-                          &port,
-                          &accept_cb,
-                          NULL);
+  cadet_port = GNUNET_CADET_open_porT (cadet_handle,
+                                       &port,
+                                       &connect_cb,
+                                       NULL,
+                                       &window_change_cb,
+                                       &disconnect_cb,
+                                       handlers);
 }
 
 
@@ -594,10 +528,20 @@ GSF_cadet_start_server ()
 void
 GSF_cadet_stop_server ()
 {
-  if (NULL != listen_channel)
+  GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
+                                        &GSF_cadet_release_clients,
+                                        NULL);
+  GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
+  cadet_map = NULL;
+  if (NULL != cadet_port)
+  {
+    GNUNET_CADET_close_port (cadet_port);
+    cadet_port = NULL;
+  }
+  if (NULL != cadet_handle)
   {
-    GNUNET_CADET_disconnect (listen_channel);
-    listen_channel = NULL;
+    GNUNET_CADET_disconnect (cadet_handle);
+    cadet_handle = NULL;
   }
   GNUNET_assert (NULL == sc_head);
   GNUNET_assert (0 == sc_count);

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]