aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--README.md190
-rw-r--r--TODO.md17
-rw-r--r--contrib/Socket.cpp42
-rw-r--r--contrib/Socket.h18
-rw-r--r--contrib/ThreadsafeQueue.h40
-rw-r--r--contrib/edioutput/EDIConfig.h32
-rw-r--r--contrib/edioutput/PFT.cpp12
-rw-r--r--contrib/edioutput/PFT.h15
-rw-r--r--contrib/edioutput/Transport.cpp300
-rw-r--r--contrib/edioutput/Transport.h113
-rw-r--r--contrib/fec/decode_rs.h12
-rw-r--r--man/odr-audioenc.14
-rw-r--r--src/AlsaInput.cpp12
-rw-r--r--src/GSTInput.cpp7
-rw-r--r--src/JackInput.cpp3
-rw-r--r--src/Outputs.cpp18
-rw-r--r--src/PadInterface.cpp26
-rw-r--r--src/PadInterface.h6
-rw-r--r--src/VLCInput.cpp87
-rw-r--r--src/odr-audioenc.cpp160
20 files changed, 707 insertions, 407 deletions
diff --git a/README.md b/README.md
index 6ca7741..95b96c5 100644
--- a/README.md
+++ b/README.md
@@ -32,27 +32,29 @@ For detailed usage, see the usage screen of the tool with the *-h* option.
More information is available on the
[Opendigitalradio wiki](http://opendigitalradio.org)
-# Installation
+## Installation
+
You have 3 ways to install odr-audioenc on your host:
-## Using your linux distribution packaging system
-`odr-audioenc` is available on the official repositories of several debian-based distributions, such as Debian
-(from Debian 12), Ubuntu (from 24.10), Opensuse and Arch.
+### Installing binary packages on some linux distributions
+
+[![Packaging status](https://repology.org/badge/vertical-allrepos/odr-audioenc.svg)](https://repology.org/project/odr-audioenc/versions)
-If you are using Debian 12 (Bookworm), you will need to
-[add the backports repository](https://backports.debian.org/Instructions/)
+### Using installation scripts
-## Using installation scripts
If your linux distribution is debian-based, you can install odr-audioenc
-as well as the other main components of the mmbTools set with the
+as well as the other main components of the mmbTools set with the
[Opendigitalradio dab-scripts](https://github.com/opendigitalradio/dab-scripts.git)
-## Compiling manually
+### Compiling manually
+
Unlike the 2 previous options, this one allows you to compile odr-audioenc with the features you really need.
-### Requirements
+#### Requirements
+
For Debian Bullseye-based OS, run the following commands:
-```
+
+```sh
# Required packages
## C++11 compiler
sudo apt-get install --yes build-essential automake libtool
@@ -79,39 +81,47 @@ sudo apt-get install --yes libcurl4-openssl-dev
**Attention**: on versions older than Debian Buster, you'll need `vlc-nox` instead of `vlc-plugin-base`
-### Compilation
+#### Compilation
+
1. Clone this repository:
- ```
+
+ ```sh
# stable version:
git clone https://github.com/Opendigitalradio/ODR-AudioEnc.git
# or development version (at your own risk):
git clone https://github.com/Opendigitalradio/ODR-AudioEnc.git -b next
```
+
1. Configure the project
- ```
+
+ ```sh
cd ODR-AudioEnc
./bootstrap
# Select the features you need:
./configure --enable-alsa --enable-jack --enable-vlc --enable-gst
```
+
1. Compile and install:
- ```
+
+ ```sh
make
sudo make install
```
-# How to use
+## How to use
We assume that you have a ODR-DabMux configured for an EDI
input on port 9000.
- ALSASRC="default"
- DST="tcp://yourserver:9000"
- BITRATE=64
+```sh
+ALSASRC="default"
+DST="tcp://yourserver:9000"
+BITRATE=64
+```
-## General remarks
+### General remarks
Avoid using sources that are already encoded with a low bitrate, because
encoder cascading will noticeably reduce audio quality. Best are sources
@@ -130,8 +140,8 @@ to saturation, especially when you have to resample. When you see little
exclamation marks with the `-l` option, it's too loud! Reduce the gain at the
source, or use the gain option if that's not possible.
+### DAB+ AAC encoder configuration
-## DAB+ AAC encoder configuration
By default, when not overridden by the `--aaclc`, `--sbr` or `--ps` options,
the encoder is configured according to bitrate and number of channels.
@@ -142,30 +152,41 @@ If two channels are used, PS (Parametric Stereo, also called HE-AAC v2)
is enabled up to 48kbps. Between 56kbps and 80kbps, SBR is enabled. 88kbps
and higher are using AAC-LC.
-## EDI output
+### EDI output
The EDI output included in ODR-AudioEnc is able to connect to
one or several instances of ODR-DabMux. The `-e` option can be used
more than once to achieve this. The same goes for the ZeroMQ output (`-o` option).
-## Scenario *wav file for offline processing*
+### Scenario *wav file for offline processing*
+
Wave file encoding, for non-realtime processing
- odr-audioenc -b $BITRATE -i wave_file.wav -o station1.dabp
+```sh
+odr-audioenc -b $BITRATE -i wave_file.wav -o station1.dabp
+```
+
+### Scenario *file that VLC supports*
-## Scenario *file that VLC supports*
If you want to input a file through libvlc, you need to give an absolute path:
- odr-audioenc -b $BITRATE -v file:///home/odr/audio/source.mp3 -o station1.dabp
+```sh
+odr-audioenc -b $BITRATE -v file:///home/odr/audio/source.mp3 -o station1.dabp
+```
+
+### Scenario *ALSA*
-## Scenario *ALSA*
Live Stream from ALSA sound card at 32kHz, with EDI output for ODR-DabMux:
- odr-audioenc -d $ALSASRC -c 2 -r 32000 -b $BITRATE -e $DST -l
+```sh
+odr-audioenc -d $ALSASRC -c 2 -r 32000 -b $BITRATE -e $DST -l
+```
To enable sound card drift compensation, add the option **-D**:
- odr-audioenc -d $ALSASRC -c 2 -r 32000 -b $BITRATE -e $DST -D -l
+```sh
+odr-audioenc -d $ALSASRC -c 2 -r 32000 -b $BITRATE -e $DST -D -l
+```
You might see **U** and **O** appearing on the terminal. They correspond
to audio **u**nderruns and **o**verruns that happen due to the different speeds at which
@@ -173,12 +194,15 @@ the audio is captured from the soundcard, and encoded into HE-AACv2.
High occurrence of these will lead to audible artifacts.
-## Scenario *encode a webstream*
+### Scenario *encode a webstream*
+
You can use either GStreamer with the `-G` option or libVLC with `-v`.
Read a webstream and send it to ODR-DabMux over EDI:
- odr-audioenc -G $URL -r 32000 -c 2 -e $DST -l -b $BITRATE
+```sh
+odr-audioenc -G $URL -r 32000 -c 2 -e $DST -l -b $BITRATE
+```
If you need to extract the ICY-Text information, e.g. for DLS, you can use the
`-w <filename>` option to write the ICY-Text into a file that can be read by
@@ -188,44 +212,55 @@ libVLC.
If the webstream bitrate is slightly wrong (bad clock at the source), you can
enable drift compensation with `-D`.
-## Scenario *Custom GStreamer pipeline*
+### Scenario *Custom GStreamer pipeline*
The `--gst-pipeline` option lets you run custom pipelines, using the same
syntax as `gst-launch`, which can be necessary for sources that you cannot specify through a URI through the `-G` option.
For example, you may use udpsrc to receive an RTP stream:
- odr-audioenc --gst-pipeline 'udpsrc port=5004 caps=application/x-rtp,media=(string)audio,payload=(int)10,clock-rate=44100 ! rtpL16depay ! audioconvert ! audioresample' \
+```sh
+odr-audioenc --gst-pipeline 'udpsrc port=5004 caps=application/x-rtp,media=(string)audio,payload=(int)10,clock-rate=44100 ! rtpL16depay ! audioconvert ! audioresample' \
-e $DST -l -b $BITRATE
+```
+
+### Scenario *JACK input*
-## Scenario *JACK input*
JACK input: Instead of `-i (file input)` or `-d (ALSA input)`, use `-j *name*`, where *name* specifies the JACK
name for the encoder:
- odr-audioenc -j myenc -l -b $BITRATE -e $DST
+```sh
+odr-audioenc -j myenc -l -b $BITRATE -e $DST
+```
The JACK server must run at the samplerate of the encoder (32kHz or 48kHz). If that is not possible,
one workaround is to access JACK through VLC, which will resample accordingly:
- odr-audioenc -l -v jack://dab -b $BITRATE -e $DST
+```sh
+odr-audioenc -l -v jack://dab -b $BITRATE -e $DST
+```
-## Scenario *LiveWire* or *AES67*
+### Scenario *LiveWire* or *AES67*
When audio data is available on the network as a multicast stream, it can be encoded using the following pipeline:
- rtpdump -F payload 239.192.1.1/5004 | \
- sox -t raw -e signed-integer -r 48000 -c 2 -b 24 -B /dev/stdin -t raw --no-dither -r 48000 -c 2 -b 16 -L /dev/stdout gain 4 | \
- odr-audioenc -f raw -b $BITRATE -i /dev/stdin -e $DST
+```sh
+rtpdump -F payload 239.192.1.1/5004 | \
+sox -t raw -e signed-integer -r 48000 -c 2 -b 24 -B /dev/stdin -t raw --no-dither -r 48000 -c 2 -b 16 -L /dev/stdout gain 4 | \
+odr-audioenc -f raw -b $BITRATE -i /dev/stdin -e $DST
+```
It is also possible to use the libvlc input, where you need to create an SDP file with the following contents:
- v=0
- o=Node 1 1 IN IP4 172.16.235.155
- s=TestSine
- t=0 0
- a=type:multicast
- c=IN IP4 239.192.0.1
- m=audio 5004 RTP/AVP 97
- a=rtpmap:97 L24/48000/2
+```text
+v=0
+o=Node 1 1 IN IP4 172.16.235.155
+s=TestSine
+t=0 0
+a=type:multicast
+c=IN IP4 239.192.0.1
+m=audio 5004 RTP/AVP 97
+a=rtpmap:97 L24/48000/2
+```
Replace the IP address in the `o=` field by the one corresponding to your
source node IP address, and the IP in `c=` by the multicast IP of your stream.
@@ -235,8 +270,8 @@ This could maybe also work with GStreamer, but needs more testing. Help would be
in improving the GStreamer input code to also support more advanced features, some pointers are
in *TODO.md*
+### Scenario *local file through snd-aloop*
-## Scenario *local file through snd-aloop*
Play some local audio source from a file, with EDI or ZMQ output for ODR-DabMux. The problem with
playing a file is that *odr-audioenc* cannot directly be used, because ODR-DabMux
does not back-pressure the encoder, which will therefore encode much faster than realtime.
@@ -244,43 +279,57 @@ does not back-pressure the encoder, which will therefore encode much faster than
While this issue is sorted out, the following trick is a very flexible solution: use the
alsa virtual loop soundcard *snd-aloop* in the following way:
- modprobe snd-aloop
+```sh
+modprobe snd-aloop
+```
This creates a new audio card (usually 'hw:1' but have a look at `/proc/asound/card` to be sure) that
can then be used for the alsa encoder.
- ./odr-audioenc -d hw:1 -c 2 -r 32000 -b 64 -e $DST -l
+```sh
+odr-audioenc -d hw:1 -c 2 -r 32000 -b 64 -e $DST -l
+```
Then, you can use any media player that has an alsa output to play whatever source it supports:
- cd your/preferred/music
- mplayer -ao alsa:device=hw=1.1 -srate 32000 -format=s16le -shuffle *
+```sh
+cd your/preferred/music
+mplayer -ao alsa:device=hw=1.1 -srate 32000 -format=s16le -shuffle *
+```
**Important**: you must specify the correct sample rate and sample format on both
"sides" of the virtual sound card.
+### Scenario *mplayer and fifo*
-## Scenario *mplayer and fifo*
**Warning**: Connection through pipes to ODR-DabMux are deprecated in favour of EDI.
Live Stream resampling (to 32KHz) and encoding from FIFO and preparing for DAB muxer, with FIFO to odr-dabmux
using mplayer. If there are no data in FIFO, encoder generates silence.
- mplayer -quiet -loop 0 -af resample=32000:nowaveheader,format=s16le,channels=2 -ao pcm:file=/tmp/aac.fifo:fast <FILE/URL> &
- odr-audioenc -l -f raw --fifo-silence -i /tmp/aac.fifo -r 32000 -c 2 -b 72 -o /dev/stdout \
- mbuffer -q -m 10k -P 100 -s 1080 > station1.fifo
+```sh
+mplayer \
+ -quiet -loop 0 \
+ -af resample=32000:nowaveheader,format=s16le,channels=2 \
+ -ao pcm:file=/tmp/aac.fifo:fast <FILE/URL> &
+odr-audioenc \
+ -l -f raw --fifo-silence -i /tmp/aac.fifo -r 32000 -c 2 -b 72 \
+ -o /dev/stdout \
+mbuffer -q -m 10k -P 100 -s 1080 > station1.fifo
+```
**Note**: Do not use `/dev/stdout` for PCM output in mplayer. Mplayer logs messages to stdout.
## Return values
+
odr-audioenc returns:
- * 0 if it encoded the whole input file
- * 1 if some options were not understood, or encoder initialisation failed
- * 2 if the silence timeout was reached
- * 3 if the AAC encoder failed
- * 4 it sending data over the network failed
- * 5 if the input had a fault
+* 0: if it encoded the whole input file
+* 1: if some options were not understood, or encoder initialisation failed
+* 2: if the silence timeout was reached
+* 3: if the AAC encoder failed
+* 4: it sending data over the network failed
+* 5: if the input had a fault
The `-R` option to get ODR-AudioEnc to restart the input
automatically has been deprecated. As this feature does not guarantee that
@@ -288,8 +337,8 @@ the odr-audioenc process will never die, running it under a process supervisor
is recommended regardless of this feature being enabled or not. It will be removed
in a future version.
-
## Known Limitations
+
Some receivers did not decode audio anymore between v0.3.0 and v0.5.0, because of
a change implemented to get PAD to work. The change was subsequently reverted in
v0.5.1 because it was deemed essential that audio decoding works on all receivers.
@@ -298,16 +347,16 @@ v0.7.0 fixes most issues, and PAD now works much more reliably.
Version 0.4.0 of the encoder changed the ZeroMQ framing. It will only work with
ODR-DabMux v0.7.0 and later.
-# LICENCE
+## LICENCE
The ODR-AudioEnc project contains
- - The code for odr-audioenc in src/ licensed under the Apache Licence v2.0. See
- http://www.apache.org/licenses/LICENSE-2.0
- - libtoolame-dab, derived from TooLAME, licensed under LGPL v2.1 or later. See
+* The code for odr-audioenc in src/ licensed under the Apache Licence v2.0. See
+ <http://www.apache.org/licenses/LICENSE-2.0>
+* libtoolame-dab, derived from TooLAME, licensed under LGPL v2.1 or later. See
`libtoolame-dab/LGPL.txt`. This is built into a shared library.
- - EDI output (files in src/edi) are GPLv3+
- - The FDK-AAC encoder, patched for DAB+ support, licensed under the terms in
+* EDI output (files in src/edi) are GPLv3+
+* The FDK-AAC encoder, patched for DAB+ support, licensed under the terms in
`fdk-aac/NOTICE`, built into a shared library.
The odr-audioenc binary is statically linked against the libtoolame-dab and fdk-aac
@@ -315,4 +364,3 @@ libraries.
Whether it is legal or not to distribute compiled binaries from these sources
is unclear to me. Please seek legal advice to answer this question.
-
diff --git a/TODO.md b/TODO.md
index a7c8cae..bfa44d7 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,24 +1,23 @@
+# To Do
+
This TODO file lists ideas and features for future developments. They are
more or less ordered according to their benefit, but that is subjective
to some degree.
Unless written, no activity has been started on the topics.
-Drift compenstation statistics
-------------------------------
+## Drift compenstation statistics
Insert drift compensation statistics into EDI metadata.
-
-GStreamer input and AES67
--------------------------
+## GStreamer input and AES67
AES67 support could be nice.
-GST can apparently use PTP https://gstreamer.freedesktop.org/documentation/net/gstptpclock.html?gi-language=c
+GST can apparently use PTP <https://gstreamer.freedesktop.org/documentation/net/gstptpclock.html?gi-language=c>
-https://gstreamer.freedesktop.org/documentation/sdpelem/sdpdemux.html?gi-language=c
+<https://gstreamer.freedesktop.org/documentation/sdpelem/sdpdemux.html?gi-language=c>
-https://www.collabora.com/news-and-blog/blog/2017/04/25/receiving-an-aes67-stream-with-gstreamer/
+<https://www.collabora.com/news-and-blog/blog/2017/04/25/receiving-an-aes67-stream-with-gstreamer/>
-https://archive.fosdem.org/2016/schedule/event/synchronised_gstreamer/attachments/slides/889/export/events/attachments/synchronised_gstreamer/slides/889/synchronised_multidevice_media_playback_with_GStreamer.pdf
+<https://archive.fosdem.org/2016/schedule/event/synchronised_gstreamer/attachments/slides/889/export/events/attachments/synchronised_gstreamer/slides/889/synchronised_multidevice_media_playback_with_GStreamer.pdf>
diff --git a/contrib/Socket.cpp b/contrib/Socket.cpp
index a85b98b..33c9c73 100644
--- a/contrib/Socket.cpp
+++ b/contrib/Socket.cpp
@@ -24,6 +24,7 @@
#include "Socket.h"
+#include <numeric>
#include <stdexcept>
#include <cstdio>
#include <cstring>
@@ -478,7 +479,7 @@ TCPSocket::~TCPSocket()
TCPSocket::TCPSocket(TCPSocket&& other) :
m_sock(other.m_sock),
- m_remote_address(move(other.m_remote_address))
+ m_remote_address(std::move(other.m_remote_address))
{
if (other.m_sock != -1) {
other.m_sock = -1;
@@ -967,12 +968,22 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
reconnect();
}
+ m_last_received_packet_ts = chrono::steady_clock::now();
+
return ret;
}
catch (const TCPSocket::Interrupted&) {
return -1;
}
catch (const TCPSocket::Timeout&) {
+ const auto timeout = chrono::milliseconds(timeout_ms * 5);
+ if (m_last_received_packet_ts.has_value() and
+ chrono::steady_clock::now() - *m_last_received_packet_ts > timeout)
+ {
+ // This is to catch half-closed TCP connections
+ reconnect();
+ }
+
return 0;
}
@@ -983,6 +994,7 @@ void TCPClient::reconnect()
{
TCPSocket newsock;
m_sock = std::move(newsock);
+ m_last_received_packet_ts = nullopt;
m_sock.connect(m_hostname, m_port, true);
}
@@ -990,7 +1002,7 @@ TCPConnection::TCPConnection(TCPSocket&& sock) :
queue(),
m_running(true),
m_sender_thread(),
- m_sock(move(sock))
+ m_sock(std::move(sock))
{
#if MISSING_OWN_ADDR
auto own_addr = m_sock.getOwnAddress();
@@ -1052,6 +1064,17 @@ void TCPConnection::process()
#endif
}
+TCPConnection::stats_t TCPConnection::get_stats() const
+{
+ TCPConnection::stats_t s;
+ const vector<size_t> buffer_sizes = queue.map<size_t>(
+ [](const vector<uint8_t>& vec) { return vec.size(); }
+ );
+
+ s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0);
+ s.remote_address = m_sock.get_remote_address();
+ return s;
+}
TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
m_max_queue_size(max_queue_size),
@@ -1109,7 +1132,7 @@ void TCPDataDispatcher::process()
auto sock = m_listener_socket.accept(timeout_ms);
if (sock.valid()) {
auto lock = unique_lock<mutex>(m_mutex);
- m_connections.emplace(m_connections.begin(), move(sock));
+ m_connections.emplace(m_connections.begin(), std::move(sock));
if (m_buffers_to_preroll > 0) {
for (const auto& buf : m_preroll_queue) {
@@ -1125,6 +1148,17 @@ void TCPDataDispatcher::process()
}
}
+
+std::vector<TCPConnection::stats_t> TCPDataDispatcher::get_stats() const
+{
+ std::vector<TCPConnection::stats_t> s;
+ auto lock = unique_lock<mutex>(m_mutex);
+ for (const auto& conn : m_connections) {
+ s.push_back(conn.get_stats());
+ }
+ return s;
+}
+
TCPReceiveServer::TCPReceiveServer(size_t blocksize) :
m_blocksize(blocksize)
{
@@ -1181,7 +1215,7 @@ void TCPReceiveServer::process()
}
else {
buf.resize(r);
- m_queue.push(make_shared<TCPReceiveMessageData>(move(buf)));
+ m_queue.push(make_shared<TCPReceiveMessageData>(std::move(buf)));
}
}
catch (const TCPSocket::Interrupted&) {
diff --git a/contrib/Socket.h b/contrib/Socket.h
index ab2a14a..b9a40ee 100644
--- a/contrib/Socket.h
+++ b/contrib/Socket.h
@@ -31,9 +31,11 @@
#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <atomic>
-#include <string>
+#include <chrono>
#include <list>
#include <memory>
+#include <optional>
+#include <string>
#include <thread>
#include <vector>
@@ -211,6 +213,8 @@ class TCPSocket {
SOCKET get_sockfd() const { return m_sock; }
+ InetAddress get_remote_address() const { return m_remote_address; }
+
private:
explicit TCPSocket(int sockfd);
explicit TCPSocket(int sockfd, InetAddress remote_address);
@@ -236,6 +240,8 @@ class TCPClient {
TCPSocket m_sock;
std::string m_hostname;
int m_port;
+
+ std::optional<std::chrono::steady_clock::time_point> m_last_received_packet_ts;
};
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
@@ -250,6 +256,12 @@ class TCPConnection
ThreadsafeQueue<std::vector<uint8_t> > queue;
+ struct stats_t {
+ size_t buffer_fullness = 0;
+ InetAddress remote_address;
+ };
+ stats_t get_stats() const;
+
private:
std::atomic<bool> m_running;
std::thread m_sender_thread;
@@ -272,6 +284,8 @@ class TCPDataDispatcher
void start(int port, const std::string& address);
void write(const std::vector<uint8_t>& data);
+ std::vector<TCPConnection::stats_t> get_stats() const;
+
private:
void process();
@@ -284,7 +298,7 @@ class TCPDataDispatcher
std::thread m_listener_thread;
TCPSocket m_listener_socket;
- std::mutex m_mutex;
+ mutable std::mutex m_mutex;
std::deque<std::vector<uint8_t> > m_preroll_queue;
std::list<TCPConnection> m_connections;
};
diff --git a/contrib/ThreadsafeQueue.h b/contrib/ThreadsafeQueue.h
index 8b385d6..a8d2e85 100644
--- a/contrib/ThreadsafeQueue.h
+++ b/contrib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2023
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -28,9 +28,10 @@
#pragma once
+#include <functional>
#include <mutex>
#include <condition_variable>
-#include <queue>
+#include <deque>
#include <utility>
#include <cassert>
@@ -63,10 +64,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
else if (queue_size_before < max_size) {
- the_queue.push(val);
+ the_queue.push_back(val);
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -80,10 +81,10 @@ public:
std::unique_lock<std::mutex> lock(the_mutex);
size_t queue_size_before = the_queue.size();
if (max_size == 0) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
else if (queue_size_before < max_size) {
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
}
size_t queue_size = the_queue.size();
lock.unlock();
@@ -110,9 +111,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.push(val);
+ the_queue.push_back(val);
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -129,9 +130,9 @@ public:
bool overflow = false;
while (the_queue.size() >= max_size) {
overflow = true;
- the_queue.pop();
+ the_queue.pop_front();
}
- the_queue.emplace(std::move(val));
+ the_queue.emplace_back(std::move(val));
const size_t queue_size = the_queue.size();
lock.unlock();
@@ -152,7 +153,7 @@ public:
while (the_queue.size() >= threshold) {
the_tx_notification.wait(lock);
}
- the_queue.push(val);
+ the_queue.push_back(val);
size_t queue_size = the_queue.size();
lock.unlock();
@@ -198,7 +199,7 @@ public:
}
popped_value = the_queue.front();
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
@@ -220,15 +221,26 @@ public:
}
else {
std::swap(popped_value, the_queue.front());
- the_queue.pop();
+ the_queue.pop_front();
lock.unlock();
the_tx_notification.notify_one();
}
}
+ template<typename R>
+ std::vector<R> map(std::function<R(const T&)> func) const
+ {
+ std::vector<R> result;
+ std::unique_lock<std::mutex> lock(the_mutex);
+ for (const T& elem : the_queue) {
+ result.push_back(func(elem));
+ }
+ return result;
+ }
+
private:
- std::queue<T> the_queue;
+ std::deque<T> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_rx_notification;
std::condition_variable the_tx_notification;
diff --git a/contrib/edioutput/EDIConfig.h b/contrib/edioutput/EDIConfig.h
index 1997210..de4217f 100644
--- a/contrib/edioutput/EDIConfig.h
+++ b/contrib/edioutput/EDIConfig.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2019
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -27,6 +27,7 @@
#pragma once
+#include <optional>
#include <vector>
#include <string>
#include <memory>
@@ -36,17 +37,31 @@ namespace edi {
/** Configuration for EDI output */
+struct pft_settings_t {
+ // protection and fragmentation settings
+ bool verbose = false;
+ bool enable_pft = false;
+ unsigned chunk_len = 207; // RSk, data length of each chunk
+ unsigned fec = 0; // number of fragments that can be recovered
+ double fragment_spreading_factor = 0.95;
+ // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
+ // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
+};
+
struct destination_t {
virtual ~destination_t() {};
+
+ pft_settings_t pft_settings = {};
};
+
// Can represent both unicast and multicast destinations
struct udp_destination_t : public destination_t {
std::string dest_addr;
- unsigned int dest_port = 0;
+ uint16_t dest_port = 0;
std::string source_addr;
- unsigned int source_port = 0;
- unsigned int ttl = 10;
+ uint16_t source_port = 0;
+ std::optional<uint8_t> ttl = std::nullopt;
};
// TCP server that can accept multiple connections
@@ -66,16 +81,9 @@ struct tcp_client_t : public destination_t {
};
struct configuration_t {
- unsigned chunk_len = 207; // RSk, data length of each chunk
- unsigned fec = 0; // number of fragments that can be recovered
- bool dump = false; // dump a file with the EDI packets
- bool verbose = false;
- bool enable_pft = false; // Enable protection and fragmentation
+ bool verbose = false;
unsigned int tagpacket_alignment = 0;
std::vector<std::shared_ptr<destination_t> > destinations;
- double fragment_spreading_factor = 0.95;
- // Spread transmission of fragments in time. 1.0 = 100% means spreading over the whole duration of a frame (24ms)
- // Above 100% means that the fragments are spread over several 24ms periods, interleaving the AF packets.
bool enabled() const { return destinations.size() > 0; }
diff --git a/contrib/edioutput/PFT.cpp b/contrib/edioutput/PFT.cpp
index 7e0e8e9..f65fd67 100644
--- a/contrib/edioutput/PFT.cpp
+++ b/contrib/edioutput/PFT.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,7 +31,6 @@
*/
#include <vector>
-#include <list>
#include <cstdio>
#include <cstring>
#include <cstdint>
@@ -41,6 +40,7 @@
#include "PFT.h"
#include "crc.h"
#include "ReedSolomon.h"
+#include "Log.h"
namespace edi {
@@ -51,11 +51,10 @@ using namespace std;
PFT::PFT() { }
-PFT::PFT(const configuration_t &conf) :
+PFT::PFT(const pft_settings_t& conf) :
+ m_enabled(conf.enable_pft),
m_k(conf.chunk_len),
m_m(conf.fec),
- m_pseq(0),
- m_num_chunks(0),
m_verbose(conf.verbose)
{
if (m_k > 207) {
@@ -324,5 +323,4 @@ void PFT::OverridePSeq(uint16_t pseq)
m_pseq = pseq;
}
-}
-
+} // namespace edi
diff --git a/contrib/edioutput/PFT.h b/contrib/edioutput/PFT.h
index 42569a0..52e9f46 100644
--- a/contrib/edioutput/PFT.h
+++ b/contrib/edioutput/PFT.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2021
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,12 +33,8 @@
#pragma once
#include <vector>
-#include <list>
-#include <stdexcept>
#include <cstdint>
#include "AFPacket.h"
-#include "Log.h"
-#include "ReedSolomon.h"
#include "EDIConfig.h"
namespace edi {
@@ -52,21 +48,24 @@ class PFT
static constexpr int PARITYBYTES = 48;
PFT();
- PFT(const configuration_t& conf);
+ PFT(const pft_settings_t& conf);
+
+ bool is_enabled() const { return m_enabled and m_k > 0; }
// return a list of PFT fragments with the correct
// PFT headers
- std::vector< PFTFragment > Assemble(AFPacket af_packet);
+ std::vector<PFTFragment> Assemble(AFPacket af_packet);
// Apply Reed-Solomon FEC to the AF Packet
RSBlock Protect(AFPacket af_packet);
// Cut a RSBlock into several fragments that can be transmitted
- std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet);
+ std::vector<std::vector<uint8_t>> ProtectAndFragment(AFPacket af_packet);
void OverridePSeq(uint16_t pseq);
private:
+ bool m_enabled = false;
unsigned int m_k = 207; // length of RS data word
unsigned int m_m = 3; // number of fragments that can be recovered if lost
uint16_t m_pseq = 0;
diff --git a/contrib/edioutput/Transport.cpp b/contrib/edioutput/Transport.cpp
index 4979e93..3898213 100644
--- a/contrib/edioutput/Transport.cpp
+++ b/contrib/edioutput/Transport.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2022
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -25,7 +25,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Transport.h"
-#include <iterator>
+#include "Log.h"
#include <cmath>
#include <thread>
@@ -41,10 +41,15 @@ void configuration_t::print() const
if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port;
if (not udp_dest->source_addr.empty()) {
- etiLog.level(info) << " source " << udp_dest->source_addr;
- etiLog.level(info) << " ttl " << udp_dest->ttl;
+ etiLog.level(info) << " source address=" << udp_dest->source_addr;
}
- etiLog.level(info) << " source port " << udp_dest->source_port;
+ if (udp_dest->ttl) {
+ etiLog.level(info) << " ttl=" << (int)(*udp_dest->ttl);
+ }
+ else {
+ etiLog.level(info) << " ttl=(default)";
+ }
+ etiLog.level(info) << " source port=" << udp_dest->source_port;
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port;
@@ -57,13 +62,18 @@ void configuration_t::print() const
else {
throw logic_error("EDI destination not implemented");
}
+ etiLog.level(info) << " PFT=" << edi_dest->pft_settings.enable_pft;
+ if (edi_dest->pft_settings.enable_pft) {
+ etiLog.level(info) << " FEC=" << edi_dest->pft_settings.fec;
+ etiLog.level(info) << " Chunk Len=" << edi_dest->pft_settings.chunk_len;
+ etiLog.level(info) << " Fragment spreading factor=" << edi_dest->pft_settings.fragment_spreading_factor;
+ }
}
}
Sender::Sender(const configuration_t& conf) :
- m_conf(conf),
- edi_pft(m_conf)
+ m_conf(conf)
{
if (m_conf.verbose) {
etiLog.level(info) << "Setup EDI Output";
@@ -71,37 +81,42 @@ Sender::Sender(const configuration_t& conf) :
for (const auto& edi_dest : m_conf.destinations) {
if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
- auto udp_socket = std::make_shared<Socket::UDPSocket>(udp_dest->source_port);
+ Socket::UDPSocket udp_socket(udp_dest->source_port);
if (not udp_dest->source_addr.empty()) {
- udp_socket->setMulticastSource(udp_dest->source_addr.c_str());
- udp_socket->setMulticastTTL(udp_dest->ttl);
+ udp_socket.setMulticastSource(udp_dest->source_addr.c_str());
}
- udp_sockets.emplace(udp_dest.get(), udp_socket);
+ if (udp_dest->ttl) {
+ udp_socket.setMulticastTTL(*udp_dest->ttl);
+ }
+
+ auto sender = make_shared<udp_sender_t>(
+ udp_dest->dest_addr,
+ udp_dest->dest_port,
+ std::move(udp_socket));
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(udp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(edi_dest)) {
- auto dispatcher = make_shared<Socket::TCPDataDispatcher>(
- tcp_dest->max_frames_queued, tcp_dest->tcp_server_preroll_buffers);
-
- dispatcher->start(tcp_dest->listen_port, "0.0.0.0");
- tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
+ auto sender = make_shared<tcp_dispatcher_t>(
+ tcp_dest->listen_port,
+ tcp_dest->max_frames_queued,
+ tcp_dest->tcp_server_preroll_buffers);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
- auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port);
- tcp_senders.emplace(tcp_dest.get(), tcp_send_client);
+ auto sender = make_shared<tcp_send_client_t>(tcp_dest->dest_addr, tcp_dest->dest_port, m_conf.verbose);
+ m_pft_spreaders.emplace_back(
+ make_shared<PFTSpreader>(tcp_dest->pft_settings, sender));
}
else {
throw logic_error("EDI destination not implemented");
}
}
- if (m_conf.dump) {
- edi_debug_file.open("./edi.debug");
- }
-
- if (m_conf.enable_pft) {
- unique_lock<mutex> lock(m_mutex);
+ {
m_running = true;
m_thread = thread(&Sender::run, this);
}
@@ -111,10 +126,52 @@ Sender::Sender(const configuration_t& conf) :
}
}
+void Sender::write(const TagPacket& tagpacket)
+{
+ // Assemble into one AF Packet
+ edi::AFPacket af_packet = edi_af_packetiser.Assemble(tagpacket);
+
+ write(af_packet);
+}
+
+void Sender::write(const AFPacket& af_packet)
+{
+ for (auto& sender : m_pft_spreaders) {
+ sender->send_af_packet(af_packet);
+ }
+}
+
+void Sender::override_af_sequence(uint16_t seq)
+{
+ edi_af_packetiser.OverrideSeq(seq);
+}
+
+void Sender::override_pft_sequence(uint16_t pseq)
+{
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->edi_pft.OverridePSeq(pseq);
+ }
+}
+
+std::vector<Sender::stats_t> Sender::get_tcp_server_stats() const
+{
+ std::vector<Sender::stats_t> stats;
+
+ for (auto& spreader : m_pft_spreaders) {
+ if (auto sender = std::dynamic_pointer_cast<tcp_dispatcher_t>(spreader->sender)) {
+ Sender::stats_t s;
+ s.listen_port = sender->listen_port;
+ s.stats = sender->sock.get_stats();
+ stats.push_back(s);
+ }
+ }
+
+ return stats;
+}
+
Sender::~Sender()
{
{
- unique_lock<mutex> lock(m_mutex);
m_running = false;
}
@@ -123,36 +180,99 @@ Sender::~Sender()
}
}
-void Sender::write(const TagPacket& tagpacket)
+void Sender::run()
{
- // Assemble into one AF Packet
- edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket);
+ while (m_running) {
+ const auto now = chrono::steady_clock::now();
+ for (auto& spreader : m_pft_spreaders) {
+ spreader->tick(now);
+ }
- write(af_packet);
+ this_thread::sleep_for(chrono::microseconds(500));
+ }
}
-void Sender::write(const AFPacket& af_packet)
+
+void Sender::udp_sender_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ Socket::InetAddress addr;
+ addr.resolveUdpDestination(dest_addr, dest_port);
+ sock.send(frame, addr);
+}
+
+void Sender::tcp_dispatcher_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ sock.write(frame);
+}
+
+void Sender::tcp_send_client_t::send_packet(const std::vector<uint8_t> &frame)
+{
+ const auto error_stats = sock.sendall(frame);
+
+ if (verbose and error_stats.has_seen_new_errors) {
+ etiLog.level(warn) << "TCP output " << dest_addr << ":" << dest_port
+ << " has " << error_stats.num_reconnects
+ << " reconnects: most recent error: " << error_stats.last_error;
+ }
+}
+
+Sender::udp_sender_t::udp_sender_t(std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock) :
+ dest_addr(dest_addr),
+ dest_port(dest_port),
+ sock(std::move(sock))
+{
+}
+
+Sender::tcp_dispatcher_t::tcp_dispatcher_t(uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers) :
+ listen_port(listen_port),
+ sock(max_frames_queued, tcp_server_preroll_buffers)
+{
+ sock.start(listen_port, "0.0.0.0");
+}
+
+Sender::tcp_send_client_t::tcp_send_client_t(const std::string &dest_addr,
+ uint16_t dest_port,
+ bool verbose) :
+ dest_addr(dest_addr),
+ dest_port(dest_port),
+ verbose(verbose),
+ sock(dest_addr, dest_port)
{
- if (m_conf.enable_pft) {
+}
+
+Sender::PFTSpreader::PFTSpreader(const pft_settings_t& conf, sender_sp sender) :
+ sender(sender),
+ edi_pft(conf)
+{
+}
+
+void Sender::PFTSpreader::send_af_packet(const AFPacket& af_packet)
+{
+ using namespace std::chrono;
+ if (edi_pft.is_enabled()) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet);
- if (m_conf.verbose and m_last_num_pft_fragments != edi_fragments.size()) {
+ if (settings.verbose and last_num_pft_fragments != edi_fragments.size()) {
etiLog.log(debug, "EDI Output: Number of PFT fragments %zu\n",
edi_fragments.size());
- m_last_num_pft_fragments = edi_fragments.size();
+ last_num_pft_fragments = edi_fragments.size();
}
/* Spread out the transmission of all fragments over part of the 24ms AF packet duration
* to reduce the risk of losing a burst of fragments because of congestion. */
- using namespace std::chrono;
auto inter_fragment_wait_time = microseconds(1);
if (edi_fragments.size() > 1) {
- if (m_conf.fragment_spreading_factor > 0) {
+ if (settings.fragment_spreading_factor > 0) {
inter_fragment_wait_time =
- microseconds(
- llrint(m_conf.fragment_spreading_factor * 24000.0 / edi_fragments.size())
- );
+ microseconds(llrint(
+ settings.fragment_spreading_factor * 24000.0 /
+ edi_fragments.size()
+ ));
}
}
@@ -162,107 +282,35 @@ void Sender::write(const AFPacket& af_packet)
auto tp = now;
unique_lock<mutex> lock(m_mutex);
for (auto& edi_frag : edi_fragments) {
- m_pending_frames[tp] = move(edi_frag);
+ m_pending_frames[tp] = std::move(edi_frag);
tp += inter_fragment_wait_time;
}
}
-
- // Transmission done in run() function
}
else /* PFT disabled */ {
- // Send over ethernet
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) {
- fprintf(stderr, "EDI Output: AF packet larger than 1400,"
- " consider using PFT to avoid UP fragmentation.\n");
- m_udp_fragmentation_warning_printed = true;
- }
-
- udp_sockets.at(udp_dest.get())->send(af_packet, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- const auto error_stats = tcp_senders.at(tcp_dest.get())->sendall(af_packet);
-
- if (m_conf.verbose and error_stats.has_seen_new_errors) {
- fprintf(stderr, "TCP output %s:%d has %zu reconnects: most recent error: %s\n",
- tcp_dest->dest_addr.c_str(),
- tcp_dest->dest_port,
- error_stats.num_reconnects,
- error_stats.last_error.c_str());
- }
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
+ const auto now = steady_clock::now();
+ unique_lock<mutex> lock(m_mutex);
+ m_pending_frames[now] = std::move(af_packet);
}
-}
-void Sender::override_af_sequence(uint16_t seq)
-{
- edi_afPacketiser.OverrideSeq(seq);
+ // Actual transmission done in tick() function
}
-void Sender::override_pft_sequence(uint16_t pseq)
+void Sender::PFTSpreader::tick(const std::chrono::steady_clock::time_point& now)
{
- edi_pft.OverridePSeq(pseq);
-}
+ unique_lock<mutex> lock(m_mutex);
-void Sender::run()
-{
- while (m_running) {
- unique_lock<mutex> lock(m_mutex);
- const auto now = chrono::steady_clock::now();
+ for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
+ const auto& edi_frag = it->second;
- // Send over ethernet
- for (auto it = m_pending_frames.begin(); it != m_pending_frames.end(); ) {
- const auto& edi_frag = it->second;
-
- if (it->first <= now) {
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
-
- for (auto& dest : m_conf.destinations) {
- if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
- Socket::InetAddress addr;
- addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port);
-
- udp_sockets.at(udp_dest.get())->send(edi_frag, addr);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_server_t>(dest)) {
- tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
- }
- else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
- }
- else {
- throw logic_error("EDI destination not implemented");
- }
- }
- it = m_pending_frames.erase(it);
- }
- else {
- ++it;
- }
+ if (it->first <= now) {
+ sender->send_packet(edi_frag);
+ it = m_pending_frames.erase(it);
+ }
+ else {
+ ++it;
}
-
- lock.unlock();
- this_thread::sleep_for(chrono::microseconds(500));
}
}
-}
+} // namespace edi
diff --git a/contrib/edioutput/Transport.h b/contrib/edioutput/Transport.h
index c62545c..96784c0 100644
--- a/contrib/edioutput/Transport.h
+++ b/contrib/edioutput/Transport.h
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2022
+ Copyright (C) 2025
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,21 +33,21 @@
#include "Socket.h"
#include <chrono>
#include <map>
-#include <unordered_map>
-#include <fstream>
#include <cstdint>
#include <thread>
#include <mutex>
+#include <vector>
namespace edi {
-/** STI sender for EDI output */
-
+/** ETI/STI sender for EDI output */
class Sender {
public:
Sender(const configuration_t& conf);
Sender(const Sender&) = delete;
- Sender operator=(const Sender&) = delete;
+ Sender& operator=(const Sender&) = delete;
+ Sender(Sender&&) = delete;
+ Sender& operator=(Sender&&) = delete;
~Sender();
// Assemble the tagpacket into an AF packet, and if needed,
@@ -64,33 +64,90 @@ class Sender {
void override_af_sequence(uint16_t seq);
void override_pft_sequence(uint16_t pseq);
- private:
- void run();
-
- bool m_udp_fragmentation_warning_printed = false;
+ struct stats_t {
+ uint16_t listen_port;
+ std::vector<Socket::TCPConnection::stats_t> stats;
+ };
+ std::vector<stats_t> get_tcp_server_stats() const;
+ private:
configuration_t m_conf;
- std::ofstream edi_debug_file;
// The TagPacket will then be placed into an AFPacket
- edi::AFPacketiser edi_afPacketiser;
-
- // The AF Packet will be protected with reed-solomon and split in fragments
- edi::PFT edi_pft;
-
- std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
- std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
- std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
+ edi::AFPacketiser edi_af_packetiser;
- // PFT spreading requires sending UDP packets at specific time, independently of
- // time when write() gets called
- std::thread m_thread;
- std::mutex m_mutex;
+ // PFT spreading requires sending UDP packets at specific time,
+ // independently of time when write() gets called
bool m_running = false;
- std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
-
- size_t m_last_num_pft_fragments = 0;
+ std::thread m_thread;
+ virtual void run();
+
+
+
+
+
+ struct i_sender {
+ virtual void send_packet(const std::vector<uint8_t> &frame) = 0;
+ virtual ~i_sender() { }
+ };
+
+ struct udp_sender_t : public i_sender {
+ udp_sender_t(
+ std::string dest_addr,
+ uint16_t dest_port,
+ Socket::UDPSocket&& sock);
+
+ std::string dest_addr;
+ uint16_t dest_port;
+ Socket::UDPSocket sock;
+
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_dispatcher_t : public i_sender {
+ tcp_dispatcher_t(
+ uint16_t listen_port,
+ size_t max_frames_queued,
+ size_t tcp_server_preroll_buffers);
+
+ uint16_t listen_port;
+ Socket::TCPDataDispatcher sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ struct tcp_send_client_t : public i_sender {
+ tcp_send_client_t(
+ const std::string& dest_addr,
+ uint16_t dest_port,
+ bool verbose);
+
+ std::string dest_addr;
+ uint16_t dest_port;
+ bool verbose;
+ size_t m_num_reconnects_prev = 0;
+ Socket::TCPSendClient sock;
+ virtual void send_packet(const std::vector<uint8_t> &frame) override;
+ };
+
+ class PFTSpreader {
+ public:
+ using sender_sp = std::shared_ptr<i_sender>;
+ PFTSpreader(const pft_settings_t &conf, sender_sp sender);
+ sender_sp sender;
+ edi::PFT edi_pft;
+
+ void send_af_packet(const AFPacket &af_packet);
+ void tick(const std::chrono::steady_clock::time_point& now);
+
+ private:
+ // send_af_packet() and tick() are called from different threads, both
+ // are accessing m_pending_frames
+ std::mutex m_mutex;
+ std::map<std::chrono::steady_clock::time_point, edi::PFTFragment> m_pending_frames;
+ pft_settings_t settings;
+ size_t last_num_pft_fragments = 0;
+ };
+
+ std::vector<std::shared_ptr<PFTSpreader>> m_pft_spreaders;
};
-
}
-
diff --git a/contrib/fec/decode_rs.h b/contrib/fec/decode_rs.h
index c165cf3..647b885 100644
--- a/contrib/fec/decode_rs.h
+++ b/contrib/fec/decode_rs.h
@@ -145,15 +145,15 @@
count++;
}
if (count != no_eras) {
- printf("count = %d no_eras = %d\n lambda(x) is WRONG\n",count,no_eras);
+ fprintf(stderr, "count = %d no_eras = %d\n lambda(x) is WRONG\n",count,no_eras);
count = -1;
goto finish;
}
#if DEBUG >= 2
- printf("\n Erasure positions as determined by roots of Eras Loc Poly:\n");
+ fprintf(stderr, "\n Erasure positions as determined by roots of Eras Loc Poly:\n");
for (i = 0; i < count; i++)
- printf("%d ", loc[i]);
- printf("\n");
+ fprintf(stderr, "%d ", loc[i]);
+ fprintf(stderr, "\n");
#endif
#endif
}
@@ -227,7 +227,7 @@
continue; /* Not a root */
/* store root (index-form) and error location number */
#if DEBUG>=2
- printf("count %d root %d loc %d\n",count,i,k);
+ fprintf(stderr, "count %d root %d loc %d\n",count,i,k);
#endif
root[count] = i;
loc[count] = k;
@@ -279,7 +279,7 @@
}
#if DEBUG >= 1
if (den == 0) {
- printf("\n ERROR: denominator = 0\n");
+ fprintf(stderr, "\n ERROR: denominator = 0\n");
count = -1;
goto finish;
}
diff --git a/man/odr-audioenc.1 b/man/odr-audioenc.1
index 9d9089d..e070b11 100644
--- a/man/odr-audioenc.1
+++ b/man/odr-audioenc.1
@@ -153,7 +153,9 @@ Before starting, run the given script, and only start if it returns 0.
Enable PAD insertion and set PAD size in bytes.
.TP
\fB\-P\fR, \fB\-\-pad\-socket\fR=\fI\,IDENTIFIER\/\fR
-Use the given identifier to communicate with ODR\-PadEnc.
+Use the given identifier or path to communicate with ODR\-PadEnc.
+If it contains '/', it's treated as a full path, otherwise
+it's an identifier and sockets are created in /tmp/.
.TP
\fB\-l\fR, \fB\-\-level\fR
Show peak audio level indication.
diff --git a/src/AlsaInput.cpp b/src/AlsaInput.cpp
index 442304c..7d14eb0 100644
--- a/src/AlsaInput.cpp
+++ b/src/AlsaInput.cpp
@@ -21,6 +21,7 @@
#if HAVE_ALSA
#include "AlsaInput.h"
+#include "Log.h"
#include <cstdio>
#include <stdexcept>
#include <string>
@@ -51,7 +52,7 @@ void AlsaInput::m_init_alsa()
int err;
snd_pcm_hw_params_t *hw_params;
- fprintf(stderr, "Initialising ALSA...\n");
+ etiLog.level(info) << "Initialising ALSA...";
const int open_mode = 0;
@@ -104,7 +105,7 @@ void AlsaInput::m_init_alsa()
alsa_strerror(err) + ")");
}
- fprintf(stderr, "ALSA init done.\n");
+ etiLog.level(info) << "ALSA init done.";
}
ssize_t AlsaInput::m_read(uint8_t* buf, snd_pcm_uframes_t length)
@@ -115,11 +116,10 @@ ssize_t AlsaInput::m_read(uint8_t* buf, snd_pcm_uframes_t length)
if (err != (ssize_t)length) {
if (err < 0) {
- fprintf (stderr, "read from audio interface failed (%s)\n",
- snd_strerror(err));
+ etiLog.level(error) << "read from audio interface failed (" << snd_strerror(err) << ")";
}
else {
- fprintf(stderr, "short alsa read: %d\n", err);
+ etiLog.level(warn) << "short alsa read: " << err;
}
}
@@ -141,7 +141,7 @@ AlsaInputThreaded::~AlsaInputThreaded()
void AlsaInputThreaded::prepare()
{
if (m_fault) {
- fprintf(stderr, "Cannot start alsa input. Fault detected previsouly!\n");
+ etiLog.level(error) << "Cannot start alsa input. Fault detected previously!";
}
else {
m_init_alsa();
diff --git a/src/GSTInput.cpp b/src/GSTInput.cpp
index af27db1..811c13d 100644
--- a/src/GSTInput.cpp
+++ b/src/GSTInput.cpp
@@ -26,6 +26,7 @@
#include <cstring>
#include "GSTInput.h"
+#include "Log.h"
#include "config.h"
@@ -59,8 +60,8 @@ static void error_cb(GstBus *bus, GstMessage *msg, GSTData *data)
/* Print error details on the screen */
gst_message_parse_error(msg, &err, &debug_info);
- g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
- g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
+ etiLog.level(error) << "Error received from element " << GST_OBJECT_NAME (msg->src) << ": " << err->message;
+ etiLog.level(error) << "Debugging information: " << (debug_info ? debug_info : "none");
g_clear_error(&err);
g_free(debug_info);
}
@@ -282,7 +283,7 @@ void GSTInput::process()
{
GError *err = nullptr;
gst_message_parse_error(msg, &err, nullptr);
- fprintf(stderr, "GST error: %s\n", err->message);
+ etiLog.level(error) << "GST error: " << err->message;
g_error_free(err);
m_fault = true;
break;
diff --git a/src/JackInput.cpp b/src/JackInput.cpp
index 4d9a530..f5a63b5 100644
--- a/src/JackInput.cpp
+++ b/src/JackInput.cpp
@@ -28,6 +28,7 @@ extern "C" {
}
#include "JackInput.h"
+#include "Log.h"
#include <sys/time.h>
using namespace std;
@@ -60,7 +61,7 @@ void JackInput::prepare()
}
if (status & JackServerStarted) {
- fprintf(stderr, "JACK server started\n");
+ etiLog.level(info) << "JACK server started";
}
if (status & JackNameNotUnique) {
diff --git a/src/Outputs.cpp b/src/Outputs.cpp
index fd723f6..91f9181 100644
--- a/src/Outputs.cpp
+++ b/src/Outputs.cpp
@@ -18,6 +18,7 @@
*/
#include "Outputs.h"
+#include "Log.h"
#include <chrono>
#include <string>
#include <stdexcept>
@@ -75,7 +76,7 @@ ZMQ::~ZMQ() {}
void ZMQ::connect(const char *uri, const char *keyfile)
{
if (keyfile) {
- fprintf(stderr, "Enabling encryption\n");
+ etiLog.level(info) << "Enabling encryption";
int rc = readkey(keyfile, m_secretkey);
if (rc) {
@@ -130,7 +131,7 @@ bool ZMQ::write_frame(const uint8_t *buf, size_t len)
zmq::send_flags::dontwait);
}
catch (zmq::error_t& e) {
- fprintf(stderr, "ZeroMQ send error !\n");
+ etiLog.level(error) << "ZeroMQ send error !";
return false;
}
@@ -158,10 +159,8 @@ void EDI::add_udp_destination(const std::string& host, unsigned int port)
auto dest = make_shared<edi::udp_destination_t>();
dest->dest_addr = host;
dest->dest_port = port;
+ dest->pft_settings.enable_pft = true;
m_edi_conf.destinations.push_back(dest);
-
- // We cannot carry AF packets over UDP, because they would be too large.
- m_edi_conf.enable_pft = true;
}
void EDI::add_tcp_destination(const std::string& host, unsigned int port)
@@ -170,14 +169,15 @@ void EDI::add_tcp_destination(const std::string& host, unsigned int port)
dest->dest_addr = host;
dest->dest_port = port;
m_edi_conf.destinations.push_back(dest);
-
- m_edi_conf.dump = false;
}
void EDI::set_fec(int fec)
{
- m_edi_conf.enable_pft = true;
- m_edi_conf.fec = fec;
+ for (auto& edi_dest : m_edi_conf.destinations) {
+ if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) {
+ udp_dest->pft_settings.fec = fec;
+ }
+ }
}
bool EDI::enabled() const
diff --git a/src/PadInterface.cpp b/src/PadInterface.cpp
index a8a129c..8269004 100644
--- a/src/PadInterface.cpp
+++ b/src/PadInterface.cpp
@@ -56,7 +56,18 @@ void PadInterface::open(const std::string& pad_ident)
struct sockaddr_un claddr;
memset(&claddr, 0, sizeof(struct sockaddr_un));
claddr.sun_family = AF_UNIX;
- snprintf(claddr.sun_path, sizeof(claddr.sun_path), "/tmp/%s.audioenc", m_pad_ident.c_str());
+
+ // Check if pad_ident contains path separators - if so, treat as full path
+ size_t last_slash = m_pad_ident.find_last_of('/');
+ if (last_slash != std::string::npos) {
+ // It's a path - use directory and basename
+ std::string socket_dir = m_pad_ident.substr(0, last_slash);
+ std::string socket_base = m_pad_ident.substr(last_slash + 1);
+ snprintf(claddr.sun_path, sizeof(claddr.sun_path), "%s/%s.audioenc", socket_dir.c_str(), socket_base.c_str());
+ } else {
+ // It's just an identifier - use /tmp/ as before for backward compatibility
+ snprintf(claddr.sun_path, sizeof(claddr.sun_path), "/tmp/%s.audioenc", m_pad_ident.c_str());
+ }
if (unlink(claddr.sun_path) == -1 and errno != ENOENT) {
fprintf(stderr, "Unlinking of socket %s failed: %s\n", claddr.sun_path, strerror(errno));
}
@@ -84,7 +95,18 @@ vector<uint8_t> PadInterface::request(uint8_t padlen)
struct sockaddr_un claddr;
memset(&claddr, 0, sizeof(struct sockaddr_un));
claddr.sun_family = AF_UNIX;
- snprintf(claddr.sun_path, sizeof(claddr.sun_path), "/tmp/%s.padenc", m_pad_ident.c_str());
+
+ // Check if pad_ident contains path separators - if so, treat as full path
+ size_t last_slash = m_pad_ident.find_last_of('/');
+ if (last_slash != std::string::npos) {
+ // It's a path - use directory and basename
+ std::string socket_dir = m_pad_ident.substr(0, last_slash);
+ std::string socket_base = m_pad_ident.substr(last_slash + 1);
+ snprintf(claddr.sun_path, sizeof(claddr.sun_path), "%s/%s.padenc", socket_dir.c_str(), socket_base.c_str());
+ } else {
+ // It's just an identifier - use /tmp/ as before for backward compatibility
+ snprintf(claddr.sun_path, sizeof(claddr.sun_path), "/tmp/%s.padenc", m_pad_ident.c_str());
+ }
ssize_t ret = ::sendto(m_sock, packet, sizeof(packet), 0, (struct sockaddr*)&claddr, sizeof(struct sockaddr_un));
if (ret == -1) {
diff --git a/src/PadInterface.h b/src/PadInterface.h
index 9787d06..8b8f68f 100644
--- a/src/PadInterface.h
+++ b/src/PadInterface.h
@@ -30,8 +30,10 @@
class PadInterface {
public:
- /*! Create a new PAD data interface that binds to /tmp/pad_ident.audioenc and
- * communicates with ODR-PadEnc at /tmp/pad_ident.padenc
+ /*! Create a new PAD data interface. If pad_ident contains path separators,
+ * it is treated as a full path (creates pad_ident.audioenc and communicates
+ * with pad_ident.padenc). Otherwise, it is treated as an identifier and
+ * binds to /tmp/pad_ident.audioenc and communicates with /tmp/pad_ident.padenc
*/
void open(const std::string &pad_ident);
diff --git a/src/VLCInput.cpp b/src/VLCInput.cpp
index ef4cfc4..e3e815e 100644
--- a/src/VLCInput.cpp
+++ b/src/VLCInput.cpp
@@ -23,8 +23,10 @@
#include <chrono>
#include <algorithm>
#include <functional>
+#include <cstdarg>
#include "VLCInput.h"
+#include "Log.h"
#include "config.h"
@@ -122,6 +124,61 @@ void handleVLCExit(void* opaque)
((VLCInput*)opaque)->exit_cb();
}
+/*! VLC Log callback to route VLC messages through etiLog */
+void handleVLCLog(void* data, int level, const libvlc_log_t* ctx, const char* fmt, va_list args)
+{
+ // Map VLC log levels to etiLog levels
+ log_level_t etiLogLevel;
+ switch (level) {
+ case LIBVLC_DEBUG:
+ etiLogLevel = debug;
+ break;
+ case LIBVLC_NOTICE:
+ etiLogLevel = info;
+ break;
+ case LIBVLC_WARNING:
+ etiLogLevel = warn;
+ break;
+ case LIBVLC_ERROR:
+ etiLogLevel = error;
+ break;
+ default:
+ etiLogLevel = debug; // Default to debug for unknown levels
+ break;
+ }
+
+ // Format the message using vsnprintf
+ char buffer[1024];
+ int ret = vsnprintf(buffer, sizeof(buffer), fmt, args);
+
+ if (ret > 0) {
+ // Get module and object information from context if available
+ const char* module = nullptr;
+ const char* file = nullptr;
+ unsigned line = 0;
+ const char* object_type = nullptr;
+ const char* header = nullptr;
+ uintptr_t object_id = 0;
+
+ libvlc_log_get_context(ctx, &module, &file, &line);
+ libvlc_log_get_object(ctx, &object_type, &header, &object_id);
+
+ // Use module name if available, otherwise use object type
+ const char* identifier = nullptr;
+ if (module && strlen(module) > 0) {
+ identifier = module;
+ } else if (object_type && strlen(object_type) > 0) {
+ identifier = object_type;
+ }
+
+ if (identifier) {
+ etiLog.level(etiLogLevel) << "VLC [" << identifier << "] " << buffer;
+ } else {
+ etiLog.level(etiLogLevel) << "VLC " << buffer;
+ }
+ }
+}
+
VLCInput::~VLCInput()
{
m_running = false;
@@ -142,20 +199,20 @@ void VLCInput::prepare()
throw runtime_error("Cannot start VLC input. Fault detected previously!");
}
- fprintf(stderr, "Initialising VLC...\n");
+ etiLog.level(info) << "Initialising VLC...";
long long int handleStream_address;
long long int prepareRender_address;
switch (check_vlc_uses_size_t()) {
case vlc_data_type_e::vlc_uses_unsigned_int:
- fprintf(stderr, "You are using VLC with unsigned int size callbacks\n");
+ etiLog.level(info) << "You are using VLC with unsigned int size callbacks";
handleStream_address = (long long int)(intptr_t)(void*)&handleStream;
prepareRender_address = (long long int)(intptr_t)(void*)&prepareRender;
break;
case vlc_data_type_e::vlc_uses_size_t:
- fprintf(stderr, "You are using VLC with size_t size callbacks\n");
+ etiLog.level(info) << "You are using VLC with size_t size callbacks";
handleStream_address = (long long int)(intptr_t)(void*)&handleStream_size_t;
prepareRender_address = (long long int)(intptr_t)(void*)&prepareRender_size_t;
@@ -179,9 +236,9 @@ void VLCInput::prepare()
back_inserter(vlc_args));
if (m_verbosity) {
- fprintf(stderr, "Initialising VLC with options:\n");
+ etiLog.level(info) << "Initialising VLC with options:";
for (const auto& arg : vlc_args) {
- fprintf(stderr, " %s\n", arg.c_str());
+ etiLog.level(info) << " " << arg;
}
}
@@ -197,6 +254,9 @@ void VLCInput::prepare()
throw runtime_error("VLC initialisation failed");
}
+ // Set up VLC log callback to route messages through etiLog
+ libvlc_log_set(m_vlc, handleVLCLog, this);
+
libvlc_set_exit_handler(m_vlc, handleVLCExit, this);
// Load the media
@@ -224,7 +284,7 @@ void VLCInput::prepare()
(long long int)(intptr_t)this);
if (m_verbosity) {
- fprintf(stderr, "Setting VLC media option: %s\n", smem_options);
+ etiLog.level(debug) << "Setting VLC media option: " << smem_options;
}
libvlc_media_add_option(m, smem_options);
@@ -296,14 +356,14 @@ void VLCInput::exit_cb()
if (m_running) {
std::lock_guard<std::mutex> lock(m_queue_mutex);
- fprintf(stderr, "VLC exit, restarting...\n");
+ etiLog.level(warn) << "VLC exit, restarting...";
cleanup();
m_current_buf.clear();
prepare();
}
else {
- fprintf(stderr, "VLC exit.\n");
+ etiLog.level(info) << "VLC exit.";
}
}
@@ -342,8 +402,7 @@ void VLCInput::postRender_cb(unsigned int channels, size_t size)
}
}
else {
- fprintf(stderr, "Got invalid number of channels back from VLC! "
- "requested: %d, got %d\n", m_channels, channels);
+ etiLog.level(error) << "Got invalid number of channels back from VLC! requested: " << m_channels << ", got " << channels;
m_running = false;
m_fault = true;
}
@@ -388,7 +447,7 @@ ssize_t VLCInput::m_read(uint8_t* buf, size_t length)
libvlc_media_t *media = libvlc_media_player_get_media(m_mp);
if (!media) {
- fprintf(stderr, "VLC no media\n");
+ etiLog.level(error) << "VLC no media";
err = -1;
break;
}
@@ -397,7 +456,7 @@ ssize_t VLCInput::m_read(uint8_t* buf, size_t length)
if (!(st == libvlc_Opening ||
st == libvlc_Buffering ||
st == libvlc_Playing) ) {
- fprintf(stderr, "VLC state is %d\n", st);
+ etiLog.level(warn) << "VLC state is " << st;
err = -1;
break;
}
@@ -505,8 +564,8 @@ vlc_data_type_e check_vlc_uses_size_t()
}
}
- fprintf(stderr, "Error detecting VLC version!\n");
- fprintf(stderr, " you are using %s\n", libvlc_get_version());
+ etiLog.level(error) << "Error detecting VLC version!";
+ etiLog.level(error) << " you are using " << libvlc_get_version();
throw runtime_error("Cannot identify VLC datatype!");
}
diff --git a/src/odr-audioenc.cpp b/src/odr-audioenc.cpp
index f65c3e4..98e9c34 100644
--- a/src/odr-audioenc.cpp
+++ b/src/odr-audioenc.cpp
@@ -50,6 +50,7 @@
*/
#include "config.h"
+#include "Log.h"
#include "PadInterface.h"
#include "AlsaInput.h"
#include "FileInput.h"
@@ -207,7 +208,9 @@ static void usage(const char* name)
" --startup-check=SCRIPT_PATH Before starting, run the given script, and only start if it returns 0.\n"
" -k, --secret-key=FILE Enable ZMQ encryption with the given secret key.\n"
" -p, --pad=BYTES Enable PAD insertion and set PAD size in bytes.\n"
- " -P, --pad-socket=IDENTIFIER Use the given identifier to communicate with ODR-PadEnc.\n"
+ " -P, --pad-socket=IDENTIFIER Use the given identifier or path to communicate with ODR-PadEnc.\n"
+ " If it contains '/', it's treated as a full path, otherwise\n"
+ " it's an identifier and sockets are created in /tmp/.\n"
" -l, --level Show peak audio level indication.\n"
" -S, --stats=SOCKET_NAME Connect to the specified UNIX Datagram socket and send statistics.\n"
" This allows external tools to collect audio and drift compensation stats.\n"
@@ -236,13 +239,13 @@ static int prepare_aac_encoder(
case 1: mode = MODE_1; break;
case 2: mode = MODE_2; break;
default:
- fprintf(stderr, "Unsupported channels number %d\n", channels);
+ etiLog.level(error) << "Unsupported channels number " << channels;
return 1;
}
if (aacEncOpen(encoder, 0x01|0x02|0x04, channels) != AACENC_OK) {
- fprintf(stderr, "Unable to open encoder\n");
+ etiLog.level(error) << "Unable to open encoder";
return 1;
}
@@ -260,35 +263,34 @@ static int prepare_aac_encoder(
}
}
- fprintf(stderr, "Using %d subchannels. AAC type: %s%s%s. channels=%d, sample_rate=%d\n",
- subchannel_index,
- *aot == AOT_DABPLUS_PS ? "HE-AAC v2" : "",
- *aot == AOT_DABPLUS_SBR ? "HE-AAC" : "",
- *aot == AOT_DABPLUS_AAC_LC ? "AAC-LC" : "",
- channels, sample_rate);
+ etiLog.level(info) << "Using " << subchannel_index << " subchannels. AAC type: "
+ << (*aot == AOT_DABPLUS_PS ? "HE-AAC v2" : "")
+ << (*aot == AOT_DABPLUS_SBR ? "HE-AAC" : "")
+ << (*aot == AOT_DABPLUS_AAC_LC ? "AAC-LC" : "")
+ << ". channels=" << channels << ", sample_rate=" << sample_rate;
if (aacEncoder_SetParam(*encoder, AACENC_AOT, *aot) != AACENC_OK) {
- fprintf(stderr, "Unable to set the AOT\n");
+ etiLog.level(error) << "Unable to set the AOT";
return 1;
}
if (aacEncoder_SetParam(*encoder, AACENC_SAMPLERATE, sample_rate) != AACENC_OK) {
- fprintf(stderr, "Unable to set the sample rate\n");
+ etiLog.level(error) << "Unable to set the sample rate";
return 1;
}
if (aacEncoder_SetParam(*encoder, AACENC_CHANNELMODE, mode) != AACENC_OK) {
- fprintf(stderr, "Unable to set the channel mode\n");
+ etiLog.level(error) << "Unable to set the channel mode";
return 1;
}
if (aacEncoder_SetParam(*encoder, AACENC_CHANNELORDER, 1) != AACENC_OK) {
- fprintf(stderr, "Unable to set the wav channel order\n");
+ etiLog.level(error) << "Unable to set the wav channel order";
return 1;
}
if (aacEncoder_SetParam(*encoder, AACENC_GRANULE_LENGTH, 960) != AACENC_OK) {
- fprintf(stderr, "Unable to set the granule length\n");
+ etiLog.level(error) << "Unable to set the granule length";
return 1;
}
if (aacEncoder_SetParam(*encoder, AACENC_TRANSMUX, TT_DABPLUS) != AACENC_OK) {
- fprintf(stderr, "Unable to set the RAW transmux\n");
+ etiLog.level(error) << "Unable to set the RAW transmux";
return 1;
}
@@ -299,33 +301,33 @@ static int prepare_aac_encoder(
}*/
- fprintf(stderr, "AAC bitrate set to: %d\n", subchannel_index*8000);
+ etiLog.level(info) << "AAC bitrate set to: " << (subchannel_index*8000);
if (aacEncoder_SetParam(*encoder, AACENC_BITRATE, subchannel_index*8000) != AACENC_OK) {
- fprintf(stderr, "Unable to set the bitrate\n");
+ etiLog.level(error) << "Unable to set the bitrate";
return 1;
}
if (aacEncoder_SetParam(*encoder, AACENC_AFTERBURNER, afterburner) != AACENC_OK) {
- fprintf(stderr, "Unable to set the afterburner mode\n");
+ etiLog.level(error) << "Unable to set the afterburner mode";
return 1;
}
if (!afterburner) {
- fprintf(stderr, "Warning: Afterburned disabled!\n");
+ etiLog.level(warn) << "Warning: Afterburner disabled!";
}
if (bandwidth > 0) {
- fprintf(stderr, "Setting bandwidth is %d\n", bandwidth);
+ etiLog.level(info) << "Setting bandwidth to " << bandwidth;
if (aacEncoder_SetParam(*encoder, AACENC_BANDWIDTH, bandwidth) != AACENC_OK) {
- fprintf(stderr, "Unable to set bandwidth mode\n");
+ etiLog.level(error) << "Unable to set bandwidth mode";
return 1;
}
}
if (aacEncEncode(*encoder, nullptr, nullptr, nullptr, nullptr) != AACENC_OK) {
- fprintf(stderr, "Unable to initialize the encoder\n");
+ etiLog.level(error) << "Unable to initialize the encoder";
return 1;
}
const uint32_t bw = aacEncoder_GetParam(*encoder, AACENC_BANDWIDTH);
- fprintf(stderr, "Bandwidth is %d\n", bw);
+ etiLog.level(info) << "Bandwidth is " << bw;
return 0;
}
@@ -461,7 +463,7 @@ public:
vector<string> edi_output_uris;
void *rs_handler = nullptr;
- AACENC_InfoStruct info = { 0 };
+ AACENC_InfoStruct aac_info = { 0 };
int aot = AOT_NONE;
string decode_wavfilename;
@@ -526,11 +528,11 @@ int AudioEnc::run()
#endif
if (num_inputs == 0) {
- fprintf(stderr, "No input defined!\n");
+ etiLog.level(error) << "No input defined!";
return 1;
}
else if (num_inputs > 1) {
- fprintf(stderr, "You must define only one possible input, not several!\n");
+ etiLog.level(error) << "You must define only one possible input, not several!";
return 1;
}
@@ -542,13 +544,12 @@ int AudioEnc::run()
int subchannel_index = bitrate / 8;
if (subchannel_index < 1 || subchannel_index > 24) {
- fprintf(stderr, "Bad subchannel index: %d, must be between 1 and 24. Try other bitrate.\n",
- subchannel_index);
+ etiLog.level(error) << "Bad subchannel index: " << subchannel_index << ", must be between 1 and 24. Try other bitrate.";
return 1;
}
if ( ! (sample_rate == 32000 || sample_rate == 48000)) {
- fprintf(stderr, "Invalid sample rate. Possible values are: 32000, 48000.\n");
+ etiLog.level(error) << "Invalid sample rate. Possible values are: 32000, 48000.";
return 1;
}
}
@@ -558,25 +559,25 @@ int AudioEnc::run()
}
if ( ! (sample_rate == 24000 || sample_rate == 48000)) {
- fprintf(stderr, "Invalid sample rate. Possible values are: 24000, 48000.\n");
+ etiLog.level(error) << "Invalid sample rate. Possible values are: 24000, 48000.";
return 1;
}
}
if (padlen < 0 or padlen > 255) {
- fprintf(stderr, "Invalid PAD length specified\n");
+ etiLog.level(error) << "Invalid PAD length specified";
return 1;
}
if (output_uris.empty() and edi_output_uris.empty()) {
- fprintf(stderr, "No output defined\n");
+ etiLog.level(error) << "No output defined";
return 1;
}
for (const auto& uri : output_uris) {
if (uri == "-") {
if (file_output) {
- fprintf(stderr, "You can't write to more than one file!\n");
+ etiLog.level(error) << "You can't write to more than one file!";
return 1;
}
file_output = make_shared<Output::File>(stdout);
@@ -594,7 +595,7 @@ int AudioEnc::run()
}
else { // We assume it's a file name
if (file_output) {
- fprintf(stderr, "You can't write to more than one file!\n");
+ etiLog.level(error) << "You can't write to more than one file!";
return 1;
}
file_output = make_shared<Output::File>(uri.c_str());
@@ -621,11 +622,11 @@ int AudioEnc::run()
}
}
else {
- fprintf(stderr, "Invalid EDI URL host!\n");
+ etiLog.level(error) << "Invalid EDI URL host!";
}
}
else {
- fprintf(stderr, "Invalid EDI protocol!\n");
+ etiLog.level(error) << "Invalid EDI protocol!";
}
}
@@ -650,10 +651,10 @@ int AudioEnc::run()
if (padlen != 0 and not pad_ident.empty()) {
pad_intf.open(pad_ident);
- fprintf(stderr, "PAD socket opened\n");
+ etiLog.level(info) << "PAD socket opened";
}
else {
- fprintf(stderr, "PAD disabled because neither PAD length nor PAD identifier given\n");
+ etiLog.level(info) << "PAD disabled because neither PAD length nor PAD identifier given";
}
vec_u8 input_buf;
@@ -662,20 +663,18 @@ int AudioEnc::run()
int subchannel_index = bitrate / 8;
if (prepare_aac_encoder(&encoder, subchannel_index, channels,
sample_rate, afterburner, bandwidth, &aot) != 0) {
- fprintf(stderr, "Encoder preparation failed\n");
+ etiLog.level(error) << "Encoder preparation failed";
return 1;
}
- if (aacEncInfo(encoder, &info) != AACENC_OK) {
- fprintf(stderr, "Unable to get the encoder info\n");
+ if (aacEncInfo(encoder, &aac_info) != AACENC_OK) {
+ etiLog.level(error) << "Unable to get the encoder info";
return 1;
}
// Each DAB+ frame will need input_size audio bytes
- const int input_size = channels * BYTES_PER_SAMPLE * info.frameLength;
- fprintf(stderr, "DAB+ Encoding: framelen=%d (%dB)\n",
- info.frameLength,
- input_size);
+ const int input_size = channels * BYTES_PER_SAMPLE * aac_info.frameLength;
+ etiLog.level(info) << "DAB+ Encoding: framelen=" << aac_info.frameLength << " (" << input_size << "B)";
input_buf.resize(input_size);
@@ -702,8 +701,7 @@ int AudioEnc::run()
dab_channel_mode = 'm'; // Default to mono
}
else {
- fprintf(stderr, "Unsupported channels number %d\n",
- channels);
+ etiLog.level(error) << "Unsupported channels number " << channels;
return 1;
}
}
@@ -722,14 +720,14 @@ int AudioEnc::run()
}
if (err) {
- fprintf(stderr, "libtoolame-dab init failed: %d\n", err);
+ etiLog.level(error) << "libtoolame-dab init failed: " << err;
return err;
}
input_buf.resize(channels * 1152 * BYTES_PER_SAMPLE);
if (not decode_wavfilename.empty()) {
- fprintf(stderr, "--decode not supported for DAB\n");
+ etiLog.level(error) << "--decode not supported for DAB";
return 1;
}
}
@@ -741,7 +739,7 @@ int AudioEnc::run()
stats_publisher.reset(s);
}
catch (const runtime_error& e) {
- fprintf(stderr, "Failed to initialise Stats Publisher: %s", e.what());
+ etiLog.level(error) << "Failed to initialise Stats Publisher: " << e.what();
if (s != nullptr) {
delete s;
}
@@ -777,7 +775,7 @@ int AudioEnc::run()
input = initialise_input();
}
catch (const runtime_error& e) {
- fprintf(stderr, "Initialising input triggered exception: %s\n", e.what());
+ etiLog.level(error) << "Initialising input triggered exception: " << e.what();
return 1;
}
@@ -796,18 +794,18 @@ int AudioEnc::run()
case encoder_selection_t::toolame_dab:
outbuf_size = 4092;
outbuf.resize(outbuf_size);
- fprintf(stderr, "Setting outbuf size to %zu\n", outbuf.size());
+ etiLog.level(info) << "Setting outbuf size to " << outbuf.size();
break;
}
vector<uint8_t> pad_buf(padlen + 1);
if (restart_on_fault) {
- fprintf(stderr, "Autorestart has been deprecated and will be removed in the future!\n");
+ etiLog.level(warn) << "Autorestart has been deprecated and will be removed in the future!";
this_thread::sleep_for(chrono::seconds(2));
}
- fprintf(stderr, "Starting encoding\n");
+ etiLog.level(info) << "Starting encoding";
int retval = 0;
int send_error_count = 0;
@@ -846,7 +844,7 @@ int AudioEnc::run()
copy(pad_data.begin(), pad_data.end(), pad_buf.begin());
}
else {
- fprintf(stderr, "Incorrect PAD length received: %zu expected %d\n", pad_data.size(), padlen + 1);
+ etiLog.level(error) << "Incorrect PAD length received: " << pad_data.size() << " expected " << (padlen + 1);
break;
}
}
@@ -873,13 +871,13 @@ int AudioEnc::run()
*/
if (input->fault_detected()) {
- fprintf(stderr, "Detected fault in input!\n");
+ etiLog.level(warn) << "Detected fault in input!";
if (restart_on_fault) {
fault_counter++;
if (fault_counter >= MAX_FAULTS_ALLOWED) {
- fprintf(stderr, "Maximum number of input faults reached, aborting");
+ etiLog.level(error) << "Maximum number of input faults reached, aborting";
retval = 5;
break;
}
@@ -888,7 +886,7 @@ int AudioEnc::run()
input = initialise_input();
}
catch (const runtime_error& e) {
- fprintf(stderr, "Initialising input triggered exception: %s\n", e.what());
+ etiLog.level(error) << "Initialising input triggered exception: " << e.what();
retval = 5;
break;
}
@@ -902,7 +900,7 @@ int AudioEnc::run()
}
if (not input->read_source(input_buf.size())) {
- fprintf(stderr, "End of input reached\n");
+ etiLog.level(info) << "End of input reached";
retval = 0;
break;
}
@@ -926,7 +924,7 @@ int AudioEnc::run()
const auto elapsed = chrono::duration_cast<chrono::seconds>(
now - timepoint_last_received_sample);
if (elapsed.count() > 60) {
- fprintf(stderr, "Underruns for 60s, aborting!\n");
+ etiLog.level(error) << "Underruns for 60s, aborting!";
return 1;
}
}
@@ -957,13 +955,13 @@ int AudioEnc::run()
if (bytes_from_queue < read_bytes) {
// queue timeout occurred
- fprintf(stderr, "Detected fault in input! No data in time.\n");
+ etiLog.level(warn) << "Detected fault in input! No data in time.";
if (restart_on_fault) {
fault_counter++;
if (fault_counter >= MAX_FAULTS_ALLOWED) {
- fprintf(stderr, "Maximum number of input faults reached, aborting");
+ etiLog.level(error) << "Maximum number of input faults reached, aborting";
retval = 5;
break;
}
@@ -972,7 +970,7 @@ int AudioEnc::run()
input = initialise_input();
}
catch (const runtime_error& e) {
- fprintf(stderr, "Initialising input triggered exception: %s\n", e.what());
+ etiLog.level(error) << "Initialising input triggered exception: " << e.what();
return 1;
}
@@ -1011,7 +1009,7 @@ int AudioEnc::run()
bool success = write_icy_to_file(text, icytext_file, icytext_dlplus);
if (not success) {
- fprintf(stderr, "Failed to write ICY Text\n");
+ etiLog.level(warn) << "Failed to write ICY Text";
}
}
@@ -1068,8 +1066,7 @@ int AudioEnc::run()
measured_silence_ms += frame_time_msec;
if (measured_silence_ms > 1000*silence_timeout) {
- fprintf(stderr, "Silence detected for %d seconds, aborting.\n",
- silence_timeout);
+ etiLog.level(info) << "Silence detected for " << silence_timeout << " seconds, aborting.";
retval = 2;
break;
}
@@ -1121,10 +1118,10 @@ int AudioEnc::run()
if ((err = aacEncEncode(encoder, &in_buf, &out_buf, &in_args, &out_args))
!= AACENC_OK) {
if (err == AACENC_ENCODE_EOF) {
- fprintf(stderr, "encoder error: EOF reached\n");
+ etiLog.level(info) << "encoder error: EOF reached";
break;
}
- fprintf(stderr, "Encoding failed (%d)\n", err);
+ etiLog.level(error) << "Encoding failed (" << err << ")";
retval = 3;
break;
}
@@ -1151,7 +1148,7 @@ int AudioEnc::run()
}
}
else {
- fprintf(stderr, "INTERNAL ERROR! invalid number of channels\n");
+ etiLog.level(error) << "INTERNAL ERROR! invalid number of channels";
}
if (read_bytes) {
@@ -1167,7 +1164,7 @@ int AudioEnc::run()
decoder->decode_frame(outbuf.data(), numOutBytes);
}
catch (runtime_error &e) {
- fprintf(stderr, "Decoding failed with: %s\n", e.what());
+ etiLog.level(error) << "Decoding failed with: " << e.what();
return 1;
}
}
@@ -1181,8 +1178,7 @@ int AudioEnc::run()
// Our timing code depends on this
if (calls != enc_calls_per_output) {
- fprintf(stderr, "INTERNAL ERROR! calls=%d, expected %d\n",
- calls, enc_calls_per_output);
+ etiLog.level(error) << "INTERNAL ERROR! calls=" << calls << ", expected " << enc_calls_per_output;
}
calls = 0;
@@ -1218,7 +1214,7 @@ int AudioEnc::run()
bool success = send_frame(frame.data(), frame.size(), peak_left, peak_right);
if (not success) {
- fprintf(stderr, "Send error !\n");
+ etiLog.level(error) << "Send error !";
send_error_count ++;
}
}
@@ -1232,7 +1228,7 @@ int AudioEnc::run()
}
if (send_error_count > 10) {
- fprintf(stderr, "Send failed ten times, aborting!\n");
+ etiLog.level(error) << "Send failed ten times, aborting!";
retval = 4;
break;
}
@@ -1275,7 +1271,7 @@ int AudioEnc::run()
fflush(stdout);
} while (read_bytes > 0);
- fprintf(stderr, "\n");
+ // Final newline removed - etiLog provides its own line endings
return retval;
}
@@ -1483,7 +1479,7 @@ int main(int argc, char *argv[])
audio_enc.dab_channel_mode == "d" or
audio_enc.dab_channel_mode == "j" or
audio_enc.dab_channel_mode == "m")) {
- fprintf(stderr, "Invalid DAB channel mode\n");
+ etiLog.level(error) << "Invalid DAB channel mode";
usage(argv[0]);
return 1;
}
@@ -1499,7 +1495,7 @@ int main(int argc, char *argv[])
/* The 32 character length restriction is arbitrary, but guarantees
* that the EDI packet will not grow too large */
if (audio_enc.identifier.size() > 32) {
- fprintf(stderr, "Output Identifier too long!\n");
+ etiLog.level(error) << "Output Identifier too long!";
usage(argv[0]);
return 1;
}
@@ -1551,7 +1547,7 @@ int main(int argc, char *argv[])
}
break;
case 10:
- fprintf(stderr, "WARNING: the --vlc-gain option has been deprecated in favour of --audio-gain\n");
+ etiLog.level(warn) << "WARNING: the --vlc-gain option has been deprecated in favour of --audio-gain";
// fallthrough
case 'g':
audio_enc.gain_dB = std::stod(optarg);
@@ -1571,7 +1567,7 @@ int main(int argc, char *argv[])
#if HAVE_JACK
audio_enc.jack_name = optarg;
#else
- fprintf(stderr, "JACK disabled at compile time!\n");
+ etiLog.level(error) << "JACK disabled at compile time!";
return 1;
#endif
break;
@@ -1602,7 +1598,7 @@ int main(int argc, char *argv[])
audio_enc.die_on_silence = true;
}
else {
- fprintf(stderr, "Invalid silence timeout (%d) given!\n", audio_enc.silence_timeout);
+ etiLog.level(error) << "Invalid silence timeout (" << audio_enc.silence_timeout << ") given!";
return 1;
}
@@ -1628,7 +1624,7 @@ int main(int argc, char *argv[])
break;
#else
case 'v':
- fprintf(stderr, "VLC input not enabled at compile time!\n");
+ etiLog.level(error) << "VLC input not enabled at compile time!";
return 1;
#endif
case 'V':
@@ -1664,7 +1660,7 @@ int main(int argc, char *argv[])
return audio_enc.run();
}
catch (const std::runtime_error& e) {
- fprintf(stderr, "ODR-AudioEnc failed to start: %s\n", e.what());
+ etiLog.level(error) << "ODR-AudioEnc failed to start: " << e.what();
return 1;
}
}