From 43f4a3a2a695c303bd4fdfbd7fec6def29284f2e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 28 May 2019 16:56:43 +0200 Subject: Unify Socket abstractions --- src/dabOutput/dabOutput.h | 21 +++++-------- src/dabOutput/dabOutputTcp.cpp | 2 +- src/dabOutput/dabOutputUdp.cpp | 65 +++++++++++++++++------------------------ src/dabOutput/edi/Transport.cpp | 24 +++++---------- src/dabOutput/edi/Transport.h | 6 ++-- 5 files changed, 45 insertions(+), 73 deletions(-) (limited to 'src/dabOutput') diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@ #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h" #include "Log.h" #include "string.h" #include @@ -57,6 +56,8 @@ class DabOutput { return Open(name.c_str()); } + + // Return -1 on failure virtual int Write(void* buffer, int size) = 0; virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput class DabOutputUdp : public DabOutput { public: - DabOutputUdp() { - packet_ = new UdpPacket(6144); - socket_ = new UdpSocket(); - } - - virtual ~DabOutputUdp() { - delete socket_; - delete packet_; - } + DabOutputUdp(); int Open(const char* name); int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput DabOutputUdp operator=(const DabOutputUdp& other) = delete; std::string uri_; - UdpSocket* socket_; - UdpPacket* packet_; + Socket::UDPSocket socket_; + Socket::UDPPacket packet_; }; // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput private: std::string uri_; - std::shared_ptr dispatcher_; + std::shared_ptr dispatcher_; }; // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name) uri_ = name; if (success) { - dispatcher_ = make_shared(MAX_QUEUED_ETI_FRAMES); + dispatcher_ = make_shared(MAX_QUEUED_ETI_FRAMES); dispatcher_->start(port, address); } else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@ #include #include #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -# include -# include -#else -# include -# include -# include -# include -# include -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : + socket_(), + packet_(6144) +{ } int DabOutputUdp::Open(const char* name) { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string address = what[1]; - if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { - etiLog.level(error) << "can't set address " << - address << "(" << inetErrDesc << ": " << inetErrMsg << ")"; - return -1; - } - string port_str = what[2]; long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name) return -1; } - this->packet_->getAddress().setPort(port); + packet_.address.resolveUdpDestination(address, port); string query_params = what[3]; smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name) regex_constants::match_default)) { string src = query_what[1]; - int err = socket_->setMulticastSource(src.c_str()); - if (err) { - etiLog.level(error) << "UDP output socket set source failed!"; - return -1; - } + try { + socket_.setMulticastSource(src.c_str()); - string ttl_str = query_what[2]; + string ttl_str = query_what[2]; - if (not ttl_str.empty()) { - long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); - if ((ttl <= 0) || (ttl >= 255)) { - etiLog.level(error) << "Invalid TTL setting in " << - uri_without_proto; - return -1; - } + if (not ttl_str.empty()) { + long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); + if ((ttl <= 0) || (ttl >= 255)) { + etiLog.level(error) << "Invalid TTL setting in " << + uri_without_proto; + return -1; + } - err = socket_->setMulticastTTL(ttl); - if (err) { - etiLog.level(error) << "UDP output socket set TTL failed!"; - return -1; + socket_.setMulticastTTL(ttl); } } + catch (const std::runtime_error& e) { + etiLog.level(error) << "Failed to set UDP output settings" << e.what(); + } } else if (not query_params.empty()) { etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name) int DabOutputUdp::Write(void* buffer, int size) { - this->packet_->setSize(0); - this->packet_->addData(buffer, size); - return this->socket_->send(*this->packet_); + const uint8_t *buf = reinterpret_cast(buffer); + packet_.buffer.resize(0); + std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); + socket_.send(packet_); + return 0; } #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..6d3950f 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -69,23 +69,17 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) { - auto udp_socket = std::make_shared(udp_dest->source_port); + auto udp_socket = std::make_shared(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = udp_socket->setMulticastTTL(udp_dest->ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } + udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket->setMulticastTTL(udp_dest->ttl); } udp_sockets.emplace(udp_dest.get(), udp_socket); } else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - auto dispatcher = make_shared(tcp_dest->max_frames_queued); + auto dispatcher = make_shared(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } @@ -129,9 +123,8 @@ void Sender::write(const TagPacket& tagpacket) for (const auto& edi_frag : edi_fragments) { for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } @@ -158,9 +151,8 @@ void Sender::write(const TagPacket& tagpacket) // Send over ethernet for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(af_packet, addr); } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 7b0a0db..74126d1 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -32,11 +32,11 @@ #include "AFPacket.h" #include "PFT.h" #include "Interleaver.h" +#include "Socket.h" #include #include #include #include -#include "dabOutput/dabOutput.h" namespace edi { @@ -61,8 +61,8 @@ class Sender { // To mitigate for burst packet loss, PFT fragments can be sent out-of-order edi::Interleaver edi_interleaver; - std::unordered_map> udp_sockets; - std::unordered_map> tcp_dispatchers; + std::unordered_map> udp_sockets; + std::unordered_map> tcp_dispatchers; }; } -- cgit v1.2.3