gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r34826 - gnunet/src/rps


From: gnunet
Subject: [GNUnet-SVN] r34826 - gnunet/src/rps
Date: Wed, 7 Jan 2015 00:48:24 +0100

Author: ch3
Date: 2015-01-07 00:48:24 +0100 (Wed, 07 Jan 2015)
New Revision: 34826

Added:
   gnunet/src/rps/gnunet-service-rps_sampler.c
   gnunet/src/rps/gnunet-service-rps_sampler.h
Modified:
   gnunet/src/rps/gnunet-service-rps.c
Log:
moved sampler functionality in file of its own

Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2015-01-06 14:35:19 UTC (rev 34825)
+++ gnunet/src/rps/gnunet-service-rps.c 2015-01-06 23:48:24 UTC (rev 34826)
@@ -29,6 +29,8 @@
 #include "gnunet_nse_service.h"
 #include "rps.h"
 
+#include "gnunet-service-rps_sampler.h"
+
 #include <math.h>
 #include <inttypes.h>
 
@@ -48,6 +50,11 @@
 
 // TODO Change API to accept initialisation peers
 
+// TODO Change API to accept good peers 'friends'
+
+
+// hist_size_init, hist_size_max
+
 /**
  * Our configuration.
  */
@@ -58,515 +65,28 @@
  */
 static struct GNUNET_PeerIdentity *own_identity;
 
-
-  struct GNUNET_PeerIdentity *
-get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int size);
-
-/***********************************************************************
- * Sampler
- *
- * WARNING: This section needs to be reviewed regarding the use of
- * functions providing (pseudo)randomness!
-***********************************************************************/
-
-// TODO care about invalid input of the caller (size 0 or less...)
-
-// It might be interesting to formulate this independent of PeerIDs.
-
 /**
- * Callback that is called when a new PeerID is inserted into a sampler.
- *
- * @param cls the closure given alongside this function.
- * @param id the PeerID that is inserted
- * @param hash the hash the sampler produced of the PeerID
+ * Closure to the callback cadet calls on each peer it passes to us
  */
-typedef void (* SAMPLER_insertCB) (void *cls,
-    const struct GNUNET_PeerIdentity *id,
-    struct GNUNET_HashCode hash);
-
-/**
- * Callback that is called when a new PeerID is removed from a sampler.
- *
- * @param cls the closure given alongside this function.
- * @param id the PeerID that is removed
- * @param hash the hash the sampler produced of the PeerID
- */
-typedef void (* SAMPLER_removeCB) (void *cls,
-    const struct GNUNET_PeerIdentity *id,
-    struct GNUNET_HashCode hash);
-
-/**
- * A sampler sampling PeerIDs.
- */
-struct Sampler
+struct init_peer_cls
 {
   /**
-   * Min-wise linear permutation used by this sampler.
-   *
-   * This is an key later used by a hmac.
+   * The server handle to later listen to client requests
    */
-  struct GNUNET_CRYPTO_AuthKey auth_key;
+  struct GNUNET_SERVER_Handle *server;
 
   /**
-   * The PeerID this sampler currently samples.
+   * Counts how many peers cadet already passed to us
    */
-  struct GNUNET_PeerIdentity *peer_id;
-
-  /**
-   * The according hash value of this PeerID.
-   */
-  struct GNUNET_HashCode peer_id_hash;
-
-  /**
-   * Samplers are kept in a linked list.
-   */
-  struct Sampler *next;
-
-  /**
-   * Samplers are kept in a linked list.
-   */
-  struct Sampler *prev;
-
+  uint32_t i;
 };
 
