aboutsummaryrefslogtreecommitdiffstats
path: root/src/zmq2farsync
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-14 01:03:05 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-14 01:03:05 +0100
commit5c763a0a7c5d15a46574efd664431db67750e5c2 (patch)
tree2ea59b187e88d35733fe79a315d6f4d7889572c6 /src/zmq2farsync
parent9f6ee28853dfefd8c6c8250b8f0416ad43469737 (diff)
downloaddabmux-5c763a0a7c5d15a46574efd664431db67750e5c2.tar.gz
dabmux-5c763a0a7c5d15a46574efd664431db67750e5c2.tar.bz2
dabmux-5c763a0a7c5d15a46574efd664431db67750e5c2.zip
Add timeout to zmq2farsync
Diffstat (limited to 'src/zmq2farsync')
-rw-r--r--src/zmq2farsync/zmq2farsync.cpp84
1 files changed, 49 insertions, 35 deletions
diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp
index 72f4cc3..16830a2 100644
--- a/src/zmq2farsync/zmq2farsync.cpp
+++ b/src/zmq2farsync/zmq2farsync.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2015
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,6 +31,9 @@
#include <iostream>
#include <vector>
+constexpr size_t MAX_ERROR_COUNT = 10;
+constexpr long ZMQ_TIMEOUT_MS = 1000;
+
void usage(void)
{
using namespace std;
@@ -41,7 +44,11 @@ void usage(void)
cerr << "Where" << endl;
cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
cerr << " <destination> is the device information for the FarSync card." << endl << endl;
- cerr << " The syntax is the same as for ODR-DabMux" << endl;
+ cerr << " The syntax is the same as for ODR-DabMux" << endl << endl;
+
+ cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
+ cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
int main(int argc, char **argv)
@@ -82,53 +89,60 @@ int main(int argc, char **argv)
size_t frame_count = 0;
size_t loop_counter = 0;
size_t error_count = 0;
- while (error_count < 10)
+ while (error_count < MAX_ERROR_COUNT)
{
zmq::message_t incoming;
- zmq_sock.recv(&incoming);
-
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
-
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
-
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 ||
- dab_msg->buflen[i] > 6144)
- {
- etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
- dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
-
- const int framesize = dab_msg->buflen[i];
-
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
- offset += framesize;
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- if (output.Write(&buf.front(), buf.size()) == -1) {
- etiLog.level(error) << "Cannot write to output!";
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
+ dab_msg->buflen[i];
error_count++;
}
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
- frame_count++;
+ const int framesize = dab_msg->buflen[i];
+
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
+
+ offset += framesize;
+
+ if (output.Write(&buf.front(), buf.size()) == -1) {
+ etiLog.level(error) << "Cannot write to output!";
+ error_count++;
+ }
+
+ frame_count++;
+ }
}
- }
- loop_counter++;
- if (loop_counter > 250) {
- etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
- loop_counter = 0;
+ loop_counter++;
+ if (loop_counter > 250) {
+ etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
+ loop_counter = 0;
+ }
}
}