gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r34742 - gnunet/src/rps


From: gnunet
Subject: [GNUnet-SVN] r34742 - gnunet/src/rps
Date: Sat, 20 Dec 2014 16:57:31 +0100

Author: ch3
Date: 2014-12-20 16:57:31 +0100 (Sat, 20 Dec 2014)
New Revision: 34742

Modified:
   gnunet/src/rps/gnunet-service-rps.c
Log:
Cleaned up

Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2014-12-20 15:37:11 UTC (rev 34741)
+++ gnunet/src/rps/gnunet-service-rps.c 2014-12-20 15:57:31 UTC (rev 34742)
@@ -42,12 +42,12 @@
 
 // TODO align message structs
 
-// TODO multipeerlist indep of gossiped list
-
 // (TODO api -- possibility of getting weak random peer immideately)
 
 // TODO malicious peer
 
+// TODO Change API to accept initialisation peers
+
 /**
  * Our configuration.
  */
@@ -74,6 +74,28 @@
 // It might be interesting to formulate this independent of PeerIDs.
 
 /**
+ * Callback that is called when a new PeerID is inserted into a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is inserted
+ * @param hash the hash the sampler produced of the PeerID
+ */
+typedef void (* SAMPLER_insertCB) (void *cls,
+    const struct GNUNET_PeerIdentity *id,
+    struct GNUNET_HashCode hash);
+
+/**
+ * Callback that is called when a new PeerID is removed from a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is removed
+ * @param hash the hash the sampler produced of the PeerID
+ */
+typedef void (* SAMPLER_removeCB) (void *cls,
+    const struct GNUNET_PeerIdentity *id,
+    struct GNUNET_HashCode hash);
+
+/**
  * A sampler sampling PeerIDs.
  */
 struct Sampler
@@ -124,6 +146,26 @@
   struct GNUNET_PeerIdentity *peer_ids;
 
   /**
+   * Callback to be called when a peer gets inserted into a sampler.
+   */
+  SAMPLER_insertCB insertCB;
+
+  /**
+   * Closure to the insertCB.
+   */
+  void *insertCLS;
+
+  /**
+   * Callback to be called when a peer gets inserted into a sampler.
+   */
+  SAMPLER_removeCB removeCB;
+
+  /**
+   * Closure to the removeCB.
+   */
+  void *removeCLS;
+
+  /**
    * The head of the DLL.
    */
   struct Sampler *head;
@@ -135,9 +177,6 @@
 
 };
 
-// TODO change to updateCB and call on updates in general
-typedef void (* SAMPLER_deleteCB) (void *cls, const struct GNUNET_PeerIdentity 
*id, struct GNUNET_HashCode hash);
-
 /**
  * (Re)Initialise given Sampler with random min-wise independent function.
  *
@@ -161,9 +200,7 @@
 
   GNUNET_assert(NULL != id);
   s->peer_id = id;
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _init()\n");
   memcpy(s->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); // 
FIXME this should probably be NULL -- the caller has to handle those.
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in 
_init()\n");
   //s->peer_id = own_identity; // Maybe set to own PeerID. So we always have
                      // a valid PeerID in the sampler.
                      // Maybe take a PeerID as second argument.
@@ -210,8 +247,9 @@
  */
   static void
 SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
-             SAMPLER_deleteCB del_cb, void *cb_cls)
-  // TODO set id in peer_ids
+    SAMPLER_insertCB insertCB, void *insertCLS,
+    SAMPLER_removeCB removeCB, void *removeCLS)
+  // TODO call update herein
 {
   struct GNUNET_HashCode other_hash;
 
@@ -240,9 +278,12 @@
       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting 
(got NULL previously).\n",
           GNUNET_i2s(other));
       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in 
_next()\n");
       //s->peer_id = other;
       s->peer_id_hash = other_hash;
+      if (NULL != insertCB)
+      {
+        insertCB(insertCLS, s->peer_id, s->peer_id_hash);
+      }
     }
     else if ( 0 > hash_cmp(&other_hash, &s->peer_id_hash) )
     {
@@ -251,17 +292,23 @@
       LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
           GNUNET_i2s(s->peer_id));
 
