summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-09-27 14:24:49 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-09-28 13:43:03 +0200
commit7593596f6b21483d5af0a55715065fa2b44c1019 (patch)
treef68b99f909761390249db10a41e940183838564a
parent0cfca99b6f0a9b03f148e30c7c44e1f32b82baa1 (diff)
downloaddabmod-7593596f6b21483d5af0a55715065fa2b44c1019.tar.gz
dabmod-7593596f6b21483d5af0a55715065fa2b44c1019.tar.bz2
dabmod-7593596f6b21483d5af0a55715065fa2b44c1019.zip
Add TCP Socket listener abstraction and use for UHD Feedback
-rw-r--r--Makefile.am1
-rw-r--r--src/OutputUHDFeedback.cpp110
-rw-r--r--src/OutputUHDFeedback.h1
-rw-r--r--src/Socket.h164
4 files changed, 190 insertions, 86 deletions
diff --git a/Makefile.am b/Makefile.am
index 12dfe6e..2d84ab6 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -44,6 +44,7 @@ odr_dabmod_CFLAGS = -Wall -Isrc -Ilib \
odr_dabmod_LDADD = $(FFT_LDADD)
odr_dabmod_SOURCES = src/DabMod.cpp \
src/PcDebug.h \
+ src/Socket.h \
src/porting.c \
src/porting.h \
src/DabModulator.cpp \
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index b370885..056be29 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -46,6 +46,7 @@ DESCRIPTION:
#include <boost/date_time/posix_time/posix_time.hpp>
#include "OutputUHDFeedback.h"
#include "Utils.h"
+#include "Socket.h"
using namespace std;
typedef std::complex<float> complexf;
@@ -87,7 +88,9 @@ void OutputUHDFeedback::set_tx_frame(
{
boost::mutex::scoped_lock lock(burstRequest.mutex);
- assert(buf.size() % sizeof(complexf) == 0);
+ if (buf.size() % sizeof(complexf) != 0) {
+ throw std::logic_error("Buffer for tx frame has incorrect size");
+ }
if (burstRequest.state == BurstRequestState::SaveTransmitFrame) {
const size_t n = std::min(
@@ -183,85 +186,23 @@ void OutputUHDFeedback::ReceiveBurstThread()
}
}
-static int accept_with_timeout(int server_socket, int timeout_ms, struct sockaddr_in *client)
-{
- struct pollfd fds[1];
- fds[0].fd = server_socket;
- fds[0].events = POLLIN | POLLOUT;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- throw std::runtime_error("TCP Socket accept error: " + to_string(errno));
- }
- else if (retval) {
- socklen_t client_len = sizeof(struct sockaddr_in);
- return accept(server_socket, (struct sockaddr*)&client, &client_len);
- }
- else {
- return -2;
- }
-}
-
-static ssize_t sendall(int socket, const void *buffer, size_t buflen)
-{
- uint8_t *buf = (uint8_t*)buffer;
- while (buflen > 0) {
- ssize_t sent = send(socket, buf, buflen, 0);
- if (sent < 0) {
- return -1;
- }
- else {
- buf += sent;
- buflen -= sent;
- }
- }
- return buflen;
-}
-
void OutputUHDFeedback::ServeFeedback()
{
- if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
- }
-
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(m_port);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- const int reuse = 1;
- if (setsockopt(m_server_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- < 0) {
- throw std::runtime_error("Can't reuse address for TCP socket");
- }
-
- if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
- close(m_server_sock);
- throw std::runtime_error("Can't bind TCP socket");
- }
-
- if (listen(m_server_sock, 1) < 0) {
- close(m_server_sock);
- throw std::runtime_error("Can't listen TCP socket");
- }
+ TCPSocket m_server_sock;
+ m_server_sock.listen(m_port);
etiLog.level(info) << "DPD Feedback server listening on port " << m_port;
while (m_running) {
struct sockaddr_in client;
- int client_sock = accept_with_timeout(m_server_sock, 1000, &client);
+ TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client);
- if (client_sock == -1) {
- close(m_server_sock);
+ if (not client_sock.valid()) {
throw runtime_error("Could not establish new connection");
}
- else if (client_sock == -2) {
- continue;
- }
uint8_t request_version = 0;
- ssize_t read = recv(client_sock, &request_version, 1, 0);
+ ssize_t read = client_sock.recv(&request_version, 1, 0);
if (!read) break; // done reading
if (read < 0) {
etiLog.level(info) <<
@@ -275,7 +216,7 @@ void OutputUHDFeedback::ServeFeedback()
}
uint32_t num_samples = 0;
- read = recv(client_sock, &num_samples, 4, 0);
+ read = client_sock.recv(&num_samples, 4, 0);
if (!read) break; // done reading
if (read < 0) {
etiLog.level(info) <<
@@ -308,13 +249,13 @@ void OutputUHDFeedback::ServeFeedback()
burstRequest.rx_samples.size() / sizeof(complexf)));
uint32_t num_samples_32 = burstRequest.num_samples;
- if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) {
+ if (client_sock.sendall(&num_samples_32, sizeof(num_samples_32)) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send num_samples failed";
break;
}
- if (sendall(client_sock,
+ if (client_sock.sendall(
&burstRequest.tx_second,
sizeof(burstRequest.tx_second)) < 0) {
etiLog.level(info) <<
@@ -322,7 +263,7 @@ void OutputUHDFeedback::ServeFeedback()
break;
}
- if (sendall(client_sock,
+ if (client_sock.sendall(
&burstRequest.tx_pps,
sizeof(burstRequest.tx_pps)) < 0) {
etiLog.level(info) <<
@@ -332,8 +273,11 @@ void OutputUHDFeedback::ServeFeedback()
const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf);
- assert(burstRequest.tx_samples.size() >= frame_bytes);
- if (sendall(client_sock,
+ if (burstRequest.tx_samples.size() < frame_bytes) {
+ throw logic_error("DPD Feedback burstRequest invalid: not enough TX samples");
+ }
+
+ if (client_sock.sendall(
&burstRequest.tx_samples[0],
frame_bytes) < 0) {
etiLog.level(info) <<
@@ -341,7 +285,7 @@ void OutputUHDFeedback::ServeFeedback()
break;
}
- if (sendall(client_sock,
+ if (client_sock.sendall(
&burstRequest.rx_second,
sizeof(burstRequest.rx_second)) < 0) {
etiLog.level(info) <<
@@ -349,7 +293,7 @@ void OutputUHDFeedback::ServeFeedback()
break;
}
- if (sendall(client_sock,
+ if (client_sock.sendall(
&burstRequest.rx_pps,
sizeof(burstRequest.rx_pps)) < 0) {
etiLog.level(info) <<
@@ -357,16 +301,17 @@ void OutputUHDFeedback::ServeFeedback()
break;
}
- assert(burstRequest.rx_samples.size() >= frame_bytes);
- if (sendall(client_sock,
+ if (burstRequest.rx_samples.size() < frame_bytes) {
+ throw logic_error("DPD Feedback burstRequest invalid: not enough RX samples");
+ }
+
+ if (client_sock.sendall(
&burstRequest.rx_samples[0],
frame_bytes) < 0) {
etiLog.level(info) <<
"DPD Feedback Server Client send rx_frame failed";
break;
}
-
- close(client_sock);
}
}
@@ -392,11 +337,6 @@ void OutputUHDFeedback::ServeFeedbackThread()
}
m_running = false;
-
- if (m_server_sock != -1) {
- close(m_server_sock);
- m_server_sock = -1;
- }
}
#endif
diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h
index c68f4c2..da0d487 100644
--- a/src/OutputUHDFeedback.h
+++ b/src/OutputUHDFeedback.h
@@ -109,7 +109,6 @@ class OutputUHDFeedback {
UHDReceiveBurstRequest burstRequest;
std::atomic_bool m_running;
- int m_server_sock = -1;
uint16_t m_port = 0;
uint32_t m_sampleRate = 0;
uhd::usrp::multi_usrp::sptr m_usrp;
diff --git a/src/Socket.h b/src/Socket.h
new file mode 100644
index 0000000..1d9c252
--- /dev/null
+++ b/src/Socket.h
@@ -0,0 +1,164 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+DESCRIPTION:
+ Abstraction for sockets.
+*/
+
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include <config.h>
+#endif
+
+#include <unistd.h>
+#include <cstdint>
+#include <stdexcept>
+#include <sys/socket.h>
+#include <netinet/ip.h>
+#include <errno.h>
+#include <poll.h>
+
+class TCPSocket {
+ public:
+ TCPSocket() {
+ if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+ throw std::runtime_error("Can't create TCP socket");
+ }
+ }
+
+ ~TCPSocket() {
+ if (m_sock != -1) {
+ ::close(m_sock);
+ }
+ }
+
+ TCPSocket(const TCPSocket& other) = delete;
+ TCPSocket& operator=(const TCPSocket& other) = delete;
+ TCPSocket(TCPSocket&& other) {
+ m_sock = other.m_sock;
+
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+ }
+
+ TCPSocket& operator=(TCPSocket&& other)
+ {
+ m_sock = other.m_sock;
+
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+
+ return *this;
+ }
+
+ bool valid(void) const {
+ return m_sock != -1;
+ }
+
+ void listen(int port) {
+ struct sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ const int reuse = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
+ throw std::runtime_error("Can't reuse address for TCP socket");
+ }
+
+ if (bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
+ close();
+ throw std::runtime_error("Can't bind TCP socket");
+ }
+
+ if (::listen(m_sock, 1) < 0) {
+ close();
+ m_sock = -1;
+ throw std::runtime_error("Can't listen TCP socket");
+ }
+
+ }
+
+ void close(void) {
+ ::close(m_sock);
+ m_sock = -1;
+ }
+
+ TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client)
+ {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN | POLLOUT;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ throw std::runtime_error("TCP Socket accept error: " + std::to_string(errno));
+ }
+ else if (retval) {
+ socklen_t client_len = sizeof(struct sockaddr_in);
+ int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len);
+ TCPSocket s(sockfd);
+ return s;
+ }
+ else {
+ TCPSocket s(-1);
+ return s;
+ }
+ }
+
+ ssize_t sendall(const void *buffer, size_t buflen)
+ {
+ uint8_t *buf = (uint8_t*)buffer;
+ while (buflen > 0) {
+ ssize_t sent = send(m_sock, buf, buflen, 0);
+ if (sent < 0) {
+ return -1;
+ }
+ else {
+ buf += sent;
+ buflen -= sent;
+ }
+ }
+ return buflen;
+ }
+
+ ssize_t recv(void *buffer, size_t length, int flags)
+ {
+ return ::recv(m_sock, buffer, length, flags);
+ }
+
+ private:
+ explicit TCPSocket(int sockfd) {
+ m_sock = sockfd;
+ }
+
+ int m_sock = -1;
+};
+