diff options
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/Edi.cpp | 22 | ||||
-rw-r--r-- | src/input/Zmq.cpp | 8 |
2 files changed, 21 insertions, 9 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index a5e6525..e6a7e3e 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -35,6 +35,8 @@ #include <cstdlib> #include <cerrno> #include <climits> +#include "Socket.h" +#include "edi/common.hpp" #include "utils.h" using namespace std; @@ -330,13 +332,14 @@ void Edi::m_run() case InputUsed::UDP: { constexpr size_t packsize = 2048; - const auto packet = m_udp_sock.receive(packsize); + auto packet = m_udp_sock.receive(packsize); if (packet.buffer.size() == packsize) { fprintf(stderr, "Warning, possible UDP truncation\n"); } if (not packet.buffer.empty()) { try { - m_sti_decoder.push_packet(packet.buffer); + EdiDecoder::Packet p(move(packet.buffer)); + m_sti_decoder.push_packet(p); } catch (const runtime_error& e) { etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what(); @@ -350,19 +353,26 @@ void Edi::m_run() break; case InputUsed::TCP: { - auto packet = m_tcp_receive_server.receive(); - if (not packet.empty()) { + auto message = m_tcp_receive_server.receive(); + if (auto data = dynamic_pointer_cast<Socket::TCPReceiveMessageData>(message)) { try { - m_sti_decoder.push_bytes(packet); + m_sti_decoder.push_bytes(data->data); } catch (const runtime_error& e) { etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what(); this_thread::sleep_for(chrono::milliseconds(24)); } } - else { + else if (dynamic_pointer_cast<Socket::TCPReceiveMessageDisconnected>(message)) { + etiLog.level(info) << "EDI input " << m_name << " disconnected"; + m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state + } + else if (dynamic_pointer_cast<Socket::TCPReceiveMessageEmpty>(message)) { this_thread::sleep_for(chrono::milliseconds(12)); } + else { + throw logic_error("unimplemented TCPReceiveMessage type"); + } } break; default: diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 305653b..be3fd1f 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -51,6 +51,7 @@ #include <limits.h> #include "PcDebug.h" #include "Log.h" +#include "zmq.hpp" #ifdef __MINGW32__ # define bzero(s, n) memset(s, 0, n) @@ -348,7 +349,8 @@ int ZmqMPEG::readFromSocket(size_t framesize) zmq::message_t msg; try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait); + messageReceived = result.has_value(); if (not messageReceived) { return 0; } @@ -417,7 +419,8 @@ int ZmqAAC::readFromSocket(size_t framesize) zmq::message_t msg; try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait); + messageReceived = result.has_value(); if (not messageReceived) { return 0; } @@ -615,4 +618,3 @@ const string ZmqBase::get_parameter(const string& parameter) const } }; - |