aboutsummaryrefslogtreecommitdiffstats
path: root/src/dabOutput/edi
diff options
context:
space:
mode:
Diffstat (limited to 'src/dabOutput/edi')
-rw-r--r--src/dabOutput/edi/Config.h9
-rw-r--r--src/dabOutput/edi/PFT.cpp2
-rw-r--r--src/dabOutput/edi/Transport.cpp43
-rw-r--r--src/dabOutput/edi/Transport.h4
4 files changed, 41 insertions, 17 deletions
diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h
index 55d5f0f..0c7dce8 100644
--- a/src/dabOutput/edi/Config.h
+++ b/src/dabOutput/edi/Config.h
@@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {
};
// TCP server that can accept multiple connections
-struct tcp_destination_t : public destination_t {
+struct tcp_server_t : public destination_t {
unsigned int listen_port = 0;
size_t max_frames_queued = 1024;
};
+// TCP client that connects to one endpoint
+struct tcp_client_t : public destination_t {
+ std::string dest_addr;
+ unsigned int dest_port = 0;
+ size_t max_frames_queued = 1024;
+};
+
struct configuration_t {
unsigned chunk_len = 207; // RSk, data length of each chunk
unsigned fec = 0; // number of fragments that can be recovered
diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp
index 5b93016..63dfa34 100644
--- a/src/dabOutput/edi/PFT.cpp
+++ b/src/dabOutput/edi/PFT.cpp
@@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)
#if 0
fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n",
- m_pseq, findex, fcount, plen & ~0x8000);
+ m_pseq, findex, fcount, plen & ~0xC000);
#endif
}
diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp
index 6d3950f..187aabe 100644
--- a/src/dabOutput/edi/Transport.cpp
+++ b/src/dabOutput/edi/Transport.cpp
@@ -45,12 +45,16 @@ void configuration_t::print() const
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
+ etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (interleaver_enabled()) {
@@ -78,13 +82,18 @@ Sender::Sender(const configuration_t& conf) :
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ auto tcp_socket = make_shared<Socket::TCPSocket>();
+ tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port);
+ tcp_senders.emplace(tcp_dest.get(), tcp_socket);
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
@@ -111,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
edi_fragments.size());
}
@@ -128,22 +137,25 @@ void Sender::write(const TagPacket& tagpacket)
udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
}
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
edi_fragments.size());
}
}
@@ -156,17 +168,20 @@ void Sender::write(const TagPacket& tagpacket)
udp_sockets.at(udp_dest.get())->send(af_packet, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
}
}
}
diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h
index 74126d1..9633275 100644
--- a/src/dabOutput/edi/Transport.h
+++ b/src/dabOutput/edi/Transport.h
@@ -36,6 +36,7 @@
#include <vector>
#include <unordered_map>
#include <stdexcept>
+#include <fstream>
#include <cstdint>
namespace edi {
@@ -62,7 +63,8 @@ class Sender {
edi::Interleaver edi_interleaver;
std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
- std::unordered_map<tcp_destination_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;
};
}