gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33443 - in gnunet/src: include multicast psyc


From: gnunet
Subject: [GNUnet-SVN] r33443 - in gnunet/src: include multicast psyc
Date: Thu, 29 May 2014 18:35:55 +0200

Author: tg
Date: 2014-05-29 18:35:55 +0200 (Thu, 29 May 2014)
New Revision: 33443

Modified:
   gnunet/src/include/gnunet_multicast_service.h
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/psyc_util_lib.c
   gnunet/src/psyc/test_psyc.c
Log:
psyc, multicast: reorg code, use new client manager & psyc util lib

Modified: gnunet/src/include/gnunet_multicast_service.h
===================================================================
--- gnunet/src/include/gnunet_multicast_service.h       2014-05-29 16:35:53 UTC 
(rev 33442)
+++ gnunet/src/include/gnunet_multicast_service.h       2014-05-29 16:35:55 UTC 
(rev 33443)
@@ -368,9 +368,7 @@
  */
 typedef void
 (*GNUNET_MULTICAST_RequestCallback) (void *cls,
-                                     const struct GNUNET_CRYPTO_EddsaPublicKey 
*member_key,
-                                     const struct GNUNET_MessageHeader *req,
-                                     enum GNUNET_MULTICAST_MessageFlags flags);
+                                     const struct 
GNUNET_MULTICAST_RequestHeader *req);
 
 
 /**
@@ -394,7 +392,7 @@
  */
 typedef void
 (*GNUNET_MULTICAST_MessageCallback) (void *cls,
-                                     const struct GNUNET_MessageHeader *msg);
+                                     const struct 
GNUNET_MULTICAST_MessageHeader *msg);
 
 
 /**

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2014-05-29 16:35:53 UTC (rev 
33442)
+++ gnunet/src/include/gnunet_psyc_service.h    2014-05-29 16:35:55 UTC (rev 
33443)
@@ -471,7 +471,7 @@
  *         Only needed during the first call to this callback at the beginning
  *         of the modifier.  In case of subsequent calls asking for value
  *         continuations @a oper is set to #NULL.
- * @param[out] value_size  Where to write the full size of the value.
+ * @param[out] full_value_size  Where to write the full size of the value.
  *         Only needed during the first call to this callback at the beginning
  *         of the modifier.  In case of subsequent calls asking for value
  *         continuations @a value_size is set to #NULL.
@@ -489,7 +489,7 @@
                                        uint16_t *data_size,
                                        void *data,
                                        uint8_t *oper,
-                                       uint32_t *value_size);
+                                       uint32_t *full_value_size);
 
 /**
  * Flags for transmitting messages to a channel by the master.

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-05-29 16:35:53 UTC (rev 
33442)
+++ gnunet/src/multicast/multicast_api.c        2014-05-29 16:35:55 UTC (rev 
33443)
@@ -24,6 +24,7 @@
  * @author Christian Grothoff
  * @author Gabor X Toth
  */
+
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_multicast_service.h"
@@ -33,26 +34,6 @@
 
 
 /**
- * Started origins.
- * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
- */
-static struct GNUNET_CONTAINER_MultiHashMap *origins;
-
-/**
- * Joined members.
- * group_key_hash -> struct GNUNET_MULTICAST_Member
- */
-static struct GNUNET_CONTAINER_MultiHashMap *members;
-
-
-struct MessageQueue
-{
-  struct MessageQueue *prev;
-  struct MessageQueue *next;
-};
-
-
-/**
  * Handle for a request to send a message to all multicast group members
  * (from the origin).
  */
@@ -90,48 +71,15 @@
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Client connection to the service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_CLIENT_MANAGER_Connection *client;
 
   /**
-   * Currently pending transmission request, or NULL for none.
-   */
-  struct GNUNET_CLIENT_TransmitHandle *th;
-
-  /**
-   * Head of operations to transmit.
-   */
-  struct MessageQueue *tmit_head;
-
-  /**
-   * Tail of operations to transmit.
-   */
-  struct MessageQueue *tmit_tail;
-
-  /**
-   * Message being transmitted to the Multicast service.
-   */
-  struct MessageQueue *tmit_msg;
-
-  /**
    * Message to send on reconnect.
    */
-  struct GNUNET_MessageHeader *reconnect_msg;
+  struct GNUNET_MessageHeader *connect_msg;
 
-  /**
-   * Task doing exponential back-off trying to reconnect.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
-
-  /**
-   * Time for next connect retry.
-   */
-  struct GNUNET_TIME_Relative reconnect_delay;
-
-  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-  struct GNUNET_HashCode pub_key_hash;
-
   GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
   GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
   GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
@@ -140,11 +88,6 @@
   void *cb_cls;
 
   /**
-   * Are we polling for incoming messages right now?
-   */
-  uint8_t in_receive;
-
-  /**
    * Are we currently transmitting a message?
    */
   uint8_t in_transmit;
@@ -163,7 +106,6 @@
 {
   struct GNUNET_MULTICAST_Group grp;
   struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
-  struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
 
   GNUNET_MULTICAST_RequestCallback request_cb;
 };
@@ -229,294 +171,125 @@
 };
 
 
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
-
-
 /**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp PSYC channel handle
+ * Send first message to the service after connecting.
  */
 static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp);
-
-
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Reschedule a connect attempt to the service.
- *
- * @param c channel to reconnect
- */
-static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
+group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
 {
-  GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
-
-  if (NULL != grp->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
-    grp->th = NULL;
-  }
-  if (NULL != grp->client)
-  {
-    GNUNET_CLIENT_disconnect (grp->client);
-    grp->client = NULL;
-  }
-  grp->in_receive = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Scheduling task to reconnect to Multicast service in %s.\n",
-       GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, 
GNUNET_YES));
-  grp->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
-  grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+  uint16_t cmsg_size = ntohs (grp->connect_msg->size);
+  struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+  memcpy (cmsg, grp->connect_msg, cmsg_size);
+  GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
 }
 
 
 /**
- * Reset stored data related to the last received message.
+ * Got disconnected from service.  Reconnect.
  */
 static void
-recv_reset (struct GNUNET_MULTICAST_Group *grp)
+group_recv_disconnect (void *cls,
+                        struct GNUNET_CLIENT_MANAGER_Connection *client,
+                        const struct GNUNET_MessageHeader *msg)
 {
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  GNUNET_CLIENT_MANAGER_reconnect (client);
+  group_send_connect_msg (grp);
 }
 
 
-static void
-recv_error (struct GNUNET_MULTICAST_Group *grp)
-{
-  if (NULL != grp->message_cb)
-    grp->message_cb (grp->cb_cls, NULL);
-
-  recv_reset (grp);
-}
-
-
 /**
- * Transmit next message to service.
- *
- * @param cls  The struct GNUNET_MULTICAST_Group.
- * @param size Number of bytes available in @a buf.
- * @param buf  Where to copy the message.
- *
- * @return Number of bytes copied to @a buf.
+ * Receive join request from service.
  */
-static size_t
-send_next_message (void *cls, size_t size, void *buf)
-{
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
-  struct GNUNET_MULTICAST_Group *grp = cls;
-  struct MessageQueue *mq = grp->tmit_head;
-  if (NULL == mq)
-    return 0;
-  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-  size_t ret = ntohs (qmsg->size);
-  grp->th = NULL;
-  if (ret > size)
-  {
-    reschedule_connect (grp);
-    return 0;
-  }
-  memcpy (buf, qmsg, ret);
-
-  GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
-  GNUNET_free (mq);
-
-  if (NULL != grp->tmit_head)
-    transmit_next (grp);
-
-  if (GNUNET_NO == grp->in_receive)
-  {
-    grp->in_receive = GNUNET_YES;
-    GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
-  return ret;
-}
-
-
-/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp  Multicast group handle.
- */
 static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp)
+group_recv_join_request (void *cls,
+                          struct GNUNET_CLIENT_MANAGER_Connection *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
-  if (NULL != grp->th || NULL == grp->client)
-    return;
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
 
-  struct MessageQueue *mq = grp->tmit_head;
-  if (NULL == mq)
-    return;
-  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
+  const struct MulticastJoinRequestMessage *
+    jreq = (const struct MulticastJoinRequestMessage *) msg;
 
-  grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
-                                                 ntohs (qmsg->size),
-                                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                                 GNUNET_NO,
-                                                 &send_next_message,
-                                                 grp);
-}
+  struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+  jh->group = grp;
+  jh->member_key = jreq->member_key;
+  jh->member_peer = jreq->member_peer;
 
+  const struct GNUNET_MessageHeader *jmsg = NULL;
+  if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
+    jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
 
-/**
- * Try again to connect to the Multicast service.
- *
- * @param cls Channel handle.
- * @param tc Scheduler context.
- */
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct GNUNET_MULTICAST_Group *grp = cls;
-
-  recv_reset (grp);
-  grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Connecting to Multicast service.\n");
-  GNUNET_assert (NULL == grp->client);
-  grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
-  GNUNET_assert (NULL != grp->client);
-  uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
-
-  if (NULL == grp->tmit_head ||
-      0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
-  {
-    struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
-    memcpy (&mq[1], grp->reconnect_msg, reconn_size);
-    GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
-  }
-  transmit_next (grp);
+  if (NULL != grp->join_req_cb)
+    grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
 }
 
 
 /**
- * Disconnect from the Multicast service.
- *
- * @param g  Group handle to disconnect.
+ * Receive multicast message from service.
  */
 static void
-disconnect (void *g)
+group_recv_message (void *cls,
+                    struct GNUNET_CLIENT_MANAGER_Connection *client,
+                    const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_MULTICAST_Group *grp = g;
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  struct GNUNET_MULTICAST_MessageHeader *
+    mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
 
-  GNUNET_assert (NULL != grp);
-  if (grp->tmit_head != grp->tmit_tail)
-  {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "Disconnecting while there are still outstanding messages!\n");
-    GNUNET_break (0);
-  }
-  if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
-  {
-    GNUNET_SCHEDULER_cancel (grp->reconnect_task);
-    grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-  if (NULL != grp->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
-    grp->th = NULL;
-  }
-  if (NULL != grp->client)
-  {
-    GNUNET_CLIENT_disconnect (grp->client);
-    grp->client = NULL;
-  }
-  if (NULL != grp->reconnect_msg)
-  {
-    GNUNET_free (grp->reconnect_msg);
-    grp->reconnect_msg = NULL;
-  }
-}
-
-
-/**
- * Iterator callback for calling message callbacks for all groups.
- */
-static int
-message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
-{
-  const struct GNUNET_MessageHeader *msg = cls;
-  struct GNUNET_MULTICAST_Group *grp = group;
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Calling message callback with a message "
-              "of type %u and size %u.\n",
-              ntohs (msg->type), ntohs (msg->size));
+              "Calling message callback with a message of size %u.\n",
+              ntohs (mmsg->header.size));
 
   if (NULL != grp->message_cb)
-    grp->message_cb (grp->cb_cls, msg);
-
-  return GNUNET_YES;
+    grp->message_cb (grp->cb_cls, mmsg);
 }
 
 
 /**
- * Iterator callback for calling request callbacks of origins.
+ * Origin receives uniquest request from a member.
  */
-static int
-request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void 
*origin)
+static void
+origin_recv_request (void *cls,
+                     struct GNUNET_CLIENT_MANAGER_Connection *client,
+                     const struct GNUNET_MessageHeader *msg)
 {
-  const struct GNUNET_MULTICAST_RequestHeader *req = cls;
-  struct GNUNET_MULTICAST_Origin *orig = origin;
+  struct GNUNET_MULTICAST_Group *grp;
+  struct GNUNET_MULTICAST_Origin *
+    orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  grp = &orig->grp;
+  struct GNUNET_MULTICAST_RequestHeader *
+    req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Calling request callback for a request of type %u and size 
%u.\n",
-              ntohs (req->header.type), ntohs (req->header.size));
+              "Calling request callback with a request of size %u.\n",
+              ntohs (req->header.size));
 
   if (NULL != orig->request_cb)
-    orig->request_cb (orig->grp.cb_cls, &req->member_key,
-                      (const struct GNUNET_MessageHeader *) req, 0);
-  return GNUNET_YES;
+    orig->request_cb (grp->cb_cls, req);
 }
 
 
 /**
- * Iterator callback for calling join request callbacks of origins.
+ * Member receives join decision.
  */
-static int
-join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
-                 void *group)
+static void
+member_recv_join_decision (void *cls,
+                           struct GNUNET_CLIENT_MANAGER_Connection *client,
+                           const struct GNUNET_MessageHeader *msg)
 {
-  const struct MulticastJoinRequestMessage *req = cls;
-  struct GNUNET_MULTICAST_Group *grp = group;
+  struct GNUNET_MULTICAST_Group *grp;
+  struct GNUNET_MULTICAST_Member *
+    mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+  grp = &mem->grp;
 
-  struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
-  jh->group = grp;
-  jh->member_key = req->member_key;
-  jh->member_peer = req->member_peer;
-
-  const struct GNUNET_MessageHeader *msg = NULL;
-  if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
-    msg = (const struct GNUNET_MessageHeader *) &req[1];
-
-  if (NULL != grp->join_req_cb)
-    grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh);
-  return GNUNET_YES;
-}
-
-
-/**
- * Iterator callback for calling join decision callbacks of members.
- */
-static int
-join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
-                  void *member)
-{
-  const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
+  const struct MulticastJoinDecisionMessageHeader *
+    hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
   const struct MulticastJoinDecisionMessage *
     dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
-  struct GNUNET_MULTICAST_Member *mem = member;
-  struct GNUNET_MULTICAST_Group *grp = &mem->grp;
 
   uint16_t dcsn_size = ntohs (dcsn->header.size);
   int is_admitted = ntohl (dcsn->is_admitted);
@@ -549,125 +322,62 @@
 
   if (GNUNET_YES != is_admitted)
     GNUNET_MULTICAST_member_part (mem);
-
-  return GNUNET_YES;
 }
 
