summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp61
-rw-r--r--src/DabMultiplexer.h3
-rw-r--r--src/DabMux.cpp5
-rw-r--r--src/InetAddress.cpp155
-rw-r--r--src/InetAddress.h78
-rw-r--r--src/ReedSolomon.cpp116
-rw-r--r--src/ReedSolomon.h56
-rw-r--r--src/TcpSocket.cpp359
-rw-r--r--src/TcpSocket.h164
-rw-r--r--src/ThreadsafeQueue.h178
-rw-r--r--src/UdpSocket.cpp256
-rw-r--r--src/UdpSocket.h174
-rw-r--r--src/dabOutput/dabOutput.h21
-rw-r--r--src/dabOutput/dabOutputTcp.cpp2
-rw-r--r--src/dabOutput/dabOutputUdp.cpp65
-rw-r--r--src/dabOutput/edi/Config.h9
-rw-r--r--src/dabOutput/edi/PFT.cpp2
-rw-r--r--src/dabOutput/edi/Transport.cpp67
-rw-r--r--src/dabOutput/edi/Transport.h8
-rw-r--r--src/input/Edi.cpp189
-rw-r--r--src/input/Edi.h82
-rw-r--r--src/input/Udp.cpp59
-rw-r--r--src/input/Udp.h4
23 files changed, 414 insertions, 1699 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index fb49efc..3142bb3 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -40,6 +40,7 @@
#include "utils.h"
#include "DabMux.h"
#include "ManagementServer.h"
+#include "input/Edi.h"
#include "input/Prbs.h"
#include "input/Zmq.h"
#include "input/File.h"
@@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
type = pt.get<string>("type");
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no type defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");
}
- /* Both inputfile and inputuri are supported, and are equivalent.
- * inputuri has precedence
+ /* Up to v2.3.1, both inputfile and inputuri are supported, and are
+ * equivalent. inputuri has precedence.
+ *
+ * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.
*/
string inputUri = pt.get<string>("inputuri", "");
+ string proto = pt.get<string>("inputproto", "");
- if (inputUri == "") {
+ if (inputUri.empty() and proto.empty()) {
try {
+ /* Old approach, derives proto from scheme used in the URL.
+ * This makes it impossible to distinguish between ZMQ tcp:// and
+ * EDI tcp://
+ */
inputUri = pt.get<string>("inputfile");
+ size_t protopos = inputUri.find("://");
+
+ if (protopos == string::npos) {
+ proto = "file";
+ }
+ else {
+ proto = inputUri.substr(0, protopos);
+
+ if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ proto = "zmq";
+ }
+ else if (proto == "sti-rtp") {
+ proto = "sti";
+ }
+ }
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");
}
}
-
- string proto;
- size_t protopos = inputUri.find("://");
- if (protopos == string::npos) {
- proto = "file";
- }
- else {
- proto = inputUri.substr(0, protopos);
+ else if (inputUri.empty() or proto.empty()) {
+ throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);
}
subchan->inputUri = inputUri;
@@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
throw logic_error("Incomplete handling of file input");
}
}
- else if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ else if (proto == "zmq") {
auto zmqconfig = setup_zmq_input(pt, subchanuid);
if (type == "audio") {
@@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
rcs.enrol(inzmq.get());
subchan->input = inzmq;
}
-
- if (proto == "epgm") {
- etiLog.level(warn) << "Using untested epgm:// zeromq input";
- }
- else if (proto == "ipc") {
- etiLog.level(warn) << "Using untested ipc:// zeromq input";
- }
}
- else if (proto == "sti-rtp") {
+ else if (proto == "edi") {
+ subchan->input = make_shared<Inputs::Edi>();
+ }
+ else if (proto == "stp") {
subchan->input = make_shared<Inputs::Sti_d_Rtp>();
}
else {
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index 386c23c..d1075a6 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -37,8 +37,7 @@
#include "fig/FIGCarousel.h"
#include "crc.h"
#include "utils.h"
-#include "UdpSocket.h"
-#include "InetAddress.h"
+#include "Socket.h"
#include "PcDebug.h"
#include "MuxElements.h"
#include "RemoteControl.h"
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 51f0310..578fc63 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -99,8 +99,7 @@ typedef DWORD32 uint32_t;
#include "dabOutput/dabOutput.h"
#include "crc.h"
-#include "UdpSocket.h"
-#include "InetAddress.h"
+#include "Socket.h"
#include "PcDebug.h"
#include "DabMux.h"
#include "MuxElements.h"
@@ -305,7 +304,7 @@ int main(int argc, char *argv[])
edi_conf.destinations.push_back(dest);
}
else if (proto == "tcp") {
- auto dest = make_shared<edi::tcp_destination_t>();
+ auto dest = make_shared<edi::tcp_server_t>();
dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");
dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);
edi_conf.destinations.push_back(dest);
diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp
deleted file mode 100644
index 7660263..0000000
--- a/src/InetAddress.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "InetAddress.h"
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
-
-#ifdef TRACE_ON
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl
-# define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl
-# endif
-#else
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func)
-# define TRACE_STATIC(clas, func)
-# endif
-#endif
-
-
-int inetErrNo = 0;
-const char *inetErrMsg = nullptr;
-const char *inetErrDesc = nullptr;
-
-
-/**
- * Constructs an IP address.
- * @param port The port of this address
- * @param name The name of this address
- */
-InetAddress::InetAddress(int port, const char* name) {
- TRACE_CLASS("InetAddress", "InetAddress(int, char)");
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
- if (name)
- setAddress(name);
-}
-
-
-/// Returns the raw IP address of this InetAddress object.
-sockaddr *InetAddress::getAddress() {
- TRACE_CLASS("InetAddress", "getAddress()");
- return (sockaddr *)&addr;
-}
-
-
-/// Return the port of this address.
-int InetAddress::getPort()
-{
- TRACE_CLASS("InetAddress", "getPort()");
- return ntohs(addr.sin_port);
-}
-
-
-/**
- * Returns the IP address string "%d.%d.%d.%d".
- * @return IP address
- */
-const char *InetAddress::getHostAddress() {
- TRACE_CLASS("InetAddress", "getHostAddress()");
- return inet_ntoa(addr.sin_addr);
-}
-
-
-/// Returns true if this address is multicast
-bool InetAddress::isMulticastAddress() {
- TRACE_CLASS("InetAddress", "isMulticastAddress()");
- return IN_MULTICAST(ntohl(addr.sin_addr.s_addr)); // a modifier
-}
-
-
-/**
- * Set the port number
- * @param port The new port number
- */
-void InetAddress::setPort(int port)
-{
- TRACE_CLASS("InetAddress", "setPort(int)");
- addr.sin_port = htons(port);
-}
-
-
-/**
- * Set the address
- * @param name The new address name
- * @return 0 if ok
- * -1 if error
- */
-int InetAddress::setAddress(const std::string& name)
-{
- TRACE_CLASS("InetAddress", "setAddress(string)");
- if (!name.empty()) {
- if (atoi(name.c_str())) { // If it start with a number
- if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Invalid address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- else { // Assume it's a real name
- hostent *host = gethostbyname(name.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
- } else {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Could not find address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- }
- else {
- addr.sin_addr.s_addr = INADDR_ANY;
- }
- return 0;
-}
-
-
-void setInetError(const char* description)
-{
- inetErrNo = 0;
- inetErrNo = errno;
- inetErrMsg = strerror(inetErrNo);
- inetErrDesc = description;
-}
-
diff --git a/src/InetAddress.h b/src/InetAddress.h
deleted file mode 100644
index e246d4c..0000000
--- a/src/InetAddress.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _InetAddress
-#define _InetAddress
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <stdlib.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#include <string>
-
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define INVALID_PORT -1
-
-
-/// The last error number
-extern int inetErrNo;
-/// The last error message
-extern const char *inetErrMsg;
-/// The description of the last error
-extern const char *inetErrDesc;
-/// Set the number, message and description of the last error
-void setInetError(const char* description);
-
-
-/**
- * This class represents an Internet Protocol (IP) address.
- * @author Pascal Charest pascal.charest@crc.ca
- */
-class InetAddress {
- public:
- InetAddress(int port = 0, const char* name = NULL);
-
- sockaddr *getAddress();
- const char *getHostAddress();
- int getPort();
- int setAddress(const std::string& name);
- void setPort(int port);
- bool isMulticastAddress();
-
- private:
- sockaddr_in addr;
-};
-
-
-#endif
diff --git a/src/ReedSolomon.cpp b/src/ReedSolomon.cpp
deleted file mode 100644
index 38d8ea8..0000000
--- a/src/ReedSolomon.cpp
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right
- of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "ReedSolomon.h"
-#include <vector>
-#include <algorithm>
-#include <stdexcept>
-#include <sstream>
-#include <stdio.h> // For galois.h ...
-#include <string.h> // For memcpy
-
-extern "C" {
-#include "fec/fec.h"
-}
-#include <assert.h>
-
-#define SYMSIZE 8
-
-
-ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem)
-{
- setReverse(reverse);
-
- m_N = N;
- m_K = K;
-
- const int symsize = SYMSIZE;
- const int nroots = N - K; // For EDI PFT, this must be 48
- const int pad = ((1 << symsize) - 1) - N; // is 255-N
-
- rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad);
-
- if (rsData == nullptr) {
- std::stringstream ss;
- ss << "Invalid Reed-Solomon parameters! " <<
- "N=" << N << " ; K=" << K << " ; pad=" << pad;
- throw std::invalid_argument(ss.str());
- }
-}
-
-
-ReedSolomon::~ReedSolomon()
-{
- free_rs_char(rsData);
-}
-
-
-void ReedSolomon::setReverse(bool state)
-{
- reverse = state;
-}
-
-
-int ReedSolomon::encode(void* data, void* fec, size_t size)
-{
- uint8_t* input = reinterpret_cast<uint8_t*>(data);
- uint8_t* output = reinterpret_cast<uint8_t*>(fec);
- int ret = 0;
-
- if (reverse) {
- std::vector<uint8_t> buffer(m_N);
-
- memcpy(&buffer[0], input, m_K);
- memcpy(&buffer[m_K], output, m_N - m_K);
-
- ret = decode_rs_char(rsData, &buffer[0], nullptr, 0);
- if ((ret != 0) && (ret != -1)) {
- memcpy(input, &buffer[0], m_K);
- memcpy(output, &buffer[m_K], m_N - m_K);
- }
- }
- else {
- encode_rs_char(rsData, input, output);
- }
-
- return ret;
-}
-
-
-int ReedSolomon::encode(void* data, size_t size)
-{
- uint8_t* input = reinterpret_cast<uint8_t*>(data);
- int ret = 0;
-
- if (reverse) {
- ret = decode_rs_char(rsData, input, nullptr, 0);
- }
- else {
- encode_rs_char(rsData, input, &input[m_K]);
- }
-
- return ret;
-}
diff --git a/src/ReedSolomon.h b/src/ReedSolomon.h
deleted file mode 100644
index abcef62..0000000
--- a/src/ReedSolomon.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right
- of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include <config.h>
-#endif
-
-#include <stdlib.h>
-
-class ReedSolomon
-{
-public:
- ReedSolomon(int N, int K,
- bool reverse = false,
- int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1);
- ReedSolomon(const ReedSolomon& other) = delete;
- ReedSolomon operator=(const ReedSolomon& other) = delete;
- ~ReedSolomon();
-
- void setReverse(bool state);
- int encode(void* data, void* fec, size_t size);
- int encode(void* data, size_t size);
-
-private:
- int m_N;
- int m_K;
-
- void* rsData;
- bool reverse;
-};
-
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp
deleted file mode 100644
index 3ebe73c..0000000
--- a/src/TcpSocket.cpp
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "TcpSocket.h"
-#include "Log.h"
-#include <iostream>
-#include <cstdio>
-#include <cstring>
-#include <cstdint>
-#include <signal.h>
-#include <errno.h>
-#include <poll.h>
-#include <thread>
-
-using namespace std;
-
-using vec_u8 = std::vector<uint8_t>;
-
-
-TcpSocket::TcpSocket() :
- m_sock(INVALID_SOCKET)
-{
-}
-
-TcpSocket::TcpSocket(int port, const string& name) :
- m_sock(INVALID_SOCKET)
-{
- if (port) {
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
- throw std::runtime_error("Can't create socket");
- }
-
- reuseopt_t reuse = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- throw std::runtime_error("Can't reuse address");
- }
-
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
- == SOCKET_ERROR) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
- }
-#endif
-
- m_own_address.setAddress(name);
- m_own_address.setPort(port);
-
- if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- ::close(m_sock);
- m_sock = INVALID_SOCKET;
- throw std::runtime_error("Can't bind socket");
- }
- }
-}
-
-TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) :
- m_own_address(own),
- m_remote_address(remote),
- m_sock(sock) { }
-
-// The move constructors must ensure the moved-from
-// TcpSocket won't destroy our socket handle
-TcpSocket::TcpSocket(TcpSocket&& other)
-{
- m_sock = other.m_sock;
- other.m_sock = INVALID_SOCKET;
-
- m_own_address = other.m_own_address;
- m_remote_address = other.m_remote_address;
-}
-
-TcpSocket& TcpSocket::operator=(TcpSocket&& other)
-{
- m_sock = other.m_sock;
- other.m_sock = INVALID_SOCKET;
-
- m_own_address = other.m_own_address;
- m_remote_address = other.m_remote_address;
- return *this;
-}
-
-/**
- * Close the underlying socket.
- * @return 0 if ok
- * -1 if error
- */
-int TcpSocket::close()
-{
- if (m_sock != INVALID_SOCKET) {
- int res = ::close(m_sock);
- if (res != 0) {
- setInetError("Can't close socket");
- return -1;
- }
- m_sock = INVALID_SOCKET;
- }
- return 0;
-}
-
-TcpSocket::~TcpSocket()
-{
- close();
-}
-
-bool TcpSocket::isValid()
-{
- return m_sock != INVALID_SOCKET;
-}
-
-ssize_t TcpSocket::recv(void* data, size_t size)
-{
- ssize_t ret = ::recv(m_sock, (char*)data, size, 0);
- if (ret == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket recv error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- return ret;
-}
-
-
-ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)
-{
- if (timeout_ms) {
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLOUT;
-
- const int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- stringstream ss;
- ss << "TCP Socket send error on poll(): " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else if (retval == 0) {
- // Timed out
- return 0;
- }
- }
-
- /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
- * receive a SIGPIPE and die.
- * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
-#if defined(HAVE_MSG_NOSIGNAL)
- const int flags = MSG_NOSIGNAL;
-#else
- const int flags = 0;
-#endif
- const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
-
- if (ret == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket send error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- return ret;
-}
-
-void TcpSocket::listen()
-{
- if (::listen(m_sock, 1) == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket listen error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
-}
-
-TcpSocket TcpSocket::accept()
-{
- InetAddress remote_addr;
- socklen_t addrLen = sizeof(sockaddr_in);
-
- SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen);
- if (socket == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket accept error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else {
- TcpSocket client(socket, m_own_address, remote_addr);
- return client;
- }
-}
-
-TcpSocket TcpSocket::accept(int timeout_ms)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN | POLLOUT;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- stringstream ss;
- ss << "TCP Socket accept error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else if (retval) {
- return accept();
- }
- else {
- TcpSocket invalidsock(0, "");
- return invalidsock;
- }
-}
-
-
-InetAddress TcpSocket::getOwnAddress() const
-{
- return m_own_address;
-}
-
-InetAddress TcpSocket::getRemoteAddress() const
-{
- return m_remote_address;
-}
-
-
-TCPConnection::TCPConnection(TcpSocket&& sock) :
- queue(),
- m_running(true),
- m_sender_thread(),
- m_sock(move(sock))
-{
- auto own_addr = m_sock.getOwnAddress();
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "New TCP Connection on port " <<
- own_addr.getPort() << " from " <<
- addr.getHostAddress() << ":" << addr.getPort();
- m_sender_thread = std::thread(&TCPConnection::process, this);
-}
-
-TCPConnection::~TCPConnection()
-{
- m_running = false;
- vec_u8 termination_marker;
- queue.push(termination_marker);
- m_sender_thread.join();
-}
-
-void TCPConnection::process()
-{
- while (m_running) {
- vec_u8 data;
- queue.wait_and_pop(data);
-
- if (data.empty()) {
- // empty vector is the termination marker
- m_running = false;
- break;
- }
-
- try {
- ssize_t remaining = data.size();
- const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
- const int timeout_ms = 10; // Less than one ETI frame
-
- while (m_running and remaining > 0) {
- const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
- if (sent < 0 or sent > remaining) {
- throw std::logic_error("Invalid TcpSocket::send() return value");
- }
- remaining -= sent;
- buf += sent;
- }
- }
- catch (const std::runtime_error& e) {
- m_running = false;
- }
- }
-
-
- auto own_addr = m_sock.getOwnAddress();
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "Dropping TCP Connection on port " <<
- own_addr.getPort() << " from " <<
- addr.getHostAddress() << ":" << addr.getPort();
-}
-
-
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
- m_max_queue_size(max_queue_size)
-{
-}
-
-TCPDataDispatcher::~TCPDataDispatcher()
-{
- m_running = false;
- m_connections.clear();
- m_listener_socket.close();
- m_listener_thread.join();
-}
-
-void TCPDataDispatcher::start(int port, const string& address)
-{
- TcpSocket sock(port, address);
- m_listener_socket = move(sock);
-
- m_running = true;
- m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
-}
-
-void TCPDataDispatcher::write(const vec_u8& data)
-{
- for (auto& connection : m_connections) {
- connection.queue.push(data);
- }
-
- m_connections.remove_if(
- [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
-}
-
-void TCPDataDispatcher::process()
-{
- try {
- m_listener_socket.listen();
-
- const int timeout_ms = 1000;
-
- while (m_running) {
- // Add a new TCPConnection to the list, constructing it from the client socket
- auto sock = m_listener_socket.accept(timeout_ms);
- if (sock.isValid()) {
- m_connections.emplace(m_connections.begin(), move(sock));
- }
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what();
- m_running = false;
- }
-}
-
diff --git a/src/TcpSocket.h b/src/TcpSocket.h
deleted file mode 100644
index ec7afd3..0000000
--- a/src/TcpSocket.h
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _TCPSOCKET
-#define _TCPSOCKET
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include "ThreadsafeQueue.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <iostream>
-#include <string>
-#include <vector>
-#include <memory>
-#include <atomic>
-#include <thread>
-#include <list>
-
-/**
- * This class represents a TCP socket.
- */
-class TcpSocket
-{
- public:
- /** Create a new socket that does nothing */
- TcpSocket();
-
- /** Create a new socket listening for incoming connections.
- * @param port The port number on which the socket will listen.
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- TcpSocket(int port, const std::string& name);
- ~TcpSocket();
- TcpSocket(TcpSocket&& other);
- TcpSocket& operator=(TcpSocket&& other);
- TcpSocket(const TcpSocket& other) = delete;
- TcpSocket& operator=(const TcpSocket& other) = delete;
-
- bool isValid(void);
-
- int close(void);
-
- /** Send data over the TCP connection.
- * @param data The buffer that will be sent.
- * @param size Number of bytes to send.
- * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
- * return number of bytes sent, 0 on timeout, or throws runtime_error.
- */
- ssize_t send(const void* data, size_t size, int timeout_ms=0);
-
- /** Receive data from the socket.
- * @param data The buffer that will receive data.
- * @param size The buffer size.
- * @return number of bytes received or -1 (SOCKET_ERROR) if error
- */
- ssize_t recv(void* data, size_t size);
-
- void listen(void);
- TcpSocket accept(void);
-
- /* Returns either valid socket if a connection was
- * accepted before the timeout expired, or an invalid
- * socket otherwise.
- */
- TcpSocket accept(int timeout_ms);
-
- /** Retrieve address this socket is bound to */
- InetAddress getOwnAddress() const;
- InetAddress getRemoteAddress() const;
-
- private:
- TcpSocket(SOCKET sock, InetAddress own, InetAddress remote);
-
- /// The address on which the socket is bound.
- InetAddress m_own_address;
- InetAddress m_remote_address;
- /// The low-level socket used by system functions.
- SOCKET m_sock;
-};
-
-/* Helper class for TCPDataDispatcher, contains a queue of pending data and
- * a sender thread. */
-class TCPConnection
-{
- public:
- TCPConnection(TcpSocket&& sock);
- TCPConnection(const TCPConnection&) = delete;
- TCPConnection& operator=(const TCPConnection&) = delete;
- ~TCPConnection();
-
- ThreadsafeQueue<std::vector<uint8_t> > queue;
-
- private:
- std::atomic<bool> m_running;
- std::thread m_sender_thread;
- TcpSocket m_sock;
-
- void process(void);
-};
-
-/* Send a TCP stream to several destinations, and automatically disconnect destinations
- * whose buffer overflows.
- */
-class TCPDataDispatcher
-{
- public:
- TCPDataDispatcher(size_t max_queue_size);
- ~TCPDataDispatcher();
- TCPDataDispatcher(const TCPDataDispatcher&) = delete;
- TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
-
- void start(int port, const std::string& address);
- void write(const std::vector<uint8_t>& data);
-
- private:
- void process(void);
-
- size_t m_max_queue_size;
-
- std::atomic<bool> m_running;
- std::thread m_listener_thread;
- TcpSocket m_listener_socket;
- std::list<TCPConnection> m_connections;
-};
-
-#endif // _TCPSOCKET
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
deleted file mode 100644
index ab287b2..0000000
--- a/src/ThreadsafeQueue.h
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- An implementation for a threadsafe queue, depends on C++11
-
- When creating a ThreadsafeQueue, one can specify the minimal number
- of elements it must contain before it is possible to take one
- element out.
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <utility>
-
-/* This queue is meant to be used by two threads. One producer
- * that pushes elements into the queue, and one consumer that
- * retrieves the elements.
- *
- * The queue can make the consumer block until an element
- * is available, or a wakeup requested.
- */
-
-/* Class thrown by blocking pop to tell the consumer
- * that there's a wakeup requested. */
-class ThreadsafeQueueWakeup {};
-
-template<typename T>
-class ThreadsafeQueue
-{
-public:
- /* Push one element into the queue, and notify another thread that
- * might be waiting.
- *
- * returns the new queue size.
- */
- size_t push(T const& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- size_t push(T&& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.emplace(std::move(val));
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Push one element into the queue, but wait until the
- * queue size goes below the threshold.
- *
- * Notify waiting thread.
- *
- * returns the new queue size.
- */
- size_t push_wait_if_full(T const& val, size_t threshold)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() >= threshold) {
- the_tx_notification.wait(lock);
- }
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Trigger a wakeup event on a blocking consumer, which
- * will receive a ThreadsafeQueueWakeup exception.
- */
- void trigger_wakeup(void)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- wakeup_requested = true;
- lock.unlock();
- the_rx_notification.notify_one();
- }
-
- /* Send a notification for the receiver thread */
- void notify(void)
- {
- the_rx_notification.notify_one();
- }
-
- bool empty() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.empty();
- }
-
- size_t size() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.size();
- }
-
- bool try_pop(T& popped_value)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- if (the_queue.empty()) {
- return false;
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
-
- return true;
- }
-
- void wait_and_pop(T& popped_value, size_t prebuffering = 1)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering and
- not wakeup_requested) {
- the_rx_notification.wait(lock);
- }
-
- if (wakeup_requested) {
- wakeup_requested = false;
- throw ThreadsafeQueueWakeup();
- }
- else {
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
- }
- }
-
-private:
- std::queue<T> the_queue;
- mutable std::mutex the_mutex;
- std::condition_variable the_rx_notification;
- std::condition_variable the_tx_notification;
- bool wakeup_requested = false;
-};
-
diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp
deleted file mode 100644
index 3d015ec..0000000
--- a/src/UdpSocket.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "UdpSocket.h"
-
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-
-using namespace std;
-
-UdpSocket::UdpSocket() :
- listenSocket(INVALID_SOCKET)
-{
- reinit(0, "");
-}
-
-UdpSocket::UdpSocket(int port) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, "");
-}
-
-UdpSocket::UdpSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, name);
-}
-
-
-int UdpSocket::setBlocking(bool block)
-{
- int res;
- if (block)
- res = fcntl(listenSocket, F_SETFL, 0);
- else
- res = fcntl(listenSocket, F_SETFL, O_NONBLOCK);
- if (res == SOCKET_ERROR) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-}
-
-int UdpSocket::reinit(int port, const std::string& name)
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
- setInetError("Can't create socket");
- return -1;
- }
- reuseopt_t reuse = 1;
- if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- setInetError("Can't reuse address");
- return -1;
- }
-
- if (port) {
- address.setAddress(name);
- address.setPort(port);
-
- if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- setInetError("Can't bind socket");
- ::close(listenSocket);
- listenSocket = INVALID_SOCKET;
- return -1;
- }
- }
- return 0;
-}
-
-int UdpSocket::close()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- listenSocket = INVALID_SOCKET;
-
- return 0;
-}
-
-UdpSocket::~UdpSocket()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-}
-
-
-int UdpSocket::receive(UdpPacket& packet)
-{
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- ssize_t ret = recvfrom(listenSocket,
- packet.getData(),
- packet.getSize(),
- 0,
- packet.getAddress().getAddress(),
- &addrSize);
-
- if (ret == SOCKET_ERROR) {
- packet.setSize(0);
- if (errno == EAGAIN) {
- return 0;
- }
- setInetError("Can't receive UDP packet");
- return -1;
- }
-
- packet.setSize(ret);
- return 0;
-}
-
-int UdpSocket::send(UdpPacket& packet)
-{
- int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0,
- packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
-{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
- destination.getAddress(), sizeof(*destination.getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::joinGroup(char* groupname)
-{
- ip_mreqn group;
- if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
- setInetError(groupname);
- return -1;
- }
- if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- setInetError("Not a multicast address");
- return -1;
- }
- group.imr_address.s_addr = htons(INADDR_ANY);;
- group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- }
- return 0;
-}
-
-int UdpSocket::setMulticastTTL(int ttl)
-{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
- == SOCKET_ERROR) {
- setInetError("Can't set ttl");
- return -1;
- }
-
- return 0;
-}
-
-int UdpSocket::setMulticastSource(const char* source_addr)
-{
- struct in_addr addr;
- if (inet_aton(source_addr, &addr) == 0) {
- setInetError("Can't parse source address");
- return -1;
- }
-
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
- == SOCKET_ERROR) {
- setInetError("Can't set source address");
- return -1;
- }
-
- return 0;
-}
-
-UdpPacket::UdpPacket() { }
-
-UdpPacket::UdpPacket(size_t initSize) :
- m_buffer(initSize)
-{ }
-
-
-void UdpPacket::setSize(size_t newSize)
-{
- m_buffer.resize(newSize);
-}
-
-
-uint8_t* UdpPacket::getData()
-{
- return &m_buffer[0];
-}
-
-
-void UdpPacket::addData(const void *data, size_t size)
-{
- uint8_t *d = (uint8_t*)data;
- std::copy(d, d + size, std::back_inserter(m_buffer));
-}
-
-size_t UdpPacket::getSize()
-{
- return m_buffer.size();
-}
-
-InetAddress UdpPacket::getAddress()
-{
- return address;
-}
-
diff --git a/src/UdpSocket.h b/src/UdpSocket.h
deleted file mode 100644
index f51e87c..0000000
--- a/src/UdpSocket.h
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <stdlib.h>
-#include <iostream>
-#include <vector>
-
-class UdpPacket;
-
-
-/**
- * This class represents a socket for sending and receiving UDP packets.
- *
- * A UDP socket is the sending or receiving point for a packet delivery service.
- * Each packet sent or received on a datagram socket is individually
- * addressed and routed. Multiple packets sent from one machine to another may
- * be routed differently, and may arrive in any order.
- */
-class UdpSocket
-{
- public:
- /** Create a new socket that will not be bound to any port. To be used
- * for data output.
- */
- UdpSocket();
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- */
- UdpSocket(int port);
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- UdpSocket(int port, const std::string& name);
- ~UdpSocket();
- UdpSocket(const UdpSocket& other) = delete;
- const UdpSocket& operator=(const UdpSocket& other) = delete;
-
- /** reinitialise socket. Close the already open socket, and
- * create a new one
- */
- int reinit(int port, const std::string& name);
-
- /** Close the socket
- */
- int close(void);
-
- /** Send an UDP packet.
- * @param packet The UDP packet to be sent. It includes the data and the
- * destination address
- * return 0 if ok, -1 if error
- */
- int send(UdpPacket& packet);
-
- /** Send an UDP packet
- *
- * return 0 if ok, -1 if error
- */
- int send(const std::vector<uint8_t>& data, InetAddress destination);
-
- /** Receive an UDP packet.
- * @param packet The packet that will receive the data. The address will be set
- * to the source address.
- * @return 0 if ok, -1 if error
- */
- int receive(UdpPacket& packet);
-
- int joinGroup(char* groupname);
- int setMulticastSource(const char* source_addr);
- int setMulticastTTL(int ttl);
-
- /** Set blocking mode. By default, the socket is blocking.
- * @return 0 if ok
- * -1 if error
- */
- int setBlocking(bool block);
-
- protected:
-
- /// The address on which the socket is bound.
- InetAddress address;
- /// The low-level socket used by system functions.
- SOCKET listenSocket;
-};
-
-/** This class represents a UDP packet.
- *
- * A UDP packet contains a payload (sequence of bytes) and an address. For
- * outgoing packets, the address is the destination address. For incoming
- * packets, the address tells the user from what source the packet arrived from.
- */
-class UdpPacket
-{
- public:
- /** Construct an empty UDP packet.
- */
- UdpPacket();
- UdpPacket(size_t initSize);
-
- /** Give the pointer to data.
- * @return The pointer
- */
- uint8_t* getData(void);
-
- /** Append some data at the end of data buffer and adjust size.
- * @param data Pointer to the data to add
- * @param size Size in bytes of new data
- */
- void addData(const void *data, size_t size);
-
- size_t getSize(void);
-
- /** Changes size of the data buffer size. Keeps data intact unless
- * truncated.
- */
- void setSize(size_t newSize);
-
- /** Returns the UDP address of the packet.
- */
- InetAddress getAddress(void);
-
- const std::vector<uint8_t>& getBuffer(void) const {
- return m_buffer;
- }
-
-
- private:
- std::vector<uint8_t> m_buffer;
- InetAddress address;
-};
-
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index 9cc18d7..c7e570b 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -28,8 +28,7 @@
#pragma once
-#include "UdpSocket.h"
-#include "TcpSocket.h"
+#include "Socket.h"
#include "Log.h"
#include "string.h"
#include <stdexcept>
@@ -57,6 +56,8 @@ class DabOutput
{
return Open(name.c_str());
}
+
+ // Return -1 on failure
virtual int Write(void* buffer, int size) = 0;
virtual int Close() = 0;
@@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput
class DabOutputUdp : public DabOutput
{
public:
- DabOutputUdp() {
- packet_ = new UdpPacket(6144);
- socket_ = new UdpSocket();
- }
-
- virtual ~DabOutputUdp() {
- delete socket_;
- delete packet_;
- }
+ DabOutputUdp();
int Open(const char* name);
int Write(void* buffer, int size);
@@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput
DabOutputUdp operator=(const DabOutputUdp& other) = delete;
std::string uri_;
- UdpSocket* socket_;
- UdpPacket* packet_;
+ Socket::UDPSocket socket_;
+ Socket::UDPPacket packet_;
};
// -------------- TCP ------------------
@@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput
private:
std::string uri_;
- std::shared_ptr<TCPDataDispatcher> dispatcher_;
+ std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_;
};
// -------------- Simul ------------------
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 87dbfd5..4dc3538 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)
uri_ = name;
if (success) {
- dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
+ dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
dispatcher_->start(port, address);
}
else {
diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp
index c129569..b9c22db 100644
--- a/src/dabOutput/dabOutputUdp.cpp
+++ b/src/dabOutput/dabOutputUdp.cpp
@@ -38,18 +38,12 @@
#include <cstdio>
#include <limits.h>
#include "dabOutput.h"
-#include "UdpSocket.h"
-
-#ifdef _WIN32
-# include <fscfg.h>
-# include <sdci.h>
-#else
-# include <netinet/in.h>
-# include <sys/types.h>
-# include <sys/socket.h>
-# include <sys/ioctl.h>
-# include <net/if_arp.h>
-#endif
+#include "Socket.h"
+
+DabOutputUdp::DabOutputUdp() :
+ socket_(),
+ packet_(6144)
+{ }
int DabOutputUdp::Open(const char* name)
{
@@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name)
regex_constants::match_default)) {
string address = what[1];
- if (this->packet_->getAddress().setAddress(address.c_str()) == -1) {
- etiLog.level(error) << "can't set address " <<
- address << "(" << inetErrDesc << ": " << inetErrMsg << ")";
- return -1;
- }
-
string port_str = what[2];
long port = std::strtol(port_str.c_str(), nullptr, 0);
@@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name)
return -1;
}
- this->packet_->getAddress().setPort(port);
+ packet_.address.resolveUdpDestination(address, port);
string query_params = what[3];
smatch query_what;
@@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name)
regex_constants::match_default)) {
string src = query_what[1];
- int err = socket_->setMulticastSource(src.c_str());
- if (err) {
- etiLog.level(error) << "UDP output socket set source failed!";
- return -1;
- }
+ try {
+ socket_.setMulticastSource(src.c_str());
- string ttl_str = query_what[2];
+ string ttl_str = query_what[2];
- if (not ttl_str.empty()) {
- long ttl = std::strtol(ttl_str.c_str(), nullptr, 0);
- if ((ttl <= 0) || (ttl >= 255)) {
- etiLog.level(error) << "Invalid TTL setting in " <<
- uri_without_proto;
- return -1;
- }
+ if (not ttl_str.empty()) {
+ long ttl = std::strtol(ttl_str.c_str(), nullptr, 0);
+ if ((ttl <= 0) || (ttl >= 255)) {
+ etiLog.level(error) << "Invalid TTL setting in " <<
+ uri_without_proto;
+ return -1;
+ }
- err = socket_->setMulticastTTL(ttl);
- if (err) {
- etiLog.level(error) << "UDP output socket set TTL failed!";
- return -1;
+ socket_.setMulticastTTL(ttl);
}
}
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "Failed to set UDP output settings" << e.what();
+ }
}
else if (not query_params.empty()) {
etiLog.level(error) << "UDP output: could not parse parameters " <<
@@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name)
int DabOutputUdp::Write(void* buffer, int size)
{
- this->packet_->setSize(0);
- this->packet_->addData(buffer, size);
- return this->socket_->send(*this->packet_);
+ const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer);
+ packet_.buffer.resize(0);
+ std::copy(buf, buf + size, std::back_inserter(packet_.buffer));
+ socket_.send(packet_);
+ return 0;
}
#endif // defined(HAVE_OUTPUT_UDP)
diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h
index 55d5f0f..0c7dce8 100644
--- a/src/dabOutput/edi/Config.h
+++ b/src/dabOutput/edi/Config.h
@@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {
};
// TCP server that can accept multiple connections
-struct tcp_destination_t : public destination_t {
+struct tcp_server_t : public destination_t {
unsigned int listen_port = 0;
size_t max_frames_queued = 1024;
};
+// TCP client that connects to one endpoint
+struct tcp_client_t : public destination_t {
+ std::string dest_addr;
+ unsigned int dest_port = 0;
+ size_t max_frames_queued = 1024;
+};
+
struct configuration_t {
unsigned chunk_len = 207; // RSk, data length of each chunk
unsigned fec = 0; // number of fragments that can be recovered
diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp
index 5b93016..63dfa34 100644
--- a/src/dabOutput/edi/PFT.cpp
+++ b/src/dabOutput/edi/PFT.cpp
@@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)
#if 0
fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n",
- m_pseq, findex, fcount, plen & ~0x8000);
+ m_pseq, findex, fcount, plen & ~0xC000);
#endif
}
diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp
index d99e987..187aabe 100644
--- a/src/dabOutput/edi/Transport.cpp
+++ b/src/dabOutput/edi/Transport.cpp
@@ -45,12 +45,16 @@ void configuration_t::print() const
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
+ etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (interleaver_enabled()) {
@@ -69,28 +73,27 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port);
+ auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = udp_socket->setMulticastTTL(udp_dest->ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
+ udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
+ udp_socket->setMulticastTTL(udp_dest->ttl);
}
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
- auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued);
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
+ auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ auto tcp_socket = make_shared<Socket::TCPSocket>();
+ tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port);
+ tcp_senders.emplace(tcp_dest.get(), tcp_socket);
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
@@ -117,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
edi_fragments.size());
}
@@ -129,28 +132,30 @@ void Sender::write(const TagPacket& tagpacket)
for (const auto& edi_frag : edi_fragments) {
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);
udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
}
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
edi_fragments.size());
}
}
@@ -158,23 +163,25 @@ void Sender::write(const TagPacket& tagpacket)
// Send over ethernet
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);
udp_sockets.at(udp_dest.get())->send(af_packet, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
}
}
}
diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h
index 7b0a0db..9633275 100644
--- a/src/dabOutput/edi/Transport.h
+++ b/src/dabOutput/edi/Transport.h
@@ -32,11 +32,12 @@
#include "AFPacket.h"
#include "PFT.h"
#include "Interleaver.h"
+#include "Socket.h"
#include <vector>
#include <unordered_map>
#include <stdexcept>
+#include <fstream>
#include <cstdint>
-#include "dabOutput/dabOutput.h"
namespace edi {
@@ -61,8 +62,9 @@ class Sender {
// To mitigate for burst packet loss, PFT fragments can be sent out-of-order
edi::Interleaver edi_interleaver;
- std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets;
- std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
+ std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;
};
}
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
new file mode 100644
index 0000000..8aee296
--- /dev/null
+++ b/src/input/Edi.cpp
@@ -0,0 +1,189 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Edi.h"
+
+#include <regex>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <cstdlib>
+#include <cerrno>
+#include <climits>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+constexpr bool VERBOSE = false;
+constexpr size_t TCP_BLOCKSIZE = 2048;
+constexpr size_t MAX_FRAMES_QUEUED = 10;
+
+Edi::Edi() :
+ m_tcp_receive_server(TCP_BLOCKSIZE),
+ m_sti_writer(),
+ m_sti_decoder(m_sti_writer, VERBOSE)
+{ }
+
+Edi::~Edi() {
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+int Edi::open(const std::string& name)
+{
+ const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_tcp("tcp://(.*):([0-9]+)");
+
+ lock_guard<mutex> lock(m_mutex);
+
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+
+ std::smatch m;
+ if (std::regex_match(name, m, re_udp)) {
+ const int udp_port = std::stoi(m[1].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port);
+ m_udp_sock.setBlocking(false);
+ // TODO multicast
+ }
+ else if (std::regex_match(name, m, re_tcp)) {
+ m_input_used = InputUsed::TCP;
+ const string addr = m[1].str();
+ const int tcp_port = std::stoi(m[2].str());
+ m_tcp_receive_server.start(tcp_port, addr);
+ }
+ else {
+ throw runtime_error("Cannot parse EDI input URI");
+ }
+
+ m_name = name;
+
+ m_running = true;
+ m_thread = std::thread(&Edi::m_run, this);
+
+ return 0;
+}
+
+int Edi::readFrame(uint8_t* buffer, size_t size)
+{
+ vector<uint8_t> frame;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < 10;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ }
+ else if (m_frames.try_pop(frame)) {
+ if (frame.size() == 0) {
+ etiLog.level(debug) << "EDI input " << m_name << " empty frame";
+ return 0;
+ }
+ else if (frame.size() == size) {
+ std::copy(frame.cbegin(), frame.cend(), buffer);
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() <<
+ " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ }
+ else {
+ memset(buffer, 0, size * sizeof(*buffer));
+ m_is_prebuffering = true;
+ etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ }
+ return size;
+}
+
+void Edi::m_run()
+{
+ while (m_running) {
+ bool work_done = false;
+
+ switch (m_input_used) {
+ case InputUsed::UDP:
+ {
+ constexpr size_t packsize = 2048;
+ const auto packet = m_udp_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ if (not packet.buffer.empty()) {
+ m_sti_decoder.push_packet(packet.buffer);
+ work_done = true;
+ }
+ }
+ break;
+ case InputUsed::TCP:
+ {
+ auto packet = m_tcp_receive_server.receive();
+ if (not packet.empty()) {
+ m_sti_decoder.push_bytes(packet);
+ work_done = true;
+ }
+ }
+ break;
+ default:
+ throw logic_error("unimplemented input");
+ }
+
+ const auto sti = m_sti_writer.getFrame();
+ if (not sti.frame.empty()) {
+ m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED);
+ work_done = true;
+ }
+
+ if (not work_done) {
+ // Avoid fast loop
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+}
+
+int Edi::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name;
+ return -1;
+ }
+
+ return bitrate;
+}
+
+int Edi::close()
+{
+ m_udp_sock.close();
+ return 0;
+}
+
+}
diff --git a/src/input/Edi.h b/src/input/Edi.h
new file mode 100644
index 0000000..7b3dc04
--- /dev/null
+++ b/src/input/Edi.h
@@ -0,0 +1,82 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <deque>
+#include <thread>
+#include <mutex>
+#include "Socket.h"
+#include "input/inputs.h"
+#include "edi/STIDecoder.hpp"
+#include "edi/STIWriter.hpp"
+#include "ThreadsafeQueue.h"
+
+namespace Inputs {
+
+/*
+ * Receives EDI from UDP or TCP in a separate thread and pushes that data
+ * into the STIDecoder. Complete frames are then put into a queue for the consumer.
+ *
+ * This way, the EDI decoding happens in a separate thread.
+ */
+class Edi : public InputBase {
+ public:
+ Edi();
+ Edi(const Edi&) = delete;
+ Edi& operator=(const Edi&) = delete;
+ ~Edi();
+
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ protected:
+ void m_run();
+
+ std::mutex m_mutex;
+
+ enum class InputUsed { Invalid, UDP, TCP };
+ InputUsed m_input_used = InputUsed::Invalid;
+ Socket::UDPSocket m_udp_sock;
+ Socket::TCPReceiveServer m_tcp_receive_server;
+
+ EdiDecoder::STIWriter m_sti_writer;
+ EdiDecoder::STIDecoder m_sti_decoder;
+ std::thread m_thread;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<std::vector<uint8_t> > m_frames;
+
+ bool m_is_prebuffering = true;
+
+ std::string m_name;
+};
+
+};
+
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index 2cb49e7..5d4f964 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint)
throw out_of_range("can't use port number 0 in udp address");
}
- if (m_sock.reinit(port, address) == -1) {
- stringstream ss;
- ss << "Could not init UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (m_sock.setBlocking(false) == -1) {
- stringstream ss;
- ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ m_sock.reinit(port, address);
+ m_sock.setBlocking(false);
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
@@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint)
int Udp::readFrame(uint8_t* buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- std::copy(packet.getData(), packet.getData() + packet.getSize(),
- back_inserter(m_buffer));
+ std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));
// Take data from the buffer if it contains enough data,
// in any case write the buffer
@@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate)
int Udp::close()
{
- return m_sock.close();
+ m_sock.close();
+ return 0;
}
@@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)
int Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}
@@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name)
void Sti_d_Rtp::receive_packet()
{
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- if (packet.getSize() == 0) {
+ if (packet.buffer.empty()) {
// No packet was received
return;
}
const size_t STI_FC_LEN = 8;
- if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
etiLog.level(info) << "Received too small RTP packet for " <<
m_name;
return;
}
- if (not rtpHeaderValid(packet.getData())) {
+ if (not rtpHeaderValid(packet.buffer.data())) {
etiLog.level(info) << "Received invalid RTP header for " <<
m_name;
return;
@@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet()
// STI(PI, X)
size_t index = RTP_HEADER_LEN;
- const uint8_t *buf = packet.getData();
+ const uint8_t *buf = packet.buffer.data();
// SYNC
index++; // Advance over STAT
@@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet()
m_name;
return;
}
- if (packet.getSize() < index + DFS) {
+ if (packet.buffer.size() < index + DFS) {
etiLog.level(info) << "Received STI too small for given DFS for " <<
m_name;
return;
@@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet()
uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
index += 2;
- if (packet.getSize() < index + 4*NST) {
+ if (packet.buffer.size() < index + 4*NST) {
etiLog.level(info) << "Received STI too small to contain NST for " <<
- m_name << " packet: " << packet.getSize() << " need " <<
+ m_name << " packet: " << packet.buffer.size() << " need " <<
index + 4*NST;
return;
}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index dc01486..dd637c6 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -31,7 +31,7 @@
#include <deque>
#include <boost/thread.hpp>
#include "input/inputs.h"
-#include "UdpSocket.h"
+#include "Socket.h"
namespace Inputs {
@@ -46,7 +46,7 @@ class Udp : public InputBase {
virtual int close();
protected:
- UdpSocket m_sock;
+ Socket::UDPSocket m_sock;
std::string m_name;
void openUdpSocket(const std::string& endpoint);