-      if ( NULL != del_cb )
+      if ( NULL != removeCB )
       {
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the 
delete callback.\n",
+        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the 
remove callback.\n",
             GNUNET_i2s(s->peer_id));
-        del_cb(cb_cls, s->peer_id, s->peer_id_hash);
+        removeCB(removeCLS, s->peer_id, s->peer_id_hash);
       }
 
       memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
-      LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in 
_next()\n");
       //s->peer_id = other;
       s->peer_id_hash = other_hash;
+
+      if ( NULL != insertCB )
+      {
+        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with 
the insert callback.\n",
+            GNUNET_i2s(s->peer_id));
+        insertCB(insertCLS, s->peer_id, s->peer_id_hash);
+      }
     }
     else
     {
@@ -289,7 +336,7 @@
 {
   if ( samplers->size == new_size )
   {
-    LOG(GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
+    LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to 
do\n");
     return;
   }
 
@@ -299,12 +346,27 @@
   struct Sampler *tmp;
 
   old_size = samplers->size;
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Growing/Shrinking samplers %u -> %u\n", 
old_size, new_size);
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u -> 
%u\n", old_size, new_size);
+
+  iter = samplers->head;
+
+  if ( new_size < old_size )
+  {
+    for ( i = new_size ; i < old_size ; i++ )
+    {/* Remove unneeded rest */
+      tmp = iter->next;
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n", 
i);
+      if (NULL != samplers->removeCB)
+        samplers->removeCB(samplers->removeCLS, iter->peer_id, 
iter->peer_id_hash);
+      GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
+      GNUNET_free(iter);
+      iter = tmp;
+    }
+  }
+
   GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in 
_samplers_resize()\n");
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "samplers->peer_ids now points to %p\n", 
samplers->peer_ids);
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to 
%p\n", samplers->peer_ids);
 
-  iter = samplers->head;
   if ( new_size > old_size )
   { /* Growing */
     GNUNET_assert( NULL != fill_up_id );
@@ -313,8 +375,7 @@
       if ( i < old_size )
       { /* Update old samplers */
         iter->peer_id = &samplers->peer_ids[i];
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in 
_samplers_resize()\n");
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "Updated %" PRIX64 ". sampler, now 
pointing to %p, contains %s\n",
+        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler, 
now pointing to %p, contains %s\n",
             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
         iter = iter->next;
       }
@@ -322,37 +383,32 @@
       { /* Add new samplers */
         memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct 
GNUNET_PeerIdentity));
         iter = SAMPLER_init(&samplers->peer_ids[i]);
+        if (NULL != samplers->insertCB)
+        {
+          samplers->insertCB(samplers->insertCLS, iter->peer_id, 
iter->peer_id_hash);
+        }
         GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "Added %" PRIX64 ". sampler, now pointing 
to %p, contains %s\n",
+        LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now 
pointing to %p, contains %s\n",
             i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
       }
     }
   }
   else// if ( new_size < old_size )
   { /* Shrinking */
-    for ( i = 0 ; i < old_size ; i++)
+    for ( i = 0 ; i < new_size ; i++)
     { /* All samplers */
       tmp = iter->next;
-      if ( i < new_size )
-      { /* Update remaining samplers */
-        iter->peer_id = &samplers->peer_ids[i];
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in 
_samplers_resize()\n");
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "Updatied %" PRIX64 ". sampler, now 
pointing to %p, contains %s\n",
-            i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
-      }
-      else
-      { /* Remove unneeded rest */
-        LOG(GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX64 ". sampler\n", i);
-        // TODO call delCB on elem?
-        GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
-        GNUNET_free(iter);
-      }
+      /* Update remaining samplers */
+      iter->peer_id = &samplers->peer_ids[i];
+      LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler, 
now pointing to %p, contains %s\n",
+          i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
+
       iter = tmp;
     }
   }
 
   GNUNET_assert(samplers->size == new_size);
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n");
+  LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
 }
 
 