+
 /**
- * Function called when we receive a message from the service.
- *
- * @param cls  struct GNUNET_MULTICAST_Group
- * @param msg  Message received, NULL on timeout or fatal error.
+ * Message handlers for an origin.
  */
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
 {
-  struct GNUNET_MULTICAST_Group *grp = cls;
+  { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
 
-  if (NULL == msg)
-  {
-    // timeout / disconnected from service, reconnect
-    reschedule_connect (grp);
-    return;
-  }
+  { &group_recv_message, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+    sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
-  uint16_t size_eq = 0;
-  uint16_t size_min = 0;
-  uint16_t size = ntohs (msg->size);
-  uint16_t type = ntohs (msg->type);
+  { &origin_recv_request, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+    sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message of type %d and size %u from Multicast service\n",
-       type, size);
+  { &group_recv_join_request, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+    sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
 
-  switch (type)
-  {
-  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
-    size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
-    break;
+  { NULL, NULL, 0, 0, GNUNET_NO }
+};
 
-  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
-    size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
-    break;
 
-  case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
-    size_min = sizeof (struct MulticastJoinRequestMessage);
-    break;
+/**
+ * Message handlers for a member.
+ */
+static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
+{
+  { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
 
-  case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
-    size_min = sizeof (struct MulticastJoinDecisionMessage);
-    break;
+  { &group_recv_message, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+    sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
-  default:
-    GNUNET_break_op (0);
-    type = 0;
-  }
+  { &group_recv_join_request, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+    sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
 
-  if (! ((0 < size_eq && size == size_eq)
-         || (0 < size_min && size_min <= size)))
-  {
-    GNUNET_break_op (0);
-    type = 0;
-  }
+  { &member_recv_join_decision, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+    sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
 
-  switch (type)
-  {
-  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
-    if (origins != NULL)
-      GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                  message_cb, (void *) msg);
-    if (members != NULL)
-      GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
-                                                  message_cb, (void *) msg);
-    break;
+  { NULL, NULL, 0, 0, GNUNET_NO }
+};
 
-  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
-    if (GNUNET_YES != grp->is_origin)
-    {
-      GNUNET_break (0);
-      break;
-    }
-    if (NULL != origins)
-      GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                  request_cb, (void *) msg);
-    break;
 
-  case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
-    if (NULL != origins)
-      GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
-                                                  join_request_cb, (void *) 
msg);
-    if (NULL != members)
-      GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
-                                                  join_request_cb, (void *) 
msg);
-    break;
-
-  case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
-    if (GNUNET_NO != grp->is_origin)
-    {
-      GNUNET_break (0);
-      break;
-    }
-    if (NULL != members)
-      GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
-                                                  join_decision_cb, (void *) 
msg);
-    break;
-  }
-
-  if (NULL != grp->client)
-  {
-    GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
-}
-
-
 /**
  * Function to call with the decision made for a join request.
  *
  * Must be called once and only once in response to an invocation of the
  * #GNUNET_MULTICAST_JoinRequestCallback.
  *
- * @param jh Join request handle.
+ * @param join  Join request handle.
  * @param is_admitted  #GNUNET_YES    if the join is approved,
  *                     #GNUNET_NO     if it is disapproved,
  *                     #GNUNET_SYSERR if we cannot answer the request.
@@ -685,27 +395,25 @@
  *        peer that issued the request even if admission is denied.
  */
 struct GNUNET_MULTICAST_ReplayHandle *
-GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
+GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
                                 int is_admitted,
                                 uint16_t relay_count,
                                 const struct GNUNET_PeerIdentity *relays,
                                 const struct GNUNET_MessageHeader *join_resp)
 {
-  struct GNUNET_MULTICAST_Group *grp = jh->group;
+  struct GNUNET_MULTICAST_Group *grp = join->group;
   uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
   uint16_t relay_size = relay_count * sizeof (*relays);
+
   struct MulticastJoinDecisionMessageHeader * hdcsn;
   struct MulticastJoinDecisionMessage *dcsn;
-  struct MessageQueue *
-    mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn)
-                        + relay_size + join_resp_size);
-
-  hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1];
+  hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
+                         + relay_size + join_resp_size);
+  hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
+                              + relay_size + join_resp_size);
   hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
-  hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
-                                + relay_size + join_resp_size);
-  hdcsn->member_key = jh->member_key;
-  hdcsn->peer = jh->member_peer;
+  hdcsn->member_key = join->member_key;
+  hdcsn->peer = join->member_peer;
 
   dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
@@ -717,10 +425,8 @@
   if (0 < join_resp_size)
     memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
 
-  GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-  transmit_next (grp);
-
-  GNUNET_free (jh);
+  GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+  GNUNET_free (join);
   return NULL;
 }
 
@@ -832,7 +538,7 @@
   start->max_fragment_id = max_fragment_id;
   memcpy (&start->group_key, priv_key, sizeof (*priv_key));
 
-  grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
+  grp->connect_msg = (struct GNUNET_MessageHeader *) start;
   grp->is_origin = GNUNET_YES;
   grp->cfg = cfg;
 
@@ -844,21 +550,11 @@
   grp->message_cb = message_cb;
 
   orig->request_cb = request_cb;
-  orig->priv_key = *priv_key;
 
-  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
-  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
-                      &grp->pub_key_hash);
+  grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", 
origin_handlers);
+  GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
+  group_send_connect_msg (grp);
 
-  if (NULL == origins)
-    origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
-  GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
-                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
-
   return orig;
 }
 
@@ -871,8 +567,7 @@
 void
 GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
 {
-  disconnect (&orig->grp);
-  GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, 
orig);
+  GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES);
   GNUNET_free (orig);
 }
 
@@ -885,26 +580,22 @@
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
-  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
-  GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
-  struct GNUNET_MULTICAST_MessageHeader *
-    msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
+  struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
   int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
-      || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+      || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "OriginTransmitNotify() returned error or invalid message size.\n");
     /* FIXME: handle error */
-    GNUNET_free (mq);
+    GNUNET_free (msg);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
   {
-    GNUNET_free (mq);
+    GNUNET_free (msg);
     return; /* Transmission paused. */
   }
 
@@ -915,7 +606,7 @@
   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*msg) + buf_size;
 
-  transmit_next (grp);
+  GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
 }
 
 
@@ -939,6 +630,12 @@
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
+/* FIXME
+  if (GNUNET_YES == orig->grp.in_transmit)
+    return NULL;
+  orig->grp.in_transmit = GNUNET_YES;
+*/
+
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
   tmit->origin = orig;
   tmit->message_id = message_id;
@@ -1047,10 +744,9 @@
   if (0 < join_msg_size)
     memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
 
-  grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
+  grp->connect_msg = (struct GNUNET_MessageHeader *) join;
   grp->is_origin = GNUNET_NO;
   grp->cfg = cfg;
-  grp->pub_key = *group_key;
 
   mem->join_dcsn_cb = join_decision_cb;
   grp->join_req_cb = join_request_cb;
@@ -1059,18 +755,10 @@
   grp->message_cb = message_cb;
   grp->cb_cls = cls;
 
-  GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key);
-  GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), 
&grp->pub_key_hash);
+  grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", 
member_handlers);
+  GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
+  group_send_connect_msg (grp);
 
-  if (NULL == members)
-    members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
-  GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
-                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
-
   return mem;
 }
 
@@ -1088,8 +776,7 @@
 void
 GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
 {
-  disconnect (&mem->grp);
-  GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
+  GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES);
   GNUNET_free (mem);
 }
 
@@ -1162,25 +849,26 @@
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
 
-  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
-  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
-  GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
-  struct GNUNET_MULTICAST_RequestHeader *
-    req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
+  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+  struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
   int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
-      || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+      || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "MemberTransmitNotify() returned error or invalid message size.\n");
     /* FIXME: handle error */
+    GNUNET_free (req);
     return;
   }
 
   if (GNUNET_NO == ret && 0 == buf_size)
-    return; /* Transmission paused. */
+  {
+    /* Transmission paused. */
+    GNUNET_free (req);
+    return;
+  }
 
   req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
   req->header.size = htons (sizeof (*req) + buf_size);
@@ -1188,7 +876,7 @@
   req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*req) + buf_size;
 
-  transmit_next (grp);
+  GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
 }
 
 
@@ -1207,6 +895,12 @@
                                    GNUNET_MULTICAST_MemberTransmitNotify 
notify,
                                    void *notify_cls)
 {
+/* FIXME
+  if (GNUNET_YES == mem->grp.in_transmit)
+    return NULL;
+  mem->grp.in_transmit = GNUNET_YES;
+*/
+
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
   tmit->member = mem;
   tmit->request_id = request_id;

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-05-29 16:35:53 UTC (rev 
33442)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-05-29 16:35:55 UTC (rev 
33443)
@@ -34,6 +34,7 @@
 #include "gnunet_multicast_service.h"
 #include "gnunet_psycstore_service.h"
 #include "gnunet_psyc_service.h"
+#include "gnunet_psyc_util_lib.h"
 #include "psyc.h"
 
 
@@ -174,10 +175,10 @@
 /**
  * List of connected clients.
  */
-struct ClientList
+struct ClientListItem
 {
-  struct ClientList *prev;
-  struct ClientList *next;
+  struct ClientListItem *prev;
+  struct ClientListItem *next;
   struct GNUNET_SERVER_Client *client;
 };
 
@@ -187,8 +188,8 @@
  */
 struct Channel
 {
-  struct ClientList *clients_head;
-  struct ClientList *clients_tail;
+  struct ClientListItem *clients_head;
+  struct ClientListItem *clients_tail;
 
   struct TransmitMessage *tmit_head;
   struct TransmitMessage *tmit_tail;
@@ -282,7 +283,7 @@
   /**
    * Channel struct common for Master and Slave
    */
-  struct Channel ch;
+  struct Channel chn;
 
   /**
    * Private key of the channel.
@@ -339,7 +340,7 @@
   /**
    * Channel struct common for Master and Slave
    */
-  struct Channel ch;
+  struct Channel chn;
 
   /**
    * Private key of the slave.
@@ -399,11 +400,11 @@
 
 
 static inline void
-transmit_message (struct Channel *ch);
+transmit_message (struct Channel *chn);
 
 
 static uint64_t
-message_queue_drop (struct Channel *ch);
+message_queue_drop (struct Channel *chn);
 
 
 /**
@@ -434,12 +435,12 @@
 static void
 cleanup_master (struct Master *mst)
 {
-  struct Channel *ch = &mst->ch;
+  struct Channel *chn = &mst->chn;
 
   if (NULL != mst->origin)
     GNUNET_MULTICAST_origin_stop (mst->origin);
   GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs);
-  GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch);
+  GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn);
 }
 
 
@@ -449,20 +450,20 @@
 static void
 cleanup_slave (struct Slave *slv)
 {
-  struct Channel *ch = &slv->ch;
+  struct Channel *chn = &slv->chn;
   struct GNUNET_CONTAINER_MultiHashMap *
-    ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
-                                                &ch->pub_key_hash);
-  GNUNET_assert (NULL != ch_slv);
-  GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv);
+    chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves,
+                                                &chn->pub_key_hash);
+  GNUNET_assert (NULL != chn_slv);
+  GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv);
 
-  if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv))
+  if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv))
   {
-    GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash,
-                                          ch_slv);
-    GNUNET_CONTAINER_multihashmap_destroy (ch_slv);
+    GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash,
+                                          chn_slv);
+    GNUNET_CONTAINER_multihashmap_destroy (chn_slv);
   }
-  GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv);
+  GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv);
 
   if (NULL != slv->join_req)
     GNUNET_free (slv->join_req);
@@ -470,7 +471,7 @@
     GNUNET_free (slv->relays);
   if (NULL != slv->member)
     GNUNET_MULTICAST_member_part (slv->member);
-  GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch);
+  GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn);
 }
 
 
@@ -478,18 +479,18 @@
  * Clean up channel data structures after a client disconnected.
  */
 static void
-cleanup_channel (struct Channel *ch)
+cleanup_channel (struct Channel *chn)
 {
-  message_queue_drop (ch);
-  GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash);
+  message_queue_drop (chn);
+  GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash);
 
-  if (NULL != ch->store_op)
-    GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
+  if (NULL != chn->store_op)
+    GNUNET_PSYCSTORE_operation_cancel (chn->store_op);
 
-  (GNUNET_YES == ch->is_master)
-    ? cleanup_master ((struct Master *) ch)
-    : cleanup_slave ((struct Slave *) ch);
-  GNUNET_free (ch);
+  (GNUNET_YES == chn->is_master)
+    ? cleanup_master ((struct Master *) chn)
+    : cleanup_slave ((struct Slave *) chn);
+  GNUNET_free (chn);
 }
 
 
@@ -507,41 +508,41 @@
     return;
 
   struct Channel *
-    ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Client (%s) disconnected from channel %s\n",
-              ch, (GNUNET_YES == ch->is_master) ? "master" : "slave",
-              GNUNET_h2s (&ch->pub_key_hash));
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
 
-  if (NULL == ch)
+  if (NULL == chn)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "%p User context is NULL in client_disconnect()\n", ch);
-    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p User context is NULL in client_disconnect()\n", chn);
     return;
   }
 
-  struct ClientList *cl = ch->clients_head;
-  while (NULL != cl)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Client (%s) disconnected from channel %s\n",
+              chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
+              GNUNET_h2s (&chn->pub_key_hash));
+
+  struct ClientListItem *cli = chn->clients_head;
+  while (NULL != cli)
   {
-    if (cl->client == client)
+    if (cli->client == client)
     {
-      GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl);
-      GNUNET_free (cl);
+      GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli);
+      GNUNET_free (cli);
       break;
     }
-    cl = cl->next;
+    cli = cli->next;
   }
 
-  if (NULL == ch->clients_head)
+  if (NULL == chn->clients_head)
   { /* Last client disconnected. */
-    if (NULL != ch->tmit_head)
+    if (NULL != chn->tmit_head)
     { /* Send pending messages to multicast before cleanup. */
-      transmit_message (ch);
+      transmit_message (chn);
     }
     else
     {
-      cleanup_channel (ch);
+      cleanup_channel (chn);
     }
   }
 }
@@ -551,18 +552,18 @@
  * Send message to all clients connected to the channel.
  */
 static void
-msg_to_clients (const struct Channel *ch,
-                const struct GNUNET_MessageHeader *msg)
+client_send_msg (const struct Channel *chn,
+                 const struct GNUNET_MessageHeader *msg)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "%p Sending message to clients.\n", ch);
+              "%p Sending message to clients.\n", chn);
 
-  struct ClientList *cl = ch->clients_head;
-  while (NULL != cl)
+  struct ClientListItem *cli = chn->clients_head;
+  while (NULL != cli)
   {
-    GNUNET_SERVER_notification_context_add (nc, cl->client);
-    GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, 
GNUNET_NO);
-    cl = cl->next;
+    GNUNET_SERVER_notification_context_add (nc, cli->client);
+    GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, 
GNUNET_NO);
+    cli = cli->next;
   }
 }
 
