qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v4 1/3] QIOChannel: Add io_writev_zerocopy & io_flush_zerocopy ca


From: Leonardo Bras
Subject: [PATCH v4 1/3] QIOChannel: Add io_writev_zerocopy & io_flush_zerocopy callbacks
Date: Sat, 9 Oct 2021 04:56:11 -0300

Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass,
allowing the implementation of asynchronous writes by subclasses.

How to use them:
- Write data using qio_channel_writev_zerocopu(),
- Wait write completion with qio_channel_flush_zerocopy().

Notes:
As some zerocopy implementations work asynchronously, it's
recommended to keep the write buffer untouched until the return of
qio_channel_flush_zerocopy(), by the risk of sending an updated buffer
instead of the one at the write.

As the new callbacks are optional, if a subclass does not implement them, then:
- io_async_writev will return -1,
- io_async_flush will return 0 without changing anything.

Also, some functions like qio_channel_writev_full_all() were adapted to
receive a flag parameter. That allows shared code between zerocopy and
non-zerocopy writev.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel.h | 103 +++++++++++++++++++++++++++++++++++--------
 io/channel.c         |  74 +++++++++++++++++++++++--------
 2 files changed, 141 insertions(+), 36 deletions(-)

diff --git a/include/io/channel.h b/include/io/channel.h
index 88988979f8..e7d4e1521f 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -32,12 +32,15 @@ OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
 
 #define QIO_CHANNEL_ERR_BLOCK -2
 
+#define QIO_CHANNEL_WRITE_FLAG_ZEROCOPY 0x1
+
 typedef enum QIOChannelFeature QIOChannelFeature;
 
 enum QIOChannelFeature {
     QIO_CHANNEL_FEATURE_FD_PASS,
     QIO_CHANNEL_FEATURE_SHUTDOWN,
     QIO_CHANNEL_FEATURE_LISTEN,
+    QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY,
 };
 
 
@@ -136,6 +139,12 @@ struct QIOChannelClass {
                                   IOHandler *io_read,
                                   IOHandler *io_write,
                                   void *opaque);
+    ssize_t (*io_writev_zerocopy)(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  Error **errp);
+    int (*io_flush_zerocopy)(QIOChannel *ioc,
+                              Error **errp);
 };
 
 /* General I/O handling functions */