@@ -360,7 +416,9 @@
  * Initialise a tuple of samplers.
  */
 struct Samplers *
-SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id)
+SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
+    SAMPLER_insertCB insertCB, void *insertCLS,
+    SAMPLER_removeCB removeCB, void *removeCLS)
 {
   struct Samplers *samplers;
   //struct Sampler *s;
@@ -368,9 +426,12 @@
 
   samplers = GNUNET_new(struct Samplers);
   samplers->size = 0;
+  samplers->peer_ids = NULL;
+  samplers->insertCB = insertCB;
+  samplers->insertCLS = insertCLS;
+  samplers->removeCB = removeCB;
+  samplers->removeCLS = removeCLS;
   samplers->head = samplers->tail = NULL;
-  samplers->peer_ids = NULL;
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in 
_samplers_init()\n");
   //samplers->peer_ids = GNUNET_new_array(init_size, struct 
GNUNET_PeerIdentity);
 
   SAMPLER_samplers_resize(samplers, init_size, id);
@@ -380,7 +441,6 @@
   //  GNUNET_array_append(samplers->peer_ids,
   //      samplers->size,
   //      *id);
-  //  LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in 
_samplers_init()\n");
   //  s = SAMPLER_init(&samplers->peer_ids[i]);
   //  GNUNET_CONTAINER_DLL_insert_tail(samplers->head,
   //      samplers->tail,
@@ -396,15 +456,16 @@
  * A fuction to update every sampler in the given list
  */
   static void
-SAMPLER_update_list(struct Samplers *samplers, const struct 
GNUNET_PeerIdentity *id,
-                    SAMPLER_deleteCB del_cb, void *cb_cls)
+SAMPLER_update_list(struct Samplers *samplers, const struct 
GNUNET_PeerIdentity *id)
 {
   struct Sampler *iter;
 
   iter = samplers->head;
   while ( NULL != iter->next )
   {
-    SAMPLER_next(iter, id, del_cb, cb_cls);
+    SAMPLER_next(iter, id,
+        samplers->insertCB, samplers->insertCLS,
+        samplers->removeCB, samplers->removeCLS);
     iter = iter->next;
   }
   
@@ -468,7 +529,7 @@
  * Counts how many Samplers currently hold a given PeerID.
  */
   uint64_t
-SAMPLER_count_id ( struct Samplers *samplers, struct GNUNET_PeerIdentity *id )
+SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity 
*id )
 {
   struct Sampler *iter;
   uint64_t count;
@@ -710,51 +771,111 @@
 }
 
 /**
+ * Make sure the context of a given peer exists.
+ */
+  void
+touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct 
GNUNET_PeerIdentity *peer)
+{
+  struct peer_context *ctx;
+
+  if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
+  {
+    ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
+  }
+  else
+  {
+    ctx = GNUNET_malloc(sizeof(struct peer_context));
+    ctx->in_flags = 0;
+    ctx->mq = NULL;
+    ctx->to_channel = NULL;
+    ctx->from_channel = NULL;
+    GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+}
+
+/**
+ * Get the context of a peer. If not existing, create.
+ */
+  struct peer_context *
+get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct 
GNUNET_PeerIdentity *peer)
+{
+  struct peer_context *ctx;
+
+  touch_peer_ctx(peer_map, peer);
+  ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
+  return ctx;
+}
+
+/**
+ * Get the channel of a peer. If not existing, create.
+ */
+  void
+touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct 
GNUNET_PeerIdentity *peer)
+{
+  struct peer_context *ctx;
+
+  ctx = get_peer_ctx (peer_map, peer);
+  if (NULL == ctx->to_channel)
+  {
+    ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
+                                                  GNUNET_RPS_CADET_PORT,
+                                                  
GNUNET_CADET_OPTION_RELIABLE);
+    //TODO do I have to explicitly put it in the peer_map?
+  }
+}
+
+/**
+ * Get the channel of a peer. If not existing, create.
+ */
+  struct GNUNET_CADET_Channel *
+get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct 
GNUNET_PeerIdentity *peer)
+{
+  struct peer_context *ctx;
+
+  ctx = get_peer_ctx (peer_map, peer);
+  touch_channel(peer_map, peer);
+  return ctx->to_channel;
+}
+
+/**
+ * Make sure the mq for a given peer exists.
+ *
+ * If we already have a message queue open to this client,
+ * simply return it, otherways create one.
+ */
+  void
+touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct 
GNUNET_PeerIdentity *peer_id)
+{
+  struct peer_context *ctx;
+
+  ctx = get_peer_ctx(peer_map, peer_id);
+  if (NULL == ctx->mq)
+  {
+    touch_channel(peer_map, peer_id);
+    ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
+    //TODO do I have to explicitly put it in the peer_map?
+    //GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
+    //                                  
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+  }
+}
+
+/**
  * Get the message queue of a specific peer.
  *
  * If we already have a message queue open to this client,
  * simply return it, otherways create one.
  */
   struct GNUNET_MQ_Handle *
