summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/UdpSocket.cpp63
-rw-r--r--lib/UdpSocket.h2
-rw-r--r--src/DabMod.cpp13
-rw-r--r--src/EtiReader.cpp13
4 files changed, 60 insertions, 31 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
index 7645eaf..345b971 100644
--- a/lib/UdpSocket.cpp
+++ b/lib/UdpSocket.cpp
@@ -25,6 +25,7 @@
*/
#include "UdpSocket.h"
+#include "Utils.h"
#include <iostream>
#include <stdio.h>
@@ -116,29 +117,50 @@ UdpSocket::~UdpSocket()
}
}
+static inline bool wait_for_recv_ready(int sock_fd, const size_t timeout_ms)
+{
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = timeout_ms*1000;
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(sock_fd, &rset);
+
+ return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0;
+}
int UdpSocket::receive(UdpPacket& packet)
{
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- ssize_t ret = recvfrom(listenSocket,
- packet.getData(),
- packet.getSize(),
- 0,
- packet.getAddress().getAddress(),
- &addrSize);
-
- if (ret == SOCKET_ERROR) {
- packet.setSize(0);
- if (errno == EAGAIN) {
- return 0;
+ bool ready = wait_for_recv_ready(listenSocket, 2000);
+
+ if (ready) {
+ socklen_t addrSize;
+ addrSize = sizeof(*packet.getAddress().getAddress());
+ ssize_t ret = recvfrom(listenSocket,
+ packet.getData(),
+ packet.getSize(),
+ 0,
+ packet.getAddress().getAddress(),
+ &addrSize);
+
+ if (ret == SOCKET_ERROR) {
+ packet.setSize(0);
+ if (errno == EAGAIN) {
+ return 0;
+ }
+ setInetError("Can't receive UDP packet");
+ return -1;
}
- setInetError("Can't receive UDP packet");
- return -1;
+ packet.setSize(ret);
+ return 0;
+ }
+ else {
+ packet.setSize(0);
+ return 0;
}
-
- packet.setSize(ret);
- return 0;
}
int UdpSocket::send(UdpPacket& packet)
@@ -290,6 +312,8 @@ void UdpReceiver::m_run()
private: atomic<bool>& m_stop;
} autoSetStop(m_stop);
+ set_thread_name("udp_rx");
+
if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
m_sock.reinit(m_port, m_mcastaddr);
m_sock.setMulticastSource(m_bindto.c_str());
@@ -313,8 +337,7 @@ void UdpReceiver::m_run()
// If this blocks, the UDP socket will lose incoming packets
m_packets.push_wait_if_full(packet, m_max_packets_queued);
}
- else
- {
+ else {
if (inetErrNo != EINTR) {
// TODO replace fprintf
fprintf(stderr, "Socket error: %s\n", inetErrMsg);
diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h
index 8c71b9d..b299c82 100644
--- a/lib/UdpSocket.h
+++ b/lib/UdpSocket.h
@@ -107,7 +107,7 @@ class UdpSocket
/** Receive an UDP packet.
* @param packet The packet that will receive the data. The address will be set
* to the source address.
- * @return 0 if ok, -1 if error
+ * @return 0 if ok or timeout, -1 if error
*/
int receive(UdpPacket& packet);
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index bc32b9d..e388b15 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -330,14 +330,21 @@ int launch_modulator(int argc, char* argv[])
bool first_frame = true;
while (running) {
- while (not ediReader.isFrameReady()) {
- bool success = ediUdpInput.rxPacket();
- if (not success) {
+ while (running and not ediReader.isFrameReady()) {
+ try {
+ ediUdpInput.rxPacket();
+ }
+ catch (std::runtime_error& e) {
+ etiLog.level(warn) << "EDI input: " << e.what();
running = 0;
break;
}
}
+ if (not running) {
+ break;
+ }
+
if (first_frame) {
if (ediReader.getFp() != 0) {
// Do not start the flowgraph before we get to FP 0
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index c1b7445..4c5ad79 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -591,14 +591,13 @@ void EdiUdpInput::Open(const std::string& uri)
bool EdiUdpInput::rxPacket()
{
- try {
- auto udp_data = m_udp_rx.get_packet_buffer();
- m_decoder.push_packet(udp_data);
- return true;
- }
- catch (std::runtime_error& e) {
- etiLog.level(warn) << "EDI input: " << e.what();
+ auto udp_data = m_udp_rx.get_packet_buffer();
+
+ if (udp_data.empty()) {
return false;
}
+
+ m_decoder.push_packet(udp_data);
+ return true;
}
#endif // HAVE_EDI