aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Socket.cpp7
-rw-r--r--lib/Socket.h4
-rw-r--r--lib/edi/ETIDecoder.cpp37
-rw-r--r--lib/edi/ETIDecoder.hpp28
-rw-r--r--lib/edi/common.cpp62
-rw-r--r--lib/edi/common.hpp26
6 files changed, 120 insertions, 44 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 159de7e..bc1b179 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -932,6 +932,11 @@ void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
}
m_queue.push(buffer);
+
+ if (m_queue.size() > MAX_QUEUE_SIZE) {
+ vector<uint8_t> discard;
+ m_queue.try_pop(discard);
+ }
}
void TCPSendClient::process()
diff --git a/lib/Socket.h b/lib/Socket.h
index 84def40..8c6f8a9 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -311,7 +311,7 @@ class TCPSendClient {
bool m_is_connected = false;
TCPSocket m_sock;
- static constexpr size_t MAX_QUEUE_SIZE = 1024;
+ static constexpr size_t MAX_QUEUE_SIZE = 512;
ThreadsafeQueue<std::vector<uint8_t> > m_queue;
std::atomic<bool> m_running;
std::string m_exception_data;
diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp
index 1fa9c3c..88a7333 100644
--- a/lib/edi/ETIDecoder.cpp
+++ b/lib/edi/ETIDecoder.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -30,9 +30,9 @@ namespace EdiDecoder {
using namespace std;
-ETIDecoder::ETIDecoder(ETIDataCollector& data_collector, bool verbose) :
+ETIDecoder::ETIDecoder(ETIDataCollector& data_collector) :
m_data_collector(data_collector),
- m_dispatcher(std::bind(&ETIDecoder::packet_completed, this), verbose)
+ m_dispatcher(std::bind(&ETIDecoder::packet_completed, this))
{
using std::placeholders::_1;
using std::placeholders::_2;
@@ -44,6 +44,12 @@ ETIDecoder::ETIDecoder(ETIDataCollector& data_collector, bool verbose) :
std::bind(&ETIDecoder::decode_estn, this, _1, _2));
m_dispatcher.register_tag("*dmy",
std::bind(&ETIDecoder::decode_stardmy, this, _1, _2));
+ m_dispatcher.register_tagpacket_handler(std::bind(&ETIDecoder::decode_tagpacket, this, _1));
+}
+
+void ETIDecoder::set_verbose(bool verbose)
+{
+ m_dispatcher.set_verbose(verbose);
}
void ETIDecoder::push_bytes(const vector<uint8_t> &buf)
@@ -63,7 +69,7 @@ void ETIDecoder::setMaxDelay(int num_af_packets)
#define AFPACKET_HEADER_LEN 10 // includes SYNC
-bool ETIDecoder::decode_starptr(const vector<uint8_t> &value, uint16_t)
+bool ETIDecoder::decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& n)
{
if (value.size() != 0x40 / 8) {
etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size());
@@ -83,7 +89,7 @@ bool ETIDecoder::decode_starptr(const vector<uint8_t> &value, uint16_t)
return true;
}
-bool ETIDecoder::decode_deti(const vector<uint8_t> &value, uint16_t)
+bool ETIDecoder::decode_deti(const std::vector<uint8_t>& value, const tag_name_t& n)
{
/*
uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15);
@@ -145,6 +151,8 @@ bool ETIDecoder::decode_deti(const vector<uint8_t> &value, uint16_t)
i += 4;
m_data_collector.update_edi_time(utco, seconds);
+ m_received_tagpacket.timestamp.utco = utco;
+ m_received_tagpacket.timestamp.seconds = seconds;
fc.tsta = read_24b(value.begin() + i);
i += 3;
@@ -152,8 +160,12 @@ bool ETIDecoder::decode_deti(const vector<uint8_t> &value, uint16_t)
else {
// Null timestamp, ETSI ETS 300 799, C.2.2
fc.tsta = 0xFFFFFF;
+ m_received_tagpacket.timestamp.utco = 0;
+ m_received_tagpacket.timestamp.seconds = 0;
}
+ m_received_tagpacket.timestamp.tsta = fc.tsta;
+
if (fc.ficf) {
vector<uint8_t> fic(fic_length);
@@ -183,12 +195,13 @@ bool ETIDecoder::decode_deti(const vector<uint8_t> &value, uint16_t)
return true;
}
-bool ETIDecoder::decode_estn(const vector<uint8_t> &value, uint16_t n)
+bool ETIDecoder::decode_estn(const std::vector<uint8_t>& value, const tag_name_t& name)
{
uint32_t sstc = read_24b(value.begin());
eti_stc_data stc;
+ const uint8_t n = name[3];
stc.stream_index = n - 1; // n is 1-indexed
stc.scid = (sstc >> 18) & 0x3F;
stc.sad = (sstc >> 8) & 0x3FF;
@@ -207,14 +220,22 @@ bool ETIDecoder::decode_estn(const vector<uint8_t> &value, uint16_t n)
return true;
}
-bool ETIDecoder::decode_stardmy(const vector<uint8_t>& /*value*/, uint16_t)
+bool ETIDecoder::decode_stardmy(const std::vector<uint8_t>&, const tag_name_t&)
+{
+ return true;
+}
+
+bool ETIDecoder::decode_tagpacket(const std::vector<uint8_t>& value)
{
+ m_received_tagpacket.tagpacket = value;
return true;
}
void ETIDecoder::packet_completed()
{
- m_data_collector.assemble();
+ ReceivedTagPacket tp;
+ swap(tp, m_received_tagpacket);
+ m_data_collector.assemble(move(tp));
}
}
diff --git a/lib/edi/ETIDecoder.hpp b/lib/edi/ETIDecoder.hpp
index f5d0b81..ffa9037 100644
--- a/lib/edi/ETIDecoder.hpp
+++ b/lib/edi/ETIDecoder.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -57,6 +57,12 @@ struct eti_stc_data {
uint16_t stl(void) const { return mst.size() / 8; }
};
+struct ReceivedTagPacket {
+ std::vector<uint8_t> tagpacket;
+ frame_timestamp_t timestamp;
+};
+
+
/* A class that receives multiplex data must implement the interface described
* in the ETIDataCollector. This can be e.g. a converter to ETI, or something that
* prepares data structures for a modulator.
@@ -87,8 +93,9 @@ class ETIDataCollector {
virtual void add_subchannel(eti_stc_data&& stc) = 0;
- // Tell the ETIWriter that the AFPacket is complete
- virtual void assemble(void) = 0;
+ // Tell the consumer that the AFPacket is complete, and include
+ // the raw received TAGs
+ virtual void assemble(ReceivedTagPacket&& tagpacket) = 0;
};
/* The ETIDecoder takes care of decoding the EDI TAGs related to the transport
@@ -99,7 +106,9 @@ class ETIDataCollector {
*/
class ETIDecoder {
public:
- ETIDecoder(ETIDataCollector& data_collector, bool verbose);
+ ETIDecoder(ETIDataCollector& data_collector);
+
+ void set_verbose(bool verbose);
/* Push bytes into the decoder. The buf can contain more
* than a single packet. This is useful when reading from streams
@@ -118,16 +127,19 @@ class ETIDecoder {
void setMaxDelay(int num_af_packets);
private:
- bool decode_starptr(const std::vector<uint8_t> &value, uint16_t);
- bool decode_deti(const std::vector<uint8_t> &value, uint16_t);
- bool decode_estn(const std::vector<uint8_t> &value, uint16_t n);
- bool decode_stardmy(const std::vector<uint8_t> &value, uint16_t);
+ bool decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& n);
+ bool decode_deti(const std::vector<uint8_t>& value, const tag_name_t& n);
+ bool decode_estn(const std::vector<uint8_t>& value, const tag_name_t& n);
+ bool decode_stardmy(const std::vector<uint8_t>& value, const tag_name_t& n);
+
+ bool decode_tagpacket(const std::vector<uint8_t>& value);
void packet_completed();
ETIDataCollector& m_data_collector;
TagDispatcher m_dispatcher;
+ ReceivedTagPacket m_received_tagpacket;
};
}
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index 470b3ba..3005802 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include <cassert>
#include <cmath>
#include <cstdio>
+#include <cctype>
namespace EdiDecoder {
@@ -66,7 +67,7 @@ time_t frame_timestamp_t::to_unix_epoch() const
return 946684800 + seconds - utco;
}
-double frame_timestamp_t::diff_ms(const frame_timestamp_t& other) const
+double frame_timestamp_t::diff_s(const frame_timestamp_t& other) const
{
const double lhs = (double)seconds + (tsta / 16384000.0);
const double rhs = (double)other.seconds + (other.tsta / 16384000.0);
@@ -111,10 +112,30 @@ std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const
return ts;
}
+std::string tag_name_to_human_readable(const tag_name_t& name)
+{
+ std::string s;
+ for (const uint8_t c : name) {
+ if (isprint(c)) {
+ s += (char)c;
+ }
+ else {
+ char escaped[5];
+ snprintf(escaped, 5, "\\x%02x", c);
+ s += escaped;
+ }
+ }
+ return s;
+}
TagDispatcher::TagDispatcher(
- std::function<void()>&& af_packet_completed, bool verbose) :
- m_af_packet_completed(move(af_packet_completed))
+ std::function<void()>&& af_packet_completed) :
+ m_af_packet_completed(move(af_packet_completed)),
+ m_tagpacket_handler([](const std::vector<uint8_t>& ignore){})
+{
+}
+
+void TagDispatcher::set_verbose(bool verbose)
{
m_pft.setVerbose(verbose);
}
@@ -295,6 +316,11 @@ void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h)
m_handlers[tag] = move(h);
}
+void TagDispatcher::register_tagpacket_handler(tagpacket_handler&& h)
+{
+ m_tagpacket_handler = move(h);
+}
+
bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
{
@@ -326,31 +352,23 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
break;
}
+ const array<uint8_t, 4> tag_name({
+ (uint8_t)tag_sz[0], (uint8_t)tag_sz[1], (uint8_t)tag_sz[2], (uint8_t)tag_sz[3]
+ });
vector<uint8_t> tag_value(taglength);
copy( payload.begin() + i+8,
payload.begin() + i+8+taglength,
tag_value.begin());
- bool tagsuccess = false;
+ bool tagsuccess = true;
bool found = false;
for (auto tag_handler : m_handlers) {
- if (tag_handler.first.size() == 4 and tag_handler.first == tag) {
- found = true;
- tagsuccess = tag_handler.second(tag_value, 0);
- }
- else if (tag_handler.first.size() == 3 and
- tag.substr(0, 3) == tag_handler.first) {
- found = true;
- uint8_t n = tag_sz[3];
- tagsuccess = tag_handler.second(tag_value, n);
- }
- else if (tag_handler.first.size() == 2 and
- tag.substr(0, 2) == tag_handler.first) {
+ if ( (tag_handler.first.size() == 4 and tag == tag_handler.first) or
+ (tag_handler.first.size() == 3 and tag.substr(0, 3) == tag_handler.first) or
+ (tag_handler.first.size() == 2 and tag.substr(0, 2) == tag_handler.first) or
+ (tag_handler.first.size() == 1 and tag.substr(0, 1) == tag_handler.first)) {
found = true;
- uint16_t n = 0;
- n = (uint16_t)(tag_sz[2]) << 8;
- n |= (uint16_t)(tag_sz[3]);
- tagsuccess = tag_handler.second(tag_value, n);
+ tagsuccess &= tag_handler.second(tag_value, tag_name);
}
}
@@ -366,6 +384,8 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
}
}
+ m_tagpacket_handler(payload);
+
return success;
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index 2a9c683..498b28a 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2020
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -41,7 +41,7 @@ struct frame_timestamp_t {
std::time_t to_unix_epoch() const;
std::chrono::system_clock::time_point to_system_clock() const;
- double diff_ms(const frame_timestamp_t& other) const;
+ double diff_s(const frame_timestamp_t& other) const;
frame_timestamp_t& operator+=(const std::chrono::milliseconds& ms);
@@ -55,6 +55,10 @@ struct decode_state_t {
size_t num_bytes_consumed;
};
+using tag_name_t = std::array<uint8_t, 4>;
+
+std::string tag_name_to_human_readable(const tag_name_t& name);
+
/* The TagDispatcher takes care of decoding EDI, with or without PFT, and
* will call functions when TAGs are encountered.
*
@@ -63,7 +67,10 @@ struct decode_state_t {
*/
class TagDispatcher {
public:
- TagDispatcher(std::function<void()>&& af_packet_completed, bool verbose);
+ TagDispatcher(std::function<void()>&& af_packet_completed);
+
+ void set_verbose(bool verbose);
+
/* Push bytes into the decoder. The buf can contain more
* than a single packet. This is useful when reading from streams
@@ -81,9 +88,19 @@ class TagDispatcher {
*/
void setMaxDelay(int num_af_packets);
- using tag_handler = std::function<bool(std::vector<uint8_t>, uint16_t)>;
+ /* Handler function for a tag. The first argument contains the tag value,
+ * the second argument contains the tag name */
+ using tag_handler = std::function<bool(const std::vector<uint8_t>&, const tag_name_t&)>;
+
+ /* Register a handler for a tag. If the tag string can be length 0, 1, 2, 3 or 4.
+ * If is shorter than 4, it will perform a longest match on the tag name.
+ */
void register_tag(const std::string& tag, tag_handler&& h);
+ /* The complete tagpacket can also be retrieved */
+ using tagpacket_handler = std::function<void(const std::vector<uint8_t>&)>;
+ void register_tagpacket_handler(tagpacket_handler&& h);
+
private:
decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data);
bool decode_tagpacket(const std::vector<uint8_t> &payload);
@@ -93,6 +110,7 @@ class TagDispatcher {
std::vector<uint8_t> m_input_data;
std::map<std::string, tag_handler> m_handlers;
std::function<void()> m_af_packet_completed;
+ tagpacket_handler m_tagpacket_handler;
};
// Data carried inside the ODRv EDI TAG