gluster-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gluster-devel] [RFC PATCH v0 1/1] readv_zcopy() implementation


From: Bharata B Rao
Subject: [Gluster-devel] [RFC PATCH v0 1/1] readv_zcopy() implementation
Date: Tue, 5 Mar 2013 20:11:46 +0530

A hack to support zero-copy readv. Currently only glfs_preadv_async() is supported.

From: Bharata B Rao <address@hidden>


---
 api/src/glfs-fops.c                           |   38 +++++++
 api/src/glfs.h                                |    2
 libglusterfs/src/call-stub.c                  |   63 ++++++++++++
 libglusterfs/src/call-stub.h                  |   17 +++
 libglusterfs/src/globals.c                    |    1
 libglusterfs/src/glusterfs.h                  |    1
 libglusterfs/src/syncop.c                     |   37 +++++++
 libglusterfs/src/syncop.h                     |    4 +
 libglusterfs/src/xlator.h                     |   20 ++++
 rpc/rpc-lib/src/protocol-common.h             |    1
 rpc/rpc-lib/src/rpc-clnt.c                    |    7 +
 rpc/rpc-lib/src/rpc-clnt.h                    |    2
 rpc/rpc-lib/src/rpc-transport.h               |    3 +
 rpc/rpc-transport/socket/src/socket.c         |  103 ++++++++++++++++++-
 xlators/cluster/dht/src/dht-common.h          |    7 +
 xlators/cluster/dht/src/dht-inode-read.c      |   92 +++++++++++++++++
 xlators/cluster/dht/src/dht.c                 |    1
 xlators/debug/error-gen/src/error-gen.c       |   49 +++++++++
 xlators/debug/io-stats/src/io-stats.c         |   52 ++++++++++
 xlators/performance/md-cache/src/md-cache.c   |   43 ++++++++
 xlators/protocol/client/src/client-rpc-fops.c |  135 +++++++++++++++++++++++++
 xlators/protocol/client/src/client.c          |   41 ++++++++
 xlators/protocol/client/src/client.h          |    2
 xlators/protocol/server/src/server-rpc-fops.c |  133 +++++++++++++++++++++++++
 24 files changed, 848 insertions(+), 6 deletions(-)

diff --git a/api/src/glfs-fops.c b/api/src/glfs-fops.c
index be26dc1..b977a4b 100644
--- a/api/src/glfs-fops.c
+++ b/api/src/glfs-fops.c
@@ -344,6 +344,35 @@ glfs_preadv (struct glfs_fd *glfd, const struct iovec *iovec, int iovcnt,
        return size;
 }

+ssize_t
+glfs_preadv_zcopy (struct glfs_fd *glfd, const struct iovec *iovec, int iovcnt,
+            off_t offset, int flags)
+{
+       xlator_t       *subvol = NULL;
+       int             ret = -1;
+       struct iovec   *iov = NULL;
+       int             cnt = 0;
+       struct iobref  *iobref = NULL;
+
+       __glfs_entry_fd (glfd);
+
+       subvol = glfs_fd_subvol (glfd);
+
+       ret = syncop_readv_zcopy (subvol, glfd->fd, offset,
+                           0, iovec, iovcnt, &iobref);
+       if (ret <= 0)
+               return ret;
+
+       glfd->offset = (offset + ret);
+
+       if (iov)
+               GF_FREE (iov);
+       if (iobref)
+               iobref_unref (iobref);
+
+       return ret;
+}
+

 ssize_t
 glfs_read (struct glfs_fd *glfd, void *buf, size_t count, int flags)
@@ -425,6 +454,10 @@ glfs_io_async_task (void *data)
                ret = glfs_preadv (gio->glfd, gio->iov, gio->count,
                                   gio->offset, gio->flags);
                break;
+       case GF_FOP_READ_ZCOPY:
+               ret = glfs_preadv_zcopy (gio->glfd, gio->iov, gio->count,
+                                  gio->offset, gio->flags);
+               break;
        case GF_FOP_WRITE:
                ret = glfs_pwritev (gio->glfd, gio->iov, gio->count,
                                    gio->offset, gio->flags);
@@ -464,7 +497,10 @@ glfs_preadv_async (struct glfs_fd *glfd, const struct iovec *iovec, int count,
                return -1;
        }

-       gio->op     = GF_FOP_READ;
+       if (flags & GLUSTERFS_READV_ZCOPY)
+               gio->op     = GF_FOP_READ_ZCOPY;
+       else
+               gio->op     = GF_FOP_READ;
        gio->glfd   = glfd;
        gio->count  = count;
        gio->offset = offset;
diff --git a/api/src/glfs.h b/api/src/glfs.h
index e19c1cd..435fe45 100644
--- a/api/src/glfs.h
+++ b/api/src/glfs.h
@@ -48,6 +48,8 @@ __BEGIN_DECLS
 struct glfs;
 typedef struct glfs glfs_t;

+/* flags for readv variants */
+#define GLUSTERFS_READV_ZCOPY  0x1

 /*
   SYNOPSIS
diff --git a/libglusterfs/src/call-stub.c b/libglusterfs/src/call-stub.c
index 7bf8613..3f74eb6 100644
--- a/libglusterfs/src/call-stub.c
+++ b/libglusterfs/src/call-stub.c
@@ -918,6 +918,58 @@ out:
         return stub;
 }

+/* Wind stub for readv_zcopy; the iovec array is dup'ed for later replay. */
+call_stub_t *
+fop_readv_zcopy_stub (call_frame_t *frame, fop_readv_zcopy_t fn,
+                fd_t *fd, struct iovec *vector, int32_t count, off_t off,
+               uint32_t flags, dict_t *xdata)
+{
+        call_stub_t *stub = NULL;
+
+        GF_VALIDATE_OR_GOTO ("call-stub", frame, out);
+        stub = stub_new (frame, 1, GF_FOP_READ_ZCOPY);
+        GF_VALIDATE_OR_GOTO ("call-stub", stub, out);
+        stub->fn.readv_zcopy = fn;
+        if (fd)
+                stub->args.fd = fd_ref (fd);
+        stub->args.vector = iov_dup (vector, count);
+        stub->args.count  = count;
+        stub->args.size   = iov_length (vector, count);
+        stub->args.offset = off;
+        stub->args.flags  = flags;
+        if (xdata)
+                stub->args.xdata = dict_ref (xdata);
+out:
+        return stub;
+}
+
+
+/* Unwind stub for readv_zcopy: records op_ret/op_errno, copies the stat and
+ * takes refs on iobref and xdata when they are present. */
+call_stub_t *
+fop_readv_zcopy_cbk_stub (call_frame_t *frame, fop_readv_zcopy_cbk_t fn,
+                    int32_t op_ret, int32_t op_errno, struct iatt *stbuf,
+                    struct iobref *iobref, dict_t *xdata)
+{
+        call_stub_t *stub = NULL;
+
+        GF_VALIDATE_OR_GOTO ("call-stub", frame, out);
+        stub = stub_new (frame, 0, GF_FOP_READ_ZCOPY);
+        GF_VALIDATE_OR_GOTO ("call-stub", stub, out);
+
+        stub->fn_cbk.readv_zcopy = fn;
+        stub->args_cbk.op_ret = op_ret;
+        stub->args_cbk.op_errno = op_errno;
+        if (op_ret >= 0 && stbuf)
+                stub->args_cbk.stat = *stbuf;
+        if (op_ret >= 0 && iobref)
+                stub->args_cbk.iobref = iobref_ref (iobref);
+        if (xdata)
+                stub->args_cbk.xdata = dict_ref (xdata);
+out:
+        return stub;
+}
+

 call_stub_t *
 fop_writev_stub (call_frame_t *frame, fop_writev_t fn,
@@ -2204,6 +2256,12 @@ call_resume_wind (call_stub_t *stub)
                                stub->args.offset, stub->args.flags,
                                stub->args.xdata);
                 break;
