aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Json.cpp4
-rw-r--r--lib/Json.h4
-rw-r--r--lib/Socket.cpp58
-rw-r--r--lib/Socket.h35
-rw-r--r--lib/ThreadsafeQueue.h38
-rw-r--r--lib/edi/STIDecoder.cpp21
-rw-r--r--lib/edi/STIDecoder.hpp5
-rw-r--r--lib/edioutput/EDIConfig.h31
-rw-r--r--lib/edioutput/PFT.cpp12
-rw-r--r--lib/edioutput/PFT.h15
-rw-r--r--lib/edioutput/Transport.cpp268
-rw-r--r--lib/edioutput/Transport.h106
12 files changed, 395 insertions, 202 deletions
diff --git a/lib/Json.cpp b/lib/Json.cpp
index 361a149..ee33671 100644
--- a/lib/Json.cpp
+++ b/lib/Json.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -27,7 +27,7 @@
#include <sstream>
#include <iomanip>
#include <string>
-#include <algorithm>
+#include <stdexcept>
#include "Json.h"
diff --git a/lib/Json.h b/lib/Json.h
index b082f92..0168583 100644
--- a/lib/Json.h
+++ b/lib/Json.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -34,10 +34,10 @@
#include <vector>
#include <memory>
#include <optional>
-#include <stdexcept>
#include <string>
#include <unordered_map>
#include <variant>
+#include <cstdint>
namespace json {
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 2df1559..5c920d7 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -24,6 +24,7 @@
#include "Socket.h"
+#include <numeric>
#include <stdexcept>
#include <cstdio>
#include <cstring>
@@ -478,7 +479,7 @@ TCPSocket::~TCPSocket()
TCPSocket::TCPSocket(TCPSocket&& other) :
m_sock(other.m_sock),
- m_remote_address(move(other.m_remote_address))
+ m_remote_address(std::move(other.m_remote_address))
{
if (other.m_sock != -1) {
other.m_sock = -1;
@@ -967,12 +968,22 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
reconnect();
}
+ m_last_received_packet_ts = chrono::steady_clock::now();
+
return ret;
}
catch (const TCPSocket::Interrupted&) {
return -1;
}
catch (const TCPSocket::Timeout&) {
+ const auto timeout = chrono::milliseconds(timeout_ms * 5);
+ if (m_last_received_packet_ts.has_value() and
+ chrono::steady_clock::now() - *m_last_received_packet_ts > timeout)
+ {
+ // This is to catch half-closed TCP connections
+ reconnect();
+ }
+
return 0;
}
@@ -983,6 +994,7 @@ void TCPClient::reconnect()
{
TCPSocket newsock;
m_sock = std::move(newsock);
+ m_last_received_packet_ts = nullopt;
m_sock.connect(m_hostname, m_port, true);
}
@@ -990,7 +1002,7 @@ TCPConnection::TCPConnection(TCPSocket&& sock) :
queue(),
m_running(true),
m_sender_thread(),
- m_sock(move(sock))
+ m_sock(std::move(sock))
{
#if MISSING_OWN_ADDR
auto own_addr = m_sock.getOwnAddress();
@@ -1052,6 +1064,17 @@ void TCPConnection::process()
#endif
}
+TCPConnection::stats_t TCPConnection::get_stats() const
+{
+ TCPConnection::stats_t s;
+ const vector<size_t> buffer_sizes = queue.map<size_t>(
+ [](const vector<uint8_t>& vec) { return vec.size(); }
+ );
+
+ s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0);
+ s.remote_address = m_sock.get_remote_address();
+ return s;
+}
TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
m_max_queue_size(max_queue_size),
@@ -1109,7 +1132,7 @@ void TCPDataDispatcher::process()
auto sock = m_listener_socket.accept(timeout_ms);
if (sock.valid()) {
auto lock = unique_lock<mutex>(m_mutex);
- m_connections.emplace(m_connections.begin(), move(sock));
+ m_connections.emplace(m_connections.begin(), std::move(sock));
if (m_buffers_to_preroll > 0) {
for (const auto& buf : m_preroll_queue) {
@@ -1125,6 +1148,16 @@ void TCPDataDispatcher::process()
}
}
+
+std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const
+{
+ std::vector<TCPConnection::stats_t> s;
+ for (const auto& conn : m_connections) {
+ s.push_back(conn.get_stats());
+ }
+ return s;
+}
+
TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
m_blocksize(blocksize)
{
@@ -1181,7 +1214,7 @@ void TCPReceiveServer::process()
}
else {
buf.resize(r);
- m_queue.push(make_shared<TCPReceiveMessageData>(move(buf)));
+ m_queue.push(make_shared<TCPReceiveMessageData>(std::move(buf)));
}
}
catch (const TCPSocket::Interrupted&) {
@@ -1222,7 +1255,7 @@ TCPSendClient::~TCPSendClient()
}
}
-void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
+TCPSendClient::ErrorStats TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
{
if (not m_running) {
throw runtime_error(m_exception_data);
@@ -1234,6 +1267,17 @@ void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
vector<uint8_t> discard;
m_queue.try_pop(discard);
}
+
+ TCPSendClient::ErrorStats es;
+ es.num_reconnects = m_num_reconnects.load();
+
+ es.has_seen_new_errors = es.num_reconnects != m_num_reconnects_prev;
+ m_num_reconnects_prev = es.num_reconnects;
+
+ auto lock = unique_lock<mutex>(m_error_mutex);
+ es.last_error = m_last_error;
+
+ return es;
}
void TCPSendClient::process()
@@ -1255,12 +1299,16 @@ void TCPSendClient::process()
}
else {
try {
+ m_num_reconnects.fetch_add(1, std::memory_order_seq_cst);
m_sock.connect(m_hostname, m_port);
m_is_connected = true;
}
catch (const runtime_error& e) {
m_is_connected = false;
this_thread::sleep_for(chrono::seconds(1));
+
+ auto lock = unique_lock<mutex>(m_error_mutex);
+ m_last_error = e.what();
}
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index 1320a64..29b618a 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -31,9 +31,11 @@
#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <atomic>
-#include <string>
+#include <chrono>
#include <list>
#include <memory>
+#include <optional>
+#include <string>
#include <thread>
#include <vector>
@@ -211,6 +213,8 @@ class TCPSocket {
SOCKET get_sockfd() const { return m_sock; }
+ InetAddress get_remote_address() const { return m_remote_address; }
+
private:
explicit TCPSocket(int sockfd);
explicit TCPSocket(int sockfd, InetAddress remote_address);
@@ -236,6 +240,8 @@ class TCPClient {
TCPSocket m_sock;
std::string m_hostname;
int m_port;
+
+ std::optional<std::chrono::steady_clock::time_point> m_last_received_packet_ts;
};
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
@@ -250,6 +256,12 @@ class TCPConnection
ThreadsafeQueue<std::vector<uint8_t> > queue;
+ struct stats_t {
+ size_t buffer_fullness = 0;
+ InetAddress remote_address;
+ };
+ stats_t get_stats() const;
+
private:
std::atomic<bool> m_running;
std::thread m_sender_thread;
@@ -272,6 +284,8 @@ class TCPDataDispatcher
void start(int port, const std::string& address);
void write(const std::vector<uint8_t>& data);
+ std::vector<TCPConnection::stats_t> get_stats() const;
+
private:
void process();
@@ -329,10 +343,18 @@ class TCPSendClient {
public:
TCPSendClient(const std::string& hostname, int port);
~TCPSendClient();
+ TCPSendClient(const TCPSendClient&) = delete;
+ TCPSendClient& operator=(const TCPSendClient&) = delete;
- /* Throws a runtime_error on error
- */
- void sendall(const std::vector<uint8_t>& buffer);
+
+ struct ErrorStats {
+ std::string last_error = "";
+ size_t num_reconnects = 0;
+ bool has_seen_new_errors = false;
+ };
+
+ /* Throws a runtime_error when the process thread isn't running */
+ ErrorStats sendall(const std::vector<uint8_t>& buffer);
private:
void process();
@@ -349,6 +371,11 @@ class TCPSendClient {
std::string m_exception_data;
std::thread m_sender_thread;
TCPSocket m_listener_socket;
+
+ std::atomic<size_t> m_num_reconnects = ATOMIC_VAR_INIT(0);
+ size_t m_num_reconnects_prev = 0;
+ std::mutex m_error_mutex;
+ std::string m_last_error = "";
};
}
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 8b385d6..13bc19e 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -28,6 +28,7 @@
#pragma once
+#include <functional>
#include <mutex>
#include <condition_variable>
#include <queue>
@@ -63,10 +64,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
else if (queue_size_before < max_size) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -80,10 +81,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
else if (queue_size_before < max_size) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -110,9 +111,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.push(val);
+ the_queue.push_back(val);
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -129,9 +130,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -152,7 +153,7 @@ public:
while (the_queue.size() >= threshold) {
the_tx_notification.wait(lock);
}
- the_queue.push(val);
+ the_queue.push_back(val);
size_t queue_size = the_queue.size();
lock.unlock();
@@ -198,7 +199,7 @@ public:
}
popped_value = the_queue.front();
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
@@ -220,15 +221,26 @@ public:
}
else {
std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
}
}
+ template<typename R>
+ std::vector<R> map(std::function<R(const T&)> func) const
+ {
+ std::vector<R> result;
+ std::unique_lock<std::mutex> lock(the_mutex);
+ for (const T& elem : the_queue) {
+ result.push_back(func(elem));
+ }
+ return result;
+ }
+
private:
- std::queue<T> the_queue;
+ std::deque<T> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;
diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp
index 2de828b..0499c53 100644
--- a/lib/edi/STIDecoder.cpp
+++ b/lib/edi/STIDecoder.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2020
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -20,7 +20,6 @@
*/
#include "STIDecoder.hpp"
#include "buffer_unpack.hpp"
-#include "crc.h"
#include "Log.h"
#include <cstdio>
#include <cassert>
@@ -180,7 +179,13 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t&
n |= (uint16_t)(name[3]);
if (n == 0) {
- etiLog.level(warn) << "EDI: Stream index SSnn tag is zero";
+ if (not m_ssnn_zero_warning_printed) {
+ etiLog.level(warn) << "EDI: Stream index SSnn tag is zero";
+ }
+ m_ssnn_zero_warning_printed = true;
+ }
+ else {
+ m_ssnn_zero_warning_printed = false;
}
if (m_filter_stream and m_filtered_stream_index != n) {
@@ -197,14 +202,20 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t&
sti.stid = istc & 0xFFF;
if (sti.rfa != 0) {
- etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null";
+ if (not m_rfa_nonnull_warning_printed) {
+ etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null";
+ }
+ m_rfa_nonnull_warning_printed = true;
+ }
+ else {
+ m_rfa_nonnull_warning_printed = false;
}
copy( value.cbegin() + 3,
value.cend(),
back_inserter(sti.istd));
- m_data_collector.add_payload(move(sti));
+ m_data_collector.add_payload(std::move(sti));
return true;
}
diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp
index 5e71ce7..81fbd82 100644
--- a/lib/edi/STIDecoder.hpp
+++ b/lib/edi/STIDecoder.hpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2020
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -139,6 +139,9 @@ class STIDecoder {
bool m_filter_stream = false;
uint16_t m_filtered_stream_index = 1;
+
+ bool m_ssnn_zero_warning_printed = false;
+ bool m_rfa_nonnull_warning_printed = false;
};
}
diff --git a/lib/edioutput/EDIConfig.h b/lib/edioutput/EDIConfig.h
index 1997210..7016e87 100644
--- a/lib/edioutput/EDIConfig.h
+++ b/lib/edioutput/EDIConfig.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -36,17 +36,31 @@ namespace edi {
/** Configuration for EDI output */
+struct pft_settings_t {
+ // protection and fragmentation settings
+ bool verbose = false;
+ bool enable_pft = false;
+ unsigned chunk_len = 207; // RSk, data length of each chunk
+ unsigned fec = 0; // number of fragments that can be recovered
+ double fragment_spreading_factor = 0.95;
+ // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
+ // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
+};
+
struct destination_t {
virtual ~destination_t() {};
+
+ pft_settings_t pft_settings = {};
};
+
// Can represent both unicast and multicast destinations
struct udp_destination_t : public destination_t {
std::string dest_addr;
- unsigned int dest_port = 0;
+ uint16_t dest_port = 0;
std::string source_addr;
- unsigned int source_port = 0;
- unsigned int ttl = 10;
+ uint16_t source_port = 0;
+ uint8_t ttl = 10;
};
// TCP server that can accept multiple connections
@@ -66,16 +80,9 @@ struct tcp_client_t : public destination_t {
};
struct configuration_t {
- unsigned chunk_len = 207; // RSk, data length of each chunk
- unsigned fec = 0; // number of fragments that can be recovered
- bool dump = false; // dump a file with the EDI packets
- bool verbose = false;
- bool enable_pft = false; // Enable protection and fragmentation
+ bool verbose = false;
unsigned int tagpacket_alignment = 0;
std::vector<std::shared_ptr<destination_t> > destinations;
- double fragment_spreading_factor = 0.95;
- // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
- // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
bool enabled() const { return destinations.size() > 0; }
diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp
index 7e0e8e9..f65fd67 100644
--- a/lib/edioutput/PFT.cpp
+++ b/lib/edioutput/PFT.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,7 +31,6 @@
*/
#include <vector>
-#include <list>
#include <cstdio>
#include <cstring>
#include <cstdint>
@@ -41,6 +40,7 @@
#include "PFT.h"
#include "crc.h"
#include "ReedSolomon.h"
+#include "Log.h"
namespace edi {
@@ -51,11 +51,10 @@ using namespace std;
PFT::PFT() { }
-PFT::PFT(const configuration_t &conf) :
+PFT::PFT(const pft_settings_t& conf) :
+ m_enabled(conf.enable_pft),
m_k(conf.chunk_len),
m_m(conf.fec),
- m_pseq(0),
- m_num_chunks(0),
m_verbose(conf.verbose)
{
if (m_k > 207) {
@@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq)
m_pseq = pseq;
}
-}
-
+} // namespace edi
diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h
index 42569a0..52e9f46 100644
--- a/lib/edioutput/PFT.h
+++ b/lib/edioutput/PFT.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,12 +33,8 @@
#pragma once
#include <vector>
-#include <list>
-#include <stdexcept>
#include <cstdint>
#include "AFPacket.h"
-#include "Log.h"
-#include "ReedSolomon.h"
#include "EDIConfig.h"
namespace edi {
@@ -52,21 +48,24 @@ class PFT
static constexpr int PARITYBYTES = 48;
PFT();
- PFT(const configuration_t& conf);
+ PFT(const pft_settings_t& conf);
+
+ bool is_enabled() const { return m_enabled and m_k > 0; }
// return a list of PFT fragments with the correct
// PFT headers
- std::vector< PFTFragment > Assemble(AFPacket af_packet);
+ std::vector<PFTFragment> Assemble(AFPacket af_packet);
// Apply Reed-Solomon FEC to the AF Packet
RSBlock Protect(AFPacket af_packet);
// Cut a RSBlock into several fragments that can be transmitted
- std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet);
+ std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet);
void OverridePSeq(uint16_t pseq);
private:
+ bool m_enabled = false;
unsigned int m_k = 207; // length of RS data word
unsigned int m_m = 3; // number of fragments that can be recovered if lost
uint16_t m_pseq = 0;
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index 8ebb9fc..e9559b5 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2022
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -25,7 +25,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Transport.h"
-#include <iterator>
+#include "Log.h"
#include <cmath>
#include <thread>
@@ -57,13 +57,18 @@ void configuration_t::print() const
else {
throw logic_error("EDI destination not implemented");
}
+ etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft;
+ if (edi_dest->pft_settings.enable_pft) {
+ etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec;
+ etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len;
+ etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor;
+ }
}
}
Sender::Sender(const configuration_t& conf) :
- m_conf(conf),
- edi_pft(m_conf)
+ m_conf(conf)
{
if (m_conf.verbose) {
etiLog.level(info) << "Setup EDI Output";
@@ -71,37 +76,39 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
+ Socket::UDPSocket udp_socket(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- udp_socket->setMulticastTTL(udp_dest->ttl);
+ udp_socket.setMulticastSource(udp_dest->source_addr.c_str());
+ udp_socket.setMulticastTTL(udp_dest->ttl);
}
- udp_sockets.emplace(udp_dest.get(), udp_socket);
+ auto sender = make_shared<udp_sender_t>(
+ udp_dest->dest_addr,
+ udp_dest->dest_port,
+ std::move(udp_socket));
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(udp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
- auto dispatcher = make_shared<Socket::TCPDataDispatcher>(
- tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers);
-
- dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
- tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
+ auto sender = make_shared<tcp_dispatcher_t>(
+ tcp_dest->listen_port,
+ tcp_dest->max_frames_queued,
+ tcp_dest->tcp_server_preroll_buffers);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
- auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port);
- tcp_senders.emplace(tcp_dest.get(), tcp_send_client);
+ auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else {
throw logic_error("EDI destination not implemented");
}
}
- if (m_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (m_conf.enable_pft) {
- unique_lock<mutex> lock(m_mutex);
+ {
m_running = true;
m_thread = thread(&Sender::run, this);
}
@@ -111,10 +118,52 @@ Sender::Sender(const configuration_t& conf) :
}
}
+void Sender::write(const TagPacket& tagpacket)
+{
+ // Assemble into one AF Packet
+ edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket);
+
+ write(af_packet);
+}
+
+void Sender::write(const AFPacket& af_packet)
+{
+ for (auto& sender : m_pft_spreaders) {
+ sender->send_af_packet(af_packet);
+ }
+}
+
+void Sender::override_af_sequence(uint16_t seq)
+{
+ edi_af_packetiser.OverrideSeq(seq);
+}
+
+void Sender::override_pft_sequence(uint16_t pseq)
+{
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->edi_pft.OverridePSeq(pseq);
+ }
+}
+
+std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const
+{
+ std::vector<Sender::stats_t> stats;
+
+ for (auto& spreader : m_pft_spreaders) {
+ if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) {
+ Sender::stats_t s;
+ s.listen_port = sender->listen_port;
+ s.stats = sender->sock.get_stats();
+ stats.push_back(s);
+ }
+ }
+
+ return stats;
+}
+
Sender::~Sender()
{
{
- unique_lock<mutex> lock(m_mutex);
m_running = false;
}
@@ -123,36 +172,89 @@ Sender::~Sender()
}
}
-void Sender::write(const TagPacket& tagpacket)
+void Sender::run()
{
- // Assemble into one AF Packet
- edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
+ while (m_running) {
+ const auto now = chrono::steady_clock::now();
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->tick(now);
+ }
- write(af_packet);
+ this_thread::sleep_for(chrono::microseconds(500));
+ }
}
-void Sender::write(const AFPacket& af_packet)
+
+void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(dest_addr, dest_port);
+ sock.send(frame, addr);
+}
+
+void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame)
{
- if (m_conf.enable_pft) {
+ sock.write(frame);
+}
+
+void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ sock.sendall(frame);
+}
+
+Sender::udp_sender_t::udp_sender_t(std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock) :
+ dest_addr(dest_addr),
+ dest_port(dest_port),
+ sock(std::move(sock))
+{
+}
+
+Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers) :
+ listen_port(listen_port),
+ sock(max_frames_queued, tcp_server_preroll_buffers)
+{
+ sock.start(listen_port, "0.0.0.0");
+}
+
+Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr,
+ uint16_t dest_port) :
+ sock(dest_addr, dest_port)
+{
+}
+
+Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) :
+ sender(sender),
+ edi_pft(conf)
+{
+}
+
+void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet)
+{
+ using namespace std::chrono;
+ if (edi_pft.is_enabled()) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
- if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) {
+ if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) {
etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
- m_last_num_pft_fragments = edi_fragments.size();
+ last_num_pft_fragments = edi_fragments.size();
}
/* Spread out the transmission of all fragments over part of the 24ms AF packet duration
* to reduce the risk of losing a burst of fragments because of congestion. */
- using namespace std::chrono;
auto inter_fragment_wait_time = microseconds(1);
if (edi_fragments.size() > 1) {
- if (m_conf.fragment_spreading_factor > 0) {
+ if (settings.fragment_spreading_factor > 0) {
inter_fragment_wait_time =
- microseconds(
- llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size())
- );
+ microseconds(llrint(
+ settings.fragment_spreading_factor * 24000.0 /
+ edi_fragments.size()
+ ));
}
}
@@ -162,99 +264,35 @@ void Sender::write(const AFPacket& af_packet)
auto tp = now;
unique_lock<mutex> lock(m_mutex);
for (auto& edi_frag : edi_fragments) {
- m_pending_frames[tp] = move(edi_frag);
+ m_pending_frames[tp] = std::move(edi_frag);
tp += inter_fragment_wait_time;
}
}
-
- // Transmission done in run() function
}
else /* PFT disabled */ {
- // Send over ethernet
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
- fprintf(stderr, "EDI Output: AF packet larger than 1400,"
- " consider using PFT to avoid UP fragmentation.\n");
- m_udp_fragmentation_warning_printed = true;
- }
-
- udp_sockets.at(udp_dest.get())->send(af_packet, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(af_packet);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
+ const auto now = steady_clock::now();
+ unique_lock<mutex> lock(m_mutex);
+ m_pending_frames[now] = std::move(af_packet);
}
-}
-void Sender::override_af_sequence(uint16_t seq)
-{
- edi_afPacketiser.OverrideSeq(seq);
+ // Actual transmission done in tick() function
}
-void Sender::override_pft_sequence(uint16_t pseq)
+void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now)
{
- edi_pft.OverridePSeq(pseq);
-}
+ unique_lock<mutex> lock(m_mutex);
-void Sender::run()
-{
- while (m_running) {
- unique_lock<mutex> lock(m_mutex);
- const auto now = chrono::steady_clock::now();
+ for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
+ const auto& edi_frag = it->second;
- // Send over ethernet
- for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
- const auto& edi_frag = it->second;
-
- if (it->first <= now) {
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
- it = m_pending_frames.erase(it);
- }
- else {
- ++it;
- }
+ if (it->first <= now) {
+ sender->send_packet(edi_frag);
+ it = m_pending_frames.erase(it);
+ }
+ else {
+ ++it;
}
-
- lock.unlock();
- this_thread::sleep_for(chrono::microseconds(500));
}
}
-}
+} // namespace edi
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index 6a3f229..b8a9008 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2022
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,25 +31,23 @@
#include "AFPacket.h"
#include "PFT.h"
#include "Socket.h"
-#include <vector>
#include <chrono>
#include <map>
-#include <unordered_map>
-#include <stdexcept>
-#include <fstream>
#include <cstdint>
#include <thread>
#include <mutex>
+#include <vector>
namespace edi {
-/** STI sender for EDI output */
-
+/** ETI/STI sender for EDI output */
class Sender {
public:
Sender(const configuration_t& conf);
Sender(const Sender&) = delete;
- Sender operator=(const Sender&) = delete;
+ Sender& operator=(const Sender&) = delete;
+ Sender(Sender&&) = delete;
+ Sender& operator=(Sender&&) = delete;
~Sender();
// Assemble the tagpacket into an AF packet, and if needed,
@@ -66,33 +64,85 @@ class Sender {
void override_af_sequence(uint16_t seq);
void override_pft_sequence(uint16_t pseq);
- private:
- void run();
-
- bool m_udp_fragmentation_warning_printed = false;
+ struct stats_t {
+ uint16_t listen_port;
+ std::vector<Socket::TCPConnection::stats_t> stats;
+ };
+ std::vector<stats_t> get_tcp_server_stats() const;
+ private:
configuration_t m_conf;
- std::ofstream edi_debug_file;
// The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
+ edi::AFPacketiser edi_af_packetiser;
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
+ // PFT spreading requires sending UDP packets at specific time,
+ // independently of time when write() gets called
+ bool m_running = false;
+ std::thread m_thread;
+ virtual void run();
- std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
- std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
- std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
- // PFT spreading requires sending UDP packets at specific time, independently of
- // time when write() gets called
- std::thread m_thread;
- std::mutex m_mutex;
- bool m_running = false;
- std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
- size_t m_last_num_pft_fragments = 0;
-};
-}
+ struct i_sender {
+ virtual void send_packet(const std::vector<uint8_t> &frame) = 0;
+ virtual ~i_sender() { }
+ };
+
+ struct udp_sender_t : public i_sender {
+ udp_sender_t(
+ std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock);
+
+ std::string dest_addr;
+ uint16_t dest_port;
+ Socket::UDPSocket sock;
+
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_dispatcher_t : public i_sender {
+ tcp_dispatcher_t(
+ uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers);
+
+ uint16_t listen_port;
+ Socket::TCPDataDispatcher sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_send_client_t : public i_sender {
+ tcp_send_client_t(
+ const std::string& dest_addr,
+ uint16_t dest_port);
+
+ Socket::TCPSendClient sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ class PFTSpreader {
+ public:
+ using sender_sp = std::shared_ptr<i_sender>;
+ PFTSpreader(const pft_settings_t &conf, sender_sp sender);
+ sender_sp sender;
+ edi::PFT edi_pft;
+
+ void send_af_packet(const AFPacket &af_packet);
+ void tick(const std::chrono::steady_clock::time_point& now);
+
+ private:
+ // send_af_packet() and tick() are called from different threads, both
+ // are accessing m_pending_frames
+ std::mutex m_mutex;
+ std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
+ pft_settings_t settings;
+ size_t last_num_pft_fragments = 0;
+ };
+
+ std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders;
+};
+}