[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r34742 - gnunet/src/rps
From: |
gnunet |
Subject: |
[GNUnet-SVN] r34742 - gnunet/src/rps |
Date: |
Sat, 20 Dec 2014 16:57:31 +0100 |
Author: ch3
Date: 2014-12-20 16:57:31 +0100 (Sat, 20 Dec 2014)
New Revision: 34742
Modified:
gnunet/src/rps/gnunet-service-rps.c
Log:
Cleaned up
Modified: gnunet/src/rps/gnunet-service-rps.c
===================================================================
--- gnunet/src/rps/gnunet-service-rps.c 2014-12-20 15:37:11 UTC (rev 34741)
+++ gnunet/src/rps/gnunet-service-rps.c 2014-12-20 15:57:31 UTC (rev 34742)
@@ -42,12 +42,12 @@
// TODO align message structs
-// TODO multipeerlist indep of gossiped list
-
// (TODO api -- possibility of getting weak random peer immideately)
// TODO malicious peer
+// TODO Change API to accept initialisation peers
+
/**
* Our configuration.
*/
@@ -74,6 +74,28 @@
// It might be interesting to formulate this independent of PeerIDs.
/**
+ * Callback that is called when a new PeerID is inserted into a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is inserted
+ * @param hash the hash the sampler produced of the PeerID
+ */
+typedef void (* SAMPLER_insertCB) (void *cls,
+ const struct GNUNET_PeerIdentity *id,
+ struct GNUNET_HashCode hash);
+
+/**
+ * Callback that is called when a new PeerID is removed from a sampler.
+ *
+ * @param cls the closure given alongside this function.
+ * @param id the PeerID that is removed
+ * @param hash the hash the sampler produced of the PeerID
+ */
+typedef void (* SAMPLER_removeCB) (void *cls,
+ const struct GNUNET_PeerIdentity *id,
+ struct GNUNET_HashCode hash);
+
+/**
* A sampler sampling PeerIDs.
*/
struct Sampler
@@ -124,6 +146,26 @@
struct GNUNET_PeerIdentity *peer_ids;
/**
+ * Callback to be called when a peer gets inserted into a sampler.
+ */
+ SAMPLER_insertCB insertCB;
+
+ /**
+ * Closure to the insertCB.
+ */
+ void *insertCLS;
+
+ /**
+ * Callback to be called when a peer gets inserted into a sampler.
+ */
+ SAMPLER_removeCB removeCB;
+
+ /**
+ * Closure to the removeCB.
+ */
+ void *removeCLS;
+
+ /**
* The head of the DLL.
*/
struct Sampler *head;
@@ -135,9 +177,6 @@
};
-// TODO change to updateCB and call on updates in general
-typedef void (* SAMPLER_deleteCB) (void *cls, const struct GNUNET_PeerIdentity
*id, struct GNUNET_HashCode hash);
-
/**
* (Re)Initialise given Sampler with random min-wise independent function.
*
@@ -161,9 +200,7 @@
GNUNET_assert(NULL != id);
s->peer_id = id;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in _init()\n");
memcpy(s->peer_id, own_identity, sizeof(struct GNUNET_PeerIdentity)); //
FIXME this should probably be NULL -- the caller has to handle those.
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in
_init()\n");
//s->peer_id = own_identity; // Maybe set to own PeerID. So we always have
// a valid PeerID in the sampler.
// Maybe take a PeerID as second argument.
@@ -210,8 +247,9 @@
*/
static void
SAMPLER_next(struct Sampler *s, const struct GNUNET_PeerIdentity *other,
- SAMPLER_deleteCB del_cb, void *cb_cls)
- // TODO set id in peer_ids
+ SAMPLER_insertCB insertCB, void *insertCLS,
+ SAMPLER_removeCB removeCB, void *removeCLS)
+ // TODO call update herein
{
struct GNUNET_HashCode other_hash;
@@ -240,9 +278,12 @@
LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Got PeerID %s; Simply accepting
(got NULL previously).\n",
GNUNET_i2s(other));
memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in
_next()\n");
//s->peer_id = other;
s->peer_id_hash = other_hash;
+ if (NULL != insertCB)
+ {
+ insertCB(insertCLS, s->peer_id, s->peer_id_hash);
+ }
}
else if ( 0 > hash_cmp(&other_hash, &s->peer_id_hash) )
{
@@ -251,17 +292,23 @@
LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Discarding old PeerID %s\n",
GNUNET_i2s(s->peer_id));
- if ( NULL != del_cb )
+ if ( NULL != removeCB )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the
delete callback.\n",
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing old PeerID %s with the
remove callback.\n",
GNUNET_i2s(s->peer_id));
- del_cb(cb_cls, s->peer_id, s->peer_id_hash);
+ removeCB(removeCLS, s->peer_id, s->peer_id_hash);
}
memcpy(s->peer_id, other, sizeof(struct GNUNET_PeerIdentity));
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id content in
_next()\n");
//s->peer_id = other;
s->peer_id_hash = other_hash;
+
+ if ( NULL != insertCB )
+ {
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Inserting new PeerID %s with
the insert callback.\n",
+ GNUNET_i2s(s->peer_id));
+ insertCB(insertCLS, s->peer_id, s->peer_id_hash);
+ }
}
else
{
@@ -289,7 +336,7 @@
{
if ( samplers->size == new_size )
{
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Size remains the same -- nothing to do\n");
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Size remains the same -- nothing to
do\n");
return;
}
@@ -299,12 +346,27 @@
struct Sampler *tmp;
old_size = samplers->size;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Growing/Shrinking samplers %u -> %u\n",
old_size, new_size);
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Growing/Shrinking samplers %u ->
%u\n", old_size, new_size);
+
+ iter = samplers->head;
+
+ if ( new_size < old_size )
+ {
+ for ( i = new_size ; i < old_size ; i++ )
+ {/* Remove unneeded rest */
+ tmp = iter->next;
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Removing %" PRIX64 ". sampler\n",
i);
+ if (NULL != samplers->removeCB)
+ samplers->removeCB(samplers->removeCLS, iter->peer_id,
iter->peer_id_hash);
+ GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
+ GNUNET_free(iter);
+ iter = tmp;
+ }
+ }
+
GNUNET_array_grow(samplers->peer_ids, samplers->size, new_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in
_samplers_resize()\n");
- LOG(GNUNET_ERROR_TYPE_DEBUG, "samplers->peer_ids now points to %p\n",
samplers->peer_ids);
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: samplers->peer_ids now points to
%p\n", samplers->peer_ids);
- iter = samplers->head;
if ( new_size > old_size )
{ /* Growing */
GNUNET_assert( NULL != fill_up_id );
@@ -313,8 +375,7 @@
if ( i < old_size )
{ /* Update old samplers */
iter->peer_id = &samplers->peer_ids[i];
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in
_samplers_resize()\n");
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Updated %" PRIX64 ". sampler, now
pointing to %p, contains %s\n",
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updated %" PRIX64 ". sampler,
now pointing to %p, contains %s\n",
i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
iter = iter->next;
}
@@ -322,37 +383,32 @@
{ /* Add new samplers */
memcpy(&samplers->peer_ids[i], fill_up_id, sizeof(struct
GNUNET_PeerIdentity));
iter = SAMPLER_init(&samplers->peer_ids[i]);
+ if (NULL != samplers->insertCB)
+ {
+ samplers->insertCB(samplers->insertCLS, iter->peer_id,
iter->peer_id_hash);
+ }
GNUNET_CONTAINER_DLL_insert_tail(samplers->head, samplers->tail, iter);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Added %" PRIX64 ". sampler, now pointing
to %p, contains %s\n",
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Added %" PRIX64 ". sampler, now
pointing to %p, contains %s\n",
i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
}
}
}
else// if ( new_size < old_size )
{ /* Shrinking */
- for ( i = 0 ; i < old_size ; i++)
+ for ( i = 0 ; i < new_size ; i++)
{ /* All samplers */
tmp = iter->next;
- if ( i < new_size )
- { /* Update remaining samplers */
- iter->peer_id = &samplers->peer_ids[i];
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified sampler->peer_id in
_samplers_resize()\n");
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Updatied %" PRIX64 ". sampler, now
pointing to %p, contains %s\n",
- i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
- }
- else
- { /* Remove unneeded rest */
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Removing %" PRIX64 ". sampler\n", i);
- // TODO call delCB on elem?
- GNUNET_CONTAINER_DLL_remove(samplers->head, samplers->tail, iter);
- GNUNET_free(iter);
- }
+ /* Update remaining samplers */
+ iter->peer_id = &samplers->peer_ids[i];
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Updatied %" PRIX64 ". sampler,
now pointing to %p, contains %s\n",
+ i, &samplers->peer_ids[i], GNUNET_i2s(iter->peer_id));
+
iter = tmp;
}
}
GNUNET_assert(samplers->size == new_size);
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished growing/shrinking.\n");
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "SAMPLER: Finished growing/shrinking.\n");
}
@@ -360,7 +416,9 @@
* Initialise a tuple of samplers.
*/
struct Samplers *
-SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id)
+SAMPLER_samplers_init(size_t init_size, struct GNUNET_PeerIdentity *id,
+ SAMPLER_insertCB insertCB, void *insertCLS,
+ SAMPLER_removeCB removeCB, void *removeCLS)
{
struct Samplers *samplers;
//struct Sampler *s;
@@ -368,9 +426,12 @@
samplers = GNUNET_new(struct Samplers);
samplers->size = 0;
+ samplers->peer_ids = NULL;
+ samplers->insertCB = insertCB;
+ samplers->insertCLS = insertCLS;
+ samplers->removeCB = removeCB;
+ samplers->removeCLS = removeCLS;
samplers->head = samplers->tail = NULL;
- samplers->peer_ids = NULL;
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in
_samplers_init()\n");
//samplers->peer_ids = GNUNET_new_array(init_size, struct
GNUNET_PeerIdentity);
SAMPLER_samplers_resize(samplers, init_size, id);
@@ -380,7 +441,6 @@
// GNUNET_array_append(samplers->peer_ids,
// samplers->size,
// *id);
- // LOG(GNUNET_ERROR_TYPE_DEBUG, "Modified samplers->peer_ids in
_samplers_init()\n");
// s = SAMPLER_init(&samplers->peer_ids[i]);
// GNUNET_CONTAINER_DLL_insert_tail(samplers->head,
// samplers->tail,
@@ -396,15 +456,16 @@
* A fuction to update every sampler in the given list
*/
static void
-SAMPLER_update_list(struct Samplers *samplers, const struct
GNUNET_PeerIdentity *id,
- SAMPLER_deleteCB del_cb, void *cb_cls)
+SAMPLER_update_list(struct Samplers *samplers, const struct
GNUNET_PeerIdentity *id)
{
struct Sampler *iter;
iter = samplers->head;
while ( NULL != iter->next )
{
- SAMPLER_next(iter, id, del_cb, cb_cls);
+ SAMPLER_next(iter, id,
+ samplers->insertCB, samplers->insertCLS,
+ samplers->removeCB, samplers->removeCLS);
iter = iter->next;
}
@@ -468,7 +529,7 @@
* Counts how many Samplers currently hold a given PeerID.
*/
uint64_t
-SAMPLER_count_id ( struct Samplers *samplers, struct GNUNET_PeerIdentity *id )
+SAMPLER_count_id ( struct Samplers *samplers, const struct GNUNET_PeerIdentity
*id )
{
struct Sampler *iter;
uint64_t count;
@@ -710,51 +771,111 @@
}
/**
+ * Make sure the context of a given peer exists.
+ */
+ void
+touch_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct
GNUNET_PeerIdentity *peer)
+{
+ struct peer_context *ctx;
+
+ if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer ) )
+ {
+ ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
+ }
+ else
+ {
+ ctx = GNUNET_malloc(sizeof(struct peer_context));
+ ctx->in_flags = 0;
+ ctx->mq = NULL;
+ ctx->to_channel = NULL;
+ ctx->from_channel = NULL;
+ GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+}
+
+/**
+ * Get the context of a peer. If not existing, create.
+ */
+ struct peer_context *
+get_peer_ctx (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct
GNUNET_PeerIdentity *peer)
+{
+ struct peer_context *ctx;
+
+ touch_peer_ctx(peer_map, peer);
+ ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer);
+ return ctx;
+}
+
+/**
+ * Get the channel of a peer. If not existing, create.
+ */
+ void
+touch_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct
GNUNET_PeerIdentity *peer)
+{
+ struct peer_context *ctx;
+
+ ctx = get_peer_ctx (peer_map, peer);
+ if (NULL == ctx->to_channel)
+ {
+ ctx->to_channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer,
+ GNUNET_RPS_CADET_PORT,
+
GNUNET_CADET_OPTION_RELIABLE);
+ //TODO do I have to explicitly put it in the peer_map?
+ }
+}
+
+/**
+ * Get the channel of a peer. If not existing, create.
+ */
+ struct GNUNET_CADET_Channel *
+get_channel (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct
GNUNET_PeerIdentity *peer)
+{
+ struct peer_context *ctx;
+
+ ctx = get_peer_ctx (peer_map, peer);
+ touch_channel(peer_map, peer);
+ return ctx->to_channel;
+}
+
+/**
+ * Make sure the mq for a given peer exists.
+ *
+ * If we already have a message queue open to this client,
+ * simply return it, otherways create one.
+ */
+ void
+touch_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct
GNUNET_PeerIdentity *peer_id)
+{
+ struct peer_context *ctx;
+
+ ctx = get_peer_ctx(peer_map, peer_id);
+ if (NULL == ctx->mq)
+ {
+ touch_channel(peer_map, peer_id);
+ ctx->mq = GNUNET_CADET_mq_create(ctx->to_channel);
+ //TODO do I have to explicitly put it in the peer_map?
+ //GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
+ //
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+}
+
+/**
* Get the message queue of a specific peer.
*
* If we already have a message queue open to this client,
* simply return it, otherways create one.
*/
struct GNUNET_MQ_Handle *
-get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, struct
GNUNET_PeerIdentity *peer_id)
+get_mq (struct GNUNET_CONTAINER_MultiPeerMap *peer_map, const struct
GNUNET_PeerIdentity *peer_id)
{
struct peer_context *ctx;
- struct GNUNET_MQ_Handle * mq;
- struct GNUNET_CADET_Channel *channel;
- if ( GNUNET_OK != GNUNET_CONTAINER_multipeermap_contains( peer_map, peer_id
) ) {
+ ctx = get_peer_ctx(peer_map, peer_id);
+ touch_mq(peer_map, peer_id);
- channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id,
- GNUNET_RPS_CADET_PORT,
- GNUNET_CADET_OPTION_RELIABLE);
- mq = GNUNET_CADET_mq_create(channel);
-
- ctx = GNUNET_malloc(sizeof(struct peer_context));
- ctx->in_flags = 0;
- ctx->to_channel = channel;
- ctx->mq = mq;
-
- GNUNET_CONTAINER_multipeermap_put(peer_map, peer_id, ctx,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- } else {
- ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, peer_id);
- if ( NULL == ctx->mq ) {
- if ( NULL == ctx->to_channel ) {
- channel = GNUNET_CADET_channel_create(cadet_handle, NULL, peer_id,
- GNUNET_RPS_CADET_PORT,
- GNUNET_CADET_OPTION_RELIABLE);
- ctx->to_channel = channel;
- }
-
- mq = GNUNET_CADET_mq_create(ctx->to_channel);
- ctx->mq = mq;
- }
- }
-
return ctx->mq;
}
-
/***********************************************************************
* /Util functions
***********************************************************************/
@@ -819,29 +940,21 @@
GNUNET_SERVER_client_set_user_context(client, cli_ctx);
}
- //mq = GNUNET_MQ_queue_for_server_client(client);
-
// TODO How many peers do we give back?
// Wait until we have enough random peers?
ev = GNUNET_MQ_msg_extra(out_msg,
GNUNET_ntohll(msg->num_peers) * sizeof(struct
GNUNET_PeerIdentity),
GNUNET_MESSAGE_TYPE_RPS_CS_REPLY);
- out_msg->num_peers = GNUNET_ntohll(msg->num_peers);
+ out_msg->num_peers = msg->num_peers; // No conversion between network and
host order
num_peers = GNUNET_ntohll(msg->num_peers);
//&out_msg[1] = SAMPLER_get_n_rand_peers(sampler_list, num_peers);
memcpy(&out_msg[1],
SAMPLER_get_n_rand_peers(sampler_list, num_peers),
num_peers * sizeof(struct GNUNET_PeerIdentity));
- //for ( i = 0 ; i < num_peers ; i++ ) {
- // memcpy(&out_msg[1] + i * sizeof(struct GNUNET_PeerIdentity),
- // SAMPLER_get_rand_peer(sampler_list),
- // sizeof(struct GNUNET_PeerIdentity));
- //}
GNUNET_MQ_send(cli_ctx->mq, ev);
- //GNUNET_MQ_send(mq, ev);
//GNUNET_MQ_destroy(mq);
GNUNET_SERVER_receive_done (client,
@@ -973,22 +1086,6 @@
/**
- * Callback called when a Sampler is updated.
- */
- void
-delete_cb (void *cls, struct GNUNET_PeerIdentity *id, struct GNUNET_HashCode
hash)
-{
- size_t s;
-
- s = SAMPLER_count_id(sampler_list, id);
- if ( 1 >= s ) {
- // TODO cleanup peer
- GNUNET_CONTAINER_multipeermap_remove_all( peer_map, id);
- }
-}
-
-
-/**
* Send out PUSHes and PULLs.
*
* This is executed regylary.
@@ -1010,16 +1107,12 @@
/* If the NSE has changed adapt the lists accordingly */
- // TODO check nse == 0!
- LOG(GNUNET_ERROR_TYPE_DEBUG, "Checking size estimate.\n");
if ( sampler_list->size != est_size )
SAMPLER_samplers_resize(sampler_list, est_size, own_identity);
GNUNET_array_grow(gossip_list, gossip_list_size, est_size);
- gossip_list_size = sampler_list->size = est_size;
-
/* Would it make sense to have one shuffeled gossip list and then
* to send PUSHes to first alpha peers, PULL requests to next beta peers and
* use the rest to update sampler?
@@ -1063,21 +1156,21 @@
}
-
-
/* Update gossip list */
uint64_t r_index;
if ( push_list_size <= alpha * gossip_list_size &&
push_list_size != 0 &&
- pull_list_size != 0 ) {
+ pull_list_size != 0 )
+ {
LOG(GNUNET_ERROR_TYPE_DEBUG, "Update of the gossip list. ()\n");
uint64_t first_border;
uint64_t second_border;
first_border = round(alpha * gossip_list_size);
- for ( i = 0 ; i < first_border ; i++ ) { // TODO use
SAMPLER_get_n_rand_peers
+ for ( i = 0 ; i < first_border ; i++ )
+ { // TODO use SAMPLER_get_n_rand_peers
/* Update gossip list with peers received through PUSHes */
r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
push_list_size);
@@ -1086,7 +1179,8 @@
}
second_border = first_border + round(beta * gossip_list_size);
- for ( i = first_border ; i < second_border ; i++ ) {
+ for ( i = first_border ; i < second_border ; i++ )
+ {
/* Update gossip list with peers received through PULLs */
r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
pull_list_size);
@@ -1094,7 +1188,8 @@
// TODO change the in_flags accordingly
}
- for ( i = second_border ; i < gossip_list_size ; i++ ) {
+ for ( i = second_border ; i < gossip_list_size ; i++ )
+ {
/* Update gossip list with peers from history */
r_index = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
sampler_list->size);
@@ -1102,7 +1197,9 @@
// TODO change the in_flags accordingly
}
- } else {
+ }
+ else
+ {
LOG(GNUNET_ERROR_TYPE_DEBUG, "No update of the gossip list. ()\n");
}
// TODO independent of that also get some peers from CADET_get_peers()?
@@ -1112,35 +1209,66 @@
for ( i = 0 ; i < push_list_size ; i++ )
{
- SAMPLER_update_list(sampler_list, &push_list[i], NULL, NULL);
+ SAMPLER_update_list(sampler_list, &push_list[i]);
// TODO set in_flag?
}
for ( i = 0 ; i < pull_list_size ; i++ )
{
- SAMPLER_update_list(sampler_list, &pull_list[i], NULL, NULL);
+ SAMPLER_update_list(sampler_list, &pull_list[i]);
// TODO set in_flag?
}
- // TODO go over whole peer_map and do cleanups
- // delete unneeded peers, set in_flags, check channel/mq
- // -- already done with deleteCB?
-
-
/* Empty push/pull lists */
GNUNET_array_grow(push_list, push_list_size, 0);
- push_list_size = 0; // TODO I guess that's not necessary but doesn't hurt
+ push_list_size = 0; // I guess that's not necessary but doesn't hurt
GNUNET_array_grow(pull_list, pull_list_size, 0);
- pull_list_size = 0; // TODO I guess that's not necessary but doesn't hurt
+ pull_list_size = 0; // I guess that's not necessary but doesn't hurt
/* Schedule next round */
- // TODO
do_round_task = GNUNET_SCHEDULER_add_delayed( round_interval, &do_round,
NULL );
LOG(GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
}
+/**
+ * Open a connection to given peer and store channel and mq.
+ */
+ void
+insertCB (void *cls, const struct GNUNET_PeerIdentity *id, struct
GNUNET_HashCode hash)
+{
+ touch_mq(peer_map, id);
+}
+
+/**
+ * Close the connection to given peer and delete channel and mq.
+ */
+ void
+removeCB (void *cls, const struct GNUNET_PeerIdentity *id, struct
GNUNET_HashCode hash)
+{
+ size_t s;
+ struct peer_context *ctx;
+
+ s = SAMPLER_count_id(sampler_list, id);
+ if ( 1 >= s ) {
+ if (GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains(peer_map, id))
+ {
+ ctx = GNUNET_CONTAINER_multipeermap_get(peer_map, id);
+ if (NULL != ctx->to_channel)
+ {
+ if (NULL != ctx->mq)
+ {
+ GNUNET_MQ_destroy(ctx->mq);
+ }
+ GNUNET_CADET_channel_destroy(ctx->to_channel);
+ }
+ // TODO cleanup peer
+ GNUNET_CONTAINER_multipeermap_remove_all(peer_map, id);
+ }
+ }
+}
+
static void
rps_start (struct GNUNET_SERVER_Handle *server);
@@ -1158,28 +1286,19 @@
unsigned int best_path) // "How long is the best path?
// (0 = unknown, 1 = ourselves, 2 =
neighbor)"
{
- if ( NULL != peer ) {
+ if ( NULL != peer )
+ {
LOG(GNUNET_ERROR_TYPE_DEBUG, "Got peer %s (at %p) from CADET\n",
GNUNET_i2s(peer), peer);
- SAMPLER_update_list(sampler_list, peer, NULL, NULL);
- // TODO put the following part in a function of its own.
- if ( GNUNET_YES == GNUNET_CONTAINER_multipeermap_contains( peer_map, peer
) ) {
- ;
- } else {
- struct peer_context *ctx;
+ SAMPLER_update_list(sampler_list, peer);
+ touch_peer_ctx(peer_map, peer);
- ctx = GNUNET_malloc(sizeof(struct peer_context));
- ctx->in_flags = 0;
- ctx->mq = NULL;
- ctx->to_channel = NULL;
- ctx->from_channel = NULL;
- GNUNET_CONTAINER_multipeermap_put( peer_map, peer, ctx,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- }
-
uint64_t i;
i = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_STRONG,
gossip_list_size);
gossip_list[i] = *peer;
// TODO send push/pull to each of those peers?
- } else {
+ }
+ else
+ {
rps_start( (struct GNUNET_SERVER_Handle *) cls);
}
}
@@ -1251,7 +1370,7 @@
GNUNET_assert( NULL != channel );
- // TODO we might even not store the from_channel
+ // TODO we might not even store the from_channel
if ( GNUNET_CONTAINER_multipeermap_contains( peer_map, initiator ) ) {
((struct peer_context *) GNUNET_CONTAINER_multipeermap_get( peer_map,
initiator ))->from_channel = channel;
@@ -1282,6 +1401,8 @@
void *channel_ctx)
{
LOG(GNUNET_ERROR_TYPE_DEBUG, "Channel was destroyed by remote peer.\n");
+ // TODO test whether that was a peer in the samplers/a peer we opened a
connection to
+ // and if so, reinitialise the sampler
}
/**
@@ -1405,17 +1526,6 @@
peer_map = GNUNET_CONTAINER_multipeermap_create(est_size, GNUNET_NO);
- /* Initialise sampler and gossip list */
-
- sampler_list = SAMPLER_samplers_init(est_size, own_identity);
-
- push_list = NULL;
- //GNUNET_array_grow(push_list, push_list_size, 0);
- push_list_size = 0;
- pull_list = NULL;
- //GNUNET_array_grow(pull_list, pull_list_size, 0);
- pull_list_size = 0;
-
static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
{&handle_peer_push , GNUNET_MESSAGE_TYPE_RPS_PP_PUSH , 0},
{&handle_peer_pull_request, GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST, 0},
@@ -1433,6 +1543,16 @@
LOG(GNUNET_ERROR_TYPE_DEBUG, "Connected to CADET\n");
+ /* Initialise sampler and gossip list */
+
+ sampler_list = SAMPLER_samplers_init(est_size, own_identity, insertCB, NULL,
removeCB, NULL);
+
+ push_list = NULL;
+ push_list_size = 0;
+ pull_list = NULL;
+ pull_list_size = 0;
+
+
LOG(GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
GNUNET_CADET_get_peers(cadet_handle, &init_peer_cb, server);
// FIXME use magic 0000 PeerID to _start_ the service
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r34742 - gnunet/src/rps,
gnunet <=