aboutsummaryrefslogtreecommitdiffstats
path: root/src/InputZeroMQReader.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r--src/InputZeroMQReader.cpp26
1 files changed, 23 insertions, 3 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index aa342d5..f6a816a 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -96,8 +96,6 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)
int InputZeroMQReader::GetNextFrame(void* buffer)
{
- const size_t framesize = 6144;
-
if (not m_running) {
return 0;
}
@@ -123,7 +121,17 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
throw zmq_input_overflow();
}
- memcpy(buffer, &incoming->front(), framesize);
+
+ const size_t framesize = 6144;
+ if (incoming->empty()) {
+ return 0;
+ }
+ else if (incoming->size() == framesize) {
+ memcpy(buffer, &incoming->front(), framesize);
+ }
+ else {
+ throw logic_error("ZMQ ETI not 6144");
+ }
return framesize;
}
@@ -170,6 +178,18 @@ void InputZeroMQReader::RecvProcess()
if (success) try {
while (m_running) {
zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = subscriber;
+ items[0].events = ZMQ_POLLIN;
+ const int zmq_timeout_ms = 100;
+ const int num_events = zmq::poll(items, 1, zmq_timeout_ms);
+ if (num_events == 0) {
+ // timeout is signalled by an empty buffer
+ auto buf = make_shared<vector<uint8_t> >();
+ m_in_messages.push(buf);
+ continue;
+ }
+
subscriber.recv(&incoming);
if (m_to_drop) {