gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r30121 - in gnunet/src: include multicast psyc


From: gnunet
Subject: [GNUnet-SVN] r30121 - in gnunet/src: include multicast psyc
Date: Thu, 10 Oct 2013 20:08:53 +0200

Author: tg
Date: 2013-10-10 20:08:53 +0200 (Thu, 10 Oct 2013)
New Revision: 30121

Modified:
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc.h
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/test_psyc.c
Log:
PSYC: master msg transmission

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/include/gnunet_protocols.h       2013-10-10 18:08:53 UTC (rev 30121)
@@ -2083,50 +2083,33 @@
 #define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690
 
 
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD 691
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 691
 
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER 692
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 692
 
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MOD_CONT 693
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 693
 
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA 694
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 694
 
 #define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 695
 
 
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 696
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 697
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 698
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 699
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 700
-
-
 #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_METHOD 702
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_RESPONSE 702
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MODIFIER 703
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MOD_CONT 704
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 703
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_DATA 705
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 704
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_ACK 706
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_RESPONSE 705
 
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 706
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 707
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 707
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 708
 
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 709
-
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 710
-
-
 
/*******************************************************************************
  * CONVERSATION message types
  ******************************************************************************/

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/include/gnunet_psyc_service.h    2013-10-10 18:08:53 UTC (rev 30121)
@@ -207,8 +207,8 @@
   uint32_t flags GNUNET_PACKED;
 
   /**
-   * Sending slave's public key. NULL if the message is from the master, or when
-   * transmitting a message.
+   * Sending slave's public key.
+   * NULL if the message is from the master, or when transmitting a message.
    */
   struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
 
@@ -264,7 +264,7 @@
 struct GNUNET_PSYC_MessageData
 {
   /**
-   * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
+   * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA
    */
   struct GNUNET_MessageHeader header;
 
@@ -367,7 +367,7 @@
 void
 GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
                            int is_admitted,
-                           unsigned int relay_count,
+                           uint32_t relay_count,
                            const struct GNUNET_PeerIdentity *relays,
                            const char *method_name,
                            const struct GNUNET_ENV_Environment *env,
@@ -582,7 +582,7 @@
                         const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
                         const struct GNUNET_CRYPTO_EddsaPrivateKey *slave_key,
                         const struct GNUNET_PeerIdentity *origin,
-                        size_t relay_count,
+                        uint32_t relay_count,
                         const struct GNUNET_PeerIdentity *relays,
                         GNUNET_PSYC_Method method,
                         GNUNET_PSYC_JoinCallback join_cb,
@@ -591,7 +591,7 @@
                         const char *method_name,
                         const struct GNUNET_ENV_Environment *env,
                         const void *data,
-                        size_t data_size);
+                        uint16_t data_size);
 
 
 /**

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/multicast/multicast_api.c        2013-10-10 18:08:53 UTC (rev 30121)
@@ -30,7 +30,10 @@
 #include "gnunet_multicast_service.h"
 #include "multicast.h"
 
-/**
+#define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
+
+
+/** 
  * Handle for a request to send a message to all multicast group members
  * (from the origin).
  */
@@ -38,6 +41,7 @@
 {
   GNUNET_MULTICAST_OriginTransmitNotify notify;
   void *notify_cls;
+  struct GNUNET_MULTICAST_Origin *origin;
 
   uint64_t message_id;
   uint64_t group_generation;
@@ -353,6 +357,7 @@
 static void
 schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n");
   struct GNUNET_MULTICAST_Origin *orig = cls;
   struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
 
@@ -361,12 +366,18 @@
     = GNUNET_malloc (sizeof (*msg) + buf_size);
   int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
 
-  if (ret != GNUNET_YES || ret != GNUNET_NO)
+  if (! (GNUNET_YES == ret || GNUNET_NO == ret)
+      || buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)
   {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "MasterTransmitNotify() returned error or invalid message size.\n");
     /* FIXME: handle error */
     return;
   }
 
+  if (GNUNET_NO == ret && 0 == buf_size)
+    return; /* Transmission paused. */
+
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
   msg->header.size = htons (buf_size);
   msg->message_id = mh->message_id;
@@ -393,12 +404,12 @@
    *        returned signed message.
    * FIXME: Also send to local members in this group.
    */
