diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 141 | 
1 files changed, 68 insertions, 73 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index fadd163..8ea1bf0 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -240,19 +240,19 @@ int start(int argc, char **argv)                  break;              case 'i':                  { -                double interleave_ms = std::stod(optarg); -                if (interleave_ms != 0.0) { -                    if (interleave_ms < 0) { -                        throw std::runtime_error("EDI output: negative interleave value is invalid."); +                    double interleave_ms = std::stod(optarg); +                    if (interleave_ms != 0.0) { +                        if (interleave_ms < 0) { +                            throw std::runtime_error("EDI output: negative interleave value is invalid."); +                        } + +                        auto latency_rounded = lround(interleave_ms / 24.0); +                        if (latency_rounded * 24 > 30000) { +                            throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); +                        } + +                        edi_conf.latency_frames = latency_rounded;                      } - -                    auto latency_rounded = lround(interleave_ms / 24.0); -                    if (latency_rounded * 24 > 30000) { -                        throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!"); -                    } - -                    edi_conf.latency_frames = latency_rounded; -                }                  }                  break;              case 'D': @@ -297,92 +297,87 @@ int start(int argc, char **argv)      const char* source_url = argv[optind]; -    zmq::context_t zmq_ctx(1); -    etiLog.level(info) << "Entering main loop";      size_t frame_count = 0; +    size_t error_count = 0; -    while (true) { -        size_t error_count = 0; +    etiLog.level(info) << "Opening ZMQ input: " << source_url; -        etiLog.level(info) << "Opening ZMQ input: " << source_url; +    zmq::context_t zmq_ctx(1); +    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); +    zmq_sock.connect(source_url); +    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + +    while (error_count < 10) { +        zmq::message_t incoming; +        zmq::pollitem_t items[1]; +        items[0].socket = zmq_sock; +        items[0].events = ZMQ_POLLIN; +        const long timeout_ms = 300; +        const int num_events = zmq::poll(items, 1, timeout_ms); +        if (num_events == 0) { // timeout +            error_count++; +        } +        else { +            // Event received: recv will not block +            zmq_sock.recv(&incoming); -        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); -        zmq_sock.connect(source_url); -        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages +            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -        while (error_count < 10) -        { -            zmq::message_t incoming; -            zmq::pollitem_t items[1]; -            items[0].socket = zmq_sock; -            items[0].events = ZMQ_POLLIN; -            const long timeout_ms = 3000; -            const int num_events = zmq::poll(items, 1, timeout_ms); -            if (num_events == 0) { // timeout +            if (dab_msg->version != 1) { +                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;                  error_count++;              } -            else { -                // Event received: recv will not block -                zmq_sock.recv(&incoming); -                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); +            int offset = sizeof(dab_msg->version) + +                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); -                if (dab_msg->version != 1) { -                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +            std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + +            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                    etiLog.level(error) << "ZeroMQ buffer " << i << +                        " has invalid length " << dab_msg->buflen[i];                      error_count++;                  } +                else { +                    std::vector<uint8_t> buf(6144, 0x55); -                int offset = sizeof(dab_msg->version) + -                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - -                std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; +                    const int framesize = dab_msg->buflen[i]; -                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                    if (dab_msg->buflen[i] <= 0 || -                            dab_msg->buflen[i] > 6144) -                    { -                        etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << -                            dab_msg->buflen[i]; -                        error_count++; -                    } -                    else { -                        std::vector<uint8_t> buf(6144, 0x55); - -                        const int framesize = dab_msg->buflen[i]; - -                        memcpy(&buf.front(), -                                ((uint8_t*)incoming.data()) + offset, -                                framesize); +                    memcpy(&buf.front(), +                            ((uint8_t*)incoming.data()) + offset, +                            framesize); -                        all_frames.emplace_back( -                                std::piecewise_construct, -                                std::make_tuple(std::move(buf)), -                                std::make_tuple()); +                    all_frames.emplace_back( +                            std::piecewise_construct, +                            std::make_tuple(std::move(buf)), +                            std::make_tuple()); -                        offset += framesize; -                    } +                    offset += framesize;                  } +            } -                for (auto &f : all_frames) { -                    size_t consumed_bytes = 0; +            for (auto &f : all_frames) { +                size_t consumed_bytes = 0; -                    f.second = get_md_one_frame( -                            static_cast<uint8_t*>(incoming.data()) + offset, -                            incoming.size() - offset, -                            &consumed_bytes); +                f.second = get_md_one_frame( +                        static_cast<uint8_t*>(incoming.data()) + offset, +                        incoming.size() - offset, +                        &consumed_bytes); -                    offset += consumed_bytes; -                } +                offset += consumed_bytes; +            } -                for (auto &f : all_frames) { -                    edisender.push_frame(f); -                    frame_count++; -                } +            for (auto &f : all_frames) { +                edisender.push_frame(f); +                frame_count++;              }          }      } +    etiLog.level(info) << "Quitting after " << frame_count << " frames transferred"; +      return 0;  }  | 
