gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36380 - in gnunet/src: include multicast psyc psycstore so


From: gnunet
Subject: [GNUnet-SVN] r36380 - in gnunet/src: include multicast psyc psycstore social util
Date: Sun, 27 Sep 2015 23:04:34 +0200

Author: tg
Date: 2015-09-27 23:04:34 +0200 (Sun, 27 Sep 2015)
New Revision: 36380

Modified:
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/multicast/gnunet-service-multicast.c
   gnunet/src/multicast/multicast_api.c
   gnunet/src/multicast/test_multicast.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/test_psyc.c
   gnunet/src/psycstore/psyc_util_lib.c
   gnunet/src/social/social_api.c
   gnunet/src/util/client_manager.c
Log:
multicast/psyc/social: message acks & scheduling

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2015-09-27 21:04:23 UTC (rev 
36379)
+++ gnunet/src/include/gnunet_protocols.h       2015-09-27 21:04:34 UTC (rev 
36380)
@@ -2439,19 +2439,24 @@
 #define GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST 758
 
 /**
+ * C->S: Acknowledgement of a message or request fragment for the client.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK 759
+
+/**
  * C<->S<->T: Replay request from a group member to another member.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 759
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST 760
 
 /**
  * C<->S<->T: Replay response from a group member to another member.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 761
 
 /**
  * C<->S: End of replay response.
  */
-#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 762
 
 
 

Modified: gnunet/src/multicast/gnunet-service-multicast.c
===================================================================
--- gnunet/src/multicast/gnunet-service-multicast.c     2015-09-27 21:04:23 UTC 
(rev 36379)
+++ gnunet/src/multicast/gnunet-service-multicast.c     2015-09-27 21:04:34 UTC 
(rev 36380)
@@ -181,6 +181,11 @@
   int8_t join_status;
 
   /**
+   * Number of messages waiting to be sent to CADET.
+   */
+  uint8_t msgs_pending;
+
+  /**
    * Channel direction.
    * @see enum ChannelDirection
    */
@@ -619,8 +624,10 @@
 /**
  * Send message to all origin and member clients connected to the group.
  *
- * @param grp  The group to send @a msg to.
- * @param msg  Message to send.
+ * @param pub_key_hash
+ *        H(key_pub) of the group.
+ * @param msg
+ *        Message to send.
  */
 static int
 client_send_all (struct GNUNET_HashCode *pub_key_hash,
@@ -660,8 +667,10 @@
 /**
  * Send message to all origin clients connected to the group.
  *
- * @param grp  The group to send @a msg to.
- * @param msg  Message to send.
+ * @param pub_key_hash
+ *        H(key_pub) of the group.
+ * @param msg
+ *        Message to send.
  */
 static int
 client_send_origin (struct GNUNET_HashCode *pub_key_hash,
@@ -676,6 +685,33 @@
 
 
 /**
+ * Send fragment acknowledgement to all clients of the channel.
+ *
+ * @param pub_key_hash
+ *        H(key_pub) of the group.
+ */
+static void
+client_send_ack (struct GNUNET_HashCode *pub_key_hash)
+{
+  static struct GNUNET_MessageHeader *msg = NULL;
+  if (NULL == msg)
+  {
+    msg = GNUNET_malloc (sizeof (*msg));
+    msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
+    msg->size = htons (sizeof (*msg));
+  }
+  client_send_all (pub_key_hash, msg);
+}
+
+
+struct CadetTransmitClosure
+{
+  struct Channel *chn;
+  const struct GNUNET_MessageHeader *msg;
+};
+
+
+/**
  * CADET is ready to transmit a message.
  */
 size_t
@@ -686,10 +722,21 @@
     /* FIXME: connection closed */
     return 0;
   }
-  const struct GNUNET_MessageHeader *msg = cls;
-  uint16_t msg_size = ntohs (msg->size);
+  struct CadetTransmitClosure *tcls = cls;
+  struct Channel *chn = tcls->chn;
+  uint16_t msg_size = ntohs (tcls->msg->size);
   GNUNET_assert (msg_size <= buf_size);
-  memcpy (buf, msg, msg_size);
+  memcpy (buf, tcls->msg, msg_size);
+  GNUNET_free (tcls);
+
+  if (0 == chn->msgs_pending)
+  {
+    GNUNET_break (0);
+  }
+  else if (0 == --chn->msgs_pending)
+  {
+    client_send_ack (&chn->group_key_hash);
+  }
   return msg_size;
 }
 
