gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r32575 - in gnunet/src: include multicast psyc psycstore


From: gnunet
Subject: [GNUnet-SVN] r32575 - in gnunet/src: include multicast psyc psycstore
Date: Fri, 7 Mar 2014 00:46:45 +0100

Author: tg
Date: 2014-03-07 00:46:45 +0100 (Fri, 07 Mar 2014)
New Revision: 32575

Added:
   gnunet/src/psyc/psyc_common.c
Modified:
   gnunet/src/include/gnunet_multicast_service.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/include/gnunet_social_service.h
   gnunet/src/multicast/gnunet-service-multicast.c
   gnunet/src/multicast/multicast.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/Makefile.am
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc.h
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/test_psyc.c
   gnunet/src/psycstore/plugin_psycstore_sqlite.c
Log:
PSYC: implement slave to master requests, tests, fixes, reorg

Multicast lib: handle member to origin requests.
Keep track of members and origins and call their callbacks when necessary.

Modified: gnunet/src/include/gnunet_multicast_service.h
===================================================================
--- gnunet/src/include/gnunet_multicast_service.h       2014-03-06 23:46:42 UTC 
(rev 32574)
+++ gnunet/src/include/gnunet_multicast_service.h       2014-03-06 23:46:45 UTC 
(rev 32575)
@@ -160,8 +160,62 @@
   /* Followed by message body. */
 };
 
+
+/**
+ * Header of a request from a member to the origin.
+ */
+struct GNUNET_MULTICAST_RequestHeader
+{
+  /**
+   * Header for all requests from a member to the origin.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Public key of the sending member.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+
+  /**
+   * ECC signature of the request fragment.
+   *
+   * Signature must match the public key of the multicast group.
+   */
+  struct GNUNET_CRYPTO_EddsaSignature signature;
+
+  /**
+   * Purpose for the signature and size of the signed data.
+   */
+  struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+
+  /**
+   * Number of the request fragment, monotonically increasing.
+   */
+  uint64_t fragment_id GNUNET_PACKED;
+
+  /**
+   * Byte offset of this @e fragment of the @e request.
+   */
+  uint64_t fragment_offset GNUNET_PACKED;
+
+  /**
+   * Number of the request this fragment belongs to.
+   *
+   * Set in GNUNET_MULTICAST_origin_to_all().
+   */
+  uint64_t request_id GNUNET_PACKED;
+
+  /**
+   * Flags for this request.
+   */
+  enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED;
+
+  /* Followed by request body. */
+};
+
 GNUNET_NETWORK_STRUCT_END
 
+
 /**
  * Maximum size of a multicast message fragment.
  */
@@ -492,7 +546,7 @@
  * @param next_fragment_id Next fragment ID to continue counting fragments from
  *        when restarting the origin.  1 for a new group.
  * @param join_cb Function called to approve / disapprove joining of a peer.
- * @param mem_test_cb Function multicast can use to test group membership.
+ * @param member_test_cb Function multicast can use to test group membership.
  * @param replay_frag_cb Function that can be called to replay a message 
fragment.
  * @param replay_msg_cb Function that can be called to replay a message.
  * @param request_cb Function called with message fragments from group members.
@@ -507,7 +561,7 @@
                                const struct GNUNET_CRYPTO_EddsaPrivateKey 
*priv_key,
                                uint64_t next_fragment_id,
                                GNUNET_MULTICAST_JoinCallback join_cb,
-                               GNUNET_MULTICAST_MembershipTestCallback 
mem_test_cb,
+                               GNUNET_MULTICAST_MembershipTestCallback 
member_test_cb,
                                GNUNET_MULTICAST_ReplayFragmentCallback 
replay_frag_cb,
                                GNUNET_MULTICAST_ReplayMessageCallback 
replay_msg_cb,
                                GNUNET_MULTICAST_RequestCallback request_cb,
@@ -756,14 +810,14 @@
  * Send a message to the origin of the multicast group.
  *
  * @param member Membership handle.
- * @param message_id Application layer ID for the message.  Opaque to 
multicast.
+ * @param request_id Application layer ID for the request.  Opaque to 
multicast.
  * @param notify Callback to call to get the message.
  * @param notify_cls Closure for @a notify.
  * @return Handle to cancel request, NULL on error (i.e. request already 
pending).
  */
 struct GNUNET_MULTICAST_MemberRequestHandle *
 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
-                                   uint64_t message_id,
+                                   uint64_t request_id,
                                    GNUNET_MULTICAST_MemberTransmitNotify 
notify,
                                    void *notify_cls);
 

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2014-03-06 23:46:42 UTC (rev 
32574)
+++ gnunet/src/include/gnunet_protocols.h       2014-03-06 23:46:45 UTC (rev 
32575)
@@ -2279,14 +2279,34 @@
 /* WIP: no numbers assigned yet */
 
 /**
+ * Start an origin.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START 750
+
+/**
+ * Stop an origin.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP 751
+
+/**
+ * Join a group as a member.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN 752
+
+/**
+ * Leave a group.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART 753
+
+/**
  * Multicast message from the origin to all members.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 750
+#define GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE 754
 
 /**
  * A unicast message from a group member to the origin.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 755
 
 /**
  * A peer wants to join the group.
@@ -2366,14 +2386,6 @@
 
 
 
/*******************************************************************************
- * PSYC message types
- 
******************************************************************************/
-
-/*******************************************************************************
- * PSYCSTORE message types
- 
******************************************************************************/
-
-/*******************************************************************************
  * SOCIAL message types
  
******************************************************************************/
 

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2014-03-06 23:46:42 UTC (rev 
32574)
+++ gnunet/src/include/gnunet_psyc_service.h    2014-03-06 23:46:45 UTC (rev 
32575)
@@ -110,7 +110,7 @@
    * Past messages are only available to slaves who were admitted at the time
    * they were sent to the channel.
    */
-  GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY = 1 << 1,
+  GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY = 1 << 1
 };
 
 /**
@@ -132,7 +132,7 @@
    */
   GNUNET_PSYC_CHANNEL_PRIVATE
     = GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL
-    | GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY,
+    | GNUNET_PSYC_CHANNEL_RESTRICTED_HISTORY
 
 #if IDEAS_FOR_FUTURE
   /**
@@ -152,9 +152,7 @@
    */
   GNUNET_PSYC_CHANNEL_CLOSED
     = GNUNET_PSYC_CHANNEL_ADMISSION_CONTROL,
-,
 #endif
-
 };
 
 
@@ -163,7 +161,12 @@
   /**
    * Historic message, retrieved from PSYCstore.
    */
-  GNUNET_PSYC_MESSAGE_HISTORIC = 1
+  GNUNET_PSYC_MESSAGE_HISTORIC = 1 << 0,
+
+  /**
+   * Request from slave to master.
+   */
+  GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1
 };
 
 GNUNET_NETWORK_STRUCT_BEGIN
@@ -406,7 +409,7 @@
 /**
  * Function called to provide data for a transmission via PSYC.
  *
- * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO)
+ * Note that returning #GNUNET_YES or #GNUNET_SYSERR (but not #GNUNET_NO)
  * invalidates the respective transmission handle.
  *
  * @param cls Closure.
@@ -422,15 +425,43 @@
  *         #GNUNET_YES if this completes the transmission (all data supplied)
  */
 typedef int
-(*GNUNET_PSYC_MasterTransmitNotify) (void *cls,
-                                     uint16_t *data_size,
-                                     void *data);
+(*GNUNET_PSYC_TransmitNotifyData) (void *cls,
+                                   uint16_t *data_size,
+                                   void *data);
 
+/**
+ * Function called to provide a modifier for a transmission via PSYC.
+ *
+ * Note that returning #GNUNET_YES or #GNUNET_SYSERR (but not #GNUNET_NO)
+ * invalidates the respective transmission handle.
+ *
+ * @param cls Closure.
+ * @param[in,out] data_size  Initially set to the number of bytes available in
+ *         @a data, should be set to the number of bytes written to data.
+ * @param[out] data  Where to write the modifier's name and value.
+ *         The function must copy at most @a data_size bytes to @a data.
+ *         When this callback is first called for a modifier, @a data should
+ *         contain: "name\0value".  If the whole value does not fit, subsequent
+ *         calls to this function should write continuations of the value to
+ *         @a data.
+ * @param oper  Where to write the operator of the modifier.  Only needed 
during
+ *         the first call to this callback at the beginning of the modifier.
+ *         In case of subsequent calls asking for value continuations @a oper 
is
+ *         set to #NULL.
+ * @return #GNUNET_SYSERR on error (fatal, aborts transmission)
+ *         #GNUNET_NO on success, if more data is to be transmitted later.
+ *         Should be used if @a data_size was not big enough to take all the
+ *         data for the modifier's value (the name must be always returned
+ *         during the first call to this callback).
+ *         If 0 is returned in @a data_size the transmission is paused,
+ *         and can be resumed with GNUNET_PSYC_master_transmit_resume().
+ *         #GNUNET_YES if this completes the modifier (the whole value is 
supplied).
+ */
 typedef int
-(*GNUNET_PSYC_MasterTransmitNotifyModifier) (void *cls,
-                                             uint16_t *data_size,
-                                             void *data,
-                                             uint8_t *oper);
+(*GNUNET_PSYC_TransmitNotifyModifier) (void *cls,
+                                       uint16_t *data_size,
+                                       void *data,
+                                       uint8_t *oper);
 
 /**
  * Flags for transmitting messages to a channel by the master.
@@ -477,8 +508,8 @@
 struct GNUNET_PSYC_MasterTransmitHandle *
 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
                              const char *method_name,
-                             GNUNET_PSYC_MasterTransmitNotifyModifier 
notify_mod,
-                             GNUNET_PSYC_MasterTransmitNotify notify_data,
+                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                             GNUNET_PSYC_TransmitNotifyData notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags);
 
@@ -588,29 +619,6 @@
 
 
 /**
- * Function called to provide data for a transmission to the channel master
- * (a.k.a. the @e host of the channel).
- *
- * Note that returning #GNUNET_OK or #GNUNET_SYSERR (but not #GNUNET_NO)
- * invalidates the respective transmission handle.
- *
- * @param cls Closure.
- * @param[in,out] data_size Initially set to the number of bytes available in
- *        @a data, should be set to the number of bytes written to data
- *        (IN/OUT).
- * @param[out] data Where to write the body of the message to give to the 
method;
- *        function must copy at most @a *data_size bytes to @a data.
- * @return #GNUNET_SYSERR on error (fatal, aborts transmission).
- *         #GNUNET_NO on success, if more data is to be transmitted later.
- *         #GNUNET_YES if this completes the transmission (all data supplied).
- */
-typedef int
-(*GNUNET_PSYC_SlaveTransmitNotify) (void *cls,
-                                    size_t *data_size,
-                                    char *data);
-
-
-/**
  * Flags for transmitting messages to the channel master by a slave.
  */
 enum GNUNET_PSYC_SlaveTransmitFlags