-get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, struct 
GNUNET_PeerIdentity *peer_id)
+get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct 
GNUNET_PeerIdentity *peer_id)
 {
   struct peer_context *ctx;
-  struct GNUNET_MQ_Handle * mq;
-  struct GNUNET_CADET_Channel *channel;
 
-  if ( GNUNET_OK != GNUNET_CONTAINER_multipeermap_contains( peer_map, peer_id 
) ) {
+  ctx = get_peer_ctx(peer_map, peer_id);
+  touch_mq(peer_map, peer_id);
 
-    channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id,
-                                  GNUNET_RPS_CADET_PORT,
-                                  GNUNET_CADET_OPTION_RELIABLE);
-    mq = GNUNET_CADET_mq_create(channel);
-
-    ctx = GNUNET_malloc(sizeof(struct peer_context));
-    ctx->in_flags = 0;
-    ctx->to_channel = channel;
-    ctx->mq = mq;
-
-    GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
-                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-  } else {
-    ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer_id);
-    if ( NULL == ctx->mq ) {
-      if ( NULL == ctx->to_channel ) {
-        channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id,
-                                      GNUNET_RPS_CADET_PORT,
-                                      GNUNET_CADET_OPTION_RELIABLE);
-        ctx->to_channel = channel;
-      }
-
-      mq = GNUNET_CADET_mq_create(ctx->to_channel);
-      ctx->mq = mq;
-    }
-  }
-
   return ctx->mq;
 }
 
-
 /***********************************************************************
  * /Util functions
 ***********************************************************************/
@@ -819,29 +940,21 @@
     GNUNET_SERVER_client_set_user_context(client, cli_ctx);
   }
   