-/**
- * A n-tuple of samplers.
- */
-struct Samplers
-{
-  /**
-   * Number of samplers we hold.
-   */
-  unsigned int size;
-  //size_t size;
-  
-  /**
-   * All PeerIDs in one array.
-   */
-  struct GNUNET_PeerIdentity *peer_ids;
 
-  /**
-   * Callback to be called when a peer gets inserted into a sampler.
-   */
-  SAMPLER_insertCB insertCB;
+  struct GNUNET_PeerIdentity *
+get_rand_peer (const struct GNUNET_PeerIdentity *peer_list, unsigned int size);
 
-  /**
-   * Closure to the insertCB.
-   */
-  void *insertCLS;
 
-  /**
-   * Callback to be called when a peer gets inserted into a sampler.
-   */
-  SAMPLER_removeCB removeCB;
-
-  /**
-   * Closure to the removeCB.
-   */
-  void *removeCLS;
-
-  /**
-   * The head of the DLL.
-   */
-  struct Sampler *head;
-
-  /**
-   * The tail of the DLL.
-   */
-  struct Sampler *tail;
-
-};
-
-/**
- * Reinitialise a previously initialised sampler.
- *
- * @param sampler the sampler element.
- * @param id pointer to the memory that keeps the value.
- */
-  void
-SAMPLER_reinitialise_sampler (struct Sampler *sampler, struct GNUNET_PeerIdentity *id)
-{
-  // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
-  GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
-                             &(sampler->auth_key.key),
-                             GNUNET_CRYPTO_HASH_LENGTH);
-
-  GNUNET_assert(NULL != id);
-  sampler->peer_id = id;
-  memcpy(sampler->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // FIXME this should probably be NULL -- the caller has to handle those.
-  // Maybe take a PeerID as second argument.
-
-  GNUNET_CRYPTO_hmac(&sampler->auth_key, sampler->peer_id,
-                     sizeof(struct GNUNET_PeerIdentity),
-                     &sampler->peer_id_hash);
-}
-
-
-/**
- * (Re)Initialise given Sampler with random min-wise independent function.
- *
- * In this implementation this means choosing an auth_key for later use in
- * a hmac at random.
- *
- * @param id pointer to the place where this sampler will store the PeerID.
- *           This will be overwritten.
- */
-  struct Sampler *
-SAMPLER_init(struct GNUNET_PeerIdentity *id)
-{
-  struct Sampler *s;
-  
-  s = GNUNET_new(struct Sampler);
-
-  SAMPLER_reinitialise_sampler (s, id);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with PeerID %s (at %p) \n",
-      GNUNET_i2s(s->peer_id), s->peer_id);
-
-  s->prev = NULL;
-  s->next = NULL;
-
-  return s;
-}
-
-/**
- * Input an PeerID into the given sampler.
- */
-  static void
-SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
-    SAMPLER_insertCB insertCB, void *insertCLS,
-    SAMPLER_removeCB removeCB, void *removeCLS)
-  // TODO call update herein
-{
-  struct GNUNET_HashCode other_hash;
-
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: New PeerID %s at %p\n",
-      GNUNET_i2s(other), other);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Old PeerID %s at %p\n",
-      GNUNET_i2s(s->peer_id), s->peer_id);
-
-  if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, s->peer_id) )
-  {
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s\n",
-        GNUNET_i2s(other));
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s\n",
-        GNUNET_i2s(s->peer_id));
-  }
-  else
-  {
-    GNUNET_CRYPTO_hmac(&s->auth_key,
-        other,
-        sizeof(struct GNUNET_PeerIdentity),
-        &other_hash);
-
-    if ( NULL == s->peer_id )
-    { // Or whatever is a valid way to say
-      // "we have no PeerID at the moment"
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting (got NULL previously).\n",
-          GNUNET_i2s(other));
-      memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
-      //s->peer_id = other;
-      s->peer_id_hash = other_hash;
-      if (NULL != insertCB)
-      {
-        insertCB(insertCLS, s->peer_id, s->peer_id_hash);
-      }
-    }
-    else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s->peer_id_hash) )
-    {
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s\n",
-          GNUNET_i2s(other));
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
-          GNUNET_i2s(s->peer_id));
-
-      if ( NULL != removeCB )
-      {
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the remove callback.\n",
-            GNUNET_i2s(s->peer_id));
-        removeCB(removeCLS, s->peer_id, s->peer_id_hash);
-      }
-
-      memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
-      //s->peer_id = other;
-      s->peer_id_hash = other_hash;
-
-      if ( NULL != insertCB )
-      {
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with the insert callback.\n",
-            GNUNET_i2s(s->peer_id));
-        insertCB(insertCLS, s->peer_id, s->peer_id_hash);
-      }
-    }
-    else
-    {
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s\n",
-          GNUNET_i2s(other));
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s\n",
-          GNUNET_i2s(s->peer_id));
-    }
-  }
-}
-
-/**
- * Gow or shrink the size of the tuple of samplers.
- *
- * @param samplers the samplers to grow
- * @param new_size the new size of the samplers
- * @param fill_up_id if growing, that has to point to a
- *                   valid PeerID and will be used
- *                   to initialise newly created samplers
- */
-  void
-SAMPLER_samplers_resize (struct Samplers * samplers,
-    unsigned int new_size,
-    struct GNUNET_PeerIdentity *fill_up_id)
-{
-  if ( samplers->size == new_size )
-  {
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to do\n");
-    return;
-  }
-
-  unsigned int old_size;
-  struct Sampler *iter;
-  uint64_t i;
-  struct Sampler *tmp;
-
-  old_size = samplers->size;
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> %u\n", old_size, new_size);
-
-  iter = samplers->head;
-
-  if ( new_size < old_size )
-  {
-    for ( i = new_size ; i < old_size ; i++ )
-    {/* Remove unneeded rest */
-      tmp = iter->next;
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", i);
-      if (NULL != samplers->removeCB)
-        samplers->removeCB(samplers->removeCLS, iter->peer_id, iter->peer_id_hash);
-        // FIXME When this is called and counts the amount of peer_ids in the samplers
-        //       this gets a wrong number.
-      GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
-      GNUNET_free(iter);
-      iter = tmp;
-    }
-  }
-
-  GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to %p\n", samplers->peer_ids);
-
-  if ( new_size > old_size )
-  { /* Growing */
-    GNUNET_assert( NULL != fill_up_id );
-    for ( i = 0 ; i < new_size ; i++ )
-    { /* All samplers */
-      if ( i < old_size )
-      { /* Update old samplers */
-        iter->peer_id = &samplers->peer_ids[i];
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
-            i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
-        iter = iter->next;
-      }
-      else
-      { /* Add new samplers */
-        memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct GNUNET_PeerIdentity));
-        iter = SAMPLER_init(&samplers->peer_ids[i]);
-        if (NULL != samplers->insertCB)
-        {
-          samplers->insertCB(samplers->insertCLS, iter->peer_id, iter->peer_id_hash);
-        }
-        GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
-            i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
-      }
-    }
-  }
-  else// if ( new_size < old_size )
-  { /* Shrinking */
-    for ( i = 0 ; i < new_size ; i++)
-    { /* All samplers */
-      tmp = iter->next;
-      /* Update remaining samplers */
-      iter->peer_id = &samplers->peer_ids[i];
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, now pointing to %p, contains %s\n",
-          i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
-
-      iter = tmp;
-    }
-  }
-
-  GNUNET_assert(samplers->size == new_size);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
-}
-
-
-/**
- * Initialise a tuple of samplers.
- */
-struct Samplers *
-SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
-    SAMPLER_insertCB insertCB, void *insertCLS,
-    SAMPLER_removeCB removeCB, void *removeCLS)
-{
-  struct Samplers *samplers;
-  //struct Sampler *s;
-  //uint64_t i;
-
-  samplers = GNUNET_new(struct Samplers);
-  samplers->size = 0;
-  samplers->peer_ids = NULL;
-  samplers->insertCB = insertCB;
-  samplers->insertCLS = insertCLS;
-  samplers->removeCB = removeCB;
-  samplers->removeCLS = removeCLS;
-  samplers->head = samplers->tail = NULL;
-  //samplers->peer_ids = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity);
-
-  SAMPLER_samplers_resize(samplers, init_size, id);
-
-  GNUNET_assert(init_size == samplers->size);
-  return samplers;
-}
-
-
-/**
- * A fuction to update every sampler in the given list
- */
-  static void
-SAMPLER_update_list(struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
-{
-  struct Sampler *iter;
-
-  iter = samplers->head;
-  while ( NULL != iter->next )
-  {
-    SAMPLER_next(iter, id,
-        samplers->insertCB, samplers->insertCLS,
-        samplers->removeCB, samplers->removeCLS);
-    iter = iter->next;
-  }
-  
-}
-
-/**
- * Reinitialise all previously initialised sampler with the given value.
- *
- * @param samplers the sampler list.
- * @param id the id of the samplers to update.
- */
-  void
-SAMPLER_reinitialise_samplers_by_value (struct Samplers *samplers, const struct GNUNET_PeerIdentity *id)
-{
-  uint64_t i;
-  struct Sampler *iter;
-
-  iter = samplers->head;
-  for ( i = 0 ; i < samplers->size ; i++ )
-  {
-    if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, &samplers->peer_ids[i]) )
-    {
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
-      SAMPLER_reinitialise_sampler (iter, &samplers->peer_ids[i]);
-    }
-    if (NULL != iter->next)
-      iter = iter->next;
-  }
-}
-
-/**
- * Get one random peer out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- */
-  const struct GNUNET_PeerIdentity* 
-SAMPLER_get_rand_peer (struct Samplers *samplers)
-{
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER_get_rand_peer:\n");
-
-  if ( 0 == samplers->size )
-  {
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: List empty - Returning own PeerID %s\n", GNUNET_i2s(own_identity));
-    return own_identity;
-  }
-  else
-  {
-    const struct GNUNET_PeerIdentity *peer;
-
-    peer = get_rand_peer(samplers->peer_ids, samplers->size);
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", GNUNET_i2s(peer));
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: (own ID: %s)\n", GNUNET_i2s(own_identity));
-
-    return peer;
-  }
-}
-
-/**
- * Get n random peers out of the sampled peers.
- *
- * We might want to reinitialise this sampler after giving the
- * corrsponding peer to the client.
- * Random with or without consumption?
- */
-  const struct GNUNET_PeerIdentity*  // TODO give back simple array
-SAMPLER_get_n_rand_peers (struct Samplers *samplers, uint64_t n)
-{
-  // TODO check if we have too much (distinct) sampled peers
-  // If we are not ready yet maybe schedule for later
-  struct GNUNET_PeerIdentity *peers;
-  uint64_t i;
-  
-  peers = GNUNET_malloc(n * sizeof(struct GNUNET_PeerIdentity));
-
-  for ( i = 0 ; i < n ; i++ ) {
-    //peers[i] = SAMPLER_get_rand_peer(samplers);
-    memcpy(&peers[i], SAMPLER_get_rand_peer(samplers), sizeof(struct GNUNET_PeerIdentity));
-  }
-
-  // TODO something else missing?
-  return peers;
-}
-
-/**
- * Counts how many Samplers currently hold a given PeerID.
- */
-  uint64_t
-SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity *id )
-{
-  struct Sampler *iter;
-  uint64_t count;
-
-  iter = samplers->head;
-  count = 0;
-  while ( NULL != iter )
-  {
-    if ( 0 == GNUNET_CRYPTO_cmp_peer_identity( iter->peer_id, id) )
-      count++;
-    iter = iter->next;
-  }
-  return count;
-}
-
-
-/**
- * Cleans the samplers.
- * 
- * @param samplers the samplers to clean up.
- */
-  void
-SAMPLER_samplers_destroy (struct Samplers *samplers)
-{
-  SAMPLER_samplers_resize(samplers, 0, NULL);
-  GNUNET_free(samplers);
-}
-
 /***********************************************************************
- * /Sampler
-***********************************************************************/
-
-
-
-/***********************************************************************
  * Housekeeping with peers
 ***********************************************************************/
 