@@ -630,8 +638,8 @@
  *
  * @param slave Slave handle.
  * @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or 
NULL.
- * @param notify Function to call when we are allowed to transmit (to get 
data).
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify.
  * @param flags Flags for the message being transmitted.
  * @return Transmission handle, NULL on error (i.e. more than one request 
queued).
@@ -639,8 +647,8 @@
 struct GNUNET_PSYC_SlaveTransmitHandle *
 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
                             const char *method_name,
-                            const struct GNUNET_ENV_Environment *env,
-                            GNUNET_PSYC_SlaveTransmitNotify notify,
+                            GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                            GNUNET_PSYC_TransmitNotifyData notify_data,
                             void *notify_cls,
                             enum GNUNET_PSYC_SlaveTransmitFlags flags);
 

Modified: gnunet/src/include/gnunet_social_service.h
===================================================================
--- gnunet/src/include/gnunet_social_service.h  2014-03-06 23:46:42 UTC (rev 
32574)
+++ gnunet/src/include/gnunet_social_service.h  2014-03-06 23:46:45 UTC (rev 
32575)
@@ -375,12 +375,11 @@
  * Convert our home to a place so we can access it via the place API.
  *
  * @param home Handle for the home.
- * @param keep_active Keep home active after last application disconnected.
  * @return Place handle for the same home, valid as long as @a home is valid;
  *         do NOT try to GNUNET_SOCIAL_place_leave() this place, it's your 
home!
  */
 struct GNUNET_SOCIAL_Place *
-GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home, int 
keep_active);
+GNUNET_SOCIAL_home_get_place (struct GNUNET_SOCIAL_Home *home);
 
 
 /**
@@ -390,9 +389,10 @@
  * Guests will be disconnected until the home is restarted.
  *
  * @param home Home to leave.
+ * @param keep_active Keep home active after last application disconnected.
  */
 void
-GNUNET_SOCIAL_home_leave (struct GNUNET_SOCIAL_Home *home);
+GNUNET_SOCIAL_home_leave (struct GNUNET_SOCIAL_Home *home, int keep_active);
 
 /**
  * Request entry to a place (home hosted by someone else).

Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c     2014-03-06 23:46:42 UTC 
(rev 32574)
+++ gnunet/src/multicast/gnunet-service-multicast.c     2014-03-06 23:46:45 UTC 
(rev 32575)
@@ -41,6 +41,71 @@
 
 
 /**
+ * Handle a connecting client starting an origin.
+ */
+static void
+handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
+                     const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Handle a client stopping an origin.
+ */
+static void
+handle_origin_stop (void *cls, struct GNUNET_SERVER_Client *client,
+                    const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Handle a connecting client joining a group.
+ */
+static void
+handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
+                     const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Handle a client parting a group.
+ */
+static void
+handle_member_part (void *cls, struct GNUNET_SERVER_Client *client,
+                    const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Incoming message from a client.
+ */
+static void
+handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
+                         const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+
+/**
+ * Incoming request from a client.
+ */
+static void
+handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
+                         const struct GNUNET_MessageHeader *msg)
+{
+
+}
+
+/**
  * Process multicast requests.
  *
  * @param cls closure
@@ -52,7 +117,24 @@
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
-    /* FIXME: add handlers here! */
+    { &handle_origin_start, NULL,
+      GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
+
+    { &handle_origin_stop, NULL,
+      GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_STOP, 0 },
+
+    { &handle_member_join, NULL,
+      GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
+
+    { &handle_member_part, NULL,
+      GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_PART, 0 },
+
+    { &handle_multicast_message, NULL,
+      GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
+
+    { &handle_multicast_request, NULL,
+      GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+
     {NULL, NULL, 0, 0}
   };
   /* FIXME: do setup here */

Modified: gnunet/src/multicast/multicast.h
===================================================================
--- gnunet/src/multicast/multicast.h    2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/multicast/multicast.h    2014-03-06 23:46:45 UTC (rev 32575)
@@ -22,6 +22,7 @@
  * @file multicast/multicast.h
  * @brief multicast IPC messages
  * @author Christian Grothoff
+ * @author Gabor X Toth
  */
 #ifndef MULTICAST_H
 #define MULTICAST_H
@@ -30,12 +31,52 @@
 
 
 /**
+ * Header of a join request sent to the origin or another member.
+ */
+struct GNUNET_MULTICAST_JoinRequest
+{
+  /**
+   * Header for the join request.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * ECC signature of the rest of the fields of the join request.
+   *
+   * Signature must match the public key of the joining member.
+   */
+  struct GNUNET_CRYPTO_EddsaSignature signature;
+
+  /**
+   * Purpose for the signature and size of the signed data.
+   */
+  struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+
+  /**
+   * Public key of the target group.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
+
+  /**
+   * Public key of the joining member.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+
+  /**
+   * Peer identity of the joining member.
+   */
+  struct GNUNET_PeerIdentity member_peer;
+
+  /* Followed by request body. */
+};
+
+
+/**
  * Message sent from the client to the service to notify the service
  * about a join decision.
  */
 struct MulticastJoinDecisionMessage
 {
-
   /**
    *
    */
@@ -329,9 +370,6 @@
 };
 
 
-
-
-
 GNUNET_NETWORK_STRUCT_END
 
 #endif

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-03-06 23:46:42 UTC (rev 
32574)
+++ gnunet/src/multicast/multicast_api.c        2014-03-06 23:46:45 UTC (rev 
32575)
@@ -34,6 +34,19 @@
 
 
 /**
+ * Started origins.
+ * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *origins;
+
+/**
+ * Joined members.
+ * group_key_hash -> struct GNUNET_MULTICAST_Member
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *members;
+
+
+/**
  * Handle for a request to send a message to all multicast group members
  * (from the origin).
  */
@@ -49,13 +62,20 @@
 };
 
 
+struct GNUNET_MULTICAST_Group
+{
+  uint8_t is_origin;
+};
+
 /**
  * Handle for the origin of a multicast group.
  */
 struct GNUNET_MULTICAST_Origin
 {
+  struct GNUNET_MULTICAST_Group grp;
+
+  struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
-  struct GNUNET_MULTICAST_OriginMessageHandle msg_handle;
 
   GNUNET_MULTICAST_JoinCallback join_cb;
   GNUNET_MULTICAST_MembershipTestCallback mem_test_cb;
@@ -66,6 +86,9 @@
   void *cls;
 
   uint64_t next_fragment_id;
+
+  struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
+  struct GNUNET_HashCode pub_key_hash;
 };
 
 
@@ -74,123 +97,176 @@
  */
 struct GNUNET_MULTICAST_MemberRequestHandle
 {
+  GNUNET_MULTICAST_MemberTransmitNotify notify;
+  void *notify_cls;
+  struct GNUNET_MULTICAST_Member *member;
+
+  uint64_t request_id;
+  uint64_t fragment_offset;
 };
 
 
 /**
- * Opaque handle for a multicast group member.
+ * Handle for a multicast group member.
  */
 struct GNUNET_MULTICAST_Member
 {
+  struct GNUNET_MULTICAST_Group grp;
+
+  struct GNUNET_MULTICAST_MemberRequestHandle req_handle;
+
+  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
+  struct GNUNET_CRYPTO_EddsaPrivateKey member_key;
+  struct GNUNET_PeerIdentity origin;
+  struct GNUNET_PeerIdentity relays;
+  uint32_t relay_count;
+  struct GNUNET_MessageHeader *join_request;
+  GNUNET_MULTICAST_JoinCallback join_cb;
+  GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
+  GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
+  GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
+  GNUNET_MULTICAST_MessageCallback message_cb;
+  void *cls;
+
+  uint64_t next_fragment_id;
+  struct GNUNET_HashCode group_key_hash;
 };
 
 
-GNUNET_NETWORK_STRUCT_BEGIN
+/**
+ * Handle that identifies a join request.
+ *
+ * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
+ * corresponding calls to #GNUNET_MULTICAST_join_decision().
+ */
+struct GNUNET_MULTICAST_JoinHandle
+{
+};
 
+
 /**
- * Header of a request from a member to the origin.
+ * Handle to pass back for the answer of a membership test.
  */
-struct GNUNET_MULTICAST_RequestHeader
+struct GNUNET_MULTICAST_MembershipTestHandle
 {
-  /**
-   * Header for all requests from a member to the origin.
-   */
-  struct GNUNET_MessageHeader header;
+};
 
-  /**
-   * Public key of the sending member.
-   */
-  struct GNUNET_CRYPTO_EddsaPublicKey member_key;
 
-  /**
-   * ECC signature of the request fragment.
-   *
-   * Signature must match the public key of the multicast group.
-   */
-  struct GNUNET_CRYPTO_EddsaSignature signature;
+/**
+ * Opaque handle to a replay request from the multicast service.
+ */
+struct GNUNET_MULTICAST_ReplayHandle
+{
+};
 
-  /**
-   * Purpose for the signature and size of the signed data.
-   */
-  struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
 
-  /**
-   * Number of the request fragment, monotonically increasing.
-   */
-  uint64_t fragment_id GNUNET_PACKED;
+/**
+ * Handle for a replay request.
+ */
+struct GNUNET_MULTICAST_MemberReplayHandle
+{
+};
 
-  /**
-   * Byte offset of this @e fragment of the @e request.
-   */
-  uint64_t fragment_offset GNUNET_PACKED;
 
-  /**
-   * Number of the request this fragment belongs to.
-   *
-   * Set in GNUNET_MULTICAST_origin_to_all().
-   */
-  uint64_t request_id GNUNET_PACKED;
+/**
+ * Iterator callback for calling message callbacks for all groups.
+ */
+static int
+message_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
+                   void *group)
+{
+  const struct GNUNET_MessageHeader *msg = cls;
+  struct GNUNET_MULTICAST_Group *grp = group;
 
-  /**
-   * Flags for this request.
-   */
-  enum GNUNET_MULTICAST_MessageFlags flags GNUNET_PACKED;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Calling message callback for a message of type %u and size 
%u.\n",
+              ntohs (msg->type), ntohs (msg->size));
 
-  /* Followed by request body. */
-};
+  if (GNUNET_YES == grp->is_origin)
+  {
+    struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) 
grp;
+    orig->message_cb (orig->cls, msg);
+  }
+  else
+  {
+    struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) 
grp;
+    mem->message_cb (mem->cls, msg);
+  }
 