@@ -573,7 +574,7 @@
 struct JoinMemTestClosure
 {
   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
-  struct Channel *ch;
+  struct Channel *chn;
   struct GNUNET_MULTICAST_JoinHandle *jh;
   struct MasterJoinRequest *master_join_req;
 };
@@ -587,15 +588,15 @@
 {
   struct JoinMemTestClosure *jcls = cls;
 
-  if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master)
+  if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master)
   { /* Pass on join request to client if this is a master channel */
-    struct Master *mst = (struct Master *) jcls->ch;
+    struct Master *mst = (struct Master *) jcls->chn;
     struct GNUNET_HashCode slave_key_hash;
     GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key),
                         &slave_key_hash);
     GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, 
jcls->jh,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-    msg_to_clients (jcls->ch, &jcls->master_join_req->header);
+    client_send_msg (jcls->chn, &jcls->master_join_req->header);
   }
   else
   {
@@ -611,13 +612,13 @@
  * Incoming join request from multicast.
  */
 static void
-mcast_join_request_cb (void *cls,
-                       const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-                       const struct GNUNET_MessageHeader *join_msg,
-                       struct GNUNET_MULTICAST_JoinHandle *jh)
+mcast_recv_join_request (void *cls,
+                         const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+                         const struct GNUNET_MessageHeader *join_msg,
+                         struct GNUNET_MULTICAST_JoinHandle *jh)
 {
-  struct Channel *ch = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
+  struct Channel *chn = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn);
 
   uint16_t join_msg_size = 0;
   if (NULL != join_msg)
@@ -630,7 +631,7 @@
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "%p Got join message with invalid type %u.\n",
-                  ch, ntohs (join_msg->type));
+                  chn, ntohs (join_msg->type));
     }
   }
 
@@ -643,12 +644,12 @@
 
   struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls));
   jcls->slave_key = *slave_key;
-  jcls->ch = ch;
+  jcls->chn = chn;
   jcls->jh = jh;
   jcls->master_join_req = req;
 
-  GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key,
-                                    ch->max_message_id, 0,
+  GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key,
+                                    chn->max_message_id, 0,
                                     &join_mem_test_cb, jcls);
 }
 
@@ -657,14 +658,14 @@
  * Join decision received from multicast.
  */
 static void
-mcast_join_decision_cb (void *cls, int is_admitted,
+mcast_recv_join_decision (void *cls, int is_admitted,
                         const struct GNUNET_PeerIdentity *peer,
                         uint16_t relay_count,
                         const struct GNUNET_PeerIdentity *relays,
                         const struct GNUNET_MessageHeader *join_resp)
 {
   struct Slave *slv = cls;
-  struct Channel *ch = &slv->ch;
+  struct Channel *chn = &slv->chn;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Got join decision: %d\n", slv, is_admitted);
 
@@ -677,11 +678,11 @@
   if (0 < join_resp_size)
     memcpy (&dcsn[1], join_resp, join_resp_size);
 
-  msg_to_clients (ch, &dcsn->header);
+  client_send_msg (chn, &dcsn->header);
 
   if (GNUNET_YES == is_admitted)
   {
-    ch->ready = GNUNET_YES;
+    chn->ready = GNUNET_YES;
   }
   else
   {
@@ -691,20 +692,20 @@
 
 
 static void
-mcast_membership_test_cb (void *cls,
-                          const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-                          uint64_t message_id, uint64_t group_generation,
-                          struct GNUNET_MULTICAST_MembershipTestHandle *mth)
+mcast_recv_membership_test (void *cls,
+                            const struct GNUNET_CRYPTO_EddsaPublicKey 
*slave_key,
+                            uint64_t message_id, uint64_t group_generation,
+                            struct GNUNET_MULTICAST_MembershipTestHandle *mth)
 {
 
 }
 
 
 static void
-mcast_replay_fragment_cb (void *cls,
-                          const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-                          uint64_t fragment_id, uint64_t flags,
-                          struct GNUNET_MULTICAST_ReplayHandle *rh)
+mcast_recv_replay_fragment (void *cls,
+                            const struct GNUNET_CRYPTO_EddsaPublicKey 
*slave_key,
+                            uint64_t fragment_id, uint64_t flags,
+                            struct GNUNET_MULTICAST_ReplayHandle *rh)
 
 {
 
@@ -712,25 +713,17 @@
 
 
 static void
-mcast_replay_message_cb (void *cls,
-                         const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-                         uint64_t message_id,
-                         uint64_t fragment_offset,
-                         uint64_t flags,
-                         struct GNUNET_MULTICAST_ReplayHandle *rh)
+mcast_recv_replay_message (void *cls,
+                           const struct GNUNET_CRYPTO_EddsaPublicKey 
*slave_key,
+                           uint64_t message_id,
+                           uint64_t fragment_offset,
+                           uint64_t flags,
+                           struct GNUNET_MULTICAST_ReplayHandle *rh)
 {
 
 }
 
 
-static void
-fragment_store_result (void *cls, int64_t result, const char *err_msg)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "fragment_store() returned %l (%s)\n", result, err_msg);
-}
-
-
 /**
  * Convert an uint64_t in network byte order to a HashCode
  * that can be used as key in a MultiHashMap
@@ -772,17 +765,17 @@
  * Send multicast message to all clients connected to the channel.
  */
 static void
-mmsg_to_clients (struct Channel *ch,
-                 const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+client_send_mcast_msg (struct Channel *chn,
+                       const struct GNUNET_MULTICAST_MessageHeader *mmsg)
 {
+  struct GNUNET_PSYC_MessageHeader *pmsg;
   uint16_t size = ntohs (mmsg->header.size);
-  struct GNUNET_PSYC_MessageHeader *pmsg;
   uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Sending message to client. "
+              "%p Sending multicast message to client. "
               "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
-              ch, GNUNET_ntohll (mmsg->fragment_id),
+              chn, GNUNET_ntohll (mmsg->fragment_id),
               GNUNET_ntohll (mmsg->message_id));
 
   pmsg = GNUNET_malloc (psize);
@@ -791,22 +784,53 @@
   pmsg->message_id = mmsg->message_id;
 
   memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
-  msg_to_clients (ch, &pmsg->header);
+  client_send_msg (chn, &pmsg->header);
   GNUNET_free (pmsg);
 }
 
 
 /**
+ * Send multicast request to all clients connected to the channel.
+ */
+static void
+client_send_mcast_req (struct Master *mst,
+                       const struct GNUNET_MULTICAST_RequestHeader *req)
+{
+  struct Channel *chn = &mst->chn;
+
+  struct GNUNET_PSYC_MessageHeader *pmsg;
+  uint16_t size = ntohs (req->header.size);
+  uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Sending multicast request to client. "
+              "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
+              chn, GNUNET_ntohll (req->fragment_id),
+              GNUNET_ntohll (req->request_id));
+
+  pmsg = GNUNET_malloc (psize);
+  pmsg->header.size = htons (psize);
+  pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+  pmsg->message_id = req->request_id;
+  pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+
+  memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+  client_send_msg (chn, &pmsg->header);
+  GNUNET_free (pmsg);
+}
+
+
+/**
  * Insert a multicast message fragment into the queue belonging to the message.
  *
- * @param ch           Channel.
+ * @param chn           Channel.
  * @param mmsg         Multicast message fragment.
  * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
  * @param first_ptype  First PSYC message part type in @a mmsg.
  * @param last_ptype   Last PSYC message part type in @a mmsg.
  */
 static void
-fragment_queue_insert (struct Channel *ch,
+fragment_queue_insert (struct Channel *chn,
                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
                        uint16_t first_ptype, uint16_t last_ptype)
 {
@@ -814,13 +838,13 @@
   const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
   struct GNUNET_CONTAINER_MultiHashMap
     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
-                                                    &ch->pub_key_hash);
+                                                    &chn->pub_key_hash);
 
   struct GNUNET_HashCode msg_id_hash;
   hash_key_from_nll (&msg_id_hash, mmsg->message_id);
 
   struct FragmentQueue
-    *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+    *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash);
 
   if (NULL == fragq)
   {
@@ -829,13 +853,13 @@
     fragq->fragments
       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
 
-    GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
+    GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
 
     if (NULL == chan_msgs)
     {
       chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-      GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, 
chan_msgs,
+      GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, 
chan_msgs,
                                          
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
     }
   }
@@ -850,7 +874,7 @@
                 "%p Adding message fragment to cache. "
                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
                 "header_size: %" PRIu64 " + %u).\n",
-                ch, GNUNET_ntohll (mmsg->message_id),
+                chn, GNUNET_ntohll (mmsg->message_id),
                 GNUNET_ntohll (mmsg->fragment_id),
                 fragq->header_size, size);
     cache_entry = GNUNET_new (struct RecvCacheEntry);
@@ -867,7 +891,7 @@
                 "%p Message fragment is already in cache. "
                 "message_id: %" PRIu64 ", fragment_id: %" PRIu64
                 ", ref_count: %u\n",
-                ch, GNUNET_ntohll (mmsg->message_id),
+                chn, GNUNET_ntohll (mmsg->message_id),
                 GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
   }
 
@@ -890,11 +914,11 @@
     { /* header is now complete */
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Header of message %" PRIu64 " is complete.\n",
-                  ch, GNUNET_ntohll (mmsg->message_id));
+                  chn, GNUNET_ntohll (mmsg->message_id));
 
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Adding message %" PRIu64 " to queue.\n",
-                  ch, GNUNET_ntohll (mmsg->message_id));
+                  chn, GNUNET_ntohll (mmsg->message_id));
       fragq->state = MSG_FRAG_STATE_DATA;
     }
     else
@@ -902,7 +926,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Header of message %" PRIu64 " is NOT complete yet: "
                   "%" PRIu64 " != %" PRIu64 "\n",
-                  ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+                  chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
                   fragq->header_size);
     }
   }
@@ -916,7 +940,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Message %" PRIu64 " is NOT complete yet: "
                   "%" PRIu64 " != %" PRIu64 "\n",
-                  ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+                  chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
                   fragq->size);
     break;
 
@@ -935,7 +959,7 @@
   case MSG_FRAG_STATE_CANCEL:
     if (GNUNET_NO == fragq->queued)
     {
-      GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
+      GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL,
                                     GNUNET_ntohll (mmsg->message_id));
       fragq->queued = GNUNET_YES;
     }
@@ -953,24 +977,24 @@
  * Send fragments of a message in order to client, after all modifiers arrived
  * from multicast.
  *
- * @param ch      Channel.
+ * @param chn      Channel.
  * @param msg_id  ID of the message @a fragq belongs to.
  * @param fragq   Fragment queue of the message.
  * @param drop    Drop message without delivering to client?
  *                #GNUNET_YES or #GNUNET_NO.
  */
 static void
-fragment_queue_run (struct Channel *ch, uint64_t msg_id,
+fragment_queue_run (struct Channel *chn, uint64_t msg_id,
                     struct FragmentQueue *fragq, uint8_t drop)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "%p Running message fragment queue for message %" PRIu64
               " (state: %u).\n",
-              ch, msg_id, fragq->state);
+              chn, msg_id, fragq->state);
 
   struct GNUNET_CONTAINER_MultiHashMap
     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
-                                                    &ch->pub_key_hash);
+                                                    &chn->pub_key_hash);
   GNUNET_assert (NULL != chan_msgs);
   uint64_t frag_id;
 
@@ -985,7 +1009,7 @@
     {
       if (GNUNET_NO == drop)
       {
-        mmsg_to_clients (ch, cache_entry->mmsg);
+        client_send_mcast_msg (chn, cache_entry->mmsg);
       }
       if (cache_entry->ref_count <= 1)
       {
@@ -1014,7 +1038,7 @@
     struct GNUNET_HashCode msg_id_hash;
     hash_key_from_nll (&msg_id_hash, msg_id);
 
-    GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
+    GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, 
fragq);
     GNUNET_CONTAINER_heap_destroy (fragq->fragments);
     GNUNET_free (fragq);
   }
@@ -1034,33 +1058,33 @@
  * - A stateful message is only sent if the previous stateful message
  *   has already been delivered to the client.
  *
- * @param ch  Channel.
+ * @param chn  Channel.
  *
  * @return Number of messages removed from queue and sent to client.
  */
 static uint64_t
-message_queue_run (struct Channel *ch)
+message_queue_run (struct Channel *chn)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "%p Running message queue.\n", ch);
+              "%p Running message queue.\n", chn);
   uint64_t n = 0;
   uint64_t msg_id;
-  while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+  while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
                                                     &msg_id))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
+                "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id);
     struct GNUNET_HashCode msg_id_hash;
     hash_key_from_hll (&msg_id_hash, msg_id);
 
     struct FragmentQueue *
-      fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+      fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, 
&msg_id_hash);
 
     if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p No fragq (%p) or header not complete.\n",
-                  ch, fragq);
+                  chn, fragq);
       break;
     }
 
@@ -1070,40 +1094,40 @@
       if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
       {
         if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
-            && msg_id - 1 != ch->max_message_id)
+            && msg_id - 1 != chn->max_message_id)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                       "%p Out of order message. "
                       "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
-                      ch, msg_id, ch->max_message_id);
+                      chn, msg_id, chn->max_message_id);
           break;
         }
       }
       else
       {
-        if (msg_id - fragq->state_delta != ch->max_state_message_id)
+        if (msg_id - fragq->state_delta != chn->max_state_message_id)
         {
           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                       "%p Out of order stateful message. "
                       "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
-                      ch, msg_id, fragq->state_delta, 
ch->max_state_message_id);
+                      chn, msg_id, fragq->state_delta, 
chn->max_state_message_id);
           break;
         }
 #if TODO
         /* FIXME: apply modifiers to state in PSYCstore */
-        GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
-                                       state_modify_result_cb, cls);
+        GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id,
+                                       store_recv_state_modify_result, cls);
 #endif
-        ch->max_state_message_id = msg_id;
+        chn->max_state_message_id = msg_id;
       }
-      ch->max_message_id = msg_id;
+      chn->max_message_id = msg_id;
     }
-    fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == 
fragq->state);
-    GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+    fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == 
fragq->state);
+    GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
     n++;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+              "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
   return n;
 }
 
@@ -1113,111 +1137,92 @@
  *
  * Remove all messages in queue without sending it to clients.
  *
- * @param ch  Channel.
+ * @param chn  Channel.
  *
  * @return Number of messages removed from queue.
  */
 static uint64_t
-message_queue_drop (struct Channel *ch)
+message_queue_drop (struct Channel *chn)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "%p Dropping message queue.\n", ch);
+              "%p Dropping message queue.\n", chn);
   uint64_t n = 0;
   uint64_t msg_id;