+        case GF_FOP_READ_ZCOPY:
+                stub->fn.readv_zcopy (stub->frame, stub->frame->this,
+                               stub->args.fd, stub->args.vector,
+                               stub->args.count, stub->args.offset,
+                               stub->args.flags, stub->args.xdata);
+                break;
         case GF_FOP_WRITE:
                 stub->fn.writev (stub->frame, stub->frame->this,
                                 stub->args.fd, stub->args.vector,
@@ -2439,6 +2497,11 @@ call_resume_unwind (call_stub_t *stub)
                             stub->args_cbk.count, &stub->args_cbk.stat,
                             stub->args_cbk.iobref, stub->args_cbk.xdata);
                break;
+        case GF_FOP_READ_ZCOPY:
+               STUB_UNWIND (stub, readv_zcopy, stub->args_cbk.vector,
+                            stub->args_cbk.count, &stub->args_cbk.stat,
+                            stub->args_cbk.iobref, stub->args_cbk.xdata);
+               break;
         case GF_FOP_WRITE:
                STUB_UNWIND (stub, writev, &stub->args_cbk.prestat,
                             &stub->args_cbk.poststat, stub->args_cbk.xdata);
diff --git a/libglusterfs/src/call-stub.h b/libglusterfs/src/call-stub.h
index 3351118..ad03f6b 100644
--- a/libglusterfs/src/call-stub.h
+++ b/libglusterfs/src/call-stub.h
@@ -69,6 +69,7 @@ typedef struct {
                fop_fxattrop_t fxattrop;
                fop_setattr_t setattr;
                fop_fsetattr_t fsetattr;
+               fop_readv_zcopy_t readv_zcopy;
        } fn;

        union {
@@ -113,6 +114,7 @@ typedef struct {
                fop_fxattrop_cbk_t fxattrop;
                fop_setattr_cbk_t setattr;
                fop_fsetattr_cbk_t fsetattr;
+               fop_readv_zcopy_cbk_t readv_zcopy;
        } fn_cbk;

        struct {
@@ -410,6 +412,21 @@ fop_readv_cbk_stub (call_frame_t *frame,
                     struct iobref *iobref, dict_t *xdata);

 call_stub_t *
+fop_readv_zcopy_stub (call_frame_t *frame,
+               fop_readv_zcopy_t fn,
+               fd_t *fd, struct iovec *vector,
+               int32_t count,
+               off_t off, uint32_t flags, dict_t *xdata);
+
+call_stub_t *
+fop_readv_zcopy_cbk_stub (call_frame_t *frame,
+                   fop_readv_zcopy_cbk_t fn,
+                   int32_t op_ret,
+                   int32_t op_errno,
+                   struct iatt *stbuf,
+                    struct iobref *iobref, dict_t *xdata);
+
+call_stub_t *
 fop_writev_stub (call_frame_t *frame,
                 fop_writev_t fn,
                 fd_t *fd,
diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c
index 05ff52c..8bb6458 100644
--- a/libglusterfs/src/globals.c
+++ b/libglusterfs/src/globals.c
@@ -67,6 +67,7 @@ const char *gf_fop_list[GF_FOP_MAXVALUE] = {
         [GF_FOP_RELEASE]     = "RELEASE",
         [GF_FOP_RELEASEDIR]  = "RELEASEDIR",
         [GF_FOP_FREMOVEXATTR]= "FREMOVEXATTR",
+        [GF_FOP_READ_ZCOPY]  = "READ_ZCOPY",
 };
 /* THIS */

diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index 74e6847..ef6067a 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -196,6 +196,7 @@ typedef enum {
         GF_FOP_RELEASEDIR,
         GF_FOP_GETSPEC,
         GF_FOP_FREMOVEXATTR,
+        GF_FOP_READ_ZCOPY,
         GF_FOP_MAXVALUE,
 } glusterfs_fop_t;

diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index c996b8f..0596188 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -1095,6 +1095,43 @@ out:

 }

+/* Completion callback for syncop_readv_zcopy(): stores the result in the
+ * syncargs passed as cookie, then calls __wake() on it. */
+int32_t
+syncop_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                  int32_t op_ret, int32_t op_errno,
+                  struct iovec *vector, int32_t count, struct iatt *stbuf,
+                  struct iobref *iobref, dict_t *xdata)
+{
+        struct syncargs *args = NULL;
+
+        args = cookie;
+
+        INIT_LIST_HEAD (&args->entries.list);
+        args->op_ret   = op_ret;
+        args->op_errno = op_errno;
+
+        __wake (args);
+        return 0;
+}
+
+int
+syncop_readv_zcopy (xlator_t *subvol, fd_t *fd, off_t off,
+              uint32_t flags, struct iovec *vector, int count,
+              struct iobref **iobref)
+{
+        struct syncargs args = {0, };
+
+        SYNCOP (subvol, (&args), syncop_readv_zcopy_cbk,
+               subvol->fops->readv_zcopy, fd, vector, count, off, flags, NULL);
+
+        if (args.op_ret < 0)
+                goto out;
+out:
+        errno = args.op_errno;
+        return args.op_ret;
+}
+
 int
 syncop_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                    int op_ret, int op_errno, struct iatt *prebuf,
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 001c68f..61c4222 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -252,6 +252,10 @@ int syncop_readv (xlator_t *subvol, fd_t *fd, size_t size, off_t off,
                   uint32_t flags,
                   /* out */
                   struct iovec **vector, int *count, struct iobref **iobref);
+int syncop_readv_zcopy (xlator_t *subvol, fd_t *fd, off_t off,
+                  uint32_t flags,
+                  /* out */
+                  struct iovec *vector, int count, struct iobref **iobref);

 int syncop_ftruncate (xlator_t *subvol, fd_t *fd, off_t offset);
 int syncop_truncate (xlator_t *subvol, loc_t *loc, off_t offset);
diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h
index 1e21b63..30884c3 100644
--- a/libglusterfs/src/xlator.h
+++ b/libglusterfs/src/xlator.h
@@ -261,6 +261,16 @@ typedef int32_t (*fop_readv_cbk_t) (call_frame_t *frame,
                                     struct iatt *stbuf,
                                     struct iobref *iobref, dict_t *xdata);

+typedef int32_t (*fop_readv_zcopy_cbk_t) (call_frame_t *frame,
+                                    void *cookie,
+                                    xlator_t *this,
+                                    int32_t op_ret,
+                                    int32_t op_errno,
+                                    struct iovec *vector,
+                                    int32_t count,
+                                    struct iatt *stbuf,
+                                    struct iobref *iobref, dict_t *xdata);
+
 typedef int32_t (*fop_writev_cbk_t) (call_frame_t *frame,
                                      void *cookie,
                                      xlator_t *this,
@@ -501,6 +511,14 @@ typedef int32_t (*fop_readv_t) (call_frame_t *frame,
                                 off_t offset,
                                 uint32_t flags, dict_t *xdata);

+typedef int32_t (*fop_readv_zcopy_t) (call_frame_t *frame,
+                                xlator_t *this,
+                                fd_t *fd,
+                               struct iovec *vector,
+                               int32_t count,
+                                off_t offset,
+                                uint32_t flags, dict_t *xdata);
+
 typedef int32_t (*fop_writev_t) (call_frame_t *frame,
                                  xlator_t *this,
                                  fd_t *fd,
@@ -678,6 +696,7 @@ struct xlator_fops {
         fop_setattr_t        setattr;
         fop_fsetattr_t       fsetattr;
         fop_getspec_t        getspec;
+        fop_readv_zcopy_t    readv_zcopy;

         /* these entries are used for a typechecking hack in STACK_WIND _only_ */
         fop_lookup_cbk_t         lookup_cbk;
@@ -722,6 +741,7 @@ struct xlator_fops {
         fop_setattr_cbk_t        setattr_cbk;
         fop_fsetattr_cbk_t       fsetattr_cbk;
         fop_getspec_cbk_t        getspec_cbk;
+        fop_readv_zcopy_cbk_t    readv_zcopy_cbk;
 };

 typedef int32_t (*cbk_forget_t) (xlator_t *this,
diff --git a/rpc/rpc-lib/src/protocol-common.h b/rpc/rpc-lib/src/protocol-common.h
index 97017e5..e65d806 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -56,6 +56,7 @@ enum gf_fop_procnum {
         GFS3_OP_RELEASE,
         GFS3_OP_RELEASEDIR,
         GFS3_OP_FREMOVEXATTR,
+        GFS3_OP_READ_ZCOPY,
         GFS3_OP_MAXVALUE,
 } ;

diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index e6c681d..212f4c7 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -466,6 +466,8 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt, rpc_request_info_t *info)
         info->progver = saved_frame.rpcreq->prog->progver;
         info->rpc_req = saved_frame.rpcreq;
         info->rsp     = saved_frame.rsp;
+       info->rsp_payload = saved_frame.rpcreq->rsp_payload;
+       info->rsp_payload_count = saved_frame.rpcreq->rsp_payload_count;

         ret = 0;
 out:
@@ -1437,6 +1439,11 @@ rpc_clnt_submit (struct rpc_clnt *rpc, rpc_clnt_prog_t *prog,
         rpcreq->xid = callid;
         rpcreq->cbkfn = cbkfn;

+       memcpy(rpcreq->rsp_payload, rsp_payload,
+               rsp_payload_count * sizeof (struct iovec));
+        rpcreq->rsp_payload_count = rsp_payload_count;
+       
+
         ret = -1;

         if (proghdr) {
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 0da1655..710951e 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -161,6 +161,8 @@ struct rpc_req {
         int                    procnum;
         fop_cbk_fn_t           cbkfn;
         void                  *conn_private;
+       struct iovec           rsp_payload[256]; /* TODO: Allocate this */
+       int32_t                rsp_payload_count;
 };

 typedef struct rpc_clnt {
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 272de9d..8bc2d18 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -158,6 +158,9 @@ struct rpc_request_info {
         int                 procnum;
         void               *rpc_req; /* struct rpc_req */
         rpc_transport_rsp_t rsp;
+       /* TODO: This should ideally reside in @rsp above */
+       struct iovec       *rsp_payload;
+       int32_t             rsp_payload_count;
 };
 typedef struct rpc_request_info rpc_request_info_t;

diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index fffc137..1a7302e 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -378,6 +378,9 @@ __socket_cached_read (rpc_transport_t *this, struct iovec *opvector, int opcount
                goto uncached;
        }

+       /* TODO */
+       goto uncached;
+
        if (!in->ra_max) {
                /* first call after passing SP_STATE_READING_FRAGHDR */
                in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
@@ -1165,6 +1168,72 @@ out:
         return ret;
 }

+static inline int
+__socket_read_simple_msg_zcopy (rpc_transport_t *this)
+{
+        int                           ret            = 0;
+        uint32_t                      remaining_size = 0;
+        size_t                        bytes_read     = 0;
+        socket_private_t             *priv           = NULL;
+        struct gf_sock_incoming      *in             = NULL;
+        struct gf_sock_incoming_frag *frag           = NULL;
+
+        GF_VALIDATE_OR_GOTO ("socket", this, out);
+        GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+        priv = this->private;
+
+        in = &priv->incoming;
+        frag = &in->frag;
+
+        switch (frag->simple_state) {
+
+        case SP_STATE_SIMPLE_MSG_INIT:
+                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+                frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
+
+                /* fall through */
+
+        case SP_STATE_READING_SIMPLE_MSG:
+                ret = 0;
+
+                remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+                if (remaining_size > 0) {
+                        ret = __socket_readv (this,
+                                              in->pending_vector,
+                                             in->pending_count,
+                                              &in->pending_vector,
+                                              &in->pending_count,
+                                              &bytes_read);
+                }
+
+                if (ret == -1) {
+                        gf_log (this->name, GF_LOG_WARNING,
+                                "reading from socket failed. Error (%s), "
+                                "peer (%s)", strerror (errno),
+                                this->peerinfo.identifier);
+                        break;
+                }
+
+                frag->bytes_read += bytes_read;
+                //frag->fragcurrent += bytes_read;
+
+                if (ret > 0) {
+                        gf_log (this->name, GF_LOG_TRACE,
+                                "partial read on non-blocking socket.");
+                        break;
+                }
+
+                if (ret == 0) {
+                        frag->simple_state =  SP_STATE_SIMPLE_MSG_INIT;
+                }
+        }
+
+out:
+        return ret;
+}

 static inline int
 __socket_read_simple_request (rpc_transport_t *this)
@@ -1510,6 +1579,17 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)

         case SP_STATE_READ_PROC_OPAQUE:
         read_proc_opaque:
+               /* Initialize in->pending_vector with user supplied iovec */
+               if (in->request_info &&
+                       in->request_info->procnum == GFS3_OP_READ_ZCOPY) {
+                       in->pending_vector = in->request_info->rsp_payload;
+                       in->pending_count = in->request_info->rsp_payload_count;
+                       frag->call_body.reply.accepted_success_state
+                               = SP_STATE_READ_PROC_HEADER;
+
+                       /* fall through */
+
+               } else {
                 if (in->payload_vector.iov_base == NULL) {

                         size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
@@ -1541,12 +1621,17 @@ __socket_read_accepted_successful_reply (rpc_transport_t *this)

                 frag->call_body.reply.accepted_success_state
                         = SP_STATE_READ_PROC_HEADER;
-
+               }
                 /* fall through */

         case SP_STATE_READ_PROC_HEADER:
                 /* now read the entire remaining msg into new iobuf */
-                ret = __socket_read_simple_msg (this);
+               if (in->request_info &&
+                       in->request_info->procnum == GFS3_OP_READ_ZCOPY) {
+                       ret = __socket_read_simple_msg_zcopy (this);
+               } else {
+                       ret = __socket_read_simple_msg (this);
+               }
                 if ((ret == -1)
                     || ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
                         frag->call_body.reply.accepted_success_state
@@ -1797,7 +1882,8 @@ __socket_read_reply (rpc_transport_t *this)
         }

         if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
-            && (request_info->procnum == GF_FOP_READ)) {
+            && (request_info->procnum == GF_FOP_READ ||
+               request_info->procnum == GFS3_OP_READ_ZCOPY)) {
                 if (map_xid && request_info->rsp.rsp_payload_count != 0) {
                         in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
                         in->payload_vector = *request_info->rsp.rsp_payload;
@@ -1976,7 +2062,14 @@ __socket_proto_state_machine (rpc_transport_t *this,
                         /* fall through */

                 case SP_STATE_READ_FRAGHDR:
-
+                       /*
+                        * TODO:
+                        * IIUC, Memory is allocated for entire payload here,
+                        * but in case of readv, payload memory is allocated
+                        * again in __socket_read_accepted_successful_reply().
+                        * The latter one is actually used to return data to
+                        * the caller.
+                        */
                         in->fraghdr = ntoh32 (in->fraghdr);
                         in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
                         iobuf = iobuf_get2 (this->ctx->iobuf_pool,
@@ -2059,6 +2152,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
                                 in->request_info = NULL;
                         }
                         in->record_state = SP_STATE_COMPLETE;
+                       in->pending_vector = NULL;
+                       in->pending_count = 0;
                         break;

                 case SP_STATE_COMPLETE:
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index bd00089..e0271f4 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -560,6 +560,13 @@ int32_t dht_readv (call_frame_t *frame,
                    size_t    size,
                    off_t     offset, uint32_t flags, dict_t *xdata);

+int32_t dht_readv_zcopy (call_frame_t *frame,
+                   xlator_t *this,
+                   fd_t     *fd,
+                  struct iovec *vector,
+                  int32_t count,
+                   off_t     offset, uint32_t flags, dict_t *xdata);
+
 int32_t dht_writev (call_frame_t      *frame,
                     xlator_t      *this,
                     fd_t          *fd,
diff --git a/xlators/cluster/dht/src/dht-inode-read.c b/xlators/cluster/dht/src/dht-inode-read.c
index f17cb73..9438c99 100644
--- a/xlators/cluster/dht/src/dht-inode-read.c
+++ b/xlators/cluster/dht/src/dht-inode-read.c
@@ -420,6 +420,52 @@ out:
 }

 int
+dht_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+               int op_ret, int op_errno,
+               struct iatt *stbuf,
+               struct iobref *iobref, dict_t *xdata)
+{
+        dht_local_t *local      = NULL;
+        int          ret        = 0;
+
+        local = frame->local;
+        if (!local) {
+                op_ret = -1;
+                op_errno = EINVAL;
+                goto out;
+        }
+
+        /* This is already second try, no need for re-check */
+        if (local->call_cnt != 1)
+                goto out;
+
+        if ((op_ret == -1) && (op_errno != ENOENT))
+                goto out;
+
+        if ((op_ret == -1) || IS_DHT_MIGRATION_PHASE2 (stbuf)) {
+                /* File would be migrated to other node */
+                ret = fd_ctx_get (local->fd, this, NULL);
+                if (ret) {
+                        local->rebalance.target_op_fn = dht_readv2;
+                        ret = dht_rebalance_complete_check (this, frame);
+                } else {
+                        /* value is already set in fd_ctx, that means no need
+                           to check for whether its complete or not. */
+                        dht_readv2 (this, frame, 0);
+                }
+                if (!ret)
+                        return 0;
+        }
+
+out:
+        DHT_STRIP_PHASE1_FLAGS (stbuf);
+        DHT_STACK_UNWIND (readv, frame, op_ret, op_errno, NULL, 0, stbuf,
+                          iobref, xdata);
+
+        return 0;
+}
+
+int
 dht_readv2 (xlator_t *this, call_frame_t *frame, int op_ret)
 {
         dht_local_t *local  = NULL;
@@ -493,6 +539,52 @@ err:
 }

 int
+dht_readv_zcopy (call_frame_t *frame, xlator_t *this,
+           fd_t *fd, struct iovec *vector, int32_t count, off_t off,
+          uint32_t flags, dict_t *xdata)
+{
+        xlator_t     *subvol = NULL;
+        int           op_errno = -1;
+        dht_local_t  *local = NULL;
+       size_t size;
+
+        VALIDATE_OR_GOTO (frame, err);
+        VALIDATE_OR_GOTO (this, err);
+        VALIDATE_OR_GOTO (fd, err);
+
+        local = dht_local_init (frame, NULL, fd, GF_FOP_READ_ZCOPY);
+        if (!local) {
+                op_errno = ENOMEM;
+                goto err;
+        }
+
+        subvol = local->cached_subvol;
+        if (!subvol) {
+                gf_log (this->name, GF_LOG_DEBUG,
+                        "no cached subvolume for fd=%p", fd);
+                op_errno = EINVAL;
+                goto err;
+        }
+
+       size = iov_length(vector, count);
+        local->rebalance.offset = off;
+        local->rebalance.size   = size;
+        local->rebalance.flags  = flags;
+        local->call_cnt = 1;
+
+        STACK_WIND (frame, dht_readv_zcopy_cbk,
+                    subvol, subvol->fops->readv_zcopy,
+                    fd, vector, count, off, flags, xdata);
+
+        return 0;
+
+err:
+        op_errno = (op_errno == -1) ? errno : op_errno;
+        DHT_STACK_UNWIND (readv, frame, -1, op_errno, NULL, 0, NULL, NULL, NULL);
+
+        return 0;
+}
+int
 dht_access_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 int op_ret, int op_errno, dict_t *xdata)
 {
diff --git a/xlators/cluster/dht/src/dht.c b/xlators/cluster/dht/src/dht.c
index 784ed92..1289b0f 100644
--- a/xlators/cluster/dht/src/dht.c
+++ b/xlators/cluster/dht/src/dht.c
@@ -599,6 +599,7 @@ struct xlator_fops fops = {
         .fxattrop    = dht_fxattrop,
         .setattr     = dht_setattr,
         .fsetattr    = dht_fsetattr,
+        .readv_zcopy = dht_readv_zcopy,
 };

 struct xlator_dumpops dumpops = {
diff --git a/xlators/debug/error-gen/src/error-gen.c b/xlators/debug/error-gen/src/error-gen.c
index 6bdb041..f62784d 100644
--- a/xlators/debug/error-gen/src/error-gen.c
+++ b/xlators/debug/error-gen/src/error-gen.c
@@ -174,7 +174,10 @@ sys_error_t error_no_list[] = {
                                                  EROFS,EBADF,EIO}},
         [GF_FOP_GETSPEC]           = { .error_no_count = 4,
                                     .error_no = {EACCES,EBADF,ENAMETOOLONG,
-                                                 EINTR}}
+                                                 EINTR}},
+        [GF_FOP_READ_ZCOPY]        = { .error_no_count = 5,
+                                    .error_no = {EINVAL,EBADF,EFAULT,EISDIR,
+                                                 ENAMETOOLONG}}
 };

 int
@@ -275,6 +278,8 @@ get_fop_int (char **op_no_str)
                 return GF_FOP_OPEN;
         else if (!strcmp ((*op_no_str), "readv"))
                 return GF_FOP_READ;
+        else if (!strcmp ((*op_no_str), "readv_zcopy"))
+                return GF_FOP_READ_ZCOPY;
         else if (!strcmp ((*op_no_str), "writev"))
                 return GF_FOP_WRITE;
         else if (!strcmp ((*op_no_str), "statfs"))
@@ -1082,6 +1087,47 @@ error_gen_readv (call_frame_t *frame, xlator_t *this,
         return 0;
 }

+/* readv_zcopy callback: unwind the child's result (vector/count as NULL/0). */
+int
+error_gen_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+        int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count,
+        struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+       STACK_UNWIND_STRICT (readv_zcopy, frame, op_ret, op_errno,
+                             NULL, 0, stbuf, iobref, xdata);
+        return 0;
+}
+
+int
+error_gen_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                fd_t *fd, struct iovec *vector, int32_t count, off_t offset,
+                uint32_t flags, dict_t *xdata)
+{
+       int              op_errno = 0;
+        eg_t            *egp = NULL;
+        int              enable = 1;
+
+        egp = this->private;
+        enable = egp->enable[GF_FOP_READ_ZCOPY];
+
+        if (enable)
+                op_errno = error_gen (this, GF_FOP_READ_ZCOPY);
+
+       if (op_errno) {
+               GF_ERROR(this, "unwind(-1, %s)", strerror (op_errno));
+               STACK_UNWIND_STRICT (readv_zcopy, frame, -1, op_errno, NULL, 0,
+                                     NULL, NULL, xdata);
+        return 0;
+       }
+
+
+       STACK_WIND (frame, error_gen_readv_zcopy_cbk,
+                   FIRST_CHILD(this),
+                   FIRST_CHILD(this)->fops->readv_zcopy,
+                   fd, vector, count, offset, flags, xdata);
+        return 0;
+}
+

 int
 error_gen_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2148,6 +2194,7 @@ struct xlator_fops fops = {
        .create      = error_gen_create,
        .open        = error_gen_open,
        .readv       = error_gen_readv,
+       .readv_zcopy = error_gen_readv_zcopy,
        .writev      = error_gen_writev,
        .statfs      = error_gen_statfs,
        .flush       = error_gen_flush,
diff --git a/xlators/debug/io-stats/src/io-stats.c b/xlators/debug/io-stats/src/io-stats.c
index 63bb8fa..f3d6660 100644
--- a/xlators/debug/io-stats/src/io-stats.c
+++ b/xlators/debug/io-stats/src/io-stats.c
@@ -1346,6 +1346,40 @@ io_stats_readv_cbk (call_frame_t *frame, void *cookie, xlator_t *this,

 }

+/* readv_zcopy callback: account the completed read against the fd and its
+ * inode, then unwind to the parent with vector/count as NULL/0. */
+int
+io_stats_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                    int32_t op_ret, int32_t op_errno,
+                    struct iovec *vector, int32_t count,
+                    struct iatt *buf, struct iobref *iobref, dict_t *xdata)
+{
+        fd_t            *fd = NULL;
+        struct ios_stat *iosstat = NULL;
+
+        fd = frame->local;
+        frame->local = NULL;
+
+        /* Unwinds on this path pass NULL/0 for vector/count, so the byte
+         * count is taken from op_ret instead of iov_length(). */
+        if (op_ret > 0) {
+                BUMP_READ (fd, op_ret);
+        }
+
+        UPDATE_PROFILE_STATS (frame, READ);
+        ios_inode_ctx_get (fd->inode, this, &iosstat);
+
+        if (iosstat) {
+              BUMP_STATS (iosstat, IOS_STATS_TYPE_READ);
+              BUMP_THROUGHPUT (iosstat, IOS_STATS_THRU_READ);
+              iosstat = NULL;
+        }
+
+        STACK_UNWIND_STRICT (readv_zcopy, frame, op_ret, op_errno,
+                             NULL, 0, buf, iobref, xdata);
+        return 0;
+}
+

 int
 io_stats_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2074,6 +2108,23 @@ io_stats_readv (call_frame_t *frame, xlator_t *this,


 int
+io_stats_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                fd_t *fd, struct iovec *vector, int32_t count, off_t offset,
+               uint32_t flags, dict_t *xdata)
+{
+        frame->local = fd;
+
+        START_FOP_LATENCY (frame);
+
+        STACK_WIND (frame, io_stats_readv_zcopy_cbk,
+                    FIRST_CHILD(this),
+                    FIRST_CHILD(this)->fops->readv_zcopy,
+                    fd, vector, count, offset, flags, xdata);
+        return 0;
+}
+
+
+int
 io_stats_writev (call_frame_t *frame, xlator_t *this,
                  fd_t *fd, struct iovec *vector,
                  int32_t count, off_t offset,
@@ -2790,6 +2841,7 @@ struct xlator_fops fops = {
         .truncate    = io_stats_truncate,
         .open        = io_stats_open,
         .readv       = io_stats_readv,
+        .readv_zcopy = io_stats_readv_zcopy,
         .writev      = io_stats_writev,
         .statfs      = io_stats_statfs,
         .flush       = io_stats_flush,
diff --git a/xlators/performance/md-cache/src/md-cache.c b/xlators/performance/md-cache/src/md-cache.c
index 0c5ca87..0c3d6b2 100644
--- a/xlators/performance/md-cache/src/md-cache.c
+++ b/xlators/performance/md-cache/src/md-cache.c
@@ -1385,6 +1385,48 @@ mdc_readv (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
         return 0;
 }

+/*
+ * mdc_readv_zcopy_cbk - completion callback for mdc_readv_zcopy().
+ *
+ * On a successful read, stores the returned iatt for the inode behind
+ * local->fd via mdc_inode_iatt_set(), then unwinds to the parent as
+ * readv_zcopy, handing the child's vector/count through unchanged.
+ *
+ * The parameter list carries vector/count ahead of the iatt, matching the
+ * five trailing arguments that readv_zcopy unwinds pass elsewhere in this
+ * patch.
+ *
+ * Always returns 0.
+ */
+int
+mdc_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                     int32_t op_ret, int32_t op_errno,
+                     struct iovec *vector, int32_t count,
+                     struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+        mdc_local_t  *local = NULL;
+
+        local = frame->local;
+
+        /* op_ret is the byte count on success, so only a negative value is
+         * a failure; testing "!= 0" would skip the update for every
+         * non-empty read */
+        if (op_ret < 0)
+                goto out;
+
+        /* nothing to update if the wind path could not set up a local */
+        if (!local)
+                goto out;
+
+        mdc_inode_iatt_set (this, local->fd->inode, stbuf);
+
+out:
+        MDC_STACK_UNWIND (readv_zcopy, frame, op_ret, op_errno, vector, count,
+                          stbuf, iobref, xdata);
+
+        return 0;
+}
+
+
+/*
+ * mdc_readv_zcopy - zero-copy read entry point for md-cache.
+ *
+ * Obtains the per-frame local, records a referenced fd in it for the
+ * callback, and forwards the request unchanged to the first child's
+ * readv_zcopy fop.
+ *
+ * Always returns 0; the result is delivered through mdc_readv_zcopy_cbk().
+ */
+int
+mdc_readv_zcopy (call_frame_t *frame, xlator_t *this, fd_t *fd,
+                 struct iovec *vector, int32_t count, off_t offset,
+                 uint32_t flags, dict_t *xdata)
+{
+        mdc_local_t  *local = NULL;
+
+        local = mdc_local_get (frame);
+
+        /* NOTE(review): presumably mdc_local_get() returns NULL when it
+         * cannot allocate; the callback already copes with a missing local,
+         * so just skip the bookkeeping instead of dereferencing NULL */
+        if (local)
+                local->fd = fd_ref (fd);
+
+        STACK_WIND (frame, mdc_readv_zcopy_cbk,
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv_zcopy,
+                    fd, vector, count, offset, flags, xdata);
+        return 0;
+}
+

 int
 mdc_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -1947,6 +1989,7 @@ struct xlator_fops fops = {
         .link        = mdc_link,
         .create      = mdc_create,
         .readv       = mdc_readv,
+        .readv_zcopy = mdc_readv_zcopy,
         .writev      = mdc_writev,
         .setattr     = mdc_setattr,
         .fsetattr    = mdc_fsetattr,
diff --git a/xlators/protocol/client/src/client-rpc-fops.c b/xlators/protocol/client/src/client-rpc-fops.c
index f524c1a..b05c003 100644
--- a/xlators/protocol/client/src/client-rpc-fops.c
+++ b/xlators/protocol/client/src/client-rpc-fops.c
@@ -2712,6 +2712,79 @@ out:
 }

 int
+client3_3_readv_zcopy_cbk (struct rpc_req *req, struct iovec *iov, int count,
+                     void *myframe)
+{
+        call_frame_t   *frame  = NULL;
+        struct iobref  *iobref = NULL;
+        struct iovec    vector[MAX_IOVEC] = {{0}, };
+        struct iatt     stat   = {0,};
+        gfs3_read_rsp   rsp    = {0,};
+        int             ret    = 0, rspcount = 0;
+        clnt_local_t   *local  = NULL;
+        xlator_t *this       = NULL;
+        dict_t  *xdata       = NULL;
+
+        this = THIS;
+
+        memset (vector, 0, sizeof (vector));
+
+        frame = myframe;
+        local = frame->local;
+
+        if (-1 == req->rpc_status) {
+                rsp.op_ret   = -1;
+                rsp.op_errno = ENOTCONN;
+                goto out;
+        }
+
+        ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_read_rsp);
+        if (ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed");
+                rsp.op_ret   = -1;
+                rsp.op_errno = EINVAL;
+                goto out;
+        }
+
+        if (rsp.op_ret != -1) {
+                iobref = req->rsp_iobref;
+                gf_stat_to_iatt (&rsp.stat, &stat);
+
+                vector[0].iov_len = rsp.op_ret;
+                if (rsp.op_ret > 0)
+                        vector[0].iov_base = req->rsp[1].iov_base;
+                rspcount = 1;
+        }
+        GF_PROTOCOL_DICT_UNSERIALIZE (this, xdata, (rsp.xdata.xdata_val),
+                                      (rsp.xdata.xdata_len), ret,
+                                      rsp.op_errno, out);
+
+#ifdef GF_TESTING_IO_XDATA
+        dict_dump (xdata);
+#endif
+
+out:
+        if (rsp.op_ret == -1) {
+                gf_log (this->name, GF_LOG_WARNING,
+                        "remote operation failed: %s",
+                        strerror (gf_error_to_errno (rsp.op_errno)));
+        } else if (rsp.op_ret >= 0) {
+                if (local->attempt_reopen)
+                        client_attempt_reopen (local->fd, this);
+        }
+        CLIENT_STACK_UNWIND (readv_zcopy, frame, rsp.op_ret,
+                             gf_error_to_errno (rsp.op_errno),
vector, rspcount,
+                             &stat, iobref, xdata);
+
+        free (rsp.xdata.xdata_val);
+
+        if (xdata)
+                dict_unref (xdata);
+
+        return 0;
+}
+
+int
 client3_3_release_cbk (struct rpc_req *req, struct iovec *iov, int count,
                        void *myframe)
 {
@@ -3955,6 +4028,7 @@ client3_3_readv (call_frame_t *frame, xlator_t *this,
         }

         local->iobref = rsp_iobref;
+        local->iobref = NULL;
         rsp_iobref = NULL;

         GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val),
@@ -3986,6 +4060,65 @@ unwind:
         return 0;
 }

+/*
+ * client3_3_readv_zcopy - build and submit a READ_ZCOPY request.
+ *
+ * data is a clnt_args_t filled in by client_readv_zcopy().  The function
+ * resolves the remote fd, prepares the frame local, fills a gfs3_read_req
+ * (size, offset, fd, flag, gfid, serialized xdata) and submits it with
+ * client3_3_readv_zcopy_cbk as the reply handler.  The caller's
+ * payload_vector/payload_count are handed to client_submit_request()
+ * (NOTE(review): presumably in its response-payload slots, so the reply
+ * data can be received straight into them - confirm against the prototype).
+ *
+ * On a setup failure the frame is unwound here with op_ret -1 and the
+ * errno collected so far (ESTALE by default).  Always returns 0.
+ */
+int32_t
+client3_3_readv_zcopy (call_frame_t *frame, xlator_t *this,
+                       void *data)
+{
+        clnt_args_t    *args       = NULL;
+        int64_t         remote_fd  = -1;
+        clnt_conf_t    *conf       = NULL;
+        clnt_local_t   *local      = NULL;
+        int             op_errno   = ESTALE;
+        gfs3_read_req   req        = {{0,},};
+        int             ret        = 0;
+
+        if (!frame || !this || !data)
+                goto unwind;
+
+        args = data;
+        conf = this->private;
+
+        CLIENT_GET_REMOTE_FD (this, args->fd, FALLBACK_TO_ANON_FD,
+                              remote_fd, op_errno, unwind);
+        ret = client_fd_fop_prepare_local (frame, args->fd, remote_fd);
+        if (ret) {
+                /* the helper reports failure as a negative errno */
+                op_errno = -ret;
+                goto unwind;
+        }
+        local = frame->local;
+
+        req.size   = args->size;
+        req.offset = args->offset;
+        req.fd     = remote_fd;
+        req.flag   = args->flags;
+
+        memcpy (req.gfid, args->fd->inode->gfid, 16);
+
+        GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val),
+                                    req.xdata.xdata_len, op_errno, unwind);
+
+        ret = client_submit_request (this, &req, frame, conf->fops,
+                                     GFS3_OP_READ_ZCOPY,
+                                     client3_3_readv_zcopy_cbk, NULL,
+                                     NULL, 0, args->payload_vector,
+                                     args->payload_count,
+                                     local->iobref,
+                                     (xdrproc_t)xdr_gfs3_read_req);
+        if (ret) {
+                //unwind is done in the cbk
+                gf_log (this->name, GF_LOG_WARNING, "failed to send the fop");
+        }
+
+        GF_FREE (req.xdata.xdata_val);
+
+        return 0;
+unwind:
+
+        CLIENT_STACK_UNWIND (readv_zcopy, frame, -1, op_errno, NULL,
+                             0, NULL, NULL, NULL);
+        GF_FREE (req.xdata.xdata_val);
+
+        return 0;
+}
+

 int32_t
 client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
