[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r37995 - gnunet/src/psyc
From: gnunet
Subject: [GNUnet-SVN] r37995 - gnunet/src/psyc
Date: Sat, 24 Sep 2016 00:51:13 +0200
Author: tg
Date: 2016-09-24 00:51:13 +0200 (Sat, 24 Sep 2016)
New Revision: 37995
Modified:
gnunet/src/psyc/gnunet-service-psyc.c
Log:
psyc: switch to SERVICE API
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2016-09-23 21:17:16 UTC (rev 37994)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2016-09-23 22:51:13 UTC (rev 37995)
@@ -44,14 +44,14 @@
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Handle to the statistics service.
+ * Service handle.
*/
-static struct GNUNET_STATISTICS_Handle *stats;
+struct GNUNET_SERVICE_Handle *service;
/**
- * Notification context, simplifies client broadcasts.
+ * Handle to the statistics service.
*/
-static struct GNUNET_SERVER_NotificationContext *nc;
+static struct GNUNET_STATISTICS_Handle *stats;
/**
* Handle to the PSYCstore.
@@ -85,7 +85,7 @@
struct TransmitMessage *prev;
struct TransmitMessage *next;
- struct GNUNET_SERVER_Client *client;
+ struct GNUNET_SERVICE_Client *client;
/**
* ID assigned to the message.
@@ -185,12 +185,12 @@
/**
* List of connected clients.
*/
-struct Client
+struct ClientList
{
- struct Client *prev;
- struct Client *next;
+ struct ClientList *prev;
+ struct ClientList *next;
- struct GNUNET_SERVER_Client *client;
+ struct GNUNET_SERVICE_Client *client;
};
@@ -199,8 +199,8 @@
struct Operation *prev;
struct Operation *next;
- struct GNUNET_SERVER_Client *client;
- struct Channel *chn;
+ struct GNUNET_SERVICE_Client *client;
+ struct Channel *channel;
uint64_t op_id;
uint32_t flags;
};
@@ -211,8 +211,8 @@
*/
struct Channel
{
- struct Client *clients_head;
- struct Client *clients_tail;
+ struct ClientList *clients_head;
+ struct ClientList *clients_tail;
struct Operation *op_head;
struct Operation *op_tail;
@@ -270,11 +270,6 @@
uint32_t tmit_mod_value_size;
/**
- * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
- */
- uint8_t is_master;
-
- /**
* Is this channel ready to receive messages from client?
* #GNUNET_YES or #GNUNET_NO
*/
@@ -285,6 +280,16 @@
* #GNUNET_YES or #GNUNET_NO
*/
uint8_t is_disconnected;
+
+ /**
+ * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
+ */
+ uint8_t is_master;
+
+ union {
+ struct Master *master;
+ struct Slave *slave;
+ };
};
@@ -296,7 +301,7 @@
/**
* Channel struct common for Master and Slave
*/
- struct Channel chn;
+ struct Channel channel;
/**
* Private key of the channel.
@@ -353,7 +358,7 @@
/**
* Channel struct common for Master and Slave
*/
- struct Channel chn;
+ struct Channel channel;
/**
* Private key of the slave.
@@ -417,6 +422,24 @@
};
+/**
+ * Client context.
+ */
+struct Client {
+ struct GNUNET_SERVICE_Client *client;
+ struct Channel *channel;
+};
+
+
+struct ReplayRequestKey
+{
+ uint64_t fragment_id;
+ uint64_t message_id;
+ uint64_t fragment_offset;
+ uint64_t flags;
+};
+
+
static void
transmit_message (struct Channel *chn);
@@ -444,11 +467,6 @@
static void
shutdown_task (void *cls)
{
- if (NULL != nc)
- {
- GNUNET_SERVER_notification_context_destroy (nc);
- nc = NULL;
- }
if (NULL != stats)
{
GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -458,12 +476,12 @@
static struct Operation *
-op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
+op_add (struct Channel *chn, struct GNUNET_SERVICE_Client *client,
uint64_t op_id, uint32_t flags)
{
struct Operation *op = GNUNET_malloc (sizeof (*op));
op->client = client;
- op->chn = chn;
+ op->channel = chn;
op->op_id = op_id;
op->flags = flags;
GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
@@ -474,7 +492,7 @@
static void
op_remove (struct Operation *op)
{
- GNUNET_CONTAINER_DLL_remove (op->chn->op_head, op->chn->op_tail, op);
+ GNUNET_CONTAINER_DLL_remove (op->channel->op_head, op->channel->op_tail, op);
GNUNET_free (op);
}
@@ -485,7 +503,7 @@
static void
cleanup_master (struct Master *mst)
{
- struct Channel *chn = &mst->chn;
+ struct Channel *chn = &mst->channel;
if (NULL != mst->origin)
GNUNET_MULTICAST_origin_stop (mst->origin, NULL, NULL); // FIXME
@@ -500,7 +518,7 @@
static void
cleanup_slave (struct Slave *slv)
{
- struct Channel *chn = &slv->chn;
+ struct Channel *chn = &slv->channel;
struct GNUNET_CONTAINER_MultiHashMap *
chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
&chn->pub_key_hash);
@@ -556,8 +574,8 @@
}
(GNUNET_YES == chn->is_master)
- ? cleanup_master ((struct Master *) chn)
- : cleanup_slave ((struct Slave *) chn);
+ ? cleanup_master (chn->master)
+ : cleanup_slave (chn->slave);
GNUNET_free (chn);
}
@@ -566,18 +584,18 @@
* Called whenever a client is disconnected.
* Frees our resources associated with that client.
*
- * @param cls Closure.
- * @param client Identification of the client.
+ * @param cls closure
+ * @param client identification of the client
+ * @param app_ctx must match @a client
*/
static void
-client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+client_notify_disconnect (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ void *app_ctx)
{
- if (NULL == client)
- return;
+ struct Client *c = app_ctx;
+ struct Channel *chn = c->channel;
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-
if (NULL == chn)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -593,7 +611,7 @@
(GNUNET_YES == chn->is_master) ? "master" : "slave",
GNUNET_h2s (&chn->pub_key_hash));
- struct Client *cli = chn->clients_head;
+ struct ClientList *cli = chn->clients_head;
while (NULL != cli)
{
if (cli->client == client)
@@ -637,6 +655,28 @@
/**
+ * A new client connected.
+ *
+ * @param cls NULL
+ * @param client client to add
+ * @param mq message queue for @a client
+ * @return @a client
+ */
+static void *
+client_notify_connect (void *cls,
+ struct GNUNET_SERVICE_Client *client,
+ struct GNUNET_MQ_Handle *mq)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client);
+
+ struct Client *c = GNUNET_malloc (sizeof (*c));
+ c->client = client;
+
+ return c;
+}
+
+
+/**
* Send message to all clients connected to the channel.
*/
static void
@@ -647,11 +687,15 @@
"%p Sending message to clients.\n",
chn);
- struct Client *cli = chn->clients_head;
+ struct ClientList *cli = chn->clients_head;
while (NULL != cli)
{
- GNUNET_SERVER_notification_context_add (nc, cli->client);
- GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO);
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_copy (msg);
+
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cli->client),
+ env);
+
cli = cli->next;
}
}
@@ -672,14 +716,14 @@
* Size of @a data.
*/
static void
-client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+client_send_result (struct GNUNET_SERVICE_Client *client, uint64_t op_id,
int64_t result_code, const void *data, uint16_t data_size)
{
struct GNUNET_OperationResultMessage *res;
-
- res = GNUNET_malloc (sizeof (*res) + data_size);
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
- res->header.size = htons (sizeof (*res) + data_size);
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_extra (res,
+ data_size,
+ GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
res->result_code = GNUNET_htonll (result_code);
res->op_id = op_id;
if (0 < data_size)
@@ -692,10 +736,7 @@
result_code,
data_size);
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
- GNUNET_NO);
- GNUNET_free (res);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
}
@@ -705,8 +746,8 @@
struct JoinMemTestClosure
{
struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key;
- struct Channel *chn;
- struct GNUNET_MULTICAST_JoinHandle *jh;
+ struct Channel *channel;
+ struct GNUNET_MULTICAST_JoinHandle *join_handle;
struct GNUNET_PSYC_JoinRequestMessage *join_msg;
};
@@ -720,15 +761,15 @@
{
struct JoinMemTestClosure *jcls = cls;
- if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
+ if (GNUNET_NO == result && GNUNET_YES == jcls->channel->is_master)
{ /* Pass on join request to client if this is a master channel */
- struct Master *mst = (struct Master *) jcls->chn;
+ struct Master *mst = jcls->channel->master;
struct GNUNET_HashCode slave_pub_hash;
GNUNET_CRYPTO_hash (&jcls->slave_pub_key, sizeof (jcls->slave_pub_key),
&slave_pub_hash);
- GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->jh,
+ GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_pub_hash, jcls->join_handle,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- client_send_msg (jcls->chn, &jcls->join_msg->header);
+ client_send_msg (jcls->channel, &jcls->join_msg->header);
}
else
{
@@ -739,7 +780,7 @@
err_msg_size, err_msg);
}
// FIXME: add relays
- GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
+ GNUNET_MULTICAST_join_decision (jcls->join_handle, result, 0, NULL, NULL);
}
GNUNET_free (jcls->join_msg);
GNUNET_free (jcls);
@@ -786,8 +827,8 @@
struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
jcls->slave_pub_key = *slave_pub_key;
- jcls->chn = chn;
- jcls->jh = jh;
+ jcls->channel = chn;
+ jcls->join_handle = jh;
jcls->join_msg = req;
GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_pub_key,
@@ -807,7 +848,7 @@
const struct GNUNET_MessageHeader *join_resp)
{
struct Slave *slv = cls;
- struct Channel *chn = &slv->chn;
+ struct Channel *chn = &slv->channel;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision: %d\n",
slv,
@@ -1033,7 +1074,7 @@
client_send_mcast_req (struct Master *mst,
const struct GNUNET_MULTICAST_RequestHeader *req)
{
- struct Channel *chn = &mst->chn;
+ struct Channel *chn = &mst->channel;
struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t size = ntohs (req->header.size);
@@ -1090,7 +1131,7 @@
if (NULL == fragq)
{
- fragq = GNUNET_new (struct FragmentQueue);
+ fragq = GNUNET_malloc (sizeof (*fragq));
fragq->state = MSG_FRAG_STATE_HEADER;
fragq->fragments
= GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
@@ -1122,7 +1163,7 @@
chn,
fragq->header_size,
size);
- cache_entry = GNUNET_new (struct RecvCacheEntry);
+ cache_entry = GNUNET_malloc (sizeof (*cache_entry));
cache_entry->ref_count = 1;
cache_entry->mmsg = GNUNET_malloc (size);
GNUNET_memcpy (cache_entry->mmsg, mmsg, size);
@@ -1305,7 +1346,7 @@
struct StateModifyClosure
{
- struct Channel *chn;
+ struct Channel *channel;
uint64_t msg_id;
struct GNUNET_HashCode msg_id_hash;
};
@@ -1316,7 +1357,7 @@
const char *err_msg, uint16_t err_msg_size)
{
struct StateModifyClosure *mcls = cls;
- struct Channel *chn = mcls->chn;
+ struct Channel *chn = mcls->channel;
uint64_t msg_id = mcls->msg_id;
struct FragmentQueue *
@@ -1434,7 +1475,7 @@
}
struct StateModifyClosure *mcls = GNUNET_malloc (sizeof (*mcls));
- mcls->chn = chn;
+ mcls->channel = chn;
mcls->msg_id = msg_id;
mcls->msg_id_hash = msg_id_hash;
@@ -1604,7 +1645,7 @@
uint64_t max_state_message_id)
{
struct Master *mst = cls;
- struct Channel *chn = &mst->chn;
+ struct Channel *chn = &mst->channel;
chn->store_op = NULL;
struct GNUNET_PSYC_CountersResultMessage res;
@@ -1649,7 +1690,7 @@
uint64_t max_state_message_id)
{
struct Slave *slv = cls;
- struct Channel *chn = &slv->chn;
+ struct Channel *chn = &slv->channel;
chn->store_op = NULL;
struct GNUNET_PSYC_CountersResultMessage res;
@@ -1703,11 +1744,11 @@
* Handle a connecting client starting a channel master.
*/
static void
-client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_client_master_start (void *cls,
+ const struct MasterStartRequest *req)
{
- const struct MasterStartRequest *req
- = (const struct MasterStartRequest *) msg;
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
struct GNUNET_HashCode pub_key_hash;
@@ -1721,12 +1762,13 @@
if (NULL == mst)
{
- mst = GNUNET_new (struct Master);
+ mst = GNUNET_malloc (sizeof (*mst));
mst->policy = ntohl (req->policy);
mst->priv_key = req->channel_key;
mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- chn = &mst->chn;
+ chn = c->channel = &mst->channel;
+ chn->master = mst;
chn->is_master = GNUNET_YES;
chn->pub_key = pub_key;
chn->pub_key_hash = pub_key_hash;
@@ -1739,17 +1781,15 @@
}
else
{
- chn = &mst->chn;
+ chn = &mst->channel;
- struct GNUNET_PSYC_CountersResultMessage res;
- res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
- res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
- res.max_message_id = GNUNET_htonll (mst->max_message_id);
+ struct GNUNET_PSYC_CountersResultMessage *res;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+ res->result_code = htonl (GNUNET_OK);
+ res->max_message_id = GNUNET_htonll (mst->max_message_id);
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
- GNUNET_NO);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1756,24 +1796,32 @@
"%p Client connected as master to channel %s.\n",
mst, GNUNET_h2s (&chn->pub_key_hash));
- struct Client *cli = GNUNET_new (struct Client);
+ struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
cli->client = client;
GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, chn);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
}
+static int
+check_client_slave_join (void *cls,
+ const struct SlaveJoinRequest *req)
+{
+ return GNUNET_OK;
+}
+
+
/**
* Handle a connecting client joining as a channel slave.
*/
static void
-client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_client_slave_join (void *cls,
+ const struct SlaveJoinRequest *req)
{
- const struct SlaveJoinRequest *req
- = (const struct SlaveJoinRequest *) msg;
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+
uint16_t req_size = ntohs (req->header.size);
struct GNUNET_CRYPTO_EcdsaPublicKey slv_pub_key;
@@ -1794,7 +1842,7 @@
}
if (NULL == slv)
{
- slv = GNUNET_new (struct Slave);
+ slv = GNUNET_malloc (sizeof (*slv));
slv->priv_key = req->slave_key;
slv->pub_key = slv_pub_key;
slv->pub_key_hash = slv_pub_hash;
@@ -1825,7 +1873,7 @@
join_msg_size,
req_size);
GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (client);
+ GNUNET_SERVICE_client_drop (client);
GNUNET_free (slv);
return;
}
@@ -1835,7 +1883,8 @@
GNUNET_memcpy (slv->relays, &req[1], relay_size);
}
- chn = &slv->chn;
+ chn = c->channel = &slv->channel;
+ chn->slave = slv;
chn->is_master = GNUNET_NO;
chn->pub_key = req->channel_pub_key;
chn->pub_key_hash = pub_key_hash;
@@ -1856,18 +1905,17 @@
}
else
{
- chn = &slv->chn;
+ chn = &slv->channel;
- struct GNUNET_PSYC_CountersResultMessage res;
- res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
- res.header.size = htons (sizeof (res));
- res.result_code = htonl (GNUNET_OK);
- res.max_message_id = GNUNET_htonll (chn->max_message_id);
+ struct GNUNET_PSYC_CountersResultMessage *res;
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
- GNUNET_NO);
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+ res->result_code = htonl (GNUNET_OK);
+ res->max_message_id = GNUNET_htonll (chn->max_message_id);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
+
if (GNUNET_PSYC_SLAVE_JOIN_LOCAL & slv->join_flags)
{
mcast_recv_join_decision (slv, GNUNET_YES,
@@ -1893,10 +1941,9 @@
}
else if (NULL != slv->join_dcsn)
{
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client,
- &slv->join_dcsn->header,
- GNUNET_NO);
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg_copy (&slv->join_dcsn->header);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
}
}
@@ -1904,12 +1951,11 @@
"%p Client connected as slave to channel %s.\n",
slv, GNUNET_h2s (&chn->pub_key_hash));
- struct Client *cli = GNUNET_new (struct Client);
+ struct ClientList *cli = GNUNET_malloc (sizeof (*cli));
cli->client = client;
GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, chn);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
}
@@ -1935,31 +1981,37 @@
}
+static int
+check_client_join_decision (void *cls,
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
+{
+ return GNUNET_OK;
+}
+
+
/**
* Join decision from client.
*/
static void
-client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_client_join_decision (void *cls,
+ const struct GNUNET_PSYC_JoinDecisionMessage *dcsn)
{
- const struct GNUNET_PSYC_JoinDecisionMessage *dcsn
- = (const struct GNUNET_PSYC_JoinDecisionMessage *) msg;
- struct Channel *chn;
- struct Master *mst;
- struct JoinDecisionClosure jcls;
-
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Channel *chn = c->channel;
if (NULL == chn)
{
GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ GNUNET_SERVICE_client_drop (client);
return;
}
GNUNET_assert (GNUNET_YES == chn->is_master);
- mst = (struct Master *) chn;
+ struct Master *mst = chn->master;
+
+ struct JoinDecisionClosure jcls;
jcls.is_admitted = ntohl (dcsn->is_admitted);
jcls.msg
- = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (msg->size))
+ = (sizeof (*dcsn) + sizeof (*jcls.msg) <= ntohs (dcsn->header.size))
? (struct GNUNET_MessageHeader *) &dcsn[1]
: NULL;
@@ -1977,7 +2029,7 @@
GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_pub_hash,
&mcast_send_join_decision,
&jcls);
GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_pub_hash);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
}
@@ -1989,15 +2041,14 @@
* @param chn The channel struct for the client.
*/
static void
-send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
+send_message_ack (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
{
- struct GNUNET_MessageHeader res;
- res.size = htons (sizeof (res));
- res.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
+ struct GNUNET_MessageHeader *res;
+ struct GNUNET_MQ_Envelope *
+ env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK);
- /* FIXME */
- GNUNET_SERVER_notification_context_add (nc, client);
- GNUNET_SERVER_notification_context_unicast (nc, client, &res, GNUNET_NO);
+ /* FIXME? */
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env);
}
@@ -2093,7 +2144,7 @@
static void
master_transmit_message (struct Master *mst)
{
- struct Channel *chn = &mst->chn;
+ struct Channel *chn = &mst->channel;
struct TransmitMessage *tmit_msg = chn->tmit_head;
if (NULL == tmit_msg)
return;
@@ -2120,13 +2171,13 @@
static void
slave_transmit_message (struct Slave *slv)
{
- if (NULL == slv->chn.tmit_head)
+ if (NULL == slv->channel.tmit_head)
return;
if (NULL == slv->tmit_handle)
{
slv->tmit_handle = (void *) &slv->tmit_handle;
struct GNUNET_MULTICAST_MemberTransmitHandle *
- tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
+ tmit_handle = GNUNET_MULTICAST_member_to_origin (slv->member, slv->channel.tmit_head->id,
slave_transmit_notify,
slv);
if (NULL != slv->tmit_handle)
slv->tmit_handle = tmit_handle;
@@ -2142,8 +2193,8 @@
transmit_message (struct Channel *chn)
{
chn->is_master
- ? master_transmit_message ((struct Master *) chn)
- : slave_transmit_message ((struct Slave *) chn);
+ ? master_transmit_message (chn->master)
+ : slave_transmit_message (chn->slave);
}
@@ -2226,7 +2277,7 @@
*/
static struct TransmitMessage *
queue_message (struct Channel *chn,
- struct GNUNET_SERVER_Client *client,
+ struct GNUNET_SERVICE_Client *client,
size_t data_size,
const void *data,
uint16_t first_ptype, uint16_t last_ptype)
@@ -2244,8 +2295,8 @@
GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
chn->is_master
- ? master_queue_message ((struct Master *) chn, tmit_msg)
- : slave_queue_message ((struct Slave *) chn, tmit_msg);
+ ? master_queue_message (chn->master, tmit_msg)
+ : slave_queue_message (chn->slave, tmit_msg);
return tmit_msg;
}
@@ -2257,7 +2308,7 @@
* @param client Client the message originates from.
*/
static void
-transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
+transmit_cancel (struct Channel *chn, struct GNUNET_SERVICE_Client *client)
{
uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
@@ -2272,16 +2323,30 @@
}
+static int
+check_client_psyc_message (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ return GNUNET_OK;
+}
+
+
/**
* Incoming message from a master or slave client.
*/
static void
-client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_client_psyc_message (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Channel *chn = c->channel;
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Received message from client.\n", chn);
@@ -2292,7 +2357,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Channel is not ready yet, disconnecting client.\n", chn);
GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ GNUNET_SERVICE_client_drop (client);
return;
}
@@ -2306,7 +2371,7 @@
(unsigned int) (size - sizeof (*msg)));
GNUNET_break (0);
transmit_cancel (chn, client);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ GNUNET_SERVICE_client_drop (client);
return;
}
@@ -2320,7 +2385,7 @@
"%p Received invalid message part from client.\n", chn);
GNUNET_break (0);
transmit_cancel (chn, client);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ GNUNET_SERVICE_client_drop (client);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -2332,7 +2397,7 @@
transmit_message (chn);
/* FIXME: send a few ACKs even before transmit_notify is called */
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
};
@@ -2348,7 +2413,7 @@
struct Operation *op = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 " (%.*s)\n",
- op->chn,
+ op->channel,
result,
(int) err_msg_size,
err_msg);
@@ -2363,16 +2428,19 @@
* Client requests to add/remove a slave in the membership database.
*/
static void
-client_recv_membership_store (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_client_membership_store (void *cls,
+ const struct ChannelMembershipStoreRequest *req)
{
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Channel *chn = c->channel;
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
- const struct ChannelMembershipStoreRequest *
- req = (const struct ChannelMembershipStoreRequest *) msg;
-
struct Operation *op = op_add (chn, client, req->op_id, 0);
uint64_t announced_at = GNUNET_ntohll (req->announced_at);
@@ -2387,7 +2455,7 @@
req->did_join, announced_at,
effective_since,
0, /* FIXME: group_generation */
&store_recv_membership_store_result, op);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
}
@@ -2405,7 +2473,7 @@
{ /* Requesting client already disconnected. */
return GNUNET_NO;
}
- struct Channel *chn = op->chn;
+ struct Channel *chn = op->channel;
struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t msize = ntohs (mmsg->header.size);
@@ -2447,7 +2515,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p History replay #%" PRIu64 ": "
"PSYCSTORE returned %" PRId64 " (%.*s)\n",
- op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+ op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
{
@@ -2459,20 +2527,32 @@
}
+static int
+check_client_history_replay (void *cls,
+ const struct GNUNET_PSYC_HistoryRequestMessage *req)
+{
+ return GNUNET_OK;
+}
+
+
/**
* Client requests channel history.
*/
static void
-client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+handle_client_history_replay (void *cls,
+ const struct GNUNET_PSYC_HistoryRequestMessage *req)
{
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Channel *chn = c->channel;
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
- const struct GNUNET_PSYC_HistoryRequestMessage *
- req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
- uint16_t size = ntohs (msg->size);
+ uint16_t size = ntohs (req->header.size);
const char *method_prefix = (const char *) &req[1];
if (size < sizeof (*req) + 1
@@ -2486,7 +2566,7 @@
size,
(unsigned int) sizeof (*req) + 1);
GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ GNUNET_SERVICE_client_drop (client);
return;
}
@@ -2493,6 +2573,7 @@
struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
if (0 == req->message_limit)
+ {
GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
GNUNET_ntohll (req->start_message_id),
GNUNET_ntohll (req->end_message_id),
@@ -2499,7 +2580,9 @@
0, method_prefix,
&store_recv_fragment_history,
&store_recv_fragment_history_result, op);
+ }
else
+ {
GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
GNUNET_ntohll (req->message_limit),
method_prefix,
@@ -2506,8 +2589,8 @@
&store_recv_fragment_history,
&store_recv_fragment_history_result,
op);
-
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ }
+ GNUNET_SERVICE_client_continue (client);
}
@@ -2520,18 +2603,19 @@
{
struct Operation *op = cls;
struct GNUNET_OperationResultMessage *res;
+ struct GNUNET_MQ_Envelope *env;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p state_get #%" PRIu64 " - received var from PSYCstore: %s\n",
- op->chn, GNUNET_ntohll (op->op_id), name);
+ op->channel, GNUNET_ntohll (op->op_id), name);
if (NULL != name) /* First part */
{
uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
struct GNUNET_PSYC_MessageModifier *mod;
- res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size + value_size);
- res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size + value_size);
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ env = GNUNET_MQ_msg_extra (res,
+ sizeof (*mod) + name_size + value_size,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
res->op_id = op->op_id;
mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
@@ -2546,9 +2630,9 @@
else /* Continuation */
{
struct GNUNET_MessageHeader *mod;
- res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
- res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ env = GNUNET_MQ_msg_extra (res,
+ sizeof (*mod) + value_size,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
res->op_id = op->op_id;
mod = (struct GNUNET_MessageHeader *) &res[1];
@@ -2558,10 +2642,7 @@
}
// FIXME: client might have been disconnected
- GNUNET_SERVER_notification_context_add (nc, op->client);
- GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
- GNUNET_NO);
- GNUNET_free (res);
+ GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (op->client), env);
return GNUNET_YES;
}
@@ -2578,7 +2659,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p state_get #%" PRIu64 ": "
"PSYCSTORE returned %" PRId64 " (%.*s)\n",
- op->chn, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
+ op->channel, GNUNET_ntohll (op->op_id), result, err_msg_size, err_msg);
// FIXME: client might have been disconnected
client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
@@ -2586,97 +2667,94 @@
}
-/**
- * Client requests best matching state variable from PSYCstore.
- */
-static void
-client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+static int
+check_client_state_get (void *cls,
+ const struct StateRequest *req)
{
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
+ struct Client *c = cls;
+ struct Channel *chn = c->channel;
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
- const struct StateRequest *
- req = (const struct StateRequest *) msg;
-
uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
const char *name = (const char *) &req[1];
if (0 == name_size || '\0' != name[name_size - 1])
{
GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client requests best matching state variable from PSYCstore.
+ */
+static void
+handle_client_state_get (void *cls,
+ const struct StateRequest *req)
+{
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Channel *chn = c->channel;
+
+ const char *name = (const char *) &req[1];
struct Operation *op = op_add (chn, client, req->op_id, 0);
GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
&store_recv_state_var,
&store_recv_state_result, op);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
}
-/**
- * Client requests state variables with a given prefix from PSYCstore.
- */
-static void
-client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+static int
+check_client_state_get_prefix (void *cls,
+ const struct StateRequest *req)
{
- struct Channel *
- chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != chn);
+ struct Client *c = cls;
+ struct Channel *chn = c->channel;
+ if (NULL == chn)
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
- const struct StateRequest *
- req = (const struct StateRequest *) msg;
-
uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
const char *name = (const char *) &req[1];
if (0 == name_size || '\0' != name[name_size - 1])
{
GNUNET_break (0);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client requests state variables with a given prefix from PSYCstore.
+ */
+static void
+handle_client_state_get_prefix (void *cls,
+ const struct StateRequest *req)
+{
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Channel *chn = c->channel;
+
+ const char *name = (const char *) &req[1];
struct Operation *op = op_add (chn, client, req->op_id, 0);
GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
&store_recv_state_var,
&store_recv_state_result, op);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ GNUNET_SERVICE_client_continue (client);
}
-static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
- { &client_recv_master_start, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
-
- { &client_recv_slave_join, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
-
- { &client_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
- { &client_recv_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
-
- { &client_recv_membership_store, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE, 0 },
-
- { &client_recv_history_replay, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
-
- { &client_recv_state_get, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
-
- { &client_recv_state_get_prefix, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
-
- { NULL, NULL, 0, 0 }
-};
-
-
/**
* Initialize the PSYC service.
*
@@ -2685,10 +2763,12 @@
* @param c Configuration to use.
*/
static void
-run (void *cls, struct GNUNET_SERVER_Handle *server,
- const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *c,
+ struct GNUNET_SERVICE_Handle *svc)
{
cfg = c;
+ service = svc;
store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
masters = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -2695,27 +2775,51 @@
slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
channel_slaves = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
recv_cache = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- nc = GNUNET_SERVER_notification_context_create (server, 1);
- GNUNET_SERVER_add_handlers (server, server_handlers);
- GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
}
/**
- * The main function for the service.
- *
- * @param argc number of arguments from the command line
- * @param argv command line arguments
- * @return 0 ok, 1 on error
+ * Define "main" method using service macro.
*/
-int
-main (int argc, char *const *argv)
-{
- return (GNUNET_OK ==
- GNUNET_SERVICE_run (argc, argv, "psyc",
- GNUNET_SERVICE_OPTION_NONE,
- &run, NULL)) ? 0 : 1;
-}
+GNUNET_SERVICE_MAIN
+("psyc",
+ GNUNET_SERVICE_OPTION_NONE,
+ run,
+ client_notify_connect,
+ client_notify_disconnect,
+ NULL,
+ GNUNET_MQ_hd_fixed_size (client_master_start,
+ GNUNET_MESSAGE_TYPE_PSYC_MASTER_START,
+ struct MasterStartRequest,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_slave_join,
+ GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN,
+ struct SlaveJoinRequest,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_join_decision,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
+ struct GNUNET_PSYC_JoinDecisionMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_psyc_message,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
+ struct GNUNET_MessageHeader,
+ NULL),
+ GNUNET_MQ_hd_fixed_size (client_membership_store,
+ GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_MEMBERSHIP_STORE,
+ struct ChannelMembershipStoreRequest,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_history_replay,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY,
+ struct GNUNET_PSYC_HistoryRequestMessage,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_state_get,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
+ struct StateRequest,
+ NULL),
+ GNUNET_MQ_hd_var_size (client_state_get_prefix,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
+ struct StateRequest,
+ NULL));
/* end of gnunet-service-psyc.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r37995 - gnunet/src/psyc,
gnunet <=