-  while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+  while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL,
                                                     &msg_id))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id);
+                "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id);
     struct GNUNET_HashCode msg_id_hash;
     hash_key_from_hll (&msg_id_hash, msg_id);
 
     struct FragmentQueue *
-      fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+      fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, 
&msg_id_hash);
 
-    fragment_queue_run (ch, msg_id, fragq, GNUNET_YES);
-    GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+    fragment_queue_run (chn, msg_id, fragq, GNUNET_YES);
+    GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs);
     n++;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+              "%p Removed %" PRIu64 " messages from queue.\n", chn, n);
   return n;
 }
 
 
 /**
- * Handle incoming message from multicast.
+ * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation.
+ */
+static void
+store_recv_fragment_store_result (void *cls, int64_t result, const char 
*err_msg)
+{
+  struct Channel *chn = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " 
(%s)\n",
+              chn, result, err_msg);
+}
+
+
+/**
+ * Handle incoming message fragment from multicast.
  *
- * @param ch   Channel.
- * @param mmsg Multicast message.
- *
- * @return #GNUNET_OK or #GNUNET_SYSERR
+ * Store it using PSYCstore and send it to the clients of the channel in order.
  */
-static int
-client_multicast_message (struct Channel *ch,
-                          const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+static void
+mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader 
*mmsg)
 {
-  GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
+  struct Channel *chn = cls;
+  uint16_t size = ntohs (mmsg->header.size);
 
-  uint16_t size = ntohs (mmsg->header.size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received multicast message of size %u.\n",
+              chn, size);
+
+  GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
+                                   &store_recv_fragment_store_result, chn);
+
   uint16_t first_ptype = 0, last_ptype = 0;
-
   if (GNUNET_SYSERR
-      == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
+      == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
                                           (const char *) &mmsg[1],
                                           &first_ptype, &last_ptype))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%p Received message with invalid parts from multicast. "
-                "Dropping message.\n", ch);
+                "%p Dropping incoming multicast message with invalid parts.\n",
+                chn);
     GNUNET_break_op (0);
-    return GNUNET_SYSERR;
+    return;
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Message parts: first: type %u, last: type %u\n",
               first_ptype, last_ptype);
 
-  fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
-  message_queue_run (ch);
-
-  return GNUNET_OK;
+  fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
+  message_queue_run (chn);
 }
 
 
 /**
- * Incoming message fragment from multicast.
- *
- * Store it using PSYCstore and send it to the client of the channel.
- */
-static void
-mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
-{
-  struct Channel *ch = cls;
-  uint16_t type = ntohs (msg->type);
-  uint16_t size = ntohs (msg->size);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received message of type %u and size %u from multicast.\n",
-              ch, type, size);
-
-  switch (type)
-  {
-  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
-  {
-    client_multicast_message (ch, (const struct
-                                   GNUNET_MULTICAST_MessageHeader *) msg);
-    break;
-  }
-  default:
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%p Dropping unknown message of type %u and size %u.\n",
-                ch, type, size);
-  }
-}
-
-
-/**
  * Incoming request fragment from multicast for a master.
  *
  * @param cls          Master.
@@ -1226,59 +1231,35 @@
  * @param flags                Request flags.
  */
 static void
-mcast_request_cb (void *cls,
-                  const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
-                  const struct GNUNET_MessageHeader *msg,
-                  enum GNUNET_MULTICAST_MessageFlags flags)
+mcast_recv_request (void *cls,
+                    const struct GNUNET_MULTICAST_RequestHeader *req)
 {
   struct Master *mst = cls;
-  struct Channel *ch = &mst->ch;
+  uint16_t size = ntohs (req->header.size);
 
-  uint16_t type = ntohs (msg->type);
-  uint16_t size = ntohs (msg->size);
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received request of type %u and size %u from multicast.\n",
-              ch, type, size);
+              "%p Received multicast request of size %u.\n",
+              mst, size);
 
-  switch (type)
+  uint16_t first_ptype = 0, last_ptype = 0;
+  if (GNUNET_SYSERR
+      == GNUNET_PSYC_receive_check_parts (size - sizeof (*req),
+                                          (const char *) &req[1],
+                                          &first_ptype, &last_ptype))
   {
-  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
-  {
-    const struct GNUNET_MULTICAST_RequestHeader *req
-      = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p Dropping incoming multicast request with invalid parts.\n",
+                mst);
+    GNUNET_break_op (0);
+    return;
+  }
 
-    /* FIXME: see message_cb() */
-    if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
-                                                          (const char *) 
&req[1],
-                                                          NULL, NULL))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "%p Dropping request with invalid parts "
-                  "received from multicast.\n", ch);
-      GNUNET_break_op (0);
-      break;
-    }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Message parts: first: type %u, last: type %u\n",
+              first_ptype, last_ptype);
 
-    struct GNUNET_PSYC_MessageHeader *pmsg;
-    uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
-    pmsg = GNUNET_malloc (psize);
-    pmsg->header.size = htons (psize);
-    pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-    pmsg->message_id = req->request_id;
-    pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
-
-    memcpy (&pmsg[1], &req[1], size - sizeof (*req));
-    msg_to_clients (ch, &pmsg->header);
-    GNUNET_free (pmsg);
-    break;
-  }
-  default:
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%p Dropping unknown request of type %u and size %u.\n",
-                ch, type, size);
-    GNUNET_break_op (0);
-  }
+  /* FIXME: in-order delivery */
+  client_send_mcast_req (mst, req);
 }
 
 
@@ -1286,13 +1267,13 @@
  * Response from PSYCstore with the current counter values for a channel 
master.
  */
 static void
-master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
-                    uint64_t max_message_id, uint64_t max_group_generation,
-                    uint64_t max_state_message_id)
+store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id,
+                            uint64_t max_message_id, uint64_t 
max_group_generation,
+                            uint64_t max_state_message_id)
 {
   struct Master *mst = cls;
-  struct Channel *ch = &mst->ch;
-  ch->store_op = NULL;
+  struct Channel *chn = &mst->chn;
+  chn->store_op = NULL;
 
   struct CountersResult res;
   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
@@ -1303,28 +1284,28 @@
   if (GNUNET_OK == result || GNUNET_NO == result)
   {
     mst->max_message_id = max_message_id;
-    ch->max_message_id = max_message_id;
-    ch->max_state_message_id = max_state_message_id;
+    chn->max_message_id = max_message_id;
+    chn->max_state_message_id = max_state_message_id;
     mst->max_group_generation = max_group_generation;
     mst->origin
       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
-                                       &mcast_join_request_cb,
-                                       &mcast_membership_test_cb,
-                                       &mcast_replay_fragment_cb,
-                                       &mcast_replay_message_cb,
-                                       &mcast_request_cb,
-                                       &mcast_message_cb, ch);
-    ch->ready = GNUNET_YES;
+                                       &mcast_recv_join_request,
+                                       &mcast_recv_membership_test,
+                                       &mcast_recv_replay_fragment,
+                                       &mcast_recv_replay_message,
+                                       &mcast_recv_request,
+                                       &mcast_recv_message, chn);
+    chn->ready = GNUNET_YES;
   }
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "%p GNUNET_PSYCSTORE_counters_get() "
                 "returned %d for channel %s.\n",
-                ch, result, GNUNET_h2s (&ch->pub_key_hash));
+                chn, result, GNUNET_h2s (&chn->pub_key_hash));
   }
 
-  msg_to_clients (ch, &res.header);
+  client_send_msg (chn, &res.header);
 }
 
 
@@ -1332,13 +1313,13 @@
  * Response from PSYCstore with the current counter values for a channel slave.
  */
 void
-slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
-                   uint64_t max_message_id, uint64_t max_group_generation,
-                   uint64_t max_state_message_id)
+store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id,
+                           uint64_t max_message_id, uint64_t 
max_group_generation,
+                           uint64_t max_state_message_id)
 {
   struct Slave *slv = cls;
-  struct Channel *ch = &slv->ch;
-  ch->store_op = NULL;
+  struct Channel *chn = &slv->chn;
+  chn->store_op = NULL;
 
   struct CountersResult res;
   res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
@@ -1348,38 +1329,38 @@
 
   if (GNUNET_OK == result || GNUNET_NO == result)
   {
-    ch->max_message_id = max_message_id;
-    ch->max_state_message_id = max_state_message_id;
+    chn->max_message_id = max_message_id;
+    chn->max_state_message_id = max_state_message_id;
     slv->member
-      = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+      = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
                                       &slv->origin,
                                       slv->relay_count, slv->relays,
                                       slv->join_req,
-                                      &mcast_join_request_cb,
-                                      &mcast_join_decision_cb,
-                                      &mcast_membership_test_cb,
-                                      &mcast_replay_fragment_cb,
-                                      &mcast_replay_message_cb,
-                                      &mcast_message_cb, ch);
+                                      &mcast_recv_join_request,
+                                      &mcast_recv_join_decision,
+                                      &mcast_recv_membership_test,
+                                      &mcast_recv_replay_fragment,
+                                      &mcast_recv_replay_message,
+                                      &mcast_recv_message, chn);
   }
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "%p GNUNET_PSYCSTORE_counters_get() "
                 "returned %d for channel %s.\n",
-                ch, result, GNUNET_h2s (&ch->pub_key_hash));
+                chn, result, GNUNET_h2s (&chn->pub_key_hash));
   }
 
-  msg_to_clients (ch, &res.header);
+  client_send_msg (chn, &res.header);
 }
 
 
 static void
-channel_init (struct Channel *ch)
+channel_init (struct Channel *chn)
 {
-  ch->recv_msgs
+  chn->recv_msgs
     = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
-  ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+  chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 }
 
 
@@ -1387,8 +1368,8 @@
  * Handle a connecting client starting a channel master.
  */
 static void
-client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
-                     const struct GNUNET_MessageHeader *msg)
+client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
   const struct MasterStartRequest *req
     = (const struct MasterStartRequest *) msg;
@@ -1401,7 +1382,7 @@
 
   struct Master *
     mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash);
-  struct Channel *ch;
+  struct Channel *chn;
 
   if (NULL == mst)
   {
@@ -1410,20 +1391,20 @@
     mst->priv_key = req->channel_key;
     mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
 
-    ch = &mst->ch;
-    ch->is_master = GNUNET_YES;
-    ch->pub_key = pub_key;
-    ch->pub_key_hash = pub_key_hash;
-    channel_init (ch);
+    chn = &mst->chn;
+    chn->is_master = GNUNET_YES;
+    chn->pub_key = pub_key;
+    chn->pub_key_hash = pub_key_hash;
+    channel_init (chn);
 
-    GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch,
+    GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-    ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
-                                                  master_counters_cb, mst);
+    chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
+                                                  store_recv_master_counters, 
mst);
   }
   else
   {
-    ch = &mst->ch;
+    chn = &mst->chn;
 
     struct CountersResult res;
     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
@@ -1438,13 +1419,13 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Client connected as master to channel %s.\n",
-              mst, GNUNET_h2s (&ch->pub_key_hash));
+              mst, GNUNET_h2s (&chn->pub_key_hash));
 
-  struct ClientList *cl = GNUNET_new (struct ClientList);
-  cl->client = client;
-  GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
+  struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+  cli->client = client;
+  GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
 
-  GNUNET_SERVER_client_set_user_context (client, ch);
+  GNUNET_SERVER_client_set_user_context (client, chn);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -1453,8 +1434,8 @@
  * Handle a connecting client joining as a channel slave.
  */
 static void
-client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
-                   const struct GNUNET_MessageHeader *msg)
+client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
+                        const struct GNUNET_MessageHeader *msg)
 {
   const struct SlaveJoinRequest *req
     = (const struct SlaveJoinRequest *) msg;
@@ -1467,13 +1448,13 @@
   GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), 
&pub_key_hash);
 
   struct GNUNET_CONTAINER_MultiHashMap *
-    ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash);
+    chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, 
&pub_key_hash);
   struct Slave *slv = NULL;
-  struct Channel *ch;
+  struct Channel *chn;
 
-  if (NULL != ch_slv)
+  if (NULL != chn_slv)
   {
-    slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash);
+    slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash);
   }
   if (NULL == slv)
   {
@@ -1494,34 +1475,34 @@
         memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
     }
 
-    ch = &slv->ch;
-    ch->is_master = GNUNET_NO;
-    ch->pub_key = req->channel_key;
-    ch->pub_key_hash = pub_key_hash;
-    channel_init (ch);
+    chn = &slv->chn;
+    chn->is_master = GNUNET_NO;
+    chn->pub_key = req->channel_key;
+    chn->pub_key_hash = pub_key_hash;
+    channel_init (chn);
 
-    if (NULL == ch_slv)
+    if (NULL == chn_slv)
     {
-      ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-      GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, 
ch_slv,
+      chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+      GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, 
chn_slv,
                                          
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     }
-    GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch,
+    GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-    GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
+    GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-    ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key,
-                                                  slave_counters_cb, slv);
+    chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key,
+                                                  &store_recv_slave_counters, 
slv);
   }
   else
   {
-    ch = &slv->ch;
+    chn = &slv->chn;
 
     struct CountersResult res;
     res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
     res.header.size = htons (sizeof (res));
     res.result_code = htonl (GNUNET_OK);
-    res.max_message_id = GNUNET_htonll (ch->max_message_id);
+    res.max_message_id = GNUNET_htonll (chn->max_message_id);
 
     GNUNET_SERVER_notification_context_add (nc, client);
     GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
@@ -1530,16 +1511,16 @@
     if (NULL == slv->member)
     {
       slv->member
-        = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+        = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key,
                                         &slv->origin,
                                         slv->relay_count, slv->relays,
                                         slv->join_req,
-                                        &mcast_join_request_cb,
-                                        &mcast_join_decision_cb,
-                                        &mcast_membership_test_cb,
-                                        &mcast_replay_fragment_cb,
-                                        &mcast_replay_message_cb,
-                                        &mcast_message_cb, ch);
+                                        &mcast_recv_join_request,
+                                        &mcast_recv_join_decision,
+                                        &mcast_recv_membership_test,
+                                        &mcast_recv_replay_fragment,
+                                        &mcast_recv_replay_message,
+                                        &mcast_recv_message, chn);
 
     }
     else if (NULL != slv->join_dcsn)
@@ -1553,13 +1534,13 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Client connected as slave to channel %s.\n",
-              slv, GNUNET_h2s (&ch->pub_key_hash));
+              slv, GNUNET_h2s (&chn->pub_key_hash));
 
