gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33173 - in gnunet/src: include multicast psyc psycstore


From: gnunet
Subject: [GNUnet-SVN] r33173 - in gnunet/src: include multicast psyc psycstore
Date: Tue, 6 May 2014 12:26:24 +0200

Author: tg
Date: 2014-05-06 12:26:24 +0200 (Tue, 06 May 2014)
New Revision: 33173

Modified:
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/include/gnunet_psyc_service.h
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc.h
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/psyc_common.c
   gnunet/src/psyc/test_psyc.c
   gnunet/src/psycstore/psycstore_api.c
Log:
psyc: in-order message delivery

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2014-05-06 10:26:21 UTC (rev 
33172)
+++ gnunet/src/include/gnunet_protocols.h       2014-05-06 10:26:24 UTC (rev 
33173)
@@ -2148,7 +2148,7 @@
 /**
  * C: client
  * S: service
- * M: muticast
+ * M: multicast
  */
 
 /** S->C: result of an operation */

Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h    2014-05-06 10:26:21 UTC (rev 
33172)
+++ gnunet/src/include/gnunet_psyc_service.h    2014-05-06 10:26:24 UTC (rev 
33173)
@@ -167,14 +167,32 @@
   /**
    * Request from slave to master.
    */
-  GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1
+  GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1,
+
+  /**
+   * Message can be delivered out of order.
+   */
+  GNUNET_PSYC_MESSAGE_ORDER_ANY = 1 << 2
 };
 
 
+/**
+ * Values for the @a state_delta field of GNUNET_PSYC_MessageHeader.
+ */
+enum GNUNET_PSYC_StateDeltaValues
+{
+  GNUNET_PSYC_STATE_RESET = 0,
+
+  GNUNET_PSYC_STATE_NOT_MODIFIED = UINT64_MAX
+};
+
+
 GNUNET_NETWORK_STRUCT_BEGIN
 
 /**
  * Header of a PSYC message.
+ *
+ * Only present when receiving a message.
  */
 struct GNUNET_PSYC_MessageHeader
 {
@@ -223,6 +241,12 @@
    */
   uint32_t flags GNUNET_PACKED;
 
+  /**
+   * Number of message IDs since the last message that contained state
+   * operations. @see enum GNUNET_PSYC_StateDeltaValues
+   */
+  uint64_t state_delta GNUNET_PACKED;
+
   /* Followed by NUL-terminated method name. */
 };
 
@@ -479,22 +503,29 @@
 enum GNUNET_PSYC_MasterTransmitFlags
 {
   GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0,
+
   /**
    * Whether this message should reset the channel state,
    * i.e. remove all previously stored state variables.
    */
-  GNUNET_PSYC_MASTER_TRANSMIT_RESET_STATE = 1 << 0,
 
+  GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET = 1 << 0,
+
   /**
-   * Whether we need to increment the group generation counter after
-   * transmitting this message.
+   * Whether this message contains any state modifiers.
    */
-  GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 1,
+  GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY = 1 << 1,
 
   /**
    * Add PSYC header variable with the hash of the current channel state.
    */
-  GNUNET_PSYC_MASTER_TRANSMIT_ADD_STATE_HASH = 1 << 2
+  GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH = 1 << 2,
+
+  /**
+   * Whether we need to increment the group generation counter after
+   * transmitting this message.
+   */
+  GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 3
 };
 
 

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2014-05-06 10:26:21 UTC (rev 
33172)
+++ gnunet/src/multicast/multicast_api.c        2014-05-06 10:26:24 UTC (rev 
33173)
@@ -182,7 +182,7 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Calling origin's message callback "
-                "for a message of type %u and size %u.\n",
+                "with a message of type %u and size %u.\n",
               ntohs (msg->type), ntohs (msg->size));
     struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) 
grp;
     orig->message_cb (orig->cls, msg);
@@ -190,8 +190,8 @@
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Calling slave's message callback "
-                "for a message of type %u and size %u.\n",
+                "Calling member's message callback "
+                "with a message of type %u and size %u.\n",
                 ntohs (msg->type), ntohs (msg->size));
     struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) 
grp;
     mem->message_cb (mem->cls, msg);
@@ -477,8 +477,8 @@
   msg->group_generation = mh->group_generation;
 
   /* FIXME: add fragment ID and signature in the service instead of here */
-  msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++);
-  msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset);
+  msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++);
+  msg->fragment_offset = GNUNET_htonll (mh->fragment_offset);
   mh->fragment_offset += sizeof (*msg) + buf_size;
   msg->purpose.size = htonl (sizeof (*msg) + buf_size
                              - sizeof (msg->header)

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2014-05-06 10:26:21 UTC (rev 
33172)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2014-05-06 10:26:24 UTC (rev 
33173)
@@ -73,19 +73,21 @@
   struct TransmitMessage *next;
 
   /**
-   * Buffer with message to be transmitted.
+   * ID assigned to the message.
    */
-  char *buf;
+  uint64_t id;
 
   /**
    * Size of @a buf
    */
-  uint16_t size
-;
+  uint16_t size;
+
   /**
    * @see enum MessageState
    */
   uint8_t state;
+
+  /* Followed by message */
 };
 
 
