[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [PATCH 1/3] migration: rework compression code for adding m
From: |
Denis Plotnikov |
Subject: |
[Qemu-devel] [PATCH 1/3] migration: rework compression code for adding more data compressors |
Date: |
Tue, 26 Feb 2019 16:15:33 +0300 |
Also, the patch adds new migration parameter parameter: compress-type
to be able choose between data compressors available.
By the moment, the only available data compressor is gzip (zlib)
Signed-off-by: Denis Plotnikov <address@hidden>
---
migration/migration.c | 42 ++++++++-
migration/migration.h | 1 +
migration/qemu-file.c | 39 +++------
migration/qemu-file.h | 17 +++-
migration/ram.c | 196 +++++++++++++++++++++++++++++++-----------
qapi/migration.json | 26 ++++--
6 files changed, 236 insertions(+), 85 deletions(-)
diff --git a/migration/migration.c b/migration/migration.c
index 37e06b76dc..10cecb0eeb 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -739,6 +739,8 @@ MigrationParameters *qmp_query_migrate_parameters(Error
**errp)
params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth;
params->has_max_cpu_throttle = true;
params->max_cpu_throttle = s->parameters.max_cpu_throttle;
+ params->has_compress_type = true;
+ params->compress_type = s->parameters.compress_type;
return params;
}
@@ -1027,10 +1029,27 @@ void
qmp_migrate_set_capabilities(MigrationCapabilityStatusList *params,
*/
static bool migrate_params_check(MigrationParameters *params, Error **errp)
{
+ int max_compress_level = -1;
+
+ if (params->has_compress_type) {
+ switch (params->compress_type) {
+ case COMPRESSION_TYPE_ZLIB:
+ max_compress_level = 9;
+ break;
+ default:
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_type",
+ "values: 0 - gzip");
+ return false;
+ }
+ }
+
if (params->has_compress_level &&
- (params->compress_level > 9)) {
+ (params->compress_level > max_compress_level)) {
+ char level_range_msg[30];
+ snprintf(level_range_msg, 30, "values from 0 to %d",
+ max_compress_level);
error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "compress_level",
- "is invalid, it should be in the range of 0 to 9");
+ level_range_msg);
return false;
}
@@ -1125,6 +1144,9 @@ static void
migrate_params_test_apply(MigrateSetParameters *params,
*dest = migrate_get_current()->parameters;
/* TODO use QAPI_CLONE() instead of duplicating it inline */
+ if (params->has_compress_type) {
+ dest->compress_type = params->compress_type;
+ }
if (params->has_compress_level) {
dest->compress_level = params->compress_level;
@@ -1272,6 +1294,9 @@ static void migrate_params_apply(MigrateSetParameters
*params, Error **errp)
if (params->has_max_cpu_throttle) {
s->parameters.max_cpu_throttle = params->max_cpu_throttle;
}
+ if (params->has_compress_type) {
+ s->parameters.compress_type = params->compress_type;
+ }
}
void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
@@ -1938,6 +1963,15 @@ bool migrate_use_compression(void)
return s->enabled_capabilities[MIGRATION_CAPABILITY_COMPRESS];
}
+int migrate_compress_type(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.compress_type;
+}
+
int migrate_compress_level(void)
{
MigrationState *s;
@@ -3234,6 +3268,9 @@ static Property migration_properties[] = {
decompress_error_check, true),
/* Migration parameters */
+ DEFINE_PROP_UINT8("x-compress-type", MigrationState,
+ parameters.compress_type,
+ COMPRESSION_TYPE_ZLIB),
DEFINE_PROP_UINT8("x-compress-level", MigrationState,
parameters.compress_level,
DEFAULT_MIGRATE_COMPRESS_LEVEL),
@@ -3346,6 +3383,7 @@ static void migration_instance_init(Object *obj)
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
params->has_max_cpu_throttle = true;
+ params->has_compress_type = true;
qemu_sem_init(&ms->postcopy_pause_sem, 0);
qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);
diff --git a/migration/migration.h b/migration/migration.h
index dcd05d9f87..ddb9efec86 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -280,6 +280,7 @@ bool migrate_use_return_path(void);
uint64_t ram_get_total_transferred_pages(void);
bool migrate_use_compression(void);
+int migrate_compress_type(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
int migrate_compress_wait_thread(void);
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 977b9ae07c..cd95749aa6 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -662,28 +662,10 @@ uint64_t qemu_get_be64(QEMUFile *f)
return v;
}
-/* return the size after compression, or negative value on error */
-static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+static int qemu_compress_data(Compression *comp, uint8_t *dest, size_t
dest_len,
const uint8_t *source, size_t source_len)
{
- int err;
-
- err = deflateReset(stream);
- if (err != Z_OK) {
- return -1;
- }
-
- stream->avail_in = source_len;
- stream->next_in = (uint8_t *)source;
- stream->avail_out = dest_len;
- stream->next_out = dest;
-
- err = deflate(stream, Z_FINISH);
- if (err != Z_STREAM_END) {
- return -1;
- }
-
- return stream->next_out - dest;
+ return comp->process(comp, dest, dest_len, source, source_len);
}
/* Compress size bytes of data start at p and store the compressed
@@ -695,23 +677,30 @@ static int qemu_compress_data(z_stream *stream, uint8_t
*dest, size_t dest_len,
* do fflush first, if f still has no space to save the compressed
* data, return -1.
*/
-ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+ssize_t qemu_put_compression_data(QEMUFile *f, Compression *comp,
const uint8_t *p, size_t size)
{
- ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
+ int blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t);
+ unsigned long bound;
- if (blen < compressBound(size)) {
+ bound = comp->get_bound(size);
+
+ if (blen < bound) {
if (!qemu_file_is_writable(f)) {
+ error_report("compression: qemu file is not writable");
return -1;
}
+
qemu_fflush(f);
blen = IO_BUF_SIZE - sizeof(int32_t);
- if (blen < compressBound(size)) {
+ if (blen < bound) {
+ error_report("compression: io buffer is too small:%d needed: %lu",
+ IO_BUF_SIZE, bound);
return -1;
}
}
- blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
+ blen = qemu_compress_data(comp, f->buf + f->buf_index + sizeof(int32_t),
blen, p, size);
if (blen < 0) {
return -1;
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index 2ccfcfb2a8..24cf0d7e25 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -115,6 +115,21 @@ typedef struct QEMUFileHooks {
QEMURamSaveFunc *save_page;
} QEMUFileHooks;
+typedef enum CompressionType {
+ COMPRESSION_TYPE_ZLIB = 0,
+} CompressionType;
+
+struct Compression {
+ CompressionType type;
+ bool is_decompression;
+ void *stream;
+ int (*process)(struct Compression *comp, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len);
+ unsigned long (*get_bound)(unsigned long);
+};
+
+typedef struct Compression Compression;
+
QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
void qemu_file_set_hooks(QEMUFile *f, const QEMUFileHooks *hooks);
int qemu_get_fd(QEMUFile *f);
@@ -134,7 +149,7 @@ bool qemu_file_is_writable(QEMUFile *f);
size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t
offset);
size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size);
-ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
+ssize_t qemu_put_compression_data(QEMUFile *f, Compression *comp,
const uint8_t *p, size_t size);
int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src);
diff --git a/migration/ram.c b/migration/ram.c
index 59191c1ed2..9ff154ed7b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -360,8 +360,8 @@ struct CompressParam {
ram_addr_t offset;
/* internally used fields */
- z_stream stream;
uint8_t *originbuf;
+ Compression comp;
};
typedef struct CompressParam CompressParam;
@@ -373,7 +373,7 @@ struct DecompressParam {
void *des;
uint8_t *compbuf;
int len;
- z_stream stream;
+ Compression comp;
};
typedef struct DecompressParam DecompressParam;
@@ -394,8 +394,114 @@ static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock
*block,
- ram_addr_t offset, uint8_t *source_buf);
+static bool do_compress_ram_page(QEMUFile *f, Compression *comp,
+ RAMBlock *block, ram_addr_t offset,
+ uint8_t *source_buf);
+
+static int zlib_compress(Compression *comp, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+ int err;
+ z_stream *stream = comp->stream;
+
+ err = deflateReset(comp->stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = source_len;
+ stream->next_in = (uint8_t *)source;
+ stream->avail_out = dest_len;
+ stream->next_out = dest;
+
+ err = deflate(stream, Z_FINISH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->next_out - dest;
+}
+
+static int zlib_decompress(Compression *comp, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+ int err;
+ z_stream *stream = comp->stream;
+
+ err = inflateReset(stream);
+ if (err != Z_OK) {
+ return -1;
+ }
+
+ stream->avail_in = source_len;
+ stream->next_in = (uint8_t *)source;
+ stream->avail_out = dest_len;
+ stream->next_out = dest;
+
+ err = inflate(stream, Z_NO_FLUSH);
+ if (err != Z_STREAM_END) {
+ return -1;
+ }
+
+ return stream->total_out;
+}
+
+static int init_compression(Compression *comp, CompressionType type,
+ bool is_decompression)
+{
+ int res;
+
+ switch (type) {
+ case COMPRESSION_TYPE_ZLIB:
+ comp->stream = g_new0(z_stream, 1);
+
+ if (is_decompression) {
+ res = inflateInit(comp->stream);
+ } else {
+ res = deflateInit(comp->stream, migrate_compress_level());
+ }
+
+ if (res != Z_OK) {
+ g_free(comp->stream);
+ return 1;
+ }
+
+ if (is_decompression) {
+ comp->process = zlib_decompress;
+ } else {
+ comp->process = zlib_compress;
+ }
+
+ comp->get_bound = compressBound;
+ break;
+ default:
+ return 1;
+ }
+
+ comp->type = type;
+ comp->is_decompression = is_decompression;
+ return 0;
+}
+
+static void destroy_compression(Compression *comp)
+{
+ assert(comp);
+
+ switch (comp->type) {
+ case COMPRESSION_TYPE_ZLIB:
+ if (comp->is_decompression) {
+ inflateEnd(comp->stream);
+ } else {
+ deflateEnd(comp->stream);
+ }
+ g_free(comp->stream);
+ break;
+ default:
+ assert(false);
+ }
+
+ memset(comp, 0, sizeof(Compression));
+}
static void *do_data_compress(void *opaque)
{
@@ -412,7 +518,7 @@ static void *do_data_compress(void *opaque)
param->block = NULL;
qemu_mutex_unlock(¶m->mutex);
- zero_page = do_compress_ram_page(param->file, ¶m->stream,
+ zero_page = do_compress_ram_page(param->file, ¶m->comp,
block, offset, param->originbuf);
qemu_mutex_lock(&comp_done_lock);
@@ -457,7 +563,7 @@ static void compress_threads_save_cleanup(void)
qemu_thread_join(compress_threads + i);
qemu_mutex_destroy(&comp_param[i].mutex);
qemu_cond_destroy(&comp_param[i].cond);
- deflateEnd(&comp_param[i].stream);
+ destroy_compression(&comp_param->comp);
g_free(comp_param[i].originbuf);
qemu_fclose(comp_param[i].file);
comp_param[i].file = NULL;
@@ -480,31 +586,32 @@ static int compress_threads_save_setup(void)
thread_count = migrate_compress_threads();
compress_threads = g_new0(QemuThread, thread_count);
comp_param = g_new0(CompressParam, thread_count);
+
qemu_cond_init(&comp_done_cond);
qemu_mutex_init(&comp_done_lock);
for (i = 0; i < thread_count; i++) {
- comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
- if (!comp_param[i].originbuf) {
+ CompressParam *comp = &comp_param[i];
+
+ comp->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+ if (!comp->originbuf) {
goto exit;
}
- if (deflateInit(&comp_param[i].stream,
- migrate_compress_level()) != Z_OK) {
- g_free(comp_param[i].originbuf);
+ if (init_compression(&comp->comp, migrate_compress_type(), false)) {
+ g_free(comp->originbuf);
goto exit;
}
/* comp_param[i].file is just used as a dummy buffer to save data,
* set its ops to empty.
*/
- comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
- comp_param[i].done = true;
- comp_param[i].quit = false;
- qemu_mutex_init(&comp_param[i].mutex);
- qemu_cond_init(&comp_param[i].cond);
- qemu_thread_create(compress_threads + i, "compress",
- do_data_compress, comp_param + i,
- QEMU_THREAD_JOINABLE);
+ comp->file = qemu_fopen_ops(NULL, &empty_ops);
+ comp->done = true;
+ comp->quit = false;
+ qemu_mutex_init(&comp->mutex);
+ qemu_cond_init(&comp->cond);
+ qemu_thread_create(compress_threads + i, "compress", do_data_compress,
+ comp, QEMU_THREAD_JOINABLE);
}
return 0;
@@ -1890,8 +1997,9 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock
*block,
return 1;
}
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock
*block,
- ram_addr_t offset, uint8_t *source_buf)
+static bool do_compress_ram_page(QEMUFile *f, Compression *comp,
+ RAMBlock *block, ram_addr_t offset,
+ uint8_t *source_buf)
{
RAMState *rs = ram_state;
uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
@@ -1911,7 +2019,7 @@ static bool do_compress_ram_page(QEMUFile *f, z_stream
*stream, RAMBlock *block,
* decompression
*/
memcpy(source_buf, p, TARGET_PAGE_SIZE);
- ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
+ ret = qemu_put_compression_data(f, comp, source_buf, TARGET_PAGE_SIZE);
if (ret < 0) {
qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
error_report("compressed data failed!");
@@ -3502,28 +3610,14 @@ void ram_handle_compressed(void *host, uint8_t ch,
uint64_t size)
}
/* return the size after decompression, or negative value on error */
-static int
-qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
- const uint8_t *source, size_t source_len)
+static int qemu_uncompress_data(Compression *comp, uint8_t *dest,
+ size_t dest_len, const uint8_t *source,
+ size_t source_len)
{
- int err;
-
- err = inflateReset(stream);
- if (err != Z_OK) {
+ if (source_len > comp->get_bound(TARGET_PAGE_SIZE)) {
return -1;
}
-
- stream->avail_in = source_len;
- stream->next_in = (uint8_t *)source;
- stream->avail_out = dest_len;
- stream->next_out = dest;
-
- err = inflate(stream, Z_NO_FLUSH);
- if (err != Z_STREAM_END) {
- return -1;
- }
-
- return stream->total_out;
+ return comp->process(comp, dest, dest_len, source, source_len);
}
static void *do_data_decompress(void *opaque)
@@ -3543,7 +3637,7 @@ static void *do_data_decompress(void *opaque)
pagesize = TARGET_PAGE_SIZE;
- ret = qemu_uncompress_data(¶m->stream, des, pagesize,
+ ret = qemu_uncompress_data(¶m->comp, des, pagesize,
param->compbuf, len);
if (ret < 0 && migrate_get_current()->decompress_error_check) {
error_report("decompress data failed");
@@ -3614,7 +3708,7 @@ static void compress_threads_load_cleanup(void)
qemu_thread_join(decompress_threads + i);
qemu_mutex_destroy(&decomp_param[i].mutex);
qemu_cond_destroy(&decomp_param[i].cond);
- inflateEnd(&decomp_param[i].stream);
+ destroy_compression(&decomp_param[i].comp);
g_free(decomp_param[i].compbuf);
decomp_param[i].compbuf = NULL;
}
@@ -3640,15 +3734,17 @@ static int compress_threads_load_setup(QEMUFile *f)
qemu_cond_init(&decomp_done_cond);
decomp_file = f;
for (i = 0; i < thread_count; i++) {
- if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+ DecompressParam *decomp = &decomp_param[i];
+
+ if (init_compression(&decomp->comp, migrate_compress_type(), true)) {
goto exit;
}
- decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
- qemu_mutex_init(&decomp_param[i].mutex);
- qemu_cond_init(&decomp_param[i].cond);
- decomp_param[i].done = true;
- decomp_param[i].quit = false;
+ decomp->compbuf = g_malloc0(decomp->comp.get_bound(TARGET_PAGE_SIZE));
+ qemu_mutex_init(&decomp->mutex);
+ qemu_cond_init(&decomp->cond);
+ decomp->done = true;
+ decomp->quit = false;
qemu_thread_create(decompress_threads + i, "decompress",
do_data_decompress, decomp_param + i,
QEMU_THREAD_JOINABLE);
@@ -4169,7 +4265,7 @@ static int ram_load(QEMUFile *f, void *opaque, int
version_id)
case RAM_SAVE_FLAG_COMPRESS_PAGE:
len = qemu_get_be32(f);
- if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+ if (len < 0) {
error_report("Invalid compressed data length: %d", len);
ret = -EINVAL;
break;
diff --git a/qapi/migration.json b/qapi/migration.json
index 7a795ecc16..9a3110e383 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -480,10 +480,15 @@
#
# Migration parameters enumeration
#
+# @compress-type: Set the compression type to be used in live migration,
+# the compression type is an integer from the list:
+# 0 - gzip
+#
# @compress-level: Set the compression level to be used in live migration,
-# the compression level is an integer between 0 and 9, where 0 means
-# no compression, 1 means the best compression speed, and 9 means best
-# compression ratio which will consume more CPU.
+# the compression level is an integer between 0 and 9,
+# where 0 means no compression, 1 means the best compression speed,
+# and the highest value depending on the compression type means
+# the best compression ratio which will consume more CPU.
#
# @compress-threads: Set compression thread count to be used in live migration,
# the compression thread count is an integer between 1 and 255.
@@ -560,8 +565,8 @@
# Since: 2.4
##
{ 'enum': 'MigrationParameter',
- 'data': ['compress-level', 'compress-threads', 'decompress-threads',
- 'compress-wait-thread',
+ 'data': ['compress-type', 'compress-level', 'compress-threads',
+ 'decompress-threads', 'compress-wait-thread',
'cpu-throttle-initial', 'cpu-throttle-increment',
'tls-creds', 'tls-hostname', 'max-bandwidth',
'downtime-limit', 'x-checkpoint-delay', 'block-incremental',
@@ -572,6 +577,9 @@
##
# @MigrateSetParameters:
#
+# @compress-type: Compression type is used for migration.
+# Available types: 0 - gzip
+#
# @compress-level: compression level
#
# @compress-threads: compression thread count
@@ -653,7 +661,8 @@
# TODO either fuse back into MigrationParameters, or make
# MigrationParameters members mandatory
{ 'struct': 'MigrateSetParameters',
- 'data': { '*compress-level': 'int',
+ 'data': { '*compress-type': 'int',
+ '*compress-level': 'int',
'*compress-threads': 'int',
'*compress-wait-thread': 'bool',
'*decompress-threads': 'int',
@@ -692,6 +701,8 @@
#
# The optional members aren't actually optional.
#
+# @compress-type: compression type
+#
# @compress-level: compression level
#
# @compress-threads: compression thread count
@@ -769,7 +780,8 @@
# Since: 2.4
##
{ 'struct': 'MigrationParameters',
- 'data': { '*compress-level': 'uint8',
+ 'data': { '*compress-type': 'uint8',
+ '*compress-level': 'uint8',
'*compress-threads': 'uint8',
'*compress-wait-thread': 'bool',
'*decompress-threads': 'uint8',
--
2.17.0