+  return GNUNET_YES;
+}
+
+
 /**
- * Header of a join request sent to the origin or another member.
+ * Handle a multicast message from the service.
+ *
+ * Call message callbacks of all origins and members of the destination group.
+ *
+ * @param grp Destination group of the message.
+ * @param msg The message.
  */
-struct GNUNET_MULTICAST_JoinRequest
+static void
+handle_multicast_message (struct GNUNET_MULTICAST_Group *grp,
+                          const struct GNUNET_MULTICAST_MessageHeader *msg)
 {
-  /**
-   * Header for the join request.
-   */
-  struct GNUNET_MessageHeader header;
+  struct GNUNET_HashCode *hash;
 
-  /**
-   * ECC signature of the rest of the fields of the join request.
-   *
-   * Signature must match the public key of the joining member.
-   */
-  struct GNUNET_CRYPTO_EddsaSignature signature;
+  if (GNUNET_YES == grp->is_origin)
+  {
+    struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) 
grp;
+    hash = &orig->pub_key_hash;
+  }
+  else
+  {
+    struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) 
grp;
+    hash = &mem->group_key_hash;
+  }
 
-  /**
-   * Purpose for the signature and size of the signed data.
-   */
-  struct GNUNET_CRYPTO_EccSignaturePurpose purpose;
+  if (origins != NULL)
+    GNUNET_CONTAINER_multihashmap_get_multiple (origins, hash, 
message_callback,
+                                                (void *) msg);
+  if (members != NULL)
+    GNUNET_CONTAINER_multihashmap_get_multiple (members, hash, 
message_callback,
+                                                (void *) msg);
+}
 
-  /**
-   * Public key of the target group.
-   */
-  struct GNUNET_CRYPTO_EddsaPublicKey group_key;
 
-  /**
-   * Public key of the joining member.
-   */
-  struct GNUNET_CRYPTO_EddsaPublicKey member_key;
+/**
+ * Iterator callback for calling request callbacks of origins.
+ */
+static int
+request_callback (void *cls, const struct GNUNET_HashCode *chan_key_hash,
+                  void *origin)
+{
+  const struct GNUNET_MULTICAST_RequestHeader *req = cls;
+  struct GNUNET_MULTICAST_Origin *orig = origin;
 
-  /**
-   * Peer identity of the joining member.
-   */
-  struct GNUNET_PeerIdentity member_peer;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Calling request callback for a request of type %u and size 
%u.\n",
+              ntohs (req->header.type), ntohs (req->header.size));
 
-  /* Followed by request body. */
-};
+  orig->request_cb (orig->cls, &req->member_key,
+                    (const struct GNUNET_MessageHeader *) req, 0);
+  return GNUNET_YES;
+}
 
-GNUNET_NETWORK_STRUCT_END
 
-
 /**
- * Handle that identifies a join request.
+ * Handle a multicast request from the service.
  *
- * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
- * corresponding calls to #GNUNET_MULTICAST_join_decision().
+ * Call request callbacks of all origins of the destination group.
+ *
+ * @param grp Destination group of the message.
+ * @param msg The message.
  */
-struct GNUNET_MULTICAST_JoinHandle
+static void
+handle_multicast_request (const struct GNUNET_HashCode *group_key_hash,
+                          const struct GNUNET_MULTICAST_RequestHeader *req)
 {
-};
+  if (NULL != origins)
+    GNUNET_CONTAINER_multihashmap_get_multiple (origins, group_key_hash,
+                                                request_callback, (void *) 
req);
+}
 
 
 /**
@@ -227,14 +303,6 @@
 
 
 /**
- * Handle to pass back for the answer of a membership test.
- */
-struct GNUNET_MULTICAST_MembershipTestHandle
-{
-};
-
-
-/**
  * Call informing multicast about the decision taken for a membership test.
  *
  * @param mth Handle that was given for the query.
@@ -249,14 +317,6 @@
 
 
 /**
- * Opaque handle to a replay request from the multicast service.
- */
-struct GNUNET_MULTICAST_ReplayHandle
-{
-};
-
-
-/**
  * Replay a message fragment for the multicast group.
  *
  * @param rh Replay handle identifying which replay operation was requested.
@@ -340,6 +400,7 @@
                                void *cls)
 {
   struct GNUNET_MULTICAST_Origin *orig = GNUNET_malloc (sizeof (*orig));
+  orig->grp.is_origin = GNUNET_YES;
   orig->priv_key = *priv_key;
   orig->next_fragment_id = next_fragment_id;
   orig->join_cb = join_cb;
@@ -349,11 +410,38 @@
   orig->request_cb = request_cb;
   orig->message_cb = message_cb;
   orig->cls = cls;
+
+  GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &orig->pub_key);
+  GNUNET_CRYPTO_hash (&orig->pub_key, sizeof (orig->pub_key),
+                      &orig->pub_key_hash);
+
+  if (NULL == origins)
+    origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+
+  GNUNET_CONTAINER_multihashmap_put (origins, &orig->pub_key_hash, orig,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  /* FIXME: send ORIGIN_START to service */
+
   return orig;
 }
 
 
-/* FIXME: for now just send back to the client what it sent. */
+/**
+ * Stop a multicast group.
+ *
+ * @param origin Multicast group to stop.
+ */
+void
+GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
+{
+  GNUNET_CONTAINER_multihashmap_remove (origins, &orig->pub_key_hash, orig);
+  GNUNET_free (orig);
+}
+
+
+/* FIXME: for now just call clients' callbacks
+ *        without sending anything to multicast. */
 static void
 schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext 
*tc)
 {
@@ -371,7 +459,7 @@
       || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MasterTransmitNotify() returned error or invalid message size.\n");
+         "OriginTransmitNotify() returned error or invalid message size.\n");
     /* FIXME: handle error */
     return;
   }
@@ -401,19 +489,18 @@
     return;
   }
 
-  /* FIXME: send msg to the service and only then call message_cb with the
-   *        returned signed message.
-   * FIXME: Also send to local members in this group.
+  /* FIXME: send msg to the service and only then call handle_multicast_message
+   *        with the returned signed message.
    */
-  orig->message_cb (orig->cls, (const struct GNUNET_MessageHeader *) msg);
+  handle_multicast_message (&orig->grp, msg);
 
   if (GNUNET_NO == ret)
     GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                   (GNUNET_TIME_UNIT_SECONDS, 1),
                                   schedule_origin_to_all, orig);
-
 }
 
+
 /**
  * Send a message to the multicast group.
  *
@@ -439,6 +526,7 @@
   mh->notify = notify;
   mh->notify_cls = notify_cls;
 
+  /* FIXME: remove delay, it's there only for testing */
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
                                 (GNUNET_TIME_UNIT_SECONDS, 1),
                                 schedule_origin_to_all, origin);
@@ -470,18 +558,6 @@
 
 
 /**
- * Stop a multicast group.
- *
- * @param origin Multicast group to stop.
- */
-void
-GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *origin)
-{
-  GNUNET_free (origin);
-}
-
-
-/**
  * Join a multicast group.
  *
  * The entity joining is always the local peer.  Further information about the
@@ -531,24 +607,61 @@
                               const struct GNUNET_PeerIdentity *relays,
                               const struct GNUNET_MessageHeader *join_request,
                               GNUNET_MULTICAST_JoinCallback join_cb,
-                              GNUNET_MULTICAST_MembershipTestCallback 
mem_test_cb,
+                              GNUNET_MULTICAST_MembershipTestCallback 
member_test_cb,
                               GNUNET_MULTICAST_ReplayFragmentCallback 
replay_frag_cb,
                               GNUNET_MULTICAST_ReplayMessageCallback 
replay_msg_cb,
                               GNUNET_MULTICAST_MessageCallback message_cb,
                               void *cls)
 {
   struct GNUNET_MULTICAST_Member *mem = GNUNET_malloc (sizeof (*mem));
+  mem->group_key = *group_key;
+  mem->member_key = *member_key;
+  mem->origin = *origin;
+  mem->relay_count = relay_count;
+  mem->relays = *relays;
+  mem->join_cb = join_cb;
+  mem->member_test_cb = member_test_cb;
+  mem->replay_frag_cb = replay_frag_cb;
+  mem->message_cb = message_cb;
+  mem->cls = cls;
 
+  if (NULL != join_request)
+  {
+    uint16_t size = ntohs (join_request->size);
+    mem->join_request = GNUNET_malloc (size);
+    memcpy (mem->join_request, join_request, size);
+  }
+
+  GNUNET_CRYPTO_hash (&mem->group_key, sizeof (mem->group_key), 
&mem->group_key_hash);
+
+  if (NULL == members)
+    members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+
+  GNUNET_CONTAINER_multihashmap_put (members, &mem->group_key_hash, mem,
+                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+  /* FIXME: send MEMBER_JOIN to service */
+
   return mem;
 }
 
 
 /**
- * Handle for a replay request.
+ * Part a multicast group.
+ *
+ * Disconnects from all group members and invalidates the @a member handle.
+ *
+ * An application-dependent part message can be transmitted beforehand using
+ * #GNUNET_MULTICAST_member_to_origin())
+ *
+ * @param member Membership handle.
  */