@@ -703,6 +750,11 @@
 static void
 cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader 
*msg)
 {
+  struct CadetTransmitClosure *tcls = GNUNET_malloc (sizeof (*tcls));
+  tcls->chn = chn;
+  tcls->msg = msg;
+
+  chn->msgs_pending++;
   chn->tmit_handle
     = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
                                           GNUNET_TIME_UNIT_FOREVER_REL,
@@ -1132,7 +1184,10 @@
   }
 
   client_send_all (&grp->pub_key_hash, &out->header);
-  cadet_send_children (&grp->pub_key_hash, &out->header);
+  if (0 == cadet_send_children (&grp->pub_key_hash, &out->header))
+  {
+    client_send_ack (&grp->pub_key_hash);
+  }
   GNUNET_free (out);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -1174,11 +1229,13 @@
     GNUNET_assert (0);
   }
 
+  uint8_t send_ack = GNUNET_YES;
   if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
   { /* No local origins, send to remote origin */
     if (NULL != mem->origin_channel)
     {
       cadet_send_channel (mem->origin_channel, &out->header);
+      send_ack = GNUNET_NO;
     }
     else
     {
@@ -1188,6 +1245,10 @@
       return;
     }
   }
+  if (GNUNET_YES == send_ack)
+  {
+    client_send_ack (&grp->pub_key_hash);
+  }
   GNUNET_free (out);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2015-09-27 21:04:23 UTC (rev 
36379)
+++ gnunet/src/multicast/multicast_api.c        2015-09-27 21:04:34 UTC (rev 
36380)
@@ -102,6 +102,11 @@
   uint8_t in_transmit;
 
   /**
+   * Number of MULTICAST_FRAGMENT_ACK messages we are still waiting for.
+   */
+  uint8_t acks_pending;
+
+  /**
    * Is this the origin or a member?
    */
   uint8_t is_origin;
@@ -185,6 +190,13 @@
 };
 
 
+static void
+origin_to_all (struct GNUNET_MULTICAST_Origin *orig);
+
+static void
+member_to_origin (struct GNUNET_MULTICAST_Member *mem);
+
+
 /**
  * Send first message to the service after connecting.
  */
@@ -274,6 +286,38 @@
 
 
 /**
+ * Receive message/request fragment acknowledgement from service.
+ */
+static void
+group_recv_fragment_ack (void *cls,
+                         struct GNUNET_CLIENT_MANAGER_Connection *client,
+                         const struct GNUNET_MessageHeader *msg)
+{
+  struct GNUNET_MULTICAST_Group *
+    grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "%p Got fragment ACK. in_transmit=%u, acks_pending=%u\n",
+       grp, grp->in_transmit, grp->acks_pending);
+
+  if (0 == grp->acks_pending)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p Ignoring extraneous fragment ACK.\n", grp);
+    return;
+  }
+  grp->acks_pending--;
+
+  if (GNUNET_YES != grp->in_transmit)
+    return;
+
+  if (GNUNET_YES == grp->is_origin)
+    origin_to_all ((struct GNUNET_MULTICAST_Origin *) grp);
+  else
+    member_to_origin ((struct GNUNET_MULTICAST_Member *) grp);
+}
+
+/**
  * Origin receives uniquest request from a member.
  */
 static void
@@ -447,6 +491,10 @@
     GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
     sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
 
+  { group_recv_fragment_ack, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
   { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -470,6 +518,10 @@
     GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
     sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
 
+  { group_recv_fragment_ack, NULL,
+    GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK,
+    sizeof (struct GNUNET_MessageHeader), GNUNET_YES },
+
   { group_recv_join_request, NULL,
     GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
     sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
@@ -577,6 +629,7 @@
     memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
 
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+  GNUNET_free (hdcsn);
   GNUNET_free (join);
   return NULL;
 }
@@ -774,9 +827,10 @@
 static void
 origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "origin_to_all()\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "%p origin_to_all()\n", orig);
   struct GNUNET_MULTICAST_Group *grp = &orig->grp;
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
@@ -786,7 +840,8 @@
       || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
-         "OriginTransmitNotify() returned error or invalid message size.\n");
+         "%p OriginTransmitNotify() returned error or invalid message size.\n",
+         orig);
     /* FIXME: handle error */
     GNUNET_free (msg);
     return;
@@ -794,6 +849,8 @@
 
   if (GNUNET_NO == ret && 0 == buf_size)
   {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "%p OriginTransmitNotify() - transmission paused.\n", orig);
     GNUNET_free (msg);
     return; /* Transmission paused. */
   }
@@ -805,7 +862,12 @@
   msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
   tmit->fragment_offset += sizeof (*msg) + buf_size;
 
+  grp->acks_pending++;
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
+  GNUNET_free (msg);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -834,11 +896,10 @@
                                 GNUNET_MULTICAST_OriginTransmitNotify notify,
                                 void *notify_cls)
 {
-/* FIXME
-  if (GNUNET_YES == orig->grp.in_transmit)
+  struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+  if (GNUNET_YES == grp->in_transmit)
     return NULL;
-  orig->grp.in_transmit = GNUNET_YES;
-*/
+  grp->in_transmit = GNUNET_YES;
 
   struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
   tmit->origin = orig;
@@ -861,6 +922,9 @@
 void
 GNUNET_MULTICAST_origin_to_all_resume (struct 
GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->origin->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   origin_to_all (th->origin);
 }
 
@@ -874,6 +938,7 @@
 void
 GNUNET_MULTICAST_origin_to_all_cancel (struct 
GNUNET_MULTICAST_OriginTransmitHandle *th)
 {
+  th->origin->grp.in_transmit = GNUNET_NO;
 }
 
 
@@ -1094,6 +1159,7 @@
   LOG (GNUNET_ERROR_TYPE_DEBUG, "member_to_origin()\n");
   struct GNUNET_MULTICAST_Group *grp = &mem->grp;
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
+  GNUNET_assert (GNUNET_YES == grp->in_transmit);
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
@@ -1124,6 +1190,10 @@
   tmit->fragment_offset += sizeof (*req) + buf_size;
 
   GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
+  GNUNET_free (req);
+
+  if (GNUNET_YES == ret)
+    grp->in_transmit = GNUNET_NO;
 }
 
 
@@ -1147,11 +1217,9 @@
                                    GNUNET_MULTICAST_MemberTransmitNotify 
notify,
                                    void *notify_cls)
 {
-/* FIXME
   if (GNUNET_YES == mem->grp.in_transmit)
     return NULL;
   mem->grp.in_transmit = GNUNET_YES;
-*/
 
   struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
   tmit->member = mem;
@@ -1173,6 +1241,9 @@
 void
 GNUNET_MULTICAST_member_to_origin_resume (struct 
GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  struct GNUNET_MULTICAST_Group *grp = &th->member->grp;
+  if (0 != grp->acks_pending || GNUNET_YES != grp->in_transmit)
+    return;
   member_to_origin (th->member);
 }
 
@@ -1186,6 +1257,7 @@
 void
 GNUNET_MULTICAST_member_to_origin_cancel (struct 
GNUNET_MULTICAST_MemberTransmitHandle *th)
 {
+  th->member->grp.in_transmit = GNUNET_NO;
 }
 
 

Modified: gnunet/src/multicast/test_multicast.c
===================================================================
--- gnunet/src/multicast/test_multicast.c       2015-09-27 21:04:23 UTC (rev 
36379)
+++ gnunet/src/multicast/test_multicast.c       2015-09-27 21:04:34 UTC (rev 
36380)
@@ -183,7 +183,7 @@
   struct TransmitClosure *tmit = cls;
   if (NULL != tmit->orig_tmit)
     GNUNET_MULTICAST_origin_to_all_resume (tmit->orig_tmit);
-  else
+  else if (NULL != tmit->mem_tmit)
     GNUNET_MULTICAST_member_to_origin_resume (tmit->mem_tmit);
 }
 
