[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block |
Date: |
Tue, 13 Jan 2015 01:04:29 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 7bf8b05bde26bfb1e2df684e2ab55b878b25350c
Author: Johnathan Corgan <address@hidden>
Date: Mon Jan 12 15:50:00 2015 -0800
zeromq: cleanup and converted rep_msg_sink to derive from gr::block
---
gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h | 14 +--
gr-zeromq/lib/rep_msg_sink_impl.cc | 111 ++++++++++-------------
gr-zeromq/lib/rep_msg_sink_impl.h | 4 +-
3 files changed, 54 insertions(+), 75 deletions(-)
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
index b0fd23c..97f3d83 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
@@ -24,22 +24,22 @@
#define INCLUDED_ZEROMQ_REP_MSG_SINK_H
#include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
namespace gr {
namespace zeromq {
/*!
- * \brief Sink the contents of a stream to a ZMQ REP socket
+ * \brief Sink the contents of a msg port to a ZMQ REP socket
* \ingroup zeromq
*
* \details
- * This block acts a a streaming sink for a GNU Radio flowgraph
- * and writes its contents to a ZMQ REP socket. A REP socket will
- * only send its contents to an attached REQ socket when it
- * requests items.
+ * This block acts as a message port receiver and writes individual
+ * messages to a ZMQ REP socket. The corresponding receiving ZMQ
+ * REQ socket can be either another gr-zeromq source block or a
+ * non-GNU Radio ZMQ socket.
*/
- class ZEROMQ_API rep_msg_sink : virtual public gr::sync_block
+ class ZEROMQ_API rep_msg_sink : virtual public gr::block
{
public:
typedef boost::shared_ptr<rep_msg_sink> sptr;
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 0a18a8b..be86f83 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -39,25 +39,26 @@ namespace gr {
}
rep_msg_sink_impl::rep_msg_sink_impl(char *address, int timeout)
- : gr::sync_block("rep_msg_sink",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
+ : gr::block("rep_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
{
int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+ zmq::version(&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->bind (address);
message_port_register_in(pmt::mp("in"));
-// set_msg_handler( pmt::mp("in"),
-// boost::bind(&rep_msg_sink_impl::handler, this, _1));
}
rep_msg_sink_impl::~rep_msg_sink_impl()
@@ -67,75 +68,55 @@ namespace gr {
delete d_context;
}
- bool rep_msg_sink_impl::start(){
+ bool rep_msg_sink_impl::start()
+ {
d_finished = false;
- d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) );
+ d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this));
return true;
}
- bool rep_msg_sink_impl::stop(){
+ bool rep_msg_sink_impl::stop()
+ {
d_finished = true;
d_thread->join();
return true;
}
-/*
- void rep_msg_sink_impl::handler(pmt::pmt_t msg){
- std::stringbuf sb("");
- pmt::serialize( msg, sb );
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
- memcpy( zmsg.data(), s.c_str(), s.size() );
- d_socket->send(zmsg);
- }
-*/
-
- int
- rep_msg_sink_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
+ void rep_msg_sink_impl::readloop()
{
- return noutput_items;
+ while(!d_finished) {
+
+ // while we have data, wait for query...
+ while(!empty_p(pmt::mp("in"))) {
+
+ // wait for query...
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+
+ // receive data request
+ zmq::message_t request;
+ d_socket->recv(&request);
+
+ int req_output_items = *(static_cast<int*>(request.data()));
+ if(req_output_items != 1)
+ throw std::runtime_error("Request was not 1 msg for rep/req request!!");
+
+ // create message copy and send
+ pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ d_socket->send(zmsg);
+ } // if req
+ } // while !empty
+
+ } // while !d_finished
}
- void rep_msg_sink_impl::readloop(){
-
- while(!d_finished){
-
- // while we have data, wait for query...
- while(!empty_p(pmt::mp("in"))){
-
- //std::cout << "wait for req ...\n";
- // wait for query...
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
- //std::cout << "wait for req ... got req\n";
- // receive data request
- zmq::message_t request;
- d_socket->recv(&request);
- int req_output_items = *(static_cast<int*>(request.data()));
- if(req_output_items != 1)
- throw std::runtime_error("Request was not 1 msg for rep/req request!!");
-
- // create message copy and send
- //std::cout << "get pmt in\n";
- pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
- std::stringbuf sb("");
- pmt::serialize( msg, sb );
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
- memcpy( zmsg.data(), s.c_str(), s.size() );
- //std::cout << "send pmt zmq\n";
- d_socket->send(zmsg);
- } // if req
-
- } // while !empty
-
- } // while !d_finished
-
- }
} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
index 25bd0e8..d37a409 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.h
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -37,15 +37,13 @@ namespace gr {
zmq::socket_t *d_socket;
boost::thread *d_thread;
bool d_finished;
+
void readloop();
public:
rep_msg_sink_impl(char *address, int timeout);
~rep_msg_sink_impl();
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
bool start();
bool stop();
};
- [Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working, (continued)
- [Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 20/29: zeromq: cleanup and convert pub_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 07/29: zmq: all source blocks should now support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 06/29: zmq: hoisting tag parsing into helper function, should now be easy to use in other zmq blocks, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 14/29: zmq: Adding zmq pub/sub blocks for message passing, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 21/29: zeromq: cleanup and convert sub_msg_source to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 22/29: zeromq: cleanup and converted push_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 01/29: zmq: encoding tags into zmq stream, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 10/29: zeromq: fixups and stylistic changes before merge, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block,
git <=
- [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 12/29: zmq: stream tag passing now works, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 19/29: Merge remote-tracking branch 'osh/zmqtags' into zmq_tags, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 29/29: Merge branch 'maint', git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 26/29: zeromq: added msg passing example, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 03/29: zmq: default to not pass tags (compatible wire format), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 02/29: zmq: tags should now be serializing and deserializing correctly for pub_sink/sub_source, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 27/29: zeromq: minor cleanup, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 25/29: zeromq: cleanup and made req_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 09/29: zmq: include pass_tags option default to false in all blocks' grc definitions, git, 2015/01/12