-  orig->message_cb (orig->cls, msg);
+  orig->message_cb (orig->cls, (const struct GNUNET_MessageHeader *) msg);
 
   if (GNUNET_NO == ret)
     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                   (GNUNET_TIME_UNIT_SECONDS, 1),
-                                  schedule_origin_to_all, mh);
+                                  schedule_origin_to_all, orig);
 
 }
 
@@ -421,6 +432,7 @@
                                 void *notify_cls)
 {
   struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle;
+  mh->origin = origin;
   mh->message_id = message_id;
   mh->group_generation = group_generation;
   mh->notify = notify;
@@ -441,7 +453,7 @@
 void
 GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh)
 {
-
+  GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin);
 }
 
 

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2013-10-10 18:08:53 UTC (rev 30121)
@@ -34,9 +34,7 @@
 #include "gnunet_psyc_service.h"
 #include "psyc.h"
 
-#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
 
-
 /**
  * Handle to our current configuration.
  */
@@ -58,6 +56,11 @@
 static struct GNUNET_PSYCSTORE_Handle *store;
 
 /**
+ * channel's pub_key_hash -> struct Channel
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *clients;
+
+/**
  * Message in the transmission queue.
  */
 struct TransmitMessage
@@ -81,6 +84,7 @@
   struct TransmitMessage *tmit_tail;
 
   char *tmit_buf;
+  GNUNET_SCHEDULER_TaskIdentifier tmit_task;
   uint32_t tmit_mod_count;
   uint32_t tmit_mod_recvd;
   uint16_t tmit_size;
@@ -96,8 +100,9 @@
 struct Master
 {
   struct Channel channel;
-  struct GNUNET_CRYPTO_EccPrivateKey private_key;
-  struct GNUNET_CRYPTO_EccPublicSignKey public_key;
+  struct GNUNET_CRYPTO_EccPrivateKey priv_key;
+  struct GNUNET_CRYPTO_EccPublicSignKey pub_key;
+  struct GNUNET_HashCode pub_key_hash;
 
   struct GNUNET_MULTICAST_Origin *origin;
   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -120,13 +125,20 @@
 {
   struct Channel channel;
   struct GNUNET_CRYPTO_EccPrivateKey slave_key;
-  struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+  struct GNUNET_CRYPTO_EccPublicSignKey chan_key;
+  struct GNUNET_HashCode chan_key_hash;
 
   struct GNUNET_MULTICAST_Member *member;
   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
 
+  struct GNUNET_PeerIdentity origin;
+  struct GNUNET_PeerIdentity *relays;
+  struct GNUNET_MessageHeader *join_req;
+
   uint64_t max_message_id;
   uint64_t max_request_id;
+
+  uint32_t relay_count;
 };
 
 
@@ -166,41 +178,151 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
 
-  struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
-                                                              struct Channel);
-  GNUNET_assert (NULL != ch);
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  if (NULL == ch)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "User context is NULL in client_disconnect()\n");
+    GNUNET_break (0);
+    return;
+  }
 
   if (NULL != ch->tmit_buf)
   {
     GNUNET_free (ch->tmit_buf);
     ch->tmit_buf = NULL;
   }
+
+  if (ch->is_master)
+  {
+    struct Master *mst = (struct Master *) ch;
+    if (NULL != mst->origin)
+      GNUNET_MULTICAST_origin_stop (mst->origin);
+  }
+  else
+  {
+    struct Slave *slv = (struct Slave *) ch;
+    if (NULL != slv->join_req)
+      GNUNET_free (slv->join_req);
+    if (NULL != slv->relays)
+      GNUNET_free (slv->relays);
+    if (NULL != slv->member)
+      GNUNET_MULTICAST_member_part (slv->member);
+  }
+
   GNUNET_free (ch);
 }
 
+void
+join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+         const struct GNUNET_MessageHeader *join_req,
+         struct GNUNET_MULTICAST_JoinHandle *jh)
+{
 
+}
+
 void
-counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id,
-             uint64_t max_group_generation, uint64_t max_state_message_id)
+membership_test_cb (void *cls,
+                    const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+                    uint64_t message_id, uint64_t group_generation,
+                    struct GNUNET_MULTICAST_MembershipTestHandle *mth)
 {
-  struct Channel *ch = cls;
+
+}
+
+void
+replay_fragment_cb (void *cls,
+                    const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+                    uint64_t fragment_id, uint64_t flags,
+                    struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+
+}
+
+void
+replay_message_cb (void *cls,
+                   const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+                   uint64_t message_id,
+                   uint64_t fragment_offset,
+                   uint64_t flags,
+                   struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+
+}
+
+void
+request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+            const struct GNUNET_MessageHeader *req,
+            enum GNUNET_MULTICAST_MessageFlags flags)
+{
+
+}
+
+void
+message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received message of type %u from multicast.\n",
+              ntohs (msg->type));
+}
+
+void
+master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
+                    uint64_t max_message_id, uint64_t max_group_generation,
+                    uint64_t max_state_message_id)
+{
+  struct Master *mst = cls;
+  struct Channel *ch = &mst->channel;
   struct CountersResult *res = GNUNET_malloc (sizeof (*res));
+  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
   res->header.size = htons (sizeof (*res));
+  res->result_code = htonl (result);
   res->max_message_id = GNUNET_htonll (max_message_id);
 
-  if (ch->is_master)
+  if (GNUNET_OK == result || GNUNET_NO == result)
   {
-    struct Master *mst = cls;
     mst->max_message_id = max_message_id;
     mst->max_state_message_id = max_state_message_id;
     mst->max_group_generation = max_group_generation;
-    res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+    mst->origin
+      = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
+                                       max_fragment_id + 1,
+                                       join_cb, membership_test_cb,
+                                       replay_fragment_cb, replay_message_cb,
+                                       request_cb, message_cb, ch);
   }
-  else
+  GNUNET_SERVER_notification_context_add (nc, ch->client);
+  GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
+                                              GNUNET_NO);
+  GNUNET_free (res);
+}
+
+
+void
+slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
+                   uint64_t max_message_id, uint64_t max_group_generation,
+                   uint64_t max_state_message_id)
+{
+  struct Slave *slv = cls;
+  struct Channel *ch = &slv->channel;
+  struct CountersResult *res = GNUNET_malloc (sizeof (*res));
+  res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+  res->header.size = htons (sizeof (*res));
+  res->result_code = htonl (result);
+  res->max_message_id = GNUNET_htonll (max_message_id);
+
+  if (GNUNET_OK == result || GNUNET_NO == result)
   {
-    struct Slave *slv = cls;
     slv->max_message_id = max_message_id;
-    res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+    slv->member
+      = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
+                                      &slv->origin,
+                                      slv->relay_count, slv->relays,
+                                      slv->join_req, join_cb,
+                                      membership_test_cb,
+                                      replay_fragment_cb, replay_message_cb,
+                                      message_cb, ch);
   }
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -220,14 +342,17 @@
   mst->channel.client = client;
   mst->channel.is_master = GNUNET_YES;
   mst->policy = ntohl (req->policy);
-  mst->private_key = req->channel_key;
-  GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key,
-                                                  &mst->public_key);
+  mst->priv_key = req->channel_key;
+  GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key,
+                                                  &mst->pub_key);
+  GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
 
-  GNUNET_PSYCSTORE_counters_get (store, &mst->public_key,
-                                 counters_cb, mst);
+  GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
+                                 master_counters_cb, mst);
 
-  GNUNET_SERVER_client_set_user_context (client, mst);
+  GNUNET_SERVER_client_set_user_context (client, &mst->channel);
+  GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
+                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -241,13 +366,25 @@
   struct Slave *slv = GNUNET_new (struct Slave);
   slv->channel.client = client;
   slv->channel.is_master = GNUNET_NO;
-  slv->channel_key = req->channel_key;
   slv->slave_key = req->slave_key;
+  slv->chan_key = req->channel_key;
+  GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
+                      &slv->chan_key_hash);
+  slv->origin = req->origin;
+  slv->relay_count = ntohl (req->relay_count);
 
-  GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key,
-                                 counters_cb, slv);
+  const struct GNUNET_PeerIdentity *relays
+    = (const struct GNUNET_PeerIdentity *) &req[1];
+  slv->relays
+    = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+  uint32_t i;
+  for (i = 0; i < slv->relay_count; i++)
+    memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
 