@@ -453,7 +453,7 @@
   *tmit = (struct TransmitClosure) {};
   tmit->data[0] = "abc def";
   tmit->data[1] = "ghi jkl mno";
-  tmit->data_delay[1] = 1;
+  tmit->data_delay[1] = 2;
   tmit->data[2] = "pqr stuw xyz";
   tmit->data_count = 3;
 
@@ -460,7 +460,8 @@
   origin_cls.n = 0;
   origin_cls.msgs_expected = 1;
 
-  GNUNET_MULTICAST_member_to_origin (member, 1, tmit_notify, tmit);
+  tmit->mem_tmit = GNUNET_MULTICAST_member_to_origin (member, 1,
+                                                      tmit_notify, tmit);
 }
 
 
@@ -533,15 +534,19 @@
   struct TransmitClosure *tmit = &tmit_cls;
   *tmit = (struct TransmitClosure) {};
   tmit->data[0] = "ABC DEF";
-  tmit->data[1] = "GHI JKL MNO";
-  tmit->data_delay[1] = 1;
-  tmit->data[2] = "PQR STUW XYZ";
-  tmit->data_count = 3;
+  tmit->data[1] =  GNUNET_malloc (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + 1);
+  for (uint16_t i = 0; i < GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; i++)
+    tmit->data[1][i] = (0 == i % 10000) ? '0' + i / 10000 : '_';
+  tmit->data[2] = "GHI JKL MNO";
+  tmit->data_delay[2] = 2;
+  tmit->data[3] = "PQR STUW XYZ";
+  tmit->data_count = 4;
 
   origin_cls.n = member_cls.n = 0;
-  origin_cls.msgs_expected = member_cls.msgs_expected = 1;
+  origin_cls.msgs_expected = member_cls.msgs_expected = tmit->data_count;
 
-  GNUNET_MULTICAST_origin_to_all (origin, 1, 1, tmit_notify, tmit);
+  tmit->orig_tmit = GNUNET_MULTICAST_origin_to_all (origin, 1, 1,
+                                                    tmit_notify, tmit);
 }
 
 

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2015-09-27 21:04:23 UTC (rev 
36379)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2015-09-27 21:04:34 UTC (rev 
36380)
@@ -107,17 +107,6 @@
    */
   uint16_t last_ptype;
 
-  /**
-   * @see enum MessageState
-   */
-  uint8_t state;
-
-  /**
-   * Whether a message ACK has already been sent to the client.
-   * #GNUNET_YES or #GNUNET_NO
-   */
-  uint8_t ack_sent;
-
   /* Followed by message */
 };
 
@@ -281,11 +270,6 @@
   uint32_t tmit_mod_value_size;
 
   /**
-   * @see enum MessageState
-   */
-  uint8_t tmit_state;
-
-  /**
    * Is this a channel master (#GNUNET_YES), or slave (#GNUNET_NO)?
    */
   uint8_t is_master;
@@ -438,6 +422,15 @@
 message_queue_drop (struct Channel *chn);
 
 
