aboutsummaryrefslogtreecommitdiffstats
path: root/lib/edioutput/Transport.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'lib/edioutput/Transport.cpp')
-rw-r--r--lib/edioutput/Transport.cpp288
1 files changed, 152 insertions, 136 deletions
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index a5e0bc3..e9559b5 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -25,7 +25,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Transport.h"
-#include <iterator>
+#include "Log.h"
#include <cmath>
#include <thread>
@@ -57,13 +57,18 @@ void configuration_t::print() const
else {
throw logic_error("EDI destination not implemented");
}
+ etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft;
+ if (edi_dest->pft_settings.enable_pft) {
+ etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec;
+ etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len;
+ etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor;
+ }
}
}
Sender::Sender(const configuration_t& conf) :
- m_conf(conf),
- edi_pft(m_conf)
+ m_conf(conf)
{
if (m_conf.verbose) {
etiLog.level(info) << "Setup EDI Output";
@@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
+ Socket::UDPSocket udp_socket(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- udp_socket->setMulticastTTL(udp_dest->ttl);
+ udp_socket.setMulticastSource(udp_dest->source_addr.c_str());
+ udp_socket.setMulticastTTL(udp_dest->ttl);
}
- udp_sockets.emplace(udp_dest.get(), udp_socket);
+ auto sender = make_shared<udp_sender_t>(
+ udp_dest->dest_addr,
+ udp_dest->dest_port,
+ std::move(udp_socket));
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(udp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
- auto dispatcher = make_shared<Socket::TCPDataDispatcher>(
- tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers);
-
- dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
- tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
+ auto sender = make_shared<tcp_dispatcher_t>(
+ tcp_dest->listen_port,
+ tcp_dest->max_frames_queued,
+ tcp_dest->tcp_server_preroll_buffers);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
- auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port);
- tcp_senders.emplace(tcp_dest.get(), tcp_send_client);
+ auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else {
throw logic_error("EDI destination not implemented");
}
}
- if (m_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (m_conf.enable_pft) {
- unique_lock<mutex> lock(m_mutex);
+ {
m_running = true;
m_thread = thread(&Sender::run, this);
}
@@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) :
}
}
+void Sender::write(const TagPacket& tagpacket)
+{
+ // Assemble into one AF Packet
+ edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket);
+
+ write(af_packet);
+}
+
+void Sender::write(const AFPacket& af_packet)
+{
+ for (auto& sender : m_pft_spreaders) {
+ sender->send_af_packet(af_packet);
+ }
+}
+
+void Sender::override_af_sequence(uint16_t seq)
+{
+ edi_af_packetiser.OverrideSeq(seq);
+}
+
+void Sender::override_pft_sequence(uint16_t pseq)
+{
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->edi_pft.OverridePSeq(pseq);
+ }
+}
+
+std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const
+{
+ std::vector<Sender::stats_t> stats;
+
+ for (auto& spreader : m_pft_spreaders) {
+ if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) {
+ Sender::stats_t s;
+ s.listen_port = sender->listen_port;
+ s.stats = sender->sock.get_stats();
+ stats.push_back(s);
+ }
+ }
+
+ return stats;
+}
+
Sender::~Sender()
{
{
- unique_lock<mutex> lock(m_mutex);
m_running = false;
}
@@ -123,36 +172,89 @@ Sender::~Sender()
}
}
-void Sender::write(const TagPacket& tagpacket)
+void Sender::run()
{
- // Assemble into one AF Packet
- edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
+ while (m_running) {
+ const auto now = chrono::steady_clock::now();
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->tick(now);
+ }
- write(af_packet);
+ this_thread::sleep_for(chrono::microseconds(500));
+ }
}
-void Sender::write(const AFPacket& af_packet)
+
+void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(dest_addr, dest_port);
+ sock.send(frame, addr);
+}
+
+void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ sock.write(frame);
+}
+
+void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ sock.sendall(frame);
+}
+
+Sender::udp_sender_t::udp_sender_t(std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock) :
+ dest_addr(dest_addr),
+ dest_port(dest_port),
+ sock(std::move(sock))
+{
+}
+
+Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers) :
+ listen_port(listen_port),
+ sock(max_frames_queued, tcp_server_preroll_buffers)
{
- if (m_conf.enable_pft) {
+ sock.start(listen_port, "0.0.0.0");
+}
+
+Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr,
+ uint16_t dest_port) :
+ sock(dest_addr, dest_port)
+{
+}
+
+Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) :
+ sender(sender),
+ edi_pft(conf)
+{
+}
+
+void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet)
+{
+ using namespace std::chrono;
+ if (edi_pft.is_enabled()) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
- if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) {
+ if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) {
etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
- m_last_num_pft_fragments = edi_fragments.size();
+ last_num_pft_fragments = edi_fragments.size();
}
/* Spread out the transmission of all fragments over part of the 24ms AF packet duration
* to reduce the risk of losing a burst of fragments because of congestion. */
- using namespace std::chrono;
auto inter_fragment_wait_time = microseconds(1);
if (edi_fragments.size() > 1) {
- if (m_conf.fragment_spreading_factor > 0) {
+ if (settings.fragment_spreading_factor > 0) {
inter_fragment_wait_time =
- microseconds(
- llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size())
- );
+ microseconds(llrint(
+ settings.fragment_spreading_factor * 24000.0 /
+ edi_fragments.size()
+ ));
}
}
@@ -162,121 +264,35 @@ void Sender::write(const AFPacket& af_packet)
auto tp = now;
unique_lock<mutex> lock(m_mutex);
for (auto& edi_frag : edi_fragments) {
- m_pending_frames[tp] = move(edi_frag);
+ m_pending_frames[tp] = std::move(edi_frag);
tp += inter_fragment_wait_time;
}
}
-
- // Transmission done in run() function
}
else /* PFT disabled */ {
- // Send over ethernet
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
- fprintf(stderr, "EDI Output: AF packet larger than 1400,"
- " consider using PFT to avoid UP fragmentation.\n");
- m_udp_fragmentation_warning_printed = true;
- }
-
- udp_sockets.at(udp_dest.get())->send(af_packet, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet);
-
- if (m_conf.verbose and error_stats.has_seen_new_errors) {
- fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n",
- tcp_dest->dest_addr.c_str(),
- tcp_dest->dest_port,
- error_stats.num_reconnects,
- error_stats.last_error.c_str());
- }
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
+ const auto now = steady_clock::now();
+ unique_lock<mutex> lock(m_mutex);
+ m_pending_frames[now] = std::move(af_packet);
}
-}
-void Sender::override_af_sequence(uint16_t seq)
-{
- edi_afPacketiser.OverrideSeq(seq);
-}
-
-void Sender::override_pft_sequence(uint16_t pseq)
-{
- edi_pft.OverridePSeq(pseq);
+ // Actual transmission done in tick() function
}
-std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const
+void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now)
{
- std::vector<Sender::stats_t> stats;
+ unique_lock<mutex> lock(m_mutex);
- for (auto& el : tcp_dispatchers) {
- Sender::stats_t s;
- s.listen_port = el.first->listen_port;
- s.stats = el.second->get_stats();
- stats.push_back(s);
- }
-
- return stats;
-}
+ for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
+ const auto& edi_frag = it->second;
-void Sender::run()
-{
- while (m_running) {
- unique_lock<mutex> lock(m_mutex);
- const auto now = chrono::steady_clock::now();
-
- // Send over ethernet
- for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
- const auto& edi_frag = it->second;
-
- if (it->first <= now) {
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
- it = m_pending_frames.erase(it);
- }
- else {
- ++it;
- }
+ if (it->first <= now) {
+ sender->send_packet(edi_frag);
+ it = m_pending_frames.erase(it);
+ }
+ else {
+ ++it;
}
-
- lock.unlock();
- this_thread::sleep_for(chrono::microseconds(500));
}
}
-}
+} // namespace edi