From 03967733d70220e2de7af3cdad320aec5c82ede1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 25 Jun 2019 10:50:23 +0200 Subject: Add more EDI input improvements --- src/input/Edi.cpp | 189 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/input/Edi.h | 82 +++++++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 src/input/Edi.cpp create mode 100644 src/input/Edi.h (limited to 'src/input') diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp new file mode 100644 index 0000000..8aee296 --- /dev/null +++ b/src/input/Edi.cpp @@ -0,0 +1,189 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Edi.h" + +#include +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +constexpr bool VERBOSE = false; +constexpr size_t TCP_BLOCKSIZE = 2048; +constexpr size_t MAX_FRAMES_QUEUED = 10; + +Edi::Edi() : + m_tcp_receive_server(TCP_BLOCKSIZE), + m_sti_writer(), + m_sti_decoder(m_sti_writer, VERBOSE) +{ } + +Edi::~Edi() { + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + } +} + +int Edi::open(const std::string& name) +{ + const std::regex re_udp("udp://:([0-9]+)"); + const std::regex re_tcp("tcp://(.*):([0-9]+)"); + + lock_guard lock(m_mutex); + + m_running = false; + if (m_thread.joinable()) { + m_thread.join(); + } + + std::smatch m; + if (std::regex_match(name, m, re_udp)) { + const int udp_port = std::stoi(m[1].str()); + m_input_used = InputUsed::UDP; + m_udp_sock.reinit(udp_port); + m_udp_sock.setBlocking(false); + // TODO multicast + } + else if (std::regex_match(name, m, re_tcp)) { + m_input_used = InputUsed::TCP; + const string addr = m[1].str(); + const int tcp_port = std::stoi(m[2].str()); + m_tcp_receive_server.start(tcp_port, addr); + } + else { + throw runtime_error("Cannot parse EDI input URI"); + } + + m_name = name; + + m_running = true; + m_thread = std::thread(&Edi::m_run, this); + + return 0; +} + +int Edi::readFrame(uint8_t* buffer, size_t size) +{ + vector frame; + if (m_is_prebuffering) { + m_is_prebuffering = m_frames.size() < 10; + if (not m_is_prebuffering) { + etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; + } + } + else if (m_frames.try_pop(frame)) { + if (frame.size() == 0) { + etiLog.level(debug) << "EDI input " << m_name << " empty frame"; + return 0; + } + else if (frame.size() == size) { + std::copy(frame.cbegin(), frame.cend(), buffer); + } + else { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() << + " received, " << size << " requested"; + memset(buffer, 0, size * sizeof(*buffer)); + } + } + else { + memset(buffer, 0, size * sizeof(*buffer)); + m_is_prebuffering = true; + etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + } + return size; +} + +void Edi::m_run() +{ + while (m_running) { + bool work_done = false; + + switch (m_input_used) { + case InputUsed::UDP: + { + constexpr size_t packsize = 2048; + const auto packet = m_udp_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + if (not packet.buffer.empty()) { + m_sti_decoder.push_packet(packet.buffer); + work_done = true; + } + } + break; + case InputUsed::TCP: + { + auto packet = m_tcp_receive_server.receive(); + if (not packet.empty()) { + m_sti_decoder.push_bytes(packet); + work_done = true; + } + } + break; + default: + throw logic_error("unimplemented input"); + } + + const auto sti = m_sti_writer.getFrame(); + if (not sti.frame.empty()) { + m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED); + work_done = true; + } + + if (not work_done) { + // Avoid fast loop + this_thread::sleep_for(chrono::milliseconds(12)); + } + } +} + +int Edi::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name; + return -1; + } + + return bitrate; +} + +int Edi::close() +{ + m_udp_sock.close(); + return 0; +} + +} diff --git a/src/input/Edi.h b/src/input/Edi.h new file mode 100644 index 0000000..7b3dc04 --- /dev/null +++ b/src/input/Edi.h @@ -0,0 +1,82 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "Socket.h" +#include "input/inputs.h" +#include "edi/STIDecoder.hpp" +#include "edi/STIWriter.hpp" +#include "ThreadsafeQueue.h" + +namespace Inputs { + +/* + * Receives EDI from UDP or TCP in a separate thread and pushes that data + * into the STIDecoder. Complete frames are then put into a queue for the consumer. + * + * This way, the EDI decoding happens in a separate thread. + */ +class Edi : public InputBase { + public: + Edi(); + Edi(const Edi&) = delete; + Edi& operator=(const Edi&) = delete; + ~Edi(); + + virtual int open(const std::string& name); + virtual int readFrame(uint8_t* buffer, size_t size); + virtual int setBitrate(int bitrate); + virtual int close(); + + protected: + void m_run(); + + std::mutex m_mutex; + + enum class InputUsed { Invalid, UDP, TCP }; + InputUsed m_input_used = InputUsed::Invalid; + Socket::UDPSocket m_udp_sock; + Socket::TCPReceiveServer m_tcp_receive_server; + + EdiDecoder::STIWriter m_sti_writer; + EdiDecoder::STIDecoder m_sti_decoder; + std::thread m_thread; + std::atomic m_running = ATOMIC_VAR_INIT(false); + ThreadsafeQueue > m_frames; + + bool m_is_prebuffering = true; + + std::string m_name; +}; + +}; + -- cgit v1.2.3