summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/UdpSocket.cpp16
-rw-r--r--lib/UdpSocket.h6
-rw-r--r--src/EtiReader.cpp39
-rw-r--r--src/EtiReader.h2
-rw-r--r--src/PAPRStats.cpp1
5 files changed, 44 insertions, 20 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
index e711577..7c99724 100644
--- a/lib/UdpSocket.cpp
+++ b/lib/UdpSocket.cpp
@@ -170,7 +170,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
* @param groupname The multicast address to join.
* @return 0 if ok, -1 if error
*/
-int UdpSocket::joinGroup(char* groupname)
+int UdpSocket::joinGroup(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
@@ -181,7 +181,7 @@ int UdpSocket::joinGroup(char* groupname)
setInetError("Not a multicast address");
return -1;
}
- group.imr_address.s_addr = htons(INADDR_ANY);;
+ group.imr_address.s_addr = inet_addr(if_addr);
group.imr_ifindex = 0;
if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
@@ -261,8 +261,10 @@ UdpReceiver::~UdpReceiver() {
}
}
-void UdpReceiver::start(int port, size_t max_packets_queued) {
+void UdpReceiver::start(int port, std::string& bindto, std::string& mcastaddr, size_t max_packets_queued) {
m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
m_max_packets_queued = max_packets_queued;
m_thread = std::thread(&UdpReceiver::m_run, this);
}
@@ -288,7 +290,13 @@ void UdpReceiver::m_run()
private: atomic<bool>& m_stop;
} autoSetStop(m_stop);
- m_sock.reinit(m_port, "0.0.0.0");
+ if(IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))){
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ } else {
+ m_sock.reinit(m_port, m_bindto);
+ }
const size_t packsize = 8192;
UdpPacket packet(packsize);
diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h
index 8c968d2..8c71b9d 100644
--- a/lib/UdpSocket.h
+++ b/lib/UdpSocket.h
@@ -111,7 +111,7 @@ class UdpSocket
*/
int receive(UdpPacket& packet);
- int joinGroup(char* groupname);
+ int joinGroup(const char* groupname, const char *if_addr);
int setMulticastSource(const char* source_addr);
int setMulticastTTL(int ttl);
@@ -184,7 +184,7 @@ class UdpReceiver {
UdpReceiver operator=(const UdpReceiver&) = delete;
// Start the receiver in a separate thread
- void start(int port, size_t max_packets_queued);
+ void start(int port, std::string& bindto, std::string& mcastaddr, size_t max_packets_queued);
// Get the data contained in a UDP packet, blocks if none available
// In case of error, throws a runtime_error
@@ -194,6 +194,8 @@ class UdpReceiver {
void m_run(void);
int m_port;
+ std::string m_bindto;
+ std::string m_mcastaddr;
size_t m_max_packets_queued;
std::thread m_thread;
std::atomic<bool> m_stop;
diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp
index a28deb2..499e837 100644
--- a/src/EtiReader.cpp
+++ b/src/EtiReader.cpp
@@ -534,28 +534,39 @@ void EdiReader::assemble()
EdiUdpInput::EdiUdpInput(EdiDecoder::ETIDecoder& decoder) :
m_enabled(false),
m_port(0),
+ m_bindto("0.0.0.0"),
+ m_mcastaddr("0.0.0.0"),
m_decoder(decoder) { }
+
void EdiUdpInput::Open(const std::string& uri)
{
etiLog.level(info) << "Opening EDI :" << uri;
+ size_t found_port = uri.find_first_of(":", 6);
+ if(found_port == string::npos)
+ throw std::invalid_argument("EDI input port must be provided");
+
+ m_port =std::stoi(uri.substr(found_port+1));
+ std::string host_full=uri.substr(6, found_port-6);// ignore udp://
+ size_t found_mcast = host_full.find_first_of("@"); //have multicast address:
+ if(found_mcast != string::npos) {
+ if(found_mcast>0)
+ m_bindto=host_full.substr(0,found_mcast);
+ m_mcastaddr=host_full.substr(found_mcast+1);
+ } else if(found_port != 6) {
+ m_bindto=host_full;
+ }
- const std::regex re_udp("udp://:([0-9]+)");
- std::smatch m;
- if (std::regex_match(uri, m, re_udp)) {
- m_port = std::stoi(m[1].str());
-
- etiLog.level(info) << "EDI port :" << m_port;
+ etiLog.level(info) << "EDI input: host:" << m_bindto << ", source:" << m_mcastaddr << ", port:" << m_port;
- // The max_fragments_queued is only a protection against a runaway
- // memory usage.
- // Rough calculation:
- // 300 seconds, 24ms per frame, up to 20 fragments per frame
- const size_t max_fragments_queued = 20 * 300 * 1000 / 24;
+ // The max_fragments_queued is only a protection against a runaway
+ // memory usage.
+ // Rough calculation:
+ // 300 seconds, 24ms per frame, up to 20 fragments per frame
+ const size_t max_fragments_queued = 20 * 300 * 1000 / 24;
- m_udp_rx.start(m_port, max_fragments_queued);
- m_enabled = true;
- }
+ m_udp_rx.start(m_port, m_bindto, m_mcastaddr, max_fragments_queued);
+ m_enabled = true;
}
bool EdiUdpInput::rxPacket()
diff --git a/src/EtiReader.h b/src/EtiReader.h
index eb97e3b..4db5004 100644
--- a/src/EtiReader.h
+++ b/src/EtiReader.h
@@ -202,6 +202,8 @@ class EdiUdpInput {
private:
bool m_enabled;
int m_port;
+ std::string m_bindto;
+ std::string m_mcastaddr;
UdpReceiver m_udp_rx;
EdiDecoder::ETIDecoder& m_decoder;
diff --git a/src/PAPRStats.cpp b/src/PAPRStats.cpp
index 1a72238..0c9764a 100644
--- a/src/PAPRStats.cpp
+++ b/src/PAPRStats.cpp
@@ -27,6 +27,7 @@
#include "PAPRStats.h"
#include <numeric>
#include <cmath>
+#include <stdexcept>
#if defined(TEST)
/* compile with g++ -std=c++11 -Wall -DTEST PAPRStats.cpp -o paprtest */
# include <iostream>