@@ -100,9 +102,9 @@
 
 /**
  * Entry in the chan_msgs hashmap of @a recv_cache:
- * fragment_id -> FragmentEntry
+ * fragment_id -> RecvCacheEntry
  */
-struct FragmentEntry
+struct RecvCacheEntry
 {
   struct GNUNET_MULTICAST_MessageHeader *mmsg;
   uint16_t ref_count;
@@ -110,20 +112,48 @@
 
 
 /**
- * Entry in the @a recv_msgs hash map of a @a Channel.
- * message_id -> FragmentCache
+ * Entry in the @a recv_frags hash map of a @a Channel.
+ * message_id -> FragmentQueue
  */
-struct FragmentCache
+struct FragmentQueue
 {
   /**
-   * Total size of header fragments (METHOD & MODIFIERs)
+   * Fragment IDs stored in @a recv_cache.
    */
+  struct GNUNET_CONTAINER_Heap *fragments;
+
+  /**
+   * Total size of received fragments.
+   */
+  uint64_t size;
+
+  /**
+   * Total size of received header fragments (METHOD & MODIFIERs)
+   */
   uint64_t header_size;
 
   /**
-   * Fragment IDs stored in @a recv_cache.
+   * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
    */
-  struct GNUNET_CONTAINER_Heap *fragments;
+  uint64_t state_delta;
+
+  /**
+   * The @a flags field from struct GNUNET_PSYC_MessageMethod.
+   */
+  uint32_t flags;
+
+  /**
+   * Receive state of message.
+   *
+   * @see MessageFragmentState
+   */
+  uint8_t state;
+
+  /**
+   * Is the message queued for delivery to the client?
+   * i.e. added to the recv_msgs queue
+   */
+  uint8_t queued;
 };
 
 
@@ -139,13 +169,18 @@
 
   /**
    * Received fragments not yet sent to the client.
-   * message_id -> FragmentCache
+   * message_id -> FragmentQueue
    */
-  struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
+  struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
 
   /**
-   * FIXME
+   * Received message IDs not yet sent to the client.
    */
+  struct GNUNET_CONTAINER_Heap *recv_msgs;
+
+  /**
+   * FIXME: needed?
+   */
   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
 
   /**
@@ -159,6 +194,19 @@
   struct GNUNET_HashCode pub_key_hash;
 
   /**
+   * Last message ID sent to the client.
+   * 0 if there is no such message.
+   */
+  uint64_t max_message_id;
+
+  /**
+   * ID of the last stateful message, where the state operations has been
+   * processed and saved to PSYCstore and which has been sent to the client.
+   * 0 if there is no such message.
+   */
+  uint64_t max_state_message_id;
+
+  /**
    * Expected value size for the modifier being received from the PSYC service.
    */
   uint32_t tmit_mod_value_size_expected;
@@ -174,7 +222,7 @@
   uint8_t tmit_state;
 
   /**
-   * FIXME
+   * FIXME: needed?
    */
   uint8_t in_transmit;
 
@@ -221,7 +269,7 @@
   struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
 
   /**
-   * Maximum message ID for this channel.
+   * Last message ID transmitted to this channel.
    *
    * Incremented before sending a message, thus the message_id in messages sent
    * starts from 1.
@@ -229,13 +277,13 @@
   uint64_t max_message_id;
 
   /**
-   * ID of the last message that contains any state operations.
+   * ID of the last message with state operations transmitted to the channel.
    * 0 if there is no such message.
    */
   uint64_t max_state_message_id;
 
   /**
-   * Maximum group generation for this channel.
+   * Maximum group generation transmitted to the channel.
    */
   uint64_t max_group_generation;
 
@@ -292,11 +340,6 @@
   struct GNUNET_MessageHeader *join_req;
 
   /**
-   * Maximum message ID for this channel.
-   */
-  uint64_t max_message_id;
-
-  /**
    * Maximum request ID for this channel.
    */
   uint64_t max_request_id;
@@ -304,7 +347,7 @@
 
 
 static inline void
-transmit_message (struct Channel *ch, uint8_t inc_msg_id);
+transmit_message (struct Channel *ch);
 
 
 /**
@@ -386,7 +429,7 @@
   /* Send pending messages to multicast before cleanup. */
   if (NULL != ch->tmit_head)
   {
-    transmit_message (ch, GNUNET_NO);
+    transmit_message (ch);
   }
   else
   {
@@ -484,6 +527,7 @@
 hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
 {
   /* use little-endian order, as idx_of MultiHashMap casts key to unsigned int 
*/
+  /* TODO: use built-in byte swap functions if available */
 
   n = ((n <<  8) & 0xFF00FF00FF00FF00ULL) | ((n >>  8) & 
0x00FF00FF00FF00FFULL);
   n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 
0x0000FFFF0000FFFFULL);
@@ -512,29 +556,40 @@
 }
 
 
+/**
+ * Insert a multicast message fragment into the queue belonging to the message.
+ *
+ * @param ch           Channel.
+ * @param mmsg         Multicast message fragment.
+ * @param msg_id_hash  Message ID of @a mmsg in a struct GNUNET_HashCode.
+ * @param first_ptype  First PSYC message part type in @a mmsg.
+ * @param last_ptype   Last PSYC message part type in @a mmsg.
+ */
 static void
-fragment_cache_insert (struct Channel *ch,
-                       const struct GNUNET_HashCode *msg_id,
-                       struct FragmentCache *frag_cache,
+fragment_queue_insert (struct Channel *ch,
                        const struct GNUNET_MULTICAST_MessageHeader *mmsg,
-                       uint16_t last_part_type)
+                       uint16_t first_ptype, uint16_t last_ptype)
 {
-  uint16_t size = ntohs (mmsg->header.size);
+  const uint16_t size = ntohs (mmsg->header.size);
+  const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
   struct GNUNET_CONTAINER_MultiHashMap
     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
                                                     &ch->pub_key_hash);
 
-  if (NULL == frag_cache)
+  struct GNUNET_HashCode msg_id_hash;
+  hash_key_from_nll (&msg_id_hash, mmsg->message_id);
+
+  struct FragmentQueue
+    *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+
+  if (NULL == fragq)
   {
-    frag_cache = GNUNET_new (struct FragmentCache);
-    frag_cache->fragments
+    fragq = GNUNET_new (struct FragmentQueue);
+    fragq->state = MSG_FRAG_STATE_HEADER;
+    fragq->fragments
       = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
 
-    if (NULL == ch->recv_msgs)
-    {
-      ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
-    }
-    GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
+    GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
 
     if (NULL == chan_msgs)
@@ -545,190 +600,335 @@
     }
   }
 