-  GNUNET_SERVER_client_set_user_context (client, slv);
+  GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
+                                 slave_counters_cb, slv);
+
+  GNUNET_SERVER_client_set_user_context (client, &slv->channel);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -268,34 +405,40 @@
 
 
 static int
-transmit_notify (void *cls, uint64_t fragment_id, size_t *data_size, void *data)
+transmit_notify (void *cls, size_t *data_size, void *data)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
   struct Channel *ch = cls;
   struct TransmitMessage *msg = ch->tmit_head;
 
-  if (NULL == msg || *data_size < msg->size)
+  if (NULL == msg || *data_size < ntohs (msg->size))
   {
     *data_size = 0;
     return GNUNET_NO;
   }
 
-  memcpy (data, msg->buf, msg->size);
-  *data_size = msg->size;
+  *data_size = ntohs (msg->size);
+  memcpy (data, msg->buf, *data_size);
 
   GNUNET_free (ch->tmit_buf);
+  ch->tmit_buf = NULL;
   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
 
   return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
 }
 
 
-static int
-master_transmit_message (struct Master *mst)
+static void
+master_transmit_message (void *cls,
+                         const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
+  struct Master *mst = cls;
+  mst->channel.tmit_task = 0;
   if (NULL == mst->tmit_handle)
   {
     mst->tmit_handle
-      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+      = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
                                         mst->max_group_generation,
                                         transmit_notify, mst);
   }
@@ -303,24 +446,25 @@
   {
     GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
   }
-  return GNUNET_OK;
 }
 
 
-static int
-slave_transmit_message (struct Slave *slv)
+static void
+slave_transmit_message (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct Slave *slv = cls;
+  slv->channel.tmit_task = 0;
   if (NULL == slv->tmit_handle)
   {
     slv->tmit_handle
-      = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id,
+      = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
                                           transmit_notify, slv);
   }
   else
   {
     GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
   }
-  return GNUNET_OK;
 }
 
 
@@ -328,6 +472,7 @@
 buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
 {
   uint16_t size = ntohs (msg->size);
+  struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
 
   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
     return GNUNET_SYSERR;
@@ -353,12 +498,17 @@
     tmit_msg->size = size;
     tmit_msg->status = ch->tmit_status;
     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-
-    ch->is_master
-      ? master_transmit_message ((struct Master *) ch)
-      : slave_transmit_message ((struct Slave *) ch);
+    tmit_delay = GNUNET_TIME_UNIT_ZERO;
   }
 
+  if (0 != ch->tmit_task)
+    GNUNET_SCHEDULER_cancel (ch->tmit_task);
+
+  ch->tmit_task
+    = ch->is_master
+    ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
+    : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
+
   return GNUNET_OK;
 }
 
@@ -368,8 +518,8 @@
 {
   const struct GNUNET_PSYC_MessageMethod *meth
     = (const struct GNUNET_PSYC_MessageMethod *) msg;
-  struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
-                                                              struct Channel);
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
   if (GNUNET_NO != ch->in_transmit)
@@ -378,6 +528,7 @@
     return;
   }
 
+  ch->in_transmit = GNUNET_YES;
   ch->tmit_buf = NULL;
   ch->tmit_size = 0;
   ch->tmit_mod_recvd = 0;
@@ -388,6 +539,8 @@
 
   if (0 == ch->tmit_mod_count)
     send_transmit_ack (ch);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };
 
 
@@ -397,8 +550,8 @@
 {
   const struct GNUNET_PSYC_MessageModifier *mod
     = (const struct GNUNET_PSYC_MessageModifier *) msg;
-  struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
-                                                              struct Channel);
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
   ch->tmit_mod_recvd++;
@@ -406,6 +559,8 @@
 
   if (ch->tmit_mod_recvd == ch->tmit_mod_count)
     send_transmit_ack (ch);
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };
 
 
@@ -415,13 +570,18 @@
 {
   const struct GNUNET_PSYC_MessageData *data
     = (const struct GNUNET_PSYC_MessageData *) msg;
-  struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
-                                                              struct Channel);
+  struct Channel *ch
+    = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
-  ch->tmit_status = data->status;
+  ch->tmit_status = ntohs (data->status);
   buffer_message (ch, msg);
   send_transmit_ack (ch);
+
+  if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
+    ch->in_transmit = GNUNET_NO;
+
+  GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };
 
 
@@ -444,13 +604,13 @@
       GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
 
     { &handle_transmit_method, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 },
+      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
 
     { &handle_transmit_modifier, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 },
+      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
 
     { &handle_transmit_data, NULL,
-      GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 },
+      GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
 
     { NULL, NULL, 0, 0 }
   };
@@ -458,6 +618,7 @@
   cfg = c;
   store = GNUNET_PSYCSTORE_connect (cfg);
   stats = GNUNET_STATISTICS_create ("psyc", cfg);
+  clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
   nc = GNUNET_SERVER_notification_context_create (server, 1);
   GNUNET_SERVER_add_handlers (server, handlers);
   GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);

Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h      2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/psyc/psyc.h      2013-10-10 18:08:53 UTC (rev 30121)
@@ -65,6 +65,11 @@
    */
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Status code for the operation.
+   */
+  int32_t result_code GNUNET_PACKED;
+
   uint64_t max_message_id;
 };
 
@@ -121,6 +126,8 @@
 
   struct GNUNET_CRYPTO_EccPrivateKey slave_key;
 
+  struct GNUNET_PeerIdentity origin;
+
   /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
 };
 

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/psyc/psyc_api.c  2013-10-10 18:08:53 UTC (rev 30121)
@@ -106,16 +106,40 @@
    * Are we currently transmitting a message?
    */
   int in_transmit;
+
+  /**
+   * Is this a master or slave channel?
+   */
+  int is_master;
+
+  /**
+   * Buffer space available for transmitting the next data fragment.
+   */
+  uint16_t tmit_buf_avail;
 };
 
 
 /**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_MasterTransmitHandle
+{
+  struct GNUNET_PSYC_Master *master;
+  GNUNET_PSYC_MasterTransmitNotify notify;
+  void *notify_cls;
+  enum GNUNET_PSYC_DataStatus status;
+};
+
+
+/**
  * Handle for the master of a PSYC channel.
  */
 struct GNUNET_PSYC_Master
 {
   struct GNUNET_PSYC_Channel ch;
 
+  struct GNUNET_PSYC_MasterTransmitHandle *tmit;
+
   GNUNET_PSYC_MasterStartCallback start_cb;
 
   uint64_t max_message_id;
@@ -146,19 +170,6 @@
 /**
  * Handle for a pending PSYC transmission operation.
  */
-struct GNUNET_PSYC_MasterTransmitHandle
-{
-  struct GNUNET_PSYC_Master *master;
-  const struct GNUNET_ENV_Environment *env;
-  GNUNET_PSYC_MasterTransmitNotify notify;
-  void *notify_cls;
-  enum GNUNET_PSYC_MasterTransmitFlags flags;
-};
-
-
-/**
- * Handle for a pending PSYC transmission operation.
- */
 struct GNUNET_PSYC_SlaveTransmitHandle
 {
 
@@ -184,10 +195,10 @@
 
 
 /**
- * Try again to connect to the PSYCstore service.
+ * Try again to connect to the PSYC service.
  *
- * @param cls handle to the PSYCstore service.
- * @param tc scheduler context
+ * @param cls Handle to the PSYC service.
+ * @param tc Scheduler context
  */
 static void
 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
@@ -215,7 +226,7 @@
   }
   c->in_receive = GNUNET_NO;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Scheduling task to reconnect to PSYCstore service in %s.\n",
+       "Scheduling task to reconnect to PSYC service in %s.\n",
        GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
   c->reconnect_task =
       GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
@@ -226,12 +237,56 @@
 /**
  * Schedule transmission of the next message from our queue.
  *
- * @param h PSYCstore handle
+ * @param h PSYC handle
  */
 static void
 transmit_next (struct GNUNET_PSYC_Channel *c);
 
 
+void
+master_transmit_data (struct GNUNET_PSYC_Master *mst)
+{
+  struct GNUNET_PSYC_Channel *ch = &mst->ch;
+  size_t data_size = ch->tmit_buf_avail;
+  struct GNUNET_PSYC_MessageData *pdata;
+  struct OperationHandle *op
+    = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size);
+  pdata = (struct GNUNET_PSYC_MessageData *) &op[1];
+  op->msg = (struct GNUNET_MessageHeader *) pdata;
+  pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+
+  switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1]))
+  {
+  case GNUNET_NO:
+    mst->tmit->status = GNUNET_PSYC_DATA_CONT;
+    break;
+
+  case GNUNET_YES:
+    mst->tmit->status = GNUNET_PSYC_DATA_END;
+    break;
+
+  default:
+    mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
+    data_size = 0;
+    LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n");
+  }
+
+  if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
+  {
+    /* Transmission paused, nothing to send. */
+    GNUNET_free (op);
+  }
+  else
+  {
+    GNUNET_assert (data_size <= ch->tmit_buf_avail);
+    pdata->header.size = htons (sizeof (*pdata) + data_size);
+    pdata->status = htons (mst->tmit->status);
+    GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+    transmit_next (ch);
+  }
+}
+
+
 /**
  * Type of a function to call when we receive a message
  * from the service.
@@ -253,8 +308,8 @@
   }
   uint16_t size_eq = 0;
   uint16_t size_min = 0;
-  const uint16_t size = ntohs (msg->size);
-  const uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+  uint16_t type = ntohs (msg->type);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received message of type %d from PSYC service\n", type);
@@ -265,6 +320,9 @@
   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
     size_eq = sizeof (struct CountersResult);
     break;
+  case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+    size_eq = sizeof (struct TransmitAck);
+    break;
   }
 
   if (! ((0 < size_eq && size == size_eq)
@@ -276,6 +334,7 @@
   }
 
   struct CountersResult *cres;
+  struct TransmitAck *tack;
 
   switch (type)
   {
@@ -294,17 +353,39 @@
       mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
 #endif
     break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+    tack = (struct TransmitAck *) msg;
+    if (ch->is_master)
+    {
+      GNUNET_assert (NULL != mst->tmit);
+      if (GNUNET_PSYC_DATA_CONT != mst->tmit->status
+          || NULL == mst->tmit->notify)
+      {
+        GNUNET_free (mst->tmit);
+        mst->tmit = NULL;
+      }
+      else
+      {
+        ch->tmit_buf_avail = ntohs (tack->buf_avail);
+        master_transmit_data (mst);
+      }
+    }
+    else
+    {
+      /* TODO: slave */
+    }
+    break;
   }
 
   GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
                          GNUNET_TIME_UNIT_FOREVER_REL);
 }
 
