[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PULL 08/16] migration: Add the core code of multi-thread c
From: |
Juan Quintela |
Subject: |
[Qemu-devel] [PULL 08/16] migration: Add the core code of multi-thread compression |
Date: |
Thu, 7 May 2015 13:50:36 +0200 |
From: Liang Li <address@hidden>
Implement the core logic of the multiple thread compression. At this
point, multiple thread compression can't co-work with xbzrle yet.
Signed-off-by: Liang Li <address@hidden>
Signed-off-by: Yang Zhang <address@hidden>
Signed-off-by: Juan Quintela <address@hidden>
---
arch_init.c | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 177 insertions(+), 7 deletions(-)
diff --git a/arch_init.c b/arch_init.c
index 48cae22..9f63c0f 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static uint8_t *compressed_data_buf;
+static int do_compress_ram_page(CompressParam *param);
+
static void *do_data_compress(void *opaque)
{
- while (!quit_comp_thread) {
+ CompressParam *param = opaque;
- /* To be done */
+ while (!quit_comp_thread) {
+ qemu_mutex_lock(¶m->mutex);
+ /* Re-check the quit_comp_thread in case of
+ * terminate_compression_threads is called just before
+ * qemu_mutex_lock(¶m->mutex) and after
+ * while(!quit_comp_thread), re-check it here can make
+ * sure the compression thread terminate as expected.
+ */
+ while (!param->start && !quit_comp_thread) {
+ qemu_cond_wait(¶m->cond, ¶m->mutex);
+ }
+ if (!quit_comp_thread) {
+ do_compress_ram_page(param);
+ }
+ param->start = false;
+ qemu_mutex_unlock(¶m->mutex);
+ qemu_mutex_lock(comp_done_lock);
+ param->done = true;
+ qemu_cond_signal(comp_done_cond);
+ qemu_mutex_unlock(comp_done_lock);
}
return NULL;
@@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
static inline void terminate_compression_threads(void)
{
- quit_comp_thread = true;
+ int idx, thread_count;
- /* To be done */
+ thread_count = migrate_compress_threads();
+ quit_comp_thread = true;
+ for (idx = 0; idx < thread_count; idx++) {
+ qemu_mutex_lock(&comp_param[idx].mutex);
+ qemu_cond_signal(&comp_param[idx].cond);
+ qemu_mutex_unlock(&comp_param[idx].mutex);
+ }
}
void migrate_compress_threads_join(void)
@@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
* it's ops to empty.
*/
comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
+ comp_param[i].done = true;
qemu_mutex_init(&comp_param[i].mutex);
qemu_cond_init(&comp_param[i].cond);
qemu_thread_create(compress_threads + i, "compress",
@@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block,
ram_addr_t offset,
return pages;
}
+static int do_compress_ram_page(CompressParam *param)
+{
+ int bytes_sent, blen;
+ uint8_t *p;
+ RAMBlock *block = param->block;
+ ram_addr_t offset = param->offset;
+
+ p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
+
+ bytes_sent = save_page_header(param->file, block, offset |
+ RAM_SAVE_FLAG_COMPRESS_PAGE);
+ blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
+ migrate_compress_level());
+ bytes_sent += blen;
+ atomic_inc(&acct_info.norm_pages);
+
+ return bytes_sent;
+}
+
+static inline void start_compression(CompressParam *param)
+{
+ param->done = false;
+ qemu_mutex_lock(¶m->mutex);
+ param->start = true;
+ qemu_cond_signal(¶m->cond);
+ qemu_mutex_unlock(¶m->mutex);
+}
+
+
+static uint64_t bytes_transferred;
+
+static void flush_compressed_data(QEMUFile *f)
+{
+ int idx, len, thread_count;
+
+ if (!migrate_use_compression()) {
+ return;
+ }
+ thread_count = migrate_compress_threads();
+ for (idx = 0; idx < thread_count; idx++) {
+ if (!comp_param[idx].done) {
+ qemu_mutex_lock(comp_done_lock);
+ while (!comp_param[idx].done && !quit_comp_thread) {
+ qemu_cond_wait(comp_done_cond, comp_done_lock);
+ }
+ qemu_mutex_unlock(comp_done_lock);
+ }
+ if (!quit_comp_thread) {
+ len = qemu_put_qemu_file(f, comp_param[idx].file);
+ bytes_transferred += len;
+ }
+ }
+}
+
+static inline void set_compress_params(CompressParam *param, RAMBlock *block,
+ ram_addr_t offset)
+{
+ param->block = block;
+ param->offset = offset;
+}
+
+static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
+ ram_addr_t offset,
+ uint64_t *bytes_transferred)
+{
+ int idx, thread_count, bytes_xmit = -1, pages = -1;
+
+ thread_count = migrate_compress_threads();
+ qemu_mutex_lock(comp_done_lock);
+ while (true) {
+ for (idx = 0; idx < thread_count; idx++) {
+ if (comp_param[idx].done) {
+ bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
+ set_compress_params(&comp_param[idx], block, offset);
+ start_compression(&comp_param[idx]);
+ pages = 1;
+ *bytes_transferred += bytes_xmit;
+ break;
+ }
+ }
+ if (pages > 0) {
+ break;
+ } else {
+ qemu_cond_wait(comp_done_cond, comp_done_lock);
+ }
+ }
+ qemu_mutex_unlock(comp_done_lock);
+
+ return pages;
+}
+
/**
* ram_save_compressed_page: compress the given page and send it to the stream
*
@@ -845,8 +964,59 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock
*block,
uint64_t *bytes_transferred)
{
int pages = -1;
+ uint64_t bytes_xmit;
+ MemoryRegion *mr = block->mr;
+ uint8_t *p;
+ int ret;
+
+ p = memory_region_get_ram_ptr(mr) + offset;
- /* To be done*/
+ bytes_xmit = 0;
+ ret = ram_control_save_page(f, block->offset,
+ offset, TARGET_PAGE_SIZE, &bytes_xmit);
+ if (bytes_xmit) {
+ *bytes_transferred += bytes_xmit;
+ pages = 1;
+ }
+ if (block == last_sent_block) {
+ offset |= RAM_SAVE_FLAG_CONTINUE;
+ }
+ if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
+ if (ret != RAM_SAVE_CONTROL_DELAYED) {
+ if (bytes_xmit > 0) {
+ acct_info.norm_pages++;
+ } else if (bytes_xmit == 0) {
+ acct_info.dup_pages++;
+ }
+ }
+ } else {
+ /* When starting the process of a new block, the first page of
+ * the block should be sent out before other pages in the same
+ * block, and all the pages in last block should have been sent
+ * out, keeping this order is important, because the 'cont' flag
+ * is used to avoid resending the block name.
+ */
+ if (block != last_sent_block) {
+ flush_compressed_data(f);
+ pages = save_zero_page(f, block, offset, p, bytes_transferred);
+ if (pages == -1) {
+ set_compress_params(&comp_param[0], block, offset);
+ /* Use the qemu thread to compress the data to make sure the
+ * first page is sent out before other pages
+ */
+ bytes_xmit = do_compress_ram_page(&comp_param[0]);
+ qemu_put_qemu_file(f, comp_param[0].file);
+ *bytes_transferred += bytes_xmit;
+ pages = 1;
+ }
+ } else {
+ pages = save_zero_page(f, block, offset, p, bytes_transferred);
+ if (pages == -1) {
+ pages = compress_page_with_multi_thread(f, block, offset,
+ bytes_transferred);
+ }
+ }
+ }
return pages;
}
@@ -914,8 +1084,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool
last_stage,
return pages;
}
-static uint64_t bytes_transferred;
-
void acct_update_position(QEMUFile *f, size_t size, bool zero)
{
uint64_t pages = size / TARGET_PAGE_SIZE;
@@ -1129,6 +1297,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
}
i++;
}
+ flush_compressed_data(f);
rcu_read_unlock();
/*
@@ -1170,6 +1339,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
}
}
+ flush_compressed_data(f);
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
migration_end();
--
2.4.0
- [Qemu-devel] [PULL 00/16] Migration pull request (v2), Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 01/16] docs: Add a doc about multiple thread compression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 02/16] migration: Add the framework of multi-thread compression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 03/16] migration: Add the framework of multi-thread decompression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 04/16] qemu-file: Add compression functions to QEMUFile, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 06/16] arch_init: Add and free data struct for decompression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 07/16] migration: Split save_zero_page from ram_save_page, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 05/16] arch_init: Alloc and free data struct for compression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 08/16] migration: Add the core code of multi-thread compression,
Juan Quintela <=
- [Qemu-devel] [PULL 09/16] migration: Make compression co-work with xbzrle, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 11/16] migration: Add interface to control compression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 10/16] migration: Add the core code for decompression, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 12/16] migration: Use an array instead of 3 parameters, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 15/16] migration: avoid divide by zero in xbzrle cache miss rate, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 13/16] migration: Add qmp commands to set and query parameters, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 14/16] migration: Add hmp interface to set and query parameters, Juan Quintela, 2015/05/07
- [Qemu-devel] [PULL 16/16] migration: Fix migration state update issue, Juan Quintela, 2015/05/07
- Re: [Qemu-devel] [PULL 00/16] Migration pull request (v2), Peter Maydell, 2015/05/07