summaryrefslogtreecommitdiffstats
path: root/src/dabOutput
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-09-11 22:15:35 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-09-11 22:24:33 +0200
commit9248c1d7976ba1c37e3df147a1eb3115fe72c8d0 (patch)
treee331a2fc4600fe80ca4e2b404d4379989a95d127 /src/dabOutput
parent8750493994d574001e466fef21ded86730359640 (diff)
downloaddabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.gz
dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.bz2
dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.zip
Drop SLIP, Refactor sockets, improve TCP output
Quite a large refactoring of the sockets, TCP and UDP, in order to improve the ETI-over-TCP output. This can now accept several simultaneous connections, and requires a throttle. The SLIP input is gone. The UDP inputs are currently broken.
Diffstat (limited to 'src/dabOutput')
-rw-r--r--src/dabOutput/dabOutput.h84
-rw-r--r--src/dabOutput/dabOutputTcp.cpp250
-rw-r--r--src/dabOutput/dabOutputUdp.cpp14
3 files changed, 175 insertions, 173 deletions
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index d68cd9c..4a528bd 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -2,13 +2,13 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016 Matthias P. Braendli
- http://mpb.li
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
- http://opendigitalradio.org
+ http://www.opendigitalradio.org
An object-oriented version of the output channels.
- */
+*/
/*
This file is part of ODR-DabMux.
@@ -30,7 +30,6 @@
#define __DAB_OUTPUT_H
#include "UdpSocket.h"
-#include "TcpServer.h"
#include "Log.h"
#include "string.h"
#include <stdexcept>
@@ -39,20 +38,11 @@
#include <chrono>
#include <memory>
-#ifdef _WIN32
-# include <io.h>
-# ifdef __MINGW32__
-# define FS_DECLARE_CFG_ARRAYS
-# include <winioctl.h>
-# endif
-# include <sdci.h>
-#else
-# include <unistd.h>
-# include <sys/time.h>
-# ifndef O_BINARY
-# define O_BINARY 0
-# endif // O_BINARY
-#endif
+#include <unistd.h>
+#include <sys/time.h>
+#ifndef O_BINARY
+# define O_BINARY 0
+#endif // O_BINARY
#ifdef HAVE_OUTPUT_ZEROMQ
# include "zmq.hpp"
#endif
@@ -168,21 +158,15 @@ class DabOutputRaw : public DabOutput
public:
DabOutputRaw()
{
-#ifdef _WIN32
- socket_ = INVALID_HANDLE_VALUE;
-#else
socket_ = -1;
isCyclades_ = false;
-#endif
buffer_ = new unsigned char[6144];
}
DabOutputRaw(const DabOutputRaw& other)
{
socket_ = other.socket_;
-#ifndef _WIN32
isCyclades_ = other.isCyclades_;
-#endif
buffer_ = new unsigned char[6144];
memcpy(buffer_, other.buffer_, 6144);
}
@@ -202,12 +186,8 @@ class DabOutputRaw : public DabOutput
}
private:
std::string filename_;
-#ifdef _WIN32
- HANDLE socket_;
-#else
int socket_;
bool isCyclades_;
-#endif
unsigned char* buffer_;
};
@@ -216,17 +196,10 @@ class DabOutputUdp : public DabOutput
{
public:
DabOutputUdp() {
- UdpSocket::init();
packet_ = new UdpPacket(6144);
socket_ = new UdpSocket();
}
- // make sure we don't copy this output around
- // the UdpPacket and UdpSocket do not support
- // copying either
- DabOutputUdp(const DabOutputUdp& other);
- DabOutputUdp operator=(const DabOutputUdp& other);
-
~DabOutputUdp() {
delete socket_;
delete packet_;
@@ -240,34 +213,26 @@ class DabOutputUdp : public DabOutput
return "udp://" + uri_;
}
private:
+ // make sure we don't copy this output around
+ // the UdpPacket and UdpSocket do not support
+ // copying either
+ DabOutputUdp(const DabOutputUdp& other) = delete;
+ DabOutputUdp operator=(const DabOutputUdp& other) = delete;
+
std::string uri_;
UdpSocket* socket_;
UdpPacket* packet_;
};
// -------------- TCP ------------------
+class TCPDataDispatcher;
class DabOutputTcp : public DabOutput
{
public:
- DabOutputTcp()
- {
- TcpSocket::init();
- server = new TcpServer();
- client = NULL;
- }
-
- DabOutputTcp(const DabOutputTcp& other);
- DabOutputTcp operator=(const DabOutputTcp& other);
-
- ~DabOutputTcp() {
-
-#ifdef _WIN32
- CloseHandle(this->thread_);
-#endif
-
- delete this->server;
- delete this->client;
- }
+ DabOutputTcp() {}
+ DabOutputTcp(const DabOutputTcp& other) = delete;
+ const DabOutputTcp& operator=(const DabOutputTcp& other) = delete;
+ ~DabOutputTcp();
int Open(const char* name);
int Write(void* buffer, int size);
@@ -277,11 +242,10 @@ class DabOutputTcp : public DabOutput
return "tcp://" + uri_;
}
- TcpServer* server;
- TcpSocket* client;
private:
std::string uri_;
- pthread_t thread_;
+
+ TCPDataDispatcher* dispatcher_;
};
// -------------- Simul ------------------
@@ -306,11 +270,7 @@ class DabOutputSimul : public DabOutput
}
private:
std::string name_;
-#ifdef _WIN32
- DWORD startTime_;
-#else
std::chrono::steady_clock::time_point startTime_;
-#endif
};
#if defined(HAVE_OUTPUT_ZEROMQ)
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 5f1943d..a8ae1bc 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -2,8 +2,10 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
- http://mpb.li
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
TCP output
*/
@@ -28,48 +30,129 @@
#include <signal.h>
#include <limits.h>
#include "dabOutput.h"
-#include "TcpServer.h"
-
-#ifdef _WIN32
-# include <io.h>
-# ifdef __MINGW32__
-# define FS_DECLARE_CFG_ARRAYS
-# include <winioctl.h>
-# endif
-# include <sdci.h>
-#else
-# include <unistd.h>
-# include <sys/time.h>
-# ifndef O_BINARY
-# define O_BINARY 0
-# endif // O_BINARY
-#endif
-
-void* tcpThread(void* param)
+#include <unistd.h>
+#include <sys/time.h>
+#include <list>
+#include <vector>
+#include <atomic>
+#include <boost/thread.hpp>
+#include "ThreadsafeQueue.h"
+#include "TcpSocket.h"
+
+using namespace std;
+
+using vec_u8 = std::vector<uint8_t>;
+
+// In ETI one element would be an ETI frame of 6144 bytes.
+// 250 frames correspond to 6 seconds. This is mostly here
+// to ensure we do not accumulate data for faulty sockets, delay
+// management has to be done on the receiver end.
+const size_t MAX_QUEUED_ELEMS = 250;
+
+class TCPConnection
{
- TcpSocket* client;
+ public:
+ TCPConnection(TcpSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock)) {
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+ m_sender_thread = boost::thread(&TCPConnection::process, this, 0);
+ }
+
+ ~TCPConnection() {
+ m_running = false;
+ queue.notify();
+ m_sender_thread.join();
+ }
- DabOutputTcp* tcp = (DabOutputTcp*)param;
+ ThreadsafeQueue<vec_u8> queue;
- while ((client = tcp->server->accept()) != NULL) {
- etiLog.log(info, "TCP server got a new client.\n");
- if (tcp->client != NULL) {
- delete tcp->client;
+ bool is_overloaded(void) const {
+ return queue.size() > MAX_QUEUED_ELEMS;
}
- tcp->client = client;
- }
- etiLog.log(error, "TCP thread can't accept new client (%s)\n",
- inetErrDesc, inetErrMsg);
- return NULL;
+ private:
+ TCPConnection(const TCPConnection& other) = delete;
+ TCPConnection& operator=(const TCPConnection& other) = delete;
+
+ atomic<bool> m_running;
+ boost::thread m_sender_thread;
+ TcpSocket m_sock;
+
+ void process(long) {
+ while (m_running) {
+ vec_u8 data;
+ queue.wait_and_pop(data);
+
+ try {
+ m_sock.send(&data[0], data.size());
+ }
+ catch (std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+ }
+};
+
+class TCPDataDispatcher
+{
+ public:
+ ~TCPDataDispatcher() {
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ m_listener_thread.join();
+ }
+
+ void start(int port, const string& address) {
+ TcpSocket sock(port, address);
+ m_listener_socket = move(sock);
+
+ m_running = true;
+ m_listener_thread = boost::thread(&TCPDataDispatcher::process, this, 0);
+ }
+
+ void Write(const vec_u8& data) {
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if([](TCPConnection& conn){ return conn.is_overloaded(); });
+ }
+
+ private:
+ void process(long) {
+ m_listener_socket.listen();
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ m_connections.emplace(m_connections.begin(), m_listener_socket.accept());
+ }
+ }
+
+ atomic<bool> m_running;
+ boost::thread m_listener_thread;
+ TcpSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+DabOutputTcp::~DabOutputTcp()
+{
+ if (dispatcher_) {
+ delete dispatcher_;
+ dispatcher_ = nullptr;
+ }
}
-int DabOutputTcp::Open(const char* name)
+static bool parse_uri(const char *uri, long *port, string& addr)
{
- char* hostport = strdup(name); // the name is actually an tuple host:port
+ char* const hostport = strdup(uri); // the uri is actually an tuple host:port
char* address;
- long port;
address = strchr((char*)hostport, ':');
if (address == NULL) {
etiLog.log(error,
@@ -82,98 +165,61 @@ int DabOutputTcp::Open(const char* name)
// terminate string hostport after the host, and advance address to the port number
*(address++) = 0;
- port = strtol(address, (char **)NULL, 10);
- if ((port == LONG_MIN) || (port == LONG_MAX)) {
+ *port = strtol(address, (char **)NULL, 10);
+ if ((*port == LONG_MIN) || (*port == LONG_MAX)) {
etiLog.log(error,
"can't convert port number in tcp address %s\n", address);
goto tcp_open_fail;
}
- if (port == 0) {
+ if (*port == 0) {
etiLog.log(error,
"can't use port number 0 in tcp address\n");
goto tcp_open_fail;
}
- address = hostport;
- if (strlen(address) > 0) {
- if (this->server->create(port, address) == -1) {
- etiLog.log(error, "Can't create Tcp server on %s:%i "
- "(%s: %s) -> aborting\n",
- address, port, inetErrDesc, inetErrMsg);
- goto tcp_open_fail;
- }
- } else {
- if (this->server->create(port) == -1) {
- etiLog.log(error, "Can't create Tcp server on :%i "
- "(%s: %s) -> aborting\n",
- port, inetErrDesc, inetErrMsg);
- goto tcp_open_fail;
- }
- }
+ addr = hostport;
+ free(hostport);
+ return true;
- //sprintf(name, "%s:%i", this->packet_->getAddress().getHostAddress(),
- // this->packet_->getAddress().getPort());
+tcp_open_fail:
+ free(hostport);
+ return false;
+}
- if (this->server->listen() == -1) {
- etiLog.log(error, "Can't listen on Tcp socket (%s: %s)\n",
- inetErrDesc, inetErrMsg);
- goto tcp_open_fail;
- }
-#ifdef _WIN32
- this->thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)tcpThread, this, 0, NULL);
- if (this->thread_ == NULL) {
- fprintf(stderr, "Can't create TCP child");
- goto tcp_open_fail;
+int DabOutputTcp::Open(const char* name)
+{
+ long port = 0;
+ string address;
+ bool success = parse_uri(name, &port, address);
+
+ if (success) {
+ dispatcher_ = new TCPDataDispatcher();
+ dispatcher_->start(port, address);
}
-#else
- if (pthread_create(&this->thread_, NULL, tcpThread, this)) {
- perror("Can't create TCP child");
- goto tcp_open_fail;
+ else {
+ stringstream ss;
+ ss << "Could not parse TCP output address " << name;
+ throw std::runtime_error(ss.str());
}
-#endif
-
return 0;
-
-tcp_open_fail:
- free(hostport);
- return -1;
}
int DabOutputTcp::Write(void* buffer, int size)
{
+ vec_u8 data(6144);
+ uint8_t* buffer_u8 = (uint8_t*)buffer;
- if (this->client != NULL) {
- if (this->client->write(&size, 2) == 2) {
- if (this->client->write(buffer, size) != size) {
- return size;
- }
- }
- else {
- etiLog.log(info, "TCP server client disconnected.\n");
- delete this->client;
- this->client = NULL;
- }
- }
+ std::copy(buffer_u8, buffer_u8 + size, data.begin());
+
+ // Pad to 6144 bytes
+ std::fill(data.begin() + size, data.end(), 0x55);
+
+ dispatcher_->Write(data);
return size;
}
int DabOutputTcp::Close()
{
- this->server->close();
- if( this->client != NULL )
- this->client->close();
-#ifdef WIN32
- DWORD status;
- for (int i = 0; i < 5; ++i) {
- if (GetExitCodeThread(this->thread_, &status)) {
- break;
- }
- Sleep(100);
- }
- TerminateThread(this->thread_, 1);
-#else
- pthread_kill(this->thread_, SIGPIPE);
-#endif
return 0;
}
diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp
index 433ef8f..9d3ea84 100644
--- a/src/dabOutput/dabOutputUdp.cpp
+++ b/src/dabOutput/dabOutputUdp.cpp
@@ -2,8 +2,10 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
- http://mpb.li
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
UDP output
*/
@@ -79,12 +81,6 @@ int DabOutputUdp::Open(const char* name)
this->packet_->getAddress().setPort(port);
- if (this->socket_->create() == -1) {
- etiLog.level(error) << "can't create UDP socket (" <<
- inetErrDesc << ": " << inetErrMsg << ")";
- return -1;
- }
-
string query_params = what[3];
smatch query_what;
if (regex_match(query_params, query_what, re_query, match_default)) {
@@ -132,7 +128,7 @@ int DabOutputUdp::Open(const char* name)
int DabOutputUdp::Write(void* buffer, int size)
{
- this->packet_->setLength(0);
+ this->packet_->setSize(0);
this->packet_->addData(buffer, size);
return this->socket_->send(*this->packet_);
}