diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/zmq2farsync/zmq2farsync.cpp | 84 | 
1 files changed, 49 insertions, 35 deletions
diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp index 72f4cc3..16830a2 100644 --- a/src/zmq2farsync/zmq2farsync.cpp +++ b/src/zmq2farsync/zmq2farsync.cpp @@ -3,7 +3,7 @@     2011, 2012 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2015 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -31,6 +31,9 @@  #include <iostream>  #include <vector> +constexpr size_t MAX_ERROR_COUNT = 10; +constexpr long ZMQ_TIMEOUT_MS = 1000; +  void usage(void)  {      using namespace std; @@ -41,7 +44,11 @@ void usage(void)      cerr << "Where" << endl;      cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;      cerr << " <destination> is the device information for the FarSync card." << endl << endl; -    cerr << " The syntax is the same as for ODR-DabMux" << endl; +    cerr << " The syntax is the same as for ODR-DabMux" << endl << endl; + +    cerr << "odr-zmq2edi will quit if it does not receive data for " << +        (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; +    cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;  }  int main(int argc, char **argv) @@ -82,53 +89,60 @@ int main(int argc, char **argv)      size_t frame_count = 0;      size_t loop_counter = 0;      size_t error_count = 0; -    while (error_count < 10) +    while (error_count < MAX_ERROR_COUNT)      {          zmq::message_t incoming; -        zmq_sock.recv(&incoming); - -        zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - -        if (dab_msg->version != 1) { -            etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +        zmq::pollitem_t items[1]; +        items[0].socket = zmq_sock; +        items[0].events = ZMQ_POLLIN; +        const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); +        if (num_events == 0) { // timeout              error_count++;          } +        else { +            // Event received: recv will not block +            zmq_sock.recv(&incoming); +            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -        int offset = sizeof(dab_msg->version) + -            NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); - -        for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -            if (dab_msg->buflen[i] <= 0 || -                dab_msg->buflen[i] > 6144) -            { -                etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << -                    dab_msg->buflen[i]; +            if (dab_msg->version != 1) { +                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;                  error_count++;              } -            else { -                std::vector<uint8_t> buf(6144, 0x55); - -                const int framesize = dab_msg->buflen[i]; - -                memcpy(&buf.front(), -                        ((uint8_t*)incoming.data()) + offset, -                        framesize); -                offset += framesize; +            int offset = sizeof(dab_msg->version) + +                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); -                if (output.Write(&buf.front(), buf.size()) == -1) { -                    etiLog.level(error) << "Cannot write to output!"; +            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                    etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << +                        dab_msg->buflen[i];                      error_count++;                  } +                else { +                    std::vector<uint8_t> buf(6144, 0x55); -                frame_count++; +                    const int framesize = dab_msg->buflen[i]; + +                    memcpy(&buf.front(), +                            ((uint8_t*)incoming.data()) + offset, +                            framesize); + +                    offset += framesize; + +                    if (output.Write(&buf.front(), buf.size()) == -1) { +                        etiLog.level(error) << "Cannot write to output!"; +                        error_count++; +                    } + +                    frame_count++; +                }              } -        } -        loop_counter++; -        if (loop_counter > 250) { -            etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; -            loop_counter = 0; +            loop_counter++; +            if (loop_counter > 250) { +                etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; +                loop_counter = 0; +            }          }      }  | 
