On Thu, Jun 19, 2014 at 05:48:46PM +0300, Chrysostomos Nanakos wrote:
+typedef struct BDRVArchipelagoState {
+ int fds[2];
+ int qemu_aio_count;
This field is never used. It's incremented and decremented but nothing
ever checks the value. It can be dropped.
+ int event_reader_pos;
+ ArchipelagoAIOCB *event_acb;
+ const char *volname;
+ uint64_t size;
+ /* Archipelago specific */
+ struct xseg *xseg;
+ struct xseg_port *port;
+ xport srcport;
+ xport sport;
+ xport mportno;
+ xport vportno;
+ QemuMutex archip_mutex;
+ QemuCond archip_cond;
+ bool is_signaled;
+ /* Request handler specific */
+ QemuThread request_th;
+ QemuCond request_cond;
+ QemuMutex request_mutex;
+ bool th_is_signaled;
+ bool stopping;
+} BDRVArchipelagoState;
+
+typedef struct ArchipelagoSegmentedRequest {
+ size_t count;
+ size_t total;
+ int ref;
+ int failed;
+} ArchipelagoSegmentedRequest;
+
+typedef struct AIORequestData {
+ const char *volname;
+ off_t offset;
+ size_t size;
+ uint64_t bufidx;
+ int ret;
+ int op;
+ ArchipelagoAIOCB *aio_cb;
+ ArchipelagoSegmentedRequest *segreq;
+} AIORequestData;
+
+
+static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb);
+
+static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
+{
+ if (xseg && (sport != srcport)) {
+ xseg_init_local_signal(xseg, srcport);
+ sport = srcport;
+ }
+}
QEMU should clean up by calling xseg_quit_local_signal().
+
+static void archipelago_finish_aiocb(AIORequestData *reqdata)
+{
+ int ret;
+ ret = qemu_archipelago_signal_pipe(reqdata->aio_cb);
+ if (ret < 0) {
+ error_report("archipelago_finish_aiocb(): failed writing"
+ " aio_cb->s->fds");
+ }
+ g_free(reqdata);
+}
+
+static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
+ struct xseg_request *expected_req)
+{
+ struct xseg_request *req;
+ xseg_prepare_wait(xseg, srcport);
+ void *psd = xseg_get_signal_desc(xseg, port);
+ while (1) {
+ req = xseg_receive(xseg, srcport, 0);
+ if (req) {
+ if (req != expected_req) {
+ archipelagolog("Unknown received request\n");
+ xseg_put_request(xseg, req, srcport);
+ } else if (!(req->state & XS_SERVED)) {
+ archipelagolog("Failed req\n");
+ return -1;
+ } else {
+ break;
+ }
+ }
+ xseg_wait_signal(xseg, psd, 100000UL);
+ }
+ xseg_cancel_wait(xseg, srcport);
+ return 0;
+}
+
+static void xseg_request_handler(void *state)
+{
This thread is only necessary because you're not integrating xseg into
the QEMU event loop. If you got the pipe fds from xseg and used
aio_set_fd_handler() you could eliminate this thread. The advantage is
that you can skip the archipelago_finish_aiocb() and get slightly better
performance due to one less context switch between threads.
+ BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
+ void *psd = xseg_get_signal_desc(s->xseg, s->port);
+ qemu_mutex_lock(&s->request_mutex);
+
+ while (!s->stopping) {
+ struct xseg_request *req;
+ char *data;
+ xseg_prepare_wait(s->xseg, s->srcport);
+ req = xseg_receive(s->xseg, s->srcport, 0);
+ if (req) {
+ AIORequestData *reqdata;
+ ArchipelagoSegmentedRequest *segreq;
+ xseg_get_req_data(s->xseg, req, (void **)&reqdata);
+
+ if (!(req->state & XS_SERVED)) {
+ segreq = reqdata->segreq;
+ __sync_bool_compare_and_swap(&segreq->failed, 0, 1);
+ }
+
+ switch (reqdata->op) {
+ case ARCHIP_OP_READ:
+ data = xseg_get_data(s->xseg, req);
+ segreq = reqdata->segreq;
+ segreq->count += req->serviced;
+
+ qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
+ data,
+ req->serviced);
+
+ xseg_put_request(s->xseg, req, s->srcport);
+
+ __sync_add_and_fetch(&segreq->ref, -1);
+
+ if (segreq->ref == 0) {
Not sure about the value of __sync_add_and_fetch() since the if
statement fetches segreq->ref again. But I'm not reviewing the details
of the shared memory accesses. I'm assuming this stuff is correct,
secure, etc.
+static void parse_filename_opts(const char *filename, Error **errp,
+ char **volume, xport *mport, xport *vport)
+{
+ const char *start;
+ char *tokens[3], *ds;
+ int idx;
+ xport lmport = NoPort, lvport = NoPort;
+
+ strstart(filename, "archipelago:", &start);
+
+ ds = g_strdup(start);
+ tokens[0] = strtok(ds, "/");
+ tokens[1] = strtok(NULL, ":");
+ tokens[2] = strtok(NULL, "\0");
+
+ if (!strlen(tokens[0])) {
+ error_setg(errp, "volume name must be specified first");
+ return;
ds is leaked.
+ }
+
+ for (idx = 1; idx < 3; idx++) {
+ if (tokens[idx] != NULL) {
+ if (strstart(tokens[idx], "mport=", NULL)) {
+ xseg_find_port(tokens[idx], "mport=", &lmport);
+ }
+ if (strstart(tokens[idx], "vport=", NULL)) {
+ xseg_find_port(tokens[idx], "vport=", &lvport);
+ }
+ }
+ }
+
+ if ((lmport == (xport) -2) || (lvport == (xport) -2)) {
+ error_setg(errp, "Usage: file=archipelago:"
+ "<volumename>[/mport=<mapperd_port>"
+ "[:vport=<vlmcd_port>]]");
ds is leaked.
+ return;
+ }
+ *volume = g_strdup(tokens[0]);
+ *mport = lmport;
+ *vport = lvport;
+ g_free(ds);
+}
+
+static void archipelago_parse_filename(const char *filename, QDict *options,
+ Error **errp)
+{
+ const char *start;
+ char *volume = NULL;
+ xport mport = NoPort, vport = NoPort;
+
+ if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
+ || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
+ || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
+ error_setg(errp, "volume/mport/vport and a file name may not be "
+ "specified at the same time");
+ return;
+ }
+
+ if (!strstart(filename, "archipelago:", &start)) {
+ error_setg(errp, "File name must start with 'archipelago:'");
+ return;
+ }
+
+ if (!strlen(start) || strstart(start, "/", NULL)) {
+ error_setg(errp, "volume name must be specified");
+ return;
+ }
+
+ parse_filename_opts(filename, errp, &volume, &mport, &vport);
+
+ if (volume) {
+ qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
+ g_free(volume);
+ }
+ if (mport != NoPort) {
+ qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
+ }
+ if (vport != NoPort) {
+ qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
+ }
+}
+
+static QemuOptsList archipelago_runtime_opts = {
+ .name = "archipelago",
+ .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
+ .desc = {
+ {
+ .name = ARCHIPELAGO_OPT_VOLUME,
+ .type = QEMU_OPT_STRING,
+ .help = "Name of the volume image",
+ },
+ {
+ .name = ARCHIPELAGO_OPT_MPORT,
+ .type = QEMU_OPT_NUMBER,
+ .help = "Archipelago mapperd port number"
+ },
+ {
+ .name = ARCHIPELAGO_OPT_VPORT,
+ .type = QEMU_OPT_NUMBER,
+ .help = "Archipelago vlmcd port number"
+
+ },
+ { /* end of list */ }
+ },
+};
+
+static int qemu_archipelago_open(BlockDriverState *bs,
+ QDict *options,
+ int bdrv_flags,
+ Error **errp)
+{
+ int ret = 0;
+ const char *volume;
+ QemuOpts *opts;
+ Error *local_err = NULL;
+ BDRVArchipelagoState *s = bs->opaque;
+
+ opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
+ qemu_opts_absorb_qdict(opts, options, &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ qemu_opts_del(opts);
+ return -EINVAL;
+ }
+
+ s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT, 1001);
+ s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT, 501);
+
+ volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
+ if (volume == NULL) {
+ error_setg(errp, "archipelago block driver requires an 'volume'"
+ " options");
Suggested wording: "archipelago block driver requires the 'volume' option"
+ error_propagate(errp, local_err);
This line is unnecessary since the error message was already put into
errp.
+ qemu_opts_del(opts);
+ return -EINVAL;
+ }
+ s->volname = g_strdup(volume);
+
+ /* Initialize XSEG, join shared memory segment */
+ ret = qemu_archipelago_init(s);
+ if (ret < 0) {
+ error_setg(errp, "cannot initialize XSEG and join shared "
+ "memory segment");
+ goto err_exit;
+ }
+
+ s->event_reader_pos = 0;
+ ret = qemu_pipe(s->fds);
+ if (ret < 0) {
+ error_setg(errp, "cannot create pipe");
+ goto err_exit;
Do we need to call xseg_leave() here to avoid leaking xseg refcounts,
leaving memory mapped, and leaking memory?
+ }
+
+ fcntl(s->fds[ARCHIP_FD_READ], F_SETFL, O_NONBLOCK);
+ fcntl(s->fds[ARCHIP_FD_WRITE], F_SETFL, O_NONBLOCK);
+ qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ],
+ qemu_archipelago_aio_event_reader, NULL,
+ s);
+
+ qemu_opts_del(opts);
+ return 0;
+
+err_exit:
+ qemu_opts_del(opts);
+ return ret;
s->volname is leaked
+}
+
+static void qemu_archipelago_close(BlockDriverState *bs)
+{
+ int r, targetlen;
+ char *target;
+ struct xseg_request *req;
+ BDRVArchipelagoState *s = bs->opaque;
+
+ qemu_aio_set_fd_handler(s->fds[ARCHIP_FD_READ], NULL, NULL, NULL);
+ close(s->fds[0]);
+ close(s->fds[1]);
+
+ s->stopping = true;
+
+ qemu_mutex_lock(&s->request_mutex);
+ while (!s->th_is_signaled) {
+ qemu_cond_wait(&s->request_cond,
+ &s->request_mutex);
+ }
+ qemu_mutex_unlock(&s->request_mutex);
+ qemu_cond_destroy(&s->request_cond);
+ qemu_mutex_destroy(&s->request_mutex);
It's not safe to call qemu_mutex_destroy() here because the other thread
may still be inside qemu_mutex_unlock(&s->request_mutex) and may still
access s->request_mutex memory.
Use qemu_thread_join() before destroying request_cond and request_mutex.
That way you can be sure there is no race condition.
(I recently did the same thing and Paolo Bonzini pointed out the bug.
After checking the glibc implementation I was convinced that it's not
safe.)
+
+ qemu_cond_destroy(&s->archip_cond);
+ qemu_mutex_destroy(&s->archip_mutex);
+
+ targetlen = strlen(s->volname);
+ req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
+ if (!req) {
+ archipelagolog("Cannot get XSEG request\n");
+ goto err_exit;
+ }
+ r = xseg_prep_request(s->xseg, req, targetlen, 0);
+ if (r < 0) {
+ xseg_put_request(s->xseg, req, s->srcport);
+ archipelagolog("Cannot prepare XSEG close request\n");
+ goto err_exit;
+ }
+
+ target = xseg_get_target(s->xseg, req);
+ strncpy(target, s->volname, targetlen);
Using strncpy() hints that target is a string when in fact it's not. I
think memcpy() would be clearer here since you don't want a '\0' byte at
the end of the string.
Or maybe I'm wrong and there is some guarantee that there will be a '\0'
byte after target?
+ req->size = req->datalen;
+ req->offset = 0;
+ req->op = X_CLOSE;
+
+ xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
+ if (p == NoPort) {
+ xseg_put_request(s->xseg, req, s->srcport);
+ archipelagolog("Cannot submit XSEG close request\n");
+ goto err_exit;
+ }
+
+ xseg_signal(s->xseg, p);
+ r = wait_reply(s->xseg, s->srcport, s->port, req);
+ if (r < 0) {
+ archipelagolog("wait_reply() error\n");
+ }
+ if (!(req->state & XS_SERVED)) {
+ archipelagolog("Could no close map for volume '%s'\n", s->volname);
+ }
+
+ xseg_put_request(s->xseg, req, s->srcport);
+
+err_exit:
+ xseg_leave_dynport(s->xseg, s->port);
+ xseg_leave(s->xseg);
s->volname is leaked.
+}
+
+static void qemu_archipelago_aio_cancel(BlockDriverAIOCB *blockacb)
+{
+ ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) blockacb;
+ aio_cb->cancelled = true;
+ while (aio_cb->status == -EINPROGRESS) {
+ qemu_aio_wait();
+ }
+ qemu_aio_release(aio_cb);
+}
+
+static const AIOCBInfo archipelago_aiocb_info = {
+ .aiocb_size = sizeof(ArchipelagoAIOCB),
+ .cancel = qemu_archipelago_aio_cancel,
+};
+
+static int qemu_archipelago_signal_pipe(ArchipelagoAIOCB *aio_cb)
+{
+ int ret = 0;
+ while (1) {
+ fd_set wfd;
+ int fd = aio_cb->s->fds[1];
+
+ ret = write(fd, (void *)&aio_cb, sizeof(aio_cb));
+ if (ret > 0) {
+ break;
+ }
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno != EAGAIN) {
+ break;
+ }
+ FD_ZERO(&wfd);
+ FD_SET(fd, &wfd);
+ do {
+ ret = select(fd + 1, NULL, &wfd, NULL, NULL);
+ } while (ret < 0 && errno == EINTR);
+ }
+ return ret;
+}
A newer signalling approach is available and will let you drop the pipe
code. QEMUBH is a "bottom half" or deferred function call that can be
scheduled in an event loop. Scheduling the QEMUBH is thread-safe so
you can perform it from any thread.