aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 17:18:25 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 17:18:25 +0200
commit107dca66a069c5e317d040360b8aafd62c8282db (patch)
tree534e9e4860327b8b49170f103b9f7419832c1928 /src
parent473232ab177a4811115ff5713bb25893448dafd7 (diff)
downloaddabmod-107dca66a069c5e317d040360b8aafd62c8282db.tar.gz
dabmod-107dca66a069c5e317d040360b8aafd62c8282db.tar.bz2
dabmod-107dca66a069c5e317d040360b8aafd62c8282db.zip
Implement EDI over TCP
Diffstat (limited to 'src')
-rw-r--r--src/DabMod.cpp21
-rw-r--r--src/EtiReader.cpp115
-rw-r--r--src/EtiReader.h12
3 files changed, 107 insertions, 41 deletions
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 7ebde12..1f435bf 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -306,11 +306,11 @@ int launch_modulator(int argc, char* argv[])
// setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames
ediInput.setMaxDelay(lroundf(mod_settings.edi_max_delay_ms / 24.0f));
}
- EdiUdpInput ediUdpInput(ediInput);
+ EdiTransport ediTransport(ediInput);
- ediUdpInput.Open(mod_settings.inputName);
- if (not ediUdpInput.isEnabled()) {
- throw runtime_error("inputTransport is edi, but ediUdpInput is not enabled");
+ ediTransport.Open(mod_settings.inputName);
+ if (not ediTransport.isEnabled()) {
+ throw runtime_error("inputTransport is edi, but ediTransport is not enabled");
}
Flowgraph flowgraph;
@@ -329,16 +329,27 @@ int launch_modulator(int argc, char* argv[])
bool first_frame = true;
+ auto frame_received_tp = chrono::steady_clock::now();
+
while (running) {
while (running and not ediReader.isFrameReady()) {
try {
- ediUdpInput.rxPacket();
+ bool packet_received = ediTransport.rxPacket();
+ if (packet_received) {
+ frame_received_tp = chrono::steady_clock::now();
+ }
}
catch (const std::runtime_error& e) {
etiLog.level(warn) << "EDI input: " << e.what();
running = 0;
break;
}
+
+ if (frame_received_tp + chrono::seconds(10) < chrono::steady_clock::now()) {
+ etiLog.level(error) << "No EDI data received in 10 seconds.";
+ running = 0;
+ break;
+ }
}
if (not running) {
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index 4c5ad79..94c362a 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -547,7 +547,7 @@ void EdiReader::assemble()
m_frameReady = true;
}
-EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) :
+EdiTransport::EdiTransport(EdiDecoder::ETIDecoder& decoder) :
m_enabled(false),
m_port(0),
m_bindto("0.0.0.0"),
@@ -555,49 +555,100 @@ EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) :
m_decoder(decoder) { }
-void EdiUdpInput::Open(const std::string& uri)
+void EdiTransport::Open(const std::string& uri)
{
etiLog.level(info) << "Opening EDI :" << uri;
- size_t found_port = uri.find_first_of(":", 6);
- if (found_port == string::npos) {
- throw std::invalid_argument("EDI input port must be provided");
- }
- m_port = std::stoi(uri.substr(found_port+1));
- std::string host_full = uri.substr(6, found_port-6);// ignore udp://
- size_t found_mcast = host_full.find_first_of("@"); //have multicast address:
- if (found_mcast != string::npos) {
- if (found_mcast > 0) {
- m_bindto = host_full.substr(0, found_mcast);
+ const string proto = uri.substr(0, 3);
+ if (proto == "udp") {
+ size_t found_port = uri.find_first_of(":", 6);
+ if (found_port == string::npos) {
+ throw std::invalid_argument("EDI UDP input port must be provided");
}
- m_mcastaddr = host_full.substr(found_mcast+1);
- }
- else if (found_port != 6) {
- m_bindto=host_full;
+
+ m_port = std::stoi(uri.substr(found_port+1));
+ std::string host_full = uri.substr(6, found_port-6);// skip udp://
+ size_t found_mcast = host_full.find_first_of("@"); //have multicast address:
+ if (found_mcast != string::npos) {
+ if (found_mcast > 0) {
+ m_bindto = host_full.substr(0, found_mcast);
+ }
+ m_mcastaddr = host_full.substr(found_mcast+1);
+ }
+ else if (found_port != 6) {
+ m_bindto=host_full;
+ }
+
+ etiLog.level(info) << "EDI UDP input: host:" << m_bindto <<
+ ", source:" << m_mcastaddr << ", port:" << m_port;
+
+ // The max_fragments_queued is only a protection against a runaway
+ // memory usage.
+ // Rough calculation:
+ // 300 seconds, 24ms per frame, up to 20 fragments per frame
+ const size_t max_fragments_queued = 20 * 300 * 1000 / 24;
+
+ m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued);
+ m_proto = Proto::UDP;
+ m_enabled = true;
}
+ else if (proto == "tcp") {
+ size_t found_port = uri.find_first_of(":", 6);
+ if (found_port == string::npos) {
+ throw std::invalid_argument("EDI TCP input port must be provided");
+ }
- etiLog.level(info) << "EDI input: host:" << m_bindto <<
- ", source:" << m_mcastaddr << ", port:" << m_port;
+ m_port = std::stoi(uri.substr(found_port+1));
+ const std::string hostname = uri.substr(6, found_port-6);// skip tcp://
- // The max_fragments_queued is only a protection against a runaway
- // memory usage.
- // Rough calculation:
- // 300 seconds, 24ms per frame, up to 20 fragments per frame
- const size_t max_fragments_queued = 20 * 300 * 1000 / 24;
+ etiLog.level(info) << "EDI TCP connect to " << hostname << ":" << m_port;
- m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued);
- m_enabled = true;
+ m_tcpclient.connect(hostname, m_port);
+ m_proto = Proto::TCP;
+ m_enabled = true;
+ }
+ else {
+ throw std::invalid_argument("ETI protocol '" + proto + "' unknown");
+ }
}
-bool EdiUdpInput::rxPacket()
+bool EdiTransport::rxPacket()
{
- auto udp_data = m_udp_rx.get_packet_buffer();
+ switch (m_proto) {
+ case Proto::UDP:
+ {
+ auto udp_data = m_udp_rx.get_packet_buffer();
- if (udp_data.empty()) {
- return false;
- }
+ if (udp_data.empty()) {
+ return false;
+ }
- m_decoder.push_packet(udp_data);
- return true;
+ m_decoder.push_packet(udp_data);
+ return true;
+ }
+ case Proto::TCP:
+ {
+ m_tcpbuffer.resize(4096);
+ const int timeout_ms = 1000;
+ try {
+ ssize_t ret = m_tcpclient.recv(m_tcpbuffer.data(), m_tcpbuffer.size(), 0, timeout_ms);
+ if (ret == 0 or ret == -1) {
+ return false;
+ }
+ else if (ret > (ssize_t)m_tcpbuffer.size()) {
+ throw logic_error("EDI TCP: invalid recv() return value");
+ }
+ else {
+ m_tcpbuffer.resize(ret);
+ m_decoder.push_bytes(m_tcpbuffer);
+ return true;
+ }
+ }
+ catch (const TCPSocket::Timeout&) {
+ return false;
+ }
+ }
+ }
+ throw logic_error("Incomplete rxPacket implementation!");
}
#endif // HAVE_EDI
diff --git a/src/EtiReader.h b/src/EtiReader.h
index 554231e..38f7903 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -34,6 +34,7 @@
#include "Eti.h"
#include "Log.h"
#include "FicSource.h"
+#include "Socket.h"
#include "SubchannelSource.h"
#include "TimestampDecoder.h"
#include "lib/edi/ETIDecoder.hpp"
@@ -185,13 +186,12 @@ private:
};
/* The EDI input does not use the inputs defined in InputReader.h, as they were
- * designed for ETI. It uses the EdiUdpInput which in turn uses a threaded
+ * designed for ETI. It uses the EdiTransport which in turn uses a threaded
* receiver.
*/
-
-class EdiUdpInput {
+class EdiTransport {
public:
- EdiUdpInput(EdiDecoder::ETIDecoder& decoder);
+ EdiTransport(EdiDecoder::ETIDecoder& decoder);
void Open(const std::string& uri);
@@ -209,7 +209,11 @@ class EdiUdpInput {
std::string m_bindto;
std::string m_mcastaddr;
+ enum class Proto { UDP, TCP };
+ Proto m_proto;
UdpReceiver m_udp_rx;
+ std::vector<uint8_t> m_tcpbuffer;
+ TCPClient m_tcpclient;
EdiDecoder::ETIDecoder& m_decoder;
};
#endif