qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v5 6/7] migration/multifd: implement qpl compression and decompre


From: Yuan Liu
Subject: [PATCH v5 6/7] migration/multifd: implement qpl compression and decompression
Date: Wed, 20 Mar 2024 00:45:26 +0800

each qpl job is used to (de)compress a normal page and it can
be processed independently by the IAA hardware. All qpl jobs
are submitted to the hardware at once, and wait for all jobs
completion.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-qpl.c | 229 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 225 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6de65e9da7..479b051b24 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,6 +13,7 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "exec/ramblock.h"
 #include "migration.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
@@ -171,6 +172,112 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error 
**errp)
     p->compress_data = NULL;
 }
 
+static inline void prepare_job(qpl_job *job, uint8_t *input, uint32_t 
input_len,
+                               uint8_t *output, uint32_t output_len,
+                               bool is_compression)
+{
+    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+    job->next_in_ptr = input;
+    job->next_out_ptr = output;
+    job->available_in = input_len;
+    job->available_out = output_len;
+    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+    /* only supports one compression level */
+    job->level = 1;
+}
+
+/**
+ * set_raw_data_hdr: set the length of raw data
+ *
+ * If the length of the compressed output data is greater than or equal to
+ * the page size, then set the compressed data length to the data size and
+ * send raw data directly.
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the compression job header
+ */
+static inline void set_raw_data_hdr(QplData *qpl, uint32_t index)
+{
+    assert(index < qpl->job_num);
+    qpl->zbuf_hdr[index] = cpu_to_be32(qpl->data_size);
+}
+
+/**
+ * is_raw_data: check if the data is raw data
+ *
+ * The raw data length is always equal to data size, which is the
+ * size of one page.
+ *
+ * Returns true if the data is raw data, otherwise false
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the decompressed job header
+ */
+static inline bool is_raw_data(QplData *qpl, uint32_t index)
+{
+    assert(index < qpl->job_num);
+    return qpl->zbuf_hdr[index] == qpl->data_size;
+}
+
+static int run_comp_jobs(MultiFDSendParams *p, Error **errp)
+{
+    qpl_status status;
+    QplData *qpl = p->compress_data;
+    MultiFDPages_t *pages = p->pages;
+    uint32_t job_num = pages->normal_num;
+    qpl_job *job = NULL;
+    uint32_t off = 0;
+
+    assert(job_num <= qpl->job_num);
+    /* submit all compression jobs */
+    for (int i = 0; i < job_num; i++) {
+        job = qpl->job_array[i];
+        /* the compressed data size should be less than one page */
+        prepare_job(job, pages->block->host + pages->offset[i], qpl->data_size,
+                    qpl->zbuf + off, qpl->data_size - 1, true);
+retry:
+        status = qpl_submit_job(job);
+        if (status == QPL_STS_OK) {
+            off += qpl->data_size;
+        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+            goto retry;
+        } else {
+            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+
+    /* wait all jobs to complete */
+    for (int i = 0; i < job_num; i++) {
+        job = qpl->job_array[i];
+        status = qpl_wait_job(job);
+        if (status == QPL_STS_OK) {
+            qpl->zbuf_hdr[i] = cpu_to_be32(job->total_out);
+            p->iov[p->iovs_num].iov_len = job->total_out;
+            p->iov[p->iovs_num].iov_base = qpl->zbuf + (qpl->data_size * i);
+            p->next_packet_size += job->total_out;
+        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+            /*
+             * the compression job does not fail, the output data
+             * size is larger than the provided memory size. In this
+             * case, raw data is sent directly to the destination.
+             */
+            set_raw_data_hdr(qpl, i);
+            p->iov[p->iovs_num].iov_len = qpl->data_size;
+            p->iov[p->iovs_num].iov_base = pages->block->host +
+                                           pages->offset[i];
+            p->next_packet_size += qpl->data_size;
+        } else {
+            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+        p->iovs_num++;
+    }
+    return 0;
+}
+
 /**
  * qpl_send_prepare: prepare data to be able to send
  *
@@ -184,8 +291,28 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error 
**errp)
  */
 static int qpl_send_prepare(MultiFDSendParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t hdr_size;
+
+    if (!multifd_send_prepare_common(p)) {
+        goto out;
+    }
+
+    assert(p->pages->normal_num <= qpl->job_num);
+    hdr_size = p->pages->normal_num * sizeof(uint32_t);
+    /* prepare the header that stores the lengths of all compressed data */
+    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
+    p->iov[1].iov_len = hdr_size;
+    p->iovs_num++;
+    p->next_packet_size += hdr_size;
+    if (run_comp_jobs(p, errp) != 0) {
+        return -1;
+    }
+
+out:
+    p->flags |= MULTIFD_FLAG_QPL;
+    multifd_send_fill_packet(p);
+    return 0;
 }
 
 /**
@@ -227,6 +354,60 @@ static void qpl_recv_cleanup(MultiFDRecvParams *p)
     p->compress_data = NULL;
 }
 
+static int run_decomp_jobs(MultiFDRecvParams *p, Error **errp)
+{
+    qpl_status status;
+    qpl_job *job;
+    QplData *qpl = p->compress_data;
+    uint32_t off = 0;
+    uint32_t job_num = p->normal_num;
+
+    assert(job_num <= qpl->job_num);
+    /* submit all decompression jobs */
+    for (int i = 0; i < job_num; i++) {
+        /* for the raw data, load it directly */
+        if (is_raw_data(qpl, i)) {
+            memcpy(p->host + p->normal[i], qpl->zbuf + off, qpl->data_size);
+            off += qpl->data_size;
+            continue;
+        }
+        job = qpl->job_array[i];
+        prepare_job(job, qpl->zbuf + off, qpl->zbuf_hdr[i],
+                    p->host + p->normal[i], qpl->data_size, false);
+retry:
+        status = qpl_submit_job(job);
+        if (status == QPL_STS_OK) {
+            off += qpl->zbuf_hdr[i];
+        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+            goto retry;
+        } else {
+            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+    }
+
+    /* wait all jobs to complete */
+    for (int i = 0; i < job_num; i++) {
+        if (is_raw_data(qpl, i)) {
+            continue;
+        }
+        job = qpl->job_array[i];
+        status = qpl_wait_job(job);
+        if (status != QPL_STS_OK) {
+            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+                       p->id, status);
+            return -1;
+        }
+        if (job->total_out != qpl->data_size) {
+            error_setg(errp, "multifd %u: decompressed len %u, expected len 
%u",
+                       p->id, job->total_out, qpl->data_size);
+            return -1;
+        }
+    }
+    return 0;
+}
+
 /**
  * qpl_recv: read the data from the channel into actual pages
  *
@@ -240,8 +421,48 @@ static void qpl_recv_cleanup(MultiFDRecvParams *p)
  */
 static int qpl_recv(MultiFDRecvParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t in_size = p->next_packet_size;
+    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
+    uint32_t data_len = 0;
+    int ret;
+
+    if (flags != MULTIFD_FLAG_QPL) {
+        error_setg(errp, "multifd %u: flags received %x flags expected %x",
+                   p->id, flags, MULTIFD_FLAG_QPL);
+        return -1;
+    }
+    multifd_recv_zero_page_process(p);
+    if (!p->normal_num) {
+        assert(in_size == 0);
+        return 0;
+    }
+
+    /* read compressed data lengths */
+    assert(hdr_len < in_size);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+    assert(p->normal_num <= qpl->job_num);
+    for (int i = 0; i < p->normal_num; i++) {
+        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
+        data_len += qpl->zbuf_hdr[i];
+        assert(qpl->zbuf_hdr[i] <= qpl->data_size);
+    }
+
+    /* read compressed data */
+    assert(in_size == hdr_len + data_len);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+
+    if (run_decomp_jobs(p, errp) != 0) {
+        return -1;
+    }
+    return 0;
 }
 
 static MultiFDMethods multifd_qpl_ops = {
-- 
2.39.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]