summaryrefslogtreecommitdiffstats
path: root/src/dabOutput/dabOutputTcp.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-09-11 22:15:35 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-09-11 22:24:33 +0200
commit9248c1d7976ba1c37e3df147a1eb3115fe72c8d0 (patch)
treee331a2fc4600fe80ca4e2b404d4379989a95d127 /src/dabOutput/dabOutputTcp.cpp
parent8750493994d574001e466fef21ded86730359640 (diff)
downloaddabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.gz
dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.bz2
dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.zip
Drop SLIP, Refactor sockets, improve TCP output
Quite a large refactoring of the sockets, TCP and UDP, in order to improve the ETI-over-TCP output. This can now accept several simultaneous connections, and requires a throttle. The SLIP input is gone. The UDP inputs are currently broken.
Diffstat (limited to 'src/dabOutput/dabOutputTcp.cpp')
-rw-r--r--src/dabOutput/dabOutputTcp.cpp250
1 files changed, 148 insertions, 102 deletions
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 5f1943d..a8ae1bc 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -2,8 +2,10 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2013 Matthias P. Braendli
- http://mpb.li
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
TCP output
*/
@@ -28,48 +30,129 @@
#include <signal.h>
#include <limits.h>
#include "dabOutput.h"
-#include "TcpServer.h"
-
-#ifdef _WIN32
-# include <io.h>
-# ifdef __MINGW32__
-# define FS_DECLARE_CFG_ARRAYS
-# include <winioctl.h>
-# endif
-# include <sdci.h>
-#else
-# include <unistd.h>
-# include <sys/time.h>
-# ifndef O_BINARY
-# define O_BINARY 0
-# endif // O_BINARY
-#endif
-
-void* tcpThread(void* param)
+#include <unistd.h>
+#include <sys/time.h>
+#include <list>
+#include <vector>
+#include <atomic>
+#include <boost/thread.hpp>
+#include "ThreadsafeQueue.h"
+#include "TcpSocket.h"
+
+using namespace std;
+
+using vec_u8 = std::vector<uint8_t>;
+
+// In ETI one element would be an ETI frame of 6144 bytes.
+// 250 frames correspond to 6 seconds. This is mostly here
+// to ensure we do not accumulate data for faulty sockets, delay
+// management has to be done on the receiver end.
+const size_t MAX_QUEUED_ELEMS = 250;
+
+class TCPConnection
{
- TcpSocket* client;
+ public:
+ TCPConnection(TcpSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock)) {
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+ m_sender_thread = boost::thread(&TCPConnection::process, this, 0);
+ }
+
+ ~TCPConnection() {
+ m_running = false;
+ queue.notify();
+ m_sender_thread.join();
+ }
- DabOutputTcp* tcp = (DabOutputTcp*)param;
+ ThreadsafeQueue<vec_u8> queue;
- while ((client = tcp->server->accept()) != NULL) {
- etiLog.log(info, "TCP server got a new client.\n");
- if (tcp->client != NULL) {
- delete tcp->client;
+ bool is_overloaded(void) const {
+ return queue.size() > MAX_QUEUED_ELEMS;
}
- tcp->client = client;
- }
- etiLog.log(error, "TCP thread can't accept new client (%s)\n",
- inetErrDesc, inetErrMsg);
- return NULL;
+ private:
+ TCPConnection(const TCPConnection& other) = delete;
+ TCPConnection& operator=(const TCPConnection& other) = delete;
+
+ atomic<bool> m_running;
+ boost::thread m_sender_thread;
+ TcpSocket m_sock;
+
+ void process(long) {
+ while (m_running) {
+ vec_u8 data;
+ queue.wait_and_pop(data);
+
+ try {
+ m_sock.send(&data[0], data.size());
+ }
+ catch (std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+ }
+};
+
+class TCPDataDispatcher
+{
+ public:
+ ~TCPDataDispatcher() {
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ m_listener_thread.join();
+ }
+
+ void start(int port, const string& address) {
+ TcpSocket sock(port, address);
+ m_listener_socket = move(sock);
+
+ m_running = true;
+ m_listener_thread = boost::thread(&TCPDataDispatcher::process, this, 0);
+ }
+
+ void Write(const vec_u8& data) {
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if([](TCPConnection& conn){ return conn.is_overloaded(); });
+ }
+
+ private:
+ void process(long) {
+ m_listener_socket.listen();
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ m_connections.emplace(m_connections.begin(), m_listener_socket.accept());
+ }
+ }
+
+ atomic<bool> m_running;
+ boost::thread m_listener_thread;
+ TcpSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+DabOutputTcp::~DabOutputTcp()
+{
+ if (dispatcher_) {
+ delete dispatcher_;
+ dispatcher_ = nullptr;
+ }
}
-int DabOutputTcp::Open(const char* name)
+static bool parse_uri(const char *uri, long *port, string& addr)
{
- char* hostport = strdup(name); // the name is actually an tuple host:port
+ char* const hostport = strdup(uri); // the uri is actually an tuple host:port
char* address;
- long port;
address = strchr((char*)hostport, ':');
if (address == NULL) {
etiLog.log(error,
@@ -82,98 +165,61 @@ int DabOutputTcp::Open(const char* name)
// terminate string hostport after the host, and advance address to the port number
*(address++) = 0;
- port = strtol(address, (char **)NULL, 10);
- if ((port == LONG_MIN) || (port == LONG_MAX)) {
+ *port = strtol(address, (char **)NULL, 10);
+ if ((*port == LONG_MIN) || (*port == LONG_MAX)) {
etiLog.log(error,
"can't convert port number in tcp address %s\n", address);
goto tcp_open_fail;
}
- if (port == 0) {
+ if (*port == 0) {
etiLog.log(error,
"can't use port number 0 in tcp address\n");
goto tcp_open_fail;
}
- address = hostport;
- if (strlen(address) > 0) {
- if (this->server->create(port, address) == -1) {
- etiLog.log(error, "Can't create Tcp server on %s:%i "
- "(%s: %s) -> aborting\n",
- address, port, inetErrDesc, inetErrMsg);
- goto tcp_open_fail;
- }
- } else {
- if (this->server->create(port) == -1) {
- etiLog.log(error, "Can't create Tcp server on :%i "
- "(%s: %s) -> aborting\n",
- port, inetErrDesc, inetErrMsg);
- goto tcp_open_fail;
- }
- }
+ addr = hostport;
+ free(hostport);
+ return true;
- //sprintf(name, "%s:%i", this->packet_->getAddress().getHostAddress(),
- // this->packet_->getAddress().getPort());
+tcp_open_fail:
+ free(hostport);
+ return false;
+}
- if (this->server->listen() == -1) {
- etiLog.log(error, "Can't listen on Tcp socket (%s: %s)\n",
- inetErrDesc, inetErrMsg);
- goto tcp_open_fail;
- }
-#ifdef _WIN32
- this->thread_ = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)tcpThread, this, 0, NULL);
- if (this->thread_ == NULL) {
- fprintf(stderr, "Can't create TCP child");
- goto tcp_open_fail;
+int DabOutputTcp::Open(const char* name)
+{
+ long port = 0;
+ string address;
+ bool success = parse_uri(name, &port, address);
+
+ if (success) {
+ dispatcher_ = new TCPDataDispatcher();
+ dispatcher_->start(port, address);
}
-#else
- if (pthread_create(&this->thread_, NULL, tcpThread, this)) {
- perror("Can't create TCP child");
- goto tcp_open_fail;
+ else {
+ stringstream ss;
+ ss << "Could not parse TCP output address " << name;
+ throw std::runtime_error(ss.str());
}
-#endif
-
return 0;
-
-tcp_open_fail:
- free(hostport);
- return -1;
}
int DabOutputTcp::Write(void* buffer, int size)
{
+ vec_u8 data(6144);
+ uint8_t* buffer_u8 = (uint8_t*)buffer;
- if (this->client != NULL) {
- if (this->client->write(&size, 2) == 2) {
- if (this->client->write(buffer, size) != size) {
- return size;
- }
- }
- else {
- etiLog.log(info, "TCP server client disconnected.\n");
- delete this->client;
- this->client = NULL;
- }
- }
+ std::copy(buffer_u8, buffer_u8 + size, data.begin());
+
+ // Pad to 6144 bytes
+ std::fill(data.begin() + size, data.end(), 0x55);
+
+ dispatcher_->Write(data);
return size;
}
int DabOutputTcp::Close()
{
- this->server->close();
- if( this->client != NULL )
- this->client->close();
-#ifdef WIN32
- DWORD status;
- for (int i = 0; i < 5; ++i) {
- if (GetExitCodeThread(this->thread_, &status)) {
- break;
- }
- Sleep(100);
- }
- TerminateThread(this->thread_, 1);
-#else
- pthread_kill(this->thread_, SIGPIPE);
-#endif
return 0;
}