-struct GNUNET_MULTICAST_MemberReplayHandle
+void
+GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
 {
-};
+  GNUNET_CONTAINER_multihashmap_remove (members, &mem->group_key_hash, mem);
+  GNUNET_free (mem);
+}
 
 
 /**
@@ -612,20 +725,62 @@
 }
 
 
-/**
- * Part a multicast group.
- *
- * Disconnects from all group members and invalidates the @a member handle.
- *
- * An application-dependent part message can be transmitted beforehand using
- * #GNUNET_MULTICAST_member_to_origin())
- *
- * @param member Membership handle.
- */
-void
-GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *member)
+/* FIXME: for now just send back to the client what it sent. */
+static void
+schedule_member_to_origin (void *cls, const struct 
GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GNUNET_free (member);
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_member_to_origin()\n");
+  struct GNUNET_MULTICAST_Member *mem = cls;
+  struct GNUNET_MULTICAST_MemberRequestHandle *rh = &mem->req_handle;
+
+  size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
+  char buf[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+  struct GNUNET_MULTICAST_RequestHeader *req
+    = (struct GNUNET_MULTICAST_RequestHeader *) buf;
+  int ret = rh->notify (rh->notify_cls, &buf_size, &req[1]);
+
+  if (! (GNUNET_YES == ret || GNUNET_NO == ret)
+      || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+  {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "MemberTransmitNotify() returned error or invalid message size.\n");
+    /* FIXME: handle error */
+    return;
+  }
+
+  if (GNUNET_NO == ret && 0 == buf_size)
+    return; /* Transmission paused. */
+
+  req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
+  req->header.size = htons (sizeof (*req) + buf_size);
+  req->request_id = GNUNET_htonll (rh->request_id);
+
+  /* FIXME: add fragment ID and signature in the service instead of here */
+  req->fragment_id = GNUNET_ntohll (mem->next_fragment_id++);
+  req->fragment_offset = GNUNET_ntohll (rh->fragment_offset);
+  rh->fragment_offset += sizeof (*req) + buf_size;
+  req->purpose.size = htonl (sizeof (*req) + buf_size
+                             - sizeof (req->header)
+                             - sizeof (req->member_key)
+                             - sizeof (req->signature));
+  req->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_MULTICAST_MESSAGE);
+
+  if (GNUNET_OK != GNUNET_CRYPTO_eddsa_sign (&mem->member_key, &req->purpose,
+                                           &req->signature))
+  {
+    /* FIXME: handle error */
+    return;
+  }
+
+  /* FIXME: send req to the service and only then call handle_multicast_request
+   *        with the returned request.
+   */
+  handle_multicast_request (&mem->group_key_hash, req);
+
+  if (GNUNET_NO == ret)
+    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+                                  (GNUNET_TIME_UNIT_SECONDS, 1),
+                                  schedule_member_to_origin, mem);
 }
 
 
@@ -633,18 +788,28 @@
  * Send a message to the origin of the multicast group.
  *
  * @param member Membership handle.
- * @param message_id Application layer ID for the message.  Opaque to 
multicast.
+ * @param request_id Application layer ID for the request.  Opaque to 
multicast.
  * @param notify Callback to call to get the message.
  * @param notify_cls Closure for @a notify.
  * @return Handle to cancel request, NULL on error (i.e. request already 
pending).
  */
 struct GNUNET_MULTICAST_MemberRequestHandle *
 GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *member,
-                                   uint64_t message_id,
+                                   uint64_t request_id,
                                    GNUNET_MULTICAST_MemberTransmitNotify 
notify,
                                    void *notify_cls)
 {
-  return NULL;
+  struct GNUNET_MULTICAST_MemberRequestHandle *rh = &member->req_handle;
+  rh->member = member;
+  rh->request_id = request_id;
+  rh->notify = notify;
+  rh->notify_cls = notify_cls;
+
+  /* FIXME: remove delay, it's there only for testing */
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+                                (GNUNET_TIME_UNIT_SECONDS, 1),
+                                schedule_member_to_origin, member);
+  return &member->req_handle;
 }
 
 

Modified: gnunet/src/psyc/Makefile.am
===================================================================
--- gnunet/src/psyc/Makefile.am 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/Makefile.am 2014-03-06 23:46:45 UTC (rev 32575)
@@ -21,7 +21,7 @@
 
 libgnunetpsyc_la_SOURCES = \
   psyc_api.c \
-  psyc.h
+  psyc_common.c
 libgnunetpsyc_la_LIBADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/env/libgnunetenv.la \
@@ -39,7 +39,8 @@
  gnunet-service-psyc
 
 gnunet_service_psyc_SOURCES = \
- gnunet-service-psyc.c
+ gnunet-service-psyc.c \
+ psyc_common.c
 gnunet_service_psyc_LDADD = \
   $(top_builddir)/src/statistics/libgnunetstatistics.la \
   $(top_builddir)/src/util/libgnunetutil.la \
@@ -51,6 +52,7 @@
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/multicast/libgnunetmulticast.la \
   $(top_builddir)/src/psycstore/libgnunetpsycstore.la
+gnunet_service_psyc_CFLAGS = $(AM_CFLAGS)
 
 
 if HAVE_TESTING

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-03-06 23:46:42 UTC (rev 
32574)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-03-06 23:46:45 UTC (rev 
32575)
@@ -56,6 +56,7 @@
 static struct GNUNET_PSYCSTORE_Handle *store;
 
 /**
+ * All connected masters and slaves.
  * Channel's pub_key_hash -> struct Channel
  */
 static struct GNUNET_CONTAINER_MultiHashMap *clients;
@@ -105,6 +106,15 @@
 
   uint8_t in_transmit;
   uint8_t is_master;
+
+  /**
+   * Ready to receive messages from client.
+   */
+  uint8_t ready;
+
+  /**
+   * Client disconnected.
+   */
   uint8_t disconnected;
 };
 
@@ -116,7 +126,6 @@
   struct Channel channel;
   struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
   struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
-  struct GNUNET_HashCode pub_key_hash;
 
   struct GNUNET_MULTICAST_Origin *origin;
   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -144,6 +153,8 @@
    * @see enum GNUNET_PSYC_Policy
    */
   uint32_t policy;
+
+  struct GNUNET_HashCode pub_key_hash;
 };
 
 
@@ -155,24 +166,26 @@
   struct Channel channel;
   struct GNUNET_CRYPTO_EddsaPrivateKey slave_key;
   struct GNUNET_CRYPTO_EddsaPublicKey chan_key;
-  struct GNUNET_HashCode chan_key_hash;
 
   struct GNUNET_MULTICAST_Member *member;
   struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
 
   struct GNUNET_PeerIdentity origin;
+
+  uint32_t relay_count;
   struct GNUNET_PeerIdentity *relays;
+
   struct GNUNET_MessageHeader *join_req;
 
   uint64_t max_message_id;
   uint64_t max_request_id;
 
-  uint32_t relay_count;
+  struct GNUNET_HashCode chan_key_hash;
 };
 
 
 static inline void
-transmit_message (struct Channel *ch);
+transmit_message (struct Channel *ch, uint8_t inc_msg_id);
 
 
 /**
@@ -235,14 +248,14 @@
   if (NULL == client)
     return;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
-
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client disconnected\n", ch);
+
   if (NULL == ch)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "User context is NULL in client_disconnect()\n");
+                "%p User context is NULL in client_disconnect()\n", ch);
     GNUNET_break (0);
     return;
   }
@@ -252,7 +265,7 @@
   /* Send pending messages to multicast before cleanup. */
   if (NULL != ch->tmit_head)
   {
-    transmit_message (ch);
+    transmit_message (ch, GNUNET_NO);
   }
   else
   {
@@ -311,54 +324,23 @@
 
 
 /**
- * Iterator callback for sending a message to a client.
- *
- * @see message_cb()
- */
-static int
-message_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
-                   void *chan)
-{
-  const struct GNUNET_MessageHeader *msg = cls;
-  struct Channel *ch = chan;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending message of type %u and size %u to client 0x%zx.\n",
-              ntohs (msg->type), ntohs (msg->size), ch->client);
-
-  GNUNET_SERVER_notification_context_add (nc, ch->client);
-  GNUNET_SERVER_notification_context_unicast (nc, ch->client, msg, GNUNET_NO);
-
-  return GNUNET_YES;
-}
-
-
-/**
  * Incoming message fragment from multicast.
  *
- * Store it using PSYCstore and send it to all clients of the channel.
+ * Store it using PSYCstore and send it to the client of the channel.
  */
 static void
-message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+message_cb (struct Channel *ch,
+            const struct GNUNET_CRYPTO_EddsaPublicKey *chan_key,
+            const struct GNUNET_HashCode *chan_key_hash,
+            const struct GNUNET_MessageHeader *msg)
 {
   uint16_t type = ntohs (msg->type);
   uint16_t size = ntohs (msg->size);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message of type %u and size %u from multicast.\n",
-              type, size);
+              "%p Received message of type %u and size %u from multicast.\n",
+              ch, type, size);
 
-  struct Channel *ch = cls;
-  struct Master *mst = cls;
-  struct Slave *slv = cls;
-
-  /* const struct GNUNET_MULTICAST_MessageHeader *mmsg
-     = (const struct GNUNET_MULTICAST_MessageHeader *) msg; */
-  struct GNUNET_CRYPTO_EddsaPublicKey *chan_key
-    = ch->is_master ? &mst->pub_key : &slv->chan_key;
-  struct GNUNET_HashCode *chan_key_hash
-    = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
-
   switch (type)
   {
   case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
@@ -378,29 +360,19 @@
 
     const struct GNUNET_MULTICAST_MessageHeader *mmsg
       = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
-    struct GNUNET_PSYC_MessageHeader *pmsg;
 
-    uint16_t size = ntohs (msg->size);
-    uint16_t psize = 0;
-    uint16_t pos = 0;
-
-    for (pos = 0; sizeof (*mmsg) + pos < size; pos += psize)
+    if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
+                                                       (const char *) 
&mmsg[1]))
     {
-      const struct GNUNET_MessageHeader *pmsg
-        = (const struct GNUNET_MessageHeader *) ((char *) &mmsg[1] + pos);
-      psize = ntohs (pmsg->size);
-      if (psize < sizeof (*pmsg) || sizeof (*mmsg) + pos + psize > size)
-      {
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Received invalid message part of type %u and size %u "
-                    "from multicast. Not sending to clients.\n",
-                    ntohs (pmsg->type), psize);
-        GNUNET_break_op (0);
-        return;
-      }
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Received message with invalid parts from multicast. "
+                  "Dropping message.\n", ch);
+      GNUNET_break_op (0);
+      break;
     }
 
-    psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+    struct GNUNET_PSYC_MessageHeader *pmsg;
+    uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
     pmsg = GNUNET_malloc (psize);
     pmsg->header.size = htons (psize);
     pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -408,39 +380,116 @@
 
     memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
 
-    GNUNET_CONTAINER_multihashmap_get_multiple (clients, chan_key_hash,
-                                                message_to_client,
-                                                (void *) pmsg);
+    GNUNET_SERVER_notification_context_add (nc, ch->client);
+    GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+                                                (const struct 
GNUNET_MessageHeader *) pmsg,
+                                                GNUNET_NO);
     GNUNET_free (pmsg);
     break;
   }
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Discarding unknown message of type %u and size %u.\n",
-                type, size);
+                "%p Dropping unknown message of type %u and size %u.\n",
+                ch, type, size);
   }
 }
 
 
 /**
- * Send a request received from multicast to a client.
+ * Incoming message fragment from multicast for a master.
  */
