summaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
Diffstat (limited to 'src/input')
-rw-r--r--src/input/Edi.cpp22
-rw-r--r--src/input/Zmq.cpp8
2 files changed, 21 insertions, 9 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index a5e6525..e6a7e3e 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -35,6 +35,8 @@
#include <cstdlib>
#include <cerrno>
#include <climits>
+#include "Socket.h"
+#include "edi/common.hpp"
#include "utils.h"
using namespace std;
@@ -330,13 +332,14 @@ void Edi::m_run()
case InputUsed::UDP:
{
constexpr size_t packsize = 2048;
- const auto packet = m_udp_sock.receive(packsize);
+ auto packet = m_udp_sock.receive(packsize);
if (packet.buffer.size() == packsize) {
fprintf(stderr, "Warning, possible UDP truncation\n");
}
if (not packet.buffer.empty()) {
try {
- m_sti_decoder.push_packet(packet.buffer);
+ EdiDecoder::Packet p(move(packet.buffer));
+ m_sti_decoder.push_packet(p);
}
catch (const runtime_error& e) {
etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what();
@@ -350,19 +353,26 @@ void Edi::m_run()
break;
case InputUsed::TCP:
{
- auto packet = m_tcp_receive_server.receive();
- if (not packet.empty()) {
+ auto message = m_tcp_receive_server.receive();
+ if (auto data = dynamic_pointer_cast<Socket::TCPReceiveMessageData>(message)) {
try {
- m_sti_decoder.push_bytes(packet);
+ m_sti_decoder.push_bytes(data->data);
}
catch (const runtime_error& e) {
etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what();
this_thread::sleep_for(chrono::milliseconds(24));
}
}
- else {
+ else if (dynamic_pointer_cast<Socket::TCPReceiveMessageDisconnected>(message)) {
+ etiLog.level(info) << "EDI input " << m_name << " disconnected";
+ m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state
+ }
+ else if (dynamic_pointer_cast<Socket::TCPReceiveMessageEmpty>(message)) {
this_thread::sleep_for(chrono::milliseconds(12));
}
+ else {
+ throw logic_error("unimplemented TCPReceiveMessage type");
+ }
}
break;
default:
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index 305653b..be3fd1f 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -51,6 +51,7 @@
#include <limits.h>
#include "PcDebug.h"
#include "Log.h"
+#include "zmq.hpp"
#ifdef __MINGW32__
# define bzero(s, n) memset(s, 0, n)
@@ -348,7 +349,8 @@ int ZmqMPEG::readFromSocket(size_t framesize)
zmq::message_t msg;
try {
- messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait);
+ messageReceived = result.has_value();
if (not messageReceived) {
return 0;
}
@@ -417,7 +419,8 @@ int ZmqAAC::readFromSocket(size_t framesize)
zmq::message_t msg;
try {
- messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait);
+ messageReceived = result.has_value();
if (not messageReceived) {
return 0;
}
@@ -615,4 +618,3 @@ const string ZmqBase::get_parameter(const string& parameter) const
}
};
-