From 12670a017ddb14fbf4a932799051dcfe21dd6c78 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 15 Jan 2021 07:09:03 +0100 Subject: Common 6b5db53: Update zmq.hpp, TCPReceiveServer, EDI decoder and output --- src/DabMux.cpp | 18 +++++++++--------- src/input/Edi.cpp | 22 ++++++++++++++++------ src/input/Zmq.cpp | 8 +++++--- src/zmq2edi/zmq2edi.cpp | 20 +++++++++++--------- 4 files changed, 41 insertions(+), 27 deletions(-) (limited to 'src') diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 4ae607c..4265412 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -111,7 +111,6 @@ typedef DWORD32 uint32_t; using namespace std; using boost::property_tree::ptree; -using boost::property_tree::ptree_error; volatile sig_atomic_t running = 1; @@ -323,7 +322,6 @@ int main(int argc, char *argv[]) if (outputuid == "edi") { ptree pt_edi = pt_outputs.get_child("edi"); - bool require_dest_port = false; for (auto pt_edi_dest : pt_edi.get_child("destinations")) { const auto proto = pt_edi_dest.second.get("protocol", "udp"); @@ -335,9 +333,16 @@ int main(int argc, char *argv[]) dest->source_addr = pt_edi_dest.second.get("source", ""); dest->source_port = pt_edi_dest.second.get("sourceport"); - edi_conf.destinations.push_back(dest); + dest->dest_port = pt_edi_dest.second.get("port", 0); + if (dest->dest_port == 0) { + // Compatiblity: we have removed the transport and addressing in the + // PFT layer, which removed the requirement that all outputs must share + // the same destination port. If missing from the destination specification, + // we read it from the parent block, where it was before. + dest->dest_port = pt_edi.get("port"); + } - require_dest_port = true; + edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { auto dest = make_shared(); @@ -350,14 +355,9 @@ int main(int argc, char *argv[]) } } - if (require_dest_port) { - edi_conf.dest_port = pt_edi.get("port"); - } - edi_conf.dump = pt_edi.get("dump", false); edi_conf.enable_pft = pt_edi.get("enable_pft", false); edi_conf.verbose = pt_edi.get("verbose", false); - edi_conf.enable_transport_header = pt_edi.get("enable_transport_addressing", true); edi_conf.fec = pt_edi.get("fec", 3); edi_conf.chunk_len = pt_edi.get("chunk_len", 207); diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index a5e6525..e6a7e3e 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -35,6 +35,8 @@ #include #include #include +#include "Socket.h" +#include "edi/common.hpp" #include "utils.h" using namespace std; @@ -330,13 +332,14 @@ void Edi::m_run() case InputUsed::UDP: { constexpr size_t packsize = 2048; - const auto packet = m_udp_sock.receive(packsize); + auto packet = m_udp_sock.receive(packsize); if (packet.buffer.size() == packsize) { fprintf(stderr, "Warning, possible UDP truncation\n"); } if (not packet.buffer.empty()) { try { - m_sti_decoder.push_packet(packet.buffer); + EdiDecoder::Packet p(move(packet.buffer)); + m_sti_decoder.push_packet(p); } catch (const runtime_error& e) { etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what(); @@ -350,19 +353,26 @@ void Edi::m_run() break; case InputUsed::TCP: { - auto packet = m_tcp_receive_server.receive(); - if (not packet.empty()) { + auto message = m_tcp_receive_server.receive(); + if (auto data = dynamic_pointer_cast(message)) { try { - m_sti_decoder.push_bytes(packet); + m_sti_decoder.push_bytes(data->data); } catch (const runtime_error& e) { etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what(); this_thread::sleep_for(chrono::milliseconds(24)); } } - else { + else if (dynamic_pointer_cast(message)) { + etiLog.level(info) << "EDI input " << m_name << " disconnected"; + m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state + } + else if (dynamic_pointer_cast(message)) { this_thread::sleep_for(chrono::milliseconds(12)); } + else { + throw logic_error("unimplemented TCPReceiveMessage type"); + } } break; default: diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 305653b..be3fd1f 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -51,6 +51,7 @@ #include #include "PcDebug.h" #include "Log.h" +#include "zmq.hpp" #ifdef __MINGW32__ # define bzero(s, n) memset(s, 0, n) @@ -348,7 +349,8 @@ int ZmqMPEG::readFromSocket(size_t framesize) zmq::message_t msg; try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait); + messageReceived = result.has_value(); if (not messageReceived) { return 0; } @@ -417,7 +419,8 @@ int ZmqAAC::readFromSocket(size_t framesize) zmq::message_t msg; try { - messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait); + messageReceived = result.has_value(); if (not messageReceived) { return 0; } @@ -615,4 +618,3 @@ const string ZmqBase::get_parameter(const string& parameter) const } }; - diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index d907e6d..5baafd5 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -62,7 +62,6 @@ static void usage() cerr << " -C Before starting, run the given script, and only start if it returns 0." << endl; cerr << " This is useful for checking that NTP is properly synchronised" << endl; cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; - cerr << " -p Set the destination port." << endl; cerr << " -P Disable PFT and send AFPackets." << endl; cerr << " -f Set the FEC." << endl; cerr << " -i Enable the interleaver with this latency." << endl; @@ -73,6 +72,7 @@ static void usage() cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl; cerr << " -d Set the destination ip." << endl; + cerr << " -p Set the destination port." << endl; cerr << " -s Set the source port." << endl; cerr << " -S Select the source IP in case we want to use multicast." << endl; cerr << " -t Set the packet's TTL." << endl << endl; @@ -163,6 +163,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b * because several destinations can be given. */ static std::shared_ptr edi_destination; +static bool dest_port_set = false; static bool source_port_set = false; static bool source_addr_set = false; static bool ttl_set = false; @@ -178,6 +179,7 @@ static void add_edi_destination(void) edi_conf.destinations.push_back(move(edi_destination)); edi_destination = std::make_shared(); + dest_port_set = false; source_port_set = false; source_addr_set = false; ttl_set = false; @@ -191,6 +193,13 @@ static void parse_destination_args(char option) } switch (option) { + case 'p': + if (dest_port_set) { + add_edi_destination(); + } + edi_destination->dest_port = std::stoi(optarg); + dest_port_set = true; + break; case 's': if (source_port_set) { add_edi_destination(); @@ -253,10 +262,8 @@ int start(int argc, char **argv) case 's': case 'S': case 't': - parse_destination_args(ch); - break; case 'p': - edi_conf.dest_port = std::stoi(optarg); + parse_destination_args(ch); break; case 'P': edi_conf.enable_pft = false; @@ -332,11 +339,6 @@ int start(int argc, char **argv) return 1; } - if (edi_conf.dest_port == 0) { - etiLog.level(error) << "No EDI destination port defined"; - return 1; - } - if (edi_conf.destinations.empty()) { etiLog.level(error) << "No EDI destinations set"; return 1; -- cgit v1.2.3