From 43f4a3a2a695c303bd4fdfbd7fec6def29284f2e Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 28 May 2019 16:56:43 +0200 Subject: Unify Socket abstractions --- src/dabOutput/edi/Transport.cpp | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) (limited to 'src/dabOutput/edi/Transport.cpp') diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index d99e987..6d3950f 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -69,23 +69,17 @@ Sender::Sender(const configuration_t& conf) : for (const auto& edi_dest : m_conf.destinations) { if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) { - auto udp_socket = std::make_shared(udp_dest->source_port); + auto udp_socket = std::make_shared(udp_dest->source_port); if (not udp_dest->source_addr.empty()) { - int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - if (err) { - throw runtime_error("EDI socket set source failed!"); - } - err = udp_socket->setMulticastTTL(udp_dest->ttl); - if (err) { - throw runtime_error("EDI socket set TTL failed!"); - } + udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket->setMulticastTTL(udp_dest->ttl); } udp_sockets.emplace(udp_dest.get(), udp_socket); } else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - auto dispatcher = make_shared(tcp_dest->max_frames_queued); + auto dispatcher = make_shared(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } @@ -129,9 +123,8 @@ void Sender::write(const TagPacket& tagpacket) for (const auto& edi_frag : edi_fragments) { for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } @@ -158,9 +151,8 @@ void Sender::write(const TagPacket& tagpacket) // Send over ethernet for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - InetAddress addr; - addr.setAddress(udp_dest->dest_addr.c_str()); - addr.setPort(m_conf.dest_port); + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); udp_sockets.at(udp_dest.get())->send(af_packet, addr); } -- cgit v1.2.3 From 8b0d4b647a3ac2e21e6a8e7a902eea3ae462e213 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 7 Jun 2019 10:14:51 +0200 Subject: Work on STI-D/EDI input --- Makefile.am | 21 +++-- doc/advanced.mux | 17 ++-- doc/example.mux | 3 +- lib/ReedSolomon.cpp | 116 ++++++++++++++++++++++++++ lib/ReedSolomon.h | 56 +++++++++++++ lib/Socket.cpp | 165 ++++++++++++++++++++++++++++--------- lib/Socket.h | 32 +++++++- lib/ThreadsafeQueue.h | 176 +++++++++++++++++++++++++++++++++++++++ src/ConfigParser.cpp | 61 ++++++++------ src/DabMux.cpp | 2 +- src/ReedSolomon.cpp | 116 -------------------------- src/ReedSolomon.h | 56 ------------- src/ThreadsafeQueue.h | 178 ---------------------------------------- src/dabOutput/edi/Config.h | 9 +- src/dabOutput/edi/PFT.cpp | 2 +- src/dabOutput/edi/Transport.cpp | 43 ++++++---- src/dabOutput/edi/Transport.h | 4 +- src/input/Udp.cpp | 8 +- 18 files changed, 614 insertions(+), 451 deletions(-) create mode 100644 lib/ReedSolomon.cpp create mode 100644 lib/ReedSolomon.h create mode 100644 lib/ThreadsafeQueue.h delete mode 100644 src/ReedSolomon.cpp delete mode 100644 src/ReedSolomon.h delete mode 100644 src/ThreadsafeQueue.h (limited to 'src/dabOutput/edi/Transport.cpp') diff --git a/Makefile.am b/Makefile.am index 2773bbe..80d24e0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -69,6 +69,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/input/File.h \ src/input/Udp.cpp \ src/input/Udp.h \ + src/input/Edi.cpp \ + src/input/Edi.h \ src/dabOutput/dabOutput.h \ src/dabOutput/dabOutputFile.cpp \ src/dabOutput/dabOutputFifo.cpp \ @@ -107,11 +109,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/MuxElements.cpp \ src/MuxElements.h \ src/PcDebug.h \ - src/ReedSolomon.h \ - src/ReedSolomon.cpp \ src/RemoteControl.cpp \ src/RemoteControl.h \ - src/ThreadsafeQueue.h \ src/crc.h \ src/crc.c \ src/fig/FIG.h \ @@ -161,8 +160,19 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/PrbsGenerator.h \ src/utils.cpp \ src/utils.h \ + lib/edi/STIDecoder.cpp \ + lib/edi/STIDecoder.h \ + lib/edi/STIWriter.cpp \ + lib/edi/STIWriter.h \ + lib/edi/PFT.cpp \ + lib/edi/PFT.h \ + lib/edi/common.cpp \ + lib/edi/common.h \ + lib/ReedSolomon.h \ + lib/ReedSolomon.cpp \ lib/Socket.h \ lib/Socket.cpp \ + lib/ThreadsafeQueue.h \ lib/zmq.hpp \ $(lib_fec_sources) \ $(lib_charset_sources) @@ -201,14 +211,15 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \ src/dabOutput/edi/TagPacket.h \ src/dabOutput/edi/Transport.cpp \ src/dabOutput/edi/Transport.h \ - src/ReedSolomon.h \ - src/ReedSolomon.cpp \ src/Log.h \ src/Log.cpp \ src/crc.h \ src/crc.c \ + lib/ReedSolomon.h \ + lib/ReedSolomon.cpp \ lib/Socket.h \ lib/Socket.cpp \ + lib/ThreadsafeQueue.h \ lib/zmq.hpp \ $(lib_fec_sources) diff --git a/doc/advanced.mux b/doc/advanced.mux index fb67b82..b9cec05 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -163,7 +163,8 @@ subchannels { sub-fu { type audio ; example file input - inputfile "funk.mp2" + inputproto zmq + inputuri "funk.mp2" nonblock false bitrate 128 id 10 @@ -188,7 +189,8 @@ subchannels { ; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP. ; This is intended to be compatible with AVT audio encoders. ; EXPERIMENTAL! - inputfile "sti-rtp://127.0.0.1:32010" + inputproto sti + inputuri "rtp://127.0.0.1:32010" bitrate 96 id 3 protection 3 @@ -196,11 +198,12 @@ subchannels { sub-ri { type dabplus ; example file input - ;inputfile "rick.dabp" + ;inputuri "rick.dabp" ; example zmq input: ; Accepts connections to port 9000 from any interface. ; Use ODR-AudioEnc as encoder - inputfile "tcp://*:9000" + inputproto zmq + inputuri "tcp://*:9000" bitrate 96 id 1 protection 1 @@ -256,7 +259,8 @@ subchannels { ; for audio types, you can use the ZeroMQ input (if compiled in) ; with the following configuration in combination with ; Toolame-DAB - inputfile "tcp://*:9001" + inputproto zmq + inputuri "tcp://*:9001" bitrate 96 id 1 protection 1 @@ -273,7 +277,8 @@ subchannels { type data ; Use the default PRBS polynomial. - inputfile "prbs://" + inputproto prbs + inputuri "prbs://" ; To use another polynomial, set it in the url as hexadecimal ; The default polynomial is G(x) = x^20 + x^17 + 1, represented as diff --git a/doc/example.mux b/doc/example.mux index 6c2bc18..31e072d 100644 --- a/doc/example.mux +++ b/doc/example.mux @@ -171,7 +171,8 @@ subchannels { type dabplus ; Accepts connections to port 9000 from any interface. ; Use ODR-AudioEnc as encoder - inputfile "tcp://*:9000" + inputproto zmq + inputuri "tcp://*:9000" bitrate 96 id 1 protection 3 diff --git a/lib/ReedSolomon.cpp b/lib/ReedSolomon.cpp new file mode 100644 index 0000000..38d8ea8 --- /dev/null +++ b/lib/ReedSolomon.cpp @@ -0,0 +1,116 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right + of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "ReedSolomon.h" +#include +#include +#include +#include +#include // For galois.h ... +#include // For memcpy + +extern "C" { +#include "fec/fec.h" +} +#include + +#define SYMSIZE 8 + + +ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) +{ + setReverse(reverse); + + m_N = N; + m_K = K; + + const int symsize = SYMSIZE; + const int nroots = N - K; // For EDI PFT, this must be 48 + const int pad = ((1 << symsize) - 1) - N; // is 255-N + + rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); + + if (rsData == nullptr) { + std::stringstream ss; + ss << "Invalid Reed-Solomon parameters! " << + "N=" << N << " ; K=" << K << " ; pad=" << pad; + throw std::invalid_argument(ss.str()); + } +} + + +ReedSolomon::~ReedSolomon() +{ + free_rs_char(rsData); +} + + +void ReedSolomon::setReverse(bool state) +{ + reverse = state; +} + + +int ReedSolomon::encode(void* data, void* fec, size_t size) +{ + uint8_t* input = reinterpret_cast(data); + uint8_t* output = reinterpret_cast(fec); + int ret = 0; + + if (reverse) { + std::vector buffer(m_N); + + memcpy(&buffer[0], input, m_K); + memcpy(&buffer[m_K], output, m_N - m_K); + + ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); + if ((ret != 0) && (ret != -1)) { + memcpy(input, &buffer[0], m_K); + memcpy(output, &buffer[m_K], m_N - m_K); + } + } + else { + encode_rs_char(rsData, input, output); + } + + return ret; +} + + +int ReedSolomon::encode(void* data, size_t size) +{ + uint8_t* input = reinterpret_cast(data); + int ret = 0; + + if (reverse) { + ret = decode_rs_char(rsData, input, nullptr, 0); + } + else { + encode_rs_char(rsData, input, &input[m_K]); + } + + return ret; +} diff --git a/lib/ReedSolomon.h b/lib/ReedSolomon.h new file mode 100644 index 0000000..abcef62 --- /dev/null +++ b/lib/ReedSolomon.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right + of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +class ReedSolomon +{ +public: + ReedSolomon(int N, int K, + bool reverse = false, + int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); + ReedSolomon(const ReedSolomon& other) = delete; + ReedSolomon operator=(const ReedSolomon& other) = delete; + ~ReedSolomon(); + + void setReverse(bool state); + int encode(void* data, void* fec, size_t size); + int encode(void* data, size_t size); + +private: + int m_N; + int m_K; + + void* rsData; + bool reverse; +}; + diff --git a/lib/Socket.cpp b/lib/Socket.cpp index fe3df44..9b404eb 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -42,13 +42,10 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = 0; hints.ai_protocol = 0; - hints.ai_canonname = nullptr; - hints.ai_addr = nullptr; - hints.ai_next = nullptr; struct addrinfo *result, *rp; int s = getaddrinfo(destination.c_str(), service, &hints, &result); @@ -77,19 +74,19 @@ UDPPacket::UDPPacket(size_t initSize) : UDPSocket::UDPSocket() : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(0, ""); } UDPSocket::UDPSocket(int port) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, ""); } UDPSocket::UDPSocket(int port, const std::string& name) : - listenSocket(INVALID_SOCKET) + m_sock(INVALID_SOCKET) { reinit(port, name); } @@ -97,16 +94,28 @@ UDPSocket::UDPSocket(int port, const std::string& name) : void UDPSocket::setBlocking(bool block) { - int res = fcntl(listenSocket, F_SETFL, block ? 0 : O_NONBLOCK); + int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK); if (res == -1) { throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno)); } } +void UDPSocket::reinit(int port) +{ + return reinit(port, ""); +} + void UDPSocket::reinit(int port, const std::string& name) { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); + } + + if (port == 0) { + // No need to bind to a given port, creating the + // socket is enough + m_sock = ::socket(AF_INET, SOCK_DGRAM, 0); + return; } char service[NI_MAXSERV]; @@ -114,7 +123,7 @@ void UDPSocket::reinit(int port, const std::string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ @@ -141,7 +150,7 @@ void UDPSocket::reinit(int port, const std::string& name) } if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { - listenSocket = sfd; + m_sock = sfd; break; } @@ -157,17 +166,17 @@ void UDPSocket::reinit(int port, const std::string& name) void UDPSocket::close() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } - listenSocket = INVALID_SOCKET; + m_sock = INVALID_SOCKET; } UDPSocket::~UDPSocket() { - if (listenSocket != INVALID_SOCKET) { - ::close(listenSocket); + if (m_sock != INVALID_SOCKET) { + ::close(m_sock); } } @@ -177,7 +186,7 @@ UDPPacket UDPSocket::receive(size_t max_size) UDPPacket packet(max_size); socklen_t addrSize; addrSize = sizeof(*packet.address.as_sockaddr()); - ssize_t ret = recvfrom(listenSocket, + ssize_t ret = recvfrom(m_sock, packet.buffer.data(), packet.buffer.size(), 0, @@ -186,7 +195,13 @@ UDPPacket UDPSocket::receive(size_t max_size) if (ret == SOCKET_ERROR) { packet.buffer.resize(0); + + // This suppresses the -Wlogical-op warning +#if EAGAIN == EWOULDBLOCK if (errno == EAGAIN) { +#else + if (errno == EAGAIN or errno == EWOULDBLOCK) { +#endif return 0; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -198,24 +213,24 @@ UDPPacket UDPSocket::receive(size_t max_size) void UDPSocket::send(UDPPacket& packet) { - int ret = sendto(listenSocket, packet.buffer.data(), packet.buffer.size(), 0, + const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0, packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } void UDPSocket::send(const std::vector& data, InetAddress destination) { - int ret = sendto(listenSocket, &data[0], data.size(), 0, + const int ret = sendto(m_sock, data.data(), data.size(), 0, destination.as_sockaddr(), sizeof(*destination.as_sockaddr())); if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { - throw runtime_error(string("Can't send UDP packet") + strerror(errno)); + throw runtime_error(string("Can't send UDP packet: ") + strerror(errno)); } } -void UDPSocket::joinGroup(char* groupname) +void UDPSocket::joinGroup(const char* groupname, const char* if_addr) { ip_mreqn group; if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { @@ -224,9 +239,15 @@ void UDPSocket::joinGroup(char* groupname) if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { throw runtime_error("Group name is not a multicast address"); } - group.imr_address.s_addr = htons(INADDR_ANY);; + + if (if_addr) { + group.imr_address.s_addr = inet_addr(if_addr); + } + else { + group.imr_address.s_addr = htons(INADDR_ANY); + } group.imr_ifindex = 0; - if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) + if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) == SOCKET_ERROR) { throw runtime_error(string("Can't join multicast group") + strerror(errno)); } @@ -239,7 +260,7 @@ void UDPSocket::setMulticastSource(const char* source_addr) throw runtime_error(string("Can't parse source address") + strerror(errno)); } - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) == SOCKET_ERROR) { throw runtime_error(string("Can't set source address") + strerror(errno)); } @@ -247,26 +268,82 @@ void UDPSocket::setMulticastSource(const char* source_addr) void UDPSocket::setMulticastTTL(int ttl) { - if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) + if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) == SOCKET_ERROR) { throw runtime_error(string("Can't set multicast ttl") + strerror(errno)); } } +UDPReceiver::~UDPReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) { + m_port = port; + m_bindto = bindto; + m_mcastaddr = mcastaddr; + m_max_packets_queued = max_packets_queued; + m_thread = std::thread(&UDPReceiver::m_run, this); +} -TCPSocket::TCPSocket() +std::vector UDPReceiver::get_packet_buffer() { - if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); + if (m_stop) { + throw runtime_error("UDP Receiver not running"); } -#if defined(HAVE_SO_NOSIGPIPE) - int val = 1; - if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, - &val, sizeof(val)) < 0) { - throw std::runtime_error("Can't set SO_NOSIGPIPE"); + UDPPacket p; + m_packets.wait_and_pop(p); + + return p.buffer; +} + +void UDPReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic& m_stop; + } autoSetStop(m_stop); + + if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) { + m_sock.reinit(m_port, m_mcastaddr); + m_sock.setMulticastSource(m_bindto.c_str()); + m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str()); } -#endif + else { + m_sock.reinit(m_port, m_bindto); + } + + while (not m_stop) { + constexpr size_t packsize = 8192; + try { + auto packet = m_sock.receive(packsize); + if (packet.buffer.size() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + + // If this blocks, the UDP socket will lose incoming packets + m_packets.push_wait_if_full(packet, m_max_packets_queued); + } + catch (const std::runtime_error& e) { + // TODO replace fprintf + // TODO handle intr + fprintf(stderr, "Socket error: %s\n", e.what()); + m_stop = true; + } + } +} + + +TCPSocket::TCPSocket() +{ } TCPSocket::~TCPSocket() @@ -314,7 +391,7 @@ void TCPSocket::connect(const std::string& hostname, int port) /* Obtain address(es) matching host/port */ struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = 0; hints.ai_protocol = 0; @@ -376,7 +453,7 @@ void TCPSocket::listen(int port, const string& name) struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; + hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; @@ -410,6 +487,14 @@ void TCPSocket::listen(int port, const string& name) freeaddrinfo(result); +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, + &val, sizeof(val)) < 0) { + throw std::runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + if (rp == nullptr) { throw runtime_error("Could not bind"); } @@ -683,7 +768,9 @@ TCPDataDispatcher::~TCPDataDispatcher() m_running = false; m_connections.clear(); m_listener_socket.close(); - m_listener_thread.join(); + if (m_listener_thread.joinable()) { + m_listener_thread.join(); + } } void TCPDataDispatcher::start(int port, const string& address) diff --git a/lib/Socket.h b/lib/Socket.h index 82ff5ad..2393584 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -104,13 +104,14 @@ class UDPSocket const UDPSocket& operator=(const UDPSocket& other) = delete; /** Close the already open socket, and create a new one. Throws a runtime_error on error. */ + void reinit(int port); void reinit(int port, const std::string& name); void close(void); void send(UDPPacket& packet); void send(const std::vector& data, InetAddress destination); UDPPacket receive(size_t max_size); - void joinGroup(char* groupname); + void joinGroup(const char* groupname, const char* if_addr = nullptr); void setMulticastSource(const char* source_addr); void setMulticastTTL(int ttl); @@ -120,9 +121,36 @@ class UDPSocket void setBlocking(bool block); protected: - SOCKET listenSocket; + SOCKET m_sock; }; +/* Threaded UDP receiver */ +class UDPReceiver { + public: + UDPReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UDPReceiver(); + UDPReceiver(const UDPReceiver&) = delete; + UDPReceiver operator=(const UDPReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::string m_bindto; + std::string m_mcastaddr; + size_t m_max_packets_queued; + std::thread m_thread; + std::atomic m_stop; + ThreadsafeQueue m_packets; + UDPSocket m_sock; +}; class TCPSocket { public: diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h new file mode 100644 index 0000000..62f4c96 --- /dev/null +++ b/lib/ThreadsafeQueue.h @@ -0,0 +1,176 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2018 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue, depends on C++11 + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include +#include +#include +#include + +/* This queue is meant to be used by two threads. One producer + * that pushes elements into the queue, and one consumer that + * retrieves the elements. + * + * The queue can make the consumer block until an element + * is available, or a wakeup requested. + */ + +/* Class thrown by blocking pop to tell the consumer + * that there's a wakeup requested. */ +class ThreadsafeQueueWakeup {}; + +template +class ThreadsafeQueue +{ +public: + /* Push one element into the queue, and notify another thread that + * might be waiting. + * + * returns the new queue size. + */ + size_t push(T const& val) + { + std::unique_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + size_t push(T&& val) + { + std::unique_lock lock(the_mutex); + the_queue.emplace(std::move(val)); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Push one element into the queue, but wait until the + * queue size goes below the threshold. + * + * Notify waiting thread. + * + * returns the new queue size. + */ + size_t push_wait_if_full(T const& val, size_t threshold) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() >= threshold) { + the_tx_notification.wait(lock); + } + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return queue_size; + } + + /* Trigger a wakeup event on a blocking consumer, which + * will receive a ThreadsafeQueueWakeup exception. + */ + void trigger_wakeup(void) + { + std::unique_lock lock(the_mutex); + wakeup_requested = true; + lock.unlock(); + the_rx_notification.notify_one(); + } + + /* Send a notification for the receiver thread */ + void notify(void) + { + the_rx_notification.notify_one(); + } + + bool empty() const + { + std::unique_lock lock(the_mutex); + return the_queue.empty(); + } + + size_t size() const + { + std::unique_lock lock(the_mutex); + return the_queue.size(); + } + + bool try_pop(T& popped_value) + { + std::unique_lock lock(the_mutex); + if (the_queue.empty()) { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + + return true; + } + + void wait_and_pop(T& popped_value, size_t prebuffering = 1) + { + std::unique_lock lock(the_mutex); + while (the_queue.size() < prebuffering and + not wakeup_requested) { + the_rx_notification.wait(lock); + } + + if (wakeup_requested) { + wakeup_requested = false; + throw ThreadsafeQueueWakeup(); + } + else { + std::swap(popped_value, the_queue.front()); + the_queue.pop(); + + lock.unlock(); + the_tx_notification.notify_one(); + } + } + +private: + std::queue the_queue; + mutable std::mutex the_mutex; + std::condition_variable the_rx_notification; + std::condition_variable the_tx_notification; + bool wakeup_requested = false; +}; + diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..3142bb3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@ #include "utils.h" #include "DabMux.h" #include "ManagementServer.h" +#include "input/Edi.h" #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" @@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr& subchan, type = pt.get("type"); } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no type defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!"); } - /* Both inputfile and inputuri are supported, and are equivalent. - * inputuri has precedence + /* Up to v2.3.1, both inputfile and inputuri are supported, and are + * equivalent. inputuri has precedence. + * + * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both. */ string inputUri = pt.get("inputuri", ""); + string proto = pt.get("inputproto", ""); - if (inputUri == "") { + if (inputUri.empty() and proto.empty()) { try { + /* Old approach, derives proto from scheme used in the URL. + * This makes it impossible to distinguish between ZMQ tcp:// and + * EDI tcp:// + */ inputUri = pt.get("inputfile"); + size_t protopos = inputUri.find("://"); + + if (protopos == string::npos) { + proto = "file"; + } + else { + proto = inputUri.substr(0, protopos); + + if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + proto = "zmq"; + } + else if (proto == "sti-rtp") { + proto = "sti"; + } + } } catch (const ptree_error &e) { - stringstream ss; - ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; - throw runtime_error(ss.str()); + throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!"); } } - - string proto; - size_t protopos = inputUri.find("://"); - if (protopos == string::npos) { - proto = "file"; - } - else { - proto = inputUri.substr(0, protopos); + else if (inputUri.empty() or proto.empty()) { + throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid); } subchan->inputUri = inputUri; @@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr& subchan, throw logic_error("Incomplete handling of file input"); } } - else if (proto == "tcp" or proto == "epgm" or proto == "ipc") { + else if (proto == "zmq") { auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr& subchan, rcs.enrol(inzmq.get()); subchan->input = inzmq; } - - if (proto == "epgm") { - etiLog.level(warn) << "Using untested epgm:// zeromq input"; - } - else if (proto == "ipc") { - etiLog.level(warn) << "Using untested ipc:// zeromq input"; - } } - else if (proto == "sti-rtp") { + else if (proto == "edi") { + subchan->input = make_shared(); + } + else if (proto == "stp") { subchan->input = make_shared(); } else { diff --git a/src/DabMux.cpp b/src/DabMux.cpp index d749ed3..e726fd3 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -304,7 +304,7 @@ int main(int argc, char *argv[]) edi_conf.destinations.push_back(dest); } else if (proto == "tcp") { - auto dest = make_shared(); + auto dest = make_shared(); dest->listen_port = pt_edi_dest.second.get("listenport"); dest->max_frames_queued = pt_edi_dest.second.get("max_frames_queued", 500); edi_conf.destinations.push_back(dest); diff --git a/src/ReedSolomon.cpp b/src/ReedSolomon.cpp deleted file mode 100644 index 38d8ea8..0000000 --- a/src/ReedSolomon.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right - of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "ReedSolomon.h" -#include -#include -#include -#include -#include // For galois.h ... -#include // For memcpy - -extern "C" { -#include "fec/fec.h" -} -#include - -#define SYMSIZE 8 - - -ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) -{ - setReverse(reverse); - - m_N = N; - m_K = K; - - const int symsize = SYMSIZE; - const int nroots = N - K; // For EDI PFT, this must be 48 - const int pad = ((1 << symsize) - 1) - N; // is 255-N - - rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); - - if (rsData == nullptr) { - std::stringstream ss; - ss << "Invalid Reed-Solomon parameters! " << - "N=" << N << " ; K=" << K << " ; pad=" << pad; - throw std::invalid_argument(ss.str()); - } -} - - -ReedSolomon::~ReedSolomon() -{ - free_rs_char(rsData); -} - - -void ReedSolomon::setReverse(bool state) -{ - reverse = state; -} - - -int ReedSolomon::encode(void* data, void* fec, size_t size) -{ - uint8_t* input = reinterpret_cast(data); - uint8_t* output = reinterpret_cast(fec); - int ret = 0; - - if (reverse) { - std::vector buffer(m_N); - - memcpy(&buffer[0], input, m_K); - memcpy(&buffer[m_K], output, m_N - m_K); - - ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); - if ((ret != 0) && (ret != -1)) { - memcpy(input, &buffer[0], m_K); - memcpy(output, &buffer[m_K], m_N - m_K); - } - } - else { - encode_rs_char(rsData, input, output); - } - - return ret; -} - - -int ReedSolomon::encode(void* data, size_t size) -{ - uint8_t* input = reinterpret_cast(data); - int ret = 0; - - if (reverse) { - ret = decode_rs_char(rsData, input, nullptr, 0); - } - else { - encode_rs_char(rsData, input, &input[m_K]); - } - - return ret; -} diff --git a/src/ReedSolomon.h b/src/ReedSolomon.h deleted file mode 100644 index abcef62..0000000 --- a/src/ReedSolomon.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right - of Canada (Communications Research Center Canada) - - Copyright (C) 2016 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include -#endif - -#include - -class ReedSolomon -{ -public: - ReedSolomon(int N, int K, - bool reverse = false, - int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); - ReedSolomon(const ReedSolomon& other) = delete; - ReedSolomon operator=(const ReedSolomon& other) = delete; - ~ReedSolomon(); - - void setReverse(bool state); - int encode(void* data, void* fec, size_t size); - int encode(void* data, size_t size); - -private: - int m_N; - int m_K; - - void* rsData; - bool reverse; -}; - diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ab287b2..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in - Right of Canada (Communications Research Center Canada) - - Copyright (C) 2018 - Matthias P. Braendli, matthias.braendli@mpb.li - - An implementation for a threadsafe queue, depends on C++11 - - When creating a ThreadsafeQueue, one can specify the minimal number - of elements it must contain before it is possible to take one - element out. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include -#include -#include -#include - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template -class ThreadsafeQueue -{ -public: - /* Push one element into the queue, and notify another thread that - * might be waiting. - * - * returns the new queue size. - */ - size_t push(T const& val) - { - std::unique_lock lock(the_mutex); - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - size_t push(T&& val) - { - std::unique_lock lock(the_mutex); - the_queue.emplace(std::move(val)); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Push one element into the queue, but wait until the - * queue size goes below the threshold. - * - * Notify waiting thread. - * - * returns the new queue size. - */ - size_t push_wait_if_full(T const& val, size_t threshold) - { - std::unique_lock lock(the_mutex); - while (the_queue.size() >= threshold) { - the_tx_notification.wait(lock); - } - the_queue.push(val); - size_t queue_size = the_queue.size(); - lock.unlock(); - - the_rx_notification.notify_one(); - - return queue_size; - } - - /* Trigger a wakeup event on a blocking consumer, which - * will receive a ThreadsafeQueueWakeup exception. - */ - void trigger_wakeup(void) - { - std::unique_lock lock(the_mutex); - wakeup_requested = true; - lock.unlock(); - the_rx_notification.notify_one(); - } - - /* Send a notification for the receiver thread */ - void notify(void) - { - the_rx_notification.notify_one(); - } - - bool empty() const - { - std::unique_lock lock(the_mutex); - return the_queue.empty(); - } - - size_t size() const - { - std::unique_lock lock(the_mutex); - return the_queue.size(); - } - - bool try_pop(T& popped_value) - { - std::unique_lock lock(the_mutex); - if (the_queue.empty()) { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - - return true; - } - - void wait_and_pop(T& popped_value, size_t prebuffering = 1) - { - std::unique_lock lock(the_mutex); - while (the_queue.size() < prebuffering and - not wakeup_requested) { - the_rx_notification.wait(lock); - } - - if (wakeup_requested) { - wakeup_requested = false; - throw ThreadsafeQueueWakeup(); - } - else { - std::swap(popped_value, the_queue.front()); - the_queue.pop(); - - lock.unlock(); - the_tx_notification.notify_one(); - } - } - -private: - std::queue the_queue; - mutable std::mutex the_mutex; - std::condition_variable the_rx_notification; - std::condition_variable the_tx_notification; - bool wakeup_requested = false; -}; - diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h index 55d5f0f..0c7dce8 100644 --- a/src/dabOutput/edi/Config.h +++ b/src/dabOutput/edi/Config.h @@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t { }; // TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { +struct tcp_server_t : public destination_t { unsigned int listen_port = 0; size_t max_frames_queued = 1024; }; +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { + std::string dest_addr; + unsigned int dest_port = 0; + size_t max_frames_queued = 1024; +}; + struct configuration_t { unsigned chunk_len = 207; // RSk, data length of each chunk unsigned fec = 0; // number of fragments that can be recovered diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp index 5b93016..63dfa34 100644 --- a/src/dabOutput/edi/PFT.cpp +++ b/src/dabOutput/edi/PFT.cpp @@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) #if 0 fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", - m_pseq, findex, fcount, plen & ~0x8000); + m_pseq, findex, fcount, plen & ~0xC000); #endif } diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp index 6d3950f..187aabe 100644 --- a/src/dabOutput/edi/Transport.cpp +++ b/src/dabOutput/edi/Transport.cpp @@ -45,12 +45,16 @@ void configuration_t::print() const } etiLog.level(info) << " source port " << udp_dest->source_port; } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (interleaver_enabled()) { @@ -78,13 +82,18 @@ Sender::Sender(const configuration_t& conf) : udp_sockets.emplace(udp_dest.get(), udp_socket); } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { auto dispatcher = make_shared(tcp_dest->max_frames_queued); dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + auto tcp_socket = make_shared(); + tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_socket); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } @@ -111,7 +120,7 @@ void Sender::write(const TagPacket& tagpacket) vector edi_fragments = edi_pft.Assemble(af_packet); if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", edi_fragments.size()); } @@ -128,22 +137,25 @@ void Sender::write(const TagPacket& tagpacket) udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + ostream_iterator debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); } } if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu", + fprintf(stderr, "EDI number of PFT fragments %zu\n", edi_fragments.size()); } } @@ -156,17 +168,20 @@ void Sender::write(const TagPacket& tagpacket) udp_sockets.at(udp_dest.get())->send(af_packet, addr); } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + } else { - throw std::logic_error("EDI destination not implemented"); + throw logic_error("EDI destination not implemented"); } } if (m_conf.dump) { - std::ostream_iterator debug_iterator(edi_debug_file); - std::copy(af_packet.begin(), af_packet.end(), debug_iterator); + ostream_iterator debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); } } } diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h index 74126d1..9633275 100644 --- a/src/dabOutput/edi/Transport.h +++ b/src/dabOutput/edi/Transport.h @@ -36,6 +36,7 @@ #include #include #include +#include #include namespace edi { @@ -62,7 +63,8 @@ class Sender { edi::Interleaver edi_interleaver; std::unordered_map> udp_sockets; - std::unordered_map> tcp_dispatchers; + std::unordered_map> tcp_dispatchers; + std::unordered_map> tcp_senders; }; } diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index b4cced0..5d4f964 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -151,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf) int Sti_d_Rtp::open(const std::string& name) { - // Skip the sti-rtp:// part if it is present - const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? + // Skip the rtp:// part if it is present + const string endpoint = (name.substr(0, 10) == "rtp://") ? name.substr(10) : name; // The endpoint should be address:port @@ -160,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name) if (colon_pos == string::npos) { stringstream ss; ss << "'" << name << - " is an invalid format for sti-rtp address: " - "expected [sti-rtp://]address:port"; + " is an invalid format for rtp address: " + "expected [rtp://]address:port"; throw invalid_argument(ss.str()); } -- cgit v1.2.3 From 5ee85c4ac41337e383eb1a735bc05f1e5d46a98f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 19 Aug 2019 17:18:51 +0200 Subject: Use EDI output from odr-mmbtools-common --- Makefile.am | 52 +++--- lib/ClockTAI.cpp | 159 ++++++++++------ lib/ClockTAI.h | 10 +- lib/edioutput/AFPacket.cpp | 96 ++++++++++ lib/edioutput/AFPacket.h | 61 ++++++ lib/edioutput/Config.h | 84 +++++++++ lib/edioutput/Interleaver.cpp | 122 ++++++++++++ lib/edioutput/Interleaver.h | 75 ++++++++ lib/edioutput/PFT.cpp | 327 ++++++++++++++++++++++++++++++++ lib/edioutput/PFT.h | 78 ++++++++ lib/edioutput/TagItems.cpp | 383 ++++++++++++++++++++++++++++++++++++++ lib/edioutput/TagItems.h | 229 +++++++++++++++++++++++ lib/edioutput/TagPacket.cpp | 78 ++++++++ lib/edioutput/TagPacket.h | 56 ++++++ lib/edioutput/Transport.cpp | 188 +++++++++++++++++++ lib/edioutput/Transport.h | 71 +++++++ src/DabMultiplexer.cpp | 2 +- src/DabMultiplexer.h | 8 +- src/dabOutput/edi/AFPacket.cpp | 96 ---------- src/dabOutput/edi/AFPacket.h | 61 ------ src/dabOutput/edi/Config.h | 84 --------- src/dabOutput/edi/Interleaver.cpp | 122 ------------ src/dabOutput/edi/Interleaver.h | 75 -------- src/dabOutput/edi/PFT.cpp | 327 -------------------------------- src/dabOutput/edi/PFT.h | 78 -------- src/dabOutput/edi/TagItems.cpp | 216 --------------------- src/dabOutput/edi/TagItems.h | 149 --------------- src/dabOutput/edi/TagPacket.cpp | 79 -------- src/dabOutput/edi/TagPacket.h | 56 ------ src/dabOutput/edi/Transport.cpp | 189 ------------------- src/dabOutput/edi/Transport.h | 71 ------- src/zmq2edi/EDISender.cpp | 2 +- src/zmq2edi/EDISender.h | 6 +- 33 files changed, 1990 insertions(+), 1700 deletions(-) create mode 100644 lib/edioutput/AFPacket.cpp create mode 100644 lib/edioutput/AFPacket.h create mode 100644 lib/edioutput/Config.h create mode 100644 lib/edioutput/Interleaver.cpp create mode 100644 lib/edioutput/Interleaver.h create mode 100644 lib/edioutput/PFT.cpp create mode 100644 lib/edioutput/PFT.h create mode 100644 lib/edioutput/TagItems.cpp create mode 100644 lib/edioutput/TagItems.h create mode 100644 lib/edioutput/TagPacket.cpp create mode 100644 lib/edioutput/TagPacket.h create mode 100644 lib/edioutput/Transport.cpp create mode 100644 lib/edioutput/Transport.h delete mode 100644 src/dabOutput/edi/AFPacket.cpp delete mode 100644 src/dabOutput/edi/AFPacket.h delete mode 100644 src/dabOutput/edi/Config.h delete mode 100644 src/dabOutput/edi/Interleaver.cpp delete mode 100644 src/dabOutput/edi/Interleaver.h delete mode 100644 src/dabOutput/edi/PFT.cpp delete mode 100644 src/dabOutput/edi/PFT.h delete mode 100644 src/dabOutput/edi/TagItems.cpp delete mode 100644 src/dabOutput/edi/TagItems.h delete mode 100644 src/dabOutput/edi/TagPacket.cpp delete mode 100644 src/dabOutput/edi/TagPacket.h delete mode 100644 src/dabOutput/edi/Transport.cpp delete mode 100644 src/dabOutput/edi/Transport.h (limited to 'src/dabOutput/edi/Transport.cpp') diff --git a/Makefile.am b/Makefile.am index 216f7c0..e426f74 100644 --- a/Makefile.am +++ b/Makefile.am @@ -81,19 +81,6 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ src/dabOutput/dabOutputZMQ.cpp \ src/dabOutput/metadata.h \ src/dabOutput/metadata.cpp \ - src/dabOutput/edi/AFPacket.cpp \ - src/dabOutput/edi/AFPacket.h \ - src/dabOutput/edi/Config.h \ - src/dabOutput/edi/Interleaver.cpp \ - src/dabOutput/edi/Interleaver.h \ - src/dabOutput/edi/PFT.cpp \ - src/dabOutput/edi/PFT.h \ - src/dabOutput/edi/TagItems.cpp \ - src/dabOutput/edi/TagItems.h \ - src/dabOutput/edi/TagPacket.cpp \ - src/dabOutput/edi/TagPacket.h \ - src/dabOutput/edi/Transport.cpp \ - src/dabOutput/edi/Transport.h \ src/ConfigParser.cpp \ src/ConfigParser.h \ src/Eti.h \ @@ -169,6 +156,19 @@ odr_dabmux_SOURCES =src/DabMux.cpp \ lib/edi/common.cpp \ lib/edi/common.h \ lib/edi/buffer_unpack.hpp \ + lib/edioutput/AFPacket.cpp \ + lib/edioutput/AFPacket.h \ + lib/edioutput/Config.h \ + lib/edioutput/Interleaver.cpp \ + lib/edioutput/Interleaver.h \ + lib/edioutput/PFT.cpp \ + lib/edioutput/PFT.h \ + lib/edioutput/TagItems.cpp \ + lib/edioutput/TagItems.h \ + lib/edioutput/TagPacket.cpp \ + lib/edioutput/TagPacket.h \ + lib/edioutput/Transport.cpp \ + lib/edioutput/Transport.h \ lib/ReedSolomon.h \ lib/ReedSolomon.cpp \ lib/Socket.h \ @@ -199,19 +199,19 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \ src/dabOutput/dabOutput.h \ src/dabOutput/metadata.h \ src/dabOutput/metadata.cpp \ - src/dabOutput/edi/AFPacket.cpp \ - src/dabOutput/edi/AFPacket.h \ - src/dabOutput/edi/Config.h \ - src/dabOutput/edi/Interleaver.cpp \ - src/dabOutput/edi/Interleaver.h \ - src/dabOutput/edi/PFT.cpp \ - src/dabOutput/edi/PFT.h \ - src/dabOutput/edi/TagItems.cpp \ - src/dabOutput/edi/TagItems.h \ - src/dabOutput/edi/TagPacket.cpp \ - src/dabOutput/edi/TagPacket.h \ - src/dabOutput/edi/Transport.cpp \ - src/dabOutput/edi/Transport.h \ + lib/edioutput/AFPacket.cpp \ + lib/edioutput/AFPacket.h \ + lib/edioutput/Config.h \ + lib/edioutput/Interleaver.cpp \ + lib/edioutput/Interleaver.h \ + lib/edioutput/PFT.cpp \ + lib/edioutput/PFT.h \ + lib/edioutput/TagItems.cpp \ + lib/edioutput/TagItems.h \ + lib/edioutput/TagPacket.cpp \ + lib/edioutput/TagPacket.h \ + lib/edioutput/Transport.cpp \ + lib/edioutput/Transport.h \ lib/Log.h \ lib/Log.cpp \ lib/crc.h \ diff --git a/lib/ClockTAI.cpp b/lib/ClockTAI.cpp index 42497f4..2656345 100644 --- a/lib/ClockTAI.cpp +++ b/lib/ClockTAI.cpp @@ -9,27 +9,27 @@ http://www.opendigitalradio.org */ /* - This file is part of ODR-DabMux. + This file is part of the ODR-mmbTools. - ODR-DabMux is free software: you can redistribute it and/or modify + This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - ODR-DabMux is distributed in the hope that it will be useful, + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . -*/ + along with this program. If not, see . + */ /* This file downloads the TAI-UTC bulletins from the from IETF and parses them * so that correct time can be communicated in EDI timestamps. * * This file contains self-test code that can be executed by running - * g++ -g -Wall -DTEST -DHAVE_CURL -std=c++11 -lcurl -pthread \ + * g++ -g -Wall -DTAI_TEST -DHAVE_CURL -std=c++11 -lcurl -pthread \ * ClockTAI.cpp Log.cpp RemoteControl.cpp -lboost_system -o taitest && ./taitest */ @@ -40,9 +40,9 @@ #include "ClockTAI.h" #include "Log.h" -#include -#include -#include +#include +#include +#include #if SUPPORT_SETTING_CLOCK_TAI # include #endif @@ -54,10 +54,13 @@ #include #include #include +#include +#include +#include using namespace std; -#ifdef TEST +#ifdef DOWNLOADED_IN_THE_PAST_TEST static bool wait_longer = true; #endif @@ -76,7 +79,7 @@ static array default_tai_urls = { // According to the Filesystem Hierarchy Standard, the data in // /var/tmp "must not be deleted when the system is booted." -static const char *tai_cache_location = "/var/tmp/odr-dabmux-leap-seconds.cache"; +static const char *tai_cache_location = "/var/tmp/odr-leap-seconds.cache"; // read TAI offset from a valid bulletin in IETF format static int parse_ietf_bulletin(const std::string& bulletin) @@ -127,7 +130,7 @@ static int parse_ietf_bulletin(const std::string& bulletin) tai_utc_offset = offset; tai_utc_offset_valid = true; } -#if TEST +#if TAI_TEST else { cerr << "IETF Ignoring offset " << bulletin_offset << " at TS " << bulletin_ntp_timestamp << @@ -183,7 +186,7 @@ static bulletin_state parse_bulletin(const string& bulletin) const int64_t expiry_unix = std::atoll(expiry_data_str.c_str()) - ntp_unix_offset; -#ifdef TEST +#ifdef TAI_TEST etiLog.level(info) << "Bulletin expires in " << expiry_unix - now; #endif ret.expiry = expiry_unix - now; @@ -246,17 +249,46 @@ static string download_tai_utc_bulletin(const char* url) static string load_bulletin_from_file(const char* cache_filename) { - // Clear the bulletin - ifstream f(cache_filename); - if (not f.good()) { - return {}; + int fd = open(cache_filename, O_RDWR); // lockf requires O_RDWR + if (fd == -1) { + etiLog.level(error) << "TAI-UTC bulletin open cache for reading: " << + strerror(errno); + return ""; } - stringstream ss; - ss << f.rdbuf(); - f.close(); + lseek(fd, 0, SEEK_SET); + + vector buf(1024); + vector new_bulletin_data; + + ssize_t ret = lockf(fd, F_LOCK, 0); + if (ret == 0) { + // exclusive lock acquired + + do { + ret = read(fd, buf.data(), buf.size()); + + if (ret == -1) { + close(fd); + etiLog.level(error) << "TAI-UTC bulletin read cache: " << + strerror(errno); + return ""; + } + + copy(buf.data(), buf.data() + ret, back_inserter(new_bulletin_data)); + } while (ret > 0); + + close(fd); - return ss.str(); + return string{new_bulletin_data.data(), new_bulletin_data.size()}; + } + else { + etiLog.level(error) << + "TAI-UTC bulletin acquire cache lock for reading: " << + strerror(errno); + close(fd); + } + return ""; } ClockTAI::ClockTAI(const std::vector& bulletin_urls) : @@ -289,7 +321,7 @@ int ClockTAI::get_valid_offset() const auto state = parse_bulletin(m_bulletin); if (state.usable()) { -#if TEST +#if TAI_TEST etiLog.level(info) << "Bulletin already valid"; #endif offset = state.offset; @@ -297,20 +329,25 @@ int ClockTAI::get_valid_offset() } else { const auto cache_bulletin = load_bulletin_from_file(tai_cache_location); +#if TAI_TEST + etiLog.level(info) << "Loaded cache bulletin with " << + std::count_if(cache_bulletin.cbegin(), cache_bulletin.cend(), + [](const char c){ return c == '\n'; }) << " lines"; +#endif const auto cache_state = parse_bulletin(cache_bulletin); if (cache_state.usable()) { m_bulletin = cache_bulletin; offset = cache_state.offset; offset_valid = true; -#if TEST +#if TAI_TEST etiLog.level(info) << "Bulletin from cache valid with offset=" << offset; #endif } else { for (const auto url : m_bulletin_urls) { try { -#if TEST +#if TAI_TEST etiLog.level(info) << "Load bulletin from " << url; #endif const auto new_bulletin = download_tai_utc_bulletin(url.c_str()); @@ -368,7 +405,7 @@ int ClockTAI::get_offset() std::unique_lock lock(m_data_mutex); if (not m_offset_valid) { -#ifdef TEST +#ifdef DOWNLOADED_IN_THE_PAST_TEST // Assume we've downloaded it in the past: m_offset = 37; // Valid in early 2017 @@ -418,7 +455,7 @@ int ClockTAI::get_offset() m_bulletin_download_time += hours(download_retry_interval_hours); } -#ifdef TEST +#ifdef DOWNLOADED_IN_THE_PAST_TEST wait_longer = false; #endif break; @@ -426,14 +463,14 @@ int ClockTAI::get_offset() case future_status::deferred: case future_status::timeout: // Not ready yet -#ifdef TEST +#ifdef TAI_TEST etiLog.level(debug) << " async not ready yet"; #endif break; } } else { -#ifdef TEST +#ifdef TAI_TEST etiLog.level(debug) << " Launch async"; #endif m_offset_future = async(launch::async, &ClockTAI::get_valid_offset, this); @@ -463,13 +500,45 @@ int ClockTAI::update_local_tai_clock(int offset) void ClockTAI::update_cache(const char* cache_filename) { - ofstream f(cache_filename); - if (not f.good()) { - throw runtime_error("TAI-UTC bulletin open cache for writing"); + int fd = open(cache_filename, O_RDWR | O_CREAT, 00664); + if (fd == -1) { + etiLog.level(error) << + "TAI-UTC bulletin open cache for writing: " << + strerror(errno); + return; } - f << m_bulletin; - f.close(); + lseek(fd, 0, SEEK_SET); + + ssize_t ret = lockf(fd, F_LOCK, 0); + if (ret == 0) { + // exclusive lock acquired + const char *data = m_bulletin.data(); + size_t remaining = m_bulletin.size(); + + while (remaining > 0) { + ret = write(fd, data, remaining); + if (ret == -1) { + close(fd); + etiLog.level(error) << + "TAI-UTC bulletin write cache: " << + strerror(errno); + return; + } + + remaining -= ret; + data += ret; + } + etiLog.level(debug) << "TAI-UTC bulletin cache updated"; + close(fd); + } + else { + close(fd); + etiLog.level(error) << + "TAI-UTC bulletin acquire cache lock for writing: " << + strerror(errno); + return; + } } @@ -536,27 +605,3 @@ void debug_tai_clk() } #endif -#if TEST -int main(int argc, char **argv) -{ - using namespace std; - - ClockTAI tai({}); - - while (wait_longer) { - try { - etiLog.level(info) << - "Offset is " << tai.get_offset(); - } - catch (const exception &e) { - etiLog.level(error) << - "Exception " << e.what(); - } - - this_thread::sleep_for(chrono::seconds(2)); - } - - return 0; -} -#endif - diff --git a/lib/ClockTAI.h b/lib/ClockTAI.h index bb85815..50a6323 100644 --- a/lib/ClockTAI.h +++ b/lib/ClockTAI.h @@ -9,21 +9,21 @@ http://www.opendigitalradio.org */ /* - This file is part of ODR-DabMux. + This file is part of the ODR-mmbTools. - ODR-DabMux is free software: you can redistribute it and/or modify + This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - ODR-DabMux is distributed in the hope that it will be useful, + This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . -*/ + along with this program. If not, see . + */ /* The EDI output needs TAI clock, according to ETSI TS 102 693 Annex F * "EDI Timestamps". This module can set the local CLOCK_TAI clock by diff --git a/lib/edioutput/AFPacket.cpp b/lib/edioutput/AFPacket.cpp new file mode 100644 index 0000000..b38c38b --- /dev/null +++ b/lib/edioutput/AFPacket.cpp @@ -0,0 +1,96 @@ +/* + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output. + This implements an AF Packet as defined ETSI TS 102 821. + Also see ETSI TS 102 693 + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ +#include "config.h" +#include "crc.h" +#include "AFPacket.h" +#include "TagItems.h" +#include "TagPacket.h" +#include +#include +#include +#include +#include +#include + +namespace edi { + +// Header PT field. AF packet contains TAG payload +const uint8_t AFHEADER_PT_TAG = 'T'; + +// AF Packet Major (3 bits) and Minor (4 bits) version +const uint8_t AFHEADER_VERSION = 0x10; // MAJ=1, MIN=0 + +AFPacket AFPacketiser::Assemble(TagPacket tag_packet) +{ + std::vector payload = tag_packet.Assemble(); + + if (m_verbose) + std::cerr << "Assemble AFPacket " << seq << std::endl; + + std::string pack_data("AF"); // SYNC + std::vector packet(pack_data.begin(), pack_data.end()); + + uint32_t taglength = payload.size(); + + if (m_verbose) + std::cerr << " AFPacket payload size " << payload.size() << std::endl; + + // write length into packet + packet.push_back((taglength >> 24) & 0xFF); + packet.push_back((taglength >> 16) & 0xFF); + packet.push_back((taglength >> 8) & 0xFF); + packet.push_back(taglength & 0xFF); + + // fill rest of header + packet.push_back(seq >> 8); + packet.push_back(seq & 0xFF); + seq++; + packet.push_back((have_crc ? 0x80 : 0) | AFHEADER_VERSION); // ar_cf: CRC=1 + packet.push_back(AFHEADER_PT_TAG); + + // insert payload, must have a length multiple of 8 bytes + packet.insert(packet.end(), payload.begin(), payload.end()); + + // calculate CRC over AF Header and payload + uint16_t crc = 0xffff; + crc = crc16(crc, &(packet.front()), packet.size()); + crc ^= 0xffff; + + if (m_verbose) + fprintf(stderr, " AFPacket crc %x\n", crc); + + packet.push_back((crc >> 8) & 0xFF); + packet.push_back(crc & 0xFF); + + if (m_verbose) + std::cerr << " AFPacket length " << packet.size() << std::endl; + + return packet; +} + +} diff --git a/lib/edioutput/AFPacket.h b/lib/edioutput/AFPacket.h new file mode 100644 index 0000000..f2c4e35 --- /dev/null +++ b/lib/edioutput/AFPacket.h @@ -0,0 +1,61 @@ +/* + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output. + This implements an AF Packet as defined ETSI TS 102 821. + Also see ETSI TS 102 693 + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include +#include +#include "TagItems.h" +#include "TagPacket.h" + +namespace edi { + +typedef std::vector AFPacket; + +// ETSI TS 102 821, 6.1 AF packet structure +class AFPacketiser +{ + public: + AFPacketiser() : + m_verbose(false) {}; + AFPacketiser(bool verbose) : + m_verbose(verbose) {}; + + AFPacket Assemble(TagPacket tag_packet); + + private: + static const bool have_crc = true; + + uint16_t seq = 0; //counter that overflows at 0xFFFF + + bool m_verbose; +}; + +} + diff --git a/lib/edioutput/Config.h b/lib/edioutput/Config.h new file mode 100644 index 0000000..ca76322 --- /dev/null +++ b/lib/edioutput/Config.h @@ -0,0 +1,84 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + UDP and TCP transports and their configuration + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include +#include +#include +#include + +namespace edi { + +/** Configuration for EDI output */ + +struct destination_t { + virtual ~destination_t() {}; +}; + +// Can represent both unicast and multicast destinations +struct udp_destination_t : public destination_t { + std::string dest_addr; + std::string source_addr; + unsigned int source_port = 0; + unsigned int ttl = 10; +}; + +// TCP server that can accept multiple connections +struct tcp_server_t : public destination_t { + unsigned int listen_port = 0; + size_t max_frames_queued = 1024; +}; + +// TCP client that connects to one endpoint +struct tcp_client_t : public destination_t { + std::string dest_addr; + unsigned int dest_port = 0; + size_t max_frames_queued = 1024; +}; + +struct configuration_t { + unsigned chunk_len = 207; // RSk, data length of each chunk + unsigned fec = 0; // number of fragments that can be recovered + bool dump = false; // dump a file with the EDI packets + bool verbose = false; + bool enable_pft = false; // Enable protection and fragmentation + unsigned int tagpacket_alignment = 0; + std::vector > destinations; + unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer + unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms + + bool enabled() const { return destinations.size() > 0; } + bool interleaver_enabled() const { return latency_frames > 0; } + + void print() const; +}; + +} + + diff --git a/lib/edioutput/Interleaver.cpp b/lib/edioutput/Interleaver.cpp new file mode 100644 index 0000000..f26a50e --- /dev/null +++ b/lib/edioutput/Interleaver.cpp @@ -0,0 +1,122 @@ +/* + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + Interleaving of PFT fragments to increase robustness against + burst packet loss. + + This is possible because EDI has to assume that fragments may reach + the receiver out of order. + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "Interleaver.h" +#include + +namespace edi { + +void Interleaver::SetLatency(size_t latency_frames) +{ + m_latency = latency_frames; +} + +Interleaver::fragment_vec Interleaver::Interleave(fragment_vec &fragments) +{ + m_fragment_count = fragments.size(); + + // Create vectors containing Fcount*latency fragments in total + // and store them into the deque + if (m_buffer.empty()) { + m_buffer.emplace_back(); + } + + auto& last_buffer = m_buffer.back(); + + for (auto& fragment : fragments) { + const bool last_buffer_is_complete = + (last_buffer.size() >= m_fragment_count * m_latency); + + if (last_buffer_is_complete) { + m_buffer.emplace_back(); + last_buffer = m_buffer.back(); + } + + last_buffer.push_back(std::move(fragment)); + } + + fragments.clear(); + + while ( not m_buffer.empty() and + (m_buffer.front().size() >= m_fragment_count * m_latency)) { + + auto& first_buffer = m_buffer.front(); + + assert(first_buffer.size() == m_fragment_count * m_latency); + + /* Assume we have 5 fragments per AF frame, and latency of 3. + * This will give the following strides: + * 0 1 2 + * +-------+-------+---+ + * | 0 1 | 2 3 | 4 | + * | | +---+ | + * | 5 6 | 7 | 8 9 | + * | +---+ | | + * |10 |11 12 |13 14 | + * +---+-------+-------+ + * + * ix will be 0, 5, 10, 1, 6 in the first loop + */ + + for (size_t i = 0; i < m_fragment_count; i++) { + const size_t ix = m_interleave_offset + m_fragment_count * m_stride; + m_interleaved_fragments.push_back(first_buffer.at(ix)); + + m_stride += 1; + if (m_stride >= m_latency) { + m_interleave_offset++; + m_stride = 0; + } + } + + if (m_interleave_offset >= m_fragment_count) { + m_interleave_offset = 0; + m_stride = 0; + m_buffer.pop_front(); + } + } + + std::vector interleaved_frags; + + const size_t n = std::min(m_fragment_count, m_interleaved_fragments.size()); + std::move(m_interleaved_fragments.begin(), + m_interleaved_fragments.begin() + n, + std::back_inserter(interleaved_frags)); + m_interleaved_fragments.erase( + m_interleaved_fragments.begin(), + m_interleaved_fragments.begin() + n); + + return interleaved_frags; +} + +} + + diff --git a/lib/edioutput/Interleaver.h b/lib/edioutput/Interleaver.h new file mode 100644 index 0000000..3029d5d --- /dev/null +++ b/lib/edioutput/Interleaver.h @@ -0,0 +1,75 @@ +/* + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + Interleaving of PFT fragments to increase robustness against + burst packet loss. + + This is possible because EDI has to assume that fragments may reach + the receiver out of order. + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include +#include +#include +#include +#include "Log.h" +#include "PFT.h" + +namespace edi { + +class Interleaver { + public: + using fragment_vec = std::vector; + + /* Configure the interleaver to use latency_frames number of AF + * packets for interleaving. Total delay through the interleaver + * will be latency_frames * 24ms + */ + void SetLatency(size_t latency_frames); + + /* Move the fragments for an AF Packet into the interleaver and + * return interleaved fragments to be transmitted. + */ + fragment_vec Interleave(fragment_vec &fragments); + + private: + size_t m_latency = 0; + size_t m_fragment_count = 0; + size_t m_interleave_offset = 0; + size_t m_stride = 0; + + /* Buffer that accumulates enough fragments to interleave */ + std::deque m_buffer; + + /* Buffer that contains fragments that have been interleaved, + * to avoid that the interleaver output is too bursty + */ + std::deque m_interleaved_fragments; +}; + +} + diff --git a/lib/edioutput/PFT.cpp b/lib/edioutput/PFT.cpp new file mode 100644 index 0000000..371d36f --- /dev/null +++ b/lib/edioutput/PFT.cpp @@ -0,0 +1,327 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + Protection, Fragmentation and Transport. (PFT) + + Are supported: + Reed-Solomon and Fragmentation + + This implements part of PFT as defined ETSI TS 102 821. + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include "PFT.h" +#include "crc.h" +#include "ReedSolomon.h" + +namespace edi { + +using namespace std; + +// An integer division that rounds up, i.e. ceil(a/b) +#define CEIL_DIV(a, b) (a % b == 0 ? a / b : a / b + 1) + +PFT::PFT() { } + +PFT::PFT(const configuration_t &conf) : + m_k(conf.chunk_len), + m_m(conf.fec), + m_dest_port(conf.dest_port), + m_pseq(0), + m_num_chunks(0), + m_verbose(conf.verbose) + { + if (m_k > 207) { + etiLog.level(warn) << + "EDI PFT: maximum chunk size is 207."; + throw std::out_of_range("EDI PFT Chunk size too large."); + } + + if (m_m > 5) { + etiLog.level(warn) << + "EDI PFT: high number of recoverable fragments" + " may lead to large overhead"; + // See TS 102 821, 7.2.1 Known values, list entry for 'm' + } + } + +RSBlock PFT::Protect(AFPacket af_packet) +{ + RSBlock rs_block; + + // number of chunks is ceil(afpacketsize / m_k) + // TS 102 821 7.2.2: c = ceil(l / k_max) + m_num_chunks = CEIL_DIV(af_packet.size(), m_k); + + if (m_verbose) { + fprintf(stderr, "Protect %zu chunks of size %zu\n", + m_num_chunks, af_packet.size()); + } + + // calculate size of chunk: + // TS 102 821 7.2.2: k = ceil(l / c) + // chunk_len does not include the 48 bytes of protection. + const size_t chunk_len = CEIL_DIV(af_packet.size(), m_num_chunks); + if (chunk_len > 207) { + std::stringstream ss; + ss << "Chunk length " << chunk_len << " too large (>207)"; + throw std::runtime_error(ss.str()); + } + + // The last RS chunk is zero padded + // TS 102 821 7.2.2: z = c*k - l + const size_t zero_pad = m_num_chunks * chunk_len - af_packet.size(); + + // Create the RS(k+p,k) encoder + const int firstRoot = 1; // Discovered by analysing EDI dump + const int gfPoly = 0x11d; + const bool reverse = false; + // The encoding has to be 255, 207 always, because the chunk has to + // be padded at the end, and not at the beginning as libfec would + // do + ReedSolomon rs_encoder(255, 207, reverse, gfPoly, firstRoot); + + // add zero padding to last chunk + for (size_t i = 0; i < zero_pad; i++) { + af_packet.push_back(0); + } + + if (m_verbose) { + fprintf(stderr, " add %zu zero padding\n", zero_pad); + } + + // Calculate RS for each chunk and assemble RS block + for (size_t i = 0; i < af_packet.size(); i+= chunk_len) { + vector chunk(207); + vector protection(PARITYBYTES); + + // copy chunk_len bytes into new chunk + memcpy(&chunk.front(), &af_packet[i], chunk_len); + + // calculate RS for chunk with padding + rs_encoder.encode(&chunk.front(), &protection.front(), 207); + + // Drop the padding + chunk.resize(chunk_len); + + // append new chunk and protection to the RS Packet + rs_block.insert(rs_block.end(), chunk.begin(), chunk.end()); + rs_block.insert(rs_block.end(), protection.begin(), protection.end()); + } + + return rs_block; +} + +vector< vector > PFT::ProtectAndFragment(AFPacket af_packet) +{ + const bool enable_RS = (m_m > 0); + + if (enable_RS) { + RSBlock rs_block = Protect(af_packet); + +#if 0 + fprintf(stderr, " af_packet (%zu):", af_packet.size()); + for (size_t i = 0; i < af_packet.size(); i++) { + fprintf(stderr, "%02x ", af_packet[i]); + } + fprintf(stderr, "\n"); + + fprintf(stderr, " rs_block (%zu):", rs_block.size()); + for (size_t i = 0; i < rs_block.size(); i++) { + fprintf(stderr, "%02x ", rs_block[i]); + } + fprintf(stderr, "\n"); +#endif + + // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h)) + const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1); + + // Calculate fragment count and size + // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) + // l + c*p + z = length of RS block + const size_t num_fragments = CEIL_DIV(rs_block.size(), max_payload_size); + + // TS 102 821 7.2.2: ceil((l + c*p + z) / f) + const size_t fragment_size = CEIL_DIV(rs_block.size(), num_fragments); + + if (m_verbose) + fprintf(stderr, " PnF fragment_size %zu, num frag %zu\n", + fragment_size, num_fragments); + + vector< vector > fragments(num_fragments); + + for (size_t i = 0; i < num_fragments; i++) { + fragments[i].resize(fragment_size); + for (size_t j = 0; j < fragment_size; j++) { + const size_t ix = j*num_fragments + i; + if (ix < rs_block.size()) { + fragments[i][j] = rs_block[ix]; + } + else { + fragments[i][j] = 0; + } + } + } + + return fragments; + } + else { // No RS, only fragmentation + // TS 102 821 7.2.2: s_max = MTU - h + // Ethernet MTU is 1500, but maybe you are routing over a network which + // has some sort of packet encapsulation. Add some margin. + const size_t max_payload_size = 1400; + + // Calculate fragment count and size + // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) + // l + c*p + z = length of AF packet + const size_t num_fragments = CEIL_DIV(af_packet.size(), max_payload_size); + + // TS 102 821 7.2.2: ceil((l + c*p + z) / f) + const size_t fragment_size = CEIL_DIV(af_packet.size(), num_fragments); + vector< vector > fragments(num_fragments); + + for (size_t i = 0; i < num_fragments; i++) { + fragments[i].reserve(fragment_size); + + for (size_t j = 0; j < fragment_size; j++) { + const size_t ix = i*fragment_size + j; + if (ix < af_packet.size()) { + fragments[i].push_back(af_packet.at(ix)); + } + else { + break; + } + } + } + + return fragments; + } +} + +std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) +{ + vector< vector > fragments = ProtectAndFragment(af_packet); + vector< vector > pft_fragments; // These contain PF headers + + const bool enable_RS = (m_m > 0); + const bool enable_transport = true; + + unsigned int findex = 0; + + unsigned fcount = fragments.size(); + + // calculate size of chunk: + // TS 102 821 7.2.2: k = ceil(l / c) + // chunk_len does not include the 48 bytes of protection. + const size_t chunk_len = enable_RS ? + CEIL_DIV(af_packet.size(), m_num_chunks) : 0; + + // The last RS chunk is zero padded + // TS 102 821 7.2.2: z = c*k - l + const size_t zero_pad = enable_RS ? + m_num_chunks * chunk_len - af_packet.size() : 0; + + for (const auto &fragment : fragments) { + // Psync + std::string psync("PF"); + std::vector packet(psync.begin(), psync.end()); + + // Pseq + packet.push_back(m_pseq >> 8); + packet.push_back(m_pseq & 0xFF); + + // Findex + packet.push_back(findex >> 16); + packet.push_back(findex >> 8); + packet.push_back(findex & 0xFF); + findex++; + + // Fcount + packet.push_back(fcount >> 16); + packet.push_back(fcount >> 8); + packet.push_back(fcount & 0xFF); + + // RS (1 bit), transport (1 bit) and Plen (14 bits) + unsigned int plen = fragment.size(); + if (enable_RS) { + plen |= 0x8000; // Set FEC bit + } + + if (enable_transport) { + plen |= 0x4000; // Set ADDR bit + } + + packet.push_back(plen >> 8); + packet.push_back(plen & 0xFF); + + if (enable_RS) { + packet.push_back(chunk_len); // RSk + packet.push_back(zero_pad); // RSz + } + + if (enable_transport) { + // Source (16 bits) + uint16_t addr_source = 0; + packet.push_back(addr_source >> 8); + packet.push_back(addr_source & 0xFF); + + // Dest (16 bits) + packet.push_back(m_dest_port >> 8); + packet.push_back(m_dest_port & 0xFF); + } + + // calculate CRC over AF Header and payload + uint16_t crc = 0xffff; + crc = crc16(crc, &(packet.front()), packet.size()); + crc ^= 0xffff; + + packet.push_back((crc >> 8) & 0xFF); + packet.push_back(crc & 0xFF); + + // insert payload, must have a length multiple of 8 bytes + packet.insert(packet.end(), fragment.begin(), fragment.end()); + + pft_fragments.push_back(packet); + +#if 0 + fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", + m_pseq, findex, fcount, plen & ~0xC000); +#endif + } + + m_pseq++; + + return pft_fragments; +} + +} + diff --git a/lib/edioutput/PFT.h b/lib/edioutput/PFT.h new file mode 100644 index 0000000..0ff4839 --- /dev/null +++ b/lib/edioutput/PFT.h @@ -0,0 +1,78 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + Protection, Fragmentation and Transport. (PFT) + + Are supported: + Reed-Solomon and Fragmentation + + This implements part of PFT as defined ETSI TS 102 821. + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include +#include +#include +#include +#include "AFPacket.h" +#include "Log.h" +#include "ReedSolomon.h" +#include "Config.h" + +namespace edi { + +typedef std::vector RSBlock; +typedef std::vector PFTFragment; + +class PFT +{ + public: + static constexpr int PARITYBYTES = 48; + + PFT(); + PFT(const configuration_t& conf); + + // return a list of PFT fragments with the correct + // PFT headers + std::vector< PFTFragment > Assemble(AFPacket af_packet); + + // Apply Reed-Solomon FEC to the AF Packet + RSBlock Protect(AFPacket af_packet); + + // Cut a RSBlock into several fragments that can be transmitted + std::vector< std::vector > ProtectAndFragment(AFPacket af_packet); + + private: + unsigned int m_k = 207; // length of RS data word + unsigned int m_m = 3; // number of fragments that can be recovered if lost + unsigned int m_dest_port = 12000; // Destination port for transport header + uint16_t m_pseq = 0; + size_t m_num_chunks = 0; + bool m_verbose = 0; +}; + +} + diff --git a/lib/edioutput/TagItems.cpp b/lib/edioutput/TagItems.cpp new file mode 100644 index 0000000..35a6852 --- /dev/null +++ b/lib/edioutput/TagItems.cpp @@ -0,0 +1,383 @@ +/* + EDI output. + This defines a few TAG items as defined ETSI TS 102 821 and + ETSI TS 102 693 + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#include "config.h" +#include "TagItems.h" +#include +#include +#include +#include +#include + +namespace edi { + +TagStarPTR::TagStarPTR(const std::string& protocol) + : m_protocol(protocol) +{ + if (m_protocol.size() != 4) { + throw std::runtime_error("TagStarPTR protocol invalid length"); + } +} + +std::vector TagStarPTR::Assemble() +{ + //std::cerr << "TagItem *ptr" << std::endl; + std::string pack_data("*ptr"); + std::vector packet(pack_data.begin(), pack_data.end()); + + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + packet.push_back(0x40); + + packet.insert(packet.end(), m_protocol.begin(), m_protocol.end()); + + // Major + packet.push_back(0); + packet.push_back(0); + + // Minor + packet.push_back(0); + packet.push_back(0); + return packet; +} + +std::vector TagDETI::Assemble() +{ + std::string pack_data("deti"); + std::vector packet(pack_data.begin(), pack_data.end()); + packet.reserve(256); + + // Placeholder for length + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + + uint8_t fct = dlfc % 250; + uint8_t fcth = dlfc / 250; + + + uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); + packet.push_back(detiHeader >> 8); + packet.push_back(detiHeader & 0xFF); + + uint32_t etiHeader = mnsc | (rfu << 16) | (rfa << 17) | + (fp << 19) | (mid << 22) | (stat << 24); + packet.push_back((etiHeader >> 24) & 0xFF); + packet.push_back((etiHeader >> 16) & 0xFF); + packet.push_back((etiHeader >> 8) & 0xFF); + packet.push_back(etiHeader & 0xFF); + + if (atstf) { + packet.push_back(utco); + + packet.push_back((seconds >> 24) & 0xFF); + packet.push_back((seconds >> 16) & 0xFF); + packet.push_back((seconds >> 8) & 0xFF); + packet.push_back(seconds & 0xFF); + + packet.push_back((tsta >> 16) & 0xFF); + packet.push_back((tsta >> 8) & 0xFF); + packet.push_back(tsta & 0xFF); + } + + if (ficf) { + for (size_t i = 0; i < fic_length; i++) { + packet.push_back(fic_data[i]); + } + } + + if (rfudf) { + packet.push_back((rfud >> 16) & 0xFF); + packet.push_back((rfud >> 8) & 0xFF); + packet.push_back(rfud & 0xFF); + } + + // calculate and update size + // remove TAG name and TAG length fields and convert to bits + uint32_t taglength = (packet.size() - 8) * 8; + + // write length into packet + packet[4] = (taglength >> 24) & 0xFF; + packet[5] = (taglength >> 16) & 0xFF; + packet[6] = (taglength >> 8) & 0xFF; + packet[7] = taglength & 0xFF; + + dlfc = (dlfc+1) % 5000; + + /* + std::cerr << "TagItem deti, packet.size " << packet.size() << std::endl; + std::cerr << " fic length " << fic_length << std::endl; + std::cerr << " length " << taglength / 8 << std::endl; + */ + return packet; +} + +void TagDETI::set_edi_time(const std::time_t t, int tai_utc_offset) +{ + utco = tai_utc_offset - 32; + + const std::time_t posix_timestamp_1_jan_2000 = 946684800; + + seconds = t - posix_timestamp_1_jan_2000 + utco; +} + +std::vector TagESTn::Assemble() +{ + std::string pack_data("est"); + std::vector packet(pack_data.begin(), pack_data.end()); + packet.reserve(mst_length*8 + 16); + + packet.push_back(id); + + // Placeholder for length + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + + if (tpl > 0x3F) { + throw std::runtime_error("TagESTn: invalid TPL value"); + } + + if (sad > 0x3FF) { + throw std::runtime_error("TagESTn: invalid SAD value"); + } + + if (scid > 0x3F) { + throw std::runtime_error("TagESTn: invalid SCID value"); + } + + uint32_t sstc = (scid << 18) | (sad << 8) | (tpl << 2) | rfa; + packet.push_back((sstc >> 16) & 0xFF); + packet.push_back((sstc >> 8) & 0xFF); + packet.push_back(sstc & 0xFF); + + for (size_t i = 0; i < mst_length * 8; i++) { + packet.push_back(mst_data[i]); + } + + // calculate and update size + // remove TAG name and TAG length fields and convert to bits + uint32_t taglength = (packet.size() - 8) * 8; + + // write length into packet + packet[4] = (taglength >> 24) & 0xFF; + packet[5] = (taglength >> 16) & 0xFF; + packet[6] = (taglength >> 8) & 0xFF; + packet[7] = taglength & 0xFF; + + /* + std::cerr << "TagItem ESTn, length " << packet.size() << std::endl; + std::cerr << " mst_length " << mst_length << std::endl; + */ + return packet; +} + +std::vector TagDSTI::Assemble() +{ + std::string pack_data("dsti"); + std::vector packet(pack_data.begin(), pack_data.end()); + packet.reserve(256); + + // Placeholder for length + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + + uint8_t dfctl = dflc % 250; + uint8_t dfcth = dflc / 250; + + + uint16_t dstiHeader = dfctl | (dfcth << 8) | (rfadf << 13) | (atstf << 14) | (stihf << 15); + packet.push_back(dstiHeader >> 8); + packet.push_back(dstiHeader & 0xFF); + + if (stihf) { + packet.push_back(stat); + packet.push_back((spid >> 8) & 0xFF); + packet.push_back(spid & 0xFF); + } + + if (atstf) { + packet.push_back(utco); + + packet.push_back((seconds >> 24) & 0xFF); + packet.push_back((seconds >> 16) & 0xFF); + packet.push_back((seconds >> 8) & 0xFF); + packet.push_back(seconds & 0xFF); + + packet.push_back((tsta >> 16) & 0xFF); + packet.push_back((tsta >> 8) & 0xFF); + packet.push_back(tsta & 0xFF); + } + + if (rfadf) { + for (size_t i = 0; i < rfad.size(); i++) { + packet.push_back(rfad[i]); + } + } + // calculate and update size + // remove TAG name and TAG length fields and convert to bits + uint32_t taglength = (packet.size() - 8) * 8; + + // write length into packet + packet[4] = (taglength >> 24) & 0xFF; + packet[5] = (taglength >> 16) & 0xFF; + packet[6] = (taglength >> 8) & 0xFF; + packet[7] = taglength & 0xFF; + + dflc = (dflc+1) % 5000; + + /* + std::cerr << "TagItem dsti, packet.size " << packet.size() << std::endl; + std::cerr << " length " << taglength / 8 << std::endl; + */ + return packet; +} + +void TagDSTI::set_edi_time(const std::time_t t, int tai_utc_offset) +{ + utco = tai_utc_offset - 32; + + const std::time_t posix_timestamp_1_jan_2000 = 946684800; + + seconds = t - posix_timestamp_1_jan_2000 + utco; +} + +#if 0 +/* Update the EDI time. t is in UTC, TAI offset is requested from adjtimex */ +void TagDSTI::set_edi_time(const std::time_t t) +{ + if (tai_offset_cache_updated_at == 0 or tai_offset_cache_updated_at + 3600 < t) { + struct timex timex_request; + timex_request.modes = 0; + + int err = adjtimex(&timex_request); + if (err == -1) { + throw std::runtime_error("adjtimex failed"); + } + + if (timex_request.tai == 0) { + throw std::runtime_error("CLOCK_TAI is not properly set up"); + } + tai_offset_cache = timex_request.tai; + tai_offset_cache_updated_at = t; + + fprintf(stderr, "adjtimex: %d, tai %d\n", err, timex_request.tai); + } + + utco = tai_offset_cache - 32; + + const std::time_t posix_timestamp_1_jan_2000 = 946684800; + + seconds = t - posix_timestamp_1_jan_2000 + utco; +} +#endif + +std::vector TagSSm::Assemble() +{ + std::string pack_data("ss"); + std::vector packet(pack_data.begin(), pack_data.end()); + packet.reserve(istd_length + 16); + + packet.push_back((id >> 8) & 0xFF); + packet.push_back(id & 0xFF); + + // Placeholder for length + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + packet.push_back(0); + + if (rfa > 0x1F) { + throw std::runtime_error("TagSSm: invalid RFA value"); + } + + if (tid > 0x7) { + throw std::runtime_error("TagSSm: invalid tid value"); + } + + if (tidext > 0x7) { + throw std::runtime_error("TagSSm: invalid tidext value"); + } + + if (stid > 0x0FFF) { + throw std::runtime_error("TagSSm: invalid stid value"); + } + + uint32_t istc = (rfa << 19) | (tid << 16) | (tidext << 13) | ((crcstf ? 1 : 0) << 12) | stid; + packet.push_back((istc >> 16) & 0xFF); + packet.push_back((istc >> 8) & 0xFF); + packet.push_back(istc & 0xFF); + + for (size_t i = 0; i < istd_length; i++) { + packet.push_back(istd_data[i]); + } + + // calculate and update size + // remove TAG name and TAG length fields and convert to bits + uint32_t taglength = (packet.size() - 8) * 8; + + // write length into packet + packet[4] = (taglength >> 24) & 0xFF; + packet[5] = (taglength >> 16) & 0xFF; + packet[6] = (taglength >> 8) & 0xFF; + packet[7] = taglength & 0xFF; + + /* + std::cerr << "TagItem SSm, length " << packet.size() << std::endl; + std::cerr << " istd_length " << istd_length << std::endl; + */ + return packet; +} + + +std::vector TagStarDMY::Assemble() +{ + std::string pack_data("*dmy"); + std::vector packet(pack_data.begin(), pack_data.end()); + + packet.resize(4 + 4 + length_); + + const uint32_t length_bits = length_ * 8; + + packet[4] = (length_bits >> 24) & 0xFF; + packet[5] = (length_bits >> 16) & 0xFF; + packet[6] = (length_bits >> 8) & 0xFF; + packet[7] = length_bits & 0xFF; + + // The remaining bytes in the packet are "undefined data" + + return packet; +} + +} + diff --git a/lib/edioutput/TagItems.h b/lib/edioutput/TagItems.h new file mode 100644 index 0000000..25daa14 --- /dev/null +++ b/lib/edioutput/TagItems.h @@ -0,0 +1,229 @@ +/* + EDI output. + This defines a few TAG items as defined ETSI TS 102 821 and + ETSI TS 102 693 + + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include +#include +#include +#include +#include + +namespace edi { + +class TagItem +{ + public: + virtual std::vector Assemble() = 0; +}; + +// ETSI TS 102 693, 5.1.1 Protocol type and revision +class TagStarPTR : public TagItem +{ + public: + TagStarPTR(const std::string& protocol); + std::vector Assemble(); + + private: + std::string m_protocol = ""; +}; + +// ETSI TS 102 693, 5.1.3 DAB ETI(LI) Management (deti) +class TagDETI : public TagItem +{ + public: + std::vector Assemble(); + + /***** DATA in intermediary format ****/ + // For the ETI Header: must be defined ! + uint8_t stat = 0; + uint8_t mid = 0; + uint8_t fp = 0; + uint8_t rfa = 0; + uint8_t rfu = 0; // MNSC is valid + uint16_t mnsc = 0; + uint16_t dlfc = 0; // modulo 5000 frame counter + + // ATST (optional) + bool atstf = false; // presence of atst data + + /* UTCO: Offset (in seconds) between UTC and the Seconds value. The + * value is expressed as an unsigned 8-bit quantity. As of February + * 2009, the value shall be 2 and shall change as a result of each + * modification of the number of leap seconds, as proscribed by + * International Earth Rotation and Reference Systems Service (IERS). + * + * According to Annex F + * EDI = TAI - 32s (constant) + * EDI = UTC + UTCO + * we derive + * UTCO = TAI-UTC - 32 + * where the TAI-UTC offset is given by the USNO bulletin using + * the ClockTAI module. + */ + uint8_t utco = 0; + + /* Update the EDI time. t is in UTC */ + void set_edi_time(const std::time_t t, int tai_utc_offset); + + /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an + * unsigned 32-bit quantity. Contrary to POSIX, this value also + * counts leap seconds. + */ + uint32_t seconds = 0; + + /* TSTA: Shall be the 24 least significant bits of the Time Stamp + * (TIST) field from the STI-D(LI) Frame. The full definition for the + * STI TIST can be found in annex B of EN 300 797 [4]. The most + * significant 8 bits of the TIST field of the incoming STI-D(LI) + * frame, if required, may be carried in the RFAD field. + */ + uint32_t tsta = 0xFFFFFF; + + // the FIC (optional) + bool ficf = false; + const unsigned char* fic_data; + size_t fic_length; + + // rfu + bool rfudf = false; + uint32_t rfud = 0; + + +}; + +// ETSI TS 102 693, 5.1.5 ETI Sub-Channel Stream +class TagESTn : public TagItem +{ + public: + std::vector Assemble(); + + // SSTCn + uint8_t scid; + uint16_t sad; + uint8_t tpl; + uint8_t rfa; + + // Pointer to MSTn data + uint8_t* mst_data; + size_t mst_length; // STLn * 8 bytes + + uint8_t id; +}; + +// ETSI TS 102 693, 5.1.2 DAB STI-D(LI) Management +class TagDSTI : public TagItem +{ + public: + std::vector Assemble(); + + // dsti Header + bool stihf = false; + bool atstf = false; // presence of atst data + bool rfadf = false; + uint16_t dflc = 0; // modulo 5000 frame counter + + // STI Header (optional) + uint8_t stat = 0; + uint16_t spid = 0; + + /* UTCO: Offset (in seconds) between UTC and the Seconds value. The + * value is expressed as an unsigned 8-bit quantity. As of February + * 2009, the value shall be 2 and shall change as a result of each + * modification of the number of leap seconds, as proscribed by + * International Earth Rotation and Reference Systems Service (IERS). + * + * According to Annex F + * EDI = TAI - 32s (constant) + * EDI = UTC + UTCO + * we derive + * UTCO = TAI-UTC - 32 + * where the TAI-UTC offset is given by the USNO bulletin using + * the ClockTAI module. + */ + uint8_t utco = 0; + + /* Update the EDI time. t is in UTC */ + void set_edi_time(const std::time_t t, int tai_utc_offset); + + /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an + * unsigned 32-bit quantity. Contrary to POSIX, this value also + * counts leap seconds. + */ + uint32_t seconds = 0; + + /* TSTA: Shall be the 24 least significant bits of the Time Stamp + * (TIST) field from the STI-D(LI) Frame. The full definition for the + * STI TIST can be found in annex B of EN 300 797 [4]. The most + * significant 8 bits of the TIST field of the incoming STI-D(LI) + * frame, if required, may be carried in the RFAD field. + */ + uint32_t tsta = 0xFFFFFF; + + std::array rfad; + + private: + int tai_offset_cache = 0; + std::time_t tai_offset_cache_updated_at = 0; +}; + +// ETSI TS 102 693, 5.1.4 STI-D Payload Stream +class TagSSm : public TagItem +{ + public: + std::vector Assemble(); + + // SSTCn + uint8_t rfa = 0; + uint8_t tid = 0; // See EN 300 797, 5.4.1.1. Value 0 means "MSC sub-channel" + uint8_t tidext = 0; // EN 300 797, 5.4.1.3, Value 0 means "MSC audio stream" + bool crcstf = false; + uint16_t stid = 0; + + // Pointer to ISTDm data + const uint8_t *istd_data; + size_t istd_length; // bytes + + uint16_t id = 0; +}; + +// ETSI TS 102 821, 5.2.2.2 Dummy padding +class TagStarDMY : public TagItem +{ + public: + /* length is the TAG value length in bytes */ + TagStarDMY(uint32_t length) : length_(length) {} + std::vector Assemble(); + + private: + uint32_t length_; +}; + +} + diff --git a/lib/edioutput/TagPacket.cpp b/lib/edioutput/TagPacket.cpp new file mode 100644 index 0000000..b0bf9a1 --- /dev/null +++ b/lib/edioutput/TagPacket.cpp @@ -0,0 +1,78 @@ +/* + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output. + This defines a TAG Packet. + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#include "config.h" +#include "TagPacket.h" +#include "TagItems.h" +#include +#include +#include +#include +#include +#include + +namespace edi { + +TagPacket::TagPacket(unsigned int alignment) : m_alignment(alignment) +{ } + +std::vector TagPacket::Assemble() +{ + std::list::iterator tag; + + std::vector packet; + + //std::cerr << "Assemble TAGPacket" << std::endl; + + for (tag = tag_items.begin(); tag != tag_items.end(); ++tag) { + std::vector tag_data = (*tag)->Assemble(); + packet.insert(packet.end(), tag_data.begin(), tag_data.end()); + + //std::cerr << " Add TAGItem of length " << tag_data.size() << std::endl; + } + + if (m_alignment == 0) { /* no padding */ } + else if (m_alignment == 8) { + // Add padding inside TAG packet + while (packet.size() % 8 > 0) { + packet.push_back(0); // TS 102 821, 5.1, "padding shall be undefined" + } + } + else if (m_alignment > 8) { + TagStarDMY dmy(m_alignment - 8); + auto dmy_data = dmy.Assemble(); + packet.insert(packet.end(), dmy_data.begin(), dmy_data.end()); + } + else { + std::cerr << "Invalid alignment requirement " << m_alignment << + " defined in TagPacket" << std::endl; + } + + return packet; +} + +} + diff --git a/lib/edioutput/TagPacket.h b/lib/edioutput/TagPacket.h new file mode 100644 index 0000000..1e40ce7 --- /dev/null +++ b/lib/edioutput/TagPacket.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output. + This defines a TAG Packet. + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include "TagItems.h" +#include +#include +#include +#include + +namespace edi { + +// A TagPacket is nothing else than a list of tag items, with an +// Assemble function that puts the bytestream together and adds +// padding such that the total length is a multiple of 8 Bytes. +// +// ETSI TS 102 821, 5.1 Tag Packet +class TagPacket +{ + public: + TagPacket(unsigned int alignment); + std::vector Assemble(); + + std::list tag_items; + + private: + unsigned int m_alignment; +}; + +} + diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp new file mode 100644 index 0000000..0d5c237 --- /dev/null +++ b/lib/edioutput/Transport.cpp @@ -0,0 +1,188 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + UDP and TCP transports and their configuration + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ +#include "Transport.h" +#include + +using namespace std; + +namespace edi { + +void configuration_t::print() const +{ + etiLog.level(info) << "EDI"; + etiLog.level(info) << " verbose " << verbose; + for (auto edi_dest : destinations) { + if (auto udp_dest = dynamic_pointer_cast(edi_dest)) { + etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; + if (not udp_dest->source_addr.empty()) { + etiLog.level(info) << " source " << udp_dest->source_addr; + etiLog.level(info) << " ttl " << udp_dest->ttl; + } + etiLog.level(info) << " source port " << udp_dest->source_port; + } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; + etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; + } + else { + throw logic_error("EDI destination not implemented"); + } + } + if (interleaver_enabled()) { + etiLog.level(info) << " interleave " << latency_frames * 24 << " ms"; + } +} + + +Sender::Sender(const configuration_t& conf) : + m_conf(conf), + edi_pft(m_conf) +{ + if (m_conf.verbose) { + etiLog.log(info, "Setup EDI"); + } + + for (const auto& edi_dest : m_conf.destinations) { + if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) { + auto udp_socket = std::make_shared(udp_dest->source_port); + + if (not udp_dest->source_addr.empty()) { + udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); + udp_socket->setMulticastTTL(udp_dest->ttl); + } + + udp_sockets.emplace(udp_dest.get(), udp_socket); + } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + auto dispatcher = make_shared(tcp_dest->max_frames_queued); + dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); + tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); + } + else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { + auto tcp_socket = make_shared(); + tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_socket); + } + else { + throw logic_error("EDI destination not implemented"); + } + } + + if (m_conf.interleaver_enabled()) { + edi_interleaver.SetLatency(m_conf.latency_frames); + } + + if (m_conf.dump) { + edi_debug_file.open("./edi.debug"); + } + + if (m_conf.verbose) { + etiLog.log(info, "EDI set up"); + } +} + +void Sender::write(const TagPacket& tagpacket) +{ + // Assemble into one AF Packet + edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); + + if (m_conf.enable_pft) { + // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) + vector edi_fragments = edi_pft.Assemble(af_packet); + + if (m_conf.verbose) { + fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", + edi_fragments.size()); + } + + if (m_conf.interleaver_enabled()) { + edi_fragments = edi_interleaver.Interleave(edi_fragments); + } + + // Send over ethernet + for (const auto& edi_frag : edi_fragments) { + for (auto& dest : m_conf.destinations) { + if (const auto& udp_dest = dynamic_pointer_cast(dest)) { + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + + udp_sockets.at(udp_dest.get())->send(edi_frag, addr); + } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); + } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + } + else { + throw logic_error("EDI destination not implemented"); + } + } + + if (m_conf.dump) { + ostream_iterator debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + } + } + + if (m_conf.verbose) { + fprintf(stderr, "EDI number of PFT fragments %zu\n", + edi_fragments.size()); + } + } + else { + // Send over ethernet + for (auto& dest : m_conf.destinations) { + if (const auto& udp_dest = dynamic_pointer_cast(dest)) { + Socket::InetAddress addr; + addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + + udp_sockets.at(udp_dest.get())->send(af_packet, addr); + } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_dispatchers.at(tcp_dest.get())->write(af_packet); + } + else if (auto tcp_dest = dynamic_pointer_cast(dest)) { + tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + } + else { + throw logic_error("EDI destination not implemented"); + } + } + + if (m_conf.dump) { + ostream_iterator debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); + } + } +} + +} diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h new file mode 100644 index 0000000..325acf8 --- /dev/null +++ b/lib/edioutput/Transport.h @@ -0,0 +1,71 @@ +/* + Copyright (C) 2019 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + EDI output, + UDP and TCP transports and their configuration + + */ +/* + This file is part of the ODR-mmbTools. + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + */ + +#pragma once + +#include "config.h" +#include "Config.h" +#include "AFPacket.h" +#include "PFT.h" +#include "Interleaver.h" +#include "Socket.h" +#include +#include +#include +#include +#include + +namespace edi { + +/** Configuration for EDI output */ + +class Sender { + public: + Sender(const configuration_t& conf); + + void write(const TagPacket& tagpacket); + + private: + configuration_t m_conf; + std::ofstream edi_debug_file; + + // The TagPacket will then be placed into an AFPacket + edi::AFPacketiser edi_afPacketiser; + + // The AF Packet will be protected with reed-solomon and split in fragments + edi::PFT edi_pft; + + // To mitigate for burst packet loss, PFT fragments can be sent out-of-order + edi::Interleaver edi_interleaver; + + std::unordered_map> udp_sockets; + std::unordered_map> tcp_dispatchers; + std::unordered_map> tcp_senders; +}; + +} + diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 489787f..0d68ac2 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -376,7 +376,7 @@ void DabMultiplexer::mux_frame(std::vector >& outputs // For EDI, save ETI(LI) Management data into a TAG Item DETI edi::TagDETI edi_tagDETI; - edi::TagStarPTR edi_tagStarPtr; + edi::TagStarPTR edi_tagStarPtr("DETI"); map edi_subchannelToTag; // The above Tag Items will be assembled into a TAG Packet diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index d1075a6..56a8dde 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -30,10 +30,10 @@ #endif #include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/Transport.h" +#include "edioutput/TagItems.h" +#include "edioutput/TagPacket.h" +#include "edioutput/AFPacket.h" +#include "edioutput/Transport.h" #include "fig/FIGCarousel.h" #include "crc.h" #include "utils.h" diff --git a/src/dabOutput/edi/AFPacket.cpp b/src/dabOutput/edi/AFPacket.cpp deleted file mode 100644 index a58a980..0000000 --- a/src/dabOutput/edi/AFPacket.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output. - This implements an AF Packet as defined ETSI TS 102 821. - Also see ETSI TS 102 693 - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ -#include "config.h" -#include "crc.h" -#include "AFPacket.h" -#include "TagItems.h" -#include "TagPacket.h" -#include -#include -#include -#include -#include -#include - -namespace edi { - -// Header PT field. AF packet contains TAG payload -const uint8_t AFHEADER_PT_TAG = 'T'; - -// AF Packet Major (3 bits) and Minor (4 bits) version -const uint8_t AFHEADER_VERSION = 0x10; // MAJ=1, MIN=0 - -AFPacket AFPacketiser::Assemble(TagPacket tag_packet) -{ - std::vector payload = tag_packet.Assemble(); - - if (m_verbose) - std::cerr << "Assemble AFPacket " << seq << std::endl; - - std::string pack_data("AF"); // SYNC - std::vector packet(pack_data.begin(), pack_data.end()); - - uint32_t taglength = payload.size(); - - if (m_verbose) - std::cerr << " AFPacket payload size " << payload.size() << std::endl; - - // write length into packet - packet.push_back((taglength >> 24) & 0xFF); - packet.push_back((taglength >> 16) & 0xFF); - packet.push_back((taglength >> 8) & 0xFF); - packet.push_back(taglength & 0xFF); - - // fill rest of header - packet.push_back(seq >> 8); - packet.push_back(seq & 0xFF); - seq++; - packet.push_back((have_crc ? 0x80 : 0) | AFHEADER_VERSION); // ar_cf: CRC=1 - packet.push_back(AFHEADER_PT_TAG); - - // insert payload, must have a length multiple of 8 bytes - packet.insert(packet.end(), payload.begin(), payload.end()); - - // calculate CRC over AF Header and payload - uint16_t crc = 0xffff; - crc = crc16(crc, &(packet.front()), packet.size()); - crc ^= 0xffff; - - if (m_verbose) - fprintf(stderr, " AFPacket crc %x\n", crc); - - packet.push_back((crc >> 8) & 0xFF); - packet.push_back(crc & 0xFF); - - if (m_verbose) - std::cerr << " AFPacket length " << packet.size() << std::endl; - - return packet; -} - -} diff --git a/src/dabOutput/edi/AFPacket.h b/src/dabOutput/edi/AFPacket.h deleted file mode 100644 index b4ccef1..0000000 --- a/src/dabOutput/edi/AFPacket.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output. - This implements an AF Packet as defined ETSI TS 102 821. - Also see ETSI TS 102 693 - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include -#include -#include "TagItems.h" -#include "TagPacket.h" - -namespace edi { - -typedef std::vector AFPacket; - -// ETSI TS 102 821, 6.1 AF packet structure -class AFPacketiser -{ - public: - AFPacketiser() : - m_verbose(false) {}; - AFPacketiser(bool verbose) : - m_verbose(verbose) {}; - - AFPacket Assemble(TagPacket tag_packet); - - private: - static const bool have_crc = true; - - uint16_t seq = 0; //counter that overflows at 0xFFFF - - bool m_verbose; -}; - -} - diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h deleted file mode 100644 index 0c7dce8..0000000 --- a/src/dabOutput/edi/Config.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - UDP and TCP transports and their configuration - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include -#include -#include -#include - -namespace edi { - -/** Configuration for EDI output */ - -struct destination_t { - virtual ~destination_t() {}; -}; - -// Can represent both unicast and multicast destinations -struct udp_destination_t : public destination_t { - std::string dest_addr; - std::string source_addr; - unsigned int source_port = 0; - unsigned int ttl = 10; -}; - -// TCP server that can accept multiple connections -struct tcp_server_t : public destination_t { - unsigned int listen_port = 0; - size_t max_frames_queued = 1024; -}; - -// TCP client that connects to one endpoint -struct tcp_client_t : public destination_t { - std::string dest_addr; - unsigned int dest_port = 0; - size_t max_frames_queued = 1024; -}; - -struct configuration_t { - unsigned chunk_len = 207; // RSk, data length of each chunk - unsigned fec = 0; // number of fragments that can be recovered - bool dump = false; // dump a file with the EDI packets - bool verbose = false; - bool enable_pft = false; // Enable protection and fragmentation - unsigned int tagpacket_alignment = 0; - std::vector > destinations; - unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer - unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms - - bool enabled() const { return destinations.size() > 0; } - bool interleaver_enabled() const { return latency_frames > 0; } - - void print() const; -}; - -} - - diff --git a/src/dabOutput/edi/Interleaver.cpp b/src/dabOutput/edi/Interleaver.cpp deleted file mode 100644 index f26a50e..0000000 --- a/src/dabOutput/edi/Interleaver.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/* - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - Interleaving of PFT fragments to increase robustness against - burst packet loss. - - This is possible because EDI has to assume that fragments may reach - the receiver out of order. - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "Interleaver.h" -#include - -namespace edi { - -void Interleaver::SetLatency(size_t latency_frames) -{ - m_latency = latency_frames; -} - -Interleaver::fragment_vec Interleaver::Interleave(fragment_vec &fragments) -{ - m_fragment_count = fragments.size(); - - // Create vectors containing Fcount*latency fragments in total - // and store them into the deque - if (m_buffer.empty()) { - m_buffer.emplace_back(); - } - - auto& last_buffer = m_buffer.back(); - - for (auto& fragment : fragments) { - const bool last_buffer_is_complete = - (last_buffer.size() >= m_fragment_count * m_latency); - - if (last_buffer_is_complete) { - m_buffer.emplace_back(); - last_buffer = m_buffer.back(); - } - - last_buffer.push_back(std::move(fragment)); - } - - fragments.clear(); - - while ( not m_buffer.empty() and - (m_buffer.front().size() >= m_fragment_count * m_latency)) { - - auto& first_buffer = m_buffer.front(); - - assert(first_buffer.size() == m_fragment_count * m_latency); - - /* Assume we have 5 fragments per AF frame, and latency of 3. - * This will give the following strides: - * 0 1 2 - * +-------+-------+---+ - * | 0 1 | 2 3 | 4 | - * | | +---+ | - * | 5 6 | 7 | 8 9 | - * | +---+ | | - * |10 |11 12 |13 14 | - * +---+-------+-------+ - * - * ix will be 0, 5, 10, 1, 6 in the first loop - */ - - for (size_t i = 0; i < m_fragment_count; i++) { - const size_t ix = m_interleave_offset + m_fragment_count * m_stride; - m_interleaved_fragments.push_back(first_buffer.at(ix)); - - m_stride += 1; - if (m_stride >= m_latency) { - m_interleave_offset++; - m_stride = 0; - } - } - - if (m_interleave_offset >= m_fragment_count) { - m_interleave_offset = 0; - m_stride = 0; - m_buffer.pop_front(); - } - } - - std::vector interleaved_frags; - - const size_t n = std::min(m_fragment_count, m_interleaved_fragments.size()); - std::move(m_interleaved_fragments.begin(), - m_interleaved_fragments.begin() + n, - std::back_inserter(interleaved_frags)); - m_interleaved_fragments.erase( - m_interleaved_fragments.begin(), - m_interleaved_fragments.begin() + n); - - return interleaved_frags; -} - -} - - diff --git a/src/dabOutput/edi/Interleaver.h b/src/dabOutput/edi/Interleaver.h deleted file mode 100644 index f1cff30..0000000 --- a/src/dabOutput/edi/Interleaver.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - Copyright (C) 2017 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - Interleaving of PFT fragments to increase robustness against - burst packet loss. - - This is possible because EDI has to assume that fragments may reach - the receiver out of order. - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include -#include -#include -#include -#include "Log.h" -#include "PFT.h" - -namespace edi { - -class Interleaver { - public: - using fragment_vec = std::vector; - - /* Configure the interleaver to use latency_frames number of AF - * packets for interleaving. Total delay through the interleaver - * will be latency_frames * 24ms - */ - void SetLatency(size_t latency_frames); - - /* Move the fragments for an AF Packet into the interleaver and - * return interleaved fragments to be transmitted. - */ - fragment_vec Interleave(fragment_vec &fragments); - - private: - size_t m_latency = 0; - size_t m_fragment_count = 0; - size_t m_interleave_offset = 0; - size_t m_stride = 0; - - /* Buffer that accumulates enough fragments to interleave */ - std::deque m_buffer; - - /* Buffer that contains fragments that have been interleaved, - * to avoid that the interleaver output is too bursty - */ - std::deque m_interleaved_fragments; -}; - -} - diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp deleted file mode 100644 index 63dfa34..0000000 --- a/src/dabOutput/edi/PFT.cpp +++ /dev/null @@ -1,327 +0,0 @@ -/* - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - Protection, Fragmentation and Transport. (PFT) - - Are supported: - Reed-Solomon and Fragmentation - - This implements part of PFT as defined ETSI TS 102 821. - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "config.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include "PFT.h" -#include "crc.h" -#include "ReedSolomon.h" - -namespace edi { - -using namespace std; - -// An integer division that rounds up, i.e. ceil(a/b) -#define CEIL_DIV(a, b) (a % b == 0 ? a / b : a / b + 1) - -PFT::PFT() { } - -PFT::PFT(const configuration_t &conf) : - m_k(conf.chunk_len), - m_m(conf.fec), - m_dest_port(conf.dest_port), - m_pseq(0), - m_num_chunks(0), - m_verbose(conf.verbose) - { - if (m_k > 207) { - etiLog.level(warn) << - "EDI PFT: maximum chunk size is 207."; - throw std::out_of_range("EDI PFT Chunk size too large."); - } - - if (m_m > 5) { - etiLog.level(warn) << - "EDI PFT: high number of recoverable fragments" - " may lead to large overhead"; - // See TS 102 821, 7.2.1 Known values, list entry for 'm' - } - } - -RSBlock PFT::Protect(AFPacket af_packet) -{ - RSBlock rs_block; - - // number of chunks is ceil(afpacketsize / m_k) - // TS 102 821 7.2.2: c = ceil(l / k_max) - m_num_chunks = CEIL_DIV(af_packet.size(), m_k); - - if (m_verbose) { - fprintf(stderr, "Protect %zu chunks of size %zu\n", - m_num_chunks, af_packet.size()); - } - - // calculate size of chunk: - // TS 102 821 7.2.2: k = ceil(l / c) - // chunk_len does not include the 48 bytes of protection. - const size_t chunk_len = CEIL_DIV(af_packet.size(), m_num_chunks); - if (chunk_len > 207) { - std::stringstream ss; - ss << "Chunk length " << chunk_len << " too large (>207)"; - throw std::runtime_error(ss.str()); - } - - // The last RS chunk is zero padded - // TS 102 821 7.2.2: z = c*k - l - const size_t zero_pad = m_num_chunks * chunk_len - af_packet.size(); - - // Create the RS(k+p,k) encoder - const int firstRoot = 1; // Discovered by analysing EDI dump - const int gfPoly = 0x11d; - const bool reverse = false; - // The encoding has to be 255, 207 always, because the chunk has to - // be padded at the end, and not at the beginning as libfec would - // do - ReedSolomon rs_encoder(255, 207, reverse, gfPoly, firstRoot); - - // add zero padding to last chunk - for (size_t i = 0; i < zero_pad; i++) { - af_packet.push_back(0); - } - - if (m_verbose) { - fprintf(stderr, " add %zu zero padding\n", zero_pad); - } - - // Calculate RS for each chunk and assemble RS block - for (size_t i = 0; i < af_packet.size(); i+= chunk_len) { - vector chunk(207); - vector protection(PARITYBYTES); - - // copy chunk_len bytes into new chunk - memcpy(&chunk.front(), &af_packet[i], chunk_len); - - // calculate RS for chunk with padding - rs_encoder.encode(&chunk.front(), &protection.front(), 207); - - // Drop the padding - chunk.resize(chunk_len); - - // append new chunk and protection to the RS Packet - rs_block.insert(rs_block.end(), chunk.begin(), chunk.end()); - rs_block.insert(rs_block.end(), protection.begin(), protection.end()); - } - - return rs_block; -} - -vector< vector > PFT::ProtectAndFragment(AFPacket af_packet) -{ - const bool enable_RS = (m_m > 0); - - if (enable_RS) { - RSBlock rs_block = Protect(af_packet); - -#if 0 - fprintf(stderr, " af_packet (%zu):", af_packet.size()); - for (size_t i = 0; i < af_packet.size(); i++) { - fprintf(stderr, "%02x ", af_packet[i]); - } - fprintf(stderr, "\n"); - - fprintf(stderr, " rs_block (%zu):", rs_block.size()); - for (size_t i = 0; i < rs_block.size(); i++) { - fprintf(stderr, "%02x ", rs_block[i]); - } - fprintf(stderr, "\n"); -#endif - - // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h)) - const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1); - - // Calculate fragment count and size - // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) - // l + c*p + z = length of RS block - const size_t num_fragments = CEIL_DIV(rs_block.size(), max_payload_size); - - // TS 102 821 7.2.2: ceil((l + c*p + z) / f) - const size_t fragment_size = CEIL_DIV(rs_block.size(), num_fragments); - - if (m_verbose) - fprintf(stderr, " PnF fragment_size %zu, num frag %zu\n", - fragment_size, num_fragments); - - vector< vector > fragments(num_fragments); - - for (size_t i = 0; i < num_fragments; i++) { - fragments[i].resize(fragment_size); - for (size_t j = 0; j < fragment_size; j++) { - const size_t ix = j*num_fragments + i; - if (ix < rs_block.size()) { - fragments[i][j] = rs_block[ix]; - } - else { - fragments[i][j] = 0; - } - } - } - - return fragments; - } - else { // No RS, only fragmentation - // TS 102 821 7.2.2: s_max = MTU - h - // Ethernet MTU is 1500, but maybe you are routing over a network which - // has some sort of packet encapsulation. Add some margin. - const size_t max_payload_size = 1400; - - // Calculate fragment count and size - // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) - // l + c*p + z = length of AF packet - const size_t num_fragments = CEIL_DIV(af_packet.size(), max_payload_size); - - // TS 102 821 7.2.2: ceil((l + c*p + z) / f) - const size_t fragment_size = CEIL_DIV(af_packet.size(), num_fragments); - vector< vector > fragments(num_fragments); - - for (size_t i = 0; i < num_fragments; i++) { - fragments[i].reserve(fragment_size); - - for (size_t j = 0; j < fragment_size; j++) { - const size_t ix = i*fragment_size + j; - if (ix < af_packet.size()) { - fragments[i].push_back(af_packet.at(ix)); - } - else { - break; - } - } - } - - return fragments; - } -} - -std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) -{ - vector< vector > fragments = ProtectAndFragment(af_packet); - vector< vector > pft_fragments; // These contain PF headers - - const bool enable_RS = (m_m > 0); - const bool enable_transport = true; - - unsigned int findex = 0; - - unsigned fcount = fragments.size(); - - // calculate size of chunk: - // TS 102 821 7.2.2: k = ceil(l / c) - // chunk_len does not include the 48 bytes of protection. - const size_t chunk_len = enable_RS ? - CEIL_DIV(af_packet.size(), m_num_chunks) : 0; - - // The last RS chunk is zero padded - // TS 102 821 7.2.2: z = c*k - l - const size_t zero_pad = enable_RS ? - m_num_chunks * chunk_len - af_packet.size() : 0; - - for (const auto &fragment : fragments) { - // Psync - std::string psync("PF"); - std::vector packet(psync.begin(), psync.end()); - - // Pseq - packet.push_back(m_pseq >> 8); - packet.push_back(m_pseq & 0xFF); - - // Findex - packet.push_back(findex >> 16); - packet.push_back(findex >> 8); - packet.push_back(findex & 0xFF); - findex++; - - // Fcount - packet.push_back(fcount >> 16); - packet.push_back(fcount >> 8); - packet.push_back(fcount & 0xFF); - - // RS (1 bit), transport (1 bit) and Plen (14 bits) - unsigned int plen = fragment.size(); - if (enable_RS) { - plen |= 0x8000; // Set FEC bit - } - - if (enable_transport) { - plen |= 0x4000; // Set ADDR bit - } - - packet.push_back(plen >> 8); - packet.push_back(plen & 0xFF); - - if (enable_RS) { - packet.push_back(chunk_len); // RSk - packet.push_back(zero_pad); // RSz - } - - if (enable_transport) { - // Source (16 bits) - uint16_t addr_source = 0; - packet.push_back(addr_source >> 8); - packet.push_back(addr_source & 0xFF); - - // Dest (16 bits) - packet.push_back(m_dest_port >> 8); - packet.push_back(m_dest_port & 0xFF); - } - - // calculate CRC over AF Header and payload - uint16_t crc = 0xffff; - crc = crc16(crc, &(packet.front()), packet.size()); - crc ^= 0xffff; - - packet.push_back((crc >> 8) & 0xFF); - packet.push_back(crc & 0xFF); - - // insert payload, must have a length multiple of 8 bytes - packet.insert(packet.end(), fragment.begin(), fragment.end()); - - pft_fragments.push_back(packet); - -#if 0 - fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", - m_pseq, findex, fcount, plen & ~0xC000); -#endif - } - - m_pseq++; - - return pft_fragments; -} - -} - diff --git a/src/dabOutput/edi/PFT.h b/src/dabOutput/edi/PFT.h deleted file mode 100644 index 4076bf3..0000000 --- a/src/dabOutput/edi/PFT.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - Protection, Fragmentation and Transport. (PFT) - - Are supported: - Reed-Solomon and Fragmentation - - This implements part of PFT as defined ETSI TS 102 821. - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include -#include -#include -#include -#include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h" -#include "dabOutput/edi/Config.h" - -namespace edi { - -typedef std::vector RSBlock; -typedef std::vector PFTFragment; - -class PFT -{ - public: - static constexpr int PARITYBYTES = 48; - - PFT(); - PFT(const configuration_t& conf); - - // return a list of PFT fragments with the correct - // PFT headers - std::vector< PFTFragment > Assemble(AFPacket af_packet); - - // Apply Reed-Solomon FEC to the AF Packet - RSBlock Protect(AFPacket af_packet); - - // Cut a RSBlock into several fragments that can be transmitted - std::vector< std::vector > ProtectAndFragment(AFPacket af_packet); - - private: - unsigned int m_k = 207; // length of RS data word - unsigned int m_m = 3; // number of fragments that can be recovered if lost - unsigned int m_dest_port = 12000; // Destination port for transport header - uint16_t m_pseq = 0; - size_t m_num_chunks = 0; - bool m_verbose = 0; -}; - -} - diff --git a/src/dabOutput/edi/TagItems.cpp b/src/dabOutput/edi/TagItems.cpp deleted file mode 100644 index dfb4934..0000000 --- a/src/dabOutput/edi/TagItems.cpp +++ /dev/null @@ -1,216 +0,0 @@ -/* - EDI output. - This defines a few TAG items as defined ETSI TS 102 821 and - ETSI TS 102 693 - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "config.h" -#include "TagItems.h" -#include -#include -#include -#include -#include - -namespace edi { - -std::vector TagStarPTR::Assemble() -{ - //std::cerr << "TagItem *ptr" << std::endl; - std::string pack_data("*ptr"); - std::vector packet(pack_data.begin(), pack_data.end()); - - packet.push_back(0); - packet.push_back(0); - packet.push_back(0); - packet.push_back(0x40); - - std::string protocol("DETI"); - packet.insert(packet.end(), protocol.begin(), protocol.end()); - - // Major - packet.push_back(0); - packet.push_back(0); - - // Minor - packet.push_back(0); - packet.push_back(0); - return packet; -} - -std::vector TagDETI::Assemble() -{ - std::string pack_data("deti"); - std::vector packet(pack_data.begin(), pack_data.end()); - packet.reserve(256); - - // Placeholder for length - packet.push_back(0); - packet.push_back(0); - packet.push_back(0); - packet.push_back(0); - - uint8_t fct = dlfc % 250; - uint8_t fcth = dlfc / 250; - - - uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); - packet.push_back(detiHeader >> 8); - packet.push_back(detiHeader & 0xFF); - - uint32_t etiHeader = mnsc | (rfu << 16) | (rfa << 17) | - (fp << 19) | (mid << 22) | (stat << 24); - packet.push_back((etiHeader >> 24) & 0xFF); - packet.push_back((etiHeader >> 16) & 0xFF); - packet.push_back((etiHeader >> 8) & 0xFF); - packet.push_back(etiHeader & 0xFF); - - if (atstf) { - packet.push_back(utco); - - packet.push_back((seconds >> 24) & 0xFF); - packet.push_back((seconds >> 16) & 0xFF); - packet.push_back((seconds >> 8) & 0xFF); - packet.push_back(seconds & 0xFF); - - packet.push_back((tsta >> 16) & 0xFF); - packet.push_back((tsta >> 8) & 0xFF); - packet.push_back(tsta & 0xFF); - } - - if (ficf) { - for (size_t i = 0; i < fic_length; i++) { - packet.push_back(fic_data[i]); - } - } - - if (rfudf) { - packet.push_back((rfud >> 16) & 0xFF); - packet.push_back((rfud >> 8) & 0xFF); - packet.push_back(rfud & 0xFF); - } - - // calculate and update size - // remove TAG name and TAG length fields and convert to bits - uint32_t taglength = (packet.size() - 8) * 8; - - // write length into packet - packet[4] = (taglength >> 24) & 0xFF; - packet[5] = (taglength >> 16) & 0xFF; - packet[6] = (taglength >> 8) & 0xFF; - packet[7] = taglength & 0xFF; - - dlfc = (dlfc+1) % 5000; - - /* - std::cerr << "TagItem deti, packet.size " << packet.size() << std::endl; - std::cerr << " fic length " << fic_length << std::endl; - std::cerr << " length " << taglength / 8 << std::endl; - */ - return packet; -} - -void TagDETI::set_edi_time(const std::time_t t, int tai_utc_offset) -{ - utco = tai_utc_offset - 32; - - const std::time_t posix_timestamp_1_jan_2000 = 946684800; - - seconds = t - posix_timestamp_1_jan_2000 + utco; -} - -std::vector TagESTn::Assemble() -{ - std::string pack_data("est"); - std::vector packet(pack_data.begin(), pack_data.end()); - packet.reserve(mst_length*8 + 16); - - packet.push_back(id); - - // Placeholder for length - packet.push_back(0); - packet.push_back(0); - packet.push_back(0); - packet.push_back(0); - - if (tpl > 0x3F) { - throw std::runtime_error("TagESTn: invalid TPL value"); - } - - if (sad > 0x3FF) { - throw std::runtime_error("TagESTn: invalid SAD value"); - } - - if (scid > 0x3F) { - throw std::runtime_error("TagESTn: invalid SCID value"); - } - - uint32_t sstc = (scid << 18) | (sad << 8) | (tpl << 2) | rfa; - packet.push_back((sstc >> 16) & 0xFF); - packet.push_back((sstc >> 8) & 0xFF); - packet.push_back(sstc & 0xFF); - - for (size_t i = 0; i < mst_length * 8; i++) { - packet.push_back(mst_data[i]); - } - - // calculate and update size - // remove TAG name and TAG length fields and convert to bits - uint32_t taglength = (packet.size() - 8) * 8; - - // write length into packet - packet[4] = (taglength >> 24) & 0xFF; - packet[5] = (taglength >> 16) & 0xFF; - packet[6] = (taglength >> 8) & 0xFF; - packet[7] = taglength & 0xFF; - - /* - std::cerr << "TagItem ESTn, length " << packet.size() << std::endl; - std::cerr << " mst_length " << mst_length << std::endl; - */ - return packet; -} - -std::vector TagStarDMY::Assemble() -{ - std::string pack_data("*dmy"); - std::vector packet(pack_data.begin(), pack_data.end()); - - packet.resize(4 + 4 + length_); - - const uint32_t length_bits = length_ * 8; - - packet[4] = (length_bits >> 24) & 0xFF; - packet[5] = (length_bits >> 16) & 0xFF; - packet[6] = (length_bits >> 8) & 0xFF; - packet[7] = length_bits & 0xFF; - - // The remaining bytes in the packet are "undefined data" - - return packet; -} - -} - diff --git a/src/dabOutput/edi/TagItems.h b/src/dabOutput/edi/TagItems.h deleted file mode 100644 index b29a142..0000000 --- a/src/dabOutput/edi/TagItems.h +++ /dev/null @@ -1,149 +0,0 @@ -/* - EDI output. - This defines a few TAG items as defined ETSI TS 102 821 and - ETSI TS 102 693 - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include "Eti.h" -#include -#include -#include -#include - -namespace edi { - -class TagItem -{ - public: - virtual std::vector Assemble() = 0; -}; - -// ETSI TS 102 693, 5.1.1 Protocol type and revision -class TagStarPTR : public TagItem -{ - public: - std::vector Assemble(); -}; - -// ETSI TS 102 693, 5.1.3 DAB ETI(LI) Management (deti) -class TagDETI : public TagItem -{ - public: - std::vector Assemble(); - - /***** DATA in intermediary format ****/ - // For the ETI Header: must be defined ! - uint8_t stat = 0; - uint8_t mid = 0; - uint8_t fp = 0; - uint8_t rfa = 0; - uint8_t rfu = 0; // MNSC is valid - uint16_t mnsc = 0; - uint16_t dlfc = 0; // modulo 5000 frame counter - - // ATST (optional) - bool atstf = false; // presence of atst data - - /* UTCO: Offset (in seconds) between UTC and the Seconds value. The - * value is expressed as an unsigned 8-bit quantity. As of February - * 2009, the value shall be 2 and shall change as a result of each - * modification of the number of leap seconds, as proscribed by - * International Earth Rotation and Reference Systems Service (IERS). - * - * According to Annex F - * EDI = TAI - 32s (constant) - * EDI = UTC + UTCO - * we derive - * UTCO = TAI-UTC - 32 - * where the TAI-UTC offset is given by the USNO bulletin using - * the ClockTAI module. - */ - uint8_t utco = 0; - - /* Update the EDI time. t is in UTC */ - void set_edi_time(const std::time_t t, int tai_utc_offset); - - /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an - * unsigned 32-bit quantity. Contrary to POSIX, this value also - * counts leap seconds. - */ - uint32_t seconds = 0; - - /* TSTA: Shall be the 24 least significant bits of the Time Stamp - * (TIST) field from the STI-D(LI) Frame. The full definition for the - * STI TIST can be found in annex B of EN 300 797 [4]. The most - * significant 8 bits of the TIST field of the incoming STI-D(LI) - * frame, if required, may be carried in the RFAD field. - */ - uint32_t tsta = 0xFFFFFF; - - // the FIC (optional) - bool ficf = false; - const unsigned char* fic_data; - size_t fic_length; - - // rfu - bool rfudf = false; - uint32_t rfud = 0; - - -}; - -// ETSI TS 102 693, 5.1.5 ETI Sub-Channel Stream -class TagESTn : public TagItem -{ - public: - std::vector Assemble(); - - // SSTCn - uint8_t scid; - uint16_t sad; - uint8_t tpl; - uint8_t rfa; - - // Pointer to MSTn data - uint8_t* mst_data; - size_t mst_length; // STLn * 8 bytes - - uint8_t id; -}; - -// ETSI TS 102 821, 5.2.2.2 Dummy padding -class TagStarDMY : public TagItem -{ - public: - /* length is the TAG value length in bytes */ - TagStarDMY(uint32_t length) : length_(length) {} - std::vector Assemble(); - - private: - uint32_t length_; -}; - -} - diff --git a/src/dabOutput/edi/TagPacket.cpp b/src/dabOutput/edi/TagPacket.cpp deleted file mode 100644 index b16dc33..0000000 --- a/src/dabOutput/edi/TagPacket.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output. - This defines a TAG Packet. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "config.h" -#include "Eti.h" -#include "TagPacket.h" -#include "TagItems.h" -#include -#include -#include -#include -#include -#include - -namespace edi { - -TagPacket::TagPacket(unsigned int alignment) : m_alignment(alignment) -{ } - -std::vector TagPacket::Assemble() -{ - std::list::iterator tag; - - std::vector packet; - - //std::cerr << "Assemble TAGPacket" << std::endl; - - for (tag = tag_items.begin(); tag != tag_items.end(); ++tag) { - std::vector tag_data = (*tag)->Assemble(); - packet.insert(packet.end(), tag_data.begin(), tag_data.end()); - - //std::cerr << " Add TAGItem of length " << tag_data.size() << std::endl; - } - - if (m_alignment == 0) { /* no padding */ } - else if (m_alignment == 8) { - // Add padding inside TAG packet - while (packet.size() % 8 > 0) { - packet.push_back(0); // TS 102 821, 5.1, "padding shall be undefined" - } - } - else if (m_alignment > 8) { - TagStarDMY dmy(m_alignment - 8); - auto dmy_data = dmy.Assemble(); - packet.insert(packet.end(), dmy_data.begin(), dmy_data.end()); - } - else { - std::cerr << "Invalid alignment requirement " << m_alignment << - " defined in TagPacket" << std::endl; - } - - return packet; -} - -} - diff --git a/src/dabOutput/edi/TagPacket.h b/src/dabOutput/edi/TagPacket.h deleted file mode 100644 index a861cbb..0000000 --- a/src/dabOutput/edi/TagPacket.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output. - This defines a TAG Packet. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include "TagItems.h" -#include -#include -#include -#include - -namespace edi { - -// A TagPacket is nothing else than a list of tag items, with an -// Assemble function that puts the bytestream together and adds -// padding such that the total length is a multiple of 8 Bytes. -// -// ETSI TS 102 821, 5.1 Tag Packet -class TagPacket -{ - public: - TagPacket(unsigned int alignment); - std::vector Assemble(); - - std::list tag_items; - - private: - unsigned int m_alignment; -}; - -} - diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp deleted file mode 100644 index 187aabe..0000000 --- a/src/dabOutput/edi/Transport.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - UDP and TCP transports and their configuration - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#include "Transport.h" -#include - -using namespace std; - -namespace edi { - -void configuration_t::print() const -{ - etiLog.level(info) << "EDI"; - etiLog.level(info) << " verbose " << verbose; - for (auto edi_dest : destinations) { - if (auto udp_dest = dynamic_pointer_cast(edi_dest)) { - etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; - if (not udp_dest->source_addr.empty()) { - etiLog.level(info) << " source " << udp_dest->source_addr; - etiLog.level(info) << " ttl " << udp_dest->ttl; - } - etiLog.level(info) << " source port " << udp_dest->source_port; - } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; - etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; - } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port; - etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued; - } - else { - throw logic_error("EDI destination not implemented"); - } - } - if (interleaver_enabled()) { - etiLog.level(info) << " interleave " << latency_frames * 24 << " ms"; - } -} - - -Sender::Sender(const configuration_t& conf) : - m_conf(conf), - edi_pft(m_conf) -{ - if (m_conf.verbose) { - etiLog.log(info, "Setup EDI"); - } - - for (const auto& edi_dest : m_conf.destinations) { - if (const auto udp_dest = dynamic_pointer_cast(edi_dest)) { - auto udp_socket = std::make_shared(udp_dest->source_port); - - if (not udp_dest->source_addr.empty()) { - udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); - udp_socket->setMulticastTTL(udp_dest->ttl); - } - - udp_sockets.emplace(udp_dest.get(), udp_socket); - } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - auto dispatcher = make_shared(tcp_dest->max_frames_queued); - dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); - tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); - } - else if (auto tcp_dest = dynamic_pointer_cast(edi_dest)) { - auto tcp_socket = make_shared(); - tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); - tcp_senders.emplace(tcp_dest.get(), tcp_socket); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - - if (m_conf.interleaver_enabled()) { - edi_interleaver.SetLatency(m_conf.latency_frames); - } - - if (m_conf.dump) { - edi_debug_file.open("./edi.debug"); - } - - if (m_conf.verbose) { - etiLog.log(info, "EDI set up"); - } -} - -void Sender::write(const TagPacket& tagpacket) -{ - // Assemble into one AF Packet - edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); - - if (m_conf.enable_pft) { - // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) - vector edi_fragments = edi_pft.Assemble(af_packet); - - if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n", - edi_fragments.size()); - } - - if (m_conf.interleaver_enabled()) { - edi_fragments = edi_interleaver.Interleave(edi_fragments); - } - - // Send over ethernet - for (const auto& edi_frag : edi_fragments) { - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); - - udp_sockets.at(udp_dest.get())->send(edi_frag, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); - } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - - if (m_conf.dump) { - ostream_iterator debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - } - - if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu\n", - edi_fragments.size()); - } - } - else { - // Send over ethernet - for (auto& dest : m_conf.destinations) { - if (const auto& udp_dest = dynamic_pointer_cast(dest)) { - Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); - - udp_sockets.at(udp_dest.get())->send(af_packet, addr); - } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_dispatchers.at(tcp_dest.get())->write(af_packet); - } - else if (auto tcp_dest = dynamic_pointer_cast(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); - } - else { - throw logic_error("EDI destination not implemented"); - } - } - - if (m_conf.dump) { - ostream_iterator debug_iterator(edi_debug_file); - copy(af_packet.begin(), af_packet.end(), debug_iterator); - } - } -} - -} diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h deleted file mode 100644 index 9633275..0000000 --- a/src/dabOutput/edi/Transport.h +++ /dev/null @@ -1,71 +0,0 @@ -/* - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - EDI output, - UDP and TCP transports and their configuration - - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . - */ - -#pragma once - -#include "config.h" -#include "dabOutput/edi/Config.h" -#include "AFPacket.h" -#include "PFT.h" -#include "Interleaver.h" -#include "Socket.h" -#include -#include -#include -#include -#include - -namespace edi { - -/** Configuration for EDI output */ - -class Sender { - public: - Sender(const configuration_t& conf); - - void write(const TagPacket& tagpacket); - - private: - configuration_t m_conf; - std::ofstream edi_debug_file; - - // The TagPacket will then be placed into an AFPacket - edi::AFPacketiser edi_afPacketiser; - - // The AF Packet will be protected with reed-solomon and split in fragments - edi::PFT edi_pft; - - // To mitigate for burst packet loss, PFT fragments can be sent out-of-order - edi::Interleaver edi_interleaver; - - std::unordered_map> udp_sockets; - std::unordered_map> tcp_dispatchers; - std::unordered_map> tcp_senders; -}; - -} - diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 2128abf..2188f8a 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -79,7 +79,7 @@ void EDISender::print_configuration() void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata) { edi::TagDETI edi_tagDETI; - edi::TagStarPTR edi_tagStarPtr; + edi::TagStarPTR edi_tagStarPtr("DETI"); map edi_subchannelToTag; // The above Tag Items will be assembled into a TAG Packet edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index bb9c8bc..3525b4b 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -34,9 +34,9 @@ #include #include "ThreadsafeQueue.h" #include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/Transport.h" +#include "edioutput/TagItems.h" +#include "edioutput/TagPacket.h" +#include "edioutput/Transport.h" // This metadata gets transmitted in the zmq stream struct metadata_t { -- cgit v1.2.3