aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/ThreadsafeQueue.h54
-rw-r--r--src/DabMod.cpp90
-rw-r--r--src/DabModulator.cpp2
-rw-r--r--src/TimestampDecoder.cpp1
-rw-r--r--src/TimestampDecoder.h2
-rw-r--r--src/output/Dexter.cpp31
-rw-r--r--src/output/SDR.cpp65
-rw-r--r--src/output/SDR.h1
-rw-r--r--src/output/Soapy.cpp1
-rw-r--r--src/output/UHD.cpp3
10 files changed, 168 insertions, 82 deletions
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 815dfe0..8b385d6 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -32,6 +32,7 @@
#include <condition_variable>
#include <queue>
#include <utility>
+#include <cassert>
/* This queue is meant to be used by two threads. One producer
* that pushes elements into the queue, and one consumer that
@@ -69,7 +70,6 @@ public:
}
size_t queue_size = the_queue.size();
lock.unlock();
-
the_rx_notification.notify_one();
return queue_size;
@@ -93,11 +93,57 @@ public:
return queue_size;
}
+ struct push_overflow_result { bool overflowed; size_t new_size; };
+
+ /* Push one element into the queue, and if queue is
+ * full remove one element from the other end.
+ *
+ * max_size == 0 is not allowed.
+ *
+ * returns the new queue size and a flag if overflow occurred.
+ */
+ push_overflow_result push_overflow(T const& val, size_t max_size)
+ {
+ assert(max_size > 0);
+ std::unique_lock<std::mutex> lock(the_mutex);
+
+ bool overflow = false;
+ while (the_queue.size() >= max_size) {
+ overflow = true;
+ the_queue.pop();
+ }
+ the_queue.push(val);
+ const size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return {overflow, queue_size};
+ }
+
+ push_overflow_result push_overflow(T&& val, size_t max_size)
+ {
+ assert(max_size > 0);
+ std::unique_lock<std::mutex> lock(the_mutex);
+
+ bool overflow = false;
+ while (the_queue.size() >= max_size) {
+ overflow = true;
+ the_queue.pop();
+ }
+ the_queue.emplace(std::move(val));
+ const size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return {overflow, queue_size};
+ }
+
+
/* Push one element into the queue, but wait until the
* queue size goes below the threshold.
*
- * Notify waiting thread.
- *
* returns the new queue size.
*/
size_t push_wait_if_full(T const& val, size_t threshold)
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 4b4fda0..e5436ad 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -25,6 +25,7 @@
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <fftw3.h>
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
@@ -342,6 +343,27 @@ int launch_modulator(int argc, char* argv[])
printModSettings(mod_settings);
+ {
+ // This is mostly useful on ARM systems where FFTW planning takes some time. If we do it here
+ // it will be done before the modulator starts up
+ etiLog.level(debug) << "Running FFTW planning...";
+ constexpr size_t fft_size = 2048; // Transmission Mode I. If different, it'll recalculate on OfdmGenerator
+ // initialisation
+ auto *fft_in = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size);
+ auto *fft_out = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size);
+ if (fft_in == nullptr or fft_out == nullptr) {
+ throw std::runtime_error("FFTW malloc failed");
+ }
+ fftwf_set_timelimit(2);
+ fftwf_plan plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_FORWARD, FFTW_MEASURE);
+ fftwf_destroy_plan(plan);
+ plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_BACKWARD, FFTW_MEASURE);
+ fftwf_destroy_plan(plan);
+ fftwf_free(fft_in);
+ fftwf_free(fft_out);
+ etiLog.level(debug) << "FFTW planning done.";
+ }
+
shared_ptr<FormatConverter> format_converter;
if (mod_settings.useFileOutput and
(mod_settings.fileOutputFormat == "s8" or
@@ -563,48 +585,46 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m
ts = m.ediInput->ediReader.getTimestamp();
}
- bool fct_good = false;
- if (last_eti_fct == -1) {
- if (fp != 0) {
- // Do not start the flowgraph before we get to FP 0
- // to ensure all blocks are properly aligned.
- if (m.ediInput) {
- m.ediInput->ediReader.clearFrame();
- }
- continue;
- }
- else {
- fct_good = true;
- }
+ // timestamp is good if we run unsynchronised, or if margin is sufficient
+ bool ts_good = not mod_settings.sdr_device_config.enableSync or
+ (ts.timestamp_valid and ts.offset_to_system_time() > 0.2);
+
+ if (!ts_good) {
+ etiLog.level(warn) << "Modulator skipping frame " << fct <<
+ " TS " << (ts.timestamp_valid ? "valid" : "invalid") <<
+ " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0);
}
else {
- const unsigned expected_fct = (last_eti_fct + 1) % 250;
- if (fct == expected_fct) {
- fct_good = true;
+ bool modulate = true;
+ if (last_eti_fct == -1) {
+ if (fp != 0) {
+ // Do not start the flowgraph before we get to FP 0
+ // to ensure all blocks are properly aligned.
+ modulate = false;
+ }
+ else {
+ last_eti_fct = fct;
+ }
}
else {
- etiLog.level(info) << "ETI FCT discontinuity, expected " <<
- expected_fct << " received " << fct;
- if (m.ediInput) {
- m.ediInput->ediReader.clearFrame();
+ const unsigned expected_fct = (last_eti_fct + 1) % 250;
+ if (fct == expected_fct) {
+ last_eti_fct = fct;
+ }
+ else {
+ etiLog.level(info) << "ETI FCT discontinuity, expected " <<
+ expected_fct << " received " << fct;
+ if (m.ediInput) {
+ m.ediInput->ediReader.clearFrame();
+ }
+ return run_modulator_state_t::again;
}
- return run_modulator_state_t::again;
}
- }
- // timestamp is good if we run unsynchronised, or if margin is insufficient
- bool ts_good = not mod_settings.sdr_device_config.enableSync or
- (ts.timestamp_valid and ts.offset_to_system_time() > 1);
-
- if (fct_good and ts_good) {
- last_eti_fct = fct;
- m.framecount++;
- m.flowgraph->run();
- }
- else {
- etiLog.level(warn) << "Skipping frame " <<
- " TS " << (ts.timestamp_valid ? "valid" : "invalid") <<
- " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0);
+ if (modulate) {
+ m.framecount++;
+ m.flowgraph->run();
+ }
}
if (m.ediInput) {
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 1f16d1d..3d8bd46 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -126,6 +126,7 @@ int DabModulator::process(Buffer* dataOut)
PDEBUG("DabModulator::process(dataOut: %p)\n", dataOut);
if (not myFlowgraph) {
+ etiLog.level(debug) << "Setting up DabModulator...";
const unsigned mode = m_settings.dabMode;
setMode(mode);
@@ -364,6 +365,7 @@ int DabModulator::process(Buffer* dataOut)
prev_plugin = p;
}
}
+ etiLog.level(debug) << "DabModulator set up.";
}
////////////////////////////////////////////////////////////////////
diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp
index 674f32c..6e97af6 100644
--- a/src/TimestampDecoder.cpp
+++ b/src/TimestampDecoder.cpp
@@ -110,6 +110,7 @@ frame_timestamp TimestampDecoder::getTimestamp()
ts.fct = latestFCT;
ts.fp = latestFP;
+ ts.timestamp_offset = timestamp_offset;
ts.offset_changed = offset_changed;
offset_changed = false;
diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h
index 2793e02..597b777 100644
--- a/src/TimestampDecoder.h
+++ b/src/TimestampDecoder.h
@@ -42,6 +42,8 @@ struct frame_timestamp
uint32_t timestamp_sec; // seconds in unix epoch
uint32_t timestamp_pps; // In units of 1/16384000 s
bool timestamp_valid = false;
+
+ double timestamp_offset = 0.0; // copy of the configured modulator offset
bool offset_changed = false;
frame_timestamp& operator+=(const double& diff);
diff --git a/src/output/Dexter.cpp b/src/output/Dexter.cpp
index cc10c57..4e24cfb 100644
--- a/src/output/Dexter.cpp
+++ b/src/output/Dexter.cpp
@@ -46,6 +46,8 @@ namespace Output {
static constexpr uint64_t DSP_CLOCK = 2048000uLL * 80;
+static constexpr uint64_t IIO_TIMEOUT_MS = 1000;
+
static constexpr size_t TRANSMISSION_FRAME_LEN = (2656 + 76 * 2552) * 4;
static constexpr size_t IIO_BUFFERS = 4;
static constexpr size_t IIO_BUFFER_LEN = TRANSMISSION_FRAME_LEN / IIO_BUFFERS;
@@ -53,7 +55,7 @@ static constexpr size_t IIO_BUFFER_LEN = TRANSMISSION_FRAME_LEN / IIO_BUFFERS;
static string get_iio_error(int err)
{
char dst[256];
- iio_strerror(err, dst, sizeof(dst));
+ iio_strerror(-err, dst, sizeof(dst));
return string(dst);
}
@@ -75,6 +77,11 @@ Dexter::Dexter(SDRDeviceConfig& config) :
throw std::runtime_error("Dexter: Unable to create iio scan context");
}
+ int r;
+ if ((r = iio_context_set_timeout(m_ctx, IIO_TIMEOUT_MS)) != 0) {
+ etiLog.level(error) << "Failed to set IIO timeout " << get_iio_error(r);
+ }
+
m_dexter_dsp_tx = iio_context_find_device(m_ctx, "dexter_dsp_tx");
if (!m_dexter_dsp_tx) {
throw std::runtime_error("Dexter: Unable to find dexter_dsp_tx iio device");
@@ -85,8 +92,6 @@ Dexter::Dexter(SDRDeviceConfig& config) :
throw std::runtime_error("Dexter: Unable to find ad9957_tx0 iio device");
}
- int r;
-
// TODO make DC offset configurable and add to RC
if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "dc0", 0)) != 0) {
etiLog.level(warn) << "Failed to set dexter_dsp_tx.dc0 = false: " << get_iio_error(r);
@@ -97,11 +102,11 @@ Dexter::Dexter(SDRDeviceConfig& config) :
}
if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "stream0_start_clks", 0)) != 0) {
- etiLog.level(warn) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r);
+ etiLog.level(error) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r);
}
if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "gain0", m_conf.txgain)) != 0) {
- etiLog.level(warn) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r);
+ etiLog.level(error) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r);
}
if (m_conf.sampleRate != 2048000) {
@@ -396,28 +401,28 @@ void Dexter::transmit_frame(const struct FrameData& frame)
etiLog.level(error) << "Failed to get dexter_dsp_tx.pps_clks: " << get_iio_error(r);
}
- const double margin = (double)((int64_t)frame_ts_clocks - pps_clks) / DSP_CLOCK;
+ const double margin_s = (double)((int64_t)frame_ts_clocks - pps_clks) / DSP_CLOCK;
- etiLog.level(debug) << "Dexter: TS CLK " <<
+ etiLog.level(debug) << "DEXTER FCT " << frame.ts.fct << " TS CLK " <<
((int64_t)frame.ts.timestamp_sec - (int64_t)m_utc_seconds_at_startup) * DSP_CLOCK << " + " <<
m_clock_count_at_startup << " + " <<
(uint64_t)frame.ts.timestamp_pps * TIMESTAMP_PPS_PER_DSP_CLOCKS << " = " <<
frame_ts_clocks << " DELTA " <<
- frame_ts_clocks << " - " << pps_clks << " = " << margin;
+ frame_ts_clocks << " - " << pps_clks << " = " << margin_s;
- // Ensure we hand the frame over to HW at least 0.2s before timestamp
- if (margin < 0.2) {
- etiLog.level(warn) << "Skip frame short margin " << margin;
+ // Ensure we hand the frame over to HW with a bit of margin
+ if (margin_s < 0.1) {
+ etiLog.level(warn) << "Skip frame short margin " << margin_s;
num_late++;
return;
}
-
if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "stream0_start_clks", frame_ts_clocks)) != 0) {
etiLog.level(warn) << "Skip frame, failed to set dexter_dsp_tx.stream0_start_clks = " << frame_ts_clocks << " : " << get_iio_error(r);
num_late++;
return;
}
+ m_require_timestamp_refresh = false;
}
channel_up();
@@ -469,6 +474,7 @@ void Dexter::underflow_read_process()
set_thread_name("dexter_underflow");
while (m_running) {
+ this_thread::sleep_for(chrono::seconds(1));
long long attr_value = 0;
int r = 0;
@@ -481,7 +487,6 @@ void Dexter::underflow_read_process()
underflows = underflows_new;
}
}
- this_thread::sleep_for(chrono::seconds(1));
}
m_running = false;
}
diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp
index 2b6700f..0b3299a 100644
--- a/src/output/SDR.cpp
+++ b/src/output/SDR.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2022
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -46,17 +46,13 @@ using namespace std;
namespace Output {
-// Maximum number of frames that can wait in frames
+// Maximum number of frames that can wait in frames, when not using synchronised transmission
static constexpr size_t FRAMES_MAX_SIZE = 8;
// If the timestamp is further in the future than
// 100 seconds, abort
static constexpr double TIMESTAMP_ABORT_FUTURE = 100;
-// Add a delay to increase buffers when
-// frames are too far in the future
-static constexpr double TIMESTAMP_MARGIN_FUTURE = 0.5;
-
SDR::SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device) :
ModOutput(), ModMetadata(), RemoteControllable("sdr"),
m_config(config),
@@ -127,6 +123,10 @@ int SDR::process(Buffer *dataIn)
meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn)
{
+ double frame_duration_s =
+ chrono::duration_cast<chrono::milliseconds>(
+ transmission_frame_duration(m_config.dabMode)).count() / 1000.0;
+
if (m_device and m_running) {
FrameData frame;
frame.buf = std::move(m_frame);
@@ -163,9 +163,22 @@ meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn)
m_config.sampleRate);
}
- size_t num_frames = m_queue.push_wait_if_full(frame,
- FRAMES_MAX_SIZE);
- etiLog.log(trace, "SDR,push %zu", num_frames);
+
+ const auto max_size = m_config.enableSync ?
+ (frame.ts.timestamp_offset * 4.0) / frame_duration_s
+ : FRAMES_MAX_SIZE;
+
+ auto r = m_queue.push_overflow(std::move(frame), max_size);
+ etiLog.log(trace, "SDR,push %d %zu", r.overflowed, r.new_size);
+
+ if (r.overflowed) {
+ fprintf(stderr, "o");
+ }
+ else {
+ fprintf(stderr, ".");
+ }
+
+ num_queue_overflows += r.overflowed ? 1 : 0;
}
}
else {
@@ -186,16 +199,13 @@ void SDR::process_thread_entry()
last_tx_time_initialised = false;
- size_t last_num_underflows = 0;
- size_t pop_prebuffering = FRAMES_MAX_SIZE;
-
m_running.store(true);
try {
while (m_running.load()) {
struct FrameData frame;
etiLog.log(trace, "SDR,wait");
- m_queue.wait_and_pop(frame, pop_prebuffering);
+ m_queue.wait_and_pop(frame);
etiLog.log(trace, "SDR,pop");
if (m_running.load() == false) {
@@ -204,19 +214,6 @@ void SDR::process_thread_entry()
if (m_device) {
handle_frame(frame);
-
- const auto rs = m_device->get_run_statistics();
-
- /* Ensure we fill frames after every underrun and
- * at startup to reduce underrun likelihood. */
- if (last_num_underflows < rs.num_underruns) {
- pop_prebuffering = FRAMES_MAX_SIZE;
- }
- else {
- pop_prebuffering = 1;
- }
-
- last_num_underflows = rs.num_underruns;
}
}
}
@@ -302,6 +299,7 @@ void SDR::handle_frame(struct FrameData& frame)
}
if (frame.ts.offset_changed) {
+ etiLog.level(debug) << "TS offset changed";
m_device->require_timestamp_refresh();
}
@@ -354,9 +352,19 @@ void SDR::handle_frame(struct FrameData& frame)
" frame " << frame.ts.fct <<
", tx_second " << tx_second <<
", pps " << pps_offset;
+ m_device->require_timestamp_refresh();
return;
}
+ etiLog.level(debug) <<
+ "OutputSDR: Timestamp at FCT=" << frame.ts.fct << " offset: " <<
+ std::fixed <<
+ time_spec.get_real_secs() - device_time <<
+ " (" << device_time << ")"
+ " frame " << frame.ts.fct <<
+ ", tx_second " << tx_second <<
+ ", pps " << pps_offset;
+
if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) {
etiLog.level(error) <<
"OutputSDR: Timestamp way too far in the future at FCT=" << frame.ts.fct << " offset: " <<
@@ -367,9 +375,8 @@ void SDR::handle_frame(struct FrameData& frame)
}
if (m_config.muting) {
- etiLog.log(info,
- "OutputSDR: Muting FCT=%d requested",
- frame.ts.fct);
+ etiLog.log(info, "OutputSDR: Muting FCT=%d requested", frame.ts.fct);
+ m_device->require_timestamp_refresh();
return;
}
diff --git a/src/output/SDR.h b/src/output/SDR.h
index d7f7b46..5c3b599 100644
--- a/src/output/SDR.h
+++ b/src/output/SDR.h
@@ -88,6 +88,7 @@ class SDR : public ModOutput, public ModMetadata, public RemoteControllable {
bool last_tx_time_initialised = false;
uint32_t last_tx_second = 0;
uint32_t last_tx_pps = 0;
+ size_t num_queue_overflows = 0;
bool t_last_frame_initialised = false;
std::chrono::steady_clock::time_point t_last_frame;
diff --git a/src/output/Soapy.cpp b/src/output/Soapy.cpp
index c2c5046..c2ae88a 100644
--- a/src/output/Soapy.cpp
+++ b/src/output/Soapy.cpp
@@ -349,6 +349,7 @@ void Soapy::transmit_frame(const struct FrameData& frame)
SoapySDR::errToStr(ret_deact));
}
m_tx_stream_active = false;
+ m_require_timestamp_refresh = false;
}
if (eob_because_muting) {
diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp
index 6e38f73..6810249 100644
--- a/src/output/UHD.cpp
+++ b/src/output/UHD.cpp
@@ -350,6 +350,7 @@ void UHD::transmit_frame(const struct FrameData& frame)
frame.ts.timestamp_valid and
m_require_timestamp_refresh and
samps_to_send <= usrp_max_num_samps );
+ m_require_timestamp_refresh = false;
//send a single packet
size_t num_tx_samps = m_tx_stream->send(
@@ -359,7 +360,7 @@ void UHD::transmit_frame(const struct FrameData& frame)
num_acc_samps += num_tx_samps;
- md_tx.time_spec += uhd::time_spec_t(0, num_tx_samps/m_conf.sampleRate);
+ md_tx.time_spec += uhd::time_spec_t::from_ticks(num_tx_samps, (double)m_conf.sampleRate);
if (num_tx_samps == 0) {
etiLog.log(warn,