@@ -637,12 +157,6 @@
 
 
 /**
- * The samplers.
- */
-static struct Samplers *sampler_list;
-
-
-/**
  * The gossiped list of peers.
  */
 static struct GNUNET_PeerIdentity *gossip_list;
@@ -665,12 +179,16 @@
 /**
  * Percentage of total peer number in the gossip list
  * to send random PUSHes to
+ *
+ * TODO do not read from configuration
  */
 static float alpha;
 
 /**
  * Percentage of total peer number in the gossip list
  * to send random PULLs to
+ *
+ * TODO do not read from configuration
  */
 static float beta;
 
@@ -696,6 +214,8 @@
 
 /**
  * List to store peers received through pushes temporary.
+ *
+ * TODO -> multipeermap
  */
 static struct GNUNET_PeerIdentity *push_list;
 
@@ -707,6 +227,8 @@
 
 /**
  * List to store peers received through pulls temporary.
+ *
+ * TODO -> multipeermap
  */
 static struct GNUNET_PeerIdentity *pull_list;
 
@@ -741,11 +263,12 @@
  * Get random peer from the gossip list.
  */
   struct GNUNET_PeerIdentity *
-get_rand_peer(struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
+get_rand_peer(const struct GNUNET_PeerIdentity *peer_list, unsigned int list_size)
 {
   uint64_t r_index;
   struct GNUNET_PeerIdentity *peer;
 
+  peer = GNUNET_new(struct GNUNET_PeerIdentity);
   // FIXME if we have only NULL in gossip list this will block
   // but then we might have a problem nevertheless
 
@@ -759,12 +282,13 @@
     r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                      list_size);
 
-    peer = &(peer_list[r_index]);
+    *peer = peer_list[r_index];
   } while (NULL == peer);
 
   return peer;
 }
 
+
 /**
  * Make sure the context of a given peer exists in the given peer_map.
  */
@@ -784,7 +308,7 @@
     ctx->mq = NULL;
     ctx->to_channel = NULL;
     ctx->from_channel = NULL;
-    GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+    (void) GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
 }
 
@@ -893,13 +417,14 @@
   //scale = .01;
   estimate = GNUNET_NSE_log_estimate_to_n(logestimate);
   // GNUNET_NSE_log_estimate_to_n (logestimate);
-  estimate = pow(estimate, 1./3);// * (std_dev * scale); // TODO add
+  estimate = pow(estimate, 1./3);
+  // TODO add if std_dev is a number
+  // estimate += (std_dev * scale);
   if ( 0 < estimate ) {
     LOG(GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
     est_size = estimate;
-  } else {
+  } else
     LOG(GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
-  }
 }
 
 /**
@@ -929,26 +454,26 @@
 
   // TODO
   msg = (struct GNUNET_RPS_CS_RequestMessage *) message;
-  cli_ctx = GNUNET_SERVER_client_get_user_context(client, struct client_ctx);
+  cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct client_ctx);
   if ( NULL == cli_ctx ) {
     cli_ctx = GNUNET_new(struct client_ctx);
-    cli_ctx->mq = GNUNET_MQ_queue_for_server_client(client);
-    GNUNET_SERVER_client_set_user_context(client, cli_ctx);
+    cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client);
+    GNUNET_SERVER_client_set_user_context (client, cli_ctx);
   }
   
   // How many peers do we give back?
   // Wait until we have enough random peers?
 
-  ev = GNUNET_MQ_msg_extra(out_msg,
-                           GNUNET_ntohll(msg->num_peers) * sizeof(struct GNUNET_PeerIdentity),
-                           GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
+  ev = GNUNET_MQ_msg_extra (out_msg,
+                            GNUNET_ntohll (msg->num_peers) * sizeof (struct GNUNET_PeerIdentity),
+                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
   out_msg->num_peers = msg->num_peers; // No conversion between network and host order
 
-  num_peers = GNUNET_ntohll(msg->num_peers);
-  //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
+  num_peers = GNUNET_ntohll (msg->num_peers);
+  //&out_msg[1] = RPS_sampler_get_n_rand_peers(sampler_list, num_peers);
   memcpy(&out_msg[1],
-      SAMPLER_get_n_rand_peers(sampler_list, num_peers),
-      num_peers * sizeof(struct GNUNET_PeerIdentity));
+      RPS_sampler_get_n_rand_peers (num_peers),
+      num_peers * sizeof (struct GNUNET_PeerIdentity));
   
   GNUNET_MQ_send(cli_ctx->mq, ev);
   //GNUNET_MQ_destroy(mq);
@@ -974,18 +499,19 @@
     void **channel_ctx,
     const struct GNUNET_MessageHeader *msg)
 {
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "PUSH received\n");
 
-  struct GNUNET_PeerIdentity *peer;
+  const struct GNUNET_PeerIdentity *peer;
 
   // TODO check the proof of work
   
-  peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info( channel, GNUNET_CADET_OPTION_PEER );
+  peer = (const struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (channel, GNUNET_CADET_OPTION_PEER);
+  // FIXME wait for cadet to change this function
   
   /* Add the sending peer to the push_list */
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
-  GNUNET_array_append(push_list, push_list_size, *peer);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding peer to push_list of size %u\n", push_list_size);
+  GNUNET_array_append (push_list, push_list_size, *peer);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Size of push_list is now %u\n", push_list_size);
 
   return GNUNET_OK;
 }