@@ -5845,6 +5978,7 @@ rpc_clnt_procedure_t clnt3_3_fop_actors[GF_FOP_MAXVALUE] = {
         [GF_FOP_RELEASEDIR]  = { "RELEASEDIR",  client3_3_releasedir },
         [GF_FOP_GETSPEC]     = { "GETSPEC",     client3_getspec },
         [GF_FOP_FREMOVEXATTR] = { "FREMOVEXATTR", client3_3_fremovexattr },
+        [GF_FOP_READ_ZCOPY]  = { "READ_ZCOPY",  client3_3_readv_zcopy },
 };

 /* Used From RPC-CLNT library to log proper name of procedure based on number */
@@ -5893,6 +6027,7 @@ char *clnt3_3_fop_names[GFS3_OP_MAXVALUE] = {
         [GFS3_OP_RELEASE]     = "RELEASE",
         [GFS3_OP_RELEASEDIR]  = "RELEASEDIR",
         [GFS3_OP_FREMOVEXATTR] = "FREMOVEXATTR",
+        [GFS3_OP_READ_ZCOPY]  = "READ_ZCOPY",
 };

 rpc_clnt_prog_t clnt3_3_fop_prog = {
diff --git a/xlators/protocol/client/src/client.c b/xlators/protocol/client/src/client.c
index 931c671..5d4e9d1 100644
--- a/xlators/protocol/client/src/client.c
+++ b/xlators/protocol/client/src/client.c
@@ -923,6 +923,46 @@ out:
 }


