[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup |
Date: |
Tue, 13 Jan 2015 01:04:29 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 0b65c1aa92356c9b3d848718b232176b96fda30e
Author: Johnathan Corgan <address@hidden>
Date: Mon Jan 12 16:51:39 2015 -0800
zeromq: minor cleanup
---
gr-zeromq/lib/pull_source_impl.cc | 22 +++++++++++-----------
gr-zeromq/lib/push_sink_impl.cc | 36 ++++++++++++++++++++----------------
gr-zeromq/lib/rep_sink_impl.cc | 19 +++++++++++--------
gr-zeromq/lib/req_source_impl.cc | 21 ++++++++++++---------
gr-zeromq/lib/sub_source_impl.cc | 25 +++++++++++++------------
5 files changed, 67 insertions(+), 56 deletions(-)
diff --git a/gr-zeromq/lib/pull_source_impl.cc
b/gr-zeromq/lib/pull_source_impl.cc
index 87b330a..3215096 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->connect (address);
@@ -86,25 +89,22 @@ namespace gr {
// check header for tags...
std::string buf(static_cast<char*>(msg.data()), msg.size());
if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
-
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ for(size_t i=0; i<tags.size(); i++){
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
// Copy to ouput buffer and return
if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-
return noutput_items;
}
else {
memcpy(out, (void *)&buf[0], buf.size());
-
return buf.size()/(d_itemsize*d_vlen);
}
}
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 4cc9ab9..677de10 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->bind (address);
@@ -71,32 +74,33 @@ namespace gr {
const char *in = (const char *) input_items[0];
zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
- zmq::poll (&itemsout[0], 1, d_timeout);
+ zmq::poll(&itemsout[0], 1, d_timeout);
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // encode the current offset, # tags, and tags into header
- std::string header("");
-
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
+ header = gen_tag_header(offset, tags);
}
- // create message copy and send
- zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
- if(d_pass_tags)
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+ // create message copy and send
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length());
+ memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*noutput_items);
+ d_socket->send(msg);
return noutput_items;
}
else {
- return 0;
+ return 0; // FIXME: when scheduler supports return blocking
}
}
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 88ed6c1..85f9a78 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->bind (address);
@@ -71,7 +74,7 @@ namespace gr {
const char *in = (const char *) input_items[0];
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
@@ -80,20 +83,20 @@ namespace gr {
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
int nitems_send = std::min(noutput_items, req_output_items);
-
+
// encode the current offset, # tags, and tags into header
std::string header("");
if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
- }
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
// create message copy and send
zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
if(d_pass_tags)
- memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
memcpy((uint8_t *)msg.data() + header.length(), in,
d_itemsize*d_vlen*nitems_send);
d_socket->send(msg);
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 5c5071e..f69d447 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->connect (address);
@@ -82,7 +85,7 @@ namespace gr {
}
zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&itemsin[0], 1, d_timeout);
+ zmq::poll(&itemsin[0], 1, d_timeout);
// If we got a reply, process
if (itemsin[0].revents & ZMQ_POLLIN) {
@@ -94,14 +97,14 @@ namespace gr {
std::string buf(static_cast<char*>(reply.data()), reply.size());
if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ for(size_t i=0; i<tags.size(); i++){
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
// Copy to ouput buffer and return
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 813ff5a..1242688 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -46,12 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
- //int time = 0;
+
d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
d_socket->connect (address);
}
@@ -87,17 +89,16 @@ namespace gr {
std::string buf(static_cast<char*>(msg.data()), msg.size());
if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- //int olen = buf.size();
- buf = parse_tag_header(buf, rcv_offset, tags);
- //std::cout << "SUB: Header Len = " << olen - buf.size() << ",
data len = " << buf.size() << "\n";
- for(size_t i=0; i<tags.size(); i++){
- //std::cout << "add item tag ... (offset = " << tags[i].offset
<< " rcv_offset = " << rcv_offset << " nitems_read(0) = " << nitems_written(0)
<< "\n";
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+
+ buf = parse_tag_header(buf, rcv_offset, tags);
+
+ for(size_t i=0; i<tags.size(); i++) {
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
// Copy to ouput buffer and return
if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
- [Commit-gnuradio] [gnuradio] 01/29: zmq: encoding tags into zmq stream, (continued)
- [Commit-gnuradio] [gnuradio] 01/29: zmq: encoding tags into zmq stream, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 10/29: zeromq: fixups and stylistic changes before merge, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 12/29: zmq: stream tag passing now works, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 19/29: Merge remote-tracking branch 'osh/zmqtags' into zmq_tags, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 29/29: Merge branch 'maint', git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 26/29: zeromq: added msg passing example, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 03/29: zmq: default to not pass tags (compatible wire format), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 02/29: zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup,
git <=
- [Commit-gnuradio] [gnuradio] 25/29: zeromq: cleanup and made req_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 09/29: zmq: include pass_tags option default to false in all blocks' grc definitions, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 05/29: zmq: hoisting header encoding to helper function, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 04/29: zmq: adding pass tags option to zmq pub/sub block grc xml defs, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 15/29: zmq: building working versions of additional zmq message blocks in place (push/pull + rep/req), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 28/29: zeromq: added stream tag passing example, git, 2015/01/12