-  struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
-  hash_key_from_nll (frag_id, mmsg->fragment_id);
-  struct FragmentEntry
-    *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
-  if (NULL == frag_entry)
+  struct GNUNET_HashCode frag_id_hash;
+  hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
+  struct RecvCacheEntry
+    *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, 
&frag_id_hash);
+  if (NULL == cache_entry)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "%p Adding message fragment to cache. "
-                "fragment_id: %" PRIu64 ", "
-                "header_size: %" PRIu64 " + %" PRIu64 ").\n",
-                ch, GNUNET_ntohll (mmsg->fragment_id),
-                frag_cache->header_size, size);
-    frag_entry = GNUNET_new (struct FragmentEntry);
-    frag_entry->ref_count = 1;
-    frag_entry->mmsg = GNUNET_malloc (size);
-    memcpy (frag_entry->mmsg, mmsg, size);
-    GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
+                "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
+                "header_size: %" PRIu64 " + %u).\n",
+                ch, GNUNET_ntohll (mmsg->message_id),
+                GNUNET_ntohll (mmsg->fragment_id),
+                fragq->header_size, size);
+    cache_entry = GNUNET_new (struct RecvCacheEntry);
+    cache_entry->ref_count = 1;
+    cache_entry->mmsg = GNUNET_malloc (size);
+    memcpy (cache_entry->mmsg, mmsg, size);
+    GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
   }
   else
   {
-    frag_entry->ref_count++;
+    cache_entry->ref_count++;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%p Message fragment already in cache. "
-                "fragment_id: %" PRIu64 ", ref_count: %u\n",
-                ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
+                "%p Message fragment is already in cache. "
+                "message_id: %" PRIu64 ", fragment_id: %" PRIu64
+                ", ref_count: %u\n",
+                ch, GNUNET_ntohll (mmsg->message_id),
+                GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
   }
 
-  switch (last_part_type)
+  if (MSG_FRAG_STATE_HEADER == fragq->state)
   {
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
-  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
-    frag_cache->header_size += size;
+    if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+    {
+      struct GNUNET_PSYC_MessageMethod *
+        pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
+      fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
+      fragq->flags = ntohl (pmeth->flags);
+    }
+
+    if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
+    {
+      fragq->header_size += size;
+    }
+    else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
+             || frag_offset == fragq->header_size)
+    { /* header is now complete */
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Header of message %" PRIu64 " is complete.\n",
+                  ch, GNUNET_ntohll (mmsg->message_id));
+
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Adding message %" PRIu64 " to queue.\n",
+                  ch, GNUNET_ntohll (mmsg->message_id));
+      fragq->state = MSG_FRAG_STATE_DATA;
+    }
+    else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Header of message %" PRIu64 " is NOT complete yet: "
+                  "%" PRIu64 " != %" PRIu64 "\n",
+                  ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+                  fragq->header_size);
+    }
   }
