summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 11:45:08 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 11:45:08 +0200
commitb74b895dd44f5a76d581b8dec65dbf76dd5cece9 (patch)
treed0420ab64fc3b89ad0a3bd2057df417fa5a987fa
parent4b77ed3e2ae60ac5577a850b4c9c7803f01069f2 (diff)
downloaddabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.gz
dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.tar.bz2
dabmux-b74b895dd44f5a76d581b8dec65dbf76dd5cece9.zip
Move TCPDataDispatcher into TcpSocket
-rw-r--r--src/TcpSocket.cpp130
-rw-r--r--src/TcpSocket.h57
-rw-r--r--src/dabOutput/dabOutput.h2
-rw-r--r--src/dabOutput/dabOutputTcp.cpp129
4 files changed, 180 insertions, 138 deletions
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp
index 76e9c2e..c05eace 100644
--- a/src/TcpSocket.cpp
+++ b/src/TcpSocket.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -27,15 +27,19 @@
#include "TcpSocket.h"
#include "Log.h"
#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
+#include <cstdio>
+#include <cstring>
+#include <cstdint>
#include <signal.h>
-#include <stdint.h>
+#include <errno.h>
#include <poll.h>
+#include <thread>
using namespace std;
+using vec_u8 = std::vector<uint8_t>;
+
+
TcpSocket::TcpSocket() :
m_sock(INVALID_SOCKET)
{
@@ -147,7 +151,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)
fds[0].fd = m_sock;
fds[0].events = POLLOUT;
- int retval = poll(fds, 1, timeout_ms);
+ const int retval = poll(fds, 1, timeout_ms);
if (retval == -1) {
stringstream ss;
@@ -168,7 +172,7 @@ ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)
#else
const int flags = 0;
#endif
- ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
+ const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
if (ret == SOCKET_ERROR) {
stringstream ss;
@@ -236,3 +240,115 @@ InetAddress TcpSocket::getRemoteAddress() const
{
return m_remote_address;
}
+
+
+TCPConnection::TCPConnection(TcpSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock))
+{
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+ m_sender_thread = std::thread(&TCPConnection::process, this);
+}
+
+TCPConnection::~TCPConnection()
+{
+ m_running = false;
+ vec_u8 termination_marker;
+ queue.push(termination_marker);
+ m_sender_thread.join();
+}
+
+void TCPConnection::process()
+{
+ while (m_running) {
+ vec_u8 data;
+ queue.wait_and_pop(data);
+
+ if (data.empty()) {
+ // empty vector is the termination marker
+ m_running = false;
+ break;
+ }
+
+ try {
+ ssize_t remaining = data.size();
+ const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
+ const int timeout_ms = 10; // Less than one ETI frame
+
+ while (m_running and remaining > 0) {
+ const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
+ if (sent < 0 or sent > remaining) {
+ throw std::logic_error("Invalid TcpSocket::send() return value");
+ }
+ remaining -= sent;
+ buf += sent;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "Dropping TCP Connection from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+}
+
+
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
+ m_max_queue_size(max_queue_size)
+{
+}
+
+TCPDataDispatcher::~TCPDataDispatcher()
+{
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ m_listener_thread.join();
+}
+
+void TCPDataDispatcher::start(int port, const string& address)
+{
+ TcpSocket sock(port, address);
+ m_listener_socket = move(sock);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
+}
+
+void TCPDataDispatcher::write(const vec_u8& data)
+{
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if(
+ [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+}
+
+void TCPDataDispatcher::process()
+{
+ try {
+ m_listener_socket.listen();
+
+ const int timeout_ms = 1000;
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ auto sock = m_listener_socket.accept(timeout_ms);
+ if (sock.isValid()) {
+ m_connections.emplace(m_connections.begin(), move(sock));
+ }
+ }
+ }
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what();
+ m_running = false;
+ }
+}
+
diff --git a/src/TcpSocket.h b/src/TcpSocket.h
index 9ff09e5..ec7afd3 100644
--- a/src/TcpSocket.h
+++ b/src/TcpSocket.h
@@ -2,7 +2,7 @@
Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2017
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -32,6 +32,7 @@
#endif
#include "InetAddress.h"
+#include "ThreadsafeQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
@@ -45,8 +46,11 @@
#include <iostream>
#include <string>
-
+#include <vector>
#include <memory>
+#include <atomic>
+#include <thread>
+#include <list>
/**
* This class represents a TCP socket.
@@ -77,8 +81,8 @@ class TcpSocket
/** Send data over the TCP connection.
* @param data The buffer that will be sent.
* @param size Number of bytes to send.
- * @param timeout_ms number of milliseconds before timeout
- * return number of bytes sent or -1 if error
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
*/
ssize_t send(const void* data, size_t size, int timeout_ms=0);
@@ -112,4 +116,49 @@ class TcpSocket
SOCKET m_sock;
};
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TcpSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TcpSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process(void);
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::thread m_listener_thread;
+ TcpSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
#endif // _TCPSOCKET
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index 677fffc..e5a8a94 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -29,6 +29,7 @@
#pragma once
#include "UdpSocket.h"
+#include "TcpSocket.h"
#include "Log.h"
#include "string.h"
#include <stdexcept>
@@ -203,7 +204,6 @@ class DabOutputUdp : public DabOutput
};
// -------------- TCP ------------------
-class TCPDataDispatcher;
class DabOutputTcp : public DabOutput
{
public:
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 7fb17ca..87dbfd5 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -37,7 +37,6 @@
#include <atomic>
#include <thread>
#include "ThreadsafeQueue.h"
-#include "TcpSocket.h"
using namespace std;
@@ -47,129 +46,7 @@ using vec_u8 = std::vector<uint8_t>;
// 250 frames correspond to 6 seconds. This is mostly here
// to ensure we do not accumulate data for faulty sockets, delay
// management has to be done on the receiver end.
-const size_t MAX_QUEUED_ELEMS = 250;
-
-class TCPConnection
-{
- public:
- TCPConnection(TcpSocket&& sock) :
- queue(),
- m_running(true),
- m_sender_thread(),
- m_sock(move(sock)) {
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "New TCP Connection from " <<
- addr.getHostAddress() << ":" << addr.getPort();
- m_sender_thread = std::thread(&TCPConnection::process, this, 0);
- }
-
- ~TCPConnection() {
- m_running = false;
- vec_u8 termination_marker;
- queue.push(termination_marker);
- m_sender_thread.join();
- }
-
- ThreadsafeQueue<vec_u8> queue;
-
- bool is_overloaded(void) const {
- return queue.size() > MAX_QUEUED_ELEMS;
- }
-
- private:
- TCPConnection(const TCPConnection& other) = delete;
- TCPConnection& operator=(const TCPConnection& other) = delete;
-
- atomic<bool> m_running;
- std::thread m_sender_thread;
- TcpSocket m_sock;
-
- void process(long) {
- while (m_running) {
- vec_u8 data;
- queue.wait_and_pop(data);
-
- if (data.empty()) {
- // empty vector is the termination marker
- break;
- }
-
- try {
- ssize_t sent = 0;
- do {
- const int timeout_ms = 10; // Less than one ETI frame
- sent = m_sock.send(&data[0], data.size(), timeout_ms);
-
- if (is_overloaded()) {
- m_running = false;
- break;
- }
- }
- while (sent == 0);
- }
- catch (const std::runtime_error& e) {
- m_running = false;
- }
- }
-
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "Dropping TCP Connection from " <<
- addr.getHostAddress() << ":" << addr.getPort();
- }
-};
-
-class TCPDataDispatcher
-{
- public:
- ~TCPDataDispatcher() {
- m_running = false;
- m_connections.clear();
- m_listener_socket.close();
- m_listener_thread.join();
- }
-
- void start(int port, const string& address) {
- TcpSocket sock(port, address);
- m_listener_socket = move(sock);
-
- m_running = true;
- m_listener_thread = std::thread(&TCPDataDispatcher::process, this, 0);
- }
-
- void Write(const vec_u8& data) {
- for (auto& connection : m_connections) {
- connection.queue.push(data);
- }
-
- m_connections.remove_if([](const TCPConnection& conn){ return conn.is_overloaded(); });
- }
-
- private:
- void process(long) {
- try {
- m_listener_socket.listen();
-
- const int timeout_ms = 1000;
-
- while (m_running) {
- // Add a new TCPConnection to the list, constructing it from the client socket
- auto sock = m_listener_socket.accept(timeout_ms);
- if (sock.isValid()) {
- m_connections.emplace(m_connections.begin(), move(sock));
- }
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what();
- m_running = false;
- }
- }
-
- atomic<bool> m_running;
- std::thread m_listener_thread;
- TcpSocket m_listener_socket;
- std::list<TCPConnection> m_connections;
-};
+const size_t MAX_QUEUED_ETI_FRAMES = 250;
static bool parse_uri(const char *uri, long *port, string& addr)
{
@@ -217,7 +94,7 @@ int DabOutputTcp::Open(const char* name)
uri_ = name;
if (success) {
- dispatcher_ = make_shared<TCPDataDispatcher>();
+ dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
dispatcher_->start(port, address);
}
else {
@@ -237,7 +114,7 @@ int DabOutputTcp::Write(void* buffer, int size)
// Pad to 6144 bytes
std::fill(data.begin() + size, data.end(), 0x55);
- dispatcher_->Write(data);
+ dispatcher_->write(data);
return size;
}