aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-02-24 10:23:55 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-02-24 10:23:55 +0100
commitcdeded18deb9ef2a12bf5206757f235e3925c848 (patch)
treed704e6cf917fb4b9d77343305155db918da8395e
parent4516c0079dabc46d6787eeb037fd7b054ce8c81b (diff)
downloadODR-SourceCompanion-cdeded18deb9ef2a12bf5206757f235e3925c848.tar.gz
ODR-SourceCompanion-cdeded18deb9ef2a12bf5206757f235e3925c848.tar.bz2
ODR-SourceCompanion-cdeded18deb9ef2a12bf5206757f235e3925c848.zip
Common fc2902b and 4ad00b8: Update EDI output interleaver and spreading
-rw-r--r--lib/edioutput/EDIConfig.h5
-rw-r--r--lib/edioutput/Transport.cpp128
-rw-r--r--lib/edioutput/Transport.h20
3 files changed, 100 insertions, 53 deletions
diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h
index 647d77e..be6c9c4 100644
--- a/lib/edioutput/EDIConfig.h
+++ b/lib/edioutput/EDIConfig.h
@@ -71,10 +71,11 @@ struct configuration_t {
bool enable_pft = false; // Enable protection and fragmentation
unsigned int tagpacket_alignment = 0;
std::vector<std::shared_ptr<destination_t> > destinations;
- unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms
+ double fragment_spreading_factor = 0.95;
+ // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
+ // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
bool enabled() const { return destinations.size() > 0; }
- bool interleaver_enabled() const { return latency_frames > 0; }
void print() const;
};
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index f8e5dc7..136c71c 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -27,6 +27,7 @@
#include "Transport.h"
#include <iterator>
#include <cmath>
+#include <thread>
using namespace std;
@@ -57,9 +58,6 @@ void configuration_t::print() const
throw logic_error("EDI destination not implemented");
}
}
- if (interleaver_enabled()) {
- etiLog.level(info) << " interleave " << latency_frames * 24 << " ms";
- }
}
@@ -96,19 +94,33 @@ Sender::Sender(const configuration_t& conf) :
}
}
- if (m_conf.interleaver_enabled()) {
- edi_interleaver.SetLatency(m_conf.latency_frames);
- }
-
if (m_conf.dump) {
edi_debug_file.open("./edi.debug");
}
+ if (m_conf.enable_pft) {
+ unique_lock<mutex> lock(m_mutex);
+ m_running = true;
+ m_thread = thread(&Sender::run, this);
+ }
+
if (m_conf.verbose) {
etiLog.log(info, "EDI output set up");
}
}
+Sender::~Sender()
+{
+ {
+ unique_lock<mutex> lock(m_mutex);
+ m_running = false;
+ }
+
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
void Sender::write(const TagPacket& tagpacket)
{
// Assemble into one AF Packet
@@ -119,57 +131,34 @@ void Sender::write(const TagPacket& tagpacket)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose) {
- fprintf(stderr, "EDI Output: Number of PFT fragment before interleaver %zu\n",
- edi_fragments.size());
- }
-
- if (m_conf.interleaver_enabled()) {
- edi_fragments = edi_interleaver.Interleave(edi_fragments);
- }
-
- if (m_conf.verbose) {
fprintf(stderr, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
}
/* Spread out the transmission of all fragments over 25% of the 24ms AF packet duration
- * to reduce the risk of losing a burst of fragments because of congestion.
- *
- * 25% was chosen so that other outputs still have time to do their thing. */
- auto inter_fragment_wait_time = std::chrono::microseconds(0);
+ * to reduce the risk of losing a burst of fragments because of congestion. */
+ using namespace std::chrono;
+ auto inter_fragment_wait_time = microseconds(0);
if (edi_fragments.size() > 1) {
- inter_fragment_wait_time = std::chrono::microseconds(llrint(0.25 * 24000.0 / edi_fragments.size()));
+ inter_fragment_wait_time = microseconds(
+ llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size())
+ );
}
- // Send over ethernet
- for (auto& edi_frag : edi_fragments) {
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ /* Separate insertion into map and transmission so as to make spreading possible */
+ const auto now = steady_clock::now();
+ {
+ auto tp = now;
+ unique_lock<mutex> lock(m_mutex);
+ for (auto& edi_frag : edi_fragments) {
+ m_pending_frames[tp] = move(edi_frag);
+ tp += inter_fragment_wait_time;
}
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
-
- std::this_thread::sleep_for(inter_fragment_wait_time);
}
+
+ // Transmission done in run() function
}
- else {
+ else /* PFT disabled */ {
// Send over ethernet
if (m_conf.dump) {
ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
@@ -202,4 +191,49 @@ void Sender::write(const TagPacket& tagpacket)
}
}
+void Sender::run()
+{
+ while (m_running) {
+ unique_lock<mutex> lock(m_mutex);
+ const auto now = chrono::steady_clock::now();
+
+ // Send over ethernet
+ for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
+ const auto& edi_frag = it->second;
+
+ if (it->first <= now) {
+ if (m_conf.dump) {
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ }
+
+ for (auto& dest : m_conf.destinations) {
+ if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
+
+ udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
+ }
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
+ tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
+ }
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
+ }
+ else {
+ throw logic_error("EDI destination not implemented");
+ }
+ }
+ it = m_pending_frames.erase(it);
+ }
+ else {
+ ++it;
+ }
+ }
+
+ lock.unlock();
+ this_thread::sleep_for(chrono::microseconds(500));
+ }
+}
+
}
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index 56ded3b..3bcc2f4 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -31,13 +31,16 @@
#include "EDIConfig.h"
#include "AFPacket.h"
#include "PFT.h"
-#include "Interleaver.h"
#include "Socket.h"
#include <vector>
+#include <chrono>
+#include <map>
#include <unordered_map>
#include <stdexcept>
#include <fstream>
#include <cstdint>
+#include <thread>
+#include <mutex>
namespace edi {
@@ -46,10 +49,15 @@ namespace edi {
class Sender {
public:
Sender(const configuration_t& conf);
+ Sender(const Sender&) = delete;
+ Sender operator=(const Sender&) = delete;
+ ~Sender();
void write(const TagPacket& tagpacket);
private:
+ void run();
+
bool m_udp_fragmentation_warning_printed = false;
configuration_t m_conf;
@@ -61,12 +69,16 @@ class Sender {
// The AF Packet will be protected with reed-solomon and split in fragments
edi::PFT edi_pft;
- // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
- edi::Interleaver edi_interleaver;
-
std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
+
+ // PFT spreading requires sending UDP packets at specific time, independently of
+ // time when write() gets called
+ std::thread m_thread;
+ std::mutex m_mutex;
+ bool m_running = false;
+ std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
};
}