+/*
+ * client_readv_zcopy - readv_zcopy fop of the protocol/client translator.
+ *
+ * Packs the request into a clnt_args_t (size is the total length of the
+ * caller's vectors; the vectors themselves travel as payload_vector /
+ * payload_count) and dispatches it through the GF_FOP_READ_ZCOPY entry of
+ * the procedure table.
+ *
+ * If there is no configuration or procedure table, no handler for the fop,
+ * or the handler returns non-zero, the frame is unwound here with op_ret -1
+ * and ENOTCONN.  Always returns 0.
+ */
+int32_t
+client_readv_zcopy (call_frame_t *frame, xlator_t *this, fd_t *fd,
+                    struct iovec *vector, int32_t count,
+                    off_t offset, uint32_t flags, dict_t *xdata)
+{
+        int          ret  = -1;
+        clnt_conf_t *conf = NULL;
+        rpc_clnt_procedure_t *proc = NULL;
+        clnt_args_t  args = {0,};
+
+        conf = this->private;
+        if (!conf || !conf->fops)
+                goto out;
+
+        args.fd     = fd;
+        args.size   = iov_length(vector, count);
+        args.offset = offset;
+        args.flags  = flags;
+        args.xdata = xdata;
+        args.payload_vector = vector;
+        args.payload_count = count;
+
+        proc = &conf->fops->proctable[GF_FOP_READ_ZCOPY];
+        if (!proc) {
+                /* name the fop that was actually looked up */
+                gf_log (this->name, GF_LOG_ERROR,
+                        "rpc procedure not found for %s",
+                        gf_fop_list[GF_FOP_READ_ZCOPY]);
+                goto out;
+        }
+        /* ret stays -1 when no handler is registered, so the frame is
+         * still unwound below */
+        if (proc->fn)
+                ret = proc->fn (frame, this, &args);
+
+out:
+        if (ret)
+                STACK_UNWIND_STRICT (readv_zcopy, frame, -1, ENOTCONN,
+                                     NULL, 0, NULL, NULL, NULL);
+
+        return 0;
+}
+


 int32_t
