diff options
Diffstat (limited to 'src')
59 files changed, 1111 insertions, 5658 deletions
| diff --git a/src/ClockTAI.cpp b/src/ClockTAI.cpp deleted file mode 100644 index c376c07..0000000 --- a/src/ClockTAI.cpp +++ /dev/null @@ -1,562 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -/* This file downloads the TAI-UTC bulletins from the from IETF and parses them - * so that correct time can be communicated in EDI timestamps. - * - * This file contains self-test code that can be executed by running - *  g++ -g -Wall -DTEST -DHAVE_CURL -std=c++11 -lcurl -pthread \ - *  ClockTAI.cpp Log.cpp RemoteControl.cpp -lboost_system -o taitest && ./taitest - */ - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "ClockTAI.h" -#include "Log.h" - -#include <time.h> -#include <stdio.h> -#include <errno.h> -#if SUPPORT_SETTING_CLOCK_TAI -#  include <sys/timex.h> -#endif -#ifdef HAVE_CURL -#  include <curl/curl.h> -#endif -#include <array> -#include <string> -#include <iostream> -#include <algorithm> -#include <regex> - -using namespace std; - -#ifdef TEST -static bool wait_longer = true; -#endif - -constexpr int download_retry_interval_hours = 1; - -// Offset between NTP time and POSIX time: -// timestamp_unix = timestamp_ntp - ntp_unix_offset -const int64_t ntp_unix_offset = 2208988800L; - -// leap seconds insertion bulletin is available from the IETF and in the TZ -// distribution -static array<const char*, 2> default_tai_urls = { -    "https://www.ietf.org/timezones/data/leap-seconds.list", -    "https://raw.githubusercontent.com/eggert/tz/master/leap-seconds.list", -}; - -// According to the Filesystem Hierarchy Standard, the data in -// /var/tmp "must not be deleted when the system is booted." -static const char *tai_cache_location = "/var/tmp/odr-dabmux-leap-seconds.cache"; - -// read TAI offset from a valid bulletin in IETF format -static int parse_ietf_bulletin(const std::string& bulletin) -{ -    // Example Line: -    // 3692217600	37	# 1 Jan 2017 -    // -    // NTP timestamp<TAB>leap seconds<TAB># some comment -    // The NTP timestamp starts at epoch 1.1.1900. -    // The difference between NTP timestamps and unix epoch is 70 -    // years i.e. 2208988800 seconds - -    std::regex regex_bulletin(R"(([0-9]+)\s+([0-9]+)\s+#.*)"); - -    time_t now = time(nullptr); - -    int tai_utc_offset = 0; - -    int tai_utc_offset_valid = false; - -    stringstream ss(bulletin); - -    /* We cannot just take the last line, because it might -     * be in the future, announcing an upcoming leap second. -     * -     * So we need to look at the current date, and compare it -     * with the date of the leap second. -     */ -    for (string line; getline(ss, line); ) { - -        std::smatch bulletin_entry; - -        bool is_match = std::regex_search(line, bulletin_entry, regex_bulletin); -        if (is_match) { -            if (bulletin_entry.size() != 3) { -                throw runtime_error( -                        "Incorrect number of matched TAI IETF bulletin entries"); -            } -            const string bulletin_ntp_timestamp(bulletin_entry[1]); -            const string bulletin_offset(bulletin_entry[2]); - -            const int64_t timestamp_unix = -                std::atoll(bulletin_ntp_timestamp.c_str()) - ntp_unix_offset; - -            const int offset = std::atoi(bulletin_offset.c_str()); -            // Ignore entries announcing leap seconds in the future -            if (timestamp_unix < now) { -                tai_utc_offset = offset; -                tai_utc_offset_valid = true; -            } -#if TEST -            else { -                cerr << "IETF Ignoring offset " << bulletin_offset << -                    " at TS " << bulletin_ntp_timestamp << -                    " in the future" << endl; -            } -#endif -        } -    } - -    if (not tai_utc_offset_valid) { -        throw runtime_error("No data in TAI bulletin"); -    } - -    return tai_utc_offset; -} - - -struct bulletin_state { -    bool valid = false; -    int64_t expiry = 0; -    int offset = 0; - -    bool usable() const { return valid and expiry > 0; } -}; - -static bulletin_state parse_bulletin(const string& bulletin) -{ -    // The bulletin contains one line that specifies an expiration date -    // in NTP time. If that point in time is in the future, we consider -    // the bulletin valid. -    // -    // The entry looks like this: -    //#@	3707596800 - -    bulletin_state ret; - -    std::regex regex_expiration(R"(#@\s+([0-9]+))"); - -    time_t now = time(nullptr); - -    stringstream ss(bulletin); - -    for (string line; getline(ss, line); ) { -        std::smatch bulletin_entry; - -        bool is_match = std::regex_search(line, bulletin_entry, regex_expiration); -        if (is_match) { -            if (bulletin_entry.size() != 2) { -                throw runtime_error( -                        "Incorrect number of matched TAI IETF bulletin expiration"); -            } -            const string expiry_data_str(bulletin_entry[1]); -            const int64_t expiry_unix = -                std::atoll(expiry_data_str.c_str()) - ntp_unix_offset; - -#ifdef TEST -            etiLog.level(info) << "Bulletin expires in " << expiry_unix - now; -#endif -            ret.expiry = expiry_unix - now; -            try { -                ret.offset = parse_ietf_bulletin(bulletin); -                ret.valid = true; -            } -            catch (const runtime_error& e) { -                etiLog.level(warn) << "Bulletin expiry ok but parse error: " << e.what(); -            } -            break; -        } -    } -    return ret; -} - - -// callback that receives data from cURL -static size_t fill_bulletin(char *ptr, size_t size, size_t nmemb, void *ctx) -{ -    auto *bulletin = reinterpret_cast<stringstream*>(ctx); - -    size_t len = size * nmemb; -    for (size_t i = 0; i < len; i++) { -        *bulletin << ptr[i]; -    } -    return len; -} - -static string download_tai_utc_bulletin(const char* url) -{ -    stringstream bulletin; - -#ifdef HAVE_CURL -    CURL *curl; -    CURLcode res; - -    curl = curl_easy_init(); -    if (curl) { -        curl_easy_setopt(curl, CURLOPT_URL, url); -        /* Tell libcurl to follow redirection */ -        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); -        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, fill_bulletin); -        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &bulletin); - -        res = curl_easy_perform(curl); -        /* always cleanup ! */ -        curl_easy_cleanup(curl); - -        if (res != CURLE_OK) { -            throw runtime_error( "TAI-UTC bulletin download failed: " + -                    string(curl_easy_strerror(res))); -        } -    } -    return bulletin.str(); -#else -    throw runtime_error("Cannot download TAI Clock information without cURL"); -#endif // HAVE_CURL -} - -static string load_bulletin_from_file(const char* cache_filename) -{ -    // Clear the bulletin -    ifstream f(cache_filename); -    if (not f.good()) { -        return {}; -    } - -    stringstream ss; -    ss << f.rdbuf(); -    f.close(); - -    return ss.str(); -} - -ClockTAI::ClockTAI(const std::vector<std::string>& bulletin_urls) : -    RemoteControllable("clocktai") -{ -    RC_ADD_PARAMETER(expiry, "Number of seconds until TAI Bulletin expires"); - -    if (bulletin_urls.empty()) { -        etiLog.level(debug) << "Initialising default TAI Bulletin URLs"; -        for (const auto url : default_tai_urls) { -            m_bulletin_urls.push_back(url); -        } -    } -    else { -        etiLog.level(debug) << "Initialising user-configured TAI Bulletin URLs"; -        m_bulletin_urls = bulletin_urls; -    } - -    for (const auto url : m_bulletin_urls) { -        etiLog.level(info) << "TAI Bulletin URL: '" << url << "'"; -    } -} - -int ClockTAI::get_valid_offset() -{ -    int offset = 0; -    bool offset_valid = false; - -    std::unique_lock<std::mutex> lock(m_data_mutex); - -    const auto state = parse_bulletin(m_bulletin); -    if (state.usable()) { -#if TEST -        etiLog.level(info) << "Bulletin already valid"; -#endif -        offset = state.offset; -        offset_valid = true; -    } -    else { -        const auto cache_bulletin = load_bulletin_from_file(tai_cache_location); -        const auto cache_state = parse_bulletin(cache_bulletin); - -        if (cache_state.usable()) { -            m_bulletin = cache_bulletin; -            offset = cache_state.offset; -            offset_valid = true; -#if TEST -            etiLog.level(info) << "Bulletin from cache valid with offset=" << offset; -#endif -        } -        else { -            for (const auto url : m_bulletin_urls) { -                try { -#if TEST -                    etiLog.level(info) << "Load bulletin from " << url; -#endif -                    const auto new_bulletin = download_tai_utc_bulletin(url.c_str()); -                    const auto new_state = parse_bulletin(new_bulletin); -                    if (new_state.usable()) { -                        m_bulletin = new_bulletin; -                        offset = new_state.offset; -                        offset_valid = true; - -                        etiLog.level(debug) << "Loaded valid TAI Bulletin from " << -                            url << " giving offset=" << offset; -                    } -                    else { -                        etiLog.level(debug) << "Skipping invalid TAI bulletin from " -                            << url; -                    } -                } -                catch (const runtime_error& e) { -                    etiLog.level(warn) << -                        "TAI-UTC offset could not be retrieved from " << -                        url << " : " << e.what(); -                } - -                if (offset_valid) { -                    update_cache(tai_cache_location); -                    break; -                } -            } -        } -    } - -    if (offset_valid) { -        // With the current evolution of the offset, we're probably going -        // to reach 500 long after DAB gets replaced by another standard. -        if (offset < 0 or offset > 500) { -            stringstream ss; -            ss << "TAI offset " << offset << " out of range"; -            throw range_error(ss.str()); -        } - -        return offset; -    } -    else { -        // Try again later -        throw download_failed(); -    } -} - - -int ClockTAI::get_offset() -{ -    using namespace std::chrono; -    const auto time_now = system_clock::now(); - -    std::unique_lock<std::mutex> lock(m_data_mutex); - -    if (not m_offset_valid) { -#ifdef TEST -        // Assume we've downloaded it in the past: - -        m_offset = 37; // Valid in early 2017 -        m_offset_valid = true; - -        // Simulate requiring a new download -        m_bulletin_download_time = time_now - hours(24 * 40); -#else -        // First time we run we must block until we know -        // the offset -        lock.unlock(); -        try { -            m_offset = get_valid_offset(); -        } -        catch (const download_failed&) { -            throw runtime_error("Unable to download TAI bulletin"); -        } -        lock.lock(); -        m_offset_valid = true; -        m_bulletin_download_time = time_now; -#endif -        etiLog.level(info) << -            "Initialised TAI-UTC offset to " << m_offset << "s."; -    } - -    if (time_now - m_bulletin_download_time > hours(24 * 31)) { -        // Refresh if it's older than one month. Leap seconds are -        // announced several months in advance -        etiLog.level(debug) << "Trying to refresh TAI bulletin"; - -        if (m_offset_future.valid()) { -            auto state = m_offset_future.wait_for(seconds(0)); -            switch (state) { -                case future_status::ready: -                    try { -                        m_offset = m_offset_future.get(); -                        m_offset_valid = true; -                        m_bulletin_download_time = time_now; - -                        etiLog.level(info) << -                            "Updated TAI-UTC offset to " << m_offset << "s."; -                    } -                    catch (const download_failed&) { -                        etiLog.level(warn) << -                            "TAI-UTC download failed, will retry in " << -                            download_retry_interval_hours << " hour(s)"; - -                        m_bulletin_download_time += hours(download_retry_interval_hours); -                    } -#ifdef TEST -                    wait_longer = false; -#endif -                    break; - -                case future_status::deferred: -                case future_status::timeout: -                    // Not ready yet -#ifdef TEST -                    etiLog.level(debug) << "  async not ready yet"; -#endif -                    break; -            } -        } -        else { -#ifdef TEST -            etiLog.level(debug) << " Launch async"; -#endif -            m_offset_future = async(launch::async, &ClockTAI::get_valid_offset, this); -        } -    } - -    return m_offset; -} - -#if SUPPORT_SETTING_CLOCK_TAI -int ClockTAI::update_local_tai_clock(int offset) -{ -    struct timex timex_request; -    timex_request.modes = ADJ_TAI; -    timex_request.constant = offset; - -    int err = adjtimex(&timex_request); -    if (err == -1) { -        perror("adjtimex"); -    } - -    printf("adjtimex: %d, tai %d\n", err, timex_request.tai); - -    return err; -} -#endif - -void ClockTAI::update_cache(const char* cache_filename) -{ -    ofstream f(cache_filename); -    if (not f.good()) { -        throw runtime_error("TAI-UTC bulletin open cache for writing"); -    } - -    f << m_bulletin; -    f.close(); -} - - -void ClockTAI::set_parameter(const string& parameter, const string& value) -{ -    if (parameter == "expiry") { -        throw ParameterError("Parameter '" + parameter + -            "' is not read-only in controllable " + get_rc_name()); -    } -    else { -        throw ParameterError("Parameter '" + parameter + -            "' is not exported by controllable " + get_rc_name()); -    } -} - -const string ClockTAI::get_parameter(const string& parameter) const -{ -    if (parameter == "expiry") { -        std::unique_lock<std::mutex> lock(m_data_mutex); -        const int64_t expiry = parse_bulletin(m_bulletin).expiry; -        if (expiry > 0) { -            return to_string(expiry); -        } -        else { -            return "Bulletin expired or invalid!"; -        } -    } -    else { -        throw ParameterError("Parameter '" + parameter + -            "' is not exported by controllable " + get_rc_name()); -    } -} - -#if 0 -// Example testing code -void debug_tai_clk() -{ -    struct timespec rt_clk; - -    int err = clock_gettime(CLOCK_REALTIME, &rt_clk); -    if (err) { -        perror("REALTIME clock_gettime failed"); -    } - -    struct timespec tai_clk; - -    err = clock_gettime(CLOCK_TAI, &tai_clk); -    if (err) { -        perror("TAI clock_gettime failed"); -    } - -    printf("RT - TAI = %ld\n", rt_clk.tv_sec - tai_clk.tv_sec); - - -    struct timex timex_request; -    timex_request.modes = 0; // Do not set anything - -    err = adjtimex(&timex_request); -    if (err == -1) { -        perror("adjtimex"); -    } - -    printf("adjtimex: %d, tai %d\n", err, timex_request.tai); -} -#endif - -#if TEST -int main(int argc, char **argv) -{ -    using namespace std; - -    ClockTAI tai({}); - -    while (wait_longer) { -        try { -            etiLog.level(info) << -                "Offset is " << tai.get_offset(); -        } -        catch (const exception &e) { -            etiLog.level(error) << -                "Exception " << e.what(); -        } - -        this_thread::sleep_for(chrono::seconds(2)); -    } - -    return 0; -} -#endif - diff --git a/src/ClockTAI.h b/src/ClockTAI.h deleted file mode 100644 index 4b3c2ff..0000000 --- a/src/ClockTAI.h +++ /dev/null @@ -1,102 +0,0 @@ -/* -   Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, -   2011, 2012 Her Majesty the Queen in Right of Canada (Communications -   Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -*/ - -/* The EDI output needs TAI clock, according to ETSI TS 102 693 Annex F - * "EDI Timestamps". This module can set the local CLOCK_TAI clock by - * setting the TAI-UTC offset using adjtimex. - * - * This functionality requires Linux 3.10 (30 Jun 2013) or newer. - */ - -#pragma once - -#include <stdint.h> -#include <stdlib.h> -#include <sstream> -#include <chrono> -#include <future> -#include <mutex> -#include <string> -#include <vector> -#include "RemoteControl.h" - -// EDI needs to know UTC-TAI, but doesn't need the CLOCK_TAI to be set. -// We can keep this code, maybe for future use -#define SUPPORT_SETTING_CLOCK_TAI 0 - -/* Loads, parses and represents TAI-UTC offset information from the IETF bulletin */ -class ClockTAI : public RemoteControllable { -    public: -        ClockTAI(const std::vector<std::string>& bulletin_urls); - -        // Fetch the bulletin from the IETF website and return the current -        // TAI-UTC offset. -        // Throws runtime_error on failure. -        int get_offset(void); - -#if SUPPORT_SETTING_CLOCK_TAI -        // Update the local TAI clock according to the TAI-UTC offset -        // return 0 on success -        int update_local_tai_clock(int offset); -#endif - -    private: -        class download_failed {}; - -        // Either retrieve the bulletin from the cache or if necessarly -        // download it, and calculate the TAI-UTC offset. -        // Returns the offset or throws download_failed or a range_error -        // if the offset is out of bounds. -        int get_valid_offset(void); - -        // Download of new bulletin is done asynchronously -        std::future<int> m_offset_future; - -        // Protect all data members, as RC functions are in another thread -        mutable std::mutex m_data_mutex; - -        // The currently used TAI-UTC offset -        int m_offset = 0; -        int m_offset_valid = false; - -        std::vector<std::string> m_bulletin_urls; - -        std::string m_bulletin; -        std::chrono::system_clock::time_point m_bulletin_download_time; - -        // Update the cache file with the current m_bulletin -        void update_cache(const char* cache_filename); - - -        /* Remote control */ -        virtual void set_parameter(const std::string& parameter, -               const std::string& value); - -        /* Getting a parameter always returns a string. */ -        virtual const std::string get_parameter(const std::string& parameter) const; -}; - diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index fb49efc..776ddc8 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -40,6 +40,7 @@  #include "utils.h"  #include "DabMux.h"  #include "ManagementServer.h" +#include "input/Edi.h"  #include "input/Prbs.h"  #include "input/Zmq.h"  #include "input/File.h" @@ -50,11 +51,12 @@  #include <boost/algorithm/string/split.hpp>  #include <cstdint>  #include <cstring> -#include <memory> +#include <chrono>  #include <exception>  #include <iostream> -#include <string>  #include <map> +#include <memory> +#include <string>  #include <vector>  using namespace std; @@ -876,34 +878,46 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,          type = pt.get<string>("type");      }      catch (const ptree_error &e) { -        stringstream ss; -        ss << "Subchannel with uid " << subchanuid << " has no type defined!"; -        throw runtime_error(ss.str()); +        throw runtime_error("Subchannel with uid " + subchanuid + " has no type defined!");      } -    /* Both inputfile and inputuri are supported, and are equivalent. -     * inputuri has precedence +    /* Up to v2.3.1, both inputfile and inputuri are supported, and are +     * equivalent.  inputuri has precedence. +     * +     * After that, either inputfile or the (inputproto, inputuri) pair must be given, but not both.       */      string inputUri = pt.get<string>("inputuri", ""); +    string proto = pt.get<string>("inputproto", ""); -    if (inputUri == "") { +    if (inputUri.empty() and proto.empty()) {          try { +            /* Old approach, derives proto from scheme used in the URL. +             * This makes it impossible to distinguish between ZMQ tcp:// and +             * EDI tcp:// +             */              inputUri = pt.get<string>("inputfile"); +            size_t protopos = inputUri.find("://"); + +            if (protopos == string::npos) { +                proto = "file"; +            } +            else { +                proto = inputUri.substr(0, protopos); + +                if (proto == "tcp" or proto == "epgm" or proto == "ipc") { +                    proto = "zmq"; +                } +                else if (proto == "sti-rtp") { +                    proto = "sti"; +                } +            }          }          catch (const ptree_error &e) { -            stringstream ss; -            ss << "Subchannel with uid " << subchanuid << " has no inputUri defined!"; -            throw runtime_error(ss.str()); +            throw runtime_error("Subchannel with uid " + subchanuid + " has no input defined!");          }      } - -    string proto; -    size_t protopos = inputUri.find("://"); -    if (protopos == string::npos) { -        proto = "file"; -    } -    else { -        proto = inputUri.substr(0, protopos); +    else if (inputUri.empty() or proto.empty()) { +        throw runtime_error("Must define both inputuri and inputproto for uid " + subchanuid);      }      subchan->inputUri = inputUri; @@ -928,7 +942,7 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,                  throw logic_error("Incomplete handling of file input");              }          } -        else if (proto == "tcp"  or proto == "epgm" or proto == "ipc") { +        else if (proto == "zmq") {              auto zmqconfig = setup_zmq_input(pt, subchanuid);              if (type == "audio") { @@ -941,15 +955,16 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,                  rcs.enrol(inzmq.get());                  subchan->input = inzmq;              } - -            if (proto == "epgm") { -                etiLog.level(warn) << "Using untested epgm:// zeromq input"; -            } -            else if (proto == "ipc") { -                etiLog.level(warn) << "Using untested ipc:// zeromq input"; -            }          } -        else if (proto == "sti-rtp") { +        else if (proto == "edi") { +            Inputs::dab_input_edi_config_t config; +            config.buffer_size = pt.get("buffer", config.buffer_size); +            config.prebuffering = pt.get("prebuffering", config.prebuffering); +            auto inedi = make_shared<Inputs::Edi>(subchanuid, config); +            rcs.enrol(inedi.get()); +            subchan->input = inedi; +        } +        else if (proto == "stp") {              subchan->input = make_shared<Inputs::Sti_d_Rtp>();          }          else { @@ -1012,6 +1027,20 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,          }      } +    const string bufferManagement = pt.get("buffer-management", "prebuffering"); +    if (bufferManagement == "prebuffering") { +        subchan->input->setBufferManagement(Inputs::BufferManagement::Prebuffering); +    } +    else if (bufferManagement == "timestamped") { +        subchan->input->setBufferManagement(Inputs::BufferManagement::Timestamped); +    } +    else { +        throw runtime_error("Subchannel with uid " + subchanuid + " has invalid buffer-management !"); +    } + +    const int32_t tist_delay = pt.get("tist-delay", 0); +    subchan->input->setTistDelay(chrono::milliseconds(tist_delay)); +      subchan->startAddress = 0;      dabProtection* protection = &subchan->protection; diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp index 9ff28a3..2bd8d74 100644 --- a/src/DabMultiplexer.cpp +++ b/src/DabMultiplexer.cpp @@ -134,24 +134,23 @@ void DabMultiplexer::prepare(bool require_tai_clock)          throw MuxInitException();      } -    /* TODO: -     * In a SFN, when reconfiguring the ensemble, the multiplexer -     * has to be restarted (odr-dabmux doesn't support reconfiguration). -     * Ideally, we must be able to restart transmission s.t. the receiver -     * synchronisation is preserved. +    /* Ensure edi_time and TIST represent current time. Keep +     * a granularity of 24ms, which corresponds to the +     * duration of an ETI frame, to get nicer timestamps.       */      using Sec = chrono::seconds; -    const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now()); - -    edi_time = chrono::system_clock::to_time_t(now); - -    // We define that when the time is multiple of six seconds, the timestamp -    // (PPS offset) is 0. This ensures consistency of TIST even across a -    // mux restart +    const auto now = chrono::system_clock::now(); +    edi_time = chrono::system_clock::to_time_t(chrono::time_point_cast<Sec>(now)); +    auto offset = now - chrono::time_point_cast<Sec>(now); +    if (offset >= chrono::seconds(1)) { +        throw std::logic_error("Invalid startup offset calculation for TIST! " + +                to_string(chrono::duration_cast<chrono::milliseconds>(offset).count()) + +                " ms"); +    }      timestamp = 0; -    edi_time -= (edi_time % 6); -    while (edi_time < chrono::system_clock::to_time_t(now)) { +    while (offset >= chrono::milliseconds(24)) {          increment_timestamp(); +        offset -= chrono::milliseconds(24);      }      // Try to load offset once @@ -284,13 +283,13 @@ void DabMultiplexer::prepare_services_components()                      component->subchId, component->serviceId);              throw MuxInitException();          } -        if ((*subchannel)->type != subchannel_type_t::Packet) continue; -        component->packet.id = cur_packetid++; +        if ((*subchannel)->type == subchannel_type_t::Packet) { +            component->packet.id = cur_packetid++; +        }          rcs.enrol(component.get());      } -  }  void DabMultiplexer::prepare_data_inputs() @@ -308,10 +307,8 @@ void DabMultiplexer::prepare_data_inputs()              (*subchannel)->startAddress = (*(subchannel - 1))->startAddress +                  (*(subchannel - 1))->getSizeCu();          } -        if ((*subchannel)->input->open((*subchannel)->inputUri) == -1) { -            perror((*subchannel)->inputUri.c_str()); -            throw MuxInitException(); -        } + +        (*subchannel)->input->open((*subchannel)->inputUri);          // TODO Check errors          int subch_bitrate = (*subchannel)->input->setBitrate( (*subchannel)->bitrate); @@ -376,12 +373,23 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs      // For EDI, save ETI(LI) Management data into a TAG Item DETI      edi::TagDETI edi_tagDETI; -    edi::TagStarPTR edi_tagStarPtr; +    edi::TagStarPTR edi_tagStarPtr("DETI");      map<DabSubchannel*, edi::TagESTn> edi_subchannelToTag;      // The above Tag Items will be assembled into a TAG Packet      edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); +    const bool tist_enabled = m_pt.get("general.tist", false); + +    int tai_utc_offset = 0; +    if (tist_enabled and m_tai_clock_required) { +        try { +            tai_utc_offset = m_clock_tai.get_offset(); +        } +        catch (const std::runtime_error& e) { +            etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp"; +        } +    }      update_dab_time();      // Initialise the ETI frame @@ -583,8 +591,9 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs          edi::TagESTn& tag = edi_subchannelToTag[subchannel.get()];          int sizeSubchannel = subchannel->getSizeByte(); -        int result = subchannel->input->readFrame( -                &etiFrame[index], sizeSubchannel); +        // no need to check enableTist because we always increment the timestamp +        int result = subchannel->readFrame(&etiFrame[index], +                        sizeSubchannel, edi_time + m_tist_offset, tai_utc_offset, timestamp);          if (result < 0) {              etiLog.log(info, @@ -637,34 +646,25 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs          edi_tagDETI.tsta = 0xffffff;      } -    const bool tist_enabled = m_pt.get("general.tist", false); -      if (tist_enabled and m_tai_clock_required) { -        try { -            const auto tai_utc_offset = m_clock_tai.get_offset(); -            edi_tagDETI.set_edi_time(edi_time + m_tist_offset, tai_utc_offset); -            edi_tagDETI.atstf = true; - -            for (auto output : outputs) { -                shared_ptr<OutputMetadata> md_utco = -                    make_shared<OutputMetadataUTCO>(edi_tagDETI.utco); -                output->setMetadata(md_utco); - -                shared_ptr<OutputMetadata> md_edi_time = -                    make_shared<OutputMetadataEDITime>(edi_tagDETI.seconds); -                output->setMetadata(md_edi_time); - -                shared_ptr<OutputMetadata> md_dlfc = -                    make_shared<OutputMetadataDLFC>(currentFrame % 5000); -                output->setMetadata(md_dlfc); -            } -        } -        catch (const std::runtime_error& e) { -            etiLog.level(error) << "Could not get UTC-TAI offset for EDI timestamp"; +        edi_tagDETI.set_edi_time(edi_time + m_tist_offset, tai_utc_offset); +        edi_tagDETI.atstf = true; + +        for (auto output : outputs) { +            shared_ptr<OutputMetadata> md_utco = +                make_shared<OutputMetadataUTCO>(edi_tagDETI.utco); +            output->setMetadata(md_utco); + +            shared_ptr<OutputMetadata> md_edi_time = +                make_shared<OutputMetadataEDITime>(edi_tagDETI.seconds); +            output->setMetadata(md_edi_time); + +            shared_ptr<OutputMetadata> md_dlfc = +                make_shared<OutputMetadataDLFC>(currentFrame % 5000); +            output->setMetadata(md_dlfc);          }      } -      /* Coding of the TIST, according to ETS 300 799 Annex C      Bit number      b0(MSb)..b6     b7..b9   b10..b17   b18..b20   b21..b23(LSb) diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h index 386c23c..56a8dde 100644 --- a/src/DabMultiplexer.h +++ b/src/DabMultiplexer.h @@ -30,15 +30,14 @@  #endif  #include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/AFPacket.h" -#include "dabOutput/edi/Transport.h" +#include "edioutput/TagItems.h" +#include "edioutput/TagPacket.h" +#include "edioutput/AFPacket.h" +#include "edioutput/Transport.h"  #include "fig/FIGCarousel.h"  #include "crc.h"  #include "utils.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h"  #include "PcDebug.h"  #include "MuxElements.h"  #include "RemoteControl.h" diff --git a/src/DabMux.cpp b/src/DabMux.cpp index 51f0310..de0c362 100644 --- a/src/DabMux.cpp +++ b/src/DabMux.cpp @@ -99,8 +99,7 @@ typedef DWORD32 uint32_t;  #include "dabOutput/dabOutput.h"  #include "crc.h" -#include "UdpSocket.h" -#include "InetAddress.h" +#include "Socket.h"  #include "PcDebug.h"  #include "DabMux.h"  #include "MuxElements.h" @@ -240,6 +239,26 @@ int main(int argc, char *argv[])              etiLog.register_backend(std::make_shared<LogToSyslog>());          } +        const auto startupcheck = pt.get<string>("general.startupcheck", ""); +        if (not startupcheck.empty()) { +            etiLog.level(info) << "Running startup check '" << startupcheck << "'"; +            int wstatus = system(startupcheck.c_str()); + +            if (WIFEXITED(wstatus)) { +                if (WEXITSTATUS(wstatus) == 0) { +                    etiLog.level(info) << "Startup check ok"; +                } +                else { +                    etiLog.level(error) << "Startup check failed, returned " << WEXITSTATUS(wstatus); +                    return 1; +                } +            } +            else { +                etiLog.level(error) << "Startup check failed, child didn't terminate normally"; +                return 1; +            } +        } +          int mgmtserverport = pt.get<int>("general.managementport",                               pt.get<int>("general.statsserverport", 0) ); @@ -292,6 +311,8 @@ int main(int argc, char *argv[])              if (outputuid == "edi") {                  ptree pt_edi = pt_outputs.get_child("edi"); +                bool require_dest_port = false; +                  for (auto pt_edi_dest : pt_edi.get_child("destinations")) {                      const auto proto = pt_edi_dest.second.get<string>("protocol", "udp");                      if (proto == "udp") { @@ -303,9 +324,11 @@ int main(int argc, char *argv[])                          dest->source_port = pt_edi_dest.second.get<unsigned int>("sourceport");                          edi_conf.destinations.push_back(dest); + +                        require_dest_port = true;                      }                      else if (proto == "tcp") { -                        auto dest = make_shared<edi::tcp_destination_t>(); +                        auto dest = make_shared<edi::tcp_server_t>();                          dest->listen_port = pt_edi_dest.second.get<unsigned int>("listenport");                          dest->max_frames_queued = pt_edi_dest.second.get<size_t>("max_frames_queued", 500);                          edi_conf.destinations.push_back(dest); @@ -315,11 +338,13 @@ int main(int argc, char *argv[])                      }                  } -                edi_conf.dest_port           = pt_edi.get<unsigned int>("port"); +                if (require_dest_port) { +                    edi_conf.dest_port       = pt_edi.get<unsigned int>("port"); +                } -                edi_conf.dump                = pt_edi.get<bool>("dump"); -                edi_conf.enable_pft          = pt_edi.get<bool>("enable_pft"); -                edi_conf.verbose             = pt_edi.get<bool>("verbose"); +                edi_conf.dump                = pt_edi.get<bool>("dump", false); +                edi_conf.enable_pft          = pt_edi.get<bool>("enable_pft", false); +                edi_conf.verbose             = pt_edi.get<bool>("verbose", false);                  edi_conf.fec                 = pt_edi.get<unsigned int>("fec", 3);                  edi_conf.chunk_len           = pt_edi.get<unsigned int>("chunk_len", 207); diff --git a/src/InetAddress.cpp b/src/InetAddress.cpp deleted file mode 100644 index 7660263..0000000 --- a/src/InetAddress.cpp +++ /dev/null @@ -1,155 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "InetAddress.h" -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <string.h> - -#ifdef TRACE_ON -# ifndef TRACE_CLASS -#  define TRACE_CLASS(clas, func) cout <<"-" <<(clas) <<"\t(" <<this <<")::" <<(func) <<endl -#  define TRACE_STATIC(clas, func) cout <<"-" <<(clas) <<"\t(static)::" <<(func) <<endl -# endif -#else -# ifndef TRACE_CLASS -#  define TRACE_CLASS(clas, func) -#  define TRACE_STATIC(clas, func) -# endif -#endif - - -int inetErrNo = 0; -const char *inetErrMsg = nullptr; -const char *inetErrDesc = nullptr; - - -/** - *  Constructs an IP address. - *  @param port The port of this address - *  @param name The name of this address - */ -InetAddress::InetAddress(int port, const char* name) { -    TRACE_CLASS("InetAddress", "InetAddress(int, char)"); -    addr.sin_family = PF_INET; -    addr.sin_addr.s_addr = htons(INADDR_ANY); -    addr.sin_port = htons(port); -    if (name) -        setAddress(name); -} - - -/// Returns the raw IP address of this InetAddress object. -sockaddr *InetAddress::getAddress() { -    TRACE_CLASS("InetAddress", "getAddress()"); -    return (sockaddr *)&addr; -} - - -/// Return the port of this address. -int InetAddress::getPort() -{ -    TRACE_CLASS("InetAddress", "getPort()"); -    return ntohs(addr.sin_port); -} - - -/** - *  Returns the IP address string "%d.%d.%d.%d". - *  @return IP address - */ -const char *InetAddress::getHostAddress() { -    TRACE_CLASS("InetAddress", "getHostAddress()"); -    return inet_ntoa(addr.sin_addr); -} - - -/// Returns true if this address is multicast -bool InetAddress::isMulticastAddress() { -    TRACE_CLASS("InetAddress", "isMulticastAddress()"); -    return IN_MULTICAST(ntohl(addr.sin_addr.s_addr));		// a modifier -} - - -/** - *  Set the port number - *  @param port The new port number - */ -void InetAddress::setPort(int port) -{ -    TRACE_CLASS("InetAddress", "setPort(int)"); -    addr.sin_port = htons(port); -} - - -/** - *  Set the address - *  @param name The new address name - *  @return 0  if ok - *          -1 if error - */ -int InetAddress::setAddress(const std::string& name) -{ -    TRACE_CLASS("InetAddress", "setAddress(string)"); -    if (!name.empty()) { -        if (atoi(name.c_str())) {   // If it start with a number -            if ((addr.sin_addr.s_addr = inet_addr(name.c_str())) == INADDR_NONE) { -                addr.sin_addr.s_addr = htons(INADDR_ANY); -                inetErrNo = 0; -                inetErrMsg = "Invalid address"; -                inetErrDesc = name.c_str(); -                return -1; -            } -        } -        else {            // Assume it's a real name -            hostent *host = gethostbyname(name.c_str()); -            if (host) { -                addr.sin_addr = *(in_addr *)(host->h_addr); -            } else { -                addr.sin_addr.s_addr = htons(INADDR_ANY); -                inetErrNo = 0; -                inetErrMsg = "Could not find address"; -                inetErrDesc = name.c_str(); -                return -1; -            } -        } -    } -    else { -        addr.sin_addr.s_addr = INADDR_ANY; -    } -    return 0; -} - - -void setInetError(const char* description) -{ -    inetErrNo = 0; -    inetErrNo = errno; -    inetErrMsg = strerror(inetErrNo); -    inetErrDesc = description; -} - diff --git a/src/InetAddress.h b/src/InetAddress.h deleted file mode 100644 index e246d4c..0000000 --- a/src/InetAddress.h +++ /dev/null @@ -1,78 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _InetAddress -#define _InetAddress - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include <stdlib.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#include <string> - -#define SOCKET                  int -#define INVALID_SOCKET          -1 -#define INVALID_PORT            -1 - - -/// The last error number -extern int inetErrNo; -/// The last error message -extern const char *inetErrMsg; -/// The description of the last error -extern const char *inetErrDesc; -/// Set the number, message and description of the last error -void setInetError(const char* description); - - -/** - *  This class represents an Internet Protocol (IP) address. - *  @author Pascal Charest pascal.charest@crc.ca - */ -class InetAddress { - public: -  InetAddress(int port = 0, const char* name = NULL); - -  sockaddr *getAddress(); -  const char *getHostAddress(); -  int getPort(); -  int setAddress(const std::string& name); -  void setPort(int port); -  bool isMulticastAddress(); - - private: -  sockaddr_in addr; -}; - - -#endif diff --git a/src/Log.cpp b/src/Log.cpp deleted file mode 100644 index 6b78fe0..0000000 --- a/src/Log.cpp +++ /dev/null @@ -1,143 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 -   Her Majesty the Queen in Right of Canada (Communications Research -   Center Canada) - -   Copyright (C) 2018 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. - */ - -#include <list> -#include <cstdarg> -#include <chrono> - -#include "Log.h" - -using namespace std; - -/* etiLog is a singleton used in all parts of ODR-DabMod to output log messages. - */ -Logger etiLog; - -void Logger::register_backend(std::shared_ptr<LogBackend> backend) -{ -    backends.push_back(backend); -} - - -void Logger::log(log_level_t level, const char* fmt, ...) -{ -    if (level == discard) { -        return; -    } - -    int size = 100; -    std::string str; -    va_list ap; -    while (1) { -        str.resize(size); -        va_start(ap, fmt); -        int n = vsnprintf((char *)str.c_str(), size, fmt, ap); -        va_end(ap); -        if (n > -1 && n < size) { -            str.resize(n); -            break; -        } -        if (n > -1) -            size = n + 1; -        else -            size *= 2; -    } - -    logstr(level, move(str)); -} - -void Logger::logstr(log_level_t level, std::string&& message) -{ -    if (level == discard) { -        return; -    } - -    /* Remove a potential trailing newline. -     * It doesn't look good in syslog -     */ -    if (message[message.length()-1] == '\n') { -        message.resize(message.length()-1); -    } - -    for (auto &backend : backends) { -        backend->log(level, message); -    } - -    { -        std::lock_guard<std::mutex> guard(m_cerr_mutex); -        std::cerr << levels_as_str[level] << " " << message << std::endl; -    } -} - - -LogLine Logger::level(log_level_t level) -{ -    return LogLine(this, level); -} - -LogToFile::LogToFile(const std::string& filename) : name("FILE") -{ -    FILE* fd = fopen(filename.c_str(), "a"); -    if (fd == nullptr) { -        fprintf(stderr, "Cannot open log file !"); -        throw std::runtime_error("Cannot open log file !"); -    } - -    log_file.reset(fd); -} - -void LogToFile::log(log_level_t level, const std::string& message) -{ -    if (level != log_level_t::discard) { -        const char* log_level_text[] = { -            "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"}; - -        // fprintf is thread-safe -        fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n", -                log_level_text[(size_t)level], message.c_str()); -        fflush(log_file.get()); -    } -} - -void LogToSyslog::log(log_level_t level, const std::string& message) -{ -    if (level != log_level_t::discard) { -        int syslog_level = LOG_EMERG; -        switch (level) { -            case debug: syslog_level = LOG_DEBUG; break; -            case info:  syslog_level = LOG_INFO; break; -                        /* we don't have the notice level */ -            case warn:  syslog_level = LOG_WARNING; break; -            case error: syslog_level = LOG_ERR; break; -            default:    syslog_level = LOG_CRIT; break; -            case alert: syslog_level = LOG_ALERT; break; -            case emerg: syslog_level = LOG_EMERG; break; -        } - -        syslog(syslog_level, SYSLOG_IDENT " %s", message.c_str()); -    } -} diff --git a/src/Log.h b/src/Log.h deleted file mode 100644 index 18f8c99..0000000 --- a/src/Log.h +++ /dev/null @@ -1,157 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 -   Her Majesty the Queen in Right of Canada (Communications Research -   Center Canada) - -   Copyright (C) 2018 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include <syslog.h> -#include <cstdarg> -#include <cstdio> -#include <fstream> -#include <sstream> -#include <iostream> -#include <list> -#include <stdexcept> -#include <string> -#include <mutex> -#include <memory> - -#define SYSLOG_IDENT "ODR-DabMux" -#define SYSLOG_FACILITY LOG_LOCAL0 - -enum log_level_t {debug = 0, info, warn, error, alert, emerg, discard}; - -static const std::string levels_as_str[] = -    { "     ", "     ", "WARN ", "ERROR", "ALERT", "EMERG", "-----"} ; - -/** Abstract class all backends must inherit from */ -class LogBackend { -    public: -        virtual ~LogBackend() {}; -        virtual void log(log_level_t level, const std::string& message) = 0; -        virtual std::string get_name() const = 0; -}; - -/** A Logging backend for Syslog */ -class LogToSyslog : public LogBackend { -    public: -        LogToSyslog() : name("SYSLOG") { -            openlog(SYSLOG_IDENT, LOG_PID, SYSLOG_FACILITY); -        } - -        virtual ~LogToSyslog() { -            closelog(); -        } - -        void log(log_level_t level, const std::string& message); - -        std::string get_name() const { return name; } - -    private: -        const std::string name; - -        LogToSyslog(const LogToSyslog& other) = delete; -        const LogToSyslog& operator=(const LogToSyslog& other) = delete; -}; - -class LogToFile : public LogBackend { -    public: -        LogToFile(const std::string& filename); -        void log(log_level_t level, const std::string& message); -        std::string get_name() const { return name; } - -    private: -        const std::string name; - -        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; -        std::unique_ptr<FILE, FILEDeleter> log_file; - -        LogToFile(const LogToFile& other) = delete; -        const LogToFile& operator=(const LogToFile& other) = delete; -}; - -class LogLine; - -class Logger { -    public: -        void register_backend(std::shared_ptr<LogBackend> backend); - -        /* Log the message to all backends */ -        void log(log_level_t level, const char* fmt, ...); - -        void logstr(log_level_t level, std::string&& message); - -        /* Return a LogLine for the given level -         * so that you can write etiLog.level(info) << "stuff = " << 21 */ -        LogLine level(log_level_t level); - -    private: -        std::list<std::shared_ptr<LogBackend> > backends; - -        std::mutex m_cerr_mutex; -}; - -extern Logger etiLog; - -// Accumulate a line of logs, using same syntax as stringstream -// The line is logged when the LogLine gets destroyed -class LogLine { -    public: -        LogLine(const LogLine& logline); -        const LogLine& operator=(const LogLine& other) = delete; -        LogLine(Logger* logger, log_level_t level) : -            logger_(logger) -        { -            level_ = level; -        } - -        // Push the new element into the stringstream -        template <typename T> -        LogLine& operator<<(T s) { -            if (level_ != discard) { -                os << s; -            } -            return *this; -        } - -        ~LogLine() -        { -            if (level_ != discard) { -                logger_->logstr(level_, os.str()); -            } -        } - -    private: -        std::ostringstream os; -        log_level_t level_; -        Logger* logger_; -}; - - diff --git a/src/ManagementServer.cpp b/src/ManagementServer.cpp index 201fc7b..783a40b 100644 --- a/src/ManagementServer.cpp +++ b/src/ManagementServer.cpp @@ -485,6 +485,14 @@ void InputStat::notifyOverrun(void)      }  } +void InputStat::notifyVersion(const std::string& version, uint32_t uptime_s) +{ +    unique_lock<mutex> lock(m_mutex); + +    m_version = version; +    m_uptime_s = uptime_s; +} +  std::string InputStat::encodeValuesJSON()  {      std::ostringstream ss; @@ -548,6 +556,13 @@ std::string InputStat::encodeValuesJSON()          return dB;      }; +    auto version = m_version; +    size_t pos = 0; +    while ((pos = version.find("\"", pos)) != std::string::npos) { +         version.replace(pos, 1, "\\\""); +         pos++; +    } +      ss <<      "{ \"inputstat\" : {"          "\"min_fill\": " << min_fill_buffer << ", " @@ -557,7 +572,10 @@ std::string InputStat::encodeValuesJSON()          "\"peak_left_slow\": " << to_dB(peak_left) << ", "          "\"peak_right_slow\": " << to_dB(peak_right) << ", "          "\"num_underruns\": " << m_num_underruns << ", " -        "\"num_overruns\": " << m_num_overruns << ", "; +        "\"num_overruns\": " << m_num_overruns << ", " +        "\"version\": \"" << version << "\", " +        "\"uptime\": " << m_uptime_s << ", " +        ;      ss << "\"state\": "; diff --git a/src/ManagementServer.h b/src/ManagementServer.h index 18af48c..5b52957 100644 --- a/src/ManagementServer.h +++ b/src/ManagementServer.h @@ -100,6 +100,7 @@ class InputStat          void notifyPeakLevels(int peak_left, int peak_right);          void notifyUnderrun(void);          void notifyOverrun(void); +        void notifyVersion(const std::string& version, uint32_t uptime_s);          std::string encodeValuesJSON(void);          input_state_t determineState(void); @@ -131,6 +132,9 @@ class InputStat          size_t m_short_window_length = 0; +        std::string m_version; +        uint32_t m_uptime_s = 0; +          /************* STATE ***************/          /* Variables used for determining the input state */          int m_glitch_counter = 0; // saturating counter diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp index ad1fcd4..81466a8 100644 --- a/src/MuxElements.cpp +++ b/src/MuxElements.cpp @@ -784,6 +784,17 @@ unsigned short DabSubchannel::getSizeDWord() const      return (bitrate * 3) >> 3;  } +size_t DabSubchannel::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    switch (input->getBufferManagement()) { +        case Inputs::BufferManagement::Prebuffering: +            return input->readFrame(buffer, size); +        case Inputs::BufferManagement::Timestamped: +            return input->readFrame(buffer, size, seconds, utco, tsta); +    } +    throw logic_error("Unhandled case"); +} +  LinkageSet::LinkageSet(const std::string& name,          uint16_t lsn,          bool active, diff --git a/src/MuxElements.h b/src/MuxElements.h index ec79fdd..0f7e621 100644 --- a/src/MuxElements.h +++ b/src/MuxElements.h @@ -356,6 +356,9 @@ public:      // Calculate subchannel size in number of uint64_t      unsigned short getSizeDWord(void) const; +    // Read from the input, using the correct buffer management +    size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); +      std::string uid;      std::string inputUri; diff --git a/src/ReedSolomon.cpp b/src/ReedSolomon.cpp deleted file mode 100644 index 38d8ea8..0000000 --- a/src/ReedSolomon.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right -   of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "ReedSolomon.h" -#include <vector> -#include <algorithm> -#include <stdexcept> -#include <sstream> -#include <stdio.h>          // For galois.h ... -#include <string.h>         // For memcpy - -extern "C" { -#include "fec/fec.h" -} -#include <assert.h> - -#define SYMSIZE     8 - - -ReedSolomon::ReedSolomon(int N, int K, bool reverse, int gfpoly, int firstRoot, int primElem) -{ -    setReverse(reverse); - -    m_N = N; -    m_K = K; - -    const int symsize = SYMSIZE; -    const int nroots = N - K; // For EDI PFT, this must be 48 -    const int pad = ((1 << symsize) - 1) - N; // is 255-N - -    rsData = init_rs_char(symsize, gfpoly, firstRoot, primElem, nroots, pad); - -    if (rsData == nullptr) { -        std::stringstream ss; -        ss << "Invalid Reed-Solomon parameters! " << -            "N=" << N << " ; K=" << K << " ; pad=" << pad; -        throw std::invalid_argument(ss.str()); -    } -} - - -ReedSolomon::~ReedSolomon() -{ -    free_rs_char(rsData); -} - - -void ReedSolomon::setReverse(bool state) -{ -    reverse = state; -} - - -int ReedSolomon::encode(void* data, void* fec, size_t size) -{ -    uint8_t* input = reinterpret_cast<uint8_t*>(data); -    uint8_t* output = reinterpret_cast<uint8_t*>(fec); -    int ret = 0; - -    if (reverse) { -        std::vector<uint8_t> buffer(m_N); - -        memcpy(&buffer[0], input, m_K); -        memcpy(&buffer[m_K], output, m_N - m_K); - -        ret = decode_rs_char(rsData, &buffer[0], nullptr, 0); -        if ((ret != 0) && (ret != -1)) { -            memcpy(input, &buffer[0], m_K); -            memcpy(output, &buffer[m_K], m_N - m_K); -        } -    } -    else { -        encode_rs_char(rsData, input, output); -    } - -    return ret; -} - - -int ReedSolomon::encode(void* data, size_t size) -{ -    uint8_t* input = reinterpret_cast<uint8_t*>(data); -    int ret = 0; - -    if (reverse) { -        ret = decode_rs_char(rsData, input, nullptr, 0); -    } -    else { -        encode_rs_char(rsData, input, &input[m_K]); -    } - -    return ret; -} diff --git a/src/ReedSolomon.h b/src/ReedSolomon.h deleted file mode 100644 index abcef62..0000000 --- a/src/ReedSolomon.h +++ /dev/null @@ -1,56 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in Right -   of Canada (Communications Research Center Canada) - -   Copyright (C) 2016 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#ifdef HAVE_CONFIG_H -# include <config.h> -#endif - -#include <stdlib.h> - -class ReedSolomon -{ -public: -    ReedSolomon(int N, int K, -            bool reverse = false, -            int gfpoly = 0x11d, int firstRoot = 0, int primElem = 1); -    ReedSolomon(const ReedSolomon& other) = delete; -    ReedSolomon operator=(const ReedSolomon& other) = delete; -    ~ReedSolomon(); - -    void setReverse(bool state); -    int encode(void* data, void* fec, size_t size); -    int encode(void* data, size_t size); - -private: -    int m_N; -    int m_K; - -    void* rsData; -    bool reverse; -}; - diff --git a/src/RemoteControl.cpp b/src/RemoteControl.cpp deleted file mode 100644 index b32c21a..0000000 --- a/src/RemoteControl.cpp +++ /dev/null @@ -1,595 +0,0 @@ -/* -   Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 -   Her Majesty the Queen in Right of Canada (Communications Research -   Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. - */ -#include <list> -#include <string> -#include <iostream> -#include <string> -#include <boost/asio.hpp> -#include <boost/thread.hpp> - -#include "RemoteControl.h" - -using boost::asio::ip::tcp; -using namespace std; - -RemoteControllers rcs; - -RemoteControllerTelnet::~RemoteControllerTelnet() -{ -    m_active = false; -    m_io_service.stop(); - -    if (m_restarter_thread.joinable()) { -        m_restarter_thread.join(); -    } - -    if (m_child_thread.joinable()) { -        m_child_thread.join(); -    } -} - -void RemoteControllerTelnet::restart() -{ -    if (m_restarter_thread.joinable()) { -        m_restarter_thread.join(); -    } - -    m_restarter_thread = std::thread( -            &RemoteControllerTelnet::restart_thread, -            this, 0); -} - -RemoteControllable::~RemoteControllable() { -    rcs.remove_controllable(this); -} - -std::list<std::string> RemoteControllable::get_supported_parameters() const { -    std::list<std::string> parameterlist; -    for (const auto& param : m_parameters) { -        parameterlist.push_back(param[0]); -    } -    return parameterlist; -} - -void RemoteControllers::add_controller(std::shared_ptr<BaseRemoteController> rc) { -    m_controllers.push_back(rc); -} - -void RemoteControllers::enrol(RemoteControllable *rc) { -    controllables.push_back(rc); -} - -void RemoteControllers::remove_controllable(RemoteControllable *rc) { -    controllables.remove(rc); -} - -std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(const std::string& name) { -    RemoteControllable* controllable = get_controllable_(name); - -    std::list< std::vector<std::string> > allparams; -    for (auto ¶m : controllable->get_supported_parameters()) { -        std::vector<std::string> item; -        item.push_back(param); -        try { -            item.push_back(controllable->get_parameter(param)); -        } -        catch (const ParameterError &e) { -            item.push_back(std::string("error: ") + e.what()); -        } - -        allparams.push_back(item); -    } -    return allparams; -} - -std::string RemoteControllers::get_param(const std::string& name, const std::string& param) { -    RemoteControllable* controllable = get_controllable_(name); -    return controllable->get_parameter(param); -} - -void RemoteControllers::check_faults() { -    for (auto &controller : m_controllers) { -        if (controller->fault_detected()) { -            etiLog.level(warn) << -                "Detected Remote Control fault, restarting it"; -            controller->restart(); -        } -    } -} - -RemoteControllable* RemoteControllers::get_controllable_(const std::string& name) -{ -    auto rc = std::find_if(controllables.begin(), controllables.end(), -            [&](RemoteControllable* r) { return r->get_rc_name() == name; }); - -    if (rc == controllables.end()) { -        throw ParameterError("Module name unknown"); -    } -    else { -        return *rc; -    } -} - -void RemoteControllers::set_param( -        const std::string& name, -        const std::string& param, -        const std::string& value) -{ -    etiLog.level(info) << "RC: Setting " << name << " " << param -        << " to " << value; -    RemoteControllable* controllable = get_controllable_(name); -    try { -        return controllable->set_parameter(param, value); -    } -    catch (const ios_base::failure& e) { -        etiLog.level(info) << "RC: Failed to set " << name << " " << param -        << " to " << value << ": " << e.what(); -        throw ParameterError("Cannot understand value"); -    } -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void RemoteControllerTelnet::restart_thread(long) -{ -    m_active = false; -    m_io_service.stop(); - -    if (m_child_thread.joinable()) { -        m_child_thread.join(); -    } - -    m_child_thread = std::thread(&RemoteControllerTelnet::process, this, 0); -} - -void RemoteControllerTelnet::handle_accept( -        const boost::system::error_code& boost_error, -        boost::shared_ptr< boost::asio::ip::tcp::socket > socket, -        boost::asio::ip::tcp::acceptor& acceptor) -{ - -    const std::string welcome = "ODR-DabMux Remote Control CLI\n" -                                "Write 'help' for help.\n" -                                "**********\n"; -    const std::string prompt = "> "; - -    std::string in_message; -    size_t length; - -    if (boost_error) { -        etiLog.level(error) << "RC: Error accepting connection"; -        return; -    } - -    try { -        etiLog.level(info) << "RC: Accepted"; - -        boost::system::error_code ignored_error; - -        boost::asio::write(*socket, boost::asio::buffer(welcome), -                boost::asio::transfer_all(), -                ignored_error); - -        while (m_active && in_message != "quit") { -            boost::asio::write(*socket, boost::asio::buffer(prompt), -                    boost::asio::transfer_all(), -                    ignored_error); - -            in_message = ""; - -            boost::asio::streambuf buffer; -            length = boost::asio::read_until(*socket, buffer, "\n", ignored_error); - -            std::istream str(&buffer); -            std::getline(str, in_message); - -            if (length == 0) { -                etiLog.level(info) << "RC: Connection terminated"; -                break; -            } - -            while (in_message.length() > 0 && -                    (in_message[in_message.length()-1] == '\r' || -                     in_message[in_message.length()-1] == '\n')) { -                in_message.erase(in_message.length()-1, 1); -            } - -            if (in_message.length() == 0) { -                continue; -            } - -            etiLog.level(info) << "RC: Got message '" << in_message << "'"; - -            dispatch_command(*socket, in_message); -        } -        etiLog.level(info) << "RC: Closing socket"; -        socket->close(); -    } -    catch (const std::exception& e) -    { -        etiLog.level(error) << "Remote control caught exception: " << e.what(); -    } -} - -void RemoteControllerTelnet::process(long) -{ -    m_active = true; - -    while (m_active) { -        m_io_service.reset(); - -        tcp::acceptor acceptor(m_io_service, tcp::endpoint( -                    boost::asio::ip::address::from_string("127.0.0.1"), m_port) ); - - -        // Add a job to start accepting connections. -        boost::shared_ptr<tcp::socket> socket( -                new tcp::socket(acceptor.get_io_service())); - -        // Add an accept call to the service.  This will prevent io_service::run() -        // from returning. -        etiLog.level(info) << "RC: Waiting for connection on port " << m_port; -        acceptor.async_accept(*socket, -                boost::bind(&RemoteControllerTelnet::handle_accept, -                    this, -                    boost::asio::placeholders::error, -                    socket, -                    boost::ref(acceptor))); - -        // Process event loop. -        m_io_service.run(); -    } - -    etiLog.level(info) << "RC: Leaving"; -    m_fault = true; -} - -static std::vector<std::string> tokenise(const std::string& message) { -    stringstream ss(message); -    std::vector<std::string> all_tokens; -    std::string item; - -    while (std::getline(ss, item, ' ')) { -        all_tokens.push_back(move(item)); -    } -    return all_tokens; -} - - -void RemoteControllerTelnet::dispatch_command(tcp::socket& socket, string command) -{ -    vector<string> cmd = tokenise(command); - -    if (cmd[0] == "help") { -        reply(socket, -                "The following commands are supported:\n" -                "  list\n" -                "    * Lists the modules that are loaded and their parameters\n" -                "  show MODULE\n" -                "    * Lists all parameters and their values from module MODULE\n" -                "  get MODULE PARAMETER\n" -                "    * Gets the value for the specified PARAMETER from module MODULE\n" -                "  set MODULE PARAMETER VALUE\n" -                "    * Sets the value for the PARAMETER ofr module MODULE\n" -                "  quit\n" -                "    * Terminate this session\n" -                "\n"); -    } -    else if (cmd[0] == "list") { -        stringstream ss; - -        if (cmd.size() == 1) { -            for (auto &controllable : rcs.controllables) { -                ss << controllable->get_rc_name() << endl; - -                list< vector<string> > params = controllable->get_parameter_descriptions(); -                for (auto ¶m : params) { -                    ss << "\t" << param[0] << " : " << param[1] << endl; -                } -            } -        } -        else { -            reply(socket, "Too many arguments for command 'list'"); -        } - -        reply(socket, ss.str()); -    } -    else if (cmd[0] == "show") { -        if (cmd.size() == 2) { -            try { -                stringstream ss; -                list< vector<string> > r = rcs.get_param_list_values(cmd[1]); -                for (auto ¶m_val : r) { -                    ss << param_val[0] << ": " << param_val[1] << endl; -                } -                reply(socket, ss.str()); - -            } -            catch (const ParameterError &e) { -                reply(socket, e.what()); -            } -        } -        else { -            reply(socket, "Incorrect parameters for command 'show'"); -        } -    } -    else if (cmd[0] == "get") { -        if (cmd.size() == 3) { -            try { -                string r = rcs.get_param(cmd[1], cmd[2]); -                reply(socket, r); -            } -            catch (const ParameterError &e) { -                reply(socket, e.what()); -            } -        } -        else { -            reply(socket, "Incorrect parameters for command 'get'"); -        } -    } -    else if (cmd[0] == "set") { -        if (cmd.size() >= 4) { -            try { -                stringstream new_param_value; -                for (size_t i = 3; i < cmd.size(); i++) { -                    new_param_value << cmd[i]; - -                    if (i+1 < cmd.size()) { -                        new_param_value << " "; -                    } -                } - -                rcs.set_param(cmd[1], cmd[2], new_param_value.str()); -                reply(socket, "ok"); -            } -            catch (const ParameterError &e) { -                reply(socket, e.what()); -            } -            catch (const exception &e) { -                reply(socket, "Error: Invalid parameter value. "); -            } -        } -        else { -            reply(socket, "Incorrect parameters for command 'set'"); -        } -    } -    else if (cmd[0] == "quit") { -        reply(socket, "Goodbye"); -    } -    else { -        reply(socket, "Message not understood"); -    } -} - -void RemoteControllerTelnet::reply(tcp::socket& socket, string message) -{ -    boost::system::error_code ignored_error; -    stringstream ss; -    ss << message << "\r\n"; -    boost::asio::write(socket, boost::asio::buffer(ss.str()), -            boost::asio::transfer_all(), -            ignored_error); -} - - -#if defined(HAVE_RC_ZEROMQ) - -RemoteControllerZmq::~RemoteControllerZmq() { -    m_active = false; -    m_fault = false; - -    if (m_restarter_thread.joinable()) { -        m_restarter_thread.join(); -    } - -    if (m_child_thread.joinable()) { -        m_child_thread.join(); -    } -} - -void RemoteControllerZmq::restart() -{ -    if (m_restarter_thread.joinable()) { -        m_restarter_thread.join(); -    } - -    m_restarter_thread = std::thread(&RemoteControllerZmq::restart_thread, this); -} - -// This runs in a separate thread, because -// it would take too long to be done in the main loop -// thread. -void RemoteControllerZmq::restart_thread() -{ -    m_active = false; - -    if (m_child_thread.joinable()) { -        m_child_thread.join(); -    } - -    m_child_thread = std::thread(&RemoteControllerZmq::process, this); -} - -void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::string> &message) -{ -    bool more = true; -    do { -        zmq::message_t msg; -        pSocket.recv(&msg); -        std::string incoming((char*)msg.data(), msg.size()); -        message.push_back(incoming); -        more = msg.more(); -    } while (more); -} - -void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) -{ -    zmq::message_t msg(2); -    char repCode[2] = {'o', 'k'}; -    memcpy ((void*) msg.data(), repCode, 2); -    pSocket.send(msg, 0); -} - -void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) -{ -    zmq::message_t msg1(4); -    char repCode[4] = {'f', 'a', 'i', 'l'}; -    memcpy ((void*) msg1.data(), repCode, 4); -    pSocket.send(msg1, ZMQ_SNDMORE); - -    zmq::message_t msg2(error.length()); -    memcpy ((void*) msg2.data(), error.c_str(), error.length()); -    pSocket.send(msg2, 0); -} - -void RemoteControllerZmq::process() -{ -    m_fault = false; - -    // create zmq reply socket for receiving ctrl parameters -    try { -        zmq::socket_t repSocket(m_zmqContext, ZMQ_REP); - -        // connect the socket -        int hwm = 100; -        int linger = 0; -        repSocket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); -        repSocket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); -        repSocket.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); -        repSocket.bind(m_endpoint.c_str()); - -        // create pollitem that polls the  ZMQ sockets -        zmq::pollitem_t pollItems[] = { {repSocket, 0, ZMQ_POLLIN, 0} }; -        while (m_active) { -            zmq::poll(pollItems, 1, 100); -            std::vector<std::string> msg; - -            if (pollItems[0].revents & ZMQ_POLLIN) { -                recv_all(repSocket, msg); - -                std::string command((char*)msg[0].data(), msg[0].size()); - -                if (msg.size() == 1 && command == "ping") { -                    send_ok_reply(repSocket); -                } -                else if (msg.size() == 1 && command == "list") { -                    size_t cohort_size = rcs.controllables.size(); -                    for (auto &controllable : rcs.controllables) { -                        std::stringstream ss; -                        ss << "{ \"name\": \"" << controllable->get_rc_name() << "\"," << -                            " \"params\": { "; - -                        list< vector<string> > params = controllable->get_parameter_descriptions(); -                        size_t i = 0; -                        for (auto ¶m : params) { -                            if (i > 0) { -                                ss << ", "; -                            } - -                            ss << "\"" << param[0] << "\": " << -                                "\"" << param[1] << "\""; - -                            i++; -                        } - -                        ss << " } }"; - -                        std::string msg_s = ss.str(); - -                        zmq::message_t zmsg(ss.str().size()); -                        memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); - -                        int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; -                        repSocket.send(zmsg, flag); -                    } -                } -                else if (msg.size() == 2 && command == "show") { -                    std::string module((char*) msg[1].data(), msg[1].size()); -                    try { -                        list< vector<string> > r = rcs.get_param_list_values(module); -                        size_t r_size = r.size(); -                        for (auto ¶m_val : r) { -                            std::stringstream ss; -                            ss << param_val[0] << ": " << param_val[1] << endl; -                            zmq::message_t zmsg(ss.str().size()); -                            memcpy(zmsg.data(), ss.str().data(), ss.str().size()); - -                            int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; -                            repSocket.send(zmsg, flag); -                        } -                    } -                    catch (const ParameterError &err) { -                        send_fail_reply(repSocket, err.what()); -                    } -                } -                else if (msg.size() == 3 && command == "get") { -                    std::string module((char*) msg[1].data(), msg[1].size()); -                    std::string parameter((char*) msg[2].data(), msg[2].size()); - -                    try { -                        std::string value = rcs.get_param(module, parameter); -                        zmq::message_t zmsg(value.size()); -                        memcpy ((void*) zmsg.data(), value.data(), value.size()); -                        repSocket.send(zmsg, 0); -                    } -                    catch (const ParameterError &err) { -                        send_fail_reply(repSocket, err.what()); -                    } -                } -                else if (msg.size() == 4 && command == "set") { -                    std::string module((char*) msg[1].data(), msg[1].size()); -                    std::string parameter((char*) msg[2].data(), msg[2].size()); -                    std::string value((char*) msg[3].data(), msg[3].size()); - -                    try { -                        rcs.set_param(module, parameter, value); -                        send_ok_reply(repSocket); -                    } -                    catch (const ParameterError &err) { -                        send_fail_reply(repSocket, err.what()); -                    } -                } -                else { -                    send_fail_reply(repSocket, -                            "Unsupported command. commands: list, show, get, set"); -                } -            } -        } -        repSocket.close(); -    } -    catch (const zmq::error_t &e) { -        etiLog.level(error) << "ZMQ RC error: " << std::string(e.what()); -    } -    catch (const std::exception& e) { -        etiLog.level(error) << "ZMQ RC caught exception: " << e.what(); -        m_fault = true; -    } -} - -#endif - diff --git a/src/RemoteControl.h b/src/RemoteControl.h deleted file mode 100644 index 0726b28..0000000 --- a/src/RemoteControl.h +++ /dev/null @@ -1,263 +0,0 @@ -/* -   Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012 -   Her Majesty the Queen in Right of Canada (Communications Research -   Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   This module adds remote-control capability to some of the dabmux modules. - */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#  include "config.h" -#endif - -#if defined(HAVE_RC_ZEROMQ) -#  include "zmq.hpp" -#endif - -#include <list> -#include <map> -#include <memory> -#include <string> -#include <atomic> -#include <iostream> -#include <boost/bind.hpp> -#include <boost/asio.hpp> -#include <boost/foreach.hpp> -#include <boost/tokenizer.hpp> -#include <thread> -#include <stdexcept> - -#include "Log.h" - -#define RC_ADD_PARAMETER(p, desc) {   \ -  std::vector<std::string> p; \ -  p.push_back(#p); \ -  p.push_back(desc); \ -  m_parameters.push_back(p); \ -} - -class ParameterError : public std::exception -{ -    public: -        ParameterError(std::string message) : m_message(message) {} -        ~ParameterError() throw() {} -        const char* what() const throw() { return m_message.c_str(); } - -    private: -        std::string m_message; -}; - -class RemoteControllable; - -/* Remote controllers (that recieve orders from the user) - * must implement BaseRemoteController - */ -class BaseRemoteController { -    public: -        /* When this returns one, the remote controller cannot be -         * used anymore, and must be restarted by dabmux -         */ -        virtual bool fault_detected() = 0; - -        /* In case of a fault, the remote controller can be -         * restarted. -         */ -        virtual void restart() = 0; - -        virtual ~BaseRemoteController() {} -}; - -/* Objects that support remote control must implement the following class */ -class RemoteControllable { -    public: -        RemoteControllable(const std::string& name) : -            m_rc_name(name) {} - -        RemoteControllable(const RemoteControllable& other) = delete; -        RemoteControllable& operator=(const RemoteControllable& other) = delete; - -        virtual ~RemoteControllable(); - -        /* return a short name used to identify the controllable. -         * It might be used in the commands the user has to type, so keep -         * it short -         */ -        virtual std::string get_rc_name() const { return m_rc_name; } - -        /* Return a list of possible parameters that can be set */ -        virtual std::list<std::string> get_supported_parameters() const; - -        /* Return a mapping of the descriptions of all parameters */ -        virtual std::list< std::vector<std::string> > -            get_parameter_descriptions() const -            { -                return m_parameters; -            } - -        /* Base function to set parameters. */ -        virtual void set_parameter( -                const std::string& parameter, -                const std::string& value) = 0; - -        /* Getting a parameter always returns a string. */ -        virtual const std::string get_parameter(const std::string& parameter) const = 0; - -    protected: -        std::string m_rc_name; -        std::list< std::vector<std::string> > m_parameters; -}; - -/* Holds all our remote controllers and controlled object. - */ -class RemoteControllers { -    public: -        void add_controller(std::shared_ptr<BaseRemoteController> rc); -        void enrol(RemoteControllable *rc); -        void remove_controllable(RemoteControllable *rc); -        void check_faults(); -        std::list< std::vector<std::string> > get_param_list_values(const std::string& name); -        std::string get_param(const std::string& name, const std::string& param); - -        void set_param( -                const std::string& name, -                const std::string& param, -                const std::string& value); - -        std::list<RemoteControllable*> controllables; - -    private: -        RemoteControllable* get_controllable_(const std::string& name); - -        std::list<std::shared_ptr<BaseRemoteController> > m_controllers; -}; - -extern RemoteControllers rcs; - -/* Implements a Remote controller based on a simple telnet CLI - * that listens on localhost - */ -class RemoteControllerTelnet : public BaseRemoteController { -    public: -        RemoteControllerTelnet() -            : m_active(false), -            m_io_service(), -            m_fault(false), -            m_port(0) { } - -        RemoteControllerTelnet(int port) -            : m_active(port > 0), -            m_io_service(), -            m_fault(false), -            m_port(port) -        { -            restart(); -        } - - -        RemoteControllerTelnet& operator=(const RemoteControllerTelnet& other) = delete; -        RemoteControllerTelnet(const RemoteControllerTelnet& other) = delete; - -        ~RemoteControllerTelnet(); - -        virtual bool fault_detected() { return m_fault; } - -        virtual void restart(); - -    private: -        void restart_thread(long); - -        void process(long); - -        void dispatch_command(boost::asio::ip::tcp::socket& socket, -                std::string command); - -        void reply(boost::asio::ip::tcp::socket& socket, std::string message); - -        void handle_accept( -                const boost::system::error_code& boost_error, -                boost::shared_ptr< boost::asio::ip::tcp::socket > socket, -                boost::asio::ip::tcp::acceptor& acceptor); - -        std::atomic<bool> m_active; - -        boost::asio::io_service m_io_service; - -        /* This is set to true if a fault occurred */ -        std::atomic<bool> m_fault; -        std::thread m_restarter_thread; - -        std::thread m_child_thread; - -        int m_port; -}; - -#if defined(HAVE_RC_ZEROMQ) -/* Implements a Remote controller using zmq transportlayer - * that listens on localhost - */ -class RemoteControllerZmq : public BaseRemoteController { -    public: -        RemoteControllerZmq() -            : m_active(false), m_fault(false), -            m_zmqContext(1), -            m_endpoint("") { } - -        RemoteControllerZmq(const std::string& endpoint) -            : m_active(not endpoint.empty()), m_fault(false), -            m_zmqContext(1), -            m_endpoint(endpoint), -            m_child_thread(&RemoteControllerZmq::process, this) { } - -        RemoteControllerZmq& operator=(const RemoteControllerZmq& other) = delete; -        RemoteControllerZmq(const RemoteControllerZmq& other) = delete; - -        ~RemoteControllerZmq(); - -        virtual bool fault_detected() { return m_fault; } - -        virtual void restart(); - -    private: -        void restart_thread(); - -        void recv_all(zmq::socket_t &pSocket, std::vector<std::string> &message); -        void send_ok_reply(zmq::socket_t &pSocket); -        void send_fail_reply(zmq::socket_t &pSocket, const std::string &error); -        void process(); - -        std::atomic<bool> m_active; - -        /* This is set to true if a fault occurred */ -        std::atomic<bool> m_fault; -        std::thread m_restarter_thread; - -        zmq::context_t m_zmqContext; - -        std::string m_endpoint; -        std::thread m_child_thread; -}; -#endif - diff --git a/src/TcpSocket.cpp b/src/TcpSocket.cpp deleted file mode 100644 index 3ebe73c..0000000 --- a/src/TcpSocket.cpp +++ /dev/null @@ -1,359 +0,0 @@ -/* -   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "TcpSocket.h" -#include "Log.h" -#include <iostream> -#include <cstdio> -#include <cstring> -#include <cstdint> -#include <signal.h> -#include <errno.h> -#include <poll.h> -#include <thread> - -using namespace std; - -using vec_u8 = std::vector<uint8_t>; - - -TcpSocket::TcpSocket() : -    m_sock(INVALID_SOCKET) -{ -} - -TcpSocket::TcpSocket(int port, const string& name) : -    m_sock(INVALID_SOCKET) -{ -    if (port) { -        if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { -            throw std::runtime_error("Can't create socket"); -        } - -        reuseopt_t reuse = 1; -        if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) -                == SOCKET_ERROR) { -            throw std::runtime_error("Can't reuse address"); -        } - -#if defined(HAVE_SO_NOSIGPIPE) -        int val = 1; -        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) -                == SOCKET_ERROR) { -            throw std::runtime_error("Can't set SO_NOSIGPIPE"); -        } -#endif - -        m_own_address.setAddress(name); -        m_own_address.setPort(port); - -        if (::bind(m_sock, m_own_address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { -            ::close(m_sock); -            m_sock = INVALID_SOCKET; -            throw std::runtime_error("Can't bind socket"); -        } -    } -} - -TcpSocket::TcpSocket(SOCKET sock, InetAddress own, InetAddress remote) : -    m_own_address(own), -    m_remote_address(remote), -    m_sock(sock) { } - -// The move constructors must ensure the moved-from -// TcpSocket won't destroy our socket handle -TcpSocket::TcpSocket(TcpSocket&& other) -{ -    m_sock = other.m_sock; -    other.m_sock = INVALID_SOCKET; - -    m_own_address = other.m_own_address; -    m_remote_address = other.m_remote_address; -} - -TcpSocket& TcpSocket::operator=(TcpSocket&& other) -{ -    m_sock = other.m_sock; -    other.m_sock = INVALID_SOCKET; - -    m_own_address = other.m_own_address; -    m_remote_address = other.m_remote_address; -    return *this; -} - -/** - *  Close the underlying socket. - *  @return 0  if ok - *          -1 if error - */ -int TcpSocket::close() -{ -    if (m_sock != INVALID_SOCKET) { -        int res = ::close(m_sock); -        if (res != 0) { -            setInetError("Can't close socket"); -            return -1; -        } -        m_sock = INVALID_SOCKET; -    } -    return 0; -} - -TcpSocket::~TcpSocket() -{ -    close(); -} - -bool TcpSocket::isValid() -{ -    return m_sock != INVALID_SOCKET; -} - -ssize_t TcpSocket::recv(void* data, size_t size) -{ -    ssize_t ret = ::recv(m_sock, (char*)data, size, 0); -    if (ret == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket recv error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    return ret; -} - - -ssize_t TcpSocket::send(const void* data, size_t size, int timeout_ms) -{ -    if (timeout_ms) { -        struct pollfd fds[1]; -        fds[0].fd = m_sock; -        fds[0].events = POLLOUT; - -        const int retval = poll(fds, 1, timeout_ms); - -        if (retval == -1) { -            stringstream ss; -            ss << "TCP Socket send error on poll(): " << strerror(errno); -            throw std::runtime_error(ss.str()); -        } -        else if (retval == 0) { -            // Timed out -            return 0; -        } -    } - -    /* On Linux, the MSG_NOSIGNAL flag ensures that the process would not -     * receive a SIGPIPE and die. -     * Other systems have SO_NOSIGPIPE set on the socket for the same effect. */ -#if defined(HAVE_MSG_NOSIGNAL) -    const int flags = MSG_NOSIGNAL; -#else -    const int flags = 0; -#endif -    const ssize_t ret = ::send(m_sock, (const char*)data, size, flags); - -    if (ret == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket send error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    return ret; -} - -void TcpSocket::listen() -{ -    if (::listen(m_sock, 1) == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket listen error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -} - -TcpSocket TcpSocket::accept() -{ -    InetAddress remote_addr; -    socklen_t addrLen = sizeof(sockaddr_in); - -    SOCKET socket = ::accept(m_sock, remote_addr.getAddress(), &addrLen); -    if (socket == SOCKET_ERROR) { -        stringstream ss; -        ss << "TCP Socket accept error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    else { -        TcpSocket client(socket, m_own_address, remote_addr); -        return client; -    } -} - -TcpSocket TcpSocket::accept(int timeout_ms) -{ -    struct pollfd fds[1]; -    fds[0].fd = m_sock; -    fds[0].events = POLLIN | POLLOUT; - -    int retval = poll(fds, 1, timeout_ms); - -    if (retval == -1) { -        stringstream ss; -        ss << "TCP Socket accept error: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } -    else if (retval) { -        return accept(); -    } -    else { -        TcpSocket invalidsock(0, ""); -        return invalidsock; -    } -} - - -InetAddress TcpSocket::getOwnAddress() const -{ -    return m_own_address; -} - -InetAddress TcpSocket::getRemoteAddress() const -{ -    return m_remote_address; -} - - -TCPConnection::TCPConnection(TcpSocket&& sock) : -            queue(), -            m_running(true), -            m_sender_thread(), -            m_sock(move(sock)) -{ -    auto own_addr = m_sock.getOwnAddress(); -    auto addr = m_sock.getRemoteAddress(); -    etiLog.level(debug) << "New TCP Connection on port " << -        own_addr.getPort() << " from " << -        addr.getHostAddress() << ":" << addr.getPort(); -    m_sender_thread = std::thread(&TCPConnection::process, this); -} - -TCPConnection::~TCPConnection() -{ -    m_running = false; -    vec_u8 termination_marker; -    queue.push(termination_marker); -    m_sender_thread.join(); -} - -void TCPConnection::process() -{ -    while (m_running) { -        vec_u8 data; -        queue.wait_and_pop(data); - -        if (data.empty()) { -            // empty vector is the termination marker -            m_running = false; -            break; -        } - -        try { -            ssize_t remaining = data.size(); -            const uint8_t *buf = reinterpret_cast<const uint8_t*>(data.data()); -            const int timeout_ms = 10; // Less than one ETI frame - -            while (m_running and remaining > 0) { -                const ssize_t sent = m_sock.send(buf, remaining, timeout_ms); -                if (sent < 0 or sent > remaining) { -                    throw std::logic_error("Invalid TcpSocket::send() return value"); -                } -                remaining -= sent; -                buf += sent; -            } -        } -        catch (const std::runtime_error& e) { -            m_running = false; -        } -    } - - -    auto own_addr = m_sock.getOwnAddress(); -    auto addr = m_sock.getRemoteAddress(); -    etiLog.level(debug) << "Dropping TCP Connection on port " << -        own_addr.getPort() << " from " << -        addr.getHostAddress() << ":" << addr.getPort(); -} - - -TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) : -    m_max_queue_size(max_queue_size) -{ -} - -TCPDataDispatcher::~TCPDataDispatcher() -{ -    m_running = false; -    m_connections.clear(); -    m_listener_socket.close(); -    m_listener_thread.join(); -} - -void TCPDataDispatcher::start(int port, const string& address) -{ -    TcpSocket sock(port, address); -    m_listener_socket = move(sock); - -    m_running = true; -    m_listener_thread = std::thread(&TCPDataDispatcher::process, this); -} - -void TCPDataDispatcher::write(const vec_u8& data) -{ -    for (auto& connection : m_connections) { -        connection.queue.push(data); -    } - -    m_connections.remove_if( -            [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; }); -} - -void TCPDataDispatcher::process() -{ -    try { -        m_listener_socket.listen(); - -        const int timeout_ms = 1000; - -        while (m_running) { -            // Add a new TCPConnection to the list, constructing it from the client socket -            auto sock = m_listener_socket.accept(timeout_ms); -            if (sock.isValid()) { -                m_connections.emplace(m_connections.begin(), move(sock)); -            } -        } -    } -    catch (const std::runtime_error& e) { -        etiLog.level(error) << "TCPDataDispatcher caught runtime error: " << e.what(); -        m_running = false; -    } -} - diff --git a/src/TcpSocket.h b/src/TcpSocket.h deleted file mode 100644 index ec7afd3..0000000 --- a/src/TcpSocket.h +++ /dev/null @@ -1,164 +0,0 @@ -/* -   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _TCPSOCKET -#define _TCPSOCKET - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "InetAddress.h" -#include "ThreadsafeQueue.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET           int -#define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 -#define reuseopt_t       int - -#include <iostream> -#include <string> -#include <vector> -#include <memory> -#include <atomic> -#include <thread> -#include <list> - -/** - *  This class represents a TCP socket. - */ -class TcpSocket -{ -    public: -        /** Create a new socket that does nothing */ -        TcpSocket(); - -        /** Create a new socket listening for incoming connections. -         *  @param port The port number on which the socket will listen. -         *  @param name The IP address on which the socket will be bound. -         *              It is used to bind the socket on a specific interface if -         *              the computer have many NICs. -         */ -        TcpSocket(int port, const std::string& name); -        ~TcpSocket(); -        TcpSocket(TcpSocket&& other); -        TcpSocket& operator=(TcpSocket&& other); -        TcpSocket(const TcpSocket& other) = delete; -        TcpSocket& operator=(const TcpSocket& other) = delete; - -        bool isValid(void); - -        int close(void); - -        /** Send data over the TCP connection. -         *  @param data The buffer that will be sent. -         *  @param size Number of bytes to send. -         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout -         *  return number of bytes sent, 0 on timeout, or throws runtime_error. -         */ -        ssize_t send(const void* data, size_t size, int timeout_ms=0); - -        /** Receive data from the socket. -         *  @param data The buffer that will receive data. -         *  @param size The buffer size. -         *  @return number of bytes received or -1 (SOCKET_ERROR) if error -         */ -        ssize_t recv(void* data, size_t size); - -        void listen(void); -        TcpSocket accept(void); - -        /* Returns either valid socket if a connection was -         * accepted before the timeout expired, or an invalid -         * socket otherwise. -         */ -        TcpSocket accept(int timeout_ms); - -        /** Retrieve address this socket is bound to */ -        InetAddress getOwnAddress() const; -        InetAddress getRemoteAddress() const; - -    private: -        TcpSocket(SOCKET sock, InetAddress own, InetAddress remote); - -        /// The address on which the socket is bound. -        InetAddress m_own_address; -        InetAddress m_remote_address; -        /// The low-level socket used by system functions. -        SOCKET m_sock; -}; - -/* Helper class for TCPDataDispatcher, contains a queue of pending data and - * a sender thread. */ -class TCPConnection -{ -    public: -        TCPConnection(TcpSocket&& sock); -        TCPConnection(const TCPConnection&) = delete; -        TCPConnection& operator=(const TCPConnection&) = delete; -        ~TCPConnection(); - -        ThreadsafeQueue<std::vector<uint8_t> > queue; - -    private: -        std::atomic<bool> m_running; -        std::thread m_sender_thread; -        TcpSocket m_sock; - -        void process(void); -}; - -/* Send a TCP stream to several destinations, and automatically disconnect destinations - * whose buffer overflows. - */ -class TCPDataDispatcher -{ -    public: -        TCPDataDispatcher(size_t max_queue_size); -        ~TCPDataDispatcher(); -        TCPDataDispatcher(const TCPDataDispatcher&) = delete; -        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete; - -        void start(int port, const std::string& address); -        void write(const std::vector<uint8_t>& data); - -    private: -        void process(void); - -        size_t m_max_queue_size; - -        std::atomic<bool> m_running; -        std::thread m_listener_thread; -        TcpSocket m_listener_socket; -        std::list<TCPConnection> m_connections; -}; - -#endif // _TCPSOCKET diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h deleted file mode 100644 index ab287b2..0000000 --- a/src/ThreadsafeQueue.h +++ /dev/null @@ -1,178 +0,0 @@ -/* -   Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in -   Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2018 -   Matthias P. Braendli, matthias.braendli@mpb.li - -   An implementation for a threadsafe queue, depends on C++11 - -   When creating a ThreadsafeQueue, one can specify the minimal number -   of elements it must contain before it is possible to take one -   element out. - */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#include <mutex> -#include <condition_variable> -#include <queue> -#include <utility> - -/* This queue is meant to be used by two threads. One producer - * that pushes elements into the queue, and one consumer that - * retrieves the elements. - * - * The queue can make the consumer block until an element - * is available, or a wakeup requested. - */ - -/* Class thrown by blocking pop to tell the consumer - * that there's a wakeup requested. */ -class ThreadsafeQueueWakeup {}; - -template<typename T> -class ThreadsafeQueue -{ -public: -    /* Push one element into the queue, and notify another thread that -     * might be waiting. -     * -     * returns the new queue size. -     */ -    size_t push(T const& val) -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        the_queue.push(val); -        size_t queue_size = the_queue.size(); -        lock.unlock(); - -        the_rx_notification.notify_one(); - -        return queue_size; -    } - -    size_t push(T&& val) -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        the_queue.emplace(std::move(val)); -        size_t queue_size = the_queue.size(); -        lock.unlock(); - -        the_rx_notification.notify_one(); - -        return queue_size; -    } - -    /* Push one element into the queue, but wait until the -     * queue size goes below the threshold. -     * -     * Notify waiting thread. -     * -     * returns the new queue size. -     */ -    size_t push_wait_if_full(T const& val, size_t threshold) -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        while (the_queue.size() >= threshold) { -            the_tx_notification.wait(lock); -        } -        the_queue.push(val); -        size_t queue_size = the_queue.size(); -        lock.unlock(); - -        the_rx_notification.notify_one(); - -        return queue_size; -    } - -    /* Trigger a wakeup event on a blocking consumer, which -     * will receive a ThreadsafeQueueWakeup exception. -     */ -    void trigger_wakeup(void) -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        wakeup_requested = true; -        lock.unlock(); -        the_rx_notification.notify_one(); -    } - -    /* Send a notification for the receiver thread */ -    void notify(void) -    { -        the_rx_notification.notify_one(); -    } - -    bool empty() const -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        return the_queue.empty(); -    } - -    size_t size() const -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        return the_queue.size(); -    } - -    bool try_pop(T& popped_value) -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        if (the_queue.empty()) { -            return false; -        } - -        popped_value = the_queue.front(); -        the_queue.pop(); - -        lock.unlock(); -        the_tx_notification.notify_one(); - -        return true; -    } - -    void wait_and_pop(T& popped_value, size_t prebuffering = 1) -    { -        std::unique_lock<std::mutex> lock(the_mutex); -        while (the_queue.size() < prebuffering and -                not wakeup_requested) { -            the_rx_notification.wait(lock); -        } - -        if (wakeup_requested) { -            wakeup_requested = false; -            throw ThreadsafeQueueWakeup(); -        } -        else { -            std::swap(popped_value, the_queue.front()); -            the_queue.pop(); - -            lock.unlock(); -            the_tx_notification.notify_one(); -        } -    } - -private: -    std::queue<T> the_queue; -    mutable std::mutex the_mutex; -    std::condition_variable the_rx_notification; -    std::condition_variable the_tx_notification; -    bool wakeup_requested = false; -}; - diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp deleted file mode 100644 index 3d015ec..0000000 --- a/src/UdpSocket.cpp +++ /dev/null @@ -1,256 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "UdpSocket.h" - -#include <iostream> -#include <stdio.h> -#include <errno.h> -#include <fcntl.h> -#include <string.h> - -using namespace std; - -UdpSocket::UdpSocket() : -    listenSocket(INVALID_SOCKET) -{ -    reinit(0, ""); -} - -UdpSocket::UdpSocket(int port) : -    listenSocket(INVALID_SOCKET) -{ -    reinit(port, ""); -} - -UdpSocket::UdpSocket(int port, const std::string& name) : -    listenSocket(INVALID_SOCKET) -{ -    reinit(port, name); -} - - -int UdpSocket::setBlocking(bool block) -{ -    int res; -    if (block) -        res = fcntl(listenSocket, F_SETFL, 0); -    else -        res = fcntl(listenSocket, F_SETFL, O_NONBLOCK); -    if (res == SOCKET_ERROR) { -        setInetError("Can't change blocking state of socket"); -        return -1; -    } -    return 0; -} - -int UdpSocket::reinit(int port, const std::string& name) -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } - -    if ((listenSocket = socket(PF_INET, SOCK_DGRAM, 0)) == INVALID_SOCKET) { -        setInetError("Can't create socket"); -        return -1; -    } -    reuseopt_t reuse = 1; -    if (setsockopt(listenSocket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) -            == SOCKET_ERROR) { -        setInetError("Can't reuse address"); -        return -1; -    } - -    if (port) { -        address.setAddress(name); -        address.setPort(port); - -        if (::bind(listenSocket, address.getAddress(), sizeof(sockaddr_in)) == SOCKET_ERROR) { -            setInetError("Can't bind socket"); -            ::close(listenSocket); -            listenSocket = INVALID_SOCKET; -            return -1; -        } -    } -    return 0; -} - -int UdpSocket::close() -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } - -    listenSocket = INVALID_SOCKET; - -    return 0; -} - -UdpSocket::~UdpSocket() -{ -    if (listenSocket != INVALID_SOCKET) { -        ::close(listenSocket); -    } -} - - -int UdpSocket::receive(UdpPacket& packet) -{ -    socklen_t addrSize; -    addrSize = sizeof(*packet.getAddress().getAddress()); -    ssize_t ret = recvfrom(listenSocket, -            packet.getData(), -            packet.getSize(), -            0, -            packet.getAddress().getAddress(), -            &addrSize); - -    if (ret == SOCKET_ERROR) { -        packet.setSize(0); -        if (errno == EAGAIN) { -            return 0; -        } -        setInetError("Can't receive UDP packet"); -        return -1; -    } - -    packet.setSize(ret); -    return 0; -} - -int UdpSocket::send(UdpPacket& packet) -{ -    int ret = sendto(listenSocket, packet.getData(), packet.getSize(), 0, -            packet.getAddress().getAddress(), sizeof(*packet.getAddress().getAddress())); -    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { -        setInetError("Can't send UDP packet"); -        return -1; -    } -    return 0; -} - - -int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) -{ -    int ret = sendto(listenSocket, &data[0], data.size(), 0, -            destination.getAddress(), sizeof(*destination.getAddress())); -    if (ret == SOCKET_ERROR && errno != ECONNREFUSED) { -        setInetError("Can't send UDP packet"); -        return -1; -    } -    return 0; -} - - -/** - *  Must be called to receive data on a multicast address. - *  @param groupname The multica -st address to join. - *  @return 0 if ok, -1 if error - */ -int UdpSocket::joinGroup(char* groupname) -{ -    ip_mreqn group; -    if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) { -        setInetError(groupname); -        return -1; -    } -    if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) { -        setInetError("Not a multicast address"); -        return -1; -    } -    group.imr_address.s_addr = htons(INADDR_ANY);; -    group.imr_ifindex = 0; -    if (setsockopt(listenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group)) -            == SOCKET_ERROR) { -        setInetError("Can't join multicast group"); -    } -    return 0; -} - -int UdpSocket::setMulticastTTL(int ttl) -{ -    if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) -            == SOCKET_ERROR) { -        setInetError("Can't set ttl"); -        return -1; -    } - -    return 0; -} - -int UdpSocket::setMulticastSource(const char* source_addr) -{ -    struct in_addr addr; -    if (inet_aton(source_addr, &addr) == 0) { -        setInetError("Can't parse source address"); -        return -1; -    } - -    if (setsockopt(listenSocket, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr)) -            == SOCKET_ERROR) { -        setInetError("Can't set source address"); -        return -1; -    } - -    return 0; -} - -UdpPacket::UdpPacket() { } - -UdpPacket::UdpPacket(size_t initSize) : -    m_buffer(initSize) -{ } - - -void UdpPacket::setSize(size_t newSize) -{ -    m_buffer.resize(newSize); -} - - -uint8_t* UdpPacket::getData() -{ -    return &m_buffer[0]; -} - - -void UdpPacket::addData(const void *data, size_t size) -{ -    uint8_t *d = (uint8_t*)data; -    std::copy(d, d + size, std::back_inserter(m_buffer)); -} - -size_t UdpPacket::getSize() -{ -    return m_buffer.size(); -} - -InetAddress UdpPacket::getAddress() -{ -    return address; -} - diff --git a/src/UdpSocket.h b/src/UdpSocket.h deleted file mode 100644 index f51e87c..0000000 --- a/src/UdpSocket.h +++ /dev/null @@ -1,174 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#include "InetAddress.h" -#include <sys/socket.h> -#include <netinet/in.h> -#include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#include <pthread.h> -#define SOCKET           int -#define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 -#define reuseopt_t       int - -#include <stdlib.h> -#include <iostream> -#include <vector> - -class UdpPacket; - - -/** - *  This class represents a socket for sending and receiving UDP packets. - * - *  A UDP socket is the sending or receiving point for a packet delivery service. - *  Each packet sent or received on a datagram socket is individually - *  addressed and routed. Multiple packets sent from one machine to another may - *  be routed differently, and may arrive in any order. - */ -class UdpSocket -{ -    public: -        /** Create a new socket that will not be bound to any port. To be used -         * for data output. -         */ -        UdpSocket(); -        /** Create a new socket. -         *  @param port The port number on which the socket will be bound -         */ -        UdpSocket(int port); -        /** Create a new socket. -         *  @param port The port number on which the socket will be bound -         *  @param name The IP address on which the socket will be bound. -         *              It is used to bind the socket on a specific interface if -         *              the computer have many NICs. -         */ -        UdpSocket(int port, const std::string& name); -        ~UdpSocket(); -        UdpSocket(const UdpSocket& other) = delete; -        const UdpSocket& operator=(const UdpSocket& other) = delete; - -        /** reinitialise socket. Close the already open socket, and -         * create a new one -         */ -        int reinit(int port, const std::string& name); - -        /** Close the socket -         */ -        int close(void); - -        /** Send an UDP packet. -         *  @param packet The UDP packet to be sent. It includes the data and the -         *                destination address -         *  return 0 if ok, -1 if error -         */ -        int send(UdpPacket& packet); - -        /** Send an UDP packet -         * -         *  return 0 if ok, -1 if error -         */ -        int send(const std::vector<uint8_t>& data, InetAddress destination); - -        /** Receive an UDP packet. -         *  @param packet The packet that will receive the data. The address will be set -         *                to the source address. -         *  @return 0 if ok, -1 if error -         */ -        int receive(UdpPacket& packet); - -        int joinGroup(char* groupname); -        int setMulticastSource(const char* source_addr); -        int setMulticastTTL(int ttl); - -        /** Set blocking mode. By default, the socket is blocking. -         *  @return 0  if ok -         *          -1 if error -         */ -        int setBlocking(bool block); - -    protected: - -        /// The address on which the socket is bound. -        InetAddress address; -        /// The low-level socket used by system functions. -        SOCKET listenSocket; -}; - -/** This class represents a UDP packet. - * - *  A UDP packet contains a payload (sequence of bytes) and an address. For - *  outgoing packets, the address is the destination address. For incoming - *  packets, the address tells the user from what source the packet arrived from. - */ -class UdpPacket -{ -    public: -        /** Construct an empty UDP packet. -         */ -        UdpPacket(); -        UdpPacket(size_t initSize); - -        /** Give the pointer to data. -         *  @return The pointer -         */ -        uint8_t* getData(void); - -        /** Append some data at the end of data buffer and adjust size. -         *  @param data Pointer to the data to add -         *  @param size Size in bytes of new data -         */ -        void addData(const void *data, size_t size); - -        size_t getSize(void); - -        /** Changes size of the data buffer size. Keeps data intact unless -         *  truncated. -         */ -        void setSize(size_t newSize); - -        /** Returns the UDP address of the packet. -         */ -        InetAddress getAddress(void); - -        const std::vector<uint8_t>& getBuffer(void) const { -            return m_buffer; -        } - - -    private: -        std::vector<uint8_t> m_buffer; -        InetAddress address; -}; - diff --git a/src/crc.c b/src/crc.c deleted file mode 100644 index cc02473..0000000 --- a/src/crc.c +++ /dev/null @@ -1,266 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "crc.h" -#ifndef _WIN32 -#    include <unistd.h> -#    include <netinet/in.h> -#endif -#include <stdio.h> -#include <fcntl.h> - -//#define CCITT       0x1021 - -uint8_t crc8tab[256] = { -    0x00, 0x07, 0x0e, 0x09, 0x1c, 0x1b, 0x12, 0x15, -    0x38, 0x3f, 0x36, 0x31, 0x24, 0x23, 0x2a, 0x2d, -    0x70, 0x77, 0x7e, 0x79, 0x6c, 0x6b, 0x62, 0x65, -    0x48, 0x4f, 0x46, 0x41, 0x54, 0x53, 0x5a, 0x5d, -    0xe0, 0xe7, 0xee, 0xe9, 0xfc, 0xfb, 0xf2, 0xf5, -    0xd8, 0xdf, 0xd6, 0xd1, 0xc4, 0xc3, 0xca, 0xcd, -    0x90, 0x97, 0x9e, 0x99, 0x8c, 0x8b, 0x82, 0x85, -    0xa8, 0xaf, 0xa6, 0xa1, 0xb4, 0xb3, 0xba, 0xbd, -    0xc7, 0xc0, 0xc9, 0xce, 0xdb, 0xdc, 0xd5, 0xd2, -    0xff, 0xf8, 0xf1, 0xf6, 0xe3, 0xe4, 0xed, 0xea, -    0xb7, 0xb0, 0xb9, 0xbe, 0xab, 0xac, 0xa5, 0xa2, -    0x8f, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9d, 0x9a, -    0x27, 0x20, 0x29, 0x2e, 0x3b, 0x3c, 0x35, 0x32, -    0x1f, 0x18, 0x11, 0x16, 0x03, 0x04, 0x0d, 0x0a, -    0x57, 0x50, 0x59, 0x5e, 0x4b, 0x4c, 0x45, 0x42, -    0x6f, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7d, 0x7a, -    0x89, 0x8e, 0x87, 0x80, 0x95, 0x92, 0x9b, 0x9c, -    0xb1, 0xb6, 0xbf, 0xb8, 0xad, 0xaa, 0xa3, 0xa4, -    0xf9, 0xfe, 0xf7, 0xf0, 0xe5, 0xe2, 0xeb, 0xec, -    0xc1, 0xc6, 0xcf, 0xc8, 0xdd, 0xda, 0xd3, 0xd4, -    0x69, 0x6e, 0x67, 0x60, 0x75, 0x72, 0x7b, 0x7c, -    0x51, 0x56, 0x5f, 0x58, 0x4d, 0x4a, 0x43, 0x44, -    0x19, 0x1e, 0x17, 0x10, 0x05, 0x02, 0x0b, 0x0c, -    0x21, 0x26, 0x2f, 0x28, 0x3d, 0x3a, 0x33, 0x34, -    0x4e, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5c, 0x5b, -    0x76, 0x71, 0x78, 0x7f, 0x6a, 0x6d, 0x64, 0x63, -    0x3e, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2c, 0x2b, -    0x06, 0x01, 0x08, 0x0f, 0x1a, 0x1d, 0x14, 0x13, -    0xae, 0xa9, 0xa0, 0xa7, 0xb2, 0xb5, 0xbc, 0xbb, -    0x96, 0x91, 0x98, 0x9f, 0x8a, 0x8d, 0x84, 0x83, -    0xde, 0xd9, 0xd0, 0xd7, 0xc2, 0xc5, 0xcc, 0xcb, -    0xe6, 0xe1, 0xe8, 0xef, 0xfa, 0xfd, 0xf4, 0xf3 -}; - - -uint16_t crc16tab[256] = { -    0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7, -    0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef, -    0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6, -    0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de, -    0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485, -    0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d, -    0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4, -    0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc, -    0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823, -    0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b, -    0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12, -    0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a, -    0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41, -    0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49, -    0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70, -    0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78, -    0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f, -    0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067, -    0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e, -    0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256, -    0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d, -    0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, -    0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c, -    0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634, -    0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab, -    0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3, -    0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a, -    0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92, -    0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9, -    0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1, -    0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8, -    0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0 -}; - - -uint32_t crc32tab[256] = { -    0x00000000, 0x04c11db7, 0x09823b6e, 0x0d4326d9, -    0x130476dc, 0x17c56b6b, 0x1a864db2, 0x1e475005, -    0x2608edb8, 0x22c9f00f, 0x2f8ad6d6, 0x2b4bcb61, -    0x350c9b64, 0x31cd86d3, 0x3c8ea00a, 0x384fbdbd, -    0x4c11db70, 0x48d0c6c7, 0x4593e01e, 0x4152fda9, -    0x5f15adac, 0x5bd4b01b, 0x569796c2, 0x52568b75, -    0x6a1936c8, 0x6ed82b7f, 0x639b0da6, 0x675a1011, -    0x791d4014, 0x7ddc5da3, 0x709f7b7a, 0x745e66cd, -    0x9823b6e0, 0x9ce2ab57, 0x91a18d8e, 0x95609039, -    0x8b27c03c, 0x8fe6dd8b, 0x82a5fb52, 0x8664e6e5, -    0xbe2b5b58, 0xbaea46ef, 0xb7a96036, 0xb3687d81, -    0xad2f2d84, 0xa9ee3033, 0xa4ad16ea, 0xa06c0b5d, -    0xd4326d90, 0xd0f37027, 0xddb056fe, 0xd9714b49, -    0xc7361b4c, 0xc3f706fb, 0xceb42022, 0xca753d95, -    0xf23a8028, 0xf6fb9d9f, 0xfbb8bb46, 0xff79a6f1, -    0xe13ef6f4, 0xe5ffeb43, 0xe8bccd9a, 0xec7dd02d, -    0x34867077, 0x30476dc0, 0x3d044b19, 0x39c556ae, -    0x278206ab, 0x23431b1c, 0x2e003dc5, 0x2ac12072, -    0x128e9dcf, 0x164f8078, 0x1b0ca6a1, 0x1fcdbb16, -    0x018aeb13, 0x054bf6a4, 0x0808d07d, 0x0cc9cdca, -    0x7897ab07, 0x7c56b6b0, 0x71159069, 0x75d48dde, -    0x6b93dddb, 0x6f52c06c, 0x6211e6b5, 0x66d0fb02, -    0x5e9f46bf, 0x5a5e5b08, 0x571d7dd1, 0x53dc6066, -    0x4d9b3063, 0x495a2dd4, 0x44190b0d, 0x40d816ba, -    0xaca5c697, 0xa864db20, 0xa527fdf9, 0xa1e6e04e, -    0xbfa1b04b, 0xbb60adfc, 0xb6238b25, 0xb2e29692, -    0x8aad2b2f, 0x8e6c3698, 0x832f1041, 0x87ee0df6, -    0x99a95df3, 0x9d684044, 0x902b669d, 0x94ea7b2a, -    0xe0b41de7, 0xe4750050, 0xe9362689, 0xedf73b3e, -    0xf3b06b3b, 0xf771768c, 0xfa325055, 0xfef34de2, -    0xc6bcf05f, 0xc27dede8, 0xcf3ecb31, 0xcbffd686, -    0xd5b88683, 0xd1799b34, 0xdc3abded, 0xd8fba05a, -    0x690ce0ee, 0x6dcdfd59, 0x608edb80, 0x644fc637, -    0x7a089632, 0x7ec98b85, 0x738aad5c, 0x774bb0eb, -    0x4f040d56, 0x4bc510e1, 0x46863638, 0x42472b8f, -    0x5c007b8a, 0x58c1663d, 0x558240e4, 0x51435d53, -    0x251d3b9e, 0x21dc2629, 0x2c9f00f0, 0x285e1d47, -    0x36194d42, 0x32d850f5, 0x3f9b762c, 0x3b5a6b9b, -    0x0315d626, 0x07d4cb91, 0x0a97ed48, 0x0e56f0ff, -    0x1011a0fa, 0x14d0bd4d, 0x19939b94, 0x1d528623, -    0xf12f560e, 0xf5ee4bb9, 0xf8ad6d60, 0xfc6c70d7, -    0xe22b20d2, 0xe6ea3d65, 0xeba91bbc, 0xef68060b, -    0xd727bbb6, 0xd3e6a601, 0xdea580d8, 0xda649d6f, -    0xc423cd6a, 0xc0e2d0dd, 0xcda1f604, 0xc960ebb3, -    0xbd3e8d7e, 0xb9ff90c9, 0xb4bcb610, 0xb07daba7, -    0xae3afba2, 0xaafbe615, 0xa7b8c0cc, 0xa379dd7b, -    0x9b3660c6, 0x9ff77d71, 0x92b45ba8, 0x9675461f, -    0x8832161a, 0x8cf30bad, 0x81b02d74, 0x857130c3, -    0x5d8a9099, 0x594b8d2e, 0x5408abf7, 0x50c9b640, -    0x4e8ee645, 0x4a4ffbf2, 0x470cdd2b, 0x43cdc09c, -    0x7b827d21, 0x7f436096, 0x7200464f, 0x76c15bf8, -    0x68860bfd, 0x6c47164a, 0x61043093, 0x65c52d24, -    0x119b4be9, 0x155a565e, 0x18197087, 0x1cd86d30, -    0x029f3d35, 0x065e2082, 0x0b1d065b, 0x0fdc1bec, -    0x3793a651, 0x3352bbe6, 0x3e119d3f, 0x3ad08088, -    0x2497d08d, 0x2056cd3a, 0x2d15ebe3, 0x29d4f654, -    0xc5a92679, 0xc1683bce, 0xcc2b1d17, 0xc8ea00a0, -    0xd6ad50a5, 0xd26c4d12, 0xdf2f6bcb, 0xdbee767c, -    0xe3a1cbc1, 0xe760d676, 0xea23f0af, 0xeee2ed18, -    0xf0a5bd1d, 0xf464a0aa, 0xf9278673, 0xfde69bc4, -    0x89b8fd09, 0x8d79e0be, 0x803ac667, 0x84fbdbd0, -    0x9abc8bd5, 0x9e7d9662, 0x933eb0bb, 0x97ffad0c, -    0xafb010b1, 0xab710d06, 0xa6322bdf, 0xa2f33668, -    0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4 -}; - -// This function can be used to create a new table with a different polynom -void init_crc8tab(uint8_t l_code, uint8_t l_init) -{ -    unsigned i, j, msb; -    uint8_t nb; -    uint8_t crc; - -    for (i = 0; i < 256; ++i) { -        crc = l_init; -        nb = i ^ 0xff; -        for (j = 0; j < 8; ++j) { -            msb = (nb & (0x80 >> j)) && 1; -            msb ^= (crc >> 7); -            crc <<= 1; -            if (msb) -                crc ^= l_code; -        } -        crc8tab[i] = crc; -    } -} - - -void init_crc16tab(uint16_t l_code, uint16_t l_init) -{ -    unsigned i, j, msb; -    uint8_t nb; -    uint16_t crc; - -    for (i = 0; i < 256; ++i) { -        crc = l_init; -        nb = i ^ 0xff; -        for (j = 0; j < 8; ++j) { -            msb = (nb & (0x80 >> j)) && 1; -            msb ^= (crc >> 15); -            crc <<= 1; -            if (msb) -                crc ^= l_code; -        } -        crc ^= 0xff00; -        crc16tab[i] = crc; -    } -} - - -void init_crc32tab(uint32_t l_code, uint32_t l_init) -{ -    unsigned i, j, msb; -    uint8_t nb; -    uint32_t crc; - -    for (i = 0; i < 256; ++i) { -        crc = l_init; -        nb = i ^ 0xff; -        for (j = 0; j < 8; ++j) { -            msb = (nb & (0x80 >> j)) && 1; -            msb ^= (crc >> 31); -            crc <<= 1; -            if (msb) -                crc ^= l_code; -        } -        crc ^= 0xffffff00; -        crc32tab[i] = crc; -    } -} - - -uint8_t crc8(uint8_t l_crc, const void *lp_data, unsigned l_nb) -{ -    const uint8_t* data = (const uint8_t*)lp_data; -    while (l_nb--) { -        l_crc = crc8tab[l_crc ^ *(data++)]; -    } -    return (l_crc); -} - - -uint16_t crc16(uint16_t l_crc, const void *lp_data, unsigned l_nb) -{ -    const uint8_t* data = (const uint8_t*)lp_data; -    while (l_nb--) { -        l_crc = -            (l_crc << 8) ^ crc16tab[(l_crc >> 8) ^ *(data++)]; -    } -    return (l_crc); -} - - -uint32_t crc32(uint32_t l_crc, const void *lp_data, unsigned l_nb) -{ -    const uint8_t* data = (const uint8_t*)lp_data; -    while (l_nb--) { -        l_crc = -            (l_crc << 8) ^ crc32tab[((l_crc >> 24) ^ *(data++)) & 0xff]; -    } -    return (l_crc); -} diff --git a/src/crc.h b/src/crc.h deleted file mode 100644 index b1785a1..0000000 --- a/src/crc.h +++ /dev/null @@ -1,59 +0,0 @@ -/* -   Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#ifndef _CRC -#define _CRC - -#ifdef HAVE_CONFIG_H -#   include "config.h" -#endif - -#ifndef _WIN32 -  #include <stdint.h> -#else -  #include <winsock2.h>	// For types... -  typedef BYTE uint8_t; -  typedef WORD uint16_t; -  typedef DWORD32 uint32_t; -#endif - - -#ifdef __cplusplus -extern "C" { // } -#endif - -void init_crc8tab(uint8_t l_code, uint8_t l_init); -uint8_t crc8(uint8_t l_crc, const void *lp_data, unsigned l_nb); -extern uint8_t crc8tab[]; - -void init_crc16tab(uint16_t l_code, uint16_t l_init); -uint16_t crc16(uint16_t l_crc, const void *lp_data, unsigned l_nb); -extern uint16_t crc16tab[]; - -void init_crc32tab(uint32_t l_code, uint32_t l_init); -uint32_t crc32(uint32_t l_crc, const void *lp_data, unsigned l_nb); -extern uint32_t crc32tab[]; - -#ifdef __cplusplus -} -#endif - -#endif //_CRC diff --git a/src/dabOutput/dabOutput.h b/src/dabOutput/dabOutput.h index 9cc18d7..c7e570b 100644 --- a/src/dabOutput/dabOutput.h +++ b/src/dabOutput/dabOutput.h @@ -28,8 +28,7 @@  #pragma once -#include "UdpSocket.h" -#include "TcpSocket.h" +#include "Socket.h"  #include "Log.h"  #include "string.h"  #include <stdexcept> @@ -57,6 +56,8 @@ class DabOutput          {              return Open(name.c_str());          } + +        // Return -1 on failure          virtual int Write(void* buffer, int size) = 0;          virtual int Close() = 0; @@ -145,15 +146,7 @@ class DabOutputRaw : public DabOutput  class DabOutputUdp : public DabOutput  {      public: -        DabOutputUdp() { -            packet_ = new UdpPacket(6144); -            socket_ = new UdpSocket(); -        } - -        virtual ~DabOutputUdp() { -            delete socket_; -            delete packet_; -        } +        DabOutputUdp();          int Open(const char* name);          int Write(void* buffer, int size); @@ -171,8 +164,8 @@ class DabOutputUdp : public DabOutput          DabOutputUdp operator=(const DabOutputUdp& other) = delete;          std::string uri_; -        UdpSocket* socket_; -        UdpPacket* packet_; +        Socket::UDPSocket socket_; +        Socket::UDPPacket packet_;  };  // -------------- TCP ------------------ @@ -190,7 +183,7 @@ class DabOutputTcp : public DabOutput      private:          std::string uri_; -        std::shared_ptr<TCPDataDispatcher> dispatcher_; +        std::shared_ptr<Socket::TCPDataDispatcher> dispatcher_;  };  // -------------- Simul ------------------ diff --git a/src/dabOutput/dabOutputTcp.cpp b/src/dabOutput/dabOutputTcp.cpp index 87dbfd5..4dc3538 100644 --- a/src/dabOutput/dabOutputTcp.cpp +++ b/src/dabOutput/dabOutputTcp.cpp @@ -94,7 +94,7 @@ int DabOutputTcp::Open(const char* name)      uri_ = name;      if (success) { -        dispatcher_ = make_shared<TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES); +        dispatcher_ = make_shared<Socket::TCPDataDispatcher>(MAX_QUEUED_ETI_FRAMES);          dispatcher_->start(port, address);      }      else { diff --git a/src/dabOutput/dabOutputUdp.cpp b/src/dabOutput/dabOutputUdp.cpp index c129569..b9c22db 100644 --- a/src/dabOutput/dabOutputUdp.cpp +++ b/src/dabOutput/dabOutputUdp.cpp @@ -38,18 +38,12 @@  #include <cstdio>  #include <limits.h>  #include "dabOutput.h" -#include "UdpSocket.h" - -#ifdef _WIN32 -#   include <fscfg.h> -#   include <sdci.h> -#else -#   include <netinet/in.h> -#   include <sys/types.h> -#   include <sys/socket.h> -#   include <sys/ioctl.h> -#   include <net/if_arp.h> -#endif +#include "Socket.h" + +DabOutputUdp::DabOutputUdp() : +    socket_(), +    packet_(6144) +{ }  int DabOutputUdp::Open(const char* name)  { @@ -64,12 +58,6 @@ int DabOutputUdp::Open(const char* name)                  regex_constants::match_default)) {          string address = what[1]; -        if (this->packet_->getAddress().setAddress(address.c_str()) == -1) { -            etiLog.level(error) << "can't set address " << -               address <<  "(" << inetErrDesc << ": " << inetErrMsg << ")"; -            return -1; -        } -          string port_str = what[2];          long port = std::strtol(port_str.c_str(), nullptr, 0); @@ -79,7 +67,7 @@ int DabOutputUdp::Open(const char* name)              return -1;          } -        this->packet_->getAddress().setPort(port); +        packet_.address.resolveUdpDestination(address, port);          string query_params = what[3];          smatch query_what; @@ -87,28 +75,25 @@ int DabOutputUdp::Open(const char* name)                      regex_constants::match_default)) {              string src = query_what[1]; -            int err = socket_->setMulticastSource(src.c_str()); -            if (err) { -                etiLog.level(error) << "UDP output socket set source failed!"; -                return -1; -            } +            try { +                socket_.setMulticastSource(src.c_str()); -            string ttl_str = query_what[2]; +                string ttl_str = query_what[2]; -            if (not ttl_str.empty()) { -                long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); -                if ((ttl <= 0) || (ttl >= 255)) { -                    etiLog.level(error) << "Invalid TTL setting in " << -                        uri_without_proto; -                    return -1; -                } +                if (not ttl_str.empty()) { +                    long ttl = std::strtol(ttl_str.c_str(), nullptr, 0); +                    if ((ttl <= 0) || (ttl >= 255)) { +                        etiLog.level(error) << "Invalid TTL setting in " << +                            uri_without_proto; +                        return -1; +                    } -                err = socket_->setMulticastTTL(ttl); -                if (err) { -                    etiLog.level(error) << "UDP output socket set TTL failed!"; -                    return -1; +                    socket_.setMulticastTTL(ttl);                  }              } +            catch (const std::runtime_error& e) { +                etiLog.level(error) << "Failed to set UDP output settings" << e.what(); +            }          }          else if (not query_params.empty()) {              etiLog.level(error) << "UDP output: could not parse parameters " << @@ -129,9 +114,11 @@ int DabOutputUdp::Open(const char* name)  int DabOutputUdp::Write(void* buffer, int size)  { -    this->packet_->setSize(0); -    this->packet_->addData(buffer, size); -    return this->socket_->send(*this->packet_); +    const uint8_t *buf = reinterpret_cast<uint8_t*>(buffer); +    packet_.buffer.resize(0); +    std::copy(buf, buf + size, std::back_inserter(packet_.buffer)); +    socket_.send(packet_); +    return 0;  }  #endif // defined(HAVE_OUTPUT_UDP) diff --git a/src/dabOutput/edi/AFPacket.cpp b/src/dabOutput/edi/AFPacket.cpp deleted file mode 100644 index a58a980..0000000 --- a/src/dabOutput/edi/AFPacket.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* -   Copyright (C) 2014 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output. -    This implements an AF Packet as defined ETSI TS 102 821. -    Also see ETSI TS 102 693 - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ -#include "config.h" -#include "crc.h" -#include "AFPacket.h" -#include "TagItems.h" -#include "TagPacket.h" -#include <vector> -#include <string> -#include <iostream> -#include <cstdio> -#include <stdint.h> -#include <arpa/inet.h> - -namespace edi { - -// Header PT field. AF packet contains TAG payload -const uint8_t AFHEADER_PT_TAG = 'T'; - -// AF Packet Major (3 bits) and Minor (4 bits) version -const uint8_t AFHEADER_VERSION = 0x10; // MAJ=1, MIN=0 - -AFPacket AFPacketiser::Assemble(TagPacket tag_packet) -{ -    std::vector<uint8_t> payload = tag_packet.Assemble(); - -    if (m_verbose) -        std::cerr << "Assemble AFPacket " << seq << std::endl; - -    std::string pack_data("AF"); // SYNC -    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); - -    uint32_t taglength = payload.size(); - -    if (m_verbose) -        std::cerr << "         AFPacket payload size " << payload.size() << std::endl; - -    // write length into packet -    packet.push_back((taglength >> 24) & 0xFF); -    packet.push_back((taglength >> 16) & 0xFF); -    packet.push_back((taglength >> 8) & 0xFF); -    packet.push_back(taglength & 0xFF); - -    // fill rest of header -    packet.push_back(seq >> 8); -    packet.push_back(seq & 0xFF); -    seq++; -    packet.push_back((have_crc ? 0x80 : 0) | AFHEADER_VERSION); // ar_cf: CRC=1 -    packet.push_back(AFHEADER_PT_TAG); - -    // insert payload, must have a length multiple of 8 bytes -    packet.insert(packet.end(), payload.begin(), payload.end()); - -    // calculate CRC over AF Header and payload -    uint16_t crc = 0xffff; -    crc = crc16(crc, &(packet.front()), packet.size()); -    crc ^= 0xffff; - -    if (m_verbose) -        fprintf(stderr, "         AFPacket crc %x\n", crc); - -    packet.push_back((crc >> 8) & 0xFF); -    packet.push_back(crc & 0xFF); - -    if (m_verbose) -        std::cerr << "         AFPacket length " << packet.size() << std::endl; - -    return packet; -} - -} diff --git a/src/dabOutput/edi/AFPacket.h b/src/dabOutput/edi/AFPacket.h deleted file mode 100644 index b4ccef1..0000000 --- a/src/dabOutput/edi/AFPacket.h +++ /dev/null @@ -1,61 +0,0 @@ -/* -   Copyright (C) 2014 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output. -    This implements an AF Packet as defined ETSI TS 102 821. -    Also see ETSI TS 102 693 - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include <vector> -#include <stdint.h> -#include "TagItems.h" -#include "TagPacket.h" - -namespace edi { - -typedef std::vector<uint8_t> AFPacket; - -// ETSI TS 102 821, 6.1 AF packet structure -class AFPacketiser -{ -    public: -        AFPacketiser() : -            m_verbose(false) {}; -        AFPacketiser(bool verbose) : -            m_verbose(verbose) {}; - -        AFPacket Assemble(TagPacket tag_packet); - -    private: -        static const bool have_crc = true; - -        uint16_t seq = 0; //counter that overflows at 0xFFFF - -        bool m_verbose; -}; - -} - diff --git a/src/dabOutput/edi/Config.h b/src/dabOutput/edi/Config.h deleted file mode 100644 index 55d5f0f..0000000 --- a/src/dabOutput/edi/Config.h +++ /dev/null @@ -1,77 +0,0 @@ -/* -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   UDP and TCP transports and their configuration - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include <vector> -#include <string> -#include <memory> -#include <cstdint> - -namespace edi { - -/** Configuration for EDI output */ - -struct destination_t { -    virtual ~destination_t() {}; -}; - -// Can represent both unicast and multicast destinations -struct udp_destination_t : public destination_t { -    std::string dest_addr; -    std::string source_addr; -    unsigned int source_port = 0; -    unsigned int ttl = 10; -}; - -// TCP server that can accept multiple connections -struct tcp_destination_t : public destination_t { -    unsigned int listen_port = 0; -    size_t max_frames_queued = 1024; -}; - -struct configuration_t { -    unsigned chunk_len = 207;        // RSk, data length of each chunk -    unsigned fec       = 0;          // number of fragments that can be recovered -    bool dump          = false;      // dump a file with the EDI packets -    bool verbose       = false; -    bool enable_pft    = false;      // Enable protection and fragmentation -    unsigned int tagpacket_alignment = 0; -    std::vector<std::shared_ptr<destination_t> > destinations; -    unsigned int dest_port = 0;      // common destination port, because it's encoded in the transport layer -    unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms - -    bool enabled() const { return destinations.size() > 0; } -    bool interleaver_enabled() const { return latency_frames > 0; } - -    void print() const; -}; - -} - - diff --git a/src/dabOutput/edi/Interleaver.cpp b/src/dabOutput/edi/Interleaver.cpp deleted file mode 100644 index f26a50e..0000000 --- a/src/dabOutput/edi/Interleaver.cpp +++ /dev/null @@ -1,122 +0,0 @@ -/* -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   Interleaving of PFT fragments to increase robustness against -   burst packet loss. - -   This is possible because EDI has to assume that fragments may reach -   the receiver out of order. - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "Interleaver.h" -#include <cassert> - -namespace edi { - -void Interleaver::SetLatency(size_t latency_frames) -{ -    m_latency = latency_frames; -} - -Interleaver::fragment_vec Interleaver::Interleave(fragment_vec &fragments) -{ -    m_fragment_count = fragments.size(); - -    // Create vectors containing Fcount*latency fragments in total -    // and store them into the deque -    if (m_buffer.empty()) { -        m_buffer.emplace_back(); -    } - -    auto& last_buffer = m_buffer.back(); - -    for (auto& fragment : fragments) { -        const bool last_buffer_is_complete = -                (last_buffer.size() >= m_fragment_count * m_latency); - -        if (last_buffer_is_complete) { -            m_buffer.emplace_back(); -            last_buffer = m_buffer.back(); -        } - -        last_buffer.push_back(std::move(fragment)); -    } - -    fragments.clear(); - -    while ( not m_buffer.empty() and -            (m_buffer.front().size() >= m_fragment_count * m_latency)) { - -        auto& first_buffer = m_buffer.front(); - -        assert(first_buffer.size() == m_fragment_count * m_latency); - -        /* Assume we have 5 fragments per AF frame, and latency of 3. -         * This will give the following strides: -         *    0        1     2 -         * +-------+-------+---+ -         * | 0   1 | 2   3 | 4 | -         * |       |   +---+   | -         * | 5   6 | 7 | 8   9 | -         * |   +---+   |       | -         * |10 |11  12 |13  14 | -         * +---+-------+-------+ -         * -         * ix will be 0, 5, 10, 1, 6 in the first loop -         */ - -        for (size_t i = 0; i < m_fragment_count; i++) { -            const size_t ix = m_interleave_offset + m_fragment_count * m_stride; -            m_interleaved_fragments.push_back(first_buffer.at(ix)); - -            m_stride += 1; -            if (m_stride >= m_latency) { -                m_interleave_offset++; -                m_stride = 0; -            } -        } - -        if (m_interleave_offset >= m_fragment_count) { -            m_interleave_offset = 0; -            m_stride = 0; -            m_buffer.pop_front(); -        } -    } - -    std::vector<PFTFragment> interleaved_frags; - -    const size_t n = std::min(m_fragment_count, m_interleaved_fragments.size()); -    std::move(m_interleaved_fragments.begin(), -              m_interleaved_fragments.begin() + n, -              std::back_inserter(interleaved_frags)); -    m_interleaved_fragments.erase( -              m_interleaved_fragments.begin(), -              m_interleaved_fragments.begin() + n); - -    return interleaved_frags; -} - -} - - diff --git a/src/dabOutput/edi/Interleaver.h b/src/dabOutput/edi/Interleaver.h deleted file mode 100644 index f1cff30..0000000 --- a/src/dabOutput/edi/Interleaver.h +++ /dev/null @@ -1,75 +0,0 @@ -/* -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   Interleaving of PFT fragments to increase robustness against -   burst packet loss. - -   This is possible because EDI has to assume that fragments may reach -   the receiver out of order. - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include <vector> -#include <deque> -#include <stdexcept> -#include <stdint.h> -#include "Log.h" -#include "PFT.h" - -namespace edi { - -class Interleaver { -    public: -        using fragment_vec = std::vector<PFTFragment>; - -        /* Configure the interleaver to use latency_frames number of AF -         * packets for interleaving. Total delay through the interleaver -         * will be latency_frames * 24ms -         */ -        void SetLatency(size_t latency_frames); - -        /* Move the fragments for an AF Packet into the interleaver and -         * return interleaved fragments to be transmitted. -         */ -        fragment_vec Interleave(fragment_vec &fragments); - -    private: -        size_t m_latency = 0; -        size_t m_fragment_count = 0; -        size_t m_interleave_offset = 0; -        size_t m_stride = 0; - -        /* Buffer that accumulates enough fragments to interleave */ -        std::deque<fragment_vec> m_buffer; - -        /* Buffer that contains fragments that have been interleaved, -         * to avoid that the interleaver output is too bursty -         */ -        std::deque<PFTFragment> m_interleaved_fragments; -}; - -} - diff --git a/src/dabOutput/edi/PFT.cpp b/src/dabOutput/edi/PFT.cpp deleted file mode 100644 index 5b93016..0000000 --- a/src/dabOutput/edi/PFT.cpp +++ /dev/null @@ -1,327 +0,0 @@ -/* -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   Protection, Fragmentation and Transport. (PFT) - -   Are supported: -    Reed-Solomon and Fragmentation - -   This implements part of PFT as defined ETSI TS 102 821. - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "config.h" -#include <vector> -#include <list> -#include <cstdio> -#include <cstring> -#include <stdint.h> -#include <arpa/inet.h> -#include <stdexcept> -#include <sstream> -#include "PFT.h" -#include "crc.h" -#include "ReedSolomon.h" - -namespace edi { - -using namespace std; - -// An integer division that rounds up, i.e. ceil(a/b) -#define CEIL_DIV(a, b) (a % b == 0  ? a / b : a / b + 1) - -PFT::PFT() { } - -PFT::PFT(const configuration_t &conf) : -    m_k(conf.chunk_len), -    m_m(conf.fec), -    m_dest_port(conf.dest_port), -    m_pseq(0), -    m_num_chunks(0), -    m_verbose(conf.verbose) -    { -        if (m_k > 207) { -            etiLog.level(warn) << -                "EDI PFT: maximum chunk size is 207."; -            throw std::out_of_range("EDI PFT Chunk size too large."); -        } - -        if (m_m > 5) { -            etiLog.level(warn) << -                "EDI PFT: high number of recoverable fragments" -                " may lead to large overhead"; -            // See TS 102 821, 7.2.1 Known values, list entry for 'm' -        } -    } - -RSBlock PFT::Protect(AFPacket af_packet) -{ -    RSBlock rs_block; - -    // number of chunks is ceil(afpacketsize / m_k) -    // TS 102 821 7.2.2: c = ceil(l / k_max) -    m_num_chunks = CEIL_DIV(af_packet.size(), m_k); - -    if (m_verbose) { -        fprintf(stderr, "Protect %zu chunks of size %zu\n", -                m_num_chunks, af_packet.size()); -    } - -    // calculate size of chunk: -    // TS 102 821 7.2.2: k = ceil(l / c) -    // chunk_len does not include the 48 bytes of protection. -    const size_t chunk_len = CEIL_DIV(af_packet.size(), m_num_chunks); -    if (chunk_len > 207) { -        std::stringstream ss; -        ss << "Chunk length " << chunk_len << " too large (>207)"; -        throw std::runtime_error(ss.str()); -    } - -    // The last RS chunk is zero padded -    // TS 102 821 7.2.2: z = c*k - l -    const size_t zero_pad = m_num_chunks * chunk_len - af_packet.size(); - -    // Create the RS(k+p,k) encoder -    const int firstRoot = 1; // Discovered by analysing EDI dump -    const int gfPoly = 0x11d; -    const bool reverse = false; -    // The encoding has to be 255, 207 always, because the chunk has to -    // be padded at the end, and not at the beginning as libfec would -    // do -    ReedSolomon rs_encoder(255, 207, reverse, gfPoly, firstRoot); - -    // add zero padding to last chunk -    for (size_t i = 0; i < zero_pad; i++) { -        af_packet.push_back(0); -    } - -    if (m_verbose) { -        fprintf(stderr, "        add %zu zero padding\n", zero_pad); -    } - -    // Calculate RS for each chunk and assemble RS block -    for (size_t i = 0; i < af_packet.size(); i+= chunk_len) { -        vector<uint8_t> chunk(207); -        vector<uint8_t> protection(PARITYBYTES); - -        // copy chunk_len bytes into new chunk -        memcpy(&chunk.front(), &af_packet[i], chunk_len); - -        // calculate RS for chunk with padding -        rs_encoder.encode(&chunk.front(), &protection.front(), 207); - -        // Drop the padding -        chunk.resize(chunk_len); - -        // append new chunk and protection to the RS Packet -        rs_block.insert(rs_block.end(), chunk.begin(), chunk.end()); -        rs_block.insert(rs_block.end(), protection.begin(), protection.end()); -    } - -    return rs_block; -} - -vector< vector<uint8_t> > PFT::ProtectAndFragment(AFPacket af_packet) -{ -    const bool enable_RS = (m_m > 0); - -    if (enable_RS) { -        RSBlock rs_block = Protect(af_packet); - -#if 0 -        fprintf(stderr, "  af_packet (%zu):", af_packet.size()); -        for (size_t i = 0; i < af_packet.size(); i++) { -            fprintf(stderr, "%02x ", af_packet[i]); -        } -        fprintf(stderr, "\n"); - -        fprintf(stderr, "  rs_block (%zu):", rs_block.size()); -        for (size_t i = 0; i < rs_block.size(); i++) { -            fprintf(stderr, "%02x ", rs_block[i]); -        } -        fprintf(stderr, "\n"); -#endif - -        // TS 102 821 7.2.2: s_max = MIN(floor(c*p/(m+1)), MTU - h)) -        const size_t max_payload_size = ( m_num_chunks * PARITYBYTES ) / (m_m + 1); - -        // Calculate fragment count and size -        // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) -        // l + c*p + z = length of RS block -        const size_t num_fragments = CEIL_DIV(rs_block.size(), max_payload_size); - -        // TS 102 821 7.2.2: ceil((l + c*p + z) / f) -        const size_t fragment_size = CEIL_DIV(rs_block.size(), num_fragments); - -        if (m_verbose) -            fprintf(stderr, "  PnF fragment_size %zu, num frag %zu\n", -                    fragment_size, num_fragments); - -        vector< vector<uint8_t> > fragments(num_fragments); - -        for (size_t i = 0; i < num_fragments; i++) { -            fragments[i].resize(fragment_size); -            for (size_t j = 0; j < fragment_size; j++) { -                const size_t ix = j*num_fragments + i; -                if (ix < rs_block.size()) { -                    fragments[i][j] = rs_block[ix]; -                } -                else { -                    fragments[i][j] = 0; -                } -            } -        } - -        return fragments; -    } -    else { // No RS, only fragmentation -        // TS 102 821 7.2.2: s_max = MTU - h -        // Ethernet MTU is 1500, but maybe you are routing over a network which -        // has some sort of packet encapsulation. Add some margin. -        const size_t max_payload_size = 1400; - -        // Calculate fragment count and size -        // TS 102 821 7.2.2: ceil((l + c*p + z) / s_max) -        // l + c*p + z = length of AF packet -        const size_t num_fragments = CEIL_DIV(af_packet.size(), max_payload_size); - -        // TS 102 821 7.2.2: ceil((l + c*p + z) / f) -        const size_t fragment_size = CEIL_DIV(af_packet.size(), num_fragments); -        vector< vector<uint8_t> > fragments(num_fragments); - -        for (size_t i = 0; i < num_fragments; i++) { -            fragments[i].reserve(fragment_size); - -            for (size_t j = 0; j < fragment_size; j++) { -                const size_t ix = i*fragment_size + j; -                if (ix < af_packet.size()) { -                    fragments[i].push_back(af_packet.at(ix)); -                } -                else { -                    break; -                } -            } -        } - -        return fragments; -    } -} - -std::vector< PFTFragment > PFT::Assemble(AFPacket af_packet) -{ -    vector< vector<uint8_t> > fragments = ProtectAndFragment(af_packet); -    vector< vector<uint8_t> > pft_fragments; // These contain PF headers - -    const bool enable_RS = (m_m > 0); -    const bool enable_transport = true; - -    unsigned int findex = 0; - -    unsigned fcount = fragments.size(); - -    // calculate size of chunk: -    // TS 102 821 7.2.2: k = ceil(l / c) -    // chunk_len does not include the 48 bytes of protection. -    const size_t chunk_len = enable_RS ? -        CEIL_DIV(af_packet.size(), m_num_chunks) : 0; - -    // The last RS chunk is zero padded -    // TS 102 821 7.2.2: z = c*k - l -    const size_t zero_pad = enable_RS ? -        m_num_chunks * chunk_len - af_packet.size() : 0; - -    for (const auto &fragment : fragments) { -        // Psync -        std::string psync("PF"); -        std::vector<uint8_t> packet(psync.begin(), psync.end()); - -        // Pseq -        packet.push_back(m_pseq >> 8); -        packet.push_back(m_pseq & 0xFF); - -        // Findex -        packet.push_back(findex >> 16); -        packet.push_back(findex >> 8); -        packet.push_back(findex & 0xFF); -        findex++; - -        // Fcount -        packet.push_back(fcount >> 16); -        packet.push_back(fcount >> 8); -        packet.push_back(fcount & 0xFF); - -        // RS (1 bit), transport (1 bit) and Plen (14 bits) -        unsigned int plen = fragment.size(); -        if (enable_RS) { -            plen |= 0x8000; // Set FEC bit -        } - -        if (enable_transport) { -            plen |= 0x4000; // Set ADDR bit -        } - -        packet.push_back(plen >> 8); -        packet.push_back(plen & 0xFF); - -        if (enable_RS) { -            packet.push_back(chunk_len);   // RSk -            packet.push_back(zero_pad);    // RSz -        } - -        if (enable_transport) { -            // Source (16 bits) -            uint16_t addr_source = 0; -            packet.push_back(addr_source >> 8); -            packet.push_back(addr_source & 0xFF); - -            // Dest (16 bits) -            packet.push_back(m_dest_port >> 8); -            packet.push_back(m_dest_port & 0xFF); -        } - -        // calculate CRC over AF Header and payload -        uint16_t crc = 0xffff; -        crc = crc16(crc, &(packet.front()), packet.size()); -        crc ^= 0xffff; - -        packet.push_back((crc >> 8) & 0xFF); -        packet.push_back(crc & 0xFF); - -        // insert payload, must have a length multiple of 8 bytes -        packet.insert(packet.end(), fragment.begin(), fragment.end()); - -        pft_fragments.push_back(packet); - -#if 0 -        fprintf(stderr, "* PFT pseq %d, findex %d, fcount %d, plen %d\n", -                m_pseq, findex, fcount, plen & ~0x8000); -#endif -    } - -    m_pseq++; - -    return pft_fragments; -} - -} - diff --git a/src/dabOutput/edi/PFT.h b/src/dabOutput/edi/PFT.h deleted file mode 100644 index 4076bf3..0000000 --- a/src/dabOutput/edi/PFT.h +++ /dev/null @@ -1,78 +0,0 @@ -/* -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   Protection, Fragmentation and Transport. (PFT) - -   Are supported: -    Reed-Solomon and Fragmentation - -   This implements part of PFT as defined ETSI TS 102 821. - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include <vector> -#include <list> -#include <stdexcept> -#include <stdint.h> -#include "AFPacket.h" -#include "Log.h" -#include "ReedSolomon.h" -#include "dabOutput/edi/Config.h" - -namespace edi { - -typedef std::vector<uint8_t> RSBlock; -typedef std::vector<uint8_t> PFTFragment; - -class PFT -{ -    public: -        static constexpr int PARITYBYTES = 48; - -        PFT(); -        PFT(const configuration_t& conf); - -        // return a list of PFT fragments with the correct -        // PFT headers -        std::vector< PFTFragment > Assemble(AFPacket af_packet); - -        // Apply Reed-Solomon FEC to the AF Packet -        RSBlock Protect(AFPacket af_packet); - -        // Cut a RSBlock into several fragments that can be transmitted -        std::vector< std::vector<uint8_t> > ProtectAndFragment(AFPacket af_packet); - -    private: -        unsigned int m_k = 207; // length of RS data word -        unsigned int m_m = 3; // number of fragments that can be recovered if lost -        unsigned int m_dest_port = 12000; // Destination port for transport header -        uint16_t m_pseq = 0; -        size_t m_num_chunks = 0; -        bool m_verbose = 0; -}; - -} - diff --git a/src/dabOutput/edi/TagItems.cpp b/src/dabOutput/edi/TagItems.cpp deleted file mode 100644 index dfb4934..0000000 --- a/src/dabOutput/edi/TagItems.cpp +++ /dev/null @@ -1,216 +0,0 @@ -/* -   EDI output. -    This defines a few TAG items as defined ETSI TS 102 821 and -    ETSI TS 102 693 - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "config.h" -#include "TagItems.h" -#include <vector> -#include <iostream> -#include <string> -#include <stdint.h> -#include <stdexcept> - -namespace edi { - -std::vector<uint8_t> TagStarPTR::Assemble() -{ -    //std::cerr << "TagItem *ptr" << std::endl; -    std::string pack_data("*ptr"); -    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); - -    packet.push_back(0); -    packet.push_back(0); -    packet.push_back(0); -    packet.push_back(0x40); - -    std::string protocol("DETI"); -    packet.insert(packet.end(), protocol.begin(), protocol.end()); - -    // Major -    packet.push_back(0); -    packet.push_back(0); - -    // Minor -    packet.push_back(0); -    packet.push_back(0); -    return packet; -} - -std::vector<uint8_t> TagDETI::Assemble() -{ -    std::string pack_data("deti"); -    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); -    packet.reserve(256); - -    // Placeholder for length -    packet.push_back(0); -    packet.push_back(0); -    packet.push_back(0); -    packet.push_back(0); - -    uint8_t fct  = dlfc % 250; -    uint8_t fcth = dlfc / 250; - - -    uint16_t detiHeader = fct | (fcth << 8) | (rfudf << 13) | (ficf << 14) | (atstf << 15); -    packet.push_back(detiHeader >> 8); -    packet.push_back(detiHeader & 0xFF); - -    uint32_t etiHeader = mnsc | (rfu << 16) | (rfa << 17) | -                        (fp << 19) | (mid << 22) | (stat << 24); -    packet.push_back((etiHeader >> 24) & 0xFF); -    packet.push_back((etiHeader >> 16) & 0xFF); -    packet.push_back((etiHeader >> 8) & 0xFF); -    packet.push_back(etiHeader & 0xFF); - -    if (atstf) { -        packet.push_back(utco); - -        packet.push_back((seconds >> 24) & 0xFF); -        packet.push_back((seconds >> 16) & 0xFF); -        packet.push_back((seconds >> 8) & 0xFF); -        packet.push_back(seconds & 0xFF); - -        packet.push_back((tsta >> 16) & 0xFF); -        packet.push_back((tsta >> 8) & 0xFF); -        packet.push_back(tsta & 0xFF); -    } - -    if (ficf) { -        for (size_t i = 0; i < fic_length; i++) { -            packet.push_back(fic_data[i]); -        } -    } - -    if (rfudf) { -        packet.push_back((rfud >> 16) & 0xFF); -        packet.push_back((rfud >> 8) & 0xFF); -        packet.push_back(rfud & 0xFF); -    } - -    // calculate and update size -    // remove TAG name and TAG length fields and convert to bits -    uint32_t taglength = (packet.size() - 8) * 8; - -    // write length into packet -    packet[4] = (taglength >> 24) & 0xFF; -    packet[5] = (taglength >> 16) & 0xFF; -    packet[6] = (taglength >> 8) & 0xFF; -    packet[7] = taglength & 0xFF; - -    dlfc = (dlfc+1) % 5000; - -    /* -    std::cerr << "TagItem deti, packet.size " << packet.size() << std::endl; -    std::cerr << "              fic length " << fic_length << std::endl; -    std::cerr << "              length " << taglength / 8 << std::endl; -    */ -    return packet; -} - -void TagDETI::set_edi_time(const std::time_t t, int tai_utc_offset) -{ -    utco = tai_utc_offset - 32; - -    const std::time_t posix_timestamp_1_jan_2000 = 946684800; - -    seconds = t - posix_timestamp_1_jan_2000 + utco; -} - -std::vector<uint8_t> TagESTn::Assemble() -{ -    std::string pack_data("est"); -    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); -    packet.reserve(mst_length*8 + 16); - -    packet.push_back(id); - -    // Placeholder for length -    packet.push_back(0); -    packet.push_back(0); -    packet.push_back(0); -    packet.push_back(0); - -    if (tpl > 0x3F) { -        throw std::runtime_error("TagESTn: invalid TPL value"); -    } - -    if (sad > 0x3FF) { -        throw std::runtime_error("TagESTn: invalid SAD value"); -    } - -    if (scid > 0x3F) { -        throw std::runtime_error("TagESTn: invalid SCID value"); -    } - -    uint32_t sstc = (scid << 18) | (sad << 8) | (tpl << 2) | rfa; -    packet.push_back((sstc >> 16) & 0xFF); -    packet.push_back((sstc >> 8) & 0xFF); -    packet.push_back(sstc & 0xFF); - -    for (size_t i = 0; i < mst_length * 8; i++) { -        packet.push_back(mst_data[i]); -    } - -    // calculate and update size -    // remove TAG name and TAG length fields and convert to bits -    uint32_t taglength = (packet.size() - 8) * 8; - -    // write length into packet -    packet[4] = (taglength >> 24) & 0xFF; -    packet[5] = (taglength >> 16) & 0xFF; -    packet[6] = (taglength >> 8) & 0xFF; -    packet[7] = taglength & 0xFF; - -    /* -    std::cerr << "TagItem ESTn, length " << packet.size() << std::endl; -    std::cerr << "              mst_length " << mst_length << std::endl; -    */ -    return packet; -} - -std::vector<uint8_t> TagStarDMY::Assemble() -{ -    std::string pack_data("*dmy"); -    std::vector<uint8_t> packet(pack_data.begin(), pack_data.end()); - -    packet.resize(4 + 4 + length_); - -    const uint32_t length_bits = length_ * 8; - -    packet[4] = (length_bits >> 24) & 0xFF; -    packet[5] = (length_bits >> 16) & 0xFF; -    packet[6] = (length_bits >> 8) & 0xFF; -    packet[7] = length_bits & 0xFF; - -    // The remaining bytes in the packet are "undefined data" - -    return packet; -} - -} - diff --git a/src/dabOutput/edi/TagItems.h b/src/dabOutput/edi/TagItems.h deleted file mode 100644 index b29a142..0000000 --- a/src/dabOutput/edi/TagItems.h +++ /dev/null @@ -1,149 +0,0 @@ -/* -   EDI output. -    This defines a few TAG items as defined ETSI TS 102 821 and -    ETSI TS 102 693 - -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include "Eti.h" -#include <vector> -#include <chrono> -#include <string> -#include <stdint.h> - -namespace edi { - -class TagItem -{ -    public: -        virtual std::vector<uint8_t> Assemble() = 0; -}; - -// ETSI TS 102 693, 5.1.1 Protocol type and revision -class TagStarPTR : public TagItem -{ -    public: -        std::vector<uint8_t> Assemble(); -}; - -// ETSI TS 102 693, 5.1.3 DAB ETI(LI) Management (deti) -class TagDETI : public TagItem -{ -    public: -        std::vector<uint8_t> Assemble(); - -        /***** DATA in intermediary format ****/ -        // For the ETI Header: must be defined ! -        uint8_t stat = 0; -        uint8_t mid = 0; -        uint8_t fp = 0; -        uint8_t rfa = 0; -        uint8_t rfu = 0; // MNSC is valid -        uint16_t mnsc = 0; -        uint16_t dlfc = 0; // modulo 5000 frame counter - -        // ATST (optional) -        bool atstf = false; // presence of atst data - -        /* UTCO: Offset (in seconds) between UTC and the Seconds value. The -         * value is expressed as an unsigned 8-bit quantity. As of February -         * 2009, the value shall be 2 and shall change as a result of each -         * modification of the number of leap seconds, as proscribed by -         * International Earth Rotation and Reference Systems Service (IERS). -         * -         * According to Annex F -         *  EDI = TAI - 32s (constant) -         *  EDI = UTC + UTCO -         * we derive -         *  UTCO = TAI-UTC - 32 -         * where the TAI-UTC offset is given by the USNO bulletin using -         * the ClockTAI module. -         */ -        uint8_t utco = 0; - -        /* Update the EDI time. t is in UTC */ -        void set_edi_time(const std::time_t t, int tai_utc_offset); - -        /* The number of SI seconds since 2000-01-01 T 00:00:00 UTC as an -         * unsigned 32-bit quantity. Contrary to POSIX, this value also -         * counts leap seconds. -         */ -        uint32_t seconds = 0; - -        /* TSTA: Shall be the 24 least significant bits of the Time Stamp -         * (TIST) field from the STI-D(LI) Frame. The full definition for the -         * STI TIST can be found in annex B of EN 300 797 [4]. The most -         * significant 8 bits of the TIST field of the incoming STI-D(LI) -         * frame, if required, may be carried in the RFAD field. -         */ -        uint32_t tsta = 0xFFFFFF; - -        // the FIC (optional) -        bool ficf = false; -        const unsigned char* fic_data; -        size_t fic_length; - -        // rfu -        bool rfudf = false; -        uint32_t rfud = 0; - - -}; - -// ETSI TS 102 693, 5.1.5 ETI Sub-Channel Stream <n> -class TagESTn : public TagItem -{ -    public: -        std::vector<uint8_t> Assemble(); - -        // SSTCn -        uint8_t  scid; -        uint16_t sad; -        uint8_t  tpl; -        uint8_t  rfa; - -        // Pointer to MSTn data -        uint8_t* mst_data; -        size_t mst_length; // STLn * 8 bytes - -        uint8_t id; -}; - -// ETSI TS 102 821, 5.2.2.2 Dummy padding -class TagStarDMY : public TagItem -{ -    public: -        /* length is the TAG value length in bytes */ -        TagStarDMY(uint32_t length) : length_(length) {} -        std::vector<uint8_t> Assemble(); - -    private: -        uint32_t length_; -}; - -} - diff --git a/src/dabOutput/edi/TagPacket.cpp b/src/dabOutput/edi/TagPacket.cpp deleted file mode 100644 index b16dc33..0000000 --- a/src/dabOutput/edi/TagPacket.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* -   Copyright (C) 2014 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output. -    This defines a TAG Packet. -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "config.h" -#include "Eti.h" -#include "TagPacket.h" -#include "TagItems.h" -#include <vector> -#include <iostream> -#include <string> -#include <list> -#include <stdint.h> -#include <cassert> - -namespace edi { - -TagPacket::TagPacket(unsigned int alignment) : m_alignment(alignment) -{ } - -std::vector<uint8_t> TagPacket::Assemble() -{ -    std::list<TagItem*>::iterator tag; - -    std::vector<uint8_t> packet; - -    //std::cerr << "Assemble TAGPacket" << std::endl; - -    for (tag = tag_items.begin(); tag != tag_items.end(); ++tag) { -        std::vector<uint8_t> tag_data = (*tag)->Assemble(); -        packet.insert(packet.end(), tag_data.begin(), tag_data.end()); - -        //std::cerr << "     Add TAGItem of length " << tag_data.size() << std::endl; -    } - -    if (m_alignment == 0) { /* no padding */ } -    else if (m_alignment == 8) { -        // Add padding inside TAG packet -        while (packet.size() % 8 > 0) { -            packet.push_back(0); // TS 102 821, 5.1, "padding shall be undefined" -        } -    } -    else if (m_alignment > 8) { -        TagStarDMY dmy(m_alignment - 8); -        auto dmy_data = dmy.Assemble(); -        packet.insert(packet.end(), dmy_data.begin(), dmy_data.end()); -    } -    else { -        std::cerr << "Invalid alignment requirement " << m_alignment << -            " defined in TagPacket" << std::endl; -    } - -    return packet; -} - -} - diff --git a/src/dabOutput/edi/TagPacket.h b/src/dabOutput/edi/TagPacket.h deleted file mode 100644 index a861cbb..0000000 --- a/src/dabOutput/edi/TagPacket.h +++ /dev/null @@ -1,56 +0,0 @@ -/* -   Copyright (C) 2014 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output. -    This defines a TAG Packet. -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include "TagItems.h" -#include <vector> -#include <string> -#include <list> -#include <stdint.h> - -namespace edi { - -// A TagPacket is nothing else than a list of tag items, with an -// Assemble function that puts the bytestream together and adds -// padding such that the total length is a multiple of 8 Bytes. -// -// ETSI TS 102 821, 5.1 Tag Packet -class TagPacket -{ -    public: -        TagPacket(unsigned int alignment); -        std::vector<uint8_t> Assemble(); - -        std::list<TagItem*> tag_items; - -    private: -        unsigned int m_alignment; -}; - -} - diff --git a/src/dabOutput/edi/Transport.cpp b/src/dabOutput/edi/Transport.cpp deleted file mode 100644 index d99e987..0000000 --- a/src/dabOutput/edi/Transport.cpp +++ /dev/null @@ -1,182 +0,0 @@ -/* -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   UDP and TCP transports and their configuration - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#include "Transport.h" -#include <iterator> - -using namespace std; - -namespace edi { - -void configuration_t::print() const -{ -    etiLog.level(info) << "EDI"; -    etiLog.level(info) << " verbose     " << verbose; -    for (auto edi_dest : destinations) { -        if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { -            etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; -            if (not udp_dest->source_addr.empty()) { -                etiLog.level(info) << "  source      " << udp_dest->source_addr; -                etiLog.level(info) << "  ttl         " << udp_dest->ttl; -            } -            etiLog.level(info) << "  source port " << udp_dest->source_port; -        } -        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { -            etiLog.level(info) << " TCP listening on port " << tcp_dest->listen_port; -            etiLog.level(info) << "  max frames queued    " << tcp_dest->max_frames_queued; -        } -        else { -            throw std::logic_error("EDI destination not implemented"); -        } -    } -    if (interleaver_enabled()) { -        etiLog.level(info) << " interleave     " << latency_frames * 24 << " ms"; -    } -} - - -Sender::Sender(const configuration_t& conf) : -    m_conf(conf), -    edi_pft(m_conf) -{ -    if (m_conf.verbose) { -        etiLog.log(info, "Setup EDI"); -    } - -    for (const auto& edi_dest : m_conf.destinations) { -        if (const auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { -            auto udp_socket = std::make_shared<UdpSocket>(udp_dest->source_port); - -            if (not udp_dest->source_addr.empty()) { -                int err = udp_socket->setMulticastSource(udp_dest->source_addr.c_str()); -                if (err) { -                    throw runtime_error("EDI socket set source failed!"); -                } -                err = udp_socket->setMulticastTTL(udp_dest->ttl); -                if (err) { -                    throw runtime_error("EDI socket set TTL failed!"); -                } -            } - -            udp_sockets.emplace(udp_dest.get(), udp_socket); -        } -        else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(edi_dest)) { -            auto dispatcher = make_shared<TCPDataDispatcher>(tcp_dest->max_frames_queued); -            dispatcher->start(tcp_dest->listen_port, "0.0.0.0"); -            tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); -        } -        else { -            throw std::logic_error("EDI destination not implemented"); -        } -    } - -    if (m_conf.interleaver_enabled()) { -        edi_interleaver.SetLatency(m_conf.latency_frames); -    } - -    if (m_conf.dump) { -        edi_debug_file.open("./edi.debug"); -    } - -    if (m_conf.verbose) { -        etiLog.log(info, "EDI set up"); -    } -} - -void Sender::write(const TagPacket& tagpacket) -{ -    // Assemble into one AF Packet -    edi::AFPacket af_packet = edi_afPacketiser.Assemble(tagpacket); - -    if (m_conf.enable_pft) { -        // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation) -        vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(af_packet); - -        if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragment before interleaver %zu", -                    edi_fragments.size()); -        } - -        if (m_conf.interleaver_enabled()) { -            edi_fragments = edi_interleaver.Interleave(edi_fragments); -        } - -        // Send over ethernet -        for (const auto& edi_frag : edi_fragments) { -            for (auto& dest : m_conf.destinations) { -                if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                    InetAddress addr; -                    addr.setAddress(udp_dest->dest_addr.c_str()); -                    addr.setPort(m_conf.dest_port); - -                    udp_sockets.at(udp_dest.get())->send(edi_frag, addr); -                } -                else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { -                    tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); -                } -                else { -                    throw std::logic_error("EDI destination not implemented"); -                } -            } - -            if (m_conf.dump) { -                std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -                std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator); -            } -        } - -        if (m_conf.verbose) { -            fprintf(stderr, "EDI number of PFT fragments %zu", -                    edi_fragments.size()); -        } -    } -    else { -        // Send over ethernet -        for (auto& dest : m_conf.destinations) { -            if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { -                InetAddress addr; -                addr.setAddress(udp_dest->dest_addr.c_str()); -                addr.setPort(m_conf.dest_port); - -                udp_sockets.at(udp_dest.get())->send(af_packet, addr); -            } -            else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_destination_t>(dest)) { -                tcp_dispatchers.at(tcp_dest.get())->write(af_packet); -            } -            else { -                throw std::logic_error("EDI destination not implemented"); -            } -        } - -        if (m_conf.dump) { -            std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file); -            std::copy(af_packet.begin(), af_packet.end(), debug_iterator); -        } -    } -} - -} diff --git a/src/dabOutput/edi/Transport.h b/src/dabOutput/edi/Transport.h deleted file mode 100644 index 7b0a0db..0000000 --- a/src/dabOutput/edi/Transport.h +++ /dev/null @@ -1,69 +0,0 @@ -/* -   Copyright (C) 2019 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://www.opendigitalradio.org - -   EDI output, -   UDP and TCP transports and their configuration - -   */ -/* -   This file is part of ODR-DabMux. - -   ODR-DabMux is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMux is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. -   */ - -#pragma once - -#include "config.h" -#include "dabOutput/edi/Config.h" -#include "AFPacket.h" -#include "PFT.h" -#include "Interleaver.h" -#include <vector> -#include <unordered_map> -#include <stdexcept> -#include <cstdint> -#include "dabOutput/dabOutput.h" - -namespace edi { - -/** Configuration for EDI output */ - -class Sender { -    public: -        Sender(const configuration_t& conf); - -        void write(const TagPacket& tagpacket); - -    private: -        configuration_t m_conf; -        std::ofstream edi_debug_file; - -        // The TagPacket will then be placed into an AFPacket -        edi::AFPacketiser edi_afPacketiser; - -        // The AF Packet will be protected with reed-solomon and split in fragments -        edi::PFT edi_pft; - -        // To mitigate for burst packet loss, PFT fragments can be sent out-of-order -        edi::Interleaver edi_interleaver; - -        std::unordered_map<udp_destination_t*, std::shared_ptr<UdpSocket>> udp_sockets; -        std::unordered_map<tcp_destination_t*, std::shared_ptr<TCPDataDispatcher>> tcp_dispatchers; -}; - -} - diff --git a/src/dabOutput/metadata.h b/src/dabOutput/metadata.h index ed16de5..34e146f 100644 --- a/src/dabOutput/metadata.h +++ b/src/dabOutput/metadata.h @@ -54,6 +54,8 @@ enum class output_metadata_id_e {  };  struct OutputMetadata { +    virtual ~OutputMetadata() {}; +      virtual output_metadata_id_e getId(void) const = 0;      virtual size_t getLength(void) const = 0; diff --git a/src/fig/FIG0_19.cpp b/src/fig/FIG0_19.cpp index f032bd5..5b6a384 100644 --- a/src/fig/FIG0_19.cpp +++ b/src/fig/FIG0_19.cpp @@ -109,6 +109,19 @@ FillStatus FIG0_19::fill(uint8_t *buf, size_t max_size)          else {              fig0_19->ASw = 0;          } + +        /* From the crc-mmbtools google groups, 2019-07-11, L. Cornell: +         * +         * Long ago, there was a defined use for the New flag - it was intended +         * to indicate whether the announcement was new or was a repeated +         * announcement.  But the problem is that it doesn't really help +         * receivers because they might tune to the ensemble at any time, and +         * might tune to a service that may be interrupted at any time.  So +         * some years ago it was decided that the New flag would not longer be +         * used in transmissions.  The setting was fixed to be 1 because some +         * receivers would have never switched to the announcement if the flag +         * was set to 0. +         */          fig0_19->NewFlag = 1;          fig0_19->RegionFlag = 0;          fig0_19->SubChId = 0; diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp new file mode 100644 index 0000000..b5301d2 --- /dev/null +++ b/src/input/Edi.cpp @@ -0,0 +1,427 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#include "input/Edi.h" + +#include <regex> +#include <chrono> +#include <stdexcept> +#include <sstream> +#include <cstring> +#include <cmath> +#include <cstdlib> +#include <cerrno> +#include <climits> +#include "utils.h" + +using namespace std; + +namespace Inputs { + +constexpr bool VERBOSE = false; +constexpr size_t TCP_BLOCKSIZE = 2048; + +Edi::Edi(const std::string& name, const dab_input_edi_config_t& config) : +    RemoteControllable(name), +    m_tcp_receive_server(TCP_BLOCKSIZE), +    m_sti_writer(bind(&Edi::m_new_sti_frame_callback, this, placeholders::_1)), +    m_sti_decoder(m_sti_writer, VERBOSE), +    m_max_frames_overrun(config.buffer_size), +    m_num_frames_prebuffering(config.prebuffering), +    m_name(name), +    m_stats(name) +{ +    RC_ADD_PARAMETER(buffermanagement, +            "Set type of buffer management to use [prebuffering, timestamped]"); + +    RC_ADD_PARAMETER(buffer, +            "Size of the input buffer [24ms frames]"); + +    RC_ADD_PARAMETER(prebuffering, +            "Min buffer level before streaming starts [24ms frames]"); + +    RC_ADD_PARAMETER(tistdelay, "TIST delay to add [ms]"); +} + +Edi::~Edi() { +    m_running = false; +    if (m_thread.joinable()) { +        m_thread.join(); +    } +} + +void Edi::open(const std::string& name) +{ +    const std::regex re_udp("udp://:([0-9]+)"); +    const std::regex re_tcp("tcp://(.*):([0-9]+)"); + +    lock_guard<mutex> lock(m_mutex); + +    m_running = false; +    if (m_thread.joinable()) { +        m_thread.join(); +    } + +    std::smatch m; +    if (std::regex_match(name, m, re_udp)) { +        const int udp_port = std::stoi(m[1].str()); +        m_input_used = InputUsed::UDP; +        m_udp_sock.reinit(udp_port); +        m_udp_sock.setBlocking(false); +        // TODO multicast +    } +    else if (std::regex_match(name, m, re_tcp)) { +        m_input_used = InputUsed::TCP; +        const string addr = m[1].str(); +        const int tcp_port = std::stoi(m[2].str()); +        m_tcp_receive_server.start(tcp_port, addr); +    } +    else { +        throw runtime_error("Cannot parse EDI input URI"); +    } + +    m_stats.registerAtServer(); + +    m_running = true; +    m_thread = std::thread(&Edi::m_run, this); +} + +size_t Edi::readFrame(uint8_t *buffer, size_t size) +{ +    // Save stats data in bytes, not in frames +    m_stats.notifyBuffer(m_frames.size() * size); + +    EdiDecoder::sti_frame_t sti; +    if (m_is_prebuffering) { +        m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering; +        if (not m_is_prebuffering) { +            etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete."; +        } +        memset(buffer, 0, size * sizeof(*buffer)); +        return 0; +    } +    else if (not m_pending_sti_frame.frame.empty()) { +        // Can only happen when switching from timestamp-based buffer management! +        if (m_pending_sti_frame.frame.size() != size) { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << +                m_pending_sti_frame.frame.size() << " received, " << size << " requested"; +            memset(buffer, 0, size * sizeof(*buffer)); +            return 0; +        } +        else { +            if (not m_pending_sti_frame.version_data.version.empty()) { +                m_stats.notifyVersion( +                        m_pending_sti_frame.version_data.version, +                        m_pending_sti_frame.version_data.uptime_s); +            } +            m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, m_pending_sti_frame.audio_levels.right); + +            copy(m_pending_sti_frame.frame.begin(), +                    m_pending_sti_frame.frame.end(), +                    buffer); +            m_pending_sti_frame.frame.clear(); +            return size; +        } +    } +    else if (m_frames.try_pop(sti)) { +        if (sti.frame.size() == 0) { +            etiLog.level(debug) << "EDI input " << m_name << " empty frame"; +            return 0; +        } +        else if (sti.frame.size() == size) { +            // Steady-state when everything works well +            if (m_frames.size() > m_max_frames_overrun) { +                m_stats.notifyOverrun(); + +                /* If the buffer is too full, we drop as many frames as needed +                 * to get down to the prebuffering size. We would like to have our buffer +                 * filled to the prebuffering length. */ +                size_t over_max = m_frames.size() - m_num_frames_prebuffering; + +                while (over_max--) { +                    EdiDecoder::sti_frame_t discard; +                    m_frames.try_pop(discard); +                } +            } + +            if (not sti.version_data.version.empty()) { +                m_stats.notifyVersion( +                        sti.version_data.version, +                        sti.version_data.uptime_s); +            } +            m_stats.notifyPeakLevels(sti.audio_levels.left, sti.audio_levels.right); + +            copy(sti.frame.cbegin(), sti.frame.cend(), buffer); +            return size; +        } +        else { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << +                sti.frame.size() << " received, " << size << " requested"; +            memset(buffer, 0, size * sizeof(*buffer)); +            return 0; +        } +    } +    else { +        memset(buffer, 0, size * sizeof(*buffer)); +        m_is_prebuffering = true; +        etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering"; +        m_stats.notifyUnderrun(); +        return 0; +    } +} + +size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    if (m_pending_sti_frame.frame.empty()) { +        m_frames.try_pop(m_pending_sti_frame); +    } + +    m_stats.notifyBuffer(m_frames.size() * size); + +    if (m_is_prebuffering) { +        if (m_pending_sti_frame.frame.empty()) { +            memset(buffer, 0, size); +            return 0; +        } +        else if (m_pending_sti_frame.frame.size() == size) { +            // readFrame gets called every 24ms, so we allow max 24ms +            // difference between the input frame timestamp and the requested +            // timestamp. +            if (m_pending_sti_frame.timestamp.valid()) { +                auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta); +                ts_req += m_tist_delay; +                const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req); + +                if (offset < 24e-3) { +                    m_is_prebuffering = false; +                    etiLog.level(warn) << "EDI input " << m_name << +                        " valid timestamp, pre-buffering complete"; + +                    if (not m_pending_sti_frame.version_data.version.empty()) { +                        m_stats.notifyVersion( +                                m_pending_sti_frame.version_data.version, +                                m_pending_sti_frame.version_data.uptime_s); +                    } + +                    m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, +                            m_pending_sti_frame.audio_levels.right); +                    copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); +                    m_pending_sti_frame.frame.clear(); +                    return size; +                } +                else { +                    // Wait more, but erase the front of the frame queue to avoid +                    // stalling on one frame with incorrect timestamp +                    if (m_frames.size() >= m_max_frames_overrun) { +                        m_pending_sti_frame.frame.clear(); +                    } +                    m_stats.notifyUnderrun(); +                    memset(buffer, 0, size); +                    return 0; +                } +            } +            else { +                etiLog.level(debug) << "EDI input " << m_name << +                    " skipping frame without timestamp"; +                m_pending_sti_frame.frame.clear(); +                memset(buffer, 0, size); +                return 0; +            } +        } +        else { +            etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << +                m_pending_sti_frame.frame.size() << " received, " << size << " requested"; +            m_pending_sti_frame.frame.clear(); +            memset(buffer, 0, size); +            return 0; +        } +    } +    else { +        if (m_pending_sti_frame.frame.empty()) { +            etiLog.level(warn) << "EDI input " << m_name << +                " empty, re-enabling pre-buffering"; +            memset(buffer, 0, size); +            m_stats.notifyUnderrun(); +            m_is_prebuffering = true; +            return 0; +        } +        else if (not m_pending_sti_frame.timestamp.valid()) { +            etiLog.level(warn) << "EDI input " << m_name << +                " invalid timestamp, ignoring"; +            memset(buffer, 0, size); +            m_pending_sti_frame.frame.clear(); +            m_stats.notifyUnderrun(); +            return 0; +        } +        else { +            auto ts_req = EdiDecoder::frame_timestamp_t::from_unix_epoch(seconds, utco, tsta); +            ts_req += m_tist_delay; +            const double offset = m_pending_sti_frame.timestamp.diff_ms(ts_req); + +            if (offset > 24e-3) { +                m_stats.notifyUnderrun(); +                m_is_prebuffering = true; +                m_pending_sti_frame.frame.clear(); +                etiLog.level(warn) << "EDI input " << m_name << +                    " timestamp out of bounds, re-enabling pre-buffering"; +                memset(buffer, 0, size); +                return 0; +            } +            else { +                if (not m_pending_sti_frame.version_data.version.empty()) { +                    m_stats.notifyVersion( +                            m_pending_sti_frame.version_data.version, +                            m_pending_sti_frame.version_data.uptime_s); +                } + +                m_stats.notifyPeakLevels(m_pending_sti_frame.audio_levels.left, +                        m_pending_sti_frame.audio_levels.right); +                copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer); +                m_pending_sti_frame.frame.clear(); +                return size; +            } +        } +    } +} + +void Edi::m_run() +{ +    while (m_running) { +        switch (m_input_used) { +            case InputUsed::UDP: +                { +                    constexpr size_t packsize = 2048; +                    const auto packet = m_udp_sock.receive(packsize); +                    if (packet.buffer.size() == packsize) { +                        fprintf(stderr, "Warning, possible UDP truncation\n"); +                    } +                    if (not packet.buffer.empty()) { +                        m_sti_decoder.push_packet(packet.buffer); +                    } +                    else { +                        this_thread::sleep_for(chrono::milliseconds(12)); +                    } +                } +                break; +            case InputUsed::TCP: +                { +                    auto packet = m_tcp_receive_server.receive(); +                    if (not packet.empty()) { +                        m_sti_decoder.push_bytes(packet); +                    } +                    else { +                        this_thread::sleep_for(chrono::milliseconds(12)); +                    } +                } +                break; +            default: +                throw logic_error("unimplemented input"); +        } +    } +} + +void Edi::m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& sti) { +    if (not sti.frame.empty()) { +        // We should not wait here, because we want the complete input buffering +        // happening inside m_frames. Using the blocking function is only a protection +        // against runaway memory usage if something goes wrong in the consumer. +        m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2); +    } +} + +int Edi::setBitrate(int bitrate) +{ +    if (bitrate <= 0) { +        throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name); +    } + +    return bitrate; +} + +void Edi::close() +{ +    m_udp_sock.close(); +} + + +void Edi::set_parameter(const std::string& parameter, const std::string& value) +{ +    if (parameter == "buffer") { +        size_t new_limit = atol(value.c_str()); +        m_max_frames_overrun = new_limit; +    } +    else if (parameter == "prebuffering") { +        size_t new_limit = atol(value.c_str()); +        m_num_frames_prebuffering = new_limit; +    } +    else if (parameter == "buffermanagement") { +        if (value == "prebuffering") { +            setBufferManagement(Inputs::BufferManagement::Prebuffering); +        } +        else if (value == "timestamped") { +            setBufferManagement(Inputs::BufferManagement::Timestamped); +        } +        else { +            throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name()); +        } +    } +    else if (parameter == "tistdelay") { +        m_tist_delay = chrono::milliseconds(stoi(value)); +    } +    else { +        throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); +    } +} + +const std::string Edi::get_parameter(const std::string& parameter) const +{ +    stringstream ss; +    if (parameter == "buffer") { +        ss << m_max_frames_overrun; +    } +    else if (parameter == "prebuffering") { +        ss << m_num_frames_prebuffering; +    } +    else if (parameter == "buffermanagement") { +        switch (getBufferManagement()) { +            case Inputs::BufferManagement::Prebuffering: +                ss << "prebuffering"; +                break; +            case Inputs::BufferManagement::Timestamped: +                ss << "Timestamped"; +                break; +        } +    } +    else if (parameter == "tistdelay") { +        ss << m_tist_delay.count(); +    } +    else { +        throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name()); +    } +    return ss.str(); +} + +} diff --git a/src/input/Edi.h b/src/input/Edi.h new file mode 100644 index 0000000..ca465bd --- /dev/null +++ b/src/input/Edi.h @@ -0,0 +1,126 @@ +/* +   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications +   Research Center Canada) + +   Copyright (C) 2019 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://www.opendigitalradio.org +   */ +/* +   This file is part of ODR-DabMux. + +   ODR-DabMux is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMux is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>. +   */ + +#pragma once + +#include <string> +#include <vector> +#include <deque> +#include <thread> +#include <mutex> +#include "Socket.h" +#include "input/inputs.h" +#include "edi/STIDecoder.hpp" +#include "edi/STIWriter.hpp" +#include "ThreadsafeQueue.h" +#include "ManagementServer.h" + +namespace Inputs { + +struct dab_input_edi_config_t +{ +    /* The size of the internal buffer, measured in number +     * of elements. +     * +     * Each element corresponds to one frame, i.e. 24ms +     */ +    size_t buffer_size = 100; + +    /* The amount of prebuffering to do before we start streaming +     * +     * Same units as buffer_size +     */ +    size_t prebuffering = 30; +}; + +/* + * Receives EDI from UDP or TCP in a separate thread and pushes that data + * into the STIDecoder. Complete frames are then put into a queue for the consumer. + * + * This way, the EDI decoding happens in a separate thread. + */ +class Edi : public InputBase, public RemoteControllable { +    public: +        Edi(const std::string& name, const dab_input_edi_config_t& config); +        Edi(const Edi&) = delete; +        Edi& operator=(const Edi&) = delete; +        ~Edi(); + +        virtual void open(const std::string& name); +        virtual size_t readFrame(uint8_t *buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta); +        virtual int setBitrate(int bitrate); +        virtual void close(); + +        /* Remote control */ +        virtual void set_parameter(const std::string& parameter, const std::string& value); +        virtual const std::string get_parameter(const std::string& parameter) const; + +    protected: +        void m_run(); + +        void m_new_sti_frame_callback(EdiDecoder::sti_frame_t&& frame); + +        std::mutex m_mutex; + +        enum class InputUsed { Invalid, UDP, TCP }; +        InputUsed m_input_used = InputUsed::Invalid; +        Socket::UDPSocket m_udp_sock; +        Socket::TCPReceiveServer m_tcp_receive_server; + +        EdiDecoder::STIWriter m_sti_writer; +        EdiDecoder::STIDecoder m_sti_decoder; +        std::thread m_thread; +        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); +        ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames; + +        // InputBase defines bufferManagement and tist delay + +        // Used in timestamp-based buffer management +        EdiDecoder::sti_frame_t m_pending_sti_frame; + +        // State variable used in prebuffering-based buffer management +        bool m_is_prebuffering = true; + +        /* When using prebuffering, consider the buffer to be full on the +         * receive side if it's above the overrun threshold. +         * +         * When using timestamping, start discarding the front of the queue once the queue +         * is this full. Must be smaller than m_max_frames_queued. +         * +         * Parameter 'buffer' inside RC. */ +        std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000); + +        /* When not using timestamping, how many frames to prebuffer. +         * Parameter 'prebuffering' inside RC. */ +        std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10); + +        std::string m_name; +        InputStat m_stats; +}; + +}; + diff --git a/src/input/File.cpp b/src/input/File.cpp index 20036ae..46bfb59 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -35,6 +35,8 @@  #include "mpeg.h"  #include "ReedSolomon.h" +using namespace std; +  namespace Inputs {  #ifdef _WIN32 @@ -58,7 +60,7 @@ __attribute((packed))  ; -int FileBase::open(const std::string& name) +void FileBase::open(const std::string& name)  {      int flags = O_RDONLY | O_BINARY;      if (m_nonblock) { @@ -67,30 +69,35 @@ int FileBase::open(const std::string& name)      m_fd = ::open(name.c_str(), flags);      if (m_fd == -1) { -        throw std::runtime_error("Could not open input file " + name + ": " + +        throw runtime_error("Could not open input file " + name + ": " +              strerror(errno));      } +} + +size_t FileBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    // Will not be implemented, as there is no obvious way to carry timestamps +    // in files. +    memset(buffer, 0, size);      return 0;  }  int FileBase::setBitrate(int bitrate)  {      if (bitrate <= 0) { -        etiLog.log(error, "Invalid bitrate (%i)", bitrate); -        return -1; +        throw invalid_argument("Invalid bitrate " + to_string(bitrate));      }      return bitrate;  } -int FileBase::close() +void FileBase::close()  {      if (m_fd != -1) {          ::close(m_fd);          m_fd = -1;      } -    return 0;  }  void FileBase::setNonblocking(bool nonblock) @@ -182,7 +189,7 @@ ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size)      return size;  } -int MPEGFile::readFrame(uint8_t* buffer, size_t size) +size_t MPEGFile::readFrame(uint8_t *buffer, size_t size)  {      int result;      bool do_rewind = false; @@ -275,12 +282,18 @@ MUTE_SUBCHANNEL:                  }              }      } + +    // TODO this is probably wrong, because it should return +    // the number of bytes written.      return result;  }  int MPEGFile::setBitrate(int bitrate)  { -    if (bitrate == 0) { +    if (bitrate < 0) { +        throw invalid_argument("Invalid bitrate " + to_string(bitrate)); +    } +    else if (bitrate == 0) {          uint8_t buffer[4];          if (readFrame(buffer, 4) == 0) { @@ -294,7 +307,7 @@ int MPEGFile::setBitrate(int bitrate)      return bitrate;  } -int RawFile::readFrame(uint8_t* buffer, size_t size) +size_t RawFile::readFrame(uint8_t *buffer, size_t size)  {      return readFromFile(buffer, size);  } @@ -304,7 +317,7 @@ PacketFile::PacketFile(bool enhancedPacketMode)      m_enhancedPacketEnabled = enhancedPacketMode;  } -int PacketFile::readFrame(uint8_t* buffer, size_t size) +size_t PacketFile::readFrame(uint8_t *buffer, size_t size)  {      size_t written = 0;      int length; @@ -357,7 +370,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)                      length = 24;                  }                  else { -                    std::copy(m_packetData.begin(), +                    copy(m_packetData.begin(),                              m_packetData.begin() + m_packetLength,                              buffer);                      length = m_packetLength; @@ -365,7 +378,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)                  }              }              else { -                std::copy(m_packetData.begin(), +                copy(m_packetData.begin(),                          m_packetData.begin() + m_packetLength,                          buffer);                  length = m_packetLength; diff --git a/src/input/File.h b/src/input/File.h index b574c39..39ce7fd 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -36,10 +36,11 @@ namespace Inputs {  class FileBase : public InputBase {      public: -        virtual int open(const std::string& name); -        virtual int readFrame(uint8_t* buffer, size_t size) = 0; +        virtual void open(const std::string& name); +        virtual size_t readFrame(uint8_t *buffer, size_t size) = 0; +        virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);          virtual int setBitrate(int bitrate); -        virtual int close(); +        virtual void close();          virtual void setNonblocking(bool nonblock); @@ -63,7 +64,7 @@ class FileBase : public InputBase {  class MPEGFile : public FileBase {      public: -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size);          virtual int setBitrate(int bitrate);      private: @@ -72,13 +73,13 @@ class MPEGFile : public FileBase {  class RawFile : public FileBase {      public: -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size);  };  class PacketFile : public FileBase {      public:          PacketFile(bool enhancedPacketMode); -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size);      protected:          std::array<uint8_t, 96> m_packetData; diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp index 7856a46..155e625 100644 --- a/src/input/Prbs.cpp +++ b/src/input/Prbs.cpp @@ -44,7 +44,7 @@ namespace Inputs {  // Preferred polynomial is G(x) = x^20 + x^17 + 1  const uint32_t PRBS_DEFAULT_POLY = (1 << 20) | (1 << 17) | (1 << 0); -int Prbs::open(const string& name) +void Prbs::open(const string& name)  {      if (name.substr(0, 7) != "prbs://") {          throw logic_error("Invalid PRBS name"); @@ -73,11 +73,9 @@ int Prbs::open(const string& name)          m_prbs.setup(polynomial);      }      rewind(); - -    return 0;  } -int Prbs::readFrame(uint8_t* buffer, size_t size) +size_t Prbs::readFrame(uint8_t *buffer, size_t size)  {      for (size_t i = 0; i < size; ++i) {          buffer[i] = m_prbs.step(); @@ -86,14 +84,22 @@ int Prbs::readFrame(uint8_t* buffer, size_t size)      return size;  } +size_t Prbs::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    memset(buffer, 0, size); +    return 0; +} +  int Prbs::setBitrate(int bitrate)  { +    if (bitrate <= 0) { +        throw invalid_argument("Invalid bitrate " + to_string(bitrate)); +    }      return bitrate;  } -int Prbs::close() +void Prbs::close()  { -    return 0;  }  int Prbs::rewind() diff --git a/src/input/Prbs.h b/src/input/Prbs.h index 51b7756..e2b94ec 100644 --- a/src/input/Prbs.h +++ b/src/input/Prbs.h @@ -37,10 +37,11 @@ namespace Inputs {  class Prbs : public InputBase {      public: -        virtual int open(const std::string& name); -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual void open(const std::string& name); +        virtual size_t readFrame(uint8_t *buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);          virtual int setBitrate(int bitrate); -        virtual int close(); +        virtual void close();      private:          virtual int rewind(); diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index 2cb49e7..a37ee21 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -38,7 +38,7 @@ using namespace std;  namespace Inputs { -int Udp::open(const std::string& name) +void Udp::open(const std::string& name)  {      // Skip the udp:// part if it is present      const string endpoint = (name.substr(0, 6) == "udp://") ? @@ -57,8 +57,6 @@ int Udp::open(const std::string& name)      m_name = name;      openUdpSocket(endpoint); - -    return 0;  }  void Udp::openUdpSocket(const std::string& endpoint) @@ -82,61 +80,50 @@ void Udp::openUdpSocket(const std::string& endpoint)          throw out_of_range("can't use port number 0 in udp address");      } -    if (m_sock.reinit(port, address) == -1) { -        stringstream ss; -        ss << "Could not init UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } - -    if (m_sock.setBlocking(false) == -1) { -        stringstream ss; -        ss << "Could not set non-blocking UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    m_sock.reinit(port, address); +    m_sock.setBlocking(false);      etiLog.level(info) << "Opened UDP port " << address << ":" << port;  } -int Udp::readFrame(uint8_t* buffer, size_t size) +size_t Udp::readFrame(uint8_t *buffer, size_t size)  {      // Regardless of buffer contents, try receiving data. -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); - -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } +    auto packet = m_sock.receive(32768); -    std::copy(packet.getData(), packet.getData() + packet.getSize(), -            back_inserter(m_buffer)); +    std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));      // Take data from the buffer if it contains enough data,      // in any case write the buffer      if (m_buffer.size() >= (size_t)size) {          std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer); +        return size;      }      else {          memset(buffer, 0x0, size); +        return 0;      } +} -    return size; +size_t Udp::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    // Maybe there's a way to carry timestamps, but we don't need it. +    memset(buffer, 0x0, size); +    return 0;  }  int Udp::setBitrate(int bitrate)  {      if (bitrate <= 0) { -        etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); -        return -1; +        throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);      }      return bitrate;  } -int Udp::close() +void Udp::close()  { -    return m_sock.close(); +    m_sock.close();  } @@ -165,10 +152,10 @@ static uint16_t unpack2(const uint8_t *buf)      return (((uint16_t)buf[0]) << 8) | buf[1];  } -int Sti_d_Rtp::open(const std::string& name) +void Sti_d_Rtp::open(const std::string& name)  { -    // Skip the sti-rtp:// part if it is present -    const string endpoint = (name.substr(0, 10) == "sti-rtp://") ? +    // Skip the rtp:// part if it is present +    const string endpoint = (name.substr(0, 10) == "rtp://") ?          name.substr(10) : name;      // The endpoint should be address:port @@ -176,43 +163,34 @@ int Sti_d_Rtp::open(const std::string& name)      if (colon_pos == string::npos) {          stringstream ss;          ss << "'" << name << -                " is an invalid format for sti-rtp address: " -                "expected [sti-rtp://]address:port"; +                " is an invalid format for rtp address: " +                "expected [rtp://]address:port";          throw invalid_argument(ss.str());      }      m_name = name;      openUdpSocket(endpoint); - -    return 0;  }  void Sti_d_Rtp::receive_packet()  { -    UdpPacket packet(32768); -    int ret = m_sock.receive(packet); +    auto packet = m_sock.receive(32768); -    if (ret == -1) { -        stringstream ss; -        ss << "Could not read from UDP socket: " << inetErrMsg; -        throw runtime_error(ss.str()); -    } - -    if (packet.getSize() == 0) { +    if (packet.buffer.empty()) {          // No packet was received          return;      }      const size_t STI_FC_LEN = 8; -    if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) { +    if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {          etiLog.level(info) << "Received too small RTP packet for " <<              m_name;          return;      } -    if (not rtpHeaderValid(packet.getData())) { +    if (not rtpHeaderValid(packet.buffer.data())) {          etiLog.level(info) << "Received invalid RTP header for " <<              m_name;          return; @@ -220,7 +198,7 @@ void Sti_d_Rtp::receive_packet()      //  STI(PI, X)      size_t index = RTP_HEADER_LEN; -    const uint8_t *buf = packet.getData(); +    const uint8_t *buf = packet.buffer.data();      //   SYNC      index++; // Advance over STAT @@ -242,7 +220,7 @@ void Sti_d_Rtp::receive_packet()              m_name;          return;      } -    if (packet.getSize() < index + DFS) { +    if (packet.buffer.size() < index + DFS) {          etiLog.level(info) << "Received STI too small for given DFS for " <<              m_name;          return; @@ -270,9 +248,9 @@ void Sti_d_Rtp::receive_packet()      uint16_t NST   = unpack2(buf+index) & 0x7FF; // 11 bits      index += 2; -    if (packet.getSize() < index + 4*NST) { +    if (packet.buffer.size() < index + 4*NST) {          etiLog.level(info) << "Received STI too small to contain NST for " << -            m_name << " packet: " << packet.getSize() << " need " << +            m_name << " packet: " << packet.buffer.size() << " need " <<              index + 4*NST;          return;      } @@ -307,7 +285,7 @@ void Sti_d_Rtp::receive_packet()      }  } -int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size) +size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size)  {      // Make sure we fill faster than we consume in case there      // are pending packets. @@ -316,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)      if (m_queue.empty()) {          memset(buffer, 0x0, size); +        return 0;      }      else if (m_queue.front().size() != size) {          etiLog.level(warn) << "Invalid input data size for STI " << m_name <<              " : RX " << m_queue.front().size() << " expected " << size;          memset(buffer, 0x0, size);          m_queue.pop_front(); +        return 0;      }      else {          copy(m_queue.front().begin(), m_queue.front().end(), buffer);          m_queue.pop_front(); +        return size;      } - -    return 0;  }  } diff --git a/src/input/Udp.h b/src/input/Udp.h index dc01486..e5961c7 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -31,7 +31,7 @@  #include <deque>  #include <boost/thread.hpp>  #include "input/inputs.h" -#include "UdpSocket.h" +#include "Socket.h"  namespace Inputs { @@ -40,13 +40,14 @@ namespace Inputs {   */  class Udp : public InputBase {      public: -        virtual int open(const std::string& name); -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual void open(const std::string& name); +        virtual size_t readFrame(uint8_t *buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);          virtual int setBitrate(int bitrate); -        virtual int close(); +        virtual void close();      protected: -        UdpSocket m_sock; +        Socket::UDPSocket m_sock;          std::string m_name;          void openUdpSocket(const std::string& endpoint); @@ -67,8 +68,8 @@ class Sti_d_Rtp : public Udp {      using vec_u8 = std::vector<uint8_t>;      public: -        virtual int open(const std::string& name); -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual void open(const std::string& name); +        virtual size_t readFrame(uint8_t *buffer, size_t size);      private:          void receive_packet(void); diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 2e35907..0a9d59d 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -2,7 +2,7 @@     Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2018 Matthias P. Braendli +   Copyright (C) 2019 Matthias P. Braendli      http://www.opendigitalradio.org     ZeroMQ input. see www.zeromq.org for more info @@ -220,7 +220,7 @@ void ZmqBase::rebind()      }  } -int ZmqBase::open(const std::string& inputUri) +void ZmqBase::open(const std::string& inputUri)  {      m_inputUri = inputUri; @@ -229,33 +229,32 @@ int ZmqBase::open(const std::string& inputUri)      // We want to appear in the statistics !      m_stats.registerAtServer(); - -    return 0;  } -int ZmqBase::close() +void ZmqBase::close()  {      m_zmq_sock.close(); -    return 0;  }  int ZmqBase::setBitrate(int bitrate)  { +    if (bitrate <= 0) { +        throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name); +    } +      m_bitrate = bitrate; -    return bitrate; // TODO do a nice check here +    return bitrate;  }  // size corresponds to a frame size. It is constant for a given bitrate -int ZmqBase::readFrame(uint8_t* buffer, size_t size) +size_t ZmqBase::readFrame(uint8_t* buffer, size_t size)  { -    int rc; -      /* We must *always* read data from the ZMQ socket,       * to make sure that ZMQ internal buffers are emptied       * quickly. It's the only way to control the buffers       * of the whole path from encoder to our frame_buffer.       */ -    rc = readFromSocket(size); +    const auto readsize = readFromSocket(size);      /* Notify of a buffer overrun, and drop some frames */      if (m_frame_buffer.size() >= m_config.buffer_size) { @@ -296,10 +295,10 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)      }      if (m_prebuf_current > 0) { -        if (rc > 0) +        if (readsize > 0)              m_prebuf_current--;          if (m_prebuf_current == 0) -            etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", +            etiLog.log(info, "inputZMQ %s input pre-buffering complete",                  m_rc_name.c_str());          /* During prebuffering, give a zeroed frame to the mux */ @@ -312,7 +311,7 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)      m_stats.notifyBuffer(m_frame_buffer.size() * size);      if (m_frame_buffer.empty()) { -        etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", +        etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering",                  m_rc_name.c_str());          // reset prebuffering          m_prebuf_current = m_config.prebuffering; @@ -332,6 +331,13 @@ int ZmqBase::readFrame(uint8_t* buffer, size_t size)      }  } +size_t ZmqBase::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) +{ +    // TODO add timestamps into the metadata and implement this +    memset(buffer, 0, size); +    return 0; +} +  /******** MPEG input *******/ diff --git a/src/input/Zmq.h b/src/input/Zmq.h index eb67fe5..2e37b5f 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -2,7 +2,7 @@     Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2017 Matthias P. Braendli +   Copyright (C) 2019 Matthias P. Braendli      http://www.opendigitalradio.org     ZeroMQ input. see www.zeromq.org for more info @@ -45,7 +45,7 @@  #include <list>  #include <string> -#include <stdint.h> +#include <cstdint>  #include "zmq.hpp"  #include "input/inputs.h"  #include "ManagementServer.h" @@ -156,6 +156,7 @@ class ZmqBase : public InputBase, public RemoteControllable {              m_bitrate(0),              m_enable_input(true),              m_config(config), +            m_name(name),              m_stats(name),              m_prebuf_current(config.prebuffering) {                  RC_ADD_PARAMETER(enable, @@ -180,10 +181,11 @@ class ZmqBase : public InputBase, public RemoteControllable {                  INVALIDATE_KEY(m_curve_encoder_key);              } -        virtual int open(const std::string& inputUri); -        virtual int readFrame(uint8_t* buffer, size_t size); +        virtual void open(const std::string& inputUri); +        virtual size_t readFrame(uint8_t *buffer, size_t size); +        virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);          virtual int setBitrate(int bitrate); -        virtual int close(); +        virtual void close();          /* Remote control */          virtual void set_parameter(const std::string& parameter, @@ -220,6 +222,7 @@ class ZmqBase : public InputBase, public RemoteControllable {          char m_curve_encoder_key[CURVE_KEYLEN+1];          std::string m_inputUri; +        std::string m_name;          InputStat m_stats; diff --git a/src/input/inputs.h b/src/input/inputs.h index bfb1fb6..83cdbf2 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -2,7 +2,7 @@     Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -35,17 +35,63 @@  namespace Inputs { +enum class BufferManagement { +    // Use a buffer in the input that doesn't consider timestamps +    Prebuffering, + +    // Buffer incoming data until a given timestamp is reached +    Timestamped, +}; + +  /* New input object base */  class InputBase {      public: -        virtual int open(const std::string& name) = 0; -        virtual int readFrame(uint8_t* buffer, size_t size) = 0; +        /* Throws runtime_error or invalid_argument on failure */ +        virtual void open(const std::string& name) = 0; + +        /* read a frame from the input. Buffer management is either not necessary +         * (e.g. File input) or done with pre-buffering (network-based inputs). +         * +         * This ignores timestamps. All inputs support this. +         * +         * Returns number of data bytes written to the buffer. May clear the buffer +         * if no data bytes available, in which case it will return 0. +         * +         * Returns negative on error. +         */ +        virtual size_t readFrame(uint8_t *buffer, size_t size) = 0; + +        /* read a frame from the input, taking into account timestamp. The timestamp of the data +         * returned is not more recent than the timestamp specified in seconds and tsta. +         * +         * seconds is in UNIX epoch, utco is the TAI-UTC offset, tsta is in the format used by ETI. +         * +         * Returns number of data bytes written to the buffer. May clear the buffer +         * if no data bytes available, in which case it will return 0. +         * +         * Returns negative on error. +         * +         * Calling this function on inputs that do not support timestamps returns 0. This allows +         * changing the buffer management at runtime without risking an crash due to an exception. +         */ +        virtual size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta) = 0; + +        /* Returns the effectively used bitrate, or throws invalid_argument on invalid bitrate */          virtual int setBitrate(int bitrate) = 0; -        virtual int close() = 0; +        virtual void close() = 0;          virtual ~InputBase() {} + +        void setTistDelay(const std::chrono::milliseconds& ms) { m_tist_delay = ms; } +        void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; } +        BufferManagement getBufferManagement() const { return m_bufferManagement; } +      protected:          InputBase() {} + +        std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering); +        std::chrono::milliseconds m_tist_delay;  };  }; diff --git a/src/utils.cpp b/src/utils.cpp index 721c145..7cd441a 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -25,6 +25,7 @@     along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.  */  #include <cstring> +#include <climits>  #include <iostream>  #include <memory>  #include <boost/algorithm/string/join.hpp> @@ -328,7 +329,7 @@ void printSubchannels(const vec_sp_subchannel& subchannels)          etiLog.level(info) << "   URI:     " << subchannel->inputUri;          switch (subchannel->type) {              case subchannel_type_t::DABAudio: -                etiLog.log(info, " type:       DAbAudio"); +                etiLog.log(info, " type:       DABAudio");                  break;              case subchannel_type_t::DABPlusAudio:                  etiLog.log(info, " type:       DABPlusAudio"); diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp index 2128abf..2188f8a 100644 --- a/src/zmq2edi/EDISender.cpp +++ b/src/zmq2edi/EDISender.cpp @@ -79,7 +79,7 @@ void EDISender::print_configuration()  void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)  {      edi::TagDETI edi_tagDETI; -    edi::TagStarPTR edi_tagStarPtr; +    edi::TagStarPTR edi_tagStarPtr("DETI");      map<int, edi::TagESTn> edi_subchannelToTag;      // The above Tag Items will be assembled into a TAG Packet      edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment); diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h index bb9c8bc..3525b4b 100644 --- a/src/zmq2edi/EDISender.h +++ b/src/zmq2edi/EDISender.h @@ -34,9 +34,9 @@  #include <atomic>  #include "ThreadsafeQueue.h"  #include "dabOutput/dabOutput.h" -#include "dabOutput/edi/TagItems.h" -#include "dabOutput/edi/TagPacket.h" -#include "dabOutput/edi/Transport.h" +#include "edioutput/TagItems.h" +#include "edioutput/TagPacket.h" +#include "edioutput/Transport.h"  // This metadata gets transmitted in the zmq stream  struct metadata_t { diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp index 3888d8a..f7d733c 100644 --- a/src/zmq2edi/zmq2edi.cpp +++ b/src/zmq2edi/zmq2edi.cpp @@ -27,11 +27,13 @@  #include "Log.h"  #include "zmq.hpp" -#include <math.h>  #include <getopt.h> -#include <string.h> +#include <cmath> +#include <cstring> +#include <chrono>  #include <iostream>  #include <iterator> +#include <thread>  #include <vector>  #include "EDISender.h" @@ -39,12 +41,13 @@  constexpr size_t MAX_ERROR_COUNT = 10;  constexpr long ZMQ_TIMEOUT_MS = 1000; +constexpr long DEFAULT_BACKOFF = 5000;  static edi::configuration_t edi_conf;  static EDISender edisender; -void usage(void) +static void usage()  {      using namespace std; @@ -54,23 +57,25 @@ void usage(void)      cerr << "Options:" << endl;      cerr << "The following options can be given only once:" << endl;      cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl; -    cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; -    cerr << " -x Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; -    cerr << " -p <destination port> sets the destination port." << endl; -    cerr << " -P Disable PFT and send AFPackets." << endl; -    cerr << " -f <fec> sets the FEC." << endl; -    cerr << " -i <interleave> enables the interleaved with this latency." << endl; -    cerr << " -D dumps the EDI to edi.debug file." << endl; -    cerr << " -v Enables verbose mode." << endl; -    cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl; +    cerr << " -w <delay>            Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl; +    cerr << "                       Negative delay values are also allowed." << endl; +    cerr << " -x                    Drop frames where for which the wait time would be negative, i.e. frames that arrived too late." << endl; +    cerr << " -p <destination port> Set the destination port." << endl; +    cerr << " -P                    Disable PFT and send AFPackets." << endl; +    cerr << " -f <fec>              Set the FEC." << endl; +    cerr << " -i <interleave>       Enable the interleaver with this latency." << endl; +    cerr << " -D                    Dump the EDI to edi.debug file." << endl; +    cerr << " -v                    Enables verbose mode." << endl; +    cerr << " -a <alignement>       Set the alignment of the TAG Packet (default 8)." << endl; +    cerr << " -b <backoff>          Number of milliseconds to backoff after an input reset (default " << DEFAULT_BACKOFF << ")." << endl << endl;      cerr << "The following options can be given several times, when more than UDP destination is desired:" << endl; -    cerr << " -d <destination ip> sets the destination ip." << endl; -    cerr << " -s <source port> sets the source port." << endl; -    cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl; -    cerr << " -t <ttl> set the packet's TTL." << endl << endl; +    cerr << " -d <destination ip>   Set the destination ip." << endl; +    cerr << " -s <source port>      Set the source port." << endl; +    cerr << " -S <source ip>        Select the source IP in case we want to use multicast." << endl; +    cerr << " -t <ttl>              Set the packet's TTL." << endl << endl; -    cerr << "odr-zmq2edi will quit if it does not receive data for " << +    cerr << "The input socket will be reset if no data is received for " <<          (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;      cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;  } @@ -228,10 +233,11 @@ int start(int argc, char **argv)      int delay_ms = 500;      bool drop_late_packets = false; +    uint32_t backoff_after_reset_ms = DEFAULT_BACKOFF;      int ch = 0;      while (ch != -1) { -        ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:x"); +        ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:b:w:xh");          switch (ch) {              case -1:                  break; @@ -276,6 +282,9 @@ int start(int argc, char **argv)              case 'a':                  edi_conf.tagpacket_alignment = std::stoi(optarg);                  break; +            case 'b': +                backoff_after_reset_ms = std::stoi(optarg); +                break;              case 'w':                  delay_ms = std::stoi(optarg);                  break; @@ -313,85 +322,92 @@ int start(int argc, char **argv)      const char* source_url = argv[optind]; - -    size_t frame_count = 0; -    size_t error_count = 0; - -    etiLog.level(info) << "Opening ZMQ input: " << source_url; -      zmq::context_t zmq_ctx(1); -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); -    zmq_sock.connect(source_url); -    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - -    while (error_count < MAX_ERROR_COUNT) { -        zmq::message_t incoming; -        zmq::pollitem_t items[1]; -        items[0].socket = zmq_sock; -        items[0].events = ZMQ_POLLIN; -        const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -        if (num_events == 0) { // timeout -            error_count++; -        } -        else { -            // Event received: recv will not block -            zmq_sock.recv(&incoming); - -            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); +    etiLog.level(info) << "Opening ZMQ input: " << source_url; -            if (dab_msg->version != 1) { -                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +    size_t num_consecutive_resets = 0; +    while (true) { +        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); +        zmq_sock.connect(source_url); +        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + +        size_t error_count = 0; + +        while (error_count < MAX_ERROR_COUNT) { +            zmq::message_t incoming; +            zmq::pollitem_t items[1]; +            items[0].socket = zmq_sock; +            items[0].events = ZMQ_POLLIN; +            const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); +            if (num_events == 0) { // timeout                  error_count++;              } +            else { +                num_consecutive_resets = 0; -            int offset = sizeof(dab_msg->version) + -                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); +                // Event received: recv will not block +                zmq_sock.recv(&incoming); -            std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; +                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { -                    etiLog.level(error) << "ZeroMQ buffer " << i << -                        " has invalid length " << dab_msg->buflen[i]; +                if (dab_msg->version != 1) { +                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;                      error_count++;                  } -                else { -                    std::vector<uint8_t> buf(6144, 0x55); -                    const int framesize = dab_msg->buflen[i]; +                int offset = sizeof(dab_msg->version) + +                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); + +                std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames; + +                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                    if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                        etiLog.level(error) << "ZeroMQ buffer " << i << +                            " has invalid length " << dab_msg->buflen[i]; +                        error_count++; +                    } +                    else { +                        std::vector<uint8_t> buf(6144, 0x55); + +                        const int framesize = dab_msg->buflen[i]; -                    memcpy(&buf.front(), -                            ((uint8_t*)incoming.data()) + offset, -                            framesize); +                        memcpy(&buf.front(), +                                ((uint8_t*)incoming.data()) + offset, +                                framesize); -                    all_frames.emplace_back( -                            std::piecewise_construct, -                            std::make_tuple(std::move(buf)), -                            std::make_tuple()); +                        all_frames.emplace_back( +                                std::piecewise_construct, +                                std::make_tuple(std::move(buf)), +                                std::make_tuple()); -                    offset += framesize; +                        offset += framesize; +                    }                  } -            } -            for (auto &f : all_frames) { -                size_t consumed_bytes = 0; +                for (auto &f : all_frames) { +                    size_t consumed_bytes = 0; -                f.second = get_md_one_frame( -                        static_cast<uint8_t*>(incoming.data()) + offset, -                        incoming.size() - offset, -                        &consumed_bytes); +                    f.second = get_md_one_frame( +                            static_cast<uint8_t*>(incoming.data()) + offset, +                            incoming.size() - offset, +                            &consumed_bytes); -                offset += consumed_bytes; -            } +                    offset += consumed_bytes; +                } -            for (auto &f : all_frames) { -                edisender.push_frame(f); -                frame_count++; +                for (auto &f : all_frames) { +                    edisender.push_frame(f); +                }              }          } -    } -    etiLog.level(info) << "Quitting after " << frame_count << " frames transferred"; +        num_consecutive_resets++; + +        zmq_sock.close(); +        std::this_thread::sleep_for(std::chrono::milliseconds(backoff_after_reset_ms)); +        etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << +            num_consecutive_resets << " consecutive resets."; +    }      return 0;  } diff --git a/src/zmq2farsync/zmq2farsync.cpp b/src/zmq2farsync/zmq2farsync.cpp index 16830a2..95dc074 100644 --- a/src/zmq2farsync/zmq2farsync.cpp +++ b/src/zmq2farsync/zmq2farsync.cpp @@ -3,7 +3,7 @@     2011, 2012 Her Majesty the Queen in Right of Canada (Communications     Research Center Canada) -   Copyright (C) 2018 +   Copyright (C) 2019     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -28,13 +28,16 @@  #include "dabOutput/dabOutput.h"  #include "Log.h"  #include "zmq.hpp" +#include <chrono>  #include <iostream> +#include <thread>  #include <vector>  constexpr size_t MAX_ERROR_COUNT = 10; +constexpr size_t MAX_NUM_RESETS = 180;  constexpr long ZMQ_TIMEOUT_MS = 1000; -void usage(void) +static void usage()  {      using namespace std; @@ -46,8 +49,9 @@ void usage(void)      cerr << " <destination> is the device information for the FarSync card." << endl << endl;      cerr << " The syntax is the same as for ODR-DabMux" << endl << endl; -    cerr << "odr-zmq2edi will quit if it does not receive data for " << +    cerr << "The input socket will be reset if no data is received for " <<          (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl; +    cerr << "After " << MAX_NUM_RESETS << " consecutive resets, the process will quit." << endl;      cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;  } @@ -80,72 +84,81 @@ int main(int argc, char **argv)      etiLog.level(info) << "Opening ZMQ input: " << source_url;      zmq::context_t zmq_ctx(1); -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); - -    zmq_sock.connect(source_url); -    zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages - -    etiLog.level(info) << "Entering main loop";      size_t frame_count = 0;      size_t loop_counter = 0; -    size_t error_count = 0; -    while (error_count < MAX_ERROR_COUNT) -    { -        zmq::message_t incoming; -        zmq::pollitem_t items[1]; -        items[0].socket = zmq_sock; -        items[0].events = ZMQ_POLLIN; -        const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); -        if (num_events == 0) { // timeout -            error_count++; -        } -        else { -            // Event received: recv will not block -            zmq_sock.recv(&incoming); -            zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); - -            if (dab_msg->version != 1) { -                etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version; +    size_t num_consecutive_resets = 0; +    while (num_consecutive_resets < MAX_NUM_RESETS) { +        zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB); +        zmq_sock.connect(source_url); +        zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + +        size_t error_count = 0; +        while (error_count < MAX_ERROR_COUNT) { +            zmq::message_t incoming; +            zmq::pollitem_t items[1]; +            items[0].socket = zmq_sock; +            items[0].events = ZMQ_POLLIN; +            const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS); +            if (num_events == 0) { // timeout                  error_count++;              } +            else { +                num_consecutive_resets = 0; -            int offset = sizeof(dab_msg->version) + -                NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); +                // Event received: recv will not block +                zmq_sock.recv(&incoming); +                zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data(); -            for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { -                if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { -                    etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << -                        dab_msg->buflen[i]; +                if (dab_msg->version != 1) { +                    etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;                      error_count++;                  } -                else { -                    std::vector<uint8_t> buf(6144, 0x55); -                    const int framesize = dab_msg->buflen[i]; +                int offset = sizeof(dab_msg->version) + +                    NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen); -                    memcpy(&buf.front(), -                            ((uint8_t*)incoming.data()) + offset, -                            framesize); - -                    offset += framesize; - -                    if (output.Write(&buf.front(), buf.size()) == -1) { -                        etiLog.level(error) << "Cannot write to output!"; +                for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { +                    if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) { +                        etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " << +                            dab_msg->buflen[i];                          error_count++;                      } +                    else { +                        std::vector<uint8_t> buf(6144, 0x55); + +                        const int framesize = dab_msg->buflen[i]; + +                        memcpy(&buf.front(), +                                ((uint8_t*)incoming.data()) + offset, +                                framesize); -                    frame_count++; +                        offset += framesize; + +                        if (output.Write(&buf.front(), buf.size()) == -1) { +                            etiLog.level(error) << "Cannot write to output!"; +                            error_count++; +                        } + +                        frame_count++; +                    }                  } -            } -            loop_counter++; -            if (loop_counter > 250) { -                etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; -                loop_counter = 0; +                loop_counter++; +                if (loop_counter > 250) { +                    etiLog.level(info) << "Transmitted " << frame_count << " ETI frames"; +                    loop_counter = 0; +                }              }          } + +        num_consecutive_resets++; + +        zmq_sock.close(); +        std::this_thread::sleep_for(std::chrono::seconds(1)); +        etiLog.level(info) << "ZMQ input (" << source_url << ") timeout after " << +            num_consecutive_resets << " consecutive resets.";      } -    return error_count > 0 ? 2 : 0; +    return 0;  } | 