@@ -222,12 +231,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
 
 
 /**
- * qio_channel_writev_full:
+ * qio_channel_writev_full_flags:
  * @ioc: the channel object
  * @iov: the array of memory regions to write data from
  * @niov: the length of the @iov array
  * @fds: an array of file handles to send
  * @nfds: number of file handles in @fds
+ * @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
  * @errp: pointer to a NULL-initialized error object
  *
  * Write data to the IO channel, reading it from the
@@ -242,6 +252,10 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
  * guaranteed. If the channel is non-blocking and no
  * data can be sent, it will return QIO_CHANNEL_ERR_BLOCK
  *
+ * If flag QIO_CHANNEL_WRITE_FLAG_ZEROCOPY is passed,
+ * function will return once each buffer was queued for
+ * sending.
+ *
  * If there are file descriptors to send, the @fds
  * array should be non-NULL and provide the handles.
  * All file descriptors will be sent if at least one
@@ -255,12 +269,15 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
  * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
  * and the channel is non-blocking
  */
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp);
+ssize_t qio_channel_writev_full_flags(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int *fds,
+                                      size_t nfds,
+                                      int flags,
+                                      Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    qio_channel_writev_full_flags(ioc, iov, niov, fds, nfds, 0, errp)
 
 /**
  * qio_channel_readv_all_eof:
@@ -321,10 +338,11 @@ int qio_channel_readv_all(QIOChannel *ioc,
 
 
 /**
- * qio_channel_writev_all:
+ * qio_channel_writev_all_flags:
  * @ioc: the channel object
  * @iov: the array of memory regions to write data from
  * @niov: the length of the @iov array
+ * @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
  * @errp: pointer to a NULL-initialized error object
  *
  * Write data to the IO channel, reading it from the
@@ -339,10 +357,13 @@ int qio_channel_readv_all(QIOChannel *ioc,
  *
  * Returns: 0 if all bytes were written, or -1 on error
  */
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **erp);
+int qio_channel_writev_all_flags(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int flags,
+                                 Error **errp);
+#define qio_channel_writev_all(ioc, iov, niov, errp) \
+    qio_channel_writev_all_flags(ioc, iov, niov, 0, errp)
 
 /**
  * qio_channel_readv:
@@ -831,12 +852,13 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
                                Error **errp);
 
 /**
- * qio_channel_writev_full_all:
+ * qio_channel_writev_full_all_flags:
  * @ioc: the channel object
  * @iov: the array of memory regions to write data from
  * @niov: the length of the @iov array
  * @fds: an array of file handles to send
  * @nfds: number of file handles in @fds
+ * @flags: write flags (QIO_CHANNEL_WRITE_FLAG_*)
  * @errp: pointer to a NULL-initialized error object
  *
  *
@@ -846,13 +868,58 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
  * to be written, yielding from the current coroutine
  * if required.
  *
+ * If QIO_CHANNEL_WRITE_FLAG_ZEROCOPY is passed in flags,
+ * instead of waiting for all requested data to be written,
+ * this function will wait until it's all queued for writing.
+ *
  * Returns: 0 if all bytes were written, or -1 on error
  */
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds, size_t nfds,
-                                Error **errp);
+int qio_channel_writev_full_all_flags(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int *fds, size_t nfds,
+                                      int flags, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    qio_channel_writev_full_all_flags(ioc, iov, niov, fds, nfds, 0, errp)
+
+/**
+ * qio_channel_writev_zerocopy:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full_all_flags, but will write
+ * data asynchronously while avoiding unnecessary data copy.
+ * This function may return before any data is actually written,
+ * but should queue every buffer for writting.
+ *
+ * If at some point it's necessary wait for all data to be
+ * written, use qio_channel_flush_zerocopy().
+ *
+ * If zerocopy is not available, returns -1 and set errp.
+ */
+
+ssize_t qio_channel_writev_zerocopy(QIOChannel *ioc,
+                                    const struct iovec *iov,
+                                    size_t niov,
+                                    Error **errp);
+
+/**
+ * qio_channel_flush_zerocopy:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will lock until every packet queued with
+ * qio_channel_writev_zerocopy() is sent, or return
+ * in case of any error.
+ *
+ * Returns -1 if any error is found, 0 otherwise.
+ * If not implemented, returns 0 without changing anything.
+ */
+
+int qio_channel_flush_zerocopy(QIOChannel *ioc,
+                               Error **errp);
 
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel.c b/io/channel.c
index e8b019dc36..811c93ae23 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -67,15 +67,27 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
 }
 
 
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp)
+ssize_t qio_channel_writev_full_flags(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int *fds,
+                                      size_t nfds,
+                                      int flags,
+                                      Error **errp)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
+    if (flags & QIO_CHANNEL_WRITE_FLAG_ZEROCOPY) {
+        if (!klass->io_writev_zerocopy ||
+            !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY)) 
{
+            error_setg_errno(errp, EINVAL,
+                             "Channel does not support zerocopy writev");
+            return -1;
+        }
+
+        return klass->io_writev_zerocopy(ioc, iov, niov, errp);
+    }
+
     if ((fds || nfds) &&
         !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
         error_setg_errno(errp, EINVAL,
@@ -212,19 +224,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
     return ret;
 }
 
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **errp)
+int qio_channel_writev_all_flags(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int flags,
+                                 Error **errp)
 {
-    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+    return qio_channel_writev_full_all_flags(ioc, iov, niov, NULL, 0, flags, 
errp);
 }
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds, size_t nfds,
-                                Error **errp)
+int qio_channel_writev_full_all_flags(QIOChannel *ioc,
+                                      const struct iovec *iov,
+                                      size_t niov,
+                                      int *fds, size_t nfds,
+                                      int flags, Error **errp)
 {
     int ret = -1;
     struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +250,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
 
     while (nlocal_iov > 0) {
         ssize_t len;
-        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
-                                      errp);
+        len = qio_channel_writev_full_flags(ioc, local_iov, nlocal_iov, fds, 
nfds,
+                                          flags, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
                 qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +487,31 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
+ssize_t qio_channel_writev_zerocopy(QIOChannel *ioc,
+                                    const struct iovec *iov,
+                                    size_t niov,
+                                    Error **errp)
+{
+    return qio_channel_writev_full_flags(ioc, iov, niov, NULL, 0,
+                                         QIO_CHANNEL_WRITE_FLAG_ZEROCOPY,
+                                         errp);
+}
+
+
+int qio_channel_flush_zerocopy(QIOChannel *ioc,
+                               Error **errp)
+{
+    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_flush_zerocopy ||
+        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY)) {
+        return 0;
+    }
+
+    return klass->io_flush_zerocopy(ioc, errp);
+}
+
+
 static void qio_channel_restart_read(void *opaque)
 {
     QIOChannel *ioc = opaque;
-- 
2.33.0




reply via email to

[Prev in Thread] Current Thread [Next in Thread]