diff options
| author | shunt010 <sam@maxxwave.co.uk> | 2025-12-31 13:19:28 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-31 13:19:28 +0000 |
| commit | f8eaf51f61cdae65e90675920e427d23b8da7027 (patch) | |
| tree | e70c92214ac05cf0a2001e0481e289343c094d65 /src/input | |
| parent | f8b5402727b7e94aecbfb663a601577f97bae5b9 (diff) | |
| parent | a5f80a99e0dad51c45e8511347f27d816ae92e20 (diff) | |
| download | dabmux-f8eaf51f61cdae65e90675920e427d23b8da7027.tar.gz dabmux-f8eaf51f61cdae65e90675920e427d23b8da7027.tar.bz2 dabmux-f8eaf51f61cdae65e90675920e427d23b8da7027.zip | |
Merge pull request #1 from Opendigitalradio/master
Bring up to date
Diffstat (limited to 'src/input')
| -rw-r--r-- | src/input/Edi.cpp | 72 | ||||
| -rw-r--r-- | src/input/Edi.h | 6 | ||||
| -rw-r--r-- | src/input/File.cpp | 18 | ||||
| -rw-r--r-- | src/input/Zmq.cpp | 12 | ||||
| -rw-r--r-- | src/input/Zmq.h | 1 |
5 files changed, 84 insertions, 25 deletions
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp index ebeca77..b100f32 100644 --- a/src/input/Edi.cpp +++ b/src/input/Edi.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2024 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -80,7 +80,9 @@ Edi::~Edi() { void Edi::open(const std::string& name) { const std::regex re_udp("udp://:([0-9]+)"); + const std::regex re_udp_bindto("udp://([^:]+):([0-9]+)"); const std::regex re_udp_multicast("udp://@([0-9.]+):([0-9]+)"); + const std::regex re_udp_multicast_bindto("udp://([0-9.])+@([0-9.]+):([0-9]+)"); const std::regex re_tcp("tcp://(.*):([0-9]+)"); lock_guard<mutex> lock(m_mutex); @@ -97,13 +99,37 @@ void Edi::open(const std::string& name) m_udp_sock.reinit(udp_port); m_udp_sock.setBlocking(false); } + else if (std::regex_match(name, m, re_udp_bindto)) { + const int udp_port = std::stoi(m[2].str()); + m_input_used = InputUsed::UDP; + m_udp_sock.reinit(udp_port, m[1].str()); + m_udp_sock.setBlocking(false); + } + else if (std::regex_match(name, m, re_udp_multicast_bindto)) { + const string bind_to = m[1].str(); + const string multicast_address = m[2].str(); + const int udp_port = std::stoi(m[3].str()); + + m_input_used = InputUsed::UDP; + if (IN_MULTICAST(ntohl(inet_addr(multicast_address.c_str())))) { + m_udp_sock.init_receive_multicast(udp_port, bind_to, multicast_address); + } + else { + throw runtime_error(string("Address ") + multicast_address + " is not a multicast address"); + } + m_udp_sock.setBlocking(false); + } else if (std::regex_match(name, m, re_udp_multicast)) { const string multicast_address = m[1].str(); const int udp_port = std::stoi(m[2].str()); m_input_used = InputUsed::UDP; - m_udp_sock.reinit(udp_port); + if (IN_MULTICAST(ntohl(inet_addr(multicast_address.c_str())))) { + m_udp_sock.init_receive_multicast(udp_port, "0.0.0.0", multicast_address); + } + else { + throw runtime_error(string("Address ") + multicast_address + " is not a multicast address"); + } m_udp_sock.setBlocking(false); - m_udp_sock.joinGroup(multicast_address.c_str()); } else if (std::regex_match(name, m, re_tcp)) { m_input_used = InputUsed::TCP; @@ -140,8 +166,11 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) else if (not m_pending_sti_frame.frame.empty()) { // Can only happen when switching from timestamp-based buffer management! if (m_pending_sti_frame.frame.size() != size) { - etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << - m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + if (not m_size_mismatch_printed) { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + m_pending_sti_frame.frame.size() << " received, " << size << " requested"; + m_size_mismatch_printed = true; + } memset(buffer, 0, size * sizeof(*buffer)); m_stats.notifyUnderrun(); return 0; @@ -158,6 +187,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) m_pending_sti_frame.frame.end(), buffer); m_pending_sti_frame.frame.clear(); + m_size_mismatch_printed = false; return size; } } @@ -192,11 +222,15 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right); copy(sti.frame.cbegin(), sti.frame.cend(), buffer); + m_size_mismatch_printed = false; return size; } else { - etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << - sti.frame.size() << " received, " << size << " requested"; + if (not m_size_mismatch_printed) { + etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << + sti.frame.size() << " received, " << size << " requested"; + m_size_mismatch_printed = true; + } memset(buffer, 0, size * sizeof(*buffer)); m_stats.notifyUnderrun(); return 0; @@ -206,6 +240,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size) memset(buffer, 0, size * sizeof(*buffer)); m_is_prebuffering = true; etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; + m_size_mismatch_printed = false; m_stats.notifyUnderrun(); return 0; } @@ -226,7 +261,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc while (not m_pending_sti_frame.frame.empty()) { if (m_pending_sti_frame.frame.size() == size) { - if (m_pending_sti_frame.timestamp.valid()) { + if (m_pending_sti_frame.timestamp.is_valid()) { auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta); ts_req += m_tist_delay; const double offset = ts_req.diff_s(m_pending_sti_frame.timestamp); @@ -296,7 +331,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc m_is_prebuffering = true; return 0; } - else if (not m_pending_sti_frame.timestamp.valid()) { + else if (not m_pending_sti_frame.timestamp.is_valid()) { etiLog.level(warn) << "EDI input " << m_name << " invalid timestamp, ignoring"; memset(buffer, 0, size); @@ -461,7 +496,7 @@ const std::string Edi::get_parameter(const std::string& parameter) const ss << "prebuffering"; break; case Inputs::BufferManagement::Timestamped: - ss << "Timestamped"; + ss << "timestamped"; break; } } @@ -474,4 +509,21 @@ const std::string Edi::get_parameter(const std::string& parameter) const return ss.str(); } +const json::map_t Edi::get_all_values() const +{ + json::map_t map; + map["buffer"].v = m_max_frames_overrun; + map["prebuffering"].v = m_num_frames_prebuffering; + switch (getBufferManagement()) { + case Inputs::BufferManagement::Prebuffering: + map["buffermanagement"].v = "prebuffering"; + break; + case Inputs::BufferManagement::Timestamped: + map["buffermanagement"].v = "timestamped"; + break; + } + map["tistdelay"].v = m_tist_delay.count(); + return map; +} + } diff --git a/src/input/Edi.h b/src/input/Edi.h index ca465bd..3de17a7 100644 --- a/src/input/Edi.h +++ b/src/input/Edi.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2024 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -78,6 +78,7 @@ class Edi : public InputBase, public RemoteControllable { /* Remote control */ virtual void set_parameter(const std::string& parameter, const std::string& value); virtual const std::string get_parameter(const std::string& parameter) const; + virtual const json::map_t get_all_values() const; protected: void m_run(); @@ -105,6 +106,9 @@ class Edi : public InputBase, public RemoteControllable { // State variable used in prebuffering-based buffer management bool m_is_prebuffering = true; + // Display the 'size mismatch' warning only once + bool m_size_mismatch_printed = false; + /* When using prebuffering, consider the buffer to be full on the * receive side if it's above the overrun threshold. * diff --git a/src/input/File.cpp b/src/input/File.cpp index d9fe02a..c70feee 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -28,9 +28,6 @@ #include <cstdio> #include <fcntl.h> #include <unistd.h> -#ifndef _WIN32 -# define O_BINARY 0 -#endif #include "input/File.h" #include "mpeg.h" #include "ReedSolomon.h" @@ -39,9 +36,6 @@ using namespace std; namespace Inputs { -#ifdef _WIN32 -# pragma pack(push, 1) -#endif struct packetHeader { unsigned char addressHigh:2; unsigned char last:1; @@ -52,11 +46,7 @@ struct packetHeader { unsigned char dataLength:7; unsigned char command; } -#ifdef _WIN32 -# pragma pack(pop) -#else __attribute((packed)) -#endif ; @@ -68,7 +58,7 @@ void FileBase::open(const std::string& name) load_entire_file(); } else { - int flags = O_RDONLY | O_BINARY; + int flags = O_RDONLY; if (m_nonblock) { flags |= O_NONBLOCK; } @@ -140,13 +130,13 @@ ssize_t FileBase::load_entire_file() { // Clear the buffer if the file open fails, this allows user to stop transmission // of the current data. - vector<uint8_t> old_file_contents = move(m_file_contents); + vector<uint8_t> old_file_contents = std::move(m_file_contents); m_file_contents.clear(); m_file_contents_offset = 0; // Read entire file in chunks of 4MiB constexpr size_t blocksize = 4 * 1024 * 1024; - constexpr int flags = O_RDONLY | O_BINARY; + constexpr int flags = O_RDONLY; m_fd = ::open(m_filename.c_str(), flags); if (m_fd == -1) { if (not m_file_open_alert_shown) { @@ -225,7 +215,7 @@ ssize_t FileBase::readFromFile(uint8_t *buffer, size_t size) vector<uint8_t> remaining_buf; copy(m_nonblock_buffer.begin() + size, m_nonblock_buffer.end(), back_inserter(remaining_buf)); - m_nonblock_buffer = move(remaining_buf); + m_nonblock_buffer = std::move(remaining_buf); return size; } diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index be3fd1f..d5e722e 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -614,7 +614,19 @@ const string ZmqBase::get_parameter(const string& parameter) const throw ParameterError(ss.str()); } return ss.str(); +} +const json::map_t ZmqBase::get_all_values() const +{ + json::map_t map; + map["buffer"].v = m_config.buffer_size; + map["prebuffering"].v = m_config.prebuffering; + map["enable"].v = m_enable_input; + map["encryption"].v = m_config.enable_encryption; + map["secretkey"].v = m_config.curve_secret_keyfile; + map["publickey"].v = m_config.curve_public_keyfile; + map["encoderkey"].v = m_config.curve_encoder_keyfile; + return map; } }; diff --git a/src/input/Zmq.h b/src/input/Zmq.h index c101da0..72fccbd 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -194,6 +194,7 @@ class ZmqBase : public InputBase, public RemoteControllable { /* Getting a parameter always returns a string. */ virtual const std::string get_parameter(const std::string& parameter) const; + virtual const json::map_t get_all_values() const; protected: virtual int readFromSocket(size_t framesize) = 0; |