-  struct ClientList *cl = GNUNET_new (struct ClientList);
-  cl->client = client;
-  GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl);
+  struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+  cli->client = client;
+  GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
 
-  GNUNET_SERVER_client_set_user_context (client, &slv->ch);
+  GNUNET_SERVER_client_set_user_context (client, &slv->chn);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -1575,8 +1556,8 @@
  * Iterator callback for responding to join requests of a slave.
  */
 static int
-send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
-                       void *jh)
+mcast_send_join_decision (void *cls, const struct GNUNET_HashCode 
*pub_key_hash,
+                          void *jh)
 {
   struct JoinDecisionClosure *jcls = cls;
   // FIXME: add relays
@@ -1589,13 +1570,13 @@
  * Join decision from client.
  */
 static void
-client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
-                      const struct GNUNET_MessageHeader *msg)
+client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
+                           const struct GNUNET_MessageHeader *msg)
 {
   struct Channel *
-    ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (GNUNET_YES == ch->is_master);
-  struct Master *mst = (struct Master *) ch;
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_assert (GNUNET_YES == chn->is_master);
+  struct Master *mst = (struct Master *) chn;
 
   struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
   struct JoinDecisionClosure jcls;
@@ -1612,13 +1593,13 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Got join decision (%d) from client for channel %s..\n",
-              mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash));
+              mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p ..and slave %s.\n",
               mst, GNUNET_h2s (&slave_key_hash));
 
   GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
-                                              &send_join_decision_cb, &jcls);
+                                              &mcast_send_join_decision, 
&jcls);
   GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -1629,10 +1610,10 @@
  *
  * Sent after a message fragment has been passed on to multicast.
  *
- * @param ch The channel struct for the client.
+ * @param chn The channel struct for the client.
  */
 static void
-send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client)
+send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client)
 {
   struct GNUNET_MessageHeader res;
   res.size = htons (sizeof (res));
@@ -1650,40 +1631,40 @@
 static int
 transmit_notify (void *cls, size_t *data_size, void *data)
 {
-  struct Channel *ch = cls;
-  struct TransmitMessage *tmit_msg = ch->tmit_head;
+  struct Channel *chn = cls;
+  struct TransmitMessage *tmit_msg = chn->tmit_head;
 
   if (NULL == tmit_msg || *data_size < tmit_msg->size)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%p transmit_notify: nothing to send.\n", ch);
+                "%p transmit_notify: nothing to send.\n", chn);
     *data_size = 0;
     return GNUNET_NO;
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
+              "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size);
 
   *data_size = tmit_msg->size;
   memcpy (data, &tmit_msg[1], *data_size);
 
-  int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES;
+  int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
   if (NULL != tmit_msg->client)
-    send_message_ack (ch, tmit_msg->client);
+    send_message_ack (chn, tmit_msg->client);
 
-  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
   GNUNET_free (tmit_msg);
 
-  if (0 == ch->tmit_task)
+  if (0 == chn->tmit_task)
   {
-    if (NULL != ch->tmit_head)
+    if (NULL != chn->tmit_head)
     {
-      transmit_message (ch);
+      transmit_message (chn);
     }
-    else if (ch->disconnected)
+    else if (chn->disconnected)
     {
       /* FIXME: handle partial message (when still in_transmit) */
-      cleanup_channel (ch);
+      cleanup_channel (chn);
     }
   }
 
@@ -1732,7 +1713,7 @@
 master_transmit_message (struct Master *mst)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
-  mst->ch.tmit_task = 0;
+  mst->chn.tmit_task = 0;
   if (NULL == mst->tmit_handle)
   {
     mst->tmit_handle
@@ -1753,7 +1734,7 @@
 static void
 slave_transmit_message (struct Slave *slv)
 {
-  slv->ch.tmit_task = 0;
+  slv->chn.tmit_task = 0;
   if (NULL == slv->tmit_handle)
   {
     slv->tmit_handle
@@ -1768,11 +1749,11 @@
 
 
 static inline void
-transmit_message (struct Channel *ch)
+transmit_message (struct Channel *chn)
 {
-  ch->is_master
-    ? master_transmit_message ((struct Master *) ch)
-    : slave_transmit_message ((struct Slave *) ch);
+  chn->is_master
+    ? master_transmit_message ((struct Master *) chn)
+    : slave_transmit_message ((struct Slave *) chn);
 }
 
 
@@ -1828,7 +1809,7 @@
 /**
  * Queue PSYC message parts for sending to multicast.
  *
- * @param ch           Channel to send to.
+ * @param chn           Channel to send to.
  * @param client       Client the message originates from.
  * @param data_size    Size of @a data.
  * @param data         Concatenated message parts.
@@ -1836,7 +1817,7 @@
  * @param last_ptype   Last message part type in @a data.
  */
 static void
-queue_message (struct Channel *ch,
+queue_message (struct Channel *chn,
                struct GNUNET_SERVER_Client *client,
                size_t data_size,
                const void *data,
@@ -1847,14 +1828,14 @@
   memcpy (&tmit_msg[1], data, data_size);
   tmit_msg->client = client;
   tmit_msg->size = data_size;
-  tmit_msg->state = ch->tmit_state;
+  tmit_msg->state = chn->tmit_state;
 
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
+  GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg);
 
-  ch->is_master
-    ? master_queue_message ((struct Master *) ch, tmit_msg,
+  chn->is_master
+    ? master_queue_message ((struct Master *) chn, tmit_msg,
                             first_ptype, last_ptype)
-    : slave_queue_message ((struct Slave *) ch, tmit_msg,
+    : slave_queue_message ((struct Slave *) chn, tmit_msg,
                            first_ptype, last_ptype);
 }
 
@@ -1862,11 +1843,11 @@
 /**
  * Cancel transmission of current message.
  *
- * @param ch     Channel to send to.
+ * @param chn    Channel to send to.
  * @param client  Client the message originates from.
  */
 static void
-transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
+transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client)
 {
   uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
 
@@ -1874,8 +1855,8 @@
   msg.size = htons (sizeof (msg));
   msg.type = htons (type);
 
-  queue_message (ch, client, sizeof (msg), &msg, type, type);
-  transmit_message (ch);
+  queue_message (chn, client, sizeof (msg), &msg, type, type);
+  transmit_message (chn);
 
   /* FIXME: cleanup */
 }
@@ -1885,21 +1866,21 @@
  * Incoming message from a master or slave client.
  */
 static void
-client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
-                     const struct GNUNET_MessageHeader *msg)
+client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
   struct Channel *
-    ch = GNUNET_SERVER_client_get_user_context (client, struct Channel);
-  GNUNET_assert (NULL != ch);
+    chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_assert (NULL != chn);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received message from client.\n", ch);
+              "%p Received message from client.\n", chn);
   GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
 
-  if (GNUNET_YES != ch->ready)
+  if (GNUNET_YES != chn->ready)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "%p Channel is not ready, dropping message from client.\n", 
ch);
+                "%p Channel is not ready, dropping message from client.\n", 
chn);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
@@ -1907,30 +1888,30 @@
   uint16_t size = ntohs (msg->size);
   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", 
ch);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", 
chn);
     GNUNET_break (0);
-    transmit_cancel (ch, client);
+    transmit_cancel (chn, client);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
 
   uint16_t first_ptype = 0, last_ptype = 0;
   if (GNUNET_SYSERR
-      == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
+      == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg),
                                           (const char *) &msg[1],
                                           &first_ptype, &last_ptype))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "%p Received invalid message part from client.\n", ch);
+                "%p Received invalid message part from client.\n", chn);
     GNUNET_break (0);
-    transmit_cancel (ch, client);
+    transmit_cancel (chn, client);
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
 
-  queue_message (ch, client, size - sizeof (*msg), &msg[1],
+  queue_message (chn, client, size - sizeof (*msg), &msg[1],
                  first_ptype, last_ptype);
-  transmit_message (ch);
+  transmit_message (chn);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };
@@ -1940,8 +1921,8 @@
  * Client requests to add a slave to the membership database.
  */
 static void
-client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
-                  const struct GNUNET_MessageHeader *msg)
+client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
+                       const struct GNUNET_MessageHeader *msg)
 {
 
 }
@@ -1951,8 +1932,8 @@
  * Client requests to remove a slave from the membership database.
  */
 static void
-client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
-                     const struct GNUNET_MessageHeader *msg)
+client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
 
 }
@@ -1962,8 +1943,8 @@
  * Client requests channel history from PSYCstore.
  */
 static void
-client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
-                      const struct GNUNET_MessageHeader *msg)
+client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client,
+                           const struct GNUNET_MessageHeader *msg)
 {
 
 }
@@ -1973,8 +1954,8 @@
  * Client requests best matching state variable from PSYCstore.
  */
 static void
-client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
-                  const struct GNUNET_MessageHeader *msg)
+client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client,
+                       const struct GNUNET_MessageHeader *msg)
 {
 
 }
@@ -1984,8 +1965,8 @@
  * Client requests state variables with a given prefix from PSYCstore.
  */
 static void
-client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
-                         const struct GNUNET_MessageHeader *msg)
+client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
+                              const struct GNUNET_MessageHeader *msg)
 {
 
 }
