aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:07:24 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:07:24 +0100
commit47e5aa5f441129876048c6faf3ac8857ddaef590 (patch)
tree93147009f9d3889554f0827229c8ffbd0ccbd48b
parentcb5559ae45daa7ba689e6ae15a9f48c30ffa1d2b (diff)
downloadODR-SourceCompanion-47e5aa5f441129876048c6faf3ac8857ddaef590.tar.gz
ODR-SourceCompanion-47e5aa5f441129876048c6faf3ac8857ddaef590.tar.bz2
ODR-SourceCompanion-47e5aa5f441129876048c6faf3ac8857ddaef590.zip
Common 405ad00: Reimplement UDPReceiver with poll instead of a thread
-rw-r--r--lib/Socket.cpp142
-rw-r--r--lib/Socket.h42
2 files changed, 107 insertions, 77 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 6a20429..c876f32 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port
}
}
+string InetAddress::to_string() const
+{
+ char received_from_str[64] = {};
+ sockaddr *addr = reinterpret_cast<sockaddr*>(&addr);
+ const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63);
+
+ if (ret == nullptr) {
+ throw invalid_argument(string("Error converting InetAddress") + strerror(errno));
+ }
+ return ret;
+}
+
UDPPacket::UDPPacket() { }
UDPPacket::UDPPacket(size_t initSize) :
@@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) :
{ }
-UDPSocket::UDPSocket() :
- m_sock(INVALID_SOCKET)
+UDPSocket::UDPSocket()
{
reinit(0, "");
}
-UDPSocket::UDPSocket(int port) :
- m_sock(INVALID_SOCKET)
+UDPSocket::UDPSocket(int port)
{
reinit(port, "");
}
-UDPSocket::UDPSocket(int port, const std::string& name) :
- m_sock(INVALID_SOCKET)
+UDPSocket::UDPSocket(int port, const std::string& name)
{
reinit(port, name);
}
+UDPSocket::UDPSocket(UDPSocket&& other)
+{
+ m_sock = other.m_sock;
+ m_port = other.m_port;
+ other.m_port = 0;
+ other.m_sock = INVALID_SOCKET;
+}
+
+const UDPSocket& UDPSocket::operator=(UDPSocket&& other)
+{
+ m_sock = other.m_sock;
+ m_port = other.m_port;
+ other.m_port = 0;
+ other.m_sock = INVALID_SOCKET;
+ return *this;
+}
void UDPSocket::setBlocking(bool block)
{
@@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name)
::close(m_sock);
}
+ m_port = port;
+
if (port == 0) {
// No need to bind to a given port, creating the
// socket is enough
@@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl)
}
}
-UDPReceiver::UDPReceiver() { }
-
-UDPReceiver::~UDPReceiver() {
- m_stop = true;
- m_sock.close();
- if (m_thread.joinable()) {
- m_thread.join();
- }
+SOCKET UDPSocket::getNativeSocket() const
+{
+ return m_sock;
}
-void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
- m_port = port;
- m_bindto = bindto;
- m_mcastaddr = mcastaddr;
- m_max_packets_queued = max_packets_queued;
- m_thread = std::thread(&UDPReceiver::m_run, this);
+int UDPSocket::getPort() const
+{
+ return m_port;
}
-std::vector<uint8_t> UDPReceiver::get_packet_buffer()
-{
- if (m_stop) {
- throw runtime_error("UDP Receiver not running");
- }
+void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) {
+ UDPSocket sock;
- UDPPacket p;
- m_packets.wait_and_pop(p);
+ if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) {
+ sock.reinit(port, mcastaddr);
+ sock.setMulticastSource(bindto.c_str());
+ sock.joinGroup(mcastaddr.c_str(), bindto.c_str());
+ }
+ else {
+ sock.reinit(port, bindto);
+ }
- return p.buffer;
+ m_sockets.push_back(move(sock));
}
-void UDPReceiver::m_run()
+vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
{
- // Ensure that stop is set to true in case of exception or return
- struct SetStopOnDestruct {
- SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
- ~SetStopOnDestruct() { m_stop = true; }
- private: atomic<bool>& m_stop;
- } autoSetStop(m_stop);
-
- if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
- m_sock.reinit(m_port, m_mcastaddr);
- m_sock.setMulticastSource(m_bindto.c_str());
- m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ constexpr size_t MAX_FDS = 64;
+ struct pollfd fds[MAX_FDS];
+ if (m_sockets.size() > MAX_FDS) {
+ throw std::runtime_error("UDPReceiver only supports up to 64 ports");
}
- else {
- m_sock.reinit(m_port, m_bindto);
+
+ for (size_t i = 0; i < m_sockets.size(); i++) {
+ fds[i].fd = m_sockets[i].getNativeSocket();
+ fds[i].events = POLLIN;
}
- while (not m_stop) {
- constexpr size_t packsize = 8192;
- try {
- auto packet = m_sock.receive(packsize);
- if (packet.buffer.size() == packsize) {
- // TODO replace fprintf
- fprintf(stderr, "Warning, possible UDP truncation\n");
- }
+ int retval = poll(fds, m_sockets.size(), timeout_ms);
- // If this blocks, the UDP socket will lose incoming packets
- m_packets.push_wait_if_full(packet, m_max_packets_queued);
- }
- catch (const std::runtime_error& e) {
- // TODO replace fprintf
- // TODO handle intr
- fprintf(stderr, "Socket error: %s\n", e.what());
- m_stop = true;
+ if (retval == -1 and errno == EINTR) {
+ throw Interrupted();
+ }
+ else if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("UDP receive with poll() error: " + errstr);
+ }
+ else if (retval > 0) {
+ vector<ReceivedPacket> received;
+
+ for (size_t i = 0; i < m_sockets.size(); i++) {
+ if (fds[i].revents & POLLIN) {
+ auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU
+ ReceivedPacket rp;
+ rp.packetdata = move(p.buffer);
+ rp.received_from = move(p.address);
+ rp.port_received_on = m_sockets[i].getPort();
+ received.push_back(move(rp));
+ }
}
+
+ return received;
+ }
+ else {
+ throw Timeout();
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index 2291dd5..33cdc05 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -56,6 +56,8 @@ struct InetAddress {
struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
void resolveUdpDestination(const std::string& destination, int port);
+
+ std::string to_string() const;
};
/** This class represents a UDP packet.
@@ -103,6 +105,8 @@ class UDPSocket
~UDPSocket();
UDPSocket(const UDPSocket& other) = delete;
const UDPSocket& operator=(const UDPSocket& other) = delete;
+ UDPSocket(UDPSocket&& other);
+ const UDPSocket& operator=(UDPSocket&& other);
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
@@ -121,36 +125,36 @@ class UDPSocket
*/
void setBlocking(bool block);
+ SOCKET getNativeSocket() const;
+ int getPort() const;
+
protected:
- SOCKET m_sock;
+ SOCKET m_sock = INVALID_SOCKET;
+ int m_port = 0;
};
-/* Threaded UDP receiver */
+/* UDP packet receiver supporting receiving from several ports at once */
class UDPReceiver {
public:
- UDPReceiver();
- ~UDPReceiver();
- UDPReceiver(const UDPReceiver&) = delete;
- UDPReceiver operator=(const UDPReceiver&) = delete;
+ void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr);
- // Start the receiver in a separate thread
- void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+ struct ReceivedPacket {
+ std::vector<uint8_t> packetdata;
+ InetAddress received_from;
+ int port_received_on;
+ };
- // Get the data contained in a UDP packet, blocks if none available
- // In case of error, throws a runtime_error
- std::vector<uint8_t> get_packet_buffer(void);
+ class Interrupted {};
+ class Timeout {};
+ /* Returns one or several packets,
+ * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error. */
+ std::vector<ReceivedPacket> receive(int timeout_ms);
private:
void m_run(void);
- int m_port = 0;
- std::string m_bindto;
- std::string m_mcastaddr;
- size_t m_max_packets_queued = 1;
- std::thread m_thread;
- std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
- ThreadsafeQueue<UDPPacket> m_packets;
- UDPSocket m_sock;
+ std::vector<UDPSocket> m_sockets;
};
class TCPSocket {