diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2021-01-16 08:06:09 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2021-01-16 08:06:09 +0100 |
commit | d14814a92377084177753c7a60d83a9307ad0672 (patch) | |
tree | 3b901eed7eaacc07341d16dbcd0db0d60951a5e0 /lib/Socket.cpp | |
parent | 0efb3830dcd441ffdb53ebe69f2dc2886614fb8b (diff) | |
download | dabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.gz dabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.bz2 dabmod-d14814a92377084177753c7a60d83a9307ad0672.zip |
Update common code to latest, update zmq.hpp and adapt
Diffstat (limited to 'lib/Socket.cpp')
-rw-r--r-- | lib/Socket.cpp | 154 |
1 files changed, 93 insertions, 61 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp index d41ed1c..c876f32 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port } } +string InetAddress::to_string() const +{ + char received_from_str[64] = {}; + sockaddr *addr = reinterpret_cast<sockaddr*>(&addr); + const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63); + + if (ret == nullptr) { + throw invalid_argument(string("Error converting InetAddress") + strerror(errno)); + } + return ret; +} + UDPPacket::UDPPacket() { } UDPPacket::UDPPacket(size_t initSize) : @@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) : { } -UDPSocket::UDPSocket() : - m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket() { reinit(0, ""); } -UDPSocket::UDPSocket(int port) : - m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port) { reinit(port, ""); } -UDPSocket::UDPSocket(int port, const std::string& name) : - m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port, const std::string& name) { reinit(port, name); } +UDPSocket::UDPSocket(UDPSocket&& other) +{ + m_sock = other.m_sock; + m_port = other.m_port; + other.m_port = 0; + other.m_sock = INVALID_SOCKET; +} + +const UDPSocket& UDPSocket::operator=(UDPSocket&& other) +{ + m_sock = other.m_sock; + m_port = other.m_port; + other.m_port = 0; + other.m_sock = INVALID_SOCKET; + return *this; +} void UDPSocket::setBlocking(bool block) { @@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name) ::close(m_sock); } + m_port = port; + if (port == 0) { // No need to bind to a given port, creating the // socket is enough @@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl) } } -UDPReceiver::UDPReceiver() { } - -UDPReceiver::~UDPReceiver() { - m_stop = true; - m_sock.close(); - if (m_thread.joinable()) { - m_thread.join(); - } +SOCKET UDPSocket::getNativeSocket() const +{ + return m_sock; } -void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { - m_port = port; - m_bindto = bindto; - m_mcastaddr = mcastaddr; - m_max_packets_queued = max_packets_queued; - m_thread = std::thread(&UDPReceiver::m_run, this); +int UDPSocket::getPort() const +{ + return m_port; } -std::vector<uint8_t> UDPReceiver::get_packet_buffer() -{ - if (m_stop) { - throw runtime_error("UDP Receiver not running"); - } +void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) { + UDPSocket sock; - UDPPacket p; - m_packets.wait_and_pop(p); + if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { + sock.reinit(port, mcastaddr); + sock.setMulticastSource(bindto.c_str()); + sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); + } + else { + sock.reinit(port, bindto); + } - return p.buffer; + m_sockets.push_back(move(sock)); } -void UDPReceiver::m_run() +vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms) { - // Ensure that stop is set to true in case of exception or return - struct SetStopOnDestruct { - SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} - ~SetStopOnDestruct() { m_stop = true; } - private: atomic<bool>& m_stop; - } autoSetStop(m_stop); - - if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { - m_sock.reinit(m_port, m_mcastaddr); - m_sock.setMulticastSource(m_bindto.c_str()); - m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); + constexpr size_t MAX_FDS = 64; + struct pollfd fds[MAX_FDS]; + if (m_sockets.size() > MAX_FDS) { + throw std::runtime_error("UDPReceiver only supports up to 64 ports"); } - else { - m_sock.reinit(m_port, m_bindto); + + for (size_t i = 0; i < m_sockets.size(); i++) { + fds[i].fd = m_sockets[i].getNativeSocket(); + fds[i].events = POLLIN; } - while (not m_stop) { - constexpr size_t packsize = 8192; - try { - auto packet = m_sock.receive(packsize); - if (packet.buffer.size() == packsize) { - // TODO replace fprintf - fprintf(stderr, "Warning, possible UDP truncation\n"); - } + int retval = poll(fds, m_sockets.size(), timeout_ms); - // If this blocks, the UDP socket will lose incoming packets - m_packets.push_wait_if_full(packet, m_max_packets_queued); - } - catch (const std::runtime_error& e) { - // TODO replace fprintf - // TODO handle intr - fprintf(stderr, "Socket error: %s\n", e.what()); - m_stop = true; + if (retval == -1 and errno == EINTR) { + throw Interrupted(); + } + else if (retval == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("UDP receive with poll() error: " + errstr); + } + else if (retval > 0) { + vector<ReceivedPacket> received; + + for (size_t i = 0; i < m_sockets.size(); i++) { + if (fds[i].revents & POLLIN) { + auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU + ReceivedPacket rp; + rp.packetdata = move(p.buffer); + rp.received_from = move(p.address); + rp.port_received_on = m_sockets[i].getPort(); + received.push_back(move(rp)); + } } + + return received; + } + else { + throw Timeout(); } } @@ -862,9 +888,9 @@ TCPReceiveServer::~TCPReceiveServer() } } -vector<uint8_t> TCPReceiveServer::receive() +shared_ptr<TCPReceiveMessage> TCPReceiveServer::receive() { - vector<uint8_t> buffer; + shared_ptr<TCPReceiveMessage> buffer = make_shared<TCPReceiveMessageEmpty>(); m_queue.try_pop(buffer); // we can ignore try_pop()'s return value, because @@ -892,11 +918,12 @@ void TCPReceiveServer::process() } else if (r == 0) { sock.close(); + m_queue.push(make_shared<TCPReceiveMessageDisconnected>()); break; } else { buf.resize(r); - m_queue.push(move(buf)); + m_queue.push(make_shared<TCPReceiveMessageData>(move(buf))); } } catch (const TCPSocket::Interrupted&) { @@ -905,6 +932,11 @@ void TCPReceiveServer::process() catch (const TCPSocket::Timeout&) { num_timeouts++; } + catch (const runtime_error& e) { + sock.close(); + // TODO replace fprintf + fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); + } if (num_timeouts > max_num_timeouts) { sock.close(); |