[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block |
Date: |
Tue, 13 Jan 2015 01:04:29 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 3a443cfe307c4f2c5f03806146d4ad1664faa288
Author: Johnathan Corgan <address@hidden>
Date: Mon Jan 12 14:24:48 2015 -0800
zeromq: cleanup and made pull_msg_source derive from gr::block
---
.../include/gnuradio/zeromq/pull_msg_source.h | 10 ++--
gr-zeromq/lib/pull_msg_source_impl.cc | 59 ++++++++++------------
gr-zeromq/lib/pull_msg_source_impl.h | 9 ++--
3 files changed, 35 insertions(+), 43 deletions(-)
diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
index 7d87ba1..1749515 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
@@ -24,20 +24,20 @@
#define INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H
#include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
namespace gr {
namespace zeromq {
/*!
- * \brief Receive messages on ZMQ PULL socket and source stream
+ * \brief Receive messages on ZMQ PULL socket and output async messages
* \ingroup zeromq
*
* \details
- * This block will connect to a ZMQ PUSH socket, then produce all
- * incoming messages as streaming output.
+ * This block will connect to a ZMQ PUSH socket, then convert
+ * received messages to outgoing async messages.
*/
- class ZEROMQ_API pull_msg_source : virtual public gr::sync_block
+ class ZEROMQ_API pull_msg_source : virtual public gr::block
{
public:
typedef boost::shared_ptr<pull_msg_source> sptr;
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
index 0c848fc..ca496ef 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -39,18 +39,21 @@ namespace gr {
}
pull_msg_source_impl::pull_msg_source_impl(char *address, int timeout)
- : gr::sync_block("pull_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+ : gr::block("pull_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->connect (address);
@@ -65,53 +68,43 @@ namespace gr {
delete d_context;
}
- bool pull_msg_source_impl::start(){
+ bool pull_msg_source_impl::start()
+ {
d_finished = false;
- d_thread = new boost::thread( boost::bind( &pull_msg_source_impl::readloop , this ) );
+ d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this));
return true;
}
- bool pull_msg_source_impl::stop(){
+ bool pull_msg_source_impl::stop()
+ {
d_finished = true;
- d_thread->join();
+ d_thread->join();
return true;
}
- void pull_msg_source_impl::readloop(){
+ void pull_msg_source_impl::readloop()
+ {
while(!d_finished){
- //std::cout << "readloop\n";
-
+
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
zmq::poll (&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
-
- //std::cout << "got msg...\n";
-
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
- //std::cout << m << "\n";
- message_port_pub(pmt::mp("out"), m);
-
- } else {
- usleep(100);
- }
- }
- }
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ message_port_pub(pmt::mp("out"), m);
- int
- pull_msg_source_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- return noutput_items;
+ } else {
+ usleep(100);
+ }
+ }
}
} /* namespace zeromq */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h
index 9ff89ef..6d8791d 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.h
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -35,19 +35,18 @@ namespace gr {
int d_timeout; // microseconds, -1 is blocking
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ boost::thread *d_thread;
+
void readloop();
- boost::thread *d_thread;
public:
+ bool d_finished;
+
pull_msg_source_impl(char *address, int timeout);
~pull_msg_source_impl();
bool start();
bool stop();
- bool d_finished;
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
};
} // namespace zeromq
- [Commit-gnuradio] [gnuradio] branch master updated (5cb307e -> 1581681), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 13/29: zmq: adding header information, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 11/29: zeromq: fix segfault in tag_headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 16/29: zmq: naming cleanup, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 20/29: zeromq: cleanup and convert pub_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block, git <=
- [Commit-gnuradio] [gnuradio] 07/29: zmq: all source blocks should now support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 06/29: zmq: hoisting tag parsing into helper function, should now be easy to use in other zmq blocks, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 14/29: zmq: Adding zmq pub/sub blocks for message passing, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 21/29: zeromq: cleanup and convert sub_msg_source to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 22/29: zeromq: cleanup and converted push_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 01/29: zmq: encoding tags into zmq stream, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 10/29: zeromq: fixups and stylistic changes before merge, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 08/29: zmq: sync blocks now all support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 12/29: zmq: stream tag passing now works, git, 2015/01/12