+static void
+schedule_transmit_message (void *cls,
+                           const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct Channel *chn = cls;
+  transmit_message (chn);
+}
+
+
 /**
  * Task run during shutdown.
  *
@@ -1145,8 +1138,8 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Header of message %" PRIu64 " is NOT complete yet: "
                   "%" PRIu64 " != %" PRIu64 "\n",
-                  chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
-                  fragq->header_size);
+                  chn, GNUNET_ntohll (mmsg->message_id),
+                  frag_offset, fragq->header_size);
     }
   }
 
@@ -1159,8 +1152,8 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "%p Message %" PRIu64 " is NOT complete yet: "
                   "%" PRIu64 " != %" PRIu64 "\n",
-                  chn, GNUNET_ntohll (mmsg->message_id), frag_offset,
-                  fragq->size);
+                  chn, GNUNET_ntohll (mmsg->message_id),
+                  frag_offset, fragq->size);
     break;
 
   case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
@@ -1486,17 +1479,26 @@
   uint16_t size = ntohs (mmsg->header.size);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "%p Received multicast message of size %u.\n",
-              chn, size);
+              "%p Received multicast message of size %u. "
+              "fragment_id=%" PRIu64 ", message_id=%" PRIu64
+              ", fragment_offset=%" PRIu64 ", flags=%" PRIu64 "\n",
+              chn, size,
+              GNUNET_ntohll (mmsg->fragment_id),
+              GNUNET_ntohll (mmsg->message_id),
+              GNUNET_ntohll (mmsg->fragment_offset),
+              GNUNET_ntohll (mmsg->flags));
 
   GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0,
                                    &store_recv_fragment_store_result, chn);
 
   uint16_t first_ptype = 0, last_ptype = 0;
-  if (GNUNET_SYSERR
-      == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
-                                          (const char *) &mmsg[1],
-                                          &first_ptype, &last_ptype))
+  int check = GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg),
+                                               (const char *) &mmsg[1],
+                                               &first_ptype, &last_ptype);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "%p Message check result %d, first part type %u, last part type 
%u\n",
+              chn, check, first_ptype, last_ptype);
+  if (GNUNET_SYSERR == check)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "%p Dropping incoming multicast message with invalid parts.\n",
@@ -1505,10 +1507,6 @@
     return;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Message parts: first: type %u, last: type %u\n",
-              first_ptype, last_ptype);
-
   fragment_queue_insert (chn, mmsg, first_ptype, last_ptype);
   message_queue_run (chn);
 }
@@ -1965,6 +1963,8 @@
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "%p transmit_notify: nothing to send.\n", chn);
+    if (NULL != tmit_msg && *data_size < tmit_msg->size)
+      GNUNET_break (0);
     *data_size = 0;
     return GNUNET_NO;
   }
@@ -1975,9 +1975,13 @@
   *data_size = tmit_msg->size;
   memcpy (data, &tmit_msg[1], *data_size);
 
-  int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES;
+  int ret
+    = (tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+    ? GNUNET_NO
+    : GNUNET_YES;
 
-  if (NULL != tmit_msg->client && GNUNET_NO == tmit_msg->ack_sent)
+  /* FIXME: handle disconnecting clients */
+  if (NULL != tmit_msg->client)
     send_message_ack (chn, tmit_msg->client);
 
   GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg);