@@ -1018,6 +544,7 @@
   // otherwise remove from peerlist?
 
   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info(channel, GNUNET_CADET_OPTION_PEER);
+  // FIXME wait for cadet to change this function
   LOG(GNUNET_ERROR_TYPE_DEBUG, "PULL REQUEST from peer %s received\n", GNUNET_i2s(peer));
 
   //mq = GNUNET_CADET_mq_create(channel); // without mq?
@@ -1062,8 +589,17 @@
   uint64_t i;
 
   // TODO check that we sent a request and that it is the first reply
-
+  if (sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) < ntohs (msg->size))
+  {
+    GNUNET_break_op (0); // At the moment our own implementation seems to break that.
+    return GNUNET_SYSERR;
+  }
   in_msg = (struct GNUNET_RPS_P2P_PullReplyMessage *) msg;
+  if (ntohs (msg->size) - sizeof (struct GNUNET_RPS_P2P_PullReplyMessage) / sizeof (struct GNUNET_PeerIdentity)  != GNUNET_ntohll (in_msg->num_peers))
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   peers = (struct GNUNET_PeerIdentity *) &msg[1];
   for ( i = 0 ; i < GNUNET_ntohll(in_msg->num_peers) ; i++ )
   {
@@ -1089,10 +625,11 @@
   struct GNUNET_RPS_P2P_PushMessage        *push_msg;
   struct GNUNET_RPS_P2P_PullRequestMessage *pull_msg; // FIXME Send empty message
   struct GNUNET_MQ_Envelope *ev;
-  struct GNUNET_PeerIdentity *peer;
+  const struct GNUNET_PeerIdentity *peer;
+  struct GNUNET_MQ_Handle *mq;
 
   // TODO print lists, ...
-  // TODO cleanup peer_map
+  // TODO rendomise and spread calls herein over time
 
 
   /* Would it make sense to have one shuffeled gossip list and then
@@ -1117,7 +654,8 @@
          push_msg; */
       push_msg->placeholder = 0;
       // FIXME sometimes it returns a pointer to a freed mq
-      GNUNET_MQ_send (get_mq (peer_map, peer), ev);
+      mq = get_mq (peer_map, peer);
+      GNUNET_MQ_send (mq, ev);
 
       // modify in_flags of respective peer?
     }
@@ -1137,16 +675,17 @@
 
       ev = GNUNET_MQ_msg(pull_msg, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST);
       //ev = GNUNET_MQ_msg_extra();
-      pull_msg->placeholder = 0;
-      GNUNET_MQ_send( get_mq(peer_map, peer), ev );
+      //pull_msg->placeholder = 0;
+      pull_msg = NULL;
+      mq = get_mq (peer_map, peer);
+      GNUNET_MQ_send (mq, ev);
       // modify in_flags of respective peer?
     }
   }
 
 
   /* If the NSE has changed adapt the lists accordingly */
-  if ( sampler_list->size != est_size )
-    SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
+  RPS_sampler_resize(est_size);
 
   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
 
@@ -1165,7 +704,7 @@
 
     first_border = round(alpha * gossip_list_size);
     for ( i = 0 ; i < first_border ; i++ )
-    { // TODO use SAMPLER_get_n_rand_peers
+    { // TODO use RPS_sampler_get_n_rand_peers
       /* Update gossip list with peers received through PUSHes */
       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                        push_list_size);
@@ -1186,9 +725,8 @@
     for ( i = second_border ; i < gossip_list_size ; i++ )
     {
       /* Update gossip list with peers from history */
-      r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
-                                       sampler_list->size);
-      gossip_list[i] = sampler_list->peer_ids[r_index];
+      peer = RPS_sampler_get_rand_peer();
+      gossip_list[i] = *peer;
       // TODO change the in_flags accordingly
     }
 
@@ -1204,60 +742,61 @@
 
   for ( i = 0 ; i < push_list_size ; i++ )
   {
-    SAMPLER_update_list(sampler_list, &push_list[i]);
+    RPS_sampler_update_list (&push_list[i]);
     // TODO set in_flag?
   }
 
   for ( i = 0 ; i < pull_list_size ; i++ )
   {
-    SAMPLER_update_list(sampler_list, &pull_list[i]);
+    RPS_sampler_update_list (&pull_list[i]);
     // TODO set in_flag?
   }
 
 
   /* Empty push/pull lists */
-  GNUNET_array_grow(push_list, push_list_size, 0);
+  GNUNET_array_grow (push_list, push_list_size, 0);
   push_list_size = 0; // I guess that's not necessary but doesn't hurt
-  GNUNET_array_grow(pull_list, pull_list_size, 0);
+  GNUNET_array_grow (pull_list, pull_list_size, 0);
   pull_list_size = 0; // I guess that's not necessary but doesn't hurt
 
 
   /* Schedule next round */
-  do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, NULL );
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
+  do_round_task = GNUNET_SCHEDULER_add_delayed (round_interval, &do_round, NULL);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
 }
 
 /**
  * Open a connection to given peer and store channel and mq.
  */
   void
-insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
+insertCB (void *cls, const struct GNUNET_PeerIdentity *id)
 {
   // We open a channel to be notified when this peer goes down.
-  touch_channel(peer_map, id);
+  touch_channel (peer_map, id);
 }
 
 /**
  * Close the connection to given peer and delete channel and mq.
  */
   void
-removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode hash)
+removeCB (void *cls, const struct GNUNET_PeerIdentity *id)
 {
   size_t s;
   struct peer_context *ctx;
 
-  s = SAMPLER_count_id(sampler_list, id);
-  if ( 1 >= s ) {
-    if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
+  s = RPS_sampler_count_id (id);
+  if ( 1 >= s )
+  {
+    if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains (peer_map, id))
     {
-      ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
+      ctx = GNUNET_CONTAINER_multipeermap_get (peer_map, id);
       if (NULL != ctx->to_channel)
       {
         if (NULL != ctx->mq)
         {
-          GNUNET_MQ_destroy(ctx->mq);
+          GNUNET_MQ_destroy (ctx->mq);
         }
-        GNUNET_CADET_channel_destroy(ctx->to_channel);
+        GNUNET_CADET_channel_destroy (ctx->to_channel);
       }
       // TODO cleanup peer
       GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
@@ -1282,27 +821,29 @@
               unsigned int best_path) // "How long is the best path?
                                       // (0 = unknown, 1 = ourselves, 2 = neighbor)"
 {
+  struct init_peer_cls *ipc;
+
+  ipc = (struct init_peer_cls *) cls;
   if ( NULL != peer )
   {
     LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", GNUNET_i2s(peer), peer);
-    SAMPLER_update_list(sampler_list, peer);
+    RPS_sampler_update_list(peer);
     touch_peer_ctx(peer_map, peer); // unneeded? -> insertCB
 
-    gossip_list[g_i] = *peer;
-    g_i++;
-    // FIXME find a better way to have a global counter
+    gossip_list[ipc->i] = *peer;
+    ipc->i++;
 
     // send push/pull to each of those peers?
   }
   else
   {
-    if (g_i < sampler_list->size)
+    if (ipc->i < gossip_list_size)
     {
-      memcpy(&gossip_list[g_i],
-          &(sampler_list->peer_ids[g_i]),
-          (sampler_list->size - g_i) * sizeof(struct GNUNET_PeerIdentity));
+      memcpy(&gossip_list[ipc->i],
+          RPS_sampler_get_rand_peer(),
+          (gossip_list_size - ipc->i) * sizeof(struct GNUNET_PeerIdentity));
     }
-    rps_start( (struct GNUNET_SERVER_Handle *) cls);
+    rps_start (ipc->server);
   }
 }
 
@@ -1328,7 +869,7 @@
   GNUNET_NSE_disconnect(nse);
   GNUNET_CADET_disconnect(cadet_handle);
   GNUNET_free(own_identity);
-  SAMPLER_samplers_destroy(sampler_list);
+  RPS_sampler_destroy();
   GNUNET_array_grow(gossip_list, gossip_list_size, 0);
   GNUNET_array_grow(push_list, push_list_size, 0);
   GNUNET_array_grow(pull_list, pull_list_size, 0);
@@ -1405,7 +946,8 @@
   peer = (struct GNUNET_PeerIdentity *) GNUNET_CADET_channel_get_info (
       (struct GNUNET_CADET_Channel *) channel, GNUNET_CADET_OPTION_PEER);
        // Guess simply casting isn't the nicest way...
-  SAMPLER_reinitialise_samplers_by_value(sampler_list, peer);
+       // FIXME wait for cadet to change this function
+  RPS_sampler_reinitialise_by_value(peer);
 }
 
 /**
@@ -1413,7 +955,7 @@
  */
 static void
 rps_start (struct GNUNET_SERVER_Handle *server)
-{
+{ // TODO get msg sizes right
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
     {&handle_cs_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, 0},
     {NULL, NULL, 0, 0}
@@ -1453,19 +995,18 @@
 
   LOG(GNUNET_ERROR_TYPE_DEBUG, "RPS started\n");
 
+  struct init_peer_cls *ipc;
+
   cfg = c;
 
 
-  own_identity = GNUNET_new(struct GNUNET_PeerIdentity); // needed?
-
+  /* Get own ID */
+  own_identity = GNUNET_new(struct GNUNET_PeerIdentity);
   GNUNET_CRYPTO_get_peer_identity(cfg, own_identity); // TODO check return value
-
   GNUNET_assert(NULL != own_identity);
-
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Own identity is %s (at %p).\n", GNUNET_i2s(own_identity), own_identity);
 
 
-
   /* Get time interval from the configuration */
   if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "RPS",
                                                         "ROUNDINTERVAL",
@@ -1519,15 +1060,16 @@
                                                          "BETA",
                                                          &beta))
   {
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "No BETA specified in the config\n");
   }
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "BETA is %f\n", beta);
 
   // TODO check that alpha + beta < 1
 
-  peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
+  peer_map = GNUNET_CONTAINER_multipeermap_create (est_size, GNUNET_NO);
 
 
+  /* Initialise cadet */
   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
@@ -1536,28 +1078,30 @@
   };
 
   const uint32_t ports[] = {GNUNET_RPS_CADET_PORT, 0}; // _PORT specified in 
src/rps/rps.h
-  cadet_handle = GNUNET_CADET_connect(cfg,
+  cadet_handle = GNUNET_CADET_connect (cfg,
                                     cls,
                                     &handle_inbound_channel,
                                     &cleanup_channel,
                                     cadet_handlers,
                                     ports);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
 
 
-  /* Initialise sampler and gossip list */
+  /* Initialise sampler */
+  RPS_sampler_init (est_size, own_identity, insertCB, NULL, removeCB, NULL);
 
-  sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, 
removeCB, NULL);
-
+  /* Initialise push and pull maps */
   push_list = NULL;
   push_list_size = 0;
   pull_list = NULL;
   pull_list_size = 0;
 
 
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
-  GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
-  // FIXME use magic 0000 PeerID to _start_ the service
+  ipc = GNUNET_new (struct init_peer_cls);
+  ipc->server = server;
+  ipc->i = 0;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
+  GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, ipc);
 
   // TODO send push/pull to each of those peers?
 }

