From: git
Subject: [Commit-gnuradio] [gnuradio] 01/03: digital: Major update to header/payload demuxer (HPD)
Date: Mon, 2 May 2016 04:54:38 +0000 (UTC)
This is an automated email from the git hooks/post-receive script.
jcorgan pushed a commit to branch master
in repository gnuradio.
commit 5283e459f6d83aea2d5133fecb0a11a873f0c3ad
Author: Martin Braun <address@hidden>
Date: Mon Apr 4 22:19:07 2016 -0700
digital: Major update to header/payload demuxer (HPD)
- Added padding feature (trigger/tag can now be off by some items)
- Payload offset can also be specified
- Fixed some index counting bugs
- More and better unit tests, cleaned up the unit test file
- Cleanups:
  - Consistent whitespace
  - Consistent use of size_t and other types
  - Used more enums where it increases readability
---
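Note on the padding arithmetic: the `hpd_uncertainty` documentation added by this commit describes which items go to the header and payload ports when a trigger may be off by a few items. The following is a minimal sketch of that index arithmetic, assuming one item per symbol and no guard interval. `item_window`, `header_window()` and `payload_start()` are hypothetical names used only for illustration; they are not part of GNU Radio.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper type (not GNU Radio API): a run of items on the input.
struct item_window {
    std::int64_t start; // absolute index of the first item copied
    std::int64_t count; // number of items copied
};

// Items copied to the header port for a trigger on `trigger_index`.
// Assumption: items_per_symbol == 1 and guard_interval == 0, so all
// lengths are counted in items. The padding is prepended and appended.
inline item_window header_window(std::int64_t trigger_index,
                                 std::int64_t header_len,
                                 std::int64_t header_padding)
{
    assert(header_len >= 1 && header_padding >= 0);
    return { trigger_index - header_padding, header_len + 2 * header_padding };
}

// First item copied to the payload port. The payload itself is not padded;
// a payload_offset reported by the header demodulator shifts the start and
// may not exceed the padding in magnitude.
inline std::int64_t payload_start(std::int64_t trigger_index,
                                  std::int64_t header_len,
                                  std::int64_t header_padding,
                                  std::int64_t payload_offset)
{
    assert(payload_offset >= -header_padding && payload_offset <= header_padding);
    return trigger_index + header_len + payload_offset;
}
```

With the numbers from the documentation example (trigger on item 100, header length 20, padding 2), `header_window(100, 20, 2)` starts at item 98 and covers 24 items, and `payload_start(100, 20, 2, 0)` is item 120.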
gr-digital/grc/digital_header_payload_demux.xml | 7 +
.../gnuradio/digital/header_payload_demux.h | 91 ++-
gr-digital/lib/header_payload_demux_impl.cc | 670 ++++++++++++---------
gr-digital/lib/header_payload_demux_impl.h | 75 ++-
.../python/digital/qa_header_payload_demux.py | 451 ++++++++++----
5 files changed, 865 insertions(+), 429 deletions(-)
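The constructor in this commit splits `header_padding` into whole symbols plus leftover items, and rejects leftover items when symbols are output or a guard interval is set. A minimal standalone sketch of that rule, assuming the same integer division and remainder as the `d_header_padding_symbols` and `d_header_padding_items` initializers; `padding_split` and `split_padding()` are hypothetical names and not GNU Radio API:

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Hypothetical helper type (not GNU Radio API).
struct padding_split {
    std::size_t symbols; // whole symbols contained in the padding
    std::size_t items;   // leftover items that do not fill a symbol
};

// Split a padding given in items into symbols and leftover items, and
// enforce the constraint described in the commit: with output_symbols
// or a guard interval, the padding must be a multiple of items_per_symbol.
inline padding_split split_padding(std::size_t header_padding,
                                   int items_per_symbol,
                                   int guard_interval,
                                   bool output_symbols)
{
    if (items_per_symbol < 1 || guard_interval < 0) {
        throw std::invalid_argument("Items and symbol sizes must be at least 1.");
    }
    const std::size_t ips = static_cast<std::size_t>(items_per_symbol);
    const padding_split s = { header_padding / ips, header_padding % ips };
    // Sanity check: the two parts add up to the original padding.
    assert(s.symbols * ips + s.items == header_padding);
    if ((output_symbols || guard_interval > 0) && s.items != 0) {
        throw std::invalid_argument(
            "Padding must be a multiple of items_per_symbol.");
    }
    return s;
}
```

For example, a padding of 130 items with 64 items per symbol, no guard interval and item output splits into 2 symbols plus 2 items, whereas a padding of 2 items with a guard interval of 16 is rejected.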
diff --git a/gr-digital/grc/digital_header_payload_demux.xml b/gr-digital/grc/digital_header_payload_demux.xml
index 24c6c5b..a2fe80e 100644
--- a/gr-digital/grc/digital_header_payload_demux.xml
+++ b/gr-digital/grc/digital_header_payload_demux.xml
@@ -13,6 +13,7 @@
$timing_tag_key,
$samp_rate,
$special_tags,
+ $header_padding,
)</make>
<param>
<name>Header Length (Symbols)</name>
@@ -20,6 +21,12 @@
<type>int</type>
</param>
<param>
+ <name>Header Padding (Uncertainty / Symbols)</name>
+ <key>header_padding</key>
+ <value>0</value>
+ <type>int</type>
+ </param>
+ <param>
<name>Items per symbol</name>
<key>items_per_symbol</key>
<type>int</type>
diff --git a/gr-digital/include/gnuradio/digital/header_payload_demux.h b/gr-digital/include/gnuradio/digital/header_payload_demux.h
index 303bebb..bcd6bd1 100644
--- a/gr-digital/include/gnuradio/digital/header_payload_demux.h
+++ b/gr-digital/include/gnuradio/digital/header_payload_demux.h
@@ -1,5 +1,5 @@
/* -*- c++ -*- */
-/* Copyright 2012 Free Software Foundation, Inc.
+/* Copyright 2012-2016 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -29,7 +29,7 @@ namespace gr {
namespace digital {
/*!
- * \brief Header/Payload demuxer.
+ * \brief Header/Payload demuxer (HPD).
* \ingroup packet_operators_blk
*
* \details
@@ -58,6 +58,9 @@ namespace gr {
* and taken as the payload length. The payload, together with the header data
* as tags, is then copied to output 1.
*
+ * If the header demodulation fails, the header must send a PMT with value
+ * pmt::PMT_F. The state gets reset and the header is ignored.
+ *
* \section hpd_item_sizes Symbols, Items and Item Sizes
*
* To generically and transparently handle different kinds of modulations,
@@ -68,20 +71,66 @@ namespace gr {
* grouping items. In OFDM, we usually don't care about individual samples, but
* we do care about full OFDM symbols, so we set \p items_per_symbol to the
* IFFT / FFT length of the OFDM modulator / demodulator.
- * For most single-carrier modulations, this value can be set to 1 (the default
- * value).
+ * For single-carrier modulations, this value can be set to the number of
+ * samples per symbol, to handle data in number of symbols, or to 1 to
+ * handle data in number of samples.
* If specified, \p guard_interval items are discarded before every symbol.
* This is useful for demuxing bursts of OFDM signals.
*
* On the output, we can deal with symbols directly by setting \p output_symbols
* to true. In that case, the output item size is the <em>symbol size</em>.
*
- * \b Example: OFDM with 48 sub-carriers, using a length-64 IFFT on the modulator,
- * and a cyclic-prefix length of 16 samples. In this case, the itemsize is
- * `sizeof(gr_complex)`, because we're receiving complex samples. One OFDM symbol
- * has 64 samples, hence \p items_per_symbol is set to 64, and \p guard_interval to
- * 16. The header length is specified in number of OFDM symbols. Because we want to
- * deal with full OFDM symbols, we set \p output_symbols to true.
+ * \b Example: OFDM with 48 sub-carriers, using a length-64 IFFT on the
+ * modulator, and a cyclic-prefix length of 16 samples. In this case,
+ * \p itemsize is `sizeof(gr_complex)`, because we're receiving complex
+ * samples. One OFDM symbol has 64 samples, hence \p items_per_symbol is
+ * set to 64, and \p guard_interval to 16. The header length is specified
+ * in number of OFDM symbols. Because we want to deal with full OFDM
+ * symbols, we set \p output_symbols to true.
+ *
+ * \b Example: PSK-modulated signals, with 4 samples per symbol. Again,
+ * \p itemsize is `sizeof(gr_complex)` because we're still dealing with
+ * complex samples. \p items_per_symbol is 4, because one item is one
+ * sample. \p guard_interval must be set to 0. The header length is
+ * given in number of PSK symbols.
+ *
+ * \section hpd_uncertainty Handling timing uncertainty on the trigger
+ *
+ * By default, the assumption is made that the trigger arrives on *exactly*
+ * the sample that the header starts. These triggers typically come from
+ * timing synchronization algorithms which may be suboptimal, and have a
+ * known timing uncertainty (e.g., we know the trigger might be a sample
+ * too early or too late).
+ *
+ * The demuxer has an option for this case, the \p header_padding. If this
+ * value is non-zero, it specifies the number of items that are prepended
+ * and appended to the header before copying it to the header output.
+ *
+ * Example: Say our synchronization algorithm can be off by up to two
+ * samples, and the header length is 20 samples. So we set \p header_len
+ * to 20, and \p header_padding to 2.
+ * Now assume a trigger arrives on sample index 100. We copy a total of
+ * 24 samples to the header port, starting at sample index 98.
+ *
+ * The payload is *not* padded. Let's say the header demod reports a
+ * payload length of 100. In the previous examples, we would copy 100
+ * samples to the payload port, starting at sample index 120 (this means
+ * the padded samples appended to the header are copied to both ports!).
+ * However, the header demodulator has the option to specify a payload
+ * offset, which cannot exceed the padding value. To do this, include
+ * a key `payload_offset` in the message sent back to the HPD. A negative
+ * value means the payload starts earlier than otherwise.
+ * (If you wanted to always pad the payload, you could set `payload_offset`
+ * to `-header_padding` and increase the reported length of the payload).
+ *
+ * Because the padding is specified in number of items, and not symbols,
+ * this value must be a multiple of the number of items per symbol *if*
+ * either \p output_symbols is true, or a guard interval is specified (or
+ * both). Note that in practice, it is rare that both a guard interval is
+ * specified *and* a padding value is required. The difference between the
+ * padding value and a guard interval is that a guard interval is part of
+ * the signal, and comes with *every* symbol, whereas the header padding
+ * is added to only the header, and is not by design.
*
* \section hpd_tag_handling Tag Handling
*
@@ -95,12 +144,14 @@ namespace gr {
* it belongs to this packet or the following. In this case, it is possible that the
* tag might be propagated twice.
*
- * Tags outside of packets are generally discarded. If this information is important,
- * there are two additional mechanisms to preserve the tags:
+ * Tags outside of packets are generally discarded. If there are tags that
+ * carry important information that must not be lost, there are two
+ * additional mechanisms to preserve the tags:
* - Timing tags might be relevant to know \b when a packet was received. By
* specifying the name of a timestamp tag and the sample rate at this block, it
* keeps track of the time and will add the time to the first item of every packet.
- * The name of the timestamp tag is usually 'rx_time' (see gr::uhd::usrp_source::make()).
+ * The name of the timestamp tag is usually 'rx_time' (see, e.g.,
+ * gr::uhd::usrp_source::make()).
* The time value must be specified in the UHD time format.
* - Other tags are simply stored and updated. As an example, the user might want to know the
* rx frequency, which UHD stores in the rx_freq tag. In this case, add the tag name 'rx_freq'
@@ -124,18 +175,20 @@ namespace gr {
* \param timing_tag_key The name of the tag with timing information, usually 'rx_time' or empty (this means timing info is discarded)
* \param samp_rate Sampling rate at the input. Necessary to calculate the rx time of packets.
* \param special_tags A vector of strings denoting tags which shall be preserved (see \ref hpd_tag_handling)
+ * \param header_padding A number of items that is appended and prepended to the header.
*/
static sptr make(
- int header_len,
- int items_per_symbol=1,
- int guard_interval=0,
+ const int header_len,
+ const int items_per_symbol=1,
+ const int guard_interval=0,
const std::string &length_tag_key="frame_len",
const std::string &trigger_tag_key="",
- bool output_symbols=false,
- size_t itemsize=sizeof(gr_complex),
+ const bool output_symbols=false,
+ const size_t itemsize=sizeof(gr_complex),
const std::string &timing_tag_key="",
const double samp_rate=1.0,
- const std::vector<std::string> &special_tags=std::vector<std::string>()
+ const std::vector<std::string> &special_tags=std::vector<std::string>(),
+ const size_t header_padding=0
);
};
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index 89428fa..f887ea1 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -1,5 +1,5 @@
/* -*- c++ -*- */
-/* Copyright 2012-2014 Free Software Foundation, Inc.
+/* Copyright 2012-2016 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -23,10 +23,10 @@
#include "config.h"
#endif
-#include <climits>
-#include <boost/format.hpp>
-#include <gnuradio/io_signature.h>
#include "header_payload_demux_impl.h"
+#include <gnuradio/io_signature.h>
+#include <boost/format.hpp>
+#include <climits>
namespace gr {
namespace digital {
@@ -55,55 +55,63 @@ namespace gr {
enum out_port_indexes_t {
PORT_HEADER = 0,
- PORT_PAYLOAD = 1
+ PORT_PAYLOAD = 1,
+ PORT_INPUTDATA = 0,
+ PORT_TRIGGER = 1
};
#define msg_port_id pmt::mp("header_data")
header_payload_demux::sptr
header_payload_demux::make(
- int header_len,
- int items_per_symbol,
- int guard_interval,
- const std::string &length_tag_key,
- const std::string &trigger_tag_key,
- bool output_symbols,
- size_t itemsize,
- const std::string &timing_tag_key,
- const double samp_rate,
- const std::vector<std::string> &special_tags
+ int header_len,
+ int items_per_symbol,
+ int guard_interval,
+ const std::string &length_tag_key,
+ const std::string &trigger_tag_key,
+ bool output_symbols,
+ size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags,
+ const size_t header_padding
){
return gnuradio::get_initial_sptr (
- new header_payload_demux_impl(
- header_len,
- items_per_symbol,
- guard_interval,
- length_tag_key,
- trigger_tag_key,
- output_symbols,
- itemsize,
- timing_tag_key,
- samp_rate,
- special_tags
- )
+ new header_payload_demux_impl(
+ header_len,
+ items_per_symbol,
+ guard_interval,
+ length_tag_key,
+ trigger_tag_key,
+ output_symbols,
+ itemsize,
+ timing_tag_key,
+ samp_rate,
+ special_tags,
+ header_padding
+ )
);
}
header_payload_demux_impl::header_payload_demux_impl(
- int header_len,
- int items_per_symbol,
- int guard_interval,
- const std::string &length_tag_key,
- const std::string &trigger_tag_key,
- bool output_symbols,
- size_t itemsize,
- const std::string &timing_tag_key,
- const double samp_rate,
- const std::vector<std::string> &special_tags
+ int header_len,
+ int items_per_symbol,
+ int guard_interval,
+ const std::string &length_tag_key,
+ const std::string &trigger_tag_key,
+ bool output_symbols,
+ size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags,
+ const size_t header_padding
) : block("header_payload_demux",
- io_signature::make2(1, 2, itemsize, sizeof(char)),
- io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
+ io_signature::make2(1, 2, itemsize, sizeof(char)),
+ io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
d_header_len(header_len),
+ d_header_padding_symbols(header_padding / items_per_symbol),
+ d_header_padding_items(header_padding % items_per_symbol),
+ d_header_padding_total_items(header_padding),
d_items_per_symbol(items_per_symbol),
d_gi(guard_interval),
d_len_tag_key(pmt::string_to_symbol(length_tag_key)),
@@ -113,32 +121,42 @@ namespace gr {
d_uses_trigger_tag(!trigger_tag_key.empty()),
d_state(STATE_FIND_TRIGGER),
d_curr_payload_len(0),
+ d_curr_payload_offset(0),
d_payload_tag_keys(0),
d_payload_tag_values(0),
d_track_time(!timing_tag_key.empty()),
d_timing_key(pmt::intern(timing_tag_key)),
+ d_payload_offset_key(pmt::intern("payload_offset")),
d_last_time_offset(0),
d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))),
d_sampling_time(1.0/samp_rate)
{
if (d_header_len < 1) {
- throw std::invalid_argument("Header length must be at least 1 symbol.");
+ throw std::invalid_argument("Header length must be at least 1 symbol.");
+ }
+ if (header_padding < 0) {
+ throw std::invalid_argument("Header padding must be non-negative.");
}
if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) {
- throw std::invalid_argument("Items and symbol sizes must be at least 1.");
+ throw std::invalid_argument("Items and symbol sizes must be at least 1.");
}
if (d_output_symbols) {
- set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
+ set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
} else {
- set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
- set_output_multiple(d_items_per_symbol);
+ set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
+ set_output_multiple(d_items_per_symbol);
+ }
+ if ((d_output_symbols || d_gi) && d_header_padding_items) {
+ throw std::invalid_argument(
+ "If output_symbols is true or a guard interval is given, padding must be a multiple of items_per_symbol!"
+ );
}
set_tag_propagation_policy(TPP_DONT);
message_port_register_in(msg_port_id);
set_msg_handler(msg_port_id,
boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
- for (unsigned i = 0; i < special_tags.size(); i++) {
- d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
- d_special_tags_last_value.push_back(pmt::PMT_NIL);
+ for (size_t i = 0; i < special_tags.size(); i++) {
+ d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
+ d_special_tags_last_value.push_back(pmt::PMT_NIL);
}
}
@@ -146,144 +164,219 @@ namespace gr {
{
}
+ // forecast() depends on state:
+ // - When waiting for a header, we require at least the header length
+ // - when waiting for a payload, we require at least the payload length
+ // - Otherwise, pretend this is a sync block with a decimation/interpolation
+ // depending on symbol size and if we output symbols or items
void
- header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required)
- {
+ header_payload_demux_impl::forecast(
+ int noutput_items,
+ gr_vector_int &ninput_items_required
+ ) {
int n_items_reqd = 0;
if (d_state == STATE_HEADER) {
- n_items_reqd = d_header_len * (d_items_per_symbol + d_gi);
+ n_items_reqd = d_header_len * (d_items_per_symbol + d_gi)
+ + 2*d_header_padding_total_items;
} else if (d_state == STATE_PAYLOAD) {
- n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
+ n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
} else {
- n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
- if (!d_output_symbols) {
- // here, noutput_items is an integer multiple of d_items_per_symbol!
- n_items_reqd /= d_items_per_symbol;
- }
+ n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
+ if (!d_output_symbols) {
+ // Here, noutput_items is an integer multiple of d_items_per_symbol!
+ n_items_reqd /= d_items_per_symbol;
+ }
}
for (unsigned i = 0; i < ninput_items_required.size(); i++) {
- ninput_items_required[i] = n_items_reqd;
+ ninput_items_required[i] = n_items_reqd;
}
}
- inline bool
- header_payload_demux_impl::check_items_available(
- int n_symbols,
- gr_vector_int &ninput_items,
- int noutput_items,
- int nread
- )
- {
- // Check there's enough items on the input
- if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread)
- || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol +
d_gi)) > (ninput_items[1]-nread)))) {
- return false;
- }
-
+ bool header_payload_demux_impl::check_buffers_ready(
+ int output_symbols_reqd,
+ int extra_output_items_reqd,
+ int noutput_items,
+ int input_items_reqd,
+ gr_vector_int &ninput_items,
+ int n_items_read
+ ) {
// Check there's enough space on the output buffer
if (d_output_symbols) {
- if (noutput_items < n_symbols) {
- return false;
- }
+ if (noutput_items < output_symbols_reqd + extra_output_items_reqd) {
+ return false;
+ }
} else {
- if (noutput_items < n_symbols * d_items_per_symbol) {
- return false;
- }
+ if (noutput_items < (output_symbols_reqd * d_items_per_symbol) +
extra_output_items_reqd) {
+ return false;
+ }
+ }
+
+ // Check there's enough items on the input
+ if (input_items_reqd > (ninput_items[0]-n_items_read)
+ || (ninput_items.size() == 2 && (input_items_reqd >
(ninput_items[1]-n_items_read)))) {
+ return false;
}
+ // All good
return true;
}
int
- header_payload_demux_impl::general_work (int noutput_items,
- gr_vector_int &ninput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- const unsigned char *in = (const unsigned char *) input_items[0];
+ header_payload_demux_impl::general_work(
+ int noutput_items,
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items
+ ) {
+ const unsigned char *in = (const unsigned char *)
input_items[PORT_INPUTDATA];
unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER];
unsigned char *out_payload = (unsigned char *)
output_items[PORT_PAYLOAD];
- int nread = 0;
- int trigger_offset = 0;
-
+ const int n_input_items = (ninput_items.size() == 2) ?
+ std::min(ninput_items[0], ninput_items[1]) :
+ ninput_items[0];
+ // Items read going into general_work()
+ const uint64_t n_items_read_base = nitems_read(PORT_INPUTDATA);
+ // Items read during this call to general_work()
+ int n_items_read = 0;
+
+ #define CONSUME_ITEMS(items_to_consume) \
+ update_special_tags( \
+ n_items_read_base + n_items_read, \
+ n_items_read_base + n_items_read + (items_to_consume) \
+ ); \
+ consume_each(items_to_consume); \
+ n_items_read += (items_to_consume); \
+ in += (items_to_consume) * d_itemsize;
switch (d_state) {
- case STATE_WAIT_FOR_MSG:
- // In an ideal world, this would never be called
- return 0;
-
- case STATE_HEADER_RX_FAIL:
- update_special_tags(0, 1);
- consume_each (1);
- in += d_itemsize;
- nread++;
- d_state = STATE_FIND_TRIGGER;
- // The following break was added to this state as well as STATE_FIND_TRIGGER
- // and STATE_HEADER. There appears to be a bug somewhere in this code without
- // the breaks that can lead to failure of this block. With the breaks in the code
- // testing has shown more stable performance with various block parameters.
- // If an offset calculation bug is found and fixed, it should be possible to
- // remove these breaks for some performance increase.
- break;
-
- case STATE_FIND_TRIGGER:
- trigger_offset = find_trigger_signal(nread, noutput_items,
input_items);
- if (trigger_offset == -1) {
- update_special_tags(0, noutput_items - nread);
- consume_each(noutput_items - nread);
- break;
- }
- update_special_tags(0, trigger_offset);
- consume_each (trigger_offset);
- in += trigger_offset * d_itemsize;
- d_state = STATE_HEADER;
- break;
-
- case STATE_HEADER:
- if (check_items_available(d_header_len, ninput_items, noutput_items,
nread)) {
- copy_n_symbols(in, out_header, PORT_HEADER, d_header_len);
- d_state = STATE_WAIT_FOR_MSG;
- add_special_tags();
- produce(
- PORT_HEADER,
- d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)
- );
- }
- break;
-
- case STATE_HEADER_RX_SUCCESS:
- for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
- add_item_tag(
- PORT_PAYLOAD,
- nitems_written(PORT_PAYLOAD),
- d_payload_tag_keys[i],
- d_payload_tag_values[i]
- );
- }
- nread += d_header_len * (d_items_per_symbol + d_gi);
- update_special_tags(0, nread);
- consume_each (nread);
- in += nread * d_itemsize;
- d_state = STATE_PAYLOAD;
- break;
-
- case STATE_PAYLOAD:
- if (check_items_available(d_curr_payload_len, ninput_items,
noutput_items, nread)) {
- // The -1 because we won't consume the last item, it might hold the
next trigger.
- update_special_tags(0, (d_curr_payload_len - 1) *
(d_items_per_symbol + d_gi));
- copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len);
- produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 :
d_items_per_symbol));
- consume_each ((d_curr_payload_len - 1) * (d_items_per_symbol +
d_gi)); // Same here
- set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol +
d_gi));
- d_state = STATE_FIND_TRIGGER;
- }
- break;
-
- default:
- throw std::runtime_error("invalid state");
+ case STATE_WAIT_FOR_MSG:
+ // In an ideal world, this would never be called
+ // parse_header_data_msg() is the only place that can kick us out
+ // of this state.
+ return 0;
+
+ case STATE_HEADER_RX_FAIL:
+ // Actions:
+ // - Consume a single item to make sure we're not deleting any other
+ // info
+ CONSUME_ITEMS(1);
+ d_state = STATE_FIND_TRIGGER;
+ break;
+
+ case STATE_FIND_TRIGGER: {
+ // Assumptions going into this state:
+ // - No other state was active for this call to general_work()
+ // - i.e. n_items_read == 0
+ // Start looking for a trigger after any header padding.
+ // The trigger offset is relative to 'in'.
+ // => The absolute trigger offset is n_items_read_base + n_items_read + trigger_offset
+ const int max_rel_offset = n_input_items - n_items_read;
+ const int trigger_offset = find_trigger_signal(
+ d_header_padding_total_items,
+ max_rel_offset,
+ n_items_read_base + n_items_read,
+ (input_items.size() == 2) ?
+ ((const unsigned char *) input_items[PORT_TRIGGER]) + n_items_read : NULL
+ );
+ if (trigger_offset < max_rel_offset) {
+ d_state = STATE_HEADER;
+ }
+ // If we're using padding, don't consume everything, or we might
+ // end up with not enough items before the trigger
+ const int items_to_consume = trigger_offset - d_header_padding_total_items;
+ CONSUME_ITEMS(items_to_consume);
+ break;
+ } /* case STATE_FIND_TRIGGER */
+
+ case STATE_HEADER:
+ // Assumptions going into this state:
+ // - The first items on `in' are the header samples (including padding)
+ // - So we can just copy from the beginning of `in'
+ // - The trigger is on item index `d_header_padding * d_items_per_symbol'
+ // Actions:
+ // - Copy the entire header (including padding) to the header port
+ // - Special tags are added to the header port
+ if (check_buffers_ready(
+ d_header_len + 2*d_header_padding_symbols,
+ d_header_padding_items,
+ noutput_items,
+ d_header_len * (d_items_per_symbol + d_gi) + 2*d_header_padding_total_items,
+ ninput_items,
+ n_items_read)) {
+ add_special_tags();
+ copy_n_symbols(
+ in,
+ out_header,
+ PORT_HEADER,
+ n_items_read_base + n_items_read,
+ d_header_len+2*d_header_padding_symbols, // Number of symbols to copy
+ 2*d_header_padding_items
+ );
+ d_state = STATE_WAIT_FOR_MSG;
+ }
+ break;
+
+ case STATE_HEADER_RX_SUCCESS:
+ // Copy tags from header to payload
+ for (size_t i = 0; i < d_payload_tag_keys.size(); i++) {
+ add_item_tag(
+ PORT_PAYLOAD,
+ nitems_written(PORT_PAYLOAD),
+ d_payload_tag_keys[i],
+ d_payload_tag_values[i]
+ );
+ }
+ // Consume header from input
+ {
+ // Consume the padding only once, we leave the second
+ // part in there because it might be part of the payload
+ const int items_to_consume =
+ d_header_len * (d_items_per_symbol + d_gi)
+ + d_header_padding_total_items
+ + d_curr_payload_offset;
+ CONSUME_ITEMS(items_to_consume);
+ d_curr_payload_offset = 0;
+ d_state = STATE_PAYLOAD;
+ }
+ break;
+
+ case STATE_PAYLOAD:
+ // Assumptions:
+ // - Input buffer is in the right spot to just start copying
+ if (check_buffers_ready(
+ d_curr_payload_len,
+ 0,
+ noutput_items,
+ d_curr_payload_len * (d_items_per_symbol + d_gi),
+ ninput_items,
+ n_items_read)) {
+ // Write payload
+ copy_n_symbols(
+ in,
+ out_payload,
+ PORT_PAYLOAD,
+ n_items_read_base + n_items_read,
+ d_curr_payload_len
+ );
+ // Consume payload
+ // We can't consume the full payload, because we need to hold off
+ // at least the padding value. We'll use a minimum padding of 1
+ // item here.
+ const int items_padding = std::max(d_header_padding_total_items, 1);
+ const int items_to_consume =
+ d_curr_payload_len * (d_items_per_symbol + d_gi)
+ - items_padding;
+ CONSUME_ITEMS(items_to_consume);
+ set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
+ d_state = STATE_FIND_TRIGGER;
+ }
+ break;
+
+ default:
+ throw std::runtime_error("invalid state");
} /* switch */
return WORK_CALLED_PRODUCE;
@@ -292,35 +385,41 @@ namespace gr {
int
header_payload_demux_impl::find_trigger_signal(
- int nread,
- int noutput_items,
- gr_vector_const_void_star &input_items)
- {
- if (input_items.size() == 2) {
- unsigned char *in_trigger = (unsigned char *) input_items[1];
- in_trigger += nread;
- for (int i = 0; i < noutput_items-nread; i++) {
- if (in_trigger[i]) {
- return i;
- }
- }
+ int skip_items,
+ int max_rel_offset,
+ uint64_t base_offset,
+ const unsigned char *in_trigger
+ ) {
+ int rel_offset = max_rel_offset;
+ if (max_rel_offset < skip_items) {
+ return rel_offset;
+ }
+ if (in_trigger) {
+ for (int i = skip_items; i < max_rel_offset; i++) {
+ if (in_trigger[i]) {
+ rel_offset = i;
+ break;
+ }
+ }
}
if (d_uses_trigger_tag) {
std::vector<tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0),
nitems_read(0)+noutput_items, d_trigger_tag_key);
- uint64_t min_offset = ULLONG_MAX;
- int tag_index = -1;
- for (unsigned i = 0; i < tags.size(); i++) {
- if (tags[i].offset < min_offset) {
- tag_index = (int) i;
- min_offset = tags[i].offset;
+ get_tags_in_range(
+ tags,
+ PORT_INPUTDATA,
+ base_offset + skip_items,
+ base_offset + max_rel_offset,
+ d_trigger_tag_key
+ );
+ if (!tags.empty()) {
+ std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+ const int tag_rel_offset = tags[0].offset - base_offset;
+ if (tag_rel_offset < rel_offset) {
+ rel_offset = tag_rel_offset;
}
}
- if (tag_index != -1) {
- return min_offset - nitems_read(0);
- }
}
- return -1;
+ return rel_offset;
} /* find_trigger_signal() */
@@ -332,77 +431,100 @@ namespace gr {
d_state = STATE_HEADER_RX_FAIL;
if (pmt::is_integer(header_data)) {
- d_curr_payload_len = pmt::to_long(header_data);
- d_payload_tag_keys.push_back(d_len_tag_key);
- d_payload_tag_values.push_back(header_data);
- d_state = STATE_HEADER_RX_SUCCESS;
+ d_curr_payload_len = pmt::to_long(header_data);
+ d_payload_tag_keys.push_back(d_len_tag_key);
+ d_payload_tag_values.push_back(header_data);
+ d_state = STATE_HEADER_RX_SUCCESS;
} else if (pmt::is_dict(header_data)) {
- pmt::pmt_t dict_items(pmt::dict_items(header_data));
- while (!pmt::is_null(dict_items)) {
- pmt::pmt_t this_item(pmt::car(dict_items));
- d_payload_tag_keys.push_back(pmt::car(this_item));
- d_payload_tag_values.push_back(pmt::cdr(this_item));
- if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
- d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
- d_state = STATE_HEADER_RX_SUCCESS;
- }
- dict_items = pmt::cdr(dict_items);
- }
- if (d_state == STATE_HEADER_RX_FAIL) {
- GR_LOG_CRIT(d_logger, "no length tag passed from header data");
- }
+ pmt::pmt_t dict_items(pmt::dict_items(header_data));
+ while (!pmt::is_null(dict_items)) {
+ pmt::pmt_t this_item(pmt::car(dict_items));
+ d_payload_tag_keys.push_back(pmt::car(this_item));
+ d_payload_tag_values.push_back(pmt::cdr(this_item));
+ if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
+ d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
+ d_state = STATE_HEADER_RX_SUCCESS;
+ }
+ if (pmt::equal(pmt::car(this_item), d_payload_offset_key)) {
+ d_curr_payload_offset = pmt::to_long(pmt::cdr(this_item));
+ if (std::abs(d_curr_payload_offset) > d_header_padding_total_items) {
+ GR_LOG_CRIT(d_logger, "Payload offset exceeds padding");
+ d_state = STATE_HEADER_RX_FAIL;
+ return;
+ }
+ }
+ dict_items = pmt::cdr(dict_items);
+ }
+ if (d_state == STATE_HEADER_RX_FAIL) {
+ GR_LOG_CRIT(d_logger, "no payload length passed from header data");
+ }
} else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) {
- GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
+ GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
} else {
- GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
+ GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
}
if (d_state == STATE_HEADER_RX_SUCCESS)
{
- if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol))
> max_output_buffer(1)/2) {
- d_state = STATE_HEADER_RX_FAIL;
- GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than
max frame size (%1% symbols)") % d_curr_payload_len);
- } else {
- set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 :
d_items_per_symbol));
- }
+ if (d_curr_payload_len < 0) {
+ GR_LOG_WARN(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+ d_curr_payload_len = 0;
+ d_state = STATE_HEADER_RX_FAIL;
+ }
+ if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol))
> max_output_buffer(1)/2) {
+ d_state = STATE_HEADER_RX_FAIL;
+ GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+ } else {
+ set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 :
d_items_per_symbol));
+ }
}
} /* parse_header_data_msg() */
void
header_payload_demux_impl::copy_n_symbols(
- const unsigned char *in,
- unsigned char *out,
- int port,
- int n_symbols
- )
- {
+ const unsigned char *in,
+ unsigned char *out,
+ int port,
+ const uint64_t n_items_read_base,
+ int n_symbols,
+ int n_padding_items
+ ) {
// Copy samples
if (d_gi) {
- for (int i = 0; i < n_symbols; i++) {
- memcpy((void *) out, (void *) (in + d_gi * d_itemsize),
d_items_per_symbol * d_itemsize);
- in += d_itemsize * (d_items_per_symbol + d_gi);
- out += d_itemsize * d_items_per_symbol;
- }
+ // Here we know n_padding_items must be 0 (see contract),
+ // because all padding items will be part of n_symbols
+ for (int i = 0; i < n_symbols; i++) {
+ memcpy(
+ (void *) out,
+ (void *) (in + d_gi * d_itemsize),
+ d_items_per_symbol * d_itemsize
+ );
+ in += d_itemsize * (d_items_per_symbol + d_gi);
+ out += d_itemsize * d_items_per_symbol;
+ }
} else {
- memcpy(
- (void *) out,
- (void *) in,
- n_symbols * d_items_per_symbol * d_itemsize
- );
+ memcpy(
+ (void *) out,
+ (void *) in,
+ (n_symbols * d_items_per_symbol + n_padding_items) * d_itemsize
+ );
}
// Copy tags
std::vector<tag_t> tags;
get_tags_in_range(
- tags, 0,
- nitems_read(0),
- nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi)
+ tags,
+ PORT_INPUTDATA,
+ n_items_read_base,
+ n_items_read_base
+ + n_symbols * (d_items_per_symbol + d_gi)
+ + n_padding_items
);
for (size_t t = 0; t < tags.size(); t++) {
// The trigger tag is *not* propagated
if (tags[t].key == d_trigger_tag_key) {
continue;
}
- int new_offset = tags[t].offset - nitems_read(0);
+ int new_offset = tags[t].offset - n_items_read_base;
if (d_output_symbols) {
new_offset /= (d_items_per_symbol + d_gi);
} else if (d_gi) {
@@ -418,43 +540,49 @@ namespace gr {
tags[t].value
);
}
+ // Advance write pointers
+ // Items to produce might actually be symbols
+ const int items_to_produce = d_output_symbols ?
+ n_symbols :
+ (n_symbols * d_items_per_symbol + n_padding_items);
+ produce(port, items_to_produce);
} /* copy_n_symbols() */
void
header_payload_demux_impl::update_special_tags(
- int range_start,
- int range_end
+ uint64_t range_start,
+ uint64_t range_end
){
if (d_track_time) {
- std::vector<tag_t> tags;
- get_tags_in_range(tags, 0,
- nitems_read(0) + range_start,
- nitems_read(0) + range_end,
- d_timing_key
- );
- for (unsigned t = 0; t < tags.size(); t++) {
- if(tags[t].offset >= d_last_time_offset) {
- d_last_time = tags[t].value;
- d_last_time_offset = tags[t].offset;
- }
- }
+ std::vector<tag_t> tags;
+ get_tags_in_range(
+ tags,
+ PORT_INPUTDATA,
+ range_start,
+ range_end,
+ d_timing_key
+ );
+ if (!tags.empty()) {
+ std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+ d_last_time = tags.back().value;
+ d_last_time_offset = tags.back().offset;
+ }
}
std::vector<tag_t> tags;
- for (unsigned i = 0; i < d_special_tags.size(); i++) {
- uint64_t offset = 0;
- // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
- get_tags_in_range(tags, 0,
- nitems_read(0) + range_start,
- nitems_read(0) + range_end,
- d_special_tags[i]
- );
- for (unsigned t = 0; t < tags.size(); t++) {
- if(tags[t].offset >= offset) {
- d_special_tags_last_value[i] = tags[t].value;
- offset = tags[t].offset;
- }
- }
+ for (size_t i = 0; i < d_special_tags.size(); i++) {
+ // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
+ get_tags_in_range(
+ tags,
+ PORT_INPUTDATA, // Read from port 0
+ range_start,
+ range_end,
+ d_special_tags[i]
+ );
+ std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+ for (size_t t = 0; t < tags.size(); t++) {
+ d_special_tags_last_value[i] = tags[t].value;
+ }
}
} /* update_special_tags() */
@@ -462,24 +590,24 @@ namespace gr {
header_payload_demux_impl::add_special_tags(
){
if (d_track_time) {
- add_item_tag(
- PORT_HEADER,
- nitems_written(PORT_HEADER),
- d_timing_key,
- _update_pmt_time(
- d_last_time,
- d_sampling_time * (nitems_read(0) - d_last_time_offset)
- )
- );
+ add_item_tag(
+ PORT_HEADER,
+ nitems_written(PORT_HEADER),
+ d_timing_key,
+ _update_pmt_time(
+ d_last_time,
+ d_sampling_time * (nitems_read(PORT_INPUTDATA) - d_last_time_offset)
+ )
+ );
}
for (unsigned i = 0; i < d_special_tags.size(); i++) {
- add_item_tag(
- PORT_HEADER,
- nitems_written(PORT_HEADER),
- d_special_tags[i],
- d_special_tags_last_value[i]
- );
+ add_item_tag(
+ PORT_HEADER,
+ nitems_written(PORT_HEADER),
+ d_special_tags[i],
+ d_special_tags_last_value[i]
+ );
}
} /* add_special_tags() */
diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h
index 1d45dc7..0a70e7d 100644
--- a/gr-digital/lib/header_payload_demux_impl.h
+++ b/gr-digital/lib/header_payload_demux_impl.h
@@ -1,18 +1,18 @@
/* -*- c++ -*- */
-/* Copyright 2012 Free Software Foundation, Inc.
- *
+/* Copyright 2012-2016 Free Software Foundation, Inc.
+ *
* This file is part of GNU Radio
- *
+ *
* GNU Radio is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3, or (at your option)
* any later version.
- *
+ *
* GNU Radio is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public License
* along with GNU Radio; see the file COPYING. If not, write to
* the Free Software Foundation, Inc., 51 Franklin Street,
@@ -31,6 +31,9 @@ namespace gr {
{
private:
int d_header_len; //!< Number of bytes per header
+ const int d_header_padding_symbols; //!< Symbols header padding
+ const int d_header_padding_items; //!< Items header padding
+ const int d_header_padding_total_items; //!< Items header padding
int d_items_per_symbol; //!< Bytes per symbol
int d_gi; //!< Bytes per guard interval
pmt::pmt_t d_len_tag_key; //!< Key of length tag
@@ -40,10 +43,12 @@ namespace gr {
bool d_uses_trigger_tag; //!< If a trigger tag is used
int d_state; //!< Current read state
int d_curr_payload_len; //!< Length of the next payload (symbols)
+ int d_curr_payload_offset; //!< Offset of the next payload (symbols)
std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys)
std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values)
bool d_track_time; //!< Whether or not to keep track of the rx time
pmt::pmt_t d_timing_key; //!< Key of the timing tag (usually 'rx_time')
+ pmt::pmt_t d_payload_offset_key; //!< Key of payload offset (usually 'payload_offset')
uint64_t d_last_time_offset; //!< Item number of the last time tag
pmt::pmt_t d_last_time; //!< The actual time that was indicated
double d_sampling_time; //!< Inverse sampling rate
@@ -53,7 +58,14 @@ namespace gr {
// Helper functions to make the state machine more readable
//! Checks if there are enough items on the inputs and enough space on the output buffers to copy \p n_symbols symbols
- inline bool check_items_available(int n_symbols, gr_vector_int &ninput_items, int noutput_items, int nread);
+ bool check_buffers_ready(
+ int output_symbols_reqd,
+ int extra_output_items_reqd,
+ int noutput_items,
+ int input_items_reqd,
+ gr_vector_int &ninput_items,
+ int n_items_read
+ );
//! Message handler: Reads the result from the header demod and sets length tag (and other tags)
void parse_header_data_msg(pmt::pmt_t header_data);
@@ -62,49 +74,54 @@ namespace gr {
// Searches input 1 (if active), then the tags. Returns the offset in the input buffer
// (or -1 if none is found)
int find_trigger_signal(
- int nread,
- int noutput_items,
- gr_vector_const_void_star &input_items);
+ int skip_items,
+ int noutput_items,
+ uint64_t base_offset,
+ const unsigned char *in_trigger
+ );
//! Copies n symbols from in to out, makes sure tags are propagated properly. Does neither consume nor produce.
void copy_n_symbols(
- const unsigned char *in,
- unsigned char *out,
- int port,
- int n_symbols
+ const unsigned char *in,
+ unsigned char *out,
+ int port,
+ const uint64_t n_items_read_base,
+ int n_symbols,
+ int n_padding_items=0
);
//! Scans a given range for tags in d_special_tags
void update_special_tags(
- int range_start,
- int range_end
+ uint64_t range_start,
+ uint64_t range_end
);
//! Adds all tags in d_special_tags and timing info to the first item of the header.
void add_special_tags();
-
public:
header_payload_demux_impl(
- int header_len,
- int items_per_symbol,
- int guard_interval,
- const std::string &length_tag_key,
- const std::string &trigger_tag_key,
- bool output_symbols,
- size_t itemsize,
- const std::string &timing_tag_key,
- const double samp_rate,
- const std::vector<std::string> &special_tags
+ const int header_len,
+ const int items_per_symbol,
+ const int guard_interval,
+ const std::string &length_tag_key,
+ const std::string &trigger_tag_key,
+ const bool output_symbols,
+ const size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags,
+ const size_t header_padding
);
~header_payload_demux_impl();
void forecast (int noutput_items, gr_vector_int &ninput_items_required);
int general_work(int noutput_items,
- gr_vector_int &ninput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items
+ );
};
} // namespace digital
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index 8006d44..f36d710 100755
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -1,29 +1,69 @@
#!/usr/bin/env python
-# Copyright 2012 Free Software Foundation, Inc.
-#
+# Copyright 2012-2016 Free Software Foundation, Inc.
+#
# This file is part of GNU Radio
-#
+#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
-#
+#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
+#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
-#
+#
+from __future__ import print_function
import time
-
-from gnuradio import gr, gr_unittest, digital, blocks
+import random
+import numpy
+from gnuradio import gr
+from gnuradio import gr_unittest
+from gnuradio import digital
+from gnuradio import blocks
import pmt
+def make_tag(key, value, offset):
+ tag = gr.tag_t()
+ tag.offset = offset
+ tag.key = pmt.string_to_symbol(key)
+ tag.value = pmt.to_pmt(value)
+ return tag
+
+
+class HeaderToMessageBlock(gr.sync_block):
+ """
+ Helps with testing the HPD. Receives a header, stores it, posts
+ a predetermined message.
+ """
+ def __init__(self, itemsize, header_len, messages, header_is_symbol=False):
+ gr.sync_block.__init__(
+ self,
+ name="HeaderToMessageBlock",
+ in_sig=[itemsize],
+ out_sig=[itemsize],
+ )
+ self.header_len = header_len
+ self.message_port_register_out(pmt.intern('header_data'))
+ self.messages = messages
+ self.msg_count = 0
+
+ def work(self, input_items, output_items):
+ for i in xrange(len(input_items[0])/self.header_len):
+ msg = self.messages[self.msg_count] or False
+ #print("Sending message: {0}".format(msg))
+ self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg))
+ self.msg_count += 1
+ output_items[0][:] = input_items[0][:]
+ return len(input_items[0])
+
+
class qa_header_payload_demux (gr_unittest.TestCase):
def setUp (self):
@@ -32,6 +72,36 @@ class qa_header_payload_demux (gr_unittest.TestCase):
def tearDown (self):
self.tb = None
+ def connect_all_blocks(self,
+ data_src, trigger_src,
+ hpd,
+ mock_header_demod,
+ payload_sink, header_sink
+ ):
+ """
+ Connect the standard HPD test flowgraph
+ """
+ self.tb.connect(data_src, (hpd, 0))
+ if trigger_src is not None:
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), mock_header_demod)
+ self.tb.connect(mock_header_demod, header_sink)
+ self.tb.msg_connect(
+ mock_header_demod, 'header_data',
+ hpd, 'header_data'
+ )
+ self.tb.connect((hpd, 1), payload_sink)
+
+ def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30):
+ stop_time = time.time() + timeout
+ self.tb.start()
+ while len(payload_sink.data()) < payload_len and \
+ len(header_sink.data()) < header_len and \
+ time.time() < stop_time:
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+
def test_001_t (self):
""" Simplest possible test: put in zeros, then header,
then payload, trigger signal, try to demux.
@@ -45,25 +115,13 @@ class qa_header_payload_demux (gr_unittest.TestCase):
trigger_signal = [0,] * len(data_signal)
trigger_signal[n_zeros] = 1
# This is dropped:
- testtag1 = gr.tag_t()
- testtag1.offset = 0
- testtag1.key = pmt.string_to_symbol('tag1')
- testtag1.value = pmt.from_long(0)
+ testtag1 = make_tag('tag1', 0, 0)
# This goes on output 0, item 0:
- testtag2 = gr.tag_t()
- testtag2.offset = n_zeros
- testtag2.key = pmt.string_to_symbol('tag2')
- testtag2.value = pmt.from_long(23)
+ testtag2 = make_tag('tag2', 23, n_zeros)
# This goes on output 0, item 2:
- testtag3 = gr.tag_t()
- testtag3.offset = n_zeros + len(header) - 1
- testtag3.key = pmt.string_to_symbol('tag3')
- testtag3.value = pmt.from_long(42)
+ testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1)
# This goes on output 1, item 3:
- testtag4 = gr.tag_t()
- testtag4.offset = n_zeros + len(header) + 3
- testtag4.key = pmt.string_to_symbol('tag4')
- testtag4.value = pmt.from_long(314)
+ testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
data_src = blocks.vector_source_f(
data_signal,
False,
@@ -73,26 +131,17 @@ class qa_header_payload_demux (gr_unittest.TestCase):
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
)
+ mock_header_demod = HeaderToMessageBlock(
+ numpy.float32,
+ len(header),
+ [len(payload)]
+ )
self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
- header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
-
- self.tb.connect(data_src, (hpd, 0))
- self.tb.connect(trigger_src, (hpd, 1))
- self.tb.connect((hpd, 0), header_sink)
- self.tb.connect((hpd, 1), payload_sink)
- self.tb.start()
- time.sleep(.2) # Need this, otherwise, the next message is ignored
- hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.from_long(len(payload))
- )
- while len(payload_sink.data()) < len(payload):
- time.sleep(.2)
- self.tb.stop()
- self.tb.wait()
-
- self.assertEqual(header_sink.data(), header)
+ header_sink = blocks.vector_sink_f()
+ self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+ self.run_tb(payload_sink, len(payload), header_sink, len(header))
+ self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload)
ptags_header = []
for tag in header_sink.tags():
@@ -122,30 +171,15 @@ class qa_header_payload_demux (gr_unittest.TestCase):
payload = tuple(range(5, 20))
data_signal = (0,) * n_zeros + header + payload
# Trigger tag
- trigger_tag = gr.tag_t()
- trigger_tag.offset = n_zeros
- trigger_tag.key = pmt.string_to_symbol('detect')
- trigger_tag.value = pmt.PMT_T
+ trigger_tag = make_tag('detect', True, n_zeros)
# This is dropped:
- testtag1 = gr.tag_t()
- testtag1.offset = 0
- testtag1.key = pmt.string_to_symbol('tag1')
- testtag1.value = pmt.from_long(0)
+ testtag1 = make_tag('tag1', 0, 0)
# This goes on output 0, item 0:
- testtag2 = gr.tag_t()
- testtag2.offset = n_zeros
- testtag2.key = pmt.string_to_symbol('tag2')
- testtag2.value = pmt.from_long(23)
+ testtag2 = make_tag('tag2', 23, n_zeros)
# This goes on output 0, item 2:
- testtag3 = gr.tag_t()
- testtag3.offset = n_zeros + len(header) - 1
- testtag3.key = pmt.string_to_symbol('tag3')
- testtag3.value = pmt.from_long(42)
+ testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1)
# This goes on output 1, item 3:
- testtag4 = gr.tag_t()
- testtag4.offset = n_zeros + len(header) + 3
- testtag4.key = pmt.string_to_symbol('tag4')
- testtag4.value = pmt.from_long(314)
+ testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
data_src = blocks.vector_source_f(
data_signal,
False,
@@ -157,21 +191,14 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
-
- self.tb.connect(data_src, (hpd, 0))
- self.tb.connect((hpd, 0), header_sink)
- self.tb.connect((hpd, 1), payload_sink)
- self.tb.start()
- time.sleep(.2) # Need this, otherwise, the next message is ignored
- hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.from_long(len(payload))
+ mock_header_demod = HeaderToMessageBlock(
+ numpy.float32,
+ len(header),
+ [len(payload)]
)
- while len(payload_sink.data()) < len(payload):
- time.sleep(.2)
- self.tb.stop()
- self.tb.wait()
-
+ self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink)
+ self.run_tb(payload_sink, len(payload), header_sink, len(header))
+ # Check results
self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload)
ptags_header = []
@@ -193,8 +220,143 @@ class qa_header_payload_demux (gr_unittest.TestCase):
]
self.assertEqual(expected_tags_payload, ptags_payload)
+ def test_001_headerpadding (self):
+ """ Like test 1, but with header padding. """
+ n_zeros = 3
+ header = (1, 2, 3)
+ header_padding = 1
+ payload = tuple(range(5, 20))
+ data_signal = (0,) * n_zeros + header + payload
+ trigger_signal = [0,] * len(data_signal)
+ trigger_signal[n_zeros] = 1
+ # This is dropped:
+ testtag1 = make_tag('tag1', 0, 0)
+ # This goes on output 0, item 0:
+ testtag2 = make_tag('tag2', 23, n_zeros)
+ # This goes on output 0, item 2:
+ testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1)
+ # This goes on output 1, item 3:
+ testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
+ data_src = blocks.vector_source_f(
+ data_signal,
+ False,
+ tags=(testtag1, testtag2, testtag3, testtag4)
+ )
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header),
+ 1, # Items per symbol
+ 0, # Guard interval
+ "frame_len", # TSB tag key
+ "detect", # Trigger tag key
+ False, # No symbols please
+ gr.sizeof_float, # Item size
+ "", # Timing tag key
+ 1.0, # Samp rate
+ (), # No special tags
+ header_padding
+ )
+ mock_header_demod = HeaderToMessageBlock(
+ numpy.float32,
+ len(header),
+ [len(payload)]
+ )
+ header_sink = blocks.vector_sink_f()
+ payload_sink = blocks.vector_sink_f()
+ self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+ self.run_tb(payload_sink, len(payload), header_sink, len(header)+2)
+ # Check values
+ # Header now is padded:
+ self.assertEqual(header_sink.data(), (0,) + header + (payload[0],))
+ self.assertEqual(payload_sink.data(), payload)
+ ptags_header = []
+ for tag in header_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_header = [
+ {'key': 'tag2', 'offset': 1},
+ {'key': 'tag3', 'offset': 3},
+ ]
+ self.assertEqual(expected_tags_header, ptags_header)
+ ptags_payload = []
+ for tag in payload_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_payload = [
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 3},
+ ]
+ self.assertEqual(expected_tags_payload, ptags_payload)
+
+ def test_001_headerpadding_payload_offset (self):
+ """ Like test 1, but with header padding + payload offset. """
+ n_zeros = 3
+ header = (1, 2, 3)
+ header_padding = 1
+ payload_offset = -1
+ payload = tuple(range(5, 20))
+ data_signal = (0,) * n_zeros + header + payload + (0,) * 100
+ trigger_signal = [0,] * len(data_signal)
+ trigger_signal[n_zeros] = 1
+ # This goes on output 1, item 3 + 1 (for payload offset)
+ testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
+ data_src = blocks.vector_source_f(
+ data_signal,
+ False,
+ tags=(testtag4,)
+ )
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header),
+ 1, # Items per symbol
+ 0, # Guard interval
+ "frame_len", # TSB tag key
+ "detect", # Trigger tag key
+ False, # No symbols please
+ gr.sizeof_float, # Item size
+ "", # Timing tag key
+ 1.0, # Samp rate
+ (), # No special tags
+ header_padding
+ )
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
+ header_sink = blocks.vector_sink_f()
+ payload_sink = blocks.vector_sink_f()
+ self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), header_sink)
+ self.tb.connect((hpd, 1), payload_sink)
+ self.tb.start()
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
+ )
+ while len(payload_sink.data()) < len(payload):
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+ # Header is now padded:
+ self.assertEqual(header_sink.data(), (0,) + header + (payload[0],))
+ # Payload is now offset:
+ self.assertEqual(
+ payload_sink.data(),
+ data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
+ )
+ ptags_payload = {}
+ for tag in payload_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_payload[ptag.key] = ptag.offset
+ expected_tags_payload = {
+ 'frame_len': 0,
+ 'payload_offset': 0,
+ 'tag4': 3 - payload_offset,
+ }
+ self.assertEqual(expected_tags_payload, ptags_payload)
+
+
def test_002_symbols (self):
- """
+ """
Same as before, but operate on symbols
"""
n_zeros = 1
@@ -207,25 +369,13 @@ class qa_header_payload_demux (gr_unittest.TestCase):
trigger_signal = [0,] * len(data_signal)
trigger_signal[n_zeros] = 1
# This is dropped:
- testtag1 = gr.tag_t()
- testtag1.offset = 0
- testtag1.key = pmt.string_to_symbol('tag1')
- testtag1.value = pmt.from_long(0)
+ testtag1 = make_tag('tag1', 0, 0)
# This goes on output 0, item 0 (from the GI)
- testtag2 = gr.tag_t()
- testtag2.offset = n_zeros
- testtag2.key = pmt.string_to_symbol('tag2')
- testtag2.value = pmt.from_long(23)
+ testtag2 = make_tag('tag2', 23, n_zeros)
# This goes on output 0, item 0 (middle of the header symbol)
- testtag3 = gr.tag_t()
- testtag3.offset = n_zeros + gi + 1
- testtag3.key = pmt.string_to_symbol('tag3')
- testtag3.value = pmt.from_long(42)
+ testtag3 = make_tag('tag3', 42, n_zeros + gi + 1)
# This goes on output 1, item 1 (middle of the first payload symbol)
- testtag4 = gr.tag_t()
- testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1
- testtag4.key = pmt.string_to_symbol('tag4')
- testtag4.value = pmt.from_long(314)
+ testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1)
data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
@@ -291,25 +441,20 @@ class qa_header_payload_demux (gr_unittest.TestCase):
trigger_signal[n_zeros] = 1
trigger_signal[len(data_signal)] = 1
trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1
- tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000
+ print("Triggers at: {0} {1} {2}".format(
+ n_zeros,
+ len(data_signal),
+ len(data_signal)+len(header_fail)+n_zeros)
+ )
+ tx_signal = data_signal + \
+ header_fail + (0,) * n_zeros + \
+ header + payload2 + (0,) * 1000
# Timing tag: This is preserved and updated:
- timing_tag = gr.tag_t()
- timing_tag.offset = 0
- timing_tag.key = pmt.string_to_symbol('rx_time')
- timing_tag.value = pmt.to_pmt((0, 0))
+ timing_tag = make_tag('rx_time', (0, 0), 0)
# Rx freq tags:
- rx_freq_tag1 = gr.tag_t()
- rx_freq_tag1.offset = 0
- rx_freq_tag1.key = pmt.string_to_symbol('rx_freq')
- rx_freq_tag1.value = pmt.from_double(1.0)
- rx_freq_tag2 = gr.tag_t()
- rx_freq_tag2.offset = 29
- rx_freq_tag2.key = pmt.string_to_symbol('rx_freq')
- rx_freq_tag2.value = pmt.from_double(1.5)
- rx_freq_tag3 = gr.tag_t()
- rx_freq_tag3.offset = 30
- rx_freq_tag3.key = pmt.string_to_symbol('rx_freq')
- rx_freq_tag3.value = pmt.from_double(2.0)
+ rx_freq_tag1 = make_tag('rx_freq', 1.0, 0)
+ rx_freq_tag2 = make_tag('rx_freq', 1.5, 29)
+ rx_freq_tag3 = make_tag('rx_freq', 2.0, 30)
### Flow graph
data_src = blocks.vector_source_f(
tx_signal, False,
@@ -388,6 +533,92 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.assertEqual(tags_header, tags_expected_header)
self.assertEqual(tags_payload, tags_expected_payload)
+ def test_004_fuzz(self):
+ """
+ Long random test
+ """
+ def create_signal(
+ n_bursts,
+ header_len,
+ max_gap,
+ max_burstsize,
+ fail_rate,
+ ):
+ signal = []
+ indexes = []
+ burst_sizes = []
+ total_payload_len = 0
+ for burst_count in xrange(n_bursts):
+ gap_size = random.randint(0, max_gap)
+ signal += [0] * gap_size
+ is_failure = random.random() < fail_rate
+ if not is_failure:
+ burst_size = random.randint(0, max_burstsize)
+ else:
+ burst_size = 0
+ total_payload_len += burst_size
+ indexes += [len(signal)]
+ signal += [1] * header_len
+ signal += [2] * burst_size
+ burst_sizes += [burst_size]
+ return (signal, indexes, total_payload_len, burst_sizes)
+ def indexes_to_triggers(indexes, signal_len):
+ """
+ Convert indexes to a mix of trigger signals and tags
+ """
+ trigger_signal = [0] * signal_len
+ trigger_tags = []
+ for index in indexes:
+ if random.random() > 0.5:
+ trigger_signal[index] = 1
+ else:
+ trigger_tags += [make_tag('detect', True, index)]
+ return (trigger_signal, trigger_tags)
+ ### Go, go, go
+ # The divide-by-20 means we'll usually get the same random seed
+ # between the first run and the XML run.
+ random_seed = int(time.time()/20)
+ random.seed(random_seed)
+ print("Random seed: {0}".format(random_seed))
+ n_bursts = 400
+ header_len = 5
+ max_gap = 50
+ max_burstsize = 100
+ fail_rate = 0.05
+ signal, indexes, total_payload_len, burst_sizes = create_signal(
+ n_bursts, header_len, max_gap, max_burstsize, fail_rate
+ )
+ trigger_signal, trigger_tags = indexes_to_triggers(indexes, len(signal))
+ # Flow graph
+ data_src = blocks.vector_source_f(
+ signal, False,
+ tags=trigger_tags
+ )
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ header_len=header_len,
+ items_per_symbol=1,
+ guard_interval=0,
+ length_tag_key="frame_len",
+ trigger_tag_key="detect",
+ output_symbols=False,
+ itemsize=gr.sizeof_float,
+ timing_tag_key='rx_time',
+ samp_rate=1.0,
+ special_tags=('rx_freq',),
+ )
+ mock_header_demod = HeaderToMessageBlock(
+ numpy.float32,
+ header_len,
+ burst_sizes
+ )
+ header_sink = blocks.vector_sink_f()
+ payload_sink = blocks.vector_sink_f()
+ self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+ self.run_tb(payload_sink, total_payload_len, header_sink, header_len*n_bursts)
+ self.assertEqual(header_sink.data(), tuple([1]*header_len*n_bursts))
+ self.assertEqual(payload_sink.data(), tuple([2]*total_payload_len))
+
if __name__ == '__main__':
gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml")
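
Illustration (not part of the commit): the item arithmetic at the end of copy_n_symbols() and the "latest tag wins" rule in update_special_tags() can be sketched in plain Python without GNU Radio. The Tag tuple and both helper names are hypothetical. The sketch assumes that get_tags_in_range() covers the half-open range [start, end) and that the caller passes the total padding (both sides) as n_padding_items.

```python
from collections import namedtuple

# Hypothetical stand-in for gr.tag_t; only the fields used here.
Tag = namedtuple("Tag", ["offset", "key", "value"])


def items_to_produce(n_symbols, items_per_symbol, n_padding_items, output_symbols):
    """Mirror the write-pointer arithmetic at the end of copy_n_symbols()."""
    if output_symbols:
        # One output item per symbol; padding is not added in this mode.
        return n_symbols
    return n_symbols * items_per_symbol + n_padding_items


def latest_tag_value(tags, key, range_start, range_end):
    """Return the value of the highest-offset tag with `key` in
    [range_start, range_end), or None if there is no such tag.
    The half-open range is an assumption about get_tags_in_range()."""
    in_range = [
        t for t in tags
        if t.key == key and range_start <= t.offset < range_end
    ]
    if not in_range:
        return None
    return max(in_range, key=lambda t: t.offset).value


if __name__ == "__main__":
    # Numbers taken from test_001_headerpadding: 3 header items, 1 item per
    # symbol, 1 padding item on each side of the header.
    print(items_to_produce(3, 1, 2, False))
    # rx_freq tags at the offsets used in the timing/special-tags test.
    rx_freq = [Tag(0, "rx_freq", 1.0), Tag(29, "rx_freq", 1.5), Tag(30, "rx_freq", 2.0)]
    print(latest_tag_value(rx_freq, "rx_freq", 0, 30))
```

With these inputs the padded header comes out at five items, which matches the (0,) + header + (payload[0],) tuple the padding test expects on the header sink.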