diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-09-25 10:01:26 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-09-25 10:01:26 +0200 | 
| commit | 1cbdb69dffaa0a649b762b84d5a485e68f5188f6 (patch) | |
| tree | 7a65d6677d507c0d742cb83e5f2c77e77f9d5743 /src | |
| parent | 0ca115acae8a5b8b0c51a3b19a1f9381b7347bf4 (diff) | |
| download | dabmux-1cbdb69dffaa0a649b762b84d5a485e68f5188f6.tar.gz dabmux-1cbdb69dffaa0a649b762b84d5a485e68f5188f6.tar.bz2 dabmux-1cbdb69dffaa0a649b762b84d5a485e68f5188f6.zip  | |
zmq2edi: automatically reconnect zmq socket if no data in 30s
Diffstat (limited to 'src')
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 114 | 
1 files changed, 65 insertions, 49 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index a1269a9..2216c85 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -235,77 +235,93 @@ int start(int argc, char **argv)      const char* source_url = argv[optind]; -    etiLog.level(info) << "Opening ZMQ input: " << source_url;      zmq::context_t zmq_ctx(1); -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); - -    zmq_sock.connect(source_url); -    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages      etiLog.level(info) << "Entering main loop";      size_t frame_count = 0; -    size_t error_count = 0; -    while (error_count < 10) -    { -        zmq::message_t incoming; -        zmq_sock.recv(&incoming); - -        zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -        if (dab_msg->version != 1) { -            etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; -            error_count++; -        } +    while (true) { +        size_t error_count = 0; -        int offset = sizeof(dab_msg->version) + -            NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); +        etiLog.level(info) << "Opening ZMQ input: " << source_url; -        std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; +        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); +        zmq_sock.connect(source_url); +        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages -        for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -            if (dab_msg->buflen[i] <= 0 || -                dab_msg->buflen[i] > 6144) -            { -                etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << -                    dab_msg->buflen[i]; +        while (error_count < 10) +        { +            zmq::message_t incoming; +            zmq::pollitem_t items[1]; +            items[0].socket = zmq_sock; +            items[0].events = ZMQ_POLLIN; +            const long timeout_ms = 3000; +            const int num_events = zmq::poll(items, 1, timeout_ms); +            if (num_events == 0) { // timeout                  error_count++;              }              else { -                std::vector<uint8_t> buf(6144, 0x55); +                // Event received: recv will not block +                zmq_sock.recv(&incoming); -                const int framesize = dab_msg->buflen[i]; +                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -                memcpy(&buf.front(), -                        ((uint8_t*)incoming.data()) + offset, -                        framesize); +                if (dab_msg->version != 1) { +                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +                    error_count++; +                } -                all_frames.emplace_back( -                        std::piecewise_construct, -                        std::make_tuple(std::move(buf)), -                        std::make_tuple()); +                int offset = sizeof(dab_msg->version) + +                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); -                offset += framesize; -            } -        } +                std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + +                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                    if (dab_msg->buflen[i] <= 0 || +                            dab_msg->buflen[i] > 6144) +                    { +                        etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << +                            dab_msg->buflen[i]; +                        error_count++; +                    } +                    else { +                        std::vector<uint8_t> buf(6144, 0x55); -        for (auto &f : all_frames) { -            size_t consumed_bytes = 0; +                        const int framesize = dab_msg->buflen[i]; -            f.second = get_md_one_frame( -                    static_cast<uint8_t*>(incoming.data()) + offset, -                    incoming.size() - offset, -                    &consumed_bytes); +                        memcpy(&buf.front(), +                                ((uint8_t*)incoming.data()) + offset, +                                framesize); -            offset += consumed_bytes; -        } +                        all_frames.emplace_back( +                                std::piecewise_construct, +                                std::make_tuple(std::move(buf)), +                                std::make_tuple()); -        for (auto &f : all_frames) { -            edisender.push_frame(f); -            frame_count++; +                        offset += framesize; +                    } +                } + +                for (auto &f : all_frames) { +                    size_t consumed_bytes = 0; + +                    f.second = get_md_one_frame( +                            static_cast<uint8_t*>(incoming.data()) + offset, +                            incoming.size() - offset, +                            &consumed_bytes); + +                    offset += consumed_bytes; +                } + +                for (auto &f : all_frames) { +                    edisender.push_frame(f); +                    frame_count++; +                } +            }          }      } -    return error_count > 0 ? 2 : 0; +    return 0;  }  int main(int argc, char **argv)  | 