-  GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
+
+  switch (last_ptype)
+  {
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+    if (frag_offset == fragq->size)
+      fragq->state = MSG_FRAG_STATE_END;
+    else
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "%p Message %" PRIu64 " is NOT complete yet: "
+                  "%" PRIu64 " != %" PRIu64 "\n",
+                  ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+                  fragq->size);
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+    /* Drop message without delivering to client if it's a single fragment */
+    fragq->state =
+      (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+      ? MSG_FRAG_STATE_DROP
+      : MSG_FRAG_STATE_CANCEL;
+  }
+
+  switch (fragq->state)
+  {
+  case MSG_FRAG_STATE_DATA:
+  case MSG_FRAG_STATE_END:
+  case MSG_FRAG_STATE_CANCEL:
+    if (GNUNET_NO == fragq->queued)
+    {
+      GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
+                                    GNUNET_ntohll (mmsg->message_id));
+      fragq->queued = GNUNET_YES;
+    }
+  }
+
+  fragq->size += size;
+  GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
                                 GNUNET_ntohll (mmsg->fragment_id));
 }
 
 
+/**
+ * Run fragment queue of a message.
+ *
+ * Send fragments of a message in order to client, after all modifiers arrived
+ * from multicast.
+ *
+ * @param ch      Channel.
+ * @param msg_id  ID of the message @a fragq belongs to.
+ * @param fragq   Fragment queue of the message.
+ * @param drop    Drop message without delivering to client?
+ *                #GNUNET_YES or #GNUNET_NO.
+ */
 static void
-fragment_cache_clear (struct Channel *ch,
-                      const struct GNUNET_HashCode *msg_id,
-                      struct FragmentCache *frag_cache,
-                      uint8_t send_to_client)
+fragment_queue_run (struct Channel *ch, uint64_t msg_id,
+                    struct FragmentQueue *fragq, uint8_t drop)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Clearing message fragment cache.\n", ch);
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "%p Running message fragment queue for message %" PRIu64
+              " (state: %u).\n",
+              ch, msg_id, fragq->state);
 
   struct GNUNET_CONTAINER_MultiHashMap
     *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
                                                     &ch->pub_key_hash);
   GNUNET_assert (NULL != chan_msgs);
-  struct GNUNET_HashCode *frag_id;
+  uint64_t frag_id;
 
-  while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
+  while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
+                                                    &frag_id))
   {
-    struct FragmentEntry
-      *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
-    if (frag_entry != NULL)
+    struct GNUNET_HashCode frag_id_hash;
+    hash_key_from_hll (&frag_id_hash, frag_id);
+    struct RecvCacheEntry *cache_entry
+      = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
+    if (cache_entry != NULL)
     {
-      if (GNUNET_YES == send_to_client)
+      if (GNUNET_NO == drop)
       {
-        message_to_client (ch, frag_entry->mmsg);
+        message_to_client (ch, cache_entry->mmsg);
       }
-      if (1 == frag_entry->ref_count)
+      if (cache_entry->ref_count <= 1)
       {
-        GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
-        GNUNET_free (frag_entry->mmsg);
-        GNUNET_free (frag_entry);
+        GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
+                                              cache_entry);
+        GNUNET_free (cache_entry->mmsg);
+        GNUNET_free (cache_entry);
       }
       else
       {
-        frag_entry->ref_count--;
+        cache_entry->ref_count--;
       }
     }
-    GNUNET_free (frag_id);
+#if CACHE_AGING_IMPLEMENTED
+    else if (GNUNET_NO == drop)
+    {
+      /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
+    }
+#endif
+
+    GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
   }
 
-  GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
-  GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
-  GNUNET_free (frag_cache);
+  if (MSG_FRAG_STATE_END <= fragq->state)
+  {
+    struct GNUNET_HashCode msg_id_hash;
+    hash_key_from_nll (&msg_id_hash, msg_id);
+
+    GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
+    GNUNET_CONTAINER_heap_destroy (fragq->fragments);
+    GNUNET_free (fragq);
+  }
+  else
+  {
+    fragq->queued = GNUNET_NO;
+  }
 }
 
 
 /**
- * Incoming message fragment from multicast.
+ * Run message queue.
  *
- * Store it using PSYCstore and send it to the client of the channel.
+ * Send messages in queue to client in order after a message has arrived from
+ * multicast, according to the following:
+ * - A message is only sent if all of its modifiers arrived.
+ * - A stateful message is only sent if the previous stateful message
+ *   has already been delivered to the client.
+ *
+ * @param ch  Channel.
+ * @return Number of messages removed from queue and sent to client.
  */
