diff options
Diffstat (limited to 'contrib/edioutput')
-rw-r--r-- | contrib/edioutput/EDIConfig.h | 5 | ||||
-rw-r--r-- | contrib/edioutput/Interleaver.cpp | 122 | ||||
-rw-r--r-- | contrib/edioutput/Interleaver.h | 75 | ||||
-rw-r--r-- | contrib/edioutput/Transport.cpp | 128 | ||||
-rw-r--r-- | contrib/edioutput/Transport.h | 20 |
5 files changed, 100 insertions, 250 deletions
diff --git a/contrib/edioutput/EDIConfig.h b/contrib/edioutput/EDIConfig.h index 647d77e..be6c9c4 100644 --- a/contrib/edioutput/EDIConfig.h +++ b/contrib/edioutput/EDIConfig.h @@ -71,10 +71,11 @@ struct configuration_t { bool enable_pft = false; // Enable protection and fragmentation unsigned int tagpacket_alignment = 0; std::vector<std::shared_ptr<destination_t> > destinations; - unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms + double fragment_spreading_factor = 0.95; + // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms) + // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets. bool enabled() const { return destinations.size() > 0; } - bool interleaver_enabled() const { return latency_frames > 0; } void print() const; }; diff --git a/contrib/edioutput/Interleaver.cpp b/contrib/edioutput/Interleaver.cpp deleted file mode 100644 index f26a50e..0000000 --- a/contrib/edioutput/Interleaver.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/* - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - Interleaving of PFT fragments to increase robustness against - burst packet loss. - - This is possible because EDI has to assume that fragments may reach - the receiver out of order. - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "Interleaver.h" -#include <cassert> - -namespace edi { - -void Interleaver::SetLatency(size_t latency_frames) -{ - m_latency = latency_frames; -} - -Interleaver::fragment_vec Interleaver::Interleave(fragment_vec &fragments) -{ - m_fragment_count = fragments.size(); - - // Create vectors containing Fcount*latency fragments in total - // and store them into the deque - if (m_buffer.empty()) { - m_buffer.emplace_back(); - } - - auto& last_buffer = m_buffer.back(); - - for (auto& fragment : fragments) { - const bool last_buffer_is_complete = - (last_buffer.size() >= m_fragment_count * m_latency); - - if (last_buffer_is_complete) { - m_buffer.emplace_back(); - last_buffer = m_buffer.back(); - } - - last_buffer.push_back(std::move(fragment)); - } - - fragments.clear(); - - while ( not m_buffer.empty() and - (m_buffer.front().size() >= m_fragment_count * m_latency)) { - - auto& first_buffer = m_buffer.front(); - - assert(first_buffer.size() == m_fragment_count * m_latency); - - /* Assume we have 5 fragments per AF frame, and latency of 3. - * This will give the following strides: - * 0 1 2 - * +-------+-------+---+ - * | 0 1 | 2 3 | 4 | - * | | +---+ | - * | 5 6 | 7 | 8 9 | - * | +---+ | | - * |10 |11 12 |13 14 | - * +---+-------+-------+ - * - * ix will be 0, 5, 10, 1, 6 in the first loop - */ - - for (size_t i = 0; i < m_fragment_count; i++) { - const size_t ix = m_interleave_offset + m_fragment_count * m_stride; - m_interleaved_fragments.push_back(first_buffer.at(ix)); - - m_stride += 1; - if (m_stride >= m_latency) { - m_interleave_offset++; - m_stride = 0; - } - } - - if (m_interleave_offset >= m_fragment_count) { - m_interleave_offset = 0; - m_stride = 0; - m_buffer.pop_front(); - } - } - - std::vector<PFTFragment> interleaved_frags; - - const size_t n = std::min(m_fragment_count, m_interleaved_fragments.size()); - std::move(m_interleaved_fragments.begin(), - m_interleaved_fragments.begin() + n, - std::back_inserter(interleaved_frags)); - m_interleaved_fragments.erase( - m_interleaved_fragments.begin(), - m_interleaved_fragments.begin() + n); - - return interleaved_frags; -} - -} - - diff --git a/contrib/edioutput/Interleaver.h b/contrib/edioutput/Interleaver.h deleted file mode 100644 index 3029d5d..0000000 --- a/contrib/edioutput/Interleaver.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - Interleaving of PFT fragments to increase robustness against - burst packet loss. - - This is possible because EDI has to assume that fragments may reach - the receiver out of order. - - */ -/* - This file is part of the ODR-mmbTools. - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include "config.h" -#include <vector> -#include <deque> -#include <stdexcept> -#include <cstdint> -#include "Log.h" -#include "PFT.h" - -namespace edi { - -class Interleaver { - public: - using fragment_vec = std::vector<PFTFragment>; - - /* Configure the interleaver to use latency_frames number of AF - * packets for interleaving. Total delay through the interleaver - * will be latency_frames * 24ms - */ - void SetLatency(size_t latency_frames); - - /* Move the fragments for an AF Packet into the interleaver and - * return interleaved fragments to be transmitted. - */ - fragment_vec Interleave(fragment_vec &fragments); - - private: - size_t m_latency = 0; - size_t m_fragment_count = 0; - size_t m_interleave_offset = 0; - size_t m_stride = 0; - - /* Buffer that accumulates enough fragments to interleave */ - std::deque<fragment_vec> m_buffer; - - /* Buffer that contains fragments that have been interleaved, - * to avoid that the interleaver output is too bursty - */ - std::deque<PFTFragment> m_interleaved_fragments; -}; - -} - diff --git a/contrib/edioutput/Transport.cpp b/contrib/edioutput/Transport.cpp index f8e5dc7..136c71c 100644 --- a/contrib/edioutput/Transport.cpp +++ b/contrib/edioutput/Transport.cpp @@ -27,6 +27,7 @@ #include "Transport.h" #include <iterator> #include <cmath> +#include <thread> using namespace std; @@ -57,9 +58,6 @@ void configuration_t::print() const throw logic_error("EDI destination not implemented"); } } - if (interleaver_enabled()) { - etiLog.level(info) << " interleave " << latency_frames * 24 << " ms"; - } } @@ -96,19 +94,33 @@ Sender::Sender(const configuration_t& conf) : } } - if (m_conf.interleaver_enabled()) { - edi_interleaver.SetLatency(m_conf.latency_frames); - } - if (m_conf.dump) { edi_debug_file.open("./edi.debug"); } + if (m_conf.enable_pft) { + unique_lock<mutex> lock(m_mutex); + m_running = true; + m_thread = thread(&Sender::run, this); + } + if (m_conf.verbose) { etiLog.log(info, "EDI output set up"); } } +Sender::~Sender() +{ + { + unique_lock<mutex> lock(m_mutex); + m_running = false; + } + + if (m_thread.joinable()) { + m_thread.join(); + } +} + void Sender::write(const TagPacket& tagpacket) { // Assemble into one AF Packet @@ -119,57 +131,34 @@ void Sender::write(const TagPacket& tagpacket) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); if (m_conf.verbose) { - fprintf(stderr, "EDI Output: Number of PFT fragment before interleaver %zu\n", - edi_fragments.size()); - } - - if (m_conf.interleaver_enabled()) { - edi_fragments = edi_interleaver.Interleave(edi_fragments); - } - - if (m_conf.verbose) { fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n", edi_fragments.size()); } /* Spread out the transmission of all fragments over 25% of the 24ms AF packet duration - * to reduce the risk of losing a burst of fragments because of congestion. - * - * 25% was chosen so that other outputs still have time to do their thing. */ - auto inter_fragment_wait_time = std::chrono::microseconds(0); + * to reduce the risk of losing a burst of fragments because of congestion. */ + using namespace std::chrono; + auto inter_fragment_wait_time = microseconds(0); if (edi_fragments.size() > 1) { - inter_fragment_wait_time = std::chrono::microseconds(llrint(0.25 * 24000.0 / edi_fragments.size())); + inter_fragment_wait_time = microseconds( + llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size()) + ); } - // Send over ethernet - for (auto& edi_frag : edi_fragments) { - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + /* Separate insertion into map and transmission so as to make spreading possible */ + const auto now = steady_clock::now(); + { + auto tp = now; + unique_lock<mutex> lock(m_mutex); + for (auto& edi_frag : edi_fragments) { + m_pending_frames[tp] = move(edi_frag); + tp += inter_fragment_wait_time; } - - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - - std::this_thread::sleep_for(inter_fragment_wait_time); } + + // Transmission done in run() function } - else { + else /* PFT disabled */ { // Send over ethernet if (m_conf.dump) { ostream_iterator<uint8_t> debug_iterator(edi_debug_file); @@ -202,4 +191,49 @@ void Sender::write(const TagPacket& tagpacket) } } +void Sender::run() +{ + while (m_running) { + unique_lock<mutex> lock(m_mutex); + const auto now = chrono::steady_clock::now(); + + // Send over ethernet + for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) { + const auto& edi_frag = it->second; + + if (it->first <= now) { + if (m_conf.dump) { + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + } + + for (auto& dest : m_conf.destinations) { + if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); + + udp_sockets.at(udp_dest.get())->send(edi_frag, addr); + } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { + tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); + } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag); + } + else { + throw logic_error("EDI destination not implemented"); + } + } + it = m_pending_frames.erase(it); + } + else { + ++it; + } + } + + lock.unlock(); + this_thread::sleep_for(chrono::microseconds(500)); + } +} + } diff --git a/contrib/edioutput/Transport.h b/contrib/edioutput/Transport.h index 56ded3b..3bcc2f4 100644 --- a/contrib/edioutput/Transport.h +++ b/contrib/edioutput/Transport.h @@ -31,13 +31,16 @@ #include "EDIConfig.h" #include "AFPacket.h" #include "PFT.h" -#include "Interleaver.h" #include "Socket.h" #include <vector> +#include <chrono> +#include <map> #include <unordered_map> #include <stdexcept> #include <fstream> #include <cstdint> +#include <thread> +#include <mutex> namespace edi { @@ -46,10 +49,15 @@ namespace edi { class Sender { public: Sender(const configuration_t& conf); + Sender(const Sender&) = delete; + Sender operator=(const Sender&) = delete; + ~Sender(); void write(const TagPacket& tagpacket); private: + void run(); + bool m_udp_fragmentation_warning_printed = false; configuration_t m_conf; @@ -61,12 +69,16 @@ class Sender { // The AF Packet will be protected with reed-solomon and split in fragments edi::PFT edi_pft; - // To mitigate for burst packet loss, PFT fragments can be sent out-of-order - edi::Interleaver edi_interleaver; - std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; + + // PFT spreading requires sending UDP packets at specific time, independently of + // time when write() gets called + std::thread m_thread; + std::mutex m_mutex; + bool m_running = false; + std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames; }; } |