[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 01/01: gr-zeromq: Big rework for performanc
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 01/01: gr-zeromq: Big rework for performance and correctness |
Date: |
Wed, 27 Jan 2016 21:14:37 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch maint
in repository gnuradio.
commit 6e482c5bb6bf49f000f6b8d35a1ca84127e38c46
Author: Sylvain Munaut <address@hidden>
Date: Wed Jan 27 12:58:50 2016 +0100
gr-zeromq: Big rework for performance and correctness
- Use class hierarchy trying to maximize code re-use.
- Dont' drop samples on receive if the output buffer doesn't have
enough space.
- Don't drop tags on receive by putting tags in the future.
- Better metadata creation/parsing avoiding copying lots data.
- Always do as much work as possible in a single call to work()
to avoid scheduler overhead as long as possible.
- Allow setting the high watermark to avoid older version of
zeromq's default of buffering infinite messages and causing a
paging thrash to/from disk when the flow graph can't keep up.
Signed-off-by: Sylvain Munaut <address@hidden>
---
gr-zeromq/grc/zeromq_pub_sink.xml | 20 ++-
gr-zeromq/grc/zeromq_pull_source.xml | 20 ++-
gr-zeromq/grc/zeromq_push_sink.xml | 20 ++-
gr-zeromq/grc/zeromq_rep_sink.xml | 20 ++-
gr-zeromq/grc/zeromq_req_source.xml | 20 ++-
gr-zeromq/grc/zeromq_sub_source.xml | 20 ++-
gr-zeromq/include/gnuradio/zeromq/pub_sink.h | 3 +-
gr-zeromq/include/gnuradio/zeromq/pull_source.h | 3 +-
gr-zeromq/include/gnuradio/zeromq/push_sink.h | 3 +-
gr-zeromq/include/gnuradio/zeromq/rep_sink.h | 3 +-
gr-zeromq/include/gnuradio/zeromq/req_source.h | 3 +-
gr-zeromq/include/gnuradio/zeromq/sub_source.h | 3 +-
gr-zeromq/lib/CMakeLists.txt | 1 +
gr-zeromq/lib/base_impl.cc | 198 ++++++++++++++++++++++++
gr-zeromq/lib/base_impl.h | 77 +++++++++
gr-zeromq/lib/pub_sink_impl.cc | 55 +------
gr-zeromq/lib/pub_sink_impl.h | 15 +-
gr-zeromq/lib/pull_source_impl.cc | 93 ++++-------
gr-zeromq/lib/pull_source_impl.h | 15 +-
gr-zeromq/lib/push_sink_impl.cc | 69 ++-------
gr-zeromq/lib/push_sink_impl.h | 15 +-
gr-zeromq/lib/rep_sink_impl.cc | 91 +++++------
gr-zeromq/lib/rep_sink_impl.h | 15 +-
gr-zeromq/lib/req_source_impl.cc | 107 ++++++-------
gr-zeromq/lib/req_source_impl.h | 18 +--
gr-zeromq/lib/sub_source_impl.cc | 94 ++++-------
gr-zeromq/lib/sub_source_impl.h | 23 +--
gr-zeromq/lib/tag_headers.cc | 96 +++++++-----
gr-zeromq/lib/tag_headers.h | 5 +-
29 files changed, 651 insertions(+), 474 deletions(-)
diff --git a/gr-zeromq/grc/zeromq_pub_sink.xml
b/gr-zeromq/grc/zeromq_pub_sink.xml
index 7babc9e..1b2f9ec 100644
--- a/gr-zeromq/grc/zeromq_pub_sink.xml
+++ b/gr-zeromq/grc/zeromq_pub_sink.xml
@@ -4,7 +4,7 @@
<key>zeromq_pub_sink</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout,
$pass_tags)</make>
+ <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags,
$hwm)</make>
<param>
<name>IO Type</name>
@@ -61,7 +61,23 @@
<name>Pass Tags</name>
<key>pass_tags</key>
<value>False</value>
- <type>bool</type>
+ <type>enum</type>
+ <option>
+ <name>Yes</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>No</name>
+ <key>False</key>
+ </option>
+ </param>
+
+ <param>
+ <name>High Watermark</name>
+ <key>hwm</key>
+ <value>-1</value>
+ <type>int</type>
+ <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
</param>
<sink>
diff --git a/gr-zeromq/grc/zeromq_pull_source.xml
b/gr-zeromq/grc/zeromq_pull_source.xml
index c8a7b89..8158b47 100644
--- a/gr-zeromq/grc/zeromq_pull_source.xml
+++ b/gr-zeromq/grc/zeromq_pull_source.xml
@@ -4,7 +4,7 @@
<key>zeromq_pull_source</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout,
$pass_tags)</make>
+ <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout,
$pass_tags, $hwm)</make>
<param>
<name>IO Type</name>
@@ -61,7 +61,23 @@
<name>Pass Tags</name>
<key>pass_tags</key>
<value>False</value>
- <type>bool</type>
+ <type>enum</type>
+ <option>
+ <name>Yes</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>No</name>
+ <key>False</key>
+ </option>
+ </param>
+
+ <param>
+ <name>High Watermark</name>
+ <key>hwm</key>
+ <value>-1</value>
+ <type>int</type>
+ <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
</param>
<source>
diff --git a/gr-zeromq/grc/zeromq_push_sink.xml
b/gr-zeromq/grc/zeromq_push_sink.xml
index eb6ead5..528da94 100644
--- a/gr-zeromq/grc/zeromq_push_sink.xml
+++ b/gr-zeromq/grc/zeromq_push_sink.xml
@@ -4,7 +4,7 @@
<key>zeromq_push_sink</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout,
$pass_tags)</make>
+ <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout,
$pass_tags, $hwm)</make>
<param>
<name>IO Type</name>
@@ -61,7 +61,23 @@
<name>Pass Tags</name>
<key>pass_tags</key>
<value>False</value>
- <type>bool</type>
+ <type>enum</type>
+ <option>
+ <name>Yes</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>No</name>
+ <key>False</key>
+ </option>
+ </param>
+
+ <param>
+ <name>High Watermark</name>
+ <key>hwm</key>
+ <value>-1</value>
+ <type>int</type>
+ <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
</param>
<sink>
diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml
b/gr-zeromq/grc/zeromq_rep_sink.xml
index 2209b4f..db735a3 100644
--- a/gr-zeromq/grc/zeromq_rep_sink.xml
+++ b/gr-zeromq/grc/zeromq_rep_sink.xml
@@ -4,7 +4,7 @@
<key>zeromq_rep_sink</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout,
$pass_tags)</make>
+ <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags,
$hwm)</make>
<param>
<name>IO Type</name>
@@ -61,7 +61,23 @@
<name>Pass Tags</name>
<key>pass_tags</key>
<value>False</value>
- <type>bool</type>
+ <type>enum</type>
+ <option>
+ <name>Yes</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>No</name>
+ <key>False</key>
+ </option>
+ </param>
+
+ <param>
+ <name>High Watermark</name>
+ <key>hwm</key>
+ <value>-1</value>
+ <type>int</type>
+ <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
</param>
<sink>
diff --git a/gr-zeromq/grc/zeromq_req_source.xml
b/gr-zeromq/grc/zeromq_req_source.xml
index 050718c..2ef2243 100644
--- a/gr-zeromq/grc/zeromq_req_source.xml
+++ b/gr-zeromq/grc/zeromq_req_source.xml
@@ -4,7 +4,7 @@
<key>zeromq_req_source</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout,
$pass_tags)</make>
+ <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout,
$pass_tags, $hwm)</make>
<param>
<name>IO Type</name>
@@ -61,7 +61,23 @@
<name>Pass Tags</name>
<key>pass_tags</key>
<value>False</value>
- <type>bool</type>
+ <type>enum</type>
+ <option>
+ <name>Yes</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>No</name>
+ <key>False</key>
+ </option>
+ </param>
+
+ <param>
+ <name>High Watermark</name>
+ <key>hwm</key>
+ <value>-1</value>
+ <type>int</type>
+ <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
</param>
<source>
diff --git a/gr-zeromq/grc/zeromq_sub_source.xml
b/gr-zeromq/grc/zeromq_sub_source.xml
index 86af506..268a893 100644
--- a/gr-zeromq/grc/zeromq_sub_source.xml
+++ b/gr-zeromq/grc/zeromq_sub_source.xml
@@ -4,7 +4,7 @@
<key>zeromq_sub_source</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout,
$pass_tags)</make>
+ <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout,
$pass_tags, $hwm)</make>
<param>
<name>IO Type</name>
@@ -61,7 +61,23 @@
<name>Pass Tags</name>
<key>pass_tags</key>
<value>False</value>
- <type>bool</type>
+ <type>enum</type>
+ <option>
+ <name>Yes</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>No</name>
+ <key>False</key>
+ </option>
+ </param>
+
+ <param>
+ <name>High Watermark</name>
+ <key>hwm</key>
+ <value>-1</value>
+ <type>int</type>
+ <hide>#if $hwm() == -1 then 'part' else 'none'#</hide>
</param>
<source>
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
index e8871c2..e87c552 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
@@ -53,9 +53,10 @@ namespace gr {
* \param address ZMQ socket address specifier.
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments.
* \param pass_tags Whether sink will serialize and pass tags over the
link.
+ * \param hwm High Watermark to configure the socket to (-1 => zmq's
default)
*/
static sptr make(size_t itemsize, size_t vlen, char *address,
- int timeout=100, bool pass_tags=false);
+ int timeout=100, bool pass_tags=false, int hwm=-1);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h
b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
index ca7b407..07cf6af 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
@@ -50,9 +50,10 @@ namespace gr {
* \param address ZMQ socket address specifier.
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments.
* \param pass_tags Whether source will look for and deserialize tags.
+ * \param hwm High Watermark to configure the socket to (-1 => zmq's
default)
*/
static sptr make(size_t itemsize, size_t vlen, char *address,
- int timeout=100, bool pass_tags=false);
+ int timeout=100, bool pass_tags=false, int hwm=-1);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index 0f21b44..e2260aa 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -54,9 +54,10 @@ namespace gr {
* \param address ZMQ socket address specifier.
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments.
* \param pass_tags Whether sink will serialize and pass tags over the
link.
+ * \param hwm High Watermark to configure the socket to (-1 => zmq's
default)
*/
static sptr make(size_t itemsize, size_t vlen, char *address,
- int timeout=100, bool pass_tags=false);
+ int timeout=100, bool pass_tags=false, int hwm=-1);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 33fd38b..220bd34 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -52,9 +52,10 @@ namespace gr {
* \param address ZMQ socket address specifier.
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments.
* \param pass_tags Whether sink will serialize and pass tags over the
link.
+ * \param hwm High Watermark to configure the socket to (-1 => zmq's
default)
*/
static sptr make(size_t itemsize, size_t vlen, char *address,
- int timeout=100, bool pass_tags=false);
+ int timeout=100, bool pass_tags=false, int hwm=-1);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h
b/gr-zeromq/include/gnuradio/zeromq/req_source.h
index 9936406..461f653 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h
@@ -50,9 +50,10 @@ namespace gr {
* \param address ZMQ socket address specifier.
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments.
* \param pass_tags Whether source will look for and deserialize tags.
+ * \param hwm High Watermark to configure the socket to (-1 => zmq's
default)
*/
static sptr make(size_t itemsize, size_t vlen, char *address,
- int timeout=100, bool pass_tags=false);
+ int timeout=100, bool pass_tags=false, int hwm=-1);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
index 5fdd893..def3a70 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
@@ -50,9 +50,10 @@ namespace gr {
* \param address ZMQ socket address specifier.
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments.
* \param pass_tags Whether source will look for and deserialize tags.
+ * \param hwm High Watermark to configure the socket to (-1 => zmq's
default)
*/
static sptr make(size_t itemsize, size_t vlen, char *address,
- int timeout=100, bool pass_tags=false);
+ int timeout=100, bool pass_tags=false, int hwm=-1);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt
index 941e5ff..d7b03fa 100644
--- a/gr-zeromq/lib/CMakeLists.txt
+++ b/gr-zeromq/lib/CMakeLists.txt
@@ -37,6 +37,7 @@ endif(ENABLE_GR_CTRLPORT)
# Setup library
########################################################################
list(APPEND zeromq_sources
+ base_impl.cc
pub_sink_impl.cc
pub_msg_sink_impl.cc
sub_source_impl.cc
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
new file mode 100644
index 0000000..f41e5cb
--- /dev/null
+++ b/gr-zeromq/lib/base_impl.cc
@@ -0,0 +1,198 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2016 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "base_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout,
bool pass_tags)
+ : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+ {
+ /* "Fix" timeout value (ms for new API, us for old API) */
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+
+ if (major < 3) {
+ d_timeout *= 1000;
+ }
+
+ /* Create context & socket */
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, type);
+ }
+
+ base_impl::~base_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+
+ base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen,
char *address, int timeout, bool pass_tags, int hwm)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags)
+ {
+ /* Set high watermark */
+ if (hwm >= 0) {
+#ifdef ZMQ_SNDHWM
+ d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+#else // major < 3
+ uint64_t tmp = hwm;
+ d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+#endif
+ }
+
+ /* Bind */
+ d_socket->bind(address);
+ }
+
+ int
+ base_sink_impl::send_message(const void *in_buf, const int in_nitems,
const uint64_t in_offset)
+ {
+ /* Meta-data header */
+ std::string header("");
+ if(d_pass_tags){
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems);
+ header = gen_tag_header(in_offset, tags);
+ }
+
+ /* Create message */
+ size_t payload_len = in_nitems * d_vsize;
+ size_t msg_len = d_pass_tags ? payload_len + header.length() :
payload_len;
+ zmq::message_t msg(msg_len);
+
+ if(d_pass_tags){
+ memcpy(msg.data(), header.c_str(), header.length());
+ memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len);
+ } else {
+ memcpy(msg.data(), in_buf, payload_len);
+ }
+
+ /* Send */
+ d_socket->send(msg);
+
+ /* Report back */
+ return in_nitems;
+ }
+
+ base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen,
char *address, int timeout, bool pass_tags, int hwm)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags),
+ d_consumed_bytes(0), d_consumed_items(0)
+ {
+ /* Set high watermark */
+ if (hwm >= 0) {
+#ifdef ZMQ_RCVHWM
+ d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+#else // major < 3
+ uint64_t tmp = hwm;
+ d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+#endif
+ }
+
+ /* Connect */
+ d_socket->connect(address);
+ }
+
+ bool
+ base_source_impl::has_pending()
+ {
+ return d_msg.size() > d_consumed_bytes;
+ }
+
+ int
+ base_source_impl::flush_pending(void *out_buf, const int out_nitems, const
uint64_t out_offset)
+ {
+ /* How much to copy in this call */
+ int to_copy_items = std::min(out_nitems, (int)((d_msg.size() -
d_consumed_bytes) / d_vsize));
+ int to_copy_bytes = d_vsize * to_copy_items;
+
+ /* Copy actual data */
+ memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes,
to_copy_bytes);
+
+ /* Add tags matching this segment of samples */
+ for (unsigned int i=0; i<d_tags.size(); i++)
+ {
+ if ((d_tags[i].offset >= (uint64_t)d_consumed_items) &&
+ (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items))
+ {
+ gr::tag_t nt = d_tags[i];
+ nt.offset += out_offset - d_consumed_items;
+ add_item_tag(0, nt);
+ }
+ }
+
+ /* Update pointer */
+ d_consumed_items += to_copy_items;
+ d_consumed_bytes += to_copy_bytes;
+
+ return to_copy_items;
+ }
+
+ bool
+ base_source_impl::load_message(bool wait)
+ {
+ /* Poll for input */
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, wait ? d_timeout : 0);
+
+ if (!(items[0].revents & ZMQ_POLLIN))
+ return false;
+
+ /* Reset */
+ d_msg.rebuild();
+ d_tags.clear();
+ d_consumed_items = 0;
+ d_consumed_bytes = 0;
+
+ /* Get the message */
+ d_socket->recv(&d_msg);
+
+ /* Parse header */
+ if (d_pass_tags)
+ {
+ uint64_t rcv_offset;
+
+ /* Parse header */
+ d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags);
+
+ /* Fixup the tags offset to be relative to the start of this message */
+ for (unsigned int i=0; i<d_tags.size(); i++) {
+ d_tags[i].offset -= rcv_offset;
+ }
+ }
+
+ /* We got one ! */
+ return true;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h
new file mode 100644
index 0000000..ed16951
--- /dev/null
+++ b/gr-zeromq/lib/base_impl.h
@@ -0,0 +1,77 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2016 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_BASE_IMPL_H
+#define INCLUDED_ZEROMQ_BASE_IMPL_H
+
+#include <zmq.hpp>
+
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ class base_impl : public virtual gr::sync_block
+ {
+ public:
+ base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool
pass_tags);
+ virtual ~base_impl();
+
+ protected:
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+ size_t d_vsize;
+ int d_timeout;
+ bool d_pass_tags;
+ };
+
+ class base_sink_impl : public base_impl
+ {
+ public:
+ base_sink_impl(int type, size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags, int hwm);
+
+ protected:
+ int send_message(const void *in_buf, const int in_nitems, const uint64_t
in_offset);
+ };
+
+ class base_source_impl : public base_impl
+ {
+ public:
+ base_source_impl(int type, size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags, int hwm);
+
+ protected:
+ zmq::message_t d_msg;
+ std::vector<gr::tag_t> d_tags;
+ size_t d_consumed_bytes;
+ int d_consumed_items;
+
+ bool has_pending();
+ int flush_pending(void *out_buf, const int out_nitems, const uint64_t
out_offset);
+ bool load_message(bool wait);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_BASE_IMPL_H */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index c103069..b602bc8 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -32,35 +32,19 @@ namespace gr {
namespace zeromq {
pub_sink::sptr
- pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
+ pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags)
+ pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags, int hwm)
: gr::sync_block("pub_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
+ base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags,
hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
- if (major < 3) {
- d_timeout = timeout*1000;
- }
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind(address);
- }
-
- pub_sink_impl::~pub_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -68,33 +52,10 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- const char *in = (const char *)input_items[0];
-
- // encode the current offset, # tags, and tags into header
- std::string header("");
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
- }
-
- // create message copy and send
- int payloadlen = d_itemsize * d_vlen * noutput_items;
- int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
- zmq::message_t msg(msglen);
-
- if(d_pass_tags){
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*noutput_items);
- } else {
- memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- }
-
- d_socket->send(msg);
-
- return noutput_items;
+ return send_message(input_items[0], noutput_items, nitems_read(0));
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 100b0f5..8637c35 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -26,22 +26,15 @@
#include <gnuradio/zeromq/pub_sink.h>
#include <zmq.hpp>
+#include "base_impl.h"
+
namespace gr {
namespace zeromq {
- class pub_sink_impl : public pub_sink
+ class pub_sink_impl : public pub_sink, public base_sink_impl
{
- private:
- size_t d_itemsize;
- size_t d_vlen;
- float d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- bool d_pass_tags;
-
public:
- pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags);
- ~pub_sink_impl();
+ pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm);
int work(int noutput_items,
gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/pull_source_impl.cc
b/gr-zeromq/lib/pull_source_impl.cc
index 3215096..4045dd7 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -32,41 +32,19 @@ namespace gr {
namespace zeromq {
pull_source::sptr
- pull_source::make(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags)
+ pull_source::make(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags,
hwm));
}
- pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags)
+ pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("pull_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
+ base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout,
pass_tags, hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->connect (address);
- }
-
- /*
- * Our virtual destructor.
- */
- pull_source_impl::~pull_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -74,44 +52,37 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- char *out = (char*)output_items[0];
-
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
-
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
-
- // check header for tags...
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
-
- // Copy to ouput buffer and return
- if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
- return noutput_items;
+ uint8_t *out = (uint8_t *) output_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ if (has_pending())
+ {
+ /* Flush anything pending */
+ done += flush_pending(out + (done * d_vsize), noutput_items - done,
nitems_written(0) + done);
+
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
}
- else {
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
+ else
+ {
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
+
+ /* Not the first anymore */
+ first = false;
}
}
- else {
- return 0; // FIXME: someday when the scheduler does all the
poll/selects
- }
+
+ return done;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h
index 7578679..7d8ab53 100644
--- a/gr-zeromq/lib/pull_source_impl.h
+++ b/gr-zeromq/lib/pull_source_impl.h
@@ -26,22 +26,15 @@
#include <gnuradio/zeromq/pull_source.h>
#include <zmq.hpp>
+#include "base_impl.h"
+
namespace gr {
namespace zeromq {
- class pull_source_impl : public pull_source
+ class pull_source_impl : public pull_source, public base_source_impl
{
- private:
- size_t d_itemsize;
- size_t d_vlen;
- int d_timeout; // microseconds, -1 is blocking
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- bool d_pass_tags;
-
public:
- pull_source_impl(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags);
- ~pull_source_impl();
+ pull_source_impl(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags, int hwm);
int work(int noutput_items,
gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 7c06dc5..a5aec2c 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -32,38 +32,19 @@ namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
+ base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags,
hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind (address);
- }
-
- push_sink_impl::~push_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -71,43 +52,19 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- const char *in = (const char *) input_items[0];
-
+ // Poll with a timeout (FIXME: scheduler can't wait for us)
zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
zmq::poll(&itemsout[0], 1, d_timeout);
- // If we got a reply, process
- if (itemsout[0].revents & ZMQ_POLLOUT) {
-
- // encode the current offset, # tags, and tags into header
- std::string header("");
-
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
- header = gen_tag_header(offset, tags);
- }
+ // If we can send something, do it
+ if (itemsout[0].revents & ZMQ_POLLOUT)
+ return send_message(input_items[0], noutput_items, nitems_read(0));
- // create message copy and send
- int payloadlen = d_itemsize * d_vlen * noutput_items;
- int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
- zmq::message_t msg(msglen);
-
- if(d_pass_tags){
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*noutput_items);
- } else {
- memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- }
-
- d_socket->send(msg);
- return noutput_items;
- }
- else {
- return 0; // FIXME: when scheduler supports return blocking
- }
+ // If not, do nothing
+ return 0;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 924dee3..0a5de10 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -26,22 +26,15 @@
#include <gnuradio/zeromq/push_sink.h>
#include <zmq.hpp>
+#include "base_impl.h"
+
namespace gr {
namespace zeromq {
- class push_sink_impl : public push_sink
+ class push_sink_impl : public push_sink, public base_sink_impl
{
- private:
- size_t d_itemsize;
- size_t d_vlen;
- float d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- bool d_pass_tags;
-
public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags);
- ~push_sink_impl();
+ push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm);
int work(int noutput_items,
gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 034a5b0..ac6fc9c 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -32,38 +32,19 @@ namespace gr {
namespace zeromq {
rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
+ rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags)
+ rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags, int hwm)
: gr::sync_block("rep_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
+ base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags,
hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind (address);
- }
-
- rep_sink_impl::~rep_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -71,46 +52,44 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- const char *in = (const char *) input_items[0];
-
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
- // receive data request
+ const uint8_t *in = (const uint8_t *) input_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ /* Wait for a small time (FIXME: scheduler can't wait for us) */
+ /* We only wait if its the first iteration, for the others we'll
+ * let the scheduler retry */
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, first ? d_timeout : 0);
+
+ /* If we dont have anything, we're done */
+ if (!(items[0].revents & ZMQ_POLLIN))
+ break;
+
+ /* Get and parse the request */
zmq::message_t request;
d_socket->recv(&request);
- int req_output_items = *(static_cast<int*>(request.data()));
- int nitems_send = std::min(noutput_items, req_output_items);
- // encode the current offset, # tags, and tags into header
- std::string header("");
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
+ int nitems_send = noutput_items - done;
+ if (request.size() >= sizeof(uint32_t))
+ {
+ int req = (int)*(static_cast<uint32_t*>(request.data()));
+ nitems_send = std::min(nitems_send, req);
}
+ /* Delegate the actual send */
+ done += send_message(in + (done * d_vsize), nitems_send,
nitems_read(0) + done);
- // create message copy and send
- int payloadlen = d_itemsize * d_vlen * noutput_items;
- int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
- zmq::message_t msg(msglen);
-
- if(d_pass_tags){
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*noutput_items);
- } else {
- memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- }
- d_socket->send(msg);
-
- return nitems_send;
+ /* Not the first anymore */
+ first = false;
}
- return 0;
+ return done;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index 55ebb69..012fc45 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -26,22 +26,15 @@
#include <gnuradio/zeromq/rep_sink.h>
#include <zmq.hpp>
+#include "base_impl.h"
+
namespace gr {
namespace zeromq {
- class rep_sink_impl : public rep_sink
+ class rep_sink_impl : public rep_sink, public base_sink_impl
{
- private:
- size_t d_itemsize;
- size_t d_vlen;
- int d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- bool d_pass_tags;
-
public:
- rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags);
- ~rep_sink_impl();
+ rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm);
int work(int noutput_items,
gr_vector_const_void_star &input_items,
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index f69d447..5267363 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -32,38 +32,20 @@ namespace gr {
namespace zeromq {
req_source::sptr
- req_source::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
+ req_source::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new req_source_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new req_source_impl(itemsize, vlen, address, timeout, pass_tags,
hwm));
}
- req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags)
+ req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("req_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
+ base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags,
hwm),
+ d_req_pending(false)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->connect (address);
- }
-
- req_source_impl::~req_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -71,49 +53,56 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- char *out = (char*)output_items[0];
+#if 0
+#endif
+ uint8_t *out = (uint8_t *) output_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ if (has_pending())
+ {
+ /* Flush anything pending */
+ done += flush_pending(out + (done * d_vsize), noutput_items - done,
nitems_written(0) + done);
+
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
+ }
+ else
+ {
+ /* Send request if needed */
+ if (!d_req_pending)
+ {
+ /* The REP/REQ pattern state machine guarantees we can send at
this point */
+ uint32_t req_len = noutput_items - done;
+ zmq::message_t request(sizeof(uint32_t));
+ memcpy ((void *) request.data (), &req_len, sizeof(uint32_t));
+ d_socket->send(request);
+
+ d_req_pending = true;
+ }
- zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
- zmq::poll (&itemsout[0], 1, d_timeout);
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
- // If we got a reply, process
- if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable?
- zmq::message_t request(sizeof(int));
- memcpy ((void *) request.data (), &noutput_items, sizeof(int));
- d_socket->send(request);
- }
+ /* Got response */
+ d_req_pending = false;
- zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&itemsin[0], 1, d_timeout);
-
- // If we got a reply, process
- if (itemsin[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t reply;
- d_socket->recv(&reply);
-
- // Deserialize header data / tags
- std::string buf(static_cast<char*>(reply.data()), reply.size());
-
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
+ /* Not the first anymore */
+ first = false;
}
-
-
- // Copy to ouput buffer and return
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
}
+ return done;
+
return 0;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index 7c6bc53..8bdbd33 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -26,26 +26,22 @@
#include <gnuradio/zeromq/req_source.h>
#include <zmq.hpp>
+#include "base_impl.h"
+
namespace gr {
namespace zeromq {
- class req_source_impl : public req_source
+ class req_source_impl : public req_source, public base_source_impl
{
- private:
- size_t d_itemsize;
- size_t d_vlen;
- int d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- bool d_pass_tags;
-
public:
- req_source_impl(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags);
- ~req_source_impl();
+ req_source_impl(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags, int hwm);
int work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
+
+ private:
+ bool d_req_pending;
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 1242688..9a2e0bf 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -32,40 +32,20 @@ namespace gr {
namespace zeromq {
sub_source::sptr
- sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
+ sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags,
hwm));
}
- sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags)
+ sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
+ base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags,
hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-
+ /* Subscribe */
d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
- d_socket->connect (address);
- }
-
- /*
- * Our virtual destructor.
- */
- sub_source_impl::~sub_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
}
int
@@ -73,47 +53,37 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- char *out = (char*)output_items[0];
-
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
-
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
-
- // Deserialize header data / tags
- std::string buf(static_cast<char*>(msg.data()), msg.size());
-
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
-
- buf = parse_tag_header(buf, rcv_offset, tags);
-
- for(size_t i=0; i<tags.size(); i++) {
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
+ uint8_t *out = (uint8_t *) output_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ if (has_pending())
+ {
+ /* Flush anything pending */
+ done += flush_pending(out + (done * d_vsize), noutput_items - done,
nitems_written(0) + done);
+
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
}
-
- // Copy to ouput buffer and return
- if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
- return noutput_items;
+ else
+ {
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
+
+ /* Not the first anymore */
+ first = false;
}
- else {
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
- }
- }
- else {
- return 0; // FIXME: someday when the scheduler does all the
poll/selects
}
+
+ return done;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 0fa8d17..8f82a9a 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -24,28 +24,21 @@
#define INCLUDED_ZEROMQ_SUB_SOURCE_IMPL_H
#include <gnuradio/zeromq/sub_source.h>
-#include "zmq.hpp"
+#include <zmq.hpp>
+
+#include "base_impl.h"
namespace gr {
namespace zeromq {
- class sub_source_impl : public sub_source
+ class sub_source_impl : public sub_source, public base_source_impl
{
- private:
- size_t d_itemsize;
- size_t d_vlen;
- int d_timeout; // microseconds, -1 is blocking
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- bool d_pass_tags;
-
- public:
- sub_source_impl(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags);
- ~sub_source_impl();
+ public:
+ sub_source_impl(size_t itemsize, size_t vlen, char *address, int
timeout, bool pass_tags, int hwm);
int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc
index c970666..5a5a417 100644
--- a/gr-zeromq/lib/tag_headers.cc
+++ b/gr-zeromq/lib/tag_headers.cc
@@ -24,78 +24,88 @@
#include <gnuradio/block.h>
#include <sstream>
#include <cstring>
+#include <zmq.hpp>
-#define GR_HEADER_MAGIC 0x5FF0
+#define GR_HEADER_MAGIC 0x5FF0
#define GR_HEADER_VERSION 0x01
namespace gr {
namespace zeromq {
+ struct membuf: std::streambuf
+ {
+ membuf(void *b, size_t len)
+ {
+ char *bc = static_cast<char*>(b);
+ this->setg(bc, bc, bc+len);
+ }
+ };
+
std::string
- gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags) {
+ gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags)
+ {
+ std::stringbuf sb("");
+ std::ostream ss(&sb);
uint16_t header_magic = GR_HEADER_MAGIC;
uint8_t header_version = GR_HEADER_VERSION;
+ uint64_t ntags = (uint64_t)tags.size();
- std::stringstream ss;
- size_t ntags = tags.size();
- ss.write( reinterpret_cast< const char* >( &header_magic ),
sizeof(uint16_t) );
- ss.write( reinterpret_cast< const char* >( &header_version ),
sizeof(uint8_t) );
-
- ss.write( reinterpret_cast< const char* >( &offset ), sizeof(uint64_t)
); // offset
- ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) );
// num tags
- std::stringbuf sb("");
- //std::cout << "TX TAGS: (offset="<<offset<<" ntags="<<ntags<<")\n";
- for(size_t i=0; i<tags.size(); i++){
- //std::cout << "TX TAG: (" << tags[i].offset << ", " << tags[i].key <<
", " << tags[i].value << ", " << tags[i].srcid << ")\n";
- ss.write( reinterpret_cast< const char* >( &tags[i].offset ),
sizeof(uint64_t) ); // offset
- sb.str("");
- pmt::serialize( tags[i].key, sb );
// key
- pmt::serialize( tags[i].value, sb );
// value
- pmt::serialize( tags[i].srcid, sb );
// srcid
- ss.write( sb.str().c_str() , sb.str().length() );
+ ss.write( (const char*)&header_magic, sizeof(uint16_t) );
+ ss.write( (const char*)&header_version, sizeof(uint8_t) );
+ ss.write( (const char*)&offset, sizeof(uint64_t) );
+ ss.write( (const char*)&ntags, sizeof(uint64_t) );
+
+ for(size_t i=0; i<tags.size(); i++)
+ {
+ ss.write( (const char *)&tags[i].offset, sizeof(uint64_t) );
+ pmt::serialize( tags[i].key, sb );
+ pmt::serialize( tags[i].value, sb );
+ pmt::serialize( tags[i].srcid, sb );
}
- return ss.str();
+ return sb.str();
}
- std::string
- parse_tag_header(std::string &buf_in, uint64_t &offset_out,
std::vector<gr::tag_t> &tags_out) {
+ size_t
+ parse_tag_header(zmq::message_t &msg, uint64_t &offset_out,
std::vector<gr::tag_t> &tags_out)
+ {
+ membuf sb(msg.data(), msg.size());
+ std::istream iss(&sb);
- std::istringstream iss( buf_in );
- size_t rcv_ntags;
+ size_t min_len = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) +
sizeof(uint64_t);
+ if (msg.size() < min_len)
+ throw std::runtime_error("incoming zmq msg too small to hold gr tag
header!");
uint16_t header_magic;
- uint8_t header_version;
+ uint8_t header_version;
+ uint64_t rcv_ntags;
+
+ iss.read( (char*)&header_magic, sizeof(uint16_t) );
+ iss.read( (char*)&header_version, sizeof(uint8_t) );
- iss.read( (char*)&header_magic, sizeof(uint16_t ) );
- iss.read( (char*)&header_version, sizeof(uint8_t ) );
- if(header_magic != GR_HEADER_MAGIC){
+ if(header_magic != GR_HEADER_MAGIC)
throw std::runtime_error("gr header magic does not match!");
- }
- if(header_version != 1){
+
+ if(header_version != 1)
throw std::runtime_error("gr header version too high!");
- }
- iss.read( (char*)&offset_out, sizeof(uint64_t ) );
- iss.read( (char*)&rcv_ntags, sizeof(size_t ) );
- //std::cout << "RX TAGS: (offset="<<offset_out<<"
ntags="<<rcv_ntags<<")\n";
- int rd_offset = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) +
sizeof(size_t);
- std::stringbuf sb( iss.str().substr(rd_offset) );
+ iss.read( (char*)&offset_out, sizeof(uint64_t) );
+ iss.read( (char*)&rcv_ntags, sizeof(uint64_t) );
- for(size_t i=0; i<rcv_ntags; i++){
- gr::tag_t newtag;
- sb.sgetn( (char*) &(newtag.offset), sizeof(uint64_t) );
+ for(size_t i=0; i<rcv_ntags; i++)
+ {
+ gr::tag_t newtag;
+ sb.sgetn( (char*)&(newtag.offset), sizeof(uint64_t) );
newtag.key = pmt::deserialize( sb );
newtag.value = pmt::deserialize( sb );
newtag.srcid = pmt::deserialize( sb );
- //std::cout << "RX TAG: (" << newtag.offset << ", " << newtag.key <<
", " << newtag.value << ", " << newtag.srcid << ")\n";
tags_out.push_back(newtag);
- iss.str(sb.str());
}
- int ndata = sb.in_avail();
- return iss.str().substr(iss.str().size() - ndata);
+ return msg.size() - sb.in_avail();
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h
index 4c7a812..dede5e9 100644
--- a/gr-zeromq/lib/tag_headers.h
+++ b/gr-zeromq/lib/tag_headers.h
@@ -27,12 +27,13 @@
#include <gnuradio/block.h>
#include <sstream>
#include <cstring>
+#include <zmq.hpp>
namespace gr {
namespace zeromq {
- std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags);
- std::string parse_tag_header(std::string &buf_in, uint64_t &offset_out,
std::vector<gr::tag_t> &tags_out);
+ std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags);
+ size_t parse_tag_header(zmq::message_t &msg, uint64_t &offset_out,
std::vector<gr::tag_t> &tags_out);
} /* namespace zeromq */
} /* namespace gr */