-
 /**
  * Transmit next message to service.
  *
- * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
+ * @param cls The 'struct GNUNET_PSYC_Channel'.
  * @param size Number of bytes available in buf.
  * @param buf Where to copy the message.
  * @return Number of bytes copied to buf.
@@ -326,7 +407,7 @@
     return 0;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sending message of type %d to PSYCstore service\n",
+       "Sending message of type %d to PSYC service\n",
        ntohs (op->msg->type));
   memcpy (buf, op->msg, ret);
 
@@ -349,7 +430,7 @@
 /**
  * Schedule transmission of the next message from our queue.
  *
- * @param h PSYCstore handle.
+ * @param h PSYC handle.
  */
 static void
 transmit_next (struct GNUNET_PSYC_Channel *ch)
@@ -391,14 +472,12 @@
   if (NULL == ch->transmit_head ||
       ch->transmit_head->msg->type != ch->reconnect_msg->type)
   {
-    struct OperationHandle *op
-      = GNUNET_malloc (sizeof (struct OperationHandle)
-                       + ntohs (ch->reconnect_msg->size));
-    memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size));
+    uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
+    struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
+    memcpy (&op[1], ch->reconnect_msg, reconn_size);
     op->msg = (struct GNUNET_MessageHeader *) &op[1];
     GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
   }
-
   transmit_next (ch);
 }
 
@@ -414,7 +493,12 @@
 {
   struct GNUNET_PSYC_Channel *ch = c;
   GNUNET_assert (NULL != ch);
-  GNUNET_assert (ch->transmit_head == ch->transmit_tail);
+  if (ch->transmit_head != ch->transmit_tail)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "Disconnecting while there are still outstanding messages!\n");
+    GNUNET_break (0);
+  }
   if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (ch->reconnect_task);
@@ -431,7 +515,10 @@
     ch->client = NULL;
   }
   if (NULL != ch->reconnect_msg)
