diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-08-20 15:01:25 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2024-08-20 15:01:25 +0200 | 
| commit | 9b18142f07d9d87bd9eced148a245b1d37bda29a (patch) | |
| tree | f58fac90e103b6a9406a1347cd5e13b865f6de0a | |
| parent | 3986a7b40c0043ec33c2c2b7cf5c615c65599997 (diff) | |
| download | dabmux-9b18142f07d9d87bd9eced148a245b1d37bda29a.tar.gz dabmux-9b18142f07d9d87bd9eced148a245b1d37bda29a.tar.bz2 dabmux-9b18142f07d9d87bd9eced148a245b1d37bda29a.zip  | |
Update common: improve multicast input and incomplete timestamps
| -rw-r--r-- | ChangeLog | 3 | ||||
| -rw-r--r-- | doc/advanced.mux | 16 | ||||
| -rw-r--r-- | lib/Socket.cpp | 103 | ||||
| -rw-r--r-- | lib/Socket.h | 10 | ||||
| -rw-r--r-- | lib/edi/common.cpp | 6 | ||||
| -rw-r--r-- | lib/edi/common.hpp | 2 | ||||
| -rw-r--r-- | src/input/Edi.cpp | 4 | 
7 files changed, 112 insertions, 32 deletions
@@ -4,6 +4,9 @@ ODR-DabMux in this repository  upcoming:  	Make compatible with easydab again  	Remove odr-zmq2edi +	Fix timestamp issue with EDI streams that have seconds=0 +	Fix receiving multicast streams, when several multicast groups are +	on the same port.  2024-05-05: Matthias P. Braendli <matthias@mpb.li>  	(v4.5.0): diff --git a/doc/advanced.mux b/doc/advanced.mux index 35757cd..246f981 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -201,6 +201,22 @@ subchannels {          inputproto sti          inputuri "rtp://127.0.0.1:32010"      } +    sub-udp { +        type dabplus +        bitrate 96 +        id 1 +        protection 3 +        inputproto edi + +        ; Receive EDI/UDP unicast on port 32010 +        inputuri "udp://:32010" + +        ; Receive EDI/UDP multicast stream on group 239.10.11.12 port 32010 +        ;inputuri "udp://@239.10.11.12:32010" + +        ; Same, but specify local interface address 192.168.0.1, to select which local interface to use +        ;inputuri "udp://192.168.0.10@239.10.11.12:32010" +    }      sub-ri {          type dabplus          bitrate 96 diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 1281066..bcffb07 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,7 +24,7 @@  #include "Socket.h" -#include <iostream> +#include <stdexcept>  #include <cstdio>  #include <cstring>  #include <cerrno> @@ -106,16 +106,20 @@ UDPSocket::UDPSocket(UDPSocket&& other)  {      m_sock = other.m_sock;      m_port = other.m_port; +    m_multicast_source = other.m_multicast_source;      other.m_port = 0;      other.m_sock = INVALID_SOCKET; +    other.m_multicast_source = "";  }  const UDPSocket& UDPSocket::operator=(UDPSocket&& other)  {      m_sock = other.m_sock;      m_port = other.m_port; +    m_multicast_source = other.m_multicast_source;      other.m_port = 0;      other.m_sock = INVALID_SOCKET; +    other.m_multicast_source = "";      return *this;  } @@ -144,6 +148,7 @@ void UDPSocket::reinit(int port, const std::string& name)          // No need to bind to a given port, creating the          // socket is enough          m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); +        post_init();          return;      } @@ -180,6 +185,7 @@ void UDPSocket::reinit(int port, const std::string& name)          if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {              m_sock = sfd; +            post_init();              break;          } @@ -193,6 +199,14 @@ void UDPSocket::reinit(int port, const std::string& name)      }  } +void UDPSocket::post_init() { +    int pktinfo = 1; +    if (setsockopt(m_sock, IPPROTO_IP, IP_PKTINFO, &pktinfo, sizeof(pktinfo)) == SOCKET_ERROR) { +        throw runtime_error(string("Can't request pktinfo: ") + strerror(errno)); +    } + +} +  void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr)  {      if (m_sock != INVALID_SOCKET) { @@ -201,9 +215,10 @@ void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, co      m_port = port;      m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); +    post_init();      int reuse_setting = 1; -    if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == -1) { +    if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == SOCKET_ERROR) {          throw runtime_error("Can't reuse address");      } @@ -216,8 +231,8 @@ void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, co          throw runtime_error(string("Could not bind: ") + strerror(errno));      } -    joinGroup(mcastaddr.c_str(), local_if_addr.c_str()); - +    m_multicast_source = mcastaddr; +    join_group(mcastaddr.c_str(), local_if_addr.c_str());  } @@ -240,16 +255,26 @@ UDPSocket::~UDPSocket()  UDPPacket UDPSocket::receive(size_t max_size)  { +    struct sockaddr_in addr; +    struct msghdr msg; +    struct iovec iov; +    constexpr size_t BUFFER_SIZE = 1024; +    char control_buffer[BUFFER_SIZE]; +    struct cmsghdr *cmsg; +      UDPPacket packet(max_size); -    socklen_t addrSize; -    addrSize = sizeof(*packet.address.as_sockaddr()); -    ssize_t ret = recvfrom(m_sock, -            packet.buffer.data(), -            packet.buffer.size(), -            0, -            packet.address.as_sockaddr(), -            &addrSize); +    memset(&msg, 0, sizeof(msg)); +    msg.msg_name = &addr; +    msg.msg_namelen = sizeof(addr); +    msg.msg_iov = &iov; +    iov.iov_base = packet.buffer.data(); +    iov.iov_len = packet.buffer.size(); +    msg.msg_iovlen = 1; +    msg.msg_control = control_buffer; +    msg.msg_controllen = sizeof(control_buffer); + +    ssize_t ret = recvmsg(m_sock, &msg, 0);      if (ret == SOCKET_ERROR) {          packet.buffer.resize(0); @@ -260,12 +285,42 @@ UDPPacket UDPSocket::receive(size_t max_size)          if (errno == EAGAIN or errno == EWOULDBLOCK)  #endif          { -            return 0; +            return packet;          }          throw runtime_error(string("Can't receive data: ") + strerror(errno));      } -    packet.buffer.resize(ret); +    struct in_pktinfo *pktinfo = nullptr; +    for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { +        if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) { +            pktinfo = (struct in_pktinfo *)CMSG_DATA(cmsg); +            break; +        } +    } + +    if (pktinfo) { +        char src_addr[INET_ADDRSTRLEN]; +        char dst_addr[INET_ADDRSTRLEN]; +        inet_ntop(AF_INET, &(addr.sin_addr), src_addr, INET_ADDRSTRLEN); +        inet_ntop(AF_INET, &(pktinfo->ipi_addr), dst_addr, INET_ADDRSTRLEN); +        //fprintf(stderr, "Received packet from %s to %s: %zu\n", src_addr, dst_addr, ret); + +        memcpy(&packet.address.addr, &addr, sizeof(addr)); + +        if (m_multicast_source.empty() or +                strcmp(dst_addr, m_multicast_source.c_str()) == 0) { +            packet.buffer.resize(ret); +        } +        else { +            // Ignore packet for different multicast group +            packet.buffer.resize(0); +        } +    } +    else { +        //fprintf(stderr, "No pktinfo: %zu\n", ret); +        packet.buffer.resize(ret); +    } +      return packet;  } @@ -297,7 +352,7 @@ void UDPSocket::send(const std::string& data, InetAddress destination)      }  } -void UDPSocket::joinGroup(const char* groupname, const char* if_addr) +void UDPSocket::join_group(const char* groupname, const char* if_addr)  {      ip_mreqn group;      if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -316,7 +371,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr)      group.imr_ifindex = 0;      if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))              == SOCKET_ERROR) { -        throw runtime_error(string("Can't join multicast group") + strerror(errno)); +        throw runtime_error(string("Can't join multicast group: ") + strerror(errno));      }  } @@ -324,12 +379,12 @@ void UDPSocket::setMulticastSource(const char* source_addr)  {      struct in_addr addr;      if (inet_aton(source_addr, &addr) == 0) { -        throw runtime_error(string("Can't parse source address") + strerror(errno)); +        throw runtime_error(string("Can't parse source address: ") + strerror(errno));      }      if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))              == SOCKET_ERROR) { -        throw runtime_error(string("Can't set source address") + strerror(errno)); +        throw runtime_error(string("Can't set source address: ") + strerror(errno));      }  } @@ -392,11 +447,13 @@ vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)          for (size_t i = 0; i < m_sockets.size(); i++) {              if (fds[i].revents & POLLIN) {                  auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU -                ReceivedPacket rp; -                rp.packetdata = move(p.buffer); -                rp.received_from = move(p.address); -                rp.port_received_on = m_sockets[i].getPort(); -                received.push_back(move(rp)); +                if (not p.buffer.empty()) { +                    ReceivedPacket rp; +                    rp.packetdata = std::move(p.buffer); +                    rp.received_from = std::move(p.address); +                    rp.port_received_on = m_sockets[i].getPort(); +                    received.push_back(std::move(rp)); +                }              }          } diff --git a/lib/Socket.h b/lib/Socket.h index 44f93d0..1320a64 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -2,7 +2,7 @@     Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2022 +   Copyright (C) 2024     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -31,7 +31,7 @@  #include "ThreadsafeQueue.h"  #include <cstdlib>  #include <atomic> -#include <iostream> +#include <string>  #include <list>  #include <memory>  #include <thread> @@ -118,7 +118,6 @@ class UDPSocket          void send(const std::vector<uint8_t>& data, InetAddress destination);          void send(const std::string& data, InetAddress destination);          UDPPacket receive(size_t max_size); -        void joinGroup(const char* groupname, const char* if_addr = nullptr);          void setMulticastSource(const char* source_addr);          void setMulticastTTL(int ttl); @@ -130,9 +129,14 @@ class UDPSocket          SOCKET getNativeSocket() const;          int getPort() const; +    private: +        void join_group(const char* groupname, const char* if_addr = nullptr); +        void post_init(); +      protected:          SOCKET m_sock = INVALID_SOCKET;          int m_port = 0; +        std::string m_multicast_source = "";  };  /* UDP packet receiver supporting receiving from several ports at once */ diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index b314737..38eadf9 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -33,9 +33,9 @@ namespace EdiDecoder {  using namespace std; -bool frame_timestamp_t::valid() const +bool frame_timestamp_t::is_valid() const  { -    return tsta != 0xFFFFFF; +    return tsta != 0xFFFFFF and seconds != 0;  }  string frame_timestamp_t::to_string() const @@ -43,7 +43,7 @@ string frame_timestamp_t::to_string() const      const time_t seconds_in_unix_epoch = to_unix_epoch();      stringstream ss; -    if (valid()) { +    if (is_valid()) {          ss << "Timestamp: ";      }      else { diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index f273ecf..fdd7424 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -39,7 +39,7 @@ struct frame_timestamp_t {      uint32_t utco = 0;      uint32_t tsta = 0xFFFFFF; // According to EN 300 797 Annex B -    bool valid() const; +    bool is_valid() const;      std::string to_string() const;      std::time_t to_unix_epoch() const;      std::chrono::system_clock::time_point to_system_clock() const; diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index fc380d8..141641f 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -254,7 +254,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc          while (not m_pending_sti_frame.frame.empty()) {              if (m_pending_sti_frame.frame.size() == size) { -                if (m_pending_sti_frame.timestamp.valid()) { +                if (m_pending_sti_frame.timestamp.is_valid()) {                      auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta);                      ts_req += m_tist_delay;                      const double offset = ts_req.diff_s(m_pending_sti_frame.timestamp); @@ -324,7 +324,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc              m_is_prebuffering = true;              return 0;          } -        else if (not m_pending_sti_frame.timestamp.valid()) { +        else if (not m_pending_sti_frame.timestamp.is_valid()) {              etiLog.level(warn) << "EDI input " << m_name <<                  " invalid timestamp, ignoring";              memset(buffer, 0, size);  | 