-static int
-request_to_client (void *cls, const struct GNUNET_HashCode *chan_key_hash,
-                   void *chan)
+static void
+master_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
 {
-  /* TODO */
+  struct Master *mst = cls;
+  GNUNET_assert (NULL != mst);
 
-  return GNUNET_YES;
+  struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &mst->pub_key;
+  struct GNUNET_HashCode *chan_key_hash = &mst->pub_key_hash;
+
+  message_cb (&mst->channel, chan_key, chan_key_hash, msg);
 }
 
 
+/**
+ * Incoming message fragment from multicast for a slave.
+ */
 static void
+slave_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct Slave *slv = cls;
+  GNUNET_assert (NULL != slv);
+
+  struct GNUNET_CRYPTO_EddsaPublicKey *chan_key = &slv->chan_key;
+  struct GNUNET_HashCode *chan_key_hash = &slv->chan_key_hash;
+
+  message_cb (&slv->channel, chan_key, chan_key_hash, msg);
+}
+
+
+/**
+ * Incoming request fragment from multicast for a master.
+ *
+ * @param cls          Master.
+ * @param member_key   Sending member's public key.
+ * @param msg          The message.
+ * @param flags                Request flags.
+ */
+static void
 request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
-            const struct GNUNET_MessageHeader *req,
+            const struct GNUNET_MessageHeader *msg,
             enum GNUNET_MULTICAST_MessageFlags flags)
 {
+  struct Master *mst = cls;
+  struct Channel *ch = &mst->channel;
 
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received request of type %u and size %u from multicast.\n",
+              ch, type, size);
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
+  {
+    const struct GNUNET_MULTICAST_RequestHeader *req
+      = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
+
+    if (GNUNET_YES != GNUNET_PSYC_check_message_parts (size - sizeof (*req),
+                                                       (const char *) &req[1]))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Dropping message with invalid parts "
+                  "received from multicast.\n", ch);
+      GNUNET_break_op (0);
+      break;
+    }
+
+    struct GNUNET_PSYC_MessageHeader *pmsg;
+    uint16_t psize = sizeof (*pmsg) + size - sizeof (*req);
+    pmsg = GNUNET_malloc (psize);
+    pmsg->header.size = htons (psize);
+    pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+    pmsg->message_id = req->request_id;
+    pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
+
+    memcpy (&pmsg[1], &req[1], size - sizeof (*req));
+
+    GNUNET_SERVER_notification_context_add (nc, ch->client);
+    GNUNET_SERVER_notification_context_unicast (nc, ch->client,
+                                                (const struct 
GNUNET_MessageHeader *) pmsg,
+                                                GNUNET_NO);
+    GNUNET_free (pmsg);
+    break;
+  }
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Dropping unknown request of type %u and size %u.\n",
+                ch, type, size);
+    GNUNET_break_op (0);
+  }
 }
 
 
@@ -470,7 +519,8 @@
                                        max_fragment_id + 1,
                                        join_cb, membership_test_cb,
                                        replay_fragment_cb, replay_message_cb,
-                                       request_cb, message_cb, ch);
+                                       request_cb, master_message_cb, ch);
+    ch->ready = GNUNET_YES;
   }
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
@@ -505,7 +555,8 @@
                                       slv->join_req, join_cb,
                                       membership_test_cb,
                                       replay_fragment_cb, replay_message_cb,
-                                      message_cb, ch);
+                                      slave_message_cb, ch);
+    ch->ready = GNUNET_YES;
   }
 
   GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -529,9 +580,11 @@
   mst->channel.is_master = GNUNET_YES;
   mst->policy = ntohl (req->policy);
   mst->priv_key = req->channel_key;
-  GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key,
-                                                  &mst->pub_key);
+  GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &mst->pub_key);
   GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), 
&mst->pub_key_hash);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Master connected to channel %s.\n",
+              mst, GNUNET_h2s (&mst->pub_key_hash));
 
   GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
                                  master_counters_cb, mst);
@@ -561,14 +614,20 @@
                       &slv->chan_key_hash);
   slv->origin = req->origin;
   slv->relay_count = ntohl (req->relay_count);
+  if (0 < slv->relay_count)
+  {
+    const struct GNUNET_PeerIdentity *relays
+      = (const struct GNUNET_PeerIdentity *) &req[1];
+    slv->relays
+      = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+    uint32_t i;
+    for (i = 0; i < slv->relay_count; i++)
+      memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+  }
 
-  const struct GNUNET_PeerIdentity *relays
-    = (const struct GNUNET_PeerIdentity *) &req[1];
-  slv->relays
-    = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
-  uint32_t i;
-  for (i = 0; i < slv->relay_count; i++)
-    memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Slave connected to channel %s.\n",
+              slv, GNUNET_h2s (&slv->chan_key_hash));
 
   GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
                                  slave_counters_cb, slv);
@@ -609,13 +668,14 @@
 
   if (NULL == tmit_msg || *data_size < tmit_msg->size)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to 
send.\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p transmit_notify: nothing to send.\n", ch);
     *data_size = 0;
     return GNUNET_NO;
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "transmit_notify: sending %u bytes.\n", tmit_msg->size);
+              "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
 
   *data_size = tmit_msg->size;
   memcpy (data, tmit_msg->buf, *data_size);
@@ -630,7 +690,7 @@
   {
     if (NULL != ch->tmit_head)
     {
-      transmit_message (ch);
+      transmit_message (ch, GNUNET_NO);
     }
     else if (ch->disconnected)
     {
@@ -644,19 +704,55 @@
 
 
 /**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+master_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+  int ret = transmit_notify (cls, data_size, data);
+
+  if (GNUNET_YES == ret)
+  {
+    struct Master *mst = cls;
+    mst->tmit_handle = NULL;
+  }
+  return ret;
+}
+
+
+/**
+ * Callback for the transmit functions of multicast.
+ */
+static int
+slave_transmit_notify (void *cls, size_t *data_size, void *data)
+{
+  int ret = transmit_notify (cls, data_size, data);
+
+  if (GNUNET_YES == ret)
+  {
+    struct Slave *slv = cls;
+    slv->tmit_handle = NULL;
+  }
+  return ret;
+}
+
+
+/**
  * Transmit a message from a channel master to the multicast group.
  */
 static void
-master_transmit_message (struct Master *mst)
+master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
   mst->channel.tmit_task = 0;
   if (NULL == mst->tmit_handle)
   {
+    if (GNUNET_YES == inc_msg_id)
+      mst->max_message_id++;
     mst->tmit_handle
-      = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
+      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
                                         mst->max_group_generation,
-                                        transmit_notify, mst);
+                                        master_transmit_notify, mst);
   }
   else
   {
@@ -669,14 +765,16 @@
  * Transmit a message from a channel slave to the multicast group.
  */
 static void
-slave_transmit_message (struct Slave *slv)
+slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
 {
   slv->channel.tmit_task = 0;
   if (NULL == slv->tmit_handle)
   {
+    if (GNUNET_YES == inc_msg_id)
+      slv->max_message_id++;
     slv->tmit_handle
-      = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
-                                          transmit_notify, slv);
+      = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+                                           slave_transmit_notify, slv);
   }
   else
   {
@@ -686,11 +784,11 @@
 
 
 static inline void
-transmit_message (struct Channel *ch)
+transmit_message (struct Channel *ch, uint8_t inc_msg_id)
 {
   ch->is_master
-    ? master_transmit_message ((struct Master *) ch)
-    : slave_transmit_message ((struct Slave *) ch);
+    ? master_transmit_message ((struct Master *) ch, inc_msg_id)
+    : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
 }
 
 
@@ -708,10 +806,9 @@
   tmit_msg->size = sizeof (*msg);
   tmit_msg->state = ch->tmit_state;
   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-  transmit_message (ch);
+  transmit_message (ch, GNUNET_NO);
 
   /* FIXME: cleanup */
-  GNUNET_SERVER_client_disconnect (ch->client);
 }
 
 
@@ -720,40 +817,60 @@
  */
 static void
 handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
-                         const struct GNUNET_MessageHeader *msg)
+                     const struct GNUNET_MessageHeader *msg)
 {
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
+  if (GNUNET_YES != ch->ready)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p Ignoring message from client, channel is not ready yet.\n",
+                ch);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
+  }
+
+  uint8_t inc_msg_id = GNUNET_NO;
   uint16_t size = ntohs (msg->size);
-  uint16_t psize = 0, pos = 0;
+  uint16_t psize = 0, ptype = 0, pos = 0;
 
   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Message payload too large\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
     GNUNET_break (0);
     transmit_error (ch);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received message from client.\n", ch);
+  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
+
   for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
   {
     const struct GNUNET_MessageHeader *pmsg
       = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
     psize = ntohs (pmsg->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part of type %u and size %u "
-                "from client.\n", ntohs (pmsg->type), psize);
+    ptype = ntohs (pmsg->type);
     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Received invalid message part of type %u and size %u "
-                  "from client.\n", ntohs (pmsg->type), psize);
+                  "%p Received invalid message part of type %u and size %u "
+                  "from client.\n", ch, ptype, psize);
       GNUNET_break (0);
       transmit_error (ch);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       return;
     }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "%p Received message part from client.\n", ch);
+    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
+    if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
+      inc_msg_id = GNUNET_YES;
   }
 
   size -= sizeof (*msg);
@@ -763,7 +880,7 @@
   tmit_msg->size = size;
   tmit_msg->state = ch->tmit_state;
   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-  transmit_message (ch);
+  transmit_message (ch, inc_msg_id);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };

Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h      2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/psyc.h      2014-03-06 23:46:45 UTC (rev 32575)
@@ -27,9 +27,18 @@
 #ifndef PSYC_H
 #define PSYC_H
 
-#include "gnunet_common.h"
+#include "platform.h"
+#include "gnunet_psyc_service.h"
 
 
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data);
+
+void
+GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
+                         const struct GNUNET_MessageHeader *msg);
+
+
 enum MessageState
 {
   MSG_STATE_START = 0,

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/psyc_api.c  2014-03-06 23:46:45 UTC (rev 32575)
@@ -48,12 +48,30 @@
   struct GNUNET_MessageHeader *msg;
 };
 