@@ -2636,6 +2676,7 @@ struct xlator_fops fops = {
         .truncate    = client_truncate,
         .open        = client_open,
         .readv       = client_readv,
+        .readv_zcopy = client_readv_zcopy,
         .writev      = client_writev,
         .statfs      = client_statfs,
         .flush       = client_flush,
diff --git a/xlators/protocol/client/src/client.h b/xlators/protocol/client/src/client.h
index 0a27c09..39c3bf8 100644
--- a/xlators/protocol/client/src/client.h
+++ b/xlators/protocol/client/src/client.h
@@ -195,6 +195,8 @@ typedef struct client_args {

         mode_t              umask;
         dict_t             *xdata;
+       struct iovec       *payload_vector;
+       int32_t             payload_count;
 } clnt_args_t;

 typedef ssize_t (*gfs_serialize_t) (struct iovec outmsg, void *args);
diff --git a/xlators/protocol/server/src/server-rpc-fops.c b/xlators/protocol/server/src/server-rpc-fops.c
index f44ced4..f16187d 100644
--- a/xlators/protocol/server/src/server-rpc-fops.c
+++ b/xlators/protocol/server/src/server-rpc-fops.c
@@ -1515,6 +1515,55 @@ out:
 }

 int
+server_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                  int32_t op_ret, int32_t op_errno,
+                  struct iovec *vector, int32_t count,
+                  struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+        gfs3_read_rsp     rsp   = {0,};
+        server_state_t   *state = NULL;
+        rpcsvc_request_t *req   = NULL;
+
+        req = frame->local;
+        state = CALL_STATE(frame);
+
+#ifdef GF_TESTING_IO_XDATA
+        {
+                int ret = 0;
+                if (!xdata)
+                        xdata = dict_new ();
+
+                ret = dict_set_str (xdata, "testing-the-xdata-key",
+                                       "testing-xdata-value");
+        }
+#endif
+        GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&rsp.xdata.xdata_val),
+                                    rsp.xdata.xdata_len, op_errno, out);
+
+        if (op_ret < 0) {
+                gf_log (this->name, GF_LOG_INFO,
+                        "%"PRId64": READV_ZCOPY %"PRId64" (%s) ==> (%s)",
+                        frame->root->unique, state->resolve.fd_no,
+                        uuid_utoa (state->resolve.gfid), strerror (op_errno));
+                goto out;
+        }
+
+        gf_stat_from_iatt (&rsp.stat, stbuf);
+        rsp.size = op_ret;
+
+out:
+        rsp.op_ret    = op_ret;
+        rsp.op_errno  = gf_errno_to_error (op_errno);
+
+        server_submit_reply (frame, req, &rsp, vector, count, iobref,
+                             (xdrproc_t)xdr_gfs3_read_rsp);
+
+        GF_FREE (rsp.xdata.xdata_val);
+
+        return 0;
+}
+
+int
 server_rchecksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                       int32_t op_ret, int32_t op_errno,
                       uint32_t weak_checksum, uint8_t *strong_checksum,
