[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend
From: |
Stefan Hajnoczi |
Subject: |
Re: [Qemu-devel] [PATCH v4 1/3] block: Support Archipelago as a QEMU block backend |
Date: |
Fri, 20 Jun 2014 22:33:46 +0800 |
User-agent: |
Mutt/1.5.23 (2014-03-12) |
On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
> +typedef struct BDRVArchipelagoState {
> + int fds[2];
> + int qemu_aio_count;
This field is never used. It's incremented and decremented but nothing
ever checks the value. It can be dropped.
> + int event_reader_pos;
> + ArchipelagoAIOCB *event_acb;
> + const char *volname;
> + uint64_t size;
> + /* Archipelago specific */
> + struct xseg *xseg;
> + struct xseg_port *port;
> + xport srcport;
> + xport sport;
> + xport mportno;
> + xport vportno;
> + QemuMutex archip_mutex;
> + QemuCond archip_cond;
> + bool is_signaled;
> + /* Request handler specific */
> + QemuThread request_th;
> + QemuCond request_cond;
> + QemuMutex request_mutex;
> + bool th_is_signaled;
> + bool stopping;
> +} BDRVArchipelagoState;
> +
> +typedef struct ArchipelagoSegmentedRequest {
> + size_t count;
> + size_t total;
> + int ref;
> + int failed;
> +} ArchipelagoSegmentedRequest;
> +
> +typedef struct AIORequestData {
> + const char *volname;
> + off_t offset;
> + size_t size;
> + uint64_t bufidx;
> + int ret;
> + int op;
> + ArchipelagoAIOCB *aio_cb;
> + ArchipelagoSegmentedRequest *segreq;
> +} AIORequestData;
> +
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
> +
> +static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
> +{
> + if (xseg && (sport != srcport)) {
> + xseg_init_local_signal(xseg, srcport);
> + sport = srcport;
> + }
> +}
QEMU should clean up by calling xseg_quit_local_signal().
> +
> +static void archipelago_finish_aiocb(AIORequestData *reqdata)
> +{
> + int ret;
> + ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
> + if (ret < 0) {
> + error_report("archipelago_finish_aiocb(): failed writing"
> + " aio_cb->s->fds");
> + }
> + g_free(reqdata);
> +}
> +
> +static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port
> *port,
> + struct xseg_request *expected_req)
> +{
> + struct xseg_request *req;
> + xseg_prepare_wait(xseg, srcport);
> + void *psd = xseg_get_signal_desc(xseg, port);
> + while (1) {
> + req = xseg_receive(xseg, srcport, 0);
> + if (req) {
> + if (req != expected_req) {
> + archipelagolog("Unknown received request\n");
> + xseg_put_request(xseg, req, srcport);
> + } else if (!(req->state & XS_SERVED)) {
> + archipelagolog("Failed req\n");
> + return -1;
> + } else {
> + break;
> + }
> + }
> + xseg_wait_signal(xseg, psd, 100000UL);
> + }
> + xseg_cancel_wait(xseg, srcport);
> + return 0;
> +}
> +
> +static void xseg_request_handler(void *state)
> +{
This thread is only necessary because you're not integrating xseg into
the QEMU event loop. If you got the pipe fds from xseg and used
aio_set_fd_handler(), you could eliminate this thread. The advantage is
that you could skip archipelago_finish_aiocb() and get slightly better
performance thanks to one fewer context switch between threads.
> + BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
> + void *psd = xseg_get_signal_desc(s->xseg, s->port);
> + qemu_mutex_lock(&s->request_mutex);
> +
> + while (!s->stopping) {
> + struct xseg_request *req;
> + char *data;
> + xseg_prepare_wait(s->xseg, s->srcport);
> + req = xseg_receive(s->xseg, s->srcport, 0);
> + if (req) {
> + AIORequestData *reqdata;
> + ArchipelagoSegmentedRequest *segreq;
> + xseg_get_req_data(s->xseg, req, (void **)&reqdata);
> +
> + if (!(req->state & XS_SERVED)) {
> + segreq = reqdata->segreq;
> + __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
> + }
> +
> + switch (reqdata->op) {
> + case ARCHIP_OP_READ:
> + data = xseg_get_data(s->xseg, req);
> + segreq = reqdata->segreq;
> + segreq->count += req->serviced;
> +
> + qemu_iovec_from_buf(reqdata->aio_cb->qiov,
> reqdata->bufidx,
> + data,
> + req->serviced);
> +
> + xseg_put_request(s->xseg, req, s->srcport);
> +
> + __sync_add_and_fetch(&segreq->ref, -1);
> +
> + if (segreq->ref == 0) {
Not sure about the value of __sync_add_and_fetch() since the if
statement fetches segreq->ref again instead of using the value that
__sync_add_and_fetch() returns. But I'm not reviewing the details
of the shared memory accesses. I'm assuming this stuff is correct,
secure, etc.
> + if (!segreq->failed) {
> + reqdata->aio_cb->ret = segreq->count;
> + archipelago_finish_aiocb(reqdata);
> + }
What does segreq->failed mean? We should always finish the I/O request;
otherwise the upper layers will run out of resources as we leak
failed requests.
> +static void parse_filename_opts(const char *filename, Error **errp,
> + char **volume, xport *mport, xport *vport)
> +{
> + const char *start;
> + char *tokens[3], *ds;
> + int idx;
> + xport lmport = NoPort, lvport = NoPort;
> +
> + strstart(filename, "archipelago:", &start);
> +
> + ds = g_strdup(start);
> + tokens[0] = strtok(ds, "/");
> + tokens[1] = strtok(NULL, ":");
> + tokens[2] = strtok(NULL, "\0");
> +
> + if (!strlen(tokens[0])) {
> + error_setg(errp, "volume name must be specified first");
> + return;
ds is leaked.
> + }
> +
> + for (idx = 1; idx < 3; idx++) {
> + if (tokens[idx] != NULL) {
> + if (strstart(tokens[idx], "mport=", NULL)) {
> + xseg_find_port(tokens[idx], "mport=", &lmport);
> + }
> + if (strstart(tokens[idx], "vport=", NULL)) {
> + xseg_find_port(tokens[idx], "vport=", &lvport);
> + }
> + }
> + }
> +
> + if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
> + error_setg(errp, "Usage: file=archipelago:"
> + "<volumename>[/mport=<mapperd_port>"
> + "[:vport=<vlmcd_port>]]");
ds is leaked.
> + return;
> + }
> + *volume = g_strdup(tokens[0]);
> + *mport = lmport;
> + *vport = lvport;
> + g_free(ds);
> +}
> +
> +static void archipelago_parse_filename(const char *filename, QDict *options,
> + Error **errp)
> +{
> + const char *start;
> + char *volume = NULL;
> + xport mport = NoPort, vport = NoPort;
> +
> + if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
> + || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
> + || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
> + error_setg(errp, "volume/mport/vport and a file name may not be "
> + "specified at the same time");
> + return;
> + }
> +
> + if (!strstart(filename, "archipelago:", &start)) {
> + error_setg(errp, "File name must start with 'archipelago:'");
> + return;
> + }
> +
> + if (!strlen(start) || strstart(start, "/", NULL)) {
> + error_setg(errp, "volume name must be specified");
> + return;
> + }
> +
> + parse_filename_opts(filename, errp, &volume, &mport, &vport);
> +
> + if (volume) {
> + qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
> + g_free(volume);
> + }
> + if (mport != NoPort) {
> + qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
> + }
> + if (vport != NoPort) {
> + qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
> + }
> +}
> +
> +static QemuOptsList archipelago_runtime_opts = {
> + .name = "archipelago",
> + .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
> + .desc = {
> + {
> + .name = ARCHIPELAGO_OPT_VOLUME,
> + .type = QEMU_OPT_STRING,
> + .help = "Name of the volume image",
> + },
> + {
> + .name = ARCHIPELAGO_OPT_MPORT,
> + .type = QEMU_OPT_NUMBER,
> + .help = "Archipelago mapperd port number"
> + },
> + {
> + .name = ARCHIPELAGO_OPT_VPORT,
> + .type = QEMU_OPT_NUMBER,
> + .help = "Archipelago vlmcd port number"
> +
> + },
> + { /* end of list */ }
> + },
> +};
> +
> +static int qemu_archipelago_open(BlockDriverState *bs,
> + QDict *options,
> + int bdrv_flags,
> + Error **errp)
> +{
> + int ret = 0;
> + const char *volume;
> + QemuOpts *opts;
> + Error *local_err = NULL;
> + BDRVArchipelagoState *s = bs->opaque;
> +
> + opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0,
> &error_abort);
> + qemu_opts_absorb_qdict(opts, options, &local_err);
> + if (local_err) {
> + error_propagate(errp, local_err);
> + qemu_opts_del(opts);
> + return -EINVAL;
> + }
> +
> + s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
> + s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
> +
> + volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
> + if (volume == NULL) {
> + error_setg(errp, "archipelago block driver requires an 'volume'"
> + " options");
Suggested wording: "archipelago block driver requires the 'volume' option"
> + error_propagate(errp, local_err);
This line is unnecessary since the error message was already put into
errp.
> + qemu_opts_del(opts);
> + return -EINVAL;
> + }
> + s->volname = g_strdup(volume);
> +
> + /* Initialize XSEG, join shared memory segment */
> + ret = qemu_archipelago_init(s);
> + if (ret < 0) {
> + error_setg(errp, "cannot initialize XSEG and join shared "
> + "memory segment");
> + goto err_exit;
> + }
> +
> + s->event_reader_pos = 0;
> + ret = qemu_pipe(s->fds);
> + if (ret < 0) {
> + error_setg(errp, "cannot create pipe");
> + goto err_exit;
Do we need to xseg_leave() to avoid leaking xseg refcounts, leaving
memory mapped, and memory leaks?
> + }
> +
> + fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
> + fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
> + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
> + qemu_archipelago_aio_event_reader, NULL,
> + s);
> +
> + qemu_opts_del(opts);
> + return 0;
> +
> +err_exit:
> + qemu_opts_del(opts);
> + return ret;
s->volname is leaked.
> +}
> +
> +static void qemu_archipelago_close(BlockDriverState *bs)
> +{
> + int r, targetlen;
> + char *target;
> + struct xseg_request *req;
> + BDRVArchipelagoState *s = bs->opaque;
> +
> + qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
> + close(s->fds[0]);
> + close(s->fds[1]);
> +
> + s->stopping = true;
> +
> + qemu_mutex_lock(&s->request_mutex);
> + while (!s->th_is_signaled) {
> + qemu_cond_wait(&s->request_cond,
> + &s->request_mutex);
> + }
> + qemu_mutex_unlock(&s->request_mutex);
> + qemu_cond_destroy(&s->request_cond);
> + qemu_mutex_destroy(&s->request_mutex);
It's not safe to qemu_mutex_destroy() because the other thread may still
be inside qemu_mutex_unlock(&s->request_mutex) and may still access
s->request_mutex memory.
Use qemu_thread_join() before destroying request_cond and request_mutex.
That way you can be sure there is no race condition.
(I recently did the same thing and Paolo Bonzini pointed out the bug.
After checking the glibc implementation I was convinced that it's not
safe.)
> +
> + qemu_cond_destroy(&s->archip_cond);
> + qemu_mutex_destroy(&s->archip_mutex);
> +
> + targetlen = strlen(s->volname);
> + req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
> + if (!req) {
> + archipelagolog("Cannot get XSEG request\n");
> + goto err_exit;
> + }
> + r = xseg_prep_request(s->xseg, req, targetlen, 0);
> + if (r < 0) {
> + xseg_put_request(s->xseg, req, s->srcport);
> + archipelagolog("Cannot prepare XSEG close request\n");
> + goto err_exit;
> + }
> +
> + target = xseg_get_target(s->xseg, req);
> + strncpy(target, s->volname, targetlen);
Using strncpy() hints that target is a string when in fact it's not. I
think memcpy() would be clearer here since you don't want a '\0' byte at
the end of the string.
Or maybe I'm wrong and there is some guarantee that there will be a '\0'
byte after target?
> + req->size = req->datalen;
> + req->offset = 0;
> + req->op = X_CLOSE;
> +
> + xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
> + if (p == NoPort) {
> + xseg_put_request(s->xseg, req, s->srcport);
> + archipelagolog("Cannot submit XSEG close request\n");
> + goto err_exit;
> + }
> +
> + xseg_signal(s->xseg, p);
> + r = wait_reply(s->xseg, s->srcport, s->port, req);
> + if (r < 0) {
> + archipelagolog("wait_reply() error\n");
> + }
> + if (!(req->state & XS_SERVED)) {
> + archipelagolog("Could no close map for volume '%s'\n", s->volname);
> + }
> +
> + xseg_put_request(s->xseg, req, s->srcport);
> +
> +err_exit:
> + xseg_leave_dynport(s->xseg, s->port);
> + xseg_leave(s->xseg);
s->volname is leaked.
> +}
> +
> +static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
> +{
> + ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
> + aio_cb->cancelled = true;
> + while (aio_cb->status == -EINPROGRESS) {
> + qemu_aio_wait();
> + }
> + qemu_aio_release(aio_cb);
> +}
> +
> +static const AIOCBInfo archipelago_aiocb_info = {
> + .aiocb_size = sizeof(ArchipelagoAIOCB),
> + .cancel = qemu_archipelago_aio_cancel,
> +};
> +
> +static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
> +{
> + int ret = 0;
> + while (1) {
> + fd_set wfd;
> + int fd = aio_cb->s->fds[1];
> +
> + ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
> + if (ret > 0) {
> + break;
> + }
> + if (errno == EINTR) {
> + continue;
> + }
> + if (errno != EAGAIN) {
> + break;
> + }
> + FD_ZERO(&wfd);
> + FD_SET(fd, &wfd);
> + do {
> + ret = select(fd + 1, NULL, &wfd, NULL, NULL);
> + } while (ret < 0 && errno == EINTR);
> + }
> + return ret;
> +}
A newer signalling approach is available and will let you drop the pipe
code. QEMUBH is a "bottom half" or deferred function call that can be
scheduled in an event loop. Scheduling the QEMUBH is thread-safe so
you can perform it from any thread.
See block/gluster.c:gluster_finish_aiocb() for an example using QEMUBH.
> +static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
> +{
> + uint64_t size;
> + int ret, targetlen;
> + struct xseg_request *req;
> + struct xseg_reply_info *xinfo;
> + AIORequestData *reqdata = g_malloc(sizeof(AIORequestData));
> +
> + if (!reqdata) {
> + archipelagolog("Cannot allocate reqdata\n");
> + return -1;
g_malloc() never returns NULL, this if statement can be dropped.
pgpQwi3tlyFDQ.pgp
Description: PGP signature