diff options
Diffstat (limited to 'src/zmq2edi')
-rw-r--r-- | src/zmq2edi/EDISender.cpp | 2 | ||||
-rw-r--r-- | src/zmq2edi/EDISender.h | 6 | ||||
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 168 |
3 files changed, 96 insertions, 80 deletions
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 2128abf..2188f8a 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -79,7 +79,7 @@ void EDISender::print_configuration() void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) { edi::TagDETI edi_tagDETI; - edi::TagStarPTR edi_tagStarPtr; + edi::TagStarPTR edi_tagStarPtr("DETI"); map<int, edi::TagESTn> edi_subchannelToTag; // The above Tag Items will be assembled into a TAG Packet edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index bb9c8bc..3525b4b 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -34,9 +34,9 @@ #include <atomic> #include "ThreadsafeQueue.h" #include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/Transport.h" +#include "edioutput/TagItems.h" +#include "edioutput/TagPacket.h" +#include "edioutput/Transport.h" // This metadata gets transmitted in the zmq stream struct metadata_t { diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 3888d8a..f7d733c 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -27,11 +27,13 @@ #include "Log.h" #include "zmq.hpp" -#include <math.h> #include <getopt.h> -#include <string.h> +#include <cmath> +#include <cstring> +#include <chrono> #include <iostream> #include <iterator> +#include <thread> #include <vector> #include "EDISender.h" @@ -39,12 +41,13 @@ constexpr size_t MAX_ERROR_COUNT = 10; constexpr long ZMQ_TIMEOUT_MS = 1000; +constexpr long DEFAULT_BACKOFF = 5000; static edi::configuration_t edi_conf; static EDISender edisender; -void usage(void) +static void usage() { using namespace std; @@ -54,23 +57,25 @@ void usage(void) cerr << "Options:" << endl; cerr << "The following options can be given only once:" << endl; cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; - cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; - cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; - cerr << " -p <destination port> sets the destination port." << endl; - cerr << " -P Disable PFT and send AFPackets." << endl; - cerr << " -f <fec> sets the FEC." << endl; - cerr << " -i <interleave> enables the interleaved with this latency." << endl; - cerr << " -D dumps the EDI to edi.debug file." << endl; - cerr << " -v Enables verbose mode." << endl; - cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl; + cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; + cerr << " Negative delay values are also allowed." << endl; + cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; + cerr << " -p <destination port> Set the destination port." << endl; + cerr << " -P Disable PFT and send AFPackets." << endl; + cerr << " -f <fec> Set the FEC." << endl; + cerr << " -i <interleave> Enable the interleaver with this latency." << endl; + cerr << " -D Dump the EDI to edi.debug file." << endl; + cerr << " -v Enables verbose mode." << endl; + cerr << " -a <alignement> Set the alignment of the TAG Packet (default 8)." << endl; + cerr << " -b <backoff> Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl; cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl; - cerr << " -d <destination ip> sets the destination ip." << endl; - cerr << " -s <source port> sets the source port." << endl; - cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl; - cerr << " -t <ttl> set the packet's TTL." << endl << endl; + cerr << " -d <destination ip> Set the destination ip." << endl; + cerr << " -s <source port> Set the source port." << endl; + cerr << " -S <source ip> Select the source IP in case we want to use multicast." << endl; + cerr << " -t <ttl> Set the packet's TTL." << endl << endl; - cerr << "odr-zmq2edi will quit if it does not receive data for " << + cerr << "The input socket will be reset if no data is received for " << (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl; } @@ -228,10 +233,11 @@ int start(int argc, char **argv) int delay_ms = 500; bool drop_late_packets = false; + uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF; int ch = 0; while (ch != -1) { - ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:x"); + ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh"); switch (ch) { case -1: break; @@ -276,6 +282,9 @@ int start(int argc, char **argv) case 'a': edi_conf.tagpacket_alignment = std::stoi(optarg); break; + case 'b': + backoff_after_reset_ms = std::stoi(optarg); + break; case 'w': delay_ms = std::stoi(optarg); break; @@ -313,85 +322,92 @@ int start(int argc, char **argv) const char* source_url = argv[optind]; - - size_t frame_count = 0; - size_t error_count = 0; - - etiLog.level(info) << "Opening ZMQ input: " << source_url; - zmq::context_t zmq_ctx(1); - zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); - zmq_sock.connect(source_url); - zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - - while (error_count < MAX_ERROR_COUNT) { - zmq::message_t incoming; - zmq::pollitem_t items[1]; - items[0].socket = zmq_sock; - items[0].events = ZMQ_POLLIN; - const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); - if (num_events == 0) { // timeout - error_count++; - } - else { - // Event received: recv will not block - zmq_sock.recv(&incoming); - - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + etiLog.level(info) << "Opening ZMQ input: " << source_url; - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + size_t num_consecutive_resets = 0; + while (true) { + zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); + zmq_sock.connect(source_url); + zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + + size_t error_count = 0; + + while (error_count < MAX_ERROR_COUNT) { + zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); + if (num_events == 0) { // timeout error_count++; } + else { + num_consecutive_resets = 0; - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + // Event received: recv will not block + zmq_sock.recv(&incoming); - std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { - etiLog.level(error) << "ZeroMQ buffer " << i << - " has invalid length " << dab_msg->buflen[i]; + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; error_count++; } - else { - std::vector<uint8_t> buf(6144, 0x55); - const int framesize = dab_msg->buflen[i]; + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + + std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { + etiLog.level(error) << "ZeroMQ buffer " << i << + " has invalid length " << dab_msg->buflen[i]; + error_count++; + } + else { + std::vector<uint8_t> buf(6144, 0x55); + + const int framesize = dab_msg->buflen[i]; - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); + memcpy(&buf.front(), + ((uint8_t*)incoming.data()) + offset, + framesize); - all_frames.emplace_back( - std::piecewise_construct, - std::make_tuple(std::move(buf)), - std::make_tuple()); + all_frames.emplace_back( + std::piecewise_construct, + std::make_tuple(std::move(buf)), + std::make_tuple()); - offset += framesize; + offset += framesize; + } } - } - for (auto &f : all_frames) { - size_t consumed_bytes = 0; + for (auto &f : all_frames) { + size_t consumed_bytes = 0; - f.second = get_md_one_frame( - static_cast<uint8_t*>(incoming.data()) + offset, - incoming.size() - offset, - &consumed_bytes); + f.second = get_md_one_frame( + static_cast<uint8_t*>(incoming.data()) + offset, + incoming.size() - offset, + &consumed_bytes); - offset += consumed_bytes; - } + offset += consumed_bytes; + } - for (auto &f : all_frames) { - edisender.push_frame(f); - frame_count++; + for (auto &f : all_frames) { + edisender.push_frame(f); + } } } - } - etiLog.level(info) << "Quitting after " << frame_count << " frames transferred"; + num_consecutive_resets++; + + zmq_sock.close(); + std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); + etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << + num_consecutive_resets << " consecutive resets."; + } return 0; } |