+  {
+    GNUNET_free (ch->reconnect_msg);
     ch->reconnect_msg = NULL;
+  }
 }
 
 
@@ -475,12 +562,13 @@
   struct GNUNET_PSYC_Channel *ch = &mst->ch;
   struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
 
-  req->header.size = htons (sizeof (*req) + sizeof (*channel_key));
+  req->header.size = htons (sizeof (*req));
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
   req->channel_key = *channel_key;
   req->policy = policy;
 
   ch->cfg = cfg;
+  ch->is_master = GNUNET_YES;
   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
@@ -532,7 +620,7 @@
 void
 GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
                            int is_admitted,
-                           unsigned int relay_count,
+                           uint32_t relay_count,
                            const struct GNUNET_PeerIdentity *relays,
                            const char *method_name,
                            const struct GNUNET_ENV_Environment *env,
@@ -556,13 +644,13 @@
   pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
   op->msg = (struct GNUNET_MessageHeader *) pmod;
 
-  pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER;
+  pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
   pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
   pmod->name_size = htons (name_size);
   memcpy (&pmod[1], mod->name, name_size);
-  memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size);
+  memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
 
-  GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+  GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
   return GNUNET_YES;
 }
 
@@ -594,33 +682,45 @@
     return NULL;
   ch->in_transmit = GNUNET_YES;
 
+  size_t size = strlen (method_name) + 1;
   struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth));
+  struct OperationHandle *op
+    = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
   pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
   op->msg = (struct GNUNET_MessageHeader *) pmeth;
 
-  pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD;
-  size_t size = strlen (method_name) + 1;
+  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
   pmeth->header.size = htons (sizeof (*pmeth) + size);
   pmeth->flags = htonl (flags);
-  pmeth->mod_count
-    = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
+  pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
   memcpy (&pmeth[1], method_name, size);
 
-  GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
-
+  GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
   GNUNET_ENV_environment_iterate (env, send_modifier, mst);
+  transmit_next (ch);
 
-  struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th));
-  th->master = mst;
-  th->env = env;
-  th->notify = notify;
-  th->notify_cls = notify_cls;
-  return th;
+  mst->tmit = GNUNET_malloc (sizeof (*mst->tmit));
+  mst->tmit->master = mst;
+  mst->tmit->notify = notify;
+  mst->tmit->notify_cls = notify_cls;
+  mst->tmit->status = GNUNET_PSYC_DATA_CONT;
+  return mst->tmit;
 }
 
 
 /**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+{
+  master_transmit_data (th->master);
+}
+
+
+/**
  * Abort transmission request to the channel.
  *
  * @param th Handle of the request that is being aborted.
@@ -671,7 +771,7 @@
                         const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
                         const struct GNUNET_CRYPTO_EccPrivateKey *slave_key,
                         const struct GNUNET_PeerIdentity *origin,
-                        size_t relay_count,
+                        uint32_t relay_count,
                         const struct GNUNET_PeerIdentity *relays,
                         GNUNET_PSYC_Method method,
                         GNUNET_PSYC_JoinCallback join_cb,
@@ -680,7 +780,7 @@
                         const char *method_name,
                         const struct GNUNET_ENV_Environment *env,
                         const void *data,
-                        size_t data_size)
+                        uint16_t data_size)
 {
   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
   struct GNUNET_PSYC_Channel *ch = &slv->ch;
@@ -692,10 +792,12 @@
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
   req->channel_key = *channel_key;
   req->slave_key = *slave_key;
+  req->origin = *origin;
   req->relay_count = relay_count;
   memcpy (&req[1], relays, relay_count * sizeof (*relays));
 
   ch->cfg = cfg;
+  ch->is_master = GNUNET_NO;
   ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
@@ -746,6 +848,18 @@
 
 
 /**
+ * Resume transmission to the master.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+{
+
+}
+
+
+/**
  * Abort transmission request to master.
  *
  * @param th Handle of the request that is being aborted.
@@ -822,7 +936,7 @@
   slvadd->announced_at = GNUNET_htonll (announced_at);
   slvadd->effective_since = GNUNET_htonll (effective_since);
 
-  GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+  GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
   transmit_next (ch);
 }
 
@@ -863,7 +977,7 @@
   slvrm->header.size = htons (sizeof (*slvrm));
   slvrm->announced_at = GNUNET_htonll (announced_at);
 
-  GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+  GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
   transmit_next (ch);
 }
 

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2013-10-10 18:08:50 UTC (rev 30120)
+++ gnunet/src/psyc/test_psyc.c 2013-10-10 18:08:53 UTC (rev 30121)
@@ -19,8 +19,8 @@
  */
 
 /**
- * @file psycstore/test_psycstore.c
- * @brief Test for the PSYCstore service.
+ * @file psyc/test_psyc.c
+ * @brief Test for the PSYC service.
  * @author Gabor X Toth
  * @author Christian Grothoff
  */
