From 5c763a0a7c5d15a46574efd664431db67750e5c2 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 14 Jan 2018 01:03:05 +0100 Subject: Add timeout to zmq2farsync --- src/zmq2farsync/zmq2farsync.cpp | 84 ++++++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 35 deletions(-) (limited to 'src/zmq2farsync') diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp index 72f4cc3..16830a2 100644 --- a/src/zmq2farsync/zmq2farsync.cpp +++ b/src/zmq2farsync/zmq2farsync.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2015 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -31,6 +31,9 @@ #include #include +constexpr size_t MAX_ERROR_COUNT = 10; +constexpr long ZMQ_TIMEOUT_MS = 1000; + void usage(void) { using namespace std; @@ -41,7 +44,11 @@ void usage(void) cerr << "Where" << endl; cerr << " is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; cerr << " is the device information for the FarSync card." << endl << endl; - cerr << " The syntax is the same as for ODR-DabMux" << endl; + cerr << " The syntax is the same as for ODR-DabMux" << endl << endl; + + cerr << "odr-zmq2edi will quit if it does not receive data for " << + (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; + cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl; } int main(int argc, char **argv) @@ -82,53 +89,60 @@ int main(int argc, char **argv) size_t frame_count = 0; size_t loop_counter = 0; size_t error_count = 0; - while (error_count < 10) + while (error_count < MAX_ERROR_COUNT) { zmq::message_t incoming; - zmq_sock.recv(&incoming); - - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); + if (num_events == 0) { // timeout error_count++; } + else { + // Event received: recv will not block + zmq_sock.recv(&incoming); + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 || - dab_msg->buflen[i] > 6144) - { - etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << - dab_msg->buflen[i]; + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; error_count++; } - else { - std::vector buf(6144, 0x55); - - const int framesize = dab_msg->buflen[i]; - - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); - offset += framesize; + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - if (output.Write(&buf.front(), buf.size()) == -1) { - etiLog.level(error) << "Cannot write to output!"; + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { + etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << + dab_msg->buflen[i]; error_count++; } + else { + std::vector buf(6144, 0x55); - frame_count++; + const int framesize = dab_msg->buflen[i]; + + memcpy(&buf.front(), + ((uint8_t*)incoming.data()) + offset, + framesize); + + offset += framesize; + + if (output.Write(&buf.front(), buf.size()) == -1) { + etiLog.level(error) << "Cannot write to output!"; + error_count++; + } + + frame_count++; + } } - } - loop_counter++; - if (loop_counter > 250) { - etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; - loop_counter = 0; + loop_counter++; + if (loop_counter > 250) { + etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; + loop_counter = 0; + } } } -- cgit v1.2.3