diff options
-rw-r--r-- | doc/advanced.mux | 13 | ||||
-rw-r--r-- | doc/example.mux | 2 | ||||
-rw-r--r-- | src/ConfigParser.cpp | 3 | ||||
-rw-r--r-- | src/input/Udp.cpp | 217 | ||||
-rw-r--r-- | src/input/Udp.h | 31 |
5 files changed, 250 insertions, 16 deletions
diff --git a/doc/advanced.mux b/doc/advanced.mux index 3a00ca9..f725f84 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -165,10 +165,13 @@ subchannels { protection 4 } sub-lu { - type audio - inputfile "luschtig.mp2" - nonblock false - bitrate 128 + type dabplus + ; EXPERIMENTAL! + ; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP. + ; This is intended to be compatible with AVT audio encoders. + ; EXPERIMENTAL! + inputfile "sti-rtp://127.0.0.1:32010" + bitrate 96 id 3 protection 3 } @@ -178,7 +181,7 @@ subchannels { ;inputfile "rick.dabp" ; example zmq input: ; Accepts connections to port 9000 from any interface. - ; Use FDK-AAC-DABplus as encoder + ; Use ODR-AudioEnc as encoder inputfile "tcp://*:9000" bitrate 96 id 1 diff --git a/doc/example.mux b/doc/example.mux index 3f2d6f4..7af47c8 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -110,7 +110,7 @@ subchannels { ; This is our DAB+ programme, using a ZeroMQ type dabplus ; Accepts connections to port 9000 from any interface. - ; Use FDK-AAC-DABplus as encoder + ; Use ODR-AudioEnc as encoder inputfile "tcp://*:9000" bitrate 96 id 1 diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index f0ab13c..6d67b7b 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -736,6 +736,9 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, etiLog.level(warn) << "Using untested ipc:// zeromq input"; } } + else if (proto == "sti-rtp") { + subchan->input = make_shared<Inputs::Sti_d_Rtp>(); + } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index a238d9b..5d11d1c 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -54,6 +54,16 @@ int Udp::open(const std::string& name) throw invalid_argument(ss.str()); } + m_name = name; + + openUdpSocket(endpoint); + + return 0; +} + +void Udp::openUdpSocket(const std::string& endpoint) +{ + const auto colon_pos = endpoint.find_first_of(":"); const auto address = endpoint.substr(0, colon_pos); const auto port_str = endpoint.substr(colon_pos + 1); @@ -83,14 +93,10 @@ int Udp::open(const std::string& name) ss << "Could not set non-blocking UDP socket: " << inetErrMsg; throw runtime_error(ss.str()); } - - return 0; } int Udp::readFrame(uint8_t* buffer, size_t size) { - uint8_t* data = reinterpret_cast<uint8_t*>(buffer); - // Regardless of buffer contents, try receiving data. UdpPacket packet; int ret = m_sock.receive(packet); @@ -107,10 +113,10 @@ int Udp::readFrame(uint8_t* buffer, size_t size) // Take data from the buffer if it contains enough data, // in any case write the buffer if (m_buffer.size() >= (size_t)size) { - std::copy(m_buffer.begin(), m_buffer.begin() + size, data); + std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer); } else { - memset(data, 0x0, size); + memset(buffer, 0x0, size); } return size; @@ -131,4 +137,199 @@ int Udp::close() return m_sock.close(); } -}; + +// ETSI EN 300 797 V1.2.1 ch 8.2.1.2 +#define STI_SYNC_LEN 3 +static const uint8_t STI_FSync0[STI_SYNC_LEN] = { 0x1F, 0x90, 0xCA }; +static const uint8_t STI_FSync1[STI_SYNC_LEN] = { 0xE0, 0x6F, 0x35 }; + +static const size_t RTP_HEADER_LEN = 12; + +static bool stiSyncValid(const uint8_t *buf) +{ + return (memcmp(buf+1, STI_FSync0, sizeof(STI_FSync0)) == 0) or + (memcmp(buf+1, STI_FSync1, sizeof(STI_FSync1)) == 0); +} + +static bool rtpHeaderValid(const uint8_t *buf) +{ + uint32_t version = (buf[0] & 0xC0) >> 6; + uint32_t payloadType = (buf[1] & 0x7F); + return (version == 2 and payloadType == 34); +} + +static uint16_t unpack2(const uint8_t *buf) +{ + return (buf[0] << 8) | buf[1]; +} + +int Sti_d_Rtp::open(const std::string& name) +{ + // Skip the sti-rtp:// part if it is present + const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? + name.substr(10) : name; + + // The endpoint should be address:port + const auto colon_pos = endpoint.find_first_of(":"); + if (colon_pos == string::npos) { + stringstream ss; + ss << "'" << name << + " is an invalid format for sti-rtp address: " + "expected [sti-rtp://]address:port"; + throw invalid_argument(ss.str()); + } + + m_name = name; + + openUdpSocket(endpoint); + + return 0; +} + +void Sti_d_Rtp::receive_packet() +{ + UdpPacket packet; + int ret = m_sock.receive(packet); + + if (ret == -1) { + stringstream ss; + ss << "Could not read from UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + if (packet.getSize() == 0) { + // No packet was received + return; + } + + const size_t STI_FC_LEN = 8; + + if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { + etiLog.level(info) << "Received too small RTP packet for " << + m_name; + return; + } + + if (not rtpHeaderValid(packet.getData())) { + etiLog.level(info) << "Received invalid RTP header for " << + m_name; + return; + } + + // STI(PI, X) + size_t index = RTP_HEADER_LEN; + const uint8_t *buf = packet.getData(); + + // SYNC + index++; // Advance over STAT + + // FSYNC + if (not stiSyncValid(buf + index)) { + etiLog.level(info) << "Received invalid STI-D header for " << + m_name; + return; + } + index += 3; + + // TFH + // DFS + uint16_t DFS = unpack2(buf+index); + index += 4; + if (DFS == 0) { + etiLog.level(info) << "Received STI data with DFS=0 for " << + m_name; + return; + } + if (packet.getSize() < index + DFS) { + etiLog.level(info) << "Received STI too small for given DFS for " << + m_name; + return; + } + + // CFS + uint32_t CFS = unpack2(buf+index); + index += 2; + if (CFS != 0) { + etiLog.level(info) << "Ignoring STI control data for " << + m_name; + } + + // FD + // STAT + // ERR + index += 1; + + // FC + // SPID + index += 2; + // rfa DL + index += 2; + // rfa + index += 1; + + // DFCT + uint32_t DFCTL = buf[index]; + index += 1; + uint32_t DFCTH = buf[index] >> 3; + uint32_t NST = unpack2(buf+index) & 0x7FF; // 11 bits + index += 2; + + if (packet.getSize() < index + 4*NST) { + etiLog.level(info) << "Received STI too small to contain NST for " << + m_name; + return; + } + + if (NST == 0) { + etiLog.level(info) << "Received STI with NST=0 for " << + m_name; + return; + } + else { + // Take the first stream even if NST > 1 + uint32_t STL = unpack2(buf+index) & 0x1FFF; // 13 bits + uint32_t CRCSTF = buf[index+3] & 0x80 >> 7; // 7th bit + index += NST*4+4; + + const size_t dataSize = STL - 2*CRCSTF; + const size_t frameNumber = DFCTH*250 + DFCTL; + // TODO must align framenumber with ETI + + vec_u8 data(dataSize); + copy(buf+index, buf+index+dataSize, data.begin()); + + // TODO reordering + m_queue.push_back(data); + } + + if (NST > 1) { + etiLog.level(info) << "Ignoring STI supernumerary STC streams for " << + m_name; + } +} + +int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) +{ + // Make sure we fill faster than we consume in case there + // are pending packets. + receive_packet(); + receive_packet(); + + if (m_queue.empty()) { + memset(buffer, 0x0, size); + } + else if (m_queue.front().size() != size) { + etiLog.level(warn) << "Invalid input data size for STI " << m_name << + " : RX " << m_queue.front().size() << " expected " << size; + memset(buffer, 0x0, size); + m_queue.pop_front(); + } + else { + copy(m_queue.front().begin(), m_queue.front().end(), buffer); + m_queue.pop_front(); + } + + return 0; +} + +} diff --git a/src/input/Udp.h b/src/input/Udp.h index 379dbf3..dc01486 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -28,11 +28,16 @@ #include <string> #include <vector> +#include <deque> +#include <boost/thread.hpp> #include "input/inputs.h" #include "UdpSocket.h" namespace Inputs { +/* A Udp input that takes incoming datagrams, concatenates them + * together and gives them back. + */ class Udp : public InputBase { public: virtual int open(const std::string& name); @@ -40,13 +45,35 @@ class Udp : public InputBase { virtual int setBitrate(int bitrate); virtual int close(); - private: + protected: UdpSocket m_sock; + std::string m_name; + + void openUdpSocket(const std::string& endpoint); + private: // The content of the UDP packets gets written into the // buffer, and the UDP packet boundaries disappear there. std::vector<uint8_t> m_buffer; }; +/* An input for STI-D(LI) carried in STI(PI, X) inside RTP inside UDP. + * Reorders incoming datagrams which must contain an RTP header and valid + * STI-D data. + * + * This is intended to be compatible with encoders from AVT. + */ +class Sti_d_Rtp : public Udp { + using vec_u8 = std::vector<uint8_t>; + + public: + virtual int open(const std::string& name); + virtual int readFrame(uint8_t* buffer, size_t size); + + private: + void receive_packet(void); + std::deque<vec_u8> m_queue; +}; + }; |