diff options
-rw-r--r-- | lib/UdpSocket.cpp | 63 | ||||
-rw-r--r-- | lib/UdpSocket.h | 2 | ||||
-rw-r--r-- | src/DabMod.cpp | 13 | ||||
-rw-r--r-- | src/EtiReader.cpp | 13 |
4 files changed, 60 insertions, 31 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp index 7645eaf..345b971 100644 --- a/lib/UdpSocket.cpp +++ b/lib/UdpSocket.cpp @@ -25,6 +25,7 @@ */ #include "UdpSocket.h" +#include "Utils.h" #include <iostream> #include <stdio.h> @@ -116,29 +117,50 @@ UdpSocket::~UdpSocket() } } +static inline bool wait_for_recv_ready(int sock_fd, const size_t timeout_ms) +{ + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = timeout_ms*1000; + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(sock_fd, &rset); + + return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0; +} int UdpSocket::receive(UdpPacket& packet) { - socklen_t addrSize; - addrSize = sizeof(*packet.getAddress().getAddress()); - ssize_t ret = recvfrom(listenSocket, - packet.getData(), - packet.getSize(), - 0, - packet.getAddress().getAddress(), - &addrSize); - - if (ret == SOCKET_ERROR) { - packet.setSize(0); - if (errno == EAGAIN) { - return 0; + bool ready = wait_for_recv_ready(listenSocket, 2000); + + if (ready) { + socklen_t addrSize; + addrSize = sizeof(*packet.getAddress().getAddress()); + ssize_t ret = recvfrom(listenSocket, + packet.getData(), + packet.getSize(), + 0, + packet.getAddress().getAddress(), + &addrSize); + + if (ret == SOCKET_ERROR) { + packet.setSize(0); + if (errno == EAGAIN) { + return 0; + } + setInetError("Can't receive UDP packet"); + return -1; } - setInetError("Can't receive UDP packet"); - return -1; + packet.setSize(ret); + return 0; + } + else { + packet.setSize(0); + return 0; } - - packet.setSize(ret); - return 0; } int UdpSocket::send(UdpPacket& packet) @@ -290,6 +312,8 @@ void UdpReceiver::m_run() private: atomic<bool>& m_stop; } autoSetStop(m_stop); + set_thread_name("udp_rx"); + if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { m_sock.reinit(m_port, m_mcastaddr); m_sock.setMulticastSource(m_bindto.c_str()); @@ -313,8 +337,7 @@ void UdpReceiver::m_run() // If this blocks, the UDP socket will lose incoming packets m_packets.push_wait_if_full(packet, m_max_packets_queued); } - else - { + else { if (inetErrNo != EINTR) { // TODO replace fprintf fprintf(stderr, "Socket error: %s\n", inetErrMsg); diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h index 8c71b9d..b299c82 100644 --- a/lib/UdpSocket.h +++ b/lib/UdpSocket.h @@ -107,7 +107,7 @@ class UdpSocket /** Receive an UDP packet. * @param packet The packet that will receive the data. The address will be set * to the source address. - * @return 0 if ok, -1 if error + * @return 0 if ok or timeout, -1 if error */ int receive(UdpPacket& packet); diff --git a/src/DabMod.cpp b/src/DabMod.cpp index bc32b9d..e388b15 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -330,14 +330,21 @@ int launch_modulator(int argc, char* argv[]) bool first_frame = true; while (running) { - while (not ediReader.isFrameReady()) { - bool success = ediUdpInput.rxPacket(); - if (not success) { + while (running and not ediReader.isFrameReady()) { + try { + ediUdpInput.rxPacket(); + } + catch (std::runtime_error& e) { + etiLog.level(warn) << "EDI input: " << e.what(); running = 0; break; } } + if (not running) { + break; + } + if (first_frame) { if (ediReader.getFp() != 0) { // Do not start the flowgraph before we get to FP 0 diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index c1b7445..4c5ad79 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -591,14 +591,13 @@ void EdiUdpInput::Open(const std::string& uri) bool EdiUdpInput::rxPacket() { - try { - auto udp_data = m_udp_rx.get_packet_buffer(); - m_decoder.push_packet(udp_data); - return true; - } - catch (std::runtime_error& e) { - etiLog.level(warn) << "EDI input: " << e.what(); + auto udp_data = m_udp_rx.get_packet_buffer(); + + if (udp_data.empty()) { return false; } + + m_decoder.push_packet(udp_data); + return true; } #endif // HAVE_EDI |