@@ -1985,7 +1989,7 @@
 
   if (NULL != chn->tmit_head)
   {
-    transmit_message (chn);
+    GNUNET_SCHEDULER_add_now (schedule_transmit_message, chn);
   }
   else if (GNUNET_YES == chn->is_disconnected
            && tmit_msg->last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
@@ -2037,10 +2041,12 @@
 static void
 master_transmit_message (struct Master *mst)
 {
+  if (NULL == mst->chn.tmit_head)
+    return;
   if (NULL == mst->tmit_handle)
   {
     mst->tmit_handle
-      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+      = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->chn.tmit_head->id,
                                         mst->max_group_generation,
                                         master_transmit_notify, mst);
   }
@@ -2057,10 +2063,12 @@
 static void
 slave_transmit_message (struct Slave *slv)
 {
+  if (NULL == slv->chn.tmit_head)
+    return;
   if (NULL == slv->tmit_handle)
   {
     slv->tmit_handle
-      = GNUNET_MULTICAST_member_to_origin (slv->member, slv->max_request_id,
+      = GNUNET_MULTICAST_member_to_origin (slv->member, slv->chn.tmit_head->id,
                                            slave_transmit_notify, slv);
   }
   else
@@ -2090,6 +2098,9 @@
   if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == tmit_msg->first_ptype)
   {
     tmit_msg->id = ++mst->max_message_id;
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "%p master_queue_message: message_id=%" PRIu64 "\n",
+                mst, tmit_msg->id);
     struct GNUNET_PSYC_MessageMethod *pmeth
       = (struct GNUNET_PSYC_MessageMethod *) &tmit_msg[1];
 
@@ -2159,7 +2170,6 @@
   memcpy (&tmit_msg[1], data, data_size);
   tmit_msg->client = client;
   tmit_msg->size = data_size;
-  tmit_msg->state = chn->tmit_state;
   tmit_msg->first_ptype = first_ptype;
   tmit_msg->last_ptype = last_ptype;
 

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2015-09-27 21:04:23 UTC (rev 36379)
+++ gnunet/src/psyc/test_psyc.c 2015-09-27 21:04:34 UTC (rev 36380)
@@ -225,7 +225,8 @@
   if (NULL == msg)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Error while receiving message %" PRIu64 "\n", message_id);
+                "Test #%d: Error while master is receiving part of message #%" 
PRIu64 ".\n",
+                test, message_id);
     return;
   }
 
@@ -243,7 +244,8 @@
     if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Unexpected request flags: %x" PRIu32 "\n", flags);
+                  "Test #%d: Unexpected request flags: %x" PRIu32 "\n",
+                  test, flags);
       GNUNET_assert (0);
       return;
     }
@@ -297,7 +299,8 @@
   if (NULL == msg)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Error while receiving message " PRIu64 "\n", message_id);
+                "Test #%d: Error while slave is receiving part of message #%" 
PRIu64 ".\n",
+                test, message_id);
     return;
   }
 
@@ -322,7 +325,7 @@
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Test #%d: Unexpected flags for historic message: %x" PRIu32 
"\n",
-                  flags);
+                  test, flags);
       GNUNET_assert (0);
       return;
     }
@@ -575,9 +578,9 @@
 
   uint16_t size = strlen (tmit->data[tmit->n]);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmit notify data: %u bytes available, "
+              "Test #%d: Transmit notify data: %u bytes available, "
               "processing fragment %u/%u (size %u).\n",
-              *data_size, tmit->n + 1, tmit->data_count, size);
+              test, *data_size, tmit->n + 1, tmit->data_count, size);
   if (*data_size < size)
   {
     *data_size = 0;
@@ -587,7 +590,8 @@
 
   if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n])
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Test #%d: Transmission paused.\n", test);
     tmit->paused = GNUNET_YES;
     GNUNET_SCHEDULER_add_delayed (
       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
@@ -611,9 +615,9 @@
 {
   struct TransmitClosure *tmit = cls;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Transmit notify modifier: %lu bytes available, "
+              "Test #%d: Transmit notify modifier: %lu bytes available, "
               "%u modifiers left to process.\n",
-              *data_size, GNUNET_ENV_environment_get_count (tmit->env));
+              test, *data_size, GNUNET_ENV_environment_get_count (tmit->env));
 
   uint16_t name_size = 0;
   size_t value_size = 0;
@@ -688,9 +692,9 @@
 void
 slave_transmit ()
 {
-
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
   test = TEST_SLAVE_TRANSMIT;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Test #%d: Slave sending request to master.\n", test);
 
   tmit = GNUNET_new (struct TransmitClosure);
   tmit->env = GNUNET_ENV_environment_create ();
@@ -772,7 +776,7 @@
                   const struct GNUNET_PSYC_Message *join_msg)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Slave got join decision: %d\n", is_admitted);
+              "Test #%d: Slave got join decision: %d\n", test, is_admitted);
 
   switch (test)
   {
@@ -804,8 +808,8 @@
   struct GNUNET_HashCode slave_key_hash;
   GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Got join request #%u from %s.\n",
-              join_req_count, GNUNET_h2s (&slave_key_hash));
+              "Test #%d: Got join request #%u from %s.\n",
+              test, join_req_count, GNUNET_h2s (&slave_key_hash));
 
   /* Reject first request */
   int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
