summaryrefslogtreecommitdiffstats
path: root/lib/Socket.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-25 10:50:23 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-25 10:50:23 +0200
commit03967733d70220e2de7af3cdad320aec5c82ede1 (patch)
tree4a1bd7adfb8825c95cfc1fa0c69f857aef234561 /lib/Socket.cpp
parent15d7ad8ac5bb187ac323da7dc30b9724b18c7df7 (diff)
downloaddabmux-03967733d70220e2de7af3cdad320aec5c82ede1.tar.gz
dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.tar.bz2
dabmux-03967733d70220e2de7af3cdad320aec5c82ede1.zip
Add more EDI input improvements
Diffstat (limited to 'lib/Socket.cpp')
-rw-r--r--lib/Socket.cpp97
1 files changed, 89 insertions, 8 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 9b404eb..cd70a8e 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -274,6 +274,8 @@ void UDPSocket::setMulticastTTL(int ttl)
}
}
+UDPReceiver::UDPReceiver() { }
+
UDPReceiver::~UDPReceiver() {
m_stop = true;
m_sock.close();
@@ -355,7 +357,7 @@ TCPSocket::~TCPSocket()
TCPSocket::TCPSocket(TCPSocket&& other) :
m_sock(other.m_sock),
- m_remote_address(other.m_remote_address)
+ m_remote_address(move(other.m_remote_address))
{
if (other.m_sock != -1) {
other.m_sock = -1;
@@ -364,9 +366,9 @@ TCPSocket::TCPSocket(TCPSocket&& other) :
TCPSocket& TCPSocket::operator=(TCPSocket&& other)
{
- m_sock = other.m_sock;
- m_remote_address = other.m_remote_address;
+ swap(m_remote_address, other.m_remote_address);
+ m_sock = other.m_sock;
if (other.m_sock != -1) {
other.m_sock = -1;
}
@@ -487,14 +489,21 @@ void TCPSocket::listen(int port, const string& name)
freeaddrinfo(result);
+ if (m_sock != INVALID_SOCKET) {
#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
- &val, sizeof(val)) < 0) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
- }
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
#endif
+ int ret = ::listen(m_sock, 0);
+ if (ret == -1) {
+ throw std::runtime_error(string("Could not listen: ") + strerror(errno));
+ }
+ }
+
if (rp == nullptr) {
throw runtime_error("Could not bind");
}
@@ -814,4 +823,76 @@ void TCPDataDispatcher::process()
}
}
+TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
+ m_blocksize(blocksize)
+{
+}
+
+void TCPReceiveServer::start(int listen_port, const std::string& address)
+{
+ m_listener_socket.listen(listen_port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPReceiveServer::process, this);
+}
+
+TCPReceiveServer::~TCPReceiveServer()
+{
+ m_running = false;
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+vector<uint8_t> TCPReceiveServer::receive()
+{
+ vector<uint8_t> buffer;
+ m_queue.try_pop(buffer);
+
+ // we can ignore try_pop()'s return value, because
+ // if it is unsuccessful the buffer is not touched.
+ return buffer;
+}
+
+void TCPReceiveServer::process()
+{
+ constexpr int timeout_ms = 1000;
+ constexpr int disconnect_timeout_ms = 10000;
+ constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms;
+
+ while (m_running) {
+ auto sock = m_listener_socket.accept(timeout_ms);
+
+ int num_timeouts = 0;
+
+ while (m_running and sock.valid()) {
+ try {
+ vector<uint8_t> buf(m_blocksize);
+ ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms);
+ if (r < 0) {
+ throw logic_error("Invalid recv return value");
+ }
+ else if (r == 0) {
+ sock.close();
+ break;
+ }
+ else {
+ buf.resize(r);
+ m_queue.push(move(buf));
+ }
+ }
+ catch (const TCPSocket::Interrupted&) {
+ break;
+ }
+ catch (const TCPSocket::Timeout&) {
+ num_timeouts++;
+ }
+
+ if (num_timeouts > max_num_timeouts) {
+ sock.close();
+ }
+ }
+ }
+}
+
}