diff options
Diffstat (limited to 'src/input')
| -rw-r--r-- | src/input/Edi.cpp | 221 | ||||
| -rw-r--r-- | src/input/Edi.h | 83 | ||||
| -rw-r--r-- | src/input/Udp.cpp | 59 | ||||
| -rw-r--r-- | src/input/Udp.h | 4 | 
4 files changed, 324 insertions, 43 deletions
| diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp new file mode 100644 index 0000000..765a355 --- /dev/null +++ b/src/input/Edi.cpp @@ -0,0 +1,221 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include "input/Edi.h" + +#include <regex> +#include <chrono> +#include <stdexcept> +#include <sstream> +#include <cstring> +#include <cstdlib> +#include <cerrno> +#include <climits> +#include "utils.h" + +using namespace std; + +namespace Inputs { + +constexpr bool VERBOSE = false; +constexpr size_t TCP_BLOCKSIZE = 2048; +constexpr size_t MAX_FRAMES_QUEUED = 1000; + +Edi::Edi() : +    m_tcp_receive_server(TCP_BLOCKSIZE), +    m_sti_writer(), +    m_sti_decoder(m_sti_writer, VERBOSE) +{ } + +Edi::~Edi() { +    m_running = false; +    if (m_thread.joinable()) { +        m_thread.join(); +    } +} + +int Edi::open(const std::string& name) +{ +    const std::regex re_udp("udp://:([0-9]+)"); +    const std::regex re_tcp("tcp://(.*):([0-9]+)"); + +    lock_guard<mutex> lock(m_mutex); + +    m_running = false; +    if (m_thread.joinable()) { +        m_thread.join(); +    } + +    std::smatch m; +    if (std::regex_match(name, m, re_udp)) { +        const int udp_port = std::stoi(m[1].str()); +        m_input_used = InputUsed::UDP; +        m_udp_sock.reinit(udp_port); +        m_udp_sock.setBlocking(false); +        // TODO multicast +    } +    else if (std::regex_match(name, m, re_tcp)) { +        m_input_used = InputUsed::TCP; +        const string addr = m[1].str(); +        const int tcp_port = std::stoi(m[2].str()); +        m_tcp_receive_server.start(tcp_port, addr); +    } +    else { +        throw runtime_error("Cannot parse EDI input URI"); +    } + +    m_name = name; + +    m_running = true; +    m_thread = std::thread(&Edi::m_run, this); + +    return 0; +} + +int Edi::readFrame(uint8_t* buffer, size_t size) +{ +    if (m_pending_sti_frame.frame.empty()) { +        m_frames.try_pop(m_pending_sti_frame); +    } + +    if (not m_pending_sti_frame.frame.empty()) { +        if (m_pending_sti_frame.frame.size() != size) { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << +                m_pending_sti_frame.frame.size() << " received, " << size << " requested"; +            memset(buffer, 0, size * sizeof(*buffer)); +        } +        else { +            const auto now = chrono::system_clock::now(); + +            if (m_pending_sti_frame.timestamp.to_system_clock() <= now) { +                etiLog.level(debug) << "EDI input take frame with TS " << +                    m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); + +                std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); +                m_pending_sti_frame.frame.clear(); +            } +            else { +                etiLog.level(debug) << "EDI input skip frame with TS " << +                    m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size(); +            } +        } +    } + +    return size; + +#if 0 +    EdiDecoder::sti_frame_t sti; +    if (m_is_prebuffering) { +        m_is_prebuffering = m_frames.size() < 10; +        if (not m_is_prebuffering) { +            etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; +        } +    } +    else if (m_frames.try_pop(sti)) { +        if (sti.frame.size() == 0) { +            etiLog.level(debug) << "EDI input " << m_name << " empty frame"; +            return 0; +        } +        else if (sti.frame.size() == size) { +            std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer); +        } +        else { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() << +                " received, " << size << " requested"; +            memset(buffer, 0, size * sizeof(*buffer)); +        } +    } +    else { +        memset(buffer, 0, size * sizeof(*buffer)); +        m_is_prebuffering = true; +        etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; +    } +    return size; +#endif +} + +void Edi::m_run() +{ +    while (m_running) { +        bool work_done = false; + +        switch (m_input_used) { +            case InputUsed::UDP: +                { +                    constexpr size_t packsize = 2048; +                    const auto packet = m_udp_sock.receive(packsize); +                    if (packet.buffer.size() == packsize) { +                        fprintf(stderr, "Warning, possible UDP truncation\n"); +                    } +                    if (not packet.buffer.empty()) { +                        m_sti_decoder.push_packet(packet.buffer); +                        work_done = true; +                    } +                } +                break; +            case InputUsed::TCP: +                { +                    auto packet = m_tcp_receive_server.receive(); +                    if (not packet.empty()) { +                        m_sti_decoder.push_bytes(packet); +                        work_done = true; +                    } +                } +                break; +            default: +                throw logic_error("unimplemented input"); +        } + +        const auto sti = m_sti_writer.getFrame(); +        if (not sti.frame.empty()) { +            m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED); +            work_done = true; +        } + +        if (not work_done) { +            // Avoid fast loop +            this_thread::sleep_for(chrono::milliseconds(12)); +        } +    } +} + +int Edi::setBitrate(int bitrate) +{ +    if (bitrate <= 0) { +        etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name; +        return -1; +    } + +    return bitrate; +} + +int Edi::close() +{ +    m_udp_sock.close(); +    return 0; +} + +} diff --git a/src/input/Edi.h b/src/input/Edi.h new file mode 100644 index 0000000..66ff682 --- /dev/null +++ b/src/input/Edi.h @@ -0,0 +1,83 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#pragma once + +#include <string> +#include <vector> +#include <deque> +#include <thread> +#include <mutex> +#include "Socket.h" +#include "input/inputs.h" +#include "edi/STIDecoder.hpp" +#include "edi/STIWriter.hpp" +#include "ThreadsafeQueue.h" + +namespace Inputs { + +/* + * Receives EDI from UDP or TCP in a separate thread and pushes that data + * into the STIDecoder. Complete frames are then put into a queue for the consumer. + * + * This way, the EDI decoding happens in a separate thread. + */ +class Edi : public InputBase { +    public: +        Edi(); +        Edi(const Edi&) = delete; +        Edi& operator=(const Edi&) = delete; +        ~Edi(); + +        virtual int open(const std::string& name); +        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual int setBitrate(int bitrate); +        virtual int close(); + +    protected: +        void m_run(); + +        std::mutex m_mutex; + +        enum class InputUsed { Invalid, UDP, TCP }; +        InputUsed m_input_used = InputUsed::Invalid; +        Socket::UDPSocket m_udp_sock; +        Socket::TCPReceiveServer m_tcp_receive_server; + +        EdiDecoder::STIWriter m_sti_writer; +        EdiDecoder::STIDecoder m_sti_decoder; +        std::thread m_thread; +        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); +        ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; +        EdiDecoder::sti_frame_t m_pending_sti_frame; + +        bool m_is_prebuffering = true; + +        std::string m_name; +}; + +}; + diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint)          throw out_of_range("can't use port number 0 in udp address");      } -    if (m_sock.reinit(port, address) == -1) { -        stringstream ss; -        ss << "Could not init UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } - -    if (m_sock.setBlocking(false) == -1) { -        stringstream ss; -        ss << "Could not set non-blocking UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    m_sock.reinit(port, address); +    m_sock.setBlocking(false);      etiLog.level(info) << "Opened UDP port " << address << ":" << port;  } @@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint)  int Udp::readFrame(uint8_t* buffer, size_t size)  {      // Regardless of buffer contents, try receiving data. -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    std::copy(packet.getData(), packet.getData() + packet.getSize(), -            back_inserter(m_buffer)); +    std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));      // Take data from the buffer if it contains enough data,      // in any case write the buffer @@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate)  int Udp::close()  { -    return m_sock.close(); +    m_sock.close(); +    return 0;  } @@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)  int Sti_d_Rtp::open(const std::string& name)  { -    // Skip the sti-rtp:// part if it is present -    const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? +    // Skip the rtp:// part if it is present +    const string endpoint = (name.substr(0, 10) == "rtp://") ?          name.substr(10) : name;      // The endpoint should be address:port @@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)      if (colon_pos == string::npos) {          stringstream ss;          ss << "'" << name << -                " is an invalid format for sti-rtp address: " -                "expected [sti-rtp://]address:port"; +                " is an invalid format for rtp address: " +                "expected [rtp://]address:port";          throw invalid_argument(ss.str());      } @@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name)  void Sti_d_Rtp::receive_packet()  { -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    if (packet.getSize() == 0) { +    if (packet.buffer.empty()) {          // No packet was received          return;      }      const size_t STI_FC_LEN = 8; -    if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { +    if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {          etiLog.level(info) << "Received too small RTP packet for " <<              m_name;          return;      } -    if (not rtpHeaderValid(packet.getData())) { +    if (not rtpHeaderValid(packet.buffer.data())) {          etiLog.level(info) << "Received invalid RTP header for " <<              m_name;          return; @@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet()      //  STI(PI, X)      size_t index = RTP_HEADER_LEN; -    const uint8_t *buf = packet.getData(); +    const uint8_t *buf = packet.buffer.data();      //   SYNC      index++; // Advance over STAT @@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet()              m_name;          return;      } -    if (packet.getSize() < index + DFS) { +    if (packet.buffer.size() < index + DFS) {          etiLog.level(info) << "Received STI too small for given DFS for " <<              m_name;          return; @@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet()      uint16_t NST   = unpack2(buf+index) & 0x7FF; // 11 bits      index += 2; -    if (packet.getSize() < index + 4*NST) { +    if (packet.buffer.size() < index + 4*NST) {          etiLog.level(info) << "Received STI too small to contain NST for " << -            m_name << " packet: " << packet.getSize() << " need " << +            m_name << " packet: " << packet.buffer.size() << " need " <<              index + 4*NST;          return;      } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..dd637c6 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@  #include <deque>  #include <boost/thread.hpp>  #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h"  namespace Inputs { @@ -46,7 +46,7 @@ class Udp : public InputBase {          virtual int close();      protected: -        UdpSocket m_sock; +        Socket::UDPSocket m_sock;          std::string m_name;          void openUdpSocket(const std::string& endpoint); | 