@@ -817,8 +821,8 @@
 slave_connect_cb (void *cls, int result, uint64_t max_message_id)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Slave connected: %d, max_message_id: %" PRIu64 "\n",
-              result, max_message_id);
+              "Test #%d: Slave connected: %d, max_message_id: %" PRIu64 "\n",
+              test, result, max_message_id);
   GNUNET_assert (TEST_SLAVE_JOIN_REJECT == test || TEST_SLAVE_JOIN_ACCEPT == 
test);
   GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
 }
@@ -827,8 +831,8 @@
 static void
 slave_join (int t)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
   test = t;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Joining slave.\n");
 
   struct GNUNET_PeerIdentity origin = this_peer;
   struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
@@ -852,8 +856,9 @@
 void
 master_transmit ()
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
   test = TEST_MASTER_TRANSMIT;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+              "Test #%d: Master sending message to all.\n", test);
   end_count = 0;
 
   uint32_t i, j;
@@ -907,8 +912,8 @@
 master_start_cb (void *cls, int result, uint64_t max_message_id)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Master started: %d, max_message_id: %" PRIu64 "\n",
-              result, max_message_id);
+              "Test #%d: Master started: %d, max_message_id: %" PRIu64 "\n",
+              test, result, max_message_id);
   GNUNET_assert (TEST_MASTER_START == test);
   GNUNET_assert (GNUNET_OK == result || GNUNET_NO == result);
   slave_join (TEST_SLAVE_JOIN_REJECT);
@@ -918,8 +923,8 @@
 void
 master_start ()
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
   test = TEST_MASTER_START;
+  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Test #%d: Starting master.\n", test);
   mst = GNUNET_PSYC_master_start (cfg, channel_key, 
GNUNET_PSYC_CHANNEL_PRIVATE,
                                   &master_start_cb, &join_request_cb,
                                   &master_message_cb, &master_message_part_cb,

Modified: gnunet/src/psycstore/psyc_util_lib.c
===================================================================
--- gnunet/src/psycstore/psyc_util_lib.c        2015-09-27 21:04:23 UTC (rev 
36379)
+++ gnunet/src/psycstore/psyc_util_lib.c        2015-09-27 21:04:34 UTC (rev 
36380)
@@ -101,6 +101,12 @@
    * Are we currently transmitting a message?
    */
   uint8_t in_transmit;
+
+  /**
+   * Notify callback is currently being called.
+   */
+  uint8_t in_notify;
+
 };
 
 
@@ -334,20 +340,20 @@
  *        Transmission handle.
  * @param msg
  *        Message part, or NULL.
- * @param end
- *        End of message?
+ * @param tmit_now
+ *        Transmit message now, or wait for buffer to fill up?
  *        #GNUNET_YES or #GNUNET_NO.
  */
 static void
 transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
                        const struct GNUNET_MessageHeader *msg,
-                       uint8_t end)
+                       uint8_t tmit_now)
 {
   uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Queueing message part of type %u and size %u (end: %u)).\n",
-       NULL != msg ? ntohs (msg->type) : 0, size, end);
+       "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
+       NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
 
   if (NULL != tmit->msg)
   {
@@ -380,7 +386,7 @@
   }
 
   if (NULL != tmit->msg
-      && (GNUNET_YES == end
+      && (GNUNET_YES == tmit_now
           || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
               < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
   {
@@ -391,9 +397,6 @@
     tmit->msg = NULL;
     tmit->acks_pending++;
   }
-
-  if (GNUNET_YES == end)
-    tmit->in_transmit = GNUNET_NO;
 }
 
 
@@ -414,7 +417,9 @@
   if (NULL != tmit->notify_data)
   {
     data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
+    tmit->in_notify = GNUNET_YES;
     notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, 
&msg[1]);
+    tmit->in_notify = GNUNET_NO;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "transmit_data (ret: %d, size: %u): %.*s\n",
@@ -442,6 +447,7 @@
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
     transmit_queue_insert (tmit, msg, GNUNET_YES);
+    tmit->in_transmit = GNUNET_NO;
     return;
   }
 
@@ -458,6 +464,8 @@
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
     msg->size = htons (sizeof (*msg));
     transmit_queue_insert (tmit, msg, GNUNET_YES);
+    /* FIXME: wait for ACK before setting in_transmit to no */
+    tmit->in_transmit = GNUNET_NO;
   }
 }
 
