diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 123 |
1 files changed, 68 insertions, 55 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index e69a6c2..20ddcbe 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -224,6 +224,8 @@ static void parse_destination_args(char option) } } +class FCTDiscontinuity { }; + int start(int argc, char **argv) { edi_conf.enable_pft = true; @@ -350,89 +352,100 @@ int start(int argc, char **argv) zmq::context_t zmq_ctx(1); etiLog.level(info) << "Opening ZMQ input: " << source_url; - size_t num_consecutive_resets = 0; while (true) { zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); zmq_sock.connect(source_url); zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages size_t error_count = 0; + int previous_fct = -1; + + try { + while (error_count < MAX_ERROR_COUNT) { + zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); + if (num_events == 0) { // timeout + error_count++; + } + else { + // Event received: recv will not block + zmq_sock.recv(&incoming); + const auto received_at = std::chrono::steady_clock::now(); - while (error_count < MAX_ERROR_COUNT) { - zmq::message_t incoming; - zmq::pollitem_t items[1]; - items[0].socket = zmq_sock; - items[0].events = ZMQ_POLLIN; - const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); - if (num_events == 0) { // timeout - error_count++; - } - else { - num_consecutive_resets = 0; + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - // Event received: recv will not block - zmq_sock.recv(&incoming); - const auto received_at = std::chrono::steady_clock::now(); + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + error_count++; + } - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; - error_count++; - } + std::vector<frame_t> all_frames; + all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { + etiLog.level(error) << "ZeroMQ buffer " << i << + " has invalid length " << dab_msg->buflen[i]; + error_count++; + } + else { + frame_t frame; + frame.data.resize(6144, 0x55); + frame.received_at = received_at; - std::vector<frame_t> all_frames; - all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); + const int framesize = dab_msg->buflen[i]; - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { - etiLog.level(error) << "ZeroMQ buffer " << i << - " has invalid length " << dab_msg->buflen[i]; - error_count++; - } - else { - frame_t frame; - frame.data.resize(6144, 0x55); - frame.received_at = received_at; + memcpy(frame.data.data(), + ((uint8_t*)incoming.data()) + offset, + framesize); - const int framesize = dab_msg->buflen[i]; + const int fct = frame.data[4]; - memcpy(frame.data.data(), - ((uint8_t*)incoming.data()) + offset, - framesize); + const int expected_fct = (previous_fct + 1) % 250; + if (previous_fct != -1 and expected_fct != fct) { + etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct; + throw FCTDiscontinuity(); + } + previous_fct = fct; - all_frames.push_back(std::move(frame)); + all_frames.push_back(std::move(frame)); - offset += framesize; + offset += framesize; + } } - } - for (auto &f : all_frames) { - size_t consumed_bytes = 0; + for (auto &f : all_frames) { + size_t consumed_bytes = 0; - f.metadata = get_md_one_frame( - static_cast<uint8_t*>(incoming.data()) + offset, - incoming.size() - offset, - &consumed_bytes); + f.metadata = get_md_one_frame( + static_cast<uint8_t*>(incoming.data()) + offset, + incoming.size() - offset, + &consumed_bytes); - offset += consumed_bytes; - } + offset += consumed_bytes; + } - for (auto &f : all_frames) { - edisender.push_frame(std::move(f)); + for (auto &f : all_frames) { + edisender.push_frame(std::move(f)); + } } } - } - num_consecutive_resets++; + etiLog.level(info) << "Backoff " << backoff_after_reset_ms << + "ms due to ZMQ input (" << source_url << ") timeout"; + } + catch (const FCTDiscontinuity&) { + etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity"; + } zmq_sock.close(); std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); - etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << - num_consecutive_resets << " consecutive resets."; } return 0; |