diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 123 | 
1 files changed, 68 insertions, 55 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index e69a6c2..20ddcbe 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -224,6 +224,8 @@ static void parse_destination_args(char option)      }  } +class FCTDiscontinuity { }; +  int start(int argc, char **argv)  {      edi_conf.enable_pft = true; @@ -350,89 +352,100 @@ int start(int argc, char **argv)      zmq::context_t zmq_ctx(1);      etiLog.level(info) << "Opening ZMQ input: " << source_url; -    size_t num_consecutive_resets = 0;      while (true) {          zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);          zmq_sock.connect(source_url);          zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages          size_t error_count = 0; +        int previous_fct = -1; + +        try { +            while (error_count < MAX_ERROR_COUNT) { +                zmq::message_t incoming; +                zmq::pollitem_t items[1]; +                items[0].socket = zmq_sock; +                items[0].events = ZMQ_POLLIN; +                const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); +                if (num_events == 0) { // timeout +                    error_count++; +                } +                else { +                    // Event received: recv will not block +                    zmq_sock.recv(&incoming); +                    const auto received_at = std::chrono::steady_clock::now(); -        while (error_count < MAX_ERROR_COUNT) { -            zmq::message_t incoming; -            zmq::pollitem_t items[1]; -            items[0].socket = zmq_sock; -            items[0].events = ZMQ_POLLIN; -            const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -            if (num_events == 0) { // timeout -                error_count++; -            } -            else { -                num_consecutive_resets = 0; +                    zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -                // Event received: recv will not block -                zmq_sock.recv(&incoming); -                const auto received_at = std::chrono::steady_clock::now(); +                    if (dab_msg->version != 1) { +                        etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +                        error_count++; +                    } -                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); +                    int offset = sizeof(dab_msg->version) + +                        NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); -                if (dab_msg->version != 1) { -                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; -                    error_count++; -                } +                    std::vector<frame_t> all_frames; +                    all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); -                int offset = sizeof(dab_msg->version) + -                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); +                    for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                        if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                            etiLog.level(error) << "ZeroMQ buffer " << i << +                                " has invalid length " << dab_msg->buflen[i]; +                            error_count++; +                        } +                        else { +                            frame_t frame; +                            frame.data.resize(6144, 0x55); +                            frame.received_at = received_at; -                std::vector<frame_t> all_frames; -                all_frames.reserve(NUM_FRAMES_PER_ZMQ_MESSAGE); +                            const int framesize = dab_msg->buflen[i]; -                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                    if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { -                        etiLog.level(error) << "ZeroMQ buffer " << i << -                            " has invalid length " << dab_msg->buflen[i]; -                        error_count++; -                    } -                    else { -                        frame_t frame; -                        frame.data.resize(6144, 0x55); -                        frame.received_at = received_at; +                            memcpy(frame.data.data(), +                                    ((uint8_t*)incoming.data()) + offset, +                                    framesize); -                        const int framesize = dab_msg->buflen[i]; +                            const int fct = frame.data[4]; -                        memcpy(frame.data.data(), -                                ((uint8_t*)incoming.data()) + offset, -                                framesize); +                            const int expected_fct = (previous_fct + 1) % 250; +                            if (previous_fct != -1 and expected_fct != fct) { +                                etiLog.level(error) << "ETI wrong frame counter FCT=" << fct << " expected " << expected_fct; +                                throw FCTDiscontinuity(); +                            } +                            previous_fct = fct; -                        all_frames.push_back(std::move(frame)); +                            all_frames.push_back(std::move(frame)); -                        offset += framesize; +                            offset += framesize; +                        }                      } -                } -                for (auto &f : all_frames) { -                    size_t consumed_bytes = 0; +                    for (auto &f : all_frames) { +                        size_t consumed_bytes = 0; -                    f.metadata = get_md_one_frame( -                            static_cast<uint8_t*>(incoming.data()) + offset, -                            incoming.size() - offset, -                            &consumed_bytes); +                        f.metadata = get_md_one_frame( +                                static_cast<uint8_t*>(incoming.data()) + offset, +                                incoming.size() - offset, +                                &consumed_bytes); -                    offset += consumed_bytes; -                } +                        offset += consumed_bytes; +                    } -                for (auto &f : all_frames) { -                    edisender.push_frame(std::move(f)); +                    for (auto &f : all_frames) { +                        edisender.push_frame(std::move(f)); +                    }                  }              } -        } -        num_consecutive_resets++; +            etiLog.level(info) << "Backoff " << backoff_after_reset_ms << +                "ms due to ZMQ input (" << source_url << ") timeout"; +        } +        catch (const FCTDiscontinuity&) { +            etiLog.level(info) << "Backoff " << backoff_after_reset_ms << "ms FCT discontinuity"; +        }          zmq_sock.close();          std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); -        etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << -            num_consecutive_resets << " consecutive resets.";      }      return 0;  | 
