aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/UdpSocket.cpp7
-rw-r--r--lib/UdpSocket.h3
-rw-r--r--src/EtiReader.cpp9
3 files changed, 15 insertions, 4 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
index b88c731..570da5f 100644
--- a/lib/UdpSocket.cpp
+++ b/lib/UdpSocket.cpp
@@ -261,8 +261,9 @@ UdpReceiver::~UdpReceiver() {
}
}
-void UdpReceiver::start(int port) {
+void UdpReceiver::start(int port, size_t max_packets_queued) {
m_port = port;
+ m_max_packets_queued = max_packets_queued;
m_thread = std::thread(&UdpReceiver::m_run, this);
}
@@ -299,7 +300,9 @@ void UdpReceiver::m_run()
// TODO replace fprintf
fprintf(stderr, "Warning, possible UDP truncation\n");
}
- m_packets.push(packet);
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
}
else
{
diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h
index 81a7d2b..8c968d2 100644
--- a/lib/UdpSocket.h
+++ b/lib/UdpSocket.h
@@ -184,7 +184,7 @@ class UdpReceiver {
UdpReceiver operator=(const UdpReceiver&) = delete;
// Start the receiver in a separate thread
- void start(int port);
+ void start(int port, size_t max_packets_queued);
// Get the data contained in a UDP packet, blocks if none available
// In case of error, throws a runtime_error
@@ -194,6 +194,7 @@ class UdpReceiver {
void m_run(void);
int m_port;
+ size_t m_max_packets_queued;
std::thread m_thread;
std::atomic<bool> m_stop;
ThreadsafeQueue<UdpPacket> m_packets;
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index f0a1793..de23142 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -545,7 +545,14 @@ void EdiUdpInput::Open(const std::string& uri)
m_port = std::stoi(m[1].str());
etiLog.level(info) << "EDI port :" << m_port;
- m_udp_rx.start(m_port);
+
+ // The max_fragments_queued is only a protection against a runaway
+ // memory usage.
+ // Rough calculation:
+ // 300 seconds, 24ms per frame, up to 20 fragments per frame
+ const size_t max_fragments_queued = 20 * 300 * 1000 / 24;
+
+ m_udp_rx.start(m_port, max_fragments_queued);
m_enabled = true;
}
}