summaryrefslogtreecommitdiffstats
path: root/src/dabOutput/edi
diff options
context:
space:
mode:
Diffstat (limited to 'src/dabOutput/edi')
-rw-r--r--src/dabOutput/edi/Config.h9
-rw-r--r--src/dabOutput/edi/PFT.cpp2
-rw-r--r--src/dabOutput/edi/Transport.cpp67
-rw-r--r--src/dabOutput/edi/Transport.h8
4 files changed, 51 insertions, 35 deletions
diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h
index 55d5f0f..0c7dce8 100644
--- a/src/dabOutput/edi/Config.h
+++ b/src/dabOutput/edi/Config.h
@@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {
};
// TCP server that can accept multiple connections
-struct tcp_destination_t : public destination_t {
+struct tcp_server_t : public destination_t {
unsigned int listen_port = 0;
size_t max_frames_queued = 1024;
};
+// TCP client that connects to one endpoint
+struct tcp_client_t : public destination_t {
+ std::string dest_addr;
+ unsigned int dest_port = 0;
+ size_t max_frames_queued = 1024;
+};
+
struct configuration_t {
unsigned chunk_len = 207; // RSk, data length of each chunk
unsigned fec = 0; // number of fragments that can be recovered
diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp
index 5b93016..63dfa34 100644
--- a/src/dabOutput/edi/PFT.cpp
+++ b/src/dabOutput/edi/PFT.cpp
@@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)
#if 0
fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n",
- m_pseq, findex, fcount, plen & ~0x8000);
+ m_pseq, findex, fcount, plen & ~0xC000);
#endif
}
diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp
index d99e987..187aabe 100644
--- a/src/dabOutput/edi/Transport.cpp
+++ b/src/dabOutput/edi/Transport.cpp
@@ -45,12 +45,16 @@ void configuration_t::print() const
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
+ etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (interleaver_enabled()) {
@@ -69,28 +73,27 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port);
+ auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = udp_socket->setMulticastTTL(udp_dest->ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
+ udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
+ udp_socket->setMulticastTTL(udp_dest->ttl);
}
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
- auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued);
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
+ auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ auto tcp_socket = make_shared<Socket::TCPSocket>();
+ tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port);
+ tcp_senders.emplace(tcp_dest.get(), tcp_socket);
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
@@ -117,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
edi_fragments.size());
}
@@ -129,28 +132,30 @@ void Sender::write(const TagPacket& tagpacket)
for (const auto& edi_frag : edi_fragments) {
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);
udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
}
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
edi_fragments.size());
}
}
@@ -158,23 +163,25 @@ void Sender::write(const TagPacket& tagpacket)
// Send over ethernet
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);
udp_sockets.at(udp_dest.get())->send(af_packet, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
}
}
}
diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h
index 7b0a0db..9633275 100644
--- a/src/dabOutput/edi/Transport.h
+++ b/src/dabOutput/edi/Transport.h
@@ -32,11 +32,12 @@
#include "AFPacket.h"
#include "PFT.h"
#include "Interleaver.h"
+#include "Socket.h"
#include <vector>
#include <unordered_map>
#include <stdexcept>
+#include <fstream>
#include <cstdint>
-#include "dabOutput/dabOutput.h"
namespace edi {
@@ -61,8 +62,9 @@ class Sender {
// To mitigate for burst packet loss, PFT fragments can be sent out-of-order
edi::Interleaver edi_interleaver;
- std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets;
- std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
+ std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;
};
}