diff options
| -rw-r--r-- | src/zmq2edi/zmq2edi.cpp | 135 | ||||
| -rw-r--r-- | src/zmq2farsync/zmq2farsync.cpp | 115 | 
2 files changed, 137 insertions, 113 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 3888d8a..2294f20 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -27,24 +27,27 @@  #include "Log.h"  #include "zmq.hpp" -#include <math.h>  #include <getopt.h> -#include <string.h> +#include <cmath> +#include <cstring> +#include <chrono>  #include <iostream>  #include <iterator> +#include <thread>  #include <vector>  #include "EDISender.h"  #include "dabOutput/dabOutput.h"  constexpr size_t MAX_ERROR_COUNT = 10; +constexpr size_t MAX_NUM_RESETS = 180;  constexpr long ZMQ_TIMEOUT_MS = 1000;  static edi::configuration_t edi_conf;  static EDISender edisender; -void usage(void) +static void usage()  {      using namespace std; @@ -70,8 +73,9 @@ void usage(void)      cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;      cerr << " -t <ttl> set the packet's TTL." << endl << endl; -    cerr << "odr-zmq2edi will quit if it does not receive data for " << +    cerr << "The input socket will be reset if no data is received for " <<          (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; +    cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;      cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;  } @@ -313,85 +317,92 @@ int start(int argc, char **argv)      const char* source_url = argv[optind]; - -    size_t frame_count = 0; -    size_t error_count = 0; - -    etiLog.level(info) << "Opening ZMQ input: " << source_url; -      zmq::context_t zmq_ctx(1); -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); -    zmq_sock.connect(source_url); -    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - -    while (error_count < MAX_ERROR_COUNT) { -        zmq::message_t incoming; -        zmq::pollitem_t items[1]; -        items[0].socket = zmq_sock; -        items[0].events = ZMQ_POLLIN; -        const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -        if (num_events == 0) { // timeout -            error_count++; -        } -        else { -            // Event received: recv will not block -            zmq_sock.recv(&incoming); - -            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); +    etiLog.level(info) << "Opening ZMQ input: " << source_url; -            if (dab_msg->version != 1) { -                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +    size_t num_consecutive_resets = 0; +    while (num_consecutive_resets < MAX_NUM_RESETS) { +        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); +        zmq_sock.connect(source_url); +        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + +        size_t error_count = 0; + +        while (error_count < MAX_ERROR_COUNT) { +            zmq::message_t incoming; +            zmq::pollitem_t items[1]; +            items[0].socket = zmq_sock; +            items[0].events = ZMQ_POLLIN; +            const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); +            if (num_events == 0) { // timeout                  error_count++;              } +            else { +                num_consecutive_resets = 0; -            int offset = sizeof(dab_msg->version) + -                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); +                // Event received: recv will not block +                zmq_sock.recv(&incoming); -            std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; +                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { -                    etiLog.level(error) << "ZeroMQ buffer " << i << -                        " has invalid length " << dab_msg->buflen[i]; +                if (dab_msg->version != 1) { +                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;                      error_count++;                  } -                else { -                    std::vector<uint8_t> buf(6144, 0x55); -                    const int framesize = dab_msg->buflen[i]; +                int offset = sizeof(dab_msg->version) + +                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + +                std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + +                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                    if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                        etiLog.level(error) << "ZeroMQ buffer " << i << +                            " has invalid length " << dab_msg->buflen[i]; +                        error_count++; +                    } +                    else { +                        std::vector<uint8_t> buf(6144, 0x55); -                    memcpy(&buf.front(), -                            ((uint8_t*)incoming.data()) + offset, -                            framesize); +                        const int framesize = dab_msg->buflen[i]; -                    all_frames.emplace_back( -                            std::piecewise_construct, -                            std::make_tuple(std::move(buf)), -                            std::make_tuple()); +                        memcpy(&buf.front(), +                                ((uint8_t*)incoming.data()) + offset, +                                framesize); -                    offset += framesize; +                        all_frames.emplace_back( +                                std::piecewise_construct, +                                std::make_tuple(std::move(buf)), +                                std::make_tuple()); + +                        offset += framesize; +                    }                  } -            } -            for (auto &f : all_frames) { -                size_t consumed_bytes = 0; +                for (auto &f : all_frames) { +                    size_t consumed_bytes = 0; -                f.second = get_md_one_frame( -                        static_cast<uint8_t*>(incoming.data()) + offset, -                        incoming.size() - offset, -                        &consumed_bytes); +                    f.second = get_md_one_frame( +                            static_cast<uint8_t*>(incoming.data()) + offset, +                            incoming.size() - offset, +                            &consumed_bytes); -                offset += consumed_bytes; -            } +                    offset += consumed_bytes; +                } -            for (auto &f : all_frames) { -                edisender.push_frame(f); -                frame_count++; +                for (auto &f : all_frames) { +                    edisender.push_frame(f); +                }              }          } -    } -    etiLog.level(info) << "Quitting after " << frame_count << " frames transferred"; +        num_consecutive_resets++; + +        zmq_sock.close(); +        std::this_thread::sleep_for(std::chrono::seconds(1)); +        etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << +            num_consecutive_resets << " consecutive resets."; +    }      return 0;  } diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp index 16830a2..95dc074 100644 --- a/src/zmq2farsync/zmq2farsync.cpp +++ b/src/zmq2farsync/zmq2farsync.cpp @@ -3,7 +3,7 @@     2011, 2012 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2018 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -28,13 +28,16 @@  #include "dabOutput/dabOutput.h"  #include "Log.h"  #include "zmq.hpp" +#include <chrono>  #include <iostream> +#include <thread>  #include <vector>  constexpr size_t MAX_ERROR_COUNT = 10; +constexpr size_t MAX_NUM_RESETS = 180;  constexpr long ZMQ_TIMEOUT_MS = 1000; -void usage(void) +static void usage()  {      using namespace std; @@ -46,8 +49,9 @@ void usage(void)      cerr << " <destination> is the device information for the FarSync card." << endl << endl;      cerr << " The syntax is the same as for ODR-DabMux" << endl << endl; -    cerr << "odr-zmq2edi will quit if it does not receive data for " << +    cerr << "The input socket will be reset if no data is received for " <<          (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; +    cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;      cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;  } @@ -80,72 +84,81 @@ int main(int argc, char **argv)      etiLog.level(info) << "Opening ZMQ input: " << source_url;      zmq::context_t zmq_ctx(1); -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); - -    zmq_sock.connect(source_url); -    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - -    etiLog.level(info) << "Entering main loop";      size_t frame_count = 0;      size_t loop_counter = 0; -    size_t error_count = 0; -    while (error_count < MAX_ERROR_COUNT) -    { -        zmq::message_t incoming; -        zmq::pollitem_t items[1]; -        items[0].socket = zmq_sock; -        items[0].events = ZMQ_POLLIN; -        const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -        if (num_events == 0) { // timeout -            error_count++; -        } -        else { -            // Event received: recv will not block -            zmq_sock.recv(&incoming); -            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - -            if (dab_msg->version != 1) { -                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +    size_t num_consecutive_resets = 0; +    while (num_consecutive_resets < MAX_NUM_RESETS) { +        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); +        zmq_sock.connect(source_url); +        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + +        size_t error_count = 0; +        while (error_count < MAX_ERROR_COUNT) { +            zmq::message_t incoming; +            zmq::pollitem_t items[1]; +            items[0].socket = zmq_sock; +            items[0].events = ZMQ_POLLIN; +            const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); +            if (num_events == 0) { // timeout                  error_count++;              } +            else { +                num_consecutive_resets = 0; -            int offset = sizeof(dab_msg->version) + -                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); +                // Event received: recv will not block +                zmq_sock.recv(&incoming); +                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { -                    etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << -                        dab_msg->buflen[i]; +                if (dab_msg->version != 1) { +                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;                      error_count++;                  } -                else { -                    std::vector<uint8_t> buf(6144, 0x55); -                    const int framesize = dab_msg->buflen[i]; +                int offset = sizeof(dab_msg->version) + +                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); -                    memcpy(&buf.front(), -                            ((uint8_t*)incoming.data()) + offset, -                            framesize); - -                    offset += framesize; - -                    if (output.Write(&buf.front(), buf.size()) == -1) { -                        etiLog.level(error) << "Cannot write to output!"; +                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                    if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                        etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << +                            dab_msg->buflen[i];                          error_count++;                      } +                    else { +                        std::vector<uint8_t> buf(6144, 0x55); + +                        const int framesize = dab_msg->buflen[i]; + +                        memcpy(&buf.front(), +                                ((uint8_t*)incoming.data()) + offset, +                                framesize); -                    frame_count++; +                        offset += framesize; + +                        if (output.Write(&buf.front(), buf.size()) == -1) { +                            etiLog.level(error) << "Cannot write to output!"; +                            error_count++; +                        } + +                        frame_count++; +                    }                  } -            } -            loop_counter++; -            if (loop_counter > 250) { -                etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; -                loop_counter = 0; +                loop_counter++; +                if (loop_counter > 250) { +                    etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; +                    loop_counter = 0; +                }              }          } + +        num_consecutive_resets++; + +        zmq_sock.close(); +        std::this_thread::sleep_for(std::chrono::seconds(1)); +        etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << +            num_consecutive_resets << " consecutive resets.";      } -    return error_count > 0 ? 2 : 0; +    return 0;  }  | 