@@ -2749,6 +2798,27 @@ err:
         return 0;
 }

+/*
+ * server_readv_zcopy_resume - continue a READ_ZCOPY request after resolve.
+ *
+ * If resolution succeeded, winds an ordinary readv to the bound xlator
+ * (the server side treats zero-copy reads as normal reads); otherwise the
+ * resolve error is handed directly to the callback.  Either way the result
+ * completes through server_readv_zcopy_cbk, so failures are logged as
+ * READV_ZCOPY rather than as a plain read.
+ *
+ * Always returns 0.
+ */
+int
+server_readv_zcopy_resume (call_frame_t *frame, xlator_t *bound_xl)
+{
+        server_state_t    *state = NULL;
+
+        state = CALL_STATE (frame);
+
+        if (state->resolve.op_ret != 0)
+                goto err;
+
+        /* the zcopy callback has the same parameter list as a readv
+         * callback, so it can complete a readv wind */
+        STACK_WIND (frame, server_readv_zcopy_cbk,
+                    bound_xl, bound_xl->fops->readv,
+                    state->fd, state->size, state->offset,
+                    state->flags, state->xdata);
+
+        return 0;
+err:
+        server_readv_zcopy_cbk (frame, NULL, frame->this,
+                                state->resolve.op_ret,
+                                state->resolve.op_errno,
+                                NULL, 0, NULL, NULL, NULL);
+        return 0;
+}
+

 int
 server_create_resume (call_frame_t *frame, xlator_t *bound_xl)