@@ -2003,32 +1984,34 @@
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    { &client_master_start, NULL,
+    { &client_recv_master_start, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
 
-    { &client_slave_join, NULL,
+    { &client_recv_slave_join, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
 
-    { &client_join_decision, NULL,
+    { &client_recv_join_decision, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
 
-    { &client_psyc_message, NULL,
+    { &client_recv_psyc_message, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
 
-    { &client_slave_add, NULL,
+    { &client_recv_slave_add, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
 
-    { &client_slave_remove, NULL,
+    { &client_recv_slave_remove, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
 
-    { &client_story_request, NULL,
+    { &client_recv_story_request, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
 
-    { &client_state_get, NULL,
+    { &client_recv_state_get, NULL,
       GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
 
-    { &client_state_get_prefix, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
+    { &client_recv_state_get_prefix, NULL,
+      GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+
+    { NULL, NULL, 0, 0 }
   };
 
   cfg = c;

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-05-29 16:35:53 UTC (rev 33442)
+++ gnunet/src/psyc/psyc_api.c  2014-05-29 16:35:55 UTC (rev 33443)
@@ -37,151 +37,48 @@
 #include "gnunet_env_lib.h"
 #include "gnunet_multicast_service.h"
 #include "gnunet_psyc_service.h"
+#include "gnunet_psyc_util_lib.h"
 #include "psyc.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
 
-struct MessageQueue
-{
-  struct MessageQueue *prev;
-  struct MessageQueue *next;
-  /* Followed by struct GNUNET_MessageHeader msg */
-};
 
-
 /**
- * Handle for a pending PSYC transmission operation.
- */
-struct GNUNET_PSYC_ChannelTransmitHandle
-{
-  struct GNUNET_PSYC_Channel *ch;
-  GNUNET_PSYC_TransmitNotifyModifier notify_mod;
-  GNUNET_PSYC_TransmitNotifyData notify_data;
-  void *notify_cls;
-  enum MessageState state;
-};
-
-/**
  * Handle to access PSYC channel operations for both the master and slaves.
  */
 struct GNUNET_PSYC_Channel
 {
   /**
-   * Transmission handle;
-   */
-  struct GNUNET_PSYC_ChannelTransmitHandle tmit;
-
-  /**
    * Configuration to use.
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
 
   /**
-   * Socket (if available).
+   * Client connection to the service.
    */
-  struct GNUNET_CLIENT_Connection *client;
+  struct GNUNET_CLIENT_MANAGER_Connection *client;
 
   /**
-   * Currently pending transmission request, or NULL for none.
+   * Transmission handle;
    */
-  struct GNUNET_CLIENT_TransmitHandle *th;
+  struct GNUNET_PSYC_TransmitHandle *tmit;
 
   /**
-   * Head of messages to transmit to the service.
+   * Receipt handle;
    */
-  struct MessageQueue *tmit_head;
+  struct GNUNET_PSYC_ReceiveHandle *recv;
 
   /**
-   * Tail of operations to transmit to the service.
-   */
-  struct MessageQueue *tmit_tail;
-
-  /**
-   * Message currently being transmitted to the service.
-   */
-  struct MessageQueue *tmit_msg;
-
-  /**
    * Message to send on reconnect.
    */
-  struct GNUNET_MessageHeader *reconnect_msg;
+  struct GNUNET_MessageHeader *connect_msg;
 
   /**
-   * Task doing exponential back-off trying to reconnect.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
-
-  /**
-   * Time for next connect retry.
-   */
-  struct GNUNET_TIME_Relative reconnect_delay;
-
-  /**
-   * Message part callback.
-   */
-  GNUNET_PSYC_MessageCallback message_cb;
-
-  /**
-   * Message part callback for historic message.
-   */
-  GNUNET_PSYC_MessageCallback hist_message_cb;
-
-  /**
-   * Closure for @a message_cb.
-   */
-  void *cb_cls;
-
-  /**
-   * ID of the message being received from the PSYC service.
-   */
-  uint64_t recv_message_id;
-
-  /**
-   * Public key of the slave from which a message is being received.
-   */
-  struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
-
-  /**
-   * State of the currently being received message from the PSYC service.
-   */
-  enum MessageState recv_state;
-
-  /**
-   * Flags for the currently being received message from the PSYC service.
-   */
-  enum GNUNET_PSYC_MessageFlags recv_flags;
-
-  /**
-   * Expected value size for the modifier being received from the PSYC service.
-   */
-  uint32_t recv_mod_value_size_expected;
-
-  /**
-   * Actual value size for the modifier being received from the PSYC service.
-   */
-  uint32_t recv_mod_value_size;
-
-  /**
-   * Is transmission paused?
-   */
-  uint8_t tmit_paused;
-
-  /**
-   * Are we still waiting for a PSYC_TRANSMIT_ACK?
-   */
-  uint8_t tmit_ack_pending;
-
-  /**
    * Are we polling for incoming messages right now?
    */
   uint8_t in_receive;
 
   /**
-   * Are we currently transmitting a message?
-   */
-  uint8_t in_transmit;
-
-  /**
    * Is this a master or slave channel?
    */
   uint8_t is_master;
@@ -193,7 +90,7 @@
  */
 struct GNUNET_PSYC_Master
 {
-  struct GNUNET_PSYC_Channel ch;
+  struct GNUNET_PSYC_Channel chn;
 
   GNUNET_PSYC_MasterStartCallback start_cb;
 
@@ -201,6 +98,11 @@
    * Join request callback.
    */
   GNUNET_PSYC_JoinRequestCallback join_req_cb;
+
+  /**
+   * Closure for the callbacks.
+   */
+  void *cb_cls;
 };
 
 
@@ -209,11 +111,16 @@
  */
 struct GNUNET_PSYC_Slave
 {
-  struct GNUNET_PSYC_Channel ch;
+  struct GNUNET_PSYC_Channel chn;
 
   GNUNET_PSYC_SlaveConnectCallback connect_cb;
 
   GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb;
+
+  /**
+   * Closure for the callbacks.
+   */
+  void *cb_cls;
 };
 
 
@@ -258,934 +165,170 @@
 
 
 static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-static void
-channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
-
-
-/**
- * Reschedule a connect attempt to the service.
- *
- * @param ch  Channel to reconnect.
- */
-static void
-reschedule_connect (struct GNUNET_PSYC_Channel *ch)
+channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn)
 {
-  GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
-
-  if (NULL != ch->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
-    ch->th = NULL;
-  }
-  if (NULL != ch->client)
-  {
-    GNUNET_CLIENT_disconnect (ch->client);
-    ch->client = NULL;
-  }
-  ch->in_receive = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Scheduling task to reconnect to PSYC service in %s.\n",
-       GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, 
GNUNET_YES));
-  ch->reconnect_task =
-      GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch);
-  ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay);
+  uint16_t cmsg_size = ntohs (chn->connect_msg->size);
+  struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+  memcpy (cmsg, chn->connect_msg, cmsg_size);
+  GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg);
 }
 
 
-/**
- * Schedule transmission of the next message from our queue.
- *
- * @param ch PSYC channel handle
- */
 static void
-transmit_next (struct GNUNET_PSYC_Channel *ch);
-
-
-/**
- * Reset stored data related to the last received message.
- */
-static void
-recv_reset (struct GNUNET_PSYC_Channel *ch)
+channel_recv_disconnect (void *cls,
+                         struct GNUNET_CLIENT_MANAGER_Connection *client,
+                         const struct GNUNET_MessageHeader *msg)
 {
-  ch->recv_state = MSG_STATE_START;
-  ch->recv_flags = 0;
-  ch->recv_message_id = 0;
-  //FIXME: ch->recv_slave_key = { 0 };
-  ch->recv_mod_value_size = 0;
-  ch->recv_mod_value_size_expected = 0;
+  struct GNUNET_PSYC_Channel *
+    chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+  GNUNET_CLIENT_MANAGER_reconnect (client);
+  channel_send_connect_msg (chn);
 }
 
 
 static void
-recv_error (struct GNUNET_PSYC_Channel *ch)
+channel_recv_message (void *cls,
+                      struct GNUNET_CLIENT_MANAGER_Connection *client,
+                      const struct GNUNET_MessageHeader *msg)
 {
-  GNUNET_PSYC_MessageCallback message_cb
-    = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-    ? ch->hist_message_cb
-    : ch->message_cb;
-
-  if (NULL != message_cb)
-    message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL);
-
-  recv_reset (ch);
+  struct GNUNET_PSYC_Channel *
+    chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+  GNUNET_PSYC_receive_message (chn->recv,
+                               (const struct GNUNET_PSYC_MessageHeader *) msg);
 }
 
 
-/**
- * Queue a message part for transmission to the PSYC service.
- *
- * The message part is added to the current message buffer.
- * When this buffer is full, it is added to the transmission queue.
- *
- * @param ch Channel struct for the client.
- * @param msg Modifier message part, or NULL when there's no more modifiers.
- * @param end End of message.
- */
 static void
-queue_message (struct GNUNET_PSYC_Channel *ch,
-               const struct GNUNET_MessageHeader *msg,
-               uint8_t end)
+channel_recv_message_ack (void *cls,
+                          struct GNUNET_CLIENT_MANAGER_Connection *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
-  uint16_t size = msg ? ntohs (msg->size) : 0;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queueing message of type %u and size %u (end: %u)).\n",
-       ntohs (msg->type), size, end);
-
-  struct MessageQueue *mq = ch->tmit_msg;
-  struct GNUNET_MessageHeader *qmsg = NULL;
-  if (NULL != mq)
-  {
-    qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-    if (NULL == msg
-        || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size)
-    {
-      /* End of message or buffer is full, add it to transmission queue
-       * and start with empty buffer */
-      qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-      qmsg->size = htons (qmsg->size);
-      GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
-      ch->tmit_msg = mq = NULL;
-      ch->tmit_ack_pending++;
-    }
-    else
-    {
-      /* Message fits in current buffer, append */
-      ch->tmit_msg
-        = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size);
-      qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-      memcpy ((char *) qmsg + qmsg->size, msg, size);
-      qmsg->size += size;
-    }
-  }
-
-  if (NULL == mq && NULL != msg)
-  {
-    /* Empty buffer, copy over message. */
-    ch->tmit_msg
-      = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size);
-    qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-    qmsg->size = sizeof (*qmsg) + size;
-    memcpy (&qmsg[1], msg, size);
-  }
-
-  if (NULL != mq
-      && (GNUNET_YES == end
-          || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
-              < qmsg->size + sizeof (struct GNUNET_MessageHeader))))
-  {
-    /* End of message or buffer is full, add it to transmission queue. */
-    qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
-    qmsg->size = htons (qmsg->size);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
-    ch->tmit_msg = mq = NULL;
-    ch->tmit_ack_pending++;
-  }
-
-  if (GNUNET_YES == end)
-    ch->in_transmit = GNUNET_NO;
-
-  transmit_next (ch);
+  struct GNUNET_PSYC_Channel *
+    chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+  GNUNET_PSYC_transmit_got_ack (chn->tmit);
 }
 
 
-/**
- * Request a modifier from a client to transmit.
- *
- * @param mst Master handle.
- */
 static void
-channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
+master_recv_start_ack (void *cls,
+                       struct GNUNET_CLIENT_MANAGER_Connection *client,
+                       const struct GNUNET_MessageHeader *msg)
 {
-  uint16_t max_data_size, data_size;
-  char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
-  struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
-  int notify_ret;
+  struct GNUNET_PSYC_Master *
+    mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+                                                   sizeof (struct 
GNUNET_PSYC_Channel));
 
-  switch (ch->tmit.state)
-  {
-  case MSG_STATE_MODIFIER:
-  {
-    struct GNUNET_PSYC_MessageModifier *mod
-      = (struct GNUNET_PSYC_MessageModifier *) msg;
-    max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
-    msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
-    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1],
-                                      &mod->oper, &mod->value_size);
-    mod->name_size = strnlen ((char *) &mod[1], data_size);
-    if (mod->name_size < data_size)
-    {
-      mod->value_size = htonl (mod->value_size);
-      mod->name_size = htons (mod->name_size);
-    }
-    else if (0 < data_size)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
-      notify_ret = GNUNET_SYSERR;
-    }
-    break;
-  }
-  case MSG_STATE_MOD_CONT:
-  {
-    max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
-    msg->size = sizeof (struct GNUNET_MessageHeader);
-    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
-                                      &data_size, &msg[1], NULL, NULL);
-    break;
-  }
-  default:
-    GNUNET_assert (0);
-  }
-
-  switch (notify_ret)
-  {
-  case GNUNET_NO:
-    if (0 == data_size)
-    { /* Transmission paused, nothing to send. */
-      ch->tmit_paused = GNUNET_YES;
-      return;
-    }
-    ch->tmit.state = MSG_STATE_MOD_CONT;
-    break;
-
-  case GNUNET_YES:
-    if (0 == data_size)
-    {
-      /* End of modifiers. */
-      ch->tmit.state = MSG_STATE_DATA;
-      if (0 == ch->tmit_ack_pending)
-        channel_transmit_data (ch);
-
-      return;
-    }
-    ch->tmit.state = MSG_STATE_MODIFIER;
-    break;
-
-  default:
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MasterTransmitNotifyModifier returned error "
-         "when requesting a modifier.\n");
-
-    ch->tmit.state = MSG_STATE_CANCEL;
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
-    msg->size = htons (sizeof (*msg));
-
-    queue_message (ch, msg, GNUNET_YES);
-    return;
-  }
-
-  if (0 < data_size)
-  {
-    GNUNET_assert (data_size <= max_data_size);
-    msg->size = htons (msg->size + data_size);
-    queue_message (ch, msg, GNUNET_NO);
-  }
-
-  channel_transmit_mod (ch);
+  struct CountersResult *cres = (struct CountersResult *) msg;
+  if (NULL != mst->start_cb)
+    mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id));
 }
 
 
-/**
- * Request data from a client to transmit.
- *
- * @param mst Master handle.
- */
 static void
-channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
+master_recv_join_request (void *cls,
+                          struct GNUNET_CLIENT_MANAGER_Connection *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
-  uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
-  char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
-  struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+  struct GNUNET_PSYC_Master *
+    mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+                                                   sizeof (struct 
GNUNET_PSYC_Channel));
 
-  msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+  const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg;
 
-  int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
-                                         &data_size, &msg[1]);
-  switch (notify_ret)
-  {
-  case GNUNET_NO:
-    if (0 == data_size)
-    {
-      /* Transmission paused, nothing to send. */
-      ch->tmit_paused = GNUNET_YES;
-      return;
-    }
-    break;
+  struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
+  if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg))
+    pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
 
-  case GNUNET_YES:
-    ch->tmit.state = MSG_STATE_END;
-    break;
-
-  default:
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MasterTransmitNotify returned error when requesting data.\n");
-
-    ch->tmit.state = MSG_STATE_CANCEL;
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
-    msg->size = htons (sizeof (*msg));
-    queue_message (ch, msg, GNUNET_YES);
-    return;
-  }
-
-  if (0 < data_size)
-  {
-    GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
-    msg->size = htons (sizeof (*msg) + data_size);
-    queue_message (ch, msg, !notify_ret);
-  }
-
-  /* End of message. */
-  if (GNUNET_YES == notify_ret)
-  {
-    msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
-    msg->size = htons (sizeof (*msg));
-    queue_message (ch, msg, GNUNET_YES);
-  }
-}
-
-
-/**
- * Send a message to a channel.
- *
- * @param ch Handle to the PSYC channel.
- * @param method_name Which method should be invoked.
- * @param notify_mod Function to call to obtain modifiers.
- * @param notify_data Function to call to obtain fragments of the data.
- * @param notify_cls Closure for @a notify_mod and @a notify_data.
- * @param flags Flags for the message being transmitted.
- *
- * @return Transmission handle, NULL on error (i.e. more than one request 
queued).
- */
-static struct GNUNET_PSYC_ChannelTransmitHandle *
-channel_transmit (struct GNUNET_PSYC_Channel *ch,
-                  const char *method_name,
-                  GNUNET_PSYC_TransmitNotifyModifier notify_mod,
-                  GNUNET_PSYC_TransmitNotifyData notify_data,
-                  void *notify_cls,
-                  uint32_t flags)
-{
-  if (GNUNET_NO != ch->in_transmit)
-    return NULL;
-  ch->in_transmit = GNUNET_YES;
-
-  size_t size = strlen (method_name) + 1;
-  struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct GNUNET_MessageHeader *qmsg;
-  struct MessageQueue *
-    mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg)
-                                       + sizeof (*pmeth) + size);
-  qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-  qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size;
-
-  pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1];
-  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
-  pmeth->header.size = htons (sizeof (*pmeth) + size);
-  pmeth->flags = htonl (flags);
-  memcpy (&pmeth[1], method_name, size);
-
-  ch->tmit.ch = ch;
-  ch->tmit.notify_mod = notify_mod;
-  ch->tmit.notify_data = notify_data;
-  ch->tmit.notify_cls = notify_cls;
-  ch->tmit.state = MSG_STATE_MODIFIER;
-
-  channel_transmit_mod (ch);
-  return &ch->tmit;
-}
-
-
-/**
- * Resume transmission to the channel.
- *
- * @param th Handle of the request that is being resumed.
- */
-static void
-channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
-{
-  struct GNUNET_PSYC_Channel *ch = th->ch;
-  if (0 == ch->tmit_ack_pending)
-  {
-    ch->tmit_paused = GNUNET_NO;
-    channel_transmit_data (ch);
-  }
-}
-
-
-/**
- * Abort transmission request to channel.
- *
- * @param th Handle of the request that is being aborted.
- */
-static void
-channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
-{
-  struct GNUNET_PSYC_Channel *ch = th->ch;
-  if (GNUNET_NO == ch->in_transmit)
-    return;
-}
-
-
-/**
- * Handle incoming message from the PSYC service.
- *
- * @param ch The channel the message is sent to.
- * @param pmsg The message.
- */
-static void
-handle_psyc_message (struct GNUNET_PSYC_Channel *ch,
-                     const struct GNUNET_PSYC_MessageHeader *msg)
-{
-  uint16_t size = ntohs (msg->header.size);
-  uint32_t flags = ntohl (msg->flags);
-
-  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
-                           (struct GNUNET_MessageHeader *) msg);
-
-  if (MSG_STATE_START == ch->recv_state)
-  {
-    ch->recv_message_id = GNUNET_ntohll (msg->message_id);
-    ch->recv_flags = flags;
-    ch->recv_slave_key = msg->slave_key;
-    ch->recv_mod_value_size = 0;
-    ch->recv_mod_value_size_expected = 0;
-  }
-  else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
-  {
-    // FIXME
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
-         GNUNET_ntohll (msg->message_id), ch->recv_message_id);
-    GNUNET_break_op (0);
-    recv_error (ch);
-    return;
-  }
-  else if (flags != ch->recv_flags)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING,
-         "Unexpected message flags. Got: %lu, expected: %lu\n",
-         flags, ch->recv_flags);
-    GNUNET_break_op (0);
-    recv_error (ch);
-    return;
-  }
-
-  uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
-
-  for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
-  {
-    const struct GNUNET_MessageHeader *pmsg
-      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
-    psize = ntohs (pmsg->size);
-    ptype = ntohs (pmsg->type);
-    size_eq = size_min = 0;
-
-    if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Dropping message of type %u with invalid size %u.\n",
-                  ptype, psize);
-      recv_error (ch);
-      return;
-    }
-
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part from PSYC.\n");
-    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
-
-    switch (ptype)
-    {
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
-      size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
-      break;
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
-      size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
-      break;
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-      size_min = sizeof (struct GNUNET_MessageHeader);
-      break;
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
-      size_eq = sizeof (struct GNUNET_MessageHeader);
-      break;
-    default:
-      GNUNET_break_op (0);
-      recv_error (ch);
-      return;
-    }
-
-    if (! ((0 < size_eq && psize == size_eq)
-           || (0 < size_min && size_min <= psize)))
-    {
-      GNUNET_break_op (0);
-      recv_error (ch);
-      return;
-    }
-
-    switch (ptype)
-    {
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
-    {
-      struct GNUNET_PSYC_MessageMethod *meth
-        = (struct GNUNET_PSYC_MessageMethod *) pmsg;
-
-      if (MSG_STATE_START != ch->recv_state)
-      {
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Dropping out of order message method (%u).\n",
-             ch->recv_state);
-        /* It is normal to receive an incomplete message right after 
connecting,
-         * but should not happen later.
-         * FIXME: add a check for this condition.
-         */
-        GNUNET_break_op (0);
-        recv_error (ch);
-        return;
-      }
-
-      if ('\0' != *((char *) meth + psize - 1))
-      {
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Dropping message with malformed method. "
-             "Message ID: %" PRIu64 "\n", ch->recv_message_id);
-        GNUNET_break_op (0);
-        recv_error (ch);
-        return;
-      }
-      ch->recv_state = MSG_STATE_METHOD;
-      break;
-    }
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
-    {
-      if (!(MSG_STATE_METHOD == ch->recv_state
-            || MSG_STATE_MODIFIER == ch->recv_state
-            || MSG_STATE_MOD_CONT == ch->recv_state))
-      {
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Dropping out of order message modifier (%u).\n",
-             ch->recv_state);
-        GNUNET_break_op (0);
-        recv_error (ch);
-        return;
-      }
-
-      struct GNUNET_PSYC_MessageModifier *mod
-        = (struct GNUNET_PSYC_MessageModifier *) pmsg;
-
-      uint16_t name_size = ntohs (mod->name_size);
-      ch->recv_mod_value_size_expected = ntohl (mod->value_size);
-      ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1;
-
-      if (psize < sizeof (*mod) + name_size + 1
-          || '\0' != *((char *) &mod[1] + name_size)
-          || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
-      {
-        LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
-        GNUNET_break_op (0);
-        recv_error (ch);
-        return;
-      }
-      ch->recv_state = MSG_STATE_MODIFIER;
-      break;
-    }
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
-    {
-      ch->recv_mod_value_size += psize - sizeof (*pmsg);
-
-      if (!(MSG_STATE_MODIFIER == ch->recv_state
-            || MSG_STATE_MOD_CONT == ch->recv_state)
-          || ch->recv_mod_value_size_expected < ch->recv_mod_value_size)
-      {
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Dropping out of order message modifier continuation "
-             "!(%u == %u || %u == %u) || %lu < %lu.\n",
-             MSG_STATE_MODIFIER, ch->recv_state,
-             MSG_STATE_MOD_CONT, ch->recv_state,
-             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
-        GNUNET_break_op (0);
-        recv_error (ch);
-        return;
-      }
-      break;
-    }
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-    {
-      if (ch->recv_state < MSG_STATE_METHOD
-          || ch->recv_mod_value_size_expected != ch->recv_mod_value_size)
-      {
-        LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Dropping out of order message data fragment "
-             "(%u < %u || %lu != %lu).\n",
-             ch->recv_state, MSG_STATE_METHOD,
-             ch->recv_mod_value_size_expected, ch->recv_mod_value_size);
-
-        GNUNET_break_op (0);
-        recv_error (ch);
-        return;
-      }
-      ch->recv_state = MSG_STATE_DATA;
-      break;
-    }
-    }
-
-    GNUNET_PSYC_MessageCallback message_cb
-      = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC
-      ? ch->hist_message_cb
-      : ch->message_cb;
-
-    if (NULL != message_cb)
-      message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg);
-
-    switch (ptype)
-    {
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
-      recv_reset (ch);
-      break;
-    }
-  }
-}
-
-
-/**
- * Handle incoming message acknowledgement from the PSYC service.
- *
- * @param ch The channel the acknowledgement is sent to.
- */
-static void
-handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
-{
-  if (0 == ch->tmit_ack_pending)
-  {
-    LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
-    GNUNET_break (0);
-    return;
-  }
-  ch->tmit_ack_pending--;
-
-  switch (ch->tmit.state)
-  {
-  case MSG_STATE_MODIFIER:
-  case MSG_STATE_MOD_CONT:
-    if (GNUNET_NO == ch->tmit_paused)
-      channel_transmit_mod (ch);
-    break;
-
-  case MSG_STATE_DATA:
-    if (GNUNET_NO == ch->tmit_paused)
-      channel_transmit_data (ch);
-    break;
-
-  case MSG_STATE_END:
-  case MSG_STATE_CANCEL:
-    break;
-
-  default:
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Ignoring message ACK in state %u.\n", ch->tmit.state);
-  }
-}
-
-
-static void
-handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
-                          const struct MasterJoinRequest *req)
-{
-  struct GNUNET_PSYC_MessageHeader *msg = NULL;
-  if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg))
-    msg = (struct GNUNET_PSYC_MessageHeader *) &req[1];
-
   struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
   jh->mst = mst;
   jh->slave_key = req->slave_key;
 
   if (NULL != mst->join_req_cb)
