aboutsummaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-08-21 10:11:35 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-08-21 10:11:35 +0200
commit86ea8cd8b8b5af7917db28ae30cfb2d2886868fe (patch)
tree7222d8e077dd2155eecac68b8c78330bcfe5dc80 /src/input
parent86fbf91f7323a2c5626a357b8414b15e20c19c9e (diff)
parent5ee85c4ac41337e383eb1a735bc05f1e5d46a98f (diff)
downloaddabmux-86ea8cd8b8b5af7917db28ae30cfb2d2886868fe.tar.gz
dabmux-86ea8cd8b8b5af7917db28ae30cfb2d2886868fe.tar.bz2
dabmux-86ea8cd8b8b5af7917db28ae30cfb2d2886868fe.zip
Merge branch 'ediInput' into next
Diffstat (limited to 'src/input')
-rw-r--r--src/input/Edi.cpp221
-rw-r--r--src/input/Edi.h83
-rw-r--r--src/input/Udp.cpp59
-rw-r--r--src/input/Udp.h4
4 files changed, 324 insertions, 43 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
new file mode 100644
index 0000000..765a355
--- /dev/null
+++ b/src/input/Edi.cpp
@@ -0,0 +1,221 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Edi.h"
+
+#include <regex>
+#include <chrono>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <cstdlib>
+#include <cerrno>
+#include <climits>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+constexpr bool VERBOSE = false;
+constexpr size_t TCP_BLOCKSIZE = 2048;
+constexpr size_t MAX_FRAMES_QUEUED = 1000;
+
+Edi::Edi() :
+ m_tcp_receive_server(TCP_BLOCKSIZE),
+ m_sti_writer(),
+ m_sti_decoder(m_sti_writer, VERBOSE)
+{ }
+
+Edi::~Edi() {
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+int Edi::open(const std::string& name)
+{
+ const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_tcp("tcp://(.*):([0-9]+)");
+
+ lock_guard<mutex> lock(m_mutex);
+
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+
+ std::smatch m;
+ if (std::regex_match(name, m, re_udp)) {
+ const int udp_port = std::stoi(m[1].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port);
+ m_udp_sock.setBlocking(false);
+ // TODO multicast
+ }
+ else if (std::regex_match(name, m, re_tcp)) {
+ m_input_used = InputUsed::TCP;
+ const string addr = m[1].str();
+ const int tcp_port = std::stoi(m[2].str());
+ m_tcp_receive_server.start(tcp_port, addr);
+ }
+ else {
+ throw runtime_error("Cannot parse EDI input URI");
+ }
+
+ m_name = name;
+
+ m_running = true;
+ m_thread = std::thread(&Edi::m_run, this);
+
+ return 0;
+}
+
+int Edi::readFrame(uint8_t* buffer, size_t size)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ if (not m_pending_sti_frame.frame.empty()) {
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ else {
+ const auto now = chrono::system_clock::now();
+
+ if (m_pending_sti_frame.timestamp.to_system_clock() <= now) {
+ etiLog.level(debug) << "EDI input take frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+
+ std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ }
+ else {
+ etiLog.level(debug) << "EDI input skip frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+ }
+ }
+ }
+
+ return size;
+
+#if 0
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < 10;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ }
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
+ etiLog.level(debug) << "EDI input " << m_name << " empty frame";
+ return 0;
+ }
+ else if (sti.frame.size() == size) {
+ std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<
+ " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ }
+ else {
+ memset(buffer, 0, size * sizeof(*buffer));
+ m_is_prebuffering = true;
+ etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ }
+ return size;
+#endif
+}
+
+void Edi::m_run()
+{
+ while (m_running) {
+ bool work_done = false;
+
+ switch (m_input_used) {
+ case InputUsed::UDP:
+ {
+ constexpr size_t packsize = 2048;
+ const auto packet = m_udp_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ if (not packet.buffer.empty()) {
+ m_sti_decoder.push_packet(packet.buffer);
+ work_done = true;
+ }
+ }
+ break;
+ case InputUsed::TCP:
+ {
+ auto packet = m_tcp_receive_server.receive();
+ if (not packet.empty()) {
+ m_sti_decoder.push_bytes(packet);
+ work_done = true;
+ }
+ }
+ break;
+ default:
+ throw logic_error("unimplemented input");
+ }
+
+ const auto sti = m_sti_writer.getFrame();
+ if (not sti.frame.empty()) {
+ m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED);
+ work_done = true;
+ }
+
+ if (not work_done) {
+ // Avoid fast loop
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+}
+
+int Edi::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name;
+ return -1;
+ }
+
+ return bitrate;
+}
+
+int Edi::close()
+{
+ m_udp_sock.close();
+ return 0;
+}
+
+}
diff --git a/src/input/Edi.h b/src/input/Edi.h
new file mode 100644
index 0000000..66ff682
--- /dev/null
+++ b/src/input/Edi.h
@@ -0,0 +1,83 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <deque>
+#include <thread>
+#include <mutex>
+#include "Socket.h"
+#include "input/inputs.h"
+#include "edi/STIDecoder.hpp"
+#include "edi/STIWriter.hpp"
+#include "ThreadsafeQueue.h"
+
+namespace Inputs {
+
+/*
+ * Receives EDI from UDP or TCP in a separate thread and pushes that data
+ * into the STIDecoder. Complete frames are then put into a queue for the consumer.
+ *
+ * This way, the EDI decoding happens in a separate thread.
+ */
+class Edi : public InputBase {
+ public:
+ Edi();
+ Edi(const Edi&) = delete;
+ Edi& operator=(const Edi&) = delete;
+ ~Edi();
+
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ protected:
+ void m_run();
+
+ std::mutex m_mutex;
+
+ enum class InputUsed { Invalid, UDP, TCP };
+ InputUsed m_input_used = InputUsed::Invalid;
+ Socket::UDPSocket m_udp_sock;
+ Socket::TCPReceiveServer m_tcp_receive_server;
+
+ EdiDecoder::STIWriter m_sti_writer;
+ EdiDecoder::STIDecoder m_sti_decoder;
+ std::thread m_thread;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+ EdiDecoder::sti_frame_t m_pending_sti_frame;
+
+ bool m_is_prebuffering = true;
+
+ std::string m_name;
+};
+
+};
+
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index 2cb49e7..5d4f964 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint)
throw out_of_range("can't use port number 0 in udp address");
}
- if (m_sock.reinit(port, address) == -1) {
- stringstream ss;
- ss << "Could not init UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (m_sock.setBlocking(false) == -1) {
- stringstream ss;
- ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ m_sock.reinit(port, address);
+ m_sock.setBlocking(false);
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
@@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint)
int Udp::readFrame(uint8_t* buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- std::copy(packet.getData(), packet.getData() + packet.getSize(),
- back_inserter(m_buffer));
+ std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));
// Take data from the buffer if it contains enough data,
// in any case write the buffer
@@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate)
int Udp::close()
{
- return m_sock.close();
+ m_sock.close();
+ return 0;
}
@@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)
int Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}
@@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name)
void Sti_d_Rtp::receive_packet()
{
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- if (packet.getSize() == 0) {
+ if (packet.buffer.empty()) {
// No packet was received
return;
}
const size_t STI_FC_LEN = 8;
- if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
etiLog.level(info) << "Received too small RTP packet for " <<
m_name;
return;
}
- if (not rtpHeaderValid(packet.getData())) {
+ if (not rtpHeaderValid(packet.buffer.data())) {
etiLog.level(info) << "Received invalid RTP header for " <<
m_name;
return;
@@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet()
// STI(PI, X)
size_t index = RTP_HEADER_LEN;
- const uint8_t *buf = packet.getData();
+ const uint8_t *buf = packet.buffer.data();
// SYNC
index++; // Advance over STAT
@@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet()
m_name;
return;
}
- if (packet.getSize() < index + DFS) {
+ if (packet.buffer.size() < index + DFS) {
etiLog.level(info) << "Received STI too small for given DFS for " <<
m_name;
return;
@@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet()
uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
index += 2;
- if (packet.getSize() < index + 4*NST) {
+ if (packet.buffer.size() < index + 4*NST) {
etiLog.level(info) << "Received STI too small to contain NST for " <<
- m_name << " packet: " << packet.getSize() << " need " <<
+ m_name << " packet: " << packet.buffer.size() << " need " <<
index + 4*NST;
return;
}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index dc01486..dd637c6 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -31,7 +31,7 @@
#include <deque>
#include <boost/thread.hpp>
#include "input/inputs.h"
-#include "UdpSocket.h"
+#include "Socket.h"
namespace Inputs {
@@ -46,7 +46,7 @@ class Udp : public InputBase {
virtual int close();
protected:
- UdpSocket m_sock;
+ Socket::UDPSocket m_sock;
std::string m_name;
void openUdpSocket(const std::string& endpoint);