From c8c9d5abf8238c3531ecae8fb272fdf5bbffd336 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 8 Jul 2019 12:08:44 +0200 Subject: Unify Socket library with other mmbTools --- src/EtiReader.cpp | 4 +- src/EtiReader.h | 9 +- src/InputReader.h | 2 +- src/Socket.cpp | 275 ------------------------------------------------ src/Socket.h | 104 ------------------ src/ThreadsafeQueue.h | 178 ------------------------------- src/output/Feedback.cpp | 7 +- 7 files changed, 9 insertions(+), 570 deletions(-) delete mode 100644 src/Socket.cpp delete mode 100644 src/Socket.h delete mode 100644 src/ThreadsafeQueue.h (limited to 'src') diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 94c362a..93008bb 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -644,7 +644,7 @@ bool EdiTransport::rxPacket() return true; } } - catch (const TCPSocket::Timeout&) { + catch (const Socket::TCPSocket::Timeout&) { return false; } } diff --git a/src/EtiReader.h b/src/EtiReader.h index 38f7903..8548654 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2019 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -38,9 +38,6 @@ #include "SubchannelSource.h" #include "TimestampDecoder.h" #include "lib/edi/ETIDecoder.hpp" -#ifdef HAVE_EDI -# include "lib/UdpSocket.h" -#endif #include #include @@ -211,9 +208,9 @@ class EdiTransport { enum class Proto { UDP, TCP }; Proto m_proto; - UdpReceiver m_udp_rx; + Socket::UDPReceiver m_udp_rx; std::vector m_tcpbuffer; - TCPClient m_tcpclient; + Socket::TCPClient m_tcpclient; EdiDecoder::ETIDecoder& m_decoder; }; #endif diff --git a/src/InputReader.h b/src/InputReader.h index 63451e5..1a63cea 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -144,7 +144,7 @@ class InputTcpReader : public InputReader virtual std::string GetPrintableInfo() const override; private: - TCPClient m_tcpclient; + Socket::TCPClient m_tcpclient; std::string m_uri; }; diff --git a/src/Socket.cpp b/src/Socket.cpp deleted file mode 100644 index 08cda68..0000000 --- a/src/Socket.cpp +++ /dev/null @@ -1,275 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://opendigitalradio.org -*/ - -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see . - */ - -#include "Socket.h" -#include "Log.h" -#include - -TCPSocket::TCPSocket() -{ - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); - } - -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); - } -#endif -} - -TCPSocket::~TCPSocket() -{ - if (m_sock != -1) { - ::close(m_sock); - } -} - -TCPSocket::TCPSocket(TCPSocket&& other) -{ - m_sock = other.m_sock; - - if (other.m_sock != -1) { - other.m_sock = -1; - } -} - -TCPSocket& TCPSocket::operator=(TCPSocket&& other) -{ - m_sock = other.m_sock; - - if (other.m_sock != -1) { - other.m_sock = -1; - } - - return *this; -} - -bool TCPSocket::valid() const -{ - return m_sock != -1; -} - -void TCPSocket::connect(const std::string& hostname, int port) -{ - struct sockaddr_in addr; - addr.sin_family = PF_INET; - addr.sin_addr.s_addr = htons(INADDR_ANY); - addr.sin_port = htons(port); - - hostent *host = gethostbyname(hostname.c_str()); - if (host) { - addr.sin_addr = *(in_addr *)(host->h_addr); - } - else { - std::string errstr(strerror(errno)); - throw std::runtime_error( - "could not resolve hostname " + - hostname + ":" + std::to_string(port) + - " : " + errstr); - } - - int ret = ::connect(m_sock, (struct sockaddr*)&addr, sizeof(addr)); - if (ret == -1 and errno != EINPROGRESS) { - std::string errstr(strerror(errno)); - throw std::runtime_error( - "could not connect to " + - hostname + ":" + std::to_string(port) + - " : " + errstr); - } -} - -void TCPSocket::listen(int port) -{ - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - const int reuse = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, - &reuse, sizeof(reuse)) < 0) { - throw std::runtime_error("Can't reuse address for TCP socket"); - } - - if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - close(); - throw std::runtime_error("Can't bind TCP socket"); - } - - if (::listen(m_sock, 1) < 0) { - close(); - m_sock = -1; - throw std::runtime_error("Can't listen TCP socket"); - } - -} - -void TCPSocket::close() -{ - ::close(m_sock); - m_sock = -1; -} - -TCPSocket TCPSocket::accept_with_timeout(int timeout_ms, struct sockaddr_in *client) -{ - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP Socket accept error: " + errstr); - } - else if (retval > 0) { - socklen_t client_len = sizeof(struct sockaddr_in); - int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len); - TCPSocket s(sockfd); - return s; - } - else { - TCPSocket s(-1); - return s; - } -} - -ssize_t TCPSocket::sendall(const void *buffer, size_t buflen) -{ - uint8_t *buf = (uint8_t*)buffer; - while (buflen > 0) { - /* On Linux, the MSG_NOSIGNAL flag ensures that the process - * would not receive a SIGPIPE and die. - * Other systems have SO_NOSIGPIPE set on the socket for the - * same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) - const int flags = MSG_NOSIGNAL; -#else - const int flags = 0; -#endif - ssize_t sent = ::send(m_sock, buf, buflen, flags); - if (sent < 0) { - return -1; - } - else { - buf += sent; - buflen -= sent; - } - } - return buflen; -} - -ssize_t TCPSocket::recv(void *buffer, size_t length, int flags) -{ - ssize_t ret = ::recv(m_sock, buffer, length, flags); - if (ret == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive error: " + errstr); - } - return ret; -} - -ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms) -{ - struct pollfd fds[1]; - fds[0].fd = m_sock; - fds[0].events = POLLIN; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1 and errno == EINTR) { - throw Interrupted(); - } - else if (retval == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive with poll() error: " + errstr); - } - else if (retval > 0 and (fds[0].revents | POLLIN)) { - ssize_t ret = ::recv(m_sock, buffer, length, flags); - if (ret == -1) { - if (errno == ECONNREFUSED) { - return 0; - } - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP receive after poll() error: " + errstr); - } - return ret; - } - else { - throw Timeout(); - } -} - -TCPSocket::TCPSocket(int sockfd) { - m_sock = sockfd; -} - -void TCPClient::connect(const std::string& hostname, int port) -{ - m_hostname = hostname; - m_port = port; - reconnect(); -} - -ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms) -{ - try { - ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms); - - if (ret == 0) { - m_sock.close(); - - TCPSocket newsock; - m_sock = std::move(newsock); - reconnect(); - } - - return ret; - } - catch (const TCPSocket::Interrupted&) { - return -1; - } - catch (const TCPSocket::Timeout&) { - return 0; - } - - return 0; -} - -void TCPClient::reconnect() -{ - int flags = fcntl(m_sock.m_sock, F_GETFL); - if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) { - std::string errstr(strerror(errno)); - throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); - } - - m_sock.connect(m_hostname, m_port); -} diff --git a/src/Socket.h b/src/Socket.h deleted file mode 100644 index 14c5cbe..0000000 --- a/src/Socket.h +++ /dev/null @@ -1,104 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the - Queen in Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://opendigitalradio.org - -DESCRIPTION: - Abstraction for sockets. -*/ - -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see . - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include -#endif - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -class TCPSocket { - public: - TCPSocket(); - ~TCPSocket(); - TCPSocket(const TCPSocket& other) = delete; - TCPSocket& operator=(const TCPSocket& other) = delete; - TCPSocket(TCPSocket&& other); - TCPSocket& operator=(TCPSocket&& other); - - bool valid(void) const; - void connect(const std::string& hostname, int port); - void listen(int port); - void close(void); - - /* throws a runtime_error on failure, an invalid socket on timeout */ - TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client); - - /* returns -1 on error */ - ssize_t sendall(const void *buffer, size_t buflen); - - /* Returns number of bytes read, 0 on disconnect. Throws a - * runtime_error on error */ - ssize_t recv(void *buffer, size_t length, int flags); - - class Timeout {}; - class Interrupted {}; - /* Returns number of bytes read, 0 on disconnect or refused connection. - * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error - * on error - */ - ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); - - private: - explicit TCPSocket(int sockfd); - int m_sock = -1; - - friend class TCPClient; -}; - -/* Implement a TCP receiver that auto-reconnects on errors */ -class TCPClient { - public: - void connect(const std::string& hostname, int port); - - /* Returns numer of bytes read, 0 on auto-reconnect, -1 - * on interruption. - * Throws a runtime_error on error */ - ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms); - - private: - void reconnect(void); - TCPSocket m_sock; - std::string m_hostname; - int m_port; -}; - diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ee26ca0..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - An implementation for a threadsafe queue, depends on C++11 - - When creating a ThreadsafeQueue, one can specify the minimal number - of elements it must contain before it is possible to take one - element out. - */ -/* - This file is part of ODR-DabMod. - - ODR-DabMod is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMod is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMod. If not, see . - */ - -#pragma once - -#include -#include -#include -#include - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template -class ThreadsafeQueue -{ -public: - /* Push one element into the queue, and notify another thread that - * might be waiting. - * - * returns the new queue size. - */ - size_t push(T const& val) - { - std::unique_lock lock(the_mutex); - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - size_t push(T&& val) - { - std::unique_lock lock(the_mutex); - the_queue.emplace(std::move(val)); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Push one element into the queue, but wait until the - * queue size goes below the threshold. - * - * Notify waiting thread. - * - * returns the new queue size. - */ - size_t push_wait_if_full(T const& val, size_t threshold) - { - std::unique_lock lock(the_mutex); - while (the_queue.size() >= threshold) { - the_tx_notification.wait(lock); - } - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Trigger a wakeup event on a blocking consumer, which - * will receive a ThreadsafeQueueWakeup exception. - */ - void trigger_wakeup(void) - { - std::unique_lock lock(the_mutex); - wakeup_requested = true; - lock.unlock(); - the_rx_notification.notify_one(); - } - - /* Send a notification for the receiver thread */ - void notify(void) - { - the_rx_notification.notify_one(); - } - - bool empty() const - { - std::unique_lock lock(the_mutex); - return the_queue.empty(); - } - - size_t size() const - { - std::unique_lock lock(the_mutex); - return the_queue.size(); - } - - bool try_pop(T& popped_value) - { - std::unique_lock lock(the_mutex); - if (the_queue.empty()) { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - - return true; - } - - void wait_and_pop(T& popped_value, size_t prebuffering = 1) - { - std::unique_lock lock(the_mutex); - while (the_queue.size() < prebuffering and - not wakeup_requested) { - the_rx_notification.wait(lock); - } - - if (wakeup_requested) { - wakeup_requested = false; - throw ThreadsafeQueueWakeup(); - } - else { - std::swap(popped_value, the_queue.front()); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - } - } - -private: - std::queue the_queue; - mutable std::mutex the_mutex; - std::condition_variable the_rx_notification; - std::condition_variable the_tx_notification; - bool wakeup_requested = false; -}; - diff --git a/src/output/Feedback.cpp b/src/output/Feedback.cpp index 17e45bf..88d8319 100644 --- a/src/output/Feedback.cpp +++ b/src/output/Feedback.cpp @@ -200,14 +200,13 @@ void DPDFeedbackServer::ReceiveBurstThread() void DPDFeedbackServer::ServeFeedback() { - TCPSocket m_server_sock; - m_server_sock.listen(m_port); + Socket::TCPSocket m_server_sock; + m_server_sock.listen(m_port, "127.0.0.1"); etiLog.level(info) << "DPD Feedback server listening on port " << m_port; while (m_running) { - struct sockaddr_in client; - TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client); + auto client_sock = m_server_sock.accept(1000); if (not m_running) { break; -- cgit v1.2.3