-  //mq = GNUNET_MQ_queue_for_server_client(client);
-    
   // TODO How many peers do we give back?
   // Wait until we have enough random peers?
 
   ev = GNUNET_MQ_msg_extra(out_msg,
                            GNUNET_ntohll(msg->num_peers) * sizeof(struct 
GNUNET_PeerIdentity),
                            GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
-  out_msg->num_peers = GNUNET_ntohll(msg->num_peers);
+  out_msg->num_peers = msg->num_peers; // No conversion between network and 
host order
 
   num_peers = GNUNET_ntohll(msg->num_peers);
   //&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
   memcpy(&out_msg[1],
       SAMPLER_get_n_rand_peers(sampler_list, num_peers),
       num_peers * sizeof(struct GNUNET_PeerIdentity));
-  //for ( i = 0 ; i < num_peers ; i++ ) {
-  //  memcpy(&out_msg[1] + i * sizeof(struct GNUNET_PeerIdentity),
-  //         SAMPLER_get_rand_peer(sampler_list),
-  //         sizeof(struct GNUNET_PeerIdentity));
-  //}
   
   GNUNET_MQ_send(cli_ctx->mq, ev);
-  //GNUNET_MQ_send(mq, ev);
   //GNUNET_MQ_destroy(mq);
 
   GNUNET_SERVER_receive_done (client,
@@ -973,22 +1086,6 @@
 
 
 /**
- * Callback called when a Sampler is updated.
- */
-  void
-delete_cb (void *cls, struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode 
hash)
-{
-  size_t s;
-
-  s = SAMPLER_count_id(sampler_list, id);
-  if ( 1 >= s ) {
-    // TODO cleanup peer
-    GNUNET_CONTAINER_multipeermap_remove_all( peer_map, id);
-  }
-}
-
-
-/**
  * Send out PUSHes and PULLs.
  *
  * This is executed regylary.
@@ -1010,16 +1107,12 @@
 
 
   /* If the NSE has changed adapt the lists accordingly */
-  // TODO check nse == 0!
-  LOG(GNUNET_ERROR_TYPE_DEBUG, "Checking size estimate.\n");
   if ( sampler_list->size != est_size )
     SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
 
   GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
 
-  gossip_list_size = sampler_list->size = est_size;
 
-
   /* Would it make sense to have one shuffeled gossip list and then
    * to send PUSHes to first alpha peers, PULL requests to next beta peers and
    * use the rest to update sampler?
@@ -1063,21 +1156,21 @@
   }
 
 
-
-
   /* Update gossip list */
   uint64_t r_index;
 
   if ( push_list_size <= alpha * gossip_list_size &&
        push_list_size != 0 &&
-       pull_list_size != 0 ) {
+       pull_list_size != 0 )
+  {
     LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
 
     uint64_t first_border;
     uint64_t second_border;
 
     first_border = round(alpha * gossip_list_size);
-    for ( i = 0 ; i < first_border ; i++ ) { // TODO use 
SAMPLER_get_n_rand_peers
+    for ( i = 0 ; i < first_border ; i++ )
+    { // TODO use SAMPLER_get_n_rand_peers
       /* Update gossip list with peers received through PUSHes */
       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                        push_list_size);
@@ -1086,7 +1179,8 @@
     }
 
     second_border = first_border + round(beta * gossip_list_size);
-    for ( i = first_border ; i < second_border ; i++ ) {
+    for ( i = first_border ; i < second_border ; i++ )
+    {
       /* Update gossip list with peers received through PULLs */
       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                        pull_list_size);
@@ -1094,7 +1188,8 @@
       // TODO change the in_flags accordingly
     }
 
-    for ( i = second_border ; i < gossip_list_size ; i++ ) {
+    for ( i = second_border ; i < gossip_list_size ; i++ )
+    {
       /* Update gossip list with peers from history */
       r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
                                        sampler_list->size);
@@ -1102,7 +1197,9 @@
       // TODO change the in_flags accordingly
     }
 
-  } else {
+  }
+  else
+  {
     LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
   }
   // TODO independent of that also get some peers from CADET_get_peers()?
@@ -1112,35 +1209,66 @@
 
   for ( i = 0 ; i < push_list_size ; i++ )
   {
-    SAMPLER_update_list(sampler_list, &push_list[i], NULL, NULL);
+    SAMPLER_update_list(sampler_list, &push_list[i]);
     // TODO set in_flag?
   }
 
   for ( i = 0 ; i < pull_list_size ; i++ )
   {
-    SAMPLER_update_list(sampler_list, &pull_list[i], NULL, NULL);
+    SAMPLER_update_list(sampler_list, &pull_list[i]);
     // TODO set in_flag?
   }
 
 
-  // TODO go over whole peer_map and do cleanups
-  // delete unneeded peers, set in_flags, check channel/mq
-  // -- already done with deleteCB?
-
-
   /* Empty push/pull lists */
   GNUNET_array_grow(push_list, push_list_size, 0);
-  push_list_size = 0; // TODO I guess that's not necessary but doesn't hurt
+  push_list_size = 0; // I guess that's not necessary but doesn't hurt
   GNUNET_array_grow(pull_list, pull_list_size, 0);
-  pull_list_size = 0; // TODO I guess that's not necessary but doesn't hurt
+  pull_list_size = 0; // I guess that's not necessary but doesn't hurt
 
 
   /* Schedule next round */
-  // TODO
   do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round, 
NULL );
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
 }
 
