aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-07-08 12:08:44 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-07-08 12:08:44 +0200
commitc8c9d5abf8238c3531ecae8fb272fdf5bbffd336 (patch)
tree30049a585baf782b3a0d74a319768efbdf66542e /src
parent3a7202306c6aca5be2dad604f62063d605fd0982 (diff)
downloaddabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.tar.gz
dabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.tar.bz2
dabmod-c8c9d5abf8238c3531ecae8fb272fdf5bbffd336.zip
Unify Socket library with other mmbTools
Diffstat (limited to 'src')
-rw-r--r--src/EtiReader.cpp4
-rw-r--r--src/EtiReader.h9
-rw-r--r--src/InputReader.h2
-rw-r--r--src/Socket.cpp275
-rw-r--r--src/Socket.h104
-rw-r--r--src/ThreadsafeQueue.h178
-rw-r--r--src/output/Feedback.cpp7
7 files changed, 9 insertions, 570 deletions
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index 94c362a..93008bb 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -644,7 +644,7 @@ bool EdiTransport::rxPacket()
return true;
}
}
- catch (const TCPSocket::Timeout&) {
+ catch (const Socket::TCPSocket::Timeout&) {
return false;
}
}
diff --git a/src/EtiReader.h b/src/EtiReader.h
index 38f7903..8548654 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -38,9 +38,6 @@
#include "SubchannelSource.h"
#include "TimestampDecoder.h"
#include "lib/edi/ETIDecoder.hpp"
-#ifdef HAVE_EDI
-# include "lib/UdpSocket.h"
-#endif
#include <vector>
#include <memory>
@@ -211,9 +208,9 @@ class EdiTransport {
enum class Proto { UDP, TCP };
Proto m_proto;
- UdpReceiver m_udp_rx;
+ Socket::UDPReceiver m_udp_rx;
std::vector<uint8_t> m_tcpbuffer;
- TCPClient m_tcpclient;
+ Socket::TCPClient m_tcpclient;
EdiDecoder::ETIDecoder& m_decoder;
};
#endif
diff --git a/src/InputReader.h b/src/InputReader.h
index 63451e5..1a63cea 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -144,7 +144,7 @@ class InputTcpReader : public InputReader
virtual std::string GetPrintableInfo() const override;
private:
- TCPClient m_tcpclient;
+ Socket::TCPClient m_tcpclient;
std::string m_uri;
};
diff --git a/src/Socket.cpp b/src/Socket.cpp
deleted file mode 100644
index 08cda68..0000000
--- a/src/Socket.cpp
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "Socket.h"
-#include "Log.h"
-#include <fcntl.h>
-
-TCPSocket::TCPSocket()
-{
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
- }
-
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
- &val, sizeof(val)) < 0) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
- }
-#endif
-}
-
-TCPSocket::~TCPSocket()
-{
- if (m_sock != -1) {
- ::close(m_sock);
- }
-}
-
-TCPSocket::TCPSocket(TCPSocket&& other)
-{
- m_sock = other.m_sock;
-
- if (other.m_sock != -1) {
- other.m_sock = -1;
- }
-}
-
-TCPSocket& TCPSocket::operator=(TCPSocket&& other)
-{
- m_sock = other.m_sock;
-
- if (other.m_sock != -1) {
- other.m_sock = -1;
- }
-
- return *this;
-}
-
-bool TCPSocket::valid() const
-{
- return m_sock != -1;
-}
-
-void TCPSocket::connect(const std::string& hostname, int port)
-{
- struct sockaddr_in addr;
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
-
- hostent *host = gethostbyname(hostname.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
- }
- else {
- std::string errstr(strerror(errno));
- throw std::runtime_error(
- "could not resolve hostname " +
- hostname + ":" + std::to_string(port) +
- " : " + errstr);
- }
-
- int ret = ::connect(m_sock, (struct sockaddr*)&addr, sizeof(addr));
- if (ret == -1 and errno != EINPROGRESS) {
- std::string errstr(strerror(errno));
- throw std::runtime_error(
- "could not connect to " +
- hostname + ":" + std::to_string(port) +
- " : " + errstr);
- }
-}
-
-void TCPSocket::listen(int port)
-{
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- const int reuse = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR,
- &reuse, sizeof(reuse)) < 0) {
- throw std::runtime_error("Can't reuse address for TCP socket");
- }
-
- if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
- close();
- throw std::runtime_error("Can't bind TCP socket");
- }
-
- if (::listen(m_sock, 1) < 0) {
- close();
- m_sock = -1;
- throw std::runtime_error("Can't listen TCP socket");
- }
-
-}
-
-void TCPSocket::close()
-{
- ::close(m_sock);
- m_sock = -1;
-}
-
-TCPSocket TCPSocket::accept_with_timeout(int timeout_ms, struct sockaddr_in *client)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP Socket accept error: " + errstr);
- }
- else if (retval > 0) {
- socklen_t client_len = sizeof(struct sockaddr_in);
- int sockfd = accept(m_sock, (struct sockaddr*)&client, &client_len);
- TCPSocket s(sockfd);
- return s;
- }
- else {
- TCPSocket s(-1);
- return s;
- }
-}
-
-ssize_t TCPSocket::sendall(const void *buffer, size_t buflen)
-{
- uint8_t *buf = (uint8_t*)buffer;
- while (buflen > 0) {
- /* On Linux, the MSG_NOSIGNAL flag ensures that the process
- * would not receive a SIGPIPE and die.
- * Other systems have SO_NOSIGPIPE set on the socket for the
- * same effect. */
-#if defined(HAVE_MSG_NOSIGNAL)
- const int flags = MSG_NOSIGNAL;
-#else
- const int flags = 0;
-#endif
- ssize_t sent = ::send(m_sock, buf, buflen, flags);
- if (sent < 0) {
- return -1;
- }
- else {
- buf += sent;
- buflen -= sent;
- }
- }
- return buflen;
-}
-
-ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
-{
- ssize_t ret = ::recv(m_sock, buffer, length, flags);
- if (ret == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive error: " + errstr);
- }
- return ret;
-}
-
-ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1 and errno == EINTR) {
- throw Interrupted();
- }
- else if (retval == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive with poll() error: " + errstr);
- }
- else if (retval > 0 and (fds[0].revents | POLLIN)) {
- ssize_t ret = ::recv(m_sock, buffer, length, flags);
- if (ret == -1) {
- if (errno == ECONNREFUSED) {
- return 0;
- }
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP receive after poll() error: " + errstr);
- }
- return ret;
- }
- else {
- throw Timeout();
- }
-}
-
-TCPSocket::TCPSocket(int sockfd) {
- m_sock = sockfd;
-}
-
-void TCPClient::connect(const std::string& hostname, int port)
-{
- m_hostname = hostname;
- m_port = port;
- reconnect();
-}
-
-ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
-{
- try {
- ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms);
-
- if (ret == 0) {
- m_sock.close();
-
- TCPSocket newsock;
- m_sock = std::move(newsock);
- reconnect();
- }
-
- return ret;
- }
- catch (const TCPSocket::Interrupted&) {
- return -1;
- }
- catch (const TCPSocket::Timeout&) {
- return 0;
- }
-
- return 0;
-}
-
-void TCPClient::reconnect()
-{
- int flags = fcntl(m_sock.m_sock, F_GETFL);
- if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
- std::string errstr(strerror(errno));
- throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
- }
-
- m_sock.connect(m_hostname, m_port);
-}
diff --git a/src/Socket.h b/src/Socket.h
deleted file mode 100644
index 14c5cbe..0000000
--- a/src/Socket.h
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://opendigitalradio.org
-
-DESCRIPTION:
- Abstraction for sockets.
-*/
-
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include <config.h>
-#endif
-
-#include <stdexcept>
-#include <string>
-#include <cstdint>
-#include <cstring>
-#include <unistd.h>
-#include <errno.h>
-#include <poll.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-
-class TCPSocket {
- public:
- TCPSocket();
- ~TCPSocket();
- TCPSocket(const TCPSocket& other) = delete;
- TCPSocket& operator=(const TCPSocket& other) = delete;
- TCPSocket(TCPSocket&& other);
- TCPSocket& operator=(TCPSocket&& other);
-
- bool valid(void) const;
- void connect(const std::string& hostname, int port);
- void listen(int port);
- void close(void);
-
- /* throws a runtime_error on failure, an invalid socket on timeout */
- TCPSocket accept_with_timeout(int timeout_ms, struct sockaddr_in *client);
-
- /* returns -1 on error */
- ssize_t sendall(const void *buffer, size_t buflen);
-
- /* Returns number of bytes read, 0 on disconnect. Throws a
- * runtime_error on error */
- ssize_t recv(void *buffer, size_t length, int flags);
-
- class Timeout {};
- class Interrupted {};
- /* Returns number of bytes read, 0 on disconnect or refused connection.
- * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
- * on error
- */
- ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
-
- private:
- explicit TCPSocket(int sockfd);
- int m_sock = -1;
-
- friend class TCPClient;
-};
-
-/* Implement a TCP receiver that auto-reconnects on errors */
-class TCPClient {
- public:
- void connect(const std::string& hostname, int port);
-
- /* Returns numer of bytes read, 0 on auto-reconnect, -1
- * on interruption.
- * Throws a runtime_error on error */
- ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
-
- private:
- void reconnect(void);
- TCPSocket m_sock;
- std::string m_hostname;
- int m_port;
-};
-
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
deleted file mode 100644
index ee26ca0..0000000
--- a/src/ThreadsafeQueue.h
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2018
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- An implementation for a threadsafe queue, depends on C++11
-
- When creating a ThreadsafeQueue, one can specify the minimal number
- of elements it must contain before it is possible to take one
- element out.
- */
-/*
- This file is part of ODR-DabMod.
-
- ODR-DabMod is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMod is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include <mutex>
-#include <condition_variable>
-#include <queue>
-#include <utility>
-
-/* This queue is meant to be used by two threads. One producer
- * that pushes elements into the queue, and one consumer that
- * retrieves the elements.
- *
- * The queue can make the consumer block until an element
- * is available, or a wakeup requested.
- */
-
-/* Class thrown by blocking pop to tell the consumer
- * that there's a wakeup requested. */
-class ThreadsafeQueueWakeup {};
-
-template<typename T>
-class ThreadsafeQueue
-{
-public:
- /* Push one element into the queue, and notify another thread that
- * might be waiting.
- *
- * returns the new queue size.
- */
- size_t push(T const& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- size_t push(T&& val)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.emplace(std::move(val));
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Push one element into the queue, but wait until the
- * queue size goes below the threshold.
- *
- * Notify waiting thread.
- *
- * returns the new queue size.
- */
- size_t push_wait_if_full(T const& val, size_t threshold)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() >= threshold) {
- the_tx_notification.wait(lock);
- }
- the_queue.push(val);
- size_t queue_size = the_queue.size();
- lock.unlock();
-
- the_rx_notification.notify_one();
-
- return queue_size;
- }
-
- /* Trigger a wakeup event on a blocking consumer, which
- * will receive a ThreadsafeQueueWakeup exception.
- */
- void trigger_wakeup(void)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- wakeup_requested = true;
- lock.unlock();
- the_rx_notification.notify_one();
- }
-
- /* Send a notification for the receiver thread */
- void notify(void)
- {
- the_rx_notification.notify_one();
- }
-
- bool empty() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.empty();
- }
-
- size_t size() const
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- return the_queue.size();
- }
-
- bool try_pop(T& popped_value)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- if (the_queue.empty()) {
- return false;
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
-
- return true;
- }
-
- void wait_and_pop(T& popped_value, size_t prebuffering = 1)
- {
- std::unique_lock<std::mutex> lock(the_mutex);
- while (the_queue.size() < prebuffering and
- not wakeup_requested) {
- the_rx_notification.wait(lock);
- }
-
- if (wakeup_requested) {
- wakeup_requested = false;
- throw ThreadsafeQueueWakeup();
- }
- else {
- std::swap(popped_value, the_queue.front());
- the_queue.pop();
-
- lock.unlock();
- the_tx_notification.notify_one();
- }
- }
-
-private:
- std::queue<T> the_queue;
- mutable std::mutex the_mutex;
- std::condition_variable the_rx_notification;
- std::condition_variable the_tx_notification;
- bool wakeup_requested = false;
-};
-
diff --git a/src/output/Feedback.cpp b/src/output/Feedback.cpp
index 17e45bf..88d8319 100644
--- a/src/output/Feedback.cpp
+++ b/src/output/Feedback.cpp
@@ -200,14 +200,13 @@ void DPDFeedbackServer::ReceiveBurstThread()
void DPDFeedbackServer::ServeFeedback()
{
- TCPSocket m_server_sock;
- m_server_sock.listen(m_port);
+ Socket::TCPSocket m_server_sock;
+ m_server_sock.listen(m_port, "127.0.0.1");
etiLog.level(info) << "DPD Feedback server listening on port " << m_port;
while (m_running) {
- struct sockaddr_in client;
- TCPSocket client_sock = m_server_sock.accept_with_timeout(1000, &client);
+ auto client_sock = m_server_sock.accept(1000);
if (not m_running) {
break;