[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be worki
From: |
git |
Subject: |
[Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working |
Date: |
Tue, 13 Jan 2015 01:04:28 +0000 (UTC) |
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 29bd7ae09383372afddcbab22fcd99b2333e4c1e
Author: Tim O'Shea <address@hidden>
Date: Tue Dec 30 18:07:52 2014 +0100
zmq: pull_msg_source should be working
---
gr-zeromq/lib/pull_msg_source_impl.cc | 84 +++++++++++++++++------------------
gr-zeromq/lib/pull_msg_source_impl.h | 5 +++
2 files changed, 46 insertions(+), 43 deletions(-)
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc
b/gr-zeromq/lib/pull_msg_source_impl.cc
index 5b207ce..0c848fc 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -65,55 +65,53 @@ namespace gr {
delete d_context;
}
+ bool pull_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind(
&pull_msg_source_impl::readloop , this ) );
+ return true;
+ }
+
+ bool pull_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+ void pull_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
+
int
pull_msg_source_impl::work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
return noutput_items;
- /*
- char *out = (char*)output_items[0];
-
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
-
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
-
- // check header for tags...
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
-
-
- // Copy to ouput buffer and return
- if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-
- return noutput_items;
- }
- else {
- memcpy(out, (void *)&buf[0], buf.size());
-
- return buf.size()/(d_itemsize*d_vlen);
- }
- }
- else {
- return 0; // FIXME: someday when the scheduler does all the
poll/selects
- }
-
- */
}
} /* namespace zeromq */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h
b/gr-zeromq/lib/pull_msg_source_impl.h
index fb9237c..9ff89ef 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.h
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -35,11 +35,16 @@ namespace gr {
int d_timeout; // microseconds, -1 is blocking
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ void readloop();
+ boost::thread *d_thread;
public:
pull_msg_source_impl(char *address, int timeout);
~pull_msg_source_impl();
+ bool start();
+ bool stop();
+ bool d_finished;
int work(int noutput_items,
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items);
- [Commit-gnuradio] [gnuradio] branch master updated (5cb307e -> 1581681), git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 13/29: zmq: adding header information, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 11/29: zeromq: fix segfault in tag_headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 16/29: zmq: naming cleanup, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 17/29: zmq: pull_msg_source should be working,
git <=
- [Commit-gnuradio] [gnuradio] 20/29: zeromq: cleanup and convert pub_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 23/29: zeromq: cleanup and made pull_msg_source derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 07/29: zmq: all source blocks should now support tag headers, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 06/29: zmq: hoisting tag parsing into helper function, should now be easy to use in other zmq blocks, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 14/29: zmq: Adding zmq pub/sub blocks for message passing, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 21/29: zeromq: cleanup and convert sub_msg_source to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 22/29: zeromq: cleanup and converted push_msg_sink to derive from gr::block, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 01/29: zmq: encoding tags into zmq stream, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 10/29: zeromq: fixups and stylistic changes before merge, git, 2015/01/12
- [Commit-gnuradio] [gnuradio] 24/29: zeromq: cleanup and converted rep_msg_sink to derive from gr::block, git, 2015/01/12