+
 /**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_ChannelTransmitHandle
+{
+  struct GNUNET_PSYC_Channel *ch;
+  GNUNET_PSYC_TransmitNotifyModifier notify_mod;
+  GNUNET_PSYC_TransmitNotifyData notify_data;
+  void *notify_cls;
+  enum MessageState state;
+};
+
+/**
  * Handle to access PSYC channel operations for both the master and slaves.
  */
 struct GNUNET_PSYC_Channel
 {
   /**
+   * Transmission handle;
+   */
+  struct GNUNET_PSYC_ChannelTransmitHandle tmit;
+
+  /**
    * Configuration to use.
    */
   const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -124,6 +142,11 @@
   uint64_t recv_message_id;
 
   /**
+   * Public key of the slave from which a message is being received.
+   */
+  struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key;
+
+  /**
    * State of the currently being received message from the PSYC service.
    */
   enum MessageState recv_state;
@@ -171,27 +194,12 @@
 
 
 /**
- * Handle for a pending PSYC transmission operation.
- */
-struct GNUNET_PSYC_MasterTransmitHandle
-{
-  struct GNUNET_PSYC_Master *master;
-  GNUNET_PSYC_MasterTransmitNotifyModifier notify_mod;
-  GNUNET_PSYC_MasterTransmitNotify notify_data;
-  void *notify_cls;
-  enum MessageState state;
-};
-
-
-/**
  * Handle for the master of a PSYC channel.
  */
 struct GNUNET_PSYC_Master
 {
   struct GNUNET_PSYC_Channel ch;
 
-  struct GNUNET_PSYC_MasterTransmitHandle *tmit;
-
   GNUNET_PSYC_MasterStartCallback start_cb;
 
   uint64_t max_message_id;
@@ -204,6 +212,10 @@
 struct GNUNET_PSYC_Slave
 {
   struct GNUNET_PSYC_Channel ch;
+
+  GNUNET_PSYC_SlaveJoinCallback join_cb;
+
+  uint64_t max_message_id;
 };
 
 
@@ -251,7 +263,7 @@
 
 
 static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst);
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch);
 
 
 /**
@@ -302,7 +314,8 @@
   ch->recv_state = MSG_STATE_START;
   ch->recv_flags = 0;
   ch->recv_message_id = 0;
-  ch->recv_mod_value_size =0;
+  //FIXME: ch->recv_slave_key = { 0 };
+  ch->recv_mod_value_size = 0;
   ch->recv_mod_value_size_expected = 0;
 }
 
@@ -379,8 +392,9 @@
   }
  
   if (NULL != op
-      && (end || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
-                  < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
+      && (GNUNET_YES == end
+          || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+              < op->msg->size + sizeof (struct GNUNET_MessageHeader))))
   {
     /* End of message or buffer is full, add it to transmission queue. */
     op->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
@@ -390,6 +404,9 @@
     ch->tmit_ack_pending++;
   }
 
+  if (GNUNET_YES == end)
+    ch->in_transmit = GNUNET_NO;
+
   transmit_next (ch);
 }
 
@@ -400,15 +417,14 @@
  * @param mst Master handle.
  */
 static void
-master_transmit_mod (struct GNUNET_PSYC_Master *mst)
+channel_transmit_mod (struct GNUNET_PSYC_Channel *ch)
 {
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
   uint16_t max_data_size, data_size;
   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
   int notify_ret;
 
-  switch (mst->tmit->state)
+  switch (ch->tmit.state)
   {
   case MSG_STATE_MODIFIER:
   {
@@ -417,12 +433,11 @@
     max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
     msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
-    notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
-                                        &data_size, &mod[1], &mod->oper);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
+                                      &data_size, &mod[1], &mod->oper);
     mod->name_size = strnlen ((char *) &mod[1], data_size);
     if (mod->name_size < data_size)
     {
-      mod->oper = htons (mod->oper);
       mod->value_size = htons (data_size - 1 - mod->name_size);
       mod->name_size = htons (mod->name_size);
     }
@@ -438,8 +453,8 @@
     max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);    
     msg->size = sizeof (struct GNUNET_MessageHeader);
-    notify_ret = mst->tmit->notify_mod (mst->tmit->notify_cls,
-                                        &data_size, &msg[1], NULL);
+    notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls,
+                                      &data_size, &msg[1], NULL);
     break;
   }
   default:
@@ -454,27 +469,28 @@
       ch->tmit_paused = GNUNET_YES;
       return;
     }
-    mst->tmit->state = MSG_STATE_MOD_CONT;
+    ch->tmit.state = MSG_STATE_MOD_CONT;
     break;
 
   case GNUNET_YES:
     if (0 == data_size)
     {
       /* End of modifiers. */
-      mst->tmit->state = MSG_STATE_DATA;
+      ch->tmit.state = MSG_STATE_DATA;
       if (0 == ch->tmit_ack_pending)
-        master_transmit_data (mst);
+        channel_transmit_data (ch);
 
       return;
     }
-    mst->tmit->state = MSG_STATE_MODIFIER;
+    ch->tmit.state = MSG_STATE_MODIFIER;
     break;
 
   default:
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "MasterTransmitNotify returned error when requesting a modifier.\n");
+         "MasterTransmitNotifyModifier returned error "
+         "when requesting a modifier.\n");
 
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
 
@@ -489,7 +505,7 @@
     queue_message (ch, msg, GNUNET_NO);
   }
 
-  master_transmit_mod (mst);
+  channel_transmit_mod (ch);
 }
 
 
@@ -499,17 +515,16 @@
  * @param mst Master handle.
  */
 static void
-master_transmit_data (struct GNUNET_PSYC_Master *mst)
+channel_transmit_data (struct GNUNET_PSYC_Channel *ch)
 {
-  struct GNUNET_PSYC_Channel *ch = &mst->ch;
   uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
   char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
   struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
 
   msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
 
-  int notify_ret = mst->tmit->notify_data (mst->tmit->notify_cls,
-                                           &data_size, &msg[1]);
+  int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls,
+                                         &data_size, &msg[1]);
   switch (notify_ret)
   {
   case GNUNET_NO:
@@ -522,14 +537,14 @@
     break;
 
   case GNUNET_YES:
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_END;
     break;
 
   default:
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "MasterTransmitNotify returned error when requesting data.\n");
 
-    mst->tmit->state = MSG_STATE_START;
+    ch->tmit.state = MSG_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
     queue_message (ch, msg, GNUNET_YES);
@@ -554,6 +569,86 @@
 
 
 /**
+ * Send a message to a channel.
+ *
+ * @param ch Handle to the PSYC channel.
+ * @param method_name Which method should be invoked.
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
+ * @param notify_cls Closure for @a notify_mod and @a notify_data.
+ * @param flags Flags for the message being transmitted.
+ * @return Transmission handle, NULL on error (i.e. more than one request 
queued).
+ */
+static struct GNUNET_PSYC_ChannelTransmitHandle *
+channel_transmit (struct GNUNET_PSYC_Channel *ch,
+                  const char *method_name,
+                  GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                  GNUNET_PSYC_TransmitNotifyData notify_data,
+                  void *notify_cls,
+                  uint32_t flags)
+{
+  if (GNUNET_NO != ch->in_transmit)
+    return NULL;
+  ch->in_transmit = GNUNET_YES;
+
+  size_t size = strlen (method_name) + 1;
+  struct GNUNET_PSYC_MessageMethod *pmeth;
+  struct OperationHandle *op;
+
+  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
+                                     + sizeof (*pmeth) + size);
+  op->msg = (struct GNUNET_MessageHeader *) &op[1];
+  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
+
+  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
+  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
+  pmeth->header.size = htons (sizeof (*pmeth) + size);
+  pmeth->flags = htonl (flags);
+  memcpy (&pmeth[1], method_name, size);
+
+  ch->tmit.ch = ch;
+  ch->tmit.notify_mod = notify_mod;
+  ch->tmit.notify_data = notify_data;
+  ch->tmit.notify_cls = notify_cls;
+  ch->tmit.state = MSG_STATE_MODIFIER;
+
+  channel_transmit_mod (ch);
+  return &ch->tmit;
+}
+
+
+/**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+static void
+channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+  struct GNUNET_PSYC_Channel *ch = th->ch;
+  if (0 == ch->tmit_ack_pending)
+  {
+    ch->tmit_paused = GNUNET_NO;
+    channel_transmit_data (ch);
+  }
+}
+
+
+/**
+ * Abort transmission request to channel.
+ *
+ * @param th Handle of the request that is being aborted.
+ */
+static void
+channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th)
+{
+  struct GNUNET_PSYC_Channel *ch = th->ch;
+  if (GNUNET_NO == ch->in_transmit)
+    return;
+}
+
+
+/**
  * Handle incoming message from the PSYC service.
  *
  * @param ch The channel the message is sent to.
@@ -564,14 +659,20 @@
                      const struct GNUNET_PSYC_MessageHeader *msg)
 {
   uint16_t size = ntohs (msg->header.size);
+  uint32_t flags = ntohl (msg->flags);
 
+  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
+                           (struct GNUNET_MessageHeader *) msg);
+
   if (MSG_STATE_START == ch->recv_state)
   {
     ch->recv_message_id = GNUNET_ntohll (msg->message_id);
-    ch->recv_flags = ntohl (msg->flags);
+    ch->recv_flags = flags;
+    ch->recv_slave_key = msg->slave_key;
   }
   else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id)
   {
+    // FIXME
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
          GNUNET_ntohll (msg->message_id), ch->recv_message_id);
@@ -579,11 +680,11 @@
     recv_error (ch);
     return;
   }
-  else if (ntohl (msg->flags) != ch->recv_flags)
+  else if (flags != ch->recv_flags)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
          "Unexpected message flags. Got: %lu, expected: %lu\n",
-         ntohl (msg->flags), ch->recv_flags);
+         flags, ch->recv_flags);
     GNUNET_break_op (0);
     recv_error (ch);
     return;
@@ -599,10 +700,6 @@
     ptype = ntohs (pmsg->type);
     size_eq = size_min = 0;
 
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Received message part of type %u and size %u from PSYC.\n",
-                ptype, psize);
-
     if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -612,6 +709,10 @@
       return;
     }
 
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Received message part from PSYC.\n");
+    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
     switch (ptype)
     {
     case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
@@ -758,6 +859,46 @@
 
 
 /**
+ * Handle incoming message acknowledgement from the PSYC service.
+ *
+ * @param ch The channel the acknowledgement is sent to.
+ */
+static void
+handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch)
+{
+  if (0 == ch->tmit_ack_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+    GNUNET_break (0);
+    return;
+  }
+  ch->tmit_ack_pending--;
+
+  switch (ch->tmit.state)
+  {
+  case MSG_STATE_MODIFIER:
+  case MSG_STATE_MOD_CONT:
+    if (GNUNET_NO == ch->tmit_paused)
+      channel_transmit_mod (ch);
+    break;
+
+  case MSG_STATE_DATA:
+    if (GNUNET_NO == ch->tmit_paused)
+      channel_transmit_data (ch);
+    break;
+
+  case MSG_STATE_END:
+  case MSG_STATE_CANCEL:
+    break;
+
+  default:
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Ignoring message ACK in state %u.\n", ch->tmit.state);
+  }
+}
+
+
+/**
  * Type of a function to call when we receive a message
  * from the service.
  *
@@ -775,7 +916,7 @@
 
   if (NULL == msg)
   {
-    GNUNET_break (0);
+    // timeout / disconnected from server, reconnect
     reschedule_connect (ch);
     return;
   }
@@ -824,63 +965,15 @@
   }
   case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
   {
-#if TODO
     struct CountersResult *cres = (struct CountersResult *) msg;
     slv->max_message_id = GNUNET_ntohll (cres->max_message_id);
-    if (NULL != slv->join_ack_cb)
-      mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
-#endif
+    if (NULL != slv->join_cb)
+      slv->join_cb (ch->cb_cls, slv->max_message_id);
     break;
   }
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
   {
-    if (0 == ch->tmit_ack_pending)
-    {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
-      GNUNET_break (0);
-      break;
-    }
-    ch->tmit_ack_pending--;
-
-    if (ch->is_master)
-    {
-      GNUNET_assert (NULL != mst->tmit);
-      switch (mst->tmit->state)
-      {
-      case MSG_STATE_MODIFIER:
-      case MSG_STATE_MOD_CONT:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_mod (mst);
-        break;
-
-      case MSG_STATE_DATA:
-        if (GNUNET_NO == ch->tmit_paused)
-          master_transmit_data (mst);
-        break;
-
-      case MSG_STATE_END:
-      case MSG_STATE_CANCEL:
-        if (NULL != mst->tmit)
-        {
-          GNUNET_free (mst->tmit);
-          mst->tmit = NULL;
-        }
-        else
-        {
-          LOG (GNUNET_ERROR_TYPE_WARNING,
-               "Ignoring message ACK, there's no transmission going on.\n");
-          GNUNET_break (0);
-        }
-        break;
-      default:
-        LOG (GNUNET_ERROR_TYPE_DEBUG,
-             "Ignoring message ACK in state %u.\n", mst->tmit->state);
-      }
-    }
-    else
-    {
-      /* TODO: slave */
-    }
+    handle_psyc_message_ack (ch);
     break;
   }
 
