[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33443 - in gnunet/src: include multicast psyc
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33443 - in gnunet/src: include multicast psyc |
Date: |
Thu, 29 May 2014 18:35:55 +0200 |
Author: tg
Date: 2014-05-29 18:35:55 +0200 (Thu, 29 May 2014)
New Revision: 33443
Modified:
gnunet/src/include/gnunet_multicast_service.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/psyc_util_lib.c
gnunet/src/psyc/test_psyc.c
Log:
psyc, multicast: reorg code, use new client manager & psyc util lib
Modified: gnunet/src/include/gnunet_multicast_service.h
===================================================================
--- gnunet/src/include/gnunet_multicast_service.h 2014-05-29 16:35:53 UTC
(rev 33442)
+++ gnunet/src/include/gnunet_multicast_service.h 2014-05-29 16:35:55 UTC
(rev 33443)
@@ -368,9 +368,7 @@
*/
typedef void
(*GNUNET_MULTICAST_RequestCallback) (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey
*member_key,
- const struct GNUNET_MessageHeader *req,
- enum GNUNET_MULTICAST_MessageFlags flags);
+ const struct
GNUNET_MULTICAST_RequestHeader *req);
/**
@@ -394,7 +392,7 @@
*/
typedef void
(*GNUNET_MULTICAST_MessageCallback) (void *cls,
- const struct GNUNET_MessageHeader *msg);
+ const struct
GNUNET_MULTICAST_MessageHeader *msg);
/**
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2014-05-29 16:35:53 UTC (rev
33442)
+++ gnunet/src/include/gnunet_psyc_service.h 2014-05-29 16:35:55 UTC (rev
33443)
@@ -471,7 +471,7 @@
* Only needed during the first call to this callback at the beginning
* of the modifier. In case of subsequent calls asking for value
* continuations @a oper is set to #NULL.
- * @param[out] value_size Where to write the full size of the value.
+ * @param[out] full_value_size Where to write the full size of the value.
* Only needed during the first call to this callback at the beginning
* of the modifier. In case of subsequent calls asking for value
* continuations @a value_size is set to #NULL.
@@ -489,7 +489,7 @@
uint16_t *data_size,
void *data,
uint8_t *oper,
- uint32_t *value_size);
+ uint32_t *full_value_size);
/**
* Flags for transmitting messages to a channel by the master.
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2014-05-29 16:35:53 UTC (rev
33442)
+++ gnunet/src/multicast/multicast_api.c 2014-05-29 16:35:55 UTC (rev
33443)
@@ -24,6 +24,7 @@
* @author Christian Grothoff
* @author Gabor X Toth
*/
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_multicast_service.h"
@@ -33,26 +34,6 @@
/**
- * Started origins.
- * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
- */
-static struct GNUNET_CONTAINER_MultiHashMap *origins;
-
-/**
- * Joined members.
- * group_key_hash -> struct GNUNET_MULTICAST_Member
- */
-static struct GNUNET_CONTAINER_MultiHashMap *members;
-
-
-struct MessageQueue
-{
- struct MessageQueue *prev;
- struct MessageQueue *next;
-};
-
-
-/**
* Handle for a request to send a message to all multicast group members
* (from the origin).
*/
@@ -90,48 +71,15 @@
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Socket (if available).
+ * Client connection to the service.
*/
- struct GNUNET_CLIENT_Connection *client;
+ struct GNUNET_CLIENT_MANAGER_Connection *client;
/**
- * Currently pending transmission request, or NULL for none.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
- * Head of operations to transmit.
- */
- struct MessageQueue *tmit_head;
-
- /**
- * Tail of operations to transmit.
- */
- struct MessageQueue *tmit_tail;
-
- /**
- * Message being transmitted to the Multicast service.
- */
- struct MessageQueue *tmit_msg;
-
- /**
* Message to send on reconnect.
*/
- struct GNUNET_MessageHeader *reconnect_msg;
+ struct GNUNET_MessageHeader *connect_msg;
- /**
- * Task doing exponential back-off trying to reconnect.
- */
- GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
-
- /**
- * Time for next connect retry.
- */
- struct GNUNET_TIME_Relative reconnect_delay;
-
- struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
- struct GNUNET_HashCode pub_key_hash;
-
GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
@@ -140,11 +88,6 @@
void *cb_cls;
/**
- * Are we polling for incoming messages right now?
- */
- uint8_t in_receive;
-
- /**
* Are we currently transmitting a message?
*/
uint8_t in_transmit;
@@ -163,7 +106,6 @@
{
struct GNUNET_MULTICAST_Group grp;
struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
GNUNET_MULTICAST_RequestCallback request_cb;
};
@@ -229,294 +171,125 @@
};
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
-
-
/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp PSYC channel handle
+ * Send first message to the service after connecting.
*/
static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp);
-
-
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Reschedule a connect attempt to the service.
- *
- * @param c channel to reconnect
- */
-static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
+group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
{
- GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
-
- if (NULL != grp->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
- grp->th = NULL;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_disconnect (grp->client);
- grp->client = NULL;
- }
- grp->in_receive = GNUNET_NO;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling task to reconnect to Multicast service in %s.\n",
- GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay,
GNUNET_YES));
- grp->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
- grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+ uint16_t cmsg_size = ntohs (grp->connect_msg->size);
+ struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+ memcpy (cmsg, grp->connect_msg, cmsg_size);
+ GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
}
/**
- * Reset stored data related to the last received message.
+ * Got disconnected from service. Reconnect.
*/
static void
-recv_reset (struct GNUNET_MULTICAST_Group *grp)
+group_recv_disconnect (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ GNUNET_CLIENT_MANAGER_reconnect (client);
+ group_send_connect_msg (grp);
}
-static void
-recv_error (struct GNUNET_MULTICAST_Group *grp)
-{
- if (NULL != grp->message_cb)
- grp->message_cb (grp->cb_cls, NULL);
-
- recv_reset (grp);
-}
-
-
/**
- * Transmit next message to service.
- *
- * @param cls The struct GNUNET_MULTICAST_Group.
- * @param size Number of bytes available in @a buf.
- * @param buf Where to copy the message.
- *
- * @return Number of bytes copied to @a buf.
+ * Receive join request from service.
*/
-static size_t
-send_next_message (void *cls, size_t size, void *buf)
-{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
- struct GNUNET_MULTICAST_Group *grp = cls;
- struct MessageQueue *mq = grp->tmit_head;
- if (NULL == mq)
- return 0;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- size_t ret = ntohs (qmsg->size);
- grp->th = NULL;
- if (ret > size)
- {
- reschedule_connect (grp);
- return 0;
- }
- memcpy (buf, qmsg, ret);
-
- GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
- GNUNET_free (mq);
-
- if (NULL != grp->tmit_head)
- transmit_next (grp);
-
- if (GNUNET_NO == grp->in_receive)
- {
- grp->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return ret;
-}
-
-
-/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp Multicast group handle.
- */
static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp)
+group_recv_join_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
- if (NULL != grp->th || NULL == grp->client)
- return;
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
- struct MessageQueue *mq = grp->tmit_head;
- if (NULL == mq)
- return;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+ const struct MulticastJoinRequestMessage *
+ jreq = (const struct MulticastJoinRequestMessage *) msg;
- grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
- ntohs (qmsg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &send_next_message,
- grp);
-}
+ struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+ jh->group = grp;
+ jh->member_key = jreq->member_key;
+ jh->member_peer = jreq->member_peer;
+ const struct GNUNET_MessageHeader *jmsg = NULL;
+ if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
+ jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
-/**
- * Try again to connect to the Multicast service.
- *
- * @param cls Channel handle.
- * @param tc Scheduler context.
- */
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_MULTICAST_Group *grp = cls;
-
- recv_reset (grp);
- grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to Multicast service.\n");
- GNUNET_assert (NULL == grp->client);
- grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
- GNUNET_assert (NULL != grp->client);
- uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
-
- if (NULL == grp->tmit_head ||
- 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
- {
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
- memcpy (&mq[1], grp->reconnect_msg, reconn_size);
- GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
- }
- transmit_next (grp);
+ if (NULL != grp->join_req_cb)
+ grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
}
/**
- * Disconnect from the Multicast service.
- *
- * @param g Group handle to disconnect.
+ * Receive multicast message from service.
*/
static void
-disconnect (void *g)
+group_recv_message (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_MULTICAST_Group *grp = g;
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_MessageHeader *
+ mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
- GNUNET_assert (NULL != grp);
- if (grp->tmit_head != grp->tmit_tail)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Disconnecting while there are still outstanding messages!\n");
- GNUNET_break (0);
- }
- if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (grp->reconnect_task);
- grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != grp->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
- grp->th = NULL;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_disconnect (grp->client);
- grp->client = NULL;
- }
- if (NULL != grp->reconnect_msg)
- {
- GNUNET_free (grp->reconnect_msg);
- grp->reconnect_msg = NULL;
- }
-}
-
-
-/**
- * Iterator callback for calling message callbacks for all groups.
- */
-static int
-message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
-{
- const struct GNUNET_MessageHeader *msg = cls;
- struct GNUNET_MULTICAST_Group *grp = group;
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling message callback with a message "
- "of type %u and size %u.\n",
- ntohs (msg->type), ntohs (msg->size));
+ "Calling message callback with a message of size %u.\n",
+ ntohs (mmsg->header.size));
if (NULL != grp->message_cb)
- grp->message_cb (grp->cb_cls, msg);
-
- return GNUNET_YES;
+ grp->message_cb (grp->cb_cls, mmsg);
}
/**
- * Iterator callback for calling request callbacks of origins.
+ * Origin receives uniquest request from a member.
*/
-static int
-request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void
*origin)
+static void
+origin_recv_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_MULTICAST_RequestHeader *req = cls;
- struct GNUNET_MULTICAST_Origin *orig = origin;
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Origin *
+ orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &orig->grp;
+ struct GNUNET_MULTICAST_RequestHeader *
+ req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling request callback for a request of type %u and size
%u.\n",
- ntohs (req->header.type), ntohs (req->header.size));
+ "Calling request callback with a request of size %u.\n",
+ ntohs (req->header.size));
if (NULL != orig->request_cb)
- orig->request_cb (orig->grp.cb_cls, &req->member_key,
- (const struct GNUNET_MessageHeader *) req, 0);
- return GNUNET_YES;
+ orig->request_cb (grp->cb_cls, req);
}
/**
- * Iterator callback for calling join request callbacks of origins.
+ * Member receives join decision.
*/
-static int
-join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *group)
+static void
+member_recv_join_decision (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct MulticastJoinRequestMessage *req = cls;
- struct GNUNET_MULTICAST_Group *grp = group;
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Member *
+ mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &mem->grp;
- struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
- jh->group = grp;
- jh->member_key = req->member_key;
- jh->member_peer = req->member_peer;
-
- const struct GNUNET_MessageHeader *msg = NULL;
- if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
- msg = (const struct GNUNET_MessageHeader *) &req[1];
-
- if (NULL != grp->join_req_cb)
- grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh);
- return GNUNET_YES;
-}
-
-
-/**
- * Iterator callback for calling join decision callbacks of members.
- */
-static int
-join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *member)
-{
- const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
+ const struct MulticastJoinDecisionMessageHeader *
+ hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
const struct MulticastJoinDecisionMessage *
dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
- struct GNUNET_MULTICAST_Member *mem = member;
- struct GNUNET_MULTICAST_Group *grp = &mem->grp;
uint16_t dcsn_size = ntohs (dcsn->header.size);
int is_admitted = ntohl (dcsn->is_admitted);
@@ -549,125 +322,62 @@
if (GNUNET_YES != is_admitted)
GNUNET_MULTICAST_member_part (mem);
-
- return GNUNET_YES;
}
+
/**
- * Function called when we receive a message from the service.
- *
- * @param cls struct GNUNET_MULTICAST_Group
- * @param msg Message received, NULL on timeout or fatal error.
+ * Message handlers for an origin.
*/
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
{
- struct GNUNET_MULTICAST_Group *grp = cls;
+ { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- if (NULL == msg)
- {
- // timeout / disconnected from service, reconnect
- reschedule_connect (grp);
- return;
- }
+ { &group_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- uint16_t size_eq = 0;
- uint16_t size_min = 0;
- uint16_t size = ntohs (msg->size);
- uint16_t type = ntohs (msg->type);
+ { &origin_recv_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+ sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %d and size %u from Multicast service\n",
- type, size);
+ { &group_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
- break;
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
- break;
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
- size_min = sizeof (struct MulticastJoinRequestMessage);
- break;
+/**
+ * Message handlers for a member.
+ */
+static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
+{
+ { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
- size_min = sizeof (struct MulticastJoinDecisionMessage);
- break;
+ { &group_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- default:
- GNUNET_break_op (0);
- type = 0;
- }
+ { &group_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
- if (! ((0 < size_eq && size == size_eq)
- || (0 < size_min && size_min <= size)))
- {
- GNUNET_break_op (0);
- type = 0;
- }
+ { &member_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+ sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- if (origins != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- message_cb, (void *) msg);
- if (members != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- message_cb, (void *) msg);
- break;
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- if (GNUNET_YES != grp->is_origin)
- {
- GNUNET_break (0);
- break;
- }
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- request_cb, (void *) msg);
- break;
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- join_request_cb, (void *)
msg);
- if (NULL != members)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- join_request_cb, (void *)
msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
- if (GNUNET_NO != grp->is_origin)
- {
- GNUNET_break (0);
- break;
- }
- if (NULL != members)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- join_decision_cb, (void *)
msg);
- break;
- }
-
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
-}
-
-
/**
* Function to call with the decision made for a join request.
*
* Must be called once and only once in response to an invocation of the
* #GNUNET_MULTICAST_JoinRequestCallback.
*
- * @param jh Join request handle.
+ * @param join Join request handle.
* @param is_admitted #GNUNET_YES if the join is approved,
* #GNUNET_NO if it is disapproved,
* #GNUNET_SYSERR if we cannot answer the request.
@@ -685,27 +395,25 @@
* peer that issued the request even if admission is denied.
*/
struct GNUNET_MULTICAST_ReplayHandle *
-GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
+GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
int is_admitted,
uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_resp)
{
- struct GNUNET_MULTICAST_Group *grp = jh->group;
+ struct GNUNET_MULTICAST_Group *grp = join->group;
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
uint16_t relay_size = relay_count * sizeof (*relays);
+
struct MulticastJoinDecisionMessageHeader * hdcsn;
struct MulticastJoinDecisionMessage *dcsn;
- struct MessageQueue *
- mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn)
- + relay_size + join_resp_size);
-
- hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1];
+ hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
+ hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
- hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
- + relay_size + join_resp_size);
- hdcsn->member_key = jh->member_key;
- hdcsn->peer = jh->member_peer;
+ hdcsn->member_key = join->member_key;
+ hdcsn->peer = join->member_peer;
dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
@@ -717,10 +425,8 @@
if (0 < join_resp_size)
memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
- transmit_next (grp);
-
- GNUNET_free (jh);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+ GNUNET_free (join);
return NULL;
}
@@ -832,7 +538,7 @@
start->max_fragment_id = max_fragment_id;
memcpy (&start->group_key, priv_key, sizeof (*priv_key));
- grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
+ grp->connect_msg = (struct GNUNET_MessageHeader *) start;
grp->is_origin = GNUNET_YES;
grp->cfg = cfg;
@@ -844,21 +550,11 @@
grp->message_cb = message_cb;
orig->request_cb = request_cb;
- orig->priv_key = *priv_key;
- GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
- &grp->pub_key_hash);
+ grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast",
origin_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
+ group_send_connect_msg (grp);
- if (NULL == origins)
- origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
- GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
-
return orig;
}
@@ -871,8 +567,7 @@
void
GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
{
- disconnect (&orig->grp);
- GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash,
orig);
+ GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES);
GNUNET_free (orig);
}
@@ -885,26 +580,22 @@
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
- struct GNUNET_MULTICAST_MessageHeader *
- msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
+ struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"OriginTransmitNotify() returned error or invalid message size.\n");
/* FIXME: handle error */
- GNUNET_free (mq);
+ GNUNET_free (msg);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
{
- GNUNET_free (mq);
+ GNUNET_free (msg);
return; /* Transmission paused. */
}
@@ -915,7 +606,7 @@
msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*msg) + buf_size;
- transmit_next (grp);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
}
@@ -939,6 +630,12 @@
GNUNET_MULTICAST_OriginTransmitNotify notify,
void *notify_cls)
{
+/* FIXME
+ if (GNUNET_YES == orig->grp.in_transmit)
+ return NULL;
+ orig->grp.in_transmit = GNUNET_YES;
+*/
+
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
tmit->origin = orig;
tmit->message_id = message_id;
@@ -1047,10 +744,9 @@
if (0 < join_msg_size)
memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
- grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
+ grp->connect_msg = (struct GNUNET_MessageHeader *) join;
grp->is_origin = GNUNET_NO;
grp->cfg = cfg;
- grp->pub_key = *group_key;
mem->join_dcsn_cb = join_decision_cb;
grp->join_req_cb = join_request_cb;
@@ -1059,18 +755,10 @@
grp->message_cb = message_cb;
grp->cb_cls = cls;
- GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
&grp->pub_key_hash);
+ grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast",
member_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
+ group_send_connect_msg (grp);
- if (NULL == members)
- members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
- GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
-
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
-
return mem;
}
@@ -1088,8 +776,7 @@
void
GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
{
- disconnect (&mem->grp);
- GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
+ GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES);
GNUNET_free (mem);
}
@@ -1162,25 +849,26 @@
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
- size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
- struct GNUNET_MULTICAST_RequestHeader *
- req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
+ size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+ struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"MemberTransmitNotify() returned error or invalid message size.\n");
/* FIXME: handle error */
+ GNUNET_free (req);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
- return; /* Transmission paused. */
+ {
+ /* Transmission paused. */
+ GNUNET_free (req);
+ return;
+ }
req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
req->header.size = htons (sizeof (*req) + buf_size);
@@ -1188,7 +876,7 @@
req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*req) + buf_size;
- transmit_next (grp);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
}
@@ -1207,6 +895,12 @@
GNUNET_MULTICAST_MemberTransmitNotify
notify,
void *notify_cls)
{
+/* FIXME
+ if (GNUNET_YES == mem->grp.in_transmit)
+ return NULL;
+ mem->grp.in_transmit = GNUNET_YES;
+*/
+
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
tmit->member = mem;
tmit->request_id = request_id;
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2014-05-29 16:35:53 UTC (rev
33442)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2014-05-29 16:35:55 UTC (rev
33443)
@@ -34,6 +34,7 @@
#include "gnunet_multicast_service.h"
#include "gnunet_psycstore_service.h"
#include "gnunet_psyc_service.h"
+#include "gnunet_psyc_util_lib.h"
#include "psyc.h"
@@ -174,10 +175,10 @@
/**
* List of connected clients.
*/
-struct ClientList
+struct ClientListItem
{
- struct ClientList *prev;
- struct ClientList *next;
+ struct ClientListItem *prev;
+ struct ClientListItem *next;
struct GNUNET_SERVER_Client *client;
};
@@ -187,8 +188,8 @@
*/
struct Channel
{
- struct ClientList *clients_head;
- struct ClientList *clients_tail;
+ struct ClientListItem *clients_head;
+ struct ClientListItem *clients_tail;
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
@@ -282,7 +283,7 @@
/**
* Channel struct common for Master and Slave
*/
- struct Channel ch;
+ struct Channel chn;
/**
* Private key of the channel.
@@ -339,7 +340,7 @@
/**
* Channel struct common for Master and Slave
*/
- struct Channel ch;
+ struct Channel chn;
/**
* Private key of the slave.
@@ -399,11 +400,11 @@
static inline void
-transmit_message (struct Channel *ch);
+transmit_message (struct Channel *chn);
static uint64_t
-message_queue_drop (struct Channel *ch);
+message_queue_drop (struct Channel *chn);
/**
@@ -434,12 +435,12 @@
static void
cleanup_master (struct Master *mst)
{
- struct Channel *ch = &mst->ch;
+ struct Channel *chn = &mst->chn;
if (NULL != mst->origin)
GNUNET_MULTICAST_origin_stop (mst->origin);
GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
- GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+ GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
}
@@ -449,20 +450,20 @@
static void
cleanup_slave (struct Slave *slv)
{
- struct Channel *ch = &slv->ch;
+ struct Channel *chn = &slv->chn;
struct GNUNET_CONTAINER_MultiHashMap *
- ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
- &ch->pub_key_hash);
- GNUNET_assert (NULL != ch_slv);
- GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
+ chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
+ &chn->pub_key_hash);
+ GNUNET_assert (NULL != chn_slv);
+ GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
- if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
+ if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
{
- GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
- ch_slv);
- GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
+ GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
+ chn_slv);
+ GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
}
- GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
if (NULL != slv->join_req)
GNUNET_free (slv->join_req);
@@ -470,7 +471,7 @@
GNUNET_free (slv->relays);
if (NULL != slv->member)
GNUNET_MULTICAST_member_part (slv->member);
- GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
+ GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
}
@@ -478,18 +479,18 @@
* Clean up channel data structures after a client disconnected.
*/
static void
-cleanup_channel (struct Channel *ch)
+cleanup_channel (struct Channel *chn)
{
- message_queue_drop (ch);
- GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash);
+ message_queue_drop (chn);
+ GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
- if (NULL != ch->store_op)
- GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
+ if (NULL != chn->store_op)
+ GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
- (GNUNET_YES == ch->is_master)
- ? cleanup_master ((struct Master *) ch)
- : cleanup_slave ((struct Slave *) ch);
- GNUNET_free (ch);
+ (GNUNET_YES == chn->is_master)
+ ? cleanup_master ((struct Master *) chn)
+ : cleanup_slave ((struct Slave *) chn);
+ GNUNET_free (chn);
}
@@ -507,41 +508,41 @@
return;
struct Channel *
- ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Client (%s) disconnected from channel %s\n",
- ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
- GNUNET_h2s (&ch->pub_key_hash));
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- if (NULL == ch)
+ if (NULL == chn)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p User context is NULL in client_disconnect()\n", ch);
- GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p User context is NULL in client_disconnect()\n", chn);
return;
}
- struct ClientList *cl = ch->clients_head;
- while (NULL != cl)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Client (%s) disconnected from channel %s\n",
+ chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+ GNUNET_h2s (&chn->pub_key_hash));
+
+ struct ClientListItem *cli = chn->clients_head;
+ while (NULL != cli)
{
- if (cl->client == client)
+ if (cli->client == client)
{
- GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
- GNUNET_free (cl);
+ GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
+ GNUNET_free (cli);
break;
}
- cl = cl->next;
+ cli = cli->next;
}
- if (NULL == ch->clients_head)
+ if (NULL == chn->clients_head)
{ /* Last client disconnected. */
- if (NULL != ch->tmit_head)
+ if (NULL != chn->tmit_head)
{ /* Send pending messages to multicast before cleanup. */
- transmit_message (ch);
+ transmit_message (chn);
}
else
{
- cleanup_channel (ch);
+ cleanup_channel (chn);
}
}
}
@@ -551,18 +552,18 @@
* Send message to all clients connected to the channel.
*/
static void
-msg_to_clients (const struct Channel *ch,
- const struct GNUNET_MessageHeader *msg)
+client_send_msg (const struct Channel *chn,
+ const struct GNUNET_MessageHeader *msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Sending message to clients.\n", ch);
+ "%p Sending message to clients.\n", chn);
- struct ClientList *cl = ch->clients_head;
- while (NULL != cl)
+ struct ClientListItem *cli = chn->clients_head;
+ while (NULL != cli)
{
- GNUNET_SERVER_notification_context_add (nc, cl->client);
- GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg,
GNUNET_NO);
- cl = cl->next;
+ GNUNET_SERVER_notification_context_add (nc, cli->client);
+ GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg,
GNUNET_NO);
+ cli = cli->next;
}
}
@@ -573,7 +574,7 @@
struct JoinMemTestClosure
{
struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
- struct Channel *ch;
+ struct Channel *chn;
struct GNUNET_MULTICAST_JoinHandle *jh;
struct MasterJoinRequest *master_join_req;
};
@@ -587,15 +588,15 @@
{
struct JoinMemTestClosure *jcls = cls;
- if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
+ if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
{ /* Pass on join request to client if this is a master channel */
- struct Master *mst = (struct Master *) jcls->ch;
+ struct Master *mst = (struct Master *) jcls->chn;
struct GNUNET_HashCode slave_key_hash;
GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
&slave_key_hash);
GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash,
jcls->jh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- msg_to_clients (jcls->ch, &jcls->master_join_req->header);
+ client_send_msg (jcls->chn, &jcls->master_join_req->header);
}
else
{
@@ -611,13 +612,13 @@
* Incoming join request from multicast.
*/
static void
-mcast_join_request_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *join_msg,
- struct GNUNET_MULTICAST_JoinHandle *jh)
+mcast_recv_join_request (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const struct GNUNET_MessageHeader *join_msg,
+ struct GNUNET_MULTICAST_JoinHandle *jh)
{
- struct Channel *ch = cls;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
+ struct Channel *chn = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
uint16_t join_msg_size = 0;
if (NULL != join_msg)
@@ -630,7 +631,7 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"%p Got join message with invalid type %u.\n",
- ch, ntohs (join_msg->type));
+ chn, ntohs (join_msg->type));
}
}
@@ -643,12 +644,12 @@
struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
jcls->slave_key = *slave_key;
- jcls->ch = ch;
+ jcls->chn = chn;
jcls->jh = jh;
jcls->master_join_req = req;
- GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
- ch->max_message_id, 0,
+ GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
+ chn->max_message_id, 0,
&join_mem_test_cb, jcls);
}
@@ -657,14 +658,14 @@
* Join decision received from multicast.
*/
static void
-mcast_join_decision_cb (void *cls, int is_admitted,
+mcast_recv_join_decision (void *cls, int is_admitted,
const struct GNUNET_PeerIdentity *peer,
uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_resp)
{
struct Slave *slv = cls;
- struct Channel *ch = &slv->ch;
+ struct Channel *chn = &slv->chn;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision: %d\n", slv, is_admitted);
@@ -677,11 +678,11 @@
if (0 < join_resp_size)
memcpy (&dcsn[1], join_resp, join_resp_size);
- msg_to_clients (ch, &dcsn->header);
+ client_send_msg (chn, &dcsn->header);
if (GNUNET_YES == is_admitted)
{
- ch->ready = GNUNET_YES;
+ chn->ready = GNUNET_YES;
}
else
{
@@ -691,20 +692,20 @@
static void
-mcast_membership_test_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id, uint64_t group_generation,
- struct GNUNET_MULTICAST_MembershipTestHandle *mth)
+mcast_recv_membership_test (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey
*slave_key,
+ uint64_t message_id, uint64_t group_generation,
+ struct GNUNET_MULTICAST_MembershipTestHandle *mth)
{
}
static void
-mcast_replay_fragment_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t fragment_id, uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
+mcast_recv_replay_fragment (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey
*slave_key,
+ uint64_t fragment_id, uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
{
@@ -712,25 +713,17 @@
static void
-mcast_replay_message_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id,
- uint64_t fragment_offset,
- uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
+mcast_recv_replay_message (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey
*slave_key,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
{
}
-static void
-fragment_store_result (void *cls, int64_t result, const char *err_msg)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "fragment_store() returned %l (%s)\n", result, err_msg);
-}
-
-
/**
* Convert an uint64_t in network byte order to a HashCode
* that can be used as key in a MultiHashMap
@@ -772,17 +765,17 @@
* Send multicast message to all clients connected to the channel.
*/
static void
-mmsg_to_clients (struct Channel *ch,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+client_send_mcast_msg (struct Channel *chn,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
{
+ struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t size = ntohs (mmsg->header.size);
- struct GNUNET_PSYC_MessageHeader *pmsg;
uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Sending message to client. "
+ "%p Sending multicast message to client. "
"fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->fragment_id),
+ chn, GNUNET_ntohll (mmsg->fragment_id),
GNUNET_ntohll (mmsg->message_id));
pmsg = GNUNET_malloc (psize);
@@ -791,22 +784,53 @@
pmsg->message_id = mmsg->message_id;
memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
- msg_to_clients (ch, &pmsg->header);
+ client_send_msg (chn, &pmsg->header);
GNUNET_free (pmsg);
}
/**
+ * Send multicast request to all clients connected to the channel.
+ */
+static void
+client_send_mcast_req (struct Master *mst,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+ struct Channel *chn = &mst->chn;
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t size = ntohs (req->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending multicast request to client. "
+ "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+ chn, GNUNET_ntohll (req->fragment_id),
+ GNUNET_ntohll (req->request_id));
+
+ pmsg = GNUNET_malloc (psize);
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = req->request_id;
+ pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+
+ memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+ client_send_msg (chn, &pmsg->header);
+ GNUNET_free (pmsg);
+}
+
+
+/**
* Insert a multicast message fragment into the queue belonging to the message.
*
- * @param ch Channel.
+ * @param chn Channel.
* @param mmsg Multicast message fragment.
* @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
* @param first_ptype First PSYC message part type in @a mmsg.
* @param last_ptype Last PSYC message part type in @a mmsg.
*/
static void
-fragment_queue_insert (struct Channel *ch,
+fragment_queue_insert (struct Channel *chn,
const struct GNUNET_MULTICAST_MessageHeader *mmsg,
uint16_t first_ptype, uint16_t last_ptype)
{
@@ -814,13 +838,13 @@
const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
struct GNUNET_CONTAINER_MultiHashMap
*chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
- &ch->pub_key_hash);
+ &chn->pub_key_hash);
struct GNUNET_HashCode msg_id_hash;
hash_key_from_nll (&msg_id_hash, mmsg->message_id);
struct FragmentQueue
- *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+ *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
if (NULL == fragq)
{
@@ -829,13 +853,13 @@
fragq->fragments
= GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
+ GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
if (NULL == chan_msgs)
{
chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash,
chan_msgs,
+ GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash,
chan_msgs,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
}
@@ -850,7 +874,7 @@
"%p Adding message fragment to cache. "
"message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
"header_size: %" PRIu64 " + %u).\n",
- ch, GNUNET_ntohll (mmsg->message_id),
+ chn, GNUNET_ntohll (mmsg->message_id),
GNUNET_ntohll (mmsg->fragment_id),
fragq->header_size, size);
cache_entry = GNUNET_new (struct RecvCacheEntry);
@@ -867,7 +891,7 @@
"%p Message fragment is already in cache. "
"message_id: %" PRIu64 ", fragment_id: %" PRIu64
", ref_count: %u\n",
- ch, GNUNET_ntohll (mmsg->message_id),
+ chn, GNUNET_ntohll (mmsg->message_id),
GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
}
@@ -890,11 +914,11 @@
{ /* header is now complete */
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Header of message %" PRIu64 " is complete.\n",
- ch, GNUNET_ntohll (mmsg->message_id));
+ chn, GNUNET_ntohll (mmsg->message_id));
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Adding message %" PRIu64 " to queue.\n",
- ch, GNUNET_ntohll (mmsg->message_id));
+ chn, GNUNET_ntohll (mmsg->message_id));
fragq->state = MSG_FRAG_STATE_DATA;
}
else
@@ -902,7 +926,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Header of message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+ chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
fragq->header_size);
}
}
@@ -916,7 +940,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Message %" PRIu64 " is NOT complete yet: "
"%" PRIu64 " != %" PRIu64 "\n",
- ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+ chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
fragq->size);
break;
@@ -935,7 +959,7 @@
case MSG_FRAG_STATE_CANCEL:
if (GNUNET_NO == fragq->queued)
{
- GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
+ GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
GNUNET_ntohll (mmsg->message_id));
fragq->queued = GNUNET_YES;
}
@@ -953,24 +977,24 @@
* Send fragments of a message in order to client, after all modifiers arrived
* from multicast.
*
- * @param ch Channel.
+ * @param chn Channel.
* @param msg_id ID of the message @a fragq belongs to.
* @param fragq Fragment queue of the message.
* @param drop Drop message without delivering to client?
* #GNUNET_YES or #GNUNET_NO.
*/
static void
-fragment_queue_run (struct Channel *ch, uint64_t msg_id,
+fragment_queue_run (struct Channel *chn, uint64_t msg_id,
struct FragmentQueue *fragq, uint8_t drop)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Running message fragment queue for message %" PRIu64
" (state: %u).\n",
- ch, msg_id, fragq->state);
+ chn, msg_id, fragq->state);
struct GNUNET_CONTAINER_MultiHashMap
*chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
- &ch->pub_key_hash);
+ &chn->pub_key_hash);
GNUNET_assert (NULL != chan_msgs);
uint64_t frag_id;
@@ -985,7 +1009,7 @@
{
if (GNUNET_NO == drop)
{
- mmsg_to_clients (ch, cache_entry->mmsg);
+ client_send_mcast_msg (chn, cache_entry->mmsg);
}
if (cache_entry->ref_count <= 1)
{
@@ -1014,7 +1038,7 @@
struct GNUNET_HashCode msg_id_hash;
hash_key_from_nll (&msg_id_hash, msg_id);
- GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
+ GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash,
fragq);
GNUNET_CONTAINER_heap_destroy (fragq->fragments);
GNUNET_free (fragq);
}
@@ -1034,33 +1058,33 @@
* - A stateful message is only sent if the previous stateful message
* has already been delivered to the client.
*
- * @param ch Channel.
+ * @param chn Channel.
*
* @return Number of messages removed from queue and sent to client.
*/
static uint64_t
-message_queue_run (struct Channel *ch)
+message_queue_run (struct Channel *chn)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Running message queue.\n", ch);
+ "%p Running message queue.\n", chn);
uint64_t n = 0;
uint64_t msg_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
&msg_id))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
+ "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
struct GNUNET_HashCode msg_id_hash;
hash_key_from_hll (&msg_id_hash, msg_id);
struct FragmentQueue *
- fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+ fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags,
&msg_id_hash);
if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p No fragq (%p) or header not complete.\n",
- ch, fragq);
+ chn, fragq);
break;
}
@@ -1070,40 +1094,40 @@
if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
{
if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
- && msg_id - 1 != ch->max_message_id)
+ && msg_id - 1 != chn->max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Out of order message. "
"(%" PRIu64 " - 1 != %" PRIu64 ")\n",
- ch, msg_id, ch->max_message_id);
+ chn, msg_id, chn->max_message_id);
break;
}
}
else
{
- if (msg_id - fragq->state_delta != ch->max_state_message_id)
+ if (msg_id - fragq->state_delta != chn->max_state_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Out of order stateful message. "
"(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
- ch, msg_id, fragq->state_delta,
ch->max_state_message_id);
+ chn, msg_id, fragq->state_delta,
chn->max_state_message_id);
break;
}
#if TODO
/* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
- state_modify_result_cb, cls);
+ GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
+ store_recv_state_modify_result, cls);
#endif
- ch->max_state_message_id = msg_id;
+ chn->max_state_message_id = msg_id;
}
- ch->max_message_id = msg_id;
+ chn->max_message_id = msg_id;
}
- fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP ==
fragq->state);
- GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+ fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP ==
fragq->state);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+ "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
return n;
}
@@ -1113,111 +1137,92 @@
*
* Remove all messages in queue without sending it to clients.
*
- * @param ch Channel.
+ * @param chn Channel.
*
* @return Number of messages removed from queue.
*/
static uint64_t
-message_queue_drop (struct Channel *ch)
+message_queue_drop (struct Channel *chn)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message queue.\n", ch);
+ "%p Dropping message queue.\n", chn);
uint64_t n = 0;
uint64_t msg_id;
- while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
&msg_id))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id);
+ "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
struct GNUNET_HashCode msg_id_hash;
hash_key_from_hll (&msg_id_hash, msg_id);
struct FragmentQueue *
- fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+ fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags,
&msg_id_hash);
- fragment_queue_run (ch, msg_id, fragq, GNUNET_YES);
- GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+ fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
+ GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
n++;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+ "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
return n;
}
/**
- * Handle incoming message from multicast.
+ * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
+ */
+static void
+store_recv_fragment_store_result (void *cls, int64_t result, const char
*err_msg)
+{
+ struct Channel *chn = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 "
(%s)\n",
+ chn, result, err_msg);
+}
+
+
+/**
+ * Handle incoming message fragment from multicast.
*
- * @param ch Channel.
- * @param mmsg Multicast message.
- *
- * @return #GNUNET_OK or #GNUNET_SYSERR
+ * Store it using PSYCstore and send it to the clients of the channel in order.
*/
-static int
-client_multicast_message (struct Channel *ch,
- const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+static void
+mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader
*mmsg)
{
- GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
+ struct Channel *chn = cls;
+ uint16_t size = ntohs (mmsg->header.size);
- uint16_t size = ntohs (mmsg->header.size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received multicast message of size %u.\n",
+ chn, size);
+
+ GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
+ &store_recv_fragment_store_result, chn);
+
uint16_t first_ptype = 0, last_ptype = 0;
-
if (GNUNET_SYSERR
- == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
+ == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
(const char *) &mmsg[1],
&first_ptype, &last_ptype))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Received message with invalid parts from multicast. "
- "Dropping message.\n", ch);
+ "%p Dropping incoming multicast message with invalid parts.\n",
+ chn);
GNUNET_break_op (0);
- return GNUNET_SYSERR;
+ return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Message parts: first: type %u, last: type %u\n",
first_ptype, last_ptype);
- fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
- message_queue_run (ch);
-
- return GNUNET_OK;
+ fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
+ message_queue_run (chn);
}
/**
- * Incoming message fragment from multicast.
- *
- * Store it using PSYCstore and send it to the client of the channel.
- */
-static void
-mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
-{
- struct Channel *ch = cls;
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message of type %u and size %u from multicast.\n",
- ch, type, size);
-
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- {
- client_multicast_message (ch, (const struct
- GNUNET_MULTICAST_MessageHeader *) msg);
- break;
- }
- default:
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping unknown message of type %u and size %u.\n",
- ch, type, size);
- }
-}
-
-
-/**
* Incoming request fragment from multicast for a master.
*
* @param cls Master.
@@ -1226,59 +1231,35 @@
* @param flags Request flags.
*/
static void
-mcast_request_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *msg,
- enum GNUNET_MULTICAST_MessageFlags flags)
+mcast_recv_request (void *cls,
+ const struct GNUNET_MULTICAST_RequestHeader *req)
{
struct Master *mst = cls;
- struct Channel *ch = &mst->ch;
+ uint16_t size = ntohs (req->header.size);
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received request of type %u and size %u from multicast.\n",
- ch, type, size);
+ "%p Received multicast request of size %u.\n",
+ mst, size);
- switch (type)
+ uint16_t first_ptype = 0, last_ptype = 0;
+ if (GNUNET_SYSERR
+ == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
+ (const char *) &req[1],
+ &first_ptype, &last_ptype))
{
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- {
- const struct GNUNET_MULTICAST_RequestHeader *req
- = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping incoming multicast request with invalid parts.\n",
+ mst);
+ GNUNET_break_op (0);
+ return;
+ }
- /* FIXME: see message_cb() */
- if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
- (const char *)
&req[1],
- NULL, NULL))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping request with invalid parts "
- "received from multicast.\n", ch);
- GNUNET_break_op (0);
- break;
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message parts: first: type %u, last: type %u\n",
+ first_ptype, last_ptype);
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = req->request_id;
- pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
-
- memcpy (&pmsg[1], &req[1], size - sizeof (*req));
- msg_to_clients (ch, &pmsg->header);
- GNUNET_free (pmsg);
- break;
- }
- default:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Dropping unknown request of type %u and size %u.\n",
- ch, type, size);
- GNUNET_break_op (0);
- }
+ /* FIXME: in-order delivery */
+ client_send_mcast_req (mst, req);
}
@@ -1286,13 +1267,13 @@
* Response from PSYCstore with the current counter values for a channel
master.
*/
static void
-master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
- uint64_t max_message_id, uint64_t max_group_generation,
- uint64_t max_state_message_id)
+store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
+ uint64_t max_message_id, uint64_t
max_group_generation,
+ uint64_t max_state_message_id)
{
struct Master *mst = cls;
- struct Channel *ch = &mst->ch;
- ch->store_op = NULL;
+ struct Channel *chn = &mst->chn;
+ chn->store_op = NULL;
struct CountersResult res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
@@ -1303,28 +1284,28 @@
if (GNUNET_OK == result || GNUNET_NO == result)
{
mst->max_message_id = max_message_id;
- ch->max_message_id = max_message_id;
- ch->max_state_message_id = max_state_message_id;
+ chn->max_message_id = max_message_id;
+ chn->max_state_message_id = max_state_message_id;
mst->max_group_generation = max_group_generation;
mst->origin
= GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
- &mcast_join_request_cb,
- &mcast_membership_test_cb,
- &mcast_replay_fragment_cb,
- &mcast_replay_message_cb,
- &mcast_request_cb,
- &mcast_message_cb, ch);
- ch->ready = GNUNET_YES;
+ &mcast_recv_join_request,
+ &mcast_recv_membership_test,
+ &mcast_recv_replay_fragment,
+ &mcast_recv_replay_message,
+ &mcast_recv_request,
+ &mcast_recv_message, chn);
+ chn->ready = GNUNET_YES;
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%p GNUNET_PSYCSTORE_counters_get() "
"returned %d for channel %s.\n",
- ch, result, GNUNET_h2s (&ch->pub_key_hash));
+ chn, result, GNUNET_h2s (&chn->pub_key_hash));
}
- msg_to_clients (ch, &res.header);
+ client_send_msg (chn, &res.header);
}
@@ -1332,13 +1313,13 @@
* Response from PSYCstore with the current counter values for a channel slave.
*/
void
-slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
- uint64_t max_message_id, uint64_t max_group_generation,
- uint64_t max_state_message_id)
+store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
+ uint64_t max_message_id, uint64_t
max_group_generation,
+ uint64_t max_state_message_id)
{
struct Slave *slv = cls;
- struct Channel *ch = &slv->ch;
- ch->store_op = NULL;
+ struct Channel *chn = &slv->chn;
+ chn->store_op = NULL;
struct CountersResult res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
@@ -1348,38 +1329,38 @@
if (GNUNET_OK == result || GNUNET_NO == result)
{
- ch->max_message_id = max_message_id;
- ch->max_state_message_id = max_state_message_id;
+ chn->max_message_id = max_message_id;
+ chn->max_state_message_id = max_state_message_id;
slv->member
- = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+ = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
&slv->origin,
slv->relay_count, slv->relays,
slv->join_req,
- &mcast_join_request_cb,
- &mcast_join_decision_cb,
- &mcast_membership_test_cb,
- &mcast_replay_fragment_cb,
- &mcast_replay_message_cb,
- &mcast_message_cb, ch);
+ &mcast_recv_join_request,
+ &mcast_recv_join_decision,
+ &mcast_recv_membership_test,
+ &mcast_recv_replay_fragment,
+ &mcast_recv_replay_message,
+ &mcast_recv_message, chn);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"%p GNUNET_PSYCSTORE_counters_get() "
"returned %d for channel %s.\n",
- ch, result, GNUNET_h2s (&ch->pub_key_hash));
+ chn, result, GNUNET_h2s (&chn->pub_key_hash));
}
- msg_to_clients (ch, &res.header);
+ client_send_msg (chn, &res.header);
}
static void
-channel_init (struct Channel *ch)
+channel_init (struct Channel *chn)
{
- ch->recv_msgs
+ chn->recv_msgs
= GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
}
@@ -1387,8 +1368,8 @@
* Handle a connecting client starting a channel master.
*/
static void
-client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
const struct MasterStartRequest *req
= (const struct MasterStartRequest *) msg;
@@ -1401,7 +1382,7 @@
struct Master *
mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
- struct Channel *ch;
+ struct Channel *chn;
if (NULL == mst)
{
@@ -1410,20 +1391,20 @@
mst->priv_key = req->channel_key;
mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- ch = &mst->ch;
- ch->is_master = GNUNET_YES;
- ch->pub_key = pub_key;
- ch->pub_key_hash = pub_key_hash;
- channel_init (ch);
+ chn = &mst->chn;
+ chn->is_master = GNUNET_YES;
+ chn->pub_key = pub_key;
+ chn->pub_key_hash = pub_key_hash;
+ channel_init (chn);
- GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
- master_counters_cb, mst);
+ chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
+ store_recv_master_counters,
mst);
}
else
{
- ch = &mst->ch;
+ chn = &mst->chn;
struct CountersResult res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
@@ -1438,13 +1419,13 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Client connected as master to channel %s.\n",
- mst, GNUNET_h2s (&ch->pub_key_hash));
+ mst, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientList *cl = GNUNET_new (struct ClientList);
- cl->client = client;
- GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
+ struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+ cli->client = client;
+ GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, ch);
+ GNUNET_SERVER_client_set_user_context (client, chn);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1453,8 +1434,8 @@
* Handle a connecting client joining as a channel slave.
*/
static void
-client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
const struct SlaveJoinRequest *req
= (const struct SlaveJoinRequest *) msg;
@@ -1467,13 +1448,13 @@
GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key),
&pub_key_hash);
struct GNUNET_CONTAINER_MultiHashMap *
- ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
+ chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
&pub_key_hash);
struct Slave *slv = NULL;
- struct Channel *ch;
+ struct Channel *chn;
- if (NULL != ch_slv)
+ if (NULL != chn_slv)
{
- slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
+ slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
}
if (NULL == slv)
{
@@ -1494,34 +1475,34 @@
memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
}
- ch = &slv->ch;
- ch->is_master = GNUNET_NO;
- ch->pub_key = req->channel_key;
- ch->pub_key_hash = pub_key_hash;
- channel_init (ch);
+ chn = &slv->chn;
+ chn->is_master = GNUNET_NO;
+ chn->pub_key = req->channel_key;
+ chn->pub_key_hash = pub_key_hash;
+ channel_init (chn);
- if (NULL == ch_slv)
+ if (NULL == chn_slv)
{
- ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash,
ch_slv,
+ chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash,
chn_slv,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
- GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
- GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
- slave_counters_cb, slv);
+ chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
+ &store_recv_slave_counters,
slv);
}
else
{
- ch = &slv->ch;
+ chn = &slv->chn;
struct CountersResult res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
res.result_code = htonl (GNUNET_OK);
- res.max_message_id = GNUNET_htonll (ch->max_message_id);
+ res.max_message_id = GNUNET_htonll (chn->max_message_id);
GNUNET_SERVER_notification_context_add (nc, client);
GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
@@ -1530,16 +1511,16 @@
if (NULL == slv->member)
{
slv->member
- = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+ = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
&slv->origin,
slv->relay_count, slv->relays,
slv->join_req,
- &mcast_join_request_cb,
- &mcast_join_decision_cb,
- &mcast_membership_test_cb,
- &mcast_replay_fragment_cb,
- &mcast_replay_message_cb,
- &mcast_message_cb, ch);
+ &mcast_recv_join_request,
+ &mcast_recv_join_decision,
+ &mcast_recv_membership_test,
+ &mcast_recv_replay_fragment,
+ &mcast_recv_replay_message,
+ &mcast_recv_message, chn);
}
else if (NULL != slv->join_dcsn)
@@ -1553,13 +1534,13 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Client connected as slave to channel %s.\n",
- slv, GNUNET_h2s (&ch->pub_key_hash));
+ slv, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientList *cl = GNUNET_new (struct ClientList);
- cl->client = client;
- GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
+ struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+ cli->client = client;
+ GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
- GNUNET_SERVER_client_set_user_context (client, &slv->ch);
+ GNUNET_SERVER_client_set_user_context (client, &slv->chn);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1575,8 +1556,8 @@
* Iterator callback for responding to join requests of a slave.
*/
static int
-send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *jh)
+mcast_send_join_decision (void *cls, const struct GNUNET_HashCode
*pub_key_hash,
+ void *jh)
{
struct JoinDecisionClosure *jcls = cls;
// FIXME: add relays
@@ -1589,13 +1570,13 @@
* Join decision from client.
*/
static void
-client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
struct Channel *
- ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (GNUNET_YES == ch->is_master);
- struct Master *mst = (struct Master *) ch;
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (GNUNET_YES == chn->is_master);
+ struct Master *mst = (struct Master *) chn;
struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
struct JoinDecisionClosure jcls;
@@ -1612,13 +1593,13 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision (%d) from client for channel %s..\n",
- mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash));
+ mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p ..and slave %s.\n",
mst, GNUNET_h2s (&slave_key_hash));
GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
- &send_join_decision_cb, &jcls);
+ &mcast_send_join_decision,
&jcls);
GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1629,10 +1610,10 @@
*
* Sent after a message fragment has been passed on to multicast.
*
- * @param ch The channel struct for the client.
+ * @param chn The channel struct for the client.
*/
static void
-send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
+send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
{
struct GNUNET_MessageHeader res;
res.size = htons (sizeof (res));
@@ -1650,40 +1631,40 @@
static int
transmit_notify (void *cls, size_t *data_size, void *data)
{
- struct Channel *ch = cls;
- struct TransmitMessage *tmit_msg = ch->tmit_head;
+ struct Channel *chn = cls;
+ struct TransmitMessage *tmit_msg = chn->tmit_head;
if (NULL == tmit_msg || *data_size < tmit_msg->size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p transmit_notify: nothing to send.\n", ch);
+ "%p transmit_notify: nothing to send.\n", chn);
*data_size = 0;
return GNUNET_NO;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
+ "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
*data_size = tmit_msg->size;
memcpy (data, &tmit_msg[1], *data_size);
- int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+ int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
if (NULL != tmit_msg->client)
- send_message_ack (ch, tmit_msg->client);
+ send_message_ack (chn, tmit_msg->client);
- GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
GNUNET_free (tmit_msg);
- if (0 == ch->tmit_task)
+ if (0 == chn->tmit_task)
{
- if (NULL != ch->tmit_head)
+ if (NULL != chn->tmit_head)
{
- transmit_message (ch);
+ transmit_message (chn);
}
- else if (ch->disconnected)
+ else if (chn->disconnected)
{
/* FIXME: handle partial message (when still in_transmit) */
- cleanup_channel (ch);
+ cleanup_channel (chn);
}
}
@@ -1732,7 +1713,7 @@
master_transmit_message (struct Master *mst)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
- mst->ch.tmit_task = 0;
+ mst->chn.tmit_task = 0;
if (NULL == mst->tmit_handle)
{
mst->tmit_handle
@@ -1753,7 +1734,7 @@
static void
slave_transmit_message (struct Slave *slv)
{
- slv->ch.tmit_task = 0;
+ slv->chn.tmit_task = 0;
if (NULL == slv->tmit_handle)
{
slv->tmit_handle
@@ -1768,11 +1749,11 @@
static inline void
-transmit_message (struct Channel *ch)
+transmit_message (struct Channel *chn)
{
- ch->is_master
- ? master_transmit_message ((struct Master *) ch)
- : slave_transmit_message ((struct Slave *) ch);
+ chn->is_master
+ ? master_transmit_message ((struct Master *) chn)
+ : slave_transmit_message ((struct Slave *) chn);
}
@@ -1828,7 +1809,7 @@
/**
* Queue PSYC message parts for sending to multicast.
*
- * @param ch Channel to send to.
+ * @param chn Channel to send to.
* @param client Client the message originates from.
* @param data_size Size of @a data.
* @param data Concatenated message parts.
@@ -1836,7 +1817,7 @@
* @param last_ptype Last message part type in @a data.
*/
static void
-queue_message (struct Channel *ch,
+queue_message (struct Channel *chn,
struct GNUNET_SERVER_Client *client,
size_t data_size,
const void *data,
@@ -1847,14 +1828,14 @@
memcpy (&tmit_msg[1], data, data_size);
tmit_msg->client = client;
tmit_msg->size = data_size;
- tmit_msg->state = ch->tmit_state;
+ tmit_msg->state = chn->tmit_state;
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+ GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
- ch->is_master
- ? master_queue_message ((struct Master *) ch, tmit_msg,
+ chn->is_master
+ ? master_queue_message ((struct Master *) chn, tmit_msg,
first_ptype, last_ptype)
- : slave_queue_message ((struct Slave *) ch, tmit_msg,
+ : slave_queue_message ((struct Slave *) chn, tmit_msg,
first_ptype, last_ptype);
}
@@ -1862,11 +1843,11 @@
/**
* Cancel transmission of current message.
*
- * @param ch Channel to send to.
+ * @param chn Channel to send to.
* @param client Client the message originates from.
*/
static void
-transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
+transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
{
uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
@@ -1874,8 +1855,8 @@
msg.size = htons (sizeof (msg));
msg.type = htons (type);
- queue_message (ch, client, sizeof (msg), &msg, type, type);
- transmit_message (ch);
+ queue_message (chn, client, sizeof (msg), &msg, type, type);
+ transmit_message (chn);
/* FIXME: cleanup */
}
@@ -1885,21 +1866,21 @@
* Incoming message from a master or slave client.
*/
static void
-client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
struct Channel *
- ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
- GNUNET_assert (NULL != ch);
+ chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ GNUNET_assert (NULL != chn);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message from client.\n", ch);
+ "%p Received message from client.\n", chn);
GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
- if (GNUNET_YES != ch->ready)
+ if (GNUNET_YES != chn->ready)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Channel is not ready, dropping message from client.\n",
ch);
+ "%p Channel is not ready, dropping message from client.\n",
chn);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
@@ -1907,30 +1888,30 @@
uint16_t size = ntohs (msg->size);
if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n",
ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n",
chn);
GNUNET_break (0);
- transmit_cancel (ch, client);
+ transmit_cancel (chn, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
uint16_t first_ptype = 0, last_ptype = 0;
if (GNUNET_SYSERR
- == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
+ == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
(const char *) &msg[1],
&first_ptype, &last_ptype))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p Received invalid message part from client.\n", ch);
+ "%p Received invalid message part from client.\n", chn);
GNUNET_break (0);
- transmit_cancel (ch, client);
+ transmit_cancel (chn, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- queue_message (ch, client, size - sizeof (*msg), &msg[1],
+ queue_message (chn, client, size - sizeof (*msg), &msg[1],
first_ptype, last_ptype);
- transmit_message (ch);
+ transmit_message (chn);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
@@ -1940,8 +1921,8 @@
* Client requests to add a slave to the membership database.
*/
static void
-client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
}
@@ -1951,8 +1932,8 @@
* Client requests to remove a slave from the membership database.
*/
static void
-client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
}
@@ -1962,8 +1943,8 @@
* Client requests channel history from PSYCstore.
*/
static void
-client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
}
@@ -1973,8 +1954,8 @@
* Client requests best matching state variable from PSYCstore.
*/
static void
-client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
}
@@ -1984,8 +1965,8 @@
* Client requests state variables with a given prefix from PSYCstore.
*/
static void
-client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *msg)
+client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
}
@@ -2003,32 +1984,34 @@
const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &client_master_start, NULL,
+ { &client_recv_master_start, NULL,
GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
- { &client_slave_join, NULL,
+ { &client_recv_slave_join, NULL,
GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
- { &client_join_decision, NULL,
+ { &client_recv_join_decision, NULL,
GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
- { &client_psyc_message, NULL,
+ { &client_recv_psyc_message, NULL,
GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
- { &client_slave_add, NULL,
+ { &client_recv_slave_add, NULL,
GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
- { &client_slave_remove, NULL,
+ { &client_recv_slave_remove, NULL,
GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
- { &client_story_request, NULL,
+ { &client_recv_story_request, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
- { &client_state_get, NULL,
+ { &client_recv_state_get, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
- { &client_state_get_prefix, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
+ { &client_recv_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+
+ { NULL, NULL, 0, 0 }
};
cfg = c;
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2014-05-29 16:35:53 UTC (rev 33442)
+++ gnunet/src/psyc/psyc_api.c 2014-05-29 16:35:55 UTC (rev 33443)
@@ -37,151 +37,48 @@
#include "gnunet_env_lib.h"
#include "gnunet_multicast_service.h"
#include "gnunet_psyc_service.h"
+#include "gnunet_psyc_util_lib.h"
#include "psyc.h"
#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
-struct MessageQueue
-{
- struct MessageQueue *prev;
- struct MessageQueue *next;
- /* Followed by struct GNUNET_MessageHeader msg */
-};
-
/**
- * Handle for a pending PSYC transmission operation.
- */
-struct GNUNET_PSYC_ChannelTransmitHandle
-{
- struct GNUNET_PSYC_Channel *ch;
- GNUNET_PSYC_TransmitNotifyModifier notify_mod;
- GNUNET_PSYC_TransmitNotifyData notify_data;
- void *notify_cls;
- enum MessageState state;
-};
-
-/**
* Handle to access PSYC channel operations for both the master and slaves.
*/
struct GNUNET_PSYC_Channel
{
/**
- * Transmission handle;
- */
- struct GNUNET_PSYC_ChannelTransmitHandle tmit;
-
- /**
* Configuration to use.
*/
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Socket (if available).
+ * Client connection to the service.
*/
- struct GNUNET_CLIENT_Connection *client;
+ struct GNUNET_CLIENT_MANAGER_Connection *client;
/**
- * Currently pending transmission request, or NULL for none.
+ * Transmission handle;
*/
- struct GNUNET_CLIENT_TransmitHandle *th;
+ struct GNUNET_PSYC_TransmitHandle *tmit;
/**
- * Head of messages to transmit to the service.
+ * Receipt handle;
*/
- struct MessageQueue *tmit_head;
+ struct GNUNET_PSYC_ReceiveHandle *recv;
/**
- * Tail of operations to transmit to the service.
- */
- struct MessageQueue *tmit_tail;
-
- /**
- * Message currently being transmitted to the service.
- */
- struct MessageQueue *tmit_msg;
-
- /**
* Message to send on reconnect.
*/
- struct GNUNET_MessageHeader *reconnect_msg;
+ struct GNUNET_MessageHeader *connect_msg;
/**
- * Task doing exponential back-off trying to reconnect.
- */
- GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
-
- /**
- * Time for next connect retry.
- */
- struct GNUNET_TIME_Relative reconnect_delay;
-
- /**
- * Message part callback.
- */
- GNUNET_PSYC_MessageCallback message_cb;
-
- /**
- * Message part callback for historic message.
- */
- GNUNET_PSYC_MessageCallback hist_message_cb;
-
- /**
- * Closure for @a message_cb.
- */
- void *cb_cls;
-
- /**
- * ID of the message being received from the PSYC service.
- */
- uint64_t recv_message_id;
-
- /**
- * Public key of the slave from which a message is being received.
- */
- struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
-
- /**
- * State of the currently being received message from the PSYC service.
- */
- enum MessageState recv_state;
-
- /**
- * Flags for the currently being received message from the PSYC service.
- */
- enum GNUNET_PSYC_MessageFlags recv_flags;
-
- /**
- * Expected value size for the modifier being received from the PSYC service.
- */
- uint32_t recv_mod_value_size_expected;
-
- /**
- * Actual value size for the modifier being received from the PSYC service.
- */
- uint32_t recv_mod_value_size;
-
- /**
- * Is transmission paused?
- */
- uint8_t tmit_paused;
-
- /**
- * Are we still waiting for a PSYC_TRANSMIT_ACK?
- */
- uint8_t tmit_ack_pending;
-
- /**
* Are we polling for incoming messages right now?
*/
uint8_t in_receive;
/**
- * Are we currently transmitting a message?
- */
- uint8_t in_transmit;
-
- /**
* Is this a master or slave channel?
*/
uint8_t is_master;
@@ -193,7 +90,7 @@
*/
struct GNUNET_PSYC_Master
{
- struct GNUNET_PSYC_Channel ch;
+ struct GNUNET_PSYC_Channel chn;
GNUNET_PSYC_MasterStartCallback start_cb;
@@ -201,6 +98,11 @@
* Join request callback.
*/
GNUNET_PSYC_JoinRequestCallback join_req_cb;
+
+ /**
+ * Closure for the callbacks.
+ */
+ void *cb_cls;
};
@@ -209,11 +111,16 @@
*/
struct GNUNET_PSYC_Slave
{
- struct GNUNET_PSYC_Channel ch;
+ struct GNUNET_PSYC_Channel chn;
GNUNET_PSYC_SlaveConnectCallback connect_cb;
GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb;
+
+ /**
+ * Closure for the callbacks.
+ */
+ void *cb_cls;
};
@@ -258,934 +165,170 @@
static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-static void
-channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
-
-
-/**
- * Reschedule a connect attempt to the service.
- *
- * @param ch Channel to reconnect.
- */
-static void
-reschedule_connect (struct GNUNET_PSYC_Channel *ch)
+channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn)
{
- GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
-
- if (NULL != ch->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
- ch->th = NULL;
- }
- if (NULL != ch->client)
- {
- GNUNET_CLIENT_disconnect (ch->client);
- ch->client = NULL;
- }
- ch->in_receive = GNUNET_NO;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling task to reconnect to PSYC service in %s.\n",
- GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay,
GNUNET_YES));
- ch->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
- ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
+ uint16_t cmsg_size = ntohs (chn->connect_msg->size);
+ struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+ memcpy (cmsg, chn->connect_msg, cmsg_size);
+ GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg);
}
-/**
- * Schedule transmission of the next message from our queue.
- *
- * @param ch PSYC channel handle
- */
static void
-transmit_next (struct GNUNET_PSYC_Channel *ch);
-
-
-/**
- * Reset stored data related to the last received message.
- */
-static void
-recv_reset (struct GNUNET_PSYC_Channel *ch)
+channel_recv_disconnect (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- ch->recv_state = MSG_STATE_START;
- ch->recv_flags = 0;
- ch->recv_message_id = 0;
- //FIXME: ch->recv_slave_key = { 0 };
- ch->recv_mod_value_size = 0;
- ch->recv_mod_value_size_expected = 0;
+ struct GNUNET_PSYC_Channel *
+ chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+ GNUNET_CLIENT_MANAGER_reconnect (client);
+ channel_send_connect_msg (chn);
}
static void
-recv_error (struct GNUNET_PSYC_Channel *ch)
+channel_recv_message (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- GNUNET_PSYC_MessageCallback message_cb
- = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
- ? ch->hist_message_cb
- : ch->message_cb;
-
- if (NULL != message_cb)
- message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
-
- recv_reset (ch);
+ struct GNUNET_PSYC_Channel *
+ chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+ GNUNET_PSYC_receive_message (chn->recv,
+ (const struct GNUNET_PSYC_MessageHeader *) msg);
}
-/**
- * Queue a message part for transmission to the PSYC service.
- *
- * The message part is added to the current message buffer.
- * When this buffer is full, it is added to the transmission queue.
- *
- * @param ch Channel struct for the client.
- * @param msg Modifier message part, or NULL when there's no more modifiers.
- * @param end End of message.
- */
static void
-queue_message (struct GNUNET_PSYC_Channel *ch,
- const struct GNUNET_MessageHeader *msg,
- uint8_t end)
+channel_recv_message_ack (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- uint16_t size = msg ? ntohs (msg->size) : 0;
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Queueing message of type %u and size %u (end: %u)).\n",
- ntohs (msg->type), size, end);
-
- struct MessageQueue *mq = ch->tmit_msg;
- struct GNUNET_MessageHeader *qmsg = NULL;
- if (NULL != mq)
- {
- qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- if (NULL == msg
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
- {
- /* End of message or buffer is full, add it to transmission queue
- * and start with empty buffer */
- qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- qmsg->size = htons (qmsg->size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
- ch->tmit_msg = mq = NULL;
- ch->tmit_ack_pending++;
- }
- else
- {
- /* Message fits in current buffer, append */
- ch->tmit_msg
- = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
- qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- memcpy ((char *) qmsg + qmsg->size, msg, size);
- qmsg->size += size;
- }
- }
-
- if (NULL == mq && NULL != msg)
- {
- /* Empty buffer, copy over message. */
- ch->tmit_msg
- = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
- qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- qmsg->size = sizeof (*qmsg) + size;
- memcpy (&qmsg[1], msg, size);
- }
-
- if (NULL != mq
- && (GNUNET_YES == end
- || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
- < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
- {
- /* End of message or buffer is full, add it to transmission queue. */
- qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- qmsg->size = htons (qmsg->size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
- ch->tmit_msg = mq = NULL;
- ch->tmit_ack_pending++;
- }
-
- if (GNUNET_YES == end)
- ch->in_transmit = GNUNET_NO;
-
- transmit_next (ch);
+ struct GNUNET_PSYC_Channel *
+ chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+ GNUNET_PSYC_transmit_got_ack (chn->tmit);
}
-/**
- * Request a modifier from a client to transmit.
- *
- * @param mst Master handle.
- */
static void
-channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
+master_recv_start_ack (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- uint16_t max_data_size, data_size;
- char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
- struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
- int notify_ret;
+ struct GNUNET_PSYC_Master *
+ mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+ sizeof (struct
GNUNET_PSYC_Channel));
- switch (ch->tmit.state)
- {
- case MSG_STATE_MODIFIER:
- {
- struct GNUNET_PSYC_MessageModifier *mod
- = (struct GNUNET_PSYC_MessageModifier *) msg;
- max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
- msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
- notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
- &mod->oper, &mod->value_size);
- mod->name_size = strnlen ((char *) &mod[1], data_size);
- if (mod->name_size < data_size)
- {
- mod->value_size = htonl (mod->value_size);
- mod->name_size = htons (mod->name_size);
- }
- else if (0 < data_size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
- notify_ret = GNUNET_SYSERR;
- }
- break;
- }
- case MSG_STATE_MOD_CONT:
- {
- max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
- msg->size = sizeof (struct GNUNET_MessageHeader);
- notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
- &data_size, &msg[1], NULL, NULL);
- break;
- }
- default:
- GNUNET_assert (0);
- }
-
- switch (notify_ret)
- {
- case GNUNET_NO:
- if (0 == data_size)
- { /* Transmission paused, nothing to send. */
- ch->tmit_paused = GNUNET_YES;
- return;
- }
- ch->tmit.state = MSG_STATE_MOD_CONT;
- break;
-
- case GNUNET_YES:
- if (0 == data_size)
- {
- /* End of modifiers. */
- ch->tmit.state = MSG_STATE_DATA;
- if (0 == ch->tmit_ack_pending)
- channel_transmit_data (ch);
-
- return;
- }
- ch->tmit.state = MSG_STATE_MODIFIER;
- break;
-
- default:
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "MasterTransmitNotifyModifier returned error "
- "when requesting a modifier.\n");
-
- ch->tmit.state = MSG_STATE_CANCEL;
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
- msg->size = htons (sizeof (*msg));
-
- queue_message (ch, msg, GNUNET_YES);
- return;
- }
-
- if (0 < data_size)
- {
- GNUNET_assert (data_size <= max_data_size);
- msg->size = htons (msg->size + data_size);
- queue_message (ch, msg, GNUNET_NO);
- }
-
- channel_transmit_mod (ch);
+ struct CountersResult *cres = (struct CountersResult *) msg;
+ if (NULL != mst->start_cb)
+ mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id));
}
-/**
- * Request data from a client to transmit.
- *
- * @param mst Master handle.
- */
static void
-channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
+master_recv_join_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
- char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
- struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+ struct GNUNET_PSYC_Master *
+ mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+ sizeof (struct
GNUNET_PSYC_Channel));
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+ const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg;
- int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
- &data_size, &msg[1]);
- switch (notify_ret)
- {
- case GNUNET_NO:
- if (0 == data_size)
- {
- /* Transmission paused, nothing to send. */
- ch->tmit_paused = GNUNET_YES;
- return;
- }
- break;
+ struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
+ if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg))
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
- case GNUNET_YES:
- ch->tmit.state = MSG_STATE_END;
- break;
-
- default:
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "MasterTransmitNotify returned error when requesting data.\n");
-
- ch->tmit.state = MSG_STATE_CANCEL;
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
- msg->size = htons (sizeof (*msg));
- queue_message (ch, msg, GNUNET_YES);
- return;
- }
-
- if (0 < data_size)
- {
- GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
- msg->size = htons (sizeof (*msg) + data_size);
- queue_message (ch, msg, !notify_ret);
- }
-
- /* End of message. */
- if (GNUNET_YES == notify_ret)
- {
- msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
- msg->size = htons (sizeof (*msg));
- queue_message (ch, msg, GNUNET_YES);
- }
-}
-
-
-/**
- * Send a message to a channel.
- *
- * @param ch Handle to the PSYC channel.
- * @param method_name Which method should be invoked.
- * @param notify_mod Function to call to obtain modifiers.
- * @param notify_data Function to call to obtain fragments of the data.
- * @param notify_cls Closure for @a notify_mod and @a notify_data.
- * @param flags Flags for the message being transmitted.
- *
- * @return Transmission handle, NULL on error (i.e. more than one request
queued).
- */
-static struct GNUNET_PSYC_ChannelTransmitHandle *
-channel_transmit (struct GNUNET_PSYC_Channel *ch,
- const char *method_name,
- GNUNET_PSYC_TransmitNotifyModifier notify_mod,
- GNUNET_PSYC_TransmitNotifyData notify_data,
- void *notify_cls,
- uint32_t flags)
-{
- if (GNUNET_NO != ch->in_transmit)
- return NULL;
- ch->in_transmit = GNUNET_YES;
-
- size_t size = strlen (method_name) + 1;
- struct GNUNET_PSYC_MessageMethod *pmeth;
- struct GNUNET_MessageHeader *qmsg;
- struct MessageQueue *
- mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
- + sizeof (*pmeth) + size);
- qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
-
- pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
- pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
- pmeth->header.size = htons (sizeof (*pmeth) + size);
- pmeth->flags = htonl (flags);
- memcpy (&pmeth[1], method_name, size);
-
- ch->tmit.ch = ch;
- ch->tmit.notify_mod = notify_mod;
- ch->tmit.notify_data = notify_data;
- ch->tmit.notify_cls = notify_cls;
- ch->tmit.state = MSG_STATE_MODIFIER;
-
- channel_transmit_mod (ch);
- return &ch->tmit;
-}
-
-
-/**
- * Resume transmission to the channel.
- *
- * @param th Handle of the request that is being resumed.
- */
-static void
-channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
-{
- struct GNUNET_PSYC_Channel *ch = th->ch;
- if (0 == ch->tmit_ack_pending)
- {
- ch->tmit_paused = GNUNET_NO;
- channel_transmit_data (ch);
- }
-}
-
-
-/**
- * Abort transmission request to channel.
- *
- * @param th Handle of the request that is being aborted.
- */
-static void
-channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
-{
- struct GNUNET_PSYC_Channel *ch = th->ch;
- if (GNUNET_NO == ch->in_transmit)
- return;
-}
-
-
-/**
- * Handle incoming message from the PSYC service.
- *
- * @param ch The channel the message is sent to.
- * @param pmsg The message.
- */
-static void
-handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
- const struct GNUNET_PSYC_MessageHeader *msg)
-{
- uint16_t size = ntohs (msg->header.size);
- uint32_t flags = ntohl (msg->flags);
-
- GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
- (struct GNUNET_MessageHeader *) msg);
-
- if (MSG_STATE_START == ch->recv_state)
- {
- ch->recv_message_id = GNUNET_ntohll (msg->message_id);
- ch->recv_flags = flags;
- ch->recv_slave_key = msg->slave_key;
- ch->recv_mod_value_size = 0;
- ch->recv_mod_value_size_expected = 0;
- }
- else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
- {
- // FIXME
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
- GNUNET_ntohll (msg->message_id), ch->recv_message_id);
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
- else if (flags != ch->recv_flags)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Unexpected message flags. Got: %lu, expected: %lu\n",
- flags, ch->recv_flags);
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
-
- uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
-
- for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
- {
- const struct GNUNET_MessageHeader *pmsg
- = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
- psize = ntohs (pmsg->size);
- ptype = ntohs (pmsg->type);
- size_eq = size_min = 0;
-
- if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Dropping message of type %u with invalid size %u.\n",
- ptype, psize);
- recv_error (ch);
- return;
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received message part from PSYC.\n");
- GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
-
- switch (ptype)
- {
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
- size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
- size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
- size_min = sizeof (struct GNUNET_MessageHeader);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
- size_eq = sizeof (struct GNUNET_MessageHeader);
- break;
- default:
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
-
- if (! ((0 < size_eq && psize == size_eq)
- || (0 < size_min && size_min <= psize)))
- {
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
-
- switch (ptype)
- {
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
- {
- struct GNUNET_PSYC_MessageMethod *meth
- = (struct GNUNET_PSYC_MessageMethod *) pmsg;
-
- if (MSG_STATE_START != ch->recv_state)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Dropping out of order message method (%u).\n",
- ch->recv_state);
- /* It is normal to receive an incomplete message right after
connecting,
- * but should not happen later.
- * FIXME: add a check for this condition.
- */
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
-
- if ('\0' != *((char *) meth + psize - 1))
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Dropping message with malformed method. "
- "Message ID: %" PRIu64 "\n", ch->recv_message_id);
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
- ch->recv_state = MSG_STATE_METHOD;
- break;
- }
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
- {
- if (!(MSG_STATE_METHOD == ch->recv_state
- || MSG_STATE_MODIFIER == ch->recv_state
- || MSG_STATE_MOD_CONT == ch->recv_state))
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Dropping out of order message modifier (%u).\n",
- ch->recv_state);
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
-
- struct GNUNET_PSYC_MessageModifier *mod
- = (struct GNUNET_PSYC_MessageModifier *) pmsg;
-
- uint16_t name_size = ntohs (mod->name_size);
- ch->recv_mod_value_size_expected = ntohl (mod->value_size);
- ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
-
- if (psize < sizeof (*mod) + name_size + 1
- || '\0' != *((char *) &mod[1] + name_size)
- || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
- ch->recv_state = MSG_STATE_MODIFIER;
- break;
- }
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
- {
- ch->recv_mod_value_size += psize - sizeof (*pmsg);
-
- if (!(MSG_STATE_MODIFIER == ch->recv_state
- || MSG_STATE_MOD_CONT == ch->recv_state)
- || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Dropping out of order message modifier continuation "
- "!(%u == %u || %u == %u) || %lu < %lu.\n",
- MSG_STATE_MODIFIER, ch->recv_state,
- MSG_STATE_MOD_CONT, ch->recv_state,
- ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
- break;
- }
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
- {
- if (ch->recv_state < MSG_STATE_METHOD
- || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Dropping out of order message data fragment "
- "(%u < %u || %lu != %lu).\n",
- ch->recv_state, MSG_STATE_METHOD,
- ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
-
- GNUNET_break_op (0);
- recv_error (ch);
- return;
- }
- ch->recv_state = MSG_STATE_DATA;
- break;
- }
- }
-
- GNUNET_PSYC_MessageCallback message_cb
- = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
- ? ch->hist_message_cb
- : ch->message_cb;
-
- if (NULL != message_cb)
- message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
-
- switch (ptype)
- {
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
- recv_reset (ch);
- break;
- }
- }
-}
-
-
-/**
- * Handle incoming message acknowledgement from the PSYC service.
- *
- * @param ch The channel the acknowledgement is sent to.
- */
-static void
-handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
-{
- if (0 == ch->tmit_ack_pending)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
- GNUNET_break (0);
- return;
- }
- ch->tmit_ack_pending--;
-
- switch (ch->tmit.state)
- {
- case MSG_STATE_MODIFIER:
- case MSG_STATE_MOD_CONT:
- if (GNUNET_NO == ch->tmit_paused)
- channel_transmit_mod (ch);
- break;
-
- case MSG_STATE_DATA:
- if (GNUNET_NO == ch->tmit_paused)
- channel_transmit_data (ch);
- break;
-
- case MSG_STATE_END:
- case MSG_STATE_CANCEL:
- break;
-
- default:
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Ignoring message ACK in state %u.\n", ch->tmit.state);
- }
-}
-
-
-static void
-handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
- const struct MasterJoinRequest *req)
-{
- struct GNUNET_PSYC_MessageHeader *msg = NULL;
- if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg))
- msg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
-
struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
jh->mst = mst;
jh->slave_key = req->slave_key;
if (NULL != mst->join_req_cb)
- mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh);
+ mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh);
}
static void
-handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv,
- const struct SlaveJoinDecision *dcsn)
+slave_recv_join_ack (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_PSYC_MessageHeader *msg = NULL;
- if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg))
- msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
-
- struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
- if (NULL != slv->join_dcsn_cb)
- slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg);
+ struct GNUNET_PSYC_Slave *
+ slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+ sizeof (struct
GNUNET_PSYC_Channel));
+ struct CountersResult *cres = (struct CountersResult *) msg;
+ if (NULL != slv->connect_cb)
+ slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id));
}
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
static void
-message_handler (void *cls,
- const struct GNUNET_MessageHeader *msg)
+slave_recv_join_decision (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- struct GNUNET_PSYC_Channel *ch = cls;
- struct GNUNET_PSYC_Master *mst = cls;
- struct GNUNET_PSYC_Slave *slv = cls;
+ struct GNUNET_PSYC_Slave *
+ slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+ sizeof (struct
GNUNET_PSYC_Channel));
+ const struct SlaveJoinDecision *
+ dcsn = (const struct SlaveJoinDecision *) msg;
- if (NULL == msg)
- {
- // timeout / disconnected from service, reconnect
- reschedule_connect (ch);
- return;
- }
- uint16_t size_eq = 0;
- uint16_t size_min = 0;
- uint16_t size = ntohs (msg->size);
- uint16_t type = ntohs (msg->type);
+ struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
+ if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %d and size %u from PSYC service\n",
- type, size);
-
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
- case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
- size_eq = sizeof (struct CountersResult);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
- size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
- size_eq = sizeof (struct GNUNET_MessageHeader);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
- size_min = sizeof (struct MasterJoinRequest);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
- size_min = sizeof (struct SlaveJoinDecision);
- break;
- default:
- GNUNET_break_op (0);
- return;
- }
-
- if (! ((0 < size_eq && size == size_eq)
- || (0 < size_min && size_min <= size)))
- {
- GNUNET_break_op (0);
- return;
- }
-
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
- {
- struct CountersResult *cres = (struct CountersResult *) msg;
- if (NULL != mst->start_cb)
- mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
- break;
- }
- case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
- {
- struct CountersResult *cres = (struct CountersResult *) msg;
- if (NULL != slv->connect_cb)
- slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
- break;
- }
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
- {
- handle_psyc_message_ack (ch);
- break;
- }
-
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
- handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
- handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
- (const struct MasterJoinRequest *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
- handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch,
- (const struct SlaveJoinDecision *) msg);
- break;
- }
-
- if (NULL != ch->client)
- {
- GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
+ struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+ if (NULL != slv->join_dcsn_cb)
+ slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg);
}
-/**
- * Transmit next message to service.
- *
- * @param cls The struct GNUNET_PSYC_Channel.
- * @param size Number of bytes available in @a buf.
- * @param buf Where to copy the message.
- *
- * @return Number of bytes copied to @a buf.
- */
-static size_t
-send_next_message (void *cls, size_t size, void *buf)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
- struct GNUNET_PSYC_Channel *ch = cls;
- struct MessageQueue *mq = ch->tmit_head;
- if (NULL == mq)
- return 0;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- size_t ret = ntohs (qmsg->size);
- ch->th = NULL;
- if (ret > size)
- {
- reschedule_connect (ch);
- return 0;
- }
- memcpy (buf, qmsg, ret);
+ { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
- GNUNET_free (mq);
+ { &channel_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
+ sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
- if (NULL != ch->tmit_head)
- transmit_next (ch);
+ { &channel_recv_message_ack, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
- if (GNUNET_NO == ch->in_receive)
- {
- ch->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return ret;
-}
+ { &master_recv_start_ack, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
+ sizeof (struct CountersResult), GNUNET_NO },
+ { &master_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
+ sizeof (struct MasterJoinRequest), GNUNET_YES },
-/**
- * Schedule transmission of the next message from our queue.
- *
- * @param ch PSYC handle.
- */
-static void
-transmit_next (struct GNUNET_PSYC_Channel *ch)
-{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
- if (NULL != ch->th || NULL == ch->client)
- return;
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
- struct MessageQueue *mq = ch->tmit_head;
- if (NULL == mq)
- return;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
- ntohs (qmsg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &send_next_message,
- ch);
-}
-
-
-/**
- * Try again to connect to the PSYC service.
- *
- * @param cls Channel handle.
- * @param tc Scheduler context.
- */
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
{
- struct GNUNET_PSYC_Channel *ch = cls;
+ { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- recv_reset (ch);
- ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to PSYC service.\n");
- GNUNET_assert (NULL == ch->client);
- ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
- GNUNET_assert (NULL != ch->client);
- uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
+ { &channel_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
+ sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
- if (NULL == ch->tmit_head ||
- 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
- {
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
- memcpy (&mq[1], ch->reconnect_msg, reconn_size);
- GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
- }
- transmit_next (ch);
-}
+ { &channel_recv_message_ack, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
+ sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
+ { &slave_recv_join_ack, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
+ sizeof (struct CountersResult), GNUNET_NO },
-/**
- * Disconnect from the PSYC service.
- *
- * @param c Channel handle to disconnect.
- */
-static void
-disconnect (void *c)
-{
- struct GNUNET_PSYC_Channel *ch = c;
+ { &slave_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
+ sizeof (struct SlaveJoinDecision), GNUNET_YES },
- GNUNET_assert (NULL != ch);
- if (ch->tmit_head != ch->tmit_tail)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Disconnecting while there are still outstanding messages!\n");
- GNUNET_break (0);
- }
- if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (ch->reconnect_task);
- ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != ch->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
- ch->th = NULL;
- }
- if (NULL != ch->client)
- {
- GNUNET_CLIENT_disconnect (ch->client);
- ch->client = NULL;
- }
- if (NULL != ch->reconnect_msg)
- {
- GNUNET_free (ch->reconnect_msg);
- ch->reconnect_msg = NULL;
- }
-}
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
/**
@@ -1227,24 +370,29 @@
void *cls)
{
struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
- struct GNUNET_PSYC_Channel *ch = &mst->ch;
+ struct GNUNET_PSYC_Channel *chn = &mst->chn;
+
struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
-
req->header.size = htons (sizeof (*req));
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
req->channel_key = *channel_key;
req->policy = policy;
+ chn->connect_msg = (struct GNUNET_MessageHeader *) req;
+ chn->cfg = cfg;
+ chn->is_master = GNUNET_YES;
+
mst->start_cb = start_cb;
mst->join_req_cb = join_request_cb;
- ch->message_cb = message_cb;
- ch->cb_cls = cls;
- ch->cfg = cfg;
- ch->is_master = GNUNET_YES;
- ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
- ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
+ mst->cb_cls = cls;
+ chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
+
+ chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
+ chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
+
+ channel_send_connect_msg (chn);
return mst;
}
@@ -1253,12 +401,13 @@
* Stop a PSYC master channel.
*
* @param master PSYC channel master to stop.
+ * @param keep_active FIXME
*/
void
-GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
+GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
{
- disconnect (master);
- GNUNET_free (master);
+ GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES);
+ GNUNET_free (mst);
}
@@ -1292,7 +441,7 @@
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_PSYC_MessageHeader *join_resp)
{
- struct GNUNET_PSYC_Channel *ch = &jh->mst->ch;
+ struct GNUNET_PSYC_Channel *chn = &jh->mst->chn;
struct MasterJoinDecision *dcsn;
uint16_t join_resp_size
= (NULL != join_resp) ? ntohs (join_resp->header.size) : 0;
@@ -1302,9 +451,7 @@
< sizeof (*dcsn) + relay_size + join_resp_size)
return GNUNET_SYSERR;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn)
- + relay_size + join_resp_size);
- dcsn = (struct MasterJoinDecision *) &mq[1];
+ dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size);
dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
dcsn->is_admitted = htonl (is_admitted);
@@ -1313,8 +460,7 @@
if (0 < join_resp_size)
memcpy (&dcsn[1], join_resp, join_resp_size);
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
- transmit_next (ch);
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header);
return GNUNET_OK;
}
@@ -1332,44 +478,63 @@
* @return Transmission handle, NULL on error (i.e. more than one request
queued).
*/
struct GNUNET_PSYC_MasterTransmitHandle *
-GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
+GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
const char *method_name,
GNUNET_PSYC_TransmitNotifyModifier notify_mod,
GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
enum GNUNET_PSYC_MasterTransmitFlags flags)
{
- return (struct GNUNET_PSYC_MasterTransmitHandle *)
- channel_transmit (&master->ch, method_name, notify_mod, notify_data,
- notify_cls, flags);
+ if (GNUNET_OK
+ == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL,
+ notify_mod, notify_data, notify_cls,
+ flags))
+ return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit;
+ else
+ return NULL;
}
/**
* Resume transmission to the channel.
*
- * @param th Handle of the request that is being resumed.
+ * @param tmit Handle of the request that is being resumed.
*/
void
-GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*th)
+GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle
*tmit)
{
- channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+ GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
}
/**
* Abort transmission request to the channel.
*
- * @param th Handle of the request that is being aborted.
+ * @param tmit Handle of the request that is being aborted.
*/
void
-GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle
*th)
+GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle
*tmit)
{
- channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+ GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
}
/**
+ * Convert a channel @a master to a @e channel handle to access the @e channel
+ * APIs.
+ *
+ * @param master Channel master handle.
+ *
+ * @return Channel handle, valid for as long as @a master is valid.
+ */
+struct GNUNET_PSYC_Channel *
+GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
+{
+ return &master->chn;
+}
+
+
+/**
* Join a PSYC channel.
*
* The entity joining is always the local peer. The user must immediately use
@@ -1420,7 +585,7 @@
uint16_t data_size)
{
struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
- struct GNUNET_PSYC_Channel *ch = &slv->ch;
+ struct GNUNET_PSYC_Channel *chn = &slv->chn;
struct SlaveJoinRequest *req
= GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
req->header.size = htons (sizeof (*req)
@@ -1432,17 +597,21 @@
req->relay_count = htonl (relay_count);
memcpy (&req[1], relays, relay_count * sizeof (*relays));
+ chn->connect_msg = (struct GNUNET_MessageHeader *) req;
+ chn->cfg = cfg;
+ chn->is_master = GNUNET_NO;
+
slv->connect_cb = connect_cb;
slv->join_dcsn_cb = join_decision_cb;
- ch->message_cb = message_cb;
- ch->cb_cls = cls;
+ slv->cb_cls = cls;
- ch->cfg = cfg;
- ch->is_master = GNUNET_NO;
- ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
- ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
+ chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
+ chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
+ chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
+
+ channel_send_connect_msg (chn);
return slv;
}
@@ -1456,10 +625,10 @@
* @param slave Slave handle.
*/
void
-GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
+GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv)
{
- disconnect (slave);
- GNUNET_free (slave);
+ GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES);
+ GNUNET_free (slv);
}
@@ -1477,69 +646,59 @@
* queued).
*/
struct GNUNET_PSYC_SlaveTransmitHandle *
-GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
+GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv,
const char *method_name,
GNUNET_PSYC_TransmitNotifyModifier notify_mod,
GNUNET_PSYC_TransmitNotifyData notify_data,
void *notify_cls,
enum GNUNET_PSYC_SlaveTransmitFlags flags)
+
{
- return (struct GNUNET_PSYC_SlaveTransmitHandle *)
- channel_transmit (&slave->ch, method_name,
- notify_mod, notify_data, notify_cls, flags);
+ if (GNUNET_OK
+ == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL,
+ notify_mod, notify_data, notify_cls,
+ flags))
+ return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit;
+ else
+ return NULL;
}
/**
* Resume transmission to the master.
*
- * @param th Handle of the request that is being resumed.
+ * @param tmit Handle of the request that is being resumed.
*/
void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle
*tmit)
{
- channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+ GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
}
/**
* Abort transmission request to master.
*
- * @param th Handle of the request that is being aborted.
+ * @param tmit Handle of the request that is being aborted.
*/
void
-GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle
*tmit)
{
- channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+ GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
}
/**
- * Convert a channel @a master to a @e channel handle to access the @e channel
- * APIs.
- *
- * @param master Channel master handle.
- *
- * @return Channel handle, valid for as long as @a master is valid.
- */
-struct GNUNET_PSYC_Channel *
-GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
-{
- return &master->ch;
-}
-
-
-/**
* Convert @a slave to a @e channel handle to access the @e channel APIs.
*
- * @param slave Slave handle.
+ * @param slv Slave handle.
*
* @return Channel handle, valid for as long as @a slave is valid.
*/
struct GNUNET_PSYC_Channel *
-GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
+GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
{
- return &slave->ch;
+ return &slv->chn;
}
@@ -1565,23 +724,17 @@
* @param effective_since Addition of slave is in effect since this message ID.
*/
void
-GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
+GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
const struct GNUNET_CRYPTO_EddsaPublicKey
*slave_key,
uint64_t announced_at,
uint64_t effective_since)
{
- struct ChannelSlaveAdd *slvadd;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
-
- slvadd = (struct ChannelSlaveAdd *) &mq[1];
- slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
- slvadd->header.size = htons (sizeof (*slvadd));
- slvadd->announced_at = GNUNET_htonll (announced_at);
- slvadd->effective_since = GNUNET_htonll (effective_since);
- GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
- channel->tmit_tail,
- mq);
- transmit_next (channel);
+ struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add));
+ add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
+ add->header.size = htons (sizeof (*add));
+ add->announced_at = GNUNET_htonll (announced_at);
+ add->effective_since = GNUNET_htonll (effective_since);
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header);
}
@@ -1607,21 +760,15 @@
* @param announced_at ID of the message that announced the membership change.
*/
void
-GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
+GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
const struct GNUNET_CRYPTO_EddsaPublicKey
*slave_key,
uint64_t announced_at)
{
- struct ChannelSlaveRemove *slvrm;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
-
- slvrm = (struct ChannelSlaveRemove *) &mq[1];
- slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
- slvrm->header.size = htons (sizeof (*slvrm));
- slvrm->announced_at = GNUNET_htonll (announced_at);
- GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
- channel->tmit_tail,
- mq);
- transmit_next (channel);
+ struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm));
+ rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
+ rm->header.size = htons (sizeof (*rm));
+ rm->announced_at = GNUNET_htonll (announced_at);
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header);
}
Modified: gnunet/src/psyc/psyc_util_lib.c
===================================================================
--- gnunet/src/psyc/psyc_util_lib.c 2014-05-29 16:35:53 UTC (rev 33442)
+++ gnunet/src/psyc/psyc_util_lib.c 2014-05-29 16:35:55 UTC (rev 33443)
@@ -28,6 +28,7 @@
#include "platform.h"
#include "gnunet_util_lib.h"
+#include "gnunet_env_lib.h"
#include "gnunet_psyc_service.h"
#include "gnunet_psyc_util_lib.h"
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-05-29 16:35:53 UTC (rev 33442)
+++ gnunet/src/psyc/test_psyc.c 2014-05-29 16:35:55 UTC (rev 33443)
@@ -68,6 +68,7 @@
struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
struct GNUNET_ENV_Environment *env;
+ struct GNUNET_ENV_Modifier *mod;
char *data[16];
const char *mod_value;
size_t mod_value_size;
@@ -79,7 +80,7 @@
struct TransmitClosure *tmit;
-static int join_req_count;
+static uint8_t join_req_count;
enum
{
@@ -183,7 +184,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Master got message part of type %u and size %u "
- "belonging to message ID %llu with flags %xu\n",
+ "belonging to message ID %llu with flags %x\n",
type, size, message_id, flags);
switch (test)
@@ -227,7 +228,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Slave got message part of type %u and size %u "
- "belonging to message ID %llu with flags %xu\n",
+ "belonging to message ID %llu with flags %x\n",
type, size, message_id, flags);
switch (test)
@@ -256,6 +257,48 @@
static int
+tmit_notify_data (void *cls, uint16_t *data_size, void *data)
+{
+ struct TransmitClosure *tmit = cls;
+ if (0 == tmit->data_count)
+ {
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ uint16_t size = strlen (tmit->data[tmit->n]);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmit notify data: %u bytes available, "
+ "processing fragment %u/%u (size %u).\n",
+ *data_size, tmit->n + 1, tmit->data_count, size);
+ if (*data_size < size)
+ {
+ *data_size = 0;
+ GNUNET_assert (0);
+ return GNUNET_SYSERR;
+ }
+
+ if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+ tmit->paused = GNUNET_YES;
+ GNUNET_SCHEDULER_add_delayed (
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ tmit->data_delay[tmit->n]),
+ &transmit_resume, tmit);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+ tmit->paused = GNUNET_NO;
+
+ *data_size = size;
+ memcpy (data, tmit->data[tmit->n], size);
+
+ return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
+}
+
+
+static int
tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
uint32_t *full_value_size)
{
@@ -265,41 +308,39 @@
"%u modifiers left to process.\n",
*data_size, GNUNET_ENV_environment_get_count (tmit->env));
- enum GNUNET_ENV_Operator op = 0;
- const char *name = NULL;
- const char *value = NULL;
uint16_t name_size = 0;
size_t value_size = 0;
+ const char *value = NULL;
- if (NULL != oper)
+ if (NULL != oper && NULL != tmit->mod)
{ /* New modifier */
- if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
- (void *) &value,
&value_size))
+ tmit->mod = tmit->mod->next;
+ if (NULL == tmit->mod)
{ /* No more modifiers, continue with data */
*data_size = 0;
return GNUNET_YES;
}
- GNUNET_assert (value_size < UINT32_MAX);
- *full_value_size = value_size;
- *oper = op;
- name_size = strlen (name);
+ GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
+ *full_value_size = tmit->mod->value_size;
+ *oper = tmit->mod->oper;
+ name_size = strlen (tmit->mod->name);
- if (name_size + 1 + value_size <= *data_size)
+ if (name_size + 1 + tmit->mod->value_size <= *data_size)
{
- *data_size = name_size + 1 + value_size;
+ *data_size = name_size + 1 + tmit->mod->value_size;
}
else
{
- tmit->mod_value_size = value_size;
+ tmit->mod_value_size = tmit->mod->value_size;
value_size = *data_size - name_size - 1;
tmit->mod_value_size -= value_size;
- tmit->mod_value = value + value_size;
+ tmit->mod_value = tmit->mod->value + value_size;
}
- memcpy (data, name, name_size);
+ memcpy (data, tmit->mod->name, name_size);
((char *)data)[name_size] = '\0';
- memcpy ((char *)data + name_size + 1, value, value_size);
+ memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
}
else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
{ /* Modifier continuation */
@@ -333,48 +374,6 @@
}
-static int
-tmit_notify_data (void *cls, uint16_t *data_size, void *data)
-{
- struct TransmitClosure *tmit = cls;
- if (0 == tmit->data_count)
- {
- *data_size = 0;
- return GNUNET_YES;
- }
-
- uint16_t size = strlen (tmit->data[tmit->n]);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmit notify data: %u bytes available, "
- "processing fragment %u/%u (size %u).\n",
- *data_size, tmit->n + 1, tmit->data_count, size);
- if (*data_size < size)
- {
- *data_size = 0;
- GNUNET_assert (0);
- return GNUNET_SYSERR;
- }
-
- if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
- tmit->paused = GNUNET_YES;
- GNUNET_SCHEDULER_add_delayed (
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
- tmit->data_delay[tmit->n]),
- &transmit_resume, tmit);
- *data_size = 0;
- return GNUNET_NO;
- }
- tmit->paused = GNUNET_NO;
-
- *data_size = size;
- memcpy (data, tmit->data[tmit->n], size);
-
- return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
-}
-
-
static void
slave_join ();
@@ -388,7 +387,7 @@
if (GNUNET_YES != is_admitted)
{ /* First join request is refused, retry. */
- //GNUNET_assert (1 == join_req_count);
+ GNUNET_assert (1 == join_req_count);
slave_join ();
return;
}
@@ -403,6 +402,7 @@
"_abc", "abc def", 7);
GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
"_abc_def", "abc def ghi", 11);
+ tmit->mod = GNUNET_ENV_environment_head (tmit->env);
tmit->n = 0;
tmit->data[0] = "slave test";
tmit->data_count = 1;
@@ -421,8 +421,8 @@
struct GNUNET_HashCode slave_key_hash;
GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got join request from %s.\n",
- GNUNET_h2s (&slave_key_hash));
+ "Got join request #%u from %s.\n",
+ join_req_count, GNUNET_h2s (&slave_key_hash));
/* Reject first request */
int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
@@ -493,6 +493,7 @@
name_cont, val_cont,
GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
+ GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
+ tmit->mod = GNUNET_ENV_environment_head (tmit->env);
tmit->data[0] = "foo";
tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33443 - in gnunet/src: include multicast psyc,
gnunet <=