-    mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh);
+    mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh);
 }
 
 
 static void
-handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv,
-                           const struct SlaveJoinDecision *dcsn)
+slave_recv_join_ack (void *cls,
+                     struct GNUNET_CLIENT_MANAGER_Connection *client,
+                     const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_PSYC_MessageHeader *msg = NULL;
-  if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg))
-    msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
-
-  struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
-  if (NULL != slv->join_dcsn_cb)
-    slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg);
+  struct GNUNET_PSYC_Slave *
+    slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+                                                   sizeof (struct 
GNUNET_PSYC_Channel));
+  struct CountersResult *cres = (struct CountersResult *) msg;
+  if (NULL != slv->connect_cb)
+    slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id));
 }
 
 
-/**
- * Type of a function to call when we receive a message
- * from the service.
- *
- * @param cls closure
- * @param msg message received, NULL on timeout or fatal error
- */
 static void
-message_handler (void *cls,
-                 const struct GNUNET_MessageHeader *msg)
+slave_recv_join_decision (void *cls,
+                          struct GNUNET_CLIENT_MANAGER_Connection *client,
+                          const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_PSYC_Channel *ch = cls;
-  struct GNUNET_PSYC_Master *mst = cls;
-  struct GNUNET_PSYC_Slave *slv = cls;
+  struct GNUNET_PSYC_Slave *
+    slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client,
+                                                   sizeof (struct 
GNUNET_PSYC_Channel));
+  const struct SlaveJoinDecision *
+    dcsn = (const struct SlaveJoinDecision *) msg;
 
-  if (NULL == msg)
-  {
-    // timeout / disconnected from service, reconnect
-    reschedule_connect (ch);
-    return;
-  }
-  uint16_t size_eq = 0;
-  uint16_t size_min = 0;
-  uint16_t size = ntohs (msg->size);
-  uint16_t type = ntohs (msg->type);
+  struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
+  if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg))
+    pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
 
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message of type %d and size %u from PSYC service\n",
-       type, size);
-
-  switch (type)
-  {
-  case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
-  case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
-    size_eq = sizeof (struct CountersResult);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
-    size_min = sizeof (struct GNUNET_PSYC_MessageHeader);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
-    size_eq = sizeof (struct GNUNET_MessageHeader);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
-    size_min = sizeof (struct MasterJoinRequest);
-    break;
-  case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
-    size_min = sizeof (struct SlaveJoinDecision);
-    break;
-  default:
-    GNUNET_break_op (0);
-    return;
-  }
-
-  if (! ((0 < size_eq && size == size_eq)
-         || (0 < size_min && size_min <= size)))
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-
-  switch (type)
-  {
-  case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK:
-  {
-    struct CountersResult *cres = (struct CountersResult *) msg;
-    if (NULL != mst->start_cb)
-      mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
-    break;
-  }
-  case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
-  {
-    struct CountersResult *cres = (struct CountersResult *) msg;
-    if (NULL != slv->connect_cb)
-      slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
-    break;
-  }
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
-  {
-    handle_psyc_message_ack (ch);
-    break;
-  }
-
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
-    handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg);
-    break;
-
-  case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
-    handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
-                              (const struct MasterJoinRequest *) msg);
-    break;
-
-  case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
-    handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch,
-                               (const struct SlaveJoinDecision *) msg);
-    break;
-  }
-
-  if (NULL != ch->client)
-  {
-    GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
+  struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+  if (NULL != slv->join_dcsn_cb)
+    slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg);
 }
 
 
-/**
- * Transmit next message to service.
- *
- * @param cls  The struct GNUNET_PSYC_Channel.
- * @param size Number of bytes available in @a buf.
- * @param buf  Where to copy the message.
- *
- * @return Number of bytes copied to @a buf.
- */
-static size_t
-send_next_message (void *cls, size_t size, void *buf)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] =
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
-  struct GNUNET_PSYC_Channel *ch = cls;
-  struct MessageQueue *mq = ch->tmit_head;
-  if (NULL == mq)
-    return 0;
-  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-  size_t ret = ntohs (qmsg->size);
-  ch->th = NULL;
-  if (ret > size)
-  {
-    reschedule_connect (ch);
-    return 0;
-  }
-  memcpy (buf, qmsg, ret);
+  { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
 
-  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq);
-  GNUNET_free (mq);
+  { &channel_recv_message, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
+    sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
 
-  if (NULL != ch->tmit_head)
-    transmit_next (ch);
+  { &channel_recv_message_ack, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
 
-  if (GNUNET_NO == ch->in_receive)
-  {
-    ch->in_receive = GNUNET_YES;
-    GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
-                           GNUNET_TIME_UNIT_FOREVER_REL);
-  }
-  return ret;
-}
+  { &master_recv_start_ack, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK,
+    sizeof (struct CountersResult), GNUNET_NO },
 
+  { &master_recv_join_request, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
+    sizeof (struct MasterJoinRequest), GNUNET_YES },
 
-/**
- * Schedule transmission of the next message from our queue.
- *
- * @param ch PSYC handle.
- */
-static void
-transmit_next (struct GNUNET_PSYC_Channel *ch)
-{
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
-  if (NULL != ch->th || NULL == ch->client)
-    return;
+  { NULL, NULL, 0, 0, GNUNET_NO }
+};
 
-  struct MessageQueue *mq = ch->tmit_head;
-  if (NULL == mq)
-    return;
-  struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
 
-  ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client,
-                                                ntohs (qmsg->size),
-                                                GNUNET_TIME_UNIT_FOREVER_REL,
-                                                GNUNET_NO,
-                                                &send_next_message,
-                                                ch);
-}
-
-
-/**
- * Try again to connect to the PSYC service.
- *
- * @param cls Channel handle.
- * @param tc Scheduler context.
- */
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] =
 {
-  struct GNUNET_PSYC_Channel *ch = cls;
+  { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
 
-  recv_reset (ch);
-  ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Connecting to PSYC service.\n");
-  GNUNET_assert (NULL == ch->client);
-  ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
-  GNUNET_assert (NULL != ch->client);
-  uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
+  { &channel_recv_message, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE,
+    sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES },
 
-  if (NULL == ch->tmit_head ||
-      0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size))
-  {
-    struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
-    memcpy (&mq[1], ch->reconnect_msg, reconn_size);
-    GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq);
-  }
-  transmit_next (ch);
-}
+  { &channel_recv_message_ack, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
 
+  { &slave_recv_join_ack, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK,
+    sizeof (struct CountersResult), GNUNET_NO },
 
-/**
- * Disconnect from the PSYC service.
- *
- * @param c  Channel handle to disconnect.
- */
-static void
-disconnect (void *c)
-{
-  struct GNUNET_PSYC_Channel *ch = c;
+  { &slave_recv_join_decision, NULL,
+    GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
+    sizeof (struct SlaveJoinDecision), GNUNET_YES },
 
-  GNUNET_assert (NULL != ch);
-  if (ch->tmit_head != ch->tmit_tail)
-  {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "Disconnecting while there are still outstanding messages!\n");
-    GNUNET_break (0);
-  }
-  if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
-  {
-    GNUNET_SCHEDULER_cancel (ch->reconnect_task);
-    ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-  }
-  if (NULL != ch->th)
-  {
-    GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th);
-    ch->th = NULL;
-  }
-  if (NULL != ch->client)
-  {
-    GNUNET_CLIENT_disconnect (ch->client);
-    ch->client = NULL;
-  }
-  if (NULL != ch->reconnect_msg)
-  {
-    GNUNET_free (ch->reconnect_msg);
-    ch->reconnect_msg = NULL;
-  }
-}
+  { NULL, NULL, 0, 0, GNUNET_NO }
+};
 
 
 /**
@@ -1227,24 +370,29 @@
                           void *cls)
 {
   struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
+  struct GNUNET_PSYC_Channel *chn = &mst->chn;
+
   struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
-
   req->header.size = htons (sizeof (*req));
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
   req->channel_key = *channel_key;
   req->policy = policy;
 
+  chn->connect_msg = (struct GNUNET_MessageHeader *) req;
+  chn->cfg = cfg;
+  chn->is_master = GNUNET_YES;
+
   mst->start_cb = start_cb;
   mst->join_req_cb = join_request_cb;
-  ch->message_cb = message_cb;
-  ch->cb_cls = cls;
-  ch->cfg = cfg;
-  ch->is_master = GNUNET_YES;
-  ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
-  ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
+  mst->cb_cls = cls;
 
+  chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers);
+  GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn));
+
+  chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
+  chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
+
+  channel_send_connect_msg (chn);
   return mst;
 }
 
@@ -1253,12 +401,13 @@
  * Stop a PSYC master channel.
  *
  * @param master PSYC channel master to stop.
+ * @param keep_active  FIXME
  */
 void
-GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
+GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
 {
-  disconnect (master);
-  GNUNET_free (master);
+  GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES);
+  GNUNET_free (mst);
 }
 
 
@@ -1292,7 +441,7 @@
                            const struct GNUNET_PeerIdentity *relays,
                            const struct GNUNET_PSYC_MessageHeader *join_resp)
 {
-  struct GNUNET_PSYC_Channel *ch = &jh->mst->ch;
+  struct GNUNET_PSYC_Channel *chn = &jh->mst->chn;
   struct MasterJoinDecision *dcsn;
   uint16_t join_resp_size
     = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0;
@@ -1302,9 +451,7 @@
       < sizeof (*dcsn) + relay_size + join_resp_size)
     return GNUNET_SYSERR;
 
