[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25116 - in gnunet/src: fs include stream
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25116 - in gnunet/src: fs include stream |
Date: |
Sat, 24 Nov 2012 23:17:15 +0100 |
Author: grothoff
Date: 2012-11-24 23:17:15 +0100 (Sat, 24 Nov 2012)
New Revision: 25116
Modified:
gnunet/src/fs/gnunet-service-fs_stream.c
gnunet/src/include/gnunet_stream_lib.h
gnunet/src/stream/stream_api.c
Log:
-ensure that either stream_api calls callbacks last or that we don't destroy a
stream handle while it is in use below us on the stack
Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c 2012-11-24 08:08:27 UTC (rev
25115)
+++ gnunet/src/fs/gnunet-service-fs_stream.c 2012-11-24 22:17:15 UTC (rev
25116)
@@ -25,7 +25,6 @@
*
* TODO:
* - limit # concurrent clients, have timeouts for server-side
- * - stream shutdown in callbacks from stream may not always work right now
(check with stream_api!)
*/
#include "platform.h"
#include "gnunet_constants.h"
@@ -78,6 +77,11 @@
struct GNUNET_DATASTORE_QueueEntry *qe;
/**
+ * Task that is scheduled to asynchronously terminate the connection.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+ /**
* Size of the last write that was initiated.
*/
size_t reply_size;
@@ -248,6 +252,13 @@
GNUNET_SCHEDULER_TaskIdentifier timeout_task;
/**
+ * Task to reset streams that had errors (asynchronously,
+ * as we may not be able to do it immediately during a
+ * callback from the stream API).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+ /**
* Is this stream ready for transmission?
*/
int is_ready;
@@ -378,6 +389,55 @@
/**
+ * Task called when it is time to destroy an inactive stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+stream_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ destroy_stream_handle (sh);
+}
+
+
+/**
+ * Task called when it is time to reset an stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+ reset_stream (sh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch,
+ * but do so asynchronously.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream_async (struct StreamHandle *sh)
+{
+ if (GNUNET_SCHEDULER_NO_TASK == sh->reset_task)
+ sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+ sh);
+}
+
+
+/**
* We got a reply from the stream. Process it.
*
* @param cls the struct StreamHandle
@@ -403,7 +463,7 @@
GNUNET_NO, GNUNET_NO))
{
GNUNET_break_op (0);
- reset_stream (sh);
+ reset_stream_async (sh);
return size;
}
sh->rh = GNUNET_STREAM_read (sh->stream,
@@ -513,6 +573,7 @@
if (sizeof (struct StreamReplyMessage) > msize)
{
GNUNET_break_op (0);
+ reset_stream_async (sh);
return GNUNET_SYSERR;
}
srm = (const struct StreamReplyMessage *) message;
@@ -523,7 +584,8 @@
type,
&srm[1], msize, &query))
{
- GNUNET_break_op (0);
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
return GNUNET_SYSERR;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -553,6 +615,7 @@
return GNUNET_OK;
default:
GNUNET_break_op (0);
+ reset_stream_async (sh);
return GNUNET_SYSERR;
}
}
@@ -633,23 +696,6 @@
/**
- * Task called when it is time to destroy an inactive stream.
- *
- * @param cls the 'struct StreamHandle' to tear down
- * @param tc scheduler context, unused
- */
-static void
-stream_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct StreamHandle *sh = cls;
-
- sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- destroy_stream_handle (sh);
-}
-
-
-/**
* Cancel an active request; must not be called after 'proc'
* was calld.
*
@@ -691,7 +737,9 @@
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# stream connections active"), -1,
GNUNET_NO);
- if (NULL != sc->rh)
+ if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+ GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (NULL != sc->rh)
GNUNET_STREAM_io_read_cancel (sc->rh);
if (NULL != sc->wh)
GNUNET_STREAM_io_write_cancel (sc->wh);
@@ -707,6 +755,38 @@
/**
+ * Task run to asynchronously terminate the stream.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */
+static void
+terminate_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamClient *sc = cls;
+
+ sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
+ terminate_stream (sc);
+}
+
+
+/**
+ * We had a serious error, termiante stream,
+ * but do so asynchronously.
+ *
+ * @param sc stream to reset
+ */
+static void
+terminate_stream_async (struct StreamClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
+ sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
+ sc);
+}
+
+
+/**
* Functions of this signature are called whenever data is available from the
* stream.
*
@@ -782,7 +862,7 @@
if (GNUNET_SYSERR == ret)
{
GNUNET_break_op (0);
- terminate_stream (sc);
+ terminate_stream_async (sc);
return size;
}
break;
@@ -790,7 +870,7 @@
case GNUNET_STREAM_SHUTDOWN:
case GNUNET_STREAM_SYSERR:
case GNUNET_STREAM_BROKEN:
- terminate_stream (sc);
+ terminate_stream_async (sc);
return size;
default:
GNUNET_break (0);
@@ -922,6 +1002,7 @@
ntohs (message->size))
{
GNUNET_break_op (0);
+ terminate_stream_async (sc);
return GNUNET_SYSERR;
}
sqm = (const struct StreamQueryMessage *) message;
@@ -944,6 +1025,7 @@
return GNUNET_OK;
default:
GNUNET_break_op (0);
+ terminate_stream_async (sc);
return GNUNET_SYSERR;
}
}
Modified: gnunet/src/include/gnunet_stream_lib.h
===================================================================
--- gnunet/src/include/gnunet_stream_lib.h 2012-11-24 08:08:27 UTC (rev
25115)
+++ gnunet/src/include/gnunet_stream_lib.h 2012-11-24 22:17:15 UTC (rev
25116)
@@ -246,12 +246,10 @@
* Listens for stream connections for a specific application ports
*
* @param cfg the configuration to use
- *
* @param app_port the application port for which new streams will be
* accepted. If another stream is listening on the same port the
* listen_cb will be called to signal binding error and the returned
* ListenSocket will be invalidated.
- *
* @param listen_cb this function will be called when a peer tries to establish
* a stream with us
* @param listen_cb_cls closure for listen_cb
Modified: gnunet/src/stream/stream_api.c
===================================================================
--- gnunet/src/stream/stream_api.c 2012-11-24 08:08:27 UTC (rev 25115)
+++ gnunet/src/stream/stream_api.c 2012-11-24 22:17:15 UTC (rev 25116)
@@ -1917,13 +1917,16 @@
that that stream has been shutdown */
if (NULL != socket->write_handle)
{
- // FIXME: this breaks if 'write_cont' decides to
- // call SOCKET_close!
- if (NULL != socket->write_handle->write_cont)
- socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
- GNUNET_STREAM_SHUTDOWN, 0);
+ GNUNET_STREAM_CompletionContinuation wc;
+ void *wc_cls;
+
+ wc = socket->write_handle->write_cont;
+ wc_cls = socket->write_handle->write_cont_cls;
GNUNET_STREAM_io_write_cancel (socket->write_handle);
socket->write_handle = NULL;
+ if (NULL != wc)
+ wc (wc_cls,
+ GNUNET_STREAM_SHUTDOWN, 0);
}
return GNUNET_OK;
}
@@ -2041,13 +2044,16 @@
that that stream has been shutdown */
if (NULL != socket->write_handle)
{
- // FIXME: this breaks if 'write_cont' decides to
- // call SOCKET_close!
- if (NULL != socket->write_handle->write_cont)
- socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
- GNUNET_STREAM_SHUTDOWN, 0);
+ GNUNET_STREAM_CompletionContinuation wc;
+ void *wc_cls;
+
+ wc = socket->write_handle->write_cont;
+ wc_cls = socket->write_handle->write_cont_cls;
GNUNET_STREAM_io_write_cancel (socket->write_handle);
socket->write_handle = NULL;
+ if (NULL != wc)
+ wc (wc_cls,
+ GNUNET_STREAM_SHUTDOWN, 0);
}
return GNUNET_OK;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25116 - in gnunet/src: fs include stream,
gnunet <=