aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/DabMux.cpp18
-rw-r--r--src/input/Edi.cpp22
-rw-r--r--src/input/Zmq.cpp8
-rw-r--r--src/zmq2edi/zmq2edi.cpp20
4 files changed, 41 insertions, 27 deletions
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 4ae607c..4265412 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -111,7 +111,6 @@ typedef DWORD32 uint32_t;
using namespace std;
using boost::property_tree::ptree;
-using boost::property_tree::ptree_error;
volatile sig_atomic_t running = 1;
@@ -323,7 +322,6 @@ int main(int argc, char *argv[])
if (outputuid == "edi") {
ptree pt_edi = pt_outputs.get_child("edi");
- bool require_dest_port = false;
for (auto pt_edi_dest : pt_edi.get_child("destinations")) {
const auto proto = pt_edi_dest.second.get<string>("protocol", "udp");
@@ -335,9 +333,16 @@ int main(int argc, char *argv[])
dest->source_addr = pt_edi_dest.second.get<string>("source", "");
dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport");
- edi_conf.destinations.push_back(dest);
+ dest->dest_port = pt_edi_dest.second.get<unsigned int>("port", 0);
+ if (dest->dest_port == 0) {
+ // Compatiblity: we have removed the transport and addressing in the
+ // PFT layer, which removed the requirement that all outputs must share
+ // the same destination port. If missing from the destination specification,
+ // we read it from the parent block, where it was before.
+ dest->dest_port = pt_edi.get<unsigned int>("port");
+ }
- require_dest_port = true;
+ edi_conf.destinations.push_back(dest);
}
else if (proto == "tcp") {
auto dest = make_shared<edi::tcp_server_t>();
@@ -350,14 +355,9 @@ int main(int argc, char *argv[])
}
}
- if (require_dest_port) {
- edi_conf.dest_port = pt_edi.get<unsigned int>("port");
- }
-
edi_conf.dump = pt_edi.get<bool>("dump", false);
edi_conf.enable_pft = pt_edi.get<bool>("enable_pft", false);
edi_conf.verbose = pt_edi.get<bool>("verbose", false);
- edi_conf.enable_transport_header = pt_edi.get<bool>("enable_transport_addressing", true);
edi_conf.fec = pt_edi.get<unsigned int>("fec", 3);
edi_conf.chunk_len = pt_edi.get<unsigned int>("chunk_len", 207);
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index a5e6525..e6a7e3e 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -35,6 +35,8 @@
#include <cstdlib>
#include <cerrno>
#include <climits>
+#include "Socket.h"
+#include "edi/common.hpp"
#include "utils.h"
using namespace std;
@@ -330,13 +332,14 @@ void Edi::m_run()
case InputUsed::UDP:
{
constexpr size_t packsize = 2048;
- const auto packet = m_udp_sock.receive(packsize);
+ auto packet = m_udp_sock.receive(packsize);
if (packet.buffer.size() == packsize) {
fprintf(stderr, "Warning, possible UDP truncation\n");
}
if (not packet.buffer.empty()) {
try {
- m_sti_decoder.push_packet(packet.buffer);
+ EdiDecoder::Packet p(move(packet.buffer));
+ m_sti_decoder.push_packet(p);
}
catch (const runtime_error& e) {
etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what();
@@ -350,19 +353,26 @@ void Edi::m_run()
break;
case InputUsed::TCP:
{
- auto packet = m_tcp_receive_server.receive();
- if (not packet.empty()) {
+ auto message = m_tcp_receive_server.receive();
+ if (auto data = dynamic_pointer_cast<Socket::TCPReceiveMessageData>(message)) {
try {
- m_sti_decoder.push_bytes(packet);
+ m_sti_decoder.push_bytes(data->data);
}
catch (const runtime_error& e) {
etiLog.level(warn) << "EDI input " << m_name << " exception: " << e.what();
this_thread::sleep_for(chrono::milliseconds(24));
}
}
- else {
+ else if (dynamic_pointer_cast<Socket::TCPReceiveMessageDisconnected>(message)) {
+ etiLog.level(info) << "EDI input " << m_name << " disconnected";
+ m_sti_decoder.push_bytes({}); // Push an empty frame to clear the internal state
+ }
+ else if (dynamic_pointer_cast<Socket::TCPReceiveMessageEmpty>(message)) {
this_thread::sleep_for(chrono::milliseconds(12));
}
+ else {
+ throw logic_error("unimplemented TCPReceiveMessage type");
+ }
}
break;
default:
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
index 305653b..be3fd1f 100644
--- a/src/input/Zmq.cpp
+++ b/src/input/Zmq.cpp
@@ -51,6 +51,7 @@
#include <limits.h>
#include "PcDebug.h"
#include "Log.h"
+#include "zmq.hpp"
#ifdef __MINGW32__
# define bzero(s, n) memset(s, 0, n)
@@ -348,7 +349,8 @@ int ZmqMPEG::readFromSocket(size_t framesize)
zmq::message_t msg;
try {
- messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait);
+ messageReceived = result.has_value();
if (not messageReceived) {
return 0;
}
@@ -417,7 +419,8 @@ int ZmqAAC::readFromSocket(size_t framesize)
zmq::message_t msg;
try {
- messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ auto result = m_zmq_sock.recv(msg, zmq::recv_flags::dontwait);
+ messageReceived = result.has_value();
if (not messageReceived) {
return 0;
}
@@ -615,4 +618,3 @@ const string ZmqBase::get_parameter(const string& parameter) const
}
};
-
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index d907e6d..5baafd5 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -62,7 +62,6 @@ static void usage()
cerr << " -C <path to script> Before starting, run the given script, and only start if it returns 0." << endl;
cerr << " This is useful for checking that NTP is properly synchronised" << endl;
cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl;
- cerr << " -p <destination port> Set the destination port." << endl;
cerr << " -P Disable PFT and send AFPackets." << endl;
cerr << " -f <fec> Set the FEC." << endl;
cerr << " -i <interleave> Enable the interleaver with this latency." << endl;
@@ -73,6 +72,7 @@ static void usage()
cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl;
cerr << " -d <destination ip> Set the destination ip." << endl;
+ cerr << " -p <destination port> Set the destination port." << endl;
cerr << " -s <source port> Set the source port." << endl;
cerr << " -S <source ip> Select the source IP in case we want to use multicast." << endl;
cerr << " -t <ttl> Set the packet's TTL." << endl << endl;
@@ -163,6 +163,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b
* because several destinations can be given. */
static std::shared_ptr<edi::udp_destination_t> edi_destination;
+static bool dest_port_set = false;
static bool source_port_set = false;
static bool source_addr_set = false;
static bool ttl_set = false;
@@ -178,6 +179,7 @@ static void add_edi_destination(void)
edi_conf.destinations.push_back(move(edi_destination));
edi_destination = std::make_shared<edi::udp_destination_t>();
+ dest_port_set = false;
source_port_set = false;
source_addr_set = false;
ttl_set = false;
@@ -191,6 +193,13 @@ static void parse_destination_args(char option)
}
switch (option) {
+ case 'p':
+ if (dest_port_set) {
+ add_edi_destination();
+ }
+ edi_destination->dest_port = std::stoi(optarg);
+ dest_port_set = true;
+ break;
case 's':
if (source_port_set) {
add_edi_destination();
@@ -253,10 +262,8 @@ int start(int argc, char **argv)
case 's':
case 'S':
case 't':
- parse_destination_args(ch);
- break;
case 'p':
- edi_conf.dest_port = std::stoi(optarg);
+ parse_destination_args(ch);
break;
case 'P':
edi_conf.enable_pft = false;
@@ -332,11 +339,6 @@ int start(int argc, char **argv)
return 1;
}
- if (edi_conf.dest_port == 0) {
- etiLog.level(error) << "No EDI destination port defined";
- return 1;
- }
-
if (edi_conf.destinations.empty()) {
etiLog.level(error) << "No EDI destinations set";
return 1;