Re: [PATCH v6 00/25] monitor: add asynchronous command type


From: Kevin Wolf
Subject: Re: [PATCH v6 00/25] monitor: add asynchronous command type
Date: Mon, 16 Dec 2019 13:07:01 +0100
User-agent: Mutt/1.12.1 (2019-06-15)

On 13.12.2019 at 17:28, Marc-André Lureau wrote:
> On Fri, Dec 13, 2019 at 8:04 PM Kevin Wolf <address@hidden> wrote:
> >
> > On 08.11.2019 at 16:00, Marc-André Lureau wrote:
> > > The following series implements an internal async command solution
> > > instead. By introducing a session context and a command return
> > > handler, QMP handlers can:
> > > - defer the return, allowing the mainloop to reenter
> > > - return only to the caller (instead of the broadcast event reply)
> > > - optionally allow cancellation when the client is gone
> > > - track on-going qapi command(s) per session
> >
> > This requires major changes to existing QMP command handlers. Did you
> > consider at least optionally providing a way where instead of using the
> > new async_fn, QMP already creates a coroutine in which the handler is
> > executed?
> 
> Yes, but I don't see how this could be done without the basic callback
> infrastructure I propose here. Also forcing existing code to be
> translated to coroutine-aware is probably even more complicated.

I'll attach what I hacked up last week after discussing a problem with
Markus and Max. As you probably expected, screendump isn't really my
main motivation to look into this. The specific command we discussed was
block_resize, which can potentially block the monitor for a while, but
I'm sure that many more block commands have the same problem.

What my patch does is move everything into a coroutine. This is wrong
as it stands because not everything can be run in a coroutine, so it
needs to be made optional (like you did with your async flag).

The problem isn't with completely coroutine-unaware code, though: that
would just work, even if it doesn't take advantage of the coroutine. A
potential problem exists with code that behaves differently when run in
a coroutine or outside of coroutine context (generally by checking
qemu_in_coroutine()), or with coroutine-unaware code calling into such
functions.

Running some command handlers outside of coroutine context wouldn't be
hard to add to my patch (basically just a BH), but I haven't looked into
the QAPI side of making it an option.
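
The BH variant would be something like this (untested sketch;
QmpDispatchBH and the coroutine-safety flag are made up, the variable
names are roughly those of do_qmp_dispatch()):

    typedef struct QmpDispatchBH {
        Coroutine *co;          /* dispatcher coroutine to wake when done */
        QmpCommandFunc *fn;     /* handler that must not run in a coroutine */
        QDict *args;
        QObject **ret;
        Error **errp;
        bool done;
    } QmpDispatchBH;

    static void do_qmp_dispatch_bh(void *opaque)
    {
        QmpDispatchBH *data = opaque;

        /* Runs from the main loop, outside of coroutine context */
        data->fn(data->args, data->ret, data->errp);
        data->done = true;
        aio_co_wake(data->co);
    }

    /* In the coroutine dispatcher, for a handler flagged as not
     * coroutine-safe (the flag is the missing QAPI option): */
    QmpDispatchBH data = {
        .co   = qemu_coroutine_self(),
        .fn   = cmd->fn,
        .args = args,
        .ret  = &ret,
        .errp = &err,
    };
    aio_bh_schedule_oneshot(qemu_get_aio_context(), do_qmp_dispatch_bh, &data);
    while (!data.done) {
        qemu_coroutine_yield();
    }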

> > At least for some of the block layer commands, we could simply enable
> > this without changing any of the code and would automatically get rid of
> > blocking the guest while the command is doing I/O. If we need to
> > implement .async_fn, in contrast, it would be quite a bit of
> > boilerplate code for every single handler to create a coroutine for the real
> > handler and to wrap all parameters in a struct.
> 
> We could have the generator do that for you eventually, and spawn the
> coroutine.

Yes, if we need both, that's an option. I'd like to explore first if the
callback-based approach is actually needed, though.
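
Just to spell out the boilerplate I mean, for block_resize it would look
roughly like this (untested sketch; the session type, the return helper
and do_block_resize() are placeholders, not the actual API from your
series):

    typedef struct BlockResizeArgs {
        QmpSession *session;    /* placeholder for the session/return context */
        char *device;
        int64_t size;
    } BlockResizeArgs;

    static void coroutine_fn block_resize_co(void *opaque)
    {
        BlockResizeArgs *a = opaque;
        Error *err = NULL;

        do_block_resize(a->device, a->size, &err);  /* the real handler */
        qmp_return_async(a->session, err);          /* placeholder return helper */
        g_free(a->device);
        g_free(a);
    }

    static void qmp_async_block_resize(QmpSession *session, const char *device,
                                       int64_t size)
    {
        BlockResizeArgs *a = g_new0(BlockResizeArgs, 1);

        a->session = session;
        a->device = g_strdup(device);
        a->size = size;
        qemu_coroutine_enter(qemu_coroutine_create(block_resize_co, a));
    }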

> > > The screendump command is converted to an async:true version to solve
> > > rhbz#1230527. The command shows basic cancellation (this could be
> > > extended if needed). It could be further improved to do asynchronous
> > > IO writes as well.
> >
> > After converting it to QIOChannel like you already do, I/O would
> > automatically become asynchronous when run in a coroutine.
> >
> > If you don't want this yet, but only fix the BZ, I guess you could delay
> > that patch until later and just have a single yield and reenter of the
> > command handler coroutine like this:
> >
> >     co = qemu_coroutine_self();
> >     aio_co_schedule(qemu_coroutine_get_aio_context(co), co);
> >     qemu_coroutine_yield();

(This specific code is wrong, I misread your patches. You don't want to
immediately reenter the coroutine, but only in graphic_hw_update_done().)

> If various places in the code start doing that, we are in trouble: the
> stack may grow, and cancellation becomes hairy.

I don't understand. How does the coroutine-based approach differ from
what your series does? Basically, instead of splitting qmp_screendump()
in two parts, you keep the existing single function, just with a
qemu_coroutine_yield() in the middle, and instead of calling
qmp_screendump_finish() you wake up the coroutine.

Yes, instead of malloc'ing a struct qmp_screendump, you would keep
things on the stack, but that doesn't make the stack grow as these
things are already on the stack in qmp_screendump().

> Furthermore, in the case of screendump, IO is not necessarily within
> the coroutine context. In this case, we need to wait for the QXL
> device to "flush" the screen. Communicating this event back to the
> coroutine isn't simpler than what I propose here.

Waiting for something in a coroutine means calling
qemu_coroutine_yield() and then letting the event handler call
aio_co_wake() to resume the coroutine. It's really simple. I'm pretty
sure that if qmp_screendump() is called in a coroutine, your I/O would
automatically end up in the coroutine context unless you do something
specifically to avoid it.
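
For screendump that pattern would look roughly like this (untested
sketch with a simplified signature; the completion callback and its
hookup to graphic_hw_update_done() are assumptions):

    typedef struct ScreendumpWait {
        Coroutine *co;
        bool done;
    } ScreendumpWait;

    /* Called from wherever graphic_hw_update_done() ends up signalling
     * completion (hookup not shown) */
    static void screendump_update_done(void *opaque)
    {
        ScreendumpWait *w = opaque;

        w->done = true;
        aio_co_wake(w->co);
    }

    void coroutine_fn qmp_screendump(const char *filename, Error **errp)
    {
        ScreendumpWait w = { .co = qemu_coroutine_self() };

        /* request the update; completion eventually calls
         * screendump_update_done(&w) */
        while (!w.done) {
            qemu_coroutine_yield();
        }

        /* open the file and write out the image as before */
    }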

Kevin


From 8d4b8ba34d33d3bbf3f0a21703928eb68895e169 Mon Sep 17 00:00:00 2001
From: Kevin Wolf <address@hidden>
Date: Thu, 12 Dec 2019 16:32:27 +0100
Subject: [PATCH] qmp: Move dispatcher to a coroutine

Signed-off-by: Kevin Wolf <address@hidden>
---
 include/qapi/qmp/dispatch.h |  2 +
 monitor/monitor-internal.h  |  5 ++-
 monitor/monitor.c           | 24 +++++++----
 monitor/qmp.c               | 80 +++++++++++++++++++++++--------------
 qapi/qmp-dispatch.c         |  4 ++
 5 files changed, 75 insertions(+), 40 deletions(-)

diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
index 9aa426a398..4088255463 100644
--- a/include/qapi/qmp/dispatch.h
+++ b/include/qapi/qmp/dispatch.h
@@ -29,6 +29,8 @@ typedef enum QmpCommandOptions
 typedef struct QmpCommand
 {
     const char *name;
+    /* Runs outside of coroutine context for OOB commands, but in coroutine context
+     * for everything else. */
     QmpCommandFunc *fn;
     QmpCommandOptions options;
     QTAILQ_ENTRY(QmpCommand) node;
diff --git a/monitor/monitor-internal.h b/monitor/monitor-internal.h
index d78f5ca190..7389b6a56c 100644
--- a/monitor/monitor-internal.h
+++ b/monitor/monitor-internal.h
@@ -154,7 +154,8 @@ static inline bool monitor_is_qmp(const Monitor *mon)
 
 typedef QTAILQ_HEAD(MonitorList, Monitor) MonitorList;
 extern IOThread *mon_iothread;
-extern QEMUBH *qmp_dispatcher_bh;
+extern Coroutine *qmp_dispatcher_co;
+extern bool qmp_dispatcher_co_busy;
 extern QmpCommandList qmp_commands, qmp_cap_negotiation_commands;
 extern QemuMutex monitor_lock;
 extern MonitorList mon_list;
@@ -172,7 +173,7 @@ void monitor_fdsets_cleanup(void);
 
 void qmp_send_response(MonitorQMP *mon, const QDict *rsp);
 void monitor_data_destroy_qmp(MonitorQMP *mon);
-void monitor_qmp_bh_dispatcher(void *data);
+void coroutine_fn monitor_qmp_dispatcher_co(void *data);
 
 int get_monitor_def(int64_t *pval, const char *name);
 void help_cmd(Monitor *mon, const char *name);
diff --git a/monitor/monitor.c b/monitor/monitor.c
index 12898b6448..8f90d0b46c 100644
--- a/monitor/monitor.c
+++ b/monitor/monitor.c
@@ -53,8 +53,9 @@ typedef struct {
 /* Shared monitor I/O thread */
 IOThread *mon_iothread;
 
-/* Bottom half to dispatch the requests received from I/O thread */
-QEMUBH *qmp_dispatcher_bh;
+/* Coroutine to dispatch the requests received from I/O thread */
+Coroutine *qmp_dispatcher_co;
+bool qmp_dispatcher_co_busy;
 
 /* Protects mon_list, monitor_qapi_event_state, monitor_destroyed.  */
 QemuMutex monitor_lock;
@@ -579,9 +580,16 @@ void monitor_cleanup(void)
     }
     qemu_mutex_unlock(&monitor_lock);
 
-    /* QEMUBHs needs to be deleted before destroying the I/O thread */
-    qemu_bh_delete(qmp_dispatcher_bh);
-    qmp_dispatcher_bh = NULL;
+    /* The dispatcher needs to stop before destroying the I/O thread */
+    if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
+        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+        qmp_dispatcher_co = NULL;
+    }
+
+    AIO_WAIT_WHILE(qemu_get_aio_context(),
+                   aio_bh_poll(iohandler_get_aio_context()) ||
+                   atomic_mb_read(&qmp_dispatcher_co_busy));
+
     if (mon_iothread) {
         iothread_destroy(mon_iothread);
         mon_iothread = NULL;
@@ -604,9 +612,9 @@ void monitor_init_globals_core(void)
      * have commands assuming that context.  It would be nice to get
      * rid of those assumptions.
      */
-    qmp_dispatcher_bh = aio_bh_new(iohandler_get_aio_context(),
-                                   monitor_qmp_bh_dispatcher,
-                                   NULL);
+    qmp_dispatcher_co = qemu_coroutine_create(monitor_qmp_dispatcher_co, NULL);
+    atomic_mb_set(&qmp_dispatcher_co_busy, true);
+    aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
 }
 
 QemuOptsList qemu_mon_opts = {
diff --git a/monitor/qmp.c b/monitor/qmp.c
index b67a8e7d1f..f5dc77cd0a 100644
--- a/monitor/qmp.c
+++ b/monitor/qmp.c
@@ -133,6 +133,8 @@ static void monitor_qmp_respond(MonitorQMP *mon, QDict *rsp)
     }
 }
 
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
 static void monitor_qmp_dispatch(MonitorQMP *mon, QObject *req)
 {
     Monitor *old_mon;
@@ -211,43 +213,59 @@ static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void)
     return req_obj;
 }
 
-void monitor_qmp_bh_dispatcher(void *data)
+void coroutine_fn monitor_qmp_dispatcher_co(void *data)
 {
-    QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock();
+    QMPRequest *req_obj = NULL;
     QDict *rsp;
     bool need_resume;
     MonitorQMP *mon;
 
-    if (!req_obj) {
-        return;
-    }
+    while (true) {
+        assert(atomic_mb_read(&qmp_dispatcher_co_busy) == true);
+
+        while (!(req_obj = monitor_qmp_requests_pop_any_with_lock())) {
+            /* Wait to be reentered from handle_qmp_command, or terminate if
+             * qmp_dispatcher_co has been reset to NULL */
+            atomic_mb_set(&qmp_dispatcher_co_busy, false);
+            if (qmp_dispatcher_co) {
+                qemu_coroutine_yield();
+            } else {
+                return;
+            }
+            atomic_mb_set(&qmp_dispatcher_co_busy, true);
+        }
 
-    mon = req_obj->mon;
-    /*  qmp_oob_enabled() might change after "qmp_capabilities" */
-    need_resume = !qmp_oob_enabled(mon) ||
-        mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
-    qemu_mutex_unlock(&mon->qmp_queue_lock);
-    if (req_obj->req) {
-        QDict *qdict = qobject_to(QDict, req_obj->req);
-        QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
-        trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
-        monitor_qmp_dispatch(mon, req_obj->req);
-    } else {
-        assert(req_obj->err);
-        rsp = qmp_error_response(req_obj->err);
-        req_obj->err = NULL;
-        monitor_qmp_respond(mon, rsp);
-        qobject_unref(rsp);
-    }
+        aio_co_schedule(qemu_get_aio_context(), qmp_dispatcher_co);
+        qemu_coroutine_yield();
+
+        mon = req_obj->mon;
+        /*  qmp_oob_enabled() might change after "qmp_capabilities" */
+        need_resume = !qmp_oob_enabled(mon) ||
+            mon->qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1;
+        qemu_mutex_unlock(&mon->qmp_queue_lock);
+        if (req_obj->req) {
+            QDict *qdict = qobject_to(QDict, req_obj->req);
+            QObject *id = qdict ? qdict_get(qdict, "id") : NULL;
+            trace_monitor_qmp_cmd_in_band(qobject_get_try_str(id) ?: "");
+            monitor_qmp_dispatch(mon, req_obj->req);
+        } else {
+            assert(req_obj->err);
+            rsp = qmp_error_response(req_obj->err);
+            req_obj->err = NULL;
+            monitor_qmp_respond(mon, rsp);
+            qobject_unref(rsp);
+        }
 
-    if (need_resume) {
-        /* Pairs with the monitor_suspend() in handle_qmp_command() */
-        monitor_resume(&mon->common);
-    }
-    qmp_request_free(req_obj);
+        if (need_resume) {
+            /* Pairs with the monitor_suspend() in handle_qmp_command() */
+            monitor_resume(&mon->common);
+        }
+        qmp_request_free(req_obj);
 
-    /* Reschedule instead of looping so the main loop stays responsive */
-    qemu_bh_schedule(qmp_dispatcher_bh);
+        /* Reschedule instead of looping so the main loop stays responsive */
+        aio_co_schedule(iohandler_get_aio_context(), qmp_dispatcher_co);
+        qemu_coroutine_yield();
+    }
 }
 
 static void handle_qmp_command(void *opaque, QObject *req, Error *err)
@@ -308,7 +326,9 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err)
     qemu_mutex_unlock(&mon->qmp_queue_lock);
 
     /* Kick the dispatcher routine */
-    qemu_bh_schedule(qmp_dispatcher_bh);
+    if (!atomic_mb_read(&qmp_dispatcher_co_busy)) {
+        aio_co_wake(qmp_dispatcher_co);
+    }
 }
 
 static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
index bc264b3c9b..1dbc152307 100644
--- a/qapi/qmp-dispatch.c
+++ b/qapi/qmp-dispatch.c
@@ -75,6 +75,8 @@ static QDict *qmp_dispatch_check_obj(const QObject *request, bool allow_oob,
     return dict;
 }
 
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
 static QObject *do_qmp_dispatch(QmpCommandList *cmds, QObject *request,
                                 bool allow_oob, Error **errp)
 {
@@ -164,6 +166,8 @@ bool qmp_is_oob(const QDict *dict)
         && !qdict_haskey(dict, "execute");
 }
 
+/* Runs outside of coroutine context for OOB commands, but in coroutine context
+ * for everything else. */
 QDict *qmp_dispatch(QmpCommandList *cmds, QObject *request,
                     bool allow_oob)
 {
-- 
2.20.1



