[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers
From: git
Subject: [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers
Date: Tue, 13 Jan 2015 01:04:27 +0000 (UTC)
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit cb4e8856fdb7b1d0f30d01852d57566efe692cd2
Author: Tim O'Shea <address@hidden>
Date: Mon Oct 27 21:31:33 2014 -0400
zmq: sync blocks now all support tag headers
---
gr-zeromq/include/gnuradio/zeromq/push_sink.h | 2 +-
gr-zeromq/include/gnuradio/zeromq/rep_sink.h | 2 +-
gr-zeromq/lib/pub_sink_impl.cc | 2 --
gr-zeromq/lib/push_sink_impl.cc | 30 +++++++++++++++------
gr-zeromq/lib/push_sink_impl.h | 3 ++-
gr-zeromq/lib/rep_sink_impl.cc | 38 +++++++++++++++------------
gr-zeromq/lib/rep_sink_impl.h | 3 ++-
7 files changed, 49 insertions(+), 31 deletions(-)
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index b54a1e4..1b8999e 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int
timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int
timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 1da3252..6d3c47b 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -53,7 +53,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us
increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int
timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int
timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 43819f3..4573990 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -27,8 +27,6 @@
#include <gnuradio/io_signature.h>
#include "pub_sink_impl.h"
#include "tag_headers.h"
-#include <sstream>
-#include <cstring>
namespace gr {
namespace zeromq {
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index d949a7f..4cc9ab9 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "push_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout));
+ (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char
*address, int timeout)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char
*address, int timeout, bool pass_tags)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -74,10 +75,23 @@ namespace gr {
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // create message copy and send
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
+
+ // create message copy and send
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*noutput_items);
+ d_socket->send(msg);
return noutput_items;
}
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 9a10065..2590a7f 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
float d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags=false);
~push_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index a8fd588..88ed6c1 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "rep_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout));
+ (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address,
int timeout)
+ rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address,
int timeout, bool pass_tags)
: gr::sync_block("rep_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout),
d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -78,22 +79,25 @@ namespace gr {
zmq::message_t request;
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
+ int nitems_send = std::min(noutput_items, req_output_items);
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
// create message copy and send
- if (noutput_items < req_output_items) {
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*nitems_send);
+ d_socket->send(msg);
- return noutput_items;
- }
- else {
- zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
- d_socket->send(msg);
-
- return req_output_items;
- }
+ return nitems_send;
}
return 0;
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index ff69735..68bb9eb 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout,
bool pass_tags=false);
~rep_sink_impl();
int work(int noutput_items,
- [Commit-gnuradio] [gnuradio] 20/29: zeromq: cleanup and convert pub_msg_sink to derive from gr::block, (continued)
- [Commit-gnuradio] [gnuradio] 20/29: zeromq: cleanup and convert pub_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 07/29: zmq: all source blocks should now support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 06/29: zmq: hoisting tag parsing into helper function, should now be easy to use in other zmq blocks, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 14/29: zmq: Adding zmq pub/sub blocks for message passing, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 21/29: zeromq: cleanup and convert sub_msg_source to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 22/29: zeromq: cleanup and converted push_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 01/29: zmq: encoding tags into zmq stream, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 10/29: zeromq: fixups and stylistic changes before merge, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers,
git <=
- [Commit-gnuradio] [gnuradio] 12/29: zmq: stream tag passing now works, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 19/29: Merge remote-tracking branch 'osh/zmqtags' into zmq_tags, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 29/29: Merge branch 'maint', git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 26/29: zeromq: added msg passing example, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 03/29: zmq: default to not pass tags (compatible wire format), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 02/29: zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 25/29: zeromq: cleanup and made req_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 09/29: zmq: include pass_tags option default to false in all blocks' grc definitions, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working, git, 2015/01/12