-static void
-message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+static uint64_t
+message_queue_run (struct Channel *ch)
 {
-  struct Channel *ch = cls;
-  uint16_t type = ntohs (msg->type);
-  uint16_t size = ntohs (msg->size);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received message of type %u and size %u from multicast.\n",
-              ch, type, size);
-
-  switch (type)
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "%p Running message queue.\n", ch);
+  uint64_t n = 0;
+  uint64_t msg_id;
+  while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+                                                    &msg_id))
   {
-  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
-  {
-    GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key,
-                                     (const struct
-                                      GNUNET_MULTICAST_MessageHeader *) msg,
-                                     0, NULL, NULL);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
+    struct GNUNET_HashCode msg_id_hash;
+    hash_key_from_hll (&msg_id_hash, msg_id);
 
-#if TODO
-    /* FIXME: apply modifiers to state in PSYCstore */
-    GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
-                                   GNUNET_ntohll (mmsg->message_id),
-                                   meth->mod_count, mods,
-                                   rcb, rcb_cls);
-#endif
+    struct FragmentQueue *
+      fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
 
-    const struct GNUNET_MULTICAST_MessageHeader
-      *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
-
-    uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
-                                                    (const char *) &mmsg[1]);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Last message part type %u\n", ptype);
-
-    if (GNUNET_NO == ptype)
+    if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "%p Received message with invalid parts from multicast. "
-                  "Dropping message.\n", ch);
-      GNUNET_break_op (0);
+                  "%p No fragq (%p) or header not complete.\n",
+                  ch, fragq);
       break;
     }
 
-    struct GNUNET_HashCode msg_id;
-    hash_key_from_nll (&msg_id, mmsg->message_id);
-
-    struct FragmentCache *frag_cache = NULL;
-    if (NULL != ch->recv_msgs)
-      frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
-
-    switch (ptype)
+    if (MSG_FRAG_STATE_HEADER == fragq->state)
     {
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
-      /* FIXME: check state flag / max_state_message_id */
-      if (NULL == frag_cache)
+      /* Check if there's a missing message before the current one */
+      if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
       {
-        message_to_client (ch, mmsg);
-        break;
+        if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
+            && msg_id - 1 != ch->max_message_id)
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                      "%p Out of order message. "
+                      "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
+                      ch, msg_id, ch->max_message_id);
+          break;
+        }
       }
       else
       {
-        if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
-        { /* first data fragment after the header, send cached fragments */
-          fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES);
-          message_to_client (ch, mmsg);
+        if (msg_id - fragq->state_delta != ch->max_state_message_id)
+        {
+          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                      "%p Out of order stateful message. "
+                      "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
+                      ch, msg_id, fragq->state_delta, 
ch->max_state_message_id);
           break;
         }
-        else
-        { /* still missing fragments from the header, cache data fragment */
-          /* fall thru */
-        }
+#if TODO
+        /* FIXME: apply modifiers to state in PSYCstore */
+        GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
+                                       state_modify_result_cb, cls);
+#endif
+        ch->max_state_message_id = msg_id;
       }
+      ch->max_message_id = msg_id;
+    }
+    fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == 
fragq->state);
+    GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+    n++;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+  return n;
+}
 
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
-      /* not all modifiers arrived yet, cache fragment */
-      fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
-      break;
 
-    case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
-      if (NULL != frag_cache)
-      { /* fragments not yet sent to client, remove from cache */
-        fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO);
-      }
-      else
-      {
-        message_to_client (ch, mmsg);
-      }
-      break;
-    }
+/**
+ * Handle incoming message from multicast.
+ *
+ * @param ch   Channel.
+ * @param mmsg Multicast message.
+ *
+ * @return #GNUNET_OK or #GNUNET_SYSERR
+ */
+static int
+handle_multicast_message (struct Channel *ch,
+                          const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+  GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
+
+  uint16_t size = ntohs (mmsg->header.size);
+  uint16_t first_ptype = 0, last_ptype = 0;
+
+  if (GNUNET_SYSERR
+      == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
+                                          (const char *) &mmsg[1],
+                                          &first_ptype, &last_ptype))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p Received message with invalid parts from multicast. "
+                "Dropping message.\n", ch);
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Message parts: first: type %u, last: type %u\n",
+              first_ptype, last_ptype);
+
+  fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
+  message_queue_run (ch);
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Incoming message fragment from multicast.
+ *
+ * Store it using PSYCstore and send it to the client of the channel.
+ */
+static void
+message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+  struct Channel *ch = cls;
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received message of type %u and size %u from multicast.\n",
+              ch, type, size);
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+  {
+    handle_multicast_message (ch, (const struct
+                                   GNUNET_MULTICAST_MessageHeader *) msg);
     break;
   }
   default:
@@ -770,8 +970,9 @@
       = (const struct GNUNET_MULTICAST_RequestHeader *) msg;
 
     /* FIXME: see message_cb() */
