aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Socket.cpp137
-rw-r--r--lib/Socket.h11
-rw-r--r--lib/edi/STIDecoder.cpp14
-rw-r--r--lib/edi/STIDecoder.hpp7
-rw-r--r--lib/edi/common.cpp33
-rw-r--r--lib/edi/common.hpp12
6 files changed, 163 insertions, 51 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index b71c01e..2df1559 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -24,7 +24,7 @@
#include "Socket.h"
-#include <iostream>
+#include <stdexcept>
#include <cstdio>
#include <cstring>
#include <cerrno>
@@ -106,16 +106,20 @@ UDPSocket::UDPSocket(UDPSocket&& other)
{
m_sock = other.m_sock;
m_port = other.m_port;
+ m_multicast_source = other.m_multicast_source;
other.m_port = 0;
other.m_sock = INVALID_SOCKET;
+ other.m_multicast_source = "";
}
const UDPSocket& UDPSocket::operator=(UDPSocket&& other)
{
m_sock = other.m_sock;
m_port = other.m_port;
+ m_multicast_source = other.m_multicast_source;
other.m_port = 0;
other.m_sock = INVALID_SOCKET;
+ other.m_multicast_source = "";
return *this;
}
@@ -144,6 +148,7 @@ void UDPSocket::reinit(int port, const std::string& name)
// No need to bind to a given port, creating the
// socket is enough
m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ post_init();
return;
}
@@ -180,6 +185,7 @@ void UDPSocket::reinit(int port, const std::string& name)
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
m_sock = sfd;
+ post_init();
break;
}
@@ -189,10 +195,47 @@ void UDPSocket::reinit(int port, const std::string& name)
freeaddrinfo(result);
if (rp == nullptr) {
- throw runtime_error("Could not bind");
+ throw runtime_error(string{"Could not bind to port "} + to_string(port));
+ }
+}
+
+void UDPSocket::post_init() {
+ int pktinfo = 1;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_PKTINFO, &pktinfo, sizeof(pktinfo)) == SOCKET_ERROR) {
+ throw runtime_error(string("Can't request pktinfo: ") + strerror(errno));
+ }
+
+}
+
+void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr)
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
}
+
+ m_port = port;
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ post_init();
+
+ int reuse_setting = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == SOCKET_ERROR) {
+ throw runtime_error("Can't reuse address");
+ }
+
+ struct sockaddr_in la;
+ memset((char *) &la, 0, sizeof(la));
+ la.sin_family = AF_INET;
+ la.sin_port = htons(port);
+ la.sin_addr.s_addr = INADDR_ANY;
+ if (::bind(m_sock, (struct sockaddr*)&la, sizeof(la))) {
+ throw runtime_error(string("Could not bind: ") + strerror(errno));
+ }
+
+ m_multicast_source = mcastaddr;
+ join_group(mcastaddr.c_str(), local_if_addr.c_str());
}
+
void UDPSocket::close()
{
if (m_sock != INVALID_SOCKET) {
@@ -212,16 +255,26 @@ UDPSocket::~UDPSocket()
UDPPacket UDPSocket::receive(size_t max_size)
{
+ struct sockaddr_in addr;
+ struct msghdr msg;
+ struct iovec iov;
+ constexpr size_t BUFFER_SIZE = 1024;
+ char control_buffer[BUFFER_SIZE];
+ struct cmsghdr *cmsg;
+
UDPPacket packet(max_size);
- socklen_t addrSize;
- addrSize = sizeof(*packet.address.as_sockaddr());
- ssize_t ret = recvfrom(m_sock,
- packet.buffer.data(),
- packet.buffer.size(),
- 0,
- packet.address.as_sockaddr(),
- &addrSize);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = &addr;
+ msg.msg_namelen = sizeof(addr);
+ msg.msg_iov = &iov;
+ iov.iov_base = packet.buffer.data();
+ iov.iov_len = packet.buffer.size();
+ msg.msg_iovlen = 1;
+ msg.msg_control = control_buffer;
+ msg.msg_controllen = sizeof(control_buffer);
+
+ ssize_t ret = recvmsg(m_sock, &msg, 0);
if (ret == SOCKET_ERROR) {
packet.buffer.resize(0);
@@ -232,12 +285,42 @@ UDPPacket UDPSocket::receive(size_t max_size)
if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
{
- return 0;
+ return packet;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
}
- packet.buffer.resize(ret);
+ struct in_pktinfo *pktinfo = nullptr;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
+ pktinfo = (struct in_pktinfo *)CMSG_DATA(cmsg);
+ break;
+ }
+ }
+
+ if (pktinfo) {
+ char src_addr[INET_ADDRSTRLEN];
+ char dst_addr[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &(addr.sin_addr), src_addr, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(pktinfo->ipi_addr), dst_addr, INET_ADDRSTRLEN);
+ //fprintf(stderr, "Received packet from %s to %s: %zu\n", src_addr, dst_addr, ret);
+
+ memcpy(&packet.address.addr, &addr, sizeof(addr));
+
+ if (m_multicast_source.empty() or
+ strcmp(dst_addr, m_multicast_source.c_str()) == 0) {
+ packet.buffer.resize(ret);
+ }
+ else {
+ // Ignore packet for different multicast group
+ packet.buffer.resize(0);
+ }
+ }
+ else {
+ //fprintf(stderr, "No pktinfo: %zu\n", ret);
+ packet.buffer.resize(ret);
+ }
+
return packet;
}
@@ -269,14 +352,14 @@ void UDPSocket::send(const std::string& data, InetAddress destination)
}
}
-void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+void UDPSocket::join_group(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
throw runtime_error("Cannot convert multicast group name");
}
if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- throw runtime_error("Group name is not a multicast address");
+ throw runtime_error(string("Group name '") + groupname + "' is not a multicast address");
}
if (if_addr) {
@@ -288,7 +371,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
group.imr_ifindex = 0;
if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ throw runtime_error(string("Can't join multicast group: ") + strerror(errno));
}
}
@@ -296,12 +379,12 @@ void UDPSocket::setMulticastSource(const char* source_addr)
{
struct in_addr addr;
if (inet_aton(source_addr, &addr) == 0) {
- throw runtime_error(string("Can't parse source address") + strerror(errno));
+ throw runtime_error(string("Can't parse source address: ") + strerror(errno));
}
if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't set source address") + strerror(errno));
+ throw runtime_error(string("Can't set source address: ") + strerror(errno));
}
}
@@ -309,7 +392,7 @@ void UDPSocket::setMulticastTTL(int ttl)
{
if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
+ throw runtime_error(string("Can't set multicast ttl: ") + strerror(errno));
}
}
@@ -327,15 +410,13 @@ void UDPReceiver::add_receive_port(int port, const string& bindto, const string&
UDPSocket sock;
if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) {
- sock.reinit(port, mcastaddr);
- sock.setMulticastSource(bindto.c_str());
- sock.joinGroup(mcastaddr.c_str(), bindto.c_str());
+ sock.init_receive_multicast(port, bindto, mcastaddr);
}
else {
sock.reinit(port, bindto);
}
- m_sockets.push_back(move(sock));
+ m_sockets.push_back(std::move(sock));
}
vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
@@ -366,11 +447,13 @@ vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
for (size_t i = 0; i < m_sockets.size(); i++) {
if (fds[i].revents & POLLIN) {
auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU
- ReceivedPacket rp;
- rp.packetdata = move(p.buffer);
- rp.received_from = move(p.address);
- rp.port_received_on = m_sockets[i].getPort();
- received.push_back(move(rp));
+ if (not p.buffer.empty()) {
+ ReceivedPacket rp;
+ rp.packetdata = std::move(p.buffer);
+ rp.received_from = std::move(p.address);
+ rp.port_received_on = m_sockets[i].getPort();
+ received.push_back(std::move(rp));
+ }
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index d8242e2..1320a64 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2022
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,7 +31,7 @@
#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <atomic>
-#include <iostream>
+#include <string>
#include <list>
#include <memory>
#include <thread>
@@ -111,13 +111,13 @@ class UDPSocket
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
void reinit(int port, const std::string& name);
+ void init_receive_multicast(int port, const std::string& local_if_addr, const std::string& mcastaddr);
void close(void);
void send(UDPPacket& packet);
void send(const std::vector<uint8_t>& data, InetAddress destination);
void send(const std::string& data, InetAddress destination);
UDPPacket receive(size_t max_size);
- void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
@@ -129,9 +129,14 @@ class UDPSocket
SOCKET getNativeSocket() const;
int getPort() const;
+ private:
+ void join_group(const char* groupname, const char* if_addr = nullptr);
+ void post_init();
+
protected:
SOCKET m_sock = INVALID_SOCKET;
int m_port = 0;
+ std::string m_multicast_source = "";
};
/* UDP packet receiver supporting receiving from several ports at once */
diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp
index d387f1e..2de828b 100644
--- a/lib/edi/STIDecoder.cpp
+++ b/lib/edi/STIDecoder.cpp
@@ -70,6 +70,12 @@ void STIDecoder::setMaxDelay(int num_af_packets)
m_dispatcher.setMaxDelay(num_af_packets);
}
+void STIDecoder::filter_stream_index(bool enable, uint16_t index)
+{
+ m_filter_stream = enable;
+ m_filtered_stream_index = index;
+}
+
#define AFPACKET_HEADER_LEN 10 // includes SYNC
bool STIDecoder::decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& /*n*/)
@@ -173,6 +179,14 @@ bool STIDecoder::decode_ssn(const std::vector<uint8_t>& value, const tag_name_t&
n = (uint16_t)(name[2]) << 8;
n |= (uint16_t)(name[3]);
+ if (n == 0) {
+ etiLog.level(warn) << "EDI: Stream index SSnn tag is zero";
+ }
+
+ if (m_filter_stream and m_filtered_stream_index != n) {
+ return true;
+ }
+
sti.stream_index = n - 1; // n is 1-indexed
sti.rfa = value[0] >> 3;
sti.tid = value[0] & 0x07;
diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp
index f85f789..5e71ce7 100644
--- a/lib/edi/STIDecoder.hpp
+++ b/lib/edi/STIDecoder.hpp
@@ -119,6 +119,10 @@ class STIDecoder {
*/
void setMaxDelay(int num_af_packets);
+ /* Enable/disable stream-index filtering.
+ * index==0 is out of spec, but some encoders do it anyway. */
+ void filter_stream_index(bool enable, uint16_t index);
+
private:
bool decode_starptr(const std::vector<uint8_t>& value, const tag_name_t& n);
bool decode_dsti(const std::vector<uint8_t>& value, const tag_name_t& n);
@@ -132,6 +136,9 @@ class STIDecoder {
STIDataCollector& m_data_collector;
TagDispatcher m_dispatcher;
+
+ bool m_filter_stream = false;
+ uint16_t m_filtered_stream_index = 1;
};
}
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index c99997a..38eadf9 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -33,9 +33,9 @@ namespace EdiDecoder {
using namespace std;
-bool frame_timestamp_t::valid() const
+bool frame_timestamp_t::is_valid() const
{
- return tsta != 0xFFFFFF;
+ return tsta != 0xFFFFFF and seconds != 0;
}
string frame_timestamp_t::to_string() const
@@ -43,7 +43,7 @@ string frame_timestamp_t::to_string() const
const time_t seconds_in_unix_epoch = to_unix_epoch();
stringstream ss;
- if (valid()) {
+ if (is_valid()) {
ss << "Timestamp: ";
}
else {
@@ -129,10 +129,9 @@ std::string tag_name_to_human_readable(const tag_name_t& name)
return s;
}
-TagDispatcher::TagDispatcher(
- std::function<void()>&& af_packet_completed) :
- m_af_packet_completed(move(af_packet_completed)),
- m_tagpacket_handler([](const std::vector<uint8_t>& /*ignore*/){})
+TagDispatcher::TagDispatcher(std::function<void()>&& af_packet_completed) :
+ m_af_packet_completed(std::move(af_packet_completed)),
+ m_afpacket_handler([](std::vector<uint8_t>&& /*ignore*/){})
{
}
@@ -278,7 +277,6 @@ void TagDispatcher::setMaxDelay(int num_af_packets)
}
-#define AFPACKET_HEADER_LEN 10 // includes SYNC
TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(
const std::vector<uint8_t> &input_data)
{
@@ -341,25 +339,30 @@ TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(
return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};
}
else {
+ vector<uint8_t> afpacket(AFPACKET_HEADER_LEN + taglength + crclen);
+ copy(input_data.begin(),
+ input_data.begin() + AFPACKET_HEADER_LEN + taglength + crclen,
+ afpacket.begin());
+ m_afpacket_handler(std::move(afpacket));
+
vector<uint8_t> payload(taglength);
copy(input_data.begin() + AFPACKET_HEADER_LEN,
input_data.begin() + AFPACKET_HEADER_LEN + taglength,
payload.begin());
- return {
- decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error,
- AFPACKET_HEADER_LEN + taglength + crclen};
+ auto result = decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error;
+ return {result, AFPACKET_HEADER_LEN + taglength + crclen};
}
}
void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h)
{
- m_handlers[tag] = move(h);
+ m_handlers[tag] = std::move(h);
}
-void TagDispatcher::register_tagpacket_handler(tagpacket_handler&& h)
+void TagDispatcher::register_afpacket_handler(afpacket_handler&& h)
{
- m_tagpacket_handler = move(h);
+ m_afpacket_handler = std::move(h);
}
@@ -428,8 +431,6 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
}
}
- m_tagpacket_handler(payload);
-
return success;
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index c3e6c40..fdd7424 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -32,12 +32,14 @@
namespace EdiDecoder {
+constexpr size_t AFPACKET_HEADER_LEN = 10; // includes SYNC
+
struct frame_timestamp_t {
uint32_t seconds = 0;
uint32_t utco = 0;
uint32_t tsta = 0xFFFFFF; // According to EN 300 797 Annex B
- bool valid() const;
+ bool is_valid() const;
std::string to_string() const;
std::time_t to_unix_epoch() const;
std::chrono::system_clock::time_point to_system_clock() const;
@@ -133,9 +135,9 @@ class TagDispatcher {
*/
void register_tag(const std::string& tag, tag_handler&& h);
- /* The complete tagpacket can also be retrieved */
- using tagpacket_handler = std::function<void(const std::vector<uint8_t>&)>;
- void register_tagpacket_handler(tagpacket_handler&& h);
+ /* The complete AF packet can also be retrieved */
+ using afpacket_handler = std::function<void(std::vector<uint8_t>&&)>;
+ void register_afpacket_handler(afpacket_handler&& h);
seq_info_t get_seq_info() const {
return m_last_sequences;
@@ -160,7 +162,7 @@ class TagDispatcher {
std::vector<uint8_t> m_input_data;
std::map<std::string, tag_handler> m_handlers;
std::function<void()> m_af_packet_completed;
- tagpacket_handler m_tagpacket_handler;
+ afpacket_handler m_afpacket_handler;
std::vector<std::string> m_ignored_tags;
};