diff options
Diffstat (limited to 'src/dabOutput')
-rw-r--r-- | src/dabOutput/dabOutput.h | 21 | ||||
-rw-r--r-- | src/dabOutput/dabOutputTcp.cpp | 2 | ||||
-rw-r--r-- | src/dabOutput/dabOutputUdp.cpp | 65 | ||||
-rw-r--r-- | src/dabOutput/edi/Config.h | 9 | ||||
-rw-r--r-- | src/dabOutput/edi/PFT.cpp | 2 | ||||
-rw-r--r-- | src/dabOutput/edi/Transport.cpp | 67 | ||||
-rw-r--r-- | src/dabOutput/edi/Transport.h | 8 |
7 files changed, 85 insertions, 89 deletions
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@ #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h" #include "Log.h" #include "string.h" #include <stdexcept> @@ -57,6 +56,8 @@ class DabOutput { return Open(name.c_str()); } + + // Return -1 on failure virtual int Write(void* buffer, int size) = 0; virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput class DabOutputUdp : public DabOutput { public: - DabOutputUdp() { - packet_ = new UdpPacket(6144); - socket_ = new UdpSocket(); - } - - virtual ~DabOutputUdp() { - delete socket_; - delete packet_; - } + DabOutputUdp(); int Open(const char* name); int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput DabOutputUdp operator=(const DabOutputUdp& other) = delete; std::string uri_; - UdpSocket* socket_; - UdpPacket* packet_; + Socket::UDPSocket socket_; + Socket::UDPPacket packet_; }; // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput private: std::string uri_; - std::shared_ptr<TCPDataDispatcher> dispatcher_; + std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_; }; // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); + dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); dispatcher_->start(port, address); } else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@ #include <cstdio> #include <limits.h> #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -# include <fscfg.h> -# include <sdci.h> -#else -# include <netinet/in.h> -# include <sys/types.h> -# include <sys/socket.h> -# include <sys/ioctl.h> -# include <net/if_arp.h> -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : + socket_(), + packet_(6144) +{ } int DabOutputUdp::Open(const char* name) { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string address = what[1]; - if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { - etiLog.level(error) << "can't set address " << - address << "(" << inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string port_str = what[2]; long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name) return -1; } - this->packet_->getAddress().setPort(port); + packet_.address.resolveUdpDestination(address, port); string query_params = what[3]; smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string src = query_what[1]; - int err = socket_->setMulticastSource(src.c_str()); - if (err) { - etiLog.level(error) << "UDP output socket set source failed!"; - return -1; - } + try { + socket_.setMulticastSource(src.c_str()); - string ttl_str = query_what[2]; + string ttl_str = query_what[2]; - if (not ttl_str.empty()) { - long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); - if ((ttl <= 0) || (ttl >= 255)) { - etiLog.level(error) << "Invalid TTL setting in " << - uri_without_proto; - return -1; - } + if (not ttl_str.empty()) { + long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); + if ((ttl <= 0) || (ttl >= 255)) { + etiLog.level(error) << "Invalid TTL setting in " << + uri_without_proto; + return -1; + } - err = socket_->setMulticastTTL(ttl); - if (err) { - etiLog.level(error) << "UDP output socket set TTL failed!"; - return -1; + socket_.setMulticastTTL(ttl); } } + catch (const std::runtime_error& e) { + etiLog.level(error) << "Failed to set UDP output settings" << e.what(); + } } else if (not query_params.empty()) { etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setSize(0); - this->packet_->addData(buffer, size); - return this->socket_->send(*this->packet_); + const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer); + packet_.buffer.resize(0); + std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); + socket_.send(packet_); + return 0; } #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t { }; // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t { unsigned int listen_port = 0; size_t max_frames_queued = 1024; }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { + std::string dest_addr; + unsigned int dest_port = 0; + size_t max_frames_queued = 1024; +}; + struct configuration_t { unsigned chunk_len = 207; // RSk, data length of each chunk unsigned fec = 0; // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) #if 0 fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", - m_pseq, findex, fcount, plen & ~0x8000); + m_pseq, findex, fcount, plen & ~0xC000); #endif } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const } etiLog.level(info) << " source port " << udp_dest->source_port; } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { + etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (interleaver_enabled()) { @@ -69,28 +73,27 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port); + auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = udp_socket->setMulticastTTL(udp_dest->ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } + udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket->setMulticastTTL(udp_dest->ttl); } udp_sockets.emplace(udp_dest.get(), udp_socket); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { - auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued); + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) { + auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { + auto tcp_socket = make_shared<Socket::TCPSocket>(); + tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_socket); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } @@ -117,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket) vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", edi_fragments.size()); } @@ -129,28 +132,30 @@ void Sender::write(const TagPacket& tagpacket) for (const auto& edi_frag : edi_fragments) { for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); } } if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu", + fprintf(stderr, "EDI number of PFT fragments %zu\n", edi_fragments.size()); } } @@ -158,23 +163,25 @@ void Sender::write(const TagPacket& tagpacket) // Send over ethernet for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(af_packet, addr); } - else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } + else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - std::copy(af_packet.begin(), af_packet.end(), debug_iterator); + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); } } } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 7b0a0db..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -32,11 +32,12 @@ #include "AFPacket.h" #include "PFT.h" #include "Interleaver.h" +#include "Socket.h" #include <vector> #include <unordered_map> #include <stdexcept> +#include <fstream> #include <cstdint> -#include "dabOutput/dabOutput.h" namespace edi { @@ -61,8 +62,9 @@ class Sender { // To mitigate for burst packet loss, PFT fragments can be sent out-of-order edi::Interleaver edi_interleaver; - std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets; - std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers; + std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; + std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; + std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders; }; } |