/* Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org */ /* This file is part of ODR-DabMux. ODR-DabMux is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMux is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMux. If not, see . */ #include "input/Udp.h" #include #include #include #include #include #include #include "utils.h" using namespace std; namespace Inputs { int Udp::open(const std::string& name) { // Skip the udp:// part if it is present const string endpoint = (name.substr(0, 6) == "udp://") ? name.substr(6) : name; // The endpoint should be address:port const auto colon_pos = endpoint.find_first_of(":"); if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << " is an invalid format for udp address: " "expected [udp://]address:port"; throw invalid_argument(ss.str()); } m_name = name; openUdpSocket(endpoint); return 0; } void Udp::openUdpSocket(const std::string& endpoint) { const auto colon_pos = endpoint.find_first_of(":"); const auto address = endpoint.substr(0, colon_pos); const auto port_str = endpoint.substr(colon_pos + 1); const long port = strtol(port_str.c_str(), nullptr, 10); if ((port == LONG_MIN or port == LONG_MAX) and errno == ERANGE) { throw out_of_range("udp input: port out of range"); } else if (port == 0 and errno != 0) { stringstream ss; ss << "udp input port parse error: " << strerror(errno); throw invalid_argument(ss.str()); } if (port == 0) { throw out_of_range("can't use port number 0 in udp address"); } m_sock.reinit(port, address); m_sock.setBlocking(false); etiLog.level(info) << "Opened UDP port " << address << ":" << port; } int Udp::readFrame(uint8_t* buffer, size_t size) { // Regardless of buffer contents, try receiving data. auto packet = m_sock.receive(32768); std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer)); // Take data from the buffer if it contains enough data, // in any case write the buffer if (m_buffer.size() >= (size_t)size) { std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer); } else { memset(buffer, 0x0, size); } return size; } int Udp::setBitrate(int bitrate) { if (bitrate <= 0) { etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); return -1; } return bitrate; } int Udp::close() { m_sock.close(); return 0; } // ETSI EN 300 797 V1.2.1 ch 8.2.1.2 #define STI_SYNC_LEN 3 static const uint8_t STI_FSync0[STI_SYNC_LEN] = { 0x1F, 0x90, 0xCA }; static const uint8_t STI_FSync1[STI_SYNC_LEN] = { 0xE0, 0x6F, 0x35 }; static const size_t RTP_HEADER_LEN = 12; static bool stiSyncValid(const uint8_t *buf) { return (memcmp(buf, STI_FSync0, sizeof(STI_FSync0)) == 0) or (memcmp(buf, STI_FSync1, sizeof(STI_FSync1)) == 0); } static bool rtpHeaderValid(const uint8_t *buf) { uint32_t version = (buf[0] & 0xC0) >> 6; uint32_t payloadType = (buf[1] & 0x7F); return (version == 2 and payloadType == 34); } static uint16_t unpack2(const uint8_t *buf) { return (((uint16_t)buf[0]) << 8) | buf[1]; } int Sti_d_Rtp::open(const std::string& name) { // Skip the rtp:// part if it is present const string endpoint = (name.substr(0, 10) == "rtp://") ? name.substr(10) : name; // The endpoint should be address:port const auto colon_pos = endpoint.find_first_of(":"); if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << " is an invalid format for rtp address: " "expected [rtp://]address:port"; throw invalid_argument(ss.str()); } m_name = name; openUdpSocket(endpoint); return 0; } void Sti_d_Rtp::receive_packet() { auto packet = m_sock.receive(32768); if (packet.buffer.empty()) { // No packet was received return; } const size_t STI_FC_LEN = 8; if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { etiLog.level(info) << "Received too small RTP packet for " << m_name; return; } if (not rtpHeaderValid(packet.buffer.data())) { etiLog.level(info) << "Received invalid RTP header for " << m_name; return; } // STI(PI, X) size_t index = RTP_HEADER_LEN; const uint8_t *buf = packet.buffer.data(); // SYNC index++; // Advance over STAT // FSYNC if (not stiSyncValid(buf + index)) { etiLog.level(info) << "Received invalid STI-D header for " << m_name; return; } index += 3; // TFH // DFS uint16_t DFS = unpack2(buf+index); index += 2; if (DFS == 0) { etiLog.level(info) << "Received STI data with DFS=0 for " << m_name; return; } if (packet.buffer.size() < index + DFS) { etiLog.level(info) << "Received STI too small for given DFS for " << m_name; return; } // CFS uint32_t CFS = unpack2(buf+index); index += 2; if (CFS != 0) { etiLog.level(info) << "Ignoring STI control data for " << m_name; } // SPID index += 2; // rfa DL index += 2; // rfa index += 1; // DFCT uint8_t DFCTL = buf[index]; index += 1; uint8_t DFCTH = buf[index] >> 3; uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits index += 2; if (packet.buffer.size() < index + 4*NST) { etiLog.level(info) << "Received STI too small to contain NST for " << m_name << " packet: " << packet.buffer.size() << " need " << index + 4*NST; return; } if (NST == 0) { etiLog.level(info) << "Received STI with NST=0 for " << m_name; return; } else { // Take the first stream even if NST > 1 uint32_t STL = unpack2(buf+index) & 0x1FFF; // 13 bits uint32_t CRCSTF = buf[index+3] & 0x80 >> 7; // 7th bit index += NST*4+4; const size_t dataSize = STL - 2*CRCSTF; const size_t frameNumber = DFCTH*250 + DFCTL; (void)frameNumber; // TODO must align framenumber with ETI // TODO reordering vec_u8 data(dataSize); copy(buf+index, buf+index+dataSize, data.begin()); m_queue.push_back(data); } if (NST > 1) { etiLog.level(info) << "Ignoring STI supernumerary STC streams for " << m_name; } } int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) { // Make sure we fill faster than we consume in case there // are pending packets. receive_packet(); receive_packet(); if (m_queue.empty()) { memset(buffer, 0x0, size); } else if (m_queue.front().size() != size) { etiLog.level(warn) << "Invalid input data size for STI " << m_name << " : RX " << m_queue.front().size() << " expected " << size; memset(buffer, 0x0, size); m_queue.pop_front(); } else { copy(m_queue.front().begin(), m_queue.front().end(), buffer); m_queue.pop_front(); } return 0; } }