From 0d17e6f1a0ea8f3052f3eae54e437afcdad5fa95 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 17 Jan 2024 09:51:30 +0100 Subject: ManagementServer: handle zmq sock recv returning failure --- src/ManagementServer.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/ManagementServer.cpp') diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 599d744..568e80e 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -280,8 +280,10 @@ void ManagementServer::serverThread() if (pollItems[0].revents & ZMQ_POLLIN) { zmq::message_t zmq_message; - m_zmq_sock.recv(zmq_message); - handle_message(zmq_message); + const auto r = m_zmq_sock.recv(zmq_message); + if (r.has_value()) { + handle_message(zmq_message); + } } } } -- cgit v1.2.3 From 6517cc3078eba96ea96e085d033a4b8a96eb7151 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 11 Mar 2025 16:35:08 +0100 Subject: Add EDI/TCP number of active connections statistics --- doc/STATS.md | 7 +++-- doc/show_dabmux_stats.py | 22 +++++++++++++ lib/Socket.cpp | 22 +++++++++++++ lib/Socket.h | 10 ++++++ lib/ThreadsafeQueue.h | 38 +++++++++++++++-------- lib/edioutput/Transport.cpp | 16 +++++++++- lib/edioutput/Transport.h | 11 +++++-- src/DabMultiplexer.cpp | 7 +++++ src/ManagementServer.cpp | 75 ++++++++++++++++++++++++++++++++------------- src/ManagementServer.h | 20 ++++++++---- 10 files changed, 181 insertions(+), 47 deletions(-) (limited to 'src/ManagementServer.cpp') diff --git a/doc/STATS.md b/doc/STATS.md index 385d41e..435a92e 100644 --- a/doc/STATS.md +++ b/doc/STATS.md @@ -4,12 +4,13 @@ Stats available through Management Server Interface --------- -The management server makes statistics about the inputs available through a ZMQ request/reply socket. +The management server makes statistics about the inputs and EDI/TCP outputs +available through a ZMQ request/reply socket. The `show_dabmux_stats.py` illustrates how to access this information. -Meaning of values ------------------ +Meaning of values for inputs +---------------------------- `max` and `min` indicate input buffer fullness in bytes. diff --git a/doc/show_dabmux_stats.py b/doc/show_dabmux_stats.py index 7ea60f7..3b6d869 100755 --- a/doc/show_dabmux_stats.py +++ b/doc/show_dabmux_stats.py @@ -46,6 +46,7 @@ if len(sys.argv) == 1: data = sock.recv().decode("utf-8") values = json.loads(data)['values'] + print("## INPUT STATS") tmpl = "{ident:20}{maxfill:>8}{minfill:>8}{under:>8}{over:>8}{audioleft:>8}{audioright:>8}{peakleft:>8}{peakright:>8}{state:>16}{version:>48}{uptime:>8}{offset:>8}" print(tmpl.format( ident="id", @@ -89,6 +90,27 @@ if len(sys.argv) == 1: uptime=v['uptime'], offset=v['last_tist_offset'])) + sock.send(b"output_values") + + poller = zmq.Poller() + poller.register(sock, zmq.POLLIN) + + socks = dict(poller.poll(1000)) + if socks: + if socks.get(sock) == zmq.POLLIN: + print() + print("## OUTPUT STATS") + data = sock.recv().decode("utf-8") + values = json.loads(data)['output_values'] + for identifier in values: + if identifier.startswith("edi_tcp_"): + listen_port = identifier.rsplit("_", 1)[-1] + num_connections = values[identifier]["num_connections"] + print(f"EDI TCP on port {listen_port}: {num_connections} connections") + else: + print(f"Unknown output type: {identifier}") + + elif len(sys.argv) == 2 and sys.argv[1] == "config": sock = connect() diff --git a/lib/Socket.cpp b/lib/Socket.cpp index 938b573..5c920d7 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -24,6 +24,7 @@ #include "Socket.h" +#include #include #include #include @@ -1063,6 +1064,17 @@ void TCPConnection::process() #endif } +TCPConnection::stats_t TCPConnection::get_stats() const +{ + TCPConnection::stats_t s; + const vector buffer_sizes = queue.map( + [](const vector& vec) { return vec.size(); } + ); + + s.buffer_fullness = std::accumulate(buffer_sizes.cbegin(), buffer_sizes.cend(), 0); + s.remote_address = m_sock.get_remote_address(); + return s; +} TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) : m_max_queue_size(max_queue_size), @@ -1136,6 +1148,16 @@ void TCPDataDispatcher::process() } } + +std::vector TCPDataDispatcher::get_stats() const +{ + std::vector s; + for (const auto& conn : m_connections) { + s.push_back(conn.get_stats()); + } + return s; +} + TCPReceiveServer::TCPReceiveServer(size_t blocksize) : m_blocksize(blocksize) { diff --git a/lib/Socket.h b/lib/Socket.h index 7709145..29b618a 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -213,6 +213,8 @@ class TCPSocket { SOCKET get_sockfd() const { return m_sock; } + InetAddress get_remote_address() const { return m_remote_address; } + private: explicit TCPSocket(int sockfd); explicit TCPSocket(int sockfd, InetAddress remote_address); @@ -254,6 +256,12 @@ class TCPConnection ThreadsafeQueue > queue; + struct stats_t { + size_t buffer_fullness = 0; + InetAddress remote_address; + }; + stats_t get_stats() const; + private: std::atomic m_running; std::thread m_sender_thread; @@ -276,6 +284,8 @@ class TCPDataDispatcher void start(int port, const std::string& address); void write(const std::vector& data); + std::vector get_stats() const; + private: void process(); diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 8b385d6..13bc19e 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2023 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li An implementation for a threadsafe queue, depends on C++11 @@ -28,6 +28,7 @@ #pragma once +#include #include #include #include @@ -63,10 +64,10 @@ public: std::unique_lock lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.push(val); + the_queue.push_back(val); } else if (queue_size_before < max_size) { - the_queue.push(val); + the_queue.push_back(val); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -80,10 +81,10 @@ public: std::unique_lock lock(the_mutex); size_t queue_size_before = the_queue.size(); if (max_size == 0) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } else if (queue_size_before < max_size) { - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); } size_t queue_size = the_queue.size(); lock.unlock(); @@ -110,9 +111,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.push(val); + the_queue.push_back(val); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -129,9 +130,9 @@ public: bool overflow = false; while (the_queue.size() >= max_size) { overflow = true; - the_queue.pop(); + the_queue.pop_front(); } - the_queue.emplace(std::move(val)); + the_queue.emplace_back(std::move(val)); const size_t queue_size = the_queue.size(); lock.unlock(); @@ -152,7 +153,7 @@ public: while (the_queue.size() >= threshold) { the_tx_notification.wait(lock); } - the_queue.push(val); + the_queue.push_back(val); size_t queue_size = the_queue.size(); lock.unlock(); @@ -198,7 +199,7 @@ public: } popped_value = the_queue.front(); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); @@ -220,15 +221,26 @@ public: } else { std::swap(popped_value, the_queue.front()); - the_queue.pop(); + the_queue.pop_front(); lock.unlock(); the_tx_notification.notify_one(); } } + template + std::vector map(std::function func) const + { + std::vector result; + std::unique_lock lock(the_mutex); + for (const T& elem : the_queue) { + result.push_back(func(elem)); + } + return result; + } + private: - std::queue the_queue; + std::deque the_queue; mutable std::mutex the_mutex; std::condition_variable the_rx_notification; std::condition_variable the_tx_notification; diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 4979e93..a5e0bc3 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -220,6 +220,20 @@ void Sender::override_pft_sequence(uint16_t pseq) edi_pft.OverridePSeq(pseq); } +std::vector Sender::get_tcp_server_stats() const +{ + std::vector stats; + + for (auto& el : tcp_dispatchers) { + Sender::stats_t s; + s.listen_port = el.first->listen_port; + s.stats = el.second->get_stats(); + stats.push_back(s); + } + + return stats; +} + void Sender::run() { while (m_running) { diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index c62545c..2ca638e 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -1,5 +1,5 @@ /* - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -38,10 +38,11 @@ #include #include #include +#include namespace edi { -/** STI sender for EDI output */ +/** ETI/STI sender for EDI output */ class Sender { public: @@ -64,6 +65,12 @@ class Sender { void override_af_sequence(uint16_t seq); void override_pft_sequence(uint16_t pseq); + struct stats_t { + uint16_t listen_port; + std::vector stats; + }; + std::vector get_tcp_server_stats() const; + private: void run(); diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index a68f09a..b9575fc 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -28,6 +28,7 @@ #include #include "DabMultiplexer.h" #include "ConfigParser.h" +#include "ManagementServer.h" #include "crc.h" #include "utils.h" @@ -795,6 +796,12 @@ void DabMultiplexer::mux_frame(std::vector >& outputs } edi_sender->write(edi_tagpacket); + + for (const auto& stat : edi_sender->get_tcp_server_stats()) { + get_mgmt_server().update_edi_tcp_output_stat( + stat.listen_port, + stat.stats.size()); + } } #if _DEBUG diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 568e80e..dff093a 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -28,13 +28,12 @@ along with ODR-DabMux. If not, see . */ -#include -#include -#include -#include -#include #include #include +#include +#include +#include +#include #include #include "ManagementServer.h" #include "Log.h" @@ -127,37 +126,42 @@ ManagementServer& get_mgmt_server() */ } -void ManagementServer::registerInput(InputStat* is) +void ManagementServer::register_input(InputStat* is) { unique_lock lock(m_statsmutex); std::string id(is->get_name()); - if (m_inputStats.count(id) == 1) { + if (m_input_stats.count(id) == 1) { etiLog.level(error) << "Double registration in MGMT Server with id '" << id << "'"; return; } - m_inputStats[id] = is; + m_input_stats[id] = is; } -void ManagementServer::unregisterInput(std::string id) +void ManagementServer::unregister_input(std::string id) { unique_lock lock(m_statsmutex); - if (m_inputStats.count(id) == 1) { - m_inputStats.erase(id); + if (m_input_stats.count(id) == 1) { + m_input_stats.erase(id); } } +// outputs will never disappear, no need to have a "remove" logic +void ManagementServer::update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections) +{ + m_output_stats[listen_port] = num_connections; +} bool ManagementServer::isInputRegistered(std::string& id) { unique_lock lock(m_statsmutex); - if (m_inputStats.count(id) == 0) { + if (m_input_stats.count(id) == 0) { etiLog.level(error) << "Management Server: id '" << id << "' does was not registered"; @@ -166,7 +170,7 @@ bool ManagementServer::isInputRegistered(std::string& id) return true; } -std::string ManagementServer::getStatConfigJSON() +std::string ManagementServer::get_input_config_json() { unique_lock lock(m_statsmutex); @@ -175,7 +179,7 @@ std::string ManagementServer::getStatConfigJSON() std::map::iterator iter; int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + for (iter = m_input_stats.begin(); iter != m_input_stats.end(); ++iter, i++) { std::string id = iter->first; @@ -192,16 +196,15 @@ std::string ManagementServer::getStatConfigJSON() return ss.str(); } -std::string ManagementServer::getValuesJSON() +std::string ManagementServer::get_input_values_json() { unique_lock lock(m_statsmutex); std::ostringstream ss; ss << "{ \"values\" : {\n"; - std::map::iterator iter; int i = 0; - for(iter = m_inputStats.begin(); iter != m_inputStats.end(); + for (auto iter = m_input_stats.begin(); iter != m_input_stats.end(); ++iter, i++) { const std::string& id = iter->first; @@ -220,6 +223,31 @@ std::string ManagementServer::getValuesJSON() return ss.str(); } +std::string ManagementServer::get_output_values_json() +{ + unique_lock lock(m_statsmutex); + + std::ostringstream ss; + ss << "{ \"output_values\" : {\n"; + + int i = 0; + for (auto iter = m_output_stats.begin(); iter != m_output_stats.end(); + ++iter, i++) + { + auto listen_port = iter->first; + auto num_connections = iter->second; + if (i > 0) { + ss << " ,\n"; + } + ss << " \"edi_tcp_" << listen_port << "\" : { \"num_connections\": " << + num_connections << "} "; + } + + ss << "}\n}\n"; + + return ss.str(); +} + ManagementServer::ManagementServer() : m_zmq_context(), m_zmq_sock(m_zmq_context, ZMQ_REP), @@ -323,10 +351,13 @@ void ManagementServer::handle_message(zmq::message_t& zmq_message) << "}\n"; } else if (data == "config") { - answer << getStatConfigJSON(); + answer << get_input_config_json(); } else if (data == "values") { - answer << getValuesJSON(); + answer << get_input_values_json(); + } + else if (data == "output_values") { + answer << get_output_values_json(); } else if (data == "getptree") { unique_lock lock(m_configmutex); @@ -366,12 +397,12 @@ InputStat::InputStat(const std::string& name) : InputStat::~InputStat() { - get_mgmt_server().unregisterInput(m_name); + get_mgmt_server().unregister_input(m_name); } void InputStat::registerAtServer() { - get_mgmt_server().registerInput(this); + get_mgmt_server().register_input(this); } void InputStat::notifyBuffer(long bufsize) diff --git a/src/ManagementServer.h b/src/ManagementServer.h index 6e39922..c7a4222 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2018 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -50,6 +50,7 @@ # include "config.h" #endif +#include "Socket.h" #include "zmq.hpp" #include #include @@ -167,8 +168,10 @@ class ManagementServer void open(int listenport); /* Un-/Register a statistics data source */ - void registerInput(InputStat* is); - void unregisterInput(std::string id); + void register_input(InputStat* is); + void unregister_input(std::string id); + + void update_edi_tcp_output_stat(uint16_t listen_port, size_t num_connections); /* Load a ptree given by the management server. * @@ -205,20 +208,25 @@ class ManagementServer std::thread m_restarter_thread; /******* Statistics Data ********/ - std::map m_inputStats; + std::map m_input_stats; + + // Holds information about EDI/TCP outputs + std::map m_output_stats; /* Return a description of the configuration that will * allow to define what graphs to be created * * returns: a JSON encoded configuration */ - std::string getStatConfigJSON(); + std::string get_input_config_json(); /* Return the values for the statistics as defined in the configuration * * returns: JSON encoded statistics */ - std::string getValuesJSON(); + std::string get_input_values_json(); + + std::string get_output_values_json(); // mutex for accessing the map std::mutex m_statsmutex; -- cgit v1.2.3 From 90e9f058450cfb8bc2f06b02c60ba8cb533c2738 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Wed, 10 Sep 2025 12:03:24 +0200 Subject: Remove some old _WIN32 ifdefs, restructure includes --- Makefile.am | 1 - src/DabMultiplexer.cpp | 1 + src/DabMux.cpp | 80 +++--------------------------------------------- src/DabMux.h | 45 --------------------------- src/Eti.cpp | 17 +++------- src/Eti.h | 18 ++--------- src/Interleaver.cpp | 5 --- src/ManagementServer.cpp | 11 +------ src/ManagementServer.h | 2 -- src/MuxElements.cpp | 9 +++++- src/MuxElements.h | 8 ++--- src/PcDebug.h | 62 ++++++++++++------------------------- src/fig/FIG0_6.h | 2 -- src/fig/FIG1.h | 13 +------- src/fig/FIG2.h | 14 +-------- src/input/File.cpp | 18 +++-------- src/mpeg.h | 15 ++------- 17 files changed, 54 insertions(+), 267 deletions(-) delete mode 100644 src/DabMux.h (limited to 'src/ManagementServer.cpp') diff --git a/Makefile.am b/Makefile.am index be2eed3..455a3a0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -60,7 +60,6 @@ odr_dabmux_LDADD =$(ZMQ_LIBS) $(BOOST_LDFLAGS) \ $(PTHREAD_CFLAGS) $(PTHREAD_LIBS) $(BOOST_SYSTEM_LIB) odr_dabmux_SOURCES =src/DabMux.cpp \ - src/DabMux.h \ src/DabMultiplexer.cpp \ src/DabMultiplexer.h \ src/input/inputs.h \ diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index bea82c2..c665f2c 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -31,6 +31,7 @@ #include "ManagementServer.h" #include "crc.h" #include "utils.h" +#include "Eti.h" using namespace std; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 4b9352f..0066629 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -29,83 +29,27 @@ # include "config.h" #endif -#include #include #include #include #include +#include +#include #include -#include -#include -#include #include +#include #include -#include -#include -#include #include #include -// for basename -#include - -#include #include -#include #include -#include -#include -#include - -#ifdef _WIN32 -# include -# include -# include -# include -# include // For types... -typedef u_char uint8_t; -typedef WORD uint16_t; -typedef DWORD32 uint32_t; - -# ifndef __MINGW32__ -# include "xgetopt.h" -# endif -# define read _read -# define snprintf _snprintf -# define sleep(a) Sleep((a) * 1000) -#else -# include -# include -# include -# include -# include -# include -# include -# include - -#endif - -#include - -#ifdef _WIN32 -# pragma warning ( disable : 4103 ) -# include "Eti.h" -# pragma warning ( default : 4103 ) -#else -# include "Eti.h" -#endif -#include "input/Prbs.h" -#include "input/Zmq.h" +#include "DabMultiplexer.h" #include "dabOutput/dabOutput.h" -#include "crc.h" -#include "Socket.h" -#include "PcDebug.h" -#include "DabMux.h" #include "MuxElements.h" #include "utils.h" -#include "ConfigParser.h" #include "ManagementServer.h" #include "Log.h" #include "RemoteControl.h" @@ -120,14 +64,10 @@ volatile sig_atomic_t running = 1; */ void signalHandler(int signum) { -#ifdef _WIN32 - fprintf(stderr, "\npid: %i\n", _getpid()); -#else fprintf(stderr, "\npid: %i, ppid: %i\n", getpid(), getppid()); -#endif + #define SIG_MSG "Signal received: " switch (signum) { -#ifndef _WIN32 case SIGHUP: fprintf(stderr, SIG_MSG "SIGHUP\n"); break; @@ -138,7 +78,6 @@ void signalHandler(int signum) fprintf(stderr, SIG_MSG "SIGPIPE\n"); return; break; -#endif case SIGINT: fprintf(stderr, SIG_MSG "SIGINT\n"); break; @@ -150,9 +89,7 @@ void signalHandler(int signum) default: fprintf(stderr, SIG_MSG "number %i\n", signum); } -#ifndef _WIN32 killpg(0, SIGPIPE); -#endif running = 0; } @@ -185,12 +122,6 @@ int main(int argc, char *argv[]) } } -#ifdef _WIN32 - if (SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST) == 0) { - etiLog.log(warn, "Can't increase priority: %s\n", - strerror(errno)); - } -#else // Use the lowest real-time priority for this thread, and switch to real-time scheduling const int policy = SCHED_RR; sched_param sp; @@ -199,7 +130,6 @@ int main(int argc, char *argv[]) if (thread_prio_ret != 0) { etiLog.level(error) << "Could not set real-time priority for thread:" << thread_prio_ret; } -#endif int returnCode = 0; ptree pt; diff --git a/src/DabMux.h b/src/DabMux.h deleted file mode 100644 index 80b4881..0000000 --- a/src/DabMux.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, - 2011 Her Majesty the Queen in Right of Canada (Communications - Research Center Canada) - - Copyright (C) 2014 - Matthias P. Braendli, matthias.braendli@mpb.li - - This file declares several structures used in the multiplexer, - and defines default values for some parameters. - */ -/* - This file is part of ODR-DabMux. - - ODR-DabMux is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as - published by the Free Software Foundation, either version 3 of the - License, or (at your option) any later version. - - ODR-DabMux is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with ODR-DabMux. If not, see . -*/ -#pragma once - -#include -#include -#include -#include "DabMultiplexer.h" -#include "RemoteControl.h" -#include "dabOutput/dabOutput.h" -#include "input/inputs.h" -#include "Eti.h" -#include "MuxElements.h" - -#ifdef _WIN32 -# include -#else -# include -#endif - diff --git a/src/Eti.cpp b/src/Eti.cpp index e1b51fb..2f26f2d 100644 --- a/src/Eti.cpp +++ b/src/Eti.cpp @@ -22,19 +22,10 @@ along with ODR-DabMux. If not, see . */ -#ifdef _WIN32 -# pragma warning ( disable : 4103 ) -# include "Eti.h" -# pragma warning ( default : 4103 ) -#else -# include "Eti.h" -# include -#endif - +#include "Eti.h" //definitions des structures des champs du ETI(NI, G703) - unsigned short eti_FC::getFrameLength() { return (unsigned short)((FL_high << 8) | FL_low); @@ -80,7 +71,7 @@ void eti_MNSC_TIME_1::setFromTime(struct tm *time_tm) { second_unit = time_tm->tm_sec % 10; second_tens = time_tm->tm_sec / 10; - + minute_unit = time_tm->tm_min % 10; minute_tens = time_tm->tm_min / 10; } @@ -89,7 +80,7 @@ void eti_MNSC_TIME_2::setFromTime(struct tm *time_tm) { hour_unit = time_tm->tm_hour % 10; hour_tens = time_tm->tm_hour / 10; - + day_unit = time_tm->tm_mday % 10; day_tens = time_tm->tm_mday / 10; } @@ -98,7 +89,7 @@ void eti_MNSC_TIME_3::setFromTime(struct tm *time_tm) { month_unit = (time_tm->tm_mon + 1) % 10; month_tens = (time_tm->tm_mon + 1) / 10; - + // They didn't see the y2k bug coming, did they ? year_unit = (time_tm->tm_year - 100) % 10; year_tens = (time_tm->tm_year - 100) / 10; diff --git a/src/Eti.h b/src/Eti.h index 88055c3..0d7aea5 100644 --- a/src/Eti.h +++ b/src/Eti.h @@ -29,24 +29,12 @@ # include #endif -#ifdef _WIN32 -# include // For types... -typedef WORD uint16_t; -typedef DWORD32 uint32_t; - -# define PACKED -# pragma pack(push, 1) -#else -# include -# include - -# define PACKED __attribute__ ((packed)) -#endif - +#include +#include +#define PACKED __attribute__ ((packed)) //definitions des structures des champs du ETI(NI, G703) - struct eti_SYNC { uint32_t ERR:8; uint32_t FSYNC:24; diff --git a/src/Interleaver.cpp b/src/Interleaver.cpp index cf0d235..1786d08 100644 --- a/src/Interleaver.cpp +++ b/src/Interleaver.cpp @@ -23,11 +23,6 @@ #include -#ifdef _WIN32 -# define bzero(a, b) memset((a), 0, (b)) -#endif // _WIN32 - - Interleaver::Interleaver(unsigned short I, unsigned short M, bool reverse) : I(I), M(M), diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index dff093a..2c25a7a 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -95,11 +95,6 @@ INPUT_COUNTER_RESET_TIME = std::chrono::minutes(30); static constexpr int INPUT_UNSTABLE_THRESHOLD = 3; -/* For how long the input buffers must be empty before we move an input to the - * NoData state. */ -static constexpr auto -INPUT_NODATA_TIMEOUT = std::chrono::seconds(30); - /* Keep 30s of min/max buffer fill information so that we can catch meaningful * values even if we have a slow poller */ static constexpr auto @@ -643,11 +638,7 @@ input_state_t InputStat::determineState() // STATE CALCULATION - /* If the buffer has been empty for more than - * INPUT_NODATA_TIMEOUT, we go to the NoData state. - * - * Consider an empty deque to be NoData too. - */ + /* Consider an empty deque to be NoData. */ if (std::all_of( m_buffer_fill_stats.begin(), m_buffer_fill_stats.end(), [](const fill_stat_t& fs) { return fs.bufsize == 0; }) ) { diff --git a/src/ManagementServer.h b/src/ManagementServer.h index c7a4222..d328f88 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -50,7 +50,6 @@ # include "config.h" #endif -#include "Socket.h" #include "zmq.hpp" #include #include @@ -65,7 +64,6 @@ #include #include -#include /*** State handing ***/ /* An input can be in one of the following three states: diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp index d17b283..1f02a6d 100644 --- a/src/MuxElements.cpp +++ b/src/MuxElements.cpp @@ -743,7 +743,14 @@ const json::map_t dabEnsemble::get_all_values() const return map; } -bool dabEnsemble::validate_linkage_sets() +bool dabEnsemble::validate_linkage_sets() const +{ + return validate_linkage_sets(services, linkagesets); +} + +bool dabEnsemble::validate_linkage_sets( + const vec_sp_service& services, + std::vector > linkagesets) { for (const auto& ls : linkagesets) { const std::string keyserviceuid = ls->keyservice; diff --git a/src/MuxElements.h b/src/MuxElements.h index d118df9..0266671 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -33,16 +33,13 @@ #include #include #include -#include #include -#include #include #include #include #include "dabOutput/dabOutput.h" #include "input/inputs.h" #include "RemoteControl.h" -#include "Eti.h" // Protection levels and bitrates for UEP. const unsigned char ProtectionLevelTable[64] = { @@ -315,7 +312,10 @@ class dabEnsemble : public RemoteControllable { virtual const json::map_t get_all_values() const; /* Check if the Linkage Sets are valid */ - bool validate_linkage_sets(void); + bool validate_linkage_sets() const; + static bool validate_linkage_sets( + const vec_sp_service& services, + std::vector > linkagesets); /* all fields are public, since this was a struct before */ uint16_t id = 0; diff --git a/src/PcDebug.h b/src/PcDebug.h index d0b2b2c..68fceb8 100644 --- a/src/PcDebug.h +++ b/src/PcDebug.h @@ -19,8 +19,7 @@ along with ODR-DabMux. If not, see . */ -#ifndef PC_DEBUG_ -#define PC_DEBUG_ +#pragma once #ifdef HAVE_CONFIG_H # include "config.h" @@ -31,49 +30,28 @@ #include -#define LOG stderr +#define LOG stderr -#if !defined(_WIN32) || defined(__MINGW32__) -# ifndef PDEBUG -# ifdef DEBUG -# define PDEBUG(fmt, args...) fprintf (LOG, fmt , ## args) -# else -# define PDEBUG(fmt, args...) -# endif -# endif +#ifndef PDEBUG # ifdef DEBUG -# define PDEBUG_VERBOSE(level, verbosity, fmt, args...) if (level <= verbosity) { fprintf(LOG, fmt, ## args); fflush(LOG); } -# define PDEBUG0_VERBOSE(level, verbosity, txt) if (level <= verbosity) { fprintf(LOG, txt); fflush(LOG); } -# define PDEBUG1_VERBOSE(level, verbosity, txt, arg0) if (level <= verbosity) { fprintf(LOG, txt, arg0); fflush(LOG); } -# define PDEBUG2_VERBOSE(level, verbosity, txt, arg0, arg1) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1); fflush(LOG); } -# define PDEBUG3_VERBOSE(level, verbosity, txt, arg0, arg1, arg2) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1, arg2); fflush(LOG); } -# define PDEBUG4_VERBOSE(level, verbosity, txt, arg0, arg1, arg2, arg3) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1, arg2, arg3); fflush(LOG); } -# else -# define PDEBUG_VERBOSE(level, verbosity, fmt, args...) -# define PDEBUG0_VERBOSE(level, verbosity, txt) -# define PDEBUG1_VERBOSE(level, verbosity, txt, arg0) -# define PDEBUG2_VERBOSE(level, verbosity, txt, arg0, arg1) -# define PDEBUG3_VERBOSE(level, verbosity, txt, arg0, arg1, arg2) -# define PDEBUG4_VERBOSE(level, verbosity, txt, arg0, arg1, arg2, arg3) -# endif // DEBUG -#else // _WIN32 -# ifdef _DEBUG -# define PDEBUG -# define PDEBUG_VERBOSE -# define PDEBUG0_VERBOSE(level, verbosity, txt) if (level <= verbosity) { fprintf(LOG, txt); fflush(LOG); } -# define PDEBUG1_VERBOSE(level, verbosity, txt, arg0) if (level <= verbosity) { fprintf(LOG, txt, arg0); fflush(LOG); } -# define PDEBUG2_VERBOSE(level, verbosity, txt, arg0, arg1) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1); fflush(LOG); } -# define PDEBUG3_VERBOSE(level, verbosity, txt, arg0, arg1, arg2) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1, arg2); fflush(LOG); } -# define PDEBUG4_VERBOSE(level, verbosity, txt, arg0, arg1, arg2, arg3) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1, arg2, arg3); fflush(LOG); } +# define PDEBUG(fmt, args...) fprintf (LOG, fmt , ## args) # else -# define PDEBUG -# define PDEBUG_VERBOSE -# define PDEBUG0_VERBOSE(level, verbosity, txt) -# define PDEBUG1_VERBOSE(level, verbosity, txt, arg0) -# define PDEBUG2_VERBOSE(level, verbosity, txt, arg0, arg1) -# define PDEBUG3_VERBOSE(level, verbosity, txt, arg0, arg1, arg2) -# define PDEBUG4_VERBOSE(level, verbosity, txt, arg0, arg1, arg2, arg3) +# define PDEBUG(fmt, args...) # endif #endif +#ifdef DEBUG +# define PDEBUG_VERBOSE(level, verbosity, fmt, args...) if (level <= verbosity) { fprintf(LOG, fmt, ## args); fflush(LOG); } +# define PDEBUG0_VERBOSE(level, verbosity, txt) if (level <= verbosity) { fprintf(LOG, txt); fflush(LOG); } +# define PDEBUG1_VERBOSE(level, verbosity, txt, arg0) if (level <= verbosity) { fprintf(LOG, txt, arg0); fflush(LOG); } +# define PDEBUG2_VERBOSE(level, verbosity, txt, arg0, arg1) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1); fflush(LOG); } +# define PDEBUG3_VERBOSE(level, verbosity, txt, arg0, arg1, arg2) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1, arg2); fflush(LOG); } +# define PDEBUG4_VERBOSE(level, verbosity, txt, arg0, arg1, arg2, arg3) if (level <= verbosity) { fprintf(LOG, txt, arg0, arg1, arg2, arg3); fflush(LOG); } +#else +# define PDEBUG_VERBOSE(level, verbosity, fmt, args...) +# define PDEBUG0_VERBOSE(level, verbosity, txt) +# define PDEBUG1_VERBOSE(level, verbosity, txt, arg0) +# define PDEBUG2_VERBOSE(level, verbosity, txt, arg0, arg1) +# define PDEBUG3_VERBOSE(level, verbosity, txt, arg0, arg1, arg2) +# define PDEBUG4_VERBOSE(level, verbosity, txt, arg0, arg1, arg2, arg3) +#endif // DEBUG -#endif // PC_DEBUG_ diff --git a/src/fig/FIG0_6.h b/src/fig/FIG0_6.h index 770c4d5..96464d2 100644 --- a/src/fig/FIG0_6.h +++ b/src/fig/FIG0_6.h @@ -26,8 +26,6 @@ #pragma once #include -#include -#include namespace FIC { diff --git a/src/fig/FIG1.h b/src/fig/FIG1.h index 0fedffe..fe36717 100644 --- a/src/fig/FIG1.h +++ b/src/fig/FIG1.h @@ -23,8 +23,7 @@ along with ODR-DabMux. If not, see . */ -#ifndef __FIG1_H_ -#define __FIG1_H_ +#pragma once #include @@ -103,10 +102,6 @@ class FIG1_5 : public IFIG vec_sp_service::iterator service; }; -#ifdef _WIN32 -# pragma pack(push) -#endif - struct FIGtype1_0 { uint8_t Length:5; uint8_t FIGtypeNumber:3; @@ -165,11 +160,5 @@ struct FIGtype1_4_data { } PACKED; -#ifdef _WIN32 -# pragma pack(pop) -#endif - } // namespace FIC -#endif // __FIG1_H_ - diff --git a/src/fig/FIG2.h b/src/fig/FIG2.h index ee3fed9..e69c5db 100644 --- a/src/fig/FIG2.h +++ b/src/fig/FIG2.h @@ -22,9 +22,7 @@ You should have received a copy of the GNU General Public License along with ODR-DabMux. If not, see . */ - -#ifndef __FIG2_H_ -#define __FIG2_H_ +#pragma once #include #include @@ -117,10 +115,6 @@ class FIG2_4 : public IFIG std::map, FIG2_Segments> segment_per_component; }; -#ifdef _WIN32 -# pragma pack(push) -#endif - struct FIGtype2 { uint8_t Length:5; uint8_t FIGtypeNumber:3; @@ -159,11 +153,5 @@ struct FIG2_Extended_Label_WithTextControl { uint8_t EncodingFlag:1; } PACKED; -#ifdef _WIN32 -# pragma pack(pop) -#endif - } // namespace FIC -#endif // __FIG2_H_ - diff --git a/src/input/File.cpp b/src/input/File.cpp index d9fe02a..c70feee 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -28,9 +28,6 @@ #include #include #include -#ifndef _WIN32 -# define O_BINARY 0 -#endif #include "input/File.h" #include "mpeg.h" #include "ReedSolomon.h" @@ -39,9 +36,6 @@ using namespace std; namespace Inputs { -#ifdef _WIN32 -# pragma pack(push, 1) -#endif struct packetHeader { unsigned char addressHigh:2; unsigned char last:1; @@ -52,11 +46,7 @@ struct packetHeader { unsigned char dataLength:7; unsigned char command; } -#ifdef _WIN32 -# pragma pack(pop) -#else __attribute((packed)) -#endif ; @@ -68,7 +58,7 @@ void FileBase::open(const std::string& name) load_entire_file(); } else { - int flags = O_RDONLY | O_BINARY; + int flags = O_RDONLY; if (m_nonblock) { flags |= O_NONBLOCK; } @@ -140,13 +130,13 @@ ssize_t FileBase::load_entire_file() { // Clear the buffer if the file open fails, this allows user to stop transmission // of the current data. - vector old_file_contents = move(m_file_contents); + vector old_file_contents = std::move(m_file_contents); m_file_contents.clear(); m_file_contents_offset = 0; // Read entire file in chunks of 4MiB constexpr size_t blocksize = 4 * 1024 * 1024; - constexpr int flags = O_RDONLY | O_BINARY; + constexpr int flags = O_RDONLY; m_fd = ::open(m_filename.c_str(), flags); if (m_fd == -1) { if (not m_file_open_alert_shown) { @@ -225,7 +215,7 @@ ssize_t FileBase::readFromFile(uint8_t *buffer, size_t size) vector remaining_buf; copy(m_nonblock_buffer.begin() + size, m_nonblock_buffer.end(), back_inserter(remaining_buf)); - m_nonblock_buffer = move(remaining_buf); + m_nonblock_buffer = std::move(remaining_buf); return size; } diff --git a/src/mpeg.h b/src/mpeg.h index 15b9b80..29b3655 100644 --- a/src/mpeg.h +++ b/src/mpeg.h @@ -18,23 +18,13 @@ You should have received a copy of the GNU General Public License along with ODR-DabMux. If not, see . */ - -#ifndef _MPEG -#define _MPEG +#pragma once #ifdef HAVE_CONFIG_H # include "config.h" #endif -#ifdef _WIN32 -# include -# include -# include - -# define ssize_t SSIZE_T -#else -# include -#endif +#include #ifdef __cplusplus extern "C" { @@ -86,4 +76,3 @@ int checkDabMpegFrame(void* data); } #endif -#endif // _MPEG -- cgit v1.2.3 From c84727c8ec0f99d66d1ad7d4716de79b6235d4d1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Thu, 11 Sep 2025 15:37:32 +0200 Subject: Add runtime linkage-set reload --- src/ConfigParser.cpp | 15 +++++----- src/ConfigParser.h | 8 +++-- src/DabMultiplexer.cpp | 75 ++++++++++++++++++++++++++++++++++++++++------ src/DabMultiplexer.h | 78 ++++++++++++++++++++++++++++-------------------- src/DabMux.cpp | 44 ++++++++++----------------- src/ManagementServer.cpp | 4 +-- src/ManagementServer.h | 16 +++++----- src/MuxElements.h | 18 +++++------ 8 files changed, 161 insertions(+), 97 deletions(-) (limited to 'src/ManagementServer.cpp') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 7d166b6..2d500b3 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2022 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -110,10 +110,10 @@ static void parse_fig2_label(ptree& pt, DabLabel& label) { } // Parse the linkage section -static void parse_linkage(ptree& pt, - std::shared_ptr ensemble) +void parse_linkage( + const boost::optional pt_linking, + std::vector >& linkageSets) { - auto pt_linking = pt.get_child_optional("linking"); if (pt_linking) { for (const auto& it : *pt_linking) { const string setuid = it.first; @@ -132,7 +132,7 @@ static void parse_linkage(ptree& pt, string service_uid = pt_set.get("keyservice", ""); auto linkageset = make_shared(setuid, lsn, active, hard, international); - linkageset->keyservice = service_uid; // TODO check if it exists + linkageset->keyservice = service_uid; // existence check is done in validate_linkage_sets() auto pt_list = pt_set.get_child_optional("list"); @@ -189,7 +189,7 @@ static void parse_linkage(ptree& pt, linkageset->id_list.push_back(link); } } - ensemble->linkagesets.push_back(linkageset); + linkageSets.push_back(linkageset); } } } @@ -910,7 +910,8 @@ void parse_ptree( } - parse_linkage(pt, ensemble); + const auto pt_linking = pt.get_child_optional("linking"); + parse_linkage(pt_linking, ensemble->linkagesets); parse_freq_info(pt, ensemble); parse_other_service_linking(pt, ensemble); } diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 9ca6c81..038247b 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2025 Matthias P. Braendli, matthias.braendli@mpb.li The Configuration parser sets up the ensemble according @@ -34,6 +34,10 @@ #include #include -void parse_ptree(boost::property_tree::ptree& pt, +void parse_ptree( + boost::property_tree::ptree& pt, std::shared_ptr ensemble); +void parse_linkage( + const boost::optional pt_linking, + std::vector >& linkageSets); diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index c665f2c..7a8ac97 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -26,6 +26,9 @@ #include #include #include +#include +#include + #include "DabMultiplexer.h" #include "ConfigParser.h" #include "ManagementServer.h" @@ -132,15 +135,35 @@ std::pair MuxTime::get_milliseconds_seconds() } -DabMultiplexer::DabMultiplexer(boost::property_tree::ptree pt) : +void DabMultiplexerConfig::read(const std::string& filename) +{ + m_config_file = ""; + try { + if (stringEndsWith(filename, ".json")) { + read_json(filename, pt); + } + else { + read_info(filename, pt); + } + m_config_file = filename; + } + catch (const boost::property_tree::file_parser_error& e) + { + etiLog.level(warn) << "Failed to read " << filename; + } +} + +DabMultiplexer::DabMultiplexer(DabMultiplexerConfig& config) : RemoteControllable("mux"), - m_pt(pt), + m_config(config), m_time(), ensemble(std::make_shared()), - m_clock_tai(split_pipe_separated_string(pt.get("general.tai_clock_bulletins", ""))), + m_clock_tai(split_pipe_separated_string(m_config.pt.get("general.tai_clock_bulletins", ""))), fig_carousel(ensemble, [&]() { return m_time.get_milliseconds_seconds(); }) { RC_ADD_PARAMETER(frames, "Show number of frames generated [read-only]"); + RC_ADD_PARAMETER(tist_offset, "Configured tist-offset"); + RC_ADD_PARAMETER(reload_linkagesets, "Write 1 to this parameter to trigger a reload of the linkage sets from the config [write-only]"); rcs.enrol(&m_clock_tai); } @@ -160,7 +183,7 @@ void DabMultiplexer::set_edi_config(const edi::configuration_t& new_edi_conf) // Run a set of checks on the configuration void DabMultiplexer::prepare(bool require_tai_clock) { - parse_ptree(m_pt, ensemble); + parse_ptree(m_config.pt, ensemble); rcs.enrol(this); rcs.enrol(ensemble.get()); @@ -186,11 +209,11 @@ void DabMultiplexer::prepare(bool require_tai_clock) throw MuxInitException(); } - const uint32_t tist_at_fct0_ms = m_pt.get("general.tist_at_fct0", 0); - currentFrame = m_time.init(tist_at_fct0_ms, m_pt.get("general.tist_offset", 0.0)); + const uint32_t tist_at_fct0_ms = m_config.pt.get("general.tist_at_fct0", 0); + currentFrame = m_time.init(tist_at_fct0_ms, m_config.pt.get("general.tist_offset", 0.0)); m_time.mnsc_increment_time = false; - bool tist_enabled = m_pt.get("general.tist", false); + bool tist_enabled = m_config.pt.get("general.tist", false); auto tist_edi_time = m_time.get_tist_seconds(); const auto timestamp = tist_edi_time.first; @@ -439,6 +462,32 @@ void DabMultiplexer::prepare_data_inputs() } } +void DabMultiplexer::reload_linkagesets() +{ + try { + DabMultiplexerConfig new_conf; + new_conf.read(m_config.config_file()); + if (new_conf.valid()) { + const auto pt_linking = new_conf.pt.get_child_optional("linking"); + std::vector > linkagesets; + parse_linkage(pt_linking, linkagesets); + + etiLog.level(info) << "Validating " << linkagesets.size() << " new linkage sets."; + + if (ensemble->validate_linkage_sets(ensemble->services, linkagesets)) { + ensemble->linkagesets = linkagesets; + etiLog.level(info) << "Loaded new linkage sets."; + } + else { + etiLog.level(warn) << "New linkage set validation failed"; + } + } + } + catch (const std::exception& e) + { + etiLog.level(warn) << "Failed to update linkage sets: " << e.what(); + } +} /* Each call creates one ETI frame */ void DabMultiplexer::mux_frame(std::vector >& outputs) @@ -458,7 +507,7 @@ void DabMultiplexer::mux_frame(std::vector >& outputs // The above Tag Items will be assembled into a TAG Packet edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); - const bool tist_enabled = m_pt.get("general.tist", false); + const bool tist_enabled = m_config.pt.get("general.tist", false); int tai_utc_offset = 0; if (tist_enabled and m_tai_clock_required) { @@ -718,7 +767,7 @@ void DabMultiplexer::mux_frame(std::vector >& outputs index = (FLtmp + 2 + 1) * 4; eti_TIST *tist = (eti_TIST *) & etiFrame[index]; - bool enableTist = m_pt.get("general.tist", false); + bool enableTist = m_config.pt.get("general.tist", false); if (enableTist) { tist->TIST = htonl(timestamp) | 0xff; edi_tagDETI.tsta = timestamp & 0xffffff; @@ -848,6 +897,9 @@ void DabMultiplexer::set_parameter(const std::string& parameter, else if (parameter == "tist_offset") { m_time.set_tist_offset(std::stod(value)); } + else if (parameter == "reload_linkagesets") { + reload_linkagesets(); + } else { stringstream ss; ss << "Parameter '" << parameter << @@ -866,6 +918,11 @@ const std::string DabMultiplexer::get_parameter(const std::string& parameter) co else if (parameter == "tist_offset") { ss << m_time.tist_offset(); } + else if (parameter == "reload_linkagesets") { + ss << "Parameter '" << parameter << + "' is not write-only in controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } else { ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 9306eed..620e65d 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -45,47 +45,59 @@ constexpr uint32_t ETI_FSYNC1 = 0x49C5F8; class MuxTime { private: - std::time_t m_edi_time = 0; - uint32_t m_pps_offset_ms = 0; - int64_t m_tist_offset_ms = 0; + std::time_t m_edi_time = 0; + uint32_t m_pps_offset_ms = 0; + int64_t m_tist_offset_ms = 0; public: - std::pair get_tist_seconds(); - std::pair get_milliseconds_seconds(); - - - /* Pre v3 odr-dabmux did the MNSC calculation differently, - * which works with the easydabv2. The rework in odr-dabmux, - * deriving MNSC time from EDI time broke this. - * - * That's why we're now tracking MNSC time in separate variables, - * to get the same behaviour back. - * - * I'm not aware of any devices using MNSC time besides the - * easydab. ODR-DabMod now considers EDI seconds or ZMQ metadata. - */ - bool mnsc_increment_time = false; - std::time_t mnsc_time = 0; - - /* Setup the time and return the initial currentFrame counter value */ - uint64_t init(uint32_t tist_at_fct0_ms, double tist_offset); - void increment_timestamp(); - double tist_offset() const { return m_tist_offset_ms / 1000.0; } - void set_tist_offset(double new_tist_offset); + std::pair get_tist_seconds(); + std::pair get_milliseconds_seconds(); + + + /* Pre v3 odr-dabmux did the MNSC calculation differently, + * which works with the easydabv2. The rework in odr-dabmux, + * deriving MNSC time from EDI time broke this. + * + * That's why we're now tracking MNSC time in separate variables, + * to get the same behaviour back. + * + * I'm not aware of any devices using MNSC time besides the + * easydab. ODR-DabMod now considers EDI seconds or ZMQ metadata. + */ + bool mnsc_increment_time = false; + std::time_t mnsc_time = 0; + + /* Setup the time and return the initial currentFrame counter value */ + uint64_t init(uint32_t tist_at_fct0_ms, double tist_offset); + void increment_timestamp(); + double tist_offset() const { return m_tist_offset_ms / 1000.0; } + void set_tist_offset(double new_tist_offset); +}; + +class DabMultiplexerConfig { + public: + boost::property_tree::ptree pt; + + void read(const std::string& filename); + bool valid() const { return m_config_file != ""; } + std::string config_file() const { return m_config_file; } + + private: + std::string m_config_file; }; class DabMultiplexer : public RemoteControllable { public: - DabMultiplexer(boost::property_tree::ptree pt); + DabMultiplexer(DabMultiplexerConfig& config); DabMultiplexer(const DabMultiplexer& other) = delete; DabMultiplexer& operator=(const DabMultiplexer& other) = delete; - ~DabMultiplexer(); + virtual ~DabMultiplexer(); void prepare(bool require_tai_clock); void mux_frame(std::vector >& outputs); - void print_info(void); + void print_info(); void set_edi_config(const edi::configuration_t& new_edi_conf); @@ -99,11 +111,13 @@ class DabMultiplexer : public RemoteControllable { virtual const json::map_t get_all_values() const; private: - void prepare_subchannels(void); - void prepare_services_components(void); - void prepare_data_inputs(void); + void prepare_subchannels(); + void prepare_services_components(); + void prepare_data_inputs(); + + void reload_linkagesets(); - boost::property_tree::ptree m_pt; + DabMultiplexerConfig& m_config; MuxTime m_time; uint64_t currentFrame = 0; diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 0066629..7b5f5d6 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -31,8 +31,6 @@ #include #include -#include -#include #include #include #include @@ -132,12 +130,13 @@ int main(int argc, char *argv[]) } int returnCode = 0; - ptree pt; std::vector > outputs; try { string conf_file = ""; + DabMultiplexerConfig mux_conf; + if (argc == 2) { // Assume the only argument is a config file conf_file = argv[1]; @@ -154,8 +153,7 @@ int main(int argc, char *argv[]) } conf_file = argv[2]; - - read_info(conf_file, pt); + mux_conf.read(conf_file); } catch (runtime_error &e) { throw MuxInitException(e.what()); @@ -168,23 +166,18 @@ int main(int argc, char *argv[]) } try { - if (stringEndsWith(conf_file, ".json")) { - read_json(conf_file, pt); - } - else { - read_info(conf_file, pt); - } + mux_conf.read(conf_file); } catch (runtime_error &e) { throw MuxInitException(e.what()); } /* Enable Logging to syslog conditionally */ - if (pt.get("general.syslog", false)) { + if (mux_conf.pt.get("general.syslog", false)) { etiLog.register_backend(std::make_shared()); } - const auto startupcheck = pt.get("general.startupcheck", ""); + const auto startupcheck = mux_conf.pt.get("general.startupcheck", ""); if (not startupcheck.empty()) { etiLog.level(info) << "Running startup check '" << startupcheck << "'"; int wstatus = system(startupcheck.c_str()); @@ -204,26 +197,26 @@ int main(int argc, char *argv[]) } } - int mgmtserverport = pt.get("general.managementport", - pt.get("general.statsserverport", 0) ); + int mgmtserverport = mux_conf.pt.get("general.managementport", + mux_conf.pt.get("general.statsserverport", 0) ); /* Management: stats and config server */ get_mgmt_server().open(mgmtserverport); /************** READ REMOTE CONTROL PARAMETERS *************/ - int telnetport = pt.get("remotecontrol.telnetport", 0); + int telnetport = mux_conf.pt.get("remotecontrol.telnetport", 0); if (telnetport != 0) { auto rc = std::make_shared(telnetport); rcs.add_controller(rc); } - auto zmqendpoint = pt.get("remotecontrol.zmqendpoint", ""); + auto zmqendpoint = mux_conf.pt.get("remotecontrol.zmqendpoint", ""); if (not zmqendpoint.empty()) { auto rc = std::make_shared(zmqendpoint); rcs.add_controller(rc); } - DabMultiplexer mux(pt); + DabMultiplexer mux(mux_conf); etiLog.level(info) << PACKAGE_NAME << " " << @@ -240,7 +233,7 @@ int main(int argc, char *argv[]) /******************** READ OUTPUT PARAMETERS ***************/ set all_output_names; bool output_require_tai_clock = false; - ptree pt_outputs = pt.get_child("outputs"); + ptree pt_outputs = mux_conf.pt.get_child("outputs"); for (auto ptree_pair : pt_outputs) { string outputuid = ptree_pair.first; @@ -444,7 +437,6 @@ int main(int argc, char *argv[]) } outputs.push_back(output); - } } @@ -464,7 +456,7 @@ int main(int argc, char *argv[]) edi_conf.print(); } - size_t limit = pt.get("general.nbframes", 0); + const size_t limit = mux_conf.pt.get("general.nbframes", 0); etiLog.level(info) << "Start loop"; /* Each iteration of the main loop creates one ETI frame */ @@ -473,6 +465,7 @@ int main(int argc, char *argv[]) mux.mux_frame(outputs); if (limit && currentFrame >= limit) { + etiLog.level(info) << "Max number of ETI frames reached: " << currentFrame; break; } @@ -491,17 +484,12 @@ int main(int argc, char *argv[]) mgmt_server.restart(); } - mgmt_server.update_ptree(pt); + mgmt_server.update_ptree(mux_conf.pt); } } - - if (limit) { - etiLog.level(info) << "Max number of ETI frames reached: " << currentFrame; - } } catch (const MuxInitException& except) { - etiLog.level(error) << "Multiplex initialisation aborted: " << - except.what(); + etiLog.level(error) << "Multiplex initialisation aborted: " << except.what(); returnCode = 1; } catch (const std::invalid_argument& except) { diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 2c25a7a..7344b8b 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -473,7 +473,7 @@ void InputStat::notifyPeakLevels(int peak_left, int peak_right) } } -void InputStat::notifyUnderrun(void) +void InputStat::notifyUnderrun() { unique_lock lock(m_mutex); @@ -492,7 +492,7 @@ void InputStat::notifyUnderrun(void) } } -void InputStat::notifyOverrun(void) +void InputStat::notifyOverrun() { unique_lock lock(m_mutex); diff --git a/src/ManagementServer.h b/src/ManagementServer.h index d328f88..93ad28c 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -93,20 +93,20 @@ class InputStat InputStat(const InputStat& other) = delete; InputStat& operator=(const InputStat& other) = delete; ~InputStat(); - void registerAtServer(void); + void registerAtServer(); - std::string get_name(void) const { return m_name; } + std::string get_name() const { return m_name; } /* This function is called for every frame read by * the multiplexer */ void notifyBuffer(long bufsize); void notifyTimestampOffset(double offset); void notifyPeakLevels(int peak_left, int peak_right); - void notifyUnderrun(void); - void notifyOverrun(void); + void notifyUnderrun(); + void notifyOverrun(); void notifyVersion(const std::string& version, uint32_t uptime_s); - std::string encodeValuesJSON(void); - input_state_t determineState(void); + std::string encodeValuesJSON(); + input_state_t determineState(); private: std::string m_name; @@ -183,7 +183,7 @@ class ManagementServer void update_ptree(const boost::property_tree::ptree& pt); bool fault_detected() const { return m_fault; } - void restart(void); + void restart(); private: void restart_thread(long); @@ -192,7 +192,7 @@ class ManagementServer zmq::context_t m_zmq_context; zmq::socket_t m_zmq_sock; - void serverThread(void); + void serverThread(); void handle_message(zmq::message_t& zmq_message); bool isInputRegistered(std::string& id); diff --git a/src/MuxElements.h b/src/MuxElements.h index 0266671..dfc4380 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -84,7 +84,7 @@ class MuxInitException : public std::exception MuxInitException(const std::string m = "ODR-DabMux initialisation error") throw() : msg(m) {} - ~MuxInitException(void) throw() {} + ~MuxInitException() throw() {} const char* what() const throw() { return msg.c_str(); } private: std::string msg; @@ -137,12 +137,12 @@ class AnnouncementCluster : public RemoteControllable { uint16_t flags = 0; std::string subchanneluid; - std::string tostring(void) const; + std::string tostring() const; /* Check if the activation/deactivation timeout occurred, * and return of if the Announcement is active */ - bool is_active(void); + bool is_active(); private: mutable std::mutex m_active_mutex; @@ -372,7 +372,7 @@ struct dabProtectionEEP { // select EEP profile A and B. // Other values are for future use, see // EN 300 401 Clause 6.2.1 "Basic sub-channel organisation" - uint8_t GetOption(void) const { + uint8_t GetOption() const { return (this->profile == EEP_A) ? 0 : 1; } }; @@ -402,16 +402,16 @@ public: protection() { } // Calculate subchannel size in number of CU - unsigned short getSizeCu(void) const; + unsigned short getSizeCu() const; // Calculate subchannel size in number of bytes - unsigned short getSizeByte(void) const; + unsigned short getSizeByte() const; // Calculate subchannel size in number of uint32_t - unsigned short getSizeWord(void) const; + unsigned short getSizeWord() const; // Calculate subchannel size in number of uint64_t - unsigned short getSizeDWord(void) const; + unsigned short getSizeDWord() const; // Read from the input, using the correct buffer management size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); @@ -574,7 +574,7 @@ class LinkageSet { bool hard, bool international); - std::string get_name(void) const { return m_name; } + std::string get_name() const { return m_name; } std::list id_list; -- cgit v1.2.3