From 12670a017ddb14fbf4a932799051dcfe21dd6c78 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 15 Jan 2021 07:09:03 +0100 Subject: Common 6b5db53: Update zmq.hpp, TCPReceiveServer, EDI decoder and output --- doc/advanced.mux | 19 +- lib/RemoteControl.cpp | 18 +- lib/Socket.cpp | 12 +- lib/Socket.h | 23 +- lib/edi/PFT.cpp | 40 + lib/edi/PFT.hpp | 7 +- lib/edi/STIDecoder.cpp | 18 +- lib/edi/STIDecoder.hpp | 7 +- lib/edi/STIWriter.cpp | 1 + lib/edi/STIWriter.hpp | 1 + lib/edi/common.cpp | 31 +- lib/edi/common.hpp | 16 +- lib/edioutput/EDIConfig.h | 3 +- lib/edioutput/PFT.cpp | 1 - lib/edioutput/PFT.h | 5 +- lib/edioutput/TagItems.cpp | 6 +- lib/edioutput/TagItems.h | 2 +- lib/edioutput/Transport.cpp | 6 +- lib/zmq.hpp | 2408 +++++++++++++++++++++++++++++++++++-------- src/DabMux.cpp | 18 +- src/input/Edi.cpp | 22 +- src/input/Zmq.cpp | 8 +- src/zmq2edi/zmq2edi.cpp | 20 +- 23 files changed, 2153 insertions(+), 539 deletions(-) diff --git a/doc/advanced.mux b/doc/advanced.mux index b7ff69e..3aa3a22 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -389,19 +389,24 @@ outputs { ; example for unicast EDI over UDP ; for unicast EDI, do not set source protocol udp - destination "192.168.23.23" sourceport 13000 + destination "192.168.23.23" + port 12000 + + ; For compatibility: if port is not specified in the destination itself, + ; it is taken from the parent 'destinations' block. } example_multicast { ; example for multicast EDI, the source IP is required ; so that the data is sent on the correct ethernet interface protocol udp - destination "232.20.10.1" source "192.168.0.50" + sourceport 13000 + destination "232.20.10.1" + port 12000 ; The multicast TTL has to be adapted according to your network ttl 1 - sourceport 13000 } example_tcp { ; example for EDI TCP server. TCP is reliable, so it is counterproductive to @@ -420,19 +425,11 @@ outputs { } ; The settings below apply to all destinations - ; The destination port cannot be set independently for - ; different outputs because it is encoded in the transport - ; header of the PFT layer. - ; Necessary when using UDP, optional when only using TCP. - port 12000 ; Enable the PFT subsystem. If false, AFPackets are sent. ; PFT is not necessary when using TCP. enable_pft false - enable_transport_addressing true - ; Enables the Addr flag in PFT and sets Source and Dest fields. - ; How many lost fragments can be recovered by Reed-Solomon. ; Requires enable_pft true. ; diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp index 4adb90c..9ca8d22 100644 --- a/lib/RemoteControl.cpp +++ b/lib/RemoteControl.cpp @@ -29,6 +29,7 @@ #include #include "RemoteControl.h" +#include "zmq.hpp" using namespace std; @@ -424,7 +425,7 @@ void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector 0) ? ZMQ_SNDMORE : 0; - repSocket.send(zmsg, flag); + repSocket.send(zmsg, (--cohort_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none); } } else if (msg.size() == 2 && command == "show") { @@ -523,8 +523,7 @@ void RemoteControllerZmq::process() zmq::message_t zmsg(ss.str().size()); memcpy(zmsg.data(), ss.str().data(), ss.str().size()); - int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(zmsg, flag); + repSocket.send(zmsg, (--r_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none); } } catch (const ParameterError &err) { @@ -539,7 +538,7 @@ void RemoteControllerZmq::process() std::string value = rcs.get_param(module, parameter); zmq::message_t zmsg(value.size()); memcpy ((void*) zmsg.data(), value.data(), value.size()); - repSocket.send(zmsg, 0); + repSocket.send(zmsg, zmq::send_flags::none); } catch (const ParameterError &err) { send_fail_reply(repSocket, err.what()); @@ -576,4 +575,3 @@ void RemoteControllerZmq::process() } #endif - diff --git a/lib/Socket.cpp b/lib/Socket.cpp index d41ed1c..6a20429 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -862,9 +862,9 @@ TCPReceiveServer::~TCPReceiveServer() } } -vector TCPReceiveServer::receive() +shared_ptr TCPReceiveServer::receive() { - vector buffer; + shared_ptr buffer = make_shared(); m_queue.try_pop(buffer); // we can ignore try_pop()'s return value, because @@ -892,11 +892,12 @@ void TCPReceiveServer::process() } else if (r == 0) { sock.close(); + m_queue.push(make_shared()); break; } else { buf.resize(r); - m_queue.push(move(buf)); + m_queue.push(make_shared(move(buf))); } } catch (const TCPSocket::Interrupted&) { @@ -905,6 +906,11 @@ void TCPReceiveServer::process() catch (const TCPSocket::Timeout&) { num_timeouts++; } + catch (const runtime_error& e) { + sock.close(); + // TODO replace fprintf + fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); + } if (num_timeouts > max_num_timeouts) { sock.close(); diff --git a/lib/Socket.h b/lib/Socket.h index 8881be3..2291dd5 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -30,11 +30,12 @@ #include "ThreadsafeQueue.h" #include -#include -#include #include -#include +#include #include +#include +#include +#include #include #include @@ -265,6 +266,14 @@ class TCPDataDispatcher std::list m_connections; }; +struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; }; +struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { }; +struct TCPReceiveMessageEmpty : public TCPReceiveMessage { }; +struct TCPReceiveMessageData : public TCPReceiveMessage { + TCPReceiveMessageData(std::vector d) : data(d) {}; + std::vector data; +}; + /* A TCP Server to receive data, which abstracts the handling of connects and disconnects. */ class TCPReceiveServer { @@ -276,15 +285,15 @@ class TCPReceiveServer { void start(int listen_port, const std::string& address); - // Return a vector that contains up to blocksize bytes of data, or - // and empty vector if no data is available. - std::vector receive(); + // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize + // bytes of data, or TCPReceiveMessageEmpty if no data is available. + std::shared_ptr receive(); private: void process(); size_t m_blocksize = 0; - ThreadsafeQueue > m_queue; + ThreadsafeQueue > m_queue; std::atomic m_running = ATOMIC_VAR_INIT(false); std::string m_exception_data; std::thread m_listener_thread; diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp index 158b206..85d6b63 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edi/PFT.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -108,12 +109,19 @@ class FECDecoder { }; size_t Fragment::loadData(const std::vector &buf) +{ + return loadData(buf, 0); +} + +size_t Fragment::loadData(const std::vector &buf, int received_on_port) { const size_t header_len = 14; if (buf.size() < header_len) { return 0; } + this->received_on_port = received_on_port; + size_t index = 0; // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1) @@ -461,6 +469,32 @@ std::string AFBuilder::visualise() const return ss.str(); } +std::string AFBuilder::visualise_fragment_origins() const +{ + stringstream ss; + if (_fragments.size() == 0) { + return "No fragments"; + } + else { + ss << _fragments.size() << " fragments: "; + } + + std::map port_count; + + for (const auto& f : _fragments) { + port_count[f.second.received_on_port]++; + } + + for (const auto& p : port_count) { + ss << "p" << p.first << " " << + std::round(100.0 * ((double)p.second) / (double)_fragments.size()) << "% "; + } + + ss << "\n"; + + return ss.str(); +} + void PFT::pushPFTFrag(const Fragment &fragment) { // Start decoding the first pseq we receive. In normal @@ -518,6 +552,9 @@ std::vector PFT::getNextAFPacket() if (builder.canAttemptToDecode() == dar_t::yes) { auto afpacket = builder.extractAF(); assert(not afpacket.empty()); + if (m_verbose) { + etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); + } incrementNextPseq(); return afpacket; } @@ -533,6 +570,9 @@ std::vector PFT::getNextAFPacket() if (afpacket.empty()) { etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq); } + if (m_verbose) { + etiLog.level(debug) << "Fragment origin stats: " << builder.visualise_fragment_origins(); + } incrementNextPseq(); return afpacket; } diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp index 208fd70..08dca45 100644 --- a/lib/edi/PFT.hpp +++ b/lib/edi/PFT.hpp @@ -36,11 +36,14 @@ using findex_t = uint32_t; // findex is a 24-bit value class Fragment { public: + int received_on_port = 0; + // Load the data for one fragment from buf into // the Fragment. // \returns the number of bytes of useful data found in buf // A non-zero return value doesn't imply a valid fragment // the isValid() method must be used to verify this. + size_t loadData(const std::vector &buf, int received_on_port); size_t loadData(const std::vector &buf); bool isValid() const { return _valid; } @@ -111,7 +114,9 @@ class AFBuilder return {_fragments.size(), _Fcount}; } - std::string visualise(void) const; + std::string visualise() const; + + std::string visualise_fragment_origins() const; /* The user of this instance can keep track of the lifetime of this * builder diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp index b6b9878..99f7c11 100644 --- a/lib/edi/STIDecoder.cpp +++ b/lib/edi/STIDecoder.cpp @@ -60,9 +60,9 @@ void STIDecoder::push_bytes(const vector &buf) m_dispatcher.push_bytes(buf); } -void STIDecoder::push_packet(const vector &buf) +void STIDecoder::push_packet(Packet &pack) { - m_dispatcher.push_packet(buf); + m_dispatcher.push_packet(pack); } void STIDecoder::setMaxDelay(int num_af_packets) @@ -107,7 +107,7 @@ bool STIDecoder::decode_dsti(const std::vector& value, const tag_name_t uint8_t dfcth = (dstiHeader >> 8) & 0x1F; uint8_t dfctl = dstiHeader & 0xFF; - md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter + md.dlfc = dfcth * 250 + dfctl; // modulo 5000 counter const size_t expected_length = 2 + (md.stihf ? 3 : 0) + @@ -115,10 +115,14 @@ bool STIDecoder::decode_dsti(const std::vector& value, const tag_name_t (md.rfadf ? 9 : 0); if (value.size() != expected_length) { - throw std::runtime_error("EDI dsti: decoding error:" - "value.size() != expected_length: " + - to_string(value.size()) + " " + - to_string(expected_length)); + etiLog.level(warn) << "EDI dsti: decoding error: " << + "value.size() != expected_length: " << + value.size() << " " << + expected_length << " " << + (md.stihf ? "STIHF " : " ") << + (md.atstf ? "ATSTF " : " ") << + (md.rfadf ? "RFADF " : " "); + return false; } if (md.stihf) { diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp index 9d55728..28887f2 100644 --- a/lib/edi/STIDecoder.hpp +++ b/lib/edi/STIDecoder.hpp @@ -34,7 +34,7 @@ struct sti_management_data { bool stihf; bool atstf; bool rfadf; - uint16_t dflc; + uint16_t dlfc; uint32_t tsta; }; @@ -104,14 +104,15 @@ class STIDecoder { /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams - * (files, TCP) + * (files, TCP). Pushing an empty buf will clear the internal decoder + * state to ensure realignment (e.g. on stream reconnection) */ void push_bytes(const std::vector &buf); /* Push a complete packet into the decoder. Useful for UDP and other * datagram-oriented protocols. */ - void push_packet(const std::vector &buf); + void push_packet(Packet &pack); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp index a7e4f20..29f0124 100644 --- a/lib/edi/STIWriter.cpp +++ b/lib/edi/STIWriter.cpp @@ -123,6 +123,7 @@ void STIWriter::assemble() // TODO check time validity sti_frame_t stiFrame; + stiFrame.dlfc = m_management_data.dlfc; stiFrame.frame = move(m_payload.istd); stiFrame.timestamp.seconds = m_seconds; stiFrame.timestamp.utco = m_utco; diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp index fc08e97..a7a5cda 100644 --- a/lib/edi/STIWriter.hpp +++ b/lib/edi/STIWriter.hpp @@ -32,6 +32,7 @@ namespace EdiDecoder { struct sti_frame_t { std::vector frame; + uint16_t dlfc; frame_timestamp_t timestamp; audio_level_data audio_levels; odr_version_data version_data; diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 306261a..7907656 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -22,6 +22,7 @@ #include "buffer_unpack.hpp" #include "Log.h" #include "crc.h" +#include #include #include #include @@ -142,6 +143,12 @@ void TagDispatcher::set_verbose(bool verbose) void TagDispatcher::push_bytes(const vector &buf) { + if (buf.empty()) { + m_input_data.clear(); + m_last_seq_valid = false; + return; + } + copy(buf.begin(), buf.end(), back_inserter(m_input_data)); while (m_input_data.size() > 2) { @@ -194,14 +201,16 @@ void TagDispatcher::push_bytes(const vector &buf) } } else { - etiLog.log(warn,"Unknown %c!", *m_input_data.data()); + etiLog.log(warn, "Unknown 0x%02x!", *m_input_data.data()); m_input_data.erase(m_input_data.begin()); } } } -void TagDispatcher::push_packet(const vector &buf) +void TagDispatcher::push_packet(const Packet &packet) { + auto& buf = packet.buf; + if (buf.size() < 2) { throw std::invalid_argument("Not enough bytes to read EDI packet header"); } @@ -216,7 +225,7 @@ void TagDispatcher::push_packet(const vector &buf) } else if (buf[0] == 'P' and buf[1] == 'F') { PFT::Fragment fragment; - fragment.loadData(buf); + fragment.loadData(buf, packet.received_on_port); if (fragment.isValid()) { m_pft.pushPFTFrag(fragment); @@ -232,11 +241,10 @@ void TagDispatcher::push_packet(const vector &buf) } } else { - const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'}; std::stringstream ss; - ss << "Unknown EDI packet "; - ss << packettype; - throw std::invalid_argument(ss.str()); + ss << "Unknown EDI packet " << std::hex << (int)buf[0] << " " << (int)buf[1]; + m_ignored_tags.clear(); + throw invalid_argument(ss.str()); } } @@ -268,6 +276,7 @@ decode_state_t TagDispatcher::decode_afpacket( const uint16_t expected_seq = m_last_seq + 1; if (expected_seq != seq) { etiLog.level(warn) << "EDI AF Packet sequence error, " << seq; + m_ignored_tags.clear(); } } else { @@ -303,8 +312,7 @@ decode_state_t TagDispatcher::decode_afpacket( uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); if (packet_crc != crc) { - throw invalid_argument( - "AF Packet crc wrong"); + throw invalid_argument("AF Packet crc wrong"); } else { vector payload(taglength); @@ -379,7 +387,10 @@ bool TagDispatcher::decode_tagpacket(const vector &payload) } if (not found) { - etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + if (std::find(m_ignored_tags.begin(), m_ignored_tags.end(), tag) == m_ignored_tags.end()) { + etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str()); + m_ignored_tags.push_back(tag); + } break; } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index c8c4bb3..14b91ba 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -60,6 +60,14 @@ using tag_name_t = std::array; std::string tag_name_to_human_readable(const tag_name_t& name); +struct Packet { + std::vector buf; + int received_on_port; + + Packet(std::vector&& b) : buf(b), received_on_port(0) { } + Packet() {} +}; + /* The TagDispatcher takes care of decoding EDI, with or without PFT, and * will call functions when TAGs are encountered. * @@ -72,17 +80,17 @@ class TagDispatcher { void set_verbose(bool verbose); - /* Push bytes into the decoder. The buf can contain more * than a single packet. This is useful when reading from streams - * (files, TCP) + * (files, TCP). Pushing an empty buf will clear the internal decoder + * state to ensure realignment (e.g. on stream reconnection) */ void push_bytes(const std::vector &buf); /* Push a complete packet into the decoder. Useful for UDP and other * datagram-oriented protocols. */ - void push_packet(const std::vector &buf); + void push_packet(const Packet &packet); /* Set the maximum delay in number of AF Packets before we * abandon decoding a given pseq. @@ -113,6 +121,8 @@ class TagDispatcher { std::map m_handlers; std::function m_af_packet_completed; tagpacket_handler m_tagpacket_handler; + + std::vector m_ignored_tags; }; // Data carried inside the ODRv EDI TAG diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h index 4f1df97..647d77e 100644 --- a/lib/edioutput/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -44,6 +44,7 @@ struct destination_t { // Can represent both unicast and multicast destinations struct udp_destination_t : public destination_t { std::string dest_addr; + unsigned int dest_port = 0; std::string source_addr; unsigned int source_port = 0; unsigned int ttl = 10; @@ -68,10 +69,8 @@ struct configuration_t { bool dump = false; // dump a file with the EDI packets bool verbose = false; bool enable_pft = false; // Enable protection and fragmentation - bool enable_transport_header = true; // Sets Addr, Source and Dest in PFT unsigned int tagpacket_alignment = 0; std::vector > destinations; - unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp index 1e3d4da..b2f07e0 100644 --- a/lib/edioutput/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -55,7 +55,6 @@ PFT::PFT() { } PFT::PFT(const configuration_t &conf) : m_k(conf.chunk_len), m_m(conf.fec), - m_dest_port(conf.dest_port), m_pseq(0), m_num_chunks(0), m_verbose(conf.verbose) diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h index 1019915..4d138c5 100644 --- a/lib/edioutput/PFT.h +++ b/lib/edioutput/PFT.h @@ -68,13 +68,14 @@ class PFT private: unsigned int m_k = 207; // length of RS data word unsigned int m_m = 3; // number of fragments that can be recovered if lost - unsigned int m_dest_port = 12000; // Destination port for transport header uint16_t m_pseq = 0; size_t m_num_chunks = 0; bool m_verbose = false; - bool m_transport_header = false; + // Transport header is always deactivated + const bool m_transport_header = false; const uint16_t m_addr_source = 0; + const unsigned int m_dest_port = 0; }; } diff --git a/lib/edioutput/TagItems.cpp b/lib/edioutput/TagItems.cpp index 9746469..739adfa 100644 --- a/lib/edioutput/TagItems.cpp +++ b/lib/edioutput/TagItems.cpp @@ -212,8 +212,8 @@ std::vector TagDSTI::Assemble() packet.push_back(0); packet.push_back(0); - uint8_t dfctl = dflc % 250; - uint8_t dfcth = dflc / 250; + uint8_t dfctl = dlfc % 250; + uint8_t dfcth = dlfc / 250; uint16_t dstiHeader = dfctl | (dfcth << 8) | (rfadf << 13) | (atstf << 14) | (stihf << 15); @@ -254,7 +254,7 @@ std::vector TagDSTI::Assemble() packet[6] = (taglength >> 8) & 0xFF; packet[7] = taglength & 0xFF; - dflc = (dflc+1) % 5000; + dlfc = (dlfc+1) % 5000; /* std::cerr << "TagItem dsti, packet.size " << packet.size() << std::endl; diff --git a/lib/edioutput/TagItems.h b/lib/edioutput/TagItems.h index 5c81b01..f24dc44 100644 --- a/lib/edioutput/TagItems.h +++ b/lib/edioutput/TagItems.h @@ -147,7 +147,7 @@ class TagDSTI : public TagItem bool stihf = false; bool atstf = false; // presence of atst data bool rfadf = false; - uint16_t dflc = 0; // modulo 5000 frame counter + uint16_t dlfc = 0; // modulo 5000 frame counter // STI Header (optional) uint8_t stat = 0; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index fa7588a..cfed9ec 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -38,7 +38,7 @@ void configuration_t::print() const etiLog.level(info) << " verbose " << verbose; for (auto edi_dest : destinations) { if (auto udp_dest = dynamic_pointer_cast(edi_dest)) { - etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; + etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port; if (not udp_dest->source_addr.empty()) { etiLog.level(info) << " source " << udp_dest->source_addr; etiLog.level(info) << " ttl " << udp_dest->ttl; @@ -148,7 +148,7 @@ void Sender::write(const TagPacket& tagpacket) for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } @@ -176,7 +176,7 @@ void Sender::write(const TagPacket& tagpacket) for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { fprintf(stderr, "EDI Output: AF packet larger than 1400," diff --git a/lib/zmq.hpp b/lib/zmq.hpp index eb5416e..74a0574 100644 --- a/lib/zmq.hpp +++ b/lib/zmq.hpp @@ -1,4 +1,5 @@ /* + Copyright (c) 2016-2017 ZeroMQ community Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2011 Botond Ballo Copyright (c) 2007-2009 iMatix Corporation @@ -25,35 +26,130 @@ #ifndef __ZMQ_HPP_INCLUDED__ #define __ZMQ_HPP_INCLUDED__ +// macros defined if has a specific standard or greater +#if (defined(__cplusplus) && __cplusplus >= 201103L) || (defined(_MSC_VER) && _MSC_VER >= 1900) + #define ZMQ_CPP11 +#endif +#if (defined(__cplusplus) && __cplusplus >= 201402L) || \ + (defined(_HAS_CXX14) && _HAS_CXX14 == 1) || \ + (defined(_HAS_CXX17) && _HAS_CXX17 == 1) // _HAS_CXX14 might not be defined when using C++17 on MSVC + #define ZMQ_CPP14 +#endif +#if (defined(__cplusplus) && __cplusplus >= 201703L) || (defined(_HAS_CXX17) && _HAS_CXX17 == 1) + #define ZMQ_CPP17 +#endif + +#if defined(ZMQ_CPP14) +#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]] +#elif defined(_MSC_VER) +#define ZMQ_DEPRECATED(msg) __declspec(deprecated(msg)) +#elif defined(__GNUC__) +#define ZMQ_DEPRECATED(msg) __attribute__((deprecated(msg))) +#endif + +#if defined(ZMQ_CPP17) +#define ZMQ_NODISCARD [[nodiscard]] +#else +#define ZMQ_NODISCARD +#endif + +#if defined(ZMQ_CPP11) +#define ZMQ_NOTHROW noexcept +#define ZMQ_EXPLICIT explicit +#define ZMQ_OVERRIDE override +#define ZMQ_NULLPTR nullptr +#define ZMQ_CONSTEXPR_FN constexpr +#define ZMQ_CONSTEXPR_VAR constexpr +#else +#define ZMQ_NOTHROW throw() +#define ZMQ_EXPLICIT +#define ZMQ_OVERRIDE +#define ZMQ_NULLPTR 0 +#define ZMQ_CONSTEXPR_FN +#define ZMQ_CONSTEXPR_VAR const +#endif + #include -#include #include #include -#include + +#include #include +#include +#include +#include +#include +#ifdef ZMQ_CPP11 +#include +#include +#include +#include +#endif +#ifdef ZMQ_CPP17 +#ifdef __has_include +#if __has_include() +#include +#define ZMQ_HAS_OPTIONAL 1 +#endif +#if __has_include() +#include +#define ZMQ_HAS_STRING_VIEW 1 +#endif +#endif + +#endif + +/* Version macros for compile-time API version detection */ +#define CPPZMQ_VERSION_MAJOR 4 +#define CPPZMQ_VERSION_MINOR 6 +#define CPPZMQ_VERSION_PATCH 0 + +#define CPPZMQ_VERSION \ + ZMQ_MAKE_VERSION(CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR, \ + CPPZMQ_VERSION_PATCH) // Detect whether the compiler supports C++11 rvalue references. -#if (defined(__GNUC__) && (__GNUC__ > 4 || \ - (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \ - defined(__GXX_EXPERIMENTAL_CXX0X__)) - #define ZMQ_HAS_RVALUE_REFS - #define ZMQ_DELETED_FUNCTION = delete +#if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) \ + && defined(__GXX_EXPERIMENTAL_CXX0X__)) +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION = delete #elif defined(__clang__) - #if __has_feature(cxx_rvalue_references) - #define ZMQ_HAS_RVALUE_REFS - #endif - - #if __has_feature(cxx_deleted_functions) - #define ZMQ_DELETED_FUNCTION = delete - #else - #define ZMQ_DELETED_FUNCTION - #endif +#if __has_feature(cxx_rvalue_references) +#define ZMQ_HAS_RVALUE_REFS +#endif + +#if __has_feature(cxx_deleted_functions) +#define ZMQ_DELETED_FUNCTION = delete +#else +#define ZMQ_DELETED_FUNCTION +#endif +#elif defined(_MSC_VER) && (_MSC_VER >= 1900) +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION = delete #elif defined(_MSC_VER) && (_MSC_VER >= 1600) - #define ZMQ_HAS_RVALUE_REFS - #define ZMQ_DELETED_FUNCTION +#define ZMQ_HAS_RVALUE_REFS +#define ZMQ_DELETED_FUNCTION +#else +#define ZMQ_DELETED_FUNCTION +#endif + +#if defined(ZMQ_CPP11) && !defined(__llvm__) && !defined(__INTEL_COMPILER) \ + && defined(__GNUC__) && __GNUC__ < 5 +#define ZMQ_CPP11_PARTIAL +#elif defined(__GLIBCXX__) && __GLIBCXX__ < 20160805 +//the date here is the last date of gcc 4.9.4, which +// effectively means libstdc++ from gcc 5.5 and higher won't trigger this branch +#define ZMQ_CPP11_PARTIAL +#endif + +#ifdef ZMQ_CPP11 +#ifdef ZMQ_CPP11_PARTIAL +#define ZMQ_IS_TRIVIALLY_COPYABLE(T) __has_trivial_copy(T) #else - #define ZMQ_DELETED_FUNCTION +#include +#define ZMQ_IS_TRIVIALLY_COPYABLE(T) std::is_trivially_copyable::value +#endif #endif #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0) @@ -63,540 +159,1962 @@ #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) #define ZMQ_HAS_PROXY_STEERABLE /* Socket event data */ -typedef struct { - uint16_t event; // id of the event as bitfield - int32_t value ; // value is either error code, fd or reconnect interval +typedef struct +{ + uint16_t event; // id of the event as bitfield + int32_t value; // value is either error code, fd or reconnect interval } zmq_event_t; #endif +// Avoid using deprecated message receive function when possible +#if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0) +#define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags) +#endif + + // In order to prevent unused variable warnings when building in non-debug // mode use this macro to make assertions. #ifndef NDEBUG -# define ZMQ_ASSERT(expression) assert(expression) +#define ZMQ_ASSERT(expression) assert(expression) #else -# define ZMQ_ASSERT(expression) (void)(expression) +#define ZMQ_ASSERT(expression) (void) (expression) #endif namespace zmq { - typedef zmq_free_fn free_fn; - typedef zmq_pollitem_t pollitem_t; +#ifdef ZMQ_CPP11 +namespace detail +{ +namespace ranges +{ +using std::begin; +using std::end; +template +auto begin(T&& r) -> decltype(begin(std::forward(r))) +{ + return begin(std::forward(r)); +} +template +auto end(T&& r) -> decltype(end(std::forward(r))) +{ + return end(std::forward(r)); +} +} // namespace ranges - class error_t : public std::exception - { - public: +template using void_t = void; - error_t () : errnum (zmq_errno ()) {} +template +using iter_value_t = typename std::iterator_traits::value_type; - virtual const char *what () const throw () - { - return zmq_strerror (errnum); - } +template +using range_iter_t = decltype( + ranges::begin(std::declval::type &>())); - int num () const - { - return errnum; - } +template +using range_value_t = iter_value_t>; + +template struct is_range : std::false_type +{ +}; + +template +struct is_range< + T, + void_t::type &>()) + == ranges::end(std::declval::type &>()))>> + : std::true_type +{ +}; + +} // namespace detail +#endif - private: +typedef zmq_free_fn free_fn; +typedef zmq_pollitem_t pollitem_t; - int errnum; - }; +class error_t : public std::exception +{ + public: + error_t() : errnum(zmq_errno()) {} + virtual const char *what() const ZMQ_NOTHROW ZMQ_OVERRIDE { return zmq_strerror(errnum); } + int num() const { return errnum; } + + private: + int errnum; +}; + +inline int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_ = -1) +{ + int rc = zmq_poll(items_, static_cast(nitems_), timeout_); + if (rc < 0) + throw error_t(); + return rc; +} + +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int poll(zmq_pollitem_t const *items_, size_t nitems_, long timeout_ = -1) +{ + return poll(const_cast(items_), nitems_, timeout_); +} - inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) +#ifdef ZMQ_CPP11 +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int +poll(zmq_pollitem_t const *items, size_t nitems, std::chrono::milliseconds timeout) +{ + return poll(const_cast(items), nitems, static_cast(timeout.count())); +} + +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int poll(std::vector const &items, + std::chrono::milliseconds timeout) +{ + return poll(const_cast(items.data()), items.size(), static_cast(timeout.count())); +} + +ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items") +inline int poll(std::vector const &items, long timeout_ = -1) +{ + return poll(const_cast(items.data()), items.size(), timeout_); +} + +inline int +poll(zmq_pollitem_t *items, size_t nitems, std::chrono::milliseconds timeout) +{ + return poll(items, nitems, static_cast(timeout.count())); +} + +inline int poll(std::vector &items, + std::chrono::milliseconds timeout) +{ + return poll(items.data(), items.size(), static_cast(timeout.count())); +} + +inline int poll(std::vector &items, long timeout_ = -1) +{ + return poll(items.data(), items.size(), timeout_); +} +#endif + + +inline void version(int *major_, int *minor_, int *patch_) +{ + zmq_version(major_, minor_, patch_); +} + +#ifdef ZMQ_CPP11 +inline std::tuple version() +{ + std::tuple v; + zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v)); + return v; +} +#endif + +class message_t +{ + public: + message_t() ZMQ_NOTHROW { - int rc = zmq_poll (items_, nitems_, timeout_); - if (rc < 0) - throw error_t (); - return rc; + int rc = zmq_msg_init(&msg); + ZMQ_ASSERT(rc == 0); } - inline void proxy (void *frontend, void *backend, void *capture) + explicit message_t(size_t size_) { - int rc = zmq_proxy (frontend, backend, capture); + int rc = zmq_msg_init_size(&msg, size_); if (rc != 0) - throw error_t (); + throw error_t(); } - -#ifdef ZMQ_HAS_PROXY_STEERABLE - inline void proxy_steerable (void *frontend, void *backend, void *capture, void *control) + + template message_t(ForwardIter first, ForwardIter last) + { + typedef typename std::iterator_traits::value_type value_t; + + assert(std::distance(first, last) >= 0); + size_t const size_ = + static_cast(std::distance(first, last)) * sizeof(value_t); + int const rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + std::copy(first, last, data()); + } + + message_t(const void *data_, size_t size_) + { + int rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + memcpy(data(), data_, size_); + } + + message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_ = ZMQ_NULLPTR) { - int rc = zmq_proxy_steerable (frontend, backend, capture, control); + int rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_); if (rc != 0) - throw error_t (); + throw error_t(); + } + +#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL) + template::value + && ZMQ_IS_TRIVIALLY_COPYABLE(detail::range_value_t) + && !std::is_same::value>::type> + explicit message_t(const Range &rng) : + message_t(detail::ranges::begin(rng), detail::ranges::end(rng)) + { } #endif - - inline void version (int *major_, int *minor_, int *patch_) + +#ifdef ZMQ_HAS_RVALUE_REFS + message_t(message_t &&rhs) ZMQ_NOTHROW : msg(rhs.msg) { - zmq_version (major_, minor_, patch_); + int rc = zmq_msg_init(&rhs.msg); + ZMQ_ASSERT(rc == 0); } - class message_t + message_t &operator=(message_t &&rhs) ZMQ_NOTHROW { - friend class socket_t; + std::swap(msg, rhs.msg); + return *this; + } +#endif - public: + ~message_t() ZMQ_NOTHROW + { + int rc = zmq_msg_close(&msg); + ZMQ_ASSERT(rc == 0); + } - inline message_t () - { - int rc = zmq_msg_init (&msg); - if (rc != 0) - throw error_t (); - } + void rebuild() + { + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init(&msg); + ZMQ_ASSERT(rc == 0); + } - inline explicit message_t (size_t size_) - { - int rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - } + void rebuild(size_t size_) + { + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + } - inline message_t (void *data_, size_t size_, free_fn *ffn_, - void *hint_ = NULL) - { - int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); - if (rc != 0) - throw error_t (); - } + void rebuild(const void *data_, size_t size_) + { + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init_size(&msg, size_); + if (rc != 0) + throw error_t(); + memcpy(data(), data_, size_); + } -#ifdef ZMQ_HAS_RVALUE_REFS - inline message_t (message_t &&rhs) : msg (rhs.msg) - { - int rc = zmq_msg_init (&rhs.msg); - if (rc != 0) - throw error_t (); - } + void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_ = ZMQ_NULLPTR) + { + int rc = zmq_msg_close(&msg); + if (rc != 0) + throw error_t(); + rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_); + if (rc != 0) + throw error_t(); + } - inline message_t &operator = (message_t &&rhs) - { - std::swap (msg, rhs.msg); - return *this; - } -#endif + ZMQ_DEPRECATED("from 4.3.1, use move taking non-const reference instead") + void move(message_t const *msg_) + { + int rc = zmq_msg_move(&msg, const_cast(msg_->handle())); + if (rc != 0) + throw error_t(); + } - inline ~message_t () - { - int rc = zmq_msg_close (&msg); - ZMQ_ASSERT (rc == 0); - } + void move(message_t &msg_) + { + int rc = zmq_msg_move(&msg, msg_.handle()); + if (rc != 0) + throw error_t(); + } - inline void rebuild () - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init (&msg); - if (rc != 0) - throw error_t (); - } + ZMQ_DEPRECATED("from 4.3.1, use copy taking non-const reference instead") + void copy(message_t const *msg_) + { + int rc = zmq_msg_copy(&msg, const_cast(msg_->handle())); + if (rc != 0) + throw error_t(); + } - inline void rebuild (size_t size_) - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_size (&msg, size_); - if (rc != 0) - throw error_t (); - } + void copy(message_t &msg_) + { + int rc = zmq_msg_copy(&msg, msg_.handle()); + if (rc != 0) + throw error_t(); + } - inline void rebuild (void *data_, size_t size_, free_fn *ffn_, - void *hint_ = NULL) - { - int rc = zmq_msg_close (&msg); - if (rc != 0) - throw error_t (); - rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); - if (rc != 0) - throw error_t (); - } + bool more() const ZMQ_NOTHROW + { + int rc = zmq_msg_more(const_cast(&msg)); + return rc != 0; + } - inline void move (message_t *msg_) - { - int rc = zmq_msg_move (&msg, &(msg_->msg)); - if (rc != 0) - throw error_t (); - } + void *data() ZMQ_NOTHROW { return zmq_msg_data(&msg); } - inline void copy (message_t *msg_) - { - int rc = zmq_msg_copy (&msg, &(msg_->msg)); - if (rc != 0) - throw error_t (); - } + const void *data() const ZMQ_NOTHROW + { + return zmq_msg_data(const_cast(&msg)); + } - inline bool more () - { - int rc = zmq_msg_more (&msg); - return rc != 0; - } + size_t size() const ZMQ_NOTHROW + { + return zmq_msg_size(const_cast(&msg)); + } - inline void *data () - { - return zmq_msg_data (&msg); - } + ZMQ_NODISCARD bool empty() const ZMQ_NOTHROW + { + return size() == 0u; + } - inline const void* data () const - { - return zmq_msg_data (const_cast(&msg)); - } + template T *data() ZMQ_NOTHROW { return static_cast(data()); } - inline size_t size () const - { - return zmq_msg_size (const_cast(&msg)); - } + template T const *data() const ZMQ_NOTHROW + { + return static_cast(data()); + } - private: + ZMQ_DEPRECATED("from 4.3.0, use operator== instead") + bool equal(const message_t *other) const ZMQ_NOTHROW + { + return *this == *other; + } - // The underlying message - zmq_msg_t msg; + bool operator==(const message_t &other) const ZMQ_NOTHROW + { + const size_t my_size = size(); + return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size); + } - // Disable implicit message copying, so that users won't use shared - // messages (less efficient) without being aware of the fact. - message_t (const message_t&); - void operator = (const message_t&); - }; + bool operator!=(const message_t &other) const ZMQ_NOTHROW + { + return !(*this == other); + } - class context_t +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 2, 0) + int get(int property_) { - friend class socket_t; + int value = zmq_msg_get(&msg, property_); + if (value == -1) + throw error_t(); + return value; + } +#endif - public: - inline context_t () - { - ptr = zmq_ctx_new (); - if (ptr == NULL) - throw error_t (); - } +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) + const char *gets(const char *property_) + { + const char *value = zmq_msg_gets(&msg, property_); + if (value == ZMQ_NULLPTR) + throw error_t(); + return value; + } +#endif +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) + uint32_t routing_id() const + { + return zmq_msg_routing_id(const_cast(&msg)); + } - inline explicit context_t (int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) - { - ptr = zmq_ctx_new (); - if (ptr == NULL) - throw error_t (); + void set_routing_id(uint32_t routing_id) + { + int rc = zmq_msg_set_routing_id(&msg, routing_id); + if (rc != 0) + throw error_t(); + } - int rc = zmq_ctx_set (ptr, ZMQ_IO_THREADS, io_threads_); - ZMQ_ASSERT (rc == 0); + const char* group() const + { + return zmq_msg_group(const_cast(&msg)); + } - rc = zmq_ctx_set (ptr, ZMQ_MAX_SOCKETS, max_sockets_); - ZMQ_ASSERT (rc == 0); - } + void set_group(const char* group) + { + int rc = zmq_msg_set_group(&msg, group); + if (rc != 0) + throw error_t(); + } +#endif -#ifdef ZMQ_HAS_RVALUE_REFS - inline context_t (context_t &&rhs) : ptr (rhs.ptr) - { - rhs.ptr = NULL; - } - inline context_t &operator = (context_t &&rhs) - { - std::swap (ptr, rhs.ptr); - return *this; - } + // interpret message content as a string + std::string to_string() const + { + return std::string(static_cast(data()), size()); + } +#ifdef ZMQ_CPP17 + // interpret message content as a string + std::string_view to_string_view() const noexcept + { + return std::string_view(static_cast(data()), size()); + } #endif - inline ~context_t () - { - close(); + /** Dump content to string for debugging. + * Ascii chars are readable, the rest is printed as hex. + * Probably ridiculously slow. + * Use to_string() or to_string_view() for + * interpreting the message as a string. + */ + std::string str() const + { + // Partly mutuated from the same method in zmq::multipart_t + std::stringstream os; + + const unsigned char *msg_data = this->data(); + unsigned char byte; + size_t size = this->size(); + int is_ascii[2] = {0, 0}; + + os << "zmq::message_t [size " << std::dec << std::setw(3) + << std::setfill('0') << size << "] ("; + // Totally arbitrary + if (size >= 1000) { + os << "... too big to print)"; + } else { + while (size--) { + byte = *msg_data++; + + is_ascii[1] = (byte >= 32 && byte < 127); + if (is_ascii[1] != is_ascii[0]) + os << " "; // Separate text/non text + + if (is_ascii[1]) { + os << byte; + } else { + os << std::hex << std::uppercase << std::setw(2) + << std::setfill('0') << static_cast(byte); + } + is_ascii[0] = is_ascii[1]; + } + os << ")"; } + return os.str(); + } - inline void close() - { - if (ptr == NULL) - return; - int rc = zmq_ctx_destroy (ptr); - ZMQ_ASSERT (rc == 0); - ptr = NULL; - } + void swap(message_t &other) ZMQ_NOTHROW + { + // this assumes zmq::msg_t from libzmq is trivially relocatable + std::swap(msg, other.msg); + } - // Be careful with this, it's probably only useful for - // using the C api together with an existing C++ api. - // Normally you should never need to use this. - inline operator void* () - { - return ptr; - } + ZMQ_NODISCARD zmq_msg_t *handle() ZMQ_NOTHROW { return &msg; } + ZMQ_NODISCARD const zmq_msg_t *handle() const ZMQ_NOTHROW { return &msg; } - private: + private: + // The underlying message + zmq_msg_t msg; - void *ptr; + // Disable implicit message copying, so that users won't use shared + // messages (less efficient) without being aware of the fact. + message_t(const message_t &) ZMQ_DELETED_FUNCTION; + void operator=(const message_t &) ZMQ_DELETED_FUNCTION; +}; - context_t (const context_t&); - void operator = (const context_t&); - }; +inline void swap(message_t &a, message_t &b) ZMQ_NOTHROW +{ + a.swap(b); +} - class socket_t +class context_t +{ + public: + context_t() { - friend class monitor_t; - public: + ptr = zmq_ctx_new(); + if (ptr == ZMQ_NULLPTR) + throw error_t(); + } - inline socket_t (context_t &context_, int type_) - { - ctxptr = context_.ptr; - ptr = zmq_socket (context_.ptr, type_); - if (ptr == NULL) - throw error_t (); - } + + explicit context_t(int io_threads_, + int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) + { + ptr = zmq_ctx_new(); + if (ptr == ZMQ_NULLPTR) + throw error_t(); + + int rc = zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_); + ZMQ_ASSERT(rc == 0); + + rc = zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_); + ZMQ_ASSERT(rc == 0); + } #ifdef ZMQ_HAS_RVALUE_REFS - inline socket_t(socket_t&& rhs) : ptr(rhs.ptr) - { - rhs.ptr = NULL; - } - inline socket_t& operator=(socket_t&& rhs) - { - std::swap(ptr, rhs.ptr); - return *this; - } + context_t(context_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr) { rhs.ptr = ZMQ_NULLPTR; } + context_t &operator=(context_t &&rhs) ZMQ_NOTHROW + { + close(); + std::swap(ptr, rhs.ptr); + return *this; + } #endif - inline ~socket_t () - { - close(); - } + int setctxopt(int option_, int optval_) + { + int rc = zmq_ctx_set(ptr, option_, optval_); + ZMQ_ASSERT(rc == 0); + return rc; + } - inline operator void* () - { - return ptr; - } + int getctxopt(int option_) { return zmq_ctx_get(ptr, option_); } - inline void close() - { - if(ptr == NULL) - // already closed - return ; - int rc = zmq_close (ptr); - ZMQ_ASSERT (rc == 0); - ptr = 0 ; + ~context_t() ZMQ_NOTHROW { close(); } + + void close() ZMQ_NOTHROW + { + if (ptr == ZMQ_NULLPTR) + return; + + int rc; + do { + rc = zmq_ctx_destroy(ptr); + } while (rc == -1 && errno == EINTR); + + ZMQ_ASSERT(rc == 0); + ptr = ZMQ_NULLPTR; + } + + // Be careful with this, it's probably only useful for + // using the C api together with an existing C++ api. + // Normally you should never need to use this. + ZMQ_EXPLICIT operator void *() ZMQ_NOTHROW { return ptr; } + + ZMQ_EXPLICIT operator void const *() const ZMQ_NOTHROW { return ptr; } + + operator bool() const ZMQ_NOTHROW { return ptr != ZMQ_NULLPTR; } + + void swap(context_t &other) ZMQ_NOTHROW + { + std::swap(ptr, other.ptr); + } + + private: + void *ptr; + + context_t(const context_t &) ZMQ_DELETED_FUNCTION; + void operator=(const context_t &) ZMQ_DELETED_FUNCTION; +}; + +inline void swap(context_t &a, context_t &b) ZMQ_NOTHROW { + a.swap(b); +} + +#ifdef ZMQ_CPP11 + +struct recv_buffer_size +{ + size_t size; // number of bytes written to buffer + size_t untruncated_size; // untruncated message size in bytes + + ZMQ_NODISCARD bool truncated() const noexcept + { + return size != untruncated_size; + } +}; + +#if defined(ZMQ_HAS_OPTIONAL) && (ZMQ_HAS_OPTIONAL > 0) + +using send_result_t = std::optional; +using recv_result_t = std::optional; +using recv_buffer_result_t = std::optional; + +#else + +namespace detail +{ +// A C++11 type emulating the most basic +// operations of std::optional for trivial types +template class trivial_optional +{ + public: + static_assert(std::is_trivial::value, "T must be trivial"); + using value_type = T; + + trivial_optional() = default; + trivial_optional(T value) noexcept : _value(value), _has_value(true) {} + + const T *operator->() const noexcept + { + assert(_has_value); + return &_value; + } + T *operator->() noexcept + { + assert(_has_value); + return &_value; + } + + const T &operator*() const noexcept + { + assert(_has_value); + return _value; + } + T &operator*() noexcept + { + assert(_has_value); + return _value; + } + + T &value() + { + if (!_has_value) + throw std::exception(); + return _value; + } + const T &value() const + { + if (!_has_value) + throw std::exception(); + return _value; + } + + explicit operator bool() const noexcept { return _has_value; } + bool has_value() const noexcept { return _has_value; } + + private: + T _value{}; + bool _has_value{false}; +}; +} // namespace detail + +using send_result_t = detail::trivial_optional; +using recv_result_t = detail::trivial_optional; +using recv_buffer_result_t = detail::trivial_optional; + +#endif + +namespace detail +{ + +template +constexpr T enum_bit_or(T a, T b) noexcept +{ + static_assert(std::is_enum::value, "must be enum"); + using U = typename std::underlying_type::type; + return static_cast(static_cast(a) | static_cast(b)); +} +template +constexpr T enum_bit_and(T a, T b) noexcept +{ + static_assert(std::is_enum::value, "must be enum"); + using U = typename std::underlying_type::type; + return static_cast(static_cast(a) & static_cast(b)); +} +template +constexpr T enum_bit_xor(T a, T b) noexcept +{ + static_assert(std::is_enum::value, "must be enum"); + using U = typename std::underlying_type::type; + return static_cast(static_cast(a) ^ static_cast(b)); +} +template +constexpr T enum_bit_not(T a) noexcept +{ + static_assert(std::is_enum::value, "must be enum"); + using U = typename std::underlying_type::type; + return static_cast(~static_cast(a)); +} +} // namespace detail + +// partially satisfies named requirement BitmaskType +enum class send_flags : int +{ + none = 0, + dontwait = ZMQ_DONTWAIT, + sndmore = ZMQ_SNDMORE +}; + +constexpr send_flags operator|(send_flags a, send_flags b) noexcept +{ + return detail::enum_bit_or(a, b); +} +constexpr send_flags operator&(send_flags a, send_flags b) noexcept +{ + return detail::enum_bit_and(a, b); +} +constexpr send_flags operator^(send_flags a, send_flags b) noexcept +{ + return detail::enum_bit_xor(a, b); +} +constexpr send_flags operator~(send_flags a) noexcept +{ + return detail::enum_bit_not(a); +} + +// partially satisfies named requirement BitmaskType +enum class recv_flags : int +{ + none = 0, + dontwait = ZMQ_DONTWAIT +}; + +constexpr recv_flags operator|(recv_flags a, recv_flags b) noexcept +{ + return detail::enum_bit_or(a, b); +} +constexpr recv_flags operator&(recv_flags a, recv_flags b) noexcept +{ + return detail::enum_bit_and(a, b); +} +constexpr recv_flags operator^(recv_flags a, recv_flags b) noexcept +{ + return detail::enum_bit_xor(a, b); +} +constexpr recv_flags operator~(recv_flags a) noexcept +{ + return detail::enum_bit_not(a); +} + + +// mutable_buffer, const_buffer and buffer are based on +// the Networking TS specification, draft: +// http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4771.pdf + +class mutable_buffer +{ + public: + constexpr mutable_buffer() noexcept : _data(nullptr), _size(0) {} + constexpr mutable_buffer(void *p, size_t n) noexcept : _data(p), _size(n) + { +#ifdef ZMQ_CPP14 + assert(p != nullptr || n == 0); +#endif + } + + constexpr void *data() const noexcept { return _data; } + constexpr size_t size() const noexcept { return _size; } + mutable_buffer &operator+=(size_t n) noexcept + { + // (std::min) is a workaround for when a min macro is defined + const auto shift = (std::min)(n, _size); + _data = static_cast(_data) + shift; + _size -= shift; + return *this; + } + + private: + void *_data; + size_t _size; +}; + +inline mutable_buffer operator+(const mutable_buffer &mb, size_t n) noexcept +{ + return mutable_buffer(static_cast(mb.data()) + (std::min)(n, mb.size()), + mb.size() - (std::min)(n, mb.size())); +} +inline mutable_buffer operator+(size_t n, const mutable_buffer &mb) noexcept +{ + return mb + n; +} + +class const_buffer +{ + public: + constexpr const_buffer() noexcept : _data(nullptr), _size(0) {} + constexpr const_buffer(const void *p, size_t n) noexcept : _data(p), _size(n) + { +#ifdef ZMQ_CPP14 + assert(p != nullptr || n == 0); +#endif + } + constexpr const_buffer(const mutable_buffer &mb) noexcept : + _data(mb.data()), + _size(mb.size()) + { + } + + constexpr const void *data() const noexcept { return _data; } + constexpr size_t size() const noexcept { return _size; } + const_buffer &operator+=(size_t n) noexcept + { + const auto shift = (std::min)(n, _size); + _data = static_cast(_data) + shift; + _size -= shift; + return *this; + } + + private: + const void *_data; + size_t _size; +}; + +inline const_buffer operator+(const const_buffer &cb, size_t n) noexcept +{ + return const_buffer(static_cast(cb.data()) + + (std::min)(n, cb.size()), + cb.size() - (std::min)(n, cb.size())); +} +inline const_buffer operator+(size_t n, const const_buffer &cb) noexcept +{ + return cb + n; +} + +// buffer creation + +constexpr mutable_buffer buffer(void* p, size_t n) noexcept +{ + return mutable_buffer(p, n); +} +constexpr const_buffer buffer(const void* p, size_t n) noexcept +{ + return const_buffer(p, n); +} +constexpr mutable_buffer buffer(const mutable_buffer& mb) noexcept +{ + return mb; +} +inline mutable_buffer buffer(const mutable_buffer& mb, size_t n) noexcept +{ + return mutable_buffer(mb.data(), (std::min)(mb.size(), n)); +} +constexpr const_buffer buffer(const const_buffer& cb) noexcept +{ + return cb; +} +inline const_buffer buffer(const const_buffer& cb, size_t n) noexcept +{ + return const_buffer(cb.data(), (std::min)(cb.size(), n)); +} + +namespace detail +{ + +template +struct is_buffer +{ + static constexpr bool value = + std::is_same::value || + std::is_same::value; +}; + +template struct is_pod_like +{ + // NOTE: The networking draft N4771 section 16.11 requires + // T in the buffer functions below to be + // trivially copyable OR standard layout. + // Here we decide to be conservative and require both. + static constexpr bool value = + ZMQ_IS_TRIVIALLY_COPYABLE(T) && std::is_standard_layout::value; +}; + +template constexpr auto seq_size(const C &c) noexcept -> decltype(c.size()) +{ + return c.size(); +} +template +constexpr size_t seq_size(const T (&/*array*/)[N]) noexcept +{ + return N; +} + +template +auto buffer_contiguous_sequence(Seq &&seq) noexcept + -> decltype(buffer(std::addressof(*std::begin(seq)), size_t{})) +{ + using T = typename std::remove_cv< + typename std::remove_reference::type>::type; + static_assert(detail::is_pod_like::value, "T must be POD"); + + const auto size = seq_size(seq); + return buffer(size != 0u ? std::addressof(*std::begin(seq)) : nullptr, + size * sizeof(T)); +} +template +auto buffer_contiguous_sequence(Seq &&seq, size_t n_bytes) noexcept + -> decltype(buffer_contiguous_sequence(seq)) +{ + using T = typename std::remove_cv< + typename std::remove_reference::type>::type; + static_assert(detail::is_pod_like::value, "T must be POD"); + + const auto size = seq_size(seq); + return buffer(size != 0u ? std::addressof(*std::begin(seq)) : nullptr, + (std::min)(size * sizeof(T), n_bytes)); +} + +} // namespace detail + +// C array +template mutable_buffer buffer(T (&data)[N]) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +mutable_buffer buffer(T (&data)[N], size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +template const_buffer buffer(const T (&data)[N]) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +const_buffer buffer(const T (&data)[N], size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +// std::array +template mutable_buffer buffer(std::array &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +mutable_buffer buffer(std::array &data, size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +template +const_buffer buffer(std::array &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +const_buffer buffer(std::array &data, size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +template +const_buffer buffer(const std::array &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +const_buffer buffer(const std::array &data, size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +// std::vector +template +mutable_buffer buffer(std::vector &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +mutable_buffer buffer(std::vector &data, size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +template +const_buffer buffer(const std::vector &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +const_buffer buffer(const std::vector &data, size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +// std::basic_string +template +mutable_buffer buffer(std::basic_string &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +mutable_buffer buffer(std::basic_string &data, + size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +template +const_buffer buffer(const std::basic_string &data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +const_buffer buffer(const std::basic_string &data, + size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} + +#if defined(ZMQ_HAS_STRING_VIEW) && (ZMQ_HAS_STRING_VIEW > 0) +// std::basic_string_view +template +const_buffer buffer(std::basic_string_view data) noexcept +{ + return detail::buffer_contiguous_sequence(data); +} +template +const_buffer buffer(std::basic_string_view data, size_t n_bytes) noexcept +{ + return detail::buffer_contiguous_sequence(data, n_bytes); +} +#endif + +// Buffer for a string literal (null terminated) +// where the buffer size excludes the terminating character. +// Equivalent to zmq::buffer(std::string_view("...")). +template +constexpr const_buffer str_buffer(const Char (&data)[N]) noexcept +{ + static_assert(detail::is_pod_like::value, "Char must be POD"); +#ifdef ZMQ_CPP14 + assert(data[N - 1] == Char{0}); +#endif + return const_buffer(static_cast(data), + (N - 1) * sizeof(Char)); +} + +namespace literals +{ + constexpr const_buffer operator"" _zbuf(const char* str, size_t len) noexcept + { + return const_buffer(str, len * sizeof(char)); + } + constexpr const_buffer operator"" _zbuf(const wchar_t* str, size_t len) noexcept + { + return const_buffer(str, len * sizeof(wchar_t)); + } + constexpr const_buffer operator"" _zbuf(const char16_t* str, size_t len) noexcept + { + return const_buffer(str, len * sizeof(char16_t)); + } + constexpr const_buffer operator"" _zbuf(const char32_t* str, size_t len) noexcept + { + return const_buffer(str, len * sizeof(char32_t)); + } +} + +#endif // ZMQ_CPP11 + +namespace detail +{ + +class socket_base +{ +public: + socket_base() ZMQ_NOTHROW : _handle(ZMQ_NULLPTR) {} + ZMQ_EXPLICIT socket_base(void *handle) ZMQ_NOTHROW : _handle(handle) {} + + template void setsockopt(int option_, T const &optval) + { + setsockopt(option_, &optval, sizeof(T)); + } + + void setsockopt(int option_, const void *optval_, size_t optvallen_) + { + int rc = zmq_setsockopt(_handle, option_, optval_, optvallen_); + if (rc != 0) + throw error_t(); + } + + void getsockopt(int option_, void *optval_, size_t *optvallen_) const + { + int rc = zmq_getsockopt(_handle, option_, optval_, optvallen_); + if (rc != 0) + throw error_t(); + } + + template T getsockopt(int option_) const + { + T optval; + size_t optlen = sizeof(T); + getsockopt(option_, &optval, &optlen); + return optval; + } + + void bind(std::string const &addr) { bind(addr.c_str()); } + + void bind(const char *addr_) + { + int rc = zmq_bind(_handle, addr_); + if (rc != 0) + throw error_t(); + } + + void unbind(std::string const &addr) { unbind(addr.c_str()); } + + void unbind(const char *addr_) + { + int rc = zmq_unbind(_handle, addr_); + if (rc != 0) + throw error_t(); + } + + void connect(std::string const &addr) { connect(addr.c_str()); } + + void connect(const char *addr_) + { + int rc = zmq_connect(_handle, addr_); + if (rc != 0) + throw error_t(); + } + + void disconnect(std::string const &addr) { disconnect(addr.c_str()); } + + void disconnect(const char *addr_) + { + int rc = zmq_disconnect(_handle, addr_); + if (rc != 0) + throw error_t(); + } + + bool connected() const ZMQ_NOTHROW { return (_handle != ZMQ_NULLPTR); } + +#ifdef ZMQ_CPP11 + ZMQ_DEPRECATED("from 4.3.1, use send taking a const_buffer and send_flags") +#endif + size_t send(const void *buf_, size_t len_, int flags_ = 0) + { + int nbytes = zmq_send(_handle, buf_, len_, flags_); + if (nbytes >= 0) + return static_cast(nbytes); + if (zmq_errno() == EAGAIN) + return 0; + throw error_t(); + } + +#ifdef ZMQ_CPP11 + ZMQ_DEPRECATED("from 4.3.1, use send taking message_t and send_flags") +#endif + bool send(message_t &msg_, + int flags_ = 0) // default until removed + { + int nbytes = zmq_msg_send(msg_.handle(), _handle, flags_); + if (nbytes >= 0) + return true; + if (zmq_errno() == EAGAIN) + return false; + throw error_t(); + } + + template +#ifdef ZMQ_CPP11 + ZMQ_DEPRECATED("from 4.4.1, use send taking message_t or buffer (for contiguous ranges), and send_flags") +#endif + bool send(T first, T last, int flags_ = 0) + { + zmq::message_t msg(first, last); + int nbytes = zmq_msg_send(msg.handle(), _handle, flags_); + if (nbytes >= 0) + return true; + if (zmq_errno() == EAGAIN) + return false; + throw error_t(); + } + +#ifdef ZMQ_HAS_RVALUE_REFS +#ifdef ZMQ_CPP11 + ZMQ_DEPRECATED("from 4.3.1, use send taking message_t and send_flags") +#endif + bool send(message_t &&msg_, + int flags_ = 0) // default until removed + { + #ifdef ZMQ_CPP11 + return send(msg_, static_cast(flags_)).has_value(); + #else + return send(msg_, flags_); + #endif + } +#endif + +#ifdef ZMQ_CPP11 + send_result_t send(const_buffer buf, send_flags flags = send_flags::none) + { + const int nbytes = + zmq_send(_handle, buf.data(), buf.size(), static_cast(flags)); + if (nbytes >= 0) + return static_cast(nbytes); + if (zmq_errno() == EAGAIN) + return {}; + throw error_t(); + } + + send_result_t send(message_t &msg, send_flags flags) + { + int nbytes = zmq_msg_send(msg.handle(), _handle, static_cast(flags)); + if (nbytes >= 0) + return static_cast(nbytes); + if (zmq_errno() == EAGAIN) + return {}; + throw error_t(); + } + + send_result_t send(message_t &&msg, send_flags flags) + { + return send(msg, flags); + } +#endif + +#ifdef ZMQ_CPP11 + ZMQ_DEPRECATED("from 4.3.1, use recv taking a mutable_buffer and recv_flags") +#endif + size_t recv(void *buf_, size_t len_, int flags_ = 0) + { + int nbytes = zmq_recv(_handle, buf_, len_, flags_); + if (nbytes >= 0) + return static_cast(nbytes); + if (zmq_errno() == EAGAIN) + return 0; + throw error_t(); + } + +#ifdef ZMQ_CPP11 + ZMQ_DEPRECATED("from 4.3.1, use recv taking a reference to message_t and recv_flags") +#endif + bool recv(message_t *msg_, int flags_ = 0) + { + int nbytes = zmq_msg_recv(msg_->handle(), _handle, flags_); + if (nbytes >= 0) + return true; + if (zmq_errno() == EAGAIN) + return false; + throw error_t(); + } + +#ifdef ZMQ_CPP11 + ZMQ_NODISCARD + recv_buffer_result_t recv(mutable_buffer buf, + recv_flags flags = recv_flags::none) + { + const int nbytes = + zmq_recv(_handle, buf.data(), buf.size(), static_cast(flags)); + if (nbytes >= 0) { + return recv_buffer_size{(std::min)(static_cast(nbytes), buf.size()), + static_cast(nbytes)}; } + if (zmq_errno() == EAGAIN) + return {}; + throw error_t(); + } + + ZMQ_NODISCARD + recv_result_t recv(message_t &msg, recv_flags flags = recv_flags::none) + { + const int nbytes = zmq_msg_recv(msg.handle(), _handle, static_cast(flags)); + if (nbytes >= 0) { + assert(msg.size() == static_cast(nbytes)); + return static_cast(nbytes); + } + if (zmq_errno() == EAGAIN) + return {}; + throw error_t(); + } +#endif + +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) + void join(const char* group) + { + int rc = zmq_join(_handle, group); + if (rc != 0) + throw error_t(); + } + + void leave(const char* group) + { + int rc = zmq_leave(_handle, group); + if (rc != 0) + throw error_t(); + } +#endif + + ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _handle; } + ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _handle; } + + ZMQ_EXPLICIT operator bool() const ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; } + // note: non-const operator bool can be removed once + // operator void* is removed from socket_t + ZMQ_EXPLICIT operator bool() ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; } + +protected: + void *_handle; +}; +} // namespace detail + +#ifdef ZMQ_CPP11 +enum class socket_type : int +{ + req = ZMQ_REQ, + rep = ZMQ_REP, + dealer = ZMQ_DEALER, + router = ZMQ_ROUTER, + pub = ZMQ_PUB, + sub = ZMQ_SUB, + xpub = ZMQ_XPUB, + xsub = ZMQ_XSUB, + push = ZMQ_PUSH, + pull = ZMQ_PULL, +#if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0) + server = ZMQ_SERVER, + client = ZMQ_CLIENT, + radio = ZMQ_RADIO, + dish = ZMQ_DISH, +#endif +#if ZMQ_VERSION_MAJOR >= 4 + stream = ZMQ_STREAM, +#endif + pair = ZMQ_PAIR +}; +#endif - inline void setsockopt (int option_, const void *optval_, - size_t optvallen_) - { - int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); - if (rc != 0) - throw error_t (); - } +struct from_handle_t +{ + struct _private {}; // disabling use other than with from_handle + ZMQ_CONSTEXPR_FN ZMQ_EXPLICIT from_handle_t(_private /*p*/) ZMQ_NOTHROW {} +}; - inline void getsockopt (int option_, void *optval_, - size_t *optvallen_) - { - int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); - if (rc != 0) - throw error_t (); - } - - inline void bind (const char *addr_) - { - int rc = zmq_bind (ptr, addr_); - if (rc != 0) - throw error_t (); - } +ZMQ_CONSTEXPR_VAR from_handle_t from_handle = from_handle_t(from_handle_t::_private()); - inline void unbind (const char *addr_) - { - int rc = zmq_unbind (ptr, addr_); - if (rc != 0) - throw error_t (); - } +// A non-owning nullable reference to a socket. +// The reference is invalidated on socket close or destruction. +class socket_ref : public detail::socket_base +{ + public: + socket_ref() ZMQ_NOTHROW : detail::socket_base() {} +#ifdef ZMQ_CPP11 + socket_ref(std::nullptr_t) ZMQ_NOTHROW : detail::socket_base() {} +#endif + socket_ref(from_handle_t /*fh*/, void *handle) ZMQ_NOTHROW + : detail::socket_base(handle) {} +}; - inline void connect (const char *addr_) - { - int rc = zmq_connect (ptr, addr_); - if (rc != 0) - throw error_t (); - } +#ifdef ZMQ_CPP11 +inline bool operator==(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW +{ + return sr.handle() == nullptr; +} +inline bool operator==(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW +{ + return sr.handle() == nullptr; +} +inline bool operator!=(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW +{ + return !(sr == nullptr); +} +inline bool operator!=(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW +{ + return !(sr == nullptr); +} +#endif - inline void disconnect (const char *addr_) - { - int rc = zmq_disconnect (ptr, addr_); - if (rc != 0) - throw error_t (); - } +inline bool operator==(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ + return std::equal_to()(a.handle(), b.handle()); +} +inline bool operator!=(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ + return !(a == b); +} +inline bool operator<(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ + return std::less()(a.handle(), b.handle()); +} +inline bool operator>(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ + return b < a; +} +inline bool operator<=(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ + return !(a > b); +} +inline bool operator>=(socket_ref a, socket_ref b) ZMQ_NOTHROW +{ + return !(a < b); +} - inline bool connected() - { - return(ptr != NULL); - } - - inline size_t send (const void *buf_, size_t len_, int flags_ = 0) - { - int nbytes = zmq_send (ptr, buf_, len_, flags_); - if (nbytes >= 0) - return (size_t) nbytes; - if (zmq_errno () == EAGAIN) - return 0; - throw error_t (); - } +} // namespace zmq - inline bool send (message_t &msg_, int flags_ = 0) - { - int nbytes = zmq_msg_send (&(msg_.msg), ptr, flags_); - if (nbytes >= 0) - return true; - if (zmq_errno () == EAGAIN) - return false; - throw error_t (); - } +#ifdef ZMQ_CPP11 +namespace std +{ +template<> +struct hash +{ + size_t operator()(zmq::socket_ref sr) const ZMQ_NOTHROW + { + return hash()(sr.handle()); + } +}; +} // namespace std +#endif + +namespace zmq +{ + +class socket_t : public detail::socket_base +{ + friend class monitor_t; + + public: + socket_t() ZMQ_NOTHROW + : detail::socket_base(ZMQ_NULLPTR) + , ctxptr(ZMQ_NULLPTR) + { + } + + socket_t(context_t &context_, int type_) + : detail::socket_base(zmq_socket(static_cast(context_), type_)) + , ctxptr(static_cast(context_)) + { + if (_handle == ZMQ_NULLPTR) + throw error_t(); + } + +#ifdef ZMQ_CPP11 + socket_t(context_t &context_, socket_type type_) + : socket_t(context_, static_cast(type_)) + { + } +#endif #ifdef ZMQ_HAS_RVALUE_REFS - inline bool send (message_t &&msg_, int flags_ = 0) - { - return send(msg_, flags_); - } + socket_t(socket_t &&rhs) ZMQ_NOTHROW : detail::socket_base(rhs._handle), ctxptr(rhs.ctxptr) + { + rhs._handle = ZMQ_NULLPTR; + rhs.ctxptr = ZMQ_NULLPTR; + } + socket_t &operator=(socket_t &&rhs) ZMQ_NOTHROW + { + close(); + std::swap(_handle, rhs._handle); + return *this; + } #endif - inline size_t recv (void *buf_, size_t len_, int flags_ = 0) - { - int nbytes = zmq_recv (ptr, buf_, len_, flags_); - if (nbytes >= 0) - return (size_t) nbytes; - if (zmq_errno () == EAGAIN) - return 0; - throw error_t (); - } + ~socket_t() ZMQ_NOTHROW { close(); } - inline bool recv (message_t *msg_, int flags_ = 0) - { - int nbytes = zmq_msg_recv (&(msg_->msg), ptr, flags_); - if (nbytes >= 0) - return true; - if (zmq_errno () == EAGAIN) - return false; - throw error_t (); + operator void *() ZMQ_NOTHROW { return _handle; } + + operator void const *() const ZMQ_NOTHROW { return _handle; } + + void close() ZMQ_NOTHROW + { + if (_handle == ZMQ_NULLPTR) + // already closed + return; + int rc = zmq_close(_handle); + ZMQ_ASSERT(rc == 0); + _handle = ZMQ_NULLPTR; + } + + void swap(socket_t &other) ZMQ_NOTHROW + { + std::swap(_handle, other._handle); + std::swap(ctxptr, other.ctxptr); + } + + operator socket_ref() ZMQ_NOTHROW + { + return socket_ref(from_handle, _handle); + } + + private: + void *ctxptr; + + socket_t(const socket_t &) ZMQ_DELETED_FUNCTION; + void operator=(const socket_t &) ZMQ_DELETED_FUNCTION; + + // used by monitor_t + socket_t(void *context_, int type_) + : detail::socket_base(zmq_socket(context_, type_)) + , ctxptr(context_) + { + if (_handle == ZMQ_NULLPTR) + throw error_t(); + } +}; + +inline void swap(socket_t &a, socket_t &b) ZMQ_NOTHROW { + a.swap(b); +} + +ZMQ_DEPRECATED("from 4.3.1, use proxy taking socket_t objects") +inline void proxy(void *frontend, void *backend, void *capture) +{ + int rc = zmq_proxy(frontend, backend, capture); + if (rc != 0) + throw error_t(); +} + +inline void +proxy(socket_ref frontend, socket_ref backend, socket_ref capture = socket_ref()) +{ + int rc = zmq_proxy(frontend.handle(), backend.handle(), capture.handle()); + if (rc != 0) + throw error_t(); +} + +#ifdef ZMQ_HAS_PROXY_STEERABLE +ZMQ_DEPRECATED("from 4.3.1, use proxy_steerable taking socket_t objects") +inline void +proxy_steerable(void *frontend, void *backend, void *capture, void *control) +{ + int rc = zmq_proxy_steerable(frontend, backend, capture, control); + if (rc != 0) + throw error_t(); +} + +inline void proxy_steerable(socket_ref frontend, + socket_ref backend, + socket_ref capture, + socket_ref control) +{ + int rc = zmq_proxy_steerable(frontend.handle(), backend.handle(), + capture.handle(), control.handle()); + if (rc != 0) + throw error_t(); +} +#endif + +class monitor_t +{ + public: + monitor_t() : _socket(), _monitor_socket() {} + + virtual ~monitor_t() + { + close(); + } + +#ifdef ZMQ_HAS_RVALUE_REFS + monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : _socket(), _monitor_socket() + { + std::swap(_socket, rhs._socket); + std::swap(_monitor_socket, rhs._monitor_socket); + } + + monitor_t &operator=(monitor_t &&rhs) ZMQ_NOTHROW + { + close(); + _socket = socket_ref(); + std::swap(_socket, rhs._socket); + std::swap(_monitor_socket, rhs._monitor_socket); + return *this; + } +#endif + + + void + monitor(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL) + { + monitor(socket, addr.c_str(), events); + } + + void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) + { + init(socket, addr_, events); + while (true) { + check_event(-1); } - - private: - void *ptr; - void *ctxptr; + } - socket_t (const socket_t&) ZMQ_DELETED_FUNCTION; - void operator = (const socket_t&) ZMQ_DELETED_FUNCTION; - }; + void init(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL) + { + init(socket, addr.c_str(), events); + } - class monitor_t + void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) { - public: - monitor_t() : socketPtr(NULL) {} - virtual ~monitor_t() {} + int rc = zmq_socket_monitor(socket.handle(), addr_, events); + if (rc != 0) + throw error_t(); + + _socket = socket; + _monitor_socket = socket_t(socket.ctxptr, ZMQ_PAIR); + _monitor_socket.connect(addr_); + + on_monitor_started(); + } + + bool check_event(int timeout = 0) + { + assert(_monitor_socket); + + zmq_msg_t eventMsg; + zmq_msg_init(&eventMsg); + + zmq::pollitem_t items[] = { + {_monitor_socket.handle(), 0, ZMQ_POLLIN, 0}, + }; + + zmq::poll(&items[0], 1, timeout); + + if (items[0].revents & ZMQ_POLLIN) { + int rc = zmq_msg_recv(&eventMsg, _monitor_socket.handle(), 0); + if (rc == -1 && zmq_errno() == ETERM) + return false; + assert(rc != -1); + + } else { + zmq_msg_close(&eventMsg); + return false; + } - void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) - { - int rc = zmq_socket_monitor(socket.ptr, addr_, events); - if (rc != 0) - throw error_t (); - - socketPtr = socket.ptr; - void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR); - assert (s); - - rc = zmq_connect (s, addr_); - assert (rc == 0); - - on_monitor_started(); - - while (true) { - zmq_msg_t eventMsg; - zmq_msg_init (&eventMsg); - rc = zmq_recvmsg (s, &eventMsg, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); #if ZMQ_VERSION_MAJOR >= 4 - const char* data = static_cast(zmq_msg_data(&eventMsg)); - zmq_event_t msgEvent; - memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); - memcpy(&msgEvent.value, data, sizeof(int32_t)); - zmq_event_t* event = &msgEvent; + const char *data = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t msgEvent; + memcpy(&msgEvent.event, data, sizeof(uint16_t)); + data += sizeof(uint16_t); + memcpy(&msgEvent.value, data, sizeof(int32_t)); + zmq_event_t *event = &msgEvent; #else - zmq_event_t* event = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t *event = static_cast(zmq_msg_data(&eventMsg)); #endif - + #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT - zmq_msg_t addrMsg; - zmq_msg_init (&addrMsg); - rc = zmq_recvmsg (s, &addrMsg, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); - const char* str = static_cast(zmq_msg_data (&addrMsg)); - std::string address(str, str + zmq_msg_size(&addrMsg)); - zmq_msg_close (&addrMsg); + zmq_msg_t addrMsg; + zmq_msg_init(&addrMsg); + int rc = zmq_msg_recv(&addrMsg, _monitor_socket.handle(), 0); + if (rc == -1 && zmq_errno() == ETERM) { + zmq_msg_close(&eventMsg); + return false; + } + + assert(rc != -1); + const char *str = static_cast(zmq_msg_data(&addrMsg)); + std::string address(str, str + zmq_msg_size(&addrMsg)); + zmq_msg_close(&addrMsg); #else - // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. - std::string address = event->data.connected.addr; + // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. + std::string address = event->data.connected.addr; #endif #ifdef ZMQ_EVENT_MONITOR_STOPPED - if (event->event == ZMQ_EVENT_MONITOR_STOPPED) - break; -#endif - - switch (event->event) { - case ZMQ_EVENT_CONNECTED: - on_event_connected(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - on_event_connect_delayed(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - on_event_connect_retried(*event, address.c_str()); - break; - case ZMQ_EVENT_LISTENING: - on_event_listening(*event, address.c_str()); - break; - case ZMQ_EVENT_BIND_FAILED: - on_event_bind_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPTED: - on_event_accepted(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - on_event_accept_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSED: - on_event_closed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSE_FAILED: - on_event_close_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_DISCONNECTED: - on_event_disconnected(*event, address.c_str()); - break; - default: - on_event_unknown(*event, address.c_str()); - break; - } - zmq_msg_close (&eventMsg); - } - zmq_close (s); - socketPtr = NULL; + if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { + zmq_msg_close(&eventMsg); + return false; + } + +#endif + + switch (event->event) { + case ZMQ_EVENT_CONNECTED: + on_event_connected(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + on_event_connect_delayed(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + on_event_connect_retried(*event, address.c_str()); + break; + case ZMQ_EVENT_LISTENING: + on_event_listening(*event, address.c_str()); + break; + case ZMQ_EVENT_BIND_FAILED: + on_event_bind_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPTED: + on_event_accepted(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + on_event_accept_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSED: + on_event_closed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSE_FAILED: + on_event_close_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_DISCONNECTED: + on_event_disconnected(*event, address.c_str()); + break; +#ifdef ZMQ_BUILD_DRAFT_API +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) + case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: + on_event_handshake_failed_no_detail(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: + on_event_handshake_failed_protocol(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: + on_event_handshake_failed_auth(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: + on_event_handshake_succeeded(*event, address.c_str()); + break; +#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) + case ZMQ_EVENT_HANDSHAKE_FAILED: + on_event_handshake_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEED: + on_event_handshake_succeed(*event, address.c_str()); + break; +#endif +#endif + default: + on_event_unknown(*event, address.c_str()); + break; } + zmq_msg_close(&eventMsg); + + return true; + } #ifdef ZMQ_EVENT_MONITOR_STOPPED - void abort() - { - if (socketPtr) - zmq_socket_monitor(socketPtr, NULL, 0); + void abort() + { + if (_socket) + zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); + + _socket = socket_ref(); + } +#endif + virtual void on_monitor_started() {} + virtual void on_event_connected(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_connect_delayed(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_connect_retried(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_listening(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_closed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) + virtual void on_event_handshake_failed_no_detail(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_failed_protocol(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_failed_auth(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_succeeded(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } +#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) + virtual void on_event_handshake_failed(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } + virtual void on_event_handshake_succeed(const zmq_event_t &event_, + const char *addr_) + { + (void) event_; + (void) addr_; + } +#endif + virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_) + { + (void) event_; + (void) addr_; + } + + private: + monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION; + void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION; + + socket_ref _socket; + socket_t _monitor_socket; + + void close() ZMQ_NOTHROW + { + if (_socket) + zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); + _monitor_socket.close(); + } +}; + +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + +// polling events +enum class event_flags : short +{ + none = 0, + pollin = ZMQ_POLLIN, + pollout = ZMQ_POLLOUT, + pollerr = ZMQ_POLLERR, + pollpri = ZMQ_POLLPRI +}; + +constexpr event_flags operator|(event_flags a, event_flags b) noexcept +{ + return detail::enum_bit_or(a, b); +} +constexpr event_flags operator&(event_flags a, event_flags b) noexcept +{ + return detail::enum_bit_and(a, b); +} +constexpr event_flags operator^(event_flags a, event_flags b) noexcept +{ + return detail::enum_bit_xor(a, b); +} +constexpr event_flags operator~(event_flags a) noexcept +{ + return detail::enum_bit_not(a); +} + +struct no_user_data; + +// layout compatible with zmq_poller_event_t +template +struct poller_event +{ + socket_ref socket; +#ifdef _WIN32 + SOCKET fd; +#else + int fd; +#endif + T *user_data; + event_flags events; +}; + +template class poller_t +{ + public: + using event_type = poller_event; + + poller_t() : poller_ptr(zmq_poller_new()) + { + if (!poller_ptr) + throw error_t(); + } + + template< + typename Dummy = void, + typename = + typename std::enable_if::value, Dummy>::type> + void add(zmq::socket_ref socket, event_flags events, T *user_data) + { + add_impl(socket, events, user_data); + } + + void add(zmq::socket_ref socket, event_flags events) + { + add_impl(socket, events, nullptr); + } + + void remove(zmq::socket_ref socket) + { + if (0 != zmq_poller_remove(poller_ptr.get(), socket.handle())) { + throw error_t(); + } + } + + void modify(zmq::socket_ref socket, event_flags events) + { + if (0 + != zmq_poller_modify(poller_ptr.get(), socket.handle(), + static_cast(events))) { + throw error_t(); } + } + + size_t wait_all(std::vector &poller_events, + const std::chrono::milliseconds timeout) + { + int rc = zmq_poller_wait_all( + poller_ptr.get(), + reinterpret_cast(poller_events.data()), + static_cast(poller_events.size()), + static_cast(timeout.count())); + if (rc > 0) + return static_cast(rc); + +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) + if (zmq_errno() == EAGAIN) +#else + if (zmq_errno() == ETIMEDOUT) #endif - virtual void on_monitor_started() {} - virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } - private: - void* socketPtr; + return 0; + + throw error_t(); + } + + private: + struct destroy_poller_t + { + void operator()(void *ptr) noexcept + { + int rc = zmq_poller_destroy(&ptr); + ZMQ_ASSERT(rc == 0); + } }; + + std::unique_ptr poller_ptr; + + void add_impl(zmq::socket_ref socket, event_flags events, T *user_data) + { + if (0 + != zmq_poller_add(poller_ptr.get(), socket.handle(), + user_data, static_cast(events))) { + throw error_t(); + } + } +}; +#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) + +inline std::ostream &operator<<(std::ostream &os, const message_t &msg) +{ + return os << msg.str(); } -#endif +} // namespace zmq + +#endif // __ZMQ_HPP_INCLUDED__ diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 4ae607c..4265412 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -111,7 +111,6 @@ typedef DWORD32 uint32_t; using namespace std; using boost::property_tree::ptree; -using boost::property_tree::ptree_error; volatile sig_atomic_t running = 1; @@ -323,7 +322,6 @@ int main(int argc, char *argv[]) if (outputuid == "edi") { ptree pt_edi = pt_outputs.get_child("edi"); - bool require_dest_port = false; for (auto pt_edi_dest : pt_edi.get_child("destinations")) { const auto proto = pt_edi_dest.second.get("protocol", "udp"); @@ -335,9 +333,16 @@ int main(int argc, char *argv[]) dest->source_addr = pt_edi_dest.second.get("source", ""); dest->source_port = pt_edi_dest.second.get("sourceport"); - edi_conf.destinations.push_back(dest); + dest->dest_port = pt_edi_dest.second.get("port", 0); + if (dest->dest_port == 0) { + // Compatiblity: we have removed the transport and addressing in the + // PFT layer, which removed the requirement that all outputs must share + // the same destination port. If missing from the destination specification, + // we read it from the parent block, where it was before. + dest->dest_port = pt_edi.get("port"); + } - require_dest_port = true; + edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { auto dest = make_shared(); @@ -350,14 +355,9 @@ int main(int argc, char *argv[]) } } - if (require_dest_port) { - edi_conf.dest_port = pt_edi.get("port"); - } - edi_conf.dump = pt_edi.get("dump", false); edi_conf.enable_pft = pt_edi.get("enable_pft", false); edi_conf.verbose = pt_edi.get("verbose", false); - edi_conf.enable_transport_header = pt_edi.get("enable_transport_addressing", true); edi_conf.fec = pt_edi.get("fec", 3); edi_conf.chunk_len = pt_edi.get("chunk_len", 207); diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index a5e6525..e6a7e3e 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -35,6 +35,8 @@ #include #include #include +#include "Socket.h" +#include "edi/common.hpp" #include "utils.h" using namespace std; @@ -330,13 +332,14 @@ void Edi::m_run() case InputUsed::UDP: { constexpr size_t packsize = 2048; - const auto packet = m_udp_sock.receive(packsize); + auto packet = m_udp_sock.receive(packsize); if (packet.buffer.size() == packsize) { fprintf(stderr, "Warning, possible UDP truncation\n"); } if (not packet.buffer.empty()) { try { - m_sti_decoder.push_packet(packet.buffer); + EdiDecoder::Packet p(move(packet.buffer)); + m_sti_decoder.push_packet(p); } catch (const runtime_error& e) { etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what(); @@ -350,19 +353,26 @@ void Edi::m_run() break; case InputUsed::TCP: { - auto packet = m_tcp_receive_server.receive(); - if (not packet.empty()) { + auto message = m_tcp_receive_server.receive(); + if (auto data = dynamic_pointer_cast(message)) { try { - m_sti_decoder.push_bytes(packet); + m_sti_decoder.push_bytes(data->data); } catch (const runtime_error& e) { etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what(); this_thread::sleep_for(chrono::milliseconds(24)); } } - else { + else if (dynamic_pointer_cast(message)) { + etiLog.level(info) << "EDI input " << m_name << " disconnected"; + m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state + } + else if (dynamic_pointer_cast(message)) { this_thread::sleep_for(chrono::milliseconds(12)); } + else { + throw logic_error("unimplemented TCPReceiveMessage type"); + } } break; default: diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 305653b..be3fd1f 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -51,6 +51,7 @@ #include #include "PcDebug.h" #include "Log.h" +#include "zmq.hpp" #ifdef __MINGW32__ # define bzero(s, n) memset(s, 0, n) @@ -348,7 +349,8 @@ int ZmqMPEG::readFromSocket(size_t framesize) zmq::message_t msg; try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait); + messageReceived = result.has_value(); if (not messageReceived) { return 0; } @@ -417,7 +419,8 @@ int ZmqAAC::readFromSocket(size_t framesize) zmq::message_t msg; try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait); + messageReceived = result.has_value(); if (not messageReceived) { return 0; } @@ -615,4 +618,3 @@ const string ZmqBase::get_parameter(const string& parameter) const } }; - diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index d907e6d..5baafd5 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -62,7 +62,6 @@ static void usage() cerr << " -C Before starting, run the given script, and only start if it returns 0." << endl; cerr << " This is useful for checking that NTP is properly synchronised" << endl; cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; - cerr << " -p Set the destination port." << endl; cerr << " -P Disable PFT and send AFPackets." << endl; cerr << " -f Set the FEC." << endl; cerr << " -i Enable the interleaver with this latency." << endl; @@ -73,6 +72,7 @@ static void usage() cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl; cerr << " -d Set the destination ip." << endl; + cerr << " -p Set the destination port." << endl; cerr << " -s Set the source port." << endl; cerr << " -S Select the source IP in case we want to use multicast." << endl; cerr << " -t Set the packet's TTL." << endl << endl; @@ -163,6 +163,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b * because several destinations can be given. */ static std::shared_ptr edi_destination; +static bool dest_port_set = false; static bool source_port_set = false; static bool source_addr_set = false; static bool ttl_set = false; @@ -178,6 +179,7 @@ static void add_edi_destination(void) edi_conf.destinations.push_back(move(edi_destination)); edi_destination = std::make_shared(); + dest_port_set = false; source_port_set = false; source_addr_set = false; ttl_set = false; @@ -191,6 +193,13 @@ static void parse_destination_args(char option) } switch (option) { + case 'p': + if (dest_port_set) { + add_edi_destination(); + } + edi_destination->dest_port = std::stoi(optarg); + dest_port_set = true; + break; case 's': if (source_port_set) { add_edi_destination(); @@ -253,10 +262,8 @@ int start(int argc, char **argv) case 's': case 'S': case 't': - parse_destination_args(ch); - break; case 'p': - edi_conf.dest_port = std::stoi(optarg); + parse_destination_args(ch); break; case 'P': edi_conf.enable_pft = false; @@ -332,11 +339,6 @@ int start(int argc, char **argv) return 1; } - if (edi_conf.dest_port == 0) { - etiLog.level(error) << "No EDI destination port defined"; - return 1; - } - if (edi_conf.destinations.empty()) { etiLog.level(error) << "No EDI destinations set"; return 1; -- cgit v1.2.3