@@ -489,8 +497,10 @@
     {
       max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
       data_size = max_data_size;
+      tmit->in_notify = GNUNET_YES;
       notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
                                      &mod->oper, &mod->value_size);
+      tmit->in_notify = GNUNET_NO;
     }
 
     mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
@@ -520,8 +530,10 @@
     {
       max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
       data_size = max_data_size;
+      tmit->in_notify = GNUNET_YES;
       notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
                                      &data_size, &msg[1], NULL, NULL);
+      tmit->in_notify = GNUNET_NO;
     }
     tmit->mod_value_remaining -= data_size;
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -558,8 +570,8 @@
     tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
     msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
     msg->size = htons (sizeof (*msg));
-
     transmit_queue_insert (tmit, msg, GNUNET_YES);
+    tmit->in_transmit = GNUNET_NO;
     return;
   }
 
@@ -748,6 +760,9 @@
 void
 GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
 {
+  if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
+    return;
+
   if (0 == tmit->acks_pending)
   {
     tmit->paused = GNUNET_NO;
@@ -800,13 +815,11 @@
   {
   case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
   case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
-    if (GNUNET_NO == tmit->paused)
-      transmit_mod (tmit);
+    transmit_mod (tmit);
     break;
 
   case GNUNET_PSYC_MESSAGE_STATE_DATA:
-    if (GNUNET_NO == tmit->paused)
-      transmit_data (tmit);
+    transmit_data (tmit);
     break;
 
   case GNUNET_PSYC_MESSAGE_STATE_END:

Modified: gnunet/src/social/social_api.c
===================================================================
--- gnunet/src/social/social_api.c      2015-09-27 21:04:23 UTC (rev 36379)
+++ gnunet/src/social/social_api.c      2015-09-27 21:04:34 UTC (rev 36380)
@@ -1837,8 +1837,10 @@
 {
   if (GNUNET_OK ==
       GNUNET_PSYC_transmit_message (hst->plc.tmit, method_name, env,
-                                    NULL, notify_data, notify_data_cls, 
flags));
-  return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit;
+                                    NULL, notify_data, notify_data_cls, flags))
+    return (struct GNUNET_SOCIAL_Announcement *) hst->plc.tmit;
+  else
+    return NULL;
 }
 
 
@@ -2168,8 +2170,10 @@
 
   if (GNUNET_OK ==
       GNUNET_PSYC_transmit_message (plc->tmit, method_name, env,
-                                    NULL, notify_data, notify_data_cls, 
flags));
-  return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit;
+                                    NULL, notify_data, notify_data_cls, flags))
+    return (struct GNUNET_SOCIAL_TalkRequest *) plc->tmit;
+  else
+    return NULL;
 }
 
 

Modified: gnunet/src/util/client_manager.c
===================================================================
--- gnunet/src/util/client_manager.c    2015-09-27 21:04:23 UTC (rev 36379)
+++ gnunet/src/util/client_manager.c    2015-09-27 21:04:34 UTC (rev 36380)
@@ -328,7 +328,7 @@
 
   mgr->client_tmit
     = GNUNET_CLIENT_notify_transmit_ready (mgr->client,
-                                           ntohs (mgr->tmit_head->msg->size),
+                                           GNUNET_SERVER_MAX_MESSAGE_SIZE - 1,
                                            GNUNET_TIME_UNIT_FOREVER_REL,
                                            GNUNET_NO,
                                            &send_next_message,




reply via email to

[Prev in Thread] Current Thread [Next in Thread]