diff options
Diffstat (limited to 'src/zmq2edi/zmq2edi.cpp')
-rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 114 |
1 files changed, 65 insertions, 49 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index a1269a9..2216c85 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -235,77 +235,93 @@ int start(int argc, char **argv) const char* source_url = argv[optind]; - etiLog.level(info) << "Opening ZMQ input: " << source_url; zmq::context_t zmq_ctx(1); - zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); - - zmq_sock.connect(source_url); - zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages etiLog.level(info) << "Entering main loop"; size_t frame_count = 0; - size_t error_count = 0; - while (error_count < 10) - { - zmq::message_t incoming; - zmq_sock.recv(&incoming); - - zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - if (dab_msg->version != 1) { - etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; - error_count++; - } + while (true) { + size_t error_count = 0; - int offset = sizeof(dab_msg->version) + - NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + etiLog.level(info) << "Opening ZMQ input: " << source_url; - std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); + zmq_sock.connect(source_url); + zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - if (dab_msg->buflen[i] <= 0 || - dab_msg->buflen[i] > 6144) - { - etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << - dab_msg->buflen[i]; + while (error_count < 10) + { + zmq::message_t incoming; + zmq::pollitem_t items[1]; + items[0].socket = zmq_sock; + items[0].events = ZMQ_POLLIN; + const long timeout_ms = 3000; + const int num_events = zmq::poll(items, 1, timeout_ms); + if (num_events == 0) { // timeout error_count++; } else { - std::vector<uint8_t> buf(6144, 0x55); + // Event received: recv will not block + zmq_sock.recv(&incoming); - const int framesize = dab_msg->buflen[i]; + zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - memcpy(&buf.front(), - ((uint8_t*)incoming.data()) + offset, - framesize); + if (dab_msg->version != 1) { + etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; + error_count++; + } - all_frames.emplace_back( - std::piecewise_construct, - std::make_tuple(std::move(buf)), - std::make_tuple()); + int offset = sizeof(dab_msg->version) + + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - offset += framesize; - } - } + std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + if (dab_msg->buflen[i] <= 0 || + dab_msg->buflen[i] > 6144) + { + etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << + dab_msg->buflen[i]; + error_count++; + } + else { + std::vector<uint8_t> buf(6144, 0x55); - for (auto &f : all_frames) { - size_t consumed_bytes = 0; + const int framesize = dab_msg->buflen[i]; - f.second = get_md_one_frame( - static_cast<uint8_t*>(incoming.data()) + offset, - incoming.size() - offset, - &consumed_bytes); + memcpy(&buf.front(), + ((uint8_t*)incoming.data()) + offset, + framesize); - offset += consumed_bytes; - } + all_frames.emplace_back( + std::piecewise_construct, + std::make_tuple(std::move(buf)), + std::make_tuple()); - for (auto &f : all_frames) { - edisender.push_frame(f); - frame_count++; + offset += framesize; + } + } + + for (auto &f : all_frames) { + size_t consumed_bytes = 0; + + f.second = get_md_one_frame( + static_cast<uint8_t*>(incoming.data()) + offset, + incoming.size() - offset, + &consumed_bytes); + + offset += consumed_bytes; + } + + for (auto &f : all_frames) { + edisender.push_frame(f); + frame_count++; + } + } } } - return error_count > 0 ? 2 : 0; + return 0; } int main(int argc, char **argv) |