From fcaedf40bbd8e6361cdd241cd993e45add8fbf92 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 14 Jan 2018 00:54:51 +0100 Subject: Let zmq2edi crash after too many errors --- src/zmq2edi/zmq2edi.cpp | 141 +++++++++++++++++++++++------------------------- 1 file changed, 68 insertions(+), 73 deletions(-) (limited to 'src') diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index fadd163..8ea1bf0 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -240,19 +240,19 @@ int start(int argc, char **argv) break; case 'i': { - double interleave_ms = std::stod(optarg); - if (interleave_ms != 0.0) { - if (interleave_ms < 0) { - throw std::runtime_error("EDI output: negative interleave value is invalid."); + double interleave_ms = std::stod(optarg); + if (interleave_ms != 0.0) { + if (interleave_ms < 0) { + throw std::runtime_error("EDI output: negative interleave value is invalid."); + } + + auto latency_rounded = lround(interleave_ms / 24.0); + if (latency_rounded * 24 > 30000) { + throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); + } + + edi_conf.latency_frames = latency_rounded; } - - auto latency_rounded = lround(interleave_ms / 24.0); - if (latency_rounded * 24 > 30000) { - throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); - } - - edi_conf.latency_frames = latency_rounded; - } } break; case 'D': @@ -297,92 +297,87 @@ int start(int argc, char **argv) const char* source_url = argv[optind]; - zmq::context_t zmq_ctx(1); - etiLog.level(info) << "Entering main loop"; size_t frame_count = 0; + size_t error_count = 0; - while (true) { - size_t error_count = 0; + etiLog.level(info) << "Opening ZMQ input: " << source_url; - etiLog.level(info) << "Opening ZMQ input: " << source_url; + zmq::context_t zmq_ctx(1); + zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); + zmq_sock.connect(source_url); + zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + + while (error_count < 10) { + zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const long timeout_ms = 300; + const int num_events = zmq::poll(items, 1, timeout_ms); + if (num_events == 0) { // timeout + error_count++; + } + else { + // Event received: recv will not block + zmq_sock.recv(&incoming); - zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); - zmq_sock.connect(source_url); - zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - while (error_count < 10) - { - zmq::message_t incoming; - zmq::pollitem_t items[1]; - items[0].socket = zmq_sock; - items[0].events = ZMQ_POLLIN; - const long timeout_ms = 3000; - const int num_events = zmq::poll(items, 1, timeout_ms); - if (num_events == 0) { // timeout + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; error_count++; } - else { - // Event received: recv will not block - zmq_sock.recv(&incoming); - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + std::list, metadata_t> > all_frames; + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { + etiLog.level(error) << "ZeroMQ buffer " << i << + " has invalid length " << dab_msg->buflen[i]; error_count++; } + else { + std::vector buf(6144, 0x55); - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - - std::list, metadata_t> > all_frames; + const int framesize = dab_msg->buflen[i]; - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 || - dab_msg->buflen[i] > 6144) - { - etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << - dab_msg->buflen[i]; - error_count++; - } - else { - std::vector buf(6144, 0x55); - - const int framesize = dab_msg->buflen[i]; - - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); + memcpy(&buf.front(), + ((uint8_t*)incoming.data()) + offset, + framesize); - all_frames.emplace_back( - std::piecewise_construct, - std::make_tuple(std::move(buf)), - std::make_tuple()); + all_frames.emplace_back( + std::piecewise_construct, + std::make_tuple(std::move(buf)), + std::make_tuple()); - offset += framesize; - } + offset += framesize; } + } - for (auto &f : all_frames) { - size_t consumed_bytes = 0; + for (auto &f : all_frames) { + size_t consumed_bytes = 0; - f.second = get_md_one_frame( - static_cast(incoming.data()) + offset, - incoming.size() - offset, - &consumed_bytes); + f.second = get_md_one_frame( + static_cast(incoming.data()) + offset, + incoming.size() - offset, + &consumed_bytes); - offset += consumed_bytes; - } + offset += consumed_bytes; + } - for (auto &f : all_frames) { - edisender.push_frame(f); - frame_count++; - } + for (auto &f : all_frames) { + edisender.push_frame(f); + frame_count++; } } } + etiLog.level(info) << "Quitting after " << frame_count << " frames transferred"; + return 0; } -- cgit v1.2.3