@@ -3347,6 +3417,68 @@ out:
         return ret;
 }

+/*
+ * server3_3_readv_zcopy - RPC actor for GFS3_OP_READ_ZCOPY.
+ *
+ * Decodes a gfs3_read_req from the request, obtains a call frame, copies
+ * fd number, size, offset, flags and gfid into the call state,
+ * unserializes the request xdata and starts resolution, continuing in
+ * server_readv_zcopy_resume().
+ *
+ * Returns 0 once resolution has been started.  On the failure paths
+ * req->rpc_err is set to GARBAGE_ARGS and the current value of ret is
+ * returned.
+ */
+int
+server3_3_readv_zcopy (rpcsvc_request_t *req)
+{
+        server_state_t *state = NULL;
+        call_frame_t   *frame = NULL;
+        gfs3_read_req   args  = {{0,},};
+        int             ret   = -1;
+        int             op_errno = 0;
+
+        if (!req)
+                goto out;
+
+        ret = xdr_to_generic (req->msg[0], &args,
+                              (xdrproc_t)xdr_gfs3_read_req);
+        if (ret < 0) {
+                //failed to decode msg;
+                req->rpc_err = GARBAGE_ARGS;
+                goto out;
+        }
+
+        frame = get_frame_from_request (req);
+        if (!frame) {
+                // something wrong, mostly insufficient memory
+                req->rpc_err = GARBAGE_ARGS; /* TODO */
+                goto out;
+        }
+        /*
+         * TODO: ZCOPY client requests are treated as normal READ requests
+         * in server
+         */
+        frame->root->op = GF_FOP_READ;
+
+        state = CALL_STATE (frame);
+        if (!state->conn->bound_xl) {
+                /* auth failure, request on subvolume without setvolume */
+                req->rpc_err = GARBAGE_ARGS;
+                goto out;
+        }
+
+        state->resolve.type   = RESOLVE_MUST;
+        state->resolve.fd_no  = args.fd;
+        state->size           = args.size;
+        state->offset         = args.offset;
+        state->flags          = args.flag;
+
+        memcpy (state->resolve.gfid, args.gfid, 16);
+
+        GF_PROTOCOL_DICT_UNSERIALIZE (state->conn->bound_xl, state->xdata,
+                                      (args.xdata.xdata_val),
+                                      (args.xdata.xdata_len), ret,
+                                      op_errno, out);
+
+        ret = 0;
+        /* use the zcopy-specific resume function defined for this
+         * procedure; it still winds a plain readv */
+        resolve_and_resume (frame, server_readv_zcopy_resume);
+out:
+        /* memory allocated by libc, don't use GF_FREE */
+        free (args.xdata.xdata_val);
+
+        if (op_errno)
+                req->rpc_err = GARBAGE_ARGS;
+
+        return ret;
+}

 int
 server3_3_writev (rpcsvc_request_t *req)
@@ -5760,6 +5892,7 @@ rpcsvc_actor_t glusterfs3_3_fop_actors[] = {
         [GFS3_OP_RELEASE]     = { "RELEASE",    GFS3_OP_RELEASE,
server3_3_release, NULL, 0},
         [GFS3_OP_RELEASEDIR]  = { "RELEASEDIR", GFS3_OP_RELEASEDIR,
server3_3_releasedir, NULL, 0},
         [GFS3_OP_FREMOVEXATTR] = { "FREMOVEXATTR",
GFS3_OP_FREMOVEXATTR, server3_3_fremovexattr, NULL, 0},
+        [GFS3_OP_READ_ZCOPY]  = { "READ_ZCOPY", GFS3_OP_READ_ZCOPY,
server3_3_readv_zcopy, NULL, 0},
 };


-- 
http://raobharata.wordpress.com/



reply via email to

[Prev in Thread] Current Thread [Next in Thread]