@@ -30,6 +30,7 @@
 #include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_testing_lib.h"
+#include "gnunet_env_lib.h"
 #include "gnunet_psyc_service.h"
 
 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
@@ -59,6 +60,8 @@
 static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
 static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
 
+struct GNUNET_PSYC_MasterTransmitHandle *mth;
+
 /**
  * Clean up all resources used.
  */
@@ -120,11 +123,14 @@
 
 static int
 method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
-        uint64_t message_id, const char *method_name,
+        uint64_t message_id, const char *name,
         size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
         uint64_t data_offset, const void *data, size_t data_size,
         enum GNUNET_PSYC_MessageFlags flags)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Method: %s, modifiers: %lu, flags: %u\n%.*s\n",
+              name, modifier_count, flags, data_size, data);
   return GNUNET_OK;
 }
 
@@ -138,11 +144,72 @@
   return GNUNET_OK;
 }
 
+struct TransmitClosure
+{
+  struct GNUNET_PSYC_MasterTransmitHandle *handle;
+  uint8_t n;
+  uint8_t fragment_count;
+  char *fragments[16];
+  uint16_t fragment_sizes[16];
+};
 
+
+static void
+transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n");
+  struct TransmitClosure *tmit = cls;
+  GNUNET_PSYC_master_transmit_resume (tmit->handle);
+}
+
+
+static int
+transmit_notify (void *cls, size_t *data_size, void *data)
+{
+  struct TransmitClosure *tmit = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Transmit notify: %lu bytes\n", *data_size);
+
+  if (tmit->fragment_count <= tmit->n)
+    return GNUNET_YES;
+
+  GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
+
+  *data_size = tmit->fragment_sizes[tmit->n];
+  memcpy (data, tmit->fragments[tmit->n], *data_size);
+  tmit->n++;
+
+  if (tmit->n == tmit->fragment_count - 1)
+  {
+    /* Send last fragment later. */
+    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume,
+                                  tmit);
+    *data_size = 0;
+    return GNUNET_NO;
+  }
+  return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
+}
+
 void
 master_started (void *cls, uint64_t max_message_id)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id);
+
+  struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+                                  "_foo", "bar baz", 7);
+  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+                                  "_foo_bar", "foo bar baz", 11);
+
+  struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
+  tmit->fragment_count = 2;
+  tmit->fragments[0] = "foo bar";
+  tmit->fragment_sizes[0] = 7;
+  tmit->fragments[1] = "baz!";
+  tmit->fragment_sizes[1] = 4;
+  tmit->handle
+    = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit,
+                                   GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
 }
 
 
@@ -157,7 +224,7 @@
  * Main function of the test, run from scheduler.
  *
  * @param cls NULL
- * @param cfg configuration we use (also to connect to PSYCstore service)
+ * @param cfg configuration we use (also to connect to PSYC service)
  * @param peer handle to access more of the peer (not used)
  */
 static void
@@ -182,9 +249,18 @@
   mst = GNUNET_PSYC_master_start (cfg, channel_key,
                                   GNUNET_PSYC_CHANNEL_PRIVATE,
                                   &method, &join, &master_started, NULL);
-
-  slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key,
-                                &method, &join, &slave_joined, NULL);
+  return;
+  struct GNUNET_PeerIdentity origin;
+  struct GNUNET_PeerIdentity relays[16];
+  struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+                                  "_foo", "bar baz", 7);
+  GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+                                  "_foo_bar", "foo bar baz", 11);
+  slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
+                                16, relays, &method, &join, &slave_joined,
+                                NULL, "_request_join", env, "some data", 9);
+  GNUNET_ENV_environment_destroy (env);
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]