qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v7 07/21] multi-process: add co-routines to communicate with remote


From: elena . ufimtseva
Subject: [PATCH v7 07/21] multi-process: add co-routines to communicate with remote
Date: Sat, 27 Jun 2020 10:09:29 -0700

From: Elena Ufimtseva <elena.ufimtseva@oracle.com>

Add co-routines to communicate with the remote process to avoid blocking
the main loop during the message exchanges. To be used by the proxy device.

Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
---
 include/io/mpqemu-link.h | 16 +++++++++
 io/mpqemu-link.c         | 78 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+)

diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
index 1542e8ed07..52aa89656c 100644
--- a/include/io/mpqemu-link.h
+++ b/include/io/mpqemu-link.h
@@ -17,6 +17,7 @@
 #include "qom/object.h"
 #include "qemu/thread.h"
 #include "io/channel.h"
+#include "io/channel-socket.h"
 
 #define REMOTE_MAX_FDS 8
 
@@ -30,6 +31,7 @@
  */
 typedef enum {
     INIT = 0,
+    RET_MSG,
     MAX = INT_MAX,
 } MPQemuCmd;
 
@@ -67,6 +69,20 @@ typedef struct {
     uint8_t *data2;
 } MPQemuMsg;
 
+struct MPQemuRequest {
+    MPQemuMsg *msg;         /* message to send */
+    QIOChannelSocket *sioc; /* socket channel used for the exchange */
+    Coroutine *co;          /* coroutine performing the exchange */
+    bool finished;          /* set once the exchange has completed */
+    int error;              /* 0 on success, negative errno on failure */
+    uint64_t ret;           /* reply value (data1.u64); not long: 64 bits */
+};
+
+typedef struct MPQemuRequest MPQemuRequest;
+
+uint64_t mpqemu_msg_send_reply_co(MPQemuMsg *msg, QIOChannel *ioc,
+                                  Error **errp);
+
 void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc);
 int mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc);
 
diff --git a/io/mpqemu-link.c b/io/mpqemu-link.c
index bfc542b5fd..c430b4d6a2 100644
--- a/io/mpqemu-link.c
+++ b/io/mpqemu-link.c
@@ -16,6 +16,8 @@
 #include "qapi/error.h"
 #include "qemu/iov.h"
 #include "qemu/error-report.h"
+#include "qemu/main-loop.h"
+#include "io/channel-socket.h"
 
 void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc)
 {
@@ -118,6 +120,82 @@ int mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc)
     return 0;
 }
 
+/*
+ * Coroutine body: send req->msg, wait for the reply, then set req->ret
+ * or req->error. Use in proxy only as it clobbers fd handlers.
+ */
+static void coroutine_fn mpqemu_msg_send_co(void *data)
+{
+    MPQemuRequest *req = data;
+    MPQemuMsg msg_reply = {0};
+    int ret, saved_errno;
+
+    if (!req->sioc) {
+        error_report("No channel available to send command %d",
+                     req->msg->cmd);
+        req->error = -EINVAL;
+        goto out;
+    }
+
+    req->co = qemu_coroutine_self();
+    mpqemu_msg_send(req->msg, QIO_CHANNEL(req->sioc));
+    yield_until_fd_readable(req->sioc->fd);
+    ret = mpqemu_msg_recv(&msg_reply, QIO_CHANNEL(req->sioc));
+    saved_errno = errno; /* error_report() below may overwrite errno */
+    if (ret < 0) {
+        error_report("ERROR: failed to get a reply for command %d, "
+                     "errno %s, ret is %d",
+                     req->msg->cmd, strerror(saved_errno), ret);
+        req->error = saved_errno ? -saved_errno : -EIO;
+    } else if (!mpqemu_msg_valid(&msg_reply) || msg_reply.cmd != RET_MSG) {
+        error_report("ERROR: Invalid reply received for command %d",
+                     req->msg->cmd);
+        req->error = -EINVAL;
+    } else {
+        req->ret = msg_reply.data1.u64;
+    }
+out:
+    atomic_mb_set(&req->finished, true); /* after error/ret are set */
+}
+
+/*
+ * Send msg over ioc from a coroutine and poll qemu_get_aio_context()
+ * until the coroutine marks the request as finished.
+ * Returns the value from the reply message. On failure sets errp and
+ * returns UINT64_MAX.
+ */
+
+uint64_t mpqemu_msg_send_reply_co(MPQemuMsg *msg, QIOChannel *ioc,
+                                  Error **errp)
+{
+    MPQemuRequest req = {0};
+
+    req.sioc = QIO_CHANNEL_SOCKET(ioc);
+    if (!req.sioc) {
+        error_setg(errp, "No socket channel to exchange message with "
+                         "remote process");
+        return UINT64_MAX;
+    }
+    req.msg = msg;
+
+    /* req is zero-initialized, so the coroutine always has to be created. */
+    req.co = qemu_coroutine_create(mpqemu_msg_send_co, &req);
+    qemu_coroutine_enter(req.co);
+
+    /* req lives on this stack frame: do not return before it completes. */
+    while (!req.finished) {
+        aio_poll(qemu_get_aio_context(), false);
+    }
+
+    if (req.error) {
+        error_setg(errp, "Error exchanging message with remote process, "
+                         "socket %d, error %d", req.sioc->fd, req.error);
+        return UINT64_MAX;
+    }
+
+    return req.ret;
+}
+
 bool mpqemu_msg_valid(MPQemuMsg *msg)
 {
     if (msg->cmd >= MAX && msg->cmd < 0) {
-- 
2.25.GIT




reply via email to

[Prev in Thread] Current Thread [Next in Thread]