summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.travis.yml22
-rw-r--r--Makefile.am118
-rw-r--r--configure.ac3
-rw-r--r--doc/advanced.mux17
-rw-r--r--doc/example.mux3
-rw-r--r--lib/ClockTAI.cpp (renamed from src/ClockTAI.cpp)161
-rw-r--r--lib/ClockTAI.h (renamed from src/ClockTAI.h)14
-rw-r--r--lib/Log.cpp (renamed from src/Log.cpp)91
-rw-r--r--lib/Log.h (renamed from src/Log.h)63
-rw-r--r--lib/ReedSolomon.cpp (renamed from src/ReedSolomon.cpp)4
-rw-r--r--lib/ReedSolomon.h (renamed from src/ReedSolomon.h)0
-rw-r--r--lib/RemoteControl.cpp (renamed from src/RemoteControl.cpp)132
-rw-r--r--lib/RemoteControl.h (renamed from src/RemoteControl.h)46
-rw-r--r--lib/Socket.cpp898
-rw-r--r--lib/Socket.h294
-rw-r--r--lib/ThreadsafeQueue.h (renamed from src/ThreadsafeQueue.h)14
-rw-r--r--lib/crc.c (renamed from src/crc.c)0
-rw-r--r--lib/crc.h (renamed from src/crc.h)0
-rw-r--r--lib/edi/PFT.cpp574
-rw-r--r--lib/edi/PFT.hpp166
-rw-r--r--lib/edi/STIDecoder.cpp191
-rw-r--r--lib/edi/STIDecoder.hpp122
-rw-r--r--lib/edi/STIWriter.cpp139
-rw-r--r--lib/edi/STIWriter.hpp84
-rw-r--r--lib/edi/buffer_unpack.hpp62
-rw-r--r--lib/edi/common.cpp323
-rw-r--r--lib/edi/common.hpp92
-rw-r--r--lib/edioutput/AFPacket.cpp (renamed from src/dabOutput/edi/AFPacket.cpp)12
-rw-r--r--lib/edioutput/AFPacket.h (renamed from src/dabOutput/edi/AFPacket.h)12
-rw-r--r--lib/edioutput/Config.h (renamed from src/dabOutput/edi/Config.h)19
-rw-r--r--lib/edioutput/Interleaver.cpp (renamed from src/dabOutput/edi/Interleaver.cpp)0
-rw-r--r--lib/edioutput/Interleaver.h (renamed from src/dabOutput/edi/Interleaver.h)12
-rw-r--r--lib/edioutput/PFT.cpp (renamed from src/dabOutput/edi/PFT.cpp)14
-rw-r--r--lib/edioutput/PFT.h (renamed from src/dabOutput/edi/PFT.h)14
-rw-r--r--lib/edioutput/TagItems.cpp (renamed from src/dabOutput/edi/TagItems.cpp)183
-rw-r--r--lib/edioutput/TagItems.h (renamed from src/dabOutput/edi/TagItems.h)94
-rw-r--r--lib/edioutput/TagPacket.cpp (renamed from src/dabOutput/edi/TagPacket.cpp)13
-rw-r--r--lib/edioutput/TagPacket.h (renamed from src/dabOutput/edi/TagPacket.h)12
-rw-r--r--lib/edioutput/Transport.cpp (renamed from src/dabOutput/edi/Transport.cpp)78
-rw-r--r--lib/edioutput/Transport.h (renamed from src/dabOutput/edi/Transport.h)20
-rw-r--r--src/ConfigParser.cpp61
-rw-r--r--src/DabMultiplexer.cpp8
-rw-r--r--src/DabMultiplexer.h11
-rw-r--r--src/DabMux.cpp5
-rw-r--r--src/InetAddress.cpp155
-rw-r--r--src/InetAddress.h78
-rw-r--r--src/TcpSocket.cpp359
-rw-r--r--src/TcpSocket.h164
-rw-r--r--src/UdpSocket.cpp256
-rw-r--r--src/UdpSocket.h174
-rw-r--r--src/dabOutput/dabOutput.h21
-rw-r--r--src/dabOutput/dabOutputTcp.cpp2
-rw-r--r--src/dabOutput/dabOutputUdp.cpp65
-rw-r--r--src/fig/FIG0_19.cpp13
-rw-r--r--src/input/Edi.cpp221
-rw-r--r--src/input/Edi.h83
-rw-r--r--src/input/Udp.cpp59
-rw-r--r--src/input/Udp.h4
-rw-r--r--src/utils.cpp2
-rw-r--r--src/zmq2edi/EDISender.cpp2
-rw-r--r--src/zmq2edi/EDISender.h6
-rw-r--r--src/zmq2edi/zmq2edi.cpp135
-rw-r--r--src/zmq2farsync/zmq2farsync.cpp115
63 files changed, 4282 insertions, 1833 deletions
diff --git a/.travis.yml b/.travis.yml
index 16d4a7c..180c16e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,9 +9,9 @@ matrix:
compiler: clang
# GCC and clang builds on Linux
- - env: MATRIX_EVAL="CC=gcc-6 CXX=g++-6" CONF="--disable-output-edi"
+ - env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF="--disable-output-edi"
os: linux
- dist: trusty
+ dist: xenial
sudo: required
compiler: gcc
addons: &linuxaddons
@@ -20,18 +20,18 @@ matrix:
- ubuntu-toolchain-r-test
packages: &packages
- libzmq3-dev
- - libzmq3
+ - libzmq5
- automake
- libtool
- - libboost1.55-all-dev
+ - libboost1.58-all-dev
- libcurl4-openssl-dev
- - g++-6
+ - g++-9
- - env: MATRIX_EVAL="CC=gcc-6 CXX=g++-6" CONF="--enable-output-raw"
+ - env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF="--enable-output-raw"
compiler: gcc
addons: *linuxaddons
- - env: MATRIX_EVAL="CC=gcc-6 CXX=g++-6" CONF=""
+ - env: MATRIX_EVAL="CC=gcc-9 CXX=g++-9" CONF=""
compiler: gcc
addons: *linuxaddons
@@ -41,15 +41,15 @@ matrix:
apt:
sources:
- ubuntu-toolchain-r-test
- - llvm-toolchain-trusty-4.0
+ - llvm-toolchain-xenial-8
packages:
- libzmq3-dev
- - libzmq3
+ - libzmq5
- automake
- libtool
- - libboost1.55-all-dev
+ - libboost1.58-all-dev
- libcurl4-openssl-dev
- - clang-4.0
+ - clang-8
before_install:
- eval "${MATRIX_EVAL}"
diff --git a/Makefile.am b/Makefile.am
index 2f06879..e426f74 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -69,6 +69,8 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/input/File.h \
src/input/Udp.cpp \
src/input/Udp.h \
+ src/input/Edi.cpp \
+ src/input/Edi.h \
src/dabOutput/dabOutput.h \
src/dabOutput/dabOutputFile.cpp \
src/dabOutput/dabOutputFifo.cpp \
@@ -79,47 +81,17 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/dabOutput/dabOutputZMQ.cpp \
src/dabOutput/metadata.h \
src/dabOutput/metadata.cpp \
- src/dabOutput/edi/AFPacket.cpp \
- src/dabOutput/edi/AFPacket.h \
- src/dabOutput/edi/Config.h \
- src/dabOutput/edi/Interleaver.cpp \
- src/dabOutput/edi/Interleaver.h \
- src/dabOutput/edi/PFT.cpp \
- src/dabOutput/edi/PFT.h \
- src/dabOutput/edi/TagItems.cpp \
- src/dabOutput/edi/TagItems.h \
- src/dabOutput/edi/TagPacket.cpp \
- src/dabOutput/edi/TagPacket.h \
- src/dabOutput/edi/Transport.cpp \
- src/dabOutput/edi/Transport.h \
- src/ClockTAI.h \
- src/ClockTAI.cpp \
src/ConfigParser.cpp \
src/ConfigParser.h \
src/Eti.h \
src/Eti.cpp \
- src/InetAddress.h \
- src/InetAddress.cpp \
src/Interleaver.h \
src/Interleaver.cpp \
- src/Log.h \
- src/Log.cpp \
src/ManagementServer.h \
src/ManagementServer.cpp \
src/MuxElements.cpp \
src/MuxElements.h \
src/PcDebug.h \
- src/ReedSolomon.h \
- src/ReedSolomon.cpp \
- src/RemoteControl.cpp \
- src/RemoteControl.h \
- src/TcpSocket.h \
- src/TcpSocket.cpp \
- src/UdpSocket.h \
- src/UdpSocket.cpp \
- src/ThreadsafeQueue.h \
- src/crc.h \
- src/crc.c \
src/fig/FIG.h \
src/fig/FIG.cpp \
src/fig/FIG0.h \
@@ -167,6 +139,41 @@ odr_dabmux_SOURCES =src/DabMux.cpp \
src/PrbsGenerator.h \
src/utils.cpp \
src/utils.h \
+ lib/crc.h \
+ lib/crc.c \
+ lib/ClockTAI.h \
+ lib/ClockTAI.cpp \
+ lib/Log.h \
+ lib/Log.cpp \
+ lib/RemoteControl.cpp \
+ lib/RemoteControl.h \
+ lib/edi/STIDecoder.cpp \
+ lib/edi/STIDecoder.h \
+ lib/edi/STIWriter.cpp \
+ lib/edi/STIWriter.h \
+ lib/edi/PFT.cpp \
+ lib/edi/PFT.h \
+ lib/edi/common.cpp \
+ lib/edi/common.h \
+ lib/edi/buffer_unpack.hpp \
+ lib/edioutput/AFPacket.cpp \
+ lib/edioutput/AFPacket.h \
+ lib/edioutput/Config.h \
+ lib/edioutput/Interleaver.cpp \
+ lib/edioutput/Interleaver.h \
+ lib/edioutput/PFT.cpp \
+ lib/edioutput/PFT.h \
+ lib/edioutput/TagItems.cpp \
+ lib/edioutput/TagItems.h \
+ lib/edioutput/TagPacket.cpp \
+ lib/edioutput/TagPacket.h \
+ lib/edioutput/Transport.cpp \
+ lib/edioutput/Transport.h \
+ lib/ReedSolomon.h \
+ lib/ReedSolomon.cpp \
+ lib/Socket.h \
+ lib/Socket.cpp \
+ lib/ThreadsafeQueue.h \
lib/zmq.hpp \
$(lib_fec_sources) \
$(lib_charset_sources)
@@ -178,8 +185,8 @@ zmqinput_keygen_CFLAGS = -Wall $(GITVERSION_FLAGS) $(ZMQ_CPPFLAGS)
odr_zmq2farsync_SOURCES = src/zmq2farsync/zmq2farsync.cpp \
src/dabOutput/dabOutput.h \
src/dabOutput/dabOutputRaw.cpp \
- src/Log.h \
- src/Log.cpp \
+ lib/Log.h \
+ lib/Log.cpp \
lib/zmq.hpp
odr_zmq2farsync_LDADD = $(ZMQ_LIBS)
@@ -192,31 +199,28 @@ odr_zmq2edi_SOURCES = src/zmq2edi/zmq2edi.cpp \
src/dabOutput/dabOutput.h \
src/dabOutput/metadata.h \
src/dabOutput/metadata.cpp \
- src/dabOutput/edi/AFPacket.cpp \
- src/dabOutput/edi/AFPacket.h \
- src/dabOutput/edi/Config.h \
- src/dabOutput/edi/Interleaver.cpp \
- src/dabOutput/edi/Interleaver.h \
- src/dabOutput/edi/PFT.cpp \
- src/dabOutput/edi/PFT.h \
- src/dabOutput/edi/TagItems.cpp \
- src/dabOutput/edi/TagItems.h \
- src/dabOutput/edi/TagPacket.cpp \
- src/dabOutput/edi/TagPacket.h \
- src/dabOutput/edi/Transport.cpp \
- src/dabOutput/edi/Transport.h \
- src/InetAddress.h \
- src/InetAddress.cpp \
- src/TcpSocket.h \
- src/TcpSocket.cpp \
- src/UdpSocket.h \
- src/UdpSocket.cpp \
- src/ReedSolomon.h \
- src/ReedSolomon.cpp \
- src/Log.h \
- src/Log.cpp \
- src/crc.h \
- src/crc.c \
+ lib/edioutput/AFPacket.cpp \
+ lib/edioutput/AFPacket.h \
+ lib/edioutput/Config.h \
+ lib/edioutput/Interleaver.cpp \
+ lib/edioutput/Interleaver.h \
+ lib/edioutput/PFT.cpp \
+ lib/edioutput/PFT.h \
+ lib/edioutput/TagItems.cpp \
+ lib/edioutput/TagItems.h \
+ lib/edioutput/TagPacket.cpp \
+ lib/edioutput/TagPacket.h \
+ lib/edioutput/Transport.cpp \
+ lib/edioutput/Transport.h \
+ lib/Log.h \
+ lib/Log.cpp \
+ lib/crc.h \
+ lib/crc.c \
+ lib/ReedSolomon.h \
+ lib/ReedSolomon.cpp \
+ lib/Socket.h \
+ lib/Socket.cpp \
+ lib/ThreadsafeQueue.h \
lib/zmq.hpp \
$(lib_fec_sources)
diff --git a/configure.ac b/configure.ac
index 50623a2..196780c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -40,7 +40,6 @@ AX_CXX_COMPILE_STDCXX_11(noext,mandatory)
AX_PTHREAD([], AC_MSG_ERROR([requires pthread]))
AX_BOOST_BASE([1.48.0], [], AC_MSG_ERROR([BOOST 1.48 or later is required]))
AX_BOOST_SYSTEM
-AX_BOOST_ASIO
# Checks for header files.
AC_HEADER_STDC
@@ -128,7 +127,7 @@ AX_ZMQ([4.0.0], [], AC_MSG_ERROR(ZeroMQ 4.0.0 is required))
AC_DEFINE([HAVE_INPUT_ZEROMQ], [1], [Define if ZeroMQ input is enabled])
AC_DEFINE([HAVE_OUTPUT_ZEROMQ], [1], [Define if ZeroMQ output is enabled])
-AC_DEFINE([HAVE_RC_ZEROMQ], [1], [Define if ZeroMQ enabled for rc])
+AC_DEFINE([HAVE_ZEROMQ], [1], [Define if ZeroMQ enabled for rc])
# Do not build odr-zmq2farsync if no RAW output
AM_CONDITIONAL([HAVE_OUTPUT_RAW_TEST],
diff --git a/doc/advanced.mux b/doc/advanced.mux
index fb67b82..b9cec05 100644
--- a/doc/advanced.mux
+++ b/doc/advanced.mux
@@ -163,7 +163,8 @@ subchannels {
sub-fu {
type audio
; example file input
- inputfile "funk.mp2"
+ inputproto zmq
+ inputuri "funk.mp2"
nonblock false
bitrate 128
id 10
@@ -188,7 +189,8 @@ subchannels {
; Receive STI-D(LI) carried in STI(PI, X) inside RTP using UDP.
; This is intended to be compatible with AVT audio encoders.
; EXPERIMENTAL!
- inputfile "sti-rtp://127.0.0.1:32010"
+ inputproto sti
+ inputuri "rtp://127.0.0.1:32010"
bitrate 96
id 3
protection 3
@@ -196,11 +198,12 @@ subchannels {
sub-ri {
type dabplus
; example file input
- ;inputfile "rick.dabp"
+ ;inputuri "rick.dabp"
; example zmq input:
; Accepts connections to port 9000 from any interface.
; Use ODR-AudioEnc as encoder
- inputfile "tcp://*:9000"
+ inputproto zmq
+ inputuri "tcp://*:9000"
bitrate 96
id 1
protection 1
@@ -256,7 +259,8 @@ subchannels {
; for audio types, you can use the ZeroMQ input (if compiled in)
; with the following configuration in combination with
; Toolame-DAB
- inputfile "tcp://*:9001"
+ inputproto zmq
+ inputuri "tcp://*:9001"
bitrate 96
id 1
protection 1
@@ -273,7 +277,8 @@ subchannels {
type data
; Use the default PRBS polynomial.
- inputfile "prbs://"
+ inputproto prbs
+ inputuri "prbs://"
; To use another polynomial, set it in the url as hexadecimal
; The default polynomial is G(x) = x^20 + x^17 + 1, represented as
diff --git a/doc/example.mux b/doc/example.mux
index 6c2bc18..31e072d 100644
--- a/doc/example.mux
+++ b/doc/example.mux
@@ -171,7 +171,8 @@ subchannels {
type dabplus
; Accepts connections to port 9000 from any interface.
; Use ODR-AudioEnc as encoder
- inputfile "tcp://*:9000"
+ inputproto zmq
+ inputuri "tcp://*:9000"
bitrate 96
id 1
protection 3
diff --git a/src/ClockTAI.cpp b/lib/ClockTAI.cpp
index c376c07..2656345 100644
--- a/src/ClockTAI.cpp
+++ b/lib/ClockTAI.cpp
@@ -9,27 +9,27 @@
http://www.opendigitalradio.org
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
/* This file downloads the TAI-UTC bulletins from the from IETF and parses them
* so that correct time can be communicated in EDI timestamps.
*
* This file contains self-test code that can be executed by running
- * g++ -g -Wall -DTEST -DHAVE_CURL -std=c++11 -lcurl -pthread \
+ * g++ -g -Wall -DTAI_TEST -DHAVE_CURL -std=c++11 -lcurl -pthread \
* ClockTAI.cpp Log.cpp RemoteControl.cpp -lboost_system -o taitest && ./taitest
*/
@@ -40,9 +40,9 @@
#include "ClockTAI.h"
#include "Log.h"
-#include <time.h>
-#include <stdio.h>
-#include <errno.h>
+#include <ctime>
+#include <cstdio>
+#include <cerrno>
#if SUPPORT_SETTING_CLOCK_TAI
# include <sys/timex.h>
#endif
@@ -54,10 +54,13 @@
#include <iostream>
#include <algorithm>
#include <regex>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <fcntl.h>
using namespace std;
-#ifdef TEST
+#ifdef DOWNLOADED_IN_THE_PAST_TEST
static bool wait_longer = true;
#endif
@@ -76,7 +79,7 @@ static array<const char*, 2> default_tai_urls = {
// According to the Filesystem Hierarchy Standard, the data in
// /var/tmp "must not be deleted when the system is booted."
-static const char *tai_cache_location = "/var/tmp/odr-dabmux-leap-seconds.cache";
+static const char *tai_cache_location = "/var/tmp/odr-leap-seconds.cache";
// read TAI offset from a valid bulletin in IETF format
static int parse_ietf_bulletin(const std::string& bulletin)
@@ -127,7 +130,7 @@ static int parse_ietf_bulletin(const std::string& bulletin)
tai_utc_offset = offset;
tai_utc_offset_valid = true;
}
-#if TEST
+#if TAI_TEST
else {
cerr << "IETF Ignoring offset " << bulletin_offset <<
" at TS " << bulletin_ntp_timestamp <<
@@ -183,7 +186,7 @@ static bulletin_state parse_bulletin(const string& bulletin)
const int64_t expiry_unix =
std::atoll(expiry_data_str.c_str()) - ntp_unix_offset;
-#ifdef TEST
+#ifdef TAI_TEST
etiLog.level(info) << "Bulletin expires in " << expiry_unix - now;
#endif
ret.expiry = expiry_unix - now;
@@ -246,17 +249,46 @@ static string download_tai_utc_bulletin(const char* url)
static string load_bulletin_from_file(const char* cache_filename)
{
- // Clear the bulletin
- ifstream f(cache_filename);
- if (not f.good()) {
- return {};
+ int fd = open(cache_filename, O_RDWR); // lockf requires O_RDWR
+ if (fd == -1) {
+ etiLog.level(error) << "TAI-UTC bulletin open cache for reading: " <<
+ strerror(errno);
+ return "";
}
- stringstream ss;
- ss << f.rdbuf();
- f.close();
+ lseek(fd, 0, SEEK_SET);
+
+ vector<char> buf(1024);
+ vector<char> new_bulletin_data;
+
+ ssize_t ret = lockf(fd, F_LOCK, 0);
+ if (ret == 0) {
+ // exclusive lock acquired
+
+ do {
+ ret = read(fd, buf.data(), buf.size());
+
+ if (ret == -1) {
+ close(fd);
+ etiLog.level(error) << "TAI-UTC bulletin read cache: " <<
+ strerror(errno);
+ return "";
+ }
+
+ copy(buf.data(), buf.data() + ret, back_inserter(new_bulletin_data));
+ } while (ret > 0);
+
+ close(fd);
- return ss.str();
+ return string{new_bulletin_data.data(), new_bulletin_data.size()};
+ }
+ else {
+ etiLog.level(error) <<
+ "TAI-UTC bulletin acquire cache lock for reading: " <<
+ strerror(errno);
+ close(fd);
+ }
+ return "";
}
ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) :
@@ -289,7 +321,7 @@ int ClockTAI::get_valid_offset()
const auto state = parse_bulletin(m_bulletin);
if (state.usable()) {
-#if TEST
+#if TAI_TEST
etiLog.level(info) << "Bulletin already valid";
#endif
offset = state.offset;
@@ -297,20 +329,25 @@ int ClockTAI::get_valid_offset()
}
else {
const auto cache_bulletin = load_bulletin_from_file(tai_cache_location);
+#if TAI_TEST
+ etiLog.level(info) << "Loaded cache bulletin with " <<
+ std::count_if(cache_bulletin.cbegin(), cache_bulletin.cend(),
+ [](const char c){ return c == '\n'; }) << " lines";
+#endif
const auto cache_state = parse_bulletin(cache_bulletin);
if (cache_state.usable()) {
m_bulletin = cache_bulletin;
offset = cache_state.offset;
offset_valid = true;
-#if TEST
+#if TAI_TEST
etiLog.level(info) << "Bulletin from cache valid with offset=" << offset;
#endif
}
else {
for (const auto url : m_bulletin_urls) {
try {
-#if TEST
+#if TAI_TEST
etiLog.level(info) << "Load bulletin from " << url;
#endif
const auto new_bulletin = download_tai_utc_bulletin(url.c_str());
@@ -368,7 +405,7 @@ int ClockTAI::get_offset()
std::unique_lock<std::mutex> lock(m_data_mutex);
if (not m_offset_valid) {
-#ifdef TEST
+#ifdef DOWNLOADED_IN_THE_PAST_TEST
// Assume we've downloaded it in the past:
m_offset = 37; // Valid in early 2017
@@ -418,7 +455,7 @@ int ClockTAI::get_offset()
m_bulletin_download_time += hours(download_retry_interval_hours);
}
-#ifdef TEST
+#ifdef DOWNLOADED_IN_THE_PAST_TEST
wait_longer = false;
#endif
break;
@@ -426,14 +463,14 @@ int ClockTAI::get_offset()
case future_status::deferred:
case future_status::timeout:
// Not ready yet
-#ifdef TEST
+#ifdef TAI_TEST
etiLog.level(debug) << " async not ready yet";
#endif
break;
}
}
else {
-#ifdef TEST
+#ifdef TAI_TEST
etiLog.level(debug) << " Launch async";
#endif
m_offset_future = async(launch::async, &ClockTAI::get_valid_offset, this);
@@ -463,13 +500,45 @@ int ClockTAI::update_local_tai_clock(int offset)
void ClockTAI::update_cache(const char* cache_filename)
{
- ofstream f(cache_filename);
- if (not f.good()) {
- throw runtime_error("TAI-UTC bulletin open cache for writing");
+ int fd = open(cache_filename, O_RDWR | O_CREAT, 00664);
+ if (fd == -1) {
+ etiLog.level(error) <<
+ "TAI-UTC bulletin open cache for writing: " <<
+ strerror(errno);
+ return;
}
- f << m_bulletin;
- f.close();
+ lseek(fd, 0, SEEK_SET);
+
+ ssize_t ret = lockf(fd, F_LOCK, 0);
+ if (ret == 0) {
+ // exclusive lock acquired
+ const char *data = m_bulletin.data();
+ size_t remaining = m_bulletin.size();
+
+ while (remaining > 0) {
+ ret = write(fd, data, remaining);
+ if (ret == -1) {
+ close(fd);
+ etiLog.level(error) <<
+ "TAI-UTC bulletin write cache: " <<
+ strerror(errno);
+ return;
+ }
+
+ remaining -= ret;
+ data += ret;
+ }
+ etiLog.level(debug) << "TAI-UTC bulletin cache updated";
+ close(fd);
+ }
+ else {
+ close(fd);
+ etiLog.level(error) <<
+ "TAI-UTC bulletin acquire cache lock for writing: " <<
+ strerror(errno);
+ return;
+ }
}
@@ -477,7 +546,7 @@ void ClockTAI::set_parameter(const string& parameter, const string& value)
{
if (parameter == "expiry") {
throw ParameterError("Parameter '" + parameter +
- "' is not read-only in controllable " + get_rc_name());
+ "' is read-only in controllable " + get_rc_name());
}
else {
throw ParameterError("Parameter '" + parameter +
@@ -536,27 +605,3 @@ void debug_tai_clk()
}
#endif
-#if TEST
-int main(int argc, char **argv)
-{
- using namespace std;
-
- ClockTAI tai({});
-
- while (wait_longer) {
- try {
- etiLog.level(info) <<
- "Offset is " << tai.get_offset();
- }
- catch (const exception &e) {
- etiLog.level(error) <<
- "Exception " << e.what();
- }
-
- this_thread::sleep_for(chrono::seconds(2));
- }
-
- return 0;
-}
-#endif
-
diff --git a/src/ClockTAI.h b/lib/ClockTAI.h
index 4b3c2ff..50a6323 100644
--- a/src/ClockTAI.h
+++ b/lib/ClockTAI.h
@@ -9,21 +9,21 @@
http://www.opendigitalradio.org
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
-*/
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
/* The EDI output needs TAI clock, according to ETSI TS 102 693 Annex F
* "EDI Timestamps". This module can set the local CLOCK_TAI clock by
@@ -34,8 +34,8 @@
#pragma once
-#include <stdint.h>
-#include <stdlib.h>
+#include <cstdint>
+#include <cstdlib>
#include <sstream>
#include <chrono>
#include <future>
diff --git a/src/Log.cpp b/lib/Log.cpp
index 6b78fe0..2417f3a 100644
--- a/src/Log.cpp
+++ b/lib/Log.cpp
@@ -9,31 +9,32 @@
http://www.opendigitalradio.org
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <list>
#include <cstdarg>
+#include <cinttypes>
#include <chrono>
#include "Log.h"
using namespace std;
-/* etiLog is a singleton used in all parts of ODR-DabMod to output log messages.
+/* etiLog is a singleton used in all parts of the program to output log messages.
*/
Logger etiLog;
@@ -74,22 +75,40 @@ void Logger::logstr(log_level_t level, std::string&& message)
{
if (level == discard) {
return;
- }
+ }
- /* Remove a potential trailing newline.
- * It doesn't look good in syslog
- */
- if (message[message.length()-1] == '\n') {
- message.resize(message.length()-1);
- }
+ log_message_t m(level, move(message));
+ m_message_queue.push(move(m));
+}
- for (auto &backend : backends) {
- backend->log(level, message);
- }
+void Logger::io_process()
+{
+ while (1) {
+ log_message_t m;
+ try {
+ m_message_queue.wait_and_pop(m);
+ }
+ catch (const ThreadsafeQueueWakeup&) {
+ break;
+ }
+
+ auto message = m.message;
+
+ /* Remove a potential trailing newline.
+ * It doesn't look good in syslog
+ */
+ if (message[message.length()-1] == '\n') {
+ message.resize(message.length()-1);
+ }
+
+ for (auto &backend : backends) {
+ backend->log(m.level, message);
+ }
- {
- std::lock_guard<std::mutex> guard(m_cerr_mutex);
- std::cerr << levels_as_str[level] << " " << message << std::endl;
+ if (m.level != log_level_t::trace) {
+ std::lock_guard<std::mutex> guard(m_cerr_mutex);
+ std::cerr << levels_as_str[m.level] << " " << message << std::endl;
+ }
}
}
@@ -112,7 +131,7 @@ LogToFile::LogToFile(const std::string& filename) : name("FILE")
void LogToFile::log(log_level_t level, const std::string& message)
{
- if (level != log_level_t::discard) {
+ if (not (level == log_level_t::trace or level == log_level_t::discard)) {
const char* log_level_text[] = {
"DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};
@@ -125,7 +144,7 @@ void LogToFile::log(log_level_t level, const std::string& message)
void LogToSyslog::log(log_level_t level, const std::string& message)
{
- if (level != log_level_t::discard) {
+ if (not (level == log_level_t::trace or level == log_level_t::discard)) {
int syslog_level = LOG_EMERG;
switch (level) {
case debug: syslog_level = LOG_DEBUG; break;
@@ -141,3 +160,35 @@ void LogToSyslog::log(log_level_t level, const std::string& message)
syslog(syslog_level, SYSLOG_IDENT " %s", message.c_str());
}
}
+
+LogTracer::LogTracer(const string& trace_filename) : name("TRACE")
+{
+ etiLog.level(info) << "Setting up TRACE to " << trace_filename;
+
+ FILE* fd = fopen(trace_filename.c_str(), "a");
+ if (fd == nullptr) {
+ fprintf(stderr, "Cannot open trace file !");
+ throw std::runtime_error("Cannot open trace file !");
+ }
+ m_trace_file.reset(fd);
+
+ using namespace std::chrono;
+ auto now = steady_clock::now().time_since_epoch();
+ m_trace_micros_startup = duration_cast<microseconds>(now).count();
+
+ fprintf(m_trace_file.get(),
+ "0,TRACER,startup at %" PRIu64 "\n", m_trace_micros_startup);
+}
+
+void LogTracer::log(log_level_t level, const std::string& message)
+{
+ if (level == log_level_t::trace) {
+ using namespace std::chrono;
+ const auto now = steady_clock::now().time_since_epoch();
+ const auto micros = duration_cast<microseconds>(now).count();
+
+ fprintf(m_trace_file.get(), "%" PRIu64 ",%s\n",
+ micros - m_trace_micros_startup,
+ message.c_str());
+ }
+}
diff --git a/src/Log.h b/lib/Log.h
index 18f8c99..d5c39e0 100644
--- a/src/Log.h
+++ b/lib/Log.h
@@ -9,20 +9,20 @@
http://www.opendigitalradio.org
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#pragma once
@@ -40,16 +40,19 @@
#include <list>
#include <stdexcept>
#include <string>
+#include <map>
#include <mutex>
#include <memory>
+#include <thread>
+#include "ThreadsafeQueue.h"
-#define SYSLOG_IDENT "ODR-DabMux"
+#define SYSLOG_IDENT PACKAGE_NAME
#define SYSLOG_FACILITY LOG_LOCAL0
-enum log_level_t {debug = 0, info, warn, error, alert, emerg, discard};
+enum log_level_t {debug = 0, info, warn, error, alert, emerg, trace, discard};
static const std::string levels_as_str[] =
- { " ", " ", "WARN ", "ERROR", "ALERT", "EMERG", "-----"} ;
+ { " ", " ", "WARN ", "ERROR", "ALERT", "EMERG", "TRACE", "-----"} ;
/** Abstract class all backends must inherit from */
class LogBackend {
@@ -97,10 +100,50 @@ class LogToFile : public LogBackend {
const LogToFile& operator=(const LogToFile& other) = delete;
};
+class LogTracer : public LogBackend {
+ public:
+ LogTracer(const std::string& filename);
+ void log(log_level_t level, const std::string& message);
+ std::string get_name() const { return name; }
+ private:
+ std::string name;
+ uint64_t m_trace_micros_startup = 0;
+
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> m_trace_file;
+
+ LogTracer(const LogTracer& other) = delete;
+ const LogTracer& operator=(const LogTracer& other) = delete;
+};
+
class LogLine;
+struct log_message_t {
+ log_message_t(log_level_t _level, std::string&& _message) :
+ level(_level),
+ message(move(_message)) {}
+
+ log_message_t() :
+ level(debug),
+ message("") {}
+
+ log_level_t level;
+ std::string message;
+};
+
class Logger {
public:
+ Logger() {
+ m_io_thread = std::thread(&Logger::io_process, this);
+ }
+
+ Logger(const Logger& other) = delete;
+ const Logger& operator=(const Logger& other) = delete;
+ ~Logger() {
+ m_message_queue.trigger_wakeup();
+ m_io_thread.join();
+ }
+
void register_backend(std::shared_ptr<LogBackend> backend);
/* Log the message to all backends */
@@ -108,6 +151,9 @@ class Logger {
void logstr(log_level_t level, std::string&& message);
+ /* All logging IO is done in another thread */
+ void io_process(void);
+
/* Return a LogLine for the given level
* so that you can write etiLog.level(info) << "stuff = " << 21 */
LogLine level(log_level_t level);
@@ -115,6 +161,8 @@ class Logger {
private:
std::list<std::shared_ptr<LogBackend> > backends;
+ ThreadsafeQueue<log_message_t> m_message_queue;
+ std::thread m_io_thread;
std::mutex m_cerr_mutex;
};
@@ -154,4 +202,3 @@ class LogLine {
Logger* logger_;
};
-
diff --git a/src/ReedSolomon.cpp b/lib/ReedSolomon.cpp
index 38d8ea8..1bf0b24 100644
--- a/src/ReedSolomon.cpp
+++ b/lib/ReedSolomon.cpp
@@ -64,7 +64,9 @@ ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot,
ReedSolomon::~ReedSolomon()
{
- free_rs_char(rsData);
+ if (rsData != nullptr) {
+ free_rs_char(rsData);
+ }
}
diff --git a/src/ReedSolomon.h b/lib/ReedSolomon.h
index abcef62..abcef62 100644
--- a/src/ReedSolomon.h
+++ b/lib/ReedSolomon.h
diff --git a/src/RemoteControl.cpp b/lib/RemoteControl.cpp
index b32c21a..878af59 100644
--- a/src/RemoteControl.cpp
+++ b/lib/RemoteControl.cpp
@@ -9,31 +9,27 @@
http://www.opendigitalradio.org
*/
/*
- This file is part of ODR-DabMux.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#include <list>
#include <string>
#include <iostream>
#include <string>
-#include <boost/asio.hpp>
-#include <boost/thread.hpp>
+#include <algorithm>
#include "RemoteControl.h"
-using boost::asio::ip::tcp;
using namespace std;
RemoteControllers rcs;
@@ -41,7 +37,6 @@ RemoteControllers rcs;
RemoteControllerTelnet::~RemoteControllerTelnet()
{
m_active = false;
- m_io_service.stop();
if (m_restarter_thread.joinable()) {
m_restarter_thread.join();
@@ -158,7 +153,6 @@ void RemoteControllers::set_param(
void RemoteControllerTelnet::restart_thread(long)
{
m_active = false;
- m_io_service.stop();
if (m_child_thread.joinable()) {
m_child_thread.join();
@@ -167,52 +161,56 @@ void RemoteControllerTelnet::restart_thread(long)
m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0);
}
-void RemoteControllerTelnet::handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor)
+void RemoteControllerTelnet::handle_accept(Socket::TCPSocket&& socket)
{
-
- const std::string welcome = "ODR-DabMux Remote Control CLI\n"
+ const std::string welcome = PACKAGE_NAME " Remote Control CLI\n"
"Write 'help' for help.\n"
"**********\n";
const std::string prompt = "> ";
std::string in_message;
- size_t length;
-
- if (boost_error) {
- etiLog.level(error) << "RC: Error accepting connection";
- return;
- }
try {
etiLog.level(info) << "RC: Accepted";
- boost::system::error_code ignored_error;
+ socket.sendall(welcome.data(), welcome.size());
- boost::asio::write(*socket, boost::asio::buffer(welcome),
- boost::asio::transfer_all(),
- ignored_error);
+ while (m_active and in_message != "quit") {
+ socket.sendall(prompt.data(), prompt.size());
- while (m_active && in_message != "quit") {
- boost::asio::write(*socket, boost::asio::buffer(prompt),
- boost::asio::transfer_all(),
- ignored_error);
+ stringstream in_message_stream;
- in_message = "";
-
- boost::asio::streambuf buffer;
- length = boost::asio::read_until(*socket, buffer, "\n", ignored_error);
+ char last_char = '\0';
+ try {
+ while (last_char != '\n') {
+ try {
+ auto ret = socket.recv(&last_char, 1, 0, 1000);
+ if (ret == 1) {
+ in_message_stream << last_char;
+ }
+ else {
+ break;
+ }
+ }
+ catch (const Socket::TCPSocket::Timeout&) {
+ if (not m_active) {
+ break;
+ }
+ }
+ }
+ }
+ catch (const Socket::TCPSocket::Interrupted&) {
+ in_message_stream.clear();
+ }
- std::istream str(&buffer);
- std::getline(str, in_message);
- if (length == 0) {
+ if (in_message_stream.str().size() == 0) {
etiLog.level(info) << "RC: Connection terminated";
break;
}
+ std::getline(in_message_stream, in_message);
+
while (in_message.length() > 0 &&
(in_message[in_message.length()-1] == '\r' ||
in_message[in_message.length()-1] == '\n')) {
@@ -225,44 +223,35 @@ void RemoteControllerTelnet::handle_accept(
etiLog.level(info) << "RC: Got message '" << in_message << "'";
- dispatch_command(*socket, in_message);
+ dispatch_command(socket, in_message);
}
etiLog.level(info) << "RC: Closing socket";
- socket->close();
+ socket.close();
}
- catch (const std::exception& e)
- {
+ catch (const std::exception& e) {
etiLog.level(error) << "Remote control caught exception: " << e.what();
}
}
void RemoteControllerTelnet::process(long)
{
- m_active = true;
-
- while (m_active) {
- m_io_service.reset();
-
- tcp::acceptor acceptor(m_io_service, tcp::endpoint(
- boost::asio::ip::address::from_string("127.0.0.1"), m_port) );
-
+ try {
+ m_active = true;
- // Add a job to start accepting connections.
- boost::shared_ptr<tcp::socket> socket(
- new tcp::socket(acceptor.get_io_service()));
+ m_socket.listen(m_port, "localhost");
- // Add an accept call to the service. This will prevent io_service::run()
- // from returning.
etiLog.level(info) << "RC: Waiting for connection on port " << m_port;
- acceptor.async_accept(*socket,
- boost::bind(&RemoteControllerTelnet::handle_accept,
- this,
- boost::asio::placeholders::error,
- socket,
- boost::ref(acceptor)));
-
- // Process event loop.
- m_io_service.run();
+ while (m_active) {
+ auto sock = m_socket.accept(1000);
+
+ if (sock.valid()) {
+ handle_accept(move(sock));
+ etiLog.level(info) << "RC: Connection closed. Waiting for connection on port " << m_port;
+ }
+ }
+ }
+ catch (const runtime_error& e) {
+ etiLog.level(warn) << "RC: Encountered error: " << e.what();
}
etiLog.level(info) << "RC: Leaving";
@@ -281,7 +270,7 @@ static std::vector<std::string> tokenise(const std::string& message) {
}
-void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command)
+void RemoteControllerTelnet::dispatch_command(Socket::TCPSocket& socket, string command)
{
vector<string> cmd = tokenise(command);
@@ -386,18 +375,15 @@ void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string comman
}
}
-void RemoteControllerTelnet::reply(tcp::socket& socket, string message)
+void RemoteControllerTelnet::reply(Socket::TCPSocket& socket, string message)
{
- boost::system::error_code ignored_error;
stringstream ss;
ss << message << "\r\n";
- boost::asio::write(socket, boost::asio::buffer(ss.str()),
- boost::asio::transfer_all(),
- ignored_error);
+ socket.sendall(message.data(), message.size());
}
-#if defined(HAVE_RC_ZEROMQ)
+#if defined(HAVE_ZEROMQ)
RemoteControllerZmq::~RemoteControllerZmq() {
m_active = false;
diff --git a/src/RemoteControl.h b/lib/RemoteControl.h
index 0726b28..bd88f82 100644
--- a/src/RemoteControl.h
+++ b/lib/RemoteControl.h
@@ -8,23 +8,21 @@
http://www.opendigitalradio.org
- This module adds remote-control capability to some of the dabmux modules.
+ This module adds remote-control capability to some of the dabmux/dabmod modules.
*/
/*
- This file is part of ODR-DabMux.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
@@ -33,7 +31,7 @@
# include "config.h"
#endif
-#if defined(HAVE_RC_ZEROMQ)
+#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
#endif
@@ -43,14 +41,11 @@
#include <string>
#include <atomic>
#include <iostream>
-#include <boost/bind.hpp>
-#include <boost/asio.hpp>
-#include <boost/foreach.hpp>
-#include <boost/tokenizer.hpp>
#include <thread>
#include <stdexcept>
#include "Log.h"
+#include "Socket.h"
#define RC_ADD_PARAMETER(p, desc) { \
std::vector<std::string> p; \
@@ -78,7 +73,7 @@ class RemoteControllable;
class BaseRemoteController {
public:
/* When this returns one, the remote controller cannot be
- * used anymore, and must be restarted by dabmux
+ * used anymore, and must be restarted
*/
virtual bool fault_detected() = 0;
@@ -163,13 +158,11 @@ class RemoteControllerTelnet : public BaseRemoteController {
public:
RemoteControllerTelnet()
: m_active(false),
- m_io_service(),
m_fault(false),
m_port(0) { }
RemoteControllerTelnet(int port)
: m_active(port > 0),
- m_io_service(),
m_fault(false),
m_port(port)
{
@@ -191,31 +184,24 @@ class RemoteControllerTelnet : public BaseRemoteController {
void process(long);
- void dispatch_command(boost::asio::ip::tcp::socket& socket,
- std::string command);
-
- void reply(boost::asio::ip::tcp::socket& socket, std::string message);
-
- void handle_accept(
- const boost::system::error_code& boost_error,
- boost::shared_ptr< boost::asio::ip::tcp::socket > socket,
- boost::asio::ip::tcp::acceptor& acceptor);
+ void dispatch_command(Socket::TCPSocket& socket, std::string command);
+ void reply(Socket::TCPSocket& socket, std::string message);
+ void handle_accept(Socket::TCPSocket&& socket);
std::atomic<bool> m_active;
- boost::asio::io_service m_io_service;
-
/* This is set to true if a fault occurred */
std::atomic<bool> m_fault;
std::thread m_restarter_thread;
std::thread m_child_thread;
+ Socket::TCPSocket m_socket;
int m_port;
};
-#if defined(HAVE_RC_ZEROMQ)
-/* Implements a Remote controller using zmq transportlayer
+#if defined(HAVE_ZEROMQ)
+/* Implements a Remote controller using ZMQ transportlayer
* that listens on localhost
*/
class RemoteControllerZmq : public BaseRemoteController {
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
new file mode 100644
index 0000000..cd70a8e
--- /dev/null
+++ b/lib/Socket.cpp
@@ -0,0 +1,898 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#include "Socket.h"
+
+#include <iostream>
+#include <cstdio>
+#include <cstring>
+#include <cerrno>
+#include <fcntl.h>
+#include <poll.h>
+
+namespace Socket {
+
+using namespace std;
+
+void InetAddress::resolveUdpDestination(const std::string& destination, int port)
+{
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(destination.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ // Take the first result
+ memcpy(&addr, rp->ai_addr, rp->ai_addrlen);
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not resolve");
+ }
+}
+
+UDPPacket::UDPPacket() { }
+
+UDPPacket::UDPPacket(size_t initSize) :
+ buffer(initSize)
+{ }
+
+
+UDPSocket::UDPSocket() :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(0, "");
+}
+
+UDPSocket::UDPSocket(int port) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, "");
+}
+
+UDPSocket::UDPSocket(int port, const std::string& name) :
+ m_sock(INVALID_SOCKET)
+{
+ reinit(port, name);
+}
+
+
+void UDPSocket::setBlocking(bool block)
+{
+ int res = fcntl(m_sock, F_SETFL, block ? 0 : O_NONBLOCK);
+ if (res == -1) {
+ throw runtime_error(string("Can't change blocking state of socket: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::reinit(int port)
+{
+ return reinit(port, "");
+}
+
+void UDPSocket::reinit(int port, const std::string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ if (port == 0) {
+ // No need to bind to a given port, creating the
+ // socket is enough
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ return;
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_DGRAM; /* Datagram socket */
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0; /* Any protocol */
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(),
+ port == 0 ? nullptr : service,
+ &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void UDPSocket::close()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ m_sock = INVALID_SOCKET;
+}
+
+UDPSocket::~UDPSocket()
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+}
+
+
+UDPPacket UDPSocket::receive(size_t max_size)
+{
+ UDPPacket packet(max_size);
+ socklen_t addrSize;
+ addrSize = sizeof(*packet.address.as_sockaddr());
+ ssize_t ret = recvfrom(m_sock,
+ packet.buffer.data(),
+ packet.buffer.size(),
+ 0,
+ packet.address.as_sockaddr(),
+ &addrSize);
+
+ if (ret == SOCKET_ERROR) {
+ packet.buffer.resize(0);
+
+ // This suppresses the -Wlogical-op warning
+#if EAGAIN == EWOULDBLOCK
+ if (errno == EAGAIN) {
+#else
+ if (errno == EAGAIN or errno == EWOULDBLOCK) {
+#endif
+ return 0;
+ }
+ throw runtime_error(string("Can't receive data: ") + strerror(errno));
+ }
+
+ packet.buffer.resize(ret);
+ return packet;
+}
+
+void UDPSocket::send(UDPPacket& packet)
+{
+ const int ret = sendto(m_sock, packet.buffer.data(), packet.buffer.size(), 0,
+ packet.address.as_sockaddr(), sizeof(*packet.address.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+
+void UDPSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
+{
+ const int ret = sendto(m_sock, data.data(), data.size(), 0,
+ destination.as_sockaddr(), sizeof(*destination.as_sockaddr()));
+ if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
+ throw runtime_error(string("Can't send UDP packet: ") + strerror(errno));
+ }
+}
+
+void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+{
+ ip_mreqn group;
+ if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
+ throw runtime_error("Cannot convert multicast group name");
+ }
+ if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
+ throw runtime_error("Group name is not a multicast address");
+ }
+
+ if (if_addr) {
+ group.imr_address.s_addr = inet_addr(if_addr);
+ }
+ else {
+ group.imr_address.s_addr = htons(INADDR_ANY);
+ }
+ group.imr_ifindex = 0;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastSource(const char* source_addr)
+{
+ struct in_addr addr;
+ if (inet_aton(source_addr, &addr) == 0) {
+ throw runtime_error(string("Can't parse source address") + strerror(errno));
+ }
+
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set source address") + strerror(errno));
+ }
+}
+
+void UDPSocket::setMulticastTTL(int ttl)
+{
+ if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
+ == SOCKET_ERROR) {
+ throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
+ }
+}
+
+UDPReceiver::UDPReceiver() { }
+
+UDPReceiver::~UDPReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
+ m_port = port;
+ m_bindto = bindto;
+ m_mcastaddr = mcastaddr;
+ m_max_packets_queued = max_packets_queued;
+ m_thread = std::thread(&UDPReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UDPReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UDPPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.buffer;
+}
+
+void UDPReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
+ m_sock.reinit(m_port, m_mcastaddr);
+ m_sock.setMulticastSource(m_bindto.c_str());
+ m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ }
+ else {
+ m_sock.reinit(m_port, m_bindto);
+ }
+
+ while (not m_stop) {
+ constexpr size_t packsize = 8192;
+ try {
+ auto packet = m_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+
+ // If this blocks, the UDP socket will lose incoming packets
+ m_packets.push_wait_if_full(packet, m_max_packets_queued);
+ }
+ catch (const std::runtime_error& e) {
+ // TODO replace fprintf
+ // TODO handle intr
+ fprintf(stderr, "Socket error: %s\n", e.what());
+ m_stop = true;
+ }
+ }
+}
+
+
+TCPSocket::TCPSocket()
+{
+}
+
+TCPSocket::~TCPSocket()
+{
+ if (m_sock != -1) {
+ ::close(m_sock);
+ }
+}
+
+TCPSocket::TCPSocket(TCPSocket&& other) :
+ m_sock(other.m_sock),
+ m_remote_address(move(other.m_remote_address))
+{
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+}
+
+TCPSocket& TCPSocket::operator=(TCPSocket&& other)
+{
+ swap(m_remote_address, other.m_remote_address);
+
+ m_sock = other.m_sock;
+ if (other.m_sock != -1) {
+ other.m_sock = -1;
+ }
+
+ return *this;
+}
+
+bool TCPSocket::valid() const
+{
+ return m_sock != -1;
+}
+
+void TCPSocket::connect(const std::string& hostname, int port)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only connect an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ /* Obtain address(es) matching host/port */
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = 0;
+ hints.ai_protocol = 0;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(hostname.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully connect(2).
+ If socket(2) (or connect(2)) fails, we (close the socket
+ and) try the next address. */
+
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype,
+ rp->ai_protocol);
+ if (sfd == -1)
+ continue;
+
+ int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen);
+ if (ret != -1 or (ret == -1 and errno == EINPROGRESS)) {
+ // As the TCPClient could set the socket to nonblocking, we
+ // must handle EINPROGRESS here
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
+ == SOCKET_ERROR) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+ }
+
+ freeaddrinfo(result); /* No longer needed */
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not connect");
+ }
+
+}
+
+void TCPSocket::listen(int port, const string& name)
+{
+ if (m_sock != INVALID_SOCKET) {
+ throw std::logic_error("You may only listen with an invalid TCPSocket");
+ }
+
+ char service[NI_MAXSERV];
+ snprintf(service, NI_MAXSERV-1, "%d", port);
+
+ struct addrinfo hints;
+ memset(&hints, 0, sizeof(struct addrinfo));
+ hints.ai_family = AF_INET;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
+ hints.ai_protocol = 0;
+ hints.ai_canonname = nullptr;
+ hints.ai_addr = nullptr;
+ hints.ai_next = nullptr;
+
+ struct addrinfo *result, *rp;
+ int s = getaddrinfo(name.empty() ? nullptr : name.c_str(), service, &hints, &result);
+ if (s != 0) {
+ throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s));
+ }
+
+ /* getaddrinfo() returns a list of address structures.
+ Try each address until we successfully bind(2).
+ If socket(2) (or bind(2)) fails, we (close the socket
+ and) try the next address. */
+ for (rp = result; rp != nullptr; rp = rp->ai_next) {
+ int sfd = ::socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+ if (sfd == -1) {
+ continue;
+ }
+
+ if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+ m_sock = sfd;
+ break;
+ }
+
+ ::close(sfd);
+ }
+
+ freeaddrinfo(result);
+
+ if (m_sock != INVALID_SOCKET) {
+#if defined(HAVE_SO_NOSIGPIPE)
+ int val = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE,
+ &val, sizeof(val)) < 0) {
+ throw std::runtime_error("Can't set SO_NOSIGPIPE");
+ }
+#endif
+
+ int ret = ::listen(m_sock, 0);
+ if (ret == -1) {
+ throw std::runtime_error(string("Could not listen: ") + strerror(errno));
+ }
+ }
+
+ if (rp == nullptr) {
+ throw runtime_error("Could not bind");
+ }
+}
+
+void TCPSocket::close()
+{
+ ::close(m_sock);
+ m_sock = -1;
+}
+
+TCPSocket TCPSocket::accept(int timeout_ms)
+{
+ if (timeout_ms == 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP Socket accept error: " + errstr);
+ }
+ else if (retval > 0) {
+ InetAddress remote_addr;
+ socklen_t client_len = sizeof(remote_addr.addr);
+ int sockfd = ::accept(m_sock, remote_addr.as_sockaddr(), &client_len);
+ TCPSocket s(sockfd, remote_addr);
+ return s;
+ }
+ else {
+ TCPSocket s(-1);
+ return s;
+ }
+ }
+}
+
+ssize_t TCPSocket::sendall(const void *buffer, size_t buflen)
+{
+ uint8_t *buf = (uint8_t*)buffer;
+ while (buflen > 0) {
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process
+ * would not receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the
+ * same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ ssize_t sent = ::send(m_sock, buf, buflen, flags);
+ if (sent < 0) {
+ return -1;
+ }
+ else {
+ buf += sent;
+ buflen -= sent;
+ }
+ }
+ return buflen;
+}
+
+ssize_t TCPSocket::send(const void* data, size_t size, int timeout_ms)
+{
+ if (timeout_ms) {
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLOUT;
+
+ const int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1) {
+ throw std::runtime_error(string("TCP Socket send error on poll(): ") + strerror(errno));
+ }
+ else if (retval == 0) {
+ // Timed out
+ return 0;
+ }
+ }
+
+ /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
+ * receive a SIGPIPE and die.
+ * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
+#if defined(HAVE_MSG_NOSIGNAL)
+ const int flags = MSG_NOSIGNAL;
+#else
+ const int flags = 0;
+#endif
+ const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
+
+ if (ret == SOCKET_ERROR) {
+ throw std::runtime_error(string("TCP Socket send error: ") + strerror(errno));
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags)
+{
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive error: " + errstr);
+ }
+ return ret;
+}
+
+ssize_t TCPSocket::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ struct pollfd fds[1];
+ fds[0].fd = m_sock;
+ fds[0].events = POLLIN;
+
+ int retval = poll(fds, 1, timeout_ms);
+
+ if (retval == -1 and errno == EINTR) {
+ throw Interrupted();
+ }
+ else if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive with poll() error: " + errstr);
+ }
+ else if (retval > 0 and (fds[0].revents | POLLIN)) {
+ ssize_t ret = ::recv(m_sock, buffer, length, flags);
+ if (ret == -1) {
+ if (errno == ECONNREFUSED) {
+ return 0;
+ }
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP receive after poll() error: " + errstr);
+ }
+ return ret;
+ }
+ else {
+ throw Timeout();
+ }
+}
+
+TCPSocket::TCPSocket(int sockfd) :
+ m_sock(sockfd),
+ m_remote_address()
+{ }
+
+TCPSocket::TCPSocket(int sockfd, InetAddress remote_address) :
+ m_sock(sockfd),
+ m_remote_address(remote_address)
+{ }
+
+void TCPClient::connect(const std::string& hostname, int port)
+{
+ m_hostname = hostname;
+ m_port = port;
+ reconnect();
+}
+
+ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
+{
+ try {
+ ssize_t ret = m_sock.recv(buffer, length, flags, timeout_ms);
+
+ if (ret == 0) {
+ m_sock.close();
+
+ TCPSocket newsock;
+ m_sock = std::move(newsock);
+ reconnect();
+ }
+
+ return ret;
+ }
+ catch (const TCPSocket::Interrupted&) {
+ return -1;
+ }
+ catch (const TCPSocket::Timeout&) {
+ return 0;
+ }
+
+ return 0;
+}
+
+void TCPClient::reconnect()
+{
+ int flags = fcntl(m_sock.m_sock, F_GETFL);
+ if (fcntl(m_sock.m_sock, F_SETFL, flags | O_NONBLOCK) == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);
+ }
+
+ m_sock.connect(m_hostname, m_port);
+}
+
+TCPConnection::TCPConnection(TCPSocket&& sock) :
+ queue(),
+ m_running(true),
+ m_sender_thread(),
+ m_sock(move(sock))
+{
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "New TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+ m_sender_thread = std::thread(&TCPConnection::process, this);
+}
+
+TCPConnection::~TCPConnection()
+{
+ m_running = false;
+ vector<uint8_t> termination_marker;
+ queue.push(termination_marker);
+ m_sender_thread.join();
+}
+
+void TCPConnection::process()
+{
+ while (m_running) {
+ vector<uint8_t> data;
+ queue.wait_and_pop(data);
+
+ if (data.empty()) {
+ // empty vector is the termination marker
+ m_running = false;
+ break;
+ }
+
+ try {
+ ssize_t remaining = data.size();
+ const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
+ const int timeout_ms = 10; // Less than one ETI frame
+
+ while (m_running and remaining > 0) {
+ const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
+ if (sent < 0 or sent > remaining) {
+ throw std::logic_error("Invalid TCPSocket::send() return value");
+ }
+ remaining -= sent;
+ buf += sent;
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_running = false;
+ }
+ }
+
+#if MISSING_OWN_ADDR
+ auto own_addr = m_sock.getOwnAddress();
+ auto addr = m_sock.getRemoteAddress();
+ etiLog.level(debug) << "Dropping TCP Connection on port " <<
+ own_addr.getPort() << " from " <<
+ addr.getHostAddress() << ":" << addr.getPort();
+#endif
+}
+
+
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
+ m_max_queue_size(max_queue_size)
+{
+}
+
+TCPDataDispatcher::~TCPDataDispatcher()
+{
+ m_running = false;
+ m_connections.clear();
+ m_listener_socket.close();
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+void TCPDataDispatcher::start(int port, const string& address)
+{
+ m_listener_socket.listen(port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
+}
+
+void TCPDataDispatcher::write(const vector<uint8_t>& data)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ for (auto& connection : m_connections) {
+ connection.queue.push(data);
+ }
+
+ m_connections.remove_if(
+ [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+}
+
+void TCPDataDispatcher::process()
+{
+ try {
+ const int timeout_ms = 1000;
+
+ while (m_running) {
+ // Add a new TCPConnection to the list, constructing it from the client socket
+ auto sock = m_listener_socket.accept(timeout_ms);
+ if (sock.valid()) {
+ m_connections.emplace(m_connections.begin(), move(sock));
+ }
+ }
+ }
+ catch (const std::runtime_error& e) {
+ m_exception_data = string("TCPDataDispatcher error: ") + e.what();
+ m_running = false;
+ }
+}
+
+TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
+ m_blocksize(blocksize)
+{
+}
+
+void TCPReceiveServer::start(int listen_port, const std::string& address)
+{
+ m_listener_socket.listen(listen_port, address);
+
+ m_running = true;
+ m_listener_thread = std::thread(&TCPReceiveServer::process, this);
+}
+
+TCPReceiveServer::~TCPReceiveServer()
+{
+ m_running = false;
+ if (m_listener_thread.joinable()) {
+ m_listener_thread.join();
+ }
+}
+
+vector<uint8_t> TCPReceiveServer::receive()
+{
+ vector<uint8_t> buffer;
+ m_queue.try_pop(buffer);
+
+ // we can ignore try_pop()'s return value, because
+ // if it is unsuccessful the buffer is not touched.
+ return buffer;
+}
+
+void TCPReceiveServer::process()
+{
+ constexpr int timeout_ms = 1000;
+ constexpr int disconnect_timeout_ms = 10000;
+ constexpr int max_num_timeouts = disconnect_timeout_ms / timeout_ms;
+
+ while (m_running) {
+ auto sock = m_listener_socket.accept(timeout_ms);
+
+ int num_timeouts = 0;
+
+ while (m_running and sock.valid()) {
+ try {
+ vector<uint8_t> buf(m_blocksize);
+ ssize_t r = sock.recv(buf.data(), buf.size(), 0, timeout_ms);
+ if (r < 0) {
+ throw logic_error("Invalid recv return value");
+ }
+ else if (r == 0) {
+ sock.close();
+ break;
+ }
+ else {
+ buf.resize(r);
+ m_queue.push(move(buf));
+ }
+ }
+ catch (const TCPSocket::Interrupted&) {
+ break;
+ }
+ catch (const TCPSocket::Timeout&) {
+ num_timeouts++;
+ }
+
+ if (num_timeouts > max_num_timeouts) {
+ sock.close();
+ }
+ }
+ }
+}
+
+}
diff --git a/lib/Socket.h b/lib/Socket.h
new file mode 100644
index 0000000..8bb7fe1
--- /dev/null
+++ b/lib/Socket.h
@@ -0,0 +1,294 @@
+/*
+ Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
+ Queen in Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "ThreadsafeQueue.h"
+#include <cstdlib>
+#include <iostream>
+#include <vector>
+#include <atomic>
+#include <thread>
+#include <list>
+
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#define SOCKET int
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
+
+
+namespace Socket {
+
+struct InetAddress {
+ struct sockaddr_storage addr;
+
+ struct sockaddr *as_sockaddr() { return reinterpret_cast<sockaddr*>(&addr); };
+
+ void resolveUdpDestination(const std::string& destination, int port);
+};
+
+/** This class represents a UDP packet.
+ *
+ * A UDP packet contains a payload (sequence of bytes) and an address. For
+ * outgoing packets, the address is the destination address. For incoming
+ * packets, the address tells the user from what source the packet arrived from.
+ */
+class UDPPacket
+{
+ public:
+ UDPPacket();
+ UDPPacket(size_t initSize);
+
+ std::vector<uint8_t> buffer;
+ InetAddress address;
+};
+
+/**
+ * This class represents a socket for sending and receiving UDP packets.
+ *
+ * A UDP socket is the sending or receiving point for a packet delivery service.
+ * Each packet sent or received on a datagram socket is individually
+ * addressed and routed. Multiple packets sent from one machine to another may
+ * be routed differently, and may arrive in any order.
+ */
+class UDPSocket
+{
+ public:
+ /** Create a new socket that will not be bound to any port. To be used
+ * for data output.
+ */
+ UDPSocket();
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ */
+ UDPSocket(int port);
+ /** Create a new socket.
+ * @param port The port number on which the socket will be bound
+ * @param name The IP address on which the socket will be bound.
+ * It is used to bind the socket on a specific interface if
+ * the computer have many NICs.
+ */
+ UDPSocket(int port, const std::string& name);
+ ~UDPSocket();
+ UDPSocket(const UDPSocket& other) = delete;
+ const UDPSocket& operator=(const UDPSocket& other) = delete;
+
+ /** Close the already open socket, and create a new one. Throws a runtime_error on error. */
+ void reinit(int port);
+ void reinit(int port, const std::string& name);
+
+ void close(void);
+ void send(UDPPacket& packet);
+ void send(const std::vector<uint8_t>& data, InetAddress destination);
+ UDPPacket receive(size_t max_size);
+ void joinGroup(const char* groupname, const char* if_addr = nullptr);
+ void setMulticastSource(const char* source_addr);
+ void setMulticastTTL(int ttl);
+
+ /** Set blocking mode. By default, the socket is blocking.
+ * throws a runtime_error on error.
+ */
+ void setBlocking(bool block);
+
+ protected:
+ SOCKET m_sock;
+};
+
+/* Threaded UDP receiver */
+class UDPReceiver {
+ public:
+ UDPReceiver();
+ ~UDPReceiver();
+ UDPReceiver(const UDPReceiver&) = delete;
+ UDPReceiver operator=(const UDPReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port, const std::string& bindto, const std::string& mcastaddr, size_t max_packets_queued);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port = 0;
+ std::string m_bindto;
+ std::string m_mcastaddr;
+ size_t m_max_packets_queued = 1;
+ std::thread m_thread;
+ std::atomic<bool> m_stop = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<UDPPacket> m_packets;
+ UDPSocket m_sock;
+};
+
+class TCPSocket {
+ public:
+ TCPSocket();
+ ~TCPSocket();
+ TCPSocket(const TCPSocket& other) = delete;
+ TCPSocket& operator=(const TCPSocket& other) = delete;
+ TCPSocket(TCPSocket&& other);
+ TCPSocket& operator=(TCPSocket&& other);
+
+ bool valid(void) const;
+ void connect(const std::string& hostname, int port);
+ void listen(int port, const std::string& name);
+ void close(void);
+
+ /* throws a runtime_error on failure, an invalid socket on timeout */
+ TCPSocket accept(int timeout_ms);
+
+ /* returns -1 on error, doesn't work on nonblocking sockets */
+ ssize_t sendall(const void *buffer, size_t buflen);
+
+ /** Send data over the TCP connection.
+ * @param data The buffer that will be sent.
+ * @param size Number of bytes to send.
+ * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
+ * return number of bytes sent, 0 on timeout, or throws runtime_error.
+ */
+ ssize_t send(const void* data, size_t size, int timeout_ms=0);
+
+ /* Returns number of bytes read, 0 on disconnect. Throws a
+ * runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags);
+
+ class Timeout {};
+ class Interrupted {};
+ /* Returns number of bytes read, 0 on disconnect or refused connection.
+ * Throws a Timeout on timeout, Interrupted on EINTR, a runtime_error
+ * on error
+ */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ explicit TCPSocket(int sockfd);
+ explicit TCPSocket(int sockfd, InetAddress remote_address);
+ SOCKET m_sock = -1;
+
+ InetAddress m_remote_address;
+
+ friend class TCPClient;
+};
+
+/* Implements a TCP receiver that auto-reconnects on errors */
+class TCPClient {
+ public:
+ void connect(const std::string& hostname, int port);
+
+ /* Returns numer of bytes read, 0 on auto-reconnect, -1
+ * on interruption.
+ * Throws a runtime_error on error */
+ ssize_t recv(void *buffer, size_t length, int flags, int timeout_ms);
+
+ private:
+ void reconnect(void);
+ TCPSocket m_sock;
+ std::string m_hostname;
+ int m_port;
+};
+
+/* Helper class for TCPDataDispatcher, contains a queue of pending data and
+ * a sender thread. */
+class TCPConnection
+{
+ public:
+ TCPConnection(TCPSocket&& sock);
+ TCPConnection(const TCPConnection&) = delete;
+ TCPConnection& operator=(const TCPConnection&) = delete;
+ ~TCPConnection();
+
+ ThreadsafeQueue<std::vector<uint8_t> > queue;
+
+ private:
+ std::atomic<bool> m_running;
+ std::thread m_sender_thread;
+ TCPSocket m_sock;
+
+ void process(void);
+};
+
+/* Send a TCP stream to several destinations, and automatically disconnect destinations
+ * whose buffer overflows.
+ */
+class TCPDataDispatcher
+{
+ public:
+ TCPDataDispatcher(size_t max_queue_size);
+ ~TCPDataDispatcher();
+ TCPDataDispatcher(const TCPDataDispatcher&) = delete;
+ TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
+
+ void start(int port, const std::string& address);
+ void write(const std::vector<uint8_t>& data);
+
+ private:
+ void process();
+
+ size_t m_max_queue_size;
+
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+ std::list<TCPConnection> m_connections;
+};
+
+/* A TCP Server to receive data, which abstracts the handling of connects and disconnects.
+ */
+class TCPReceiveServer {
+ public:
+ TCPReceiveServer(size_t blocksize);
+ ~TCPReceiveServer();
+ TCPReceiveServer(const TCPReceiveServer&) = delete;
+ TCPReceiveServer& operator=(const TCPReceiveServer&) = delete;
+
+ void start(int listen_port, const std::string& address);
+
+ // Return a vector that contains up to blocksize bytes of data, or
+ // and empty vector if no data is available.
+ std::vector<uint8_t> receive();
+
+ private:
+ void process();
+
+ size_t m_blocksize = 0;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_listener_thread;
+ TCPSocket m_listener_socket;
+};
+
+}
diff --git a/src/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index ab287b2..62f4c96 100644
--- a/src/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -12,20 +12,18 @@
element out.
*/
/*
- This file is part of ODR-DabMux.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
diff --git a/src/crc.c b/lib/crc.c
index cc02473..cc02473 100644
--- a/src/crc.c
+++ b/lib/crc.c
diff --git a/src/crc.h b/lib/crc.h
index b1785a1..b1785a1 100644
--- a/src/crc.h
+++ b/lib/crc.h
diff --git a/lib/edi/PFT.cpp b/lib/edi/PFT.cpp
new file mode 100644
index 0000000..aff7929
--- /dev/null
+++ b/lib/edi/PFT.cpp
@@ -0,0 +1,574 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson
+ * Copyright (C) 2017 Matthias P. Braendli
+ * matthias.braendli@mpb.li
+ *
+ * http://opendigitalradio.org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#include <stdio.h>
+#include <cassert>
+#include <cstring>
+#include <sstream>
+#include <stdexcept>
+#include <algorithm>
+#include "crc.h"
+#include "PFT.hpp"
+#include "Log.h"
+#include "buffer_unpack.hpp"
+extern "C" {
+#include "fec/fec.h"
+}
+
+namespace EdiDecoder {
+namespace PFT {
+
+using namespace std;
+
+const findex_t NUM_AFBUILDERS_TO_KEEP = 10;
+
+static bool checkCRC(const uint8_t *buf, size_t size)
+{
+ const uint16_t crc_from_packet = read_16b(buf + size - 2);
+ uint16_t crc_calc = 0xffff;
+ crc_calc = crc16(crc_calc, buf, size - 2);
+ crc_calc ^= 0xffff;
+
+ return crc_from_packet == crc_calc;
+}
+
+class FECDecoder {
+ public:
+ FECDecoder() {
+ m_rs_handler = init_rs_char(
+ symsize, gfPoly, firstRoot, primElem, nroots, pad);
+ }
+ FECDecoder(const FECDecoder& other) = delete;
+ FECDecoder& operator=(const FECDecoder& other) = delete;
+ ~FECDecoder() {
+ free_rs_char(m_rs_handler);
+ }
+
+ // return -1 in case of failure, non-negative value if errors
+ // were corrected.
+ // Known positions of erasures should be given in eras_pos to
+ // improve decoding probability. After calling this function
+ // eras_pos will contain the positions of the corrected errors.
+ int decode(vector<uint8_t> &data, vector<int> &eras_pos) {
+ assert(data.size() == N);
+ const size_t no_eras = eras_pos.size();
+
+ eras_pos.resize(nroots);
+ int num_err = decode_rs_char(m_rs_handler, data.data(),
+ eras_pos.data(), no_eras);
+ if (num_err > 0) {
+ eras_pos.resize(num_err);
+ }
+ return num_err;
+ }
+
+ // return -1 in case of failure, non-negative value if errors
+ // were corrected. No known erasures.
+ int decode(vector<uint8_t> &data) {
+ assert(data.size() == N);
+ int num_err = decode_rs_char(m_rs_handler, data.data(), nullptr, 0);
+ return num_err;
+ }
+
+ private:
+ void* m_rs_handler;
+
+ const int firstRoot = 1; // Discovered by analysing EDI dump
+ const int gfPoly = 0x11d;
+
+ // The encoding has to be 255, 207 always, because the chunk has to
+ // be padded at the end, and not at the beginning as libfec would
+ // do
+ const size_t N = 255;
+ const size_t K = 207;
+ const int primElem = 1;
+ const int symsize = 8;
+ const size_t nroots = N - K; // For EDI PFT, this must be 48
+ const size_t pad = ((1 << symsize) - 1) - N; // is 255-N
+
+};
+
+size_t Fragment::loadData(const std::vector<uint8_t> &buf)
+{
+ const size_t header_len = 14;
+ if (buf.size() < header_len) {
+ return 0;
+ }
+
+ size_t index = 0;
+
+ // Parse PFT Fragment Header (ETSI TS 102 821 V1.4.1 ch7.1)
+ if (not (buf[0] == 'P' and buf[1] == 'F') ) {
+ throw invalid_argument("Invalid PFT SYNC bytes");
+ }
+ index += 2; // Psync
+
+ _Pseq = read_16b(buf.begin()+index); index += 2;
+ _Findex = read_24b(buf.begin()+index); index += 3;
+ _Fcount = read_24b(buf.begin()+index); index += 3;
+ _FEC = unpack1bit(buf[index], 0);
+ _Addr = unpack1bit(buf[index], 1);
+ _Plen = read_16b(buf.begin()+index) & 0x3FFF; index += 2;
+
+ const size_t required_len = header_len +
+ (_FEC ? 1 : 0) +
+ (_Addr ? 2 : 0) +
+ 2; // CRC
+ if (buf.size() < required_len) {
+ return 0;
+ }
+
+ // Optional RS Header
+ _RSk = 0;
+ _RSz = 0;
+ if (_FEC) {
+ _RSk = buf[index]; index += 1;
+ _RSz = buf[index]; index += 1;
+ }
+
+ // Optional transport header
+ _Source = 0;
+ _Dest = 0;
+ if (_Addr) {
+ _Source = read_16b(buf.begin()+index); index += 2;
+ _Dest = read_16b(buf.begin()+index); index += 2;
+ }
+
+ index += 2;
+ const bool crc_valid = checkCRC(buf.data(), index);
+ const bool buf_has_enough_data = (buf.size() >= index + _Plen);
+
+ if (not buf_has_enough_data) {
+ return 0;
+ }
+
+ _valid = ((not _FEC) or crc_valid) and buf_has_enough_data;
+
+#if 0
+ if (!_valid) {
+ stringstream ss;
+ ss << "Invalid PF fragment: ";
+ if (_FEC) {
+ ss << " RSk=" << (uint32_t)_RSk << " RSz=" << (uint32_t)_RSz;
+ }
+
+ if (_Addr) {
+ ss << " Source=" << _Source << " Dest=" << _Dest;
+ }
+ etiLog.log(debug, "%s\n", ss.str().c_str());
+ }
+#endif
+
+ _payload.clear();
+ if (_valid) {
+ copy( buf.begin()+index,
+ buf.begin()+index+_Plen,
+ back_inserter(_payload));
+ index += _Plen;
+ }
+
+ return index;
+}
+
+
+AFBuilder::AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime)
+{
+ _Pseq = Pseq;
+ _Fcount = Fcount;
+ assert(lifetime > 0);
+ lifeTime = lifetime;
+}
+
+void AFBuilder::pushPFTFrag(const Fragment &frag)
+{
+ if (_Pseq != frag.Pseq() or _Fcount != frag.Fcount()) {
+ throw invalid_argument("Invalid PFT fragment Pseq or Fcount");
+ }
+ const auto Findex = frag.Findex();
+ const bool fragment_already_received = _fragments.count(Findex);
+
+ if (not fragment_already_received)
+ {
+ _fragments[Findex] = frag;
+ }
+}
+
+bool Fragment::checkConsistency(const Fragment& other) const
+{
+ /* Consistency check, TS 102 821 Clause 7.3.2.
+ *
+ * Every PFT Fragment produced from a single AF or RS Packet shall have
+ * the same values in all of the PFT Header fields except for the Findex,
+ * Plen and HCRC fields.
+ */
+
+ return other._Fcount == _Fcount and
+ other._FEC == _FEC and
+ other._RSk == _RSk and
+ other._RSz == _RSz and
+ other._Addr == _Addr and
+ other._Source == _Source and
+ other._Dest == _Dest and
+
+ /* The Plen field of all fragments shall be the s for the initial f-1
+ * fragments and s - (L%f) for the final fragment.
+ * Note that when Reed Solomon has been used, all fragments will be of
+ * length s.
+ */
+ (_FEC ? other._Plen == _Plen : true);
+}
+
+
+AFBuilder::decode_attempt_result_t AFBuilder::canAttemptToDecode() const
+{
+ if (_fragments.empty()) {
+ return AFBuilder::decode_attempt_result_t::no;
+ }
+
+ if (_fragments.size() == _Fcount) {
+ return AFBuilder::decode_attempt_result_t::yes;
+ }
+
+ /* Check that all fragments are consistent */
+ const Fragment& first = _fragments.begin()->second;
+ if (not std::all_of(_fragments.begin(), _fragments.end(),
+ [&](const pair<int, Fragment>& pair) {
+ const Fragment& frag = pair.second;
+ return first.checkConsistency(frag) and _Pseq == frag.Pseq();
+ }) ) {
+ throw invalid_argument("Inconsistent PFT fragments");
+ }
+
+ // Calculate the minimum number of fragments necessary to apply FEC.
+ // This can't be done with the last fragment that may have a
+ // smaller size
+ // ETSI TS 102 821 V1.4.1 ch 7.4.4
+ auto frag_it = _fragments.begin();
+ if (frag_it->second.Fcount() == _Fcount - 1) {
+ frag_it++;
+
+ if (frag_it == _fragments.end()) {
+ return AFBuilder::decode_attempt_result_t::no;
+ }
+ }
+
+ const Fragment& frag = frag_it->second;
+
+ if ( frag.FEC() )
+ {
+ const uint16_t _Plen = frag.Plen();
+
+ /* max number of RS chunks that may have been sent */
+ const uint32_t _cmax = (_Fcount*_Plen) / (frag.RSk()+48);
+ assert(_cmax > 0);
+
+ /* Receiving _rxmin fragments does not guarantee that decoding
+ * will succeed! */
+ const uint32_t _rxmin = _Fcount - (_cmax*48)/_Plen;
+
+ if (_fragments.size() >= _rxmin) {
+ return AFBuilder::decode_attempt_result_t::maybe;
+ }
+ }
+
+ return AFBuilder::decode_attempt_result_t::no;
+}
+
+std::vector<uint8_t> AFBuilder::extractAF() const
+{
+ if (not _af_packet.empty()) {
+ return _af_packet;
+ }
+
+ bool ok = false;
+
+ if (canAttemptToDecode() != AFBuilder::decode_attempt_result_t::no) {
+
+ auto frag_it = _fragments.begin();
+ if (frag_it->second.Fcount() == _Fcount - 1) {
+ frag_it++;
+
+ if (frag_it == _fragments.end()) {
+ throw std::runtime_error("Invalid attempt at extracting AF");
+ }
+ }
+
+ const Fragment& ref_frag = frag_it->second;
+ const auto RSk = ref_frag.RSk();
+ const auto RSz = ref_frag.RSz();
+ const auto Plen = ref_frag.Plen();
+
+ if ( ref_frag.FEC() )
+ {
+ const uint32_t cmax = (_Fcount*Plen) / (RSk+48);
+
+ // Keep track of erasures (missing fragments) for
+ // every chunk
+ map<int, vector<int> > erasures;
+
+
+ // Assemble fragments into a RS block, immediately
+ // deinterleaving it.
+ vector<uint8_t> rs_block(Plen * _Fcount);
+ for (size_t j = 0; j < _Fcount; j++) {
+ const bool fragment_present = _fragments.count(j);
+ if (fragment_present) {
+ const auto& fragment = _fragments.at(j).payload();
+
+ if (j != _Fcount - 1 and fragment.size() != Plen) {
+ throw runtime_error("Incorrect fragment length " +
+ to_string(fragment.size()) + " " +
+ to_string(Plen));
+ }
+
+ if (j == _Fcount - 1 and fragment.size() > Plen) {
+ throw runtime_error("Incorrect last fragment length " +
+ to_string(fragment.size()) + " " +
+ to_string(Plen));
+ }
+
+ size_t k = 0;
+ for (; k < fragment.size(); k++) {
+ rs_block[k * _Fcount + j] = fragment[k];
+ }
+
+ for (; k < Plen; k++) {
+ rs_block[k * _Fcount + j] = 0x00;
+ }
+ }
+ else {
+ // fill with zeros if fragment is missing
+ for (size_t k = 0; k < Plen; k++) {
+ rs_block[k * _Fcount + j] = 0x00;
+
+ const size_t chunk_ix = (k * _Fcount + j) / (RSk + 48);
+ const size_t chunk_offset = (k * _Fcount + j) % (RSk + 48);
+ erasures[chunk_ix].push_back(chunk_offset);
+ }
+ }
+ }
+
+ // The RS block is a concatenation of chunks of RSk bytes + 48 parity
+ // followed by RSz padding
+
+ FECDecoder fec;
+ for (size_t i = 0; i < cmax; i++) {
+ // We need to pad the chunk ourself
+ vector<uint8_t> chunk(255);
+ const auto& block_begin = rs_block.begin() + (RSk + 48) * i;
+ copy(block_begin, block_begin + RSk, chunk.begin());
+ // bytes between RSk and 207 are 0x00 already
+ copy(block_begin + RSk, block_begin + RSk + 48,
+ chunk.begin() + 207);
+
+ int errors_corrected = -1;
+ if (erasures.count(i)) {
+ errors_corrected = fec.decode(chunk, erasures[i]);
+ }
+ else {
+ errors_corrected = fec.decode(chunk);
+ }
+
+ if (errors_corrected == -1) {
+ _af_packet.clear();
+ return {};
+ }
+
+#if 0
+ if (errors_corrected > 0) {
+ etiLog.log(debug, "Corrected %d errors at ", errors_corrected);
+ for (const auto &index : erasures[i]) {
+ etiLog.log(debug, " %d", index);
+ }
+ etiLog.log(debug, "\n");
+ }
+#endif
+
+ _af_packet.insert(_af_packet.end(), chunk.begin(), chunk.begin() + RSk);
+ }
+
+ _af_packet.resize(_af_packet.size() - RSz);
+ }
+ else {
+ // No FEC: just assemble fragments
+
+ for (size_t j = 0; j < _Fcount; ++j) {
+ const bool fragment_present = _fragments.count(j);
+ if (fragment_present)
+ {
+ const auto& fragment = _fragments.at(j);
+
+ _af_packet.insert(_af_packet.end(),
+ fragment.payload().begin(),
+ fragment.payload().end());
+ }
+ else {
+ throw logic_error("Missing fragment");
+ }
+ }
+ }
+
+ // EDI specific, must have a CRC.
+ if( _af_packet.size() >= 12 ) {
+ ok = checkCRC(_af_packet.data(), _af_packet.size());
+
+ if (not ok) {
+ etiLog.log(debug, "Too many errors to reconstruct AF from %zu/%u"
+ " PFT fragments\n", _fragments.size(), _Fcount);
+ }
+ }
+ }
+
+ if (not ok) {
+ _af_packet.clear();
+ }
+
+ return _af_packet;
+}
+
+std::string AFBuilder::visualise() const
+{
+ stringstream ss;
+ ss << "|";
+ for (size_t i = 0; i < _Fcount; i++) {
+ if (_fragments.count(i)) {
+ ss << ".";
+ }
+ else {
+ ss << " ";
+ }
+ }
+ ss << "| " << AFBuilder::dar_to_string(canAttemptToDecode()) << " " << lifeTime;
+ return ss.str();
+}
+
+void PFT::pushPFTFrag(const Fragment &fragment)
+{
+ // Start decoding the first pseq we receive. In normal
+ // operation without interruptions, the map should
+ // never become empty
+ if (m_afbuilders.empty()) {
+ m_next_pseq = fragment.Pseq();
+ etiLog.log(debug,"Initialise next_pseq to %u\n", m_next_pseq);
+ }
+
+ if (m_afbuilders.count(fragment.Pseq()) == 0) {
+ // The AFBuilder wants to know the lifetime in number of fragments,
+ // we know the delay in number of AF packets. Every AF packet
+ // is cut into Fcount fragments.
+ const size_t lifetime = fragment.Fcount() * m_max_delay;
+
+ // Build the afbuilder in the map in-place
+ m_afbuilders.emplace(std::piecewise_construct,
+ /* key */
+ std::forward_as_tuple(fragment.Pseq()),
+ /* builder */
+ std::forward_as_tuple(fragment.Pseq(), fragment.Fcount(), lifetime));
+ }
+
+ auto& p = m_afbuilders.at(fragment.Pseq());
+ p.pushPFTFrag(fragment);
+
+ if (m_verbose) {
+ etiLog.log(debug, "Got frag %u:%u, afbuilders: ",
+ fragment.Pseq(), fragment.Findex());
+ for (const auto &k : m_afbuilders) {
+ const bool isNextPseq = (m_next_pseq == k.first);
+ etiLog.level(debug) << (isNextPseq ? "->" : " ") <<
+ k.first << " " << k.second.visualise();
+ }
+ }
+}
+
+
+std::vector<uint8_t> PFT::getNextAFPacket()
+{
+ if (m_afbuilders.count(m_next_pseq) == 0) {
+ if (m_afbuilders.size() > m_max_delay) {
+ m_afbuilders.clear();
+ etiLog.level(debug) << " Reinit";
+ }
+
+ return {};
+ }
+
+ auto &builder = m_afbuilders.at(m_next_pseq);
+
+ using dar_t = AFBuilder::decode_attempt_result_t;
+
+ if (builder.canAttemptToDecode() == dar_t::yes) {
+ auto afpacket = builder.extractAF();
+ assert(not afpacket.empty());
+ incrementNextPseq();
+ return afpacket;
+ }
+ else if (builder.canAttemptToDecode() == dar_t::maybe) {
+ if (builder.lifeTime > 0) {
+ builder.lifeTime--;
+ }
+
+ if (builder.lifeTime == 0) {
+ // Attempt Reed-Solomon decoding
+ auto afpacket = builder.extractAF();
+
+ if (afpacket.empty()) {
+ etiLog.log(debug,"pseq %d timed out after RS", m_next_pseq);
+ }
+ incrementNextPseq();
+ return afpacket;
+ }
+ }
+ else {
+ if (builder.lifeTime > 0) {
+ builder.lifeTime--;
+ }
+
+ if (builder.lifeTime == 0) {
+ etiLog.log(debug, "pseq %d timed out\n", m_next_pseq);
+ incrementNextPseq();
+ }
+ }
+
+ return {};
+}
+
+void PFT::setMaxDelay(size_t num_af_packets)
+{
+ m_max_delay = num_af_packets;
+}
+
+void PFT::setVerbose(bool enable)
+{
+ m_verbose = enable;
+}
+
+void PFT::incrementNextPseq()
+{
+ if (m_afbuilders.count(m_next_pseq - NUM_AFBUILDERS_TO_KEEP) > 0) {
+ m_afbuilders.erase(m_next_pseq - NUM_AFBUILDERS_TO_KEEP);
+ }
+
+ m_next_pseq++;
+}
+
+}
+}
diff --git a/lib/edi/PFT.hpp b/lib/edi/PFT.hpp
new file mode 100644
index 0000000..779509b
--- /dev/null
+++ b/lib/edi/PFT.hpp
@@ -0,0 +1,166 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson
+ * Copyright (C) 2017 Matthias P. Braendli
+ * matthias.braendli@mpb.li
+ *
+ * http://opendigitalradio.org
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#pragma once
+#include <stdio.h>
+#include <vector>
+#include <map>
+#include <stdint.h>
+
+namespace EdiDecoder {
+namespace PFT {
+
+using pseq_t = uint16_t;
+using findex_t = uint32_t; // findex is a 24-bit value
+
+class Fragment
+{
+ public:
+ // Load the data for one fragment from buf into
+ // the Fragment.
+ // \returns the number of bytes of useful data found in buf
+ // A non-zero return value doesn't imply a valid fragment
+ // the isValid() method must be used to verify this.
+ size_t loadData(const std::vector<uint8_t> &buf);
+
+ bool isValid() const { return _valid; }
+ pseq_t Pseq() const { return _Pseq; }
+ findex_t Findex() const { return _Findex; }
+ findex_t Fcount() const { return _Fcount; }
+ bool FEC() const { return _FEC; }
+ uint16_t Plen() const { return _Plen; }
+ uint8_t RSk() const { return _RSk; }
+ uint8_t RSz() const { return _RSz; }
+ const std::vector<uint8_t>& payload() const
+ { return _payload; }
+
+ bool checkConsistency(const Fragment& other) const;
+
+ private:
+ std::vector<uint8_t> _payload;
+
+ pseq_t _Pseq = 0;
+ findex_t _Findex = 0;
+ findex_t _Fcount = 0;
+ bool _FEC = false;
+ bool _Addr = false;
+ uint16_t _Plen = 0;
+ uint8_t _RSk = 0;
+ uint8_t _RSz = 0;
+ uint16_t _Source = 0;
+ uint16_t _Dest = 0;
+ bool _valid = false;
+};
+
+/* The AFBuilder collects Fragments and builds an Application Frame
+ * out of them. It does error correction if necessary
+ */
+class AFBuilder
+{
+ public:
+ enum class decode_attempt_result_t {
+ yes, // The AF packet can be build because all fragments are present
+ maybe, // RS decoding may correctly decode the AF packet
+ no, // Not enough fragments present to permit RS
+ };
+
+ static std::string dar_to_string(decode_attempt_result_t dar) {
+ switch (dar) {
+ case decode_attempt_result_t::yes: return "y";
+ case decode_attempt_result_t::no: return "n";
+ case decode_attempt_result_t::maybe: return "m";
+ }
+ return "?";
+ }
+
+ AFBuilder(pseq_t Pseq, findex_t Fcount, size_t lifetime);
+
+ void pushPFTFrag(const Fragment &frag);
+
+ /* Assess if it may be possible to decode this AF packet */
+ decode_attempt_result_t canAttemptToDecode() const;
+
+ /* Try to build the AF with received fragments.
+ * Apply error correction if necessary (missing packets/CRC errors)
+ * \return an empty vector if building the AF is not possible
+ */
+ std::vector<uint8_t> extractAF(void) const;
+
+ std::pair<findex_t, findex_t>
+ numberOfFragments(void) const {
+ return {_fragments.size(), _Fcount};
+ }
+
+ std::string visualise(void) const;
+
+ /* The user of this instance can keep track of the lifetime of this
+ * builder
+ */
+ size_t lifeTime;
+
+ private:
+
+ // A map from fragment index to fragment
+ std::map<findex_t, Fragment> _fragments;
+
+ // cached version of decoded AF packet
+ mutable std::vector<uint8_t> _af_packet;
+
+ pseq_t _Pseq;
+ findex_t _Fcount;
+};
+
+class PFT
+{
+ public:
+ void pushPFTFrag(const Fragment &fragment);
+
+ /* Try to build the AF packet for the next pseq. This might
+ * skip one or more pseq according to the maximum delay setting.
+ *
+ * \return an empty vector if building the AF is not possible
+ */
+ std::vector<uint8_t> getNextAFPacket(void);
+
+ /* Set the maximum delay in number of AF Packets before we
+ * abandon decoding a given pseq.
+ */
+ void setMaxDelay(size_t num_af_packets);
+
+ /* Enable verbose fprintf */
+ void setVerbose(bool enable);
+
+ private:
+ void incrementNextPseq(void);
+
+ pseq_t m_next_pseq;
+ size_t m_max_delay = 10; // in AF packets
+
+ // Keep one AFBuilder for each Pseq
+ std::map<pseq_t, AFBuilder> m_afbuilders;
+
+ bool m_verbose = 0;
+};
+
+}
+
+}
diff --git a/lib/edi/STIDecoder.cpp b/lib/edi/STIDecoder.cpp
new file mode 100644
index 0000000..ca8cead
--- /dev/null
+++ b/lib/edi/STIDecoder.cpp
@@ -0,0 +1,191 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "STIDecoder.hpp"
+#include "buffer_unpack.hpp"
+#include "crc.h"
+#include "Log.h"
+#include <stdio.h>
+#include <cassert>
+#include <sstream>
+
+namespace EdiDecoder {
+
+using namespace std;
+
+STIDecoder::STIDecoder(STIDataCollector& data_collector, bool verbose) :
+ m_data_collector(data_collector),
+ m_dispatcher(std::bind(&STIDecoder::packet_completed, this), verbose)
+{
+ using std::placeholders::_1;
+ using std::placeholders::_2;
+ m_dispatcher.register_tag("*ptr",
+ std::bind(&STIDecoder::decode_starptr, this, _1, _2));
+ m_dispatcher.register_tag("dsti",
+ std::bind(&STIDecoder::decode_dsti, this, _1, _2));
+ m_dispatcher.register_tag("ss",
+ std::bind(&STIDecoder::decode_ssn, this, _1, _2));
+ m_dispatcher.register_tag("*dmy",
+ std::bind(&STIDecoder::decode_stardmy, this, _1, _2));
+}
+
+void STIDecoder::push_bytes(const vector<uint8_t> &buf)
+{
+ m_dispatcher.push_bytes(buf);
+}
+
+void STIDecoder::push_packet(const vector<uint8_t> &buf)
+{
+ m_dispatcher.push_packet(buf);
+}
+
+void STIDecoder::setMaxDelay(int num_af_packets)
+{
+ m_dispatcher.setMaxDelay(num_af_packets);
+}
+
+#define AFPACKET_HEADER_LEN 10 // includes SYNC
+
+bool STIDecoder::decode_starptr(const vector<uint8_t> &value, uint16_t)
+{
+ if (value.size() != 0x40 / 8) {
+ etiLog.log(warn, "Incorrect length %02lx for *PTR", value.size());
+ return false;
+ }
+
+ char protocol_sz[5];
+ protocol_sz[4] = '\0';
+ copy(value.begin(), value.begin() + 4, protocol_sz);
+ string protocol(protocol_sz);
+
+ uint16_t major = read_16b(value.begin() + 4);
+ uint16_t minor = read_16b(value.begin() + 6);
+
+ m_data_collector.update_protocol(protocol, major, minor);
+
+ return true;
+}
+
+bool STIDecoder::decode_dsti(const vector<uint8_t> &value, uint16_t)
+{
+ size_t offset = 0;
+
+ const uint16_t dstiHeader = read_16b(value.begin() + offset);
+ offset += 2;
+
+ sti_management_data md;
+
+ md.stihf = (dstiHeader >> 15) & 0x1;
+ md.atstf = (dstiHeader >> 14) & 0x1;
+ md.rfadf = (dstiHeader >> 13) & 0x1;
+ uint8_t dfcth = (dstiHeader >> 8) & 0x1F;
+ uint8_t dfctl = dstiHeader & 0xFF;
+
+ md.dflc = dfcth * 250 + dfctl; // modulo 5000 counter
+
+ const size_t expected_length = 2 +
+ (md.stihf ? 3 : 0) +
+ (md.atstf ? 1 + 4 + 3 : 0) +
+ (md.rfadf ? 9 : 0);
+
+ if (value.size() != expected_length) {
+ throw std::logic_error("EDI dsti: Assertion error:"
+ "value.size() != expected_length: " +
+ to_string(value.size()) + " " +
+ to_string(expected_length));
+ }
+
+ if (md.stihf) {
+ const uint8_t stat = value[offset++];
+ const uint16_t spid = read_16b(value.begin() + offset);
+ m_data_collector.update_stat(stat, spid);
+ offset += 2;
+ }
+
+ if (md.atstf) {
+ uint8_t utco = value[offset];
+ offset++;
+
+ uint32_t seconds = read_32b(value.begin() + offset);
+ offset += 4;
+
+ m_data_collector.update_edi_time(utco, seconds);
+
+ md.tsta = read_24b(value.begin() + offset);
+ offset += 3;
+ }
+ else {
+ // Null timestamp, ETSI ETS 300 799, C.2.2
+ md.tsta = 0xFFFFFF;
+ }
+
+
+ if (md.rfadf) {
+ std::array<uint8_t, 9> rfad;
+ copy(value.cbegin() + offset,
+ value.cbegin() + offset + 9,
+ rfad.begin());
+ offset += 9;
+
+ m_data_collector.update_rfad(rfad);
+ }
+
+ m_data_collector.update_sti_management(md);
+
+ return true;
+}
+
+bool STIDecoder::decode_ssn(const vector<uint8_t> &value, uint16_t n)
+{
+ sti_payload_data sti;
+
+ sti.stream_index = n - 1; // n is 1-indexed
+ sti.rfa = value[0] >> 3;
+ sti.tid = value[0] & 0x07;
+
+ uint16_t istc = read_24b(value.begin() + 1);
+ sti.tidext = istc >> 13;
+ sti.crcstf = (istc >> 12) & 0x1;
+ sti.stid = istc & 0xFFF;
+
+ if (sti.rfa != 0) {
+ etiLog.level(warn) << "EDI: rfa field in SSnn tag non-null";
+ }
+
+ copy( value.cbegin() + 3,
+ value.cend(),
+ back_inserter(sti.istd));
+
+ m_data_collector.add_payload(move(sti));
+
+ return true;
+}
+
+bool STIDecoder::decode_stardmy(const vector<uint8_t>& /*value*/, uint16_t)
+{
+ return true;
+}
+
+void STIDecoder::packet_completed()
+{
+ m_data_collector.assemble();
+}
+
+}
diff --git a/lib/edi/STIDecoder.hpp b/lib/edi/STIDecoder.hpp
new file mode 100644
index 0000000..201a176
--- /dev/null
+++ b/lib/edi/STIDecoder.hpp
@@ -0,0 +1,122 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include "common.hpp"
+#include <cstdint>
+#include <deque>
+#include <string>
+#include <vector>
+
+namespace EdiDecoder {
+
+// Information for STI-D Management
+struct sti_management_data {
+ bool stihf;
+ bool atstf;
+ bool rfadf;
+ uint16_t dflc;
+
+ uint32_t tsta;
+};
+
+// Information for a subchannel available in EDI
+struct sti_payload_data {
+ uint16_t stream_index;
+ uint8_t rfa;
+ uint8_t tid;
+ uint8_t tidext;
+ bool crcstf;
+ uint16_t stid;
+ std::vector<uint8_t> istd;
+
+ // Return the length of ISTD in bytes
+ uint16_t stl(void) const { return istd.size(); }
+};
+
+/* A class that receives STI data must implement the interface described
+ * in the STIDataCollector. This can be e.g. a converter to ETI, or something that
+ * prepares data structures for a modulator.
+ */
+class STIDataCollector {
+ public:
+ // Tell the ETIWriter what EDI protocol we receive in *ptr.
+ // This is not part of the ETI data, but is used as check
+ virtual void update_protocol(
+ const std::string& proto,
+ uint16_t major,
+ uint16_t minor) = 0;
+
+ // STAT error field and service provider ID
+ virtual void update_stat(uint8_t stat, uint16_t spid) = 0;
+
+ // In addition to TSTA in ETI, EDI also transports more time
+ // stamp information.
+ virtual void update_edi_time(uint32_t utco, uint32_t seconds) = 0;
+
+ virtual void update_rfad(std::array<uint8_t, 9> rfad) = 0;
+ virtual void update_sti_management(const sti_management_data& data) = 0;
+
+ virtual void add_payload(sti_payload_data&& payload) = 0;
+
+ virtual void assemble() = 0;
+};
+
+/* The STIDecoder takes care of decoding the EDI TAGs related to the transport
+ * of ETI(NI) data inside AF and PF packets.
+ *
+ * PF packets are handed over to the PFT decoder, which will in turn return
+ * AF packets. AF packets are directly handled (TAG extraction) here.
+ */
+class STIDecoder {
+ public:
+ STIDecoder(STIDataCollector& data_collector, bool verbose);
+
+ /* Push bytes into the decoder. The buf can contain more
+ * than a single packet. This is useful when reading from streams
+ * (files, TCP)
+ */
+ void push_bytes(const std::vector<uint8_t> &buf);
+
+ /* Push a complete packet into the decoder. Useful for UDP and other
+ * datagram-oriented protocols.
+ */
+ void push_packet(const std::vector<uint8_t> &buf);
+
+ /* Set the maximum delay in number of AF Packets before we
+ * abandon decoding a given pseq.
+ */
+ void setMaxDelay(int num_af_packets);
+
+ private:
+ bool decode_starptr(const std::vector<uint8_t> &value, uint16_t);
+ bool decode_dsti(const std::vector<uint8_t> &value, uint16_t);
+ bool decode_ssn(const std::vector<uint8_t> &value, uint16_t n);
+ bool decode_stardmy(const std::vector<uint8_t> &value, uint16_t);
+
+ void packet_completed();
+
+ STIDataCollector& m_data_collector;
+ TagDispatcher m_dispatcher;
+
+};
+
+}
diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp
new file mode 100644
index 0000000..399922a
--- /dev/null
+++ b/lib/edi/STIWriter.cpp
@@ -0,0 +1,139 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "STIWriter.hpp"
+#include "crc.h"
+#include "Log.h"
+#include <cstdio>
+#include <cassert>
+#include <stdexcept>
+#include <sstream>
+#include <ctime>
+#include <iostream>
+#include <iomanip>
+#include <sstream>
+
+namespace EdiDecoder {
+
+using namespace std;
+
+void STIWriter::update_protocol(
+ const std::string& proto,
+ uint16_t major,
+ uint16_t minor)
+{
+ m_proto_valid = (proto == "DSTI" and major == 0 and minor == 0);
+
+ if (not m_proto_valid) {
+ throw std::invalid_argument("Wrong EDI protocol");
+ }
+}
+
+void STIWriter::reinit()
+{
+ m_proto_valid = false;
+ m_management_data_valid = false;
+ m_stat_valid = false;
+ m_time_valid = false;
+ m_payload_valid = false;
+ m_stiFrame.frame.clear();
+}
+
+void STIWriter::update_stat(uint8_t stat, uint16_t spid)
+{
+ m_stat = stat;
+ m_spid = spid;
+ m_stat_valid = true;
+
+ if (m_stat != 0xFF) {
+ etiLog.log(warn, "STI errorlevel %02x", m_stat);
+ }
+}
+
+void STIWriter::update_rfad(std::array<uint8_t, 9> rfad)
+{
+ (void)rfad;
+}
+
+void STIWriter::update_sti_management(const sti_management_data& data)
+{
+ m_management_data = data;
+ m_management_data_valid = true;
+}
+
+void STIWriter::add_payload(sti_payload_data&& payload)
+{
+ m_payload = move(payload);
+ m_payload_valid = true;
+}
+
+void STIWriter::update_edi_time(
+ uint32_t utco,
+ uint32_t seconds)
+{
+ if (not m_proto_valid) {
+ throw std::logic_error("Cannot update time before protocol");
+ }
+
+ m_utco = utco;
+ m_seconds = seconds;
+
+ // TODO check validity
+ m_time_valid = true;
+}
+
+
+void STIWriter::assemble()
+{
+ if (not m_proto_valid) {
+ throw std::logic_error("Cannot assemble STI before protocol");
+ }
+
+ if (not m_management_data_valid) {
+ throw std::logic_error("Cannot assemble STI before management data");
+ }
+
+ if (not m_payload_valid) {
+ throw std::logic_error("Cannot assemble STI without frame data");
+ }
+
+ // TODO check time validity
+
+ // Do copies so as to preserve existing payload data
+ m_stiFrame.frame = m_payload.istd;
+ m_stiFrame.timestamp.seconds = m_seconds;
+ m_stiFrame.timestamp.utco = m_utco;
+ m_stiFrame.timestamp.tsta = m_management_data.tsta;
+}
+
+sti_frame_t STIWriter::getFrame()
+{
+ if (m_stiFrame.frame.empty()) {
+ return {};
+ }
+
+ sti_frame_t sti;
+ swap(sti, m_stiFrame);
+ reinit();
+ return sti;
+}
+
+}
+
diff --git a/lib/edi/STIWriter.hpp b/lib/edi/STIWriter.hpp
new file mode 100644
index 0000000..a75cb69
--- /dev/null
+++ b/lib/edi/STIWriter.hpp
@@ -0,0 +1,84 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include "common.hpp"
+#include "STIDecoder.hpp"
+#include <cstdint>
+#include <string>
+#include <vector>
+#include <list>
+
+namespace EdiDecoder {
+
+struct sti_frame_t {
+ std::vector<uint8_t> frame;
+ frame_timestamp_t timestamp;
+};
+
+class STIWriter : public STIDataCollector {
+ public:
+ // Tell the ETIWriter what EDI protocol we receive in *ptr.
+ // This is not part of the ETI data, but is used as check
+ virtual void update_protocol(
+ const std::string& proto,
+ uint16_t major,
+ uint16_t minor);
+
+ virtual void update_stat(uint8_t stat, uint16_t spid);
+
+ virtual void update_edi_time(
+ uint32_t utco,
+ uint32_t seconds);
+
+ virtual void update_rfad(std::array<uint8_t, 9> rfad);
+ virtual void update_sti_management(const sti_management_data& data);
+ virtual void add_payload(sti_payload_data&& payload);
+
+ virtual void assemble(void);
+
+ // Return the assembled frame or an empty frame if not ready
+ sti_frame_t getFrame();
+
+ private:
+ void reinit(void);
+
+ bool m_proto_valid = false;
+
+ bool m_management_data_valid = false;
+ sti_management_data m_management_data;
+
+ bool m_stat_valid = false;
+ uint8_t m_stat = 0;
+ uint16_t m_spid = 0;
+
+ bool m_time_valid = false;
+ uint32_t m_utco = 0;
+ uint32_t m_seconds = 0;
+
+ bool m_payload_valid = false;
+ sti_payload_data m_payload;
+
+ sti_frame_t m_stiFrame;
+};
+
+}
+
diff --git a/lib/edi/buffer_unpack.hpp b/lib/edi/buffer_unpack.hpp
new file mode 100644
index 0000000..05a1534
--- /dev/null
+++ b/lib/edi/buffer_unpack.hpp
@@ -0,0 +1,62 @@
+/*
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#pragma once
+#include <stdint.h>
+
+namespace EdiDecoder {
+
+template<class T>
+uint16_t read_16b(T buf)
+{
+ uint16_t value = 0;
+ value = (uint16_t)(buf[0]) << 8;
+ value |= (uint16_t)(buf[1]);
+ return value;
+}
+
+template<class T>
+uint32_t read_24b(T buf)
+{
+ uint32_t value = 0;
+ value = (uint32_t)(buf[0]) << 16;
+ value |= (uint32_t)(buf[1]) << 8;
+ value |= (uint32_t)(buf[2]);
+ return value;
+}
+
+template<class T>
+uint32_t read_32b(T buf)
+{
+ uint32_t value = 0;
+ value = (uint32_t)(buf[0]) << 24;
+ value |= (uint32_t)(buf[1]) << 16;
+ value |= (uint32_t)(buf[2]) << 8;
+ value |= (uint32_t)(buf[3]);
+ return value;
+}
+
+inline uint32_t unpack1bit(uint8_t byte, int bitpos)
+{
+ return (byte & 1 << (7-bitpos)) > (7-bitpos);
+}
+
+}
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
new file mode 100644
index 0000000..b4b0c79
--- /dev/null
+++ b/lib/edi/common.cpp
@@ -0,0 +1,323 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#include "common.hpp"
+#include "buffer_unpack.hpp"
+#include "Log.h"
+#include "crc.h"
+#include <iomanip>
+#include <sstream>
+#include <cassert>
+#include <cmath>
+#include <cstdio>
+
+namespace EdiDecoder {
+
+using namespace std;
+
+bool frame_timestamp_t::valid() const
+{
+ return tsta != 0xFFFFFF;
+}
+
+string frame_timestamp_t::to_string() const
+{
+ const time_t seconds_in_unix_epoch = to_unix_epoch();
+
+ stringstream ss;
+ if (valid()) {
+ ss << "Timestamp: ";
+ }
+ else {
+ ss << "Timestamp not valid: ";
+ }
+ ss << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z") <<
+ " + " << ((double)tsta / 16384000.0);
+ return ss.str();
+}
+
+time_t frame_timestamp_t::to_unix_epoch() const
+{
+ // EDI epoch: 2000-01-01T00:00:00Z
+ // Convert using
+ // TZ=UTC python -c 'import datetime; print(datetime.datetime(2000,1,1,0,0,0,0).strftime("%s"))'
+ return 946684800 + seconds - utco;
+}
+
+std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const
+{
+ auto ts = chrono::system_clock::from_time_t(to_unix_epoch());
+
+ // PPS offset in seconds = tsta / 16384000
+ ts += chrono::nanoseconds(std::lrint(tsta / 0.016384));
+
+ return ts;
+}
+
+
+TagDispatcher::TagDispatcher(
+ std::function<void()>&& af_packet_completed, bool verbose) :
+ m_af_packet_completed(move(af_packet_completed))
+{
+ m_pft.setVerbose(verbose);
+}
+
+void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
+{
+ copy(buf.begin(), buf.end(), back_inserter(m_input_data));
+
+ while (m_input_data.size() > 2) {
+ if (m_input_data[0] == 'A' and m_input_data[1] == 'F') {
+ const decode_state_t st = decode_afpacket(m_input_data);
+
+ if (st.num_bytes_consumed == 0 and not st.complete) {
+ // We need to refill our buffer
+ break;
+ }
+
+ if (st.num_bytes_consumed) {
+ vector<uint8_t> remaining_data;
+ copy(m_input_data.begin() + st.num_bytes_consumed,
+ m_input_data.end(),
+ back_inserter(remaining_data));
+ m_input_data = remaining_data;
+ }
+
+ if (st.complete) {
+ m_af_packet_completed();
+ }
+ }
+ else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') {
+ PFT::Fragment fragment;
+ const size_t fragment_bytes = fragment.loadData(m_input_data);
+
+ if (fragment_bytes == 0) {
+ // We need to refill our buffer
+ break;
+ }
+
+ vector<uint8_t> remaining_data;
+ copy(m_input_data.begin() + fragment_bytes,
+ m_input_data.end(),
+ back_inserter(remaining_data));
+ m_input_data = remaining_data;
+
+ if (fragment.isValid()) {
+ m_pft.pushPFTFrag(fragment);
+ }
+
+ auto af = m_pft.getNextAFPacket();
+ if (not af.empty()) {
+ decode_state_t st = decode_afpacket(af);
+
+ if (st.complete) {
+ m_af_packet_completed();
+ }
+ }
+ }
+ else {
+ etiLog.log(warn,"Unknown %c!", *m_input_data.data());
+ m_input_data.erase(m_input_data.begin());
+ }
+ }
+}
+
+void TagDispatcher::push_packet(const vector<uint8_t> &buf)
+{
+ if (buf.size() < 2) {
+ throw std::invalid_argument("Not enough bytes to read EDI packet header");
+ }
+
+ if (buf[0] == 'A' and buf[1] == 'F') {
+ const decode_state_t st = decode_afpacket(buf);
+
+ if (st.complete) {
+ m_af_packet_completed();
+ }
+
+ }
+ else if (buf[0] == 'P' and buf[1] == 'F') {
+ PFT::Fragment fragment;
+ fragment.loadData(buf);
+
+ if (fragment.isValid()) {
+ m_pft.pushPFTFrag(fragment);
+ }
+
+ auto af = m_pft.getNextAFPacket();
+ if (not af.empty()) {
+ const decode_state_t st = decode_afpacket(af);
+
+ if (st.complete) {
+ m_af_packet_completed();
+ }
+ }
+ }
+ else {
+ const char packettype[3] = {(char)buf[0], (char)buf[1], '\0'};
+ std::stringstream ss;
+ ss << "Unknown EDI packet ";
+ ss << packettype;
+ throw std::invalid_argument(ss.str());
+ }
+}
+
+void TagDispatcher::setMaxDelay(int num_af_packets)
+{
+ m_pft.setMaxDelay(num_af_packets);
+}
+
+
+#define AFPACKET_HEADER_LEN 10 // includes SYNC
+decode_state_t TagDispatcher::decode_afpacket(
+ const std::vector<uint8_t> &input_data)
+{
+ if (input_data.size() < AFPACKET_HEADER_LEN) {
+ return {false, 0};
+ }
+
+ // read length from packet
+ uint32_t taglength = read_32b(input_data.begin() + 2);
+ uint16_t seq = read_16b(input_data.begin() + 6);
+
+ const size_t crclength = 2;
+ if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) {
+ return {false, 0};
+ }
+
+ if (m_last_seq + 1 != seq) {
+ etiLog.level(warn) << "EDI AF Packet sequence error, " << seq;
+ }
+ m_last_seq = seq;
+
+ bool has_crc = (input_data[8] & 0x80) ? true : false;
+ uint8_t major_revision = (input_data[8] & 0x70) >> 4;
+ uint8_t minor_revision = input_data[8] & 0x0F;
+ if (major_revision != 1 or minor_revision != 0) {
+ throw invalid_argument("EDI AF Packet has wrong revision " +
+ to_string(major_revision) + "." + to_string(minor_revision));
+ }
+ uint8_t pt = input_data[9];
+ if (pt != 'T') {
+ // only support Tag
+ return {false, 0};
+ }
+
+
+ if (not has_crc) {
+ throw invalid_argument("AF packet not supported, has no CRC");
+ }
+
+ uint16_t crc = 0xffff;
+ for (size_t i = 0; i < AFPACKET_HEADER_LEN + taglength; i++) {
+ crc = crc16(crc, &input_data[i], 1);
+ }
+ crc ^= 0xffff;
+
+ uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);
+
+ if (packet_crc != crc) {
+ throw invalid_argument(
+ "AF Packet crc wrong");
+ }
+ else {
+ vector<uint8_t> payload(taglength);
+ copy(input_data.begin() + AFPACKET_HEADER_LEN,
+ input_data.begin() + AFPACKET_HEADER_LEN + taglength,
+ payload.begin());
+
+ return {decode_tagpacket(payload),
+ AFPACKET_HEADER_LEN + taglength + 2};
+ }
+}
+
+void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h)
+{
+ m_handlers[tag] = move(h);
+}
+
+
+bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
+{
+ size_t length = 0;
+
+ bool success = true;
+
+ for (size_t i = 0; i + 8 < payload.size(); i += 8 + length) {
+ char tag_sz[5];
+ tag_sz[4] = '\0';
+ copy(payload.begin() + i, payload.begin() + i + 4, tag_sz);
+
+ string tag(tag_sz);
+
+ uint32_t taglength = read_32b(payload.begin() + i + 4);
+
+ if (taglength % 8 != 0) {
+ etiLog.log(warn, "Invalid tag length!");
+ break;
+ }
+ taglength /= 8;
+
+ length = taglength;
+
+ vector<uint8_t> tag_value(taglength);
+ copy( payload.begin() + i+8,
+ payload.begin() + i+8+taglength,
+ tag_value.begin());
+
+ bool tagsuccess = false;
+ bool found = false;
+ for (auto tag_handler : m_handlers) {
+ if (tag_handler.first.size() == 4 and tag_handler.first == tag) {
+ found = true;
+ tagsuccess = tag_handler.second(tag_value, 0);
+ }
+ else if (tag_handler.first.size() == 3 and
+ tag.substr(0, 3) == tag_handler.first) {
+ found = true;
+ uint8_t n = tag_sz[3];
+ tagsuccess = tag_handler.second(tag_value, n);
+ }
+ else if (tag_handler.first.size() == 2 and
+ tag.substr(0, 2) == tag_handler.first) {
+ found = true;
+ uint16_t n = 0;
+ n = (uint16_t)(tag_sz[2]) << 8;
+ n |= (uint16_t)(tag_sz[3]);
+ tagsuccess = tag_handler.second(tag_value, n);
+ }
+ }
+
+ if (not found) {
+ etiLog.log(warn, "Ignoring unknown TAG %s", tag.c_str());
+ break;
+ }
+
+ if (not tagsuccess) {
+ etiLog.log(warn, "Error decoding TAG %s", tag.c_str());
+ success = tagsuccess;
+ break;
+ }
+ }
+
+ return success;
+}
+
+}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
new file mode 100644
index 0000000..887bc3d
--- /dev/null
+++ b/lib/edi/common.hpp
@@ -0,0 +1,92 @@
+/*
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#pragma once
+
+#include "PFT.hpp"
+#include <functional>
+#include <map>
+#include <chrono>
+#include <string>
+#include <vector>
+#include <cstddef>
+#include <ctime>
+
+namespace EdiDecoder {
+
+struct frame_timestamp_t {
+ uint32_t seconds = 0;
+ uint32_t utco = 0;
+ uint32_t tsta = 0; // According to EN 300 797 Annex B
+
+ bool valid() const;
+ std::string to_string() const;
+ time_t to_unix_epoch() const;
+ std::chrono::system_clock::time_point to_system_clock() const;
+};
+
+struct decode_state_t {
+ decode_state_t(bool _complete, size_t _num_bytes_consumed) :
+ complete(_complete), num_bytes_consumed(_num_bytes_consumed) {}
+ bool complete;
+ size_t num_bytes_consumed;
+};
+
+/* The TagDispatcher takes care of decoding EDI, with or without PFT, and
+ * will call functions when TAGs are encountered.
+ *
+ * PF packets are handed over to the PFT decoder, which will in turn return
+ * AF packets. AF packets are directly dispatched to the TAG functions.
+ */
+class TagDispatcher {
+ public:
+ TagDispatcher(std::function<void()>&& af_packet_completed, bool verbose);
+
+ /* Push bytes into the decoder. The buf can contain more
+ * than a single packet. This is useful when reading from streams
+ * (files, TCP)
+ */
+ void push_bytes(const std::vector<uint8_t> &buf);
+
+ /* Push a complete packet into the decoder. Useful for UDP and other
+ * datagram-oriented protocols.
+ */
+ void push_packet(const std::vector<uint8_t> &buf);
+
+ /* Set the maximum delay in number of AF Packets before we
+ * abandon decoding a given pseq.
+ */
+ void setMaxDelay(int num_af_packets);
+
+ using tag_handler = std::function<bool(std::vector<uint8_t>, uint16_t)>;
+ void register_tag(const std::string& tag, tag_handler&& h);
+
+ private:
+ decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data);
+ bool decode_tagpacket(const std::vector<uint8_t> &payload);
+
+ PFT::PFT m_pft;
+ uint16_t m_last_seq = 0;
+ std::vector<uint8_t> m_input_data;
+ std::map<std::string, tag_handler> m_handlers;
+ std::function<void()> m_af_packet_completed;
+};
+
+}
diff --git a/src/dabOutput/edi/AFPacket.cpp b/lib/edioutput/AFPacket.cpp
index a58a980..b38c38b 100644
--- a/src/dabOutput/edi/AFPacket.cpp
+++ b/lib/edioutput/AFPacket.cpp
@@ -10,21 +10,21 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#include "config.h"
#include "crc.h"
#include "AFPacket.h"
@@ -34,7 +34,7 @@
#include <string>
#include <iostream>
#include <cstdio>
-#include <stdint.h>
+#include <cstdint>
#include <arpa/inet.h>
namespace edi {
diff --git a/src/dabOutput/edi/AFPacket.h b/lib/edioutput/AFPacket.h
index b4ccef1..f2c4e35 100644
--- a/src/dabOutput/edi/AFPacket.h
+++ b/lib/edioutput/AFPacket.h
@@ -10,27 +10,27 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
#include "config.h"
#include <vector>
-#include <stdint.h>
+#include <cstdint>
#include "TagItems.h"
#include "TagPacket.h"
diff --git a/src/dabOutput/edi/Config.h b/lib/edioutput/Config.h
index 55d5f0f..ca76322 100644
--- a/src/dabOutput/edi/Config.h
+++ b/lib/edioutput/Config.h
@@ -9,21 +9,21 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
@@ -50,11 +50,18 @@ struct udp_destination_t : public destination_t {
};
// TCP server that can accept multiple connections
-struct tcp_destination_t : public destination_t {
+struct tcp_server_t : public destination_t {
unsigned int listen_port = 0;
size_t max_frames_queued = 1024;
};
+// TCP client that connects to one endpoint
+struct tcp_client_t : public destination_t {
+ std::string dest_addr;
+ unsigned int dest_port = 0;
+ size_t max_frames_queued = 1024;
+};
+
struct configuration_t {
unsigned chunk_len = 207; // RSk, data length of each chunk
unsigned fec = 0; // number of fragments that can be recovered
diff --git a/src/dabOutput/edi/Interleaver.cpp b/lib/edioutput/Interleaver.cpp
index f26a50e..f26a50e 100644
--- a/src/dabOutput/edi/Interleaver.cpp
+++ b/lib/edioutput/Interleaver.cpp
diff --git a/src/dabOutput/edi/Interleaver.h b/lib/edioutput/Interleaver.h
index f1cff30..3029d5d 100644
--- a/src/dabOutput/edi/Interleaver.h
+++ b/lib/edioutput/Interleaver.h
@@ -13,21 +13,21 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
@@ -35,7 +35,7 @@
#include <vector>
#include <deque>
#include <stdexcept>
-#include <stdint.h>
+#include <cstdint>
#include "Log.h"
#include "PFT.h"
diff --git a/src/dabOutput/edi/PFT.cpp b/lib/edioutput/PFT.cpp
index 5b93016..371d36f 100644
--- a/src/dabOutput/edi/PFT.cpp
+++ b/lib/edioutput/PFT.cpp
@@ -14,28 +14,28 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#include "config.h"
#include <vector>
#include <list>
#include <cstdio>
#include <cstring>
-#include <stdint.h>
+#include <cstdint>
#include <arpa/inet.h>
#include <stdexcept>
#include <sstream>
@@ -314,7 +314,7 @@ std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet)
#if 0
fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n",
- m_pseq, findex, fcount, plen & ~0x8000);
+ m_pseq, findex, fcount, plen & ~0xC000);
#endif
}
diff --git a/src/dabOutput/edi/PFT.h b/lib/edioutput/PFT.h
index 4076bf3..0ff4839 100644
--- a/src/dabOutput/edi/PFT.h
+++ b/lib/edioutput/PFT.h
@@ -14,21 +14,21 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
@@ -36,11 +36,11 @@
#include <vector>
#include <list>
#include <stdexcept>
-#include <stdint.h>
+#include <cstdint>
#include "AFPacket.h"
#include "Log.h"
#include "ReedSolomon.h"
-#include "dabOutput/edi/Config.h"
+#include "Config.h"
namespace edi {
diff --git a/src/dabOutput/edi/TagItems.cpp b/lib/edioutput/TagItems.cpp
index dfb4934..35a6852 100644
--- a/src/dabOutput/edi/TagItems.cpp
+++ b/lib/edioutput/TagItems.cpp
@@ -10,32 +10,40 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#include "config.h"
#include "TagItems.h"
#include <vector>
#include <iostream>
#include <string>
-#include <stdint.h>
+#include <cstdint>
#include <stdexcept>
namespace edi {
+TagStarPTR::TagStarPTR(const std::string& protocol)
+ : m_protocol(protocol)
+{
+ if (m_protocol.size() != 4) {
+ throw std::runtime_error("TagStarPTR protocol invalid length");
+ }
+}
+
std::vector<uint8_t> TagStarPTR::Assemble()
{
//std::cerr << "TagItem *ptr" << std::endl;
@@ -47,8 +55,7 @@ std::vector<uint8_t> TagStarPTR::Assemble()
packet.push_back(0);
packet.push_back(0x40);
- std::string protocol("DETI");
- packet.insert(packet.end(), protocol.begin(), protocol.end());
+ packet.insert(packet.end(), m_protocol.begin(), m_protocol.end());
// Major
packet.push_back(0);
@@ -193,6 +200,166 @@ std::vector<uint8_t> TagESTn::Assemble()
return packet;
}
+std::vector<uint8_t> TagDSTI::Assemble()
+{
+ std::string pack_data("dsti");
+ std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
+ packet.reserve(256);
+
+ // Placeholder for length
+ packet.push_back(0);
+ packet.push_back(0);
+ packet.push_back(0);
+ packet.push_back(0);
+
+ uint8_t dfctl = dflc % 250;
+ uint8_t dfcth = dflc / 250;
+
+
+ uint16_t dstiHeader = dfctl | (dfcth << 8) | (rfadf << 13) | (atstf << 14) | (stihf << 15);
+ packet.push_back(dstiHeader >> 8);
+ packet.push_back(dstiHeader & 0xFF);
+
+ if (stihf) {
+ packet.push_back(stat);
+ packet.push_back((spid >> 8) & 0xFF);
+ packet.push_back(spid & 0xFF);
+ }
+
+ if (atstf) {
+ packet.push_back(utco);
+
+ packet.push_back((seconds >> 24) & 0xFF);
+ packet.push_back((seconds >> 16) & 0xFF);
+ packet.push_back((seconds >> 8) & 0xFF);
+ packet.push_back(seconds & 0xFF);
+
+ packet.push_back((tsta >> 16) & 0xFF);
+ packet.push_back((tsta >> 8) & 0xFF);
+ packet.push_back(tsta & 0xFF);
+ }
+
+ if (rfadf) {
+ for (size_t i = 0; i < rfad.size(); i++) {
+ packet.push_back(rfad[i]);
+ }
+ }
+ // calculate and update size
+ // remove TAG name and TAG length fields and convert to bits
+ uint32_t taglength = (packet.size() - 8) * 8;
+
+ // write length into packet
+ packet[4] = (taglength >> 24) & 0xFF;
+ packet[5] = (taglength >> 16) & 0xFF;
+ packet[6] = (taglength >> 8) & 0xFF;
+ packet[7] = taglength & 0xFF;
+
+ dflc = (dflc+1) % 5000;
+
+ /*
+ std::cerr << "TagItem dsti, packet.size " << packet.size() << std::endl;
+ std::cerr << " length " << taglength / 8 << std::endl;
+ */
+ return packet;
+}
+
+void TagDSTI::set_edi_time(const std::time_t t, int tai_utc_offset)
+{
+ utco = tai_utc_offset - 32;
+
+ const std::time_t posix_timestamp_1_jan_2000 = 946684800;
+
+ seconds = t - posix_timestamp_1_jan_2000 + utco;
+}
+
+#if 0
+/* Update the EDI time. t is in UTC, TAI offset is requested from adjtimex */
+void TagDSTI::set_edi_time(const std::time_t t)
+{
+ if (tai_offset_cache_updated_at == 0 or tai_offset_cache_updated_at + 3600 < t) {
+ struct timex timex_request;
+ timex_request.modes = 0;
+
+ int err = adjtimex(&timex_request);
+ if (err == -1) {
+ throw std::runtime_error("adjtimex failed");
+ }
+
+ if (timex_request.tai == 0) {
+ throw std::runtime_error("CLOCK_TAI is not properly set up");
+ }
+ tai_offset_cache = timex_request.tai;
+ tai_offset_cache_updated_at = t;
+
+ fprintf(stderr, "adjtimex: %d, tai %d\n", err, timex_request.tai);
+ }
+
+ utco = tai_offset_cache - 32;
+
+ const std::time_t posix_timestamp_1_jan_2000 = 946684800;
+
+ seconds = t - posix_timestamp_1_jan_2000 + utco;
+}
+#endif
+
+std::vector<uint8_t> TagSSm::Assemble()
+{
+ std::string pack_data("ss");
+ std::vector<uint8_t> packet(pack_data.begin(), pack_data.end());
+ packet.reserve(istd_length + 16);
+
+ packet.push_back((id >> 8) & 0xFF);
+ packet.push_back(id & 0xFF);
+
+ // Placeholder for length
+ packet.push_back(0);
+ packet.push_back(0);
+ packet.push_back(0);
+ packet.push_back(0);
+
+ if (rfa > 0x1F) {
+ throw std::runtime_error("TagSSm: invalid RFA value");
+ }
+
+ if (tid > 0x7) {
+ throw std::runtime_error("TagSSm: invalid tid value");
+ }
+
+ if (tidext > 0x7) {
+ throw std::runtime_error("TagSSm: invalid tidext value");
+ }
+
+ if (stid > 0x0FFF) {
+ throw std::runtime_error("TagSSm: invalid stid value");
+ }
+
+ uint32_t istc = (rfa << 19) | (tid << 16) | (tidext << 13) | ((crcstf ? 1 : 0) << 12) | stid;
+ packet.push_back((istc >> 16) & 0xFF);
+ packet.push_back((istc >> 8) & 0xFF);
+ packet.push_back(istc & 0xFF);
+
+ for (size_t i = 0; i < istd_length; i++) {
+ packet.push_back(istd_data[i]);
+ }
+
+ // calculate and update size
+ // remove TAG name and TAG length fields and convert to bits
+ uint32_t taglength = (packet.size() - 8) * 8;
+
+ // write length into packet
+ packet[4] = (taglength >> 24) & 0xFF;
+ packet[5] = (taglength >> 16) & 0xFF;
+ packet[6] = (taglength >> 8) & 0xFF;
+ packet[7] = taglength & 0xFF;
+
+ /*
+ std::cerr << "TagItem SSm, length " << packet.size() << std::endl;
+ std::cerr << " istd_length " << istd_length << std::endl;
+ */
+ return packet;
+}
+
+
std::vector<uint8_t> TagStarDMY::Assemble()
{
std::string pack_data("*dmy");
diff --git a/src/dabOutput/edi/TagItems.h b/lib/edioutput/TagItems.h
index b29a142..25daa14 100644
--- a/src/dabOutput/edi/TagItems.h
+++ b/lib/edioutput/TagItems.h
@@ -10,30 +10,30 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
#include "config.h"
-#include "Eti.h"
#include <vector>
+#include <array>
#include <chrono>
#include <string>
-#include <stdint.h>
+#include <cstdint>
namespace edi {
@@ -47,7 +47,11 @@ class TagItem
class TagStarPTR : public TagItem
{
public:
+ TagStarPTR(const std::string& protocol);
std::vector<uint8_t> Assemble();
+
+ private:
+ std::string m_protocol = "";
};
// ETSI TS 102 693, 5.1.3 DAB ETI(LI) Management (deti)
@@ -133,6 +137,82 @@ class TagESTn : public TagItem
uint8_t id;
};
+// ETSI TS 102 693, 5.1.2 DAB STI-D(LI) Management
+class TagDSTI : public TagItem
+{
+ public:
+ std::vector<uint8_t> Assemble();
+
+ // dsti Header
+ bool stihf = false;
+ bool atstf = false; // presence of atst data
+ bool rfadf = false;
+ uint16_t dflc = 0; // modulo 5000 frame counter
+
+ // STI Header (optional)
+ uint8_t stat = 0;
+ uint16_t spid = 0;
+
+ /* UTCO: Offset (in seconds) between UTC and the Seconds value. The
+ * value is expressed as an unsigned 8-bit quantity. As of February
+ * 2009, the value shall be 2 and shall change as a result of each
+ * modification of the number of leap seconds, as proscribed by
+ * International Earth Rotation and Reference Systems Service (IERS).
+ *
+ * According to Annex F
+ * EDI = TAI - 32s (constant)
+ * EDI = UTC + UTCO
+ * we derive
+ * UTCO = TAI-UTC - 32
+ * where the TAI-UTC offset is given by the USNO bulletin using
+ * the ClockTAI module.
+ */
+ uint8_t utco = 0;
+
+ /* Update the EDI time. t is in UTC */
+ void set_edi_time(const std::time_t t, int tai_utc_offset);
+
+ /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an
+ * unsigned 32-bit quantity. Contrary to POSIX, this value also
+ * counts leap seconds.
+ */
+ uint32_t seconds = 0;
+
+ /* TSTA: Shall be the 24 least significant bits of the Time Stamp
+ * (TIST) field from the STI-D(LI) Frame. The full definition for the
+ * STI TIST can be found in annex B of EN 300 797 [4]. The most
+ * significant 8 bits of the TIST field of the incoming STI-D(LI)
+ * frame, if required, may be carried in the RFAD field.
+ */
+ uint32_t tsta = 0xFFFFFF;
+
+ std::array<uint8_t, 9> rfad;
+
+ private:
+ int tai_offset_cache = 0;
+ std::time_t tai_offset_cache_updated_at = 0;
+};
+
+// ETSI TS 102 693, 5.1.4 STI-D Payload Stream <m>
+class TagSSm : public TagItem
+{
+ public:
+ std::vector<uint8_t> Assemble();
+
+ // SSTCn
+ uint8_t rfa = 0;
+ uint8_t tid = 0; // See EN 300 797, 5.4.1.1. Value 0 means "MSC sub-channel"
+ uint8_t tidext = 0; // EN 300 797, 5.4.1.3, Value 0 means "MSC audio stream"
+ bool crcstf = false;
+ uint16_t stid = 0;
+
+ // Pointer to ISTDm data
+ const uint8_t *istd_data;
+ size_t istd_length; // bytes
+
+ uint16_t id = 0;
+};
+
// ETSI TS 102 821, 5.2.2.2 Dummy padding
class TagStarDMY : public TagItem
{
diff --git a/src/dabOutput/edi/TagPacket.cpp b/lib/edioutput/TagPacket.cpp
index b16dc33..b0bf9a1 100644
--- a/src/dabOutput/edi/TagPacket.cpp
+++ b/lib/edioutput/TagPacket.cpp
@@ -8,31 +8,30 @@
This defines a TAG Packet.
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#include "config.h"
-#include "Eti.h"
#include "TagPacket.h"
#include "TagItems.h"
#include <vector>
#include <iostream>
#include <string>
#include <list>
-#include <stdint.h>
+#include <cstdint>
#include <cassert>
namespace edi {
diff --git a/src/dabOutput/edi/TagPacket.h b/lib/edioutput/TagPacket.h
index a861cbb..1e40ce7 100644
--- a/src/dabOutput/edi/TagPacket.h
+++ b/lib/edioutput/TagPacket.h
@@ -8,21 +8,21 @@
This defines a TAG Packet.
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
@@ -31,7 +31,7 @@
#include <vector>
#include <string>
#include <list>
-#include <stdint.h>
+#include <cstdint>
namespace edi {
diff --git a/src/dabOutput/edi/Transport.cpp b/lib/edioutput/Transport.cpp
index d99e987..0d5c237 100644
--- a/src/dabOutput/edi/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -9,22 +9,21 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#include "Transport.h"
#include <iterator>
@@ -45,12 +44,16 @@ void configuration_t::print() const
}
etiLog.level(info) << " source port " << udp_dest->source_port;
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ etiLog.level(info) << " TCP client connecting to " << tcp_dest->dest_addr << ":" << tcp_dest->dest_port;
+ etiLog.level(info) << " max frames queued " << tcp_dest->max_frames_queued;
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (interleaver_enabled()) {
@@ -69,28 +72,27 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port);
+ auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- if (err) {
- throw runtime_error("EDI socket set source failed!");
- }
- err = udp_socket->setMulticastTTL(udp_dest->ttl);
- if (err) {
- throw runtime_error("EDI socket set TTL failed!");
- }
+ udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
+ udp_socket->setMulticastTTL(udp_dest->ttl);
}
udp_sockets.emplace(udp_dest.get(), udp_socket);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) {
- auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued);
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
+ auto dispatcher = make_shared<Socket::TCPDataDispatcher>(tcp_dest->max_frames_queued);
dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
+ auto tcp_socket = make_shared<Socket::TCPSocket>();
+ tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port);
+ tcp_senders.emplace(tcp_dest.get(), tcp_socket);
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
@@ -117,7 +119,7 @@ void Sender::write(const TagPacket& tagpacket)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragment before interleaver %zu",
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
edi_fragments.size());
}
@@ -129,28 +131,30 @@ void Sender::write(const TagPacket& tagpacket)
for (const auto& edi_frag : edi_fragments) {
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);
udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
}
if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu",
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
edi_fragments.size());
}
}
@@ -158,23 +162,25 @@ void Sender::write(const TagPacket& tagpacket)
// Send over ethernet
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- InetAddress addr;
- addr.setAddress(udp_dest->dest_addr.c_str());
- addr.setPort(m_conf.dest_port);
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port);
udp_sockets.at(udp_dest.get())->send(af_packet, addr);
}
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) {
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
+ else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size());
+ }
else {
- throw std::logic_error("EDI destination not implemented");
+ throw logic_error("EDI destination not implemented");
}
}
if (m_conf.dump) {
- std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- std::copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
}
}
}
diff --git a/src/dabOutput/edi/Transport.h b/lib/edioutput/Transport.h
index 7b0a0db..325acf8 100644
--- a/src/dabOutput/edi/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -9,34 +9,35 @@
*/
/*
- This file is part of ODR-DabMux.
+ This file is part of the ODR-mmbTools.
- ODR-DabMux is free software: you can redistribute it and/or modify
+ This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
- ODR-DabMux is distributed in the hope that it will be useful,
+ This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
#pragma once
#include "config.h"
-#include "dabOutput/edi/Config.h"
+#include "Config.h"
#include "AFPacket.h"
#include "PFT.h"
#include "Interleaver.h"
+#include "Socket.h"
#include <vector>
#include <unordered_map>
#include <stdexcept>
+#include <fstream>
#include <cstdint>
-#include "dabOutput/dabOutput.h"
namespace edi {
@@ -61,8 +62,9 @@ class Sender {
// To mitigate for burst packet loss, PFT fragments can be sent out-of-order
edi::Interleaver edi_interleaver;
- std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets;
- std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
+ std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
+ std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;
};
}
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index fb49efc..3142bb3 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -40,6 +40,7 @@
#include "utils.h"
#include "DabMux.h"
#include "ManagementServer.h"
+#include "input/Edi.h"
#include "input/Prbs.h"
#include "input/Zmq.h"
#include "input/File.h"
@@ -876,34 +877,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
type = pt.get<string>("type");
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no type defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");
}
- /* Both inputfile and inputuri are supported, and are equivalent.
- * inputuri has precedence
+ /* Up to v2.3.1, both inputfile and inputuri are supported, and are
+ * equivalent. inputuri has precedence.
+ *
+ * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.
*/
string inputUri = pt.get<string>("inputuri", "");
+ string proto = pt.get<string>("inputproto", "");
- if (inputUri == "") {
+ if (inputUri.empty() and proto.empty()) {
try {
+ /* Old approach, derives proto from scheme used in the URL.
+ * This makes it impossible to distinguish between ZMQ tcp:// and
+ * EDI tcp://
+ */
inputUri = pt.get<string>("inputfile");
+ size_t protopos = inputUri.find("://");
+
+ if (protopos == string::npos) {
+ proto = "file";
+ }
+ else {
+ proto = inputUri.substr(0, protopos);
+
+ if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ proto = "zmq";
+ }
+ else if (proto == "sti-rtp") {
+ proto = "sti";
+ }
+ }
}
catch (const ptree_error &e) {
- stringstream ss;
- ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!";
- throw runtime_error(ss.str());
+ throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");
}
}
-
- string proto;
- size_t protopos = inputUri.find("://");
- if (protopos == string::npos) {
- proto = "file";
- }
- else {
- proto = inputUri.substr(0, protopos);
+ else if (inputUri.empty() or proto.empty()) {
+ throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);
}
subchan->inputUri = inputUri;
@@ -928,7 +941,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
throw logic_error("Incomplete handling of file input");
}
}
- else if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
+ else if (proto == "zmq") {
auto zmqconfig = setup_zmq_input(pt, subchanuid);
if (type == "audio") {
@@ -941,15 +954,11 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
rcs.enrol(inzmq.get());
subchan->input = inzmq;
}
-
- if (proto == "epgm") {
- etiLog.level(warn) << "Using untested epgm:// zeromq input";
- }
- else if (proto == "ipc") {
- etiLog.level(warn) << "Using untested ipc:// zeromq input";
- }
}
- else if (proto == "sti-rtp") {
+ else if (proto == "edi") {
+ subchan->input = make_shared<Inputs::Edi>();
+ }
+ else if (proto == "stp") {
subchan->input = make_shared<Inputs::Sti_d_Rtp>();
}
else {
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 9ff28a3..0d68ac2 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -284,13 +284,13 @@ void DabMultiplexer::prepare_services_components()
component->subchId, component->serviceId);
throw MuxInitException();
}
- if ((*subchannel)->type != subchannel_type_t::Packet) continue;
- component->packet.id = cur_packetid++;
+ if ((*subchannel)->type == subchannel_type_t::Packet) {
+ component->packet.id = cur_packetid++;
+ }
rcs.enrol(component.get());
}
-
}
void DabMultiplexer::prepare_data_inputs()
@@ -376,7 +376,7 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
// For EDI, save ETI(LI) Management data into a TAG Item DETI
edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
+ edi::TagStarPTR edi_tagStarPtr("DETI");
map<DabSubchannel*, edi::TagESTn> edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index 386c23c..56a8dde 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -30,15 +30,14 @@
#endif
#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/AFPacket.h"
-#include "dabOutput/edi/Transport.h"
+#include "edioutput/TagItems.h"
+#include "edioutput/TagPacket.h"
+#include "edioutput/AFPacket.h"
+#include "edioutput/Transport.h"
#include "fig/FIGCarousel.h"
#include "crc.h"
#include "utils.h"
-#include "UdpSocket.h"
-#include "InetAddress.h"
+#include "Socket.h"
#include "PcDebug.h"
#include "MuxElements.h"
#include "RemoteControl.h"
diff --git a/src/DabMux.cpp b/src/DabMux.cpp
index 51f0310..578fc63 100644
--- a/src/DabMux.cpp
+++ b/src/DabMux.cpp
@@ -99,8 +99,7 @@ typedef DWORD32 uint32_t;
#include "dabOutput/dabOutput.h"
#include "crc.h"
-#include "UdpSocket.h"
-#include "InetAddress.h"
+#include "Socket.h"
#include "PcDebug.h"
#include "DabMux.h"
#include "MuxElements.h"
@@ -305,7 +304,7 @@ int main(int argc, char *argv[])
edi_conf.destinations.push_back(dest);
}
else if (proto == "tcp") {
- auto dest = make_shared<edi::tcp_destination_t>();
+ auto dest = make_shared<edi::tcp_server_t>();
dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");
dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);
edi_conf.destinations.push_back(dest);
diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp
deleted file mode 100644
index 7660263..0000000
--- a/src/InetAddress.cpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "InetAddress.h"
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <string.h>
-
-#ifdef TRACE_ON
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl
-# define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl
-# endif
-#else
-# ifndef TRACE_CLASS
-# define TRACE_CLASS(clas, func)
-# define TRACE_STATIC(clas, func)
-# endif
-#endif
-
-
-int inetErrNo = 0;
-const char *inetErrMsg = nullptr;
-const char *inetErrDesc = nullptr;
-
-
-/**
- * Constructs an IP address.
- * @param port The port of this address
- * @param name The name of this address
- */
-InetAddress::InetAddress(int port, const char* name) {
- TRACE_CLASS("InetAddress", "InetAddress(int, char)");
- addr.sin_family = PF_INET;
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- addr.sin_port = htons(port);
- if (name)
- setAddress(name);
-}
-
-
-/// Returns the raw IP address of this InetAddress object.
-sockaddr *InetAddress::getAddress() {
- TRACE_CLASS("InetAddress", "getAddress()");
- return (sockaddr *)&addr;
-}
-
-
-/// Return the port of this address.
-int InetAddress::getPort()
-{
- TRACE_CLASS("InetAddress", "getPort()");
- return ntohs(addr.sin_port);
-}
-
-
-/**
- * Returns the IP address string "%d.%d.%d.%d".
- * @return IP address
- */
-const char *InetAddress::getHostAddress() {
- TRACE_CLASS("InetAddress", "getHostAddress()");
- return inet_ntoa(addr.sin_addr);
-}
-
-
-/// Returns true if this address is multicast
-bool InetAddress::isMulticastAddress() {
- TRACE_CLASS("InetAddress", "isMulticastAddress()");
- return IN_MULTICAST(ntohl(addr.sin_addr.s_addr)); // a modifier
-}
-
-
-/**
- * Set the port number
- * @param port The new port number
- */
-void InetAddress::setPort(int port)
-{
- TRACE_CLASS("InetAddress", "setPort(int)");
- addr.sin_port = htons(port);
-}
-
-
-/**
- * Set the address
- * @param name The new address name
- * @return 0 if ok
- * -1 if error
- */
-int InetAddress::setAddress(const std::string& name)
-{
- TRACE_CLASS("InetAddress", "setAddress(string)");
- if (!name.empty()) {
- if (atoi(name.c_str())) { // If it start with a number
- if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Invalid address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- else { // Assume it's a real name
- hostent *host = gethostbyname(name.c_str());
- if (host) {
- addr.sin_addr = *(in_addr *)(host->h_addr);
- } else {
- addr.sin_addr.s_addr = htons(INADDR_ANY);
- inetErrNo = 0;
- inetErrMsg = "Could not find address";
- inetErrDesc = name.c_str();
- return -1;
- }
- }
- }
- else {
- addr.sin_addr.s_addr = INADDR_ANY;
- }
- return 0;
-}
-
-
-void setInetError(const char* description)
-{
- inetErrNo = 0;
- inetErrNo = errno;
- inetErrMsg = strerror(inetErrNo);
- inetErrDesc = description;
-}
-
diff --git a/src/InetAddress.h b/src/InetAddress.h
deleted file mode 100644
index e246d4c..0000000
--- a/src/InetAddress.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2016
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _InetAddress
-#define _InetAddress
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include <stdlib.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#include <string>
-
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define INVALID_PORT -1
-
-
-/// The last error number
-extern int inetErrNo;
-/// The last error message
-extern const char *inetErrMsg;
-/// The description of the last error
-extern const char *inetErrDesc;
-/// Set the number, message and description of the last error
-void setInetError(const char* description);
-
-
-/**
- * This class represents an Internet Protocol (IP) address.
- * @author Pascal Charest pascal.charest@crc.ca
- */
-class InetAddress {
- public:
- InetAddress(int port = 0, const char* name = NULL);
-
- sockaddr *getAddress();
- const char *getHostAddress();
- int getPort();
- int setAddress(const std::string& name);
- void setPort(int port);
- bool isMulticastAddress();
-
- private:
- sockaddr_in addr;
-};
-
-
-#endif
diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp
deleted file mode 100644
index 3ebe73c..0000000
--- a/src/TcpSocket.cpp
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "TcpSocket.h"
-#include "Log.h"
-#include <iostream>
-#include <cstdio>
-#include <cstring>
-#include <cstdint>
-#include <signal.h>
-#include <errno.h>
-#include <poll.h>
-#include <thread>
-
-using namespace std;
-
-using vec_u8 = std::vector<uint8_t>;
-
-
-TcpSocket::TcpSocket() :
- m_sock(INVALID_SOCKET)
-{
-}
-
-TcpSocket::TcpSocket(int port, const string& name) :
- m_sock(INVALID_SOCKET)
-{
- if (port) {
- if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
- throw std::runtime_error("Can't create socket");
- }
-
- reuseopt_t reuse = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- throw std::runtime_error("Can't reuse address");
- }
-
-#if defined(HAVE_SO_NOSIGPIPE)
- int val = 1;
- if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val))
- == SOCKET_ERROR) {
- throw std::runtime_error("Can't set SO_NOSIGPIPE");
- }
-#endif
-
- m_own_address.setAddress(name);
- m_own_address.setPort(port);
-
- if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- ::close(m_sock);
- m_sock = INVALID_SOCKET;
- throw std::runtime_error("Can't bind socket");
- }
- }
-}
-
-TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) :
- m_own_address(own),
- m_remote_address(remote),
- m_sock(sock) { }
-
-// The move constructors must ensure the moved-from
-// TcpSocket won't destroy our socket handle
-TcpSocket::TcpSocket(TcpSocket&& other)
-{
- m_sock = other.m_sock;
- other.m_sock = INVALID_SOCKET;
-
- m_own_address = other.m_own_address;
- m_remote_address = other.m_remote_address;
-}
-
-TcpSocket& TcpSocket::operator=(TcpSocket&& other)
-{
- m_sock = other.m_sock;
- other.m_sock = INVALID_SOCKET;
-
- m_own_address = other.m_own_address;
- m_remote_address = other.m_remote_address;
- return *this;
-}
-
-/**
- * Close the underlying socket.
- * @return 0 if ok
- * -1 if error
- */
-int TcpSocket::close()
-{
- if (m_sock != INVALID_SOCKET) {
- int res = ::close(m_sock);
- if (res != 0) {
- setInetError("Can't close socket");
- return -1;
- }
- m_sock = INVALID_SOCKET;
- }
- return 0;
-}
-
-TcpSocket::~TcpSocket()
-{
- close();
-}
-
-bool TcpSocket::isValid()
-{
- return m_sock != INVALID_SOCKET;
-}
-
-ssize_t TcpSocket::recv(void* data, size_t size)
-{
- ssize_t ret = ::recv(m_sock, (char*)data, size, 0);
- if (ret == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket recv error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- return ret;
-}
-
-
-ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms)
-{
- if (timeout_ms) {
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLOUT;
-
- const int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- stringstream ss;
- ss << "TCP Socket send error on poll(): " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else if (retval == 0) {
- // Timed out
- return 0;
- }
- }
-
- /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not
- * receive a SIGPIPE and die.
- * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */
-#if defined(HAVE_MSG_NOSIGNAL)
- const int flags = MSG_NOSIGNAL;
-#else
- const int flags = 0;
-#endif
- const ssize_t ret = ::send(m_sock, (const char*)data, size, flags);
-
- if (ret == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket send error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- return ret;
-}
-
-void TcpSocket::listen()
-{
- if (::listen(m_sock, 1) == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket listen error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
-}
-
-TcpSocket TcpSocket::accept()
-{
- InetAddress remote_addr;
- socklen_t addrLen = sizeof(sockaddr_in);
-
- SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen);
- if (socket == SOCKET_ERROR) {
- stringstream ss;
- ss << "TCP Socket accept error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else {
- TcpSocket client(socket, m_own_address, remote_addr);
- return client;
- }
-}
-
-TcpSocket TcpSocket::accept(int timeout_ms)
-{
- struct pollfd fds[1];
- fds[0].fd = m_sock;
- fds[0].events = POLLIN | POLLOUT;
-
- int retval = poll(fds, 1, timeout_ms);
-
- if (retval == -1) {
- stringstream ss;
- ss << "TCP Socket accept error: " << strerror(errno);
- throw std::runtime_error(ss.str());
- }
- else if (retval) {
- return accept();
- }
- else {
- TcpSocket invalidsock(0, "");
- return invalidsock;
- }
-}
-
-
-InetAddress TcpSocket::getOwnAddress() const
-{
- return m_own_address;
-}
-
-InetAddress TcpSocket::getRemoteAddress() const
-{
- return m_remote_address;
-}
-
-
-TCPConnection::TCPConnection(TcpSocket&& sock) :
- queue(),
- m_running(true),
- m_sender_thread(),
- m_sock(move(sock))
-{
- auto own_addr = m_sock.getOwnAddress();
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "New TCP Connection on port " <<
- own_addr.getPort() << " from " <<
- addr.getHostAddress() << ":" << addr.getPort();
- m_sender_thread = std::thread(&TCPConnection::process, this);
-}
-
-TCPConnection::~TCPConnection()
-{
- m_running = false;
- vec_u8 termination_marker;
- queue.push(termination_marker);
- m_sender_thread.join();
-}
-
-void TCPConnection::process()
-{
- while (m_running) {
- vec_u8 data;
- queue.wait_and_pop(data);
-
- if (data.empty()) {
- // empty vector is the termination marker
- m_running = false;
- break;
- }
-
- try {
- ssize_t remaining = data.size();
- const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data());
- const int timeout_ms = 10; // Less than one ETI frame
-
- while (m_running and remaining > 0) {
- const ssize_t sent = m_sock.send(buf, remaining, timeout_ms);
- if (sent < 0 or sent > remaining) {
- throw std::logic_error("Invalid TcpSocket::send() return value");
- }
- remaining -= sent;
- buf += sent;
- }
- }
- catch (const std::runtime_error& e) {
- m_running = false;
- }
- }
-
-
- auto own_addr = m_sock.getOwnAddress();
- auto addr = m_sock.getRemoteAddress();
- etiLog.level(debug) << "Dropping TCP Connection on port " <<
- own_addr.getPort() << " from " <<
- addr.getHostAddress() << ":" << addr.getPort();
-}
-
-
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
- m_max_queue_size(max_queue_size)
-{
-}
-
-TCPDataDispatcher::~TCPDataDispatcher()
-{
- m_running = false;
- m_connections.clear();
- m_listener_socket.close();
- m_listener_thread.join();
-}
-
-void TCPDataDispatcher::start(int port, const string& address)
-{
- TcpSocket sock(port, address);
- m_listener_socket = move(sock);
-
- m_running = true;
- m_listener_thread = std::thread(&TCPDataDispatcher::process, this);
-}
-
-void TCPDataDispatcher::write(const vec_u8& data)
-{
- for (auto& connection : m_connections) {
- connection.queue.push(data);
- }
-
- m_connections.remove_if(
- [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
-}
-
-void TCPDataDispatcher::process()
-{
- try {
- m_listener_socket.listen();
-
- const int timeout_ms = 1000;
-
- while (m_running) {
- // Add a new TCPConnection to the list, constructing it from the client socket
- auto sock = m_listener_socket.accept(timeout_ms);
- if (sock.isValid()) {
- m_connections.emplace(m_connections.begin(), move(sock));
- }
- }
- }
- catch (const std::runtime_error& e) {
- etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what();
- m_running = false;
- }
-}
-
diff --git a/src/TcpSocket.h b/src/TcpSocket.h
deleted file mode 100644
index ec7afd3..0000000
--- a/src/TcpSocket.h
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
- Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2019
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _TCPSOCKET
-#define _TCPSOCKET
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include "ThreadsafeQueue.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <iostream>
-#include <string>
-#include <vector>
-#include <memory>
-#include <atomic>
-#include <thread>
-#include <list>
-
-/**
- * This class represents a TCP socket.
- */
-class TcpSocket
-{
- public:
- /** Create a new socket that does nothing */
- TcpSocket();
-
- /** Create a new socket listening for incoming connections.
- * @param port The port number on which the socket will listen.
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- TcpSocket(int port, const std::string& name);
- ~TcpSocket();
- TcpSocket(TcpSocket&& other);
- TcpSocket& operator=(TcpSocket&& other);
- TcpSocket(const TcpSocket& other) = delete;
- TcpSocket& operator=(const TcpSocket& other) = delete;
-
- bool isValid(void);
-
- int close(void);
-
- /** Send data over the TCP connection.
- * @param data The buffer that will be sent.
- * @param size Number of bytes to send.
- * @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
- * return number of bytes sent, 0 on timeout, or throws runtime_error.
- */
- ssize_t send(const void* data, size_t size, int timeout_ms=0);
-
- /** Receive data from the socket.
- * @param data The buffer that will receive data.
- * @param size The buffer size.
- * @return number of bytes received or -1 (SOCKET_ERROR) if error
- */
- ssize_t recv(void* data, size_t size);
-
- void listen(void);
- TcpSocket accept(void);
-
- /* Returns either valid socket if a connection was
- * accepted before the timeout expired, or an invalid
- * socket otherwise.
- */
- TcpSocket accept(int timeout_ms);
-
- /** Retrieve address this socket is bound to */
- InetAddress getOwnAddress() const;
- InetAddress getRemoteAddress() const;
-
- private:
- TcpSocket(SOCKET sock, InetAddress own, InetAddress remote);
-
- /// The address on which the socket is bound.
- InetAddress m_own_address;
- InetAddress m_remote_address;
- /// The low-level socket used by system functions.
- SOCKET m_sock;
-};
-
-/* Helper class for TCPDataDispatcher, contains a queue of pending data and
- * a sender thread. */
-class TCPConnection
-{
- public:
- TCPConnection(TcpSocket&& sock);
- TCPConnection(const TCPConnection&) = delete;
- TCPConnection& operator=(const TCPConnection&) = delete;
- ~TCPConnection();
-
- ThreadsafeQueue<std::vector<uint8_t> > queue;
-
- private:
- std::atomic<bool> m_running;
- std::thread m_sender_thread;
- TcpSocket m_sock;
-
- void process(void);
-};
-
-/* Send a TCP stream to several destinations, and automatically disconnect destinations
- * whose buffer overflows.
- */
-class TCPDataDispatcher
-{
- public:
- TCPDataDispatcher(size_t max_queue_size);
- ~TCPDataDispatcher();
- TCPDataDispatcher(const TCPDataDispatcher&) = delete;
- TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
-
- void start(int port, const std::string& address);
- void write(const std::vector<uint8_t>& data);
-
- private:
- void process(void);
-
- size_t m_max_queue_size;
-
- std::atomic<bool> m_running;
- std::thread m_listener_thread;
- TcpSocket m_listener_socket;
- std::list<TCPConnection> m_connections;
-};
-
-#endif // _TCPSOCKET
diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp
deleted file mode 100644
index 3d015ec..0000000
--- a/src/UdpSocket.cpp
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "UdpSocket.h"
-
-#include <iostream>
-#include <stdio.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-
-using namespace std;
-
-UdpSocket::UdpSocket() :
- listenSocket(INVALID_SOCKET)
-{
- reinit(0, "");
-}
-
-UdpSocket::UdpSocket(int port) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, "");
-}
-
-UdpSocket::UdpSocket(int port, const std::string& name) :
- listenSocket(INVALID_SOCKET)
-{
- reinit(port, name);
-}
-
-
-int UdpSocket::setBlocking(bool block)
-{
- int res;
- if (block)
- res = fcntl(listenSocket, F_SETFL, 0);
- else
- res = fcntl(listenSocket, F_SETFL, O_NONBLOCK);
- if (res == SOCKET_ERROR) {
- setInetError("Can't change blocking state of socket");
- return -1;
- }
- return 0;
-}
-
-int UdpSocket::reinit(int port, const std::string& name)
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) {
- setInetError("Can't create socket");
- return -1;
- }
- reuseopt_t reuse = 1;
- if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
- == SOCKET_ERROR) {
- setInetError("Can't reuse address");
- return -1;
- }
-
- if (port) {
- address.setAddress(name);
- address.setPort(port);
-
- if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) {
- setInetError("Can't bind socket");
- ::close(listenSocket);
- listenSocket = INVALID_SOCKET;
- return -1;
- }
- }
- return 0;
-}
-
-int UdpSocket::close()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-
- listenSocket = INVALID_SOCKET;
-
- return 0;
-}
-
-UdpSocket::~UdpSocket()
-{
- if (listenSocket != INVALID_SOCKET) {
- ::close(listenSocket);
- }
-}
-
-
-int UdpSocket::receive(UdpPacket& packet)
-{
- socklen_t addrSize;
- addrSize = sizeof(*packet.getAddress().getAddress());
- ssize_t ret = recvfrom(listenSocket,
- packet.getData(),
- packet.getSize(),
- 0,
- packet.getAddress().getAddress(),
- &addrSize);
-
- if (ret == SOCKET_ERROR) {
- packet.setSize(0);
- if (errno == EAGAIN) {
- return 0;
- }
- setInetError("Can't receive UDP packet");
- return -1;
- }
-
- packet.setSize(ret);
- return 0;
-}
-
-int UdpSocket::send(UdpPacket& packet)
-{
- int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0,
- packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
-{
- int ret = sendto(listenSocket, &data[0], data.size(), 0,
- destination.getAddress(), sizeof(*destination.getAddress()));
- if (ret == SOCKET_ERROR && errno != ECONNREFUSED) {
- setInetError("Can't send UDP packet");
- return -1;
- }
- return 0;
-}
-
-
-/**
- * Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
- * @return 0 if ok, -1 if error
- */
-int UdpSocket::joinGroup(char* groupname)
-{
- ip_mreqn group;
- if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
- setInetError(groupname);
- return -1;
- }
- if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- setInetError("Not a multicast address");
- return -1;
- }
- group.imr_address.s_addr = htons(INADDR_ANY);;
- group.imr_ifindex = 0;
- if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
- == SOCKET_ERROR) {
- setInetError("Can't join multicast group");
- }
- return 0;
-}
-
-int UdpSocket::setMulticastTTL(int ttl)
-{
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
- == SOCKET_ERROR) {
- setInetError("Can't set ttl");
- return -1;
- }
-
- return 0;
-}
-
-int UdpSocket::setMulticastSource(const char* source_addr)
-{
- struct in_addr addr;
- if (inet_aton(source_addr, &addr) == 0) {
- setInetError("Can't parse source address");
- return -1;
- }
-
- if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
- == SOCKET_ERROR) {
- setInetError("Can't set source address");
- return -1;
- }
-
- return 0;
-}
-
-UdpPacket::UdpPacket() { }
-
-UdpPacket::UdpPacket(size_t initSize) :
- m_buffer(initSize)
-{ }
-
-
-void UdpPacket::setSize(size_t newSize)
-{
- m_buffer.resize(newSize);
-}
-
-
-uint8_t* UdpPacket::getData()
-{
- return &m_buffer[0];
-}
-
-
-void UdpPacket::addData(const void *data, size_t size)
-{
- uint8_t *d = (uint8_t*)data;
- std::copy(d, d + size, std::back_inserter(m_buffer));
-}
-
-size_t UdpPacket::getSize()
-{
- return m_buffer.size();
-}
-
-InetAddress UdpPacket::getAddress()
-{
- return address;
-}
-
diff --git a/src/UdpSocket.h b/src/UdpSocket.h
deleted file mode 100644
index f51e87c..0000000
--- a/src/UdpSocket.h
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
- Queen in Right of Canada (Communications Research Center Canada)
-
- Copyright (C) 2017
- Matthias P. Braendli, matthias.braendli@mpb.li
-
- http://www.opendigitalradio.org
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#ifdef HAVE_CONFIG_H
-# include "config.h"
-#endif
-
-#include "InetAddress.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#include <pthread.h>
-#define SOCKET int
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define reuseopt_t int
-
-#include <stdlib.h>
-#include <iostream>
-#include <vector>
-
-class UdpPacket;
-
-
-/**
- * This class represents a socket for sending and receiving UDP packets.
- *
- * A UDP socket is the sending or receiving point for a packet delivery service.
- * Each packet sent or received on a datagram socket is individually
- * addressed and routed. Multiple packets sent from one machine to another may
- * be routed differently, and may arrive in any order.
- */
-class UdpSocket
-{
- public:
- /** Create a new socket that will not be bound to any port. To be used
- * for data output.
- */
- UdpSocket();
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- */
- UdpSocket(int port);
- /** Create a new socket.
- * @param port The port number on which the socket will be bound
- * @param name The IP address on which the socket will be bound.
- * It is used to bind the socket on a specific interface if
- * the computer have many NICs.
- */
- UdpSocket(int port, const std::string& name);
- ~UdpSocket();
- UdpSocket(const UdpSocket& other) = delete;
- const UdpSocket& operator=(const UdpSocket& other) = delete;
-
- /** reinitialise socket. Close the already open socket, and
- * create a new one
- */
- int reinit(int port, const std::string& name);
-
- /** Close the socket
- */
- int close(void);
-
- /** Send an UDP packet.
- * @param packet The UDP packet to be sent. It includes the data and the
- * destination address
- * return 0 if ok, -1 if error
- */
- int send(UdpPacket& packet);
-
- /** Send an UDP packet
- *
- * return 0 if ok, -1 if error
- */
- int send(const std::vector<uint8_t>& data, InetAddress destination);
-
- /** Receive an UDP packet.
- * @param packet The packet that will receive the data. The address will be set
- * to the source address.
- * @return 0 if ok, -1 if error
- */
- int receive(UdpPacket& packet);
-
- int joinGroup(char* groupname);
- int setMulticastSource(const char* source_addr);
- int setMulticastTTL(int ttl);
-
- /** Set blocking mode. By default, the socket is blocking.
- * @return 0 if ok
- * -1 if error
- */
- int setBlocking(bool block);
-
- protected:
-
- /// The address on which the socket is bound.
- InetAddress address;
- /// The low-level socket used by system functions.
- SOCKET listenSocket;
-};
-
-/** This class represents a UDP packet.
- *
- * A UDP packet contains a payload (sequence of bytes) and an address. For
- * outgoing packets, the address is the destination address. For incoming
- * packets, the address tells the user from what source the packet arrived from.
- */
-class UdpPacket
-{
- public:
- /** Construct an empty UDP packet.
- */
- UdpPacket();
- UdpPacket(size_t initSize);
-
- /** Give the pointer to data.
- * @return The pointer
- */
- uint8_t* getData(void);
-
- /** Append some data at the end of data buffer and adjust size.
- * @param data Pointer to the data to add
- * @param size Size in bytes of new data
- */
- void addData(const void *data, size_t size);
-
- size_t getSize(void);
-
- /** Changes size of the data buffer size. Keeps data intact unless
- * truncated.
- */
- void setSize(size_t newSize);
-
- /** Returns the UDP address of the packet.
- */
- InetAddress getAddress(void);
-
- const std::vector<uint8_t>& getBuffer(void) const {
- return m_buffer;
- }
-
-
- private:
- std::vector<uint8_t> m_buffer;
- InetAddress address;
-};
-
diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h
index 9cc18d7..c7e570b 100644
--- a/src/dabOutput/dabOutput.h
+++ b/src/dabOutput/dabOutput.h
@@ -28,8 +28,7 @@
#pragma once
-#include "UdpSocket.h"
-#include "TcpSocket.h"
+#include "Socket.h"
#include "Log.h"
#include "string.h"
#include <stdexcept>
@@ -57,6 +56,8 @@ class DabOutput
{
return Open(name.c_str());
}
+
+ // Return -1 on failure
virtual int Write(void* buffer, int size) = 0;
virtual int Close() = 0;
@@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput
class DabOutputUdp : public DabOutput
{
public:
- DabOutputUdp() {
- packet_ = new UdpPacket(6144);
- socket_ = new UdpSocket();
- }
-
- virtual ~DabOutputUdp() {
- delete socket_;
- delete packet_;
- }
+ DabOutputUdp();
int Open(const char* name);
int Write(void* buffer, int size);
@@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput
DabOutputUdp operator=(const DabOutputUdp& other) = delete;
std::string uri_;
- UdpSocket* socket_;
- UdpPacket* packet_;
+ Socket::UDPSocket socket_;
+ Socket::UDPPacket packet_;
};
// -------------- TCP ------------------
@@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput
private:
std::string uri_;
- std::shared_ptr<TCPDataDispatcher> dispatcher_;
+ std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_;
};
// -------------- Simul ------------------
diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp
index 87dbfd5..4dc3538 100644
--- a/src/dabOutput/dabOutputTcp.cpp
+++ b/src/dabOutput/dabOutputTcp.cpp
@@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)
uri_ = name;
if (success) {
- dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
+ dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);
dispatcher_->start(port, address);
}
else {
diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp
index c129569..b9c22db 100644
--- a/src/dabOutput/dabOutputUdp.cpp
+++ b/src/dabOutput/dabOutputUdp.cpp
@@ -38,18 +38,12 @@
#include <cstdio>
#include <limits.h>
#include "dabOutput.h"
-#include "UdpSocket.h"
-
-#ifdef _WIN32
-# include <fscfg.h>
-# include <sdci.h>
-#else
-# include <netinet/in.h>
-# include <sys/types.h>
-# include <sys/socket.h>
-# include <sys/ioctl.h>
-# include <net/if_arp.h>
-#endif
+#include "Socket.h"
+
+DabOutputUdp::DabOutputUdp() :
+ socket_(),
+ packet_(6144)
+{ }
int DabOutputUdp::Open(const char* name)
{
@@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name)
regex_constants::match_default)) {
string address = what[1];
- if (this->packet_->getAddress().setAddress(address.c_str()) == -1) {
- etiLog.level(error) << "can't set address " <<
- address << "(" << inetErrDesc << ": " << inetErrMsg << ")";
- return -1;
- }
-
string port_str = what[2];
long port = std::strtol(port_str.c_str(), nullptr, 0);
@@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name)
return -1;
}
- this->packet_->getAddress().setPort(port);
+ packet_.address.resolveUdpDestination(address, port);
string query_params = what[3];
smatch query_what;
@@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name)
regex_constants::match_default)) {
string src = query_what[1];
- int err = socket_->setMulticastSource(src.c_str());
- if (err) {
- etiLog.level(error) << "UDP output socket set source failed!";
- return -1;
- }
+ try {
+ socket_.setMulticastSource(src.c_str());
- string ttl_str = query_what[2];
+ string ttl_str = query_what[2];
- if (not ttl_str.empty()) {
- long ttl = std::strtol(ttl_str.c_str(), nullptr, 0);
- if ((ttl <= 0) || (ttl >= 255)) {
- etiLog.level(error) << "Invalid TTL setting in " <<
- uri_without_proto;
- return -1;
- }
+ if (not ttl_str.empty()) {
+ long ttl = std::strtol(ttl_str.c_str(), nullptr, 0);
+ if ((ttl <= 0) || (ttl >= 255)) {
+ etiLog.level(error) << "Invalid TTL setting in " <<
+ uri_without_proto;
+ return -1;
+ }
- err = socket_->setMulticastTTL(ttl);
- if (err) {
- etiLog.level(error) << "UDP output socket set TTL failed!";
- return -1;
+ socket_.setMulticastTTL(ttl);
}
}
+ catch (const std::runtime_error& e) {
+ etiLog.level(error) << "Failed to set UDP output settings" << e.what();
+ }
}
else if (not query_params.empty()) {
etiLog.level(error) << "UDP output: could not parse parameters " <<
@@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name)
int DabOutputUdp::Write(void* buffer, int size)
{
- this->packet_->setSize(0);
- this->packet_->addData(buffer, size);
- return this->socket_->send(*this->packet_);
+ const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer);
+ packet_.buffer.resize(0);
+ std::copy(buf, buf + size, std::back_inserter(packet_.buffer));
+ socket_.send(packet_);
+ return 0;
}
#endif // defined(HAVE_OUTPUT_UDP)
diff --git a/src/fig/FIG0_19.cpp b/src/fig/FIG0_19.cpp
index f032bd5..5b6a384 100644
--- a/src/fig/FIG0_19.cpp
+++ b/src/fig/FIG0_19.cpp
@@ -109,6 +109,19 @@ FillStatus FIG0_19::fill(uint8_t *buf, size_t max_size)
else {
fig0_19->ASw = 0;
}
+
+ /* From the crc-mmbtools google groups, 2019-07-11, L. Cornell:
+ *
+ * Long ago, there was a defined use for the New flag - it was intended
+ * to indicate whether the announcement was new or was a repeated
+ * announcement. But the problem is that it doesn't really help
+ * receivers because they might tune to the ensemble at any time, and
+ * might tune to a service that may be interrupted at any time. So
+ * some years ago it was decided that the New flag would not longer be
+ * used in transmissions. The setting was fixed to be 1 because some
+ * receivers would have never switched to the announcement if the flag
+ * was set to 0.
+ */
fig0_19->NewFlag = 1;
fig0_19->RegionFlag = 0;
fig0_19->SubChId = 0;
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
new file mode 100644
index 0000000..765a355
--- /dev/null
+++ b/src/input/Edi.cpp
@@ -0,0 +1,221 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Edi.h"
+
+#include <regex>
+#include <chrono>
+#include <stdexcept>
+#include <sstream>
+#include <cstring>
+#include <cstdlib>
+#include <cerrno>
+#include <climits>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+constexpr bool VERBOSE = false;
+constexpr size_t TCP_BLOCKSIZE = 2048;
+constexpr size_t MAX_FRAMES_QUEUED = 1000;
+
+Edi::Edi() :
+ m_tcp_receive_server(TCP_BLOCKSIZE),
+ m_sti_writer(),
+ m_sti_decoder(m_sti_writer, VERBOSE)
+{ }
+
+Edi::~Edi() {
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+int Edi::open(const std::string& name)
+{
+ const std::regex re_udp("udp://:([0-9]+)");
+ const std::regex re_tcp("tcp://(.*):([0-9]+)");
+
+ lock_guard<mutex> lock(m_mutex);
+
+ m_running = false;
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+
+ std::smatch m;
+ if (std::regex_match(name, m, re_udp)) {
+ const int udp_port = std::stoi(m[1].str());
+ m_input_used = InputUsed::UDP;
+ m_udp_sock.reinit(udp_port);
+ m_udp_sock.setBlocking(false);
+ // TODO multicast
+ }
+ else if (std::regex_match(name, m, re_tcp)) {
+ m_input_used = InputUsed::TCP;
+ const string addr = m[1].str();
+ const int tcp_port = std::stoi(m[2].str());
+ m_tcp_receive_server.start(tcp_port, addr);
+ }
+ else {
+ throw runtime_error("Cannot parse EDI input URI");
+ }
+
+ m_name = name;
+
+ m_running = true;
+ m_thread = std::thread(&Edi::m_run, this);
+
+ return 0;
+}
+
+int Edi::readFrame(uint8_t* buffer, size_t size)
+{
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ if (not m_pending_sti_frame.frame.empty()) {
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ else {
+ const auto now = chrono::system_clock::now();
+
+ if (m_pending_sti_frame.timestamp.to_system_clock() <= now) {
+ etiLog.level(debug) << "EDI input take frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+
+ std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ }
+ else {
+ etiLog.level(debug) << "EDI input skip frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+ }
+ }
+ }
+
+ return size;
+
+#if 0
+ EdiDecoder::sti_frame_t sti;
+ if (m_is_prebuffering) {
+ m_is_prebuffering = m_frames.size() < 10;
+ if (not m_is_prebuffering) {
+ etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
+ }
+ }
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
+ etiLog.level(debug) << "EDI input " << m_name << " empty frame";
+ return 0;
+ }
+ else if (sti.frame.size() == size) {
+ std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
+ }
+ else {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<
+ " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ }
+ else {
+ memset(buffer, 0, size * sizeof(*buffer));
+ m_is_prebuffering = true;
+ etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
+ }
+ return size;
+#endif
+}
+
+void Edi::m_run()
+{
+ while (m_running) {
+ bool work_done = false;
+
+ switch (m_input_used) {
+ case InputUsed::UDP:
+ {
+ constexpr size_t packsize = 2048;
+ const auto packet = m_udp_sock.receive(packsize);
+ if (packet.buffer.size() == packsize) {
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ if (not packet.buffer.empty()) {
+ m_sti_decoder.push_packet(packet.buffer);
+ work_done = true;
+ }
+ }
+ break;
+ case InputUsed::TCP:
+ {
+ auto packet = m_tcp_receive_server.receive();
+ if (not packet.empty()) {
+ m_sti_decoder.push_bytes(packet);
+ work_done = true;
+ }
+ }
+ break;
+ default:
+ throw logic_error("unimplemented input");
+ }
+
+ const auto sti = m_sti_writer.getFrame();
+ if (not sti.frame.empty()) {
+ m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED);
+ work_done = true;
+ }
+
+ if (not work_done) {
+ // Avoid fast loop
+ this_thread::sleep_for(chrono::milliseconds(12));
+ }
+ }
+}
+
+int Edi::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ etiLog.level(error) << "Invalid bitrate (" << bitrate << ") for " << m_name;
+ return -1;
+ }
+
+ return bitrate;
+}
+
+int Edi::close()
+{
+ m_udp_sock.close();
+ return 0;
+}
+
+}
diff --git a/src/input/Edi.h b/src/input/Edi.h
new file mode 100644
index 0000000..66ff682
--- /dev/null
+++ b/src/input/Edi.h
@@ -0,0 +1,83 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2019
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include <deque>
+#include <thread>
+#include <mutex>
+#include "Socket.h"
+#include "input/inputs.h"
+#include "edi/STIDecoder.hpp"
+#include "edi/STIWriter.hpp"
+#include "ThreadsafeQueue.h"
+
+namespace Inputs {
+
+/*
+ * Receives EDI from UDP or TCP in a separate thread and pushes that data
+ * into the STIDecoder. Complete frames are then put into a queue for the consumer.
+ *
+ * This way, the EDI decoding happens in a separate thread.
+ */
+class Edi : public InputBase {
+ public:
+ Edi();
+ Edi(const Edi&) = delete;
+ Edi& operator=(const Edi&) = delete;
+ ~Edi();
+
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ protected:
+ void m_run();
+
+ std::mutex m_mutex;
+
+ enum class InputUsed { Invalid, UDP, TCP };
+ InputUsed m_input_used = InputUsed::Invalid;
+ Socket::UDPSocket m_udp_sock;
+ Socket::TCPReceiveServer m_tcp_receive_server;
+
+ EdiDecoder::STIWriter m_sti_writer;
+ EdiDecoder::STIDecoder m_sti_decoder;
+ std::thread m_thread;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+ EdiDecoder::sti_frame_t m_pending_sti_frame;
+
+ bool m_is_prebuffering = true;
+
+ std::string m_name;
+};
+
+};
+
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index 2cb49e7..5d4f964 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -82,17 +82,8 @@ void Udp::openUdpSocket(const std::string& endpoint)
throw out_of_range("can't use port number 0 in udp address");
}
- if (m_sock.reinit(port, address) == -1) {
- stringstream ss;
- ss << "Could not init UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (m_sock.setBlocking(false) == -1) {
- stringstream ss;
- ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ m_sock.reinit(port, address);
+ m_sock.setBlocking(false);
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
@@ -100,17 +91,9 @@ void Udp::openUdpSocket(const std::string& endpoint)
int Udp::readFrame(uint8_t* buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- std::copy(packet.getData(), packet.getData() + packet.getSize(),
- back_inserter(m_buffer));
+ std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));
// Take data from the buffer if it contains enough data,
// in any case write the buffer
@@ -136,7 +119,8 @@ int Udp::setBitrate(int bitrate)
int Udp::close()
{
- return m_sock.close();
+ m_sock.close();
+ return 0;
}
@@ -167,8 +151,8 @@ static uint16_t unpack2(const uint8_t *buf)
int Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -176,8 +160,8 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}
@@ -190,29 +174,22 @@ int Sti_d_Rtp::open(const std::string& name)
void Sti_d_Rtp::receive_packet()
{
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- if (packet.getSize() == 0) {
+ if (packet.buffer.empty()) {
// No packet was received
return;
}
const size_t STI_FC_LEN = 8;
- if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
etiLog.level(info) << "Received too small RTP packet for " <<
m_name;
return;
}
- if (not rtpHeaderValid(packet.getData())) {
+ if (not rtpHeaderValid(packet.buffer.data())) {
etiLog.level(info) << "Received invalid RTP header for " <<
m_name;
return;
@@ -220,7 +197,7 @@ void Sti_d_Rtp::receive_packet()
// STI(PI, X)
size_t index = RTP_HEADER_LEN;
- const uint8_t *buf = packet.getData();
+ const uint8_t *buf = packet.buffer.data();
// SYNC
index++; // Advance over STAT
@@ -242,7 +219,7 @@ void Sti_d_Rtp::receive_packet()
m_name;
return;
}
- if (packet.getSize() < index + DFS) {
+ if (packet.buffer.size() < index + DFS) {
etiLog.level(info) << "Received STI too small for given DFS for " <<
m_name;
return;
@@ -270,9 +247,9 @@ void Sti_d_Rtp::receive_packet()
uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
index += 2;
- if (packet.getSize() < index + 4*NST) {
+ if (packet.buffer.size() < index + 4*NST) {
etiLog.level(info) << "Received STI too small to contain NST for " <<
- m_name << " packet: " << packet.getSize() << " need " <<
+ m_name << " packet: " << packet.buffer.size() << " need " <<
index + 4*NST;
return;
}
diff --git a/src/input/Udp.h b/src/input/Udp.h
index dc01486..dd637c6 100644
--- a/src/input/Udp.h
+++ b/src/input/Udp.h
@@ -31,7 +31,7 @@
#include <deque>
#include <boost/thread.hpp>
#include "input/inputs.h"
-#include "UdpSocket.h"
+#include "Socket.h"
namespace Inputs {
@@ -46,7 +46,7 @@ class Udp : public InputBase {
virtual int close();
protected:
- UdpSocket m_sock;
+ Socket::UDPSocket m_sock;
std::string m_name;
void openUdpSocket(const std::string& endpoint);
diff --git a/src/utils.cpp b/src/utils.cpp
index 721c145..3e3e86e 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -328,7 +328,7 @@ void printSubchannels(const vec_sp_subchannel& subchannels)
etiLog.level(info) << " URI: " << subchannel->inputUri;
switch (subchannel->type) {
case subchannel_type_t::DABAudio:
- etiLog.log(info, " type: DAbAudio");
+ etiLog.log(info, " type: DABAudio");
break;
case subchannel_type_t::DABPlusAudio:
etiLog.log(info, " type: DABPlusAudio");
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
index 2128abf..2188f8a 100644
--- a/src/zmq2edi/EDISender.cpp
+++ b/src/zmq2edi/EDISender.cpp
@@ -79,7 +79,7 @@ void EDISender::print_configuration()
void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
{
edi::TagDETI edi_tagDETI;
- edi::TagStarPTR edi_tagStarPtr;
+ edi::TagStarPTR edi_tagStarPtr("DETI");
map<int, edi::TagESTn> edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
index bb9c8bc..3525b4b 100644
--- a/src/zmq2edi/EDISender.h
+++ b/src/zmq2edi/EDISender.h
@@ -34,9 +34,9 @@
#include <atomic>
#include "ThreadsafeQueue.h"
#include "dabOutput/dabOutput.h"
-#include "dabOutput/edi/TagItems.h"
-#include "dabOutput/edi/TagPacket.h"
-#include "dabOutput/edi/Transport.h"
+#include "edioutput/TagItems.h"
+#include "edioutput/TagPacket.h"
+#include "edioutput/Transport.h"
// This metadata gets transmitted in the zmq stream
struct metadata_t {
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index 3888d8a..7f34610 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -27,24 +27,27 @@
#include "Log.h"
#include "zmq.hpp"
-#include <math.h>
#include <getopt.h>
-#include <string.h>
+#include <cmath>
+#include <cstring>
+#include <chrono>
#include <iostream>
#include <iterator>
+#include <thread>
#include <vector>
#include "EDISender.h"
#include "dabOutput/dabOutput.h"
constexpr size_t MAX_ERROR_COUNT = 10;
+constexpr size_t MAX_NUM_RESETS = 180;
constexpr long ZMQ_TIMEOUT_MS = 1000;
static edi::configuration_t edi_conf;
static EDISender edisender;
-void usage(void)
+static void usage()
{
using namespace std;
@@ -70,8 +73,9 @@ void usage(void)
cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
cerr << " -t <ttl> set the packet's TTL." << endl << endl;
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ cerr << "The input socket will be reset if no data is received for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
+ cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -313,85 +317,92 @@ int start(int argc, char **argv)
const char* source_url = argv[optind];
-
- size_t frame_count = 0;
- size_t error_count = 0;
-
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
-
zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- while (error_count < MAX_ERROR_COUNT) {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
-
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ size_t num_consecutive_resets = 0;
+ while (num_consecutive_resets < MAX_NUM_RESETS) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ size_t error_count = 0;
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ num_consecutive_resets = 0;
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i <<
- " has invalid length " << dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ const int framesize = dab_msg->buflen[i];
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- offset += framesize;
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
+
+ offset += framesize;
+ }
}
- }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
- frame_count++;
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ }
}
}
- }
- etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+ num_consecutive_resets++;
+
+ zmq_sock.close();
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+ etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
+ num_consecutive_resets << " consecutive resets.";
+ }
return 0;
}
diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp
index 16830a2..95dc074 100644
--- a/src/zmq2farsync/zmq2farsync.cpp
+++ b/src/zmq2farsync/zmq2farsync.cpp
@@ -3,7 +3,7 @@
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2019
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -28,13 +28,16 @@
#include "dabOutput/dabOutput.h"
#include "Log.h"
#include "zmq.hpp"
+#include <chrono>
#include <iostream>
+#include <thread>
#include <vector>
constexpr size_t MAX_ERROR_COUNT = 10;
+constexpr size_t MAX_NUM_RESETS = 180;
constexpr long ZMQ_TIMEOUT_MS = 1000;
-void usage(void)
+static void usage()
{
using namespace std;
@@ -46,8 +49,9 @@ void usage(void)
cerr << " <destination> is the device information for the FarSync card." << endl << endl;
cerr << " The syntax is the same as for ODR-DabMux" << endl << endl;
- cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ cerr << "The input socket will be reset if no data is received for " <<
(int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
+ cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;
cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
}
@@ -80,72 +84,81 @@ int main(int argc, char **argv)
etiLog.level(info) << "Opening ZMQ input: " << source_url;
zmq::context_t zmq_ctx(1);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
-
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
-
- etiLog.level(info) << "Entering main loop";
size_t frame_count = 0;
size_t loop_counter = 0;
- size_t error_count = 0;
- while (error_count < MAX_ERROR_COUNT)
- {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
- if (num_events == 0) { // timeout
- error_count++;
- }
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
-
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ size_t num_consecutive_resets = 0;
+ while (num_consecutive_resets < MAX_NUM_RESETS) {
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ size_t error_count = 0;
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
error_count++;
}
+ else {
+ num_consecutive_resets = 0;
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
- etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
- dab_msg->buflen[i];
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- std::vector<uint8_t> buf(6144, 0x55);
- const int framesize = dab_msg->buflen[i];
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
-
- offset += framesize;
-
- if (output.Write(&buf.front(), buf.size()) == -1) {
- etiLog.level(error) << "Cannot write to output!";
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
+ dab_msg->buflen[i];
error_count++;
}
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
+
+ const int framesize = dab_msg->buflen[i];
+
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- frame_count++;
+ offset += framesize;
+
+ if (output.Write(&buf.front(), buf.size()) == -1) {
+ etiLog.level(error) << "Cannot write to output!";
+ error_count++;
+ }
+
+ frame_count++;
+ }
}
- }
- loop_counter++;
- if (loop_counter > 250) {
- etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
- loop_counter = 0;
+ loop_counter++;
+ if (loop_counter > 250) {
+ etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
+ loop_counter = 0;
+ }
}
}
+
+ num_consecutive_resets++;
+
+ zmq_sock.close();
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " <<
+ num_consecutive_resets << " consecutive resets.";
}
- return error_count > 0 ? 2 : 0;
+ return 0;
}