diff options
Diffstat (limited to 'src/dabOutput/dabOutputTcp.cpp')
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 129 |
1 files changed, 3 insertions, 126 deletions
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 7fb17ca..87dbfd5 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -37,7 +37,6 @@ #include <atomic> #include <thread> #include "ThreadsafeQueue.h" -#include "TcpSocket.h" using namespace std; @@ -47,129 +46,7 @@ using vec_u8 = std::vector<uint8_t>; // 250 frames correspond to 6 seconds. This is mostly here // to ensure we do not accumulate data for faulty sockets, delay // management has to be done on the receiver end. -const size_t MAX_QUEUED_ELEMS = 250; - -class TCPConnection -{ - public: - TCPConnection(TcpSocket&& sock) : - queue(), - m_running(true), - m_sender_thread(), - m_sock(move(sock)) { - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "New TCP Connection from " << - addr.getHostAddress() << ":" << addr.getPort(); - m_sender_thread = std::thread(&TCPConnection::process, this, 0); - } - - ~TCPConnection() { - m_running = false; - vec_u8 termination_marker; - queue.push(termination_marker); - m_sender_thread.join(); - } - - ThreadsafeQueue<vec_u8> queue; - - bool is_overloaded(void) const { - return queue.size() > MAX_QUEUED_ELEMS; - } - - private: - TCPConnection(const TCPConnection& other) = delete; - TCPConnection& operator=(const TCPConnection& other) = delete; - - atomic<bool> m_running; - std::thread m_sender_thread; - TcpSocket m_sock; - - void process(long) { - while (m_running) { - vec_u8 data; - queue.wait_and_pop(data); - - if (data.empty()) { - // empty vector is the termination marker - break; - } - - try { - ssize_t sent = 0; - do { - const int timeout_ms = 10; // Less than one ETI frame - sent = m_sock.send(&data[0], data.size(), timeout_ms); - - if (is_overloaded()) { - m_running = false; - break; - } - } - while (sent == 0); - } - catch (const std::runtime_error& e) { - m_running = false; - } - } - - auto addr = m_sock.getRemoteAddress(); - etiLog.level(debug) << "Dropping TCP Connection from " << - addr.getHostAddress() << ":" << addr.getPort(); - } -}; - -class TCPDataDispatcher -{ - public: - ~TCPDataDispatcher() { - m_running = false; - m_connections.clear(); - m_listener_socket.close(); - m_listener_thread.join(); - } - - void start(int port, const string& address) { - TcpSocket sock(port, address); - m_listener_socket = move(sock); - - m_running = true; - m_listener_thread = std::thread(&TCPDataDispatcher::process, this, 0); - } - - void Write(const vec_u8& data) { - for (auto& connection : m_connections) { - connection.queue.push(data); - } - - m_connections.remove_if([](const TCPConnection& conn){ return conn.is_overloaded(); }); - } - - private: - void process(long) { - try { - m_listener_socket.listen(); - - const int timeout_ms = 1000; - - while (m_running) { - // Add a new TCPConnection to the list, constructing it from the client socket - auto sock = m_listener_socket.accept(timeout_ms); - if (sock.isValid()) { - m_connections.emplace(m_connections.begin(), move(sock)); - } - } - } - catch (const std::runtime_error& e) { - etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); - m_running = false; - } - } - - atomic<bool> m_running; - std::thread m_listener_thread; - TcpSocket m_listener_socket; - std::list<TCPConnection> m_connections; -}; +const size_t MAX_QUEUED_ETI_FRAMES = 250; static bool parse_uri(const char *uri, long *port, string& addr) { @@ -217,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared<TCPDataDispatcher>(); + dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); dispatcher_->start(port, address); } else { @@ -237,7 +114,7 @@ int DabOutputTcp::Write(void* buffer, int size) // Pad to 6144 bytes std::fill(data.begin() + size, data.end(), 0x55); - dispatcher_->Write(data); + dispatcher_->write(data); return size; } |