[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r34826 - gnunet/src/rps
From: gnunet
Subject: [GNUnet-SVN] r34826 - gnunet/src/rps
Date: Wed, 7 Jan 2015 00:48:24 +0100
Author: ch3
Date: 2015-01-07 00:48:24 +0100 (Wed, 07 Jan 2015)
New Revision: 34826
Added:
gnunet/src/rps/gnunet-service-rps_sampler.c
gnunet/src/rps/gnunet-service-rps_sampler.h
Modified:
gnunet/src/rps/gnunet-service-rps.c
Log:
moved sampler functionality in file of its own
Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-01-06 14:35:19 UTC (rev 34825)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-01-06 23:48:24 UTC (rev 34826)
@@ -29,6 +29,8 @@
#include "gnunet_nse_service.h"
#include "rps.h"
+#include "gnunet-service-rps_sampler.h"
+
#include <math.h>
#include <inttypes.h>
@@ -48,6 +50,11 @@
// TODO Change API to accept initialisation peers
+// TODO Change API to accept good peers 'friends'
+
+
+// hist_size_init, hist_size_max
+
/**
* Our configuration.
*/
@@ -58,515 +65,28 @@
*/
static struct GNUNET_PeerIdentity *own_identity;
-
- struct GNUNET_PeerIdentity *
-get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
-
-/***********************************************************************
- * Sampler
- *
- * WARNING: This section needs to be reviewed regarding the use of
- * functions providing (pseudo)randomness!
-***********************************************************************/
-
-// TODO care about invalid input of the caller (size 0 or less...)
-
-// It might be interesting to formulate this independent of PeerIDs.
-
/**
- * Callback that is called when a new PeerID is inserted into a sampler.
- *
- * @param cls the closure given alongside this function.
- * @param id the PeerID that is inserted
- * @param hash the hash the sampler produced of the PeerID
+ * Closure to the callback cadet calls on each peer it passes to us
*/
-typedef void (* SAMPLER_insertCB) (void *cls,
- const struct GNUNET_PeerIdentity *id,
- struct GNUNET_HashCode hash);
-
-/**
- * Callback that is called when a new PeerID is removed from a sampler.
- *
- * @param cls the closure given alongside this function.
- * @param id the PeerID that is removed
- * @param hash the hash the sampler produced of the PeerID
- */
-typedef void (* SAMPLER_removeCB) (void *cls,
- const struct GNUNET_PeerIdentity *id,
- struct GNUNET_HashCode hash);
-
-/**
- * A sampler sampling PeerIDs.
- */
-struct Sampler
+struct init_peer_cls
{
/**
- * Min-wise linear permutation used by this sampler.
- *
- * This is an key later used by a hmac.
+ * The server handle to later listen to client requests
*/
- struct GNUNET_CRYPTO_AuthKey auth_key;
+ struct GNUNET_SERVER_Handle *server;
/**
- * The PeerID this sampler currently samples.
+ * Counts how many peers cadet already passed to us
*/
- struct GNUNET_PeerIdentity *peer_id;
-
- /**
- * The according hash value of this PeerID.
- */
- struct GNUNET_HashCode peer_id_hash;
-
- /**
- * Samplers are kept in a linked list.
- */
- struct Sampler *next;
-
- /**
- * Samplers are kept in a linked list.
- */
- struct Sampler *prev;
-
+ uint32_t i;
};
-/**
- * A n-tuple of samplers.
- */
-struct Samplers
-{
- /**
- * Number of samplers we hold.
- */
- unsigned int size;
- //size_t size;
-
- /**
- * All PeerIDs in one array.
- */
- struct GNUNET_PeerIdentity *peer_ids;
- /**
- * Callback to be called when a peer gets inserted into a sampler.
- */
- SAMPLER_insertCB insertCB;
+ struct GNUNET_PeerIdentity *
+get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
- /**
- * Closure to the insertCB.
- */
- void *insertCLS;
- /**
- * Callback to be called when a peer gets inserted into a sampler.
- */
- SAMPLER_removeCB removeCB;
-
- /**
- * Closure to the removeCB.
- */
- void *removeCLS;
-
- /**
- * The head of the DLL.
- */
- struct Sampler *head;
-
- /**
- * The tail of the DLL.
- */
- struct Sampler *tail;
-
-};
-
-/**
- * Reinitialise a previously initialised sampler.
- *
- * @param sampler the sampler element.
- * @param id pointer to the memory that keeps the value.
- */
- void
-SAMPLER_reinitialise_sampler (struct Sampler *sampler, struct
GNUNET_PeerIdentity *id)
-{
- // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
- GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
- &(sampler->auth_key.key),
- GNUNET_CRYPTO_HASH_LENGTH);
-
- GNUNET_assert(NULL != id);
- sampler->peer_id = id;
- memcpy(sampler->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity));
// FIXME this should probably be NULL -- the caller has to handle those.
- // Maybe take a PeerID as second argument.
-
- GNUNET_CRYPTO_hmac(&sampler->auth_key, sampler->peer_id,
- sizeof(struct GNUNET_PeerIdentity),
- &sampler->peer_id_hash);
-}
-
-
-/**
- * (Re)Initialise given Sampler with random min-wise independent function.
- *
- * In this implementation this means choosing an auth_key for later use in
- * a hmac at random.
- *
- * @param id pointer to the place where this sampler will store the PeerID.
- * This will be overwritten.
- */
- struct Sampler *
-SAMPLER_init(struct GNUNET_PeerIdentity *id)
-{
- struct Sampler *s;
-
- s = GNUNET_new(struct Sampler);
-
- SAMPLER_reinitialise_sampler (s, id);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p)
\n",
- GNUNET_i2s(s->peer_id), s->peer_id);
-
- s->prev = NULL;
- s->next = NULL;
-
- return s;
-}
-
-/**
- * Input an PeerID into the given sampler.
- */
- static void
-SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
- SAMPLER_insertCB insertCB, void *insertCLS,
- SAMPLER_removeCB removeCB, void *removeCLS)
- // TODO call update herein
-{
- struct GNUNET_HashCode other_hash;
-
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
- GNUNET_i2s(other), other);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
- GNUNET_i2s(s->peer_id), s->peer_id);
-
- if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
- GNUNET_i2s(other));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
- GNUNET_i2s(s->peer_id));
- }
- else
- {
- GNUNET_CRYPTO_hmac(&s->auth_key,
- other,
- sizeof(struct GNUNET_PeerIdentity),
- &other_hash);
-
- if ( NULL == s->peer_id )
- { // Or whatever is a valid way to say
- // "we have no PeerID at the moment"
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting
(got NULL previously).\n",
- GNUNET_i2s(other));
- memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
- //s->peer_id = other;
- s->peer_id_hash = other_hash;
- if (NULL != insertCB)
- {
- insertCB(insertCLS, s->peer_id, s->peer_id_hash);
- }
- }
- else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
- GNUNET_i2s(other));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
- GNUNET_i2s(s->peer_id));
-
- if ( NULL != removeCB )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the
remove callback.\n",
- GNUNET_i2s(s->peer_id));
- removeCB(removeCLS, s->peer_id, s->peer_id_hash);
- }
-
- memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
- //s->peer_id = other;
- s->peer_id_hash = other_hash;
-
- if ( NULL != insertCB )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with
the insert callback.\n",
- GNUNET_i2s(s->peer_id));
- insertCB(insertCLS, s->peer_id, s->peer_id_hash);
- }
- }
- else
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
- GNUNET_i2s(other));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
- GNUNET_i2s(s->peer_id));
- }
- }
-}
-
-/**
- * Gow or shrink the size of the tuple of samplers.
- *
- * @param samplers the samplers to grow
- * @param new_size the new size of the samplers
- * @param fill_up_id if growing, that has to point to a
- * valid PeerID and will be used
- * to initialise newly created samplers
- */
- void
-SAMPLER_samplers_resize (struct Samplers * samplers,
- unsigned int new_size,
- struct GNUNET_PeerIdentity *fill_up_id)
-{
- if ( samplers->size == new_size )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to
do\n");
- return;
- }
-
- unsigned int old_size;
- struct Sampler *iter;
- uint64_t i;
- struct Sampler *tmp;
-
- old_size = samplers->size;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u ->
%u\n", old_size, new_size);
-
- iter = samplers->head;
-
- if ( new_size < old_size )
- {
- for ( i = new_size ; i < old_size ; i++ )
- {/* Remove unneeded rest */
- tmp = iter->next;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n",
i);
- if (NULL != samplers->removeCB)
- samplers->removeCB(samplers->removeCLS, iter->peer_id,
iter->peer_id_hash);
- // FIXME When this is called and counts the amount of peer_ids in the
samplers
- // this gets a wrong number.
- GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
- GNUNET_free(iter);
- iter = tmp;
- }
- }
-
- GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to
%p\n", samplers->peer_ids);
-
- if ( new_size > old_size )
- { /* Growing */
- GNUNET_assert( NULL != fill_up_id );
- for ( i = 0 ; i < new_size ; i++ )
- { /* All samplers */
- if ( i < old_size )
- { /* Update old samplers */
- iter->peer_id = &samplers->peer_ids[i];
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler,
now pointing to %p, contains %s\n",
- i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
- iter = iter->next;
- }
- else
- { /* Add new samplers */
- memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct
GNUNET_PeerIdentity));
- iter = SAMPLER_init(&samplers->peer_ids[i]);
- if (NULL != samplers->insertCB)
- {
- samplers->insertCB(samplers->insertCLS, iter->peer_id,
iter->peer_id_hash);
- }
- GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now
pointing to %p, contains %s\n",
- i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
- }
- }
- }
- else// if ( new_size < old_size )
- { /* Shrinking */
- for ( i = 0 ; i < new_size ; i++)
- { /* All samplers */
- tmp = iter->next;
- /* Update remaining samplers */
- iter->peer_id = &samplers->peer_ids[i];
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler,
now pointing to %p, contains %s\n",
- i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
-
- iter = tmp;
- }
- }
-
- GNUNET_assert(samplers->size == new_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
-}
-
-
-/**
- * Initialise a tuple of samplers.
- */
-struct Samplers *
-SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
- SAMPLER_insertCB insertCB, void *insertCLS,
- SAMPLER_removeCB removeCB, void *removeCLS)
-{
- struct Samplers *samplers;
- //struct Sampler *s;
- //uint64_t i;
-
- samplers = GNUNET_new(struct Samplers);
- samplers->size = 0;
- samplers->peer_ids = NULL;
- samplers->insertCB = insertCB;
- samplers->insertCLS = insertCLS;
- samplers->removeCB = removeCB;
- samplers->removeCLS = removeCLS;
- samplers->head = samplers->tail = NULL;
- //samplers->peer_ids = GNUNET_new_array(init_size, struct
GNUNET_PeerIdentity);
-
- SAMPLER_samplers_resize(samplers, init_size, id);
-
- GNUNET_assert(init_size == samplers->size);
- return samplers;
-}
-
-
-/**
- * A fuction to update every sampler in the given list
- */
- static void
-SAMPLER_update_list(struct Samplers *samplers, const struct
GNUNET_PeerIdentity *id)
-{
- struct Sampler *iter;
-
- iter = samplers->head;
- while ( NULL != iter->next )
- {
- SAMPLER_next(iter, id,
- samplers->insertCB, samplers->insertCLS,
- samplers->removeCB, samplers->removeCLS);
- iter = iter->next;
- }
-
-}
-
-/**
- * Reinitialise all previously initialised sampler with the given value.
- *
- * @param samplers the sampler list.
- * @param id the id of the samplers to update.
- */
- void
-SAMPLER_reinitialise_samplers_by_value (struct Samplers *samplers, const
struct GNUNET_PeerIdentity *id)
-{
- uint64_t i;
- struct Sampler *iter;
-
- iter = samplers->head;
- for ( i = 0 ; i < samplers->size ; i++ )
- {
- if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &samplers->peer_ids[i]) )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
- SAMPLER_reinitialise_sampler (iter, &samplers->peer_ids[i]);
- }
- if (NULL != iter->next)
- iter = iter->next;
- }
-}
-
-/**
- * Get one random peer out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- */
- const struct GNUNET_PeerIdentity*
-SAMPLER_get_rand_peer (struct Samplers *samplers)
-{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
-
- if ( 0 == samplers->size )
- {
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID
%s\n", GNUNET_i2s(own_identity));
- return own_identity;
- }
- else
- {
- const struct GNUNET_PeerIdentity *peer;
-
- peer = get_rand_peer(samplers->peer_ids, samplers->size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n",
GNUNET_i2s(peer));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n",
GNUNET_i2s(own_identity));
-
- return peer;
- }
-}
-
-/**
- * Get n random peers out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- */
- const struct GNUNET_PeerIdentity* // TODO give back simple array
-SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
-{
- // TODO check if we have too much (distinct) sampled peers
- // If we are not ready yet maybe schedule for later
- struct GNUNET_PeerIdentity *peers;
- uint64_t i;
-
- peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
-
- for ( i = 0 ; i < n ; i++ ) {
- //peers[i] = SAMPLER_get_rand_peer(samplers);
- memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct
GNUNET_PeerIdentity));
- }
-
- // TODO something else missing?
- return peers;
-}
-
-/**
- * Counts how many Samplers currently hold a given PeerID.
- */
- uint64_t
-SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity
*id )
-{
- struct Sampler *iter;
- uint64_t count;
-
- iter = samplers->head;
- count = 0;
- while ( NULL != iter )
- {
- if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
- count++;
- iter = iter->next;
- }
- return count;
-}
-
-
-/**
- * Cleans the samplers.
- *
- * @param samplers the samplers to clean up.
- */
- void
-SAMPLER_samplers_destroy (struct Samplers *samplers)
-{
- SAMPLER_samplers_resize(samplers, 0, NULL);
- GNUNET_free(samplers);
-}
-
/***********************************************************************
- * /Sampler
-***********************************************************************/
-
-
-
-/***********************************************************************
* Housekeeping with peers
***********************************************************************/
@@ -637,12 +157,6 @@
/**
- * The samplers.
- */
-static struct Samplers *sampler_list;
-
-
-/**
* The gossiped list of peers.
*/
static struct GNUNET_PeerIdentity *gossip_list;
@@ -665,12 +179,16 @@
/**
* Percentage of total peer number in the gossip list
* to send random PUSHes to
+ *
+ * TODO do not read from configuration
*/
static float alpha;
/**
* Percentage of total peer number in the gossip list
* to send random PULLs to
+ *
+ * TODO do not read from configuration
*/
static float beta;
@@ -696,6 +214,8 @@
/**
* List to store peers received through pushes temporary.
+ *
+ * TODO -> multipeermap
*/
static struct GNUNET_PeerIdentity *push_list;
@@ -707,6 +227,8 @@
/**
* List to store peers received through pulls temporary.
+ *
+ * TODO -> multipeermap
*/
static struct GNUNET_PeerIdentity *pull_list;
@@ -741,11 +263,12 @@
* Get random peer from the gossip list.
*/
struct GNUNET_PeerIdentity *
-get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
+get_rand_peer(const struct GNUNET_PeerIdentity *peer_list, unsigned int
list_size)
{
uint64_t r_index;
struct GNUNET_PeerIdentity *peer;
+ peer = GNUNET_new(struct GNUNET_PeerIdentity);
// FIXME if we have only NULL in gossip list this will block
// but then we might have a problem nevertheless
@@ -759,12 +282,13 @@
r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
list_size);
- peer = &(peer_list[r_index]);
+ *peer = peer_list[r_index];
} while (NULL == peer);
return peer;
}
+
/**
* Make sure the context of a given peer exists in the given peer_map.
*/
@@ -784,7 +308,7 @@
ctx->mq = NULL;
ctx->to_channel = NULL;
ctx->from_channel = NULL;
- GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ (void) GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
}
@@ -893,13 +417,14 @@
//scale = .01;
estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
// GNUNET_NSE_log_estimate_to_n (logestimate);
- estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add
+ estimate = pow(estimate, 1./3);
+ // TODO add if std_dev is a number
+ // estimate += (std_dev * scale);
if ( 0 < estimate ) {
LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
est_size = estimate;
- } else {
+ } else
LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
- }
}
/**
@@ -929,26 +454,26 @@
// TODO
msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
- cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx);
+ cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
if ( NULL == cli_ctx ) {
cli_ctx = GNUNET_new(struct client_ctx);
- cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client);
- GNUNET_SERVER_client_set_user_context(client, cli_ctx);
+ cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
+ GNUNET_SERVER_client_set_user_context (client, cli_ctx);
}
// How many peers do we give back?
// Wait until we have enough random peers?
- ev = GNUNET_MQ_msg_extra(out_msg,
- GNUNET_ntohll(msg->num_peers) * sizeof(struct
GNUNET_PeerIdentity),
- GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
+ ev = GNUNET_MQ_msg_extra (out_msg,
+ GNUNET_ntohll (msg->num_peers) * sizeof (struct
GNUNET_PeerIdentity),
+ GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
out_msg->num_peers = msg->num_peers; // No conversion between network and
host order
- num_peers = GNUNET_ntohll(msg->num_peers);
- //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
+ num_peers = GNUNET_ntohll (msg->num_peers);
+ //&out_msg[1] = RPS_sampler_get_n_rand_peers(sampler_list, num_peers);
memcpy(&out_msg[1],
- SAMPLER_get_n_rand_peers(sampler_list, num_peers),
- num_peers * sizeof(struct GNUNET_PeerIdentity));
+ RPS_sampler_get_n_rand_peers (num_peers),
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
GNUNET_MQ_send(cli_ctx->mq, ev);
//GNUNET_MQ_destroy(mq);
@@ -974,18 +499,19 @@
void **channel_ctx,
const struct GNUNET_MessageHeader *msg)
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
- struct GNUNET_PeerIdentity *peer;
+ const struct GNUNET_PeerIdentity *peer;
// TODO check the proof of work
- peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(
channel, GNUNET_CADET_OPTION_PEER );
+ peer = (const struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info
(channel, GNUNET_CADET_OPTION_PEER);
+ // FIXME wait for cadet to change this function
/* Add the sending peer to the push_list */
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n",
push_list_size);
- GNUNET_array_append(push_list, push_list_size, *peer);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n",
push_list_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n",
push_list_size);
+ GNUNET_array_append (push_list, push_list_size, *peer);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n",
push_list_size);
return GNUNET_OK;
}
@@ -1018,6 +544,7 @@
// otherwise remove from peerlist?
peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel,
GNUNET_CADET_OPTION_PEER);
+ // FIXME wait for cadet to change this function
LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n",
GNUNET_i2s(peer));
//mq = GNUNET_CADET_mq_create(channel); // without mq?
@@ -1062,8 +589,17 @@
uint64_t i;
// TODO check that we sent a request and that it is the first reply
-
+ if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) < ntohs (msg->size))
+ {
+ GNUNET_break_op (0); // At the moment our own implementation seems to break that.
+ return GNUNET_SYSERR;
+ }
in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
+ if (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) /
sizeof (struct GNUNET_PeerIdentity) != GNUNET_ntohll (in_msg->num_peers))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
peers = (struct GNUNET_PeerIdentity *) &msg[1];
for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ )
{
@@ -1089,10 +625,11 @@
struct GNUNET_RPS_P2P_PushMessage *push_msg;
struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty
message
struct GNUNET_MQ_Envelope *ev;
- struct GNUNET_PeerIdentity *peer;
+ const struct GNUNET_PeerIdentity *peer;
+ struct GNUNET_MQ_Handle *mq;
// TODO print lists, ...
- // TODO cleanup peer_map
+ // TODO randomise and spread calls herein over time
/* Would it make sense to have one shuffeled gossip list and then
@@ -1117,7 +654,8 @@
push_msg; */
push_msg->placeholder = 0;
// FIXME sometimes it returns a pointer to a freed mq
- GNUNET_MQ_send (get_mq (peer_map, peer), ev);
+ mq = get_mq (peer_map, peer);
+ GNUNET_MQ_send (mq, ev);
// modify in_flags of respective peer?
}
@@ -1137,16 +675,17 @@
ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
//ev = GNUNET_MQ_msg_extra();
- pull_msg->placeholder = 0;
- GNUNET_MQ_send( get_mq(peer_map, peer), ev );
+ //pull_msg->placeholder = 0;
+ pull_msg = NULL;
+ mq = get_mq (peer_map, peer);
+ GNUNET_MQ_send (mq, ev);
// modify in_flags of respective peer?
}
}
/* If the NSE has changed adapt the lists accordingly */
- if ( sampler_list->size != est_size )
- SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
+ RPS_sampler_resize(est_size);
GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
@@ -1165,7 +704,7 @@
first_border = round(alpha * gossip_list_size);
for ( i = 0 ; i < first_border ; i++ )
- { // TODO use SAMPLER_get_n_rand_peers
+ { // TODO use RPS_sampler_get_n_rand_peers
/* Update gossip list with peers received through PUSHes */
r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
push_list_size);
@@ -1186,9 +725,8 @@
for ( i = second_border ; i < gossip_list_size ; i++ )
{
/* Update gossip list with peers from history */
- r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
- sampler_list->size);
- gossip_list[i] = sampler_list->peer_ids[r_index];
+ peer = RPS_sampler_get_rand_peer();
+ gossip_list[i] = *peer;
// TODO change the in_flags accordingly
}
@@ -1204,60 +742,61 @@
for ( i = 0 ; i < push_list_size ; i++ )
{
- SAMPLER_update_list(sampler_list, &push_list[i]);
+ RPS_sampler_update_list (&push_list[i]);
// TODO set in_flag?
}
for ( i = 0 ; i < pull_list_size ; i++ )
{
- SAMPLER_update_list(sampler_list, &pull_list[i]);
+ RPS_sampler_update_list (&pull_list[i]);
// TODO set in_flag?
}
/* Empty push/pull lists */
- GNUNET_array_grow(push_list, push_list_size, 0);
+ GNUNET_array_grow (push_list, push_list_size, 0);
push_list_size = 0; // I guess that's not necessary but doesn't hurt
- GNUNET_array_grow(pull_list, pull_list_size, 0);
+ GNUNET_array_grow (pull_list, pull_list_size, 0);
pull_list_size = 0; // I guess that's not necessary but doesn't hurt
/* Schedule next round */
- do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round,
NULL );
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
+ do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_round,
NULL);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
}
/**
* Open a connection to given peer and store channel and mq.
*/
void
-insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct
GNUNET_HashCode hash)
+insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
{
// We open a channel to be notified when this peer goes down.
- touch_channel(peer_map, id);
+ touch_channel (peer_map, id);
}
/**
* Close the connection to given peer and delete channel and mq.
*/
void
-removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct
GNUNET_HashCode hash)
+removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
{
size_t s;
struct peer_context *ctx;
- s = SAMPLER_count_id(sampler_list, id);
- if ( 1 >= s ) {
- if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
+ s = RPS_sampler_count_id (id);
+ if ( 1 >= s )
+ {
+ if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
{
- ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
+ ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, id);
if (NULL != ctx->to_channel)
{
if (NULL != ctx->mq)
{
- GNUNET_MQ_destroy(ctx->mq);
+ GNUNET_MQ_destroy (ctx->mq);
}
- GNUNET_CADET_channel_destroy(ctx->to_channel);
+ GNUNET_CADET_channel_destroy (ctx->to_channel);
}
// TODO cleanup peer
GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
@@ -1282,27 +821,29 @@
unsigned int best_path) // "How long is the best path?
// (0 = unknown, 1 = ourselves, 2 =
neighbor)"
{
+ struct init_peer_cls *ipc;
+
+ ipc = (struct init_peer_cls *) cls;
if ( NULL != peer )
{
LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n",
GNUNET_i2s(peer), peer);
- SAMPLER_update_list(sampler_list, peer);
+ RPS_sampler_update_list(peer);
touch_peer_ctx(peer_map, peer); // unneeded? -> insertCB
- gossip_list[g_i] = *peer;
- g_i++;
- // FIXME find a better way to have a global counter
+ gossip_list[ipc->i] = *peer;
+ ipc->i++;
// send push/pull to each of those peers?
}
else
{
- if (g_i < sampler_list->size)
+ if (ipc->i < gossip_list_size)
{
- memcpy(&gossip_list[g_i],
- &(sampler_list->peer_ids[g_i]),
- (sampler_list->size - g_i) * sizeof(struct GNUNET_PeerIdentity));
+ memcpy(&gossip_list[ipc->i],
+ RPS_sampler_get_rand_peer(),
+ (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity));
}
- rps_start( (struct GNUNET_SERVER_Handle *) cls);
+ rps_start (ipc->server);
}
}
@@ -1328,7 +869,7 @@
GNUNET_NSE_disconnect(nse);
GNUNET_CADET_disconnect(cadet_handle);
GNUNET_free(own_identity);
- SAMPLER_samplers_destroy(sampler_list);
+ RPS_sampler_destroy();
GNUNET_array_grow(gossip_list, gossip_list_size, 0);
GNUNET_array_grow(push_list, push_list_size, 0);
GNUNET_array_grow(pull_list, pull_list_size, 0);
@@ -1405,7 +946,8 @@
peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
(struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
// Guess simply casting isn't the nicest way...
- SAMPLER_reinitialise_samplers_by_value(sampler_list, peer);
+ // FIXME wait for cadet to change this function
+ RPS_sampler_reinitialise_by_value(peer);
}
/**
@@ -1413,7 +955,7 @@
*/
static void
rps_start (struct GNUNET_SERVER_Handle *server)
-{
+{ // TODO get msg sizes right
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
{NULL, NULL, 0, 0}
@@ -1453,19 +995,18 @@
LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
+ struct init_peer_cls *ipc;
+
cfg = c;
- own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
-
+ /* Get own ID */
+ own_identity = GNUNET_new(struct GNUNET_PeerIdentity);
GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return
value
-
GNUNET_assert(NULL != own_identity);
-
LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n",
GNUNET_i2s(own_identity), own_identity);
-
/* Get time interval from the configuration */
if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
"ROUNDINTERVAL",
@@ -1519,15 +1060,16 @@
"BETA",
&beta))
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
}
- LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
// TODO check that alpha + beta < 1
- peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
+ peer_map = GNUNET_CONTAINER_multipeermap_create (est_size, GNUNET_NO);
+ /* Initialise cadet */
static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
{&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0},
{&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
@@ -1536,28 +1078,30 @@
};
const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in
src/rps/rps.h
- cadet_handle = GNUNET_CADET_connect(cfg,
+ cadet_handle = GNUNET_CADET_connect (cfg,
cls,
&handle_inbound_channel,
&cleanup_channel,
cadet_handlers,
ports);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
- /* Initialise sampler and gossip list */
+ /* Initialise sampler */
+ RPS_sampler_init (est_size, own_identity, insertCB, NULL, removeCB, NULL);
- sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL,
removeCB, NULL);
-
+ /* Initialise push and pull maps */
push_list = NULL;
push_list_size = 0;
pull_list = NULL;
pull_list_size = 0;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
- GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
- // FIXME use magic 0000 PeerID to _start_ the service
+ ipc = GNUNET_new (struct init_peer_cls);
+ ipc->server = server;
+ ipc->i = 0;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
+ GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
// TODO send push/pull to each of those peers?
}
Added: gnunet/src/rps/gnunet-service-rps_sampler.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.c (rev 0)
+++ gnunet/src/rps/gnunet-service-rps_sampler.c 2015-01-06 23:48:24 UTC (rev 34826)
@@ -0,0 +1,677 @@
+/*
+ This file is part of GNUnet.
+ (C)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file rps/gnunet-service-rps_sampler.c
+ * @brief sampler implementation
+ * @author Julius Bünger
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "rps.h"
+
+#include "gnunet-service-rps_sampler.h"
+
+#include <math.h>
+#include <inttypes.h>
+
+#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
+
+// multiple 'clients'?
+
+// TODO check for overflows
+
+// TODO align message structs
+
+// hist_size_init, hist_size_max
+
+/***********************************************************************
+ * WARNING: This section needs to be reviewed regarding the use of
+ * functions providing (pseudo)randomness!
+***********************************************************************/
+
+// TODO care about invalid input of the caller (size 0 or less...)
+
+/**
+ * Flag stored in every sampler element that tells whether the element
+ * currently holds a valid PeerID (#NOT_EMPTY) or not (#EMPTY).
+ */
+enum RPS_SamplerEmpty
+{
+ NOT_EMPTY = 0x0,
+ EMPTY = 0x1
+};
+
+/**
+ * A sampler element sampling one PeerID at a time.
+ */
+struct RPS_SamplerElement
+{
+ /**
+ * Min-wise linear permutation used by this sampler.
+ *
+ * This is a key that is filled with random bytes when the element is
+ * (re)initialised and later used to compute the HMAC of offered PeerIDs.
+ */
+ struct GNUNET_CRYPTO_AuthKey auth_key;
+
+ /**
+ * The PeerID this sampler currently samples.
+ */
+ struct GNUNET_PeerIdentity peer_id;
+
+ /**
+ * The HMAC (under #auth_key) of the currently sampled PeerID.
+ */
+ struct GNUNET_HashCode peer_id_hash;
+
+ /**
+ * Time of last request.
+ *
+ * Reset to "forever" when the element is reinitialised.
+ */
+ struct GNUNET_TIME_Absolute last_request;
+
+ /**
+ * Flag that indicates that we are not holding a valid PeerID right now.
+ */
+ enum RPS_SamplerEmpty is_empty;
+};
+
+/**
+ * Sampler with its own array of SamplerElements
+ */
+struct RPS_Sampler
+{
+ /**
+ * Number of sampler elements we hold.
+ */
+ unsigned int sampler_size;
+ //size_t size;
+
+ /**
+ * Array of pointers to the individually allocated sampler elements.
+ */
+ struct RPS_SamplerElement **sampler_elements;
+
+ /**
+ * Index to a sampler element.
+ *
+ * Gets cycled on every hist_request.
+ * NOTE(review): not referenced by any function in this file yet.
+ */
+ uint64_t sampler_elem_index;
+
+ /**
+ * Callback to be called when a peer gets inserted into a sampler.
+ */
+ RPS_sampler_insert_cb insert_cb;
+
+ /**
+ * Closure to the insert_cb.
+ */
+ void *insert_cls;
+
+ /**
+ * Callback to be called when a peer gets removed from a sampler.
+ */
+ RPS_sampler_remove_cb remove_cb;
+
+ /**
+ * Closure to the remove_cb.
+ */
+ void *remove_cls;
+};
+
+/**
+ * Global sampler variable.
+ *
+ * Allocated by RPS_sampler_init(); every other function in this file
+ * dereferences it without a NULL check.
+ */
+struct RPS_Sampler *sampler;
+
+
+/**
+ * The minimal size for the extended sampler elements.
+ *
+ * Set in RPS_sampler_init(); not yet read anywhere in this file.
+ */
+static size_t min_size;
+
+/**
+ * The maximal size the extended sampler elements should grow to.
+ *
+ * Set in RPS_sampler_init(); not yet read anywhere in this file.
+ */
+static size_t max_size;
+
+/**
+ * The size the extended sampler elements currently have.
+ *
+ * NOTE(review): only read (in RPS_sampler_resize()), never assigned in
+ * this file, so it keeps its initial value of zero.
+ */
+static size_t extra_size;
+
+/**
+ * Index to the sampler element that is the next to be returned
+ *
+ * Points into sampler->sampler_elements.
+ */
+static struct RPS_SamplerElement **extended_samplers_index;
+
+/**
+ * Request counter.
+ *
+ * Only needed in the beginning to check how many of the 64 deltas
+ * we already have
+ */
+static unsigned int req_counter;
+
+/**
+ * Time of the last request we received.
+ *
+ * Used to compute the expected request rate.
+ */
+static struct GNUNET_TIME_Absolute last_request;
+
+/**
+ * Last 64 deltas between requests
+ *
+ * The most recent delta is stored at index 0.
+ */
+static struct GNUNET_TIME_Relative request_deltas[64];
+
+/**
+ * The prediction of the rate of requests
+ */
+static struct GNUNET_TIME_Relative request_rate;
+
+
+/**
+ * Add up all entries of an array of relative times.
+ *
+ * @param rel_array the array of relative times
+ * @param arr_size the number of entries of @a rel_array to add up
+ * @return the accumulated value; zero if @a arr_size is 0
+ */
+struct GNUNET_TIME_Relative
+T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
+{
+  struct GNUNET_TIME_Relative total = GNUNET_TIME_UNIT_ZERO;
+  const struct GNUNET_TIME_Relative *cur = rel_array;
+  uint64_t remaining = arr_size;
+
+  /* Walk the array front to back, folding each entry into the total. */
+  while (0 < remaining--)
+    total = GNUNET_TIME_relative_add (total, *cur++);
+  return total;
+}
+
+/**
+ * Compute the average of given time relatives.
+ *
+ * @param rel_array the array of relative times
+ * @param arr_size the number of entries of @a rel_array to average over
+ * @return the sum of the entries divided by @a arr_size;
+ *         zero if @a arr_size is 0
+ */
+struct GNUNET_TIME_Relative
+T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint64_t arr_size)
+{
+  /* An empty array has no meaningful average; avoid dividing by zero. */
+  if (0 == arr_size)
+    return GNUNET_TIME_UNIT_ZERO;
+  return GNUNET_TIME_relative_divide (T_relative_sum (rel_array, arr_size),
+                                      arr_size);
+}
+
+
+
+/**
+ * Reinitialise a previously initialised sampler element.
+ *
+ * Draws a fresh random key, marks the element as empty and resets the
+ * time of the last request.  The stored PeerID and its hash are left
+ * untouched.
+ *
+ * @param sampler_el the sampler element to reinitialise
+ */
+static void
+RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
+{
+  struct GNUNET_CRYPTO_AuthKey *fresh_key = &sampler_el->auth_key;
+
+  /* The key is taken directly from the random source;
+   * GNUNET_CRYPTO_hmac_derive_key() is not used. */
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_STRONG,
+                              &(fresh_key->key),
+                              GNUNET_CRYPTO_HASH_LENGTH);
+
+  /* We might want to keep the previous peer, so peer_id stays as it is. */
+  sampler_el->last_request = GNUNET_TIME_UNIT_FOREVER_ABS;
+  sampler_el->is_empty = EMPTY;
+}
+
+
+/**
+ * Allocate a new sampler element and give it a random min-wise
+ * independent function.
+ *
+ * In this implementation this means choosing an auth_key for later use in
+ * a hmac at random.
+ *
+ * @return a newly created RPS_SamplerElement which currently holds no id.
+ */
+struct RPS_SamplerElement *
+RPS_sampler_elem_create (void)
+{
+  struct RPS_SamplerElement *fresh_elem = GNUNET_new (struct RPS_SamplerElement);
+
+  RPS_sampler_elem_reinit (fresh_elem);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
+  return fresh_elem;
+}
+
+
+/**
+ * Input a PeerID into the given sampler element.
+ *
+ * An empty element accepts the PeerID unconditionally.  Otherwise the
+ * PeerID replaces the stored one only if its HMAC (under the element's
+ * key) is smaller than the stored hash.
+ *
+ * @param s_elem the sampler element to update
+ * @param other the PeerID that is offered to the element
+ * @param insert_cb called (if not NULL) with a PeerID newly stored in the
+ *                  element
+ * @param insert_cls closure for @a insert_cb
+ * @param remove_cb called (if not NULL) with a PeerID that gets replaced
+ * @param remove_cls closure for @a remove_cb
+ */
+static void
+RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem,
+                       const struct GNUNET_PeerIdentity *other,
+                       RPS_sampler_insert_cb insert_cb, void *insert_cls,
+                       RPS_sampler_remove_cb remove_cb, void *remove_cls)
+{
+  struct GNUNET_HashCode other_hash;
+
+  if (EMPTY == s_elem->is_empty)
+  {
+    /* The element was (re)keyed since it last held a valid PeerID, so the
+     * stored hash is stale.  Always recompute it, even if the offered id
+     * equals the one left over from before the reinitialisation. */
+    const int same_as_leftover =
+      (0 == GNUNET_CRYPTO_cmp_peer_identity (other, &s_elem->peer_id));
+
+    GNUNET_CRYPTO_hmac (&s_elem->auth_key,
+                        other,
+                        sizeof (struct GNUNET_PeerIdentity),
+                        &s_elem->peer_id_hash);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "SAMPLER: Got PeerID %s; Simply accepting (was empty previously).\n",
+         GNUNET_i2s (other));
+    s_elem->peer_id = *other;
+    s_elem->is_empty = NOT_EMPTY;
+    /* As before, no callback is issued when the id did not change. */
+    if ( (! same_as_leftover) && (NULL != insert_cb) )
+      insert_cb (insert_cls, &s_elem->peer_id);
+    return;
+  }
+
+  if (0 == GNUNET_CRYPTO_cmp_peer_identity (other, &s_elem->peer_id))
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+         GNUNET_i2s (other));
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
+         GNUNET_i2s (&s_elem->peer_id));
+    return;
+  }
+
+  GNUNET_CRYPTO_hmac (&s_elem->auth_key,
+                      other,
+                      sizeof (struct GNUNET_PeerIdentity),
+                      &other_hash);
+
+  if (0 > GNUNET_CRYPTO_hash_cmp (&other_hash, &s_elem->peer_id_hash))
+  {
+    /* The offered PeerID hashes to a smaller value: it wins. */
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+         GNUNET_i2s (other));
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
+         GNUNET_i2s (&s_elem->peer_id));
+
+    if (NULL != remove_cb)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "SAMPLER: Removing old PeerID %s with the remove callback.\n",
+           GNUNET_i2s (&s_elem->peer_id));
+      remove_cb (remove_cls, &s_elem->peer_id);
+    }
+
+    s_elem->peer_id = *other;
+    s_elem->peer_id_hash = other_hash;
+
+    if (NULL != insert_cb)
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
+           GNUNET_i2s (&s_elem->peer_id));
+      insert_cb (insert_cls, &s_elem->peer_id);
+    }
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s\n",
+         GNUNET_i2s (other));
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
+         GNUNET_i2s (&s_elem->peer_id));
+  }
+}
+
+
+/**
+ * Grow or shrink the size of the sampler.
+ *
+ * The requested size is only a hint: when growing, twice the requested
+ * number of elements is allocated; shrinking only happens when both the
+ * current size and extra_size exceed four times the requested size, and
+ * then shrinks to half of the requested size.  In all other cases the
+ * sampler is left untouched.
+ *
+ * @param new_size the new size of the sampler
+ */
+void
+RPS_sampler_resize (unsigned int new_size)
+{
+  unsigned int old_size;
+  unsigned int n_removed;
+  uint64_t i;
+  size_t index_offset;
+  struct RPS_SamplerElement **rem_list;
+
+  // TODO check min and max size
+
+  old_size = sampler->sampler_size;
+
+  /* GNUNET_array_grow() may move the array, which would leave
+   * extended_samplers_index dangling.  Remember its position as an offset
+   * so that it can be re-anchored afterwards. */
+  index_offset = 0;
+  if ( (NULL != extended_samplers_index) &&
+       (NULL != sampler->sampler_elements) )
+    index_offset = extended_samplers_index - sampler->sampler_elements;
+
+  if (old_size > new_size*4 &&
+      extra_size > new_size*4)
+  { /* Shrinking */
+
+    new_size /= 2;
+    n_removed = old_size - new_size;
+
+    /* Temporary store those to properly call the removeCB on those later */
+    rem_list = GNUNET_malloc (n_removed * sizeof (struct RPS_SamplerElement *));
+    memcpy (rem_list,
+            &sampler->sampler_elements[new_size],
+            n_removed * sizeof (struct RPS_SamplerElement *));
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %u -> %u\n",
+         old_size, new_size);
+    GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size,
+                       new_size);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "SAMPLER: sampler->sampler_elements now points to %p\n",
+         sampler->sampler_elements);
+
+    /* rem_list only holds the removed tail, so it is indexed from 0. */
+    for (i = 0 ; i < n_removed ; i++)
+    {/* Remove unneeded rest */
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n",
+           (uint64_t) new_size + i);
+      if (NULL != sampler->remove_cb)
+        sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
+      GNUNET_free (rem_list[i]);
+    }
+    GNUNET_free (rem_list);
+  }
+  else if (old_size < new_size)// ||
+      //extra_size < new_size) // needed?
+  { /* Growing */
+    new_size *= 2; // TODO check overflow
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %u -> %u\n",
+         old_size, new_size);
+    GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size,
+                       new_size);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "SAMPLER: sampler->sampler_elements now points to %p\n",
+         sampler->sampler_elements);
+
+    for ( i = old_size ; i < new_size ; i++ )
+    { /* Add new sampler elements */
+      sampler->sampler_elements[i] = RPS_sampler_elem_create ();
+      /* NOTE(review): the new element is still empty at this point, so the
+       * callback is handed a PeerID that was never offered - confirm that
+       * this is intended. */
+      if (NULL != sampler->insert_cb)
+        sampler->insert_cb (sampler->insert_cls,
+                            &sampler->sampler_elements[i]->peer_id);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
+           i, &sampler->sampler_elements[i],
+           GNUNET_i2s (&sampler->sampler_elements[i]->peer_id));
+    }
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "SAMPLER: Size remains the same -- nothing to do\n");
+    return;
+  }
+
+  /* Re-anchor the cursor in the (possibly moved) array; fall back to the
+   * first element if its old position no longer exists. */
+  if (NULL == sampler->sampler_elements)
+    extended_samplers_index = NULL;
+  else if (NULL != extended_samplers_index)
+    extended_samplers_index =
+      &sampler->sampler_elements[(index_offset < sampler->sampler_size)
+                                 ? index_offset
+                                 : 0];
+
+  GNUNET_assert(sampler->sampler_size == new_size);
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // remove
+}
+
+
+/**
+ * Initialise the global sampler.
+ *
+ * Allocates the sampler, grows it via RPS_sampler_resize() and offers
+ * @a id to every newly created sampler element.
+ *
+ * @param init_size the size the sampler is initialised with
+ *        (passed on to RPS_sampler_resize())
+ * @param id with which all newly created sampler elements are initialised
+ * @param ins_cb the callback that will be called on every PeerID that is
+ *        newly inserted into a sampler element
+ * @param ins_cls the closure given to #ins_cb
+ * @param rem_cb the callback that will be called on every PeerID that is
+ *        removed from a sampler element
+ * @param rem_cls the closure given to #rem_cb
+ */
+void
+RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
+                  RPS_sampler_insert_cb ins_cb, void *ins_cls,
+                  RPS_sampler_remove_cb rem_cb, void *rem_cls)
+{
+  /* Initialise context around extended sampler */
+  min_size = 10; // TODO make input to _samplers_init()
+  max_size = 1000; // TODO make input to _samplers_init()
+  /* request_deltas is a static array and needs no allocation here; the
+   * array that used to be allocated at this point was never stored
+   * anywhere and therefore leaked. */
+
+  sampler = GNUNET_new (struct RPS_Sampler);
+  sampler->sampler_size = 0;
+  sampler->sampler_elements = NULL;
+  sampler->insert_cb = ins_cb;
+  sampler->insert_cls = ins_cls;
+  sampler->remove_cb = rem_cb;
+  sampler->remove_cls = rem_cls;
+  RPS_sampler_resize (init_size);
+  RPS_sampler_update_list (id); // not a super nice design but ok for the moment
+
+  /* Start handing out peers at the first sampler element. */
+  extended_samplers_index = sampler->sampler_elements;
+
+  //GNUNET_assert (init_size == sampler->sampler_size);
+}
+
+
+/**
+ * Offer a PeerID to every sampler element of the global sampler.
+ *
+ * @param id the PeerID that is put in the sampler
+ */
+void
+RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
+{
+  uint64_t pos = 0;
+
+  /* Size and array are re-read on every round, exactly like the element
+   * itself, as the callbacks are free to touch the sampler. */
+  while (pos < sampler->sampler_size)
+  {
+    struct RPS_SamplerElement *elem = sampler->sampler_elements[pos];
+
+    RPS_sampler_elem_next (elem, id,
+                           sampler->insert_cb, sampler->insert_cls,
+                           sampler->remove_cb, sampler->remove_cls);
+    pos++;
+  }
+}
+
+
+/**
+ * Reinitialise every sampler element that currently stores the given
+ * PeerID.
+ *
+ * Used to get rid of a PeerID.
+ *
+ * @param id the id of the sampler elements to update.
+ */
+void
+RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
+{
+  uint64_t pos;
+
+  for (pos = 0; pos < sampler->sampler_size; pos++)
+  {
+    struct RPS_SamplerElement *elem = sampler->sampler_elements[pos];
+
+    /* Elements storing a different PeerID are left alone. */
+    if (0 != GNUNET_CRYPTO_cmp_peer_identity (id, &(elem->peer_id)))
+      continue;
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
+    RPS_sampler_elem_reinit (elem);
+  }
+}
+
+
+/**
+ * Get one random peer out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ * Only used internally
+ *
+ * @return pointer to the PeerID stored inside a randomly chosen sampler
+ *         element (not a copy, must not be freed), or NULL if the sampler
+ *         holds no elements
+ */
+const struct GNUNET_PeerIdentity *
+RPS_sampler_get_rand_peer_ ()
+{
+  uint64_t r_index;
+  const struct GNUNET_PeerIdentity *peer;
+
+  // TODO implement extra logic
+
+  /* Without elements there is nothing to choose from, and asking the
+   * random source for a value below 0 is not meaningful. */
+  if (0 == sampler->sampler_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sgrp: List empty - Returning NULL\n");
+    return NULL;
+  }
+
+  /**
+   * Choose the r_index of the peer we want to return
+   * at random from the interval of the gossip list
+   */
+  r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG,
+                                      sampler->sampler_size);
+
+  //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
+  //  // TODO schedule for later
+  //  peer = NULL;
+  //else
+  peer = &(sampler->sampler_elements[r_index]->peer_id);
+  sampler->sampler_elements[r_index]->last_request =
+    GNUNET_TIME_absolute_get ();
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n",
+       GNUNET_i2s (peer));
+
+  return peer;
+}
+
+/**
+ * Get n random peers out of the sampled peers.
+ *
+ * Every peer is drawn independently with RPS_sampler_get_rand_peer_().
+ * Random with or without consumption?
+ * Only used internally
+ *
+ * @param n the number of PeerIDs to draw
+ * @return newly allocated array of @a n PeerIDs, or NULL if the sampler
+ *         holds no elements
+ */
+const struct GNUNET_PeerIdentity *
+RPS_sampler_get_n_rand_peers_ (uint64_t n)
+{
+  struct GNUNET_PeerIdentity *result;
+  uint64_t filled;
+
+  if (0 == sampler->sampler_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sgrp: List empty - Returning NULL\n");
+    return NULL;
+  }
+
+  // TODO check if we have too much (distinct) sampled peers
+  // If we are not ready yet maybe schedule for later
+  result = GNUNET_malloc (n * sizeof (struct GNUNET_PeerIdentity));
+  filled = 0;
+  while (filled < n)
+  {
+    /* Copy the drawn PeerID by value into the result array. */
+    result[filled] = *RPS_sampler_get_rand_peer_ ();
+    filled++;
+  }
+  return result;
+}
+
+
+/**
+ * Get one random peer out of the sampled peers.
+ *
+ * Hands out the PeerID of the sampler element the internal cursor points
+ * to, reinitialises that element and advances the cursor (wrapping around
+ * at the end of the array).  Also updates the request statistics.
+ *
+ * @return a newly allocated copy of a PeerID previously put into the
+ *         sampler (this function never frees it), or NULL if the sampler
+ *         holds no elements.
+ */
+const struct GNUNET_PeerIdentity *
+RPS_sampler_get_rand_peer ()
+{
+  struct GNUNET_PeerIdentity *peer;
+  struct GNUNET_TIME_Absolute now;
+
+  now = GNUNET_TIME_absolute_get ();
+  if (64 > req_counter)
+    req_counter++;
+  if (1 < req_counter)
+  {
+    /* Shift the history by one slot.  Source and destination overlap, so
+     * this has to be memmove() rather than memcpy(). */
+    memmove (&request_deltas[1],
+             request_deltas,
+             (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
+    request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request,
+                                                             now);
+    request_rate = T_relative_avg (request_deltas, req_counter);
+  }
+  last_request = now;
+  // TODO resize the size of the extended_samplers
+
+  /* Nothing to hand out if the sampler has no elements. */
+  if ( (NULL == extended_samplers_index) ||
+       (0 == sampler->sampler_size) )
+    return NULL;
+
+  // use _get_rand_peer_ ?
+  peer = GNUNET_new (struct GNUNET_PeerIdentity);
+  *peer = (*extended_samplers_index)->peer_id;
+  RPS_sampler_elem_reinit (*extended_samplers_index);
+  if (extended_samplers_index ==
+      &sampler->sampler_elements[sampler->sampler_size - 1])
+    extended_samplers_index = &sampler->sampler_elements[0];
+  else
+    extended_samplers_index++;
+  return peer;
+}
+
+
+/**
+ * Get n random peers out of the sampled peers.
+ *
+ * Every peer is obtained with RPS_sampler_get_rand_peer(), so each of the
+ * sampler elements involved gets reinitialised.
+ * Random with or without consumption?
+ *
+ * @param n the number of PeerIDs to return
+ * @return newly allocated array of @a n PeerIDs previously put into the
+ *         sampler, or NULL if the sampler holds no elements.
+ */
+const struct GNUNET_PeerIdentity *
+RPS_sampler_get_n_rand_peers (uint64_t n)
+{
+  struct GNUNET_PeerIdentity *peers;
+  struct GNUNET_PeerIdentity *one;
+  uint64_t i;
+
+  // use _get_rand_peers_ ?
+  if (0 == sampler->sampler_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Sgrp: List empty - Returning NULL\n");
+    return NULL;
+  }
+
+  // TODO check if we have too much (distinct) sampled peers
+  // If we are not ready yet maybe schedule for later
+  peers = GNUNET_malloc (n * sizeof (struct GNUNET_PeerIdentity));
+
+  for (i = 0; i < n; i++)
+  {
+    /* RPS_sampler_get_rand_peer() hands out a heap copy; take the value
+     * and release the copy again so that it does not leak. */
+    one = (struct GNUNET_PeerIdentity *) RPS_sampler_get_rand_peer ();
+    if (NULL == one)
+    {
+      GNUNET_free (peers);
+      return NULL;
+    }
+    peers[i] = *one;
+    GNUNET_free (one);
+  }
+  return peers;
+}
+
+
+/**
+ * Counts how many sampler elements currently hold a given PeerID.
+ *
+ * Elements flagged as empty are not counted.
+ *
+ * @param id the PeerID to count.
+ *
+ * @return the number of occurrences of id.
+ */
+uint64_t
+RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
+{
+  uint64_t hits = 0;
+  uint64_t pos;
+
+  for (pos = 0; pos < sampler->sampler_size; pos++)
+  {
+    const struct RPS_SamplerElement *elem = sampler->sampler_elements[pos];
+
+    if (0 != GNUNET_CRYPTO_cmp_peer_identity (&elem->peer_id, id))
+      continue;
+    if (EMPTY == elem->is_empty)
+      continue;
+    hits++;
+  }
+  return hits;
+}
+
+
+/**
+ * Cleans the sampler.
+ *
+ * Frees all sampler elements and the array holding them.  The remove
+ * callback is not invoked.  The sampler structure itself stays allocated
+ * (with size 0) so that later calls into this file still find a valid,
+ * empty sampler.
+ */
+void
+RPS_sampler_destroy ()
+{
+  unsigned int i;
+
+  if (NULL == sampler)
+    return;
+
+  /* RPS_sampler_resize (0) does not release anything (its shrinking
+   * condition depends on extra_size), so free the elements directly. */
+  for (i = 0; i < sampler->sampler_size; i++)
+    GNUNET_free (sampler->sampler_elements[i]);
+  GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
+
+  /* The cursor pointed into the array that was just released. */
+  extended_samplers_index = NULL;
+
+  /* request_deltas is a static array and must not be passed to
+   * GNUNET_free(). */
+}
+
+/* end of gnunet-service-rps_sampler.c */
Added: gnunet/src/rps/gnunet-service-rps_sampler.h
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.h (rev 0)
+++ gnunet/src/rps/gnunet-service-rps_sampler.h 2015-01-06 23:48:24 UTC (rev
34826)
@@ -0,0 +1,147 @@
+/*
+ This file is part of GNUnet.
+ (C)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file rps/gnunet-service-rps_sampler.h
+ * @brief sampler implementation
+ * @author Julius Bünger
+ */
+
+#ifndef RPS_SAMPLER_H
+#define RPS_SAMPLER_H
+#include <inttypes.h>
+
+/**
+ * Callback that is called when a new PeerID is inserted into a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is inserted
+ */
+typedef void
+(*RPS_sampler_insert_cb) (void *cls,
+ const struct GNUNET_PeerIdentity *id);
+
+/**
+ * Callback that is called when a PeerID is removed from a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is removed
+ */
+typedef void
+(*RPS_sampler_remove_cb) (void *cls,
+ const struct GNUNET_PeerIdentity *id);
+
+/**
+ * A sampler sampling a stream of PeerIDs.
+ */
+//struct RPS_Sampler;
+
+
+/**
+ * Grow or shrink the size of the sampler.
+ *
+ * @param new_size the new size of the sampler
+ */
+ void
+RPS_sampler_resize (unsigned int new_size);
+
+
+/**
+ * Initialise a tuple of samplers.
+ *
+ * @param init_size the size the sampler is initialised with
+ * @param id with which all newly created sampler elements are initialised
+ * @param ins_cb the callback that will be called on every PeerID that is
+ * newly inserted into a sampler element
+ * @param ins_cls the closure given to #ins_cb
+ * @param rem_cb the callback that will be called on every PeerID that is
+ * removed from a sampler element
+ * @param rem_cls the closure given to #rem_cb
+ */
+ void
+RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
+ RPS_sampler_insert_cb ins_cb, void *ins_cls,
+ RPS_sampler_remove_cb rem_cb, void *rem_cls);
+
+
+/**
+ * A function to update every sampler in the given list
+ *
+ * @param id the PeerID that is put in the sampler
+ */
+ void
+RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id);
+
+
+/**
+ * Reinitialise all previously initialised sampler elements with the given
value.
+ *
+ * Used to get rid of a PeerID.
+ *
+ * @param id the id of the samplers to update.
+ */
+ void
+RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
+
+
+/**
+ * Get one random peer out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ *
+ * @return a random PeerID of the PeerIDs previously put into the sampler.
+ */
+ const struct GNUNET_PeerIdentity *
+RPS_sampler_get_rand_peer ();
+
+
+/**
+ * Get n random peers out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ * Random with or without consumption?
+ *
+ * @return n random PeerIDs of the PeerIDs previously put into the sampler.
+ */
+ const struct GNUNET_PeerIdentity *
+RPS_sampler_get_n_rand_peers (uint64_t n);
+
+
+/**
+ * Counts how many Samplers currently hold a given PeerID.
+ *
+ * @param id the PeerID to count.
+ *
+ * @return the number of occurrences of id.
+ */
+ uint64_t
+RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id);
+
+
+/**
+ * Cleans the samplers.
+ */
+ void
+RPS_sampler_destroy ();
+
+#endif
+/* end of gnunet-service-rps_sampler.h */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r34826 - gnunet/src/rps,
gnunet <=