[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [PATCH v8 2/4] generic vhost user server
From: |
Stefan Hajnoczi |
Subject: |
Re: [PATCH v8 2/4] generic vhost user server |
Date: |
Thu, 11 Jun 2020 14:14:49 +0100 |
On Fri, Jun 05, 2020 at 07:35:36AM +0800, Coiby Xu wrote:
> +static bool coroutine_fn
> +vu_message_read(VuDev *vu_dev, int conn_fd, VhostUserMsg *vmsg)
> +{
> + struct iovec iov = {
> + .iov_base = (char *)vmsg,
> + .iov_len = VHOST_USER_HDR_SIZE,
> + };
> + int rc, read_bytes = 0;
> + Error *local_err = NULL;
> + /*
> + * Store fds/nfds returned from qio_channel_readv_full into
> + * temporary variables.
> + *
> + * VhostUserMsg is a packed structure, gcc will complain about passing
> + * pointer to a packed structure member if we pass &VhostUserMsg.fd_num
> + * and &VhostUserMsg.fds directly when calling qio_channel_readv_full,
> + * thus two temporary variables nfds and fds are used here.
> + */
> + size_t nfds = 0, nfds_t = 0;
> + int *fds = NULL, *fds_t = NULL;
> + VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> + QIOChannel *ioc = NULL;
> +
> + if (conn_fd == server->sioc->fd) {
> + ioc = server->ioc;
> + } else {
> + /* Slave communication will also use this function to read msg */
> + ioc = slave_io_channel(server, conn_fd, &local_err);
> + }
> +
> + if (!ioc) {
> + error_report_err(local_err);
> + goto fail;
> + }
> +
> + assert(qemu_in_coroutine());
> + do {
> + /*
> + * qio_channel_readv_full may have short reads, keeping calling it
> + * until getting VHOST_USER_HDR_SIZE or 0 bytes in total
> + */
> + rc = qio_channel_readv_full(ioc, &iov, 1, &fds_t, &nfds_t,
> &local_err);
> + if (rc < 0) {
> + if (rc == QIO_CHANNEL_ERR_BLOCK) {
> + qio_channel_yield(ioc, G_IO_IN);
> + continue;
> + } else {
> + error_report_err(local_err);
> + return false;
> + }
> + }
> + read_bytes += rc;
> + if (nfds_t > 0) {
> + fds = g_renew(int, fds, nfds + nfds_t);
> + memcpy(fds + nfds, fds_t, nfds_t *sizeof(int));
> + nfds += nfds_t;
> + if (nfds > VHOST_MEMORY_MAX_NREGIONS) {
> + error_report("A maximum of %d fds are allowed, "
> + "however got %lu fds now",
> + VHOST_MEMORY_MAX_NREGIONS, nfds);
> + goto fail;
> + }
> + g_free(fds_t);
I'm not sure why the temporary fds[] array is necessary. Copying the fds
directly into vmsg->fds would be simpler:
if (nfds + nfds_t > G_N_ELEMENTS(vmsg->fds)) {
error_report("A maximum of %d fds are allowed, "
"however got %lu fds now",
VHOST_MEMORY_MAX_NREGIONS, nfds);
goto fail;
}
memcpy(vmsg->fds + nfds, fds_t, nfds_t * sizeof(vmsg->fds[0]));
nfds += nfds_t;
Did I misunderstand how this works?
> + }
> + if (read_bytes == VHOST_USER_HDR_SIZE || rc == 0) {
> + break;
> + }
> + iov.iov_base = (char *)vmsg + read_bytes;
> + iov.iov_len = VHOST_USER_HDR_SIZE - read_bytes;
> + } while (true);
> +
> + vmsg->fd_num = nfds;
> + if (nfds > 0) {
> + memcpy(vmsg->fds, fds, nfds * sizeof(int));
> + }
> + g_free(fds);
> + /* qio_channel_readv_full will make socket fds blocking, unblock them */
> + vmsg_unblock_fds(vmsg);
> + if (vmsg->size > sizeof(vmsg->payload)) {
> + error_report("Error: too big message request: %d, "
> + "size: vmsg->size: %u, "
> + "while sizeof(vmsg->payload) = %zu",
> + vmsg->request, vmsg->size, sizeof(vmsg->payload));
> + goto fail;
> + }
> +
> + struct iovec iov_payload = {
> + .iov_base = (char *)&vmsg->payload,
> + .iov_len = vmsg->size,
> + };
> + if (vmsg->size) {
> + rc = qio_channel_readv_all_eof(ioc, &iov_payload, 1, &local_err);
> + if (rc == -1) {
> + error_report_err(local_err);
> + goto fail;
> + }
> + }
> +
> + return true;
> +
> +fail:
> + vmsg_close_fds(vmsg);
> +
> + return false;
> +}
> +
> +
> +static void vu_client_start(VuServer *server);
> +static coroutine_fn void vu_client_trip(void *opaque)
> +{
> + VuServer *server = opaque;
> +
> + while (!server->aio_context_changed && server->sioc) {
> + vu_dispatch(&server->vu_dev);
> + }
> +
> + if (server->aio_context_changed && server->sioc) {
> + server->aio_context_changed = false;
> + vu_client_start(server);
> + }
> +}
> +
> +static void vu_client_start(VuServer *server)
> +{
> + server->co_trip = qemu_coroutine_create(vu_client_trip, server);
> + aio_co_enter(server->ctx, server->co_trip);
> +}
> +
> +/*
> + * a wrapper for vu_kick_cb
> + *
> + * since aio_dispatch can only pass one user data pointer to the
> + * callback function, pack VuDev and pvt into a struct. Then unpack it
> + * and pass them to vu_kick_cb
> + */
> +static void kick_handler(void *opaque)
> +{
> + KickInfo *kick_info = opaque;
> + kick_info->cb(kick_info->vu_dev, 0, (void *) kick_info->index);
> +}
> +
> +
> +static void
> +set_watch(VuDev *vu_dev, int fd, int vu_evt,
> + vu_watch_cb cb, void *pvt)
> +{
> +
> + VuServer *server = container_of(vu_dev, VuServer, vu_dev);
> + g_assert(vu_dev);
> + g_assert(fd >= 0);
> + long index = (intptr_t) pvt;
> + g_assert(cb);
> + KickInfo *kick_info = &server->kick_info[index];
> + if (!kick_info->cb) {
> + kick_info->fd = fd;
> + kick_info->cb = cb;
> + qemu_set_nonblock(fd);
> + aio_set_fd_handler(server->ioc->ctx, fd, false, kick_handler,
> + NULL, NULL, kick_info);
> + kick_info->vu_dev = vu_dev;
> + }
> +}
> +
> +
> +static void remove_watch(VuDev *vu_dev, int fd)
> +{
> + VuServer *server;
> + int i;
> + int index = -1;
> + g_assert(vu_dev);
> + g_assert(fd >= 0);
> +
> + server = container_of(vu_dev, VuServer, vu_dev);
> + for (i = 0; i < vu_dev->max_queues; i++) {
> + if (server->kick_info[i].fd == fd) {
> + index = i;
> + break;
> + }
> + }
> +
> + if (index == -1) {
> + return;
> + }
> + server->kick_info[i].cb = NULL;
> + aio_set_fd_handler(server->ioc->ctx, fd, false, NULL, NULL, NULL, NULL);
> +}
> +
> +
> +static void vu_accept(QIONetListener *listener, QIOChannelSocket *sioc,
> + gpointer opaque)
> +{
> + VuServer *server = opaque;
> +
> + if (server->sioc) {
> + warn_report("Only one vhost-user client is allowed to "
> + "connect the server one time");
> + return;
> + }
> +
> + if (!vu_init(&server->vu_dev, server->max_queues, sioc->fd, panic_cb,
> + vu_message_read, set_watch, remove_watch,
> server->vu_iface)) {
> + error_report("Failed to initialized libvhost-user");
> + return;
> + }
> +
> + /*
> + * Unset the callback function for network listener to make another
> + * vhost-user client keeping waiting until this client disconnects
> + */
> + qio_net_listener_set_client_func(server->listener,
> + NULL,
> + NULL,
> + NULL);
> + server->sioc = sioc;
> + server->kick_info = g_new0(KickInfo, server->max_queues);
Where is kick_info freed?
> + /*
> + * Increase the object reference, so cioc will not freed by
s/cioc/sioc/
> + * qio_net_listener_channel_func which will call
> object_unref(OBJECT(sioc))
> + */
> + object_ref(OBJECT(server->sioc));
> + qio_channel_set_name(QIO_CHANNEL(sioc), "vhost-user client");
> + server->ioc = QIO_CHANNEL(sioc);
> + object_ref(OBJECT(server->ioc));
> + object_ref(OBJECT(sioc));
Why are there two object_refs for sioc and where is unref called?
> + qio_channel_attach_aio_context(server->ioc, server->ctx);
> + qio_channel_set_blocking(QIO_CHANNEL(server->sioc), false, NULL);
> + vu_client_start(server);
> +}
> +
> +
> +void vhost_user_server_stop(VuServer *server)
> +{
> + if (!server) {
> + return;
> + }
> +
> + if (server->sioc) {
> + close_client(server);
> + object_unref(OBJECT(server->sioc));
This call is object_unref(NULL) since close_client() does server->sioc =
NULL.
> + }
> +
> + if (server->listener) {
> + qio_net_listener_disconnect(server->listener);
> + object_unref(OBJECT(server->listener));
> + }
> +}
> +
> +static void detach_context(VuServer *server)
> +{
> + int i;
> + AioContext *ctx = server->ioc->ctx;
> + qio_channel_detach_aio_context(server->ioc);
> + for (i = 0; i < server->vu_dev.max_queues; i++) {
> + if (server->kick_info[i].cb) {
> + aio_set_fd_handler(ctx, server->kick_info[i].fd, false, NULL,
> + NULL, NULL, NULL);
> + }
> + }
> +}
> +
> +static void attach_context(VuServer *server, AioContext *ctx)
> +{
> + int i;
> + qio_channel_attach_aio_context(server->ioc, ctx);
> + server->aio_context_changed = true;
> + if (server->co_trip) {
> + aio_co_schedule(ctx, server->co_trip);
> + }
> + for (i = 0; i < server->vu_dev.max_queues; i++) {
> + if (server->kick_info[i].cb) {
> + aio_set_fd_handler(ctx, server->kick_info[i].fd, false,
> + kick_handler, NULL, NULL,
> + &server->kick_info[i]);
> + }
> + }
> +}
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server)
> +{
> + server->ctx = ctx ? ctx : qemu_get_aio_context();
> + if (!server->sioc) {
> + return;
> + }
> + if (ctx) {
> + attach_context(server, ctx);
> + } else {
> + detach_context(server);
> + }
> +}
> +
> +
> +bool vhost_user_server_start(uint16_t max_queues,
> + SocketAddress *socket_addr,
> + AioContext *ctx,
> + VuServer *server,
> + void *device_panic_notifier,
> + const VuDevIface *vu_iface,
> + Error **errp)
> +{
> + server->listener = qio_net_listener_new();
> + if (qio_net_listener_open_sync(server->listener, socket_addr, 1,
> + errp) < 0) {
> + goto error;
> + }
> +
> + qio_net_listener_set_name(server->listener,
> "vhost-user-backend-listener");
> +
> + server->vu_iface = vu_iface;
> + server->max_queues = max_queues;
> + server->ctx = ctx;
> + server->device_panic_notifier = device_panic_notifier;
> + qio_net_listener_set_client_func(server->listener,
> + vu_accept,
> + server,
> + NULL);
The qio_net_listener_set_client_func() call uses the default
GMainContext but we have an AioContext *ctx argument. This is
surprising. I would expect the socket to be handled in the AioContext.
Can you clarify how this should work?
> +
> + return true;
> +error:
> + g_free(server);
It's surprising that this function frees the server argument when an
error occurs. vhost_user_server_stop() does not free server. I suggest
letting the caller free server since they own the object.
> + return false;
> +}
> diff --git a/util/vhost-user-server.h b/util/vhost-user-server.h
> new file mode 100644
> index 0000000000..4315556b66
> --- /dev/null
> +++ b/util/vhost-user-server.h
> @@ -0,0 +1,59 @@
> +/*
> + * Sharing QEMU devices via vhost-user protocol
> + *
> + * Author: Coiby Xu <coiby.xu@gmail.com>
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2 or
> + * later. See the COPYING file in the top-level directory.
> + */
> +
> +#ifndef VHOST_USER_SERVER_H
> +#define VHOST_USER_SERVER_H
> +
> +#include "contrib/libvhost-user/libvhost-user.h"
> +#include "io/channel-socket.h"
> +#include "io/channel-file.h"
> +#include "io/net-listener.h"
> +#include "qemu/error-report.h"
> +#include "qapi/error.h"
> +#include "standard-headers/linux/virtio_blk.h"
> +
> +typedef struct KickInfo {
> + VuDev *vu_dev;
> + int fd; /*kick fd*/
> + long index; /*queue index*/
> + vu_watch_cb cb;
> +} KickInfo;
> +
> +typedef struct VuServer {
> + QIONetListener *listener;
> + AioContext *ctx;
> + void (*device_panic_notifier)(struct VuServer *server) ;
> + int max_queues;
> + const VuDevIface *vu_iface;
> + VuDev vu_dev;
> + QIOChannel *ioc; /* The I/O channel with the client */
> + QIOChannelSocket *sioc; /* The underlying data channel with the client */
> + /* IOChannel for fd provided via VHOST_USER_SET_SLAVE_REQ_FD */
> + QIOChannel *ioc_slave;
> + QIOChannelSocket *sioc_slave;
> + Coroutine *co_trip; /* coroutine for processing VhostUserMsg */
> + KickInfo *kick_info; /* an array with the length of the queue number */
> + /* restart coroutine co_trip if AIOContext is changed */
> + bool aio_context_changed;
> +} VuServer;
> +
> +
> +bool vhost_user_server_start(uint16_t max_queues,
> + SocketAddress *unix_socket,
> + AioContext *ctx,
> + VuServer *server,
> + void *device_panic_notifier,
Please declare the function pointer type:
typedef void DevicePanicNotifierFn(struct VuServer *server);
Then the argument list can use DevicePanicNotifierFn
*device_panic_notifier instead of void *.
> + const VuDevIface *vu_iface,
> + Error **errp);
> +
> +void vhost_user_server_stop(VuServer *server);
> +
> +void vhost_user_server_set_aio_context(AioContext *ctx, VuServer *server);
If you send another revision, please make VuServer *server the first
argument of vhost_user_server_start() and
vhost_user_server_set_aio_context(). Functions usually have the object
they act on as the first argument.
signature.asc
Description: PGP signature
Re: [PATCH v8 0/4] vhost-user block device backend implementation, Stefano Garzarella, 2020/06/11