@@ -1106,8 +1199,6 @@
 GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
 {
   disconnect (master);
-  if (NULL != master->tmit)
-    GNUNET_free (master->tmit);
   GNUNET_free (master);
 }
 
@@ -1162,41 +1253,14 @@
 struct GNUNET_PSYC_MasterTransmitHandle *
 GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master,
                              const char *method_name,
-                             GNUNET_PSYC_MasterTransmitNotifyModifier 
notify_mod,
-                             GNUNET_PSYC_MasterTransmitNotify notify_data,
+                             GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                             GNUNET_PSYC_TransmitNotifyData notify_data,
                              void *notify_cls,
                              enum GNUNET_PSYC_MasterTransmitFlags flags)
 {
-  GNUNET_assert (NULL != master);
-  struct GNUNET_PSYC_Channel *ch = &master->ch;
-  if (GNUNET_NO != ch->in_transmit)
-    return NULL;
-  ch->in_transmit = GNUNET_YES;
-
-  size_t size = strlen (method_name) + 1;
-  struct GNUNET_PSYC_MessageMethod *pmeth;
-  struct OperationHandle *op;
-
-  ch->tmit_msg = op = GNUNET_malloc (sizeof (*op) + sizeof (*op->msg)
-                                     + sizeof (*pmeth) + size);
-  op->msg = (struct GNUNET_MessageHeader *) &op[1];
-  op->msg->size = sizeof (*op->msg) + sizeof (*pmeth) + size;
-
-  pmeth = (struct GNUNET_PSYC_MessageMethod *) &op->msg[1];
-  pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
-  pmeth->header.size = htons (sizeof (*pmeth) + size);
-  pmeth->flags = htonl (flags);
-  memcpy (&pmeth[1], method_name, size);
-
-  master->tmit = GNUNET_malloc (sizeof (*master->tmit));
-  master->tmit->master = master;
-  master->tmit->notify_mod = notify_mod;
-  master->tmit->notify_data = notify_data;
-  master->tmit->notify_cls = notify_cls;
-  master->tmit->state = MSG_STATE_MODIFIER;
-
-  master_transmit_mod (master);
-  return master->tmit;
+  return (struct GNUNET_PSYC_MasterTransmitHandle *)
+    channel_transmit (&master->ch, method_name, notify_mod, notify_data,
+                      notify_cls, flags);
 }
 
 
@@ -1208,12 +1272,7 @@
 void
 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle 
*th)
 {
-  struct GNUNET_PSYC_Channel *ch = &th->master->ch;
-  if (0 == ch->tmit_ack_pending)
-  {
-    ch->tmit_paused = GNUNET_NO;
-    master_transmit_data (th->master);
-  }
+  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1225,10 +1284,7 @@
 void
 GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle 
*th)
 {
-  struct GNUNET_PSYC_Master *master = th->master;
-  struct GNUNET_PSYC_Channel *ch = &master->ch;
-  if (GNUNET_NO != ch->in_transmit)
-    return;
+  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1282,15 +1338,15 @@
 {
   struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
   struct GNUNET_PSYC_Channel *ch = &slv->ch;
-  struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req)
-                                                + relay_count * sizeof 
(*relays));
+  struct SlaveJoinRequest *req
+    = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays));
   req->header.size = htons (sizeof (*req)
                             + relay_count * sizeof (*relays));
   req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
   req->channel_key = *channel_key;
   req->slave_key = *slave_key;
   req->origin = *origin;
-  req->relay_count = relay_count;
+  req->relay_count = htonl (relay_count);
   memcpy (&req[1], relays, relay_count * sizeof (*relays));
 
   ch->message_cb = message_cb;
@@ -1303,6 +1359,7 @@
   ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
   ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
 
+  slv->join_cb = slave_joined_cb;
   return slv;
 }
 
@@ -1328,9 +1385,8 @@
  *
  * @param slave Slave handle.
  * @param method_name Which (PSYC) method should be invoked (on host).
- * @param env Environment containing transient variables for the message, or
- *            NULL.
- * @param notify Function to call when we are allowed to transmit (to get 
data).
+ * @param notify_mod Function to call to obtain modifiers.
+ * @param notify_data Function to call to obtain fragments of the data.
  * @param notify_cls Closure for @a notify.
  * @param flags Flags for the message being transmitted.
  * @return Transmission handle, NULL on error (i.e. more than one request
@@ -1339,12 +1395,14 @@
 struct GNUNET_PSYC_SlaveTransmitHandle *
 GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
                             const char *method_name,
-                            const struct GNUNET_ENV_Environment *env,
-                            GNUNET_PSYC_SlaveTransmitNotify notify,
+                            GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+                            GNUNET_PSYC_TransmitNotifyData notify_data,
                             void *notify_cls,
                             enum GNUNET_PSYC_SlaveTransmitFlags flags)
 {
-  return NULL;
+  return (struct GNUNET_PSYC_SlaveTransmitHandle *)
+    channel_transmit (&slave->ch, method_name,
+                      notify_mod, notify_data, notify_cls, flags);
 }
 
 
@@ -1356,7 +1414,7 @@
 void
 GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
 {
-
+  channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1368,7 +1426,7 @@
 void
 GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th)
 {
-
+  channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th);
 }
 
 
@@ -1382,7 +1440,7 @@
 struct GNUNET_PSYC_Channel *
 GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master)
 {
-  return (struct GNUNET_PSYC_Channel *) master;
+  return &master->ch;
 }
 
 
@@ -1395,7 +1453,7 @@
 struct GNUNET_PSYC_Channel *
 GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave)
 {
-  return (struct GNUNET_PSYC_Channel *) slave;
+  return &slave->ch;
 }
 
 

Added: gnunet/src/psyc/psyc_common.c
===================================================================
--- gnunet/src/psyc/psyc_common.c                               (rev 0)
+++ gnunet/src/psyc/psyc_common.c       2014-03-06 23:46:45 UTC (rev 32575)
@@ -0,0 +1,100 @@
+/*
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING.  If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file psyc/psyc_common.c
+ * @brief Common functions for PSYC
+ * @author Gabor X Toth
+ */
+
+#include <inttypes.h>
+#include "psyc.h"
+
+/**
+ * Check if @a data contains a series of valid message parts.
+ *
+ * @param data_size  Size of @a data.
+ * @param data       Data.
+ *
+ * @return GNUNET_YES or GNUNET_NO
+ */
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data)
+{
+  const struct GNUNET_MessageHeader *pmsg;
+  uint16_t psize = 0;
+  uint16_t pos = 0;
+
+  for (pos = 0; data_size + pos < data_size; pos += psize)
+  {
+    pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
+    psize = ntohs (pmsg->size);
+    if (psize < sizeof (*pmsg) || data_size + pos + psize > data_size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Invalid message part of type %u and size %u.",
+                  ntohs (pmsg->type), psize);
+      return GNUNET_NO;
+    }
+  }
+  return GNUNET_YES;
+}
+
+
+void
+GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
+                         const struct GNUNET_MessageHeader *msg)
+{
+  uint16_t size = ntohs (msg->size);
+  uint16_t type = ntohs (msg->type);
+  GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
+  {
+    struct GNUNET_PSYC_MessageHeader *pmsg
+      = (struct GNUNET_PSYC_MessageHeader *) msg;
+    GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %" PRIu32 "\n",
+                GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
+    break;
+  }
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+  {
+    struct GNUNET_PSYC_MessageMethod *meth
+      = (struct GNUNET_PSYC_MessageMethod *) msg;
+    GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
+    break;
+  }
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+  {
+    struct GNUNET_PSYC_MessageModifier *mod
+      = (struct GNUNET_PSYC_MessageModifier *) msg;
+    uint16_t name_size = ntohs (mod->name_size);
+    char oper = ' ' < mod->oper ? mod->oper : ' ';
+    GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
+                ntohs (mod->value_size), ((char *) &mod[1]) + name_size + 1);
+    break;
+  }
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+    GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
+    break;
+  }
+}

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-03-06 23:46:42 UTC (rev 32574)
+++ gnunet/src/psyc/test_psyc.c 2014-03-06 23:46:45 UTC (rev 32575)
@@ -37,7 +37,7 @@
 
 #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
 
