From 0bdd2960707a8abb5d9e435d1313aec4cf264ab1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 6 Mar 2018 09:31:43 +0100 Subject: Implement TCP input auto-reconnect --- Makefile.am | 1 + src/DabMod.cpp | 8 +- src/InputReader.h | 8 +- src/InputTcpReader.cpp | 62 ++--------- src/Socket.cpp | 275 +++++++++++++++++++++++++++++++++++++++++++++++++ src/Socket.h | 189 ++++++++++----------------------- 6 files changed, 347 insertions(+), 196 deletions(-) create mode 100644 src/Socket.cpp diff --git a/Makefile.am b/Makefile.am index 9f53e32..e323b1d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -45,6 +45,7 @@ odr_dabmod_LDADD = $(FFT_LDADD) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB) $(BOOST_ odr_dabmod_SOURCES = src/DabMod.cpp \ src/PcDebug.h \ src/Socket.h \ + src/Socket.cpp \ src/DabModulator.cpp \ src/DabModulator.h \ src/Buffer.cpp \ diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 90751f6..80dc6b4 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -528,8 +528,12 @@ static run_modulator_state_t run_modulator(modulator_data& m) */ } #endif // defined(HAVE_ZEROMQ) - // No need to handle the TCP input in a special way to get SIGINT working, - // because recv() will return with EINTR. + else if (dynamic_pointer_cast(m.inputReader)) { + /* Same as for ZeroMQ */ + } + else { + throw logic_error("Unhandled framesize==0!"); + } } else { etiLog.level(error) << "Input read error."; diff --git a/src/InputReader.h b/src/InputReader.h index 5b9e80a..84f6835 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -40,6 +40,7 @@ # include "ThreadsafeQueue.h" #endif #include "Log.h" +#include "Socket.h" #include #define INVALID_SOCKET -1 @@ -129,11 +130,6 @@ class InputFileReader : public InputReader class InputTcpReader : public InputReader { public: - InputTcpReader(); - InputTcpReader(const InputTcpReader& other) = delete; - InputTcpReader& operator=(const InputTcpReader& other) = delete; - virtual ~InputTcpReader(); - // Endpoint is either host:port or tcp://host:port void Open(const std::string& endpoint); @@ -146,7 +142,7 @@ class InputTcpReader : public InputReader virtual void PrintInfo() const; private: - int m_sock = INVALID_SOCKET; + TCPClient m_tcpclient; std::string m_uri; }; diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp index 9a93ad1..e48e258 100644 --- a/src/InputTcpReader.cpp +++ b/src/InputTcpReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -34,24 +34,6 @@ #include "Utils.h" #include #include -#include -#include -#include -#include - -InputTcpReader::InputTcpReader() -{ - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { - throw std::runtime_error("Can't create TCP socket"); - } -} - -InputTcpReader::~InputTcpReader() -{ - if (m_sock != INVALID_SOCKET) { - close(m_sock); - } -} void InputTcpReader::Open(const std::string& endpoint) { @@ -79,35 +61,7 @@ void InputTcpReader::Open(const std::string& endpoint) hostname = hostname.substr(0, colon_pos); - struct sockaddr_in addr; - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = htons(INADDR_ANY); - addr.sin_port = htons(port); - - hostent *host = gethostbyname(hostname.c_str()); - if (host) { - addr.sin_addr = *(in_addr *)(host->h_addr); - } - else { - std::stringstream ss; - ss << "Could not resolve hostname " << hostname << ": " << strerror(errno); - throw std::runtime_error(ss.str()); - } - - /* Set recv timeout of 30s, see socket(7) for details */ - struct timeval tv = {0}; - tv.tv_sec = 30; - if (setsockopt(m_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) == -1) { - std::stringstream ss; - ss << "Could not set socket recv timeout: " << strerror(errno); - throw std::runtime_error(ss.str()); - } - - if (connect(m_sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) { - std::stringstream ss; - ss << "Could not connect to " << hostname << ":" << port << " :" << strerror(errno); - throw std::runtime_error(ss.str()); - } + m_tcpclient.connect(hostname, port); m_uri = endpoint; } @@ -117,16 +71,16 @@ int InputTcpReader::GetNextFrame(void* buffer) uint8_t* buf = (uint8_t*)buffer; const size_t framesize = 6144; + const int timeout_ms = 8000; - ssize_t r = recv(m_sock, buf, framesize, MSG_WAITALL); + ssize_t ret = m_tcpclient.recv(buf, framesize, MSG_WAITALL, timeout_ms); - if (r == -1) { - std::stringstream ss; - ss << "Could not receive from socket :" << strerror(errno); - throw std::runtime_error(ss.str()); + if (ret == 0) { + etiLog.level(debug) << "TCP input auto reconnect"; + std::this_thread::sleep_for(std::chrono::seconds(1)); } - return r; + return ret; } void InputTcpReader::PrintInfo() const diff --git a/src/Socket.cpp b/src/Socket.cpp new file mode 100644 index 0000000..08cda68 --- /dev/null +++ b/src/Socket.cpp @@ -0,0 +1,275 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org +*/ + +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#include "Socket.h" +#include "Log.h" +#include + +TCPSocket::TCPSocket() +{ + if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + throw std::runtime_error("Can't create TCP socket"); + } + +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif +} + +TCPSocket::~TCPSocket() +{ + if (m_sock != -1) { + ::close(m_sock); + } +} + +TCPSocket::TCPSocket(TCPSocket&& other) +{ + m_sock = other.m_sock; + + if (other.m_sock != -1) { + other.m_sock = -1; + } +} + +TCPSocket& TCPSocket::operator=(TCPSocket&& other) +{ + m_sock = other.m_sock; + + if (other.m_sock != -1) { + other.m_sock = -1; + } + + return *this; +} + +bool TCPSocket::valid() const +{ + return m_sock != -1; +} + +void TCPSocket::connect(const std::string& hostname, int port) +{ + struct sockaddr_in addr; + addr.sin_family = PF_INET; + addr.sin_addr.s_addr = htons(INADDR_ANY); + addr.sin_port = htons(port); + + hostent *host = gethostbyname(hostname.c_str()); + if (host) { + addr.sin_addr = *(in_addr *)(host->h_addr); + } + else { + std::string errstr(strerror(errno)); + throw std::runtime_error( + "could not resolve hostname " + + hostname + ":" + std::to_string(port) + + " : " + errstr); + } + + int ret = ::connect(m_sock, (struct sockaddr*)&addr, sizeof(addr)); + if (ret == -1 and errno != EINPROGRESS) { + std::string errstr(strerror(errno)); + throw std::runtime_error( + "could not connect to " + + hostname + ":" + std::to_string(port) + + " : " + errstr); + } +} + +void TCPSocket::listen(int port) +{ + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + const int reuse = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, + &reuse, sizeof(reuse)) < 0) { + throw std::runtime_error("Can't reuse address for TCP socket"); + } + + if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + close(); + throw std::runtime_error("Can't bind TCP socket"); + } + + if (::listen(m_sock, 1) < 0) { + close(); + m_sock = -1; + throw std::runtime_error("Can't listen TCP socket"); + } + +} + +void TCPSocket::close() +{ + ::close(m_sock); + m_sock = -1; +} + +TCPSocket TCPSocket::accept_with_timeout(int timeout_ms, struct sockaddr_in *client) +{ + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP Socket accept error: " + errstr); + } + else if (retval > 0) { + socklen_t client_len = sizeof(struct sockaddr_in); + int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len); + TCPSocket s(sockfd); + return s; + } + else { + TCPSocket s(-1); + return s; + } +} + +ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) +{ + uint8_t *buf = (uint8_t*)buffer; + while (buflen > 0) { + /* On Linux, the MSG_NOSIGNAL flag ensures that the process + * would not receive a SIGPIPE and die. + * Other systems have SO_NOSIGPIPE set on the socket for the + * same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) + const int flags = MSG_NOSIGNAL; +#else + const int flags = 0; +#endif + ssize_t sent = ::send(m_sock, buf, buflen, flags); + if (sent < 0) { + return -1; + } + else { + buf += sent; + buflen -= sent; + } + } + return buflen; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) +{ + ssize_t ret = ::recv(m_sock, buffer, length, flags); + if (ret == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive error: " + errstr); + } + return ret; +} + +ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1 and errno == EINTR) { + throw Interrupted(); + } + else if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive with poll() error: " + errstr); + } + else if (retval > 0 and (fds[0].revents | POLLIN)) { + ssize_t ret = ::recv(m_sock, buffer, length, flags); + if (ret == -1) { + if (errno == ECONNREFUSED) { + return 0; + } + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP receive after poll() error: " + errstr); + } + return ret; + } + else { + throw Timeout(); + } +} + +TCPSocket::TCPSocket(int sockfd) { + m_sock = sockfd; +} + +void TCPClient::connect(const std::string& hostname, int port) +{ + m_hostname = hostname; + m_port = port; + reconnect(); +} + +ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) +{ + try { + ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); + + if (ret == 0) { + m_sock.close(); + + TCPSocket newsock; + m_sock = std::move(newsock); + reconnect(); + } + + return ret; + } + catch (const TCPSocket::Interrupted&) { + return -1; + } + catch (const TCPSocket::Timeout&) { + return 0; + } + + return 0; +} + +void TCPClient::reconnect() +{ + int flags = fcntl(m_sock.m_sock, F_GETFL); + if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); + } + + m_sock.connect(m_hostname, m_port); +} diff --git a/src/Socket.h b/src/Socket.h index 392e758..14c5cbe 100644 --- a/src/Socket.h +++ b/src/Socket.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2017 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -34,150 +34,71 @@ DESCRIPTION: # include #endif -#include -#include #include #include -#include -#include +#include +#include +#include #include #include +#include +#include +#include +#include class TCPSocket { public: - TCPSocket() { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); - } - -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } -#endif - } - - ~TCPSocket() { - if (m_sock != -1) { - ::close(m_sock); - } - } - + TCPSocket(); + ~TCPSocket(); TCPSocket(const TCPSocket& other) = delete; TCPSocket& operator=(const TCPSocket& other) = delete; - TCPSocket(TCPSocket&& other) { - m_sock = other.m_sock; - - if (other.m_sock != -1) { - other.m_sock = -1; - } - } - - TCPSocket& operator=(TCPSocket&& other) - { - m_sock = other.m_sock; - - if (other.m_sock != -1) { - other.m_sock = -1; - } - - return *this; - } - - bool valid(void) const { - return m_sock != -1; - } - - void listen(int port) { - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - const int reuse = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, - &reuse, sizeof(reuse)) < 0) { - throw std::runtime_error("Can't reuse address for TCP socket"); - } - - if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - close(); - throw std::runtime_error("Can't bind TCP socket"); - } - - if (::listen(m_sock, 1) < 0) { - close(); - m_sock = -1; - throw std::runtime_error("Can't listen TCP socket"); - } - - } - - void close(void) { - ::close(m_sock); - m_sock = -1; - } - - TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client) - { - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN | POLLOUT; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - throw std::runtime_error("TCP Socket accept error: " + std::to_string(errno)); - } - else if (retval) { - socklen_t client_len = sizeof(struct sockaddr_in); - int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len); - TCPSocket s(sockfd); - return s; - } - else { - TCPSocket s(-1); - return s; - } - } - - ssize_t sendall(const void *buffer, size_t buflen) - { - uint8_t *buf = (uint8_t*)buffer; - while (buflen > 0) { - /* On Linux, the MSG_NOSIGNAL flag ensures that the process - * would not receive a SIGPIPE and die. - * Other systems have SO_NOSIGPIPE set on the socket for the - * same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) - const int flags = MSG_NOSIGNAL; -#else - const int flags = 0; -#endif - ssize_t sent = ::send(m_sock, buf, buflen, flags); - if (sent < 0) { - return -1; - } - else { - buf += sent; - buflen -= sent; - } - } - return buflen; - } - - ssize_t recv(void *buffer, size_t length, int flags) - { - return ::recv(m_sock, buffer, length, flags); - } + TCPSocket(TCPSocket&& other); + TCPSocket& operator=(TCPSocket&& other); - private: - explicit TCPSocket(int sockfd) { - m_sock = sockfd; - } + bool valid(void) const; + void connect(const std::string& hostname, int port); + void listen(int port); + void close(void); + + /* throws a runtime_error on failure, an invalid socket on timeout */ + TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client); + /* returns -1 on error */ + ssize_t sendall(const void *buffer, size_t buflen); + + /* Returns number of bytes read, 0 on disconnect. Throws a + * runtime_error on error */ + ssize_t recv(void *buffer, size_t length, int flags); + + class Timeout {}; + class Interrupted {}; + /* Returns number of bytes read, 0 on disconnect or refused connection. + * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error + * on error + */ + ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + + private: + explicit TCPSocket(int sockfd); int m_sock = -1; + + friend class TCPClient; +}; + +/* Implement a TCP receiver that auto-reconnects on errors */ +class TCPClient { + public: + void connect(const std::string& hostname, int port); + + /* Returns numer of bytes read, 0 on auto-reconnect, -1 + * on interruption. + * Throws a runtime_error on error */ + ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); + + private: + void reconnect(void); + TCPSocket m_sock; + std::string m_hostname; + int m_port; }; -- cgit v1.2.3