diff options
-rw-r--r-- | Makefile.am | 2 | ||||
-rw-r--r-- | lib/ClockTAI.cpp | 428 | ||||
-rw-r--r-- | lib/ClockTAI.h | 72 | ||||
-rw-r--r-- | lib/Globals.cpp | 2 | ||||
-rw-r--r-- | lib/RemoteControl.cpp | 579 | ||||
-rw-r--r-- | lib/RemoteControl.h | 251 |
6 files changed, 343 insertions, 991 deletions
diff --git a/Makefile.am b/Makefile.am index 2eb2cc1..d9e5ae6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -32,7 +32,7 @@ odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \ lib/Globals.cpp \ lib/Log.h lib/Log.cpp \ lib/ReedSolomon.h lib/ReedSolomon.cpp \ - lib/RemoteControl.h lib/RemoteControl.cpp \ + lib/RemoteControl.h \ lib/Socket.h lib/Socket.cpp \ lib/ThreadsafeQueue.h \ lib/crc.h lib/crc.c \ diff --git a/lib/ClockTAI.cpp b/lib/ClockTAI.cpp index a244aba..06e88f8 100644 --- a/lib/ClockTAI.cpp +++ b/lib/ClockTAI.cpp @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2024 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,6 +33,7 @@ * ClockTAI.cpp Log.cpp RemoteControl.cpp -lboost_system -o taitest && ./taitest */ +#include <iterator> #ifdef HAVE_CONFIG_H # include "config.h" #endif @@ -61,22 +62,15 @@ using namespace std; -#ifdef DOWNLOADED_IN_THE_PAST_TEST -static bool wait_longer = true; -#endif - constexpr int refresh_retry_interval_hours = 1; // Offset between NTP time and POSIX time: // timestamp_unix = timestamp_ntp - NTP_UNIX_OFFSET constexpr int64_t NTP_UNIX_OFFSET = 2208988800L; -constexpr int64_t MONTH = 3600 * 24 * 30; - -// leap seconds insertion bulletin is available from the IETF and in the TZ -// distribution -static array<const char*, 2> default_tai_urls = { - "https://www.ietf.org/timezones/data/leap-seconds.list", +// leap seconds insertion bulletin was previously available from the IETF and in the TZ +// distribution, but in late 2023 IETF stopped serving the file. +static array<const char*, 1> default_tai_urls = { "https://raw.githubusercontent.com/eggert/tz/master/leap-seconds.list", }; @@ -84,6 +78,32 @@ static array<const char*, 2> default_tai_urls = { // /var/tmp "must not be deleted when the system is booted." static const char *tai_cache_location = "/var/tmp/odr-leap-seconds.cache"; +static string join_string_with_pipe(const vector<string>& vec) +{ + stringstream ss; + for (auto it = vec.cbegin(); it != vec.cend(); ++it) { + ss << *it; + if (it + 1 != vec.cend()) { + ss << "|"; + } + } + return ss.str(); +} + +static vector<string> split_pipe_separated_string(const string& s) +{ + stringstream ss; + ss << s; + + string elem; + vector<string> components; + while (getline(ss, elem, '|')) { + components.push_back(elem); + } + return components; +} + + // read TAI offset from a valid bulletin in IETF format static int parse_ietf_bulletin(const std::string& bulletin) { @@ -146,20 +166,31 @@ static int parse_ietf_bulletin(const std::string& bulletin) throw runtime_error("No data in TAI bulletin"); } + // With the current evolution of the offset, we're probably going + // to reach 500 long after DAB gets replaced by another standard. + // Or maybe leap seconds get abolished first... + if (tai_utc_offset < 0 or tai_utc_offset > 500) { + throw runtime_error("Unreasonable TAI-UTC offset calculated"); + } + return tai_utc_offset; } +int64_t BulletinState::expires_in() const { + time_t now = time(nullptr); + return expires_at - now; +} -struct bulletin_state { - bool valid = false; - int64_t expiry = 0; - int offset = 0; +bool BulletinState::usable() const { + return valid and expires_in() > 0; +} - bool usable() const { return valid and expiry > 0; } - bool expires_soon() const { return usable() and expiry < 1 * MONTH; } -}; +bool BulletinState::expires_soon() const { + constexpr int64_t MONTH = 3600 * 24 * 30; + return usable() and expires_in() < 1 * MONTH; +} -static bulletin_state parse_bulletin(const string& bulletin) +BulletinState Bulletin::state() const { // The bulletin contains one line that specifies an expiration date // in NTP time. If that point in time is in the future, we consider @@ -168,11 +199,22 @@ static bulletin_state parse_bulletin(const string& bulletin) // The entry looks like this: //#@ 3707596800 - bulletin_state ret; + BulletinState ret; - std::regex regex_expiration(R"(#@\s+([0-9]+))"); + if (std::holds_alternative<Bulletin::OverrideData>(bulletin_or_override)) { + const auto& od = std::get<Bulletin::OverrideData>(bulletin_or_override); + ret.offset = od.offset; + ret.expires_at = od.expires_at; + ret.valid = true; +#ifdef TAI_TEST + etiLog.level(debug) << "state() from Override!"; +#endif + return ret; + } - time_t now = time(nullptr); + const auto& bulletin = std::get<string>(bulletin_or_override); + + std::regex regex_expiration(R"(#@\s+([0-9]+))"); stringstream ss(bulletin); @@ -190,20 +232,29 @@ static bulletin_state parse_bulletin(const string& bulletin) const int64_t expiry_unix = std::stoll(expiry_data_str) - NTP_UNIX_OFFSET; #ifdef TAI_TEST - etiLog.level(info) << "Bulletin expires in " << expiry_unix - now; + { + // Do not use `now` for anything else but debugging, otherwise it + // breaks the cache. + time_t now = time(nullptr); + etiLog.level(debug) << "Bulletin " << + get_source() << " expires in " << expiry_unix - now; + } #endif - ret.expiry = expiry_unix - now; + ret.expires_at = expiry_unix; ret.offset = parse_ietf_bulletin(bulletin); ret.valid = true; } catch (const invalid_argument& e) { - etiLog.level(warn) << "Could not parse bulletin: " << e.what(); + etiLog.level(warn) << "Could not parse bulletin from " << + get_source() << ": " << e.what(); } catch (const out_of_range&) { - etiLog.level(warn) << "Parse bulletin: conversion is out of range"; + etiLog.level(warn) << "Parse bulletin from " << + get_source() << ": conversion is out of range"; } catch (const runtime_error& e) { - etiLog.level(warn) << "Parse bulletin: " << e.what(); + etiLog.level(warn) << "Parse bulletin from " << + get_source() << ": " << e.what(); } break; } @@ -224,9 +275,9 @@ static size_t fill_bulletin(char *ptr, size_t size, size_t nmemb, void *ctx) return len; } -static string download_tai_utc_bulletin(const char* url) +Bulletin Bulletin::download_from_url(const char* url) { - stringstream bulletin; + stringstream bulletin_data; #ifdef HAVE_CURL CURL *curl; @@ -238,7 +289,7 @@ static string download_tai_utc_bulletin(const char* url) /* Tell libcurl to follow redirection */ curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, fill_bulletin); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, &bulletin); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &bulletin_data); res = curl_easy_perform(curl); /* always cleanup ! */ @@ -249,19 +300,39 @@ static string download_tai_utc_bulletin(const char* url) string(curl_easy_strerror(res))); } } - return bulletin.str(); + Bulletin bulletin; + bulletin.source = url; + bulletin.bulletin_or_override = bulletin_data.str(); + return bulletin; #else throw runtime_error("Cannot download TAI Clock information without cURL"); #endif // HAVE_CURL } -static string load_bulletin_from_file(const char* cache_filename) +Bulletin Bulletin::create_with_fixed_offset(int offset) { + Bulletin bulletin; + bulletin.source = "manual override"; + + OverrideData od; + od.offset = offset; + time_t now = time(nullptr); + // 10 years is probably equivalent to infinity in this case... + od.expires_at = now + 10L * 365 * 24 * 3600; + bulletin.bulletin_or_override = od; + return bulletin; +} + +Bulletin Bulletin::load_from_file(const char* cache_filename) +{ + Bulletin bulletin; + bulletin.source = cache_filename; + int fd = open(cache_filename, O_RDWR); // lockf requires O_RDWR if (fd == -1) { etiLog.level(error) << "TAI-UTC bulletin open cache for reading: " << strerror(errno); - return ""; + return bulletin; } lseek(fd, 0, SEEK_SET); @@ -280,7 +351,7 @@ static string load_bulletin_from_file(const char* cache_filename) close(fd); etiLog.level(error) << "TAI-UTC bulletin read cache: " << strerror(errno); - return ""; + return bulletin; } copy(buf.data(), buf.data() + ret, back_inserter(new_bulletin_data)); @@ -288,7 +359,7 @@ static string load_bulletin_from_file(const char* cache_filename) close(fd); - return string{new_bulletin_data.data(), new_bulletin_data.size()}; + bulletin.bulletin_or_override = string{new_bulletin_data.data(), new_bulletin_data.size()}; } else { etiLog.level(error) << @@ -296,13 +367,29 @@ static string load_bulletin_from_file(const char* cache_filename) strerror(errno); close(fd); } - return ""; + return bulletin; } +void Bulletin::clear_expiry_if_overridden() +{ + if (std::holds_alternative<Bulletin::OverrideData>(bulletin_or_override)) { + auto& od = std::get<Bulletin::OverrideData>(bulletin_or_override); + time_t now = time(nullptr); + od.expires_at = now; + } +} + +#if ENABLE_REMOTECONTROL ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) : RemoteControllable("clocktai") { + RC_ADD_PARAMETER(tai_utc_offset, "TAI-UTC offset"); RC_ADD_PARAMETER(expiry, "Number of seconds until TAI Bulletin expires"); + RC_ADD_PARAMETER(expires_at, "UNIX timestamp when TAI Bulletin expires"); + RC_ADD_PARAMETER(url, "URLs used to fetch the bulletin, separated by pipes"); +#else +ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) { +#endif // ENABLE_REMOTECONTROL if (bulletin_urls.empty()) { etiLog.level(debug) << "Initialising default TAI Bulletin URLs"; @@ -315,138 +402,119 @@ ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) : m_bulletin_urls = bulletin_urls; } - for (const auto& url : m_bulletin_urls) { - etiLog.level(info) << "TAI Bulletin URL: '" << url << "'"; - } + etiLog.level(debug) << "ClockTAI uses bulletin URL: '" << join_string_with_pipe(m_bulletin_urls) << "'"; } -int ClockTAI::get_valid_offset() +BulletinState ClockTAI::get_valid_offset() { - int offset = 0; - bool offset_valid = false; - bool refresh_m_bulletin = false; - std::unique_lock<std::mutex> lock(m_data_mutex); - const auto state = parse_bulletin(m_bulletin); - if (state.usable()) { + const auto state = m_bulletin.state(); #if TAI_TEST - etiLog.level(info) << "Bulletin already valid"; + etiLog.level(info) << "TAI get_valid_offset STEP 1 "; + etiLog.level(info) << " " << m_bulletin.get_source() << " " << + state.valid << " " << state.usable() << " " << state.expires_in(); #endif - offset = state.offset; - offset_valid = true; - - refresh_m_bulletin = state.expires_soon(); - } - else { - refresh_m_bulletin = true; + if (state.usable()) { + return state; } - if (refresh_m_bulletin) { - const auto cache_bulletin = load_bulletin_from_file(tai_cache_location); #if TAI_TEST - etiLog.level(info) << "Loaded cache bulletin with " << - std::count_if(cache_bulletin.cbegin(), cache_bulletin.cend(), - [](const char c){ return c == '\n'; }) << " lines"; + etiLog.level(info) << "TAI get_valid_offset STEP 2"; #endif - const auto cache_state = parse_bulletin(cache_bulletin); + const auto cache_bulletin = Bulletin::load_from_file(tai_cache_location); + const auto cache_state = cache_bulletin.state(); + if (cache_state.usable()) { + m_bulletin = cache_bulletin; +#if TAI_TEST + etiLog.level(info) << "TAI get_valid_offset STEP 2 take cache"; +#endif + return cache_state; + } - if (cache_state.usable() and not cache_state.expires_soon()) { - m_bulletin = cache_bulletin; - offset = cache_state.offset; - offset_valid = true; #if TAI_TEST - etiLog.level(info) << "Bulletin from cache valid with offset=" << offset; + etiLog.level(info) << "TAI get_valid_offset STEP 3"; #endif - } - else { - for (const auto& url : m_bulletin_urls) { - try { + + vector<Bulletin> bulletins({m_bulletin, cache_bulletin}); + + for (const auto& url : m_bulletin_urls) { + try { #if TAI_TEST - etiLog.level(info) << "Load bulletin from " << url; + etiLog.level(info) << "Load bulletin from " << url; #endif - const auto new_bulletin = download_tai_utc_bulletin(url.c_str()); - const auto new_state = parse_bulletin(new_bulletin); - if (new_state.usable()) { - m_bulletin = new_bulletin; - offset = new_state.offset; - offset_valid = true; - - etiLog.level(debug) << "Loaded valid TAI Bulletin from " << - url << " giving offset=" << offset; - } - else { - etiLog.level(debug) << "Skipping invalid TAI bulletin from " - << url; - } - } - catch (const runtime_error& e) { - etiLog.level(warn) << - "TAI-UTC offset could not be retrieved from " << - url << " : " << e.what(); - } + const auto new_bulletin = Bulletin::download_from_url(url.c_str()); + bulletins.push_back(new_bulletin); - if (offset_valid) { - update_cache(tai_cache_location); - break; - } + const auto new_state = new_bulletin.state(); + if (new_state.usable()) { + m_bulletin = new_bulletin; + new_bulletin.store_to_cache(tai_cache_location); + + etiLog.level(debug) << "Loaded valid TAI Bulletin from " << + url << " giving offset=" << new_state.offset; + return new_state; + } + else { + etiLog.level(debug) << "Skipping invalid TAI bulletin from " + << url; } } + catch (const runtime_error& e) { + etiLog.level(warn) << + "TAI-UTC offset could not be retrieved from " << + url << " : " << e.what(); + } } - if (offset_valid) { - // With the current evolution of the offset, we're probably going - // to reach 500 long after DAB gets replaced by another standard. - if (offset < 0 or offset > 500) { - stringstream ss; - ss << "TAI offset " << offset << " out of range"; - throw range_error(ss.str()); - } +#if TAI_TEST + etiLog.level(info) << "TAI get_valid_offset STEP 4"; +#endif - return offset; - } - else { - // Try again later - throw download_failed(); + // Maybe we have a valid but expired bulletin available. + // Place bulletins with largest expiry first + std::sort(bulletins.begin(), bulletins.end(), + [](const Bulletin& a, const Bulletin& b) { + return a.state().expires_at > b.state().expires_at; }); + + for (const auto& bulletin : bulletins) { + const auto& state = bulletin.state(); + if (state.valid) { + etiLog.level(warn) << "Taking TAI-UTC offset from expired bulletin from " << + bulletin.get_source() << " : " << state.offset << "s expired " << + state.expires_in() << "s ago"; + m_bulletin = bulletin; + return state; + } } + + throw download_failed(); } int ClockTAI::get_offset() { using namespace std::chrono; - const auto time_now = system_clock::now(); + const auto time_now = steady_clock::now(); std::unique_lock<std::mutex> lock(m_data_mutex); - if (not m_offset_valid) { -#ifdef DOWNLOADED_IN_THE_PAST_TEST - // Assume we've downloaded it in the past: - - m_offset = 37; // Valid in early 2017 - m_offset_valid = true; - - // Simulate requiring a new download - m_bulletin_refresh_time = time_now - hours(24 * 40); -#else + if (not m_state.has_value()) { // First time we run we must block until we know // the offset lock.unlock(); try { - m_offset = get_valid_offset(); + m_state = get_valid_offset(); } catch (const download_failed&) { throw runtime_error("Unable to download TAI bulletin"); } lock.lock(); - m_offset_valid = true; - m_bulletin_refresh_time = time_now; -#endif - etiLog.level(info) << - "Initialised TAI-UTC offset to " << m_offset << "s."; + m_state_last_updated = time_now; + etiLog.level(info) << "Initialised TAI-UTC offset to " << m_state->offset << "s."; } - if (m_bulletin_refresh_time + hours(1) < time_now) { + if (m_state_last_updated + hours(1) < time_now) { // Once per hour, parse the bulletin again, and // if necessary trigger a download. // Leap seconds are announced several months in advance @@ -457,23 +525,23 @@ int ClockTAI::get_offset() switch (state) { case future_status::ready: try { - m_offset = m_offset_future.get(); - m_offset_valid = true; - m_bulletin_refresh_time = time_now; + m_state = m_offset_future.get(); + m_state_last_updated = time_now; etiLog.level(info) << - "Updated TAI-UTC offset to " << m_offset << "s."; + "Updated TAI-UTC offset to " << m_state->offset << "s."; } catch (const download_failed&) { etiLog.level(warn) << "TAI-UTC download failed, will retry in " << refresh_retry_interval_hours << " hour(s)"; - m_bulletin_refresh_time += hours(refresh_retry_interval_hours); - } -#ifdef DOWNLOADED_IN_THE_PAST_TEST - wait_longer = false; +#if TAI_TEST + m_state_last_updated += seconds(11); +#else + m_state_last_updated += hours(refresh_retry_interval_hours); #endif + } break; case future_status::deferred: @@ -493,7 +561,10 @@ int ClockTAI::get_offset() } } - return m_offset; + if (m_state) { + return m_state->offset; + } + throw std::logic_error("ClockTAI: No valid m_state at end of get_offset()"); } #if SUPPORT_SETTING_CLOCK_TAI @@ -514,8 +585,13 @@ int ClockTAI::update_local_tai_clock(int offset) } #endif -void ClockTAI::update_cache(const char* cache_filename) +void Bulletin::store_to_cache(const char* cache_filename) const { + if (not std::holds_alternative<string>(bulletin_or_override)) { + etiLog.level(error) << "ClockTAI: Cannot store an artificial bulletin to cache!"; + } + const auto& bulletin = std::get<string>(bulletin_or_override); + int fd = open(cache_filename, O_RDWR | O_CREAT, 00664); if (fd == -1) { etiLog.level(error) << @@ -529,8 +605,8 @@ void ClockTAI::update_cache(const char* cache_filename) ssize_t ret = lockf(fd, F_LOCK, 0); if (ret == 0) { // exclusive lock acquired - const char *data = m_bulletin.data(); - size_t remaining = m_bulletin.size(); + const char *data = bulletin.data(); + size_t remaining = bulletin.size(); while (remaining > 0) { ret = write(fd, data, remaining); @@ -557,13 +633,36 @@ void ClockTAI::update_cache(const char* cache_filename) } } - +#if ENABLE_REMOTECONTROL void ClockTAI::set_parameter(const string& parameter, const string& value) { - if (parameter == "expiry") { + if (parameter == "expiry" or parameter == "expires_at") { throw ParameterError("Parameter '" + parameter + "' is read-only in controllable " + get_rc_name()); } + else if (parameter == "tai_utc_offset") { + const auto offset = std::stoi(value); + auto b = Bulletin::create_with_fixed_offset(offset); + + etiLog.level(warn) << "ClockTAI: manually overriding UTC-TAI offset to " << offset; + + std::unique_lock<std::mutex> lock(m_data_mutex); + m_bulletin = b; + m_state = b.state(); + m_state_last_updated = chrono::steady_clock::now(); + } + else if (parameter == "url") { + { + std::unique_lock<std::mutex> lock(m_data_mutex); + m_bulletin_urls = split_pipe_separated_string(value); + m_state_last_updated = chrono::steady_clock::time_point::min(); + + // Setting URL expires the bulletin, if it was manually overridden, + // so that the selection logic doesn't prefer it + m_bulletin.clear_expiry_if_overridden(); + } + etiLog.level(info) << "ClockTAI: triggering a reload from URLs..."; + } else { throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); @@ -574,13 +673,22 @@ const string ClockTAI::get_parameter(const string& parameter) const { if (parameter == "expiry") { std::unique_lock<std::mutex> lock(m_data_mutex); - const int64_t expiry = parse_bulletin(m_bulletin).expiry; - if (expiry > 0) { - return to_string(expiry); - } - else { - return "Bulletin expired or invalid!"; + return to_string(m_bulletin.state().expires_in()); + } + else if (parameter == "expires_at") { + std::unique_lock<std::mutex> lock(m_data_mutex); + return to_string(m_bulletin.state().expires_at); + } + else if (parameter == "tai_utc_offset") { + std::unique_lock<std::mutex> lock(m_data_mutex); + if (m_state) { + return to_string(m_state->offset); } + throw ParameterError("Parameter '" + parameter + + "' has no current value" + get_rc_name()); + } + else if (parameter == "url") { + return join_string_with_pipe(m_bulletin_urls);; } else { throw ParameterError("Parameter '" + parameter + @@ -588,6 +696,36 @@ const string ClockTAI::get_parameter(const string& parameter) const } } +const json::map_t ClockTAI::get_all_values() const +{ + json::map_t stat; + std::unique_lock<std::mutex> lock(m_data_mutex); + + const auto& state = m_bulletin.state(); + +#if TAI_TEST + etiLog.level(debug) << "CALC FROM m_bulletin: " << state.valid << " " << + state.offset << " " << state.expires_at << " -> " << state.expires_in(); + etiLog.level(debug) << "CACHED IN m_state: " << m_state->valid << " " << + m_state->offset << " " << m_state->expires_at << " -> " << m_state->expires_in(); +#endif + + stat["tai_utc_offset"].v = state.offset; + + stat["expiry"].v = state.expires_in(); // Might be negative when expired or 0 when invalid + if (state.valid) { + stat["expires_at"].v = state.expires_at; + } + else { + stat["expires_at"].v = nullopt; + } + + stat["url"].v = join_string_with_pipe(m_bulletin_urls); + + return stat; +} +#endif // ENABLE_REMOTECONTROL + #if 0 // Example testing code void debug_tai_clk() diff --git a/lib/ClockTAI.h b/lib/ClockTAI.h index 743cf68..d11dd10 100644 --- a/lib/ClockTAI.h +++ b/lib/ClockTAI.h @@ -3,7 +3,7 @@ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2019 + Copyright (C) 2024 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -35,21 +35,61 @@ #pragma once #include <cstdint> -#include <cstdlib> -#include <sstream> #include <chrono> #include <future> #include <mutex> #include <string> #include <vector> +#include <optional> +#include <variant> + #include "RemoteControl.h" // EDI needs to know UTC-TAI, but doesn't need the CLOCK_TAI to be set. // We can keep this code, maybe for future use #define SUPPORT_SETTING_CLOCK_TAI 0 +struct BulletinState { + bool valid = false; + int64_t expires_at = 0; + int offset = 0; + + int64_t expires_in() const; + bool usable() const; + bool expires_soon() const; +}; + +class Bulletin { + public: + static Bulletin download_from_url(const char *url); + static Bulletin create_with_fixed_offset(int offset); + static Bulletin load_from_file(const char *cache_filename); + + void clear_expiry_if_overridden(); + + void store_to_cache(const char* cache_filename) const; + + std::string get_source() const { return source; } + BulletinState state() const; + private: + // URL or file path from which the bulletin has been/will be loaded + std::string source; + + struct OverrideData { + int offset = 0; + int expires_at = 0; + }; + // string: A cache of the bulletin, or empty string if not loaded + // int: A manually overridden offset + std::variant<std::string, OverrideData> bulletin_or_override; +}; + /* Loads, parses and represents TAI-UTC offset information from the IETF bulletin */ -class ClockTAI : public RemoteControllable { +class ClockTAI +#if ENABLE_REMOTECONTROL +: public RemoteControllable +#endif // ENABLE_REMOTECONTROL +{ public: ClockTAI(const std::vector<std::string>& bulletin_urls); @@ -71,33 +111,33 @@ class ClockTAI : public RemoteControllable { // download it, and calculate the TAI-UTC offset. // Returns the offset or throws download_failed or a range_error // if the offset is out of bounds. - int get_valid_offset(void); + BulletinState get_valid_offset(void); // Download of new bulletin is done asynchronously - std::future<int> m_offset_future; + std::future<BulletinState> m_offset_future; // Protect all data members, as RC functions are in another thread mutable std::mutex m_data_mutex; - // The currently used TAI-UTC offset, extracted from m_bulletin and cached here - // to avoid having to parse the bulletin all the time - int m_offset = 0; - int m_offset_valid = false; - std::vector<std::string> m_bulletin_urls; - std::string m_bulletin; - std::chrono::system_clock::time_point m_bulletin_refresh_time; - - // Update the cache file with the current m_bulletin - void update_cache(const char* cache_filename); + Bulletin m_bulletin; + // The currently used TAI-UTC offset, extracted the bulletin and cached + // here to avoid having to parse the bulletin all the time + std::optional<BulletinState> m_state; + std::chrono::steady_clock::time_point m_state_last_updated; +#if ENABLE_REMOTECONTROL + public: /* Remote control */ virtual void set_parameter(const std::string& parameter, const std::string& value); /* Getting a parameter always returns a string. */ virtual const std::string get_parameter(const std::string& parameter) const; + + virtual const json::map_t get_all_values() const; +#endif // ENABLE_REMOTECONTROL }; diff --git a/lib/Globals.cpp b/lib/Globals.cpp index 6be26ec..6bd38fb 100644 --- a/lib/Globals.cpp +++ b/lib/Globals.cpp @@ -32,5 +32,7 @@ // the RC needs logging, and needs to be initialised later. Logger etiLog; +#if ENABLE_REMOTECONTROL RemoteControllers rcs; +#endif // ENABLE_REMOTECONTROL diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp deleted file mode 100644 index 30dcb60..0000000 --- a/lib/RemoteControl.cpp +++ /dev/null @@ -1,579 +0,0 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - */ -/* - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <https://www.gnu.org/licenses/>. - */ -#include <list> -#include <string> -#include <iostream> -#include <string> -#include <algorithm> - -#include "RemoteControl.h" -#if defined(HAVE_ZEROMQ) - #include "zmq.hpp" -#endif - -using namespace std; - -RemoteControllerTelnet::~RemoteControllerTelnet() -{ - m_active = false; - - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } -} - -void RemoteControllerTelnet::restart() -{ - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - m_restarter_thread = std::thread( - &RemoteControllerTelnet::restart_thread, - this, 0); -} - -RemoteControllable::~RemoteControllable() { - rcs.remove_controllable(this); -} - -std::list<std::string> RemoteControllable::get_supported_parameters() const { - std::list<std::string> parameterlist; - for (const auto& param : m_parameters) { - parameterlist.push_back(param[0]); - } - return parameterlist; -} - -void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) { - m_controllers.push_back(rc); -} - -void RemoteControllers::enrol(RemoteControllable *rc) { - controllables.push_back(rc); -} - -void RemoteControllers::remove_controllable(RemoteControllable *rc) { - controllables.remove(rc); -} - -std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) { - RemoteControllable* controllable = get_controllable_(name); - - std::list< std::vector<std::string> > allparams; - for (auto ¶m : controllable->get_supported_parameters()) { - std::vector<std::string> item; - item.push_back(param); - try { - item.push_back(controllable->get_parameter(param)); - } - catch (const ParameterError &e) { - item.push_back(std::string("error: ") + e.what()); - } - - allparams.push_back(item); - } - return allparams; -} - -std::string RemoteControllers::get_param(const std::string& name, const std::string& param) { - RemoteControllable* controllable = get_controllable_(name); - return controllable->get_parameter(param); -} - -void RemoteControllers::check_faults() { - for (auto &controller : m_controllers) { - if (controller->fault_detected()) { - etiLog.level(warn) << - "Detected Remote Control fault, restarting it"; - controller->restart(); - } - } -} - -RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) -{ - auto rc = std::find_if(controllables.begin(), controllables.end(), - [&](RemoteControllable* r) { return r->get_rc_name() == name; }); - - if (rc == controllables.end()) { - throw ParameterError("Module name unknown"); - } - else { - return *rc; - } -} - -void RemoteControllers::set_param( - const std::string& name, - const std::string& param, - const std::string& value) -{ - etiLog.level(info) << "RC: Setting " << name << " " << param - << " to " << value; - RemoteControllable* controllable = get_controllable_(name); - try { - return controllable->set_parameter(param, value); - } - catch (const ios_base::failure& e) { - etiLog.level(info) << "RC: Failed to set " << name << " " << param - << " to " << value << ": " << e.what(); - throw ParameterError("Cannot understand value"); - } -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void RemoteControllerTelnet::restart_thread(long) -{ - m_active = false; - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } - - m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); -} - -void RemoteControllerTelnet::handle_accept(Socket::TCPSocket&& socket) -{ - const std::string welcome = PACKAGE_NAME " Remote Control CLI\n" - "Write 'help' for help.\n" - "**********\n"; - const std::string prompt = "> "; - - std::string in_message; - - try { - etiLog.level(info) << "RC: Accepted"; - - socket.sendall(welcome.data(), welcome.size()); - - while (m_active and in_message != "quit") { - socket.sendall(prompt.data(), prompt.size()); - - stringstream in_message_stream; - - char last_char = '\0'; - try { - while (last_char != '\n') { - try { - auto ret = socket.recv(&last_char, 1, 0, 1000); - if (ret == 1) { - in_message_stream << last_char; - } - else { - break; - } - } - catch (const Socket::TCPSocket::Timeout&) { - if (not m_active) { - break; - } - } - } - } - catch (const Socket::TCPSocket::Interrupted&) { - in_message_stream.clear(); - } - - - if (in_message_stream.str().size() == 0) { - etiLog.level(info) << "RC: Connection terminated"; - break; - } - - std::getline(in_message_stream, in_message); - - while (in_message.length() > 0 && - (in_message[in_message.length()-1] == '\r' || - in_message[in_message.length()-1] == '\n')) { - in_message.erase(in_message.length()-1, 1); - } - - if (in_message.length() == 0) { - continue; - } - - etiLog.level(info) << "RC: Got message '" << in_message << "'"; - - dispatch_command(socket, in_message); - } - etiLog.level(info) << "RC: Closing socket"; - socket.close(); - } - catch (const std::exception& e) { - etiLog.level(error) << "Remote control caught exception: " << e.what(); - } -} - -void RemoteControllerTelnet::process(long) -{ - try { - m_active = true; - - m_socket.listen(m_port, "localhost"); - - etiLog.level(info) << "RC: Waiting for connection on port " << m_port; - while (m_active) { - auto sock = m_socket.accept(1000); - - if (sock.valid()) { - handle_accept(move(sock)); - etiLog.level(info) << "RC: Connection closed. Waiting for connection on port " << m_port; - } - } - } - catch (const runtime_error& e) { - etiLog.level(warn) << "RC: Encountered error: " << e.what(); - } - - etiLog.level(info) << "RC: Leaving"; - m_fault = true; -} - -static std::vector<std::string> tokenise(const std::string& message) { - stringstream ss(message); - std::vector<std::string> all_tokens; - std::string item; - - while (std::getline(ss, item, ' ')) { - all_tokens.push_back(move(item)); - } - return all_tokens; -} - - -void RemoteControllerTelnet::dispatch_command(Socket::TCPSocket& socket, string command) -{ - vector<string> cmd = tokenise(command); - - if (cmd[0] == "help") { - reply(socket, - "The following commands are supported:\n" - " list\n" - " * Lists the modules that are loaded and their parameters\n" - " show MODULE\n" - " * Lists all parameters and their values from module MODULE\n" - " get MODULE PARAMETER\n" - " * Gets the value for the specified PARAMETER from module MODULE\n" - " set MODULE PARAMETER VALUE\n" - " * Sets the value for the PARAMETER ofr module MODULE\n" - " quit\n" - " * Terminate this session\n" - "\n"); - } - else if (cmd[0] == "list") { - stringstream ss; - - if (cmd.size() == 1) { - for (auto &controllable : rcs.controllables) { - ss << controllable->get_rc_name() << endl; - - list< vector<string> > params = controllable->get_parameter_descriptions(); - for (auto ¶m : params) { - ss << "\t" << param[0] << " : " << param[1] << endl; - } - } - } - else { - reply(socket, "Too many arguments for command 'list'"); - } - - reply(socket, ss.str()); - } - else if (cmd[0] == "show") { - if (cmd.size() == 2) { - try { - stringstream ss; - list< vector<string> > r = rcs.get_param_list_values(cmd[1]); - for (auto ¶m_val : r) { - ss << param_val[0] << ": " << param_val[1] << endl; - } - reply(socket, ss.str()); - - } - catch (const ParameterError &e) { - reply(socket, e.what()); - } - } - else { - reply(socket, "Incorrect parameters for command 'show'"); - } - } - else if (cmd[0] == "get") { - if (cmd.size() == 3) { - try { - string r = rcs.get_param(cmd[1], cmd[2]); - reply(socket, r); - } - catch (const ParameterError &e) { - reply(socket, e.what()); - } - } - else { - reply(socket, "Incorrect parameters for command 'get'"); - } - } - else if (cmd[0] == "set") { - if (cmd.size() >= 4) { - try { - stringstream new_param_value; - for (size_t i = 3; i < cmd.size(); i++) { - new_param_value << cmd[i]; - - if (i+1 < cmd.size()) { - new_param_value << " "; - } - } - - rcs.set_param(cmd[1], cmd[2], new_param_value.str()); - reply(socket, "ok"); - } - catch (const ParameterError &e) { - reply(socket, e.what()); - } - catch (const exception &e) { - reply(socket, "Error: Invalid parameter value. "); - } - } - else { - reply(socket, "Incorrect parameters for command 'set'"); - } - } - else if (cmd[0] == "quit") { - reply(socket, "Goodbye"); - } - else { - reply(socket, "Message not understood"); - } -} - -void RemoteControllerTelnet::reply(Socket::TCPSocket& socket, string message) -{ - stringstream ss; - ss << message << "\r\n"; - socket.sendall(message.data(), message.size()); -} - - -#if defined(HAVE_ZEROMQ) - -RemoteControllerZmq::~RemoteControllerZmq() { - m_active = false; - m_fault = false; - - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } -} - -void RemoteControllerZmq::restart() -{ - if (m_restarter_thread.joinable()) { - m_restarter_thread.join(); - } - - m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void RemoteControllerZmq::restart_thread() -{ - m_active = false; - - if (m_child_thread.joinable()) { - m_child_thread.join(); - } - - m_child_thread = std::thread(&RemoteControllerZmq::process, this); -} - -void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) -{ - bool more = true; - do { - zmq::message_t msg; - pSocket.recv(msg); - std::string incoming((char*)msg.data(), msg.size()); - message.push_back(incoming); - more = msg.more(); - } while (more); -} - -void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) -{ - zmq::message_t msg(2); - char repCode[2] = {'o', 'k'}; - memcpy ((void*) msg.data(), repCode, 2); - pSocket.send(msg, zmq::send_flags::none); -} - -void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) -{ - zmq::message_t msg1(4); - char repCode[4] = {'f', 'a', 'i', 'l'}; - memcpy ((void*) msg1.data(), repCode, 4); - pSocket.send(msg1, zmq::send_flags::sndmore); - - zmq::message_t msg2(error.length()); - memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket.send(msg2, zmq::send_flags::none); -} - -void RemoteControllerZmq::process() -{ - m_fault = false; - - // create zmq reply socket for receiving ctrl parameters - try { - zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - - // connect the socket - int hwm = 100; - int linger = 0; - repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); - repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); - repSocket.bind(m_endpoint.c_str()); - - // create pollitem that polls the ZMQ sockets - zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; - while (m_active) { - zmq::poll(pollItems, 1, 100); - std::vector<std::string> msg; - - if (pollItems[0].revents & ZMQ_POLLIN) { - recv_all(repSocket, msg); - - std::string command((char*)msg[0].data(), msg[0].size()); - - if (msg.size() == 1 && command == "ping") { - send_ok_reply(repSocket); - } - else if (msg.size() == 1 && command == "list") { - size_t cohort_size = rcs.controllables.size(); - for (auto &controllable : rcs.controllables) { - std::stringstream ss; - ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," << - " \"params\": { "; - - list< vector<string> > params = controllable->get_parameter_descriptions(); - size_t i = 0; - for (auto ¶m : params) { - if (i > 0) { - ss << ", "; - } - - ss << "\"" << param[0] << "\": " << - "\"" << param[1] << "\""; - - i++; - } - - ss << " } }"; - - std::string msg_s = ss.str(); - - zmq::message_t zmsg(ss.str().size()); - memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); - - repSocket.send(zmsg, (--cohort_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none); - } - } - else if (msg.size() == 2 && command == "show") { - std::string module((char*) msg[1].data(), msg[1].size()); - try { - list< vector<string> > r = rcs.get_param_list_values(module); - size_t r_size = r.size(); - for (auto ¶m_val : r) { - std::stringstream ss; - ss << param_val[0] << ": " << param_val[1] << endl; - zmq::message_t zmsg(ss.str().size()); - memcpy(zmsg.data(), ss.str().data(), ss.str().size()); - - repSocket.send(zmsg, (--r_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none); - } - } - catch (const ParameterError &err) { - send_fail_reply(repSocket, err.what()); - } - } - else if (msg.size() == 3 && command == "get") { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - - try { - std::string value = rcs.get_param(module, parameter); - zmq::message_t zmsg(value.size()); - memcpy ((void*) zmsg.data(), value.data(), value.size()); - repSocket.send(zmsg, zmq::send_flags::none); - } - catch (const ParameterError &err) { - send_fail_reply(repSocket, err.what()); - } - } - else if (msg.size() == 4 && command == "set") { - std::string module((char*) msg[1].data(), msg[1].size()); - std::string parameter((char*) msg[2].data(), msg[2].size()); - std::string value((char*) msg[3].data(), msg[3].size()); - - try { - rcs.set_param(module, parameter, value); - send_ok_reply(repSocket); - } - catch (const ParameterError &err) { - send_fail_reply(repSocket, err.what()); - } - } - else { - send_fail_reply(repSocket, - "Unsupported command. commands: list, show, get, set"); - } - } - } - repSocket.close(); - } - catch (const zmq::error_t &e) { - etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); - } - catch (const std::exception& e) { - etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); - m_fault = true; - } -} - -#endif diff --git a/lib/RemoteControl.h b/lib/RemoteControl.h index 2358b3a..eb87d22 100644 --- a/lib/RemoteControl.h +++ b/lib/RemoteControl.h @@ -1,251 +1,2 @@ -/* - Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 - Her Majesty the Queen in Right of Canada (Communications Research - Center Canada) - - Copyright (C) 2019 - Matthias P. Braendli, matthias.braendli@mpb.li - - http://www.opendigitalradio.org - - This module adds remote-control capability to some of the dabmux/dabmod modules. - */ -/* - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with this program. If not, see <https://www.gnu.org/licenses/>. - */ - #pragma once - -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#if defined(HAVE_ZEROMQ) -# include "zmq.hpp" -#endif - -#include <list> -#include <map> -#include <memory> -#include <string> -#include <atomic> -#include <iostream> -#include <thread> -#include <stdexcept> - -#include "Log.h" -#include "Socket.h" - -#define RC_ADD_PARAMETER(p, desc) { \ - std::vector<std::string> p; \ - p.push_back(#p); \ - p.push_back(desc); \ - m_parameters.push_back(p); \ -} - -class ParameterError : public std::exception -{ - public: - ParameterError(std::string message) : m_message(message) {} - ~ParameterError() throw() {} - const char* what() const throw() { return m_message.c_str(); } - - private: - std::string m_message; -}; - -class RemoteControllable; - -/* Remote controllers (that recieve orders from the user) - * must implement BaseRemoteController - */ -class BaseRemoteController { - public: - /* When this returns one, the remote controller cannot be - * used anymore, and must be restarted - */ - virtual bool fault_detected() = 0; - - /* In case of a fault, the remote controller can be - * restarted. - */ - virtual void restart() = 0; - - virtual ~BaseRemoteController() {} -}; - -/* Objects that support remote control must implement the following class */ -class RemoteControllable { - public: - RemoteControllable(const std::string& name) : - m_rc_name(name) {} - - RemoteControllable(const RemoteControllable& other) = delete; - RemoteControllable& operator=(const RemoteControllable& other) = delete; - - virtual ~RemoteControllable(); - - /* return a short name used to identify the controllable. - * It might be used in the commands the user has to type, so keep - * it short - */ - virtual std::string get_rc_name() const { return m_rc_name; } - - /* Return a list of possible parameters that can be set */ - virtual std::list<std::string> get_supported_parameters() const; - - /* Return a mapping of the descriptions of all parameters */ - virtual std::list< std::vector<std::string> > - get_parameter_descriptions() const - { - return m_parameters; - } - - /* Base function to set parameters. */ - virtual void set_parameter( - const std::string& parameter, - const std::string& value) = 0; - - /* Getting a parameter always returns a string. */ - virtual const std::string get_parameter(const std::string& parameter) const = 0; - - protected: - std::string m_rc_name; - std::list< std::vector<std::string> > m_parameters; -}; - -/* Holds all our remote controllers and controlled object. - */ -class RemoteControllers { - public: - void add_controller(std::shared_ptr<BaseRemoteController> rc); - void enrol(RemoteControllable *rc); - void remove_controllable(RemoteControllable *rc); - void check_faults(); - std::list< std::vector<std::string> > get_param_list_values(const std::string& name); - std::string get_param(const std::string& name, const std::string& param); - - void set_param( - const std::string& name, - const std::string& param, - const std::string& value); - - std::list<RemoteControllable*> controllables; - - private: - RemoteControllable* get_controllable_(const std::string& name); - - std::list<std::shared_ptr<BaseRemoteController> > m_controllers; -}; - -/* rcs is a singleton used in all parts of the program to interact with the RC. - * It is constructed in Globals.cpp */ -extern RemoteControllers rcs; - -/* Implements a Remote controller based on a simple telnet CLI - * that listens on localhost - */ -class RemoteControllerTelnet : public BaseRemoteController { - public: - RemoteControllerTelnet() - : m_active(false), - m_fault(false), - m_port(0) { } - - RemoteControllerTelnet(int port) - : m_active(port > 0), - m_fault(false), - m_port(port) - { - restart(); - } - - - RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; - RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - - ~RemoteControllerTelnet(); - - virtual bool fault_detected() { return m_fault; } - - virtual void restart(); - - private: - void restart_thread(long); - - void process(long); - - void dispatch_command(Socket::TCPSocket& socket, std::string command); - void reply(Socket::TCPSocket& socket, std::string message); - void handle_accept(Socket::TCPSocket&& socket); - - std::atomic<bool> m_active; - - /* This is set to true if a fault occurred */ - std::atomic<bool> m_fault; - std::thread m_restarter_thread; - - std::thread m_child_thread; - - Socket::TCPSocket m_socket; - int m_port; -}; - -#if defined(HAVE_ZEROMQ) -/* Implements a Remote controller using ZMQ transportlayer - * that listens on localhost - */ -class RemoteControllerZmq : public BaseRemoteController { - public: - RemoteControllerZmq() - : m_active(false), m_fault(false), - m_zmqContext(1), - m_endpoint("") { } - - RemoteControllerZmq(const std::string& endpoint) - : m_active(not endpoint.empty()), m_fault(false), - m_zmqContext(1), - m_endpoint(endpoint), - m_child_thread(&RemoteControllerZmq::process, this) { } - - RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; - RemoteControllerZmq(const RemoteControllerZmq& other) = delete; - - ~RemoteControllerZmq(); - - virtual bool fault_detected() { return m_fault; } - - virtual void restart(); - - private: - void restart_thread(); - - void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message); - void send_ok_reply(zmq::socket_t &pSocket); - void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); - void process(); - - std::atomic<bool> m_active; - - /* This is set to true if a fault occurred */ - std::atomic<bool> m_fault; - std::thread m_restarter_thread; - - zmq::context_t m_zmqContext; - - std::string m_endpoint; - std::thread m_child_thread; -}; -#endif - +#define ENABLE_REMOTECONTROL 0 |