+/**
+ * Open a connection to given peer and store channel and mq.
+ */
+  void
+insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct 
GNUNET_HashCode hash)
+{
+  touch_mq(peer_map, id);
+}
+
+/**
+ * Close the connection to given peer and delete channel and mq.
+ */
+  void
+removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct 
GNUNET_HashCode hash)
+{
+  size_t s;
+  struct peer_context *ctx;
+
+  s = SAMPLER_count_id(sampler_list, id);
+  if ( 1 >= s ) {
+    if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
+    {
+      ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
+      if (NULL != ctx->to_channel)
+      {
+        if (NULL != ctx->mq)
+        {
+          GNUNET_MQ_destroy(ctx->mq);
+        }
+        GNUNET_CADET_channel_destroy(ctx->to_channel);
+      }
+      // TODO cleanup peer
+      GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
+    }
+  }
+}
+
 static void
 rps_start (struct GNUNET_SERVER_Handle *server);
 
@@ -1158,28 +1286,19 @@
               unsigned int best_path) // "How long is the best path?
                                       // (0 = unknown, 1 = ourselves, 2 = 
neighbor)"
 {
-  if ( NULL != peer ) {
+  if ( NULL != peer )
+  {
     LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n", 
GNUNET_i2s(peer), peer);
-    SAMPLER_update_list(sampler_list, peer, NULL, NULL);
-    // TODO put the following part in a function of its own.
-    if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer 
) ) {
-      ;
-    } else {
-      struct peer_context *ctx;
+    SAMPLER_update_list(sampler_list, peer);
+    touch_peer_ctx(peer_map, peer);
 
-      ctx = GNUNET_malloc(sizeof(struct peer_context));
-      ctx->in_flags = 0;
-      ctx->mq = NULL;
-      ctx->to_channel = NULL;
-      ctx->from_channel = NULL;
-      GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx, 
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-    }
-
     uint64_t i;
     i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG, 
gossip_list_size);
     gossip_list[i] = *peer;
     // TODO send push/pull to each of those peers?
-  } else {
+  }
+  else
+  {
     rps_start( (struct GNUNET_SERVER_Handle *) cls);
   }
 }
@@ -1251,7 +1370,7 @@
 
   GNUNET_assert( NULL != channel );
 
-  // TODO we might even not store the from_channel
+  // TODO we might not even store the from_channel
 
   if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) {
     ((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map, 
initiator ))->from_channel = channel;
@@ -1282,6 +1401,8 @@
                 void *channel_ctx)
 {
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel was destroyed by remote peer.\n");
+  // TODO test whether that was a peer in the samplers/a peer we opened a 
connection to
+  //      and if so, reinitialise the sampler
 }
 
 /**
@@ -1405,17 +1526,6 @@
   peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
 
 
-  /* Initialise sampler and gossip list */
-
-  sampler_list = SAMPLER_samplers_init(est_size, own_identity);
-
-  push_list = NULL;
-  //GNUNET_array_grow(push_list, push_list_size, 0);
-  push_list_size = 0;
-  pull_list = NULL;
-  //GNUNET_array_grow(pull_list, pull_list_size, 0);
-  pull_list_size = 0;
-
   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
     {&handle_peer_push        , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH        , 0},
     {&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
@@ -1433,6 +1543,16 @@
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
 
 
+  /* Initialise sampler and gossip list */
+
+  sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL, 
removeCB, NULL);
+
+  push_list = NULL;
+  push_list_size = 0;
+  pull_list = NULL;
+  pull_list_size = 0;
+
+
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
   GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
   // FIXME use magic 0000 PeerID to _start_ the service




reply via email to

[Prev in Thread] Current Thread [Next in Thread]