[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [RFC 5/9] block: add block job transactions
From: |
Stefan Hajnoczi |
Subject: |
[Qemu-devel] [RFC 5/9] block: add block job transactions |
Date: |
Fri, 12 Jun 2015 11:09:17 +0100 |
Sometimes block jobs must execute as a transaction group. A job that
finishes waits until all other jobs in the group are ready to complete
successfully. Failure or cancellation of one job cancels the other jobs
in the group.
Signed-off-by: Stefan Hajnoczi <address@hidden>
---
blockjob.c | 160 ++++++++++++++++++++++++++++++++++++++++++++++
include/block/block.h | 1 +
include/block/block_int.h | 3 +-
include/block/blockjob.h | 49 ++++++++++++++
trace-events | 4 ++
5 files changed, 216 insertions(+), 1 deletion(-)
diff --git a/blockjob.c b/blockjob.c
index 2755465..ff622f5 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -399,3 +399,163 @@ void block_job_defer_to_main_loop(BlockJob *job,
qemu_bh_schedule(data->bh);
}
+
+/* Transactional group of block jobs.
+ *
+ * Jobs in a group wait for each other in
+ * block_job_txn_prepare_to_complete(); failure or cancellation of one job
+ * cancels the others.
+ */
+struct BlockJobTxn {
+ /* Jobs may be in different AioContexts so protect all fields */
+ QemuMutex lock;
+
+ /* Reference count for txn object: one reference is held by the creator
+ * until block_job_txn_begin() and one is taken for every job added. The
+ * txn is freed when the count drops to zero.
+ */
+ unsigned int ref;
+
+ /* Is this txn cancelling its jobs? Set by the first failed/cancelled job;
+ * nothing in this file resets it afterwards.
+ */
+ bool aborting;
+
+ /* Number of jobs still running: incremented when a job is added and
+ * decremented when a job reaches block_job_txn_prepare_to_complete()
+ * successfully.
+ */
+ unsigned int jobs_pending;
+
+ /* List of jobs, linked through BlockJob.txn_list */
+ QLIST_HEAD(, BlockJob) jobs;
+};
+
+/* Allocate an empty transaction holding a single reference for the caller */
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *new_txn = g_new(BlockJobTxn, 1);
+
+    /* Start out with no jobs and no abort in progress */
+    QLIST_INIT(&new_txn->jobs);
+    new_txn->jobs_pending = 0;
+    new_txn->aborting = false;
+
+    /* The creator's reference is dropped by block_job_txn_begin() */
+    new_txn->ref = 1;
+    qemu_mutex_init(&new_txn->lock);
+
+    return new_txn;
+}
+
+/* Drop one reference to @txn. Dropping the last reference destroys the lock
+ * and frees the transaction.
+ */
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    bool last_ref;
+
+    qemu_mutex_lock(&txn->lock);
+    last_ref = (--txn->ref == 0);
+    qemu_mutex_unlock(&txn->lock);
+
+    if (!last_ref) {
+        return;
+    }
+
+    /* The reference count reached zero, tear the object down */
+    qemu_mutex_destroy(&txn->lock);
+    g_free(txn);
+}
+
+/* Drop the initial reference taken by block_job_txn_new(). Holding that
+ * reference until this call keeps txn alive until all jobs have been added;
+ * if no jobs were added, txn is freed here.
+ */
+void block_job_txn_begin(BlockJobTxn *txn)
+{
+ block_job_txn_unref(txn);
+}
+
+/* Attach @job to @txn. A NULL @txn means the job does not take part in any
+ * transaction, in which case nothing is done.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (txn == NULL) {
+        return;
+    }
+
+    /* A job can belong to at most one transaction */
+    assert(job->txn == NULL);
+    job->txn = txn;
+
+    qemu_mutex_lock(&txn->lock);
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    txn->jobs_pending += 1;
+    /* The job's reference, dropped in block_job_txn_prepare_to_complete() */
+    txn->ref += 1;
+    qemu_mutex_unlock(&txn->lock);
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion. Runs from main loop.
+ *
+ * @job: the job that scheduled this callback
+ * @opaque: the BlockJobTxn that @job belongs to
+ */
+static void block_job_txn_complete(BlockJob *job, void *opaque)
+{
+    BlockJobTxn *txn = opaque;
+    BlockJob *other_job;
+    bool aborting;
+
+    qemu_mutex_lock(&txn->lock);
+
+    /* All txn fields are protected by txn->lock because jobs may run in
+     * different AioContexts, so sample the abort state only once the lock is
+     * held. The same snapshot is used for every job in the loop below.
+     */
+    aborting = txn->aborting;
+    txn->ref++; /* keep txn alive until the end of this loop */
+
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        AioContext *ctx;
+
+        /* txn->lock is not held while the job's AioContext is acquired */
+        qemu_mutex_unlock(&txn->lock);
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+
+        /* Cancel all other jobs if aborting. Don't cancel our own failed job
+         * since cancellation throws away the error value.
+         */
+        if (aborting && other_job != job) {
+            block_job_cancel(other_job);
+        } else {
+            block_job_enter(other_job);
+        }
+
+        aio_context_release(ctx);
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
+
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+ BlockJob *job,
+ int ret)
+{
+ if (!txn) {
+ return;
+ }
+
+ qemu_mutex_lock(&txn->lock);
+
+ /* This function is entered in 3 cases:
+ *
+ * 1. Successful job completion - wait for other jobs
+ * 2. First failed/cancelled job in txn - cancel other jobs and wait
+ * 3. Subsequent cancelled jobs - finish immediately, don't wait
+ *
+ * In every case the job's reference to txn is dropped before returning.
+ */
+ trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
+ block_job_is_cancelled(job),
+ txn->aborting,
+ txn->jobs_pending);
+
+ if (txn->aborting) { /* Case 3 */
+ assert(block_job_is_cancelled(job));
+ goto out; /* already cancelled, don't yield */
+ }
+
+ if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+abort: /* also reached from the wait loop below, with txn->lock held */
+ txn->aborting = true;
+ block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
+ } else { /* Case 1 */
+ if (--txn->jobs_pending == 0) { /* last pending job: wake the group */
+ block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
+ }
+ }
+
+ /* Wait for block_job_txn_complete(). txn->lock is released across the
+ * yield and re-taken afterwards; job->busy is false while yielded.
+ */
+ do {
+ qemu_mutex_unlock(&txn->lock);
+ job->busy = false;
+ qemu_coroutine_yield();
+ job->busy = true;
+ qemu_mutex_lock(&txn->lock);
+
+ if (block_job_is_cancelled(job) && !txn->aborting) {
+ goto abort; /* this job just got cancelled by the user */
+ }
+ } while (!txn->aborting && txn->jobs_pending > 0);
+
+out:
+ trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
+ block_job_is_cancelled(job),
+ txn->aborting,
+ txn->jobs_pending);
+
+ qemu_mutex_unlock(&txn->lock);
+ block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index 3d62d3e..439925c 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -12,6 +12,7 @@
/* block.c */
typedef struct BlockDriver BlockDriver;
typedef struct BlockJob BlockJob;
+typedef struct BlockJobTxn BlockJobTxn;
typedef struct BlockDriverInfo {
/* in bytes, 0 if irrelevant */
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 5af712f..9464142 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -617,6 +617,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState
*target,
* @on_source_error: The action to take upon error reading from the source.
* @on_target_error: The action to take upon error writing to the target.
* @cb: Completion function for the job.
+ * @txn: Transaction that this job is part of (may be NULL).
* @opaque: Opaque pointer value passed to @cb.
*
* Start a backup operation on @bs. Clusters in @bs are written to @target
@@ -628,7 +629,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState
*target,
BlockdevOnError on_source_error,
BlockdevOnError on_target_error,
BlockCompletionFunc *cb, void *opaque,
- Error **errp);
+ BlockJobTxn *txn, Error **errp);
void blk_dev_change_media_cb(BlockBackend *blk, bool load);
bool blk_dev_has_removable_media(BlockBackend *blk);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..ce57e98 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@ struct BlockJob {
/** The opaque value that is passed to the completion function. */
void *opaque;
+
+ /** Non-NULL if this job is part of a transaction */
+ BlockJobTxn *txn;
+ QLIST_ENTRY(BlockJob) txn_list;
};
/**
@@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
BlockJobDeferToMainLoopFn *fn,
void *opaque);
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction. Jobs can be added to the
+ * transaction using block_job_txn_add_job(). block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * The returned transaction carries one reference owned by the caller, which
+ * block_job_txn_begin() drops.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group. Jobs wait for each other before completing. Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction (may be NULL, in which case nothing is done)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction. The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_prepare_to_complete() before
+ * final cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction
+ *
+ * Call this to mark the end of adding jobs to the transaction. This must be
+ * called even if no jobs were added; in that case the transaction is freed.
+ */
+void block_job_txn_begin(BlockJobTxn *txn);
+
+/**
+ * block_job_txn_prepare_to_complete:
+ * @txn: The transaction (may be NULL, in which case this returns
+ *       immediately)
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait for other jobs in the transaction to complete. If @ret is non-zero or
+ * @job is cancelled, all other jobs in the transaction will be cancelled.
+ * This function may yield (it is a coroutine_fn).
+ */
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+ BlockJob *job, int ret);
+
#endif
diff --git a/trace-events b/trace-events
index a589650..526e6ac 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
virtio_blk_data_plane_stop(void *s) "dataplane %p"
virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned
int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
+# blockjob.c
+block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+
# hw/virtio/dataplane/vring.c
vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring
physical %#"PRIx64" desc %p avail %p used %p"
--
2.4.2
- [Qemu-devel] [RFC 0/9] block: incremental backup transactions using BlockJobTxn, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 1/9] qapi: Add transaction support to block-dirty-bitmap operations, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 2/9] iotests: add transactional incremental backup test, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 3/9] block: rename BlkTransactionState and BdrvActionOps, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 4/9] block: keep bitmap if incremental backup job is cancelled, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 5/9] block: add block job transactions,
Stefan Hajnoczi <=
- [Qemu-devel] [RFC 6/9] blockdev: make BlockJobTxn available to qmp 'transaction', Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 7/9] block/backup: support block job transactions, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 8/9] iotests: 124 - transactional failure test, Stefan Hajnoczi, 2015/06/12
- [Qemu-devel] [RFC 9/9] qmp-commands.hx: Update the supported 'transaction' operations, Stefan Hajnoczi, 2015/06/12