diff options
| -rw-r--r-- | lib/ThreadsafeQueue.h | 54 | ||||
| -rw-r--r-- | src/DabMod.cpp | 90 | ||||
| -rw-r--r-- | src/DabModulator.cpp | 2 | ||||
| -rw-r--r-- | src/TimestampDecoder.cpp | 1 | ||||
| -rw-r--r-- | src/TimestampDecoder.h | 2 | ||||
| -rw-r--r-- | src/output/Dexter.cpp | 31 | ||||
| -rw-r--r-- | src/output/SDR.cpp | 65 | ||||
| -rw-r--r-- | src/output/SDR.h | 1 | ||||
| -rw-r--r-- | src/output/Soapy.cpp | 1 | ||||
| -rw-r--r-- | src/output/UHD.cpp | 3 | 
10 files changed, 168 insertions, 82 deletions
| diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 815dfe0..8b385d6 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2018 +   Copyright (C) 2023     Matthias P. Braendli, matthias.braendli@mpb.li     An implementation for a threadsafe queue, depends on C++11 @@ -32,6 +32,7 @@  #include <condition_variable>  #include <queue>  #include <utility> +#include <cassert>  /* This queue is meant to be used by two threads. One producer   * that pushes elements into the queue, and one consumer that @@ -69,7 +70,6 @@ public:          }          size_t queue_size = the_queue.size();          lock.unlock(); -          the_rx_notification.notify_one();          return queue_size; @@ -93,11 +93,57 @@ public:          return queue_size;      } +    struct push_overflow_result { bool overflowed; size_t new_size; }; + +    /* Push one element into the queue, and if queue is +     * full remove one element from the other end. +     * +     * max_size == 0 is not allowed. +     * +     * returns the new queue size and a flag if overflow occurred. +     */ +    push_overflow_result push_overflow(T const& val, size_t max_size) +    { +        assert(max_size > 0); +        std::unique_lock<std::mutex> lock(the_mutex); + +        bool overflow = false; +        while (the_queue.size() >= max_size) { +            overflow = true; +            the_queue.pop(); +        } +        the_queue.push(val); +        const size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return {overflow, queue_size}; +    } + +    push_overflow_result push_overflow(T&& val, size_t max_size) +    { +        assert(max_size > 0); +        std::unique_lock<std::mutex> lock(the_mutex); + +        bool overflow = false; +        while (the_queue.size() >= max_size) { +            overflow = true; +            the_queue.pop(); +        } +        the_queue.emplace(std::move(val)); +        const size_t queue_size = the_queue.size(); +        lock.unlock(); + +        the_rx_notification.notify_one(); + +        return {overflow, queue_size}; +    } + +      /* Push one element into the queue, but wait until the       * queue size goes below the threshold.       * -     * Notify waiting thread. -     *       * returns the new queue size.       */      size_t push_wait_if_full(T const& val, size_t threshold) diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 4b4fda0..e5436ad 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -25,6 +25,7 @@     along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.   */ +#include <fftw3.h>  #ifdef HAVE_CONFIG_H  #   include "config.h"  #endif @@ -342,6 +343,27 @@ int launch_modulator(int argc, char* argv[])      printModSettings(mod_settings); +    { +        // This is mostly useful on ARM systems where FFTW planning takes some time. If we do it here +        // it will be done before the modulator starts up +        etiLog.level(debug) << "Running FFTW planning..."; +        constexpr size_t fft_size = 2048; // Transmission Mode I. If different, it'll recalculate on OfdmGenerator +                                          // initialisation +        auto *fft_in = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size); +        auto *fft_out = (fftwf_complex*)fftwf_malloc(sizeof(fftwf_complex) * fft_size); +        if (fft_in == nullptr or fft_out == nullptr) { +            throw std::runtime_error("FFTW malloc failed"); +        } +        fftwf_set_timelimit(2); +        fftwf_plan plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_FORWARD, FFTW_MEASURE); +        fftwf_destroy_plan(plan); +        plan = fftwf_plan_dft_1d(fft_size, fft_in, fft_out, FFTW_BACKWARD, FFTW_MEASURE); +        fftwf_destroy_plan(plan); +        fftwf_free(fft_in); +        fftwf_free(fft_out); +        etiLog.level(debug) << "FFTW planning done."; +    } +      shared_ptr<FormatConverter> format_converter;      if (mod_settings.useFileOutput and              (mod_settings.fileOutputFormat == "s8" or @@ -563,48 +585,46 @@ static run_modulator_state_t run_modulator(const mod_settings_t& mod_settings, m                  ts = m.ediInput->ediReader.getTimestamp();              } -            bool fct_good = false; -            if (last_eti_fct == -1) { -                if (fp != 0) { -                    // Do not start the flowgraph before we get to FP 0 -                    // to ensure all blocks are properly aligned. -                    if (m.ediInput) { -                        m.ediInput->ediReader.clearFrame(); -                    } -                    continue; -                } -                else { -                    fct_good = true; -                } +            // timestamp is good if we run unsynchronised, or if margin is sufficient +            bool ts_good = not mod_settings.sdr_device_config.enableSync or +                (ts.timestamp_valid and ts.offset_to_system_time() > 0.2); + +            if (!ts_good) { +                etiLog.level(warn) << "Modulator skipping frame " << fct << +                    " TS " << (ts.timestamp_valid ? "valid" : "invalid") << +                    " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0);              }              else { -                const unsigned expected_fct = (last_eti_fct + 1) % 250; -                if (fct == expected_fct) { -                    fct_good = true; +                bool modulate = true; +                if (last_eti_fct == -1) { +                    if (fp != 0) { +                        // Do not start the flowgraph before we get to FP 0 +                        // to ensure all blocks are properly aligned. +                        modulate = false; +                    } +                    else { +                        last_eti_fct = fct; +                    }                  }                  else { -                    etiLog.level(info) << "ETI FCT discontinuity, expected " << -                        expected_fct << " received " << fct; -                    if (m.ediInput) { -                        m.ediInput->ediReader.clearFrame(); +                    const unsigned expected_fct = (last_eti_fct + 1) % 250; +                    if (fct == expected_fct) { +                        last_eti_fct = fct; +                    } +                    else { +                        etiLog.level(info) << "ETI FCT discontinuity, expected " << +                            expected_fct << " received " << fct; +                        if (m.ediInput) { +                            m.ediInput->ediReader.clearFrame(); +                        } +                        return run_modulator_state_t::again;                      } -                    return run_modulator_state_t::again;                  } -            } -            // timestamp is good if we run unsynchronised, or if margin is insufficient -            bool ts_good = not mod_settings.sdr_device_config.enableSync or -                (ts.timestamp_valid and ts.offset_to_system_time() > 1); - -            if (fct_good and ts_good) { -                last_eti_fct = fct; -                m.framecount++; -                m.flowgraph->run(); -            } -            else { -                etiLog.level(warn) << "Skipping frame " << -                    " TS " << (ts.timestamp_valid ? "valid" : "invalid") << -                    " offset " << (ts.timestamp_valid ? ts.offset_to_system_time() : 0); +                if (modulate) { +                    m.framecount++; +                    m.flowgraph->run(); +                }              }              if (m.ediInput) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 1f16d1d..3d8bd46 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -126,6 +126,7 @@ int DabModulator::process(Buffer* dataOut)      PDEBUG("DabModulator::process(dataOut: %p)\n", dataOut);      if (not myFlowgraph) { +        etiLog.level(debug) << "Setting up DabModulator...";          const unsigned mode = m_settings.dabMode;          setMode(mode); @@ -364,6 +365,7 @@ int DabModulator::process(Buffer* dataOut)                  prev_plugin = p;              }          } +        etiLog.level(debug) << "DabModulator set up.";      }      //////////////////////////////////////////////////////////////////// diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 674f32c..6e97af6 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -110,6 +110,7 @@ frame_timestamp TimestampDecoder::getTimestamp()      ts.fct = latestFCT;      ts.fp = latestFP; +    ts.timestamp_offset = timestamp_offset;      ts.offset_changed = offset_changed;      offset_changed = false; diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index 2793e02..597b777 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -42,6 +42,8 @@ struct frame_timestamp      uint32_t timestamp_sec; // seconds in unix epoch      uint32_t timestamp_pps; // In units of 1/16384000 s      bool timestamp_valid = false; + +    double timestamp_offset = 0.0; // copy of the configured modulator offset      bool offset_changed = false;      frame_timestamp& operator+=(const double& diff); diff --git a/src/output/Dexter.cpp b/src/output/Dexter.cpp index cc10c57..4e24cfb 100644 --- a/src/output/Dexter.cpp +++ b/src/output/Dexter.cpp @@ -46,6 +46,8 @@ namespace Output {  static constexpr uint64_t DSP_CLOCK = 2048000uLL * 80; +static constexpr uint64_t IIO_TIMEOUT_MS = 1000; +  static constexpr size_t TRANSMISSION_FRAME_LEN = (2656 + 76 * 2552) * 4;  static constexpr size_t IIO_BUFFERS = 4;  static constexpr size_t IIO_BUFFER_LEN = TRANSMISSION_FRAME_LEN / IIO_BUFFERS; @@ -53,7 +55,7 @@ static constexpr size_t IIO_BUFFER_LEN = TRANSMISSION_FRAME_LEN / IIO_BUFFERS;  static string get_iio_error(int err)  {      char dst[256]; -    iio_strerror(err, dst, sizeof(dst)); +    iio_strerror(-err, dst, sizeof(dst));      return string(dst);  } @@ -75,6 +77,11 @@ Dexter::Dexter(SDRDeviceConfig& config) :          throw std::runtime_error("Dexter: Unable to create iio scan context");      } +    int r; +    if ((r = iio_context_set_timeout(m_ctx, IIO_TIMEOUT_MS)) != 0) { +        etiLog.level(error) << "Failed to set IIO timeout " << get_iio_error(r); +    } +      m_dexter_dsp_tx = iio_context_find_device(m_ctx, "dexter_dsp_tx");      if (!m_dexter_dsp_tx) {          throw std::runtime_error("Dexter: Unable to find dexter_dsp_tx iio device"); @@ -85,8 +92,6 @@ Dexter::Dexter(SDRDeviceConfig& config) :          throw std::runtime_error("Dexter: Unable to find ad9957_tx0 iio device");      } -    int r; -      // TODO make DC offset configurable and add to RC      if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "dc0", 0)) != 0) {          etiLog.level(warn) << "Failed to set dexter_dsp_tx.dc0 = false: " << get_iio_error(r); @@ -97,11 +102,11 @@ Dexter::Dexter(SDRDeviceConfig& config) :      }      if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "stream0_start_clks", 0)) != 0) { -        etiLog.level(warn) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r); +        etiLog.level(error) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r);      }      if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "gain0", m_conf.txgain)) != 0) { -        etiLog.level(warn) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r); +        etiLog.level(error) << "Failed to set dexter_dsp_tx.stream0_start_clks = 0: " << get_iio_error(r);      }      if (m_conf.sampleRate != 2048000) { @@ -396,28 +401,28 @@ void Dexter::transmit_frame(const struct FrameData& frame)                  etiLog.level(error) << "Failed to get dexter_dsp_tx.pps_clks: " << get_iio_error(r);              } -            const double margin = (double)((int64_t)frame_ts_clocks - pps_clks) / DSP_CLOCK; +            const double margin_s = (double)((int64_t)frame_ts_clocks - pps_clks) / DSP_CLOCK; -            etiLog.level(debug) << "Dexter: TS CLK " << +            etiLog.level(debug) << "DEXTER FCT " << frame.ts.fct << " TS CLK " <<                  ((int64_t)frame.ts.timestamp_sec - (int64_t)m_utc_seconds_at_startup) * DSP_CLOCK << " + " <<                  m_clock_count_at_startup << " + " <<                  (uint64_t)frame.ts.timestamp_pps * TIMESTAMP_PPS_PER_DSP_CLOCKS << " = " <<                  frame_ts_clocks << " DELTA " << -                frame_ts_clocks << " - " << pps_clks << " = " << margin; +                frame_ts_clocks << " - " << pps_clks << " = " << margin_s; -            // Ensure we hand the frame over to HW at least 0.2s before timestamp -            if (margin < 0.2) { -                etiLog.level(warn) << "Skip frame short margin " << margin; +            // Ensure we hand the frame over to HW with a bit of margin +            if (margin_s < 0.1) { +                etiLog.level(warn) << "Skip frame short margin " << margin_s;                  num_late++;                  return;              } -              if ((r = iio_device_attr_write_longlong(m_dexter_dsp_tx, "stream0_start_clks", frame_ts_clocks)) != 0) {                  etiLog.level(warn) << "Skip frame, failed to set dexter_dsp_tx.stream0_start_clks = " << frame_ts_clocks << " : " << get_iio_error(r);                  num_late++;                  return;              } +            m_require_timestamp_refresh = false;          }          channel_up(); @@ -469,6 +474,7 @@ void Dexter::underflow_read_process()      set_thread_name("dexter_underflow");      while (m_running) { +        this_thread::sleep_for(chrono::seconds(1));          long long attr_value = 0;          int r = 0; @@ -481,7 +487,6 @@ void Dexter::underflow_read_process()                  underflows = underflows_new;              }          } -        this_thread::sleep_for(chrono::seconds(1));      }      m_running = false;  } diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp index 2b6700f..0b3299a 100644 --- a/src/output/SDR.cpp +++ b/src/output/SDR.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2022 +   Copyright (C) 2023     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -46,17 +46,13 @@ using namespace std;  namespace Output { -// Maximum number of frames that can wait in frames +// Maximum number of frames that can wait in frames, when not using synchronised transmission  static constexpr size_t FRAMES_MAX_SIZE = 8;  // If the timestamp is further in the future than  // 100 seconds, abort  static constexpr double TIMESTAMP_ABORT_FUTURE = 100; -// Add a delay to increase buffers when -// frames are too far in the future -static constexpr double TIMESTAMP_MARGIN_FUTURE = 0.5; -  SDR::SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device) :      ModOutput(), ModMetadata(), RemoteControllable("sdr"),      m_config(config), @@ -127,6 +123,10 @@ int SDR::process(Buffer *dataIn)  meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn)  { +    double frame_duration_s = +        chrono::duration_cast<chrono::milliseconds>( +                transmission_frame_duration(m_config.dabMode)).count() / 1000.0; +      if (m_device and m_running) {          FrameData frame;          frame.buf = std::move(m_frame); @@ -163,9 +163,22 @@ meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn)                          m_config.sampleRate);              } -            size_t num_frames = m_queue.push_wait_if_full(frame, -                    FRAMES_MAX_SIZE); -            etiLog.log(trace, "SDR,push %zu", num_frames); + +            const auto max_size = m_config.enableSync ? +                (frame.ts.timestamp_offset * 4.0) / frame_duration_s +                : FRAMES_MAX_SIZE; + +            auto r = m_queue.push_overflow(std::move(frame), max_size); +            etiLog.log(trace, "SDR,push %d %zu", r.overflowed, r.new_size); + +            if (r.overflowed) { +                fprintf(stderr, "o"); +            } +            else { +                fprintf(stderr, "."); +            } + +            num_queue_overflows += r.overflowed ? 1 : 0;          }      }      else { @@ -186,16 +199,13 @@ void SDR::process_thread_entry()      last_tx_time_initialised = false; -    size_t last_num_underflows = 0; -    size_t pop_prebuffering = FRAMES_MAX_SIZE; -      m_running.store(true);      try {          while (m_running.load()) {              struct FrameData frame;              etiLog.log(trace, "SDR,wait"); -            m_queue.wait_and_pop(frame, pop_prebuffering); +            m_queue.wait_and_pop(frame);              etiLog.log(trace, "SDR,pop");              if (m_running.load() == false) { @@ -204,19 +214,6 @@ void SDR::process_thread_entry()              if (m_device) {                  handle_frame(frame); - -                const auto rs = m_device->get_run_statistics(); - -                /* Ensure we fill frames after every underrun and -                 * at startup to reduce underrun likelihood. */ -                if (last_num_underflows < rs.num_underruns) { -                    pop_prebuffering = FRAMES_MAX_SIZE; -                } -                else { -                    pop_prebuffering = 1; -                } - -                last_num_underflows = rs.num_underruns;              }          }      } @@ -302,6 +299,7 @@ void SDR::handle_frame(struct FrameData& frame)          }          if (frame.ts.offset_changed) { +            etiLog.level(debug) << "TS offset changed";              m_device->require_timestamp_refresh();          } @@ -354,9 +352,19 @@ void SDR::handle_frame(struct FrameData& frame)                  " frame " << frame.ts.fct <<                  ", tx_second " << tx_second <<                  ", pps " << pps_offset; +            m_device->require_timestamp_refresh();              return;          } +        etiLog.level(debug) << +            "OutputSDR: Timestamp             at FCT=" << frame.ts.fct << " offset: " << +            std::fixed << +            time_spec.get_real_secs() - device_time << +            "  (" << device_time << ")" +            " frame " << frame.ts.fct << +            ", tx_second " << tx_second << +            ", pps " << pps_offset; +          if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) {              etiLog.level(error) <<                  "OutputSDR: Timestamp way too far in the future at FCT=" << frame.ts.fct << " offset: " << @@ -367,9 +375,8 @@ void SDR::handle_frame(struct FrameData& frame)      }      if (m_config.muting) { -        etiLog.log(info, -                "OutputSDR: Muting FCT=%d requested", -                frame.ts.fct); +        etiLog.log(info, "OutputSDR: Muting FCT=%d requested", frame.ts.fct); +        m_device->require_timestamp_refresh();          return;      } diff --git a/src/output/SDR.h b/src/output/SDR.h index d7f7b46..5c3b599 100644 --- a/src/output/SDR.h +++ b/src/output/SDR.h @@ -88,6 +88,7 @@ class SDR : public ModOutput, public ModMetadata, public RemoteControllable {          bool     last_tx_time_initialised = false;          uint32_t last_tx_second = 0;          uint32_t last_tx_pps = 0; +        size_t   num_queue_overflows = 0;          bool     t_last_frame_initialised = false;          std::chrono::steady_clock::time_point t_last_frame; diff --git a/src/output/Soapy.cpp b/src/output/Soapy.cpp index c2c5046..c2ae88a 100644 --- a/src/output/Soapy.cpp +++ b/src/output/Soapy.cpp @@ -349,6 +349,7 @@ void Soapy::transmit_frame(const struct FrameData& frame)                          SoapySDR::errToStr(ret_deact));              }              m_tx_stream_active = false; +            m_require_timestamp_refresh = false;          }          if (eob_because_muting) { diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp index 6e38f73..6810249 100644 --- a/src/output/UHD.cpp +++ b/src/output/UHD.cpp @@ -350,6 +350,7 @@ void UHD::transmit_frame(const struct FrameData& frame)                  frame.ts.timestamp_valid and                  m_require_timestamp_refresh and                  samps_to_send <= usrp_max_num_samps ); +        m_require_timestamp_refresh = false;          //send a single packet          size_t num_tx_samps = m_tx_stream->send( @@ -359,7 +360,7 @@ void UHD::transmit_frame(const struct FrameData& frame)          num_acc_samps += num_tx_samps; -        md_tx.time_spec += uhd::time_spec_t(0, num_tx_samps/m_conf.sampleRate); +        md_tx.time_spec += uhd::time_spec_t::from_ticks(num_tx_samps, (double)m_conf.sampleRate);          if (num_tx_samps == 0) {              etiLog.log(warn, | 
