summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 15:04:51 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-05-06 15:04:51 +0200
commit956814cc526bdd245e52c5004bf5661a57d848cc (patch)
tree10a4e368432740fc0514ae9d1de572bb1c844768
parent8cb5b3eac1bb669b8828777489d54e9d9057fe6f (diff)
downloaddabmux-956814cc526bdd245e52c5004bf5661a57d848cc.tar.gz
dabmux-956814cc526bdd245e52c5004bf5661a57d848cc.tar.bz2
dabmux-956814cc526bdd245e52c5004bf5661a57d848cc.zip
EDI: put more code in common between DabMux and ZMQ2EDI
-rw-r--r--Makefile.am22
-rw-r--r--src/DabMultiplexer.cpp99
-rw-r--r--src/DabMultiplexer.h19
-rw-r--r--src/DabMux.cpp26
-rw-r--r--src/dabOutput/dabOutput.h28
-rw-r--r--src/dabOutput/edi/Config.h71
-rw-r--r--src/dabOutput/edi/Interleaver.cpp1
-rw-r--r--src/dabOutput/edi/PFT.cpp30
-rw-r--r--src/dabOutput/edi/PFT.h57
-rw-r--r--src/dabOutput/edi/Transport.cpp164
-rw-r--r--src/dabOutput/edi/Transport.h68
-rw-r--r--src/zmq2edi/EDISender.cpp110
-rw-r--r--src/zmq2edi/EDISender.h20
-rw-r--r--src/zmq2edi/zmq2edi.cpp23
14 files changed, 395 insertions, 343 deletions
diff --git a/Makefile.am b/Makefile.am
index 3dbb918..b756385 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -81,14 +81,17 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/dabOutput/metadata.cpp \
src/dabOutput/edi/AFPacket.cpp \
src/dabOutput/edi/AFPacket.h \
+ src/dabOutput/edi/Config.h \
+ src/dabOutput/edi/Interleaver.cpp \
+ src/dabOutput/edi/Interleaver.h \
+ src/dabOutput/edi/PFT.cpp \
+ src/dabOutput/edi/PFT.h \
src/dabOutput/edi/TagItems.cpp \
src/dabOutput/edi/TagItems.h \
src/dabOutput/edi/TagPacket.cpp \
src/dabOutput/edi/TagPacket.h \
- src/dabOutput/edi/PFT.cpp \
- src/dabOutput/edi/PFT.h \
- src/dabOutput/edi/Interleaver.cpp \
- src/dabOutput/edi/Interleaver.h \
+ src/dabOutput/edi/Transport.cpp \
+ src/dabOutput/edi/Transport.h \
src/ClockTAI.h \
src/ClockTAI.cpp \
src/ConfigParser.cpp \
@@ -191,14 +194,17 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \
src/dabOutput/metadata.cpp \
src/dabOutput/edi/AFPacket.cpp \
src/dabOutput/edi/AFPacket.h \
+ src/dabOutput/edi/Config.h \
+ src/dabOutput/edi/Interleaver.cpp \
+ src/dabOutput/edi/Interleaver.h \
+ src/dabOutput/edi/PFT.cpp \
+ src/dabOutput/edi/PFT.h \
src/dabOutput/edi/TagItems.cpp \
src/dabOutput/edi/TagItems.h \
src/dabOutput/edi/TagPacket.cpp \
src/dabOutput/edi/TagPacket.h \
- src/dabOutput/edi/PFT.cpp \
- src/dabOutput/edi/PFT.h \
- src/dabOutput/edi/Interleaver.cpp \
- src/dabOutput/edi/Interleaver.h \
+ src/dabOutput/edi/Transport.cpp \
+ src/dabOutput/edi/Transport.h \
src/InetAddress.h \
src/InetAddress.cpp \
src/UdpSocket.h \
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 3a7f31f..9ff28a3 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -98,46 +98,10 @@ DabMultiplexer::~DabMultiplexer()
rcs.remove_controllable(&m_clock_tai);
}
-void DabMultiplexer::set_edi_config(const edi_configuration_t& new_edi_conf)
+void DabMultiplexer::set_edi_config(const edi::configuration_t& new_edi_conf)
{
edi_conf = new_edi_conf;
-
- if (edi_conf.verbose) {
- etiLog.log(info, "Setup EDI");
- }
-
- if (edi_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (edi_conf.enabled()) {
- for (auto& edi_destination : edi_conf.destinations) {
- auto edi_output = std::make_shared<UdpSocket>(edi_destination.source_port);
-
- if (not edi_destination.source_addr.empty()) {
- int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
- if (err) {
- etiLog.level(error) << "EDI socket set source failed!";
- throw MuxInitException();
- }
- err = edi_output->setMulticastTTL(edi_destination.ttl);
- if (err) {
- etiLog.level(error) << "EDI socket set TTL failed!";
- throw MuxInitException();
- }
- }
-
- edi_destination.socket = edi_output;
- }
- }
-
- if (edi_conf.verbose) {
- etiLog.log(info, "EDI set up");
- }
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT pft(edi_conf);
- edi_pft = pft;
+ edi_sender = make_shared<edi::Sender>(edi_conf);
}
@@ -211,10 +175,6 @@ void DabMultiplexer::prepare(bool require_tai_clock)
}
}
- if (edi_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(edi_conf.latency_frames);
- }
-
}
@@ -749,7 +709,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
/**********************************************************************
*********** Finalise and send EDI ********************************
**********************************************************************/
- if (edi_conf.enabled()) {
+ if (edi_sender and edi_conf.enabled()) {
// put tags *ptr, DETI and all subchannels into one TagPacket
edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
edi_tagpacket.tag_items.push_back(&edi_tagDETI);
@@ -758,58 +718,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
edi_tagpacket.tag_items.push_back(&tag.second);
}
- // Assemble into one AF Packet
- edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
-
- if (edi_conf.enable_pft) {
- // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
- edi_fragments.size());
- }
-
- if (edi_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- // Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_frag, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
- edi_fragments.size());
- }
- }
- else {
- // Send over ethernet
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_afpacket, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
- }
- }
+ edi_sender->write(edi_tagpacket);
}
#if _DEBUG
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index 7090be7..386c23c 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -33,8 +33,7 @@
#include "dabOutput/edi/TagItems.h"
#include "dabOutput/edi/TagPacket.h"
#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/PFT.h"
-#include "dabOutput/edi/Interleaver.h"
+#include "dabOutput/edi/Transport.h"
#include "fig/FIGCarousel.h"
#include "crc.h"
#include "utils.h"
@@ -67,7 +66,7 @@ class DabMultiplexer : public RemoteControllable {
void print_info(void);
- void set_edi_config(const edi_configuration_t& new_edi_conf);
+ void set_edi_config(const edi::configuration_t& new_edi_conf);
/* Remote control */
virtual void set_parameter(const std::string& parameter,
@@ -88,7 +87,8 @@ class DabMultiplexer : public RemoteControllable {
std::time_t edi_time;
std::time_t edi_time_latched_for_mnsc;
- edi_configuration_t edi_conf;
+ edi::configuration_t edi_conf;
+ std::shared_ptr<edi::Sender> edi_sender;
uint32_t sync = 0x49C5F8;
unsigned long currentFrame = 0;
@@ -99,17 +99,6 @@ class DabMultiplexer : public RemoteControllable {
bool m_tai_clock_required = false;
ClockTAI m_clock_tai;
- std::ofstream edi_debug_file;
-
- // The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
-
- // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
- edi::Interleaver edi_interleaver;
-
/* New FIG Carousel */
FIC::FIGCarousel fig_carousel;
};
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 3d19ee4..3d0a7d9 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -271,7 +271,7 @@ int main(int argc, char *argv[])
" starting up";
- edi_configuration_t edi_conf;
+ edi::configuration_t edi_conf;
/******************** READ OUTPUT PARAMETERS ***************/
set<string> all_output_names;
@@ -293,12 +293,12 @@ int main(int argc, char *argv[])
if (outputuid == "edi") {
ptree pt_edi = pt_outputs.get_child("edi");
for (auto pt_edi_dest : pt_edi.get_child("destinations")) {
- edi_destination_t dest;
- dest.dest_addr = pt_edi_dest.second.get<string>("destination");
- dest.ttl = pt_edi_dest.second.get<unsigned int>("ttl", 1);
+ auto dest = make_shared<edi::udp_destination_t>();
+ dest->dest_addr = pt_edi_dest.second.get<string>("destination");
+ dest->ttl = pt_edi_dest.second.get<unsigned int>("ttl", 1);
- dest.source_addr = pt_edi_dest.second.get<string>("source", "");
- dest.source_port = pt_edi_dest.second.get<unsigned int>("sourceport");
+ dest->source_addr = pt_edi_dest.second.get<string>("source", "");
+ dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport");
edi_conf.destinations.push_back(dest);
}
@@ -460,19 +460,7 @@ int main(int argc, char *argv[])
printOutputs(outputs);
if (edi_conf.enabled()) {
- etiLog.level(info) << "EDI";
- etiLog.level(info) << " verbose " << edi_conf.verbose;
- for (auto& edi_dest : edi_conf.destinations) {
- etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
- if (not edi_dest.source_addr.empty()) {
- etiLog.level(info) << " source " << edi_dest.source_addr;
- etiLog.level(info) << " ttl " << edi_dest.ttl;
- }
- etiLog.level(info) << " source port " << edi_dest.source_port;
- }
- if (edi_conf.interleaver_enabled()) {
- etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
- }
+ edi_conf.print();
}
size_t limit = pt.get("general.nbframes", 0);
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index e5a8a94..9cc18d7 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -48,34 +48,6 @@
#endif
#include "dabOutput/metadata.h"
-/** Configuration for EDI output */
-
-// Can represent both unicast and multicast destinations
-struct edi_destination_t {
- std::string dest_addr;
- std::string source_addr;
- unsigned int source_port = 0;
- unsigned int ttl = 10;
-
- std::shared_ptr<UdpSocket> socket;
-};
-
-struct edi_configuration_t {
- unsigned chunk_len = 207; // RSk, data length of each chunk
- unsigned fec = 0; // number of fragments that can be recovered
- bool dump = false; // dump a file with the EDI packets
- bool verbose = false;
- bool enable_pft = false; // Enable protection and fragmentation
- unsigned int tagpacket_alignment = 0;
- std::vector<edi_destination_t> destinations;
- unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer
- unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms
-
- bool enabled() const { return destinations.size() > 0; }
- bool interleaver_enabled() const { return latency_frames > 0; }
-};
-
-
// Abstract base class for all outputs
class DabOutput
{
diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h
new file mode 100644
index 0000000..d3678d9
--- /dev/null
+++ b/src/dabOutput/edi/Config.h
@@ -0,0 +1,71 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ EDI output,
+ UDP and TCP transports and their configuration
+
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "config.h"
+#include <vector>
+#include <string>
+#include <memory>
+#include <cstdint>
+
+namespace edi {
+
+/** Configuration for EDI output */
+
+struct destination_t {
+ virtual ~destination_t() {};
+};
+
+// Can represent both unicast and multicast destinations
+struct udp_destination_t : public destination_t {
+ std::string dest_addr;
+ std::string source_addr;
+ unsigned int source_port = 0;
+ unsigned int ttl = 10;
+};
+
+struct configuration_t {
+ unsigned chunk_len = 207; // RSk, data length of each chunk
+ unsigned fec = 0; // number of fragments that can be recovered
+ bool dump = false; // dump a file with the EDI packets
+ bool verbose = false;
+ bool enable_pft = false; // Enable protection and fragmentation
+ unsigned int tagpacket_alignment = 0;
+ std::vector<std::shared_ptr<destination_t> > destinations;
+ unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer
+ unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms
+
+ bool enabled() const { return destinations.size() > 0; }
+ bool interleaver_enabled() const { return latency_frames > 0; }
+
+ void print() const;
+};
+
+}
+
+
diff --git a/src/dabOutput/edi/Interleaver.cpp b/src/dabOutput/edi/Interleaver.cpp
index e31326b..f26a50e 100644
--- a/src/dabOutput/edi/Interleaver.cpp
+++ b/src/dabOutput/edi/Interleaver.cpp
@@ -30,6 +30,7 @@
*/
#include "Interleaver.h"
+#include <cassert>
namespace edi {
diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp
index 1c885f9..5b93016 100644
--- a/src/dabOutput/edi/PFT.cpp
+++ b/src/dabOutput/edi/PFT.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2014
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -50,6 +50,30 @@ using namespace std;
// An integer division that rounds up, i.e. ceil(a/b)
#define CEIL_DIV(a, b) (a % b == 0 ? a / b : a / b + 1)
+PFT::PFT() { }
+
+PFT::PFT(const configuration_t &conf) :
+ m_k(conf.chunk_len),
+ m_m(conf.fec),
+ m_dest_port(conf.dest_port),
+ m_pseq(0),
+ m_num_chunks(0),
+ m_verbose(conf.verbose)
+ {
+ if (m_k > 207) {
+ etiLog.level(warn) <<
+ "EDI PFT: maximum chunk size is 207.";
+ throw std::out_of_range("EDI PFT Chunk size too large.");
+ }
+
+ if (m_m > 5) {
+ etiLog.level(warn) <<
+ "EDI PFT: high number of recoverable fragments"
+ " may lead to large overhead";
+ // See TS 102 821, 7.2.1 Known values, list entry for 'm'
+ }
+ }
+
RSBlock PFT::Protect(AFPacket af_packet)
{
RSBlock rs_block;
@@ -98,7 +122,7 @@ RSBlock PFT::Protect(AFPacket af_packet)
// Calculate RS for each chunk and assemble RS block
for (size_t i = 0; i < af_packet.size(); i+= chunk_len) {
vector<uint8_t> chunk(207);
- vector<uint8_t> protection(ParityBytes);
+ vector<uint8_t> protection(PARITYBYTES);
// copy chunk_len bytes into new chunk
memcpy(&chunk.front(), &af_packet[i], chunk_len);
@@ -139,7 +163,7 @@ vector< vector<uint8_t> > PFT::ProtectAndFragment(AFPacket af_packet)
#endif
// TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h))
- const size_t max_payload_size = ( m_num_chunks * ParityBytes ) / (m_m + 1);
+ const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1);
// Calculate fragment count and size
// TS 102 821 7.2.2: ceil((l + c*p + z) / s_max)
diff --git a/src/dabOutput/edi/PFT.h b/src/dabOutput/edi/PFT.h
index 05afdb1..4076bf3 100644
--- a/src/dabOutput/edi/PFT.h
+++ b/src/dabOutput/edi/PFT.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2014
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -40,7 +40,7 @@
#include "AFPacket.h"
#include "Log.h"
#include "ReedSolomon.h"
-#include "dabOutput/dabOutput.h"
+#include "dabOutput/edi/Config.h"
namespace edi {
@@ -50,38 +50,10 @@ typedef std::vector<uint8_t> PFTFragment;
class PFT
{
public:
- static const int ParityBytes = 48;
-
- PFT() :
- m_k(207),
- m_m(3),
- m_dest_port(12000),
- m_pseq(0),
- m_num_chunks(0),
- m_verbose(false)
- { }
-
- PFT(const edi_configuration_t &conf) :
- m_k(conf.chunk_len),
- m_m(conf.fec),
- m_dest_port(conf.dest_port),
- m_pseq(0),
- m_num_chunks(0),
- m_verbose(conf.verbose)
- {
- if (m_k > 207) {
- etiLog.level(warn) <<
- "EDI PFT: maximum chunk size is 207.";
- throw std::out_of_range("EDI PFT Chunk size too large.");
- }
-
- if (m_m > 5) {
- etiLog.level(warn) <<
- "EDI PFT: high number of recoverable fragments"
- " may lead to large overhead";
- // See TS 102 821, 7.2.1 Known values, list entry for 'm'
- }
- }
+ static constexpr int PARITYBYTES = 48;
+
+ PFT();
+ PFT(const configuration_t& conf);
// return a list of PFT fragments with the correct
// PFT headers
@@ -94,17 +66,12 @@ class PFT
std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet);
private:
- unsigned int m_k; // length of RS data word
- unsigned int m_m; // number of fragments that can be recovered if lost
-
- unsigned int m_dest_port; // Destination port for transport header
-
- uint16_t m_pseq;
-
- size_t m_num_chunks;
-
- bool m_verbose;
-
+ unsigned int m_k = 207; // length of RS data word
+ unsigned int m_m = 3; // number of fragments that can be recovered if lost
+ unsigned int m_dest_port = 12000; // Destination port for transport header
+ uint16_t m_pseq = 0;
+ size_t m_num_chunks = 0;
+ bool m_verbose = 0;
};
}
diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp
new file mode 100644
index 0000000..d433239
--- /dev/null
+++ b/src/dabOutput/edi/Transport.cpp
@@ -0,0 +1,164 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ EDI output,
+ UDP and TCP transports and their configuration
+
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "Transport.h"
+#include <iterator>
+
+using namespace std;
+
+namespace edi {
+
+void configuration_t::print() const
+{
+ etiLog.level(info) << "EDI";
+ etiLog.level(info) << " verbose " << verbose;
+ for (auto edi_dest : destinations) {
+ if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
+ etiLog.level(info) << " to " << udp_dest->dest_addr << ":" << dest_port;
+ if (not udp_dest->source_addr.empty()) {
+ etiLog.level(info) << " source " << udp_dest->source_addr;
+ etiLog.level(info) << " ttl " << udp_dest->ttl;
+ }
+ etiLog.level(info) << " source port " << udp_dest->source_port;
+ }
+ else {
+ throw std::logic_error("EDI destination not implemented");
+ }
+ }
+ if (interleaver_enabled()) {
+ etiLog.level(info) << " interleave " << latency_frames * 24 << " ms";
+ }
+}
+
+
+Sender::Sender(const configuration_t& conf) :
+ m_conf(conf),
+ edi_pft(m_conf)
+{
+ if (m_conf.verbose) {
+ etiLog.log(info, "Setup EDI");
+ }
+
+ for (const auto& edi_dest : m_conf.destinations) {
+ if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
+ auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port);
+
+ if (not udp_dest->source_addr.empty()) {
+ int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
+ if (err) {
+ throw runtime_error("EDI socket set source failed!");
+ }
+ err = udp_socket->setMulticastTTL(udp_dest->ttl);
+ if (err) {
+ throw runtime_error("EDI socket set TTL failed!");
+ }
+ }
+
+ udp_sockets.emplace(udp_dest.get(), udp_socket);
+ }
+ }
+
+ if (m_conf.interleaver_enabled()) {
+ edi_interleaver.SetLatency(m_conf.latency_frames);
+ }
+
+ if (m_conf.dump) {
+ edi_debug_file.open("./edi.debug");
+ }
+
+ if (m_conf.verbose) {
+ etiLog.log(info, "EDI set up");
+ }
+}
+
+void Sender::write(const TagPacket& tagpacket)
+{
+ // Assemble into one AF Packet
+ edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
+
+ if (m_conf.enable_pft) {
+ // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
+ vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
+
+ if (m_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
+ edi_fragments.size());
+ }
+
+ if (m_conf.interleaver_enabled()) {
+ edi_fragments = edi_interleaver.Interleave(edi_fragments);
+ }
+
+ // Send over ethernet
+ for (const auto& edi_frag : edi_fragments) {
+ for (auto& dest : m_conf.destinations) {
+ if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
+ InetAddress addr;
+ addr.setAddress(udp_dest->dest_addr.c_str());
+ addr.setPort(m_conf.dest_port);
+
+ udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
+ }
+ else {
+ throw std::logic_error("EDI destination not implemented");
+ }
+ }
+
+ if (m_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ }
+ }
+
+ if (m_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragments %zu",
+ edi_fragments.size());
+ }
+ }
+ else {
+ // Send over ethernet
+ for (auto& dest : m_conf.destinations) {
+ if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
+ InetAddress addr;
+ addr.setAddress(udp_dest->dest_addr.c_str());
+ addr.setPort(m_conf.dest_port);
+
+ udp_sockets.at(udp_dest.get())->send(af_packet, addr);
+ }
+ else {
+ throw std::logic_error("EDI destination not implemented");
+ }
+ }
+
+ if (m_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ }
+ }
+}
+
+}
diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h
new file mode 100644
index 0000000..3c48c96
--- /dev/null
+++ b/src/dabOutput/edi/Transport.h
@@ -0,0 +1,68 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ EDI output,
+ UDP and TCP transports and their configuration
+
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "config.h"
+#include "dabOutput/edi/Config.h"
+#include "AFPacket.h"
+#include "PFT.h"
+#include "Interleaver.h"
+#include <vector>
+#include <unordered_map>
+#include <stdexcept>
+#include <cstdint>
+#include "dabOutput/dabOutput.h"
+
+namespace edi {
+
+/** Configuration for EDI output */
+
+class Sender {
+ public:
+ Sender(const configuration_t& conf);
+
+ void write(const TagPacket& tagpacket);
+
+ private:
+ configuration_t m_conf;
+ std::ofstream edi_debug_file;
+
+ // The TagPacket will then be placed into an AFPacket
+ edi::AFPacketiser edi_afPacketiser;
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT edi_pft;
+
+ // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
+ edi::Interleaver edi_interleaver;
+
+ std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets;
+};
+
+}
+
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 0df633f..2128abf 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -47,51 +47,14 @@ EDISender::~EDISender()
}
}
-void EDISender::start(const edi_configuration_t& conf,
+void EDISender::start(const edi::configuration_t& conf,
int delay_ms, bool drop_late_packets)
{
edi_conf = conf;
tist_delay_ms = delay_ms;
drop_late = drop_late_packets;
- if (edi_conf.verbose) {
- etiLog.log(info, "Setup EDI");
- }
-
- if (edi_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (edi_conf.enabled()) {
- for (auto& edi_destination : edi_conf.destinations) {
- auto edi_output = make_shared<UdpSocket>(edi_destination.source_port);
-
- if (not edi_destination.source_addr.empty()) {
- int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = edi_output->setMulticastTTL(edi_destination.ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
- }
-
- edi_destination.socket = edi_output;
- }
- }
-
- if (edi_conf.verbose) {
- etiLog.log(info, "EDI set up");
- }
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT pft(edi_conf);
- edi_pft = pft;
-
- if (edi_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(edi_conf.latency_frames);
- }
+ edi_sender = make_shared<edi::Sender>(edi_conf);
startTime = std::chrono::steady_clock::now();
running.store(true);
@@ -106,19 +69,7 @@ void EDISender::push_frame(const frame_t& frame)
void EDISender::print_configuration()
{
if (edi_conf.enabled()) {
- etiLog.level(info) << "EDI";
- etiLog.level(info) << " verbose " << edi_conf.verbose;
- for (auto& edi_dest : edi_conf.destinations) {
- etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
- if (not edi_dest.source_addr.empty()) {
- etiLog.level(info) << " source " << edi_dest.source_addr;
- etiLog.level(info) << " ttl " << edi_dest.ttl;
- }
- etiLog.level(info) << " source port " << edi_dest.source_port;
- }
- if (edi_conf.interleaver_enabled()) {
- etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
- }
+ edi_conf.print();
}
else {
etiLog.level(info) << "EDI disabled";
@@ -251,7 +202,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
edi_tagDETI.utco = metadata.utc_offset;
edi_tagDETI.seconds = metadata.edi_time;
- if (edi_conf.enabled()) {
+ if (edi_sender and edi_conf.enabled()) {
// put tags *ptr, DETI and all subchannels into one TagPacket
edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
edi_tagpacket.tag_items.push_back(&edi_tagDETI);
@@ -260,58 +211,7 @@ void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
edi_tagpacket.tag_items.push_back(&tag.second);
}
- // Assemble into one AF Packet
- edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
-
- if (edi_conf.enable_pft) {
- // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
- vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
- edi_fragments.size());
- }
-
- if (edi_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- // Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_frag, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (edi_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu\n",
- edi_fragments.size());
- }
- }
- else {
- // Send over ethernet
- for (auto& dest : edi_conf.destinations) {
- InetAddress addr;
- addr.setAddress(dest.dest_addr.c_str());
- addr.setPort(edi_conf.dest_port);
-
- dest.socket->send(edi_afpacket, addr);
- }
-
- if (edi_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
- }
- }
+ edi_sender->write(edi_tagpacket);
}
}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index 4c2af54..bb9c8bc 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -36,9 +36,7 @@
#include "dabOutput/dabOutput.h"
#include "dabOutput/edi/TagItems.h"
#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/PFT.h"
-#include "dabOutput/edi/Interleaver.h"
+#include "dabOutput/edi/Transport.h"
// This metadata gets transmitted in the zmq stream
struct metadata_t {
@@ -55,7 +53,7 @@ class EDISender {
EDISender(const EDISender& other) = delete;
EDISender& operator=(const EDISender& other) = delete;
~EDISender();
- void start(const edi_configuration_t& conf,
+ void start(const edi::configuration_t& conf,
int delay_ms, bool drop_late_packets);
void push_frame(const frame_t& frame);
void print_configuration(void);
@@ -68,19 +66,11 @@ class EDISender {
bool drop_late;
std::atomic<bool> running;
std::thread process_thread;
- edi_configuration_t edi_conf;
+ edi::configuration_t edi_conf;
std::chrono::steady_clock::time_point startTime;
ThreadsafeQueue<frame_t> frames;
- std::ofstream edi_debug_file;
- // The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
-
- // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
- edi::Interleaver edi_interleaver;
+ std::shared_ptr<edi::Sender> edi_sender;
// For statistics about wait time before we transmit packets,
// in microseconds
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 3364faa..ee5776e 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -40,7 +40,7 @@
constexpr size_t MAX_ERROR_COUNT = 10;
constexpr long ZMQ_TIMEOUT_MS = 1000;
-static edi_configuration_t edi_conf;
+static edi::configuration_t edi_conf;
static EDISender edisender;
@@ -155,7 +155,7 @@ static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_b
/* There is some state inside the parsing of destination arguments,
* because several destinations can be given. */
-static edi_destination_t edi_destination;
+static std::shared_ptr<edi::udp_destination_t> edi_destination;
static bool source_port_set = false;
static bool source_addr_set = false;
static bool ttl_set = false;
@@ -168,9 +168,8 @@ static void add_edi_destination(void)
std::to_string(edi_conf.destinations.size() + 1));
}
- edi_conf.destinations.push_back(edi_destination);
- edi_destination_t newdest;
- edi_destination = newdest;
+ edi_conf.destinations.push_back(move(edi_destination));
+ edi_destination.reset();
source_port_set = false;
source_addr_set = false;
@@ -180,33 +179,37 @@ static void add_edi_destination(void)
static void parse_destination_args(char option)
{
+ if (not edi_destination) {
+ edi_destination = std::make_shared<edi::udp_destination_t>();
+ }
+
switch (option) {
case 's':
if (source_port_set) {
add_edi_destination();
}
- edi_destination.source_port = std::stoi(optarg);
+ edi_destination->source_port = std::stoi(optarg);
source_port_set = true;
break;
case 'S':
if (source_addr_set) {
add_edi_destination();
}
- edi_destination.source_addr = optarg;
+ edi_destination->source_addr = optarg;
source_addr_set = true;
break;
case 't':
if (ttl_set) {
add_edi_destination();
}
- edi_destination.ttl = std::stoi(optarg);
+ edi_destination->ttl = std::stoi(optarg);
ttl_set = true;
break;
case 'd':
if (dest_addr_set) {
add_edi_destination();
}
- edi_destination.dest_addr = optarg;
+ edi_destination->dest_addr = optarg;
dest_addr_set = true;
break;
default: