diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2023-02-21 18:13:26 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2023-02-21 18:13:26 +0100 |
commit | 05d3e26409a8f62c7f55851390d61d953f59489a (patch) | |
tree | 45c9428132e4f02f799ca81fce434fcf33fd1120 | |
parent | ba0c32703ee1af770deabdf864c806d974ea8206 (diff) | |
download | dabmod-05d3e26409a8f62c7f55851390d61d953f59489a.tar.gz dabmod-05d3e26409a8f62c7f55851390d61d953f59489a.tar.bz2 dabmod-05d3e26409a8f62c7f55851390d61d953f59489a.zip |
Change mod to output queue behaviour, fix SFN dexter + B200
-rw-r--r-- | lib/ThreadsafeQueue.h | 54 | ||||
-rw-r--r-- | src/DabMod.cpp | 90 | ||||
-rw-r--r-- | src/DabModulator.cpp | 2 | ||||
-rw-r--r-- | src/TimestampDecoder.cpp | 1 | ||||
-rw-r--r-- | src/TimestampDecoder.h | 2 | ||||
-rw-r--r-- | src/output/Dexter.cpp | 31 | ||||
-rw-r--r-- | src/output/SDR.cpp | 65 | ||||
-rw-r--r-- | src/output/SDR.h | 1 | ||||
-rw-r--r-- | src/output/Soapy.cpp | 1 | ||||
-rw-r--r-- | src/output/UHD.cpp | 3 |
10 files changed, 168 insertions, 82 deletions
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 815dfe0..8b385d6 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2023 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -32,6 +32,7 @@ #include <condition_variable> #include <queue> #include <utility> +#include <cassert> /* This queue is meant to be used by two threads. One producer * that pushes elements into the queue, and one consumer that @@ -69,7 +70,6 @@ public: } size_t queue_size = the_queue.size(); lock.unlock(); - the_rx_notification.notify_one(); return queue_size; @@ -93,11 +93,57 @@ public: return queue_size; } + struct push_overflow_result { bool overflowed; size_t new_size; }; + + /* Push one element into the queue, and if queue is + * full remove one element from the other end. + * + * max_size == 0 is not allowed. + * + * returns the new queue size and a flag if overflow occurred. + */ + push_overflow_result push_overflow(T const& val, size_t max_size) + { + assert(max_size > 0); + std::unique_lock<std::mutex> lock(the_mutex); + + bool overflow = false; + while (the_queue.size() >= max_size) { + overflow = true; + the_queue.pop(); + } + the_queue.push(val); + const size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return {overflow, queue_size}; + } + + push_overflow_result push_overflow(T&& val, size_t max_size) + { + assert(max_size > 0); + std::unique_lock<std::mutex> lock(the_mutex); + + bool overflow = false; + while (the_queue.size() >= max_size) { + overflow = true; + the_queue.pop(); + } + the_queue.emplace(std::move(val)); + const size_t queue_size = the_queue.size(); + lock.unlock(); + + the_rx_notification.notify_one(); + + return {overflow, queue_size}; + } + + /* Push one element into the queue, but wait until the * queue size goes below the threshold. * - * Notify waiting thread. - * * returns the new queue size. */ size_t push_wait_if_full(T const& val, size_t threshold) diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 4b4fda0..e5436ad 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -25,6 +25,7 @@ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. */ +#include <fftw3.h> #ifdef HAVE_CONFIG_H # include "config.h" #endif @@ -342,6 +343,27 @@ int launch_modulator(int argc, char* argv[]) printModSettings(mod_settings); + { + // This is mostly useful on ARM systems where FFTW planning takes some time. If we do it here + // it will be done before the modulator starts up + etiLog.level(debug) << "Running FFTW planning..."; + constexpr size_t fft_size = 2048; // Transmission Mode I. If different, it'll recalculate on OfdmGenerator + // initialisation + auto *fft_in = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size); + auto *fft_out = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size); + if (fft_in == nullptr or fft_out == nullptr) { + throw std::runtime_error("FFTW malloc failed"); + } + fftwf_set_timelimit(2); + fftwf_plan plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_FORWARD, FFTW_MEASURE); + fftwf_destroy_plan(plan); + plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_BACKWARD, FFTW_MEASURE); + fftwf_destroy_plan(plan); + fftwf_free(fft_in); + fftwf_free(fft_out); + etiLog.level(debug) << "FFTW planning done."; + } + shared_ptr<FormatConverter> format_converter; if (mod_settings.useFileOutput and (mod_settings.fileOutputFormat == "s8" or @@ -563,48 +585,46 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m ts = m.ediInput->ediReader.getTimestamp(); } - bool fct_good = false; - if (last_eti_fct == -1) { - if (fp != 0) { - // Do not start the flowgraph before we get to FP 0 - // to ensure all blocks are properly aligned. - if (m.ediInput) { - m.ediInput->ediReader.clearFrame(); - } - continue; - } - else { - fct_good = true; - } + // timestamp is good if we run unsynchronised, or if margin is sufficient + bool ts_good = not mod_settings.sdr_device_config.enableSync or + (ts.timestamp_valid and ts.offset_to_system_time() > 0.2); + + if (!ts_good) { + etiLog.level(warn) << "Modulator skipping frame " << fct << + " TS " << (ts.timestamp_valid ? "valid" : "invalid") << + " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0); } else { - const unsigned expected_fct = (last_eti_fct + 1) % 250; - if (fct == expected_fct) { - fct_good = true; + bool modulate = true; + if (last_eti_fct == -1) { + if (fp != 0) { + // Do not start the flowgraph before we get to FP 0 + // to ensure all blocks are properly aligned. + modulate = false; + } + else { + last_eti_fct = fct; + } } else { - etiLog.level(info) << "ETI FCT discontinuity, expected " << - expected_fct << " received " << fct; - if (m.ediInput) { - m.ediInput->ediReader.clearFrame(); + const unsigned expected_fct = (last_eti_fct + 1) % 250; + if (fct == expected_fct) { + last_eti_fct = fct; + } + else { + etiLog.level(info) << "ETI FCT discontinuity, expected " << + expected_fct << " received " << fct; + if (m.ediInput) { + m.ediInput->ediReader.clearFrame(); + } + return run_modulator_state_t::again; } - return run_modulator_state_t::again; } - } - // timestamp is good if we run unsynchronised, or if margin is insufficient - bool ts_good = not mod_settings.sdr_device_config.enableSync or - (ts.timestamp_valid and ts.offset_to_system_time() > 1); - - if (fct_good and ts_good) { - last_eti_fct = fct; - m.framecount++; - m.flowgraph->run(); - } - else { - etiLog.level(warn) << "Skipping frame " << - " TS " << (ts.timestamp_valid ? "valid" : "invalid") << - " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0); + if (modulate) { + m.framecount++; + m.flowgraph->run(); + } } if (m.ediInput) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 1f16d1d..3d8bd46 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -126,6 +126,7 @@ int DabModulator::process(Buffer* dataOut) PDEBUG("DabModulator::process(dataOut: %p)\n", dataOut); if (not myFlowgraph) { + etiLog.level(debug) << "Setting up DabModulator..."; const unsigned mode = m_settings.dabMode; setMode(mode); @@ -364,6 +365,7 @@ int DabModulator::process(Buffer* dataOut) prev_plugin = p; } } + etiLog.level(debug) << "DabModulator set up."; } //////////////////////////////////////////////////////////////////// diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 674f32c..6e97af6 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -110,6 +110,7 @@ frame_timestamp TimestampDecoder::getTimestamp() ts.fct = latestFCT; ts.fp = latestFP; + ts.timestamp_offset = timestamp_offset; ts.offset_changed = offset_changed; offset_changed = false; diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index 2793e02..597b777 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -42,6 +42,8 @@ struct frame_timestamp uint32_t timestamp_sec; // seconds in unix epoch uint32_t timestamp_pps; // In units of 1/16384000 s bool timestamp_valid = false; + + double timestamp_offset = 0.0; // copy of the configured modulator offset bool offset_changed = false; frame_timestamp& operator+=(const double& diff); diff --git a/src/output/Dexter.cpp b/src/output/Dexter.cpp index cc10c57..4e24cfb 100644 --- a/src/output/Dexter.cpp +++ b/src/output/Dexter.cpp @@ -46,6 +46,8 @@ namespace Output { static constexpr uint64_t DSP_CLOCK = 2048000uLL * 80; +static constexpr uint64_t IIO_TIMEOUT_MS = 1000; + static constexpr size_t TRANSMISSION_FRAME_LEN = (2656 + 76 * 2552) * 4; static constexpr size_t IIO_BUFFERS = 4; static constexpr size_t IIO_BUFFER_LEN = TRANSMISSION_FRAME_LEN / IIO_BUFFERS; @@ -53,7 +55,7 @@ static constexpr size_t IIO_BUFFER_LEN = TRANSMISSION_FRAME_LEN / IIO_BUFFERS; static string get_iio_error(int err) { char dst[256]; - iio_strerror(err, dst, sizeof(dst)); + iio_strerror(-err, dst, sizeof(dst)); return string(dst); } @@ -75,6 +77,11 @@ Dexter::Dexter(SDRDeviceConfig& config) : throw std::runtime_error("Dexter: Unable to create iio scan context"); } + int r; + if ((r = iio_context_set_timeout(m_ctx, IIO_TIMEOUT_MS)) != 0) { + etiLog.level(error) << "Failed to set IIO timeout " << get_iio_error(r); + } + m_dexter_dsp_tx = iio_context_find_device(m_ctx, "dexter_dsp_tx"); if (!m_dexter_dsp_tx) { throw std::runtime_error("Dexter: Unable to find dexter_dsp_tx iio device"); @@ -85,8 +92,6 @@ Dexter::Dexter(SDRDeviceConfig& config) : throw std::runtime_error("Dexter: Unable to find ad9957_tx0 iio device"); } - int r; - // TODO make DC offset configurable and add to RC if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "dc0", 0)) != 0) { etiLog.level(warn) << "Failed to set dexter_dsp_tx.dc0 = false: " << get_iio_error(r); @@ -97,11 +102,11 @@ Dexter::Dexter(SDRDeviceConfig& config) : } if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "stream0_start_clks", 0)) != 0) { - etiLog.level(warn) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r); + etiLog.level(error) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r); } if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "gain0", m_conf.txgain)) != 0) { - etiLog.level(warn) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r); + etiLog.level(error) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r); } if (m_conf.sampleRate != 2048000) { @@ -396,28 +401,28 @@ void Dexter::transmit_frame(const struct FrameData& frame) etiLog.level(error) << "Failed to get dexter_dsp_tx.pps_clks: " << get_iio_error(r); } - const double margin = (double)((int64_t)frame_ts_clocks - pps_clks) / DSP_CLOCK; + const double margin_s = (double)((int64_t)frame_ts_clocks - pps_clks) / DSP_CLOCK; - etiLog.level(debug) << "Dexter: TS CLK " << + etiLog.level(debug) << "DEXTER FCT " << frame.ts.fct << " TS CLK " << ((int64_t)frame.ts.timestamp_sec - (int64_t)m_utc_seconds_at_startup) * DSP_CLOCK << " + " << m_clock_count_at_startup << " + " << (uint64_t)frame.ts.timestamp_pps * TIMESTAMP_PPS_PER_DSP_CLOCKS << " = " << frame_ts_clocks << " DELTA " << - frame_ts_clocks << " - " << pps_clks << " = " << margin; + frame_ts_clocks << " - " << pps_clks << " = " << margin_s; - // Ensure we hand the frame over to HW at least 0.2s before timestamp - if (margin < 0.2) { - etiLog.level(warn) << "Skip frame short margin " << margin; + // Ensure we hand the frame over to HW with a bit of margin + if (margin_s < 0.1) { + etiLog.level(warn) << "Skip frame short margin " << margin_s; num_late++; return; } - if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "stream0_start_clks", frame_ts_clocks)) != 0) { etiLog.level(warn) << "Skip frame, failed to set dexter_dsp_tx.stream0_start_clks = " << frame_ts_clocks << " : " << get_iio_error(r); num_late++; return; } + m_require_timestamp_refresh = false; } channel_up(); @@ -469,6 +474,7 @@ void Dexter::underflow_read_process() set_thread_name("dexter_underflow"); while (m_running) { + this_thread::sleep_for(chrono::seconds(1)); long long attr_value = 0; int r = 0; @@ -481,7 +487,6 @@ void Dexter::underflow_read_process() underflows = underflows_new; } } - this_thread::sleep_for(chrono::seconds(1)); } m_running = false; } diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp index 2b6700f..0b3299a 100644 --- a/src/output/SDR.cpp +++ b/src/output/SDR.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2022 + Copyright (C) 2023 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -46,17 +46,13 @@ using namespace std; namespace Output { -// Maximum number of frames that can wait in frames +// Maximum number of frames that can wait in frames, when not using synchronised transmission static constexpr size_t FRAMES_MAX_SIZE = 8; // If the timestamp is further in the future than // 100 seconds, abort static constexpr double TIMESTAMP_ABORT_FUTURE = 100; -// Add a delay to increase buffers when -// frames are too far in the future -static constexpr double TIMESTAMP_MARGIN_FUTURE = 0.5; - SDR::SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device) : ModOutput(), ModMetadata(), RemoteControllable("sdr"), m_config(config), @@ -127,6 +123,10 @@ int SDR::process(Buffer *dataIn) meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn) { + double frame_duration_s = + chrono::duration_cast<chrono::milliseconds>( + transmission_frame_duration(m_config.dabMode)).count() / 1000.0; + if (m_device and m_running) { FrameData frame; frame.buf = std::move(m_frame); @@ -163,9 +163,22 @@ meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn) m_config.sampleRate); } - size_t num_frames = m_queue.push_wait_if_full(frame, - FRAMES_MAX_SIZE); - etiLog.log(trace, "SDR,push %zu", num_frames); + + const auto max_size = m_config.enableSync ? + (frame.ts.timestamp_offset * 4.0) / frame_duration_s + : FRAMES_MAX_SIZE; + + auto r = m_queue.push_overflow(std::move(frame), max_size); + etiLog.log(trace, "SDR,push %d %zu", r.overflowed, r.new_size); + + if (r.overflowed) { + fprintf(stderr, "o"); + } + else { + fprintf(stderr, "."); + } + + num_queue_overflows += r.overflowed ? 1 : 0; } } else { @@ -186,16 +199,13 @@ void SDR::process_thread_entry() last_tx_time_initialised = false; - size_t last_num_underflows = 0; - size_t pop_prebuffering = FRAMES_MAX_SIZE; - m_running.store(true); try { while (m_running.load()) { struct FrameData frame; etiLog.log(trace, "SDR,wait"); - m_queue.wait_and_pop(frame, pop_prebuffering); + m_queue.wait_and_pop(frame); etiLog.log(trace, "SDR,pop"); if (m_running.load() == false) { @@ -204,19 +214,6 @@ void SDR::process_thread_entry() if (m_device) { handle_frame(frame); - - const auto rs = m_device->get_run_statistics(); - - /* Ensure we fill frames after every underrun and - * at startup to reduce underrun likelihood. */ - if (last_num_underflows < rs.num_underruns) { - pop_prebuffering = FRAMES_MAX_SIZE; - } - else { - pop_prebuffering = 1; - } - - last_num_underflows = rs.num_underruns; } } } @@ -302,6 +299,7 @@ void SDR::handle_frame(struct FrameData& frame) } if (frame.ts.offset_changed) { + etiLog.level(debug) << "TS offset changed"; m_device->require_timestamp_refresh(); } @@ -354,9 +352,19 @@ void SDR::handle_frame(struct FrameData& frame) " frame " << frame.ts.fct << ", tx_second " << tx_second << ", pps " << pps_offset; + m_device->require_timestamp_refresh(); return; } + etiLog.level(debug) << + "OutputSDR: Timestamp at FCT=" << frame.ts.fct << " offset: " << + std::fixed << + time_spec.get_real_secs() - device_time << + " (" << device_time << ")" + " frame " << frame.ts.fct << + ", tx_second " << tx_second << + ", pps " << pps_offset; + if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) { etiLog.level(error) << "OutputSDR: Timestamp way too far in the future at FCT=" << frame.ts.fct << " offset: " << @@ -367,9 +375,8 @@ void SDR::handle_frame(struct FrameData& frame) } if (m_config.muting) { - etiLog.log(info, - "OutputSDR: Muting FCT=%d requested", - frame.ts.fct); + etiLog.log(info, "OutputSDR: Muting FCT=%d requested", frame.ts.fct); + m_device->require_timestamp_refresh(); return; } diff --git a/src/output/SDR.h b/src/output/SDR.h index d7f7b46..5c3b599 100644 --- a/src/output/SDR.h +++ b/src/output/SDR.h @@ -88,6 +88,7 @@ class SDR : public ModOutput, public ModMetadata, public RemoteControllable { bool last_tx_time_initialised = false; uint32_t last_tx_second = 0; uint32_t last_tx_pps = 0; + size_t num_queue_overflows = 0; bool t_last_frame_initialised = false; std::chrono::steady_clock::time_point t_last_frame; diff --git a/src/output/Soapy.cpp b/src/output/Soapy.cpp index c2c5046..c2ae88a 100644 --- a/src/output/Soapy.cpp +++ b/src/output/Soapy.cpp @@ -349,6 +349,7 @@ void Soapy::transmit_frame(const struct FrameData& frame) SoapySDR::errToStr(ret_deact)); } m_tx_stream_active = false; + m_require_timestamp_refresh = false; } if (eob_because_muting) { diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp index 6e38f73..6810249 100644 --- a/src/output/UHD.cpp +++ b/src/output/UHD.cpp @@ -350,6 +350,7 @@ void UHD::transmit_frame(const struct FrameData& frame) frame.ts.timestamp_valid and m_require_timestamp_refresh and samps_to_send <= usrp_max_num_samps ); + m_require_timestamp_refresh = false; //send a single packet size_t num_tx_samps = m_tx_stream->send( @@ -359,7 +360,7 @@ void UHD::transmit_frame(const struct FrameData& frame) num_acc_samps += num_tx_samps; - md_tx.time_spec += uhd::time_spec_t(0, num_tx_samps/m_conf.sampleRate); + md_tx.time_spec += uhd::time_spec_t::from_ticks(num_tx_samps, (double)m_conf.sampleRate); if (num_tx_samps == 0) { etiLog.log(warn, |