summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp3
-rw-r--r--src/input/Udp.cpp217
-rw-r--r--src/input/Udp.h31
3 files changed, 241 insertions, 10 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index f0ab13c..6d67b7b 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -736,6 +736,9 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
etiLog.level(warn) << "Using untested ipc:// zeromq input";
}
}
+ else if (proto == "sti-rtp") {
+ subchan->input = make_shared<Inputs::Sti_d_Rtp>();
+ }
else {
stringstream ss;
ss << "Subchannel with uid " << subchanuid <<
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index a238d9b..5d11d1c 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -54,6 +54,16 @@ int Udp::open(const std::string& name)
throw invalid_argument(ss.str());
}
+ m_name = name;
+
+ openUdpSocket(endpoint);
+
+ return 0;
+}
+
+void Udp::openUdpSocket(const std::string& endpoint)
+{
+ const auto colon_pos = endpoint.find_first_of(":");
const auto address = endpoint.substr(0, colon_pos);
const auto port_str = endpoint.substr(colon_pos + 1);
@@ -83,14 +93,10 @@ int Udp::open(const std::string& name)
ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
throw runtime_error(ss.str());
}
-
- return 0;
}
int Udp::readFrame(uint8_t* buffer, size_t size)
{
- uint8_t* data = reinterpret_cast<uint8_t*>(buffer);
-
// Regardless of buffer contents, try receiving data.
UdpPacket packet;
int ret = m_sock.receive(packet);
@@ -107,10 +113,10 @@ int Udp::readFrame(uint8_t* buffer, size_t size)
// Take data from the buffer if it contains enough data,
// in any case write the buffer
if (m_buffer.size() >= (size_t)size) {
- std::copy(m_buffer.begin(), m_buffer.begin() + size, data);
+ std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer);
}
else {
- memset(data, 0x0, size);
+ memset(buffer, 0x0, size);
}
return size;
@@ -131,4 +137,199 @@ int Udp::close()
return m_sock.close();
}
-};
+
+// ETSI EN 300 797 V1.2.1 ch 8.2.1.2
+#define STI_SYNC_LEN 3
+static const uint8_t STI_FSync0[STI_SYNC_LEN] = { 0x1F, 0x90, 0xCA };
+static const uint8_t STI_FSync1[STI_SYNC_LEN] = { 0xE0, 0x6F, 0x35 };
+
+static const size_t RTP_HEADER_LEN = 12;
+
+static bool stiSyncValid(const uint8_t *buf)
+{
+ return (memcmp(buf+1, STI_FSync0, sizeof(STI_FSync0)) == 0) or
+ (memcmp(buf+1, STI_FSync1, sizeof(STI_FSync1)) == 0);
+}
+
+static bool rtpHeaderValid(const uint8_t *buf)
+{
+ uint32_t version = (buf[0] & 0xC0) >> 6;
+ uint32_t payloadType = (buf[1] & 0x7F);
+ return (version == 2 and payloadType == 34);
+}
+
+static uint16_t unpack2(const uint8_t *buf)
+{
+ return (buf[0] << 8) | buf[1];
+}
+
+int Sti_d_Rtp::open(const std::string& name)
+{
+ // Skip the sti-rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ name.substr(10) : name;
+
+ // The endpoint should be address:port
+ const auto colon_pos = endpoint.find_first_of(":");
+ if (colon_pos == string::npos) {
+ stringstream ss;
+ ss << "'" << name <<
+ " is an invalid format for sti-rtp address: "
+ "expected [sti-rtp://]address:port";
+ throw invalid_argument(ss.str());
+ }
+
+ m_name = name;
+
+ openUdpSocket(endpoint);
+
+ return 0;
+}
+
+void Sti_d_Rtp::receive_packet()
+{
+ UdpPacket packet;
+ int ret = m_sock.receive(packet);
+
+ if (ret == -1) {
+ stringstream ss;
+ ss << "Could not read from UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ if (packet.getSize() == 0) {
+ // No packet was received
+ return;
+ }
+
+ const size_t STI_FC_LEN = 8;
+
+ if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ etiLog.level(info) << "Received too small RTP packet for " <<
+ m_name;
+ return;
+ }
+
+ if (not rtpHeaderValid(packet.getData())) {
+ etiLog.level(info) << "Received invalid RTP header for " <<
+ m_name;
+ return;
+ }
+
+ // STI(PI, X)
+ size_t index = RTP_HEADER_LEN;
+ const uint8_t *buf = packet.getData();
+
+ // SYNC
+ index++; // Advance over STAT
+
+ // FSYNC
+ if (not stiSyncValid(buf + index)) {
+ etiLog.level(info) << "Received invalid STI-D header for " <<
+ m_name;
+ return;
+ }
+ index += 3;
+
+ // TFH
+ // DFS
+ uint16_t DFS = unpack2(buf+index);
+ index += 4;
+ if (DFS == 0) {
+ etiLog.level(info) << "Received STI data with DFS=0 for " <<
+ m_name;
+ return;
+ }
+ if (packet.getSize() < index + DFS) {
+ etiLog.level(info) << "Received STI too small for given DFS for " <<
+ m_name;
+ return;
+ }
+
+ // CFS
+ uint32_t CFS = unpack2(buf+index);
+ index += 2;
+ if (CFS != 0) {
+ etiLog.level(info) << "Ignoring STI control data for " <<
+ m_name;
+ }
+
+ // FD
+ // STAT
+ // ERR
+ index += 1;
+
+ // FC
+ // SPID
+ index += 2;
+ // rfa DL
+ index += 2;
+ // rfa
+ index += 1;
+
+ // DFCT
+ uint32_t DFCTL = buf[index];
+ index += 1;
+ uint32_t DFCTH = buf[index] >> 3;
+ uint32_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
+ index += 2;
+
+ if (packet.getSize() < index + 4*NST) {
+ etiLog.level(info) << "Received STI too small to contain NST for " <<
+ m_name;
+ return;
+ }
+
+ if (NST == 0) {
+ etiLog.level(info) << "Received STI with NST=0 for " <<
+ m_name;
+ return;
+ }
+ else {
+ // Take the first stream even if NST > 1
+ uint32_t STL = unpack2(buf+index) & 0x1FFF; // 13 bits
+ uint32_t CRCSTF = buf[index+3] & 0x80 >> 7; // 7th bit
+ index += NST*4+4;
+
+ const size_t dataSize = STL - 2*CRCSTF;
+ const size_t frameNumber = DFCTH*250 + DFCTL;
+ // TODO must align framenumber with ETI
+
+ vec_u8 data(dataSize);
+ copy(buf+index, buf+index+dataSize, data.begin());
+
+ // TODO reordering
+ m_queue.push_back(data);
+ }
+
+ if (NST > 1) {
+ etiLog.level(info) << "Ignoring STI supernumerary STC streams for " <<
+ m_name;
+ }
+}
+
+int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
+{
+ // Make sure we fill faster than we consume in case there
+ // are pending packets.
+ receive_packet();
+ receive_packet();
+
+ if (m_queue.empty()) {
+ memset(buffer, 0x0, size);
+ }
+ else if (m_queue.front().size() != size) {
+ etiLog.level(warn) << "Invalid input data size for STI " << m_name <<
+ " : RX " << m_queue.front().size() << " expected " << size;
+ memset(buffer, 0x0, size);
+ m_queue.pop_front();
+ }
+ else {
+ copy(m_queue.front().begin(), m_queue.front().end(), buffer);
+ m_queue.pop_front();
+ }
+
+ return 0;
+}
+
+}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index 379dbf3..dc01486 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -28,11 +28,16 @@
#include <string>
#include <vector>
+#include <deque>
+#include <boost/thread.hpp>
#include "input/inputs.h"
#include "UdpSocket.h"
namespace Inputs {
+/* A Udp input that takes incoming datagrams, concatenates them
+ * together and gives them back.
+ */
class Udp : public InputBase {
public:
virtual int open(const std::string& name);
@@ -40,13 +45,35 @@ class Udp : public InputBase {
virtual int setBitrate(int bitrate);
virtual int close();
- private:
+ protected:
UdpSocket m_sock;
+ std::string m_name;
+
+ void openUdpSocket(const std::string& endpoint);
+ private:
// The content of the UDP packets gets written into the
// buffer, and the UDP packet boundaries disappear there.
std::vector<uint8_t> m_buffer;
};
+/* An input for STI-D(LI) carried in STI(PI, X) inside RTP inside UDP.
+ * Reorders incoming datagrams which must contain an RTP header and valid
+ * STI-D data.
+ *
+ * This is intended to be compatible with encoders from AVT.
+ */
+class Sti_d_Rtp : public Udp {
+ using vec_u8 = std::vector<uint8_t>;
+
+ public:
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+
+ private:
+ void receive_packet(void);
+ std::deque<vec_u8> m_queue;
+};
+
};