-    if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
-                                                    (const char *) &req[1]))
+    if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
+                                                          (const char *) 
&req[1],
+                                                          NULL, NULL))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Dropping message with invalid parts "
@@ -825,7 +1026,8 @@
   if (GNUNET_OK == result || GNUNET_NO == result)
   {
     mst->max_message_id = max_message_id;
-    mst->max_state_message_id = max_state_message_id;
+    ch->max_message_id = max_message_id;
+    ch->max_state_message_id = max_state_message_id;
     mst->max_group_generation = max_group_generation;
     mst->origin
       = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
@@ -860,7 +1062,8 @@
 
   if (GNUNET_OK == result || GNUNET_NO == result)
   {
-    slv->max_message_id = max_message_id;
+    ch->max_message_id = max_message_id;
+    ch->max_state_message_id = max_state_message_id;
     slv->member
       = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
                                       &slv->origin,
@@ -879,6 +1082,15 @@
 }
 
 
+static void
+channel_init (struct Channel *ch)
+{
+  ch->recv_msgs
+    = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+  ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+}
+
+
 /**
  * Handle a connecting client starting a channel master.
  */
@@ -888,14 +1100,18 @@
 {
   const struct MasterStartRequest *req
     = (const struct MasterStartRequest *) msg;
+
   struct Master *mst = GNUNET_new (struct Master);
+  mst->policy = ntohl (req->policy);
+  mst->priv_key = req->channel_key;
+
   struct Channel *ch = &mst->channel;
   ch->client = client;
   ch->is_master = GNUNET_YES;
-  mst->policy = ntohl (req->policy);
-  mst->priv_key = req->channel_key;
   GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
   GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
+  channel_init (ch);
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Master connected to channel %s.\n",
               mst, GNUNET_h2s (&ch->pub_key_hash));
@@ -919,13 +1135,7 @@
   const struct SlaveJoinRequest *req
     = (const struct SlaveJoinRequest *) msg;
   struct Slave *slv = GNUNET_new (struct Slave);
-  struct Channel *ch = &slv->channel;
-  slv->channel.client = client;
-  slv->channel.is_master = GNUNET_NO;
   slv->slave_key = req->slave_key;
-  ch->pub_key = req->channel_key;
-  GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
-                      &ch->pub_key_hash);
   slv->origin = req->origin;
   slv->relay_count = ntohl (req->relay_count);
   if (0 < slv->relay_count)
@@ -939,6 +1149,14 @@
       memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
   }
 
+  struct Channel *ch = &slv->channel;
+  ch->client = client;
+  ch->is_master = GNUNET_NO;
+  ch->pub_key = req->channel_key;
+  GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
+                      &ch->pub_key_hash);
+  channel_init (ch);
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "%p Slave connected to channel %s.\n",
               slv, GNUNET_h2s (&ch->pub_key_hash));
@@ -991,7 +1209,7 @@
               "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
 
   *data_size = tmit_msg->size;
-  memcpy (data, tmit_msg->buf, *data_size);
+  memcpy (data, &tmit_msg[1], *data_size);
 
   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
   GNUNET_free (tmit_msg);
@@ -1003,7 +1221,7 @@
   {
     if (NULL != ch->tmit_head)
     {
-      transmit_message (ch, GNUNET_NO);
+      transmit_message (ch);
     }
     else if (ch->disconnected)
     {
@@ -1054,14 +1272,12 @@
  * Transmit a message from a channel master to the multicast group.
  */
 static void
-master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
+master_transmit_message (struct Master *mst)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
   mst->channel.tmit_task = 0;
   if (NULL == mst->tmit_handle)
   {
-    if (GNUNET_YES == inc_msg_id)
-      mst->max_message_id++;
     mst->tmit_handle
       = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
                                         mst->max_group_generation,
@@ -1078,13 +1294,11 @@
  * Transmit a message from a channel slave to the multicast group.
  */
 static void
-slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
+slave_transmit_message (struct Slave *slv)
 {
   slv->channel.tmit_task = 0;
   if (NULL == slv->tmit_handle)
   {
-    if (GNUNET_YES == inc_msg_id)
-      slv->max_message_id++;
     slv->tmit_handle
       = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
                                            slave_transmit_notify, slv);
@@ -1097,30 +1311,95 @@
 
 
 static inline void
-transmit_message (struct Channel *ch, uint8_t inc_msg_id)
+transmit_message (struct Channel *ch)
 {
   ch->is_master
-    ? master_transmit_message ((struct Master *) ch, inc_msg_id)
-    : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
+    ? master_transmit_message ((struct Master *) ch)
+    : slave_transmit_message ((struct Slave *) ch);
 }
 
 
+/**
+ * Queue a message from a channel master for sending to the multicast group.
+ */
 static void
-transmit_error (struct Channel *ch)
+master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
+                     uint16_t first_ptype, uint16_t last_ptype)
 {
-  struct GNUNET_MessageHeader *msg;
-  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
-                                                    + sizeof (*msg));
-  msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
-  msg->size = ntohs (sizeof (*msg));
-  msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
 
-  tmit_msg->buf = (char *) &tmit_msg[1];
-  tmit_msg->size = sizeof (*msg);
+  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+  {
+    tmit_msg->id = ++mst->max_message_id;
+    struct GNUNET_PSYC_MessageMethod *pmeth
+      = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
+
+    if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
+    {
+      pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
+    }
+    else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
+    {
+      pmeth->state_delta = GNUNET_htonll (tmit_msg->id
+                                          - mst->max_state_message_id);
+    }
+    else
+    {
+      pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
+    }
+  }
+}
+
+
+/**
+ * Queue a message from a channel slave for sending to the multicast group.
+ */
+static void
+slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
+                     uint16_t first_ptype, uint16_t last_ptype)
+{
+  if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+  {
+    struct GNUNET_PSYC_MessageMethod *pmeth
+      = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
+    pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
+    tmit_msg->id = ++slv->max_request_id;
+  }
+}
+
+
+static void
+queue_message (struct Channel *ch,  const struct GNUNET_MessageHeader *msg,
+               uint16_t first_ptype, uint16_t last_ptype)
+{
+  uint16_t size = ntohs (msg->size) - sizeof (*msg);
+  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
+  memcpy (&tmit_msg[1], &msg[1], size);
+  tmit_msg->size = size;
   tmit_msg->state = ch->tmit_state;
+
   GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-  transmit_message (ch, GNUNET_NO);
 
+  ch->is_master
+    ? master_queue_message ((struct Master *) ch, tmit_msg,
+                            first_ptype, last_ptype)
+    : slave_queue_message ((struct Slave *) ch, tmit_msg,
+                           first_ptype, last_ptype);
+}
+
+
+static void
+transmit_error (struct Channel *ch)
+{
+  uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
+
+  struct GNUNET_MessageHeader msg;
+  msg.size = ntohs (sizeof (msg));
+  msg.type = ntohs (type);
+
+  queue_message (ch, &msg, type, type);
+  transmit_message (ch);
+
   /* FIXME: cleanup */
 }
 
@@ -1136,6 +1415,10 @@
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Received message from client.\n", ch);
+  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
+
   if (GNUNET_YES != ch->ready)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1145,10 +1428,7 @@
     return;
   }
 
