diff options
| -rw-r--r-- | Makefile.am | 2 | ||||
| -rw-r--r-- | lib/RemoteControl.cpp | 18 | ||||
| -rw-r--r-- | lib/Socket.cpp | 154 | ||||
| -rw-r--r-- | lib/Socket.h | 65 | ||||
| -rw-r--r-- | lib/edi/ETIDecoder.cpp | 6 | ||||
| -rw-r--r-- | lib/edi/ETIDecoder.hpp | 6 | ||||
| -rw-r--r-- | lib/edi/PFT.cpp | 40 | ||||
| -rw-r--r-- | lib/edi/PFT.hpp | 7 | ||||
| -rw-r--r-- | lib/edi/README.md | 2 | ||||
| -rw-r--r-- | lib/edi/common.cpp | 31 | ||||
| -rw-r--r-- | lib/edi/common.hpp | 16 | ||||
| -rw-r--r-- | lib/zmq.hpp | 2120 | ||||
| -rw-r--r-- | src/EtiReader.cpp | 59 | ||||
| -rw-r--r-- | src/EtiReader.h | 8 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 2 | ||||
| -rw-r--r-- | src/OutputZeroMQ.cpp | 4 | ||||
| -rw-r--r-- | src/zmq.hpp | 602 | 
17 files changed, 2401 insertions, 741 deletions
| diff --git a/Makefile.am b/Makefile.am index 64e8c78..9afa08a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -94,7 +94,7 @@ odr_dabmod_SOURCES  = src/DabMod.cpp \  					  src/FormatConverter.h \  					  src/Utils.cpp \  					  src/Utils.h \ -					  src/zmq.hpp \ +					  lib/zmq.hpp \  					  lib/RemoteControl.cpp \  					  lib/RemoteControl.h \  					  lib/Log.cpp \ diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp index 4adb90c..9ca8d22 100644 --- a/lib/RemoteControl.cpp +++ b/lib/RemoteControl.cpp @@ -29,6 +29,7 @@  #include <algorithm>  #include "RemoteControl.h" +#include "zmq.hpp"  using namespace std; @@ -424,7 +425,7 @@ void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::stri      bool more = true;      do {          zmq::message_t msg; -        pSocket.recv(&msg); +        pSocket.recv(msg);          std::string incoming((char*)msg.data(), msg.size());          message.push_back(incoming);          more = msg.more(); @@ -436,7 +437,7 @@ void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket)      zmq::message_t msg(2);      char repCode[2] = {'o', 'k'};      memcpy ((void*) msg.data(), repCode, 2); -    pSocket.send(msg, 0); +    pSocket.send(msg, zmq::send_flags::none);  }  void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) @@ -444,11 +445,11 @@ void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::str      zmq::message_t msg1(4);      char repCode[4] = {'f', 'a', 'i', 'l'};      memcpy ((void*) msg1.data(), repCode, 4); -    pSocket.send(msg1, ZMQ_SNDMORE); +    pSocket.send(msg1, zmq::send_flags::sndmore);      zmq::message_t msg2(error.length());      memcpy ((void*) msg2.data(), error.c_str(), error.length()); -    pSocket.send(msg2, 0); +    pSocket.send(msg2, zmq::send_flags::none);  }  void RemoteControllerZmq::process() @@ -508,8 +509,7 @@ void RemoteControllerZmq::process()                          zmq::message_t zmsg(ss.str().size());                          memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); -                        int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; -                        repSocket.send(zmsg, flag); +                        repSocket.send(zmsg, (--cohort_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none);                      }                  }                  else if (msg.size() == 2 && command == "show") { @@ -523,8 +523,7 @@ void RemoteControllerZmq::process()                              zmq::message_t zmsg(ss.str().size());                              memcpy(zmsg.data(), ss.str().data(), ss.str().size()); -                            int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; -                            repSocket.send(zmsg, flag); +                            repSocket.send(zmsg, (--r_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none);                          }                      }                      catch (const ParameterError &err) { @@ -539,7 +538,7 @@ void RemoteControllerZmq::process()                          std::string value = rcs.get_param(module, parameter);                          zmq::message_t zmsg(value.size());                          memcpy ((void*) zmsg.data(), value.data(), value.size()); -                        repSocket.send(zmsg, 0); +                        repSocket.send(zmsg, zmq::send_flags::none);                      }                      catch (const ParameterError &err) {                          send_fail_reply(repSocket, err.what()); @@ -576,4 +575,3 @@ void RemoteControllerZmq::process()  }  #endif - diff --git a/lib/Socket.cpp b/lib/Socket.cpp index d41ed1c..c876f32 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port      }  } +string InetAddress::to_string() const +{ +    char received_from_str[64] = {}; +    sockaddr *addr = reinterpret_cast<sockaddr*>(&addr); +    const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63); + +    if (ret == nullptr) { +        throw invalid_argument(string("Error converting InetAddress") + strerror(errno)); +    } +    return ret; +} +  UDPPacket::UDPPacket() { }  UDPPacket::UDPPacket(size_t initSize) : @@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) :  { } -UDPSocket::UDPSocket() : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket()  {      reinit(0, "");  } -UDPSocket::UDPSocket(int port) : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port)  {      reinit(port, "");  } -UDPSocket::UDPSocket(int port, const std::string& name) : -    m_sock(INVALID_SOCKET) +UDPSocket::UDPSocket(int port, const std::string& name)  {      reinit(port, name);  } +UDPSocket::UDPSocket(UDPSocket&& other) +{ +    m_sock = other.m_sock; +    m_port = other.m_port; +    other.m_port = 0; +    other.m_sock = INVALID_SOCKET; +} + +const UDPSocket& UDPSocket::operator=(UDPSocket&& other) +{ +    m_sock = other.m_sock; +    m_port = other.m_port; +    other.m_port = 0; +    other.m_sock = INVALID_SOCKET; +    return *this; +}  void UDPSocket::setBlocking(bool block)  { @@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name)          ::close(m_sock);      } +    m_port = port; +      if (port == 0) {          // No need to bind to a given port, creating the          // socket is enough @@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl)      }  } -UDPReceiver::UDPReceiver() { } - -UDPReceiver::~UDPReceiver() { -    m_stop = true; -    m_sock.close(); -    if (m_thread.joinable()) { -        m_thread.join(); -    } +SOCKET UDPSocket::getNativeSocket() const +{ +    return m_sock;  } -void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { -    m_port = port; -    m_bindto = bindto; -    m_mcastaddr = mcastaddr; -    m_max_packets_queued = max_packets_queued; -    m_thread = std::thread(&UDPReceiver::m_run, this); +int UDPSocket::getPort() const +{ +    return m_port;  } -std::vector<uint8_t> UDPReceiver::get_packet_buffer() -{ -    if (m_stop) { -        throw runtime_error("UDP Receiver not running"); -    } +void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) { +    UDPSocket sock; -    UDPPacket p; -    m_packets.wait_and_pop(p); +    if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) { +        sock.reinit(port, mcastaddr); +        sock.setMulticastSource(bindto.c_str()); +        sock.joinGroup(mcastaddr.c_str(), bindto.c_str()); +    } +    else { +        sock.reinit(port, bindto); +    } -    return p.buffer; +    m_sockets.push_back(move(sock));  } -void UDPReceiver::m_run() +vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)  { -    // Ensure that stop is set to true in case of exception or return -    struct SetStopOnDestruct { -        SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} -        ~SetStopOnDestruct() { m_stop = true; } -        private: atomic<bool>& m_stop; -    } autoSetStop(m_stop); - -    if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { -        m_sock.reinit(m_port, m_mcastaddr); -        m_sock.setMulticastSource(m_bindto.c_str()); -        m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); +    constexpr size_t MAX_FDS = 64; +    struct pollfd fds[MAX_FDS]; +    if (m_sockets.size() > MAX_FDS) { +        throw std::runtime_error("UDPReceiver only supports up to 64 ports");      } -    else { -        m_sock.reinit(m_port, m_bindto); + +    for (size_t i = 0; i < m_sockets.size(); i++) { +        fds[i].fd = m_sockets[i].getNativeSocket(); +        fds[i].events = POLLIN;      } -    while (not m_stop) { -        constexpr size_t packsize = 8192; -        try { -            auto packet = m_sock.receive(packsize); -            if (packet.buffer.size() == packsize) { -                // TODO replace fprintf -                fprintf(stderr, "Warning, possible UDP truncation\n"); -            } +    int retval = poll(fds, m_sockets.size(), timeout_ms); -            // If this blocks, the UDP socket will lose incoming packets -            m_packets.push_wait_if_full(packet, m_max_packets_queued); -        } -        catch (const std::runtime_error& e) { -            // TODO replace fprintf -            // TODO handle intr -            fprintf(stderr, "Socket error: %s\n", e.what()); -            m_stop = true; +    if (retval == -1 and errno == EINTR) { +        throw Interrupted(); +    } +    else if (retval == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("UDP receive with poll() error: " + errstr); +    } +    else if (retval > 0) { +        vector<ReceivedPacket> received; + +        for (size_t i = 0; i < m_sockets.size(); i++) { +            if (fds[i].revents & POLLIN) { +                auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU +                ReceivedPacket rp; +                rp.packetdata = move(p.buffer); +                rp.received_from = move(p.address); +                rp.port_received_on = m_sockets[i].getPort(); +                received.push_back(move(rp)); +            }          } + +        return received; +    } +    else { +        throw Timeout();      }  } @@ -862,9 +888,9 @@ TCPReceiveServer::~TCPReceiveServer()      }  } -vector<uint8_t> TCPReceiveServer::receive() +shared_ptr<TCPReceiveMessage> TCPReceiveServer::receive()  { -    vector<uint8_t> buffer; +    shared_ptr<TCPReceiveMessage> buffer = make_shared<TCPReceiveMessageEmpty>();      m_queue.try_pop(buffer);      // we can ignore try_pop()'s return value, because @@ -892,11 +918,12 @@ void TCPReceiveServer::process()                  }                  else if (r == 0) {                      sock.close(); +                    m_queue.push(make_shared<TCPReceiveMessageDisconnected>());                      break;                  }                  else {                      buf.resize(r); -                    m_queue.push(move(buf)); +                    m_queue.push(make_shared<TCPReceiveMessageData>(move(buf)));                  }              }              catch (const TCPSocket::Interrupted&) { @@ -905,6 +932,11 @@ void TCPReceiveServer::process()              catch (const TCPSocket::Timeout&) {                  num_timeouts++;              } +            catch (const runtime_error& e) { +                sock.close(); +                // TODO replace fprintf +                fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); +            }              if (num_timeouts > max_num_timeouts) {                  sock.close(); diff --git a/lib/Socket.h b/lib/Socket.h index 8881be3..33cdc05 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -30,11 +30,12 @@  #include "ThreadsafeQueue.h"  #include <cstdlib> -#include <iostream> -#include <vector>  #include <atomic> -#include <thread> +#include <iostream>  #include <list> +#include <memory> +#include <thread> +#include <vector>  #include <sys/socket.h>  #include <netinet/in.h> @@ -55,6 +56,8 @@ struct InetAddress {      struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };      void resolveUdpDestination(const std::string& destination, int port); + +    std::string to_string() const;  };  /** This class represents a UDP packet. @@ -102,6 +105,8 @@ class UDPSocket          ~UDPSocket();          UDPSocket(const UDPSocket& other) = delete;          const UDPSocket& operator=(const UDPSocket& other) = delete; +        UDPSocket(UDPSocket&& other); +        const UDPSocket& operator=(UDPSocket&& other);          /** Close the already open socket, and create a new one. Throws a runtime_error on error.  */          void reinit(int port); @@ -120,36 +125,36 @@ class UDPSocket           */          void setBlocking(bool block); +        SOCKET getNativeSocket() const; +        int getPort() const; +      protected: -        SOCKET m_sock; +        SOCKET m_sock = INVALID_SOCKET; +        int m_port = 0;  }; -/* Threaded UDP receiver */ +/* UDP packet receiver supporting receiving from several ports at once */  class UDPReceiver {      public: -        UDPReceiver(); -        ~UDPReceiver(); -        UDPReceiver(const UDPReceiver&) = delete; -        UDPReceiver operator=(const UDPReceiver&) = delete; +        void add_receive_port(int port, const std::string& bindto, const std::string& mcastaddr); -        // Start the receiver in a separate thread -        void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); +        struct ReceivedPacket { +            std::vector<uint8_t> packetdata; +            InetAddress received_from; +            int port_received_on; +        }; -        // Get the data contained in a UDP packet, blocks if none available -        // In case of error, throws a runtime_error -        std::vector<uint8_t> get_packet_buffer(void); +        class Interrupted {}; +        class Timeout {}; +        /* Returns one or several packets, +         * throws a Timeout on timeout, Interrupted on EINTR, a runtime_error +         * on error. */ +        std::vector<ReceivedPacket> receive(int timeout_ms);      private:          void m_run(void); -        int m_port = 0; -        std::string m_bindto; -        std::string m_mcastaddr; -        size_t m_max_packets_queued = 1; -        std::thread m_thread; -        std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false); -        ThreadsafeQueue<UDPPacket> m_packets; -        UDPSocket m_sock; +        std::vector<UDPSocket> m_sockets;  };  class TCPSocket { @@ -265,6 +270,14 @@ class TCPDataDispatcher          std::list<TCPConnection> m_connections;  }; +struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; }; +struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { }; +struct TCPReceiveMessageEmpty : public TCPReceiveMessage { }; +struct TCPReceiveMessageData : public TCPReceiveMessage { +    TCPReceiveMessageData(std::vector<uint8_t> d) : data(d) {}; +    std::vector<uint8_t> data; +}; +  /* A TCP Server to receive data, which abstracts the handling of connects and disconnects.   */  class TCPReceiveServer { @@ -276,15 +289,15 @@ class TCPReceiveServer {          void start(int listen_port, const std::string& address); -        // Return a vector that contains up to blocksize bytes of data, or -        // and empty vector if no data is available. -        std::vector<uint8_t> receive(); +        // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize +        // bytes of data, or TCPReceiveMessageEmpty if no data is available. +        std::shared_ptr<TCPReceiveMessage> receive();      private:          void process();          size_t m_blocksize = 0; -        ThreadsafeQueue<std::vector<uint8_t> > m_queue; +        ThreadsafeQueue<std::shared_ptr<TCPReceiveMessage> > m_queue;          std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);          std::string m_exception_data;          std::thread m_listener_thread; diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp index 88a7333..656f50b 100644 --- a/lib/edi/ETIDecoder.cpp +++ b/lib/edi/ETIDecoder.cpp @@ -57,9 +57,9 @@ void ETIDecoder::push_bytes(const vector<uint8_t> &buf)      m_dispatcher.push_bytes(buf);  } -void ETIDecoder::push_packet(const vector<uint8_t> &buf) +void ETIDecoder::push_packet(Packet& pack)  { -    m_dispatcher.push_packet(buf); +    m_dispatcher.push_packet(pack);  }  void ETIDecoder::setMaxDelay(int num_af_packets) @@ -107,7 +107,7 @@ bool ETIDecoder::decode_deti(const std::vector<uint8_t>& value, const tag_name_t      uint8_t fcth = (detiHeader >> 8) & 0x1F;      uint8_t fct = detiHeader & 0xFF; -    fc.dflc = fcth * 250 + fct; // modulo 5000 counter +    fc.dlfc = fcth * 250 + fct; // modulo 5000 counter      uint32_t etiHeader = read_32b(value.begin() + 2); diff --git a/lib/edi/ETIDecoder.hpp b/lib/edi/ETIDecoder.hpp index ffa9037..e0865ce 100644 --- a/lib/edi/ETIDecoder.hpp +++ b/lib/edi/ETIDecoder.hpp @@ -38,11 +38,11 @@ struct eti_fc_data {      bool atstf;      uint32_t tsta;      bool ficf; -    uint16_t dflc; +    uint16_t dlfc;      uint8_t mid;      uint8_t fp; -    uint8_t fct(void) const { return dflc % 250; } +    uint8_t fct(void) const { return dlfc % 250; }  };  // Information for a subchannel available in EDI @@ -119,7 +119,7 @@ class ETIDecoder {          /* Push a complete packet into the decoder. Useful for UDP and other           * datagram-oriented protocols.           */ -        void push_packet(const std::vector<uint8_t> &buf); +        void push_packet(Packet &pack);          /* Set the maximum delay in number of AF Packets before we           * abandon decoding a given pseq. diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index 158b206..85d6b63 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -23,6 +23,7 @@  #include <cstdio>  #include <cassert>  #include <cstring> +#include <cmath>  #include <sstream>  #include <stdexcept>  #include <algorithm> @@ -109,11 +110,18 @@ class FECDecoder {  size_t Fragment::loadData(const std::vector<uint8_t> &buf)  { +    return loadData(buf, 0); +} + +size_t Fragment::loadData(const std::vector<uint8_t> &buf, int received_on_port) +{      const size_t header_len = 14;      if (buf.size() < header_len) {          return 0;      } +    this->received_on_port = received_on_port; +      size_t index = 0;      // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) @@ -461,6 +469,32 @@ std::string AFBuilder::visualise() const      return ss.str();  } +std::string AFBuilder::visualise_fragment_origins() const +{ +    stringstream ss; +    if (_fragments.size() == 0) { +        return "No fragments"; +    } +    else { +        ss << _fragments.size() << " fragments: "; +    } + +    std::map<int, size_t> port_count; + +    for (const auto& f : _fragments) { +        port_count[f.second.received_on_port]++; +    } + +    for (const auto& p : port_count) { +        ss << "p" << p.first << " " << +            std::round(100.0 * ((double)p.second) / (double)_fragments.size()) << "% "; +    } + +    ss << "\n"; + +    return ss.str(); +} +  void PFT::pushPFTFrag(const Fragment &fragment)  {      // Start decoding the first pseq we receive. In normal @@ -518,6 +552,9 @@ std::vector<uint8_t> PFT::getNextAFPacket()      if (builder.canAttemptToDecode() == dar_t::yes) {          auto afpacket = builder.extractAF();          assert(not afpacket.empty()); +        if (m_verbose) { +            etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); +        }          incrementNextPseq();          return afpacket;      } @@ -533,6 +570,9 @@ std::vector<uint8_t> PFT::getNextAFPacket()              if (afpacket.empty()) {                  etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq);              } +            if (m_verbose) { +                etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); +            }              incrementNextPseq();              return afpacket;          } diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 208fd70..08dca45 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -36,11 +36,14 @@ using findex_t = uint32_t; // findex is a 24-bit value  class Fragment  {      public: +        int received_on_port = 0; +          // Load the data for one fragment from buf into          // the Fragment.          // \returns the number of bytes of useful data found in buf          // A non-zero return value doesn't imply a valid fragment          // the isValid() method must be used to verify this. +        size_t loadData(const std::vector<uint8_t> &buf, int received_on_port);          size_t loadData(const std::vector<uint8_t> &buf);          bool isValid() const { return _valid; } @@ -111,7 +114,9 @@ class AFBuilder                  return {_fragments.size(), _Fcount};              } -        std::string visualise(void) const; +        std::string visualise() const; + +        std::string visualise_fragment_origins() const;          /* The user of this instance can keep track of the lifetime of this           * builder diff --git a/lib/edi/README.md b/lib/edi/README.md index b6ab67a..535a65c 100644 --- a/lib/edi/README.md +++ b/lib/edi/README.md @@ -1 +1 @@ -These files are copied from the odr-edilib project. +These files are copied from the common ODR code repository. diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 306261a..7907656 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -22,6 +22,7 @@  #include "buffer_unpack.hpp"  #include "Log.h"  #include "crc.h" +#include <algorithm>  #include <sstream>  #include <cassert>  #include <cmath> @@ -142,6 +143,12 @@ void TagDispatcher::set_verbose(bool verbose)  void TagDispatcher::push_bytes(const vector<uint8_t> &buf)  { +    if (buf.empty()) { +        m_input_data.clear(); +        m_last_seq_valid = false; +        return; +    } +      copy(buf.begin(), buf.end(), back_inserter(m_input_data));      while (m_input_data.size() > 2) { @@ -194,14 +201,16 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)              }          }          else { -            etiLog.log(warn,"Unknown %c!", *m_input_data.data()); +            etiLog.log(warn, "Unknown 0x%02x!", *m_input_data.data());              m_input_data.erase(m_input_data.begin());          }      }  } -void TagDispatcher::push_packet(const vector<uint8_t> &buf) +void TagDispatcher::push_packet(const Packet &packet)  { +    auto& buf = packet.buf; +      if (buf.size() < 2) {          throw std::invalid_argument("Not enough bytes to read EDI packet header");      } @@ -216,7 +225,7 @@ void TagDispatcher::push_packet(const vector<uint8_t> &buf)      }      else if (buf[0] == 'P' and buf[1] == 'F') {          PFT::Fragment fragment; -        fragment.loadData(buf); +        fragment.loadData(buf, packet.received_on_port);          if (fragment.isValid()) {              m_pft.pushPFTFrag(fragment); @@ -232,11 +241,10 @@ void TagDispatcher::push_packet(const vector<uint8_t> &buf)          }      }      else { -        const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'};          std::stringstream ss; -        ss << "Unknown EDI packet "; -        ss << packettype; -        throw std::invalid_argument(ss.str()); +        ss << "Unknown EDI packet " << std::hex << (int)buf[0] << " " << (int)buf[1]; +        m_ignored_tags.clear(); +        throw invalid_argument(ss.str());      }  } @@ -268,6 +276,7 @@ decode_state_t TagDispatcher::decode_afpacket(          const uint16_t expected_seq = m_last_seq + 1;          if (expected_seq != seq) {              etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; +            m_ignored_tags.clear();          }      }      else { @@ -303,8 +312,7 @@ decode_state_t TagDispatcher::decode_afpacket(      uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);      if (packet_crc != crc) { -        throw invalid_argument( -                "AF Packet crc wrong"); +        throw invalid_argument("AF Packet crc wrong");      }      else {          vector<uint8_t> payload(taglength); @@ -379,7 +387,10 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)          }          if (not found) { -            etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); +            if (std::find(m_ignored_tags.begin(), m_ignored_tags.end(), tag) == m_ignored_tags.end()) { +                etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); +                m_ignored_tags.push_back(tag); +            }              break;          } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index c8c4bb3..14b91ba 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -60,6 +60,14 @@ using tag_name_t = std::array<uint8_t, 4>;  std::string tag_name_to_human_readable(const tag_name_t& name); +struct Packet { +    std::vector<uint8_t> buf; +    int received_on_port; + +    Packet(std::vector<uint8_t>&& b) : buf(b), received_on_port(0) { } +    Packet() {} +}; +  /* The TagDispatcher takes care of decoding EDI, with or without PFT, and   * will call functions when TAGs are encountered.   * @@ -72,17 +80,17 @@ class TagDispatcher {          void set_verbose(bool verbose); -          /* Push bytes into the decoder. The buf can contain more           * than a single packet. This is useful when reading from streams -         * (files, TCP) +         * (files, TCP). Pushing an empty buf will clear the internal decoder +         * state to ensure realignment (e.g. on stream reconnection)           */          void push_bytes(const std::vector<uint8_t> &buf);          /* Push a complete packet into the decoder. Useful for UDP and other           * datagram-oriented protocols.           */ -        void push_packet(const std::vector<uint8_t> &buf); +        void push_packet(const Packet &packet);          /* Set the maximum delay in number of AF Packets before we           * abandon decoding a given pseq. @@ -113,6 +121,8 @@ class TagDispatcher {          std::map<std::string, tag_handler> m_handlers;          std::function<void()> m_af_packet_completed;          tagpacket_handler m_tagpacket_handler; + +        std::vector<std::string> m_ignored_tags;  };  // Data carried inside the ODRv EDI TAG diff --git a/lib/zmq.hpp b/lib/zmq.hpp new file mode 100644 index 0000000..74a0574 --- /dev/null +++ b/lib/zmq.hpp @@ -0,0 +1,2120 @@ +/* +    Copyright (c) 2016-2017 ZeroMQ community +    Copyright (c) 2009-2011 250bpm s.r.o. +    Copyright (c) 2011 Botond Ballo +    Copyright (c) 2007-2009 iMatix Corporation + +    Permission is hereby granted, free of charge, to any person obtaining a copy +    of this software and associated documentation files (the "Software"), to +    deal in the Software without restriction, including without limitation the +    rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +    sell copies of the Software, and to permit persons to whom the Software is +    furnished to do so, subject to the following conditions: + +    The above copyright notice and this permission notice shall be included in +    all copies or substantial portions of the Software. + +    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +    IN THE SOFTWARE. +*/ + +#ifndef __ZMQ_HPP_INCLUDED__ +#define __ZMQ_HPP_INCLUDED__ + +// macros defined if has a specific standard or greater +#if (defined(__cplusplus) && __cplusplus >= 201103L) || (defined(_MSC_VER) && _MSC_VER >= 1900) +    #define ZMQ_CPP11 +#endif +#if (defined(__cplusplus) && __cplusplus >= 201402L) || \ +    (defined(_HAS_CXX14) && _HAS_CXX14 == 1) || \ +    (defined(_HAS_CXX17) && _HAS_CXX17 == 1) // _HAS_CXX14 might not be defined when using C++17 on MSVC +    #define ZMQ_CPP14 +#endif +#if (defined(__cplusplus) && __cplusplus >= 201703L) || (defined(_HAS_CXX17) && _HAS_CXX17 == 1) +    #define ZMQ_CPP17 +#endif + +#if defined(ZMQ_CPP14) +#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]] +#elif defined(_MSC_VER) +#define ZMQ_DEPRECATED(msg) __declspec(deprecated(msg)) +#elif defined(__GNUC__) +#define ZMQ_DEPRECATED(msg) __attribute__((deprecated(msg))) +#endif + +#if defined(ZMQ_CPP17) +#define ZMQ_NODISCARD [[nodiscard]] +#else +#define ZMQ_NODISCARD +#endif + +#if defined(ZMQ_CPP11) +#define ZMQ_NOTHROW noexcept +#define ZMQ_EXPLICIT explicit +#define ZMQ_OVERRIDE override +#define ZMQ_NULLPTR nullptr +#define ZMQ_CONSTEXPR_FN constexpr +#define ZMQ_CONSTEXPR_VAR constexpr +#else +#define ZMQ_NOTHROW throw() +#define ZMQ_EXPLICIT +#define ZMQ_OVERRIDE +#define ZMQ_NULLPTR 0 +#define ZMQ_CONSTEXPR_FN +#define ZMQ_CONSTEXPR_VAR const +#endif + +#include <zmq.h> + +#include <cassert> +#include <cstring> + +#include <algorithm> +#include <exception> +#include <iomanip> +#include <sstream> +#include <string> +#include <vector> +#ifdef ZMQ_CPP11 +#include <array> +#include <chrono> +#include <tuple> +#include <memory> +#endif +#ifdef ZMQ_CPP17 +#ifdef __has_include +#if __has_include(<optional>) +#include <optional> +#define ZMQ_HAS_OPTIONAL 1 +#endif +#if __has_include(<string_view>) +#include <string_view> +#define ZMQ_HAS_STRING_VIEW 1 +#endif +#endif + +#endif + +/*  Version macros for compile-time API version detection                     */ +#define CPPZMQ_VERSION_MAJOR 4 +#define CPPZMQ_VERSION_MINOR 6 +#define CPPZMQ_VERSION_PATCH 0 + +#define CPPZMQ_VERSION                                                              \ +    ZMQ_MAKE_VERSION(CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR,                    \ +                     CPPZMQ_VERSION_PATCH) + +//  Detect whether the compiler supports C++11 rvalue references. +#if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2))   \ +     && defined(__GXX_EXPERIMENTAL_CXX0X__)) +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION = delete +#elif defined(__clang__) +#if __has_feature(cxx_rvalue_references) +#define ZMQ_HAS_RVALUE_REFS +#endif + +#if __has_feature(cxx_deleted_functions) +#define ZMQ_DELETED_FUNCTION = delete +#else +#define ZMQ_DELETED_FUNCTION +#endif +#elif defined(_MSC_VER) && (_MSC_VER >= 1900) +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION = delete +#elif defined(_MSC_VER) && (_MSC_VER >= 1600) +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION +#else +#define ZMQ_DELETED_FUNCTION +#endif + +#if defined(ZMQ_CPP11) && !defined(__llvm__) && !defined(__INTEL_COMPILER) \ +    && defined(__GNUC__) && __GNUC__ < 5 +#define ZMQ_CPP11_PARTIAL +#elif defined(__GLIBCXX__) && __GLIBCXX__ < 20160805 +//the date here is the last date of gcc 4.9.4, which +// effectively means libstdc++ from gcc 5.5 and higher won't trigger this branch +#define ZMQ_CPP11_PARTIAL +#endif + +#ifdef ZMQ_CPP11 +#ifdef ZMQ_CPP11_PARTIAL +#define ZMQ_IS_TRIVIALLY_COPYABLE(T) __has_trivial_copy(T) +#else +#include <type_traits> +#define ZMQ_IS_TRIVIALLY_COPYABLE(T) std::is_trivially_copyable<T>::value +#endif +#endif + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0) +#define ZMQ_NEW_MONITOR_EVENT_LAYOUT +#endif + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) +#define ZMQ_HAS_PROXY_STEERABLE +/*  Socket event data  */ +typedef struct +{ +    uint16_t event; // id of the event as bitfield +    int32_t value;  // value is either error code, fd or reconnect interval +} zmq_event_t; +#endif + +// Avoid using deprecated message receive function when possible +#if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0) +#define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags) +#endif + + +// In order to prevent unused variable warnings when building in non-debug +// mode use this macro to make assertions. +#ifndef NDEBUG +#define ZMQ_ASSERT(expression) assert(expression) +#else +#define ZMQ_ASSERT(expression) (void) (expression) +#endif + +namespace zmq +{ + +#ifdef ZMQ_CPP11 +namespace detail +{ +namespace ranges +{ +using std::begin; +using std::end; +template<class T> +auto begin(T&& r) -> decltype(begin(std::forward<T>(r))) +{ +    return begin(std::forward<T>(r)); +} +template<class T> +auto end(T&& r) -> decltype(end(std::forward<T>(r))) +{ +    return end(std::forward<T>(r)); +} +} // namespace ranges + +template<class T> using void_t = void; + +template<class Iter> +using iter_value_t = typename std::iterator_traits<Iter>::value_type; + +template<class Range> +using range_iter_t = decltype( +  ranges::begin(std::declval<typename std::remove_reference<Range>::type &>())); + +template<class Range> +using range_value_t = iter_value_t<range_iter_t<Range>>; + +template<class T, class = void> struct is_range : std::false_type +{ +}; + +template<class T> +struct is_range< +  T, +  void_t<decltype( +    ranges::begin(std::declval<typename std::remove_reference<T>::type &>()) +    == ranges::end(std::declval<typename std::remove_reference<T>::type &>()))>> +    : std::true_type +{ +}; + +} // namespace detail +#endif + +typedef zmq_free_fn free_fn; +typedef zmq_pollitem_t pollitem_t; + +class error_t : public std::exception +{ +  public: +    error_t() : errnum(zmq_errno()) {} +    virtual const char *what() const ZMQ_NOTHROW ZMQ_OVERRIDE { return zmq_strerror(errnum); } +    int num() const { return errnum; } + +  private: +    int errnum; +}; + +inline int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_ = -1) +{ +    int rc = zmq_poll(items_, static_cast<int>(nitems_), timeout_); +    if (rc < 0) +        throw error_t(); +    return rc; +} + +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int poll(zmq_pollitem_t const *items_, size_t nitems_, long timeout_ = -1) +{ +    return poll(const_cast<zmq_pollitem_t *>(items_), nitems_, timeout_); +} + +#ifdef ZMQ_CPP11 +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int +poll(zmq_pollitem_t const *items, size_t nitems, std::chrono::milliseconds timeout) +{ +    return poll(const_cast<zmq_pollitem_t *>(items), nitems, static_cast<long>(timeout.count())); +} + +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int poll(std::vector<zmq_pollitem_t> const &items, +                std::chrono::milliseconds timeout) +{ +    return poll(const_cast<zmq_pollitem_t *>(items.data()), items.size(), static_cast<long>(timeout.count())); +} + +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int poll(std::vector<zmq_pollitem_t> const &items, long timeout_ = -1) +{ +    return poll(const_cast<zmq_pollitem_t *>(items.data()), items.size(), timeout_); +} + +inline int +poll(zmq_pollitem_t *items, size_t nitems, std::chrono::milliseconds timeout) +{ +    return poll(items, nitems, static_cast<long>(timeout.count())); +} + +inline int poll(std::vector<zmq_pollitem_t> &items, +                std::chrono::milliseconds timeout) +{ +    return poll(items.data(), items.size(), static_cast<long>(timeout.count())); +} + +inline int poll(std::vector<zmq_pollitem_t> &items, long timeout_ = -1) +{ +    return poll(items.data(), items.size(), timeout_); +} +#endif + + +inline void version(int *major_, int *minor_, int *patch_) +{ +    zmq_version(major_, minor_, patch_); +} + +#ifdef ZMQ_CPP11 +inline std::tuple<int, int, int> version() +{ +    std::tuple<int, int, int> v; +    zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v)); +    return v; +} +#endif + +class message_t +{ +  public: +    message_t() ZMQ_NOTHROW +    { +        int rc = zmq_msg_init(&msg); +        ZMQ_ASSERT(rc == 0); +    } + +    explicit message_t(size_t size_) +    { +        int rc = zmq_msg_init_size(&msg, size_); +        if (rc != 0) +            throw error_t(); +    } + +    template<class ForwardIter> message_t(ForwardIter first, ForwardIter last) +    { +        typedef typename std::iterator_traits<ForwardIter>::value_type value_t; + +        assert(std::distance(first, last) >= 0); +        size_t const size_ = +          static_cast<size_t>(std::distance(first, last)) * sizeof(value_t); +        int const rc = zmq_msg_init_size(&msg, size_); +        if (rc != 0) +            throw error_t(); +        std::copy(first, last, data<value_t>()); +    } + +    message_t(const void *data_, size_t size_) +    { +        int rc = zmq_msg_init_size(&msg, size_); +        if (rc != 0) +            throw error_t(); +        memcpy(data(), data_, size_); +    } + +    message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_ = ZMQ_NULLPTR) +    { +        int rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_); +        if (rc != 0) +            throw error_t(); +    } + +#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL) +    template<class Range, +             typename = typename std::enable_if< +               detail::is_range<Range>::value +               && ZMQ_IS_TRIVIALLY_COPYABLE(detail::range_value_t<Range>) +               && !std::is_same<Range, message_t>::value>::type> +    explicit message_t(const Range &rng) : +        message_t(detail::ranges::begin(rng), detail::ranges::end(rng)) +    { +    } +#endif + +#ifdef ZMQ_HAS_RVALUE_REFS +    message_t(message_t &&rhs) ZMQ_NOTHROW : msg(rhs.msg) +    { +        int rc = zmq_msg_init(&rhs.msg); +        ZMQ_ASSERT(rc == 0); +    } + +    message_t &operator=(message_t &&rhs) ZMQ_NOTHROW +    { +        std::swap(msg, rhs.msg); +        return *this; +    } +#endif + +    ~message_t() ZMQ_NOTHROW +    { +        int rc = zmq_msg_close(&msg); +        ZMQ_ASSERT(rc == 0); +    } + +    void rebuild() +    { +        int rc = zmq_msg_close(&msg); +        if (rc != 0) +            throw error_t(); +        rc = zmq_msg_init(&msg); +        ZMQ_ASSERT(rc == 0); +    } + +    void rebuild(size_t size_) +    { +        int rc = zmq_msg_close(&msg); +        if (rc != 0) +            throw error_t(); +        rc = zmq_msg_init_size(&msg, size_); +        if (rc != 0) +            throw error_t(); +    } + +    void rebuild(const void *data_, size_t size_) +    { +        int rc = zmq_msg_close(&msg); +        if (rc != 0) +            throw error_t(); +        rc = zmq_msg_init_size(&msg, size_); +        if (rc != 0) +            throw error_t(); +        memcpy(data(), data_, size_); +    } + +    void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_ = ZMQ_NULLPTR) +    { +        int rc = zmq_msg_close(&msg); +        if (rc != 0) +            throw error_t(); +        rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_); +        if (rc != 0) +            throw error_t(); +    } + +    ZMQ_DEPRECATED("from 4.3.1, use move taking non-const reference instead") +    void move(message_t const *msg_) +    { +        int rc = zmq_msg_move(&msg, const_cast<zmq_msg_t *>(msg_->handle())); +        if (rc != 0) +            throw error_t(); +    } + +    void move(message_t &msg_) +    { +        int rc = zmq_msg_move(&msg, msg_.handle()); +        if (rc != 0) +            throw error_t(); +    } + +    ZMQ_DEPRECATED("from 4.3.1, use copy taking non-const reference instead") +    void copy(message_t const *msg_) +    { +        int rc = zmq_msg_copy(&msg, const_cast<zmq_msg_t *>(msg_->handle())); +        if (rc != 0) +            throw error_t(); +    } + +    void copy(message_t &msg_) +    { +        int rc = zmq_msg_copy(&msg, msg_.handle()); +        if (rc != 0) +            throw error_t(); +    } + +    bool more() const ZMQ_NOTHROW +    { +        int rc = zmq_msg_more(const_cast<zmq_msg_t *>(&msg)); +        return rc != 0; +    } + +    void *data() ZMQ_NOTHROW { return zmq_msg_data(&msg); } + +    const void *data() const ZMQ_NOTHROW +    { +        return zmq_msg_data(const_cast<zmq_msg_t *>(&msg)); +    } + +    size_t size() const ZMQ_NOTHROW +    { +        return zmq_msg_size(const_cast<zmq_msg_t *>(&msg)); +    } + +    ZMQ_NODISCARD bool empty() const ZMQ_NOTHROW +    { +        return size() == 0u; +    } + +    template<typename T> T *data() ZMQ_NOTHROW { return static_cast<T *>(data()); } + +    template<typename T> T const *data() const ZMQ_NOTHROW +    { +        return static_cast<T const *>(data()); +    } + +    ZMQ_DEPRECATED("from 4.3.0, use operator== instead") +    bool equal(const message_t *other) const ZMQ_NOTHROW +    { +        return *this == *other; +    } + +    bool operator==(const message_t &other) const ZMQ_NOTHROW +    { +        const size_t my_size = size(); +        return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size); +    } + +    bool operator!=(const message_t &other) const ZMQ_NOTHROW +    { +        return !(*this == other); +    } + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 2, 0) +    int get(int property_) +    { +        int value = zmq_msg_get(&msg, property_); +        if (value == -1) +            throw error_t(); +        return value; +    } +#endif + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) +    const char *gets(const char *property_) +    { +        const char *value = zmq_msg_gets(&msg, property_); +        if (value == ZMQ_NULLPTR) +            throw error_t(); +        return value; +    } +#endif + +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) +    uint32_t routing_id() const +    { +        return zmq_msg_routing_id(const_cast<zmq_msg_t*>(&msg)); +    } + +    void set_routing_id(uint32_t routing_id) +    { +        int rc = zmq_msg_set_routing_id(&msg, routing_id); +        if (rc != 0) +            throw error_t(); +    } + +    const char* group() const +    { +        return zmq_msg_group(const_cast<zmq_msg_t*>(&msg)); +    } + +    void set_group(const char* group) +    { +        int rc = zmq_msg_set_group(&msg, group); +        if (rc != 0) +            throw error_t(); +    } +#endif + +    // interpret message content as a string +    std::string to_string() const +    { +        return std::string(static_cast<const char*>(data()), size()); +    } +#ifdef ZMQ_CPP17 +    // interpret message content as a string +    std::string_view to_string_view() const noexcept +    { +        return std::string_view(static_cast<const char*>(data()), size()); +    } +#endif + +    /** Dump content to string for debugging. +    *   Ascii chars are readable, the rest is printed as hex. +    *   Probably ridiculously slow. +    *   Use to_string() or to_string_view() for +    *   interpreting the message as a string. +    */ +    std::string str() const +    { +        // Partly mutuated from the same method in zmq::multipart_t +        std::stringstream os; + +        const unsigned char *msg_data = this->data<unsigned char>(); +        unsigned char byte; +        size_t size = this->size(); +        int is_ascii[2] = {0, 0}; + +        os << "zmq::message_t [size " << std::dec << std::setw(3) +           << std::setfill('0') << size << "] ("; +        // Totally arbitrary +        if (size >= 1000) { +            os << "... too big to print)"; +        } else { +            while (size--) { +                byte = *msg_data++; + +                is_ascii[1] = (byte >= 32 && byte < 127); +                if (is_ascii[1] != is_ascii[0]) +                    os << " "; // Separate text/non text + +                if (is_ascii[1]) { +                    os << byte; +                } else { +                    os << std::hex << std::uppercase << std::setw(2) +                       << std::setfill('0') << static_cast<short>(byte); +                } +                is_ascii[0] = is_ascii[1]; +            } +            os << ")"; +        } +        return os.str(); +    } + +    void swap(message_t &other) ZMQ_NOTHROW +    { +        // this assumes zmq::msg_t from libzmq is trivially relocatable +        std::swap(msg, other.msg); +    } + +    ZMQ_NODISCARD zmq_msg_t *handle() ZMQ_NOTHROW { return &msg; } +    ZMQ_NODISCARD const zmq_msg_t *handle() const ZMQ_NOTHROW { return &msg; } + +  private: +    //  The underlying message +    zmq_msg_t msg; + +    //  Disable implicit message copying, so that users won't use shared +    //  messages (less efficient) without being aware of the fact. +    message_t(const message_t &) ZMQ_DELETED_FUNCTION; +    void operator=(const message_t &) ZMQ_DELETED_FUNCTION; +}; + +inline void swap(message_t &a, message_t &b) ZMQ_NOTHROW +{ +    a.swap(b); +} + +class context_t +{ +  public: +    context_t() +    { +        ptr = zmq_ctx_new(); +        if (ptr == ZMQ_NULLPTR) +            throw error_t(); +    } + + +    explicit context_t(int io_threads_, +                              int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) +    { +        ptr = zmq_ctx_new(); +        if (ptr == ZMQ_NULLPTR) +            throw error_t(); + +        int rc = zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_); +        ZMQ_ASSERT(rc == 0); + +        rc = zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_); +        ZMQ_ASSERT(rc == 0); +    } + +#ifdef ZMQ_HAS_RVALUE_REFS +    context_t(context_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr) { rhs.ptr = ZMQ_NULLPTR; } +    context_t &operator=(context_t &&rhs) ZMQ_NOTHROW +    { +        close(); +        std::swap(ptr, rhs.ptr); +        return *this; +    } +#endif + +    int setctxopt(int option_, int optval_) +    { +        int rc = zmq_ctx_set(ptr, option_, optval_); +        ZMQ_ASSERT(rc == 0); +        return rc; +    } + +    int getctxopt(int option_) { return zmq_ctx_get(ptr, option_); } + +    ~context_t() ZMQ_NOTHROW { close(); } + +    void close() ZMQ_NOTHROW +    { +        if (ptr == ZMQ_NULLPTR) +            return; + +        int rc; +        do { +            rc = zmq_ctx_destroy(ptr); +        } while (rc == -1 && errno == EINTR); + +        ZMQ_ASSERT(rc == 0); +        ptr = ZMQ_NULLPTR; +    } + +    //  Be careful with this, it's probably only useful for +    //  using the C api together with an existing C++ api. +    //  Normally you should never need to use this. +    ZMQ_EXPLICIT operator void *() ZMQ_NOTHROW { return ptr; } + +    ZMQ_EXPLICIT operator void const *() const ZMQ_NOTHROW { return ptr; } + +    operator bool() const ZMQ_NOTHROW { return ptr != ZMQ_NULLPTR; } + +    void swap(context_t &other) ZMQ_NOTHROW +    { +        std::swap(ptr, other.ptr); +    } + +  private: +    void *ptr; + +    context_t(const context_t &) ZMQ_DELETED_FUNCTION; +    void operator=(const context_t &) ZMQ_DELETED_FUNCTION; +}; + +inline void swap(context_t &a, context_t &b) ZMQ_NOTHROW { +    a.swap(b); +} + +#ifdef ZMQ_CPP11 + +struct recv_buffer_size +{ +    size_t size;    // number of bytes written to buffer +    size_t untruncated_size;  // untruncated message size in bytes + +    ZMQ_NODISCARD bool truncated() const noexcept +    { +        return size != untruncated_size; +    } +}; + +#if defined(ZMQ_HAS_OPTIONAL) && (ZMQ_HAS_OPTIONAL > 0) + +using send_result_t = std::optional<size_t>; +using recv_result_t = std::optional<size_t>; +using recv_buffer_result_t = std::optional<recv_buffer_size>; + +#else + +namespace detail +{ +// A C++11 type emulating the most basic +// operations of std::optional for trivial types +template<class T> class trivial_optional +{ +  public: +    static_assert(std::is_trivial<T>::value, "T must be trivial"); +    using value_type = T; + +    trivial_optional() = default; +    trivial_optional(T value) noexcept : _value(value), _has_value(true) {} + +    const T *operator->() const noexcept +    { +        assert(_has_value); +        return &_value; +    } +    T *operator->() noexcept +    { +        assert(_has_value); +        return &_value; +    } + +    const T &operator*() const noexcept +    { +        assert(_has_value); +        return _value; +    } +    T &operator*() noexcept +    { +        assert(_has_value); +        return _value; +    } + +    T &value() +    { +        if (!_has_value) +            throw std::exception(); +        return _value; +    } +    const T &value() const +    { +        if (!_has_value) +            throw std::exception(); +        return _value; +    } + +    explicit operator bool() const noexcept { return _has_value; } +    bool has_value() const noexcept { return _has_value; } + +  private: +    T _value{}; +    bool _has_value{false}; +}; +} // namespace detail + +using send_result_t = detail::trivial_optional<size_t>; +using recv_result_t = detail::trivial_optional<size_t>; +using recv_buffer_result_t = detail::trivial_optional<recv_buffer_size>; + +#endif + +namespace detail +{ + +template<class T> +constexpr T enum_bit_or(T a, T b) noexcept +{ +    static_assert(std::is_enum<T>::value, "must be enum"); +    using U = typename std::underlying_type<T>::type; +    return static_cast<T>(static_cast<U>(a) | static_cast<U>(b)); +} +template<class T> +constexpr T enum_bit_and(T a, T b) noexcept +{ +    static_assert(std::is_enum<T>::value, "must be enum"); +    using U = typename std::underlying_type<T>::type; +    return static_cast<T>(static_cast<U>(a) & static_cast<U>(b)); +} +template<class T> +constexpr T enum_bit_xor(T a, T b) noexcept +{ +    static_assert(std::is_enum<T>::value, "must be enum"); +    using U = typename std::underlying_type<T>::type; +    return static_cast<T>(static_cast<U>(a) ^ static_cast<U>(b)); +} +template<class T> +constexpr T enum_bit_not(T a) noexcept +{ +    static_assert(std::is_enum<T>::value, "must be enum"); +    using U = typename std::underlying_type<T>::type; +    return static_cast<T>(~static_cast<U>(a)); +} +} // namespace detail + +// partially satisfies named requirement BitmaskType +enum class send_flags : int +{ +    none = 0, +    dontwait = ZMQ_DONTWAIT, +    sndmore = ZMQ_SNDMORE +}; + +constexpr send_flags operator|(send_flags a, send_flags b) noexcept +{ +    return detail::enum_bit_or(a, b); +} +constexpr send_flags operator&(send_flags a, send_flags b) noexcept +{ +    return detail::enum_bit_and(a, b); +} +constexpr send_flags operator^(send_flags a, send_flags b) noexcept +{ +    return detail::enum_bit_xor(a, b); +} +constexpr send_flags operator~(send_flags a) noexcept +{ +    return detail::enum_bit_not(a); +} + +// partially satisfies named requirement BitmaskType +enum class recv_flags : int +{ +    none = 0, +    dontwait = ZMQ_DONTWAIT +}; + +constexpr recv_flags operator|(recv_flags a, recv_flags b) noexcept +{ +    return detail::enum_bit_or(a, b); +} +constexpr recv_flags operator&(recv_flags a, recv_flags b) noexcept +{ +    return detail::enum_bit_and(a, b); +} +constexpr recv_flags operator^(recv_flags a, recv_flags b) noexcept +{ +    return detail::enum_bit_xor(a, b); +} +constexpr recv_flags operator~(recv_flags a) noexcept +{ +    return detail::enum_bit_not(a); +} + + +// mutable_buffer, const_buffer and buffer are based on +// the Networking TS specification, draft: +// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4771.pdf + +class mutable_buffer +{ +  public: +    constexpr mutable_buffer() noexcept : _data(nullptr), _size(0) {} +    constexpr mutable_buffer(void *p, size_t n) noexcept : _data(p), _size(n) +    { +#ifdef ZMQ_CPP14 +        assert(p != nullptr || n == 0); +#endif +    } + +    constexpr void *data() const noexcept { return _data; } +    constexpr size_t size() const noexcept { return _size; } +    mutable_buffer &operator+=(size_t n) noexcept +    { +        // (std::min) is a workaround for when a min macro is defined +        const auto shift = (std::min)(n, _size); +        _data = static_cast<char *>(_data) + shift; +        _size -= shift; +        return *this; +    } + +  private: +    void *_data; +    size_t _size; +}; + +inline mutable_buffer operator+(const mutable_buffer &mb, size_t n) noexcept +{ +    return mutable_buffer(static_cast<char *>(mb.data()) + (std::min)(n, mb.size()), +                          mb.size() - (std::min)(n, mb.size())); +} +inline mutable_buffer operator+(size_t n, const mutable_buffer &mb) noexcept +{ +    return mb + n; +} + +class const_buffer +{ +  public: +    constexpr const_buffer() noexcept : _data(nullptr), _size(0) {} +    constexpr const_buffer(const void *p, size_t n) noexcept : _data(p), _size(n) +    { +#ifdef ZMQ_CPP14 +        assert(p != nullptr || n == 0); +#endif +    } +    constexpr const_buffer(const mutable_buffer &mb) noexcept : +        _data(mb.data()), +        _size(mb.size()) +    { +    } + +    constexpr const void *data() const noexcept { return _data; } +    constexpr size_t size() const noexcept { return _size; } +    const_buffer &operator+=(size_t n) noexcept +    { +        const auto shift = (std::min)(n, _size); +        _data = static_cast<const char *>(_data) + shift; +        _size -= shift; +        return *this; +    } + +  private: +    const void *_data; +    size_t _size; +}; + +inline const_buffer operator+(const const_buffer &cb, size_t n) noexcept +{ +    return const_buffer(static_cast<const char *>(cb.data()) +                          + (std::min)(n, cb.size()), +                        cb.size() - (std::min)(n, cb.size())); +} +inline const_buffer operator+(size_t n, const const_buffer &cb) noexcept +{ +    return cb + n; +} + +// buffer creation + +constexpr mutable_buffer buffer(void* p, size_t n) noexcept +{ +    return mutable_buffer(p, n); +} +constexpr const_buffer buffer(const void* p, size_t n) noexcept +{ +    return const_buffer(p, n); +} +constexpr mutable_buffer buffer(const mutable_buffer& mb) noexcept +{ +    return mb; +} +inline mutable_buffer buffer(const mutable_buffer& mb, size_t n) noexcept +{ +    return mutable_buffer(mb.data(), (std::min)(mb.size(), n)); +} +constexpr const_buffer buffer(const const_buffer& cb) noexcept +{ +    return cb; +} +inline const_buffer buffer(const const_buffer& cb, size_t n) noexcept +{ +    return const_buffer(cb.data(), (std::min)(cb.size(), n)); +} + +namespace detail +{ + +template<class T> +struct is_buffer +{ +    static constexpr bool value =  +        std::is_same<T, const_buffer>::value || +        std::is_same<T, mutable_buffer>::value; +}; + +template<class T> struct is_pod_like +{ +    // NOTE: The networking draft N4771 section 16.11 requires +    // T in the buffer functions below to be +    // trivially copyable OR standard layout. +    // Here we decide to be conservative and require both. +    static constexpr bool value = +      ZMQ_IS_TRIVIALLY_COPYABLE(T) && std::is_standard_layout<T>::value; +}; + +template<class C> constexpr auto seq_size(const C &c) noexcept -> decltype(c.size()) +{ +    return c.size(); +} +template<class T, size_t N> +constexpr size_t seq_size(const T (&/*array*/)[N]) noexcept +{ +    return N; +} + +template<class Seq> +auto buffer_contiguous_sequence(Seq &&seq) noexcept +  -> decltype(buffer(std::addressof(*std::begin(seq)), size_t{})) +{ +    using T = typename std::remove_cv< +      typename std::remove_reference<decltype(*std::begin(seq))>::type>::type; +    static_assert(detail::is_pod_like<T>::value, "T must be POD"); + +    const auto size = seq_size(seq); +    return buffer(size != 0u ? std::addressof(*std::begin(seq)) : nullptr, +                  size * sizeof(T)); +} +template<class Seq> +auto buffer_contiguous_sequence(Seq &&seq, size_t n_bytes) noexcept +  -> decltype(buffer_contiguous_sequence(seq)) +{ +    using T = typename std::remove_cv< +      typename std::remove_reference<decltype(*std::begin(seq))>::type>::type; +    static_assert(detail::is_pod_like<T>::value, "T must be POD"); + +    const auto size = seq_size(seq); +    return buffer(size != 0u ? std::addressof(*std::begin(seq)) : nullptr, +                  (std::min)(size * sizeof(T), n_bytes)); +} + +} // namespace detail + +// C array +template<class T, size_t N> mutable_buffer buffer(T (&data)[N]) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, size_t N> +mutable_buffer buffer(T (&data)[N], size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +template<class T, size_t N> const_buffer buffer(const T (&data)[N]) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, size_t N> +const_buffer buffer(const T (&data)[N], size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +// std::array +template<class T, size_t N> mutable_buffer buffer(std::array<T, N> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, size_t N> +mutable_buffer buffer(std::array<T, N> &data, size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +template<class T, size_t N> +const_buffer buffer(std::array<const T, N> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, size_t N> +const_buffer buffer(std::array<const T, N> &data, size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +template<class T, size_t N> +const_buffer buffer(const std::array<T, N> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, size_t N> +const_buffer buffer(const std::array<T, N> &data, size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +// std::vector +template<class T, class Allocator> +mutable_buffer buffer(std::vector<T, Allocator> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, class Allocator> +mutable_buffer buffer(std::vector<T, Allocator> &data, size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +template<class T, class Allocator> +const_buffer buffer(const std::vector<T, Allocator> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, class Allocator> +const_buffer buffer(const std::vector<T, Allocator> &data, size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +// std::basic_string +template<class T, class Traits, class Allocator> +mutable_buffer buffer(std::basic_string<T, Traits, Allocator> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, class Traits, class Allocator> +mutable_buffer buffer(std::basic_string<T, Traits, Allocator> &data, +                      size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +template<class T, class Traits, class Allocator> +const_buffer buffer(const std::basic_string<T, Traits, Allocator> &data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, class Traits, class Allocator> +const_buffer buffer(const std::basic_string<T, Traits, Allocator> &data, +                    size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} + +#if defined(ZMQ_HAS_STRING_VIEW) && (ZMQ_HAS_STRING_VIEW > 0) +// std::basic_string_view +template<class T, class Traits> +const_buffer buffer(std::basic_string_view<T, Traits> data) noexcept +{ +    return detail::buffer_contiguous_sequence(data); +} +template<class T, class Traits> +const_buffer buffer(std::basic_string_view<T, Traits> data, size_t n_bytes) noexcept +{ +    return detail::buffer_contiguous_sequence(data, n_bytes); +} +#endif + +// Buffer for a string literal (null terminated) +// where the buffer size excludes the terminating character. +// Equivalent to zmq::buffer(std::string_view("...")). +template<class Char, size_t N> +constexpr const_buffer str_buffer(const Char (&data)[N]) noexcept +{ +    static_assert(detail::is_pod_like<Char>::value, "Char must be POD"); +#ifdef ZMQ_CPP14 +    assert(data[N - 1] == Char{0}); +#endif +    return const_buffer(static_cast<const Char*>(data), +                        (N - 1) * sizeof(Char)); +} + +namespace literals +{ +    constexpr const_buffer operator"" _zbuf(const char* str, size_t len) noexcept +    { +        return const_buffer(str, len * sizeof(char)); +    } +    constexpr const_buffer operator"" _zbuf(const wchar_t* str, size_t len) noexcept +    { +        return const_buffer(str, len * sizeof(wchar_t)); +    } +    constexpr const_buffer operator"" _zbuf(const char16_t* str, size_t len) noexcept +    { +        return const_buffer(str, len * sizeof(char16_t)); +    } +    constexpr const_buffer operator"" _zbuf(const char32_t* str, size_t len) noexcept +    { +        return const_buffer(str, len * sizeof(char32_t)); +    } +} + +#endif // ZMQ_CPP11 + +namespace detail +{ + +class socket_base +{ +public: +    socket_base() ZMQ_NOTHROW : _handle(ZMQ_NULLPTR) {} +    ZMQ_EXPLICIT socket_base(void *handle) ZMQ_NOTHROW : _handle(handle) {} + +    template<typename T> void setsockopt(int option_, T const &optval) +    { +        setsockopt(option_, &optval, sizeof(T)); +    } + +    void setsockopt(int option_, const void *optval_, size_t optvallen_) +    { +        int rc = zmq_setsockopt(_handle, option_, optval_, optvallen_); +        if (rc != 0) +            throw error_t(); +    } + +    void getsockopt(int option_, void *optval_, size_t *optvallen_) const +    { +        int rc = zmq_getsockopt(_handle, option_, optval_, optvallen_); +        if (rc != 0) +            throw error_t(); +    } + +    template<typename T> T getsockopt(int option_) const +    { +        T optval; +        size_t optlen = sizeof(T); +        getsockopt(option_, &optval, &optlen); +        return optval; +    } + +    void bind(std::string const &addr) { bind(addr.c_str()); } + +    void bind(const char *addr_) +    { +        int rc = zmq_bind(_handle, addr_); +        if (rc != 0) +            throw error_t(); +    } + +    void unbind(std::string const &addr) { unbind(addr.c_str()); } + +    void unbind(const char *addr_) +    { +        int rc = zmq_unbind(_handle, addr_); +        if (rc != 0) +            throw error_t(); +    } + +    void connect(std::string const &addr) { connect(addr.c_str()); } + +    void connect(const char *addr_) +    { +        int rc = zmq_connect(_handle, addr_); +        if (rc != 0) +            throw error_t(); +    } + +    void disconnect(std::string const &addr) { disconnect(addr.c_str()); } + +    void disconnect(const char *addr_) +    { +        int rc = zmq_disconnect(_handle, addr_); +        if (rc != 0) +            throw error_t(); +    } + +    bool connected() const ZMQ_NOTHROW { return (_handle != ZMQ_NULLPTR); } + +#ifdef ZMQ_CPP11 +    ZMQ_DEPRECATED("from 4.3.1, use send taking a const_buffer and send_flags") +#endif +    size_t send(const void *buf_, size_t len_, int flags_ = 0) +    { +        int nbytes = zmq_send(_handle, buf_, len_, flags_); +        if (nbytes >= 0) +            return static_cast<size_t>(nbytes); +        if (zmq_errno() == EAGAIN) +            return 0; +        throw error_t(); +    } + +#ifdef ZMQ_CPP11 +    ZMQ_DEPRECATED("from 4.3.1, use send taking message_t and send_flags") +#endif +    bool send(message_t &msg_, +              int flags_ = 0) // default until removed +    { +        int nbytes = zmq_msg_send(msg_.handle(), _handle, flags_); +        if (nbytes >= 0) +            return true; +        if (zmq_errno() == EAGAIN) +            return false; +        throw error_t(); +    } + +    template<typename T> +#ifdef ZMQ_CPP11 +    ZMQ_DEPRECATED("from 4.4.1, use send taking message_t or buffer (for contiguous ranges), and send_flags") +#endif +    bool send(T first, T last, int flags_ = 0) +    { +        zmq::message_t msg(first, last); +        int nbytes = zmq_msg_send(msg.handle(), _handle, flags_); +        if (nbytes >= 0) +            return true; +        if (zmq_errno() == EAGAIN) +            return false; +        throw error_t(); +    } + +#ifdef ZMQ_HAS_RVALUE_REFS +#ifdef ZMQ_CPP11 +    ZMQ_DEPRECATED("from 4.3.1, use send taking message_t and send_flags") +#endif +    bool send(message_t &&msg_, +              int flags_ = 0) // default until removed +    { +        #ifdef ZMQ_CPP11 +        return send(msg_, static_cast<send_flags>(flags_)).has_value(); +        #else +        return send(msg_, flags_); +        #endif +    } +#endif + +#ifdef ZMQ_CPP11 +    send_result_t send(const_buffer buf, send_flags flags = send_flags::none) +    { +        const int nbytes = +          zmq_send(_handle, buf.data(), buf.size(), static_cast<int>(flags)); +        if (nbytes >= 0) +            return static_cast<size_t>(nbytes); +        if (zmq_errno() == EAGAIN) +            return {}; +        throw error_t(); +    } + +    send_result_t send(message_t &msg, send_flags flags) +    { +        int nbytes = zmq_msg_send(msg.handle(), _handle, static_cast<int>(flags)); +        if (nbytes >= 0) +            return static_cast<size_t>(nbytes); +        if (zmq_errno() == EAGAIN) +            return {}; +        throw error_t(); +    } + +    send_result_t send(message_t &&msg, send_flags flags) +    { +        return send(msg, flags); +    } +#endif + +#ifdef ZMQ_CPP11 +    ZMQ_DEPRECATED("from 4.3.1, use recv taking a mutable_buffer and recv_flags") +#endif +    size_t recv(void *buf_, size_t len_, int flags_ = 0) +    { +        int nbytes = zmq_recv(_handle, buf_, len_, flags_); +        if (nbytes >= 0) +            return static_cast<size_t>(nbytes); +        if (zmq_errno() == EAGAIN) +            return 0; +        throw error_t(); +    } + +#ifdef ZMQ_CPP11 +    ZMQ_DEPRECATED("from 4.3.1, use recv taking a reference to message_t and recv_flags") +#endif +    bool recv(message_t *msg_, int flags_ = 0) +    { +        int nbytes = zmq_msg_recv(msg_->handle(), _handle, flags_); +        if (nbytes >= 0) +            return true; +        if (zmq_errno() == EAGAIN) +            return false; +        throw error_t(); +    } + +#ifdef ZMQ_CPP11 +    ZMQ_NODISCARD +    recv_buffer_result_t recv(mutable_buffer buf, +                                    recv_flags flags = recv_flags::none) +    { +        const int nbytes = +          zmq_recv(_handle, buf.data(), buf.size(), static_cast<int>(flags)); +        if (nbytes >= 0) { +            return recv_buffer_size{(std::min)(static_cast<size_t>(nbytes), buf.size()), +                                 static_cast<size_t>(nbytes)}; +        } +        if (zmq_errno() == EAGAIN) +            return {}; +        throw error_t(); +    } + +    ZMQ_NODISCARD +    recv_result_t recv(message_t &msg, recv_flags flags = recv_flags::none) +    { +        const int nbytes = zmq_msg_recv(msg.handle(), _handle, static_cast<int>(flags)); +        if (nbytes >= 0) { +            assert(msg.size() == static_cast<size_t>(nbytes)); +            return static_cast<size_t>(nbytes); +        } +        if (zmq_errno() == EAGAIN) +            return {}; +        throw error_t(); +    } +#endif + +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) +    void join(const char* group) +    { +        int rc = zmq_join(_handle, group); +        if (rc != 0) +            throw error_t(); +    } + +    void leave(const char* group) +    { +        int rc = zmq_leave(_handle, group); +        if (rc != 0) +            throw error_t(); +    } +#endif + +    ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _handle; } +    ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _handle; } + +    ZMQ_EXPLICIT operator bool() const ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; } +    // note: non-const operator bool can be removed once +    // operator void* is removed from socket_t +    ZMQ_EXPLICIT operator bool() ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; } + +protected: +    void *_handle; +}; +} // namespace detail + +#ifdef ZMQ_CPP11 +enum class socket_type : int +{ +    req = ZMQ_REQ, +    rep = ZMQ_REP, +    dealer = ZMQ_DEALER, +    router = ZMQ_ROUTER, +    pub = ZMQ_PUB, +    sub = ZMQ_SUB, +    xpub = ZMQ_XPUB, +    xsub = ZMQ_XSUB, +    push = ZMQ_PUSH, +    pull = ZMQ_PULL, +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) +    server = ZMQ_SERVER, +    client = ZMQ_CLIENT, +    radio = ZMQ_RADIO, +    dish = ZMQ_DISH, +#endif +#if ZMQ_VERSION_MAJOR >= 4 +    stream = ZMQ_STREAM, +#endif +    pair = ZMQ_PAIR +}; +#endif + +struct from_handle_t +{ +    struct _private {}; // disabling use other than with from_handle +    ZMQ_CONSTEXPR_FN ZMQ_EXPLICIT from_handle_t(_private /*p*/) ZMQ_NOTHROW {} +}; + +ZMQ_CONSTEXPR_VAR from_handle_t from_handle = from_handle_t(from_handle_t::_private()); + +// A non-owning nullable reference to a socket. +// The reference is invalidated on socket close or destruction. +class socket_ref : public detail::socket_base +{ +  public: +    socket_ref() ZMQ_NOTHROW : detail::socket_base() {} +#ifdef ZMQ_CPP11 +    socket_ref(std::nullptr_t) ZMQ_NOTHROW : detail::socket_base() {} +#endif +    socket_ref(from_handle_t /*fh*/, void *handle) ZMQ_NOTHROW +        : detail::socket_base(handle) {} +}; + +#ifdef ZMQ_CPP11 +inline bool operator==(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW +{ +    return sr.handle() == nullptr; +} +inline bool operator==(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW +{ +    return sr.handle() == nullptr; +} +inline bool operator!=(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW +{ +    return !(sr == nullptr); +} +inline bool operator!=(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW +{ +    return !(sr == nullptr); +} +#endif + +inline bool operator==(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ +    return std::equal_to<void*>()(a.handle(), b.handle()); +} +inline bool operator!=(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ +    return !(a == b); +} +inline bool operator<(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ +    return std::less<void*>()(a.handle(), b.handle()); +} +inline bool operator>(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ +    return b < a; +} +inline bool operator<=(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ +    return !(a > b); +} +inline bool operator>=(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ +    return !(a < b); +} + +} // namespace zmq + +#ifdef ZMQ_CPP11 +namespace std +{ +template<> +struct hash<zmq::socket_ref> +{ +    size_t operator()(zmq::socket_ref sr) const ZMQ_NOTHROW +    { +        return hash<void*>()(sr.handle()); +    } +}; +} // namespace std +#endif + +namespace zmq +{ + +class socket_t : public detail::socket_base +{ +    friend class monitor_t; + +  public: +    socket_t() ZMQ_NOTHROW +      : detail::socket_base(ZMQ_NULLPTR) +      , ctxptr(ZMQ_NULLPTR) +    { +    } + +    socket_t(context_t &context_, int type_) +        : detail::socket_base(zmq_socket(static_cast<void*>(context_), type_)) +        , ctxptr(static_cast<void*>(context_)) +    { +        if (_handle == ZMQ_NULLPTR) +            throw error_t(); +    } + +#ifdef ZMQ_CPP11 +    socket_t(context_t &context_, socket_type type_) +        : socket_t(context_, static_cast<int>(type_)) +    { +    } +#endif + +#ifdef ZMQ_HAS_RVALUE_REFS +    socket_t(socket_t &&rhs) ZMQ_NOTHROW : detail::socket_base(rhs._handle), ctxptr(rhs.ctxptr) +    { +        rhs._handle = ZMQ_NULLPTR; +        rhs.ctxptr = ZMQ_NULLPTR; +    } +    socket_t &operator=(socket_t &&rhs) ZMQ_NOTHROW +    { +        close(); +        std::swap(_handle, rhs._handle); +        return *this; +    } +#endif + +    ~socket_t() ZMQ_NOTHROW { close(); } + +    operator void *() ZMQ_NOTHROW { return _handle; } + +    operator void const *() const ZMQ_NOTHROW { return _handle; } + +    void close() ZMQ_NOTHROW +    { +        if (_handle == ZMQ_NULLPTR) +            // already closed +            return; +        int rc = zmq_close(_handle); +        ZMQ_ASSERT(rc == 0); +        _handle = ZMQ_NULLPTR; +    } + +    void swap(socket_t &other) ZMQ_NOTHROW +    { +        std::swap(_handle, other._handle); +        std::swap(ctxptr, other.ctxptr); +    } + +    operator socket_ref() ZMQ_NOTHROW +    { +        return socket_ref(from_handle, _handle); +    } + +  private: +    void *ctxptr; + +    socket_t(const socket_t &) ZMQ_DELETED_FUNCTION; +    void operator=(const socket_t &) ZMQ_DELETED_FUNCTION; + +    // used by monitor_t +    socket_t(void *context_, int type_) +        : detail::socket_base(zmq_socket(context_, type_)) +        , ctxptr(context_) +    { +        if (_handle == ZMQ_NULLPTR) +            throw error_t(); +    } +}; + +inline void swap(socket_t &a, socket_t &b) ZMQ_NOTHROW { +    a.swap(b); +} + +ZMQ_DEPRECATED("from 4.3.1, use proxy taking socket_t objects") +inline void proxy(void *frontend, void *backend, void *capture) +{ +    int rc = zmq_proxy(frontend, backend, capture); +    if (rc != 0) +        throw error_t(); +} + +inline void +proxy(socket_ref frontend, socket_ref backend, socket_ref capture = socket_ref()) +{ +    int rc = zmq_proxy(frontend.handle(), backend.handle(), capture.handle()); +    if (rc != 0) +        throw error_t(); +} + +#ifdef ZMQ_HAS_PROXY_STEERABLE +ZMQ_DEPRECATED("from 4.3.1, use proxy_steerable taking socket_t objects") +inline void +proxy_steerable(void *frontend, void *backend, void *capture, void *control) +{ +    int rc = zmq_proxy_steerable(frontend, backend, capture, control); +    if (rc != 0) +        throw error_t(); +} + +inline void proxy_steerable(socket_ref frontend, +                            socket_ref backend, +                            socket_ref capture, +                            socket_ref control) +{ +    int rc = zmq_proxy_steerable(frontend.handle(), backend.handle(), +                                 capture.handle(), control.handle()); +    if (rc != 0) +        throw error_t(); +} +#endif + +class monitor_t +{ +  public: +    monitor_t() : _socket(), _monitor_socket() {} + +    virtual ~monitor_t() +    { +        close(); +    } + +#ifdef ZMQ_HAS_RVALUE_REFS +    monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : _socket(), _monitor_socket() +    { +        std::swap(_socket, rhs._socket); +        std::swap(_monitor_socket, rhs._monitor_socket); +    } + +    monitor_t &operator=(monitor_t &&rhs) ZMQ_NOTHROW +    { +        close(); +        _socket = socket_ref(); +        std::swap(_socket, rhs._socket); +        std::swap(_monitor_socket, rhs._monitor_socket); +        return *this; +    } +#endif + + +    void +    monitor(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL) +    { +        monitor(socket, addr.c_str(), events); +    } + +    void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) +    { +        init(socket, addr_, events); +        while (true) { +            check_event(-1); +        } +    } + +    void init(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL) +    { +        init(socket, addr.c_str(), events); +    } + +    void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) +    { +        int rc = zmq_socket_monitor(socket.handle(), addr_, events); +        if (rc != 0) +            throw error_t(); + +        _socket = socket; +        _monitor_socket = socket_t(socket.ctxptr, ZMQ_PAIR); +        _monitor_socket.connect(addr_); + +        on_monitor_started(); +    } + +    bool check_event(int timeout = 0) +    { +        assert(_monitor_socket); + +        zmq_msg_t eventMsg; +        zmq_msg_init(&eventMsg); + +        zmq::pollitem_t items[] = { +          {_monitor_socket.handle(), 0, ZMQ_POLLIN, 0}, +        }; + +        zmq::poll(&items[0], 1, timeout); + +        if (items[0].revents & ZMQ_POLLIN) { +            int rc = zmq_msg_recv(&eventMsg, _monitor_socket.handle(), 0); +            if (rc == -1 && zmq_errno() == ETERM) +                return false; +            assert(rc != -1); + +        } else { +            zmq_msg_close(&eventMsg); +            return false; +        } + +#if ZMQ_VERSION_MAJOR >= 4 +        const char *data = static_cast<const char *>(zmq_msg_data(&eventMsg)); +        zmq_event_t msgEvent; +        memcpy(&msgEvent.event, data, sizeof(uint16_t)); +        data += sizeof(uint16_t); +        memcpy(&msgEvent.value, data, sizeof(int32_t)); +        zmq_event_t *event = &msgEvent; +#else +        zmq_event_t *event = static_cast<zmq_event_t *>(zmq_msg_data(&eventMsg)); +#endif + +#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT +        zmq_msg_t addrMsg; +        zmq_msg_init(&addrMsg); +        int rc = zmq_msg_recv(&addrMsg, _monitor_socket.handle(), 0); +        if (rc == -1 && zmq_errno() == ETERM) { +            zmq_msg_close(&eventMsg); +            return false; +        } + +        assert(rc != -1); +        const char *str = static_cast<const char *>(zmq_msg_data(&addrMsg)); +        std::string address(str, str + zmq_msg_size(&addrMsg)); +        zmq_msg_close(&addrMsg); +#else +        // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. +        std::string address = event->data.connected.addr; +#endif + +#ifdef ZMQ_EVENT_MONITOR_STOPPED +        if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { +            zmq_msg_close(&eventMsg); +            return false; +        } + +#endif + +        switch (event->event) { +            case ZMQ_EVENT_CONNECTED: +                on_event_connected(*event, address.c_str()); +                break; +            case ZMQ_EVENT_CONNECT_DELAYED: +                on_event_connect_delayed(*event, address.c_str()); +                break; +            case ZMQ_EVENT_CONNECT_RETRIED: +                on_event_connect_retried(*event, address.c_str()); +                break; +            case ZMQ_EVENT_LISTENING: +                on_event_listening(*event, address.c_str()); +                break; +            case ZMQ_EVENT_BIND_FAILED: +                on_event_bind_failed(*event, address.c_str()); +                break; +            case ZMQ_EVENT_ACCEPTED: +                on_event_accepted(*event, address.c_str()); +                break; +            case ZMQ_EVENT_ACCEPT_FAILED: +                on_event_accept_failed(*event, address.c_str()); +                break; +            case ZMQ_EVENT_CLOSED: +                on_event_closed(*event, address.c_str()); +                break; +            case ZMQ_EVENT_CLOSE_FAILED: +                on_event_close_failed(*event, address.c_str()); +                break; +            case ZMQ_EVENT_DISCONNECTED: +                on_event_disconnected(*event, address.c_str()); +                break; +#ifdef ZMQ_BUILD_DRAFT_API +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) +            case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: +                on_event_handshake_failed_no_detail(*event, address.c_str()); +                break; +            case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: +                on_event_handshake_failed_protocol(*event, address.c_str()); +                break; +            case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: +                on_event_handshake_failed_auth(*event, address.c_str()); +                break; +            case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: +                on_event_handshake_succeeded(*event, address.c_str()); +                break; +#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) +            case ZMQ_EVENT_HANDSHAKE_FAILED: +                on_event_handshake_failed(*event, address.c_str()); +                break; +            case ZMQ_EVENT_HANDSHAKE_SUCCEED: +                on_event_handshake_succeed(*event, address.c_str()); +                break; +#endif +#endif +            default: +                on_event_unknown(*event, address.c_str()); +                break; +        } +        zmq_msg_close(&eventMsg); + +        return true; +    } + +#ifdef ZMQ_EVENT_MONITOR_STOPPED +    void abort() +    { +        if (_socket) +            zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); + +        _socket = socket_ref(); +    } +#endif +    virtual void on_monitor_started() {} +    virtual void on_event_connected(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_connect_delayed(const zmq_event_t &event_, +                                          const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_connect_retried(const zmq_event_t &event_, +                                          const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_listening(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_closed(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) +    virtual void on_event_handshake_failed_no_detail(const zmq_event_t &event_, +                                                     const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_handshake_failed_protocol(const zmq_event_t &event_, +                                                    const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_handshake_failed_auth(const zmq_event_t &event_, +                                                const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_handshake_succeeded(const zmq_event_t &event_, +                                              const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) +    virtual void on_event_handshake_failed(const zmq_event_t &event_, +                                           const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +    virtual void on_event_handshake_succeed(const zmq_event_t &event_, +                                            const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } +#endif +    virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_) +    { +        (void) event_; +        (void) addr_; +    } + +  private: +    monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION; +    void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION; + +    socket_ref _socket; +    socket_t _monitor_socket; + +    void close() ZMQ_NOTHROW +    { +        if (_socket) +            zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); +        _monitor_socket.close(); +    } +}; + +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + +// polling events +enum class event_flags : short +{ +    none = 0, +    pollin = ZMQ_POLLIN, +    pollout = ZMQ_POLLOUT, +    pollerr = ZMQ_POLLERR, +    pollpri = ZMQ_POLLPRI +}; + +constexpr event_flags operator|(event_flags a, event_flags b) noexcept +{ +    return detail::enum_bit_or(a, b); +} +constexpr event_flags operator&(event_flags a, event_flags b) noexcept +{ +    return detail::enum_bit_and(a, b); +} +constexpr event_flags operator^(event_flags a, event_flags b) noexcept +{ +    return detail::enum_bit_xor(a, b); +} +constexpr event_flags operator~(event_flags a) noexcept +{ +    return detail::enum_bit_not(a); +} + +struct no_user_data; + +// layout compatible with zmq_poller_event_t +template<class T = no_user_data> +struct poller_event +{ +    socket_ref socket; +#ifdef _WIN32 +    SOCKET fd; +#else +    int fd; +#endif +    T *user_data; +    event_flags events; +}; + +template<typename T = no_user_data> class poller_t +{ +  public: +    using event_type = poller_event<T>; + +    poller_t() : poller_ptr(zmq_poller_new()) +    { +        if (!poller_ptr) +            throw error_t(); +    } + +    template< +      typename Dummy = void, +      typename = +        typename std::enable_if<!std::is_same<T, no_user_data>::value, Dummy>::type> +    void add(zmq::socket_ref socket, event_flags events, T *user_data) +    { +        add_impl(socket, events, user_data); +    } + +    void add(zmq::socket_ref socket, event_flags events) +    { +        add_impl(socket, events, nullptr); +    } + +    void remove(zmq::socket_ref socket) +    { +        if (0 != zmq_poller_remove(poller_ptr.get(), socket.handle())) { +            throw error_t(); +        } +    } + +    void modify(zmq::socket_ref socket, event_flags events) +    { +        if (0 +            != zmq_poller_modify(poller_ptr.get(), socket.handle(), +                                 static_cast<short>(events))) { +            throw error_t(); +        } +    } + +    size_t wait_all(std::vector<event_type> &poller_events, +                    const std::chrono::milliseconds timeout) +    { +        int rc = zmq_poller_wait_all( +          poller_ptr.get(), +          reinterpret_cast<zmq_poller_event_t *>(poller_events.data()), +          static_cast<int>(poller_events.size()), +          static_cast<long>(timeout.count())); +        if (rc > 0) +            return static_cast<size_t>(rc); + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) +        if (zmq_errno() == EAGAIN) +#else +        if (zmq_errno() == ETIMEDOUT) +#endif +            return 0; + +        throw error_t(); +    } + +  private: +    struct destroy_poller_t +    { +        void operator()(void *ptr) noexcept +        { +            int rc = zmq_poller_destroy(&ptr); +            ZMQ_ASSERT(rc == 0); +        } +    }; + +    std::unique_ptr<void, destroy_poller_t> poller_ptr; + +    void add_impl(zmq::socket_ref socket, event_flags events, T *user_data) +    { +        if (0 +            != zmq_poller_add(poller_ptr.get(), socket.handle(), +                              user_data, static_cast<short>(events))) { +            throw error_t(); +        } +    } +}; +#endif //  defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + +inline std::ostream &operator<<(std::ostream &os, const message_t &msg) +{ +    return os << msg.str(); +} + +} // namespace zmq + +#endif // __ZMQ_HPP_INCLUDED__ diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index 33194b2..51266af 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -28,6 +28,7 @@  #include "Log.h"  #include "PcDebug.h"  #include "TimestampDecoder.h" +#include "edi/common.hpp"  #include <stdexcept>  #include <memory> @@ -544,6 +545,10 @@ void EdiTransport::Open(const std::string& uri)      const string proto = uri.substr(0, 3);      if (proto == "udp") { +        if (m_proto == Proto::TCP) { +            throw std::invalid_argument("Cannot specify both TCP and UDP urls"); +        } +          size_t found_port = uri.find_first_of(":", 6);          if (found_port == string::npos) {              throw std::invalid_argument("EDI UDP input port must be provided"); @@ -565,17 +570,15 @@ void EdiTransport::Open(const std::string& uri)          etiLog.level(info) << "EDI UDP input: host:" << m_bindto <<              ", source:" << m_mcastaddr << ", port:" << m_port; -        // The max_fragments_queued is only a protection against a runaway -        // memory usage. -        // Rough calculation: -        // 300 seconds, 24ms per frame, up to 20 fragments per frame -        const size_t max_fragments_queued = 20 * 300 * 1000 / 24; - -        m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); +        m_udp_rx.add_receive_port(m_port, m_bindto, m_mcastaddr);          m_proto = Proto::UDP;          m_enabled = true;      }      else if (proto == "tcp") { +        if (m_proto != Proto::Unspecified) { +            throw std::invalid_argument("Cannot call Open several times with TCP"); +        } +          size_t found_port = uri.find_first_of(":", 6);          if (found_port == string::npos) {              throw std::invalid_argument("EDI TCP input port must be provided"); @@ -598,16 +601,47 @@ void EdiTransport::Open(const std::string& uri)  bool EdiTransport::rxPacket()  {      switch (m_proto) { +        case Proto::Unspecified: +            { +                etiLog.level(warn) << "EDI receiving from uninitialised socket"; +                return false; +            }          case Proto::UDP:              { -                auto udp_data = m_udp_rx.get_packet_buffer(); - -                if (udp_data.empty()) { +                Socket::InetAddress received_from; +                try { +                    auto received_packets = m_udp_rx.receive(100); +                    for (auto rp : received_packets) { +                        received_from = rp.received_from; + +                        EdiDecoder::Packet p; +                        p.buf = move(rp.packetdata); +                        p.received_on_port = rp.port_received_on; +                        m_decoder.push_packet(p); +                    } +                    return true; +                } +                catch (const Socket::UDPReceiver::Timeout&) {                      return false;                  } +                catch (const Socket::UDPReceiver::Interrupted&) { +                    return false; +                } +                catch (const invalid_argument& e) { +                    try { +                        fprintf(stderr, "Invalid argument receiving EDI from %s: %s\n", +                                received_from.to_string().c_str(), e.what()); +                    } +                    catch (const invalid_argument& ee) { +                        fprintf(stderr, "Invalid argument receiving EDI %s\n", e.what()); +                        fprintf(stderr, "Invalid argument converting source address %s\n", ee.what()); +                    } +                } +                catch (const runtime_error& e) { +                    fprintf(stderr, "Runtime error UDP Receive: %s\n", e.what()); +                } -                m_decoder.push_packet(udp_data); -                return true; +                return false;              }          case Proto::TCP:              { @@ -648,4 +682,3 @@ EdiInput::EdiInput(double& tist_offset_s, float edi_max_delay_ms) :          decoder.setMaxDelay(lroundf(edi_max_delay_ms / 24.0f));      }  } - diff --git a/src/EtiReader.h b/src/EtiReader.h index be3dd27..d97acf6 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -201,13 +201,13 @@ private:  };  /* The EDI input does not use the inputs defined in InputReader.h, as they were - * designed for ETI. It uses the EdiTransport which in turn uses a threaded - * receiver. + * designed for ETI.   */  class EdiTransport {      public:          EdiTransport(EdiDecoder::ETIDecoder& decoder); +        /* Can be called once when using TCP, or several times when using UDP */          void Open(const std::string& uri);          bool isEnabled(void) const { return m_enabled; } @@ -224,8 +224,8 @@ class EdiTransport {          std::string m_bindto;          std::string m_mcastaddr; -        enum class Proto { UDP, TCP }; -        Proto m_proto; +        enum class Proto { Unspecified, UDP, TCP }; +        Proto m_proto = Proto::Unspecified;          Socket::UDPReceiver m_udp_rx;          std::vector<uint8_t> m_tcpbuffer;          Socket::TCPClient m_tcpclient; diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 995e4b0..40a07d4 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -198,7 +198,7 @@ void InputZeroMQReader::RecvProcess()                  continue;              } -            subscriber.recv(&incoming); +            subscriber.recv(incoming);              if (queue_size < m_max_queued_frames) {                  if (incoming.size() < ZMQ_DAB_MESSAGE_T_HEADERSIZE) { diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 69f4aa1..373081b 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -68,10 +68,10 @@ int OutputZeroMQ::process(Buffer* dataIn)      if (m_type == ZMQ_REP) {          // A ZMQ_REP socket requires a request first          zmq::message_t msg; -        m_zmq_sock.recv(&msg); +        m_zmq_sock.recv(msg, zmq::recv_flags::none);      } -    m_zmq_sock.send(dataIn->getData(), dataIn->getLength()); +    m_zmq_sock.send(zmq::const_buffer{dataIn->getData(), dataIn->getLength()});      return dataIn->getLength();  } diff --git a/src/zmq.hpp b/src/zmq.hpp deleted file mode 100644 index eb5416e..0000000 --- a/src/zmq.hpp +++ /dev/null @@ -1,602 +0,0 @@ -/* -    Copyright (c) 2009-2011 250bpm s.r.o. -    Copyright (c) 2011 Botond Ballo -    Copyright (c) 2007-2009 iMatix Corporation - -    Permission is hereby granted, free of charge, to any person obtaining a copy -    of this software and associated documentation files (the "Software"), to -    deal in the Software without restriction, including without limitation the -    rights to use, copy, modify, merge, publish, distribute, sublicense, and/or -    sell copies of the Software, and to permit persons to whom the Software is -    furnished to do so, subject to the following conditions: - -    The above copyright notice and this permission notice shall be included in -    all copies or substantial portions of the Software. - -    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -    IN THE SOFTWARE. -*/ - -#ifndef __ZMQ_HPP_INCLUDED__ -#define __ZMQ_HPP_INCLUDED__ - -#include <zmq.h> - -#include <algorithm> -#include <cassert> -#include <cstring> -#include <string> -#include <exception> - -//  Detect whether the compiler supports C++11 rvalue references. -#if (defined(__GNUC__) && (__GNUC__ > 4 || \ -      (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \ -      defined(__GXX_EXPERIMENTAL_CXX0X__)) -    #define ZMQ_HAS_RVALUE_REFS -    #define ZMQ_DELETED_FUNCTION = delete -#elif defined(__clang__) -    #if __has_feature(cxx_rvalue_references) -        #define ZMQ_HAS_RVALUE_REFS -    #endif - -    #if __has_feature(cxx_deleted_functions) -        #define ZMQ_DELETED_FUNCTION = delete -    #else -        #define ZMQ_DELETED_FUNCTION -    #endif -#elif defined(_MSC_VER) && (_MSC_VER >= 1600) -    #define ZMQ_HAS_RVALUE_REFS -    #define ZMQ_DELETED_FUNCTION -#else -    #define ZMQ_DELETED_FUNCTION -#endif - -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0) -#define ZMQ_NEW_MONITOR_EVENT_LAYOUT -#endif - -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) -#define ZMQ_HAS_PROXY_STEERABLE -/*  Socket event data  */ -typedef struct { -    uint16_t event;  // id of the event as bitfield -    int32_t  value ; // value is either error code, fd or reconnect interval -} zmq_event_t; -#endif - -// In order to prevent unused variable warnings when building in non-debug -// mode use this macro to make assertions. -#ifndef NDEBUG -#   define ZMQ_ASSERT(expression) assert(expression) -#else -#   define ZMQ_ASSERT(expression) (void)(expression) -#endif - -namespace zmq -{ - -    typedef zmq_free_fn free_fn; -    typedef zmq_pollitem_t pollitem_t; - -    class error_t : public std::exception -    { -    public: - -        error_t () : errnum (zmq_errno ()) {} - -        virtual const char *what () const throw () -        { -            return zmq_strerror (errnum); -        } - -        int num () const -        { -            return errnum; -        } - -    private: - -        int errnum; -    }; - -    inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) -    { -        int rc = zmq_poll (items_, nitems_, timeout_); -        if (rc < 0) -            throw error_t (); -        return rc; -    } - -    inline void proxy (void *frontend, void *backend, void *capture) -    { -        int rc = zmq_proxy (frontend, backend, capture); -        if (rc != 0) -            throw error_t (); -    } -     -#ifdef ZMQ_HAS_PROXY_STEERABLE -    inline void proxy_steerable (void *frontend, void *backend, void *capture, void *control) -    { -        int rc = zmq_proxy_steerable (frontend, backend, capture, control); -        if (rc != 0) -            throw error_t (); -    } -#endif -     -    inline void version (int *major_, int *minor_, int *patch_) -    { -        zmq_version (major_, minor_, patch_); -    } - -    class message_t -    { -        friend class socket_t; - -    public: - -        inline message_t () -        { -            int rc = zmq_msg_init (&msg); -            if (rc != 0) -                throw error_t (); -        } - -        inline explicit message_t (size_t size_) -        { -            int rc = zmq_msg_init_size (&msg, size_); -            if (rc != 0) -                throw error_t (); -        } - -        inline message_t (void *data_, size_t size_, free_fn *ffn_, -            void *hint_ = NULL) -        { -            int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); -            if (rc != 0) -                throw error_t (); -        } - -#ifdef ZMQ_HAS_RVALUE_REFS -        inline message_t (message_t &&rhs) : msg (rhs.msg) -        { -            int rc = zmq_msg_init (&rhs.msg); -            if (rc != 0) -                throw error_t (); -        } - -        inline message_t &operator = (message_t &&rhs) -        { -            std::swap (msg, rhs.msg); -            return *this; -        } -#endif - -        inline ~message_t () -        { -            int rc = zmq_msg_close (&msg); -            ZMQ_ASSERT (rc == 0); -        } - -        inline void rebuild () -        { -            int rc = zmq_msg_close (&msg); -            if (rc != 0) -                throw error_t (); -            rc = zmq_msg_init (&msg); -            if (rc != 0) -                throw error_t (); -        } - -        inline void rebuild (size_t size_) -        { -            int rc = zmq_msg_close (&msg); -            if (rc != 0) -                throw error_t (); -            rc = zmq_msg_init_size (&msg, size_); -            if (rc != 0) -                throw error_t (); -        } - -        inline void rebuild (void *data_, size_t size_, free_fn *ffn_, -            void *hint_ = NULL) -        { -            int rc = zmq_msg_close (&msg); -            if (rc != 0) -                throw error_t (); -            rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); -            if (rc != 0) -                throw error_t (); -        } - -        inline void move (message_t *msg_) -        { -            int rc = zmq_msg_move (&msg, &(msg_->msg)); -            if (rc != 0) -                throw error_t (); -        } - -        inline void copy (message_t *msg_) -        { -            int rc = zmq_msg_copy (&msg, &(msg_->msg)); -            if (rc != 0) -                throw error_t (); -        } - -        inline bool more () -        { -            int rc = zmq_msg_more (&msg); -            return rc != 0; -        } - -        inline void *data () -        { -            return zmq_msg_data (&msg); -        } - -        inline const void* data () const -        { -            return zmq_msg_data (const_cast<zmq_msg_t*>(&msg)); -        } - -        inline size_t size () const -        { -            return zmq_msg_size (const_cast<zmq_msg_t*>(&msg)); -        } - -    private: - -        //  The underlying message -        zmq_msg_t msg; - -        //  Disable implicit message copying, so that users won't use shared -        //  messages (less efficient) without being aware of the fact. -        message_t (const message_t&); -        void operator = (const message_t&); -    }; - -    class context_t -    { -        friend class socket_t; - -    public: -        inline context_t () -        { -            ptr = zmq_ctx_new (); -            if (ptr == NULL) -                throw error_t (); -        } - - -        inline explicit context_t (int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) -        { -            ptr = zmq_ctx_new (); -            if (ptr == NULL) -                throw error_t (); - -            int rc = zmq_ctx_set (ptr, ZMQ_IO_THREADS, io_threads_); -            ZMQ_ASSERT (rc == 0); - -            rc = zmq_ctx_set (ptr, ZMQ_MAX_SOCKETS, max_sockets_); -            ZMQ_ASSERT (rc == 0); -        } - -#ifdef ZMQ_HAS_RVALUE_REFS -        inline context_t (context_t &&rhs) : ptr (rhs.ptr) -        { -            rhs.ptr = NULL; -        } -        inline context_t &operator = (context_t &&rhs) -        { -            std::swap (ptr, rhs.ptr); -            return *this; -        } -#endif - -        inline ~context_t () -        { -            close(); -        } - -        inline void close() -        { -            if (ptr == NULL) -                return; -            int rc = zmq_ctx_destroy (ptr); -            ZMQ_ASSERT (rc == 0); -            ptr = NULL; -        } - -        //  Be careful with this, it's probably only useful for -        //  using the C api together with an existing C++ api. -        //  Normally you should never need to use this. -        inline operator void* () -        { -            return ptr; -        } - -    private: - -        void *ptr; - -        context_t (const context_t&); -        void operator = (const context_t&); -    }; - -    class socket_t -    { -        friend class monitor_t; -    public: - -        inline socket_t (context_t &context_, int type_) -        { -            ctxptr = context_.ptr; -            ptr = zmq_socket (context_.ptr, type_); -            if (ptr == NULL) -                throw error_t (); -        } - -#ifdef ZMQ_HAS_RVALUE_REFS -        inline socket_t(socket_t&& rhs) : ptr(rhs.ptr) -        { -            rhs.ptr = NULL; -        } -        inline socket_t& operator=(socket_t&& rhs) -        { -            std::swap(ptr, rhs.ptr); -            return *this; -        } -#endif - -        inline ~socket_t () -        { -            close(); -        } - -        inline operator void* () -        { -            return ptr; -        } - -        inline void close() -        { -            if(ptr == NULL) -                // already closed -                return ; -            int rc = zmq_close (ptr); -            ZMQ_ASSERT (rc == 0); -            ptr = 0 ; -        } - -        inline void setsockopt (int option_, const void *optval_, -            size_t optvallen_) -        { -            int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); -            if (rc != 0) -                throw error_t (); -        } - -        inline void getsockopt (int option_, void *optval_, -            size_t *optvallen_) -        { -            int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); -            if (rc != 0) -                throw error_t (); -        } -         -        inline void bind (const char *addr_) -        { -            int rc = zmq_bind (ptr, addr_); -            if (rc != 0) -                throw error_t (); -        } - -        inline void unbind (const char *addr_) -        { -            int rc = zmq_unbind (ptr, addr_); -            if (rc != 0) -                throw error_t (); -        } - -        inline void connect (const char *addr_) -        { -            int rc = zmq_connect (ptr, addr_); -            if (rc != 0) -                throw error_t (); -        } - -        inline void disconnect (const char *addr_) -        { -            int rc = zmq_disconnect (ptr, addr_); -            if (rc != 0) -                throw error_t (); -        } - -        inline bool connected() -        { -            return(ptr != NULL); -        } -         -        inline size_t send (const void *buf_, size_t len_, int flags_ = 0) -        { -            int nbytes = zmq_send (ptr, buf_, len_, flags_); -            if (nbytes >= 0) -                return (size_t) nbytes; -            if (zmq_errno () == EAGAIN) -                return 0; -            throw error_t (); -        } - -        inline bool send (message_t &msg_, int flags_ = 0) -        { -            int nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_); -            if (nbytes >= 0) -                return true; -            if (zmq_errno () == EAGAIN) -                return false; -            throw error_t (); -        } - -#ifdef ZMQ_HAS_RVALUE_REFS -        inline bool send (message_t &&msg_, int flags_ = 0) -        { -            return send(msg_, flags_); -        } -#endif - -        inline size_t recv (void *buf_, size_t len_, int flags_ = 0) -        { -            int nbytes = zmq_recv (ptr, buf_, len_, flags_); -            if (nbytes >= 0) -                return (size_t) nbytes; -            if (zmq_errno () == EAGAIN) -                return 0; -            throw error_t (); -        } - -        inline bool recv (message_t *msg_, int flags_ = 0) -        { -            int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_); -            if (nbytes >= 0) -                return true; -            if (zmq_errno () == EAGAIN) -                return false; -            throw error_t (); -        } -         -    private: -        void *ptr; -        void *ctxptr; - -        socket_t (const socket_t&) ZMQ_DELETED_FUNCTION; -        void operator = (const socket_t&) ZMQ_DELETED_FUNCTION; -    }; - -    class monitor_t -    { -    public: -        monitor_t() : socketPtr(NULL) {} -        virtual ~monitor_t() {} - -        void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) -        { -            int rc = zmq_socket_monitor(socket.ptr, addr_, events); -            if (rc != 0) -                throw error_t (); - -            socketPtr = socket.ptr; -            void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR); -            assert (s); - -            rc = zmq_connect (s, addr_); -            assert (rc == 0); -             -            on_monitor_started(); -             -            while (true) { -                zmq_msg_t eventMsg; -                zmq_msg_init (&eventMsg); -                rc = zmq_recvmsg (s, &eventMsg, 0); -                if (rc == -1 && zmq_errno() == ETERM) -                    break; -                assert (rc != -1); -#if ZMQ_VERSION_MAJOR >= 4 -                const char* data = static_cast<const char*>(zmq_msg_data(&eventMsg)); -                zmq_event_t msgEvent; -                memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); -                memcpy(&msgEvent.value, data, sizeof(int32_t)); -                zmq_event_t* event = &msgEvent; -#else -                zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg)); -#endif -                 -#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT -                zmq_msg_t addrMsg; -                zmq_msg_init (&addrMsg); -                rc = zmq_recvmsg (s, &addrMsg, 0); -                if (rc == -1 && zmq_errno() == ETERM) -                    break; -                assert (rc != -1); -                const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg)); -                std::string address(str, str + zmq_msg_size(&addrMsg)); -                zmq_msg_close (&addrMsg); -#else -                // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. -                std::string address = event->data.connected.addr; -#endif - -#ifdef ZMQ_EVENT_MONITOR_STOPPED -                if (event->event == ZMQ_EVENT_MONITOR_STOPPED) -                    break; -#endif - -                switch (event->event) { -                case ZMQ_EVENT_CONNECTED: -                    on_event_connected(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_CONNECT_DELAYED: -                    on_event_connect_delayed(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_CONNECT_RETRIED: -                    on_event_connect_retried(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_LISTENING: -                    on_event_listening(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_BIND_FAILED: -                    on_event_bind_failed(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_ACCEPTED: -                    on_event_accepted(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_ACCEPT_FAILED: -                    on_event_accept_failed(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_CLOSED: -                    on_event_closed(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_CLOSE_FAILED: -                    on_event_close_failed(*event, address.c_str()); -                    break; -                case ZMQ_EVENT_DISCONNECTED: -                    on_event_disconnected(*event, address.c_str()); -                    break; -                default: -                    on_event_unknown(*event, address.c_str()); -                    break; -                } -                zmq_msg_close (&eventMsg); -            } -            zmq_close (s); -            socketPtr = NULL; -        } - -#ifdef ZMQ_EVENT_MONITOR_STOPPED -        void abort() -        { -            if (socketPtr) -                zmq_socket_monitor(socketPtr, NULL, 0); -        } -#endif -        virtual void on_monitor_started() {} -        virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -        virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } -    private: -        void* socketPtr; -    }; -} - -#endif | 
