[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working
From: git
Subject: [Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working
Date: Tue, 13 Jan 2015 01:04:28 +0000 (UTC)
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 18944a9a761eb7c2256e4ad450b943f09664c410
Author: Tim O'Shea <address@hidden>
Date: Tue Dec 30 18:42:48 2014 +0100
zmq: rep/req msg blocks now working
---
gr-zeromq/lib/rep_msg_sink_impl.cc | 66 +++++++++++++++---------
gr-zeromq/lib/rep_msg_sink_impl.h | 6 ++-
gr-zeromq/lib/req_msg_source_impl.cc | 99 ++++++++++++++++++++----------------
gr-zeromq/lib/req_msg_source_impl.h | 5 ++
4 files changed, 108 insertions(+), 68 deletions(-)
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 72dd5fb..0a18a8b 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -56,8 +56,8 @@ namespace gr {
d_socket->bind (address);
message_port_register_in(pmt::mp("in"));
- set_msg_handler( pmt::mp("in"),
- boost::bind(&rep_msg_sink_impl::handler, this, _1));
+// set_msg_handler( pmt::mp("in"),
+// boost::bind(&rep_msg_sink_impl::handler, this, _1));
}
rep_msg_sink_impl::~rep_msg_sink_impl()
@@ -67,6 +67,19 @@ namespace gr {
delete d_context;
}
+ bool rep_msg_sink_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) );
+ return true;
+ }
+
+ bool rep_msg_sink_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+/*
void rep_msg_sink_impl::handler(pmt::pmt_t msg){
std::stringbuf sb("");
pmt::serialize( msg, sb );
@@ -75,47 +88,54 @@ namespace gr {
memcpy( zmsg.data(), s.c_str(), s.size() );
d_socket->send(zmsg);
}
+*/
int
rep_msg_sink_impl::work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- return noutput_items;
-/*
- const char *in = (const char *) input_items[0];
+ return noutput_items;
+ }
+ void rep_msg_sink_impl::readloop(){
+
+ while(!d_finished){
+
+ // while we have data, wait for query...
+ while(!empty_p(pmt::mp("in"))){
+
+ //std::cout << "wait for req ...\n";
+ // wait for query...
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
zmq::poll (&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
+ //std::cout << "wait for req ... got req\n";
// receive data request
zmq::message_t request;
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
- int nitems_send = std::min(noutput_items, req_output_items);
+ if(req_output_items != 1)
+ throw std::runtime_error("Request was not 1 msg for rep/req request!!");
- // encode the current offset, # tags, and tags into header
- std::string header("");
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
- }
-
// create message copy and send
- zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
- if(d_pass_tags)
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
- d_socket->send(msg);
+ //std::cout << "get pmt in\n";
+ pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ //std::cout << "send pmt zmq\n";
+ d_socket->send(zmsg);
+ } // if req
- return nitems_send;
- }
+ } // while !empty
+
+ } // while !d_finished
- return 0;*/
}
} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
index 40d7969..25bd0e8 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.h
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -35,6 +35,9 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ boost::thread *d_thread;
+ bool d_finished;
+ void readloop();
public:
rep_msg_sink_impl(char *address, int timeout);
@@ -43,7 +46,8 @@ namespace gr {
int work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
- void handler(pmt::pmt_t msg);
+ bool start();
+ bool stop();
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 92e26e9..2dda152 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -65,56 +65,67 @@ namespace gr {
delete d_context;
}
+ bool req_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) );
+ return true;
+ }
+
+ bool req_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+ void req_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+ zmq::poll (&itemsout[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (itemsout[0].revents & ZMQ_POLLOUT) {
+ // Request data, FIXME non portable?
+ int nmsg = 1;
+ zmq::message_t request(sizeof(int));
+ memcpy ((void *) request.data (), &nmsg, sizeof(int));
+ d_socket->send(request);
+ //std::cout << "sent request...\n";
+ }
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+ //std::cout << "rx response...\n";
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+ //std::cout << "rx response... got data\n";
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
int
req_msg_source_impl::work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
return noutput_items;
-/*
- char *out = (char*)output_items[0];
-
- zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
- zmq::poll (&itemsout[0], 1, d_timeout);
-
- // If we got a reply, process
- if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable?
- zmq::message_t request(sizeof(int));
- memcpy ((void *) request.data (), &noutput_items, sizeof(int));
- d_socket->send(request);
- }
-
- zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&itemsin[0], 1, d_timeout);
-
- // If we got a reply, process
- if (itemsin[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t reply;
- d_socket->recv(&reply);
-
- // Deserialize header data / tags
- std::string buf(static_cast<char*>(reply.data()), reply.size());
-
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
-
-
- // Copy to ouput buffer and return
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
- }
-
- return 0;
- */
}
} /* namespace zeromq */
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
index 635fa45..3a69174 100644
--- a/gr-zeromq/lib/req_msg_source_impl.h
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -35,11 +35,16 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ void readloop();
+ boost::thread *d_thread;
public:
req_msg_source_impl(char *address, int timeout);
~req_msg_source_impl();
+ bool start();
+ bool stop();
+ bool d_finished;
int work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
- [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers, (continued)
- [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 12/29: zmq: stream tag passing now works, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 19/29: Merge remote-tracking branch 'osh/zmqtags' into zmq_tags, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 29/29: Merge branch 'maint', git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 26/29: zeromq: added msg passing example, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 03/29: zmq: default to not pass tags (compatible wire format), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 02/29: zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 25/29: zeromq: cleanup and made req_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 09/29: zmq: include pass_tags option default to false in all blocks' grc definitions, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 18/29: zmq: rep/req msg blocks now working, git <=
- [Commit-gnuradio] [gnuradio] 05/29: zmq: hoisting header encoding to helper function, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 04/29: zmq: adding pass tags option to zmq pub/sub block grc xml defs, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 15/29: zmq: building working versions of additional zmq message blocks in place (push/pull + rep/req), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 28/29: zeromq: added stream tag passing example, git, 2015/01/12