From 03967733d70220e2de7af3cdad320aec5c82ede1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 25 Jun 2019 10:50:23 +0200 Subject: Add more EDI input improvements --- lib/Socket.cpp | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 89 insertions(+), 8 deletions(-) (limited to 'lib/Socket.cpp') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 9b404eb..cd70a8e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -274,6 +274,8 @@ void UDPSocket::setMulticastTTL(int ttl) } } +UDPReceiver::UDPReceiver() { } + UDPReceiver::~UDPReceiver() { m_stop = true; m_sock.close(); @@ -355,7 +357,7 @@ TCPSocket::~TCPSocket() TCPSocket::TCPSocket(TCPSocket&& other) : m_sock(other.m_sock), - m_remote_address(other.m_remote_address) + m_remote_address(move(other.m_remote_address)) { if (other.m_sock != -1) { other.m_sock = -1; @@ -364,9 +366,9 @@ TCPSocket::TCPSocket(TCPSocket&& other) : TCPSocket& TCPSocket::operator=(TCPSocket&& other) { - m_sock = other.m_sock; - m_remote_address = other.m_remote_address; + swap(m_remote_address, other.m_remote_address); + m_sock = other.m_sock; if (other.m_sock != -1) { other.m_sock = -1; } @@ -487,14 +489,21 @@ void TCPSocket::listen(int port, const string& name) freeaddrinfo(result); + if (m_sock != INVALID_SOCKET) { #if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } #endif + int ret = ::listen(m_sock, 0); + if (ret == -1) { + throw std::runtime_error(string("Could not listen: ") + strerror(errno)); + } + } + if (rp == nullptr) { throw runtime_error("Could not bind"); } @@ -814,4 +823,76 @@ void TCPDataDispatcher::process() } } +TCPReceiveServer::TCPReceiveServer(size_t blocksize) : + m_blocksize(blocksize) +{ +} + +void TCPReceiveServer::start(int listen_port, const std::string& address) +{ + m_listener_socket.listen(listen_port, address); + + m_running = true; + m_listener_thread = std::thread(&TCPReceiveServer::process, this); +} + +TCPReceiveServer::~TCPReceiveServer() +{ + m_running = false; + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } +} + +vector TCPReceiveServer::receive() +{ + vector buffer; + m_queue.try_pop(buffer); + + // we can ignore try_pop()'s return value, because + // if it is unsuccessful the buffer is not touched. + return buffer; +} + +void TCPReceiveServer::process() +{ + constexpr int timeout_ms = 1000; + constexpr int disconnect_timeout_ms = 10000; + constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms; + + while (m_running) { + auto sock = m_listener_socket.accept(timeout_ms); + + int num_timeouts = 0; + + while (m_running and sock.valid()) { + try { + vector buf(m_blocksize); + ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms); + if (r < 0) { + throw logic_error("Invalid recv return value"); + } + else if (r == 0) { + sock.close(); + break; + } + else { + buf.resize(r); + m_queue.push(move(buf)); + } + } + catch (const TCPSocket::Interrupted&) { + break; + } + catch (const TCPSocket::Timeout&) { + num_timeouts++; + } + + if (num_timeouts > max_num_timeouts) { + sock.close(); + } + } + } +} + } -- cgit v1.2.3