From d14814a92377084177753c7a60d83a9307ad0672 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 16 Jan 2021 08:06:09 +0100 Subject: Update common code to latest, update zmq.hpp and adapt --- src/EtiReader.cpp | 59 +++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 46 insertions(+), 13 deletions(-) (limited to 'src/EtiReader.cpp') diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 33194b2..51266af 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -28,6 +28,7 @@ #include "Log.h" #include "PcDebug.h" #include "TimestampDecoder.h" +#include "edi/common.hpp" #include #include @@ -544,6 +545,10 @@ void EdiTransport::Open(const std::string& uri) const string proto = uri.substr(0, 3); if (proto == "udp") { + if (m_proto == Proto::TCP) { + throw std::invalid_argument("Cannot specify both TCP and UDP urls"); + } + size_t found_port = uri.find_first_of(":", 6); if (found_port == string::npos) { throw std::invalid_argument("EDI UDP input port must be provided"); @@ -565,17 +570,15 @@ void EdiTransport::Open(const std::string& uri) etiLog.level(info) << "EDI UDP input: host:" << m_bindto << ", source:" << m_mcastaddr << ", port:" << m_port; - // The max_fragments_queued is only a protection against a runaway - // memory usage. - // Rough calculation: - // 300 seconds, 24ms per frame, up to 20 fragments per frame - const size_t max_fragments_queued = 20 * 300 * 1000 / 24; - - m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); + m_udp_rx.add_receive_port(m_port, m_bindto, m_mcastaddr); m_proto = Proto::UDP; m_enabled = true; } else if (proto == "tcp") { + if (m_proto != Proto::Unspecified) { + throw std::invalid_argument("Cannot call Open several times with TCP"); + } + size_t found_port = uri.find_first_of(":", 6); if (found_port == string::npos) { throw std::invalid_argument("EDI TCP input port must be provided"); @@ -598,16 +601,47 @@ void EdiTransport::Open(const std::string& uri) bool EdiTransport::rxPacket() { switch (m_proto) { + case Proto::Unspecified: + { + etiLog.level(warn) << "EDI receiving from uninitialised socket"; + return false; + } case Proto::UDP: { - auto udp_data = m_udp_rx.get_packet_buffer(); - - if (udp_data.empty()) { + Socket::InetAddress received_from; + try { + auto received_packets = m_udp_rx.receive(100); + for (auto rp : received_packets) { + received_from = rp.received_from; + + EdiDecoder::Packet p; + p.buf = move(rp.packetdata); + p.received_on_port = rp.port_received_on; + m_decoder.push_packet(p); + } + return true; + } + catch (const Socket::UDPReceiver::Timeout&) { return false; } + catch (const Socket::UDPReceiver::Interrupted&) { + return false; + } + catch (const invalid_argument& e) { + try { + fprintf(stderr, "Invalid argument receiving EDI from %s: %s\n", + received_from.to_string().c_str(), e.what()); + } + catch (const invalid_argument& ee) { + fprintf(stderr, "Invalid argument receiving EDI %s\n", e.what()); + fprintf(stderr, "Invalid argument converting source address %s\n", ee.what()); + } + } + catch (const runtime_error& e) { + fprintf(stderr, "Runtime error UDP Receive: %s\n", e.what()); + } - m_decoder.push_packet(udp_data); - return true; + return false; } case Proto::TCP: { @@ -648,4 +682,3 @@ EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) : decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f)); } } - -- cgit v1.2.3