diff options
Diffstat (limited to 'src/InputZeroMQReader.cpp')
-rw-r--r-- | src/InputZeroMQReader.cpp | 26 |
1 files changed, 23 insertions, 3 deletions
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index aa342d5..f6a816a 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -96,8 +96,6 @@ int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames) int InputZeroMQReader::GetNextFrame(void* buffer) { - const size_t framesize = 6144; - if (not m_running) { return 0; } @@ -123,7 +121,17 @@ int InputZeroMQReader::GetNextFrame(void* buffer) throw zmq_input_overflow(); } - memcpy(buffer, &incoming->front(), framesize); + + const size_t framesize = 6144; + if (incoming->empty()) { + return 0; + } + else if (incoming->size() == framesize) { + memcpy(buffer, &incoming->front(), framesize); + } + else { + throw logic_error("ZMQ ETI not 6144"); + } return framesize; } @@ -170,6 +178,18 @@ void InputZeroMQReader::RecvProcess() if (success) try { while (m_running) { zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = subscriber; + items[0].events = ZMQ_POLLIN; + const int zmq_timeout_ms = 100; + const int num_events = zmq::poll(items, 1, zmq_timeout_ms); + if (num_events == 0) { + // timeout is signalled by an empty buffer + auto buf = make_shared<vector<uint8_t> >(); + m_in_messages.push(buf); + continue; + } + subscriber.recv(&incoming); if (m_to_drop) { |