[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33173 - in gnunet/src: include multicast psyc psycstore
From: gnunet
Subject: [GNUnet-SVN] r33173 - in gnunet/src: include multicast psyc psycstore
Date: Tue, 6 May 2014 12:26:24 +0200
Author: tg
Date: 2014-05-06 12:26:24 +0200 (Tue, 06 May 2014)
New Revision: 33173
Modified:
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/multicast/multicast_api.c
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc.h
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/psyc_common.c
gnunet/src/psyc/test_psyc.c
gnunet/src/psycstore/psycstore_api.c
Log:
psyc: in-order message delivery
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/include/gnunet_protocols.h 2014-05-06 10:26:24 UTC (rev 33173)
@@ -2148,7 +2148,7 @@
/**
* C: client
* S: service
- * M: muticast
+ * M: multicast
*/
/** S->C: result of an operation */
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/include/gnunet_psyc_service.h 2014-05-06 10:26:24 UTC (rev 33173)
@@ -167,14 +167,32 @@
/**
* Request from slave to master.
*/
- GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1
+ GNUNET_PSYC_MESSAGE_REQUEST = 1 << 1,
+
+ /**
+ * Message can be delivered out of order.
+ */
+ GNUNET_PSYC_MESSAGE_ORDER_ANY = 1 << 2
};
+/**
+ * Values for the @a state_delta field of GNUNET_PSYC_MessageHeader.
+ */
+enum GNUNET_PSYC_StateDeltaValues
+{
+ GNUNET_PSYC_STATE_RESET = 0,
+
+ GNUNET_PSYC_STATE_NOT_MODIFIED = UINT64_MAX
+};
+
+
GNUNET_NETWORK_STRUCT_BEGIN
/**
* Header of a PSYC message.
+ *
+ * Only present when receiving a message.
*/
struct GNUNET_PSYC_MessageHeader
{
@@ -223,6 +241,12 @@
*/
uint32_t flags GNUNET_PACKED;
+ /**
+ * Number of message IDs since the last message that contained state
+ * operations. @see enum GNUNET_PSYC_StateDeltaValues
+ */
+ uint64_t state_delta GNUNET_PACKED;
+
/* Followed by NUL-terminated method name. */
};
@@ -479,22 +503,29 @@
enum GNUNET_PSYC_MasterTransmitFlags
{
GNUNET_PSYC_MASTER_TRANSMIT_NONE = 0,
+
/**
* Whether this message should reset the channel state,
* i.e. remove all previously stored state variables.
*/
- GNUNET_PSYC_MASTER_TRANSMIT_RESET_STATE = 1 << 0,
+ GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET = 1 << 0,
+
/**
- * Whether we need to increment the group generation counter after
- * transmitting this message.
+ * Whether this message contains any state modifiers.
*/
- GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 1,
+ GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY = 1 << 1,
/**
* Add PSYC header variable with the hash of the current channel state.
*/
- GNUNET_PSYC_MASTER_TRANSMIT_ADD_STATE_HASH = 1 << 2
+ GNUNET_PSYC_MASTER_TRANSMIT_STATE_HASH = 1 << 2,
+
+ /**
+ * Whether we need to increment the group generation counter after
+ * transmitting this message.
+ */
+ GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN = 1 << 3
};
Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/multicast/multicast_api.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -182,7 +182,7 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Calling origin's message callback "
- "for a message of type %u and size %u.\n",
+ "with a message of type %u and size %u.\n",
ntohs (msg->type), ntohs (msg->size));
struct GNUNET_MULTICAST_Origin *orig = (struct GNUNET_MULTICAST_Origin *) grp;
orig->message_cb (orig->cls, msg);
@@ -190,8 +190,8 @@
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling slave's message callback "
- "for a message of type %u and size %u.\n",
+ "Calling member's message callback "
+ "with a message of type %u and size %u.\n",
ntohs (msg->type), ntohs (msg->size));
struct GNUNET_MULTICAST_Member *mem = (struct GNUNET_MULTICAST_Member *) grp;
mem->message_cb (mem->cls, msg);
@@ -477,8 +477,8 @@
msg->group_generation = mh->group_generation;
/* FIXME: add fragment ID and signature in the service instead of here */
- msg->fragment_id = GNUNET_ntohll (orig->next_fragment_id++);
- msg->fragment_offset = GNUNET_ntohll (mh->fragment_offset);
+ msg->fragment_id = GNUNET_htonll (orig->next_fragment_id++);
+ msg->fragment_offset = GNUNET_htonll (mh->fragment_offset);
mh->fragment_offset += sizeof (*msg) + buf_size;
msg->purpose.size = htonl (sizeof (*msg) + buf_size
- sizeof (msg->header)
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -73,19 +73,21 @@
struct TransmitMessage *next;
/**
- * Buffer with message to be transmitted.
+ * ID assigned to the message.
*/
- char *buf;
+ uint64_t id;
/**
* Size of @a buf
*/
- uint16_t size
-;
+ uint16_t size;
+
/**
* @see enum MessageState
*/
uint8_t state;
+
+ /* Followed by message */
};
@@ -100,9 +102,9 @@
/**
* Entry in the chan_msgs hashmap of @a recv_cache:
- * fragment_id -> FragmentEntry
+ * fragment_id -> RecvCacheEntry
*/
-struct FragmentEntry
+struct RecvCacheEntry
{
struct GNUNET_MULTICAST_MessageHeader *mmsg;
uint16_t ref_count;
@@ -110,20 +112,48 @@
/**
- * Entry in the @a recv_msgs hash map of a @a Channel.
- * message_id -> FragmentCache
+ * Entry in the @a recv_frags hash map of a @a Channel.
+ * message_id -> FragmentQueue
*/
-struct FragmentCache
+struct FragmentQueue
{
/**
- * Total size of header fragments (METHOD & MODIFIERs)
+ * Fragment IDs stored in @a recv_cache.
*/
+ struct GNUNET_CONTAINER_Heap *fragments;
+
+ /**
+ * Total size of received fragments.
+ */
+ uint64_t size;
+
+ /**
+ * Total size of received header fragments (METHOD & MODIFIERs)
+ */
uint64_t header_size;
/**
- * Fragment IDs stored in @a recv_cache.
+ * The @a state_delta field from struct GNUNET_PSYC_MessageMethod.
*/
- struct GNUNET_CONTAINER_Heap *fragments;
+ uint64_t state_delta;
+
+ /**
+ * The @a flags field from struct GNUNET_PSYC_MessageMethod.
+ */
+ uint32_t flags;
+
+ /**
+ * Receive state of message.
+ *
+ * @see MessageFragmentState
+ */
+ uint8_t state;
+
+ /**
+ * Is the message queued for delivery to the client?
+ * i.e. added to the recv_msgs queue
+ */
+ uint8_t queued;
};
@@ -139,13 +169,18 @@
/**
* Received fragments not yet sent to the client.
- * message_id -> FragmentCache
+ * message_id -> FragmentQueue
*/
- struct GNUNET_CONTAINER_MultiHashMap *recv_msgs;
+ struct GNUNET_CONTAINER_MultiHashMap *recv_frags;
/**
- * FIXME
+ * Received message IDs not yet sent to the client.
*/
+ struct GNUNET_CONTAINER_Heap *recv_msgs;
+
+ /**
+ * FIXME: needed?
+ */
GNUNET_SCHEDULER_TaskIdentifier tmit_task;
/**
@@ -159,6 +194,19 @@
struct GNUNET_HashCode pub_key_hash;
/**
+ * Last message ID sent to the client.
+ * 0 if there is no such message.
+ */
+ uint64_t max_message_id;
+
+ /**
+ * ID of the last stateful message, where the state operations has been
+ * processed and saved to PSYCstore and which has been sent to the client.
+ * 0 if there is no such message.
+ */
+ uint64_t max_state_message_id;
+
+ /**
* Expected value size for the modifier being received from the PSYC service.
*/
uint32_t tmit_mod_value_size_expected;
@@ -174,7 +222,7 @@
uint8_t tmit_state;
/**
- * FIXME
+ * FIXME: needed?
*/
uint8_t in_transmit;
@@ -221,7 +269,7 @@
struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
/**
- * Maximum message ID for this channel.
+ * Last message ID transmitted to this channel.
*
* Incremented before sending a message, thus the message_id in messages sent
* starts from 1.
@@ -229,13 +277,13 @@
uint64_t max_message_id;
/**
- * ID of the last message that contains any state operations.
+ * ID of the last message with state operations transmitted to the channel.
* 0 if there is no such message.
*/
uint64_t max_state_message_id;
/**
- * Maximum group generation for this channel.
+ * Maximum group generation transmitted to the channel.
*/
uint64_t max_group_generation;
@@ -292,11 +340,6 @@
struct GNUNET_MessageHeader *join_req;
/**
- * Maximum message ID for this channel.
- */
- uint64_t max_message_id;
-
- /**
* Maximum request ID for this channel.
*/
uint64_t max_request_id;
@@ -304,7 +347,7 @@
static inline void
-transmit_message (struct Channel *ch, uint8_t inc_msg_id);
+transmit_message (struct Channel *ch);
/**
@@ -386,7 +429,7 @@
/* Send pending messages to multicast before cleanup. */
if (NULL != ch->tmit_head)
{
- transmit_message (ch, GNUNET_NO);
+ transmit_message (ch);
}
else
{
@@ -484,6 +527,7 @@
hash_key_from_nll (struct GNUNET_HashCode *key, uint64_t n)
{
/* use little-endian order, as idx_of MultiHashMap casts key to unsigned int */
+ /* TODO: use built-in byte swap functions if available */
n = ((n << 8) & 0xFF00FF00FF00FF00ULL) | ((n >> 8) & 0x00FF00FF00FF00FFULL);
n = ((n << 16) & 0xFFFF0000FFFF0000ULL) | ((n >> 16) & 0x0000FFFF0000FFFFULL);
@@ -512,29 +556,40 @@
}
+/**
+ * Insert a multicast message fragment into the queue belonging to the message.
+ *
+ * @param ch Channel.
+ * @param mmsg Multicast message fragment.
+ * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode.
+ * @param first_ptype First PSYC message part type in @a mmsg.
+ * @param last_ptype Last PSYC message part type in @a mmsg.
+ */
static void
-fragment_cache_insert (struct Channel *ch,
- const struct GNUNET_HashCode *msg_id,
- struct FragmentCache *frag_cache,
+fragment_queue_insert (struct Channel *ch,
const struct GNUNET_MULTICAST_MessageHeader *mmsg,
- uint16_t last_part_type)
+ uint16_t first_ptype, uint16_t last_ptype)
{
- uint16_t size = ntohs (mmsg->header.size);
+ const uint16_t size = ntohs (mmsg->header.size);
+ const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset);
struct GNUNET_CONTAINER_MultiHashMap
*chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
&ch->pub_key_hash);
- if (NULL == frag_cache)
+ struct GNUNET_HashCode msg_id_hash;
+ hash_key_from_nll (&msg_id_hash, mmsg->message_id);
+
+ struct FragmentQueue
+ *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+
+ if (NULL == fragq)
{
- frag_cache = GNUNET_new (struct FragmentCache);
- frag_cache->fragments
+ fragq = GNUNET_new (struct FragmentQueue);
+ fragq->state = MSG_FRAG_STATE_HEADER;
+ fragq->fragments
= GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
- if (NULL == ch->recv_msgs)
- {
- ch->recv_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
- }
- GNUNET_CONTAINER_multihashmap_put (ch->recv_msgs, msg_id, frag_cache,
+ GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
if (NULL == chan_msgs)
@@ -545,190 +600,335 @@
}
}
- struct GNUNET_HashCode *frag_id = GNUNET_new (struct GNUNET_HashCode);
- hash_key_from_nll (frag_id, mmsg->fragment_id);
- struct FragmentEntry
- *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
- if (NULL == frag_entry)
+ struct GNUNET_HashCode frag_id_hash;
+ hash_key_from_nll (&frag_id_hash, mmsg->fragment_id);
+ struct RecvCacheEntry
+ *cache_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
+ if (NULL == cache_entry)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Adding message fragment to cache. "
- "fragment_id: %" PRIu64 ", "
- "header_size: %" PRIu64 " + %" PRIu64 ").\n",
- ch, GNUNET_ntohll (mmsg->fragment_id),
- frag_cache->header_size, size);
- frag_entry = GNUNET_new (struct FragmentEntry);
- frag_entry->ref_count = 1;
- frag_entry->mmsg = GNUNET_malloc (size);
- memcpy (frag_entry->mmsg, mmsg, size);
- GNUNET_CONTAINER_multihashmap_put (chan_msgs, frag_id, frag_entry,
+ "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", "
+ "header_size: %" PRIu64 " + %u).\n",
+ ch, GNUNET_ntohll (mmsg->message_id),
+ GNUNET_ntohll (mmsg->fragment_id),
+ fragq->header_size, size);
+ cache_entry = GNUNET_new (struct RecvCacheEntry);
+ cache_entry->ref_count = 1;
+ cache_entry->mmsg = GNUNET_malloc (size);
+ memcpy (cache_entry->mmsg, mmsg, size);
+ GNUNET_CONTAINER_multihashmap_put (chan_msgs, &frag_id_hash, cache_entry,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
else
{
- frag_entry->ref_count++;
+ cache_entry->ref_count++;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Message fragment already in cache. "
- "fragment_id: %" PRIu64 ", ref_count: %u\n",
- ch, GNUNET_ntohll (mmsg->fragment_id), frag_entry->ref_count);
+ "%p Message fragment is already in cache. "
+ "message_id: %" PRIu64 ", fragment_id: %" PRIu64
+ ", ref_count: %u\n",
+ ch, GNUNET_ntohll (mmsg->message_id),
+ GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count);
}
- switch (last_part_type)
+ if (MSG_FRAG_STATE_HEADER == fragq->state)
{
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
- frag_cache->header_size += size;
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ {
+ struct GNUNET_PSYC_MessageMethod *
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) &mmsg[1];
+ fragq->state_delta = GNUNET_ntohll (pmeth->state_delta);
+ fragq->flags = ntohl (pmeth->flags);
+ }
+
+ if (last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA)
+ {
+ fragq->header_size += size;
+ }
+ else if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype
+ || frag_offset == fragq->header_size)
+ { /* header is now complete */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Header of message %" PRIu64 " is complete.\n",
+ ch, GNUNET_ntohll (mmsg->message_id));
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Adding message %" PRIu64 " to queue.\n",
+ ch, GNUNET_ntohll (mmsg->message_id));
+ fragq->state = MSG_FRAG_STATE_DATA;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Header of message %" PRIu64 " is NOT complete yet: "
+ "%" PRIu64 " != %" PRIu64 "\n",
+ ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+ fragq->header_size);
+ }
}
- GNUNET_CONTAINER_heap_insert (frag_cache->fragments, frag_id,
+
+ switch (last_ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ if (frag_offset == fragq->size)
+ fragq->state = MSG_FRAG_STATE_END;
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Message %" PRIu64 " is NOT complete yet: "
+ "%" PRIu64 " != %" PRIu64 "\n",
+ ch, GNUNET_ntohll (mmsg->message_id), frag_offset,
+ fragq->size);
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ /* Drop message without delivering to client if it's a single fragment */
+ fragq->state =
+ (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ ? MSG_FRAG_STATE_DROP
+ : MSG_FRAG_STATE_CANCEL;
+ }
+
+ switch (fragq->state)
+ {
+ case MSG_FRAG_STATE_DATA:
+ case MSG_FRAG_STATE_END:
+ case MSG_FRAG_STATE_CANCEL:
+ if (GNUNET_NO == fragq->queued)
+ {
+ GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL,
+ GNUNET_ntohll (mmsg->message_id));
+ fragq->queued = GNUNET_YES;
+ }
+ }
+
+ fragq->size += size;
+ GNUNET_CONTAINER_heap_insert (fragq->fragments, NULL,
GNUNET_ntohll (mmsg->fragment_id));
}
+/**
+ * Run fragment queue of a message.
+ *
+ * Send fragments of a message in order to client, after all modifiers arrived
+ * from multicast.
+ *
+ * @param ch Channel.
+ * @param msg_id ID of the message @a fragq belongs to.
+ * @param fragq Fragment queue of the message.
+ * @param drop Drop message without delivering to client?
+ * #GNUNET_YES or #GNUNET_NO.
+ */
static void
-fragment_cache_clear (struct Channel *ch,
- const struct GNUNET_HashCode *msg_id,
- struct FragmentCache *frag_cache,
- uint8_t send_to_client)
+fragment_queue_run (struct Channel *ch, uint64_t msg_id,
+ struct FragmentQueue *fragq, uint8_t drop)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Clearing message fragment cache.\n", ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Running message fragment queue for message %" PRIu64
+ " (state: %u).\n",
+ ch, msg_id, fragq->state);
struct GNUNET_CONTAINER_MultiHashMap
*chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache,
&ch->pub_key_hash);
GNUNET_assert (NULL != chan_msgs);
- struct GNUNET_HashCode *frag_id;
+ uint64_t frag_id;
- while ((frag_id = GNUNET_CONTAINER_heap_remove_root (frag_cache->fragments)))
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (fragq->fragments, NULL,
+ &frag_id))
{
- struct FragmentEntry
- *frag_entry = GNUNET_CONTAINER_multihashmap_get (chan_msgs, frag_id);
- if (frag_entry != NULL)
+ struct GNUNET_HashCode frag_id_hash;
+ hash_key_from_hll (&frag_id_hash, frag_id);
+ struct RecvCacheEntry *cache_entry
+ = GNUNET_CONTAINER_multihashmap_get (chan_msgs, &frag_id_hash);
+ if (cache_entry != NULL)
{
- if (GNUNET_YES == send_to_client)
+ if (GNUNET_NO == drop)
{
- message_to_client (ch, frag_entry->mmsg);
+ message_to_client (ch, cache_entry->mmsg);
}
- if (1 == frag_entry->ref_count)
+ if (cache_entry->ref_count <= 1)
{
- GNUNET_CONTAINER_multihashmap_remove (chan_msgs, frag_id, frag_entry);
- GNUNET_free (frag_entry->mmsg);
- GNUNET_free (frag_entry);
+ GNUNET_CONTAINER_multihashmap_remove (chan_msgs, &frag_id_hash,
+ cache_entry);
+ GNUNET_free (cache_entry->mmsg);
+ GNUNET_free (cache_entry);
}
else
{
- frag_entry->ref_count--;
+ cache_entry->ref_count--;
}
}
- GNUNET_free (frag_id);
+#if CACHE_AGING_IMPLEMENTED
+ else if (GNUNET_NO == drop)
+ {
+ /* TODO: fragment not in cache anymore, retrieve it from PSYCstore */
+ }
+#endif
+
+ GNUNET_CONTAINER_heap_remove_root (fragq->fragments);
}
- GNUNET_CONTAINER_multihashmap_remove (ch->recv_msgs, msg_id, frag_cache);
- GNUNET_CONTAINER_heap_destroy (frag_cache->fragments);
- GNUNET_free (frag_cache);
+ if (MSG_FRAG_STATE_END <= fragq->state)
+ {
+ struct GNUNET_HashCode msg_id_hash;
+ hash_key_from_nll (&msg_id_hash, msg_id);
+
+ GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq);
+ GNUNET_CONTAINER_heap_destroy (fragq->fragments);
+ GNUNET_free (fragq);
+ }
+ else
+ {
+ fragq->queued = GNUNET_NO;
+ }
}
/**
- * Incoming message fragment from multicast.
+ * Run message queue.
*
- * Store it using PSYCstore and send it to the client of the channel.
+ * Send messages in queue to client in order after a message has arrived from
+ * multicast, according to the following:
+ * - A message is only sent if all of its modifiers arrived.
+ * - A stateful message is only sent if the previous stateful message
+ * has already been delivered to the client.
+ *
+ * @param ch Channel.
+ * @return Number of messages removed from queue and sent to client.
*/
-static void
-message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+static uint64_t
+message_queue_run (struct Channel *ch)
{
- struct Channel *ch = cls;
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message of type %u and size %u from multicast.\n",
- ch, type, size);
-
- switch (type)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Running message queue.\n", ch);
+ uint64_t n = 0;
+ uint64_t msg_id;
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+ &msg_id))
{
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- {
- GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key,
- (const struct
- GNUNET_MULTICAST_MessageHeader *) msg,
- 0, NULL, NULL);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id);
+ struct GNUNET_HashCode msg_id_hash;
+ hash_key_from_hll (&msg_id_hash, msg_id);
-#if TODO
- /* FIXME: apply modifiers to state in PSYCstore */
- GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key,
- GNUNET_ntohll (mmsg->message_id),
- meth->mod_count, mods,
- rcb, rcb_cls);
-#endif
+ struct FragmentQueue *
+ fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
- const struct GNUNET_MULTICAST_MessageHeader
- *mmsg = (const struct GNUNET_MULTICAST_MessageHeader *) msg;
-
- uint16_t ptype = GNUNET_PSYC_message_last_part (size - sizeof (*mmsg),
- (const char *) &mmsg[1]);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Last message part type %u\n", ptype);
-
- if (GNUNET_NO == ptype)
+ if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Received message with invalid parts from multicast. "
- "Dropping message.\n", ch);
- GNUNET_break_op (0);
+ "%p No fragq (%p) or header not complete.\n",
+ ch, fragq);
break;
}
- struct GNUNET_HashCode msg_id;
- hash_key_from_nll (&msg_id, mmsg->message_id);
-
- struct FragmentCache *frag_cache = NULL;
- if (NULL != ch->recv_msgs)
- frag_cache = GNUNET_CONTAINER_multihashmap_get (ch->recv_msgs, &msg_id);
-
- switch (ptype)
+ if (MSG_FRAG_STATE_HEADER == fragq->state)
{
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
- /* FIXME: check state flag / max_state_message_id */
- if (NULL == frag_cache)
+ /* Check if there's a missing message before the current one */
+ if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta)
{
- message_to_client (ch, mmsg);
- break;
+ if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY)
+ && msg_id - 1 != ch->max_message_id)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Out of order message. "
+ "(%" PRIu64 " - 1 != %" PRIu64 ")\n",
+ ch, msg_id, ch->max_message_id);
+ break;
+ }
}
else
{
- if (GNUNET_ntohll (mmsg->fragment_offset) == frag_cache->header_size)
- { /* first data fragment after the header, send cached fragments */
- fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_YES);
- message_to_client (ch, mmsg);
+ if (msg_id - fragq->state_delta != ch->max_state_message_id)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Out of order stateful message. "
+ "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n",
+ ch, msg_id, fragq->state_delta, ch->max_state_message_id);
break;
}
- else
- { /* still missing fragments from the header, cache data fragment */
- /* fall thru */
- }
+#if TODO
+ /* FIXME: apply modifiers to state in PSYCstore */
+ GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id,
+ state_modify_result_cb, cls);
+#endif
+ ch->max_state_message_id = msg_id;
}
+ ch->max_message_id = msg_id;
+ }
+ fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state);
+ GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+ n++;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+ return n;
+}
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
- /* not all modifiers arrived yet, cache fragment */
- fragment_cache_insert (ch, &msg_id, frag_cache, mmsg, ptype);
- break;
- case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
- if (NULL != frag_cache)
- { /* fragments not yet sent to client, remove from cache */
- fragment_cache_clear (ch, &msg_id, frag_cache, GNUNET_NO);
- }
- else
- {
- message_to_client (ch, mmsg);
- }
- break;
- }
+/**
+ * Handle incoming message from multicast.
+ *
+ * @param ch Channel.
+ * @param mmsg Multicast message.
+ *
+ * @return #GNUNET_OK or #GNUNET_SYSERR
+ */
+static int
+handle_multicast_message (struct Channel *ch,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg)
+{
+ GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
+
+ uint16_t size = ntohs (mmsg->header.size);
+ uint16_t first_ptype = 0, last_ptype = 0;
+
+ if (GNUNET_SYSERR
+ == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg),
+ (const char *) &mmsg[1],
+ &first_ptype, &last_ptype))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Received message with invalid parts from multicast. "
+ "Dropping message.\n", ch);
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message parts: first: type %u, last: type %u\n",
+ first_ptype, last_ptype);
+
+ fragment_queue_insert (ch, mmsg, first_ptype, last_ptype);
+ message_queue_run (ch);
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Incoming message fragment from multicast.
+ *
+ * Store it using PSYCstore and send it to the client of the channel.
+ */
+static void
+message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct Channel *ch = cls;
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message of type %u and size %u from multicast.\n",
+ ch, type, size);
+
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+ {
+ handle_multicast_message (ch, (const struct
+ GNUNET_MULTICAST_MessageHeader *) msg);
break;
}
default:
@@ -770,8 +970,9 @@
= (const struct GNUNET_MULTICAST_RequestHeader *) msg;
/* FIXME: see message_cb() */
- if (GNUNET_NO == GNUNET_PSYC_message_last_part (size - sizeof (*req),
- (const char *) &req[1]))
+ if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req),
+ (const char *) &req[1],
+ NULL, NULL))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%p Dropping message with invalid parts "
@@ -825,7 +1026,8 @@
if (GNUNET_OK == result || GNUNET_NO == result)
{
mst->max_message_id = max_message_id;
- mst->max_state_message_id = max_state_message_id;
+ ch->max_message_id = max_message_id;
+ ch->max_state_message_id = max_state_message_id;
mst->max_group_generation = max_group_generation;
mst->origin
= GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
@@ -860,7 +1062,8 @@
if (GNUNET_OK == result || GNUNET_NO == result)
{
- slv->max_message_id = max_message_id;
+ ch->max_message_id = max_message_id;
+ ch->max_state_message_id = max_state_message_id;
slv->member
= GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->slave_key,
&slv->origin,
@@ -879,6 +1082,15 @@
}
+static void
+channel_init (struct Channel *ch)
+{
+ ch->recv_msgs
+ = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+}
+
+
/**
* Handle a connecting client starting a channel master.
*/
@@ -888,14 +1100,18 @@
{
const struct MasterStartRequest *req
= (const struct MasterStartRequest *) msg;
+
struct Master *mst = GNUNET_new (struct Master);
+ mst->policy = ntohl (req->policy);
+ mst->priv_key = req->channel_key;
+
struct Channel *ch = &mst->channel;
ch->client = client;
ch->is_master = GNUNET_YES;
- mst->policy = ntohl (req->policy);
- mst->priv_key = req->channel_key;
GNUNET_CRYPTO_eddsa_key_get_public (&mst->priv_key, &ch->pub_key);
GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key), &ch->pub_key_hash);
+ channel_init (ch);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Master connected to channel %s.\n",
mst, GNUNET_h2s (&ch->pub_key_hash));
@@ -919,13 +1135,7 @@
const struct SlaveJoinRequest *req
= (const struct SlaveJoinRequest *) msg;
struct Slave *slv = GNUNET_new (struct Slave);
- struct Channel *ch = &slv->channel;
- slv->channel.client = client;
- slv->channel.is_master = GNUNET_NO;
slv->slave_key = req->slave_key;
- ch->pub_key = req->channel_key;
- GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
- &ch->pub_key_hash);
slv->origin = req->origin;
slv->relay_count = ntohl (req->relay_count);
if (0 < slv->relay_count)
@@ -939,6 +1149,14 @@
memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
}
+ struct Channel *ch = &slv->channel;
+ ch->client = client;
+ ch->is_master = GNUNET_NO;
+ ch->pub_key = req->channel_key;
+ GNUNET_CRYPTO_hash (&ch->pub_key, sizeof (ch->pub_key),
+ &ch->pub_key_hash);
+ channel_init (ch);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Slave connected to channel %s.\n",
slv, GNUNET_h2s (&ch->pub_key_hash));
@@ -991,7 +1209,7 @@
"%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size);
*data_size = tmit_msg->size;
- memcpy (data, tmit_msg->buf, *data_size);
+ memcpy (data, &tmit_msg[1], *data_size);
GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg);
GNUNET_free (tmit_msg);
@@ -1003,7 +1221,7 @@
{
if (NULL != ch->tmit_head)
{
- transmit_message (ch, GNUNET_NO);
+ transmit_message (ch);
}
else if (ch->disconnected)
{
@@ -1054,14 +1272,12 @@
* Transmit a message from a channel master to the multicast group.
*/
static void
-master_transmit_message (struct Master *mst, uint8_t inc_msg_id)
+master_transmit_message (struct Master *mst)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst);
mst->channel.tmit_task = 0;
if (NULL == mst->tmit_handle)
{
- if (GNUNET_YES == inc_msg_id)
- mst->max_message_id++;
mst->tmit_handle
= GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
mst->max_group_generation,
@@ -1078,13 +1294,11 @@
* Transmit a message from a channel slave to the multicast group.
*/
static void
-slave_transmit_message (struct Slave *slv, uint8_t inc_msg_id)
+slave_transmit_message (struct Slave *slv)
{
slv->channel.tmit_task = 0;
if (NULL == slv->tmit_handle)
{
- if (GNUNET_YES == inc_msg_id)
- slv->max_message_id++;
slv->tmit_handle
= GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
slave_transmit_notify, slv);
@@ -1097,30 +1311,95 @@
static inline void
-transmit_message (struct Channel *ch, uint8_t inc_msg_id)
+transmit_message (struct Channel *ch)
{
ch->is_master
- ? master_transmit_message ((struct Master *) ch, inc_msg_id)
- : slave_transmit_message ((struct Slave *) ch, inc_msg_id);
+ ? master_transmit_message ((struct Master *) ch)
+ : slave_transmit_message ((struct Slave *) ch);
}
+/**
+ * Queue a message from a channel master for sending to the multicast group.
+ */
static void
-transmit_error (struct Channel *ch)
+master_queue_message (struct Master *mst, struct TransmitMessage *tmit_msg,
+ uint16_t first_ptype, uint16_t last_ptype)
{
- struct GNUNET_MessageHeader *msg;
- struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg)
- + sizeof (*msg));
- msg = (struct GNUNET_MessageHeader *) &tmit_msg[1];
- msg->size = ntohs (sizeof (*msg));
- msg->type = ntohs (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_queue_message()\n", mst);
- tmit_msg->buf = (char *) &tmit_msg[1];
- tmit_msg->size = sizeof (*msg);
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ {
+ tmit_msg->id = ++mst->max_message_id;
+ struct GNUNET_PSYC_MessageMethod *pmeth
+ = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
+
+ if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_RESET)
+ {
+ pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_RESET);
+ }
+ else if (pmeth->flags & GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY)
+ {
+ pmeth->state_delta = GNUNET_htonll (tmit_msg->id
+ - mst->max_state_message_id);
+ }
+ else
+ {
+ pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
+ }
+ }
+}
+
+
+/**
+ * Queue a message from a channel slave for sending to the multicast group.
+ */
+static void
+slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg,
+ uint16_t first_ptype, uint16_t last_ptype)
+{
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == first_ptype)
+ {
+ struct GNUNET_PSYC_MessageMethod *pmeth
+ = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
+ pmeth->state_delta = GNUNET_htonll (GNUNET_PSYC_STATE_NOT_MODIFIED);
+ tmit_msg->id = ++slv->max_request_id;
+ }
+}
+
+
+static void
+queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg,
+ uint16_t first_ptype, uint16_t last_ptype)
+{
+ uint16_t size = ntohs (msg->size) - sizeof (*msg);
+ struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
+ memcpy (&tmit_msg[1], &msg[1], size);
+ tmit_msg->size = size;
tmit_msg->state = ch->tmit_state;
+
GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- transmit_message (ch, GNUNET_NO);
+ ch->is_master
+ ? master_queue_message ((struct Master *) ch, tmit_msg,
+ first_ptype, last_ptype)
+ : slave_queue_message ((struct Slave *) ch, tmit_msg,
+ first_ptype, last_ptype);
+}
+
+
+static void
+transmit_error (struct Channel *ch)
+{
+ uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL;
+
+ struct GNUNET_MessageHeader msg;
+ msg.size = ntohs (sizeof (msg));
+ msg.type = ntohs (type);
+
+ queue_message (ch, &msg, type, type);
+ transmit_message (ch);
+
/* FIXME: cleanup */
}
@@ -1136,6 +1415,10 @@
= GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received message from client.\n", ch);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
+
if (GNUNET_YES != ch->ready)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1145,10 +1428,7 @@
return;
}
- uint8_t inc_msg_id = GNUNET_NO;
uint16_t size = ntohs (msg->size);
- uint16_t psize = 0, ptype = 0, pos = 0;
-
if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
@@ -1158,42 +1438,22 @@
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message from client.\n", ch);
- GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg);
-
- for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+ uint16_t first_ptype = 0, last_ptype = 0;
+ if (GNUNET_SYSERR
+ == GNUNET_PSYC_check_message_parts (size - sizeof (*msg),
+ (const char *) &msg[1],
+ &first_ptype, &last_ptype))
{
- const struct GNUNET_MessageHeader *pmsg
- = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
- psize = ntohs (pmsg->size);
- ptype = ntohs (pmsg->type);
- if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "%p Received invalid message part of type %u and size %u "
- "from client.\n", ch, ptype, psize);
- GNUNET_break (0);
- transmit_error (ch);
- GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received message part from client.\n", ch);
- GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
-
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
- inc_msg_id = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p Received invalid message part from client.\n", ch);
+ GNUNET_break (0);
+ transmit_error (ch);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
}
- size -= sizeof (*msg);
- struct TransmitMessage *tmit_msg = GNUNET_malloc (sizeof (*tmit_msg) + size);
- tmit_msg->buf = (char *) &tmit_msg[1];
- memcpy (tmit_msg->buf, &msg[1], size);
- tmit_msg->size = size;
- tmit_msg->state = ch->tmit_state;
- GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
- transmit_message (ch, inc_msg_id);
+ queue_message (ch, msg, first_ptype, last_ptype);
+ transmit_message (ch);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/psyc.h 2014-05-06 10:26:24 UTC (rev 33173)
@@ -31,8 +31,9 @@
#include "gnunet_psyc_service.h"
-uint16_t
-GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data);
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
+ uint16_t *first_ptype, uint16_t *last_ptype);
void
GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
@@ -41,17 +42,29 @@
enum MessageState
{
- MSG_STATE_START = 0,
- MSG_STATE_HEADER = 1,
- MSG_STATE_METHOD = 2,
+ MSG_STATE_START = 0,
+ MSG_STATE_HEADER = 1,
+ MSG_STATE_METHOD = 2,
MSG_STATE_MODIFIER = 3,
MSG_STATE_MOD_CONT = 4,
- MSG_STATE_DATA = 5,
- MSG_STATE_END = 6,
- MSG_STATE_CANCEL = 7,
+ MSG_STATE_DATA = 5,
+ MSG_STATE_END = 6,
+ MSG_STATE_CANCEL = 7,
+ MSG_STATE_ERROR = 8,
};
+enum MessageFragmentState
+{
+ MSG_FRAG_STATE_START = 0,
+ MSG_FRAG_STATE_HEADER = 1,
+ MSG_FRAG_STATE_DATA = 2,
+ MSG_FRAG_STATE_END = 3,
+ MSG_FRAG_STATE_CANCEL = 4,
+ MSG_FRAG_STATE_DROP = 5,
+};
+
+
GNUNET_NETWORK_STRUCT_BEGIN
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/psyc_api.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -1502,7 +1502,7 @@
slvadd = (struct ChannelSlaveAdd *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) slvadd;
- slvadd->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD;
+ slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD);
slvadd->header.size = htons (sizeof (*slvadd));
slvadd->announced_at = GNUNET_htonll (announced_at);
slvadd->effective_since = GNUNET_htonll (effective_since);
@@ -1544,7 +1544,7 @@
slvrm = (struct ChannelSlaveRemove *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) slvrm;
- slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
+ slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM);
slvrm->header.size = htons (sizeof (*slvrm));
slvrm->announced_at = GNUNET_htonll (announced_at);
GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
Modified: gnunet/src/psyc/psyc_common.c
===================================================================
--- gnunet/src/psyc/psyc_common.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/psyc_common.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -30,36 +30,48 @@
/**
* Check if @a data contains a series of valid message parts.
*
- * @param data_size Size of @a data.
- * @param data Data.
+ * @param data_size Size of @a data.
+ * @param data Data.
+ * @param[out] first_ptype Type of first message part.
+ * @param[out] last_ptype Type of last message part.
*
- * @return Message type number
- * or GNUNET_NO if the message contains invalid or no parts.
+ * @return Number of message parts found in @a data.
+ * or GNUNET_SYSERR if the message contains invalid parts.
*/
-uint16_t
-GNUNET_PSYC_message_last_part (uint16_t data_size, const char *data)
+int
+GNUNET_PSYC_check_message_parts (uint16_t data_size, const char *data,
+ uint16_t *first_ptype, uint16_t *last_ptype)
{
const struct GNUNET_MessageHeader *pmsg;
- uint16_t ptype = GNUNET_NO;
- uint16_t psize = 0;
- uint16_t pos = 0;
+ uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
+ if (NULL != first_ptype)
+ *first_ptype = 0;
+ if (NULL != last_ptype)
+ *last_ptype = 0;
- for (pos = 0; pos < data_size; pos += psize)
+ for (pos = 0; pos < data_size; pos += psize, parts++)
{
pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
psize = ntohs (pmsg->size);
ptype = ntohs (pmsg->type);
- if (psize < sizeof (*pmsg) || pos + psize > data_size
+ if (0 == parts && NULL != first_ptype)
+ *first_ptype = ptype;
+ if (NULL != last_ptype
+ && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+ *last_ptype = ptype;
+ if (psize < sizeof (*pmsg)
+ || pos + psize > data_size
|| ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
|| GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Invalid message part of type %u and size %u.\n",
ptype, psize);
- return GNUNET_NO;
+ return GNUNET_SYSERR;
}
+ /* FIXME: check message part order */
}
- return ptype;
+ return parts;
}
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psyc/test_psyc.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -35,9 +35,9 @@
#include "gnunet_env_lib.h"
#include "gnunet_psyc_service.h"
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
-#define DEBUG_SERVICE 1
+#define DEBUG_SERVICE 0
/**
@@ -120,7 +120,7 @@
/**
- * Terminate the testcase (failure).
+ * Terminate the test case (failure).
*
* @param cls NULL
* @param tc scheduler context
@@ -134,7 +134,7 @@
/**
- * Terminate the testcase (success).
+ * Terminate the test case (success).
*
* @param cls NULL
* @param tc scheduler context
@@ -148,7 +148,7 @@
/**
- * Finish the testcase (successfully).
+ * Finish the test case (successfully).
*/
static void
end ()
@@ -518,7 +518,8 @@
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
mst = GNUNET_PSYC_master_start (cfg, channel_key,
GNUNET_PSYC_CHANNEL_PRIVATE,
- &master_message, &join_request, &master_started, NULL);
+ &master_message, &join_request,
+ &master_started, NULL);
}
Modified: gnunet/src/psycstore/psycstore_api.c
===================================================================
--- gnunet/src/psycstore/psycstore_api.c 2014-05-06 10:26:21 UTC (rev 33172)
+++ gnunet/src/psycstore/psycstore_api.c 2014-05-06 10:26:24 UTC (rev 33173)
@@ -1099,9 +1099,9 @@
req->message_id = GNUNET_htonll (message_id);
req->name_size = htons (name_size);
req->flags
- = 0 == i
+ = (0 == i)
? STATE_OP_FIRST
- : modifier_count - 1 == i
+ : (modifier_count - 1 == i)
? STATE_OP_LAST
: 0;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33173 - in gnunet/src: include multicast psyc psycstore,
gnunet <=