-  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn)
-                                           + relay_size + join_resp_size);
-  dcsn = (struct MasterJoinDecision *) &mq[1];
+  dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size);
   dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
   dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
   dcsn->is_admitted = htonl (is_admitted);
@@ -1313,8 +460,7 @@
   if (0 < join_resp_size)
     memcpy (&dcsn[1], join_resp, join_resp_size);
 
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
-  transmit_next (ch);
+  GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header);
   return GNUNET_OK;
 }
 
@@ -1332,44 +478,63 @@
  * @return Transmission handle, NULL on error (i.e. more than one request 
queued).
  */
 struct GNUNET_PSYC_MasterTransmitHandle *
-GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
+GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
                              const char *method_name,
                              GNUNET_PSYC_TransmitNotifyModifier notify_mod,
                              GNUNET_PSYC_TransmitNotifyData notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags)
 {
-  return (struct GNUNET_PSYC_MasterTransmitHandle *)
-    channel_transmit (&master->ch, method_name, notify_mod, notify_data,
-                      notify_cls, flags);
+  if (GNUNET_OK
+      == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL,
+                                       notify_mod, notify_data, notify_cls,
+                                       flags))
+    return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit;
+  else
+    return NULL;
 }
 
 
 /**
  * Resume transmission to the channel.
  *
- * @param th Handle of the request that is being resumed.
+ * @param tmit  Handle of the request that is being resumed.
  */
 void
-GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle 
*th)
+GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle 
*tmit)
 {
-  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+  GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
 }
 
 
 /**
  * Abort transmission request to the channel.
  *
- * @param th Handle of the request that is being aborted.
+ * @param tmit  Handle of the request that is being aborted.
  */
 void
-GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle 
*th)
+GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle 
*tmit)
 {
-  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+  GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
 }
 
 
 /**
+ * Convert a channel @a master to a @e channel handle to access the @e channel
+ * APIs.
+ *
+ * @param master Channel master handle.
+ *
+ * @return Channel handle, valid for as long as @a master is valid.
+ */
+struct GNUNET_PSYC_Channel *
+GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
+{
+  return &master->chn;
+}
+
+
+/**
  * Join a PSYC channel.
  *
  * The entity joining is always the local peer.  The user must immediately use
@@ -1420,7 +585,7 @@
                         uint16_t data_size)
 {
   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
-  struct GNUNET_PSYC_Channel *ch = &slv->ch;
+  struct GNUNET_PSYC_Channel *chn = &slv->chn;
   struct SlaveJoinRequest *req
     = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
   req->header.size = htons (sizeof (*req)
@@ -1432,17 +597,21 @@
   req->relay_count = htonl (relay_count);
   memcpy (&req[1], relays, relay_count * sizeof (*relays));
 
+  chn->connect_msg = (struct GNUNET_MessageHeader *) req;
+  chn->cfg = cfg;
+  chn->is_master = GNUNET_NO;
+
   slv->connect_cb = connect_cb;
   slv->join_dcsn_cb = join_decision_cb;
-  ch->message_cb = message_cb;
-  ch->cb_cls = cls;
+  slv->cb_cls = cls;
 
-  ch->cfg = cfg;
-  ch->is_master = GNUNET_NO;
-  ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
-  ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
-  ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
+  chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers);
+  GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn));
 
+  chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls);
+  chn->tmit = GNUNET_PSYC_transmit_create (chn->client);
+
+  channel_send_connect_msg (chn);
   return slv;
 }
 
@@ -1456,10 +625,10 @@
  * @param slave Slave handle.
  */
 void
-GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave)
+GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv)
 {
-  disconnect (slave);
-  GNUNET_free (slave);
+  GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES);
+  GNUNET_free (slv);
 }
 
 
@@ -1477,69 +646,59 @@
  *         queued).
  */
 struct GNUNET_PSYC_SlaveTransmitHandle *
-GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
+GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv,
                             const char *method_name,
                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
                             GNUNET_PSYC_TransmitNotifyData notify_data,
                             void *notify_cls,
                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
+
 {
-  return (struct GNUNET_PSYC_SlaveTransmitHandle *)
-    channel_transmit (&slave->ch, method_name,
-                      notify_mod, notify_data, notify_cls, flags);
+  if (GNUNET_OK
+      == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL,
+                                       notify_mod, notify_data, notify_cls,
+                                       flags))
+    return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit;
+  else
+    return NULL;
 }
 
 
 /**
  * Resume transmission to the master.
  *
- * @param th Handle of the request that is being resumed.
+ * @param tmit Handle of the request that is being resumed.
  */
 void
-GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle 
*tmit)
 {
-  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+  GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit);
 }
 
 
 /**
  * Abort transmission request to master.
  *
- * @param th Handle of the request that is being aborted.
+ * @param tmit Handle of the request that is being aborted.
  */
 void
-GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
+GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle 
*tmit)
 {
-  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
+  GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit);
 }
 
 
 /**
- * Convert a channel @a master to a @e channel handle to access the @e channel
- * APIs.
- *
- * @param master Channel master handle.
- *
- * @return Channel handle, valid for as long as @a master is valid.
- */
-struct GNUNET_PSYC_Channel *
-GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
-{
-  return &master->ch;
-}
-
-
-/**
  * Convert @a slave to a @e channel handle to access the @e channel APIs.
  *
- * @param slave Slave handle.
+ * @param slv Slave handle.
  *
  * @return Channel handle, valid for as long as @a slave is valid.
  */
 struct GNUNET_PSYC_Channel *
-GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
+GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv)
 {
-  return &slave->ch;
+  return &slv->chn;
 }
 
 
@@ -1565,23 +724,17 @@
  * @param effective_since Addition of slave is in effect since this message ID.
  */
 void
-GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
+GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
                                const struct GNUNET_CRYPTO_EddsaPublicKey 
*slave_key,
                                uint64_t announced_at,
                                uint64_t effective_since)
 {
-  struct ChannelSlaveAdd *slvadd;
-  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd));
-
-  slvadd = (struct ChannelSlaveAdd *) &mq[1];
-  slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
-  slvadd->header.size = htons (sizeof (*slvadd));
-  slvadd->announced_at = GNUNET_htonll (announced_at);
-  slvadd->effective_since = GNUNET_htonll (effective_since);
-  GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
-                                    channel->tmit_tail,
-                                    mq);
-  transmit_next (channel);
+  struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add));
+  add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
+  add->header.size = htons (sizeof (*add));
+  add->announced_at = GNUNET_htonll (announced_at);
+  add->effective_since = GNUNET_htonll (effective_since);
+  GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header);
 }
 
 
@@ -1607,21 +760,15 @@
  * @param announced_at ID of the message that announced the membership change.
  */
 void
-GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
+GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
                                   const struct GNUNET_CRYPTO_EddsaPublicKey 
*slave_key,
                                   uint64_t announced_at)
 {
-  struct ChannelSlaveRemove *slvrm;
-  struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm));
-
-  slvrm = (struct ChannelSlaveRemove *) &mq[1];
-  slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
-  slvrm->header.size = htons (sizeof (*slvrm));
-  slvrm->announced_at = GNUNET_htonll (announced_at);
-  GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
-                                    channel->tmit_tail,
-                                    mq);
-  transmit_next (channel);
+  struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm));
+  rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
+  rm->header.size = htons (sizeof (*rm));
+  rm->announced_at = GNUNET_htonll (announced_at);
+  GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header);
 }
 
 

Modified: gnunet/src/psyc/psyc_util_lib.c
===================================================================
--- gnunet/src/psyc/psyc_util_lib.c     2014-05-29 16:35:53 UTC (rev 33442)
+++ gnunet/src/psyc/psyc_util_lib.c     2014-05-29 16:35:55 UTC (rev 33443)
@@ -28,6 +28,7 @@
 
 #include "platform.h"
 #include "gnunet_util_lib.h"
+#include "gnunet_env_lib.h"
 #include "gnunet_psyc_service.h"
 #include "gnunet_psyc_util_lib.h"
 

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-05-29 16:35:53 UTC (rev 33442)
+++ gnunet/src/psyc/test_psyc.c 2014-05-29 16:35:55 UTC (rev 33443)
@@ -68,6 +68,7 @@
   struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
   struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
   struct GNUNET_ENV_Environment *env;
+  struct GNUNET_ENV_Modifier *mod;
   char *data[16];
   const char *mod_value;
   size_t mod_value_size;
@@ -79,7 +80,7 @@
 
 struct TransmitClosure *tmit;
 
-static int join_req_count;
+static uint8_t join_req_count;
 
 enum
 {
@@ -183,7 +184,7 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Master got message part of type %u and size %u "
-              "belonging to message ID %llu with flags %xu\n",
+              "belonging to message ID %llu with flags %x\n",
               type, size, message_id, flags);
 
   switch (test)
@@ -227,7 +228,7 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Slave got message part of type %u and size %u "
-              "belonging to message ID %llu with flags %xu\n",
+              "belonging to message ID %llu with flags %x\n",
               type, size, message_id, flags);
 
   switch (test)
@@ -256,6 +257,48 @@
 
 
 static int
+tmit_notify_data (void *cls, uint16_t *data_size, void *data)
+{
+  struct TransmitClosure *tmit = cls;
+  if (0 == tmit->data_count)
+  {
+    *data_size = 0;
+    return GNUNET_YES;
+  }
+
+  uint16_t size = strlen (tmit->data[tmit->n]);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Transmit notify data: %u bytes available, "
+              "processing fragment %u/%u (size %u).\n",
+              *data_size, tmit->n + 1, tmit->data_count, size);
+  if (*data_size < size)
+  {
+    *data_size = 0;
+    GNUNET_assert (0);
+    return GNUNET_SYSERR;
+  }
+
+  if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+    tmit->paused = GNUNET_YES;
+    GNUNET_SCHEDULER_add_delayed (
+      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                     tmit->data_delay[tmit->n]),
+      &transmit_resume, tmit);
+    *data_size = 0;
+    return GNUNET_NO;
+  }
+  tmit->paused = GNUNET_NO;
+
+  *data_size = size;
+  memcpy (data, tmit->data[tmit->n], size);
+
+  return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
+}
+
+
+static int
 tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
                  uint32_t *full_value_size)
 {
@@ -265,41 +308,39 @@
               "%u modifiers left to process.\n",
               *data_size, GNUNET_ENV_environment_get_count (tmit->env));
 
-  enum GNUNET_ENV_Operator op = 0;
-  const char *name = NULL;
-  const char *value = NULL;
   uint16_t name_size = 0;
   size_t value_size = 0;
+  const char *value = NULL;
 
-  if (NULL != oper)
+  if (NULL != oper && NULL != tmit->mod)
   { /* New modifier */
-    if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
-                                                   (void *) &value, 
&value_size))
+    tmit->mod = tmit->mod->next;
+    if (NULL == tmit->mod)
     { /* No more modifiers, continue with data */
       *data_size = 0;
       return GNUNET_YES;
     }
 
-    GNUNET_assert (value_size < UINT32_MAX);
-    *full_value_size = value_size;
-    *oper = op;
-    name_size = strlen (name);
+    GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
+    *full_value_size = tmit->mod->value_size;
+    *oper = tmit->mod->oper;
+    name_size = strlen (tmit->mod->name);
 
-    if (name_size + 1 + value_size <= *data_size)
+    if (name_size + 1 + tmit->mod->value_size <= *data_size)
     {
-      *data_size = name_size + 1 + value_size;
+      *data_size = name_size + 1 + tmit->mod->value_size;
     }
     else
     {
-      tmit->mod_value_size = value_size;
+      tmit->mod_value_size = tmit->mod->value_size;
       value_size = *data_size - name_size - 1;
       tmit->mod_value_size -= value_size;
-      tmit->mod_value = value + value_size;
+      tmit->mod_value = tmit->mod->value + value_size;
     }
 
-    memcpy (data, name, name_size);
+    memcpy (data, tmit->mod->name, name_size);
     ((char *)data)[name_size] = '\0';
-    memcpy ((char *)data + name_size + 1, value, value_size);
+    memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size);
   }
   else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
   { /* Modifier continuation */
@@ -333,48 +374,6 @@
 }
 
 
-static int
-tmit_notify_data (void *cls, uint16_t *data_size, void *data)
-{
-  struct TransmitClosure *tmit = cls;
-  if (0 == tmit->data_count)
-  {
-    *data_size = 0;
-    return GNUNET_YES;
-  }
-
-  uint16_t size = strlen (tmit->data[tmit->n]);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmit notify data: %u bytes available, "
-              "processing fragment %u/%u (size %u).\n",
-              *data_size, tmit->n + 1, tmit->data_count, size);
-  if (*data_size < size)
-  {
-    *data_size = 0;
-    GNUNET_assert (0);
-    return GNUNET_SYSERR;
-  }
-
-  if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
-    tmit->paused = GNUNET_YES;
-    GNUNET_SCHEDULER_add_delayed (
-      GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
-                                     tmit->data_delay[tmit->n]),
-      &transmit_resume, tmit);
-    *data_size = 0;
-    return GNUNET_NO;
-  }
-  tmit->paused = GNUNET_NO;
-
-  *data_size = size;
-  memcpy (data, tmit->data[tmit->n], size);
-
-  return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES;
-}
-
-
 static void
 slave_join ();
 
@@ -388,7 +387,7 @@
 
   if (GNUNET_YES != is_admitted)
   { /* First join request is refused, retry. */
-    //GNUNET_assert (1 == join_req_count);
+    GNUNET_assert (1 == join_req_count);
     slave_join ();
     return;
   }
@@ -403,6 +402,7 @@
                               "_abc", "abc def", 7);
   GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
                               "_abc_def", "abc def ghi", 11);
+  tmit->mod = GNUNET_ENV_environment_head (tmit->env);
   tmit->n = 0;
   tmit->data[0] = "slave test";
   tmit->data_count = 1;
@@ -421,8 +421,8 @@
   struct GNUNET_HashCode slave_key_hash;
   GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Got join request from %s.\n",
-              GNUNET_h2s (&slave_key_hash));
+              "Got join request #%u from %s.\n",
+              join_req_count, GNUNET_h2s (&slave_key_hash));
 
   /* Reject first request */
   int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
@@ -493,6 +493,7 @@
                               name_cont, val_cont,
                               GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size
                               + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD);
+  tmit->mod = GNUNET_ENV_environment_head (tmit->env);
   tmit->data[0] = "foo";
   tmit->data[1] =  GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1);
   for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]