[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Qemu-devel] [RFC V4 8/9] quorum: Add quorum mechanism.
From: |
Benoît Canet |
Subject: |
[Qemu-devel] [RFC V4 8/9] quorum: Add quorum mechanism. |
Date: |
Mon, 20 Aug 2012 13:48:02 +0200 |
Signed-off-by: Benoit Canet <address@hidden>
---
block/quorum.c | 211 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 210 insertions(+), 1 deletion(-)
diff --git a/block/quorum.c b/block/quorum.c
index 95f4668..a909ce2 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -14,6 +14,20 @@
*/
#include "block_int.h"
+#include "zlib.h"
+
+typedef struct QuorumVoteItem { /* one read counted for a QuorumVoteVersion */
+ int index; /* index passed to quorum_count_vote(); also used to look up s->filenames[] */
+ QLIST_ENTRY(QuorumVoteItem) next; /* linkage in QuorumVoteVersion.items */
+} QuorumVoteItem;
+
+typedef struct QuorumVoteVersion { /* one distinct checksum seen during a vote */
+ unsigned long value; /* checksum shared by every read counted here */
+ int index; /* index of the first read counted for this checksum */
+ int vote_count; /* number of reads counted for this checksum */
+ QLIST_HEAD(, QuorumVoteItem) items; /* one QuorumVoteItem per counted read */
+ QLIST_ENTRY(QuorumVoteVersion) next; /* linkage in QuorumVotes.vote_list */
+} QuorumVoteVersion;
typedef struct {
BlockDriverState **bs;
@@ -31,6 +45,10 @@ typedef struct QuorumSingleAIOCB {
QuorumAIOCB *parent;
} QuorumSingleAIOCB;
+typedef struct QuorumVotes { /* vote tally embedded in each QuorumAIOCB */
+ QLIST_HEAD(, QuorumVoteVersion) vote_list; /* one QuorumVoteVersion per distinct checksum */
+} QuorumVotes;
+
struct QuorumAIOCB {
BlockDriverAIOCB common;
BDRVQuorumState *bqs;
@@ -48,6 +66,8 @@ struct QuorumAIOCB {
int success_count; /* number of successfully completed AIOCB */
bool *finished; /* completion signal for cancel */
+ QuorumVotes votes;
+
void (*vote)(QuorumAIOCB *acb);
int vote_ret;
};
@@ -191,6 +211,11 @@ static void quorum_aio_bh(void *opaque)
}
qemu_bh_delete(acb->bh);
+
+ if (acb->vote_ret) {
+ ret = acb->vote_ret;
+ }
+
acb->common.cb(acb->common.opaque, ret);
if (acb->finished) {
*acb->finished = true;
@@ -226,6 +251,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
acb->nb_sectors = nb_sectors;
acb->vote = NULL;
acb->vote_ret = 0;
+ QLIST_INIT(&acb->votes.vote_list);
for (i = 0; i < s->total; i++) {
acb->aios[i].buf = NULL;
@@ -253,10 +279,191 @@ static void quorum_aio_cb(void *opaque, int ret)
return;
}
+ /* Do the vote */
+ if (acb->vote) {
+ acb->vote(acb);
+ }
+
acb->bh = qemu_bh_new(quorum_aio_bh, acb);
qemu_bh_schedule(acb->bh);
}
+/*
+ * Report on stderr that the copy read from @filename was corrected, along
+ * with the sector range of the request described by @acb.
+ */
+static void quorum_print_bad(QuorumAIOCB *acb, const char *filename)
+{
+    fprintf(stderr,
+            "quorum: corrected error in quorum file %s: sector_num=%" PRId64
+            " nb_sectors=%i\n",
+            filename,
+            acb->sector_num,
+            acb->nb_sectors);
+}
+
+/*
+ * Report on stderr that the request described by @acb failed, printing
+ * its starting sector and sector count.
+ */
+static void quorum_print_failure(QuorumAIOCB *acb)
+{
+    fprintf(stderr,
+            "quorum: failure sector_num=%" PRId64 " nb_sectors=%i\n",
+            acb->sector_num,
+            acb->nb_sectors);
+}
+
+/*
+ * Walk the vote list of @acb and call quorum_print_bad() for every read
+ * whose version carries a checksum different from @checksum.
+ */
+static void quorum_print_bad_versions(QuorumAIOCB *acb,
+                                      unsigned long checksum)
+{
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *loser;
+    QuorumVoteItem *vote;
+
+    QLIST_FOREACH(loser, &acb->votes.vote_list, next) {
+        /* only versions that differ from the given checksum are reported */
+        if (loser->value != checksum) {
+            QLIST_FOREACH(vote, &loser->items, next) {
+                quorum_print_bad(acb, s->filenames[vote->index]);
+            }
+        }
+    }
+}
+
+/*
+ * Copy the contents of @source into @dest, segment by segment.
+ *
+ * Both vectors must have the same number of segments, the same total size
+ * and matching per-segment lengths; each of these is asserted.
+ */
+static void quorum_copy_qiov(QEMUIOVector *dest, QEMUIOVector *source)
+{
+    int seg = 0;
+
+    assert(dest->niov == source->niov);
+    assert(dest->size == source->size);
+
+    while (seg < source->niov) {
+        size_t len = source->iov[seg].iov_len;
+
+        assert(dest->iov[seg].iov_len == len);
+        memcpy(dest->iov[seg].iov_base, source->iov[seg].iov_base, len);
+        seg++;
+    }
+}
+
+/*
+ * Record in @votes that the read number @index produced @checksum.
+ *
+ * A QuorumVoteVersion is created the first time a checksum is seen (and
+ * remembers that first @index); every call then bumps the version's
+ * vote_count and prepends a QuorumVoteItem holding @index to its items.
+ * Everything allocated here is released by quorum_free_vote_list().
+ */
+static void quorum_count_vote(QuorumVotes *votes,
+                              unsigned long checksum,
+                              int index)
+{
+    QuorumVoteVersion *cursor = NULL, *match = NULL;
+    QuorumVoteItem *ballot;
+
+    /* is this checksum already known? */
+    QLIST_FOREACH(cursor, &votes->vote_list, next) {
+        if (cursor->value == checksum) {
+            match = cursor;
+            break;
+        }
+    }
+
+    /* first time we see it: open a new version (g_new0 zero-fills it,
+     * so vote_count starts at 0) */
+    if (match == NULL) {
+        match = g_new0(QuorumVoteVersion, 1);
+        match->value = checksum;
+        match->index = index;
+        QLIST_INIT(&match->items);
+        QLIST_INSERT_HEAD(&votes->vote_list, match, next);
+    }
+
+    ballot = g_new0(QuorumVoteItem, 1);
+    ballot->index = index;
+    QLIST_INSERT_HEAD(&match->items, ballot, next);
+
+    match->vote_count++;
+}
+
+/* Unlink and free every QuorumVoteItem attached to @version. */
+static void quorum_free_vote_items(QuorumVoteVersion *version)
+{
+    QuorumVoteItem *item, *following;
+
+    QLIST_FOREACH_SAFE(item, &version->items, next, following) {
+        QLIST_REMOVE(item, next);
+        g_free(item);
+    }
+}
+
+/*
+ * Unlink and free every version in @votes together with its items.
+ * The QuorumVotes structure itself is not freed.
+ */
+static void quorum_free_vote_list(QuorumVotes *votes)
+{
+    QuorumVoteVersion *version, *following;
+
+    QLIST_FOREACH_SAFE(version, &votes->vote_list, next, following) {
+        quorum_free_vote_items(version);
+        QLIST_REMOVE(version, next);
+        g_free(version);
+    }
+}
+
+/*
+ * Return the Adler-32 checksum of the data held in acb->qiovs[@i],
+ * accumulated over the vector's segments in order.
+ */
+static unsigned long quorum_compute_checksum(QuorumAIOCB *acb, int i)
+{
+    QEMUIOVector *qiov = &acb->qiovs[i];
+    unsigned long sum = adler32(0L, Z_NULL, 0);
+    int seg = 0;
+
+    while (seg < qiov->niov) {
+        sum = adler32(sum,
+                      qiov->iov[seg].iov_base,
+                      qiov->iov[seg].iov_len);
+        seg++;
+    }
+
+    return sum;
+}
+
+/*
+ * Choose the data that a completed read request hands back to its caller.
+ *
+ * Only child reads whose ret is 0 take part.  If they all hold identical
+ * data, that data is copied into acb->qiov.  Otherwise every successful
+ * read is checksummed, reads are grouped by checksum and the largest group
+ * wins; its data is copied into acb->qiov and the files that lost the vote
+ * are reported on stderr.  When the winning group has fewer than
+ * s->threshold members, or when no read succeeded at all, acb->vote_ret is
+ * set to -EIO and acb->qiov is left untouched.
+ *
+ * NOTE(review): the "everybody agrees" fast path does not compare the
+ * number of agreeing reads with s->threshold; presumably the completion
+ * path checks success_count -- confirm.
+ */
+static void quorum_vote(QuorumAIOCB *acb)
+{
+    BDRVQuorumState *s = acb->bqs;
+    QuorumVoteVersion *candidate, *winner = NULL;
+    bool quorum = true;
+    unsigned long checksum = 0;
+    int best_count = 0;
+    int i, j;
+
+    /* get the index of the first successful read */
+    for (i = 0; i < s->total; i++) {
+        if (!acb->aios[i].ret) {
+            break;
+        }
+    }
+
+    /* No read succeeded: there is nothing to compare or copy, and
+     * acb->qiovs[s->total] is not one of the per-child read buffers. */
+    if (i == s->total) {
+        acb->vote_ret = -EIO;
+        return;
+    }
+
+    /* compare this read with all other successful reads looking for quorum;
+     * any result other than -1 from qemu_iovec_compare() counts as a
+     * disagreement */
+    for (j = i + 1; j < s->total; j++) {
+        if (acb->aios[j].ret) {
+            continue;
+        }
+        if (qemu_iovec_compare(&acb->qiovs[i],
+                               &acb->qiovs[j]) != -1) {
+            quorum = false;
+            break;
+        }
+    }
+
+    /* Every successful read agrees -> Quorum */
+    if (quorum) {
+        quorum_copy_qiov(acb->qiov, &acb->qiovs[i]);
+        return;
+    }
+
+    /* compute checksums for each successful read, also store indexes */
+    for (i = 0; i < s->total; i++) {
+        if (acb->aios[i].ret) {
+            continue;
+        }
+        checksum = quorum_compute_checksum(acb, i);
+        quorum_count_vote(&acb->votes, checksum, i);
+    }
+
+    /* vote to select the most represented version */
+    QLIST_FOREACH(candidate, &acb->votes.vote_list, next) {
+        if (candidate->vote_count > best_count) {
+            best_count = candidate->vote_count;
+            winner = candidate;
+        }
+    }
+
+    /* if there is no winner, or its count is below threshold, the read fails */
+    if (!winner || winner->vote_count < s->threshold) {
+        quorum_print_failure(acb);
+        acb->vote_ret = -EIO;
+        goto free_exit;
+    }
+
+    /* we have a winner: copy it */
+    quorum_copy_qiov(acb->qiov, &acb->qiovs[winner->index]);
+
+    /* if some versions are bad print them */
+    if (best_count < s->total) {
+        quorum_print_bad_versions(acb, winner->value);
+    }
+
+free_exit:
+    /* free lists */
+    quorum_free_vote_list(&acb->votes);
+}
+
static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
int64_t sector_num,
QEMUIOVector *qiov,
@@ -269,6 +476,8 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
nb_sectors, cb, opaque);
int i;
+ acb->vote = quorum_vote;
+
for (i = 0; i < s->total; i++) {
acb->aios[i].buf = qemu_blockalign(bs->file, qiov->size);
qemu_iovec_init(&acb->qiovs[i], qiov->niov);
@@ -276,7 +485,7 @@ static BlockDriverAIOCB *quorum_aio_readv(BlockDriverState *bs,
}
for (i = 0; i < s->total; i++) {
- bdrv_aio_readv(s->bs[i], sector_num, qiov, nb_sectors,
+ bdrv_aio_readv(s->bs[i], sector_num, &acb->qiovs[i], nb_sectors,
quorum_aio_cb, &acb->aios[i]);
}
--
1.7.9.5
- [Qemu-devel] [RFC V4 1/9] quorum: Create quorum .c, add QuorumSingleAIOCB and QuorumAIOCB., (continued)
- [Qemu-devel] [RFC V4 5/9] blkverify: Extract qemu_iovec_clone() and qemu_iovec_compare() from blkverify., Benoît Canet, 2012/08/20
- [Qemu-devel] [RFC V4 4/9] quorum: Add quorum_aio_writev and its dependencies., Benoît Canet, 2012/08/20
- [Qemu-devel] [RFC V4 6/9] quorum: Add quorum_co_flush()., Benoît Canet, 2012/08/20
- [Qemu-devel] [RFC V4 7/9] quorum: Add quorum_aio_readv., Benoît Canet, 2012/08/20
- [Qemu-devel] [RFC V4 8/9] quorum: Add quorum mechanism.,
Benoît Canet <=
- [Qemu-devel] [RFC V4 9/9] quorum: Add quorum_getlength()., Benoît Canet, 2012/08/20