summaryrefslogtreecommitdiffstats
path: root/src/EtiReader.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:06:09 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:06:09 +0100
commitd14814a92377084177753c7a60d83a9307ad0672 (patch)
tree3b901eed7eaacc07341d16dbcd0db0d60951a5e0 /src/EtiReader.cpp
parent0efb3830dcd441ffdb53ebe69f2dc2886614fb8b (diff)
downloaddabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.gz
dabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.bz2
dabmod-d14814a92377084177753c7a60d83a9307ad0672.zip
Update common code to latest, update zmq.hpp and adapt
Diffstat (limited to 'src/EtiReader.cpp')
-rw-r--r--src/EtiReader.cpp59
1 files changed, 46 insertions, 13 deletions
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index 33194b2..51266af 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -28,6 +28,7 @@
#include "Log.h"
#include "PcDebug.h"
#include "TimestampDecoder.h"
+#include "edi/common.hpp"
#include <stdexcept>
#include <memory>
@@ -544,6 +545,10 @@ void EdiTransport::Open(const std::string& uri)
const string proto = uri.substr(0, 3);
if (proto == "udp") {
+ if (m_proto == Proto::TCP) {
+ throw std::invalid_argument("Cannot specify both TCP and UDP urls");
+ }
+
size_t found_port = uri.find_first_of(":", 6);
if (found_port == string::npos) {
throw std::invalid_argument("EDI UDP input port must be provided");
@@ -565,17 +570,15 @@ void EdiTransport::Open(const std::string& uri)
etiLog.level(info) << "EDI UDP input: host:" << m_bindto <<
", source:" << m_mcastaddr << ", port:" << m_port;
- // The max_fragments_queued is only a protection against a runaway
- // memory usage.
- // Rough calculation:
- // 300 seconds, 24ms per frame, up to 20 fragments per frame
- const size_t max_fragments_queued = 20 * 300 * 1000 / 24;
-
- m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued);
+ m_udp_rx.add_receive_port(m_port, m_bindto, m_mcastaddr);
m_proto = Proto::UDP;
m_enabled = true;
}
else if (proto == "tcp") {
+ if (m_proto != Proto::Unspecified) {
+ throw std::invalid_argument("Cannot call Open several times with TCP");
+ }
+
size_t found_port = uri.find_first_of(":", 6);
if (found_port == string::npos) {
throw std::invalid_argument("EDI TCP input port must be provided");
@@ -598,16 +601,47 @@ void EdiTransport::Open(const std::string& uri)
bool EdiTransport::rxPacket()
{
switch (m_proto) {
+ case Proto::Unspecified:
+ {
+ etiLog.level(warn) << "EDI receiving from uninitialised socket";
+ return false;
+ }
case Proto::UDP:
{
- auto udp_data = m_udp_rx.get_packet_buffer();
-
- if (udp_data.empty()) {
+ Socket::InetAddress received_from;
+ try {
+ auto received_packets = m_udp_rx.receive(100);
+ for (auto rp : received_packets) {
+ received_from = rp.received_from;
+
+ EdiDecoder::Packet p;
+ p.buf = move(rp.packetdata);
+ p.received_on_port = rp.port_received_on;
+ m_decoder.push_packet(p);
+ }
+ return true;
+ }
+ catch (const Socket::UDPReceiver::Timeout&) {
return false;
}
+ catch (const Socket::UDPReceiver::Interrupted&) {
+ return false;
+ }
+ catch (const invalid_argument& e) {
+ try {
+ fprintf(stderr, "Invalid argument receiving EDI from %s: %s\n",
+ received_from.to_string().c_str(), e.what());
+ }
+ catch (const invalid_argument& ee) {
+ fprintf(stderr, "Invalid argument receiving EDI %s\n", e.what());
+ fprintf(stderr, "Invalid argument converting source address %s\n", ee.what());
+ }
+ }
+ catch (const runtime_error& e) {
+ fprintf(stderr, "Runtime error UDP Receive: %s\n", e.what());
+ }
- m_decoder.push_packet(udp_data);
- return true;
+ return false;
}
case Proto::TCP:
{
@@ -648,4 +682,3 @@ EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) :
decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f));
}
}
-