[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] [gnunet] 02/06: RPS api: Schedule callback
From: |
gnunet |
Subject: |
[GNUnet-SVN] [gnunet] 02/06: RPS api: Schedule callback |
Date: |
Fri, 23 Nov 2018 00:47:32 +0100 |
This is an automated email from the git hooks/post-receive script.
julius-buenger pushed a commit to branch master
in repository gnunet.
commit 101694a7120bc90c8c02024294b6f098b1bca9ff
Author: Julius Bünger <address@hidden>
AuthorDate: Thu Nov 22 13:07:47 2018 +0100
RPS api: Schedule callback
---
src/rps/rps_api.c | 84 +++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 69 insertions(+), 15 deletions(-)
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index cfab06f17..420323c4b 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -11,7 +11,7 @@
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
-
+
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
@@ -52,6 +52,11 @@ struct GNUNET_RPS_StreamRequestHandle
void *ready_cb_cls;
/**
+ * @brief Scheduler task for scheduled callback
+ */
+ struct GNUNET_SCHEDULER_Task *callback_task;
+
+ /**
* @brief Next element of the DLL
*/
struct GNUNET_RPS_StreamRequestHandle *next;
@@ -172,6 +177,19 @@ struct cb_cls_pack
/**
+ * @brief Peers received from the biased stream to be passed to all
+ * srh_handlers
+ */
+static struct GNUNET_PeerIdentity *srh_callback_peers;
+
+/**
+ * @brief Number of peers in the biased stream that are to be passed to all
+ * srh_handlers
+ */
+static uint64_t srh_callback_num_peers;
+
+
+/**
* @brief Create a new handle for a stream request
*
* @param rps_handle The rps handle
@@ -213,6 +231,12 @@ remove_stream_request (struct
GNUNET_RPS_StreamRequestHandle *srh,
struct GNUNET_RPS_StreamRequestHandle *srh_head,
struct GNUNET_RPS_StreamRequestHandle *srh_tail)
{
+ GNUNET_assert (NULL != srh);
+ if (NULL != srh->callback_task)
+ {
+ GNUNET_SCHEDULER_cancel (srh->callback_task);
+ srh->callback_task = NULL;
+ }
GNUNET_CONTAINER_DLL_remove (srh_head,
srh_tail,
srh);
@@ -425,12 +449,10 @@ GNUNET_RPS_stream_cancel (struct
GNUNET_RPS_StreamRequestHandle *srh)
{
struct GNUNET_RPS_Handle *rps_handle;
- GNUNET_assert (NULL != srh);
rps_handle = srh->rps_handle;
- GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
- rps_handle->stream_requests_tail,
- srh);
- GNUNET_free (srh);
+ remove_stream_request (srh,
+ rps_handle->stream_requests_head,
+ rps_handle->stream_requests_tail);
if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
}
@@ -463,6 +485,24 @@ check_stream_input (void *cls,
return GNUNET_OK;
}
+
+/**
+ * @brief Called by the scheduler to call the callbacks of the srh handlers
+ *
+ * @param cls Stream request handle
+ */
+static void
+srh_callback_scheduled (void *cls)
+{
+ struct GNUNET_RPS_StreamRequestHandle *srh = cls;
+
+ srh->callback_task = NULL;
+ srh->ready_cb (srh->ready_cb_cls,
+ srh_callback_num_peers,
+ srh_callback_peers);
+}
+
+
/**
* This function is called, when the service sends another peer from the biased
* stream.
@@ -476,13 +516,20 @@ handle_stream_input (void *cls,
const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
{
struct GNUNET_RPS_Handle *h = cls;
- const struct GNUNET_PeerIdentity *peers;
+ //const struct GNUNET_PeerIdentity *peers;
uint64_t num_peers;
struct GNUNET_RPS_StreamRequestHandle *srh_iter;
struct GNUNET_RPS_StreamRequestHandle *srh_next;
- peers = (struct GNUNET_PeerIdentity *) &msg[1];
+ //peers = (struct GNUNET_PeerIdentity *) &msg[1];
num_peers = ntohl (msg->num_peers);
+ srh_callback_num_peers = num_peers;
+ if (NULL != srh_callback_peers) GNUNET_free (srh_callback_peers);
+ srh_callback_peers =
+ GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_memcpy (srh_callback_peers,
+ &msg[1],
+ num_peers * sizeof (struct GNUNET_PeerIdentity));
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received %" PRIu64 " peer(s) from stream input.\n",
num_peers);
@@ -492,9 +539,12 @@ handle_stream_input (void *cls,
LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
/* Store next pointer - srh might be removed/freed in callback */
srh_next = srh_iter->next;
- srh_iter->ready_cb (srh_iter->ready_cb_cls,
- num_peers,
- peers);
+ if (NULL != srh_iter->callback_task)
+ {
+ GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
+ }
+ srh_iter->callback_task =
+ GNUNET_SCHEDULER_add_now (srh_callback_scheduled, srh_iter);
srh_iter = srh_next;
}
@@ -855,10 +905,9 @@ GNUNET_RPS_request_cancel (struct
GNUNET_RPS_Request_Handle *rh)
h = rh->rps_handle;
GNUNET_assert (NULL != rh);
- GNUNET_assert (NULL != rh->srh);
- remove_stream_request (rh->srh,
- h->stream_requests_head,
- h->stream_requests_tail);
+ GNUNET_assert (h == rh->srh->rps_handle);
+ GNUNET_RPS_stream_cancel (rh->srh);
+ rh->srh = NULL;
if (NULL == h->stream_requests_head) cancel_stream(h);
if (NULL != rh->sampler_rh)
{
@@ -891,6 +940,11 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
GNUNET_RPS_stream_cancel (srh_tmp);
}
}
+ if (NULL != srh_callback_peers)
+ {
+ GNUNET_free (srh_callback_peers);
+ srh_callback_peers = NULL;
+ }
if (NULL != h->view_update_cb)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
--
To stop receiving notification emails like this one, please contact
address@hidden
- [GNUnet-SVN] [gnunet] branch master updated (fbc5f3876 -> 39af429db), gnunet, 2018/11/22
- [GNUnet-SVN] [gnunet] 01/06: RPS profiler: Dump more statistics, gnunet, 2018/11/22
- [GNUnet-SVN] [gnunet] 05/06: RPS tests: Get rid of warning (unused argument), gnunet, 2018/11/22
- [GNUnet-SVN] [gnunet] 02/06: RPS api: Schedule callback,
gnunet <=
- [GNUnet-SVN] [gnunet] 06/06: RPS profiler: Dump more statistics to file, gnunet, 2018/11/22
- [GNUnet-SVN] [gnunet] 03/06: fixes for DLL management and indentation, gnunet, 2018/11/22
- [GNUnet-SVN] [gnunet] 04/06: RPS API: Fix whitespaces, gnunet, 2018/11/22