From 7593596f6b21483d5af0a55715065fa2b44c1019 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 27 Sep 2017 14:24:49 +0200 Subject: Add TCP Socket listener abstraction and use for UHD Feedback --- Makefile.am | 1 + src/OutputUHDFeedback.cpp | 110 +++++++------------------------ src/OutputUHDFeedback.h | 1 - src/Socket.h | 164 ++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 190 insertions(+), 86 deletions(-) create mode 100644 src/Socket.h diff --git a/Makefile.am b/Makefile.am index 12dfe6e..2d84ab6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -44,6 +44,7 @@ odr_dabmod_CFLAGS = -Wall -Isrc -Ilib \ odr_dabmod_LDADD = $(FFT_LDADD) odr_dabmod_SOURCES = src/DabMod.cpp \ src/PcDebug.h \ + src/Socket.h \ src/porting.c \ src/porting.h \ src/DabModulator.cpp \ diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index b370885..056be29 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -46,6 +46,7 @@ DESCRIPTION: #include #include "OutputUHDFeedback.h" #include "Utils.h" +#include "Socket.h" using namespace std; typedef std::complex complexf; @@ -87,7 +88,9 @@ void OutputUHDFeedback::set_tx_frame( { boost::mutex::scoped_lock lock(burstRequest.mutex); - assert(buf.size() % sizeof(complexf) == 0); + if (buf.size() % sizeof(complexf) != 0) { + throw std::logic_error("Buffer for tx frame has incorrect size"); + } if (burstRequest.state == BurstRequestState::SaveTransmitFrame) { const size_t n = std::min( @@ -183,85 +186,23 @@ void OutputUHDFeedback::ReceiveBurstThread() } } -static int accept_with_timeout(int server_socket, int timeout_ms, struct sockaddr_in *client) -{ - struct pollfd fds[1]; - fds[0].fd = server_socket; - fds[0].events = POLLIN | POLLOUT; - - int retval = poll(fds, 1, timeout_ms); - - if (retval == -1) { - throw std::runtime_error("TCP Socket accept error: " + to_string(errno)); - } - else if (retval) { - socklen_t client_len = sizeof(struct sockaddr_in); - return accept(server_socket, (struct sockaddr*)&client, &client_len); - } - else { - return -2; - } -} - -static ssize_t sendall(int socket, const void *buffer, size_t buflen) -{ - uint8_t *buf = (uint8_t*)buffer; - while (buflen > 0) { - ssize_t sent = send(socket, buf, buflen, 0); - if (sent < 0) { - return -1; - } - else { - buf += sent; - buflen -= sent; - } - } - return buflen; -} - void OutputUHDFeedback::ServeFeedback() { - if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); - } - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(m_port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - const int reuse = 1; - if (setsockopt(m_server_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) - < 0) { - throw std::runtime_error("Can't reuse address for TCP socket"); - } - - if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - close(m_server_sock); - throw std::runtime_error("Can't bind TCP socket"); - } - - if (listen(m_server_sock, 1) < 0) { - close(m_server_sock); - throw std::runtime_error("Can't listen TCP socket"); - } + TCPSocket m_server_sock; + m_server_sock.listen(m_port); etiLog.level(info) << "DPD Feedback server listening on port " << m_port; while (m_running) { struct sockaddr_in client; - int client_sock = accept_with_timeout(m_server_sock, 1000, &client); + TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client); - if (client_sock == -1) { - close(m_server_sock); + if (not client_sock.valid()) { throw runtime_error("Could not establish new connection"); } - else if (client_sock == -2) { - continue; - } uint8_t request_version = 0; - ssize_t read = recv(client_sock, &request_version, 1, 0); + ssize_t read = client_sock.recv(&request_version, 1, 0); if (!read) break; // done reading if (read < 0) { etiLog.level(info) << @@ -275,7 +216,7 @@ void OutputUHDFeedback::ServeFeedback() } uint32_t num_samples = 0; - read = recv(client_sock, &num_samples, 4, 0); + read = client_sock.recv(&num_samples, 4, 0); if (!read) break; // done reading if (read < 0) { etiLog.level(info) << @@ -308,13 +249,13 @@ void OutputUHDFeedback::ServeFeedback() burstRequest.rx_samples.size() / sizeof(complexf))); uint32_t num_samples_32 = burstRequest.num_samples; - if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) { + if (client_sock.sendall(&num_samples_32, sizeof(num_samples_32)) < 0) { etiLog.level(info) << "DPD Feedback Server Client send num_samples failed"; break; } - if (sendall(client_sock, + if (client_sock.sendall( &burstRequest.tx_second, sizeof(burstRequest.tx_second)) < 0) { etiLog.level(info) << @@ -322,7 +263,7 @@ void OutputUHDFeedback::ServeFeedback() break; } - if (sendall(client_sock, + if (client_sock.sendall( &burstRequest.tx_pps, sizeof(burstRequest.tx_pps)) < 0) { etiLog.level(info) << @@ -332,8 +273,11 @@ void OutputUHDFeedback::ServeFeedback() const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); - assert(burstRequest.tx_samples.size() >= frame_bytes); - if (sendall(client_sock, + if (burstRequest.tx_samples.size() < frame_bytes) { + throw logic_error("DPD Feedback burstRequest invalid: not enough TX samples"); + } + + if (client_sock.sendall( &burstRequest.tx_samples[0], frame_bytes) < 0) { etiLog.level(info) << @@ -341,7 +285,7 @@ void OutputUHDFeedback::ServeFeedback() break; } - if (sendall(client_sock, + if (client_sock.sendall( &burstRequest.rx_second, sizeof(burstRequest.rx_second)) < 0) { etiLog.level(info) << @@ -349,7 +293,7 @@ void OutputUHDFeedback::ServeFeedback() break; } - if (sendall(client_sock, + if (client_sock.sendall( &burstRequest.rx_pps, sizeof(burstRequest.rx_pps)) < 0) { etiLog.level(info) << @@ -357,16 +301,17 @@ void OutputUHDFeedback::ServeFeedback() break; } - assert(burstRequest.rx_samples.size() >= frame_bytes); - if (sendall(client_sock, + if (burstRequest.rx_samples.size() < frame_bytes) { + throw logic_error("DPD Feedback burstRequest invalid: not enough RX samples"); + } + + if (client_sock.sendall( &burstRequest.rx_samples[0], frame_bytes) < 0) { etiLog.level(info) << "DPD Feedback Server Client send rx_frame failed"; break; } - - close(client_sock); } } @@ -392,11 +337,6 @@ void OutputUHDFeedback::ServeFeedbackThread() } m_running = false; - - if (m_server_sock != -1) { - close(m_server_sock); - m_server_sock = -1; - } } #endif diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index c68f4c2..da0d487 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -109,7 +109,6 @@ class OutputUHDFeedback { UHDReceiveBurstRequest burstRequest; std::atomic_bool m_running; - int m_server_sock = -1; uint16_t m_port = 0; uint32_t m_sampleRate = 0; uhd::usrp::multi_usrp::sptr m_usrp; diff --git a/src/Socket.h b/src/Socket.h new file mode 100644 index 0000000..1d9c252 --- /dev/null +++ b/src/Socket.h @@ -0,0 +1,164 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the + Queen in Right of Canada (Communications Research Center Canada) + + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + +DESCRIPTION: + Abstraction for sockets. +*/ + +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include +#include +#include +#include +#include +#include +#include + +class TCPSocket { + public: + TCPSocket() { + if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + throw std::runtime_error("Can't create TCP socket"); + } + } + + ~TCPSocket() { + if (m_sock != -1) { + ::close(m_sock); + } + } + + TCPSocket(const TCPSocket& other) = delete; + TCPSocket& operator=(const TCPSocket& other) = delete; + TCPSocket(TCPSocket&& other) { + m_sock = other.m_sock; + + if (other.m_sock != -1) { + other.m_sock = -1; + } + } + + TCPSocket& operator=(TCPSocket&& other) + { + m_sock = other.m_sock; + + if (other.m_sock != -1) { + other.m_sock = -1; + } + + return *this; + } + + bool valid(void) const { + return m_sock != -1; + } + + void listen(int port) { + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + const int reuse = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { + throw std::runtime_error("Can't reuse address for TCP socket"); + } + + if (bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + close(); + throw std::runtime_error("Can't bind TCP socket"); + } + + if (::listen(m_sock, 1) < 0) { + close(); + m_sock = -1; + throw std::runtime_error("Can't listen TCP socket"); + } + + } + + void close(void) { + ::close(m_sock); + m_sock = -1; + } + + TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client) + { + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLIN | POLLOUT; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + throw std::runtime_error("TCP Socket accept error: " + std::to_string(errno)); + } + else if (retval) { + socklen_t client_len = sizeof(struct sockaddr_in); + int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len); + TCPSocket s(sockfd); + return s; + } + else { + TCPSocket s(-1); + return s; + } + } + + ssize_t sendall(const void *buffer, size_t buflen) + { + uint8_t *buf = (uint8_t*)buffer; + while (buflen > 0) { + ssize_t sent = send(m_sock, buf, buflen, 0); + if (sent < 0) { + return -1; + } + else { + buf += sent; + buflen -= sent; + } + } + return buflen; + } + + ssize_t recv(void *buffer, size_t length, int flags) + { + return ::recv(m_sock, buffer, length, flags); + } + + private: + explicit TCPSocket(int sockfd) { + m_sock = sockfd; + } + + int m_sock = -1; +}; + -- cgit v1.2.3