-  uint8_t inc_msg_id = GNUNET_NO;
   uint16_t size = ntohs (msg->size);
-  uint16_t psize = 0, ptype = 0, pos = 0;
-
   if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
@@ -1158,42 +1438,22 @@
     return;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received message from client.\n", ch);
-  GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
-
-  for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+  uint16_t first_ptype = 0, last_ptype = 0;
+  if (GNUNET_SYSERR
+      == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
+                                          (const char *) &msg[1],
+                                          &first_ptype, &last_ptype))
   {
-    const struct GNUNET_MessageHeader *pmsg
-      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
-    psize = ntohs (pmsg->size);
-    ptype = ntohs (pmsg->type);
-    if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "%p Received invalid message part of type %u and size %u "
-                  "from client.\n", ch, ptype, psize);
-      GNUNET_break (0);
-      transmit_error (ch);
-      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-      return;
-    }
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "%p Received message part from client.\n", ch);
-    GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
-
-    if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
-      inc_msg_id = GNUNET_YES;
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "%p Received invalid message part from client.\n", ch);
+    GNUNET_break (0);
+    transmit_error (ch);
+    GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+    return;
   }
 
-  size -= sizeof (*msg);
-  struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
-  tmit_msg->buf = (char *) &tmit_msg[1];
-  memcpy (tmit_msg->buf, &msg[1], size);
-  tmit_msg->size = size;
-  tmit_msg->state = ch->tmit_state;
-  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-  transmit_message (ch, inc_msg_id);
+  queue_message (ch, msg, first_ptype, last_ptype);
+  transmit_message (ch);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 };

Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h      2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/psyc.h      2014-05-06 10:26:24 UTC (rev 33173)
@@ -31,8 +31,9 @@
 #include "gnunet_psyc_service.h"
 
 
-uint16_t
-GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data);
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
+                                 uint16_t *first_ptype, uint16_t *last_ptype);
 
 void
 GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
@@ -41,17 +42,29 @@
 
 enum MessageState
 {
-  MSG_STATE_START = 0,
-  MSG_STATE_HEADER = 1,
-  MSG_STATE_METHOD = 2,
+  MSG_STATE_START    = 0,
+  MSG_STATE_HEADER   = 1,
+  MSG_STATE_METHOD   = 2,
   MSG_STATE_MODIFIER = 3,
   MSG_STATE_MOD_CONT = 4,
-  MSG_STATE_DATA = 5,
-  MSG_STATE_END = 6,
-  MSG_STATE_CANCEL = 7,
+  MSG_STATE_DATA     = 5,
+  MSG_STATE_END      = 6,
+  MSG_STATE_CANCEL   = 7,
+  MSG_STATE_ERROR    = 8,
 };
 
 
