From fdff20e96d7198124542e6d03c27e1d496c4b328 Mon Sep 17 00:00:00 2001 From: Sergiy G Date: Wed, 31 Jan 2018 10:41:34 +0200 Subject: Enabled multicast support. Fix missing header in PAPRStats --- lib/UdpSocket.cpp | 16 ++++++++++++---- lib/UdpSocket.h | 6 ++++-- src/EtiReader.cpp | 39 +++++++++++++++++++++++++-------------- src/EtiReader.h | 2 ++ src/PAPRStats.cpp | 1 + 5 files changed, 44 insertions(+), 20 deletions(-) diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp index e711577..7c99724 100644 --- a/lib/UdpSocket.cpp +++ b/lib/UdpSocket.cpp @@ -170,7 +170,7 @@ int UdpSocket::send(const std::vector& data, InetAddress destination) * @param groupname The multicast address to join. * @return 0 if ok, -1 if error */ -int UdpSocket::joinGroup(char* groupname) +int UdpSocket::joinGroup(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -181,7 +181,7 @@ int UdpSocket::joinGroup(char* groupname) setInetError("Not a multicast address"); return -1; } - group.imr_address.s_addr = htons(INADDR_ANY);; + group.imr_address.s_addr = inet_addr(if_addr); group.imr_ifindex = 0; if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { @@ -261,8 +261,10 @@ UdpReceiver::~UdpReceiver() { } } -void UdpReceiver::start(int port, size_t max_packets_queued) { +void UdpReceiver::start(int port, std::string& bindto, std::string& mcastaddr, size_t max_packets_queued) { m_port = port; + m_bindto = bindto; + m_mcastaddr = mcastaddr; m_max_packets_queued = max_packets_queued; m_thread = std::thread(&UdpReceiver::m_run, this); } @@ -288,7 +290,13 @@ void UdpReceiver::m_run() private: atomic& m_stop; } autoSetStop(m_stop); - m_sock.reinit(m_port, "0.0.0.0"); + if(IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))){ + m_sock.reinit(m_port, m_mcastaddr); + m_sock.setMulticastSource(m_bindto.c_str()); + m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); + } else { + m_sock.reinit(m_port, m_bindto); + } const size_t packsize = 8192; UdpPacket packet(packsize); diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h index 8c968d2..8c71b9d 100644 --- a/lib/UdpSocket.h +++ b/lib/UdpSocket.h @@ -111,7 +111,7 @@ class UdpSocket */ int receive(UdpPacket& packet); - int joinGroup(char* groupname); + int joinGroup(const char* groupname, const char *if_addr); int setMulticastSource(const char* source_addr); int setMulticastTTL(int ttl); @@ -184,7 +184,7 @@ class UdpReceiver { UdpReceiver operator=(const UdpReceiver&) = delete; // Start the receiver in a separate thread - void start(int port, size_t max_packets_queued); + void start(int port, std::string& bindto, std::string& mcastaddr, size_t max_packets_queued); // Get the data contained in a UDP packet, blocks if none available // In case of error, throws a runtime_error @@ -194,6 +194,8 @@ class UdpReceiver { void m_run(void); int m_port; + std::string m_bindto; + std::string m_mcastaddr; size_t m_max_packets_queued; std::thread m_thread; std::atomic m_stop; diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index a28deb2..499e837 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -534,28 +534,39 @@ void EdiReader::assemble() EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) : m_enabled(false), m_port(0), + m_bindto("0.0.0.0"), + m_mcastaddr("0.0.0.0"), m_decoder(decoder) { } + void EdiUdpInput::Open(const std::string& uri) { etiLog.level(info) << "Opening EDI :" << uri; + size_t found_port = uri.find_first_of(":", 6); + if(found_port == string::npos) + throw std::invalid_argument("EDI input port must be provided"); + + m_port =std::stoi(uri.substr(found_port+1)); + std::string host_full=uri.substr(6, found_port-6);// ignore udp:// + size_t found_mcast = host_full.find_first_of("@"); //have multicast address: + if(found_mcast != string::npos) { + if(found_mcast>0) + m_bindto=host_full.substr(0,found_mcast); + m_mcastaddr=host_full.substr(found_mcast+1); + } else if(found_port != 6) { + m_bindto=host_full; + } - const std::regex re_udp("udp://:([0-9]+)"); - std::smatch m; - if (std::regex_match(uri, m, re_udp)) { - m_port = std::stoi(m[1].str()); - - etiLog.level(info) << "EDI port :" << m_port; + etiLog.level(info) << "EDI input: host:" << m_bindto << ", source:" << m_mcastaddr << ", port:" << m_port; - // The max_fragments_queued is only a protection against a runaway - // memory usage. - // Rough calculation: - // 300 seconds, 24ms per frame, up to 20 fragments per frame - const size_t max_fragments_queued = 20 * 300 * 1000 / 24; + // The max_fragments_queued is only a protection against a runaway + // memory usage. + // Rough calculation: + // 300 seconds, 24ms per frame, up to 20 fragments per frame + const size_t max_fragments_queued = 20 * 300 * 1000 / 24; - m_udp_rx.start(m_port, max_fragments_queued); - m_enabled = true; - } + m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued); + m_enabled = true; } bool EdiUdpInput::rxPacket() diff --git a/src/EtiReader.h b/src/EtiReader.h index eb97e3b..4db5004 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -202,6 +202,8 @@ class EdiUdpInput { private: bool m_enabled; int m_port; + std::string m_bindto; + std::string m_mcastaddr; UdpReceiver m_udp_rx; EdiDecoder::ETIDecoder& m_decoder; diff --git a/src/PAPRStats.cpp b/src/PAPRStats.cpp index 1a72238..0c9764a 100644 --- a/src/PAPRStats.cpp +++ b/src/PAPRStats.cpp @@ -27,6 +27,7 @@ #include "PAPRStats.h" #include #include +#include #if defined(TEST) /* compile with g++ -std=c++11 -Wall -DTEST PAPRStats.cpp -o paprtest */ # include -- cgit v1.2.3