From 939abd61fc10cb8df2ecdc6d92764bb3ea5b41a7 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 20 Nov 2019 16:22:20 +0100 Subject: zmq2edi: never quit because of input resets, make backoff configurable --- src/zmq2edi/zmq2edi.cpp | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index fead249..f7d733c 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -40,8 +40,8 @@ #include "dabOutput/dabOutput.h" constexpr size_t MAX_ERROR_COUNT = 10; -constexpr size_t MAX_NUM_RESETS = 180; constexpr long ZMQ_TIMEOUT_MS = 1000; +constexpr long DEFAULT_BACKOFF = 5000; static edi::configuration_t edi_conf; @@ -66,7 +66,8 @@ static void usage() cerr << " -i Enable the interleaver with this latency." << endl; cerr << " -D Dump the EDI to edi.debug file." << endl; cerr << " -v Enables verbose mode." << endl; - cerr << " -a Set the alignment of the TAG Packet (default 8)." << endl << endl; + cerr << " -a Set the alignment of the TAG Packet (default 8)." << endl; + cerr << " -b Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl; cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl; cerr << " -d Set the destination ip." << endl; @@ -76,7 +77,6 @@ static void usage() cerr << "The input socket will be reset if no data is received for " << (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; - cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl; cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl; } @@ -233,10 +233,11 @@ int start(int argc, char **argv) int delay_ms = 500; bool drop_late_packets = false; + uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; int ch = 0; while (ch != -1) { - ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:x"); + ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh"); switch (ch) { case -1: break; @@ -281,6 +282,9 @@ int start(int argc, char **argv) case 'a': edi_conf.tagpacket_alignment = std::stoi(optarg); break; + case 'b': + backoff_after_reset_ms = std::stoi(optarg); + break; case 'w': delay_ms = std::stoi(optarg); break; @@ -322,7 +326,7 @@ int start(int argc, char **argv) etiLog.level(info) << "Opening ZMQ input: " << source_url; size_t num_consecutive_resets = 0; - while (num_consecutive_resets < MAX_NUM_RESETS) { + while (true) { zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); zmq_sock.connect(source_url); zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages @@ -400,7 +404,7 @@ int start(int argc, char **argv) num_consecutive_resets++; zmq_sock.close(); - std::this_thread::sleep_for(std::chrono::seconds(5)); + std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << num_consecutive_resets << " consecutive resets."; } -- cgit v1.2.3