+enum MessageFragmentState
+{
+  MSG_FRAG_STATE_START    = 0,
+  MSG_FRAG_STATE_HEADER   = 1,
+  MSG_FRAG_STATE_DATA     = 2,
+  MSG_FRAG_STATE_END      = 3,
+  MSG_FRAG_STATE_CANCEL   = 4,
+  MSG_FRAG_STATE_DROP     = 5,
+};
+
+
 GNUNET_NETWORK_STRUCT_BEGIN
 
 

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/psyc_api.c  2014-05-06 10:26:24 UTC (rev 33173)
@@ -1502,7 +1502,7 @@
   slvadd = (struct ChannelSlaveAdd *) &op[1];
   op->msg = (struct GNUNET_MessageHeader *) slvadd;
 
-  slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD;
+  slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
   slvadd->header.size = htons (sizeof (*slvadd));
   slvadd->announced_at = GNUNET_htonll (announced_at);
   slvadd->effective_since = GNUNET_htonll (effective_since);
@@ -1544,7 +1544,7 @@
 
   slvrm = (struct ChannelSlaveRemove *) &op[1];
   op->msg = (struct GNUNET_MessageHeader *) slvrm;
-  slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
+  slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
   slvrm->header.size = htons (sizeof (*slvrm));
   slvrm->announced_at = GNUNET_htonll (announced_at);
   GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,

Modified: gnunet/src/psyc/psyc_common.c
===================================================================
--- gnunet/src/psyc/psyc_common.c       2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/psyc_common.c       2014-05-06 10:26:24 UTC (rev 33173)
@@ -30,36 +30,48 @@
 /**
  * Check if @a data contains a series of valid message parts.
  *
- * @param data_size  Size of @a data.
- * @param data       Data.
+ * @param      data_size    Size of @a data.
+ * @param      data        Data.
+ * @param[out] first_ptype  Type of first message part.
+ * @param[out] last_ptype   Type of last message part.
  *
- * @return Message type number
- *         or GNUNET_NO if the message contains invalid or no parts.
+ * @return Number of message parts found in @a data.
+ *         or GNUNET_SYSERR if the message contains invalid parts.
  */
-uint16_t
-GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data)
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
+                                 uint16_t *first_ptype, uint16_t *last_ptype)
 {
   const struct GNUNET_MessageHeader *pmsg;
-  uint16_t ptype = GNUNET_NO;
-  uint16_t psize = 0;
-  uint16_t pos = 0;
+  uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
+  if (NULL != first_ptype)
+    *first_ptype = 0;
+  if (NULL != last_ptype)
+    *last_ptype = 0;
 
-  for (pos = 0; pos < data_size; pos += psize)
+  for (pos = 0; pos < data_size; pos += psize, parts++)
   {
     pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
     psize = ntohs (pmsg->size);
     ptype = ntohs (pmsg->type);
-    if (psize < sizeof (*pmsg) || pos + psize > data_size
+    if (0 == parts && NULL != first_ptype)
+      *first_ptype = ptype;
+    if (NULL != last_ptype
+        && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+      *last_ptype = ptype;
+    if (psize < sizeof (*pmsg)
+        || pos + psize > data_size
         || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
         || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Invalid message part of type %u and size %u.\n",
                   ptype, psize);
-      return GNUNET_NO;
+      return GNUNET_SYSERR;
     }
+    /* FIXME: check message part order */
   }
-  return ptype;
+  return parts;
 }
 
 

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/test_psyc.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -35,9 +35,9 @@
 #include "gnunet_env_lib.h"
 #include "gnunet_psyc_service.h"
 
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
 
-#define DEBUG_SERVICE 1
+#define DEBUG_SERVICE 0
 
 
 /**
@@ -120,7 +120,7 @@
 
 
 /**
- * Terminate the testcase (failure).
+ * Terminate the test case (failure).
  *
  * @param cls NULL
  * @param tc scheduler context
@@ -134,7 +134,7 @@
 
 
 /**
- * Terminate the testcase (success).
+ * Terminate the test case (success).
  *
  * @param cls NULL
  * @param tc scheduler context
@@ -148,7 +148,7 @@
 
 
 /**
- * Finish the testcase (successfully).
+ * Finish the test case (successfully).
  */
 static void
 end ()
@@ -518,7 +518,8 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
   mst = GNUNET_PSYC_master_start (cfg, channel_key, 
GNUNET_PSYC_CHANNEL_PRIVATE,
-                                  &master_message, &join_request, 
&master_started, NULL);
+                                  &master_message, &join_request,
+                                  &master_started, NULL);
 }
 
 

Modified: gnunet/src/psycstore/psycstore_api.c
===================================================================
--- gnunet/src/psycstore/psycstore_api.c        2014-05-06 10:26:21 UTC (rev 
33172)
+++ gnunet/src/psycstore/psycstore_api.c        2014-05-06 10:26:24 UTC (rev 
33173)
@@ -1099,9 +1099,9 @@
     req->message_id = GNUNET_htonll (message_id);
     req->name_size = htons (name_size);
     req->flags
-      = 0 == i
+      = (0 == i)
       ? STATE_OP_FIRST
-      : modifier_count - 1 == i
+      : (modifier_count - 1 == i)
       ? STATE_OP_LAST
       : 0;
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]