[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33200 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33200 - gnunet/src/fs |
Date: |
Wed, 7 May 2014 14:07:45 +0200 |
Author: bartpolot
Date: 2014-05-07 14:07:44 +0200 (Wed, 07 May 2014)
New Revision: 33200
Added:
gnunet/src/fs/gnunet-service-fs_cadet.h
gnunet/src/fs/gnunet-service-fs_cadet_client.c
gnunet/src/fs/gnunet-service-fs_cadet_server.c
Removed:
gnunet/src/fs/gnunet-service-fs_mesh.h
gnunet/src/fs/gnunet-service-fs_mesh_client.c
gnunet/src/fs/gnunet-service-fs_mesh_server.c
Log:
- update fs
Copied: gnunet/src/fs/gnunet-service-fs_cadet.h (from rev 33199,
gnunet/src/fs/gnunet-service-fs_mesh.h)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cadet.h (rev 0)
+++ gnunet/src/fs/gnunet-service-fs_cadet.h 2014-05-07 12:07:44 UTC (rev
33200)
@@ -0,0 +1,159 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_cadet.h
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ */
+#ifndef GNUNET_SERVICE_FS_CADET_H
+#define GNUNET_SERVICE_FS_CADET_H
+
+/**
+ * Handle for a request that is going out via cadet API.
+ */
+struct GSF_CadetRequest;
+
+
+/**
+ * Function called with a reply from the cadet.
+ *
+ * @param cls closure
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+typedef void (*GSF_CadetReplyProcessor)(void *cls,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute expiration,
+ size_t data_size,
+ const void *data);
+
+
+/**
+ * Look for a block by directly contacting a particular peer.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for 'proc'
+ * @return handle to cancel the operation
+ */
+struct GSF_CadetRequest *
+GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_CadetReplyProcessor proc, void *proc_cls);
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was called.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_cadet_query_cancel (struct GSF_CadetRequest *sr);
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_cadet_start_server (void);
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_cadet_stop_server (void);
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_cadet_start_client (void);
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_cadet_stop_client (void);
+
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+/**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct CadetQueryMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_CADET_QUERY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Query hash from CHK (hash of encrypted block).
+ */
+ struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a CadetQueryMessage.
+ */
+struct CadetReplyMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_CADET_REPLY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration;
+
+ /* followed by the encrypted block */
+
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+
+#endif
Copied: gnunet/src/fs/gnunet-service-fs_cadet_client.c (from rev 33199,
gnunet/src/fs/gnunet-service-fs_mesh_client.c)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cadet_client.c
(rev 0)
+++ gnunet/src/fs/gnunet-service-fs_cadet_client.c 2014-05-07 12:07:44 UTC
(rev 33200)
@@ -0,0 +1,766 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_cadet_client.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - PORT is set to old application type, unsure if we should keep
+ * it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_cadet_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_cadet.h"
+
+
+/**
+ * After how long do we reset connections without replies?
+ * (30 seconds; used when a new channel handle is created.)
+ *
+ * The definition is kept as one logical line: if the argument list
+ * ends up on a line of its own, the macro expands to the bare
+ * function name only.
+ */
+#define CLIENT_RETRY_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+
+
+/**
+ * Handle for a cadet to another peer.
+ */
+struct CadetHandle;
+
+
+/**
+ * Handle for a request that is going out via cadet API.
+ *
+ * A request lives either in the pending DLL of its `struct CadetHandle`
+ * (not yet handed to cadet) or in that handle's waiting map (already
+ * transmitted); the `was_transmitted` flag tells which of the two.
+ */
+struct GSF_CadetRequest
+{
+
+ /**
+ * DLL (next entry in the pending list of the cadet handle).
+ */
+ struct GSF_CadetRequest *next;
+
+ /**
+ * DLL (previous entry in the pending list of the cadet handle).
+ */
+ struct GSF_CadetRequest *prev;
+
+ /**
+ * Which cadet is this request associated with?
+ */
+ struct CadetHandle *mh;
+
+ /**
+ * Function to call with the result; set to NULL once it has
+ * been invoked so that it is never called twice.
+ */
+ GSF_CadetReplyProcessor proc;
+
+ /**
+ * Closure for 'proc'
+ */
+ void *proc_cls;
+
+ /**
+ * Query to transmit to the other peer; also the key under which
+ * the request is stored in the waiting map.
+ */
+ struct GNUNET_HashCode query;
+
+ /**
+ * Desired type for the reply.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we transmit this request already? #GNUNET_YES if we are
+ * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
+ */
+ int was_transmitted;
+};
+
+
+/**
+ * Handle for a cadet to another peer.
+ *
+ * One handle exists per target peer; handles are stored in the
+ * global `cadet_map`, keyed by the peer identity.
+ */
+struct CadetHandle
+{
+ /**
+ * Head of DLL of pending requests on this cadet.
+ */
+ struct GSF_CadetRequest *pending_head;
+
+ /**
+ * Tail of DLL of pending requests on this cadet.
+ */
+ struct GSF_CadetRequest *pending_tail;
+
+ /**
+ * Map from query to `struct GSF_CadetRequest`s waiting for
+ * a reply.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
+
+ /**
+ * Channel to the other peer; NULL while the channel is being
+ * torn down by us.
+ */
+ struct GNUNET_CADET_Channel *channel;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_CADET_TransmitHandle *wh;
+
+ /**
+ * Which peer does this cadet go to?
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Task to kill inactive cadets (we keep them around for
+ * a few seconds to give the application a chance to give
+ * us another query). Scheduled when the last request of
+ * this handle is cancelled, cancelled again when the handle
+ * is looked up for a new request.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Task to reset cadets that had errors (asynchronously,
+ * as we may not be able to do it immediately during a
+ * callback from the cadet API). Also scheduled with a
+ * delay of #CLIENT_RETRY_TIMEOUT when the handle is created.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+};
+
+
+/**
+ * Cadet channel for creating outbound channels.
+ */
+static struct GNUNET_CADET_Handle *cadet_handle;
+
+/**
+ * Map from peer identities to 'struct CadetHandles' with cadet
+ * channels to those peers.
+ */
+static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
+
+
+/* ********************* client-side code ************************* */
+
+
+/**
+ * Transmit pending requests via the cadet.
+ *
+ * @param mh cadet to process
+ */
+static void
+transmit_pending (struct CadetHandle *mh);
+
+
+/**
+ * Multihashmap iterator: take one already-transmitted request out
+ * of the waiting map and queue it on the pending DLL again, so that
+ * it will be sent once more.
+ *
+ * @param cls the `struct CadetHandle` owning the map and the DLL
+ * @param key query hash under which the request is stored
+ * @param value the `struct GSF_CadetRequest` to re-queue
+ * @return #GNUNET_YES, always (keep iterating)
+ */
+static int
+move_to_pending (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 void *value)
+{
+  struct GSF_CadetRequest *req = value;
+  struct CadetHandle *handle = cls;
+  int removed;
+
+  removed = GNUNET_CONTAINER_multihashmap_remove (handle->waiting_map,
+                                                  key,
+                                                  value);
+  GNUNET_assert (GNUNET_YES == removed);
+  /* the request now counts as "not sent" again */
+  req->was_transmitted = GNUNET_NO;
+  GNUNET_CONTAINER_DLL_insert (handle->pending_head,
+                               handle->pending_tail,
+                               req);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Recover from a serious error: drop the current channel (if any),
+ * put every request that was already sent back onto the pending
+ * list, open a fresh channel to the same peer and start sending
+ * the pending requests again.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet (struct CadetHandle *mh)
+{
+  struct GNUNET_CADET_Channel *old_channel;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Resetting cadet channel to %s\n",
+              GNUNET_i2s (&mh->target));
+  /* detach the channel before destroying it; cleaner_cb() leaves
+     handles alone whose channel pointer is NULL */
+  old_channel = mh->channel;
+  mh->channel = NULL;
+  if (NULL != old_channel)
+  {
+    GNUNET_CADET_channel_destroy (old_channel);
+  }
+  GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+                                         &move_to_pending,
+                                         mh);
+  mh->channel
+    = GNUNET_CADET_channel_create (cadet_handle,
+                                   mh,
+                                   &mh->target,
+                                   GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+                                   GNUNET_CADET_OPTION_RELIABLE);
+  transmit_pending (mh);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive cadet channel.
+ *
+ * The channel pointer is cleared before the channel is destroyed so
+ * that cleaner_cb() treats the handle as "being destroyed elsewhere".
+ *
+ * @param cls the `struct CadetHandle` to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+cadet_timeout (void *cls,
+               const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct CadetHandle *mh = cls;
+  struct GNUNET_CADET_Channel *tun;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Timeout on cadet channel to %s\n",
+              GNUNET_i2s (&mh->target));
+  /* this task is running, so the stored identifier is stale */
+  mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  tun = mh->channel;
+  mh->channel = NULL;
+  /* the channel may already be gone (other code paths also set
+     `mh->channel` to NULL); never hand NULL to the cadet API */
+  if (NULL != tun)
+    GNUNET_CADET_channel_destroy (tun);
+}
+
+
+/**
+ * Scheduler task that performs the actual reset of a cadet.
+ *
+ * @param cls the `struct CadetHandle` to reset
+ * @param tc scheduler context, unused
+ */
+static void
+reset_cadet_task (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct CadetHandle *handle = cls;
+
+  /* we are running now, forget our own task identifier */
+  handle->reset_task = GNUNET_SCHEDULER_NO_TASK;
+  reset_cadet (handle);
+}
+
+
+/**
+ * Request a reset of the cadet (tear down and re-create), to be
+ * carried out from the scheduler instead of right away.  A reset
+ * that is already scheduled is replaced.
+ *
+ * @param mh cadet to reset
+ */
+static void
+reset_cadet_async (struct CadetHandle *mh)
+{
+  GNUNET_SCHEDULER_TaskIdentifier previous = mh->reset_task;
+
+  if (GNUNET_SCHEDULER_NO_TASK != previous)
+  {
+    GNUNET_SCHEDULER_cancel (previous);
+  }
+  mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task, mh);
+}
+
+
+/**
+ * Called by cadet when we may transmit a query on a channel.
+ * Sends the request at the head of the pending list (if any),
+ * moves it to the waiting map and asks for another transmission
+ * slot for the next pending request.
+ *
+ * @param cls the `struct CadetHandle` for which we did the write call
+ * @param size the number of bytes that can be written to @a buf
+ * @param buf where to write the message, NULL on error
+ * @return number of bytes written to @a buf
+ */
+static size_t
+transmit_sqm (void *cls,
+              size_t size,
+              void *buf)
+{
+  struct CadetHandle *mh = cls;
+  struct CadetQueryMessage sqm;
+  struct GSF_CadetRequest *sr;
+
+  /* the transmit handle is consumed by this callback */
+  mh->wh = NULL;
+  if (NULL == buf)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Cadet channel to %s failed during transmission attempt, rebuilding\n",
+                GNUNET_i2s (&mh->target));
+    /* cannot reset from within the cadet callback, do it later */
+    reset_cadet_async (mh);
+    return 0;
+  }
+  sr = mh->pending_head;
+  if (NULL == sr)
+    return 0; /* request was cancelled in the meantime */
+  GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
+  GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+                               mh->pending_tail,
+                               sr);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
+                                                    &sr->query,
+                                                    sr,
+                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  sr->was_transmitted = GNUNET_YES;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending query for %s via cadet to %s\n",
+              GNUNET_h2s (&sr->query),
+              GNUNET_i2s (&mh->target));
+  sqm.header.size = htons (sizeof (sqm));
+  sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
+  sqm.type = htonl (sr->type);
+  sqm.query = sr->query;
+  memcpy (buf, &sqm, sizeof (sqm));
+  /* report what was actually written, not the buffer capacity */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Successfully transmitted %u bytes via cadet to %s\n",
+              (unsigned int) sizeof (sqm),
+              GNUNET_i2s (&mh->target));
+  transmit_pending (mh);
+  return sizeof (sqm);
+}
+
+
+/**
+ * Ask cadet for a transmission slot for the next pending request.
+ * Does nothing if the cadet currently has no channel or if a
+ * transmission request is already outstanding.
+ *
+ * @param mh cadet to process
+ */
+static void
+transmit_pending (struct CadetHandle *mh)
+{
+  if ( (NULL == mh->channel) ||
+       (NULL != mh->wh) )
+    return;
+  mh->wh
+    = GNUNET_CADET_notify_transmit_ready (mh->channel,
+                                          GNUNET_YES /* allow cork */,
+                                          GNUNET_TIME_UNIT_FOREVER_REL,
+                                          sizeof (struct CadetQueryMessage),
+                                          &transmit_sqm,
+                                          mh);
+}
+
+
+/**
+ * Closure for handle_reply(): describes one received reply and
+ * records whether any waiting request matched it.
+ */
+struct HandleReplyClosure
+{
+
+ /**
+ * Reply payload.
+ */
+ const void *data;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Number of bytes in 'data'.
+ */
+ size_t data_size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we have a matching query? Initialized to #GNUNET_NO by the
+ * caller and set to #GNUNET_YES by handle_reply().
+ */
+ int found;
+};
+
+
+/**
+ * Multihashmap iterator: deliver a reply to one waiting request
+ * and then release that request.
+ *
+ * @param cls the `struct HandleReplyClosure` describing the reply
+ * @param key the key of the entry in the map (the query), unused
+ * @param value the `struct GSF_CadetRequest` the reply is for
+ * @return #GNUNET_YES, always (keep iterating)
+ */
+static int
+handle_reply (void *cls,
+              const struct GNUNET_HashCode *key,
+              void *value)
+{
+  struct GSF_CadetRequest *req = value;
+  struct HandleReplyClosure *reply = cls;
+
+  req->proc (req->proc_cls,
+             reply->type,
+             reply->expiration,
+             reply->data_size,
+             reply->data);
+  /* clear the callback so that cancelling below does not invoke
+     it a second time with the "failure" arguments */
+  req->proc = NULL;
+  GSF_cadet_query_cancel (req);
+  reply->found = GNUNET_YES;
+  return GNUNET_YES;
+}
+
+
+/**
+ * Called by cadet whenever a complete reply message was received.
+ * Validates the message, derives the query hash from the block and
+ * passes the block to every request waiting for that query.
+ * Malformed replies trigger an (asynchronous) reset of the cadet.
+ *
+ * @param cls closure, unused
+ * @param channel channel handle
+ * @param channel_ctx channel context, points to our `struct CadetHandle`
+ * @param message the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ */
+static int
+reply_cb (void *cls,
+          struct GNUNET_CADET_Channel *channel,
+          void **channel_ctx,
+          const struct GNUNET_MessageHeader *message)
+{
+  struct CadetHandle *mh = *channel_ctx;
+  const struct CadetReplyMessage *srm;
+  struct HandleReplyClosure hrc;
+  uint16_t msize;
+  enum GNUNET_BLOCK_Type type;
+  struct GNUNET_HashCode query;
+
+  msize = ntohs (message->size);
+  if (sizeof (struct CadetReplyMessage) > msize)
+  {
+    /* too short to even contain the fixed-size part */
+    GNUNET_break_op (0);
+    reset_cadet_async (mh);
+    return GNUNET_SYSERR;
+  }
+  srm = (const struct CadetReplyMessage *) message;
+  /* from here on, `msize` is the size of the block payload only */
+  msize -= sizeof (struct CadetReplyMessage);
+  type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+  if (GNUNET_YES !=
+      GNUNET_BLOCK_get_key (GSF_block_ctx,
+                            type,
+                            &srm[1], msize, &query))
+  {
+    GNUNET_break_op (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Received bogus reply of type %u with %u bytes via cadet from peer %s\n",
+                (unsigned int) type,
+                (unsigned int) msize,
+                GNUNET_i2s (&mh->target));
+    reset_cadet_async (mh);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received reply `%s' via cadet from peer %s\n",
+              GNUNET_h2s (&query),
+              GNUNET_i2s (&mh->target));
+  GNUNET_CADET_receive_done (channel);
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# replies received via cadet"), 1,
+                            GNUNET_NO);
+  hrc.data = &srm[1];
+  hrc.data_size = msize;
+  hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+  hrc.type = type;
+  hrc.found = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
+                                              &query,
+                                              &handle_reply,
+                                              &hrc);
+  if (GNUNET_NO == hrc.found)
+  {
+    /* nobody was waiting for this block (anymore) */
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# replies received via cadet dropped"), 1,
+                              GNUNET_NO);
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Look up the cadet handle for the given peer, creating it (and
+ * opening a channel) if we do not have one yet.  For an existing
+ * handle, a scheduled inactivity timeout is cancelled.
+ *
+ * @param target peer we want to communicate with
+ * @return the handle for @a target, never NULL
+ */
+static struct CadetHandle *
+get_cadet (const struct GNUNET_PeerIdentity *target)
+{
+  struct CadetHandle *mh;
+
+  mh = GNUNET_CONTAINER_multipeermap_get (cadet_map, target);
+  if (NULL == mh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Creating cadet channel to %s\n",
+                GNUNET_i2s (target));
+    mh = GNUNET_new (struct CadetHandle);
+    mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
+                                                   &reset_cadet_task,
+                                                   mh);
+    mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+    mh->target = *target;
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multipeermap_put (cadet_map,
+                                                      &mh->target,
+                                                      mh,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+    mh->channel
+      = GNUNET_CADET_channel_create (cadet_handle,
+                                     mh,
+                                     &mh->target,
+                                     GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+                                     GNUNET_CADET_OPTION_RELIABLE);
+    /* sanity check: the new handle must still be registered */
+    GNUNET_assert (mh ==
+                   GNUNET_CONTAINER_multipeermap_get (cadet_map,
+                                                      target));
+    return mh;
+  }
+  /* existing handle: it is in use again, so stop the idle timer */
+  if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
+  {
+    GNUNET_SCHEDULER_cancel (mh->timeout_task);
+    mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  return mh;
+}
+
+
+/**
+ * Ask a specific peer for a block.  The request is queued on the
+ * cadet to that peer and transmitted as soon as possible.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for @a proc
+ * @return handle to cancel the operation
+ */
+struct GSF_CadetRequest *
+GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
+                 const struct GNUNET_HashCode *query,
+                 enum GNUNET_BLOCK_Type type,
+                 GSF_CadetReplyProcessor proc, void *proc_cls)
+{
+  struct GSF_CadetRequest *request;
+  struct CadetHandle *handle;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Preparing to send query for %s via cadet to %s\n",
+              GNUNET_h2s (query),
+              GNUNET_i2s (target));
+  handle = get_cadet (target);
+  request = GNUNET_new (struct GSF_CadetRequest);
+  request->query = *query;
+  request->type = type;
+  request->proc_cls = proc_cls;
+  request->proc = proc;
+  request->mh = handle;
+  GNUNET_CONTAINER_DLL_insert (handle->pending_head,
+                               handle->pending_tail,
+                               request);
+  transmit_pending (handle);
+  return request;
+}
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was called.
+ *
+ * If the request still has a callback, the callback is invoked once
+ * with the failure arguments (type ANY, no data).  When this was the
+ * last request of its cadet, the cadet's inactivity timeout is
+ * started.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
+{
+  struct CadetHandle *mh = sr->mh;
+  GSF_CadetReplyProcessor notify = sr->proc;
+
+  sr->proc = NULL;
+  if (NULL != notify)
+  {
+    /* signal failure / cancellation to callback */
+    notify (sr->proc_cls,
+            GNUNET_BLOCK_TYPE_ANY,
+            GNUNET_TIME_UNIT_ZERO_ABS,
+            0,
+            NULL);
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Cancelled query for %s via cadet to %s\n",
+              GNUNET_h2s (&sr->query),
+              GNUNET_i2s (&mh->target));
+  if (GNUNET_YES != sr->was_transmitted)
+  {
+    /* never sent: still on the pending list */
+    GNUNET_CONTAINER_DLL_remove (mh->pending_head,
+                                 mh->pending_tail,
+                                 sr);
+  }
+  else
+  {
+    /* already sent: stored in the waiting map */
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
+                                                         &sr->query,
+                                                         sr));
+  }
+  GNUNET_free (sr);
+  if ( (0 != GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) ||
+       (NULL != mh->pending_head) )
+    return; /* cadet still has work to do */
+  mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                                   &cadet_timeout,
+                                                   mh);
+}
+
+
+/**
+ * Multihashmap iterator: cancel one waiting request.  Cancelling
+ * notifies the request's 'proc' continuation (if still set) and
+ * frees the request.
+ *
+ * @param cls the `struct CadetHandle`, unused
+ * @param key the key of the entry in the map (the query), unused
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES, always (keep iterating)
+ */
+static int
+free_waiting_entry (void *cls,
+                    const struct GNUNET_HashCode *key,
+                    void *value)
+{
+  GSF_cadet_query_cancel ((struct GSF_CadetRequest *) value);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Channel cleanup callback, registered with cadet in
+ * GSF_cadet_start_client().  Cancels all requests that still
+ * belong to the `struct CadetHandle` of the channel, removes the
+ * handle from `cadet_map` and frees it.  Handles whose channel
+ * pointer was already cleared by us are left untouched.
+ *
+ * @param cls NULL
+ * @param channel channel that is going away
+ * @param channel_ctx our `struct CadetHandle`
+ */
+static void
+cleaner_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel,
+ void *channel_ctx)
+{
+ struct CadetHandle *mh = channel_ctx;
+ struct GSF_CadetRequest *sr;
+
+ if (NULL == mh->channel)
+ return; /* being destroyed elsewhere */
+ GNUNET_assert (channel == mh->channel);
+ mh->channel = NULL;
+ /* cancelling unlinks the request from the pending list, so
+ re-reading the head each time terminates the loop */
+ while (NULL != (sr = mh->pending_head))
+ GSF_cadet_query_cancel (sr);
+ /* first remove `mh` from the `cadet_map`, so that if the
+ callback from `free_waiting_entry()` happens to re-issue
+ the request, we don't immediately have it back in the
+ `waiting_map`. */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_remove (cadet_map,
+ &mh->target,
+ mh));
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &free_waiting_entry,
+ mh);
+ if (NULL != mh->wh)
+ GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
+ /* cancelling the last request above schedules the inactivity
+ timeout; it must not run once `mh` has been freed */
+ if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
+ GNUNET_SCHEDULER_cancel (mh->timeout_task);
+ if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
+ GNUNET_SCHEDULER_cancel (mh->reset_task);
+ GNUNET_assert (0 ==
+ GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
+ GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
+ GNUNET_free (mh);
+}
+
+
+/**
+ * Initialize the client side of the subsystem for non-anonymous
+ * file-sharing: create the map of per-peer cadet handles and
+ * connect to the cadet service, registering the handler for
+ * reply messages and the channel cleanup callback.
+ */
+void
+GSF_cadet_start_client (void)
+{
+  /* list is terminated by the all-zero entry */
+  static const struct GNUNET_CADET_MessageHandler handlers[] = {
+    { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
+    { NULL, 0, 0 }
+  };
+
+  cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+  cadet_handle = GNUNET_CADET_connect (GSF_cfg,
+                                       NULL,
+                                       NULL,
+                                       &cleaner_cb,
+                                       handlers,
+                                       NULL);
+}
+
+
+/**
+ * Multipeermap iterator used during shutdown: destroy the channel
+ * of one cadet handle, if it has one.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the `struct CadetHandle` to destroy
+ * @return #GNUNET_YES, always (keep iterating)
+ */
+static int
+release_cadets (void *cls,
+                const struct GNUNET_PeerIdentity *key,
+                void *value)
+{
+  struct CadetHandle *mh = value;
+
+  /* this runs at shutdown, not because of a timeout; say so in
+     the log to keep traces readable */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Releasing cadet channel to %s\n",
+              GNUNET_i2s (&mh->target));
+  if (NULL != mh->channel)
+    GNUNET_CADET_channel_destroy (mh->channel);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown the client side of the subsystem for non-anonymous
+ * file-sharing: destroy the channels of all cadet handles, release
+ * the handle map and disconnect from the cadet service.  Safe to
+ * call when the subsystem was never (fully) started.
+ */
+void
+GSF_cadet_stop_client (void)
+{
+  if (NULL != cadet_map)
+  {
+    GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
+                                           &release_cadets,
+                                           NULL);
+    GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
+    cadet_map = NULL;
+  }
+  if (NULL != cadet_handle)
+  {
+    GNUNET_CADET_disconnect (cadet_handle);
+    cadet_handle = NULL;
+  }
+}
+
+
+/* end of gnunet-service-fs_cadet_client.c */
Copied: gnunet/src/fs/gnunet-service-fs_cadet_server.c (from rev 33199,
gnunet/src/fs/gnunet-service-fs_mesh_server.c)
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cadet_server.c
(rev 0)
+++ gnunet/src/fs/gnunet-service-fs_cadet_server.c 2014-05-07 12:07:44 UTC
(rev 33200)
@@ -0,0 +1,595 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_cadet_server.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - PORT is set to old application type, unsure if we should keep
+ * it that way (fine for now)
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_cadet_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_cadet.h"
+
+/**
+ * After how long do we terminate idle connections? (2 minutes)
+ *
+ * The definition is kept as one logical line: if the argument list
+ * ends up on a line of its own, the macro expansion is truncated.
+ */
+#define IDLE_TIMEOUT \
+  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
+
+
+/**
+ * A message in the queue to be written to the cadet.
+ * The message itself is stored in the same allocation, directly
+ * after this struct.
+ */
+struct WriteQueueItem
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct WriteQueueItem *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct WriteQueueItem *prev;
+
+ /**
+ * Number of bytes of payload, allocated at the end of this struct.
+ * This is the size requested from cadet for the transmission.
+ */
+ size_t msize;
+};
+
+
+/**
+ * Information we keep around for each active cadet client
+ * (a peer that sends us queries over a cadet channel).
+ */
+struct CadetClient
+{
+ /**
+ * DLL
+ */
+ struct CadetClient *next;
+
+ /**
+ * DLL
+ */
+ struct CadetClient *prev;
+
+ /**
+ * Channel for communication; set to NULL right before we
+ * destroy the channel ourselves.
+ */
+ struct GNUNET_CADET_Channel *channel;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_CADET_TransmitHandle *wh;
+
+ /**
+ * Head of write queue.
+ */
+ struct WriteQueueItem *wqi_head;
+
+ /**
+ * Tail of write queue.
+ */
+ struct WriteQueueItem *wqi_tail;
+
+ /**
+ * Current active request to the datastore, if we have one pending.
+ */
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ /**
+ * Task that is scheduled to asynchronously terminate the connection.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+ /**
+ * Task that is scheduled to terminate idle connections;
+ * re-armed with #IDLE_TIMEOUT by refresh_timeout_task().
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Size of the last write that was initiated (full reply
+ * message, i.e. header plus block data).
+ */
+ size_t reply_size;
+
+};
+
+
+/**
+ * Listen channel for incoming requests.
+ */
+static struct GNUNET_CADET_Handle *listen_channel;
+
+/**
+ * Head of DLL of cadet clients.
+ */
+static struct CadetClient *sc_head;
+
+/**
+ * Tail of DLL of cadet clients.
+ */
+static struct CadetClient *sc_tail;
+
+/**
+ * Number of active cadet clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of cadet clients.
+ */
+static unsigned long long sc_count_max;
+
+
+
+/**
+ * Scheduler task that closes the channel of a cadet client which
+ * has been idle for too long.
+ *
+ * @param cls the `struct CadetClient`
+ * @param tc scheduler context, unused
+ */
+static void
+timeout_cadet_task (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct CadetClient *client = cls;
+  struct GNUNET_CADET_Channel *idle_channel = client->channel;
+
+  /* we are running now, forget our own task identifier */
+  client->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  /* detach the channel before destroying it */
+  client->channel = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Timeout for inactive cadet client %p\n",
+              client);
+  GNUNET_CADET_channel_destroy (idle_channel);
+}
+
+
+/**
+ * Restart the idle timer of a cadet client; called when the client
+ * showed activity.  A timer that is already running is replaced.
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct CadetClient *sc)
+{
+  GNUNET_SCHEDULER_TaskIdentifier running = sc->timeout_task;
+
+  if (GNUNET_SCHEDULER_NO_TASK != running)
+  {
+    GNUNET_SCHEDULER_cancel (running);
+  }
+  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+                                                   &timeout_cadet_task,
+                                                   sc);
+}
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ * Counts as activity, so the client's idle timer is restarted
+ * before cadet is told that we are ready for more data.
+ *
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct CadetClient *sc)
+{
+  refresh_timeout_task (sc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Finished processing cadet request from client %p, ready to receive the next one\n",
+              sc);
+  GNUNET_CADET_receive_done (sc->channel);
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct CadetClient *sc);
+
+
+/**
+ * Send a reply now, cadet is ready.  Copies the message at the
+ * head of the write queue into @a buf, releases the queue entry
+ * and continues with the next queue entry.  If cadet signals an
+ * error (no or too small buffer), the channel is destroyed.
+ *
+ * @param cls closure with the `struct CadetClient` which sent the query
+ * @param size number of bytes available in @a buf
+ * @param buf where to write the message, NULL on error
+ * @return number of bytes written to @a buf
+ */
+static size_t
+write_continuation (void *cls,
+                    size_t size,
+                    void *buf)
+{
+  struct CadetClient *sc = cls;
+  struct GNUNET_CADET_Channel *tun;
+  struct WriteQueueItem *wqi;
+  size_t ret;
+
+  /* the transmit handle is consumed by this callback */
+  sc->wh = NULL;
+  if (NULL == (wqi = sc->wqi_head))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Write queue empty, reading more requests\n");
+    return 0;
+  }
+  if ( (NULL == buf) ||
+       (0 == size) ||
+       (size < wqi->msize) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Transmission of reply failed, terminating cadet\n");
+    /* detach the channel before destroying it */
+    tun = sc->channel;
+    sc->channel = NULL;
+    GNUNET_CADET_channel_destroy (tun);
+    return 0;
+  }
+  GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+                               sc->wqi_tail,
+                               wqi);
+  ret = wqi->msize;
+  /* log the size of the reply, not the capacity of the buffer */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Transmitted %u byte reply via cadet to %p\n",
+              (unsigned int) ret,
+              sc);
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# Blocks transferred via cadet"), 1,
+                            GNUNET_NO);
+  /* the message is stored directly behind the queue entry */
+  memcpy (buf, &wqi[1], ret);
+  GNUNET_free (wqi);
+  continue_writing (sc);
+  return ret;
+}
+
+
+/**
+ * Process the write queue of a client: request a transmission slot
+ * for the entry at the head of the queue, or go back to reading
+ * requests if the queue is empty.  Does nothing while a write is
+ * still outstanding.  If cadet refuses the transmission request,
+ * the channel is destroyed.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct CadetClient *sc)
+{
+  struct WriteQueueItem *next_item = sc->wqi_head;
+  struct GNUNET_CADET_Channel *failed_channel;
+
+  if (NULL != sc->wh)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Write pending, waiting for it to complete\n");
+    return; /* write already pending */
+  }
+  if (NULL == next_item)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Write queue empty, reading more requests\n");
+    continue_reading (sc);
+    return;
+  }
+  sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel,
+                                               GNUNET_NO,
+                                               GNUNET_TIME_UNIT_FOREVER_REL,
+                                               next_item->msize,
+                                               &write_continuation,
+                                               sc);
+  if (NULL != sc->wh)
+    return; /* write_continuation() will take it from here */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Write failed; terminating cadet\n");
+  failed_channel = sc->channel;
+  sc->channel = NULL;
+  GNUNET_CADET_channel_destroy (failed_channel);
+}
+
+
+/**
+ * Process a datum that was stored in the datastore: wrap it into a
+ * `struct CadetReplyMessage`, append it to the client's write queue
+ * and start writing.  On-demand blocks are first handed to the
+ * on-demand encoder, which calls this function again with the
+ * result.  If there is nothing to send, processing of the write
+ * queue simply continues.
+ *
+ * @param cls closure with the `struct CadetClient` which sent the query
+ * @param key key for the content
+ * @param size number of bytes in @a data
+ * @param data content stored, NULL if there is no result
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+handle_datastore_reply (void *cls,
+                        const struct GNUNET_HashCode *key,
+                        size_t size, const void *data,
+                        enum GNUNET_BLOCK_Type type,
+                        uint32_t priority,
+                        uint32_t anonymity,
+                        struct GNUNET_TIME_Absolute expiration,
+                        uint64_t uid)
+{
+  struct CadetClient *sc = cls;
+  size_t msize = size + sizeof (struct CadetReplyMessage);
+  struct WriteQueueItem *wqi;
+  struct CadetReplyMessage *srm;
+
+  /* the datastore request is finished once we get called */
+  sc->qe = NULL;
+  if (NULL == data)
+  {
+    /* no result, this should not really happen, as for
+       non-anonymous routing only peers that HAVE the
+       answers should be queried; OTOH, this is not a
+       hard error as we might have had the answer in the
+       past and the user might have unindexed it. Hence
+       we log at level "INFO" for now. */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Have no answer for query `%s'\n",
+                GNUNET_h2s (key));
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# queries received via cadet not answered"), 1,
+                              GNUNET_NO);
+    continue_writing (sc);
+    return;
+  }
+  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Performing on-demand encoding for query %s\n",
+                GNUNET_h2s (key));
+    if (GNUNET_OK !=
+        GNUNET_FS_handle_on_demand_block (key,
+                                          size, data, type,
+                                          priority, anonymity,
+                                          expiration, uid,
+                                          &handle_datastore_reply,
+                                          sc))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "On-demand encoding request failed\n");
+      continue_writing (sc);
+    }
+    return;
+  }
+  /* the message size travels in a 16-bit header field, so the
+     limit itself is already too large (it would wrap to 0) */
+  if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    continue_writing (sc);
+    return;
+  }
+  GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Starting transmission of %u byte reply of type %u for query `%s' via cadet to %p\n",
+              (unsigned int) size,
+              (unsigned int) type,
+              GNUNET_h2s (key),
+              sc);
+  /* queue entry and message share one allocation */
+  wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+  wqi->msize = msize;
+  srm = (struct CadetReplyMessage *) &wqi[1];
+  srm->header.size = htons ((uint16_t) msize);
+  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
+  srm->type = htonl (type);
+  srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+  memcpy (&srm[1], data, size);
+  sc->reply_size = msize;
+  GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+                               sc->wqi_tail,
+                               wqi);
+  continue_writing (sc);
+}
+
+
+/**
+ * Handler invoked for every complete query message that arrives
+ * on a cadet channel; looks the requested block up in the datastore.
+ *
+ * Do not call #GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure (not used here)
+ * @param channel channel handle (not used here)
+ * @param channel_ctx channel context, points to our `struct CadetClient`
+ * @param message the actual message
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing;
+ *         this implementation always returns #GNUNET_OK
+ */
+static int
+request_cb (void *cls,
+            struct GNUNET_CADET_Channel *channel,
+            void **channel_ctx,
+            const struct GNUNET_MessageHeader *message)
+{
+  const struct CadetQueryMessage *query_msg
+    = (const struct CadetQueryMessage *) message;
+  struct CadetClient *client = *channel_ctx;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received query for `%s' via cadet from client %p\n",
+              GNUNET_h2s (&query_msg->query),
+              client);
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# queries received via cadet"), 1,
+                            GNUNET_NO);
+  /* activity on the channel: restart the timeout for this client */
+  refresh_timeout_task (client);
+  client->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                         0,
+                                         &query_msg->query,
+                                         ntohl (query_msg->type),
+                                         0 /* priority */,
+                                         GSF_datastore_queue_size,
+                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                         &handle_datastore_reply,
+                                         client);
+  if (NULL != client->qe)
+    return GNUNET_OK;
+  /* the lookup could not be queued, so no reply will be produced
+     for this query; carry on with the write/read cycle */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Queueing request with datastore failed (queue full?)\n");
+  continue_writing (client);
+  return GNUNET_OK;
+}
+
+
+/**
+ * Functions of this type are called upon new cadet connection from other peers.
+ *
+ * @param cls the closure from GNUNET_CADET_connect
+ * @param channel the channel representing the cadet
+ * @param initiator the identity of the peer who wants to establish a cadet
+ *            with us; NULL on binding error
+ * @param port cadet port used for the incoming connection
+ * @param options channel option flags
+ * @return initial channel context (our 'struct CadetClient'),
+ *         NULL if the connection was rejected because the limit on
+ *         concurrent clients has been reached
+ */
+static void *
+accept_cb (void *cls,
+           struct GNUNET_CADET_Channel *channel,
+           const struct GNUNET_PeerIdentity *initiator,
+           uint32_t port, enum GNUNET_CADET_ChannelOption options)
+{
+  struct CadetClient *sc;
+
+  GNUNET_assert (NULL != channel);
+  if (sc_count >= sc_count_max)
+  {
+    /* too many clients already: refuse by destroying the new channel */
+    GNUNET_STATISTICS_update (GSF_stats,
+                              gettext_noop ("# cadet client connections rejected"), 1,
+                              GNUNET_NO);
+    GNUNET_CADET_channel_destroy (channel);
+    return NULL;
+  }
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# cadet connections active"), 1,
+                            GNUNET_NO);
+  /* track the new client in the global list and count it */
+  sc = GNUNET_new (struct CadetClient);
+  sc->channel = channel;
+  GNUNET_CONTAINER_DLL_insert (sc_head,
+                               sc_tail,
+                               sc);
+  sc_count++;
+  refresh_timeout_task (sc);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Accepting inbound cadet connection from `%s' as client %p\n",
+              GNUNET_i2s (initiator),
+              sc);
+  return sc;
+}
+
+
+/**
+ * Function called by cadet when a client disconnects.
+ * Releases everything we hold for the 'struct CadetClient'
+ * of that channel: tasks, pending write, pending datastore
+ * request, queued replies and finally the record itself.
+ *
+ * @param cls NULL
+ * @param channel channel of the disconnecting client
+ * @param channel_ctx our 'struct CadetClient', may be NULL
+ */
+static void
+cleaner_cb (void *cls,
+            const struct GNUNET_CADET_Channel *channel,
+            void *channel_ctx)
+{
+  struct CadetClient *client = channel_ctx;
+  struct WriteQueueItem *item;
+
+  /* no context means there is nothing of ours to release */
+  if (NULL == client)
+    return;
+  client->channel = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Terminating cadet connection with client %p\n",
+              client);
+  GNUNET_STATISTICS_update (GSF_stats,
+                            gettext_noop ("# cadet connections active"), -1,
+                            GNUNET_NO);
+  /* stop scheduled tasks that still reference the client */
+  if (GNUNET_SCHEDULER_NO_TASK != client->terminate_task)
+    GNUNET_SCHEDULER_cancel (client->terminate_task);
+  if (GNUNET_SCHEDULER_NO_TASK != client->timeout_task)
+    GNUNET_SCHEDULER_cancel (client->timeout_task);
+  /* abort operations that are still in flight */
+  if (NULL != client->wh)
+    GNUNET_CADET_notify_transmit_ready_cancel (client->wh);
+  if (NULL != client->qe)
+    GNUNET_DATASTORE_cancel (client->qe);
+  /* drop replies that were queued but never sent */
+  for (item = client->wqi_head; NULL != item; item = client->wqi_head)
+  {
+    GNUNET_CONTAINER_DLL_remove (client->wqi_head,
+                                 client->wqi_tail,
+                                 item);
+    GNUNET_free (item);
+  }
+  /* finally unlink the client from the global list and free it */
+  GNUNET_CONTAINER_DLL_remove (sc_head,
+                               sc_tail,
+                               client);
+  sc_count--;
+  GNUNET_free (client);
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ *
+ * Reads the client limit from option "MAX_CADET_CLIENTS" in
+ * section "fs"; if that cannot be obtained, nothing is started.
+ * Otherwise connects to cadet, listening on the FS block
+ * transfer port for query messages.
+ */
+void
+GSF_cadet_start_server ()
+{
+  static const uint32_t ports[] = {
+    GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+    0
+  };
+  static const struct GNUNET_CADET_MessageHandler handlers[] = {
+    { &request_cb,
+      GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
+      sizeof (struct CadetQueryMessage)},
+    { NULL, 0, 0 }
+  };
+  int have_limit;
+
+  have_limit = GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+                                                      "fs",
+                                                      "MAX_CADET_CLIENTS",
+                                                      &sc_count_max);
+  /* without a configured limit the server side stays disabled */
+  if (GNUNET_YES != have_limit)
+    return;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Initializing cadet FS server with a limit of %llu connections\n",
+              sc_count_max);
+  listen_channel = GNUNET_CADET_connect (GSF_cfg,
+                                         NULL,
+                                         &accept_cb,
+                                         &cleaner_cb,
+                                         handlers,
+                                         ports);
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ *
+ * Disconnects the listening cadet handle if there is one and
+ * then asserts that no client records are left.
+ */
+void
+GSF_cadet_stop_server ()
+{
+  if (NULL != listen_channel)
+    GNUNET_CADET_disconnect (listen_channel);
+  listen_channel = NULL;
+  /* all clients are expected to be gone at this point */
+  GNUNET_assert (NULL == sc_head);
+  GNUNET_assert (0 == sc_count);
+}
+
+/* end of gnunet-service-fs_cadet_server.c */
Deleted: gnunet/src/fs/gnunet-service-fs_mesh.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh.h 2014-05-07 12:07:42 UTC (rev
33199)
+++ gnunet/src/fs/gnunet-service-fs_mesh.h 2014-05-07 12:07:44 UTC (rev
33200)
@@ -1,159 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2012 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_cadet.h
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- */
-#ifndef GNUNET_SERVICE_FS_CADET_H
-#define GNUNET_SERVICE_FS_CADET_H
-
-/**
- * Handle for a request that is going out via cadet API.
- */
-struct GSF_CadetRequest;
-
-
-/**
- * Function called with a reply from the cadet.
- *
- * @param cls closure
- * @param type type of the block, ANY on error
- * @param expiration expiration time for the block
- * @param data_size number of bytes in 'data', 0 on error
- * @param data reply block data, NULL on error
- */
-typedef void (*GSF_CadetReplyProcessor)(void *cls,
- enum GNUNET_BLOCK_Type type,
- struct GNUNET_TIME_Absolute expiration,
- size_t data_size,
- const void *data);
-
-
-/**
- * Look for a block by directly contacting a particular peer.
- *
- * @param target peer that should have the block
- * @param query hash to query for the block
- * @param type desired type for the block
- * @param proc function to call with result
- * @param proc_cls closure for 'proc'
- * @return handle to cancel the operation
- */
-struct GSF_CadetRequest *
-GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_CadetReplyProcessor proc, void *proc_cls);
-
-
-/**
- * Cancel an active request; must not be called after 'proc'
- * was calld.
- *
- * @param sr request to cancel
- */
-void
-GSF_cadet_query_cancel (struct GSF_CadetRequest *sr);
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_server (void);
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_server (void);
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_client (void);
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_client (void);
-
-
-GNUNET_NETWORK_STRUCT_BEGIN
-
-/**
- * Query from one peer, asking the other for CHK-data.
- */
-struct CadetQueryMessage
-{
-
- /**
- * Type is GNUNET_MESSAGE_TYPE_FS_CADET_QUERY.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Block type must be DBLOCK or IBLOCK.
- */
- uint32_t type GNUNET_PACKED;
-
- /**
- * Query hash from CHK (hash of encrypted block).
- */
- struct GNUNET_HashCode query;
-
-};
-
-
-/**
- * Reply to a CadetQueryMessage.
- */
-struct CadetReplyMessage
-{
-
- /**
- * Type is GNUNET_MESSAGE_TYPE_FS_CADET_REPLY.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Block type must be DBLOCK or IBLOCK.
- */
- uint32_t type GNUNET_PACKED;
-
- /**
- * Expiration time for the block.
- */
- struct GNUNET_TIME_AbsoluteNBO expiration;
-
- /* followed by the encrypted block */
-
-};
-
-GNUNET_NETWORK_STRUCT_END
-
-
-#endif
Deleted: gnunet/src/fs/gnunet-service-fs_mesh_client.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_client.c 2014-05-07 12:07:42 UTC
(rev 33199)
+++ gnunet/src/fs/gnunet-service-fs_mesh_client.c 2014-05-07 12:07:44 UTC
(rev 33200)
@@ -1,766 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2012, 2013 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_cadet_client.c
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- *
- * TODO:
- * - PORT is set to old application type, unsure if we should keep
- * it that way (fine for now)
- */
-#include "platform.h"
-#include "gnunet_constants.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_cadet_service.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet-service-fs.h"
-#include "gnunet-service-fs_indexing.h"
-#include "gnunet-service-fs_cadet.h"
-
-
-/**
- * After how long do we reset connections without replies?
- */
-#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 30)
-
-
-/**
- * Handle for a cadet to another peer.
- */
-struct CadetHandle;
-
-
-/**
- * Handle for a request that is going out via cadet API.
- */
-struct GSF_CadetRequest
-{
-
- /**
- * DLL.
- */
- struct GSF_CadetRequest *next;
-
- /**
- * DLL.
- */
- struct GSF_CadetRequest *prev;
-
- /**
- * Which cadet is this request associated with?
- */
- struct CadetHandle *mh;
-
- /**
- * Function to call with the result.
- */
- GSF_CadetReplyProcessor proc;
-
- /**
- * Closure for 'proc'
- */
- void *proc_cls;
-
- /**
- * Query to transmit to the other peer.
- */
- struct GNUNET_HashCode query;
-
- /**
- * Desired type for the reply.
- */
- enum GNUNET_BLOCK_Type type;
-
- /**
- * Did we transmit this request already? #GNUNET_YES if we are
- * in the 'waiting_map', #GNUNET_NO if we are in the 'pending' DLL.
- */
- int was_transmitted;
-};
-
-
-/**
- * Handle for a cadet to another peer.
- */
-struct CadetHandle
-{
- /**
- * Head of DLL of pending requests on this cadet.
- */
- struct GSF_CadetRequest *pending_head;
-
- /**
- * Tail of DLL of pending requests on this cadet.
- */
- struct GSF_CadetRequest *pending_tail;
-
- /**
- * Map from query to `struct GSF_CadetRequest`s waiting for
- * a reply.
- */
- struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
-
- /**
- * Channel to the other peer.
- */
- struct GNUNET_CADET_Channel *channel;
-
- /**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_CADET_TransmitHandle *wh;
-
- /**
- * Which peer does this cadet go to?
- */
- struct GNUNET_PeerIdentity target;
-
- /**
- * Task to kill inactive cadets (we keep them around for
- * a few seconds to give the application a chance to give
- * us another query).
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
- * Task to reset cadets that had errors (asynchronously,
- * as we may not be able to do it immediately during a
- * callback from the cadet API).
- */
- GNUNET_SCHEDULER_TaskIdentifier reset_task;
-
-};
-
-
-/**
- * Cadet channel for creating outbound channels.
- */
-static struct GNUNET_CADET_Handle *cadet_handle;
-
-/**
- * Map from peer identities to 'struct CadetHandles' with cadet
- * channels to those peers.
- */
-static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
-
-
-/* ********************* client-side code ************************* */
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh);
-
-
-/**
- * Iterator called on each entry in a waiting map to
- * move it back to the pending list.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to move to pending
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-move_to_pending (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct CadetHandle *mh = cls;
- struct GSF_CadetRequest *sr = value;
-
- GNUNET_assert (GNUNET_YES ==
- GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
- key,
- value));
- GNUNET_CONTAINER_DLL_insert (mh->pending_head,
- mh->pending_tail,
- sr);
- sr->was_transmitted = GNUNET_NO;
- return GNUNET_YES;
-}
-
-
-/**
- * We had a serious error, tear down and re-create cadet from scratch.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet (struct CadetHandle *mh)
-{
- struct GNUNET_CADET_Channel *channel = mh->channel;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Resetting cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->channel = NULL;
- if (NULL != channel)
- GNUNET_CADET_channel_destroy (channel);
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &move_to_pending,
- mh);
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
-
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_CADET_OPTION_RELIABLE);
- transmit_pending (mh);
-}
-
-
-/**
- * Task called when it is time to destroy an inactive cadet channel.
- *
- * @param cls the `struct CadetHandle` to tear down
- * @param tc scheduler context, unused
- */
-static void
-cadet_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct CadetHandle *mh = cls;
- struct GNUNET_CADET_Channel *tun;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- tun = mh->channel;
- mh->channel = NULL;
- GNUNET_CADET_channel_destroy (tun);
-}
-
-
-/**
- * Task called when it is time to reset an cadet.
- *
- * @param cls the `struct CadetHandle` to tear down
- * @param tc scheduler context, unused
- */
-static void
-reset_cadet_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct CadetHandle *mh = cls;
-
- mh->reset_task = GNUNET_SCHEDULER_NO_TASK;
- reset_cadet (mh);
-}
-
-
-/**
- * We had a serious error, tear down and re-create cadet from scratch,
- * but do so asynchronously.
- *
- * @param mh cadet to reset
- */
-static void
-reset_cadet_async (struct CadetHandle *mh)
-{
- if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
- GNUNET_SCHEDULER_cancel (mh->reset_task);
- mh->reset_task = GNUNET_SCHEDULER_add_now (&reset_cadet_task,
- mh);
-}
-
-
-/**
- * Functions of this signature are called whenever we are ready to transmit
- * query via a cadet.
- *
- * @param cls the struct CadetHandle for which we did the write call
- * @param size the number of bytes that can be written to @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-transmit_sqm (void *cls,
- size_t size,
- void *buf)
-{
- struct CadetHandle *mh = cls;
- struct CadetQueryMessage sqm;
- struct GSF_CadetRequest *sr;
-
- mh->wh = NULL;
- if (NULL == buf)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cadet channel to %s failed during transmission attempt,
rebuilding\n",
- GNUNET_i2s (&mh->target));
- reset_cadet_async (mh);
- return 0;
- }
- sr = mh->pending_head;
- if (NULL == sr)
- return 0;
- GNUNET_assert (size >= sizeof (struct CadetQueryMessage));
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (mh->waiting_map,
- &sr->query,
- sr,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- sr->was_transmitted = GNUNET_YES;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&mh->target));
- sqm.header.size = htons (sizeof (sqm));
- sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_QUERY);
- sqm.type = htonl (sr->type);
- sqm.query = sr->query;
- memcpy (buf, &sqm, sizeof (sqm));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Successfully transmitted %u bytes via cadet to %s\n",
- (unsigned int) size,
- GNUNET_i2s (&mh->target));
- transmit_pending (mh);
- return sizeof (sqm);
-}
-
-
-/**
- * Transmit pending requests via the cadet.
- *
- * @param mh cadet to process
- */
-static void
-transmit_pending (struct CadetHandle *mh)
-{
- if (NULL == mh->channel)
- return;
- if (NULL != mh->wh)
- return;
- mh->wh = GNUNET_CADET_notify_transmit_ready (mh->channel, GNUNET_YES /*
allow cork */,
- GNUNET_TIME_UNIT_FOREVER_REL,
- sizeof (struct CadetQueryMessage),
- &transmit_sqm, mh);
-}
-
-
-/**
- * Closure for handle_reply().
- */
-struct HandleReplyClosure
-{
-
- /**
- * Reply payload.
- */
- const void *data;
-
- /**
- * Expiration time for the block.
- */
- struct GNUNET_TIME_Absolute expiration;
-
- /**
- * Number of bytes in 'data'.
- */
- size_t data_size;
-
- /**
- * Type of the block.
- */
- enum GNUNET_BLOCK_Type type;
-
- /**
- * Did we have a matching query?
- */
- int found;
-};
-
-
-/**
- * Iterator called on each entry in a waiting map to
- * process a result.
- *
- * @param cls the `struct HandleReplyClosure`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to handle result for
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-handle_reply (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct HandleReplyClosure *hrc = cls;
- struct GSF_CadetRequest *sr = value;
-
- sr->proc (sr->proc_cls,
- hrc->type,
- hrc->expiration,
- hrc->data_size,
- hrc->data);
- sr->proc = NULL;
- GSF_cadet_query_cancel (sr);
- hrc->found = GNUNET_YES;
- return GNUNET_YES;
-}
-
-
-/**
- * Functions with this signature are called whenever a complete reply
- * is received.
- *
- * @param cls closure with the `struct CadetHandle`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
- */
-static int
-reply_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
-{
- struct CadetHandle *mh = *channel_ctx;
- const struct CadetReplyMessage *srm;
- struct HandleReplyClosure hrc;
- uint16_t msize;
- enum GNUNET_BLOCK_Type type;
- struct GNUNET_HashCode query;
-
- msize = ntohs (message->size);
- if (sizeof (struct CadetReplyMessage) > msize)
- {
- GNUNET_break_op (0);
- reset_cadet_async (mh);
- return GNUNET_SYSERR;
- }
- srm = (const struct CadetReplyMessage *) message;
- msize -= sizeof (struct CadetReplyMessage);
- type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
- if (GNUNET_YES !=
- GNUNET_BLOCK_get_key (GSF_block_ctx,
- type,
- &srm[1], msize, &query))
- {
- GNUNET_break_op (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Received bogus reply of type %u with %u bytes via cadet from
peer %s\n",
- type,
- msize,
- GNUNET_i2s (&mh->target));
- reset_cadet_async (mh);
- return GNUNET_SYSERR;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received reply `%s' via cadet from peer %s\n",
- GNUNET_h2s (&query),
- GNUNET_i2s (&mh->target));
- GNUNET_CADET_receive_done (channel);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via cadet"), 1,
- GNUNET_NO);
- hrc.data = &srm[1];
- hrc.data_size = msize;
- hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
- hrc.type = type;
- hrc.found = GNUNET_NO;
- GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
- &query,
- &handle_reply,
- &hrc);
- if (GNUNET_NO == hrc.found)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# replies received via cadet
dropped"), 1,
- GNUNET_NO);
- return GNUNET_OK;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Get (or create) a cadet to talk to the given peer.
- *
- * @param target peer we want to communicate with
- */
-static struct CadetHandle *
-get_cadet (const struct GNUNET_PeerIdentity *target)
-{
- struct CadetHandle *mh;
-
- mh = GNUNET_CONTAINER_multipeermap_get (cadet_map,
- target);
- if (NULL != mh)
- {
- if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
- {
- GNUNET_SCHEDULER_cancel (mh->timeout_task);
- mh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- }
- return mh;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Creating cadet channel to %s\n",
- GNUNET_i2s (target));
- mh = GNUNET_new (struct CadetHandle);
- mh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
- &reset_cadet_task,
- mh);
- mh->waiting_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
- mh->target = *target;
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_put (cadet_map,
- &mh->target,
- mh,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
-
GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- GNUNET_CADET_OPTION_RELIABLE);
- GNUNET_assert (mh ==
- GNUNET_CONTAINER_multipeermap_get (cadet_map,
- target));
- return mh;
-}
-
-
-/**
- * Look for a block by directly contacting a particular peer.
- *
- * @param target peer that should have the block
- * @param query hash to query for the block
- * @param type desired type for the block
- * @param proc function to call with result
- * @param proc_cls closure for @a proc
- * @return handle to cancel the operation
- */
-struct GSF_CadetRequest *
-GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_CadetReplyProcessor proc, void *proc_cls)
-{
- struct CadetHandle *mh;
- struct GSF_CadetRequest *sr;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Preparing to send query for %s via cadet to %s\n",
- GNUNET_h2s (query),
- GNUNET_i2s (target));
- mh = get_cadet (target);
- sr = GNUNET_new (struct GSF_CadetRequest);
- sr->mh = mh;
- sr->proc = proc;
- sr->proc_cls = proc_cls;
- sr->type = type;
- sr->query = *query;
- GNUNET_CONTAINER_DLL_insert (mh->pending_head,
- mh->pending_tail,
- sr);
- transmit_pending (mh);
- return sr;
-}
-
-
-/**
- * Cancel an active request; must not be called after 'proc'
- * was calld.
- *
- * @param sr request to cancel
- */
-void
-GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
-{
- struct CadetHandle *mh = sr->mh;
- GSF_CadetReplyProcessor p;
-
- p = sr->proc;
- sr->proc = NULL;
- if (NULL != p)
- {
- /* signal failure / cancellation to callback */
- p (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
- GNUNET_TIME_UNIT_ZERO_ABS,
- 0, NULL);
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cancelled query for %s via cadet to %s\n",
- GNUNET_h2s (&sr->query),
- GNUNET_i2s (&sr->mh->target));
- if (GNUNET_YES == sr->was_transmitted)
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_remove (mh->waiting_map,
- &sr->query,
- sr));
- else
- GNUNET_CONTAINER_DLL_remove (mh->pending_head,
- mh->pending_tail,
- sr);
- GNUNET_free (sr);
- if ( (0 == GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)) &&
- (NULL == mh->pending_head) )
- mh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
- &cadet_timeout,
- mh);
-}
-
-
-/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GSF_CadetRequest *sr = value;
-
- GSF_cadet_query_cancel (sr);
- return GNUNET_YES;
-}
-
-
-/**
- * Function called by cadet when a client disconnects.
- * Cleans up our `struct CadetClient` of that channel.
- *
- * @param cls NULL
- * @param channel channel of the disconnecting client
- * @param channel_ctx our `struct CadetClient`
- */
-static void
-cleaner_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel,
- void *channel_ctx)
-{
- struct CadetHandle *mh = channel_ctx;
- struct GSF_CadetRequest *sr;
-
- if (NULL == mh->channel)
- return; /* being destroyed elsewhere */
- GNUNET_assert (channel == mh->channel);
- mh->channel = NULL;
- while (NULL != (sr = mh->pending_head))
- GSF_cadet_query_cancel (sr);
- /* first remove `mh` from the `cadet_map`, so that if the
- callback from `free_waiting_entry()` happens to re-issue
- the request, we don't immediately have it back in the
- `waiting_map`. */
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_remove (cadet_map,
- &mh->target,
- mh));
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &free_waiting_entry,
- mh);
- if (NULL != mh->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
- if (GNUNET_SCHEDULER_NO_TASK != mh->timeout_task)
- GNUNET_SCHEDULER_cancel (mh->timeout_task);
- if (GNUNET_SCHEDULER_NO_TASK != mh->reset_task)
- GNUNET_SCHEDULER_cancel (mh->reset_task);
- GNUNET_assert (0 ==
- GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
- GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
- GNUNET_free (mh);
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_client ()
-{
- static const struct GNUNET_CADET_MessageHandler handlers[] = {
- { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
- { NULL, 0, 0 }
- };
-
- cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
- cadet_handle = GNUNET_CADET_connect (GSF_cfg,
- NULL,
- NULL,
- &cleaner_cb,
- handlers,
- NULL);
-}
-
-
-/**
- * Function called on each active cadets to shut them down.
- *
- * @param cls NULL
- * @param key target peer, unused
- * @param value the `struct CadetHandle` to destroy
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-release_cadets (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
-{
- struct CadetHandle *mh = value;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout on cadet channel to %s\n",
- GNUNET_i2s (&mh->target));
- if (NULL != mh->channel)
- GNUNET_CADET_channel_destroy (mh->channel);
- return GNUNET_YES;
-}
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_client ()
-{
- GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
- &release_cadets,
- NULL);
- GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
- cadet_map = NULL;
- if (NULL != cadet_handle)
- {
- GNUNET_CADET_disconnect (cadet_handle);
- cadet_handle = NULL;
- }
-}
-
-
-/* end of gnunet-service-fs_cadet_client.c */
Deleted: gnunet/src/fs/gnunet-service-fs_mesh_server.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_mesh_server.c 2014-05-07 12:07:42 UTC
(rev 33199)
+++ gnunet/src/fs/gnunet-service-fs_mesh_server.c 2014-05-07 12:07:44 UTC
(rev 33200)
@@ -1,595 +0,0 @@
-/*
- This file is part of GNUnet.
- (C) 2012, 2013 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
-/**
- * @file fs/gnunet-service-fs_cadet_server.c
- * @brief non-anonymous file-transfer
- * @author Christian Grothoff
- *
- * TODO:
- * - PORT is set to old application type, unsure if we should keep
- * it that way (fine for now)
- */
-#include "platform.h"
-#include "gnunet_constants.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_cadet_service.h"
-#include "gnunet_protocols.h"
-#include "gnunet_applications.h"
-#include "gnunet-service-fs.h"
-#include "gnunet-service-fs_indexing.h"
-#include "gnunet-service-fs_cadet.h"
-
-/**
- * After how long do we termiante idle connections?
- */
-#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
-
-
-/**
- * A message in the queue to be written to the cadet.
- */
-struct WriteQueueItem
-{
- /**
- * Kept in a DLL.
- */
- struct WriteQueueItem *next;
-
- /**
- * Kept in a DLL.
- */
- struct WriteQueueItem *prev;
-
- /**
- * Number of bytes of payload, allocated at the end of this struct.
- */
- size_t msize;
-};
-
-
-/**
- * Information we keep around for each active cadeting client.
- */
-struct CadetClient
-{
- /**
- * DLL
- */
- struct CadetClient *next;
-
- /**
- * DLL
- */
- struct CadetClient *prev;
-
- /**
- * Channel for communication.
- */
- struct GNUNET_CADET_Channel *channel;
-
- /**
- * Handle for active write operation, or NULL.
- */
- struct GNUNET_CADET_TransmitHandle *wh;
-
- /**
- * Head of write queue.
- */
- struct WriteQueueItem *wqi_head;
-
- /**
- * Tail of write queue.
- */
- struct WriteQueueItem *wqi_tail;
-
- /**
- * Current active request to the datastore, if we have one pending.
- */
- struct GNUNET_DATASTORE_QueueEntry *qe;
-
- /**
- * Task that is scheduled to asynchronously terminate the connection.
- */
- GNUNET_SCHEDULER_TaskIdentifier terminate_task;
-
- /**
- * Task that is scheduled to terminate idle connections.
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
- * Size of the last write that was initiated.
- */
- size_t reply_size;
-
-};
-
-
-/**
- * Listen channel for incoming requests.
- */
-static struct GNUNET_CADET_Handle *listen_channel;
-
-/**
- * Head of DLL of cadet clients.
- */
-static struct CadetClient *sc_head;
-
-/**
- * Tail of DLL of cadet clients.
- */
-static struct CadetClient *sc_tail;
-
-/**
- * Number of active cadet clients in the 'sc_*'-DLL.
- */
-static unsigned int sc_count;
-
-/**
- * Maximum allowed number of cadet clients.
- */
-static unsigned long long sc_count_max;
-
-
-
-/**
- * Task run to asynchronously terminate the cadet due to timeout.
- *
- * @param cls the 'struct CadetClient'
- * @param tc scheduler context
- */
-static void
-timeout_cadet_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct CadetClient *sc = cls;
- struct GNUNET_CADET_Channel *tun;
-
- sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- tun = sc->channel;
- sc->channel = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Timeout for inactive cadet client %p\n",
- sc);
- GNUNET_CADET_channel_destroy (tun);
-}
-
-
-/**
- * Reset the timeout for the cadet client (due to activity).
- *
- * @param sc client handle to reset timeout for
- */
-static void
-refresh_timeout_task (struct CadetClient *sc)
-{
- if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
- GNUNET_SCHEDULER_cancel (sc->timeout_task);
- sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
- &timeout_cadet_task,
- sc);
-}
-
-
-/**
- * We're done handling a request from a client, read the next one.
- *
- * @param sc client to continue reading requests from
- */
-static void
-continue_reading (struct CadetClient *sc)
-{
- refresh_timeout_task (sc);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Finished processing cadet request from client %p, ready to receive the next one\n",
- sc);
- GNUNET_CADET_receive_done (sc->channel);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc);
-
-
-/**
- * Send a reply now, cadet is ready.
- *
- * @param cls closure with the `struct CadetClient` which sent the query
- * @param size number of bytes available in @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-write_continuation (void *cls,
- size_t size,
- void *buf)
-{
- struct CadetClient *sc = cls;
- struct GNUNET_CADET_Channel *tun;
- struct WriteQueueItem *wqi;
- size_t ret;
-
- sc->wh = NULL;
- if (NULL == (wqi = sc->wqi_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write queue empty, reading more requests\n");
- return 0;
- }
- if ( (0 == size) ||
- (size < wqi->msize) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission of reply failed, terminating cadet\n");
- tun = sc->channel;
- sc->channel = NULL;
- GNUNET_CADET_channel_destroy (tun);
- return 0;
- }
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitted %u byte reply via cadet to %p\n",
- (unsigned int) size,
- sc);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# Blocks transferred via cadet"), 1,
- GNUNET_NO);
- memcpy (buf, &wqi[1], ret = wqi->msize);
- GNUNET_free (wqi);
- continue_writing (sc);
- return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc)
-{
- struct WriteQueueItem *wqi;
- struct GNUNET_CADET_Channel *tun;
-
- if (NULL != sc->wh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write pending, waiting for it to complete\n");
- return; /* write already pending */
- }
- if (NULL == (wqi = sc->wqi_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write queue empty, reading more requests\n");
- continue_reading (sc);
- return;
- }
- sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
- GNUNET_TIME_UNIT_FOREVER_REL,
- wqi->msize,
- &write_continuation,
- sc);
- if (NULL == sc->wh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write failed; terminating cadet\n");
- tun = sc->channel;
- sc->channel = NULL;
- GNUNET_CADET_channel_destroy (tun);
- return;
- }
-}
-
-
-/**
- * Process a datum that was stored in the datastore.
- *
- * @param cls closure with the `struct CadetClient` which sent the query
- * @param key key for the content
- * @param size number of bytes in @a data
- * @param data content stored
- * @param type type of the content
- * @param priority priority of the content
- * @param anonymity anonymity-level for the content
- * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- * maybe 0 if no unique identifier is available
- */
-static void
-handle_datastore_reply (void *cls,
- const struct GNUNET_HashCode *key,
- size_t size, const void *data,
- enum GNUNET_BLOCK_Type type,
- uint32_t priority,
- uint32_t anonymity,
- struct GNUNET_TIME_Absolute expiration,
- uint64_t uid)
-{
- struct CadetClient *sc = cls;
- size_t msize = size + sizeof (struct CadetReplyMessage);
- struct WriteQueueItem *wqi;
- struct CadetReplyMessage *srm;
-
- sc->qe = NULL;
- if (NULL == data)
- {
- /* no result, this should not really happen, as for
- non-anonymous routing only peers that HAVE the
- answers should be queried; OTOH, this is not a
- hard error as we might have had the answer in the
- past and the user might have unindexed it. Hence
- we log at level "INFO" for now. */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Have no answer for query `%s'\n",
- GNUNET_h2s (key));
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via cadet not answered"), 1,
- GNUNET_NO);
- continue_writing (sc);
- return;
- }
- if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Performing on-demand encoding for query %s\n",
- GNUNET_h2s (key));
- if (GNUNET_OK !=
- GNUNET_FS_handle_on_demand_block (key,
- size, data, type,
- priority, anonymity,
- expiration, uid,
- &handle_datastore_reply,
- sc))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "On-demand encoding request failed\n");
- continue_writing (sc);
- }
- return;
- }
- if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
- {
- GNUNET_break (0);
- continue_writing (sc);
- return;
- }
- GNUNET_break (GNUNET_BLOCK_TYPE_ANY != type);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting transmission of %u byte reply of type %d for query `%s' via cadet to %p\n",
- (unsigned int) size,
- (unsigned int) type,
- GNUNET_h2s (key),
- sc);
- wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
- wqi->msize = msize;
- srm = (struct CadetReplyMessage *) &wqi[1];
- srm->header.size = htons ((uint16_t) msize);
- srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
- srm->type = htonl (type);
- srm->expiration = GNUNET_TIME_absolute_hton (expiration);
- memcpy (&srm[1], data, size);
- sc->reply_size = msize;
- GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- continue_writing (sc);
-}
-
-
-/**
- * Functions with this signature are called whenever a
- * complete query message is received.
- *
- * Do not call #GNUNET_SERVER_mst_destroy in callback
- *
- * @param cls closure with the 'struct CadetClient'
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
- */
-static int
-request_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
-{
- struct CadetClient *sc = *channel_ctx;
- const struct CadetQueryMessage *sqm;
-
- sqm = (const struct CadetQueryMessage *) message;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received query for `%s' via cadet from client %p\n",
- GNUNET_h2s (&sqm->query),
- sc);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via cadet"), 1,
- GNUNET_NO);
- refresh_timeout_task (sc);
- sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
- 0,
- &sqm->query,
- ntohl (sqm->type),
- 0 /* priority */,
- GSF_datastore_queue_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &handle_datastore_reply, sc);
- if (NULL == sc->qe)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing request with datastore failed (queue full?)\n");
- continue_writing (sc);
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Functions of this type are called upon new cadet connection from other peers.
- *
- * @param cls the closure from GNUNET_CADET_connect
- * @param channel the channel representing the cadet
- * @param initiator the identity of the peer who wants to establish a cadet
- * with us; NULL on binding error
- * @param port cadet port used for the incoming connection
- * @param options channel option flags
- * @return initial channel context (our 'struct CadetClient')
- */
-static void *
-accept_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- const struct GNUNET_PeerIdentity *initiator,
- uint32_t port, enum GNUNET_CADET_ChannelOption options)
-{
- struct CadetClient *sc;
-
- GNUNET_assert (NULL != channel);
- if (sc_count >= sc_count_max)
- {
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# cadet client connections rejected"), 1,
- GNUNET_NO);
- GNUNET_CADET_channel_destroy (channel);
- return NULL;
- }
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# cadet connections active"), 1,
- GNUNET_NO);
- sc = GNUNET_new (struct CadetClient);
- sc->channel = channel;
- GNUNET_CONTAINER_DLL_insert (sc_head,
- sc_tail,
- sc);
- sc_count++;
- refresh_timeout_task (sc);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Accepting inbound cadet connection from `%s' as client %p\n",
- GNUNET_i2s (initiator),
- sc);
- return sc;
-}
-
-
-/**
- * Function called by cadet when a client disconnects.
- * Cleans up our 'struct CadetClient' of that channel.
- *
- * @param cls NULL
- * @param channel channel of the disconnecting client
- * @param channel_ctx our 'struct CadetClient'
- */
-static void
-cleaner_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel,
- void *channel_ctx)
-{
- struct CadetClient *sc = channel_ctx;
- struct WriteQueueItem *wqi;
-
- if (NULL == sc)
- return;
- sc->channel = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Terminating cadet connection with client %p\n",
- sc);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# cadet connections active"), -1,
- GNUNET_NO);
- if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
- GNUNET_SCHEDULER_cancel (sc->terminate_task);
- if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
- GNUNET_SCHEDULER_cancel (sc->timeout_task);
- if (NULL != sc->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (sc->wh);
- if (NULL != sc->qe)
- GNUNET_DATASTORE_cancel (sc->qe);
- while (NULL != (wqi = sc->wqi_head))
- {
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- GNUNET_free (wqi);
- }
- GNUNET_CONTAINER_DLL_remove (sc_head,
- sc_tail,
- sc);
- sc_count--;
- GNUNET_free (sc);
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_server ()
-{
- static const struct GNUNET_CADET_MessageHandler handlers[] = {
- { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
- { NULL, 0, 0 }
- };
- static const uint32_t ports[] = {
- GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
- 0
- };
-
- if (GNUNET_YES !=
- GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
- "fs",
- "MAX_CADET_CLIENTS",
- &sc_count_max))
- return;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Initializing cadet FS server with a limit of %llu connections\n",
- sc_count_max);
- listen_channel = GNUNET_CADET_connect (GSF_cfg,
- NULL,
- &accept_cb,
- &cleaner_cb,
- handlers,
- ports);
-}
-
-
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_server ()
-{
- if (NULL != listen_channel)
- {
- GNUNET_CADET_disconnect (listen_channel);
- listen_channel = NULL;
- }
- GNUNET_assert (NULL == sc_head);
- GNUNET_assert (0 == sc_count);
-}
-
-/* end of gnunet-service-fs_cadet.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33200 - gnunet/src/fs,
gnunet <=