-#define DEBUG_SERVICE 0
+#define DEBUG_SERVICE 1
 
 
 /**
@@ -66,7 +66,8 @@
 
 struct TransmitClosure
 {
-  struct GNUNET_PSYC_MasterTransmitHandle *handle;
+  struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit;
+  struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit;
   struct GNUNET_ENV_Environment *env;
   char *data[16];
   const char *mod_value;
@@ -78,12 +79,30 @@
 
 struct TransmitClosure *tmit;
 
+
+enum
+{
+  TEST_NONE,
+  TEST_SLAVE_TRANSMIT,
+  TEST_MASTER_TRANSMIT,
+} test;
+
+
+static void
+master_transmit ();
+
+
 /**
  * Clean up all resources used.
  */
 static void
 cleanup ()
 {
+  if (NULL != slv)
+  {
+    GNUNET_PSYC_slave_part (slv);
+    slv = NULL;
+  }
   if (NULL != mst)
   {
     GNUNET_PSYC_master_stop (mst);
@@ -133,6 +152,8 @@
 static void
 end ()
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending tests.\n");
+
   if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
   {
     GNUNET_SCHEDULER_cancel (end_badly_task);
@@ -144,8 +165,8 @@
 
 
 static void
-message (void *cls, uint64_t message_id, uint32_t flags,
-         const struct GNUNET_MessageHeader *msg)
+master_message (void *cls, uint64_t message_id, uint32_t flags,
+                const struct GNUNET_MessageHeader *msg)
 {
   if (NULL == msg)
   {
@@ -158,16 +179,68 @@
   uint16_t size = ntohs (msg->size);
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Got message part of type %u and size %u "
+              "Master got message part of type %u and size %u "
               "belonging to message ID %llu with flags %u\n",
               type, size, message_id, flags);
 
-  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
-    end ();
+  switch (test)
+  {
+  case TEST_SLAVE_TRANSMIT:
+    if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Unexpected request flags: %lu\n", flags);
+      GNUNET_assert (0);
+      return;
+    }
+    // FIXME: check rest of message
+
+    if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+      master_transmit ();
+    break;
+
+  case TEST_MASTER_TRANSMIT:
+    break;
+
+  default:
+    GNUNET_assert (0);
+  }
 }
 
 
 static void
+slave_message (void *cls, uint64_t message_id, uint32_t flags,
+               const struct GNUNET_MessageHeader *msg)
+{
+  if (NULL == msg)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Error while receiving message %llu\n", message_id);
+    return;
+  }
+
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Slave got message part of type %u and size %u "
+              "belonging to message ID %llu with flags %u\n",
+              type, size, message_id, flags);
+
+  switch (test)
+  {
+  case TEST_MASTER_TRANSMIT:
+    if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+      end ();
+    break;
+
+  default:
+    GNUNET_assert (0);
+  }
+}
+
+
+static void
 join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
               const char *method_name,
               size_t variable_count, const struct GNUNET_ENV_Modifier 
*variables,
@@ -175,7 +248,9 @@
               struct GNUNET_PSYC_JoinHandle *jh)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Got join request.");
+              "Got join request: %s (%zu vars)", method_name, variable_count);
+  GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL,
+                             "you're in", 9);
 }
 
 
@@ -185,7 +260,7 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
   struct TransmitClosure *tmit = cls;
   tmit->paused = GNUNET_NO;
-  GNUNET_PSYC_master_transmit_resume (tmit->handle);
+  GNUNET_PSYC_master_transmit_resume (tmit->mst_tmit);
 }
 
 
@@ -204,7 +279,35 @@
   uint16_t name_size = 0;
   size_t value_size = 0;
 
-  if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
+  if (NULL != oper)
+  { /* New modifier */
+    if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
+                                                   (void *) &value, 
&value_size))
+    { /* No more modifiers, continue with data */
+      *data_size = 0;
+      return GNUNET_YES;
+    }
+
+    *oper = op;
+    name_size = strlen (name);
+
+    if (name_size + 1 + value_size <= *data_size)
+    {
+      *data_size = name_size + 1 + value_size;
+    }
+    else
+    {
+      tmit->mod_value_size = value_size;
+      value_size = *data_size - name_size - 1;
+      tmit->mod_value_size -= value_size;
+      tmit->mod_value = value + value_size;
+    }
+
+    memcpy (data, name, name_size);
+    ((char *)data)[name_size] = '\0';
+    memcpy ((char *)data + name_size + 1, value, value_size);
+  }
+  else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size)
   { /* Modifier continuation */
     value = tmit->mod_value;
     if (tmit->mod_value_size <= *data_size)
@@ -231,35 +334,7 @@
     *data_size = value_size;
     memcpy (data, value, value_size);
   }
-  else if (NULL != oper)
-  {
-    if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name,
-                                                   (void *) &value, 
&value_size))
-    { /* No more modifiers, continue with data */
-      *data_size = 0;
-      return GNUNET_YES;
-    }
 
-    *oper = op;
-    name_size = strlen (name);
-
-    if (name_size + 1 + value_size <= *data_size)
-    {
-      *data_size = name_size + 1 + value_size;
-    }
-    else
-    {
-      tmit->mod_value_size = value_size;
-      value_size = *data_size - name_size - 1;
-      tmit->mod_value_size -= value_size;
-      tmit->mod_value = value + value_size;
-    }
-
-    memcpy (data, name, name_size);
-    ((char *)data)[name_size] = '\0';
-    memcpy ((char *)data + name_size + 1, value, value_size);
-  }
-
   return 0 == tmit->mod_value_size ? GNUNET_YES : GNUNET_NO;
 }
 
@@ -268,6 +343,12 @@
 tmit_notify_data (void *cls, uint16_t *data_size, void *data)
 {
   struct TransmitClosure *tmit = cls;
+  if (0 == tmit->data_count)
+  {
+    *data_size = 0;
+    return GNUNET_YES;
+  }
+
   uint16_t size = strlen (tmit->data[tmit->n]);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Transmit notify data: %lu bytes available, "
@@ -300,32 +381,76 @@
 
 
 static void
-master_started (void *cls, uint64_t max_message_id)
+slave_joined (void *cls, uint64_t max_message_id)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Master started: %" PRIu64 "\n", max_message_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", 
max_message_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
 
+  test = TEST_SLAVE_TRANSMIT;
+
   tmit = GNUNET_new (struct TransmitClosure);
   tmit->env = GNUNET_ENV_environment_create ();
   GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              "_abc", "abc def", 7);
+  GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              "_abc_def", "abc def ghi", 11);
+  tmit->n = 0;
+  tmit->data[0] = "slave test";
+  tmit->data_count = 1;
+  tmit->slv_tmit
+    = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod,
+                                  tmit_notify_data, tmit,
+                                  GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
+}
+
+static void
+slave_join ()
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
+
+  struct GNUNET_PeerIdentity origin;
+  struct GNUNET_PeerIdentity relays[16];
+  struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+  GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
                               "_foo", "bar baz", 7);
+  GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
+                              "_foo_bar", "foo bar baz", 11);
+  slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
+                                16, relays, &slave_message, &join_request, 
&slave_joined,
+                                NULL, "_request_join", env, "some data", 9);
+  GNUNET_ENV_environment_destroy (env);
+}
+
+
+static void
+master_transmit ()
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
+  test = TEST_MASTER_TRANSMIT;
+
+  tmit = GNUNET_new (struct TransmitClosure);
+  tmit->env = GNUNET_ENV_environment_create ();
   GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
+                              "_foo", "bar baz", 7);
+  GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN,
                               "_foo_bar", "foo bar baz", 11);
   tmit->data[0] = "foo";
   tmit->data[1] = "foo bar";
   tmit->data[2] = "foo bar baz";
   tmit->data_count = 3;
-  tmit->handle
-    = GNUNET_PSYC_master_transmit (mst, "_test", tmit_notify_mod,
+  tmit->mst_tmit
+    = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
                                    tmit_notify_data, tmit,
                                    GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
 }
 
 
 static void
-slave_joined (void *cls, uint64_t max_message_id)
+master_started (void *cls, uint64_t max_message_id)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", 
max_message_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Master started: %" PRIu64 "\n", max_message_id);
+  slave_join ();
 }
 
 
@@ -355,21 +480,9 @@
   GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key);
   GNUNET_CRYPTO_eddsa_key_get_public (slave_key, &slave_pub_key);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
   mst = GNUNET_PSYC_master_start (cfg, channel_key, 
GNUNET_PSYC_CHANNEL_PRIVATE,
-                                  &message, &join_request, &master_started, 
NULL);
-  return; /* FIXME: test slave */
-
-  struct GNUNET_PeerIdentity origin;
-  struct GNUNET_PeerIdentity relays[16];
-  struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
-  GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
-                              "_foo", "bar baz", 7);
-  GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
-                              "_foo_bar", "foo bar baz", 11);
-  slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
-                                16, relays, &message, &join_request, 
&slave_joined,
-                                NULL, "_request_join", env, "some data", 9);
-  GNUNET_ENV_environment_destroy (env);
+                                  &master_message, &join_request, 
&master_started, NULL);
 }
 
 

Modified: gnunet/src/psycstore/plugin_psycstore_sqlite.c
===================================================================
--- gnunet/src/psycstore/plugin_psycstore_sqlite.c      2014-03-06 23:46:42 UTC 
(rev 32574)
+++ gnunet/src/psycstore/plugin_psycstore_sqlite.c      2014-03-06 23:46:45 UTC 
(rev 32575)
@@ -435,7 +435,7 @@
                &plugin->select_membership);
 
   sql_prepare (plugin->dbh,
-               "INSERT INTO messages\n"
+               "INSERT OR IGNORE INTO messages\n"
                " (channel_id, hop_counter, signature, purpose,\n"
                "  fragment_id, fragment_offset, message_id,\n"
                "  group_generation, multicast_flags, psycstore_flags, data)\n"




reply via email to

[Prev in Thread] Current Thread [Next in Thread]