summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ChangeLog3
-rw-r--r--doc/advanced.mux16
-rw-r--r--lib/Socket.cpp103
-rw-r--r--lib/Socket.h10
-rw-r--r--lib/edi/common.cpp6
-rw-r--r--lib/edi/common.hpp2
-rw-r--r--src/input/Edi.cpp4
7 files changed, 112 insertions, 32 deletions
diff --git a/ChangeLog b/ChangeLog
index 6210fa3..5ede82f 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -4,6 +4,9 @@ ODR-DabMux in this repository
upcoming:
Make compatible with easydab again
Remove odr-zmq2edi
+ Fix timestamp issue with EDI streams that have seconds=0
+ Fix receiving multicast streams, when several multicast groups are
+ on the same port.
2024-05-05: Matthias P. Braendli <matthias@mpb.li>
(v4.5.0):
diff --git a/doc/advanced.mux b/doc/advanced.mux
index 35757cd..246f981 100644
--- a/doc/advanced.mux
+++ b/doc/advanced.mux
@@ -201,6 +201,22 @@ subchannels {
inputproto sti
inputuri "rtp://127.0.0.1:32010"
}
+ sub-udp {
+ type dabplus
+ bitrate 96
+ id 1
+ protection 3
+ inputproto edi
+
+ ; Receive EDI/UDP unicast on port 32010
+ inputuri "udp://:32010"
+
+ ; Receive EDI/UDP multicast stream on group 239.10.11.12 port 32010
+ ;inputuri "udp://@239.10.11.12:32010"
+
+ ; Same, but specify local interface address 192.168.0.1, to select which local interface to use
+ ;inputuri "udp://192.168.0.10@239.10.11.12:32010"
+ }
sub-ri {
type dabplus
bitrate 96
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 1281066..bcffb07 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -24,7 +24,7 @@
#include "Socket.h"
-#include <iostream>
+#include <stdexcept>
#include <cstdio>
#include <cstring>
#include <cerrno>
@@ -106,16 +106,20 @@ UDPSocket::UDPSocket(UDPSocket&& other)
{
m_sock = other.m_sock;
m_port = other.m_port;
+ m_multicast_source = other.m_multicast_source;
other.m_port = 0;
other.m_sock = INVALID_SOCKET;
+ other.m_multicast_source = "";
}
const UDPSocket& UDPSocket::operator=(UDPSocket&& other)
{
m_sock = other.m_sock;
m_port = other.m_port;
+ m_multicast_source = other.m_multicast_source;
other.m_port = 0;
other.m_sock = INVALID_SOCKET;
+ other.m_multicast_source = "";
return *this;
}
@@ -144,6 +148,7 @@ void UDPSocket::reinit(int port, const std::string& name)
// No need to bind to a given port, creating the
// socket is enough
m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ post_init();
return;
}
@@ -180,6 +185,7 @@ void UDPSocket::reinit(int port, const std::string& name)
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
m_sock = sfd;
+ post_init();
break;
}
@@ -193,6 +199,14 @@ void UDPSocket::reinit(int port, const std::string& name)
}
}
+void UDPSocket::post_init() {
+ int pktinfo = 1;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_PKTINFO, &pktinfo, sizeof(pktinfo)) == SOCKET_ERROR) {
+ throw runtime_error(string("Can't request pktinfo: ") + strerror(errno));
+ }
+
+}
+
void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr)
{
if (m_sock != INVALID_SOCKET) {
@@ -201,9 +215,10 @@ void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, co
m_port = port;
m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ post_init();
int reuse_setting = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == -1) {
+ if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == SOCKET_ERROR) {
throw runtime_error("Can't reuse address");
}
@@ -216,8 +231,8 @@ void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, co
throw runtime_error(string("Could not bind: ") + strerror(errno));
}
- joinGroup(mcastaddr.c_str(), local_if_addr.c_str());
-
+ m_multicast_source = mcastaddr;
+ join_group(mcastaddr.c_str(), local_if_addr.c_str());
}
@@ -240,16 +255,26 @@ UDPSocket::~UDPSocket()
UDPPacket UDPSocket::receive(size_t max_size)
{
+ struct sockaddr_in addr;
+ struct msghdr msg;
+ struct iovec iov;
+ constexpr size_t BUFFER_SIZE = 1024;
+ char control_buffer[BUFFER_SIZE];
+ struct cmsghdr *cmsg;
+
UDPPacket packet(max_size);
- socklen_t addrSize;
- addrSize = sizeof(*packet.address.as_sockaddr());
- ssize_t ret = recvfrom(m_sock,
- packet.buffer.data(),
- packet.buffer.size(),
- 0,
- packet.address.as_sockaddr(),
- &addrSize);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = &addr;
+ msg.msg_namelen = sizeof(addr);
+ msg.msg_iov = &iov;
+ iov.iov_base = packet.buffer.data();
+ iov.iov_len = packet.buffer.size();
+ msg.msg_iovlen = 1;
+ msg.msg_control = control_buffer;
+ msg.msg_controllen = sizeof(control_buffer);
+
+ ssize_t ret = recvmsg(m_sock, &msg, 0);
if (ret == SOCKET_ERROR) {
packet.buffer.resize(0);
@@ -260,12 +285,42 @@ UDPPacket UDPSocket::receive(size_t max_size)
if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
{
- return 0;
+ return packet;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
}
- packet.buffer.resize(ret);
+ struct in_pktinfo *pktinfo = nullptr;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
+ pktinfo = (struct in_pktinfo *)CMSG_DATA(cmsg);
+ break;
+ }
+ }
+
+ if (pktinfo) {
+ char src_addr[INET_ADDRSTRLEN];
+ char dst_addr[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &(addr.sin_addr), src_addr, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(pktinfo->ipi_addr), dst_addr, INET_ADDRSTRLEN);
+ //fprintf(stderr, "Received packet from %s to %s: %zu\n", src_addr, dst_addr, ret);
+
+ memcpy(&packet.address.addr, &addr, sizeof(addr));
+
+ if (m_multicast_source.empty() or
+ strcmp(dst_addr, m_multicast_source.c_str()) == 0) {
+ packet.buffer.resize(ret);
+ }
+ else {
+ // Ignore packet for different multicast group
+ packet.buffer.resize(0);
+ }
+ }
+ else {
+ //fprintf(stderr, "No pktinfo: %zu\n", ret);
+ packet.buffer.resize(ret);
+ }
+
return packet;
}
@@ -297,7 +352,7 @@ void UDPSocket::send(const std::string& data, InetAddress destination)
}
}
-void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+void UDPSocket::join_group(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
@@ -316,7 +371,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
group.imr_ifindex = 0;
if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ throw runtime_error(string("Can't join multicast group: ") + strerror(errno));
}
}
@@ -324,12 +379,12 @@ void UDPSocket::setMulticastSource(const char* source_addr)
{
struct in_addr addr;
if (inet_aton(source_addr, &addr) == 0) {
- throw runtime_error(string("Can't parse source address") + strerror(errno));
+ throw runtime_error(string("Can't parse source address: ") + strerror(errno));
}
if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't set source address") + strerror(errno));
+ throw runtime_error(string("Can't set source address: ") + strerror(errno));
}
}
@@ -392,11 +447,13 @@ vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
for (size_t i = 0; i < m_sockets.size(); i++) {
if (fds[i].revents & POLLIN) {
auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU
- ReceivedPacket rp;
- rp.packetdata = move(p.buffer);
- rp.received_from = move(p.address);
- rp.port_received_on = m_sockets[i].getPort();
- received.push_back(move(rp));
+ if (not p.buffer.empty()) {
+ ReceivedPacket rp;
+ rp.packetdata = std::move(p.buffer);
+ rp.received_from = std::move(p.address);
+ rp.port_received_on = m_sockets[i].getPort();
+ received.push_back(std::move(rp));
+ }
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index 44f93d0..1320a64 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2022
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,7 +31,7 @@
#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <atomic>
-#include <iostream>
+#include <string>
#include <list>
#include <memory>
#include <thread>
@@ -118,7 +118,6 @@ class UDPSocket
void send(const std::vector<uint8_t>& data, InetAddress destination);
void send(const std::string& data, InetAddress destination);
UDPPacket receive(size_t max_size);
- void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
@@ -130,9 +129,14 @@ class UDPSocket
SOCKET getNativeSocket() const;
int getPort() const;
+ private:
+ void join_group(const char* groupname, const char* if_addr = nullptr);
+ void post_init();
+
protected:
SOCKET m_sock = INVALID_SOCKET;
int m_port = 0;
+ std::string m_multicast_source = "";
};
/* UDP packet receiver supporting receiving from several ports at once */
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index b314737..38eadf9 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -33,9 +33,9 @@ namespace EdiDecoder {
using namespace std;
-bool frame_timestamp_t::valid() const
+bool frame_timestamp_t::is_valid() const
{
- return tsta != 0xFFFFFF;
+ return tsta != 0xFFFFFF and seconds != 0;
}
string frame_timestamp_t::to_string() const
@@ -43,7 +43,7 @@ string frame_timestamp_t::to_string() const
const time_t seconds_in_unix_epoch = to_unix_epoch();
stringstream ss;
- if (valid()) {
+ if (is_valid()) {
ss << "Timestamp: ";
}
else {
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index f273ecf..fdd7424 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -39,7 +39,7 @@ struct frame_timestamp_t {
uint32_t utco = 0;
uint32_t tsta = 0xFFFFFF; // According to EN 300 797 Annex B
- bool valid() const;
+ bool is_valid() const;
std::string to_string() const;
std::time_t to_unix_epoch() const;
std::chrono::system_clock::time_point to_system_clock() const;
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index fc380d8..141641f 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -254,7 +254,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
while (not m_pending_sti_frame.frame.empty()) {
if (m_pending_sti_frame.frame.size() == size) {
- if (m_pending_sti_frame.timestamp.valid()) {
+ if (m_pending_sti_frame.timestamp.is_valid()) {
auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);
ts_req += m_tist_delay;
const double offset = ts_req.diff_s(m_pending_sti_frame.timestamp);
@@ -324,7 +324,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
m_is_prebuffering = true;
return 0;
}
- else if (not m_pending_sti_frame.timestamp.valid()) {
+ else if (not m_pending_sti_frame.timestamp.is_valid()) {
etiLog.level(warn) << "EDI input " << m_name <<
" invalid timestamp, ignoring";
memset(buffer, 0, size);