Added: gnunet/src/rps/gnunet-service-rps_sampler.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.c                         (rev 0)
+++ gnunet/src/rps/gnunet-service-rps_sampler.c 2015-01-06 23:48:24 UTC (rev 
34826)
@@ -0,0 +1,677 @@
+/*
+     This file is part of GNUnet.
+     (C)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file rps/gnunet-service-rps_sampler.c
+ * @brief sampler implementation
+ * @author Julius Bünger
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "rps.h"
+
+#include "gnunet-service-rps_sampler.h"
+
+#include <math.h>
+#include <inttypes.h>
+
+#define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
+
+// multiple 'clients'?
+
+// TODO check for overflows
+
+// TODO align message structs
+
+// hist_size_init, hist_size_max
+
+/***********************************************************************
+ * WARNING: This section needs to be reviewed regarding the use of
+ * functions providing (pseudo)randomness!
+***********************************************************************/
+
+// TODO care about invalid input of the caller (size 0 or less...)
+
+enum RPS_SamplerEmpty
+{
+  NOT_EMPTY = 0x0,
+      EMPTY = 0x1
+};
+
+/**
+ * A sampler element sampling one PeerID at a time.
+ */
+struct RPS_SamplerElement
+{
+  /**
+   * Min-wise linear permutation used by this sampler.
+   *
+   * This is a key later used by an HMAC.
+   */
+  struct GNUNET_CRYPTO_AuthKey auth_key;
+
+  /**
+   * The PeerID this sampler currently samples.
+   */
+  struct GNUNET_PeerIdentity peer_id;
+
+  /**
+   * The according hash value of this PeerID.
+   */
+  struct GNUNET_HashCode peer_id_hash;
+
+  /**
+   * Time of last request.
+   */
+  struct GNUNET_TIME_Absolute last_request;
+  
+  /**
+   * Flag that indicates that we are not holding a valid PeerID right now.
+   */
+  enum RPS_SamplerEmpty is_empty;
+};
+
+/**
+ * Sampler with its own array of SamplerElements
+ */
+struct RPS_Sampler
+{
+  /**
+   * Number of sampler elements we hold.
+   */
+  unsigned int sampler_size;
+  //size_t size;
+
+  /**
+   * All Samplers in one array.
+   */
+  struct RPS_SamplerElement **sampler_elements;
+
+  /**
+   * Index to a sampler element.
+   *
+   * Gets cycled on every hist_request.
+   */
+  uint64_t sampler_elem_index;
+
+  /**
+   * Callback to be called when a peer gets inserted into a sampler.
+   */
+  RPS_sampler_insert_cb insert_cb;
+
+  /**
+   * Closure to the insert_cb.
+   */
+  void *insert_cls;
+
+  /**
+   * Callback to be called when a peer gets removed from a sampler.
+   */
+  RPS_sampler_remove_cb remove_cb;
+
+  /**
+   * Closure to the remove_cb.
+   */
+  void *remove_cls;
+};
+
+/**
+ * Global sampler variable.
+ */
+struct RPS_Sampler *sampler;
+
+
+/**
+ * The minimal size for the extended sampler elements.
+ */
+static size_t min_size;
+
+/**
+ * The maximal size the extended sampler elements should grow to.
+ */
+static size_t max_size;
+
+/**
+ * The size the extended sampler elements currently have.
+ */
+static size_t extra_size;
+
+/**
+ * Index to the sampler element that is the next to be returned
+ */
+static struct RPS_SamplerElement **extended_samplers_index;
+
+/**
+ * Request counter.
+ *
+ * Only needed in the beginning to check how many of the 64 deltas
+ * we already have
+ */
+static unsigned int req_counter;
+
+/**
+ * Time of the last request we received.
+ *
+ * Used to compute the expected request rate.
+ */
+static struct GNUNET_TIME_Absolute last_request;
+
+/**
+ * Last 64 deltas between requests
+ */
+static struct GNUNET_TIME_Relative request_deltas[64];
+
+/**
+ * The prediction of the rate of requests
+ */
+static struct GNUNET_TIME_Relative  request_rate;
+
+
+/**
+ * Sum all time relatives of an array.
+ */
+  struct GNUNET_TIME_Relative
+T_relative_sum (const struct GNUNET_TIME_Relative *rel_array, uint64_t 
arr_size)
+{
+  struct GNUNET_TIME_Relative sum;
+  uint64_t i;
+
+  sum = GNUNET_TIME_UNIT_ZERO;
+  for ( i = 0 ; i < arr_size ; i++ )
+  {
+    sum = GNUNET_TIME_relative_add (sum, rel_array[i]);
+  }
+  return sum;
+}
+
+/**
+ * Compute the average of given time relatives.
+ */
+  struct GNUNET_TIME_Relative
+T_relative_avg (const struct GNUNET_TIME_Relative *rel_array, uint64_t 
arr_size)
+{
+  return T_relative_sum (rel_array, arr_size); // FIXME find a way to devide 
that by arr_size
+}
+
+
+
+/**
+ * Reinitialise a previously initialised sampler element.
+ *
+ * @param sampler_el pointer to the sampler element to reinitialise.
+ */
+  static void
+RPS_sampler_elem_reinit (struct RPS_SamplerElement *sampler_el)
+{
+  sampler_el->is_empty = EMPTY;
+
+  // I guess I don't need to call GNUNET_CRYPTO_hmac_derive_key()...
+  GNUNET_CRYPTO_random_block(GNUNET_CRYPTO_QUALITY_STRONG,
+                             &(sampler_el->auth_key.key),
+                             GNUNET_CRYPTO_HASH_LENGTH);
+
+  sampler_el->last_request = GNUNET_TIME_UNIT_FOREVER_ABS;
+
+  /* We might want to keep the previous peer */
+
+  //GNUNET_CRYPTO_hmac(&sampler_el->auth_key, sampler_el->peer_id,
+  //                   sizeof(struct GNUNET_PeerIdentity),
+  //                   &sampler_el->peer_id_hash);
+}
+
+
+/**
+ * (Re)Initialise given Sampler with random min-wise independent function.
+ *
+ * In this implementation this means choosing an auth_key for later use in
+ * a hmac at random.
+ *
+ * @return a newly created RPS_SamplerElement which currently holds no id.
+ */
+  struct RPS_SamplerElement *
+RPS_sampler_elem_create (void)
+{
+  struct RPS_SamplerElement *s;
+  
+  s = GNUNET_new (struct RPS_SamplerElement);
+
+  RPS_sampler_elem_reinit (s);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: initialised with empty PeerID\n");
+
+  return s;
+}
+
+
+/**
+ * Input a PeerID into the given sampler element.
+ */
+  static void
+RPS_sampler_elem_next (struct RPS_SamplerElement *s_elem, const struct 
GNUNET_PeerIdentity *other,
+    RPS_sampler_insert_cb insert_cb, void *insert_cls,
+    RPS_sampler_remove_cb remove_cb, void *remove_cls)
+{
+  struct GNUNET_HashCode other_hash;
+
+  if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(other, &(s_elem->peer_id)) )
+  {
+    LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:          Got PeerID %s_elem\n",
+        GNUNET_i2s(other));
+    LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Have already PeerID %s_elem\n",
+        GNUNET_i2s(&(s_elem->peer_id)));
+  }
+  else
+  {
+    GNUNET_CRYPTO_hmac(&s_elem->auth_key,
+        other,
+        sizeof(struct GNUNET_PeerIdentity),
+        &other_hash);
+
+    if ( EMPTY == s_elem->is_empty )
+    { // Or whatever is a valid way to say
+      // "we have no PeerID at the moment"
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s_elem; Simply 
accepting (was empty previously).\n",
+          GNUNET_i2s(other));
+      s_elem->peer_id = *other;
+      //s_elem->peer_id = other;
+      s_elem->peer_id_hash = other_hash;
+      if (NULL != sampler->insert_cb)
+      {
+        sampler->insert_cb(sampler->insert_cls, &(s_elem->peer_id));
+      }
+    }
+    else if ( 0 > GNUNET_CRYPTO_hash_cmp(&other_hash, &s_elem->peer_id_hash) )
+    {
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:            Got PeerID %s_elem\n",
+          GNUNET_i2s(other));
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s_elem\n",
+          GNUNET_i2s(&s_elem->peer_id));
+
+      if ( NULL != sampler->remove_cb )
+      {
+        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s_elem 
with the remove callback.\n",
+            GNUNET_i2s(&s_elem->peer_id));
+        sampler->remove_cb(sampler->remove_cls, &s_elem->peer_id);
+      }
+
+      memcpy(&s_elem->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
+      //s_elem->peer_id = other;
+      s_elem->peer_id_hash = other_hash;
+
+      if ( NULL != sampler->insert_cb )
+      {
+        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s_elem 
with the insert callback.\n",
+            GNUNET_i2s(&s_elem->peer_id));
+        sampler->insert_cb(sampler->insert_cls, &s_elem->peer_id);
+      }
+    }
+    else
+    {
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER:         Got PeerID %s_elem\n",
+          GNUNET_i2s(other));
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Keeping old PeerID %s_elem\n",
+          GNUNET_i2s(&s_elem->peer_id));
+    }
+  }
+  s_elem->is_empty = NOT_EMPTY;
+}
+
+
+/**
+ * Grow or shrink the size of the sampler.
+ *
+ * @param new_size the new size of the sampler
+ */
+  void
+RPS_sampler_resize (unsigned int new_size)
+{
+  unsigned int old_size;
+  uint64_t i;
+  struct RPS_SamplerElement **rem_list;
+
+  // TODO check min and max size
+
+  old_size = sampler->sampler_size;
+
+  if (old_size > new_size*4 &&
+      extra_size > new_size*4)
+  { /* Shrinking */
+
+    new_size /= 2;
+
+    /* Temporary store those to properly call the removeCB on those later */
+    rem_list = GNUNET_malloc ((old_size - new_size) * sizeof (struct 
RPS_SamplerElement *));
+    memcpy (rem_list,
+        &sampler->sampler_elements[new_size],
+        (old_size - new_size) * sizeof(struct RPS_SamplerElement *));
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Shrinking sampler %d -> %d\n", 
old_size, new_size);
+    GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 
new_size);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "SAMPLER: sampler->sampler_elements now points to %p\n",
+        sampler->sampler_elements);
+
+    // TODO move extended_samplers_index
+    for (i = new_size ; i < old_size ; i++)
+    {/* Remove unneeded rest */
+      LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". 
sampler\n", i);
+      if (NULL != sampler->remove_cb)
+        sampler->remove_cb (sampler->remove_cls, &rem_list[i]->peer_id);
+      GNUNET_free (rem_list[i]);
+    }
+  }
+  else if (old_size < new_size)// ||
+      //extra_size < new_size) // needed?
+  { /* Growing */
+    new_size *= 2; // TODO check overflow
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing sampler %d -> %d\n", 
old_size, new_size);
+    GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 
new_size);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "SAMPLER: sampler->sampler_elements now points to %p\n",
+        sampler->sampler_elements);
+
+    // TODO move extended_samplers_index
+
+    for ( i = old_size ; i < new_size ; i++ )
+    { /* Add new sampler elements */
+      sampler->sampler_elements[i] = RPS_sampler_elem_create ();
+      if (NULL != sampler->insert_cb)
+        sampler->insert_cb (sampler->insert_cls, 
&sampler->sampler_elements[i]->peer_id);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+          "SAMPLER: Added %" PRIX64 ". sampler, now pointing to %p, contains 
%s\n",
+          i, &sampler->sampler_elements[i], GNUNET_i2s 
(&sampler->sampler_elements[i]->peer_id));
+    }
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing 
to do\n");
+    return;
+  }
+
+  GNUNET_assert(sampler->sampler_size == new_size);
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n"); // 
remove
+}
+
+
+/**
+ * Initialise a tuple of sampler elements.
+ *
+ * @param init_size the size the sampler is initialised with
+ * @param id with which all newly created sampler elements are initialised
+ * @param ins_cb the callback that will be called on every PeerID that is 
+ *               newly inserted into a sampler element
+ * @param ins_cls the closure given to #ins_cb
+ * @param rem_cb the callback that will be called on every PeerID that is
+ *               removed from a sampler element
+ * @param rem_cls the closure given to #rem_cb
+ */
+  void
+RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
+    RPS_sampler_insert_cb ins_cb, void *ins_cls,
+    RPS_sampler_remove_cb rem_cb, void *rem_cls)
+{
+  //struct RPS_Sampler *sampler;
+  //uint64_t i;
+
+  /* Initialise context around extended sampler */
+  min_size = 10; // TODO make input to _samplers_init()
+  max_size = 1000; // TODO make input to _samplers_init()
+  GNUNET_new_array (64, struct GNUNET_TIME_Relative);
+
+  sampler = GNUNET_new (struct RPS_Sampler);
+  sampler->sampler_size = 0;
+  sampler->sampler_elements = NULL;
+  sampler->insert_cb = ins_cb;
+  sampler->insert_cls = ins_cls;
+  sampler->remove_cb = rem_cb;
+  sampler->remove_cls = rem_cls;
+  //sampler->sampler_elements = GNUNET_new_array(init_size, struct 
GNUNET_PeerIdentity);
+  //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 
min_size);
+  RPS_sampler_resize (init_size);
+  RPS_sampler_update_list (id); // no super nice desing but ok for the moment
+
+  extended_samplers_index = sampler->sampler_elements;
+
+  //GNUNET_assert (init_size == sampler->sampler_size);
+}
+
+
+/**
+ * A function to update every sampler in the given list
+ *
+ * @param id the PeerID that is put in the sampler
+ */
+  void
+RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id)
+{
+  uint64_t i;
+
+  for ( i = 0 ; i < sampler->sampler_size ; i++ )
+    RPS_sampler_elem_next (sampler->sampler_elements[i], id,
+        sampler->insert_cb, sampler->insert_cls,
+        sampler->remove_cb, sampler->remove_cls);
+}
+
+
+/**
+ * Reinitialise all previously initialised sampler elements with the given 
value.
+ *
+ * Used to get rid of a PeerID.
+ *
+ * @param id the id of the sampler elements to update.
+ */
+  void
+RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id)
+{
+  uint64_t i;
+
+  for ( i = 0 ; i < sampler->sampler_size ; i++ )
+  {
+    if ( 0 == GNUNET_CRYPTO_cmp_peer_identity(id, 
&(sampler->sampler_elements[i]->peer_id)) )
+    {
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Reinitialising sampler\n");
+      RPS_sampler_elem_reinit (sampler->sampler_elements[i]);
+    }
+  }
+}
+
+
+/**
+ * Get one random peer out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ * Only used internally
+ */
+  const struct GNUNET_PeerIdentity * 
+RPS_sampler_get_rand_peer_ ()
+{
+  uint64_t r_index;
+  const struct GNUNET_PeerIdentity *peer; // do we have to malloc that?
+
+  // TODO implement extra logic
+
+  /**
+   * Choose the r_index of the peer we want to return
+   * at random from the interval of the gossip list
+   */
+  r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
+      sampler->sampler_size);
+
+  //if ( EMPTY == sampler->sampler_elements[r_index]->is_empty )
+  //  // TODO schedule for later
+  //  peer = NULL;
+  //else
+    peer = &(sampler->sampler_elements[r_index]->peer_id);
+  sampler->sampler_elements[r_index]->last_request = 
GNUNET_TIME_absolute_get();
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "Sgrp: Returning PeerID %s\n", 
GNUNET_i2s(peer));
+
+
+  return peer;
+}
+
+/**
+ * Get n random peers out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ * Random with or without consumption?
+ * Only used internally
+ */
+  const struct GNUNET_PeerIdentity *
+RPS_sampler_get_n_rand_peers_ (uint64_t n)
+{
+  if ( 0 == sampler->sampler_size )
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Sgrp: List empty - Returning NULL\n");
+    return NULL;
+  }
+  else
+  {
+    // TODO check if we have too much (distinct) sampled peers
+    // If we are not ready yet maybe schedule for later
+    struct GNUNET_PeerIdentity *peers;
+    uint64_t i;
+
+    peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
+
+    for ( i = 0 ; i < n ; i++ ) {
+      //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
+      memcpy (&peers[i], RPS_sampler_get_rand_peer_ (), sizeof (struct 
GNUNET_PeerIdentity));
+    }
+    return peers;
+  }
+}
+
+
+/**
+ * Get one random peer out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ *
+ * @return a random PeerID of the PeerIDs previously put into the sampler.
+ */
+  const struct GNUNET_PeerIdentity * 
+RPS_sampler_get_rand_peer ()
+{
+  struct GNUNET_PeerIdentity *peer;
+
+  if (64 > req_counter)
+    req_counter++;
+  if (1 < req_counter)
+  {
+    memcpy (&request_deltas[1],
+        request_deltas,
+        (req_counter - 1) * sizeof (struct GNUNET_TIME_Relative));
+    request_deltas[0] = GNUNET_TIME_absolute_get_difference (last_request,
+        GNUNET_TIME_absolute_get ());
+    request_rate = T_relative_avg (request_deltas, req_counter);
+  }
+  last_request = GNUNET_TIME_absolute_get();
+  // TODO resize the size of the extended_samplers
+
+  // use _get_rand_peer_ ?
+  peer = GNUNET_new (struct GNUNET_PeerIdentity);
+  *peer = (*extended_samplers_index)->peer_id;
+  RPS_sampler_elem_reinit (*extended_samplers_index);
+  if ( extended_samplers_index == 
&sampler->sampler_elements[sampler->sampler_size -1] )
+    extended_samplers_index = &sampler->sampler_elements[0];
+  else
+    extended_samplers_index++;
+  // TODO
+  return peer;
+}
+
+
+/**
+ * Get n random peers out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ * Random with or without consumption?
+ *
+ * @return n random PeerIDs of the PeerIDs previously put into the sampler.
+ */
+  const struct GNUNET_PeerIdentity *
+RPS_sampler_get_n_rand_peers (uint64_t n)
+{
+  // use _get_rand_peers_ ?
+  if ( 0 == sampler->sampler_size )
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Sgrp: List empty - Returning NULL\n");
+    return NULL;
+  }
+  else
+  {
+    // TODO check if we have too much (distinct) sampled peers
+    // If we are not ready yet maybe schedule for later
+    struct GNUNET_PeerIdentity *peers;
+    uint64_t i;
+
+    peers = GNUNET_malloc (n * sizeof(struct GNUNET_PeerIdentity));
+
+    for ( i = 0 ; i < n ; i++ ) {
+      //peers[i] = RPS_sampler_get_rand_peer_(sampler->sampler_elements);
+      memcpy (&peers[i], RPS_sampler_get_rand_peer (), sizeof (struct 
GNUNET_PeerIdentity));
+    }
+    return peers;
+  }
+}
+
+
+/**
+ * Counts how many Samplers currently hold a given PeerID.
+ *
+ * @param id the PeerID to count.
+ *
+ * @return the number of occurrences of id.
+ */
+  uint64_t
+RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id)
+{
+  uint64_t count;
+  uint64_t i;
+
+  count = 0;
+  for ( i = 0 ; i < sampler->sampler_size ; i++ )
+  {
+    if ( 0 == GNUNET_CRYPTO_cmp_peer_identity 
(&sampler->sampler_elements[i]->peer_id, id) 
+        && EMPTY != sampler->sampler_elements[i]->is_empty)
+      count++;
+  }
+  return count;
+}
+
+
+/**
+ * Cleans the sampler.
+ */
+  void
+RPS_sampler_destroy ()
+{
+  RPS_sampler_resize (0);
+  GNUNET_free (request_deltas); // _array_grow()?
+  GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, 0);
+}
+
+/* end of gnunet-service-rps_sampler.c */

Added: gnunet/src/rps/gnunet-service-rps_sampler.h
===================================================================
--- gnunet/src/rps/gnunet-service-rps_sampler.h                         (rev 0)
+++ gnunet/src/rps/gnunet-service-rps_sampler.h 2015-01-06 23:48:24 UTC (rev 
34826)
@@ -0,0 +1,147 @@
+/*
+     This file is part of GNUnet.
+     (C)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file rps/gnunet-service-rps_sampler.h
+ * @brief sampler implementation
+ * @author Julius Bünger
+ */
+
+#ifndef RPS_SAMPLER_H
+#define RPS_SAMPLER_H
+#include <inttypes.h>
+
+/**
+ * Callback that is called when a new PeerID is inserted into a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is inserted
+ */
+typedef void
+(*RPS_sampler_insert_cb) (void *cls,
+    const struct GNUNET_PeerIdentity *id);
+
+/**
+ * Callback that is called when a PeerID is removed from a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is removed
+ */
+typedef void
+(*RPS_sampler_remove_cb) (void *cls,
+    const struct GNUNET_PeerIdentity *id);
+
+/**
+ * A sampler sampling a stream of PeerIDs.
+ */
+//struct RPS_Sampler;
+
+
+/**
+ * Grow or shrink the size of the sampler.
+ *
+ * @param new_size the new size of the sampler
+ */
+  void
+RPS_sampler_resize (unsigned int new_size);
+
+
+/**
+ * Initialise a tuple of samplers.
+ *
+ * @param init_size the size the sampler is initialised with
+ * @param id with which all newly created sampler elements are initialised
+ * @param ins_cb the callback that will be called on every PeerID that is 
+ *               newly inserted into a sampler element
+ * @param ins_cls the closure given to #ins_cb
+ * @param rem_cb the callback that will be called on every PeerID that is
+ *               removed from a sampler element
+ * @param rem_cls the closure given to #rem_cb
+ */
+  void
+RPS_sampler_init (size_t init_size, const struct GNUNET_PeerIdentity *id,
+    RPS_sampler_insert_cb ins_cb, void *ins_cls,
+    RPS_sampler_remove_cb rem_cb, void *rem_cls);
+
+
+/**
+ * A function to update every sampler in the given list
+ *
+ * @param id the PeerID that is put in the sampler
+ */
+  void
+RPS_sampler_update_list (const struct GNUNET_PeerIdentity *id);
+
+
+/**
+ * Reinitialise all previously initialised sampler elements with the given 
value.
+ *
+ * Used to get rid of a PeerID.
+ *
+ * @param id the id of the samplers to update.
+ */
+  void
+RPS_sampler_reinitialise_by_value (const struct GNUNET_PeerIdentity *id);
+
+
+/**
+ * Get one random peer out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ *
+ * @return a random PeerID of the PeerIDs previously put into the sampler.
+ */
+  const struct GNUNET_PeerIdentity * 
+RPS_sampler_get_rand_peer ();
+
+
+/**
+ * Get n random peers out of the sampled peers.
+ *
+ * We might want to reinitialise this sampler after giving the
+ * corresponding peer to the client.
+ * Random with or without consumption?
+ *
+ * @return n random PeerIDs of the PeerIDs previously put into the sampler.
+ */
+  const struct GNUNET_PeerIdentity *
+RPS_sampler_get_n_rand_peers (uint64_t n);
+
+
+/**
+ * Counts how many Samplers currently hold a given PeerID.
+ *
+ * @param id the PeerID to count.
+ *
+ * @return the number of occurrences of id.
+ */
+  uint64_t
+RPS_sampler_count_id (const struct GNUNET_PeerIdentity *id);
+
+
+/**
+ * Cleans the samplers.
+ */
+  void
+RPS_sampler_destroy ();
+
